Selaa lähdekoodia

refactor: optimize fx (#1404)

* refactor: optimize fx

* chore: add more comments

* ci: make test robust
Kevin Wan 3 vuotta sitten
vanhempi
sitoutus
a8e7fafebf
2 muutettua tiedostoa jossa 8 lisäystä ja 5 poistoa
  1. 7 4
      core/fx/stream.go
  2. 1 1
      core/fx/stream_test.go

+ 7 - 4
core/fx/stream.go

@@ -90,7 +90,8 @@ func Range(source <-chan interface{}) Stream {
 func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
 func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
 	for item := range s.source {
 	for item := range s.source {
 		if !predicate(item) {
 		if !predicate(item) {
-			drain(s.source)
+			// make sure the former goroutine not block, and current func returns fast.
+			go drain(s.source)
 			return false
 			return false
 		}
 		}
 	}
 	}
@@ -104,7 +105,8 @@ func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
 func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
 func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
 	for item := range s.source {
 	for item := range s.source {
 		if predicate(item) {
 		if predicate(item) {
-			drain(s.source)
+			// make sure the former goroutine not block, and current func returns fast.
+			go drain(s.source)
 			return true
 			return true
 		}
 		}
 	}
 	}
@@ -215,7 +217,7 @@ func (s Stream) First() interface{} {
 func (s Stream) ForAll(fn ForAllFunc) {
 func (s Stream) ForAll(fn ForAllFunc) {
 	fn(s.source)
 	fn(s.source)
 	// avoid goroutine leak on fn not consuming all items.
 	// avoid goroutine leak on fn not consuming all items.
-	drain(s.source)
+	go drain(s.source)
 }
 }
 
 
 // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
 // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
@@ -310,7 +312,8 @@ func (s Stream) Merge() Stream {
 func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool {
 func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool {
 	for item := range s.source {
 	for item := range s.source {
 		if predicate(item) {
 		if predicate(item) {
-			drain(s.source)
+			// make sure the former goroutine not block, and current func returns fast.
+			go drain(s.source)
 			return false
 			return false
 		}
 		}
 	}
 	}

+ 1 - 1
core/fx/stream_test.go

@@ -567,5 +567,5 @@ func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
 	fn(t)
 	fn(t)
 	// let scheduler schedule first
 	// let scheduler schedule first
 	time.Sleep(time.Millisecond)
 	time.Sleep(time.Millisecond)
-	assert.Equal(t, goroutines, runtime.NumGoroutine())
+	assert.True(t, runtime.NumGoroutine() <= goroutines)
 }
 }