Przeglądaj źródła

fix: mr goroutine leak on context deadline (#1433)

* fix: mr goroutine leak on context deadline

* test: update fx test check
Kevin Wan 3 lat temu
rodzic
commit
ea4f2af67f
5 zmienionych plików z 54 dodań i 12 usunięć
  1. 2 4
      core/fx/stream_test.go
  2. 12 7
      core/mr/mapreduce.go
  3. 36 1
      core/mr/mapreduce_test.go
  4. 1 0
      go.mod
  5. 3 0
      go.sum

+ 2 - 4
core/fx/stream_test.go

@@ -14,6 +14,7 @@ import (
 
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/assert"
 	"github.com/zeromicro/go-zero/core/stringx"
 	"github.com/zeromicro/go-zero/core/stringx"
+	"go.uber.org/goleak"
 )
 )
 
 
 func TestBuffer(t *testing.T) {
 func TestBuffer(t *testing.T) {
@@ -563,9 +564,6 @@ func equal(t *testing.T, stream Stream, data []interface{}) {
 }
 }
 
 
 func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
 func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
-	goroutines := runtime.NumGoroutine()
+	defer goleak.VerifyNone(t)
 	fn(t)
 	fn(t)
-	// let scheduler schedule first
-	time.Sleep(time.Millisecond)
-	assert.True(t, runtime.NumGoroutine() <= goroutines)
 }
 }

+ 12 - 7
core/mr/mapreduce.go

@@ -160,13 +160,18 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
 		mapper(item, w, cancel)
 		mapper(item, w, cancel)
 	}, source, collector, done, options.workers)
 	}, source, collector, done, options.workers)
 
 
-	value, ok := <-output
-	if err := retErr.Load(); err != nil {
-		return nil, err
-	} else if ok {
-		return value, nil
-	} else {
-		return nil, ErrReduceNoOutput
+	select {
+	case <-options.ctx.Done():
+		cancel(context.DeadlineExceeded)
+		return nil, context.DeadlineExceeded
+	case value, ok := <-output:
+		if err := retErr.Load(); err != nil {
+			return nil, err
+		} else if ok {
+			return value, nil
+		} else {
+			return nil, ErrReduceNoOutput
+		}
 	}
 	}
 }
 }
 
 

+ 36 - 1
core/mr/mapreduce_test.go

@@ -13,6 +13,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/assert"
 	"github.com/zeromicro/go-zero/core/stringx"
 	"github.com/zeromicro/go-zero/core/stringx"
 	"github.com/zeromicro/go-zero/core/syncx"
 	"github.com/zeromicro/go-zero/core/syncx"
+	"go.uber.org/goleak"
 )
 )
 
 
 var errDummy = errors.New("dummy")
 var errDummy = errors.New("dummy")
