Kevin Wan 4 лет назад
Родитель
Сommit
a87978568a
1 измененных файлов с 3 добавлено и 2 удалено
  1. 3 2
      core/mr/mapreduce.go

+ 3 - 2
core/mr/mapreduce.go

@@ -136,14 +136,16 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
 
 	go func() {
 		defer func() {
+			drain(collector)
+
 			if r := recover(); r != nil {
 				cancel(fmt.Errorf("%v", r))
 			} else {
 				finish()
 			}
 		}()
+
 		reducer(collector, writer, cancel)
-		drain(collector)
 	}()
 
 	go executeMappers(func(item interface{}, w Writer) {
@@ -165,7 +167,6 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
 func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
 	_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
 		reducer(input, cancel)
-		drain(input)
 		// We need to write a placeholder to let MapReduce to continue on reducer done,
 		// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
 		writer.Write(lang.Placeholder)