stream_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646
  1. package fx
  2. import (
  3. "io"
  4. "log"
  5. "math/rand"
  6. "reflect"
  7. "runtime"
  8. "sort"
  9. "sync"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/wuntsong-org/go-zero-plus/core/stringx"
  15. "go.uber.org/goleak"
  16. )
  17. func TestBuffer(t *testing.T) {
  18. runCheckedTest(t, func(t *testing.T) {
  19. const N = 5
  20. var count int32
  21. var wait sync.WaitGroup
  22. wait.Add(1)
  23. From(func(source chan<- any) {
  24. ticker := time.NewTicker(10 * time.Millisecond)
  25. defer ticker.Stop()
  26. for i := 0; i < 2*N; i++ {
  27. select {
  28. case source <- i:
  29. atomic.AddInt32(&count, 1)
  30. case <-ticker.C:
  31. wait.Done()
  32. return
  33. }
  34. }
  35. }).Buffer(N).ForAll(func(pipe <-chan any) {
  36. wait.Wait()
  37. // why N+1, because take one more to wait for sending into the channel
  38. assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
  39. })
  40. })
  41. }
  42. func TestBufferNegative(t *testing.T) {
  43. runCheckedTest(t, func(t *testing.T) {
  44. var result int
  45. Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan any) (any, error) {
  46. for item := range pipe {
  47. result += item.(int)
  48. }
  49. return result, nil
  50. })
  51. assert.Equal(t, 10, result)
  52. })
  53. }
  54. func TestCount(t *testing.T) {
  55. runCheckedTest(t, func(t *testing.T) {
  56. tests := []struct {
  57. name string
  58. elements []any
  59. }{
  60. {
  61. name: "no elements with nil",
  62. },
  63. {
  64. name: "no elements",
  65. elements: []any{},
  66. },
  67. {
  68. name: "1 element",
  69. elements: []any{1},
  70. },
  71. {
  72. name: "multiple elements",
  73. elements: []any{1, 2, 3},
  74. },
  75. }
  76. for _, test := range tests {
  77. t.Run(test.name, func(t *testing.T) {
  78. val := Just(test.elements...).Count()
  79. assert.Equal(t, len(test.elements), val)
  80. })
  81. }
  82. })
  83. }
  84. func TestDone(t *testing.T) {
  85. runCheckedTest(t, func(t *testing.T) {
  86. var count int32
  87. Just(1, 2, 3).Walk(func(item any, pipe chan<- any) {
  88. time.Sleep(time.Millisecond * 100)
  89. atomic.AddInt32(&count, int32(item.(int)))
  90. }).Done()
  91. assert.Equal(t, int32(6), count)
  92. })
  93. }
  94. func TestJust(t *testing.T) {
  95. runCheckedTest(t, func(t *testing.T) {
  96. var result int
  97. Just(1, 2, 3, 4).Reduce(func(pipe <-chan any) (any, error) {
  98. for item := range pipe {
  99. result += item.(int)
  100. }
  101. return result, nil
  102. })
  103. assert.Equal(t, 10, result)
  104. })
  105. }
  106. func TestDistinct(t *testing.T) {
  107. runCheckedTest(t, func(t *testing.T) {
  108. var result int
  109. Just(4, 1, 3, 2, 3, 4).Distinct(func(item any) any {
  110. return item
  111. }).Reduce(func(pipe <-chan any) (any, error) {
  112. for item := range pipe {
  113. result += item.(int)
  114. }
  115. return result, nil
  116. })
  117. assert.Equal(t, 10, result)
  118. })
  119. }
  120. func TestFilter(t *testing.T) {
  121. runCheckedTest(t, func(t *testing.T) {
  122. var result int
  123. Just(1, 2, 3, 4).Filter(func(item any) bool {
  124. return item.(int)%2 == 0
  125. }).Reduce(func(pipe <-chan any) (any, error) {
  126. for item := range pipe {
  127. result += item.(int)
  128. }
  129. return result, nil
  130. })
  131. assert.Equal(t, 6, result)
  132. })
  133. }
  134. func TestFirst(t *testing.T) {
  135. runCheckedTest(t, func(t *testing.T) {
  136. assert.Nil(t, Just().First())
  137. assert.Equal(t, "foo", Just("foo").First())
  138. assert.Equal(t, "foo", Just("foo", "bar").First())
  139. })
  140. }
  141. func TestForAll(t *testing.T) {
  142. runCheckedTest(t, func(t *testing.T) {
  143. var result int
  144. Just(1, 2, 3, 4).Filter(func(item any) bool {
  145. return item.(int)%2 == 0
  146. }).ForAll(func(pipe <-chan any) {
  147. for item := range pipe {
  148. result += item.(int)
  149. }
  150. })
  151. assert.Equal(t, 6, result)
  152. })
  153. }
  154. func TestGroup(t *testing.T) {
  155. runCheckedTest(t, func(t *testing.T) {
  156. var groups [][]int
  157. Just(10, 11, 20, 21).Group(func(item any) any {
  158. v := item.(int)
  159. return v / 10
  160. }).ForEach(func(item any) {
  161. v := item.([]any)
  162. var group []int
  163. for _, each := range v {
  164. group = append(group, each.(int))
  165. }
  166. groups = append(groups, group)
  167. })
  168. assert.Equal(t, 2, len(groups))
  169. for _, group := range groups {
  170. assert.Equal(t, 2, len(group))
  171. assert.True(t, group[0]/10 == group[1]/10)
  172. }
  173. })
  174. }
  175. func TestHead(t *testing.T) {
  176. runCheckedTest(t, func(t *testing.T) {
  177. var result int
  178. Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan any) (any, error) {
  179. for item := range pipe {
  180. result += item.(int)
  181. }
  182. return result, nil
  183. })
  184. assert.Equal(t, 3, result)
  185. })
  186. }
  187. func TestHeadZero(t *testing.T) {
  188. runCheckedTest(t, func(t *testing.T) {
  189. assert.Panics(t, func() {
  190. Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan any) (any, error) {
  191. return nil, nil
  192. })
  193. })
  194. })
  195. }
  196. func TestHeadMore(t *testing.T) {
  197. runCheckedTest(t, func(t *testing.T) {
  198. var result int
  199. Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan any) (any, error) {
  200. for item := range pipe {
  201. result += item.(int)
  202. }
  203. return result, nil
  204. })
  205. assert.Equal(t, 10, result)
  206. })
  207. }
  208. func TestLast(t *testing.T) {
  209. runCheckedTest(t, func(t *testing.T) {
  210. goroutines := runtime.NumGoroutine()
  211. assert.Nil(t, Just().Last())
  212. assert.Equal(t, "foo", Just("foo").Last())
  213. assert.Equal(t, "bar", Just("foo", "bar").Last())
  214. // let scheduler schedule first
  215. runtime.Gosched()
  216. assert.Equal(t, goroutines, runtime.NumGoroutine())
  217. })
  218. }
  219. func TestMap(t *testing.T) {
  220. runCheckedTest(t, func(t *testing.T) {
  221. log.SetOutput(io.Discard)
  222. tests := []struct {
  223. mapper MapFunc
  224. expect int
  225. }{
  226. {
  227. mapper: func(item any) any {
  228. v := item.(int)
  229. return v * v
  230. },
  231. expect: 30,
  232. },
  233. {
  234. mapper: func(item any) any {
  235. v := item.(int)
  236. if v%2 == 0 {
  237. return 0
  238. }
  239. return v * v
  240. },
  241. expect: 10,
  242. },
  243. {
  244. mapper: func(item any) any {
  245. v := item.(int)
  246. if v%2 == 0 {
  247. panic(v)
  248. }
  249. return v * v
  250. },
  251. expect: 10,
  252. },
  253. }
  254. // Map(...) works even WithWorkers(0)
  255. for i, test := range tests {
  256. t.Run(stringx.Rand(), func(t *testing.T) {
  257. var result int
  258. var workers int
  259. if i%2 == 0 {
  260. workers = 0
  261. } else {
  262. workers = runtime.NumCPU()
  263. }
  264. From(func(source chan<- any) {
  265. for i := 1; i < 5; i++ {
  266. source <- i
  267. }
  268. }).Map(test.mapper, WithWorkers(workers)).Reduce(
  269. func(pipe <-chan any) (any, error) {
  270. for item := range pipe {
  271. result += item.(int)
  272. }
  273. return result, nil
  274. })
  275. assert.Equal(t, test.expect, result)
  276. })
  277. }
  278. })
  279. }
  280. func TestMerge(t *testing.T) {
  281. runCheckedTest(t, func(t *testing.T) {
  282. Just(1, 2, 3, 4).Merge().ForEach(func(item any) {
  283. assert.ElementsMatch(t, []any{1, 2, 3, 4}, item.([]any))
  284. })
  285. })
  286. }
  287. func TestParallelJust(t *testing.T) {
  288. runCheckedTest(t, func(t *testing.T) {
  289. var count int32
  290. Just(1, 2, 3).Parallel(func(item any) {
  291. time.Sleep(time.Millisecond * 100)
  292. atomic.AddInt32(&count, int32(item.(int)))
  293. }, UnlimitedWorkers())
  294. assert.Equal(t, int32(6), count)
  295. })
  296. }
  297. func TestReverse(t *testing.T) {
  298. runCheckedTest(t, func(t *testing.T) {
  299. Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item any) {
  300. assert.ElementsMatch(t, []any{4, 3, 2, 1}, item.([]any))
  301. })
  302. })
  303. }
  304. func TestSort(t *testing.T) {
  305. runCheckedTest(t, func(t *testing.T) {
  306. var prev int
  307. Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b any) bool {
  308. return a.(int) < b.(int)
  309. }).ForEach(func(item any) {
  310. next := item.(int)
  311. assert.True(t, prev < next)
  312. prev = next
  313. })
  314. })
  315. }
  316. func TestSplit(t *testing.T) {
  317. runCheckedTest(t, func(t *testing.T) {
  318. assert.Panics(t, func() {
  319. Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
  320. })
  321. var chunks [][]any
  322. Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item any) {
  323. chunk := item.([]any)
  324. chunks = append(chunks, chunk)
  325. })
  326. assert.EqualValues(t, [][]any{
  327. {1, 2, 3, 4},
  328. {5, 6, 7, 8},
  329. {9, 10},
  330. }, chunks)
  331. })
  332. }
  333. func TestTail(t *testing.T) {
  334. runCheckedTest(t, func(t *testing.T) {
  335. var result int
  336. Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan any) (any, error) {
  337. for item := range pipe {
  338. result += item.(int)
  339. }
  340. return result, nil
  341. })
  342. assert.Equal(t, 7, result)
  343. })
  344. }
  345. func TestTailZero(t *testing.T) {
  346. runCheckedTest(t, func(t *testing.T) {
  347. assert.Panics(t, func() {
  348. Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan any) (any, error) {
  349. return nil, nil
  350. })
  351. })
  352. })
  353. }
  354. func TestWalk(t *testing.T) {
  355. runCheckedTest(t, func(t *testing.T) {
  356. var result int
  357. Just(1, 2, 3, 4, 5).Walk(func(item any, pipe chan<- any) {
  358. if item.(int)%2 != 0 {
  359. pipe <- item
  360. }
  361. }, UnlimitedWorkers()).ForEach(func(item any) {
  362. result += item.(int)
  363. })
  364. assert.Equal(t, 9, result)
  365. })
  366. }
  367. func TestStream_AnyMach(t *testing.T) {
  368. runCheckedTest(t, func(t *testing.T) {
  369. assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item any) bool {
  370. return item.(int) == 4
  371. }))
  372. assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item any) bool {
  373. return item.(int) == 0
  374. }))
  375. assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item any) bool {
  376. return item.(int) == 2
  377. }))
  378. assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item any) bool {
  379. return item.(int) == 2
  380. }))
  381. })
  382. }
  383. func TestStream_AllMach(t *testing.T) {
  384. runCheckedTest(t, func(t *testing.T) {
  385. assetEqual(
  386. t, true, Just(1, 2, 3).AllMach(func(item any) bool {
  387. return true
  388. }),
  389. )
  390. assetEqual(
  391. t, false, Just(1, 2, 3).AllMach(func(item any) bool {
  392. return false
  393. }),
  394. )
  395. assetEqual(
  396. t, false, Just(1, 2, 3).AllMach(func(item any) bool {
  397. return item.(int) == 1
  398. }),
  399. )
  400. })
  401. }
  402. func TestStream_NoneMatch(t *testing.T) {
  403. runCheckedTest(t, func(t *testing.T) {
  404. assetEqual(
  405. t, true, Just(1, 2, 3).NoneMatch(func(item any) bool {
  406. return false
  407. }),
  408. )
  409. assetEqual(
  410. t, false, Just(1, 2, 3).NoneMatch(func(item any) bool {
  411. return true
  412. }),
  413. )
  414. assetEqual(
  415. t, true, Just(1, 2, 3).NoneMatch(func(item any) bool {
  416. return item.(int) == 4
  417. }),
  418. )
  419. })
  420. }
  421. func TestConcat(t *testing.T) {
  422. runCheckedTest(t, func(t *testing.T) {
  423. a1 := []any{1, 2, 3}
  424. a2 := []any{4, 5, 6}
  425. s1 := Just(a1...)
  426. s2 := Just(a2...)
  427. stream := Concat(s1, s2)
  428. var items []any
  429. for item := range stream.source {
  430. items = append(items, item)
  431. }
  432. sort.Slice(items, func(i, j int) bool {
  433. return items[i].(int) < items[j].(int)
  434. })
  435. ints := make([]any, 0)
  436. ints = append(ints, a1...)
  437. ints = append(ints, a2...)
  438. assetEqual(t, ints, items)
  439. })
  440. }
  441. func TestStream_Skip(t *testing.T) {
  442. runCheckedTest(t, func(t *testing.T) {
  443. assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
  444. assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
  445. assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
  446. equal(t, Just(1, 2, 3, 4).Skip(3), []any{4})
  447. assert.Panics(t, func() {
  448. Just(1, 2, 3, 4).Skip(-1)
  449. })
  450. })
  451. }
  452. func TestStream_Concat(t *testing.T) {
  453. runCheckedTest(t, func(t *testing.T) {
  454. stream := Just(1).Concat(Just(2), Just(3))
  455. var items []any
  456. for item := range stream.source {
  457. items = append(items, item)
  458. }
  459. sort.Slice(items, func(i, j int) bool {
  460. return items[i].(int) < items[j].(int)
  461. })
  462. assetEqual(t, []any{1, 2, 3}, items)
  463. just := Just(1)
  464. equal(t, just.Concat(just), []any{1})
  465. })
  466. }
  467. func TestStream_Max(t *testing.T) {
  468. runCheckedTest(t, func(t *testing.T) {
  469. tests := []struct {
  470. name string
  471. elements []any
  472. max any
  473. }{
  474. {
  475. name: "no elements with nil",
  476. },
  477. {
  478. name: "no elements",
  479. elements: []any{},
  480. max: nil,
  481. },
  482. {
  483. name: "1 element",
  484. elements: []any{1},
  485. max: 1,
  486. },
  487. {
  488. name: "multiple elements",
  489. elements: []any{1, 2, 9, 5, 8},
  490. max: 9,
  491. },
  492. }
  493. for _, test := range tests {
  494. t.Run(test.name, func(t *testing.T) {
  495. val := Just(test.elements...).Max(func(a, b any) bool {
  496. return a.(int) < b.(int)
  497. })
  498. assetEqual(t, test.max, val)
  499. })
  500. }
  501. })
  502. }
  503. func TestStream_Min(t *testing.T) {
  504. runCheckedTest(t, func(t *testing.T) {
  505. tests := []struct {
  506. name string
  507. elements []any
  508. min any
  509. }{
  510. {
  511. name: "no elements with nil",
  512. min: nil,
  513. },
  514. {
  515. name: "no elements",
  516. elements: []any{},
  517. min: nil,
  518. },
  519. {
  520. name: "1 element",
  521. elements: []any{1},
  522. min: 1,
  523. },
  524. {
  525. name: "multiple elements",
  526. elements: []any{-1, 1, 2, 9, 5, 8},
  527. min: -1,
  528. },
  529. }
  530. for _, test := range tests {
  531. t.Run(test.name, func(t *testing.T) {
  532. val := Just(test.elements...).Min(func(a, b any) bool {
  533. return a.(int) < b.(int)
  534. })
  535. assetEqual(t, test.min, val)
  536. })
  537. }
  538. })
  539. }
  540. func BenchmarkParallelMapReduce(b *testing.B) {
  541. b.ReportAllocs()
  542. mapper := func(v any) any {
  543. return v.(int64) * v.(int64)
  544. }
  545. reducer := func(input <-chan any) (any, error) {
  546. var result int64
  547. for v := range input {
  548. result += v.(int64)
  549. }
  550. return result, nil
  551. }
  552. b.ResetTimer()
  553. From(func(input chan<- any) {
  554. b.RunParallel(func(pb *testing.PB) {
  555. for pb.Next() {
  556. input <- int64(rand.Int())
  557. }
  558. })
  559. }).Map(mapper).Reduce(reducer)
  560. }
  561. func BenchmarkMapReduce(b *testing.B) {
  562. b.ReportAllocs()
  563. mapper := func(v any) any {
  564. return v.(int64) * v.(int64)
  565. }
  566. reducer := func(input <-chan any) (any, error) {
  567. var result int64
  568. for v := range input {
  569. result += v.(int64)
  570. }
  571. return result, nil
  572. }
  573. b.ResetTimer()
  574. From(func(input chan<- any) {
  575. for i := 0; i < b.N; i++ {
  576. input <- int64(rand.Int())
  577. }
  578. }).Map(mapper).Reduce(reducer)
  579. }
  580. func assetEqual(t *testing.T, except, data any) {
  581. if !reflect.DeepEqual(except, data) {
  582. t.Errorf(" %v, want %v", data, except)
  583. }
  584. }
  585. func equal(t *testing.T, stream Stream, data []any) {
  586. items := make([]any, 0)
  587. for item := range stream.source {
  588. items = append(items, item)
  589. }
  590. if !reflect.DeepEqual(items, data) {
  591. t.Errorf(" %v, want %v", items, data)
  592. }
  593. }
  594. func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
  595. defer goleak.VerifyNone(t)
  596. fn(t)
  597. }