mapreduce_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. package mr
  2. import (
  3. "context"
  4. "errors"
  5. "io/ioutil"
  6. "log"
  7. "runtime"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/tal-tech/go-zero/core/stringx"
  13. "github.com/tal-tech/go-zero/core/syncx"
  14. )
  // errDummy is the sentinel error that test cases return from Finish
// callbacks or pass to cancel; results are compared against it directly.
var (
	errDummy = errors.New("dummy")
)

// init routes the standard logger to a discard writer for the whole test
// binary, so nothing written through package log reaches the test output.
func init() {
	sink := ioutil.Discard
	log.SetOutput(sink)
}
  19. func TestFinish(t *testing.T) {
  20. var total uint32
  21. err := Finish(func() error {
  22. atomic.AddUint32(&total, 2)
  23. return nil
  24. }, func() error {
  25. atomic.AddUint32(&total, 3)
  26. return nil
  27. }, func() error {
  28. atomic.AddUint32(&total, 5)
  29. return nil
  30. })
  31. assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
  32. assert.Nil(t, err)
  33. }
  34. func TestFinishNone(t *testing.T) {
  35. assert.Nil(t, Finish())
  36. }
  37. func TestFinishVoidNone(t *testing.T) {
  38. FinishVoid()
  39. }
  40. func TestFinishErr(t *testing.T) {
  41. var total uint32
  42. err := Finish(func() error {
  43. atomic.AddUint32(&total, 2)
  44. return nil
  45. }, func() error {
  46. atomic.AddUint32(&total, 3)
  47. return errDummy
  48. }, func() error {
  49. atomic.AddUint32(&total, 5)
  50. return nil
  51. })
  52. assert.Equal(t, errDummy, err)
  53. }
  54. func TestFinishVoid(t *testing.T) {
  55. var total uint32
  56. FinishVoid(func() {
  57. atomic.AddUint32(&total, 2)
  58. }, func() {
  59. atomic.AddUint32(&total, 3)
  60. }, func() {
  61. atomic.AddUint32(&total, 5)
  62. })
  63. assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
  64. }
  65. func TestMap(t *testing.T) {
  66. tests := []struct {
  67. mapper MapFunc
  68. expect int
  69. }{
  70. {
  71. mapper: func(item interface{}, writer Writer) {
  72. v := item.(int)
  73. writer.Write(v * v)
  74. },
  75. expect: 30,
  76. },
  77. {
  78. mapper: func(item interface{}, writer Writer) {
  79. v := item.(int)
  80. if v%2 == 0 {
  81. return
  82. }
  83. writer.Write(v * v)
  84. },
  85. expect: 10,
  86. },
  87. {
  88. mapper: func(item interface{}, writer Writer) {
  89. v := item.(int)
  90. if v%2 == 0 {
  91. panic(v)
  92. }
  93. writer.Write(v * v)
  94. },
  95. expect: 10,
  96. },
  97. }
  98. for _, test := range tests {
  99. t.Run(stringx.Rand(), func(t *testing.T) {
  100. channel := Map(func(source chan<- interface{}) {
  101. for i := 1; i < 5; i++ {
  102. source <- i
  103. }
  104. }, test.mapper, WithWorkers(-1))
  105. var result int
  106. for v := range channel {
  107. result += v.(int)
  108. }
  109. assert.Equal(t, test.expect, result)
  110. })
  111. }
  112. }
  113. func TestMapReduce(t *testing.T) {
  114. tests := []struct {
  115. mapper MapperFunc
  116. reducer ReducerFunc
  117. expectErr error
  118. expectValue interface{}
  119. }{
  120. {
  121. expectErr: nil,
  122. expectValue: 30,
  123. },
  124. {
  125. mapper: func(item interface{}, writer Writer, cancel func(error)) {
  126. v := item.(int)
  127. if v%3 == 0 {
  128. cancel(errDummy)
  129. }
  130. writer.Write(v * v)
  131. },
  132. expectErr: errDummy,
  133. },
  134. {
  135. mapper: func(item interface{}, writer Writer, cancel func(error)) {
  136. v := item.(int)
  137. if v%3 == 0 {
  138. cancel(nil)
  139. }
  140. writer.Write(v * v)
  141. },
  142. expectErr: ErrCancelWithNil,
  143. expectValue: nil,
  144. },
  145. {
  146. reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
  147. var result int
  148. for item := range pipe {
  149. result += item.(int)
  150. if result > 10 {
  151. cancel(errDummy)
  152. }
  153. }
  154. writer.Write(result)
  155. },
  156. expectErr: errDummy,
  157. },
  158. }
  159. for _, test := range tests {
  160. t.Run(stringx.Rand(), func(t *testing.T) {
  161. if test.mapper == nil {
  162. test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
  163. v := item.(int)
  164. writer.Write(v * v)
  165. }
  166. }
  167. if test.reducer == nil {
  168. test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
  169. var result int
  170. for item := range pipe {
  171. result += item.(int)
  172. }
  173. writer.Write(result)
  174. }
  175. }
  176. value, err := MapReduce(func(source chan<- interface{}) {
  177. for i := 1; i < 5; i++ {
  178. source <- i
  179. }
  180. }, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
  181. assert.Equal(t, test.expectErr, err)
  182. assert.Equal(t, test.expectValue, value)
  183. })
  184. }
  185. }
  186. func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
  187. assert.Panics(t, func() {
  188. MapReduce(func(source chan<- interface{}) {
  189. for i := 0; i < 10; i++ {
  190. source <- i
  191. }
  192. }, func(item interface{}, writer Writer, cancel func(error)) {
  193. writer.Write(item)
  194. }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
  195. drain(pipe)
  196. writer.Write("one")
  197. writer.Write("two")
  198. })
  199. })
  200. }
  201. func TestMapReduceVoid(t *testing.T) {
  202. var value uint32
  203. tests := []struct {
  204. mapper MapperFunc
  205. reducer VoidReducerFunc
  206. expectValue uint32
  207. expectErr error
  208. }{
  209. {
  210. expectValue: 30,
  211. expectErr: nil,
  212. },
  213. {
  214. mapper: func(item interface{}, writer Writer, cancel func(error)) {
  215. v := item.(int)
  216. if v%3 == 0 {
  217. cancel(errDummy)
  218. }
  219. writer.Write(v * v)
  220. },
  221. expectErr: errDummy,
  222. },
  223. {
  224. mapper: func(item interface{}, writer Writer, cancel func(error)) {
  225. v := item.(int)
  226. if v%3 == 0 {
  227. cancel(nil)
  228. }
  229. writer.Write(v * v)
  230. },
  231. expectErr: ErrCancelWithNil,
  232. },
  233. {
  234. reducer: func(pipe <-chan interface{}, cancel func(error)) {
  235. for item := range pipe {
  236. result := atomic.AddUint32(&value, uint32(item.(int)))
  237. if result > 10 {
  238. cancel(errDummy)
  239. }
  240. }
  241. },
  242. expectErr: errDummy,
  243. },
  244. }
  245. for _, test := range tests {
  246. t.Run(stringx.Rand(), func(t *testing.T) {
  247. atomic.StoreUint32(&value, 0)
  248. if test.mapper == nil {
  249. test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
  250. v := item.(int)
  251. writer.Write(v * v)
  252. }
  253. }
  254. if test.reducer == nil {
  255. test.reducer = func(pipe <-chan interface{}, cancel func(error)) {
  256. for item := range pipe {
  257. atomic.AddUint32(&value, uint32(item.(int)))
  258. }
  259. }
  260. }
  261. err := MapReduceVoid(func(source chan<- interface{}) {
  262. for i := 1; i < 5; i++ {
  263. source <- i
  264. }
  265. }, test.mapper, test.reducer)
  266. assert.Equal(t, test.expectErr, err)
  267. if err == nil {
  268. assert.Equal(t, test.expectValue, atomic.LoadUint32(&value))
  269. }
  270. })
  271. }
  272. }
  273. func TestMapReduceVoidWithDelay(t *testing.T) {
  274. var result []int
  275. err := MapReduceVoid(func(source chan<- interface{}) {
  276. source <- 0
  277. source <- 1
  278. }, func(item interface{}, writer Writer, cancel func(error)) {
  279. i := item.(int)
  280. if i == 0 {
  281. time.Sleep(time.Millisecond * 50)
  282. }
  283. writer.Write(i)
  284. }, func(pipe <-chan interface{}, cancel func(error)) {
  285. for item := range pipe {
  286. i := item.(int)
  287. result = append(result, i)
  288. }
  289. })
  290. assert.Nil(t, err)
  291. assert.Equal(t, 2, len(result))
  292. assert.Equal(t, 1, result[0])
  293. assert.Equal(t, 0, result[1])
  294. }
  295. func TestMapVoid(t *testing.T) {
  296. const tasks = 1000
  297. var count uint32
  298. MapVoid(func(source chan<- interface{}) {
  299. for i := 0; i < tasks; i++ {
  300. source <- i
  301. }
  302. }, func(item interface{}) {
  303. atomic.AddUint32(&count, 1)
  304. })
  305. assert.Equal(t, tasks, int(count))
  306. }
  307. func TestMapReducePanic(t *testing.T) {
  308. v, err := MapReduce(func(source chan<- interface{}) {
  309. source <- 0
  310. source <- 1
  311. }, func(item interface{}, writer Writer, cancel func(error)) {
  312. i := item.(int)
  313. writer.Write(i)
  314. }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
  315. for range pipe {
  316. panic("panic")
  317. }
  318. })
  319. assert.Nil(t, v)
  320. assert.NotNil(t, err)
  321. assert.Equal(t, "panic", err.Error())
  322. }
  323. func TestMapReduceVoidCancel(t *testing.T) {
  324. var result []int
  325. err := MapReduceVoid(func(source chan<- interface{}) {
  326. source <- 0
  327. source <- 1
  328. }, func(item interface{}, writer Writer, cancel func(error)) {
  329. i := item.(int)
  330. if i == 1 {
  331. cancel(errors.New("anything"))
  332. }
  333. writer.Write(i)
  334. }, func(pipe <-chan interface{}, cancel func(error)) {
  335. for item := range pipe {
  336. i := item.(int)
  337. result = append(result, i)
  338. }
  339. })
  340. assert.NotNil(t, err)
  341. assert.Equal(t, "anything", err.Error())
  342. }
  343. func TestMapReduceVoidCancelWithRemains(t *testing.T) {
  344. var done syncx.AtomicBool
  345. var result []int
  346. err := MapReduceVoid(func(source chan<- interface{}) {
  347. for i := 0; i < defaultWorkers*2; i++ {
  348. source <- i
  349. }
  350. done.Set(true)
  351. }, func(item interface{}, writer Writer, cancel func(error)) {
  352. i := item.(int)
  353. if i == defaultWorkers/2 {
  354. cancel(errors.New("anything"))
  355. }
  356. writer.Write(i)
  357. }, func(pipe <-chan interface{}, cancel func(error)) {
  358. for item := range pipe {
  359. i := item.(int)
  360. result = append(result, i)
  361. }
  362. })
  363. assert.NotNil(t, err)
  364. assert.Equal(t, "anything", err.Error())
  365. assert.True(t, done.True())
  366. }
  367. func TestMapReduceWithoutReducerWrite(t *testing.T) {
  368. uids := []int{1, 2, 3}
  369. res, err := MapReduce(func(source chan<- interface{}) {
  370. for _, uid := range uids {
  371. source <- uid
  372. }
  373. }, func(item interface{}, writer Writer, cancel func(error)) {
  374. writer.Write(item)
  375. }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
  376. drain(pipe)
  377. // not calling writer.Write(...), should not panic
  378. })
  379. assert.Equal(t, ErrReduceNoOutput, err)
  380. assert.Nil(t, res)
  381. }
  382. func TestMapReduceVoidPanicInReducer(t *testing.T) {
  383. const message = "foo"
  384. var done syncx.AtomicBool
  385. err := MapReduceVoid(func(source chan<- interface{}) {
  386. for i := 0; i < defaultWorkers*2; i++ {
  387. source <- i
  388. }
  389. done.Set(true)
  390. }, func(item interface{}, writer Writer, cancel func(error)) {
  391. i := item.(int)
  392. writer.Write(i)
  393. }, func(pipe <-chan interface{}, cancel func(error)) {
  394. panic(message)
  395. }, WithWorkers(1))
  396. assert.NotNil(t, err)
  397. assert.Equal(t, message, err.Error())
  398. assert.True(t, done.True())
  399. }
  400. func TestMapReduceWithContext(t *testing.T) {
  401. var done syncx.AtomicBool
  402. var result []int
  403. ctx, cancel := context.WithCancel(context.Background())
  404. err := MapReduceVoid(func(source chan<- interface{}) {
  405. for i := 0; i < defaultWorkers*2; i++ {
  406. source <- i
  407. }
  408. done.Set(true)
  409. }, func(item interface{}, writer Writer, c func(error)) {
  410. i := item.(int)
  411. if i == defaultWorkers/2 {
  412. cancel()
  413. }
  414. writer.Write(i)
  415. }, func(pipe <-chan interface{}, cancel func(error)) {
  416. for item := range pipe {
  417. i := item.(int)
  418. result = append(result, i)
  419. }
  420. }, WithContext(ctx))
  421. assert.NotNil(t, err)
  422. assert.Equal(t, ErrReduceNoOutput, err)
  423. }
  424. func BenchmarkMapReduce(b *testing.B) {
  425. b.ReportAllocs()
  426. mapper := func(v interface{}, writer Writer, cancel func(error)) {
  427. writer.Write(v.(int64) * v.(int64))
  428. }
  429. reducer := func(input <-chan interface{}, writer Writer, cancel func(error)) {
  430. var result int64
  431. for v := range input {
  432. result += v.(int64)
  433. }
  434. writer.Write(result)
  435. }
  436. for i := 0; i < b.N; i++ {
  437. MapReduce(func(input chan<- interface{}) {
  438. for j := 0; j < 2; j++ {
  439. input <- int64(j)
  440. }
  441. }, mapper, reducer)
  442. }
  443. }