Kevin Wan 3 年之前
父節點
當前提交
d0f9e57022
共有 2 個文件被更改,包括 22 次插入0 次删除
  1. 6 0
      core/mr/mapreduce.go
  2. 16 0
      core/mr/mapreduce_test.go

+ 6 - 0
core/mr/mapreduce.go

@@ -112,6 +112,12 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
 	opts ...Option) (interface{}, error) {
 	options := buildOptions(opts...)
 	output := make(chan interface{})
+	defer func() {
+		for range output {
+			panic("more than one element written in reducer")
+		}
+	}()
+
 	collector := make(chan interface{}, options.workers)
 	done := syncx.NewDoneChan()
 	writer := newGuardedWriter(output, done.Done())

+ 16 - 0
core/mr/mapreduce_test.go

@@ -202,6 +202,22 @@ func TestMapReduce(t *testing.T) {
 	}
 }
 
+func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
+	assert.Panics(t, func() {
+		MapReduce(func(source chan<- interface{}) {
+			for i := 0; i < 10; i++ {
+				source <- i
+			}
+		}, func(item interface{}, writer Writer, cancel func(error)) {
+			writer.Write(item)
+		}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+			drain(pipe)
+			writer.Write("one")
+			writer.Write("two")
+		})
+	})
+}
+
 func TestMapReduceVoid(t *testing.T) {
 	var value uint32
 	tests := []struct {