@@ -22,6 +23,8 @@ func init() {
 }
 }
 
 
 func TestFinish(t *testing.T) {
 func TestFinish(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	var total uint32
 	var total uint32
 	err := Finish(func() error {
 	err := Finish(func() error {
 		atomic.AddUint32(&total, 2)
 		atomic.AddUint32(&total, 2)
@@ -39,14 +42,20 @@ func TestFinish(t *testing.T) {
 }
 }
 
 
 func TestFinishNone(t *testing.T) {
 func TestFinishNone(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	assert.Nil(t, Finish())
 	assert.Nil(t, Finish())
 }
 }
 
 
 func TestFinishVoidNone(t *testing.T) {
 func TestFinishVoidNone(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	FinishVoid()
 	FinishVoid()
 }
 }
 
 
 func TestFinishErr(t *testing.T) {
 func TestFinishErr(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	var total uint32
 	var total uint32
 	err := Finish(func() error {
 	err := Finish(func() error {
 		atomic.AddUint32(&total, 2)
 		atomic.AddUint32(&total, 2)
@@ -63,6 +72,8 @@ func TestFinishErr(t *testing.T) {
 }
 }
 
 
 func TestFinishVoid(t *testing.T) {
 func TestFinishVoid(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	var total uint32
 	var total uint32
 	FinishVoid(func() {
 	FinishVoid(func() {
 		atomic.AddUint32(&total, 2)
 		atomic.AddUint32(&total, 2)
@@ -76,6 +87,8 @@ func TestFinishVoid(t *testing.T) {
 }
 }
 
 
 func TestMap(t *testing.T) {
 func TestMap(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	tests := []struct {
 	tests := []struct {
 		mapper MapFunc
 		mapper MapFunc
 		expect int
 		expect int
@@ -128,6 +141,8 @@ func TestMap(t *testing.T) {
 }
 }
 
 
 func TestMapReduce(t *testing.T) {
 func TestMapReduce(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	tests := []struct {
 	tests := []struct {
 		mapper      MapperFunc
 		mapper      MapperFunc
 		reducer     ReducerFunc
 		reducer     ReducerFunc
@@ -204,6 +219,8 @@ func TestMapReduce(t *testing.T) {
 }
 }
 
 
 func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
 func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	assert.Panics(t, func() {
 	assert.Panics(t, func() {
 		MapReduce(func(source chan<- interface{}) {
 		MapReduce(func(source chan<- interface{}) {
 			for i := 0; i < 10; i++ {
 			for i := 0; i < 10; i++ {
@@ -220,6 +237,8 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
 }
 }
 
 
 func TestMapReduceVoid(t *testing.T) {
 func TestMapReduceVoid(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	var value uint32
 	var value uint32
 	tests := []struct {
 	tests := []struct {
 		mapper      MapperFunc
 		mapper      MapperFunc
@@ -296,6 +315,8 @@ func TestMapReduceVoid(t *testing.T) {
 }
 }
 
 
 func TestMapReduceVoidWithDelay(t *testing.T) {
 func TestMapReduceVoidWithDelay(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	var result []int
 	var result []int
 	err := MapReduceVoid(func(source chan<- interface{}) {
 	err := MapReduceVoid(func(source chan<- interface{}) {
 		source <- 0
 		source <- 0
@@ -319,6 +340,8 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
 }
 }
 
 
 func TestMapVoid(t *testing.T) {
 func TestMapVoid(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	const tasks = 1000
 	const tasks = 1000
 	var count uint32
 	var count uint32
 	MapVoid(func(source chan<- interface{}) {
 	MapVoid(func(source chan<- interface{}) {
@@ -333,6 +356,8 @@ func TestMapVoid(t *testing.T) {
 }
 }
 
 
 func TestMapReducePanic(t *testing.T) {
 func TestMapReducePanic(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	v, err := MapReduce(func(source chan<- interface{}) {
 	v, err := MapReduce(func(source chan<- interface{}) {
 		source <- 0
 		source <- 0
 		source <- 1
 		source <- 1
@@ -350,6 +375,8 @@ func TestMapReducePanic(t *testing.T) {
 }
 }
 
 
 func TestMapReduceVoidCancel(t *testing.T) {
 func TestMapReduceVoidCancel(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	var result []int
 	var result []int
 	err := MapReduceVoid(func(source chan<- interface{}) {
 	err := MapReduceVoid(func(source chan<- interface{}) {
 		source <- 0
 		source <- 0
@@ -371,6 +398,8 @@ func TestMapReduceVoidCancel(t *testing.T) {
 }
 }
 
 
 func TestMapReduceVoidCancelWithRemains(t *testing.T) {
 func TestMapReduceVoidCancelWithRemains(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	var done syncx.AtomicBool
 	var done syncx.AtomicBool
 	var result []int
 	var result []int
 	err := MapReduceVoid(func(source chan<- interface{}) {
 	err := MapReduceVoid(func(source chan<- interface{}) {
@@ -396,6 +425,8 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
 }
 }
 
 
 func TestMapReduceWithoutReducerWrite(t *testing.T) {
 func TestMapReduceWithoutReducerWrite(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	uids := []int{1, 2, 3}
 	uids := []int{1, 2, 3}
 	res, err := MapReduce(func(source chan<- interface{}) {
 	res, err := MapReduce(func(source chan<- interface{}) {
 		for _, uid := range uids {
 		for _, uid := range uids {
@@ -412,6 +443,8 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
 }
 }
 
 
 func TestMapReduceVoidPanicInReducer(t *testing.T) {
 func TestMapReduceVoidPanicInReducer(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	const message = "foo"
 	const message = "foo"
 	var done syncx.AtomicBool
 	var done syncx.AtomicBool
 	err := MapReduceVoid(func(source chan<- interface{}) {
 	err := MapReduceVoid(func(source chan<- interface{}) {
@@ -431,6 +464,8 @@ func TestMapReduceVoidPanicInReducer(t *testing.T) {
 }
 }
 
 
 func TestMapReduceWithContext(t *testing.T) {
 func TestMapReduceWithContext(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
 	var done syncx.AtomicBool
 	var done syncx.AtomicBool
 	var result []int
 	var result []int
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
@@ -452,7 +487,7 @@ func TestMapReduceWithContext(t *testing.T) {
 		}
 		}
 	}, WithContext(ctx))
 	}, WithContext(ctx))
 	assert.NotNil(t, err)
 	assert.NotNil(t, err)
-	assert.Equal(t, ErrReduceNoOutput, err)
+	assert.Equal(t, context.DeadlineExceeded, err)
 }
 }
 
 
 func BenchmarkMapReduce(b *testing.B) {
 func BenchmarkMapReduce(b *testing.B) {

+ 1 - 0
go.mod

@@ -29,6 +29,7 @@ require (
 	go.opentelemetry.io/otel/sdk v1.1.0
 	go.opentelemetry.io/otel/sdk v1.1.0
 	go.opentelemetry.io/otel/trace v1.1.0
 	go.opentelemetry.io/otel/trace v1.1.0
 	go.uber.org/automaxprocs v1.4.0
 	go.uber.org/automaxprocs v1.4.0
+	go.uber.org/goleak v1.1.12 // indirect
 	golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect
 	golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect
 	golang.org/x/sys v0.0.0-20211106132015-ebca88c72f68 // indirect
 	golang.org/x/sys v0.0.0-20211106132015-ebca88c72f68 // indirect
 	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
 	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac

+ 3 - 0
go.sum

@@ -410,6 +410,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/automaxprocs v1.4.0 h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0=
 go.uber.org/automaxprocs v1.4.0 h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0=
 go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=
 go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=
+go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
+go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
 go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
 go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
 go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U=
 go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U=
@@ -606,6 +608,7 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=