stream_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. package fx
  2. import (
  3. "io/ioutil"
  4. "log"
  5. "math/rand"
  6. "reflect"
  7. "runtime"
  8. "sort"
  9. "sync"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/tal-tech/go-zero/core/stringx"
  15. )
  16. func TestBuffer(t *testing.T) {
  17. runCheckedTest(t, func(t *testing.T) {
  18. const N = 5
  19. var count int32
  20. var wait sync.WaitGroup
  21. wait.Add(1)
  22. From(func(source chan<- interface{}) {
  23. ticker := time.NewTicker(10 * time.Millisecond)
  24. defer ticker.Stop()
  25. for i := 0; i < 2*N; i++ {
  26. select {
  27. case source <- i:
  28. atomic.AddInt32(&count, 1)
  29. case <-ticker.C:
  30. wait.Done()
  31. return
  32. }
  33. }
  34. }).Buffer(N).ForAll(func(pipe <-chan interface{}) {
  35. wait.Wait()
  36. // why N+1, because take one more to wait for sending into the channel
  37. assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
  38. })
  39. })
  40. }
  41. func TestBufferNegative(t *testing.T) {
  42. runCheckedTest(t, func(t *testing.T) {
  43. var result int
  44. Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  45. for item := range pipe {
  46. result += item.(int)
  47. }
  48. return result, nil
  49. })
  50. assert.Equal(t, 10, result)
  51. })
  52. }
  53. func TestCount(t *testing.T) {
  54. runCheckedTest(t, func(t *testing.T) {
  55. tests := []struct {
  56. name string
  57. elements []interface{}
  58. }{
  59. {
  60. name: "no elements with nil",
  61. },
  62. {
  63. name: "no elements",
  64. elements: []interface{}{},
  65. },
  66. {
  67. name: "1 element",
  68. elements: []interface{}{1},
  69. },
  70. {
  71. name: "multiple elements",
  72. elements: []interface{}{1, 2, 3},
  73. },
  74. }
  75. for _, test := range tests {
  76. t.Run(test.name, func(t *testing.T) {
  77. val := Just(test.elements...).Count()
  78. assert.Equal(t, len(test.elements), val)
  79. })
  80. }
  81. })
  82. }
  83. func TestDone(t *testing.T) {
  84. runCheckedTest(t, func(t *testing.T) {
  85. var count int32
  86. Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
  87. time.Sleep(time.Millisecond * 100)
  88. atomic.AddInt32(&count, int32(item.(int)))
  89. }).Done()
  90. assert.Equal(t, int32(6), count)
  91. })
  92. }
  93. func TestJust(t *testing.T) {
  94. runCheckedTest(t, func(t *testing.T) {
  95. var result int
  96. Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  97. for item := range pipe {
  98. result += item.(int)
  99. }
  100. return result, nil
  101. })
  102. assert.Equal(t, 10, result)
  103. })
  104. }
  105. func TestDistinct(t *testing.T) {
  106. runCheckedTest(t, func(t *testing.T) {
  107. var result int
  108. Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
  109. return item
  110. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  111. for item := range pipe {
  112. result += item.(int)
  113. }
  114. return result, nil
  115. })
  116. assert.Equal(t, 10, result)
  117. })
  118. }
  119. func TestFilter(t *testing.T) {
  120. runCheckedTest(t, func(t *testing.T) {
  121. var result int
  122. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  123. return item.(int)%2 == 0
  124. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  125. for item := range pipe {
  126. result += item.(int)
  127. }
  128. return result, nil
  129. })
  130. assert.Equal(t, 6, result)
  131. })
  132. }
  133. func TestFirst(t *testing.T) {
  134. runCheckedTest(t, func(t *testing.T) {
  135. assert.Nil(t, Just().First())
  136. assert.Equal(t, "foo", Just("foo").First())
  137. assert.Equal(t, "foo", Just("foo", "bar").First())
  138. })
  139. }
  140. func TestForAll(t *testing.T) {
  141. runCheckedTest(t, func(t *testing.T) {
  142. var result int
  143. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  144. return item.(int)%2 == 0
  145. }).ForAll(func(pipe <-chan interface{}) {
  146. for item := range pipe {
  147. result += item.(int)
  148. }
  149. })
  150. assert.Equal(t, 6, result)
  151. })
  152. }
  153. func TestGroup(t *testing.T) {
  154. runCheckedTest(t, func(t *testing.T) {
  155. var groups [][]int
  156. Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
  157. v := item.(int)
  158. return v / 10
  159. }).ForEach(func(item interface{}) {
  160. v := item.([]interface{})
  161. var group []int
  162. for _, each := range v {
  163. group = append(group, each.(int))
  164. }
  165. groups = append(groups, group)
  166. })
  167. assert.Equal(t, 2, len(groups))
  168. for _, group := range groups {
  169. assert.Equal(t, 2, len(group))
  170. assert.True(t, group[0]/10 == group[1]/10)
  171. }
  172. })
  173. }
  174. func TestHead(t *testing.T) {
  175. runCheckedTest(t, func(t *testing.T) {
  176. var result int
  177. Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  178. for item := range pipe {
  179. result += item.(int)
  180. }
  181. return result, nil
  182. })
  183. assert.Equal(t, 3, result)
  184. })
  185. }
  186. func TestHeadZero(t *testing.T) {
  187. runCheckedTest(t, func(t *testing.T) {
  188. assert.Panics(t, func() {
  189. Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  190. return nil, nil
  191. })
  192. })
  193. })
  194. }
  195. func TestHeadMore(t *testing.T) {
  196. runCheckedTest(t, func(t *testing.T) {
  197. var result int
  198. Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  199. for item := range pipe {
  200. result += item.(int)
  201. }
  202. return result, nil
  203. })
  204. assert.Equal(t, 10, result)
  205. })
  206. }
  207. func TestLast(t *testing.T) {
  208. runCheckedTest(t, func(t *testing.T) {
  209. goroutines := runtime.NumGoroutine()
  210. assert.Nil(t, Just().Last())
  211. assert.Equal(t, "foo", Just("foo").Last())
  212. assert.Equal(t, "bar", Just("foo", "bar").Last())
  213. // let scheduler schedule first
  214. runtime.Gosched()
  215. assert.Equal(t, goroutines, runtime.NumGoroutine())
  216. })
  217. }
  218. func TestMap(t *testing.T) {
  219. runCheckedTest(t, func(t *testing.T) {
  220. log.SetOutput(ioutil.Discard)
  221. tests := []struct {
  222. mapper MapFunc
  223. expect int
  224. }{
  225. {
  226. mapper: func(item interface{}) interface{} {
  227. v := item.(int)
  228. return v * v
  229. },
  230. expect: 30,
  231. },
  232. {
  233. mapper: func(item interface{}) interface{} {
  234. v := item.(int)
  235. if v%2 == 0 {
  236. return 0
  237. }
  238. return v * v
  239. },
  240. expect: 10,
  241. },
  242. {
  243. mapper: func(item interface{}) interface{} {
  244. v := item.(int)
  245. if v%2 == 0 {
  246. panic(v)
  247. }
  248. return v * v
  249. },
  250. expect: 10,
  251. },
  252. }
  253. // Map(...) works even WithWorkers(0)
  254. for i, test := range tests {
  255. t.Run(stringx.Rand(), func(t *testing.T) {
  256. var result int
  257. var workers int
  258. if i%2 == 0 {
  259. workers = 0
  260. } else {
  261. workers = runtime.NumCPU()
  262. }
  263. From(func(source chan<- interface{}) {
  264. for i := 1; i < 5; i++ {
  265. source <- i
  266. }
  267. }).Map(test.mapper, WithWorkers(workers)).Reduce(
  268. func(pipe <-chan interface{}) (interface{}, error) {
  269. for item := range pipe {
  270. result += item.(int)
  271. }
  272. return result, nil
  273. })
  274. assert.Equal(t, test.expect, result)
  275. })
  276. }
  277. })
  278. }
  279. func TestMerge(t *testing.T) {
  280. runCheckedTest(t, func(t *testing.T) {
  281. Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
  282. assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
  283. })
  284. })
  285. }
  286. func TestParallelJust(t *testing.T) {
  287. runCheckedTest(t, func(t *testing.T) {
  288. var count int32
  289. Just(1, 2, 3).Parallel(func(item interface{}) {
  290. time.Sleep(time.Millisecond * 100)
  291. atomic.AddInt32(&count, int32(item.(int)))
  292. }, UnlimitedWorkers())
  293. assert.Equal(t, int32(6), count)
  294. })
  295. }
  296. func TestReverse(t *testing.T) {
  297. runCheckedTest(t, func(t *testing.T) {
  298. Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
  299. assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
  300. })
  301. })
  302. }
  303. func TestSort(t *testing.T) {
  304. runCheckedTest(t, func(t *testing.T) {
  305. var prev int
  306. Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
  307. return a.(int) < b.(int)
  308. }).ForEach(func(item interface{}) {
  309. next := item.(int)
  310. assert.True(t, prev < next)
  311. prev = next
  312. })
  313. })
  314. }
  315. func TestSplit(t *testing.T) {
  316. runCheckedTest(t, func(t *testing.T) {
  317. assert.Panics(t, func() {
  318. Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
  319. })
  320. var chunks [][]interface{}
  321. Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) {
  322. chunk := item.([]interface{})
  323. chunks = append(chunks, chunk)
  324. })
  325. assert.EqualValues(t, [][]interface{}{
  326. {1, 2, 3, 4},
  327. {5, 6, 7, 8},
  328. {9, 10},
  329. }, chunks)
  330. })
  331. }
  332. func TestTail(t *testing.T) {
  333. runCheckedTest(t, func(t *testing.T) {
  334. var result int
  335. Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  336. for item := range pipe {
  337. result += item.(int)
  338. }
  339. return result, nil
  340. })
  341. assert.Equal(t, 7, result)
  342. })
  343. }
  344. func TestTailZero(t *testing.T) {
  345. runCheckedTest(t, func(t *testing.T) {
  346. assert.Panics(t, func() {
  347. Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  348. return nil, nil
  349. })
  350. })
  351. })
  352. }
  353. func TestWalk(t *testing.T) {
  354. runCheckedTest(t, func(t *testing.T) {
  355. var result int
  356. Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
  357. if item.(int)%2 != 0 {
  358. pipe <- item
  359. }
  360. }, UnlimitedWorkers()).ForEach(func(item interface{}) {
  361. result += item.(int)
  362. })
  363. assert.Equal(t, 9, result)
  364. })
  365. }
  366. func TestStream_AnyMach(t *testing.T) {
  367. runCheckedTest(t, func(t *testing.T) {
  368. assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
  369. return item.(int) == 4
  370. }))
  371. assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
  372. return item.(int) == 0
  373. }))
  374. assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
  375. return item.(int) == 2
  376. }))
  377. assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
  378. return item.(int) == 2
  379. }))
  380. })
  381. }
  382. func TestStream_AllMach(t *testing.T) {
  383. runCheckedTest(t, func(t *testing.T) {
  384. assetEqual(
  385. t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
  386. return true
  387. }),
  388. )
  389. assetEqual(
  390. t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
  391. return false
  392. }),
  393. )
  394. assetEqual(
  395. t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
  396. return item.(int) == 1
  397. }),
  398. )
  399. })
  400. }
  401. func TestStream_NoneMatch(t *testing.T) {
  402. runCheckedTest(t, func(t *testing.T) {
  403. assetEqual(
  404. t, true, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
  405. return false
  406. }),
  407. )
  408. assetEqual(
  409. t, false, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
  410. return true
  411. }),
  412. )
  413. assetEqual(
  414. t, true, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
  415. return item.(int) == 4
  416. }),
  417. )
  418. })
  419. }
  420. func TestConcat(t *testing.T) {
  421. runCheckedTest(t, func(t *testing.T) {
  422. a1 := []interface{}{1, 2, 3}
  423. a2 := []interface{}{4, 5, 6}
  424. s1 := Just(a1...)
  425. s2 := Just(a2...)
  426. stream := Concat(s1, s2)
  427. var items []interface{}
  428. for item := range stream.source {
  429. items = append(items, item)
  430. }
  431. sort.Slice(items, func(i, j int) bool {
  432. return items[i].(int) < items[j].(int)
  433. })
  434. ints := make([]interface{}, 0)
  435. ints = append(ints, a1...)
  436. ints = append(ints, a2...)
  437. assetEqual(t, ints, items)
  438. })
  439. }
  440. func TestStream_Skip(t *testing.T) {
  441. runCheckedTest(t, func(t *testing.T) {
  442. assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
  443. assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
  444. assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
  445. equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
  446. assert.Panics(t, func() {
  447. Just(1, 2, 3, 4).Skip(-1)
  448. })
  449. })
  450. }
  451. func TestStream_Concat(t *testing.T) {
  452. runCheckedTest(t, func(t *testing.T) {
  453. stream := Just(1).Concat(Just(2), Just(3))
  454. var items []interface{}
  455. for item := range stream.source {
  456. items = append(items, item)
  457. }
  458. sort.Slice(items, func(i, j int) bool {
  459. return items[i].(int) < items[j].(int)
  460. })
  461. assetEqual(t, []interface{}{1, 2, 3}, items)
  462. just := Just(1)
  463. equal(t, just.Concat(just), []interface{}{1})
  464. })
  465. }
  466. func BenchmarkParallelMapReduce(b *testing.B) {
  467. b.ReportAllocs()
  468. mapper := func(v interface{}) interface{} {
  469. return v.(int64) * v.(int64)
  470. }
  471. reducer := func(input <-chan interface{}) (interface{}, error) {
  472. var result int64
  473. for v := range input {
  474. result += v.(int64)
  475. }
  476. return result, nil
  477. }
  478. b.ResetTimer()
  479. From(func(input chan<- interface{}) {
  480. b.RunParallel(func(pb *testing.PB) {
  481. for pb.Next() {
  482. input <- int64(rand.Int())
  483. }
  484. })
  485. }).Map(mapper).Reduce(reducer)
  486. }
  487. func BenchmarkMapReduce(b *testing.B) {
  488. b.ReportAllocs()
  489. mapper := func(v interface{}) interface{} {
  490. return v.(int64) * v.(int64)
  491. }
  492. reducer := func(input <-chan interface{}) (interface{}, error) {
  493. var result int64
  494. for v := range input {
  495. result += v.(int64)
  496. }
  497. return result, nil
  498. }
  499. b.ResetTimer()
  500. From(func(input chan<- interface{}) {
  501. for i := 0; i < b.N; i++ {
  502. input <- int64(rand.Int())
  503. }
  504. }).Map(mapper).Reduce(reducer)
  505. }
  506. func assetEqual(t *testing.T, except, data interface{}) {
  507. if !reflect.DeepEqual(except, data) {
  508. t.Errorf(" %v, want %v", data, except)
  509. }
  510. }
  511. func equal(t *testing.T, stream Stream, data []interface{}) {
  512. items := make([]interface{}, 0)
  513. for item := range stream.source {
  514. items = append(items, item)
  515. }
  516. if !reflect.DeepEqual(items, data) {
  517. t.Errorf(" %v, want %v", items, data)
  518. }
  519. }
  520. func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
  521. goroutines := runtime.NumGoroutine()
  522. fn(t)
  523. // let scheduler schedule first
  524. time.Sleep(time.Millisecond)
  525. assert.Equal(t, goroutines, runtime.NumGoroutine())
  526. }