|
@@ -16,7 +16,10 @@ const (
|
|
|
minWorkers = 1
|
|
|
)
|
|
|
|
|
|
-var ErrCancelWithNil = errors.New("mapreduce cancelled with nil")
|
|
|
+var (
|
|
|
+ ErrCancelWithNil = errors.New("mapreduce cancelled with nil")
|
|
|
+ ErrReduceNoOutput = errors.New("reduce not writing value")
|
|
|
+)
|
|
|
|
|
|
type (
|
|
|
GenerateFunc func(source chan<- interface{})
|
|
@@ -93,7 +96,14 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
|
|
collector := make(chan interface{}, options.workers)
|
|
|
done := syncx.NewDoneChan()
|
|
|
writer := newGuardedWriter(output, done.Done())
|
|
|
+ var closeOnce sync.Once
|
|
|
var retErr errorx.AtomicError
|
|
|
+ finish := func() {
|
|
|
+ closeOnce.Do(func() {
|
|
|
+ done.Close()
|
|
|
+ close(output)
|
|
|
+ })
|
|
|
+ }
|
|
|
cancel := once(func(err error) {
|
|
|
if err != nil {
|
|
|
retErr.Set(err)
|
|
@@ -102,14 +112,15 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
|
|
}
|
|
|
|
|
|
drain(source)
|
|
|
- done.Close()
|
|
|
- close(output)
|
|
|
+ finish()
|
|
|
})
|
|
|
|
|
|
go func() {
|
|
|
defer func() {
|
|
|
if r := recover(); r != nil {
|
|
|
cancel(fmt.Errorf("%v", r))
|
|
|
+ } else {
|
|
|
+ finish()
|
|
|
}
|
|
|
}()
|
|
|
reducer(collector, writer, cancel)
|
|
@@ -122,7 +133,7 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
|
|
} else if ok {
|
|
|
return value, nil
|
|
|
} else {
|
|
|
- return nil, nil
|
|
|
+ return nil, ErrReduceNoOutput
|
|
|
}
|
|
|
}
|
|
|
|