1
0

fn_test.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package fx
  2. import (
  3. "io/ioutil"
  4. "log"
  5. "runtime"
  6. "sync"
  7. "sync/atomic"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/tal-tech/go-zero/core/stringx"
  12. )
  13. func TestBuffer(t *testing.T) {
  14. const N = 5
  15. var count int32
  16. var wait sync.WaitGroup
  17. wait.Add(1)
  18. From(func(source chan<- interface{}) {
  19. ticker := time.NewTicker(10 * time.Millisecond)
  20. defer ticker.Stop()
  21. for i := 0; i < 2*N; i++ {
  22. select {
  23. case source <- i:
  24. atomic.AddInt32(&count, 1)
  25. case <-ticker.C:
  26. wait.Done()
  27. return
  28. }
  29. }
  30. }).Buffer(N).ForAll(func(pipe <-chan interface{}) {
  31. wait.Wait()
  32. // why N+1, because take one more to wait for sending into the channel
  33. assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
  34. })
  35. }
  36. func TestBufferNegative(t *testing.T) {
  37. var result int
  38. Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  39. for item := range pipe {
  40. result += item.(int)
  41. }
  42. return result, nil
  43. })
  44. assert.Equal(t, 10, result)
  45. }
  46. func TestCount(t *testing.T) {
  47. tests := []struct {
  48. name string
  49. elements []interface{}
  50. }{
  51. {
  52. name: "no elements with nil",
  53. },
  54. {
  55. name: "no elements",
  56. elements: []interface{}{},
  57. },
  58. {
  59. name: "1 element",
  60. elements: []interface{}{1},
  61. },
  62. {
  63. name: "multiple elements",
  64. elements: []interface{}{1, 2, 3},
  65. },
  66. }
  67. for _, test := range tests {
  68. t.Run(test.name, func(t *testing.T) {
  69. val := Just(test.elements...).Count()
  70. assert.Equal(t, len(test.elements), val)
  71. })
  72. }
  73. }
  74. func TestDone(t *testing.T) {
  75. var count int32
  76. Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
  77. time.Sleep(time.Millisecond * 100)
  78. atomic.AddInt32(&count, int32(item.(int)))
  79. }).Done()
  80. assert.Equal(t, int32(6), count)
  81. }
  82. func TestJust(t *testing.T) {
  83. var result int
  84. Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  85. for item := range pipe {
  86. result += item.(int)
  87. }
  88. return result, nil
  89. })
  90. assert.Equal(t, 10, result)
  91. }
  92. func TestDistinct(t *testing.T) {
  93. var result int
  94. Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
  95. return item
  96. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  97. for item := range pipe {
  98. result += item.(int)
  99. }
  100. return result, nil
  101. })
  102. assert.Equal(t, 10, result)
  103. }
  104. func TestFilter(t *testing.T) {
  105. var result int
  106. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  107. return item.(int)%2 == 0
  108. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  109. for item := range pipe {
  110. result += item.(int)
  111. }
  112. return result, nil
  113. })
  114. assert.Equal(t, 6, result)
  115. }
  116. func TestForAll(t *testing.T) {
  117. var result int
  118. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  119. return item.(int)%2 == 0
  120. }).ForAll(func(pipe <-chan interface{}) {
  121. for item := range pipe {
  122. result += item.(int)
  123. }
  124. })
  125. assert.Equal(t, 6, result)
  126. }
  127. func TestGroup(t *testing.T) {
  128. var groups [][]int
  129. Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
  130. v := item.(int)
  131. return v / 10
  132. }).ForEach(func(item interface{}) {
  133. v := item.([]interface{})
  134. var group []int
  135. for _, each := range v {
  136. group = append(group, each.(int))
  137. }
  138. groups = append(groups, group)
  139. })
  140. assert.Equal(t, 2, len(groups))
  141. for _, group := range groups {
  142. assert.Equal(t, 2, len(group))
  143. assert.True(t, group[0]/10 == group[1]/10)
  144. }
  145. }
  146. func TestHead(t *testing.T) {
  147. var result int
  148. Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  149. for item := range pipe {
  150. result += item.(int)
  151. }
  152. return result, nil
  153. })
  154. assert.Equal(t, 3, result)
  155. }
  156. func TestHeadMore(t *testing.T) {
  157. var result int
  158. Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  159. for item := range pipe {
  160. result += item.(int)
  161. }
  162. return result, nil
  163. })
  164. assert.Equal(t, 10, result)
  165. }
  166. func TestMap(t *testing.T) {
  167. log.SetOutput(ioutil.Discard)
  168. tests := []struct {
  169. mapper MapFunc
  170. expect int
  171. }{
  172. {
  173. mapper: func(item interface{}) interface{} {
  174. v := item.(int)
  175. return v * v
  176. },
  177. expect: 30,
  178. },
  179. {
  180. mapper: func(item interface{}) interface{} {
  181. v := item.(int)
  182. if v%2 == 0 {
  183. return 0
  184. }
  185. return v * v
  186. },
  187. expect: 10,
  188. },
  189. {
  190. mapper: func(item interface{}) interface{} {
  191. v := item.(int)
  192. if v%2 == 0 {
  193. panic(v)
  194. }
  195. return v * v
  196. },
  197. expect: 10,
  198. },
  199. }
  200. // Map(...) works even WithWorkers(0)
  201. for i, test := range tests {
  202. t.Run(stringx.Rand(), func(t *testing.T) {
  203. var result int
  204. var workers int
  205. if i%2 == 0 {
  206. workers = 0
  207. } else {
  208. workers = runtime.NumCPU()
  209. }
  210. From(func(source chan<- interface{}) {
  211. for i := 1; i < 5; i++ {
  212. source <- i
  213. }
  214. }).Map(test.mapper, WithWorkers(workers)).Reduce(
  215. func(pipe <-chan interface{}) (interface{}, error) {
  216. for item := range pipe {
  217. result += item.(int)
  218. }
  219. return result, nil
  220. })
  221. assert.Equal(t, test.expect, result)
  222. })
  223. }
  224. }
  225. func TestMerge(t *testing.T) {
  226. Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
  227. assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
  228. })
  229. }
  230. func TestParallelJust(t *testing.T) {
  231. var count int32
  232. Just(1, 2, 3).Parallel(func(item interface{}) {
  233. time.Sleep(time.Millisecond * 100)
  234. atomic.AddInt32(&count, int32(item.(int)))
  235. }, UnlimitedWorkers())
  236. assert.Equal(t, int32(6), count)
  237. }
  238. func TestReverse(t *testing.T) {
  239. Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
  240. assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
  241. })
  242. }
  243. func TestSort(t *testing.T) {
  244. var prev int
  245. Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
  246. return a.(int) < b.(int)
  247. }).ForEach(func(item interface{}) {
  248. next := item.(int)
  249. assert.True(t, prev < next)
  250. prev = next
  251. })
  252. }
  253. func TestTail(t *testing.T) {
  254. var result int
  255. Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  256. for item := range pipe {
  257. result += item.(int)
  258. }
  259. return result, nil
  260. })
  261. assert.Equal(t, 7, result)
  262. }
  263. func TestWalk(t *testing.T) {
  264. var result int
  265. Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
  266. if item.(int)%2 != 0 {
  267. pipe <- item
  268. }
  269. }, UnlimitedWorkers()).ForEach(func(item interface{}) {
  270. result += item.(int)
  271. })
  272. assert.Equal(t, 9, result)
  273. }
  274. func BenchmarkMapReduce(b *testing.B) {
  275. b.ReportAllocs()
  276. mapper := func(v interface{}) interface{} {
  277. return v.(int64) * v.(int64)
  278. }
  279. reducer := func(input <-chan interface{}) (interface{}, error) {
  280. var result int64
  281. for v := range input {
  282. result += v.(int64)
  283. }
  284. return result, nil
  285. }
  286. for i := 0; i < b.N; i++ {
  287. From(func(input chan<- interface{}) {
  288. for j := 0; j < 2; j++ {
  289. input <- int64(j)
  290. }
  291. }).Map(mapper).Reduce(reducer)
  292. }
  293. }