|
@@ -17,28 +17,42 @@ const (
|
|
)
|
|
)
|
|
|
|
|
|
var (
|
|
var (
|
|
- ErrCancelWithNil = errors.New("mapreduce cancelled with nil")
|
|
|
|
|
|
+ // ErrCancelWithNil is an error that mapreduce was cancelled with nil.
|
|
|
|
+ ErrCancelWithNil = errors.New("mapreduce cancelled with nil")
|
|
|
|
+ // ErrReduceNoOutput is an error that reduce did not output a value.
|
|
ErrReduceNoOutput = errors.New("reduce not writing value")
|
|
ErrReduceNoOutput = errors.New("reduce not writing value")
|
|
)
|
|
)
|
|
|
|
|
|
type (
|
|
type (
|
|
- GenerateFunc func(source chan<- interface{})
|
|
|
|
- MapFunc func(item interface{}, writer Writer)
|
|
|
|
- VoidMapFunc func(item interface{})
|
|
|
|
- MapperFunc func(item interface{}, writer Writer, cancel func(error))
|
|
|
|
- ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))
|
|
|
|
|
|
+ // GenerateFunc is used to let callers send elements into source.
|
|
|
|
+ GenerateFunc func(source chan<- interface{})
|
|
|
|
+ // MapFunc is used to do element processing and write the output to writer.
|
|
|
|
+ MapFunc func(item interface{}, writer Writer)
|
|
|
|
+ // VoidMapFunc is used to do element processing, but no output.
|
|
|
|
+ VoidMapFunc func(item interface{})
|
|
|
|
+ // MapperFunc is used to do element processing and write the output to writer,
|
|
|
|
+ // use cancel func to cancel the processing.
|
|
|
|
+ MapperFunc func(item interface{}, writer Writer, cancel func(error))
|
|
|
|
+ // ReducerFunc is used to reduce all the mapping output and write to writer,
|
|
|
|
+ // use cancel func to cancel the processing.
|
|
|
|
+ ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))
|
|
|
|
+ // VoidReducerFunc is used to reduce all the mapping output, but no output.
|
|
|
|
+ // Use cancel func to cancel the processing.
|
|
VoidReducerFunc func(pipe <-chan interface{}, cancel func(error))
|
|
VoidReducerFunc func(pipe <-chan interface{}, cancel func(error))
|
|
- Option func(opts *mapReduceOptions)
|
|
|
|
|
|
+ // Option defines the method to customize the mapreduce.
|
|
|
|
+ Option func(opts *mapReduceOptions)
|
|
|
|
|
|
mapReduceOptions struct {
|
|
mapReduceOptions struct {
|
|
workers int
|
|
workers int
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Writer interface wraps Write method.
|
|
Writer interface {
|
|
Writer interface {
|
|
Write(v interface{})
|
|
Write(v interface{})
|
|
}
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+// Finish runs fns parallelly, cancelled on any error.
|
|
func Finish(fns ...func() error) error {
|
|
func Finish(fns ...func() error) error {
|
|
if len(fns) == 0 {
|
|
if len(fns) == 0 {
|
|
return nil
|
|
return nil
|
|
@@ -58,6 +72,7 @@ func Finish(fns ...func() error) error {
|
|
}, WithWorkers(len(fns)))
|
|
}, WithWorkers(len(fns)))
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// FinishVoid runs fns parallelly.
|
|
func FinishVoid(fns ...func()) {
|
|
func FinishVoid(fns ...func()) {
|
|
if len(fns) == 0 {
|
|
if len(fns) == 0 {
|
|
return
|
|
return
|
|
@@ -73,6 +88,7 @@ func FinishVoid(fns ...func()) {
|
|
}, WithWorkers(len(fns)))
|
|
}, WithWorkers(len(fns)))
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// Map maps all elements generated from given generate func, and returns an output channel.
|
|
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
|
|
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
|
|
options := buildOptions(opts...)
|
|
options := buildOptions(opts...)
|
|
source := buildSource(generate)
|
|
source := buildSource(generate)
|
|
@@ -84,11 +100,14 @@ func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{}
|
|
return collector
|
|
return collector
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// MapReduce maps all elements generated from given generate func,
|
|
|
|
+// and reduces the output elemenets with given reducer.
|
|
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
|
|
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
|
|
source := buildSource(generate)
|
|
source := buildSource(generate)
|
|
return MapReduceWithSource(source, mapper, reducer, opts...)
|
|
return MapReduceWithSource(source, mapper, reducer, opts...)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer.
|
|
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
|
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
|
opts ...Option) (interface{}, error) {
|
|
opts ...Option) (interface{}, error) {
|
|
options := buildOptions(opts...)
|
|
options := buildOptions(opts...)
|
|
@@ -141,8 +160,10 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
|
|
|
|
- _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
|
|
|
|
|
|
+// MapReduceVoid maps all elements generated from given generate,
|
|
|
|
+// and reduce the output elements with given reducer.
|
|
|
|
+func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
|
|
|
|
+ _, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
|
|
reducer(input, cancel)
|
|
reducer(input, cancel)
|
|
drain(input)
|
|
drain(input)
|
|
// We need to write a placeholder to let MapReduce to continue on reducer done,
|
|
// We need to write a placeholder to let MapReduce to continue on reducer done,
|
|
@@ -152,12 +173,14 @@ func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReduce
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// MapVoid maps all elements from given generate but no output.
|
|
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) {
|
|
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) {
|
|
drain(Map(generate, func(item interface{}, writer Writer) {
|
|
drain(Map(generate, func(item interface{}, writer Writer) {
|
|
mapper(item)
|
|
mapper(item)
|
|
}, opts...))
|
|
}, opts...))
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// WithWorkers customizes a mapreduce processing with given workers.
|
|
func WithWorkers(workers int) Option {
|
|
func WithWorkers(workers int) Option {
|
|
return func(opts *mapReduceOptions) {
|
|
return func(opts *mapReduceOptions) {
|
|
if workers < minWorkers {
|
|
if workers < minWorkers {
|