浏览代码

refactor fx (#759)

* refactor fx

* refactor fx, format code
Kevin Wan 3 年之前
父节点
当前提交
8520db4fd9
共有 2 个文件被更改,包括 127 次插入163 次删除
  1. 122 144
      core/fx/stream.go
  2. 5 19
      core/fx/stream_test.go

+ 122 - 144
core/fx/stream.go

@@ -49,19 +49,9 @@ type (
 	}
 )
 
-// empty a empty Stream.
-var empty Stream
-
-func init() {
-	// initial empty Stream.
-	source := make(chan interface{})
-	close(source)
-	empty.source = source
-}
-
-// Empty Returns a empty stream.
-func Empty() Stream {
-	return empty
+// Concat returns a concatenated Stream.
+func Concat(s Stream, others ...Stream) Stream {
+	return s.Concat(others...)
 }
 
 // From constructs a Stream from the given GenerateFunc.
@@ -94,21 +84,42 @@ func Range(source <-chan interface{}) Stream {
 	}
 }
 
-// Concat Returns a concat Stream.
-func Concat(a Stream, others ...Stream) Stream {
-	return a.Concat(others...)
+// AllMach returns whether all elements of this stream match the provided predicate.
+// May not evaluate the predicate on all elements if not necessary for determining the result.
+// If the stream is empty then true is returned and the predicate is not evaluated.
+func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
+	for item := range s.source {
+		if !predicate(item) {
+			return false
+		}
+	}
+
+	return true
+}
+
+// AnyMach returns whether any elements of this stream match the provided predicate.
+// May not evaluate the predicate on all elements if not necessary for determining the result.
+// If the stream is empty then false is returned and the predicate is not evaluated.
+func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
+	for item := range s.source {
+		if predicate(item) {
+			return true
+		}
+	}
+
+	return false
 }
 
 // Buffer buffers the items into a queue with size n.
 // It can balance the producer and the consumer if their processing throughput don't match.
-func (p Stream) Buffer(n int) Stream {
+func (s Stream) Buffer(n int) Stream {
 	if n < 0 {
 		n = 0
 	}
 
 	source := make(chan interface{}, n)
 	go func() {
-		for item := range p.source {
+		for item := range s.source {
 			source <- item
 		}
 		close(source)
@@ -117,23 +128,51 @@ func (p Stream) Buffer(n int) Stream {
 	return Range(source)
 }
 
+// Concat returns a Stream that concatenated other streams
+func (s Stream) Concat(others ...Stream) Stream {
+	source := make(chan interface{})
+
+	go func() {
+		group := threading.NewRoutineGroup()
+		group.Run(func() {
+			for item := range s.source {
+				source <- item
+			}
+		})
+
+		for _, each := range others {
+			each := each
+			group.Run(func() {
+				for item := range each.source {
+					source <- item
+				}
+			})
+		}
+
+		group.Wait()
+		close(source)
+	}()
+
+	return Range(source)
+}
+
 // Count counts the number of elements in the result.
-func (p Stream) Count() (count int) {
-	for range p.source {
+func (s Stream) Count() (count int) {
+	for range s.source {
 		count++
 	}
 	return
 }
 
 // Distinct removes the duplicated items base on the given KeyFunc.
-func (p Stream) Distinct(fn KeyFunc) Stream {
+func (s Stream) Distinct(fn KeyFunc) Stream {
 	source := make(chan interface{})
 
 	threading.GoSafe(func() {
 		defer close(source)
 
 		keys := make(map[interface{}]lang.PlaceholderType)
-		for item := range p.source {
+		for item := range s.source {
 			key := fn(item)
 			if _, ok := keys[key]; !ok {
 				source <- item
@@ -146,14 +185,14 @@ func (p Stream) Distinct(fn KeyFunc) Stream {
 }
 
 // Done waits all upstreaming operations to be done.
-func (p Stream) Done() {
-	for range p.source {
+func (s Stream) Done() {
+	for range s.source {
 	}
 }
 
 // Filter filters the items by the given FilterFunc.
-func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
-	return p.Walk(func(item interface{}, pipe chan<- interface{}) {
+func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream {
+	return s.Walk(func(item interface{}, pipe chan<- interface{}) {
 		if fn(item) {
 			pipe <- item
 		}
@@ -161,21 +200,21 @@ func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
 }
 
 // ForAll handles the streaming elements from the source and no later streams.
-func (p Stream) ForAll(fn ForAllFunc) {
-	fn(p.source)
+func (s Stream) ForAll(fn ForAllFunc) {
+	fn(s.source)
 }
 
 // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
-func (p Stream) ForEach(fn ForEachFunc) {
-	for item := range p.source {
+func (s Stream) ForEach(fn ForEachFunc) {
+	for item := range s.source {
 		fn(item)
 	}
 }
 
 // Group groups the elements into different groups based on their keys.
-func (p Stream) Group(fn KeyFunc) Stream {
+func (s Stream) Group(fn KeyFunc) Stream {
 	groups := make(map[interface{}][]interface{})
-	for item := range p.source {
+	for item := range s.source {
 		key := fn(item)
 		groups[key] = append(groups[key], item)
 	}
@@ -192,7 +231,7 @@ func (p Stream) Group(fn KeyFunc) Stream {
 }
 
 // Head returns the first n elements in p.
-func (p Stream) Head(n int64) Stream {
+func (s Stream) Head(n int64) Stream {
 	if n < 1 {
 		panic("n must be greater than 0")
 	}
@@ -200,7 +239,7 @@ func (p Stream) Head(n int64) Stream {
 	source := make(chan interface{})
 
 	go func() {
-		for item := range p.source {
+		for item := range s.source {
 			n--
 			if n >= 0 {
 				source <- item
@@ -221,16 +260,16 @@ func (p Stream) Head(n int64) Stream {
 }
 
 // Map converts each item to another corresponding item, which means it's a 1:1 model.
-func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
-	return p.Walk(func(item interface{}, pipe chan<- interface{}) {
+func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
+	return s.Walk(func(item interface{}, pipe chan<- interface{}) {
 		pipe <- fn(item)
 	}, opts...)
 }
 
 // Merge merges all the items into a slice and generates a new stream.
-func (p Stream) Merge() Stream {
+func (s Stream) Merge() Stream {
 	var items []interface{}
-	for item := range p.source {
+	for item := range s.source {
 		items = append(items, item)
 	}
 
@@ -242,21 +281,21 @@ func (p Stream) Merge() Stream {
 }
 
 // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
-func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
-	p.Walk(func(item interface{}, pipe chan<- interface{}) {
+func (s Stream) Parallel(fn ParallelFunc, opts ...Option) {
+	s.Walk(func(item interface{}, pipe chan<- interface{}) {
 		fn(item)
 	}, opts...).Done()
 }
 
 // Reduce is a utility method to let the caller deal with the underlying channel.
-func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) {
-	return fn(p.source)
+func (s Stream) Reduce(fn ReduceFunc) (interface{}, error) {
+	return fn(s.source)
 }
 
 // Reverse reverses the elements in the stream.
-func (p Stream) Reverse() Stream {
+func (s Stream) Reverse() Stream {
 	var items []interface{}
-	for item := range p.source {
+	for item := range s.source {
 		items = append(items, item)
 	}
 	// reverse, official method
@@ -268,10 +307,36 @@ func (p Stream) Reverse() Stream {
 	return Just(items...)
 }
 
+// Skip returns a Stream that skips size elements.
+func (s Stream) Skip(n int64) Stream {
+	if n < 0 {
+		panic("n must not be negative")
+	}
+	if n == 0 {
+		return s
+	}
+
+	source := make(chan interface{})
+
+	go func() {
+		for item := range s.source {
+			n--
+			if n >= 0 {
+				continue
+			} else {
+				source <- item
+			}
+		}
+		close(source)
+	}()
+
+	return Range(source)
+}
+
 // Sort sorts the items from the underlying source.
-func (p Stream) Sort(less LessFunc) Stream {
+func (s Stream) Sort(less LessFunc) Stream {
 	var items []interface{}
-	for item := range p.source {
+	for item := range s.source {
 		items = append(items, item)
 	}
 	sort.Slice(items, func(i, j int) bool {
@@ -283,7 +348,7 @@ func (p Stream) Sort(less LessFunc) Stream {
 
 // Split splits the elements into chunk with size up to n,
 // might be less than n on tailing elements.
-func (p Stream) Split(n int) Stream {
+func (s Stream) Split(n int) Stream {
 	if n < 1 {
 		panic("n should be greater than 0")
 	}
@@ -291,7 +356,7 @@ func (p Stream) Split(n int) Stream {
 	source := make(chan interface{})
 	go func() {
 		var chunk []interface{}
-		for item := range p.source {
+		for item := range s.source {
 			chunk = append(chunk, item)
 			if len(chunk) == n {
 				source <- chunk
@@ -308,7 +373,7 @@ func (p Stream) Split(n int) Stream {
 }
 
 // Tail returns the last n elements in p.
-func (p Stream) Tail(n int64) Stream {
+func (s Stream) Tail(n int64) Stream {
 	if n < 1 {
 		panic("n should be greater than 0")
 	}
@@ -317,7 +382,7 @@ func (p Stream) Tail(n int64) Stream {
 
 	go func() {
 		ring := collection.NewRing(int(n))
-		for item := range p.source {
+		for item := range s.source {
 			ring.Add(item)
 		}
 		for _, item := range ring.Take() {
@@ -330,16 +395,16 @@ func (p Stream) Tail(n int64) Stream {
 }
 
 // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
-func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream {
+func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream {
 	option := buildOptions(opts...)
 	if option.unlimitedWorkers {
-		return p.walkUnlimited(fn, option)
+		return s.walkUnlimited(fn, option)
 	}
 
-	return p.walkLimited(fn, option)
+	return s.walkLimited(fn, option)
 }
 
-func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
+func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
 	pipe := make(chan interface{}, option.workers)
 
 	go func() {
@@ -348,7 +413,7 @@ func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
 
 		for {
 			pool <- lang.Placeholder
-			item, ok := <-p.source
+			item, ok := <-s.source
 			if !ok {
 				<-pool
 				break
@@ -373,14 +438,14 @@ func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
 	return Range(pipe)
 }
 
-func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
+func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
 	pipe := make(chan interface{}, defaultWorkers)
 
 	go func() {
 		var wg sync.WaitGroup
 
 		for {
-			item, ok := <-p.source
+			item, ok := <-s.source
 			if !ok {
 				break
 			}
@@ -400,93 +465,6 @@ func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
 	return Range(pipe)
 }
 
-// AnyMach Returns whether any elements of this stream match the provided predicate.
-// May not evaluate the predicate on all elements if not necessary for determining the result.
-// If the stream is empty then false is returned and the predicate is not evaluated.
-func (p Stream) AnyMach(f func(item interface{}) bool) (isFind bool) {
-	for item := range p.source {
-		if f(item) {
-			isFind = true
-			return
-		}
-	}
-	return
-}
-
-// AllMach Returns whether all elements of this stream match the provided predicate.
-// May not evaluate the predicate on all elements if not necessary for determining the result.
-// If the stream is empty then true is returned and the predicate is not evaluated.
-func (p Stream) AllMach(f func(item interface{}) bool) (isFind bool) {
-	isFind = true
-	for item := range p.source {
-		if !f(item) {
-			isFind = false
-			return
-		}
-	}
-	return
-}
-
-// Concat Returns a Stream that concat others streams
-func (p Stream) Concat(others ...Stream) Stream {
-	source := make(chan interface{})
-	wg := sync.WaitGroup{}
-	for _, other := range others {
-		if p == other {
-			continue
-		}
-		wg.Add(1)
-		go func(iother Stream) {
-			for item := range iother.source {
-				source <- item
-			}
-			wg.Done()
-		}(other)
-
-	}
-
-	wg.Add(1)
-	go func() {
-		for item := range p.source {
-			source <- item
-		}
-		wg.Done()
-	}()
-	go func() {
-		wg.Wait()
-		close(source)
-	}()
-	return Range(source)
-}
-
-// Skip Returns a Stream that skips size elements.
-func (p Stream) Skip(size int64) Stream {
-	if size == 0 {
-		return p
-	}
-	if size < 0 {
-		panic("size must be greater than -1")
-	}
-	source := make(chan interface{})
-
-	go func() {
-		i := 0
-		for item := range p.source {
-			if i >= int(size) {
-				source <- item
-			}
-			i++
-		}
-		close(source)
-	}()
-	return Range(source)
-}
-
-// Chan Returns a channel of Stream.
-func (p Stream) Chan() <-chan interface{} {
-	return p.source
-}
-
 // UnlimitedWorkers lets the caller to use as many workers as the tasks.
 func UnlimitedWorkers() Option {
 	return func(opts *rxOptions) {

+ 5 - 19
core/fx/stream_test.go

@@ -353,9 +353,7 @@ func BenchmarkParallelMapReduce(b *testing.B) {
 				input <- int64(rand.Int())
 			}
 		})
-
 	}).Map(mapper).Reduce(reducer)
-
 }
 
 func BenchmarkMapReduce(b *testing.B) {
@@ -377,7 +375,6 @@ func BenchmarkMapReduce(b *testing.B) {
 			input <- int64(rand.Int())
 		}
 	}).Map(mapper).Reduce(reducer)
-
 }
 
 func equal(t *testing.T, stream Stream, data []interface{}) {
@@ -389,12 +386,13 @@ func equal(t *testing.T, stream Stream, data []interface{}) {
 		t.Errorf(" %v, want %v", items, data)
 	}
 }
-func assetEqual(t *testing.T, except interface{}, data interface{}) {
+
+func assetEqual(t *testing.T, except, data interface{}) {
 	if !reflect.DeepEqual(except, data) {
 		t.Errorf(" %v, want %v", data, except)
 	}
-
 }
+
 func TestStream_AnyMach(t *testing.T) {
 	assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
 		return 4 == item.(int)
@@ -409,6 +407,7 @@ func TestStream_AnyMach(t *testing.T) {
 		return 2 == item.(int)
 	}))
 }
+
 func TestStream_AllMach(t *testing.T) {
 	assetEqual(
 		t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
@@ -426,11 +425,7 @@ func TestStream_AllMach(t *testing.T) {
 		}),
 	)
 }
-func TestEmpty(t *testing.T) {
-	empty := Empty()
-	assetEqual(t, len(empty.source), 0)
-	assetEqual(t, cap(empty.source), 0)
-}
+
 func TestConcat(t *testing.T) {
 	a1 := []interface{}{1, 2, 3}
 	a2 := []interface{}{4, 5, 6}
@@ -448,15 +443,6 @@ func TestConcat(t *testing.T) {
 	ints = append(ints, a1...)
 	ints = append(ints, a2...)
 	assetEqual(t, ints, items)
-
-}
-func TestStream_Chan(t *testing.T) {
-	var items []interface{}
-
-	for item := range Just(1, 2, 3).Chan() {
-		items = append(items, item)
-	}
-	assetEqual(t, items, []interface{}{1, 2, 3})
 }
 
 func TestStream_Skip(t *testing.T) {