Sfoglia il codice sorgente

simplify mapreduce code

kevin 4 anni fa
parent
commit
df37597ac3
2 ha cambiato i file con 6 aggiunte e 16 eliminazioni
  1. 5 16
      core/mr/mapreduce.go
  2. 1 0
      readme.md

+ 5 - 16
core/mr/mapreduce.go

@@ -79,7 +79,7 @@ func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{}
 	collector := make(chan interface{}, options.workers)
 	done := syncx.NewDoneChan()
 
-	go mapDispatcher(mapper, source, collector, done.Done(), options.workers)
+	go executeMappers(mapper, source, collector, done.Done(), options.workers)
 
 	return collector
 }
@@ -126,7 +126,10 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
 		reducer(collector, writer, cancel)
 		drain(collector)
 	}()
-	go mapperDispatcher(mapper, source, collector, done.Done(), cancel, options.workers)
+
+	go executeMappers(func(item interface{}, writer Writer) {
+		mapper(item, writer, cancel)
+	}, source, collector, done.Done(), options.workers)
 
 	value, ok := <-output
 	if err := retErr.Load(); err != nil {
@@ -226,20 +229,6 @@ func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- i
 	}
 }
 
-func mapDispatcher(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
-	done <-chan lang.PlaceholderType, workers int) {
-	executeMappers(func(item interface{}, writer Writer) {
-		mapper(item, writer)
-	}, input, collector, done, workers)
-}
-
-func mapperDispatcher(mapper MapperFunc, input <-chan interface{}, collector chan<- interface{},
-	done <-chan lang.PlaceholderType, cancel func(error), workers int) {
-	executeMappers(func(item interface{}, writer Writer) {
-		mapper(item, writer, cancel)
-	}, input, collector, done, workers)
-}
-
 func newOptions() *mapReduceOptions {
 	return &mapReduceOptions{
 		workers: defaultWorkers,

+ 1 - 0
readme.md

@@ -157,6 +157,7 @@ go get -u github.com/tal-tech/go-zero
 * [通过MapReduce降低服务响应时间](doc/mapreduce.md)
 * [关键字替换和敏感词过滤工具](doc/keywords.md)
 * [进程内缓存使用方法](doc/collection.md)
+* [防止缓存击穿之进程内共享调用](doc/sharedcalls.md)
 * [基于prometheus的微服务指标监控](doc/metric.md)
 * [文本序列化和反序列化](doc/mapping.md)