mapreduce_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625
  1. package mr
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "log"
  7. "runtime"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/stretchr/testify/assert"
  12. "go.uber.org/goleak"
  13. )
// errDummy is a sentinel error shared by the tests in this file; callbacks
// return it or pass it to cancel so assertions can compare against it.
var errDummy = errors.New("dummy")
  15. func init() {
  16. log.SetOutput(io.Discard)
  17. }
  18. func TestFinish(t *testing.T) {
  19. defer goleak.VerifyNone(t)
  20. var total uint32
  21. err := Finish(func() error {
  22. atomic.AddUint32(&total, 2)
  23. return nil
  24. }, func() error {
  25. atomic.AddUint32(&total, 3)
  26. return nil
  27. }, func() error {
  28. atomic.AddUint32(&total, 5)
  29. return nil
  30. })
  31. assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
  32. assert.Nil(t, err)
  33. }
  34. func TestFinishNone(t *testing.T) {
  35. defer goleak.VerifyNone(t)
  36. assert.Nil(t, Finish())
  37. }
// TestFinishVoidNone calls FinishVoid with no functions. There is no return
// value to inspect; the test passes as long as the call returns and the
// deferred goleak check reports no leftover goroutines.
func TestFinishVoidNone(t *testing.T) {
	defer goleak.VerifyNone(t)

	FinishVoid()
}
  42. func TestFinishErr(t *testing.T) {
  43. defer goleak.VerifyNone(t)
  44. var total uint32
  45. err := Finish(func() error {
  46. atomic.AddUint32(&total, 2)
  47. return nil
  48. }, func() error {
  49. atomic.AddUint32(&total, 3)
  50. return errDummy
  51. }, func() error {
  52. atomic.AddUint32(&total, 5)
  53. return nil
  54. })
  55. assert.Equal(t, errDummy, err)
  56. }
  57. func TestFinishVoid(t *testing.T) {
  58. defer goleak.VerifyNone(t)
  59. var total uint32
  60. FinishVoid(func() {
  61. atomic.AddUint32(&total, 2)
  62. }, func() {
  63. atomic.AddUint32(&total, 3)
  64. }, func() {
  65. atomic.AddUint32(&total, 5)
  66. })
  67. assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
  68. }
  69. func TestForEach(t *testing.T) {
  70. const tasks = 1000
  71. t.Run("all", func(t *testing.T) {
  72. defer goleak.VerifyNone(t)
  73. var count uint32
  74. ForEach(func(source chan<- any) {
  75. for i := 0; i < tasks; i++ {
  76. source <- i
  77. }
  78. }, func(item any) {
  79. atomic.AddUint32(&count, 1)
  80. }, WithWorkers(-1))
  81. assert.Equal(t, tasks, int(count))
  82. })
  83. t.Run("odd", func(t *testing.T) {
  84. defer goleak.VerifyNone(t)
  85. var count uint32
  86. ForEach(func(source chan<- any) {
  87. for i := 0; i < tasks; i++ {
  88. source <- i
  89. }
  90. }, func(item any) {
  91. if item.(int)%2 == 0 {
  92. atomic.AddUint32(&count, 1)
  93. }
  94. })
  95. assert.Equal(t, tasks/2, int(count))
  96. })
  97. t.Run("all", func(t *testing.T) {
  98. defer goleak.VerifyNone(t)
  99. assert.PanicsWithValue(t, "foo", func() {
  100. ForEach(func(source chan<- any) {
  101. for i := 0; i < tasks; i++ {
  102. source <- i
  103. }
  104. }, func(item any) {
  105. panic("foo")
  106. })
  107. })
  108. })
  109. }
  110. func TestGeneratePanic(t *testing.T) {
  111. defer goleak.VerifyNone(t)
  112. t.Run("all", func(t *testing.T) {
  113. assert.PanicsWithValue(t, "foo", func() {
  114. ForEach(func(source chan<- any) {
  115. panic("foo")
  116. }, func(item any) {
  117. })
  118. })
  119. })
  120. }
  121. func TestMapperPanic(t *testing.T) {
  122. defer goleak.VerifyNone(t)
  123. const tasks = 1000
  124. var run int32
  125. t.Run("all", func(t *testing.T) {
  126. assert.PanicsWithValue(t, "foo", func() {
  127. _, _ = MapReduce(func(source chan<- any) {
  128. for i := 0; i < tasks; i++ {
  129. source <- i
  130. }
  131. }, func(item any, writer Writer, cancel func(error)) {
  132. atomic.AddInt32(&run, 1)
  133. panic("foo")
  134. }, func(pipe <-chan any, writer Writer, cancel func(error)) {
  135. })
  136. })
  137. assert.True(t, atomic.LoadInt32(&run) < tasks/2)
  138. })
  139. }
  140. func TestMapReduce(t *testing.T) {
  141. defer goleak.VerifyNone(t)
  142. tests := []struct {
  143. name string
  144. mapper MapperFunc
  145. reducer ReducerFunc
  146. expectErr error
  147. expectValue any
  148. }{
  149. {
  150. name: "simple",
  151. expectErr: nil,
  152. expectValue: 30,
  153. },
  154. {
  155. name: "cancel with error",
  156. mapper: func(item any, writer Writer, cancel func(error)) {
  157. v := item.(int)
  158. if v%3 == 0 {
  159. cancel(errDummy)
  160. }
  161. writer.Write(v * v)
  162. },
  163. expectErr: errDummy,
  164. },
  165. {
  166. name: "cancel with nil",
  167. mapper: func(item any, writer Writer, cancel func(error)) {
  168. v := item.(int)
  169. if v%3 == 0 {
  170. cancel(nil)
  171. }
  172. writer.Write(v * v)
  173. },
  174. expectErr: ErrCancelWithNil,
  175. expectValue: nil,
  176. },
  177. {
  178. name: "cancel with more",
  179. reducer: func(pipe <-chan any, writer Writer, cancel func(error)) {
  180. var result int
  181. for item := range pipe {
  182. result += item.(int)
  183. if result > 10 {
  184. cancel(errDummy)
  185. }
  186. }
  187. writer.Write(result)
  188. },
  189. expectErr: errDummy,
  190. },
  191. }
  192. t.Run("MapReduce", func(t *testing.T) {
  193. for _, test := range tests {
  194. t.Run(test.name, func(t *testing.T) {
  195. if test.mapper == nil {
  196. test.mapper = func(item any, writer Writer, cancel func(error)) {
  197. v := item.(int)
  198. writer.Write(v * v)
  199. }
  200. }
  201. if test.reducer == nil {
  202. test.reducer = func(pipe <-chan any, writer Writer, cancel func(error)) {
  203. var result int
  204. for item := range pipe {
  205. result += item.(int)
  206. }
  207. writer.Write(result)
  208. }
  209. }
  210. value, err := MapReduce(func(source chan<- any) {
  211. for i := 1; i < 5; i++ {
  212. source <- i
  213. }
  214. }, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
  215. assert.Equal(t, test.expectErr, err)
  216. assert.Equal(t, test.expectValue, value)
  217. })
  218. }
  219. })
  220. t.Run("MapReduce", func(t *testing.T) {
  221. for _, test := range tests {
  222. t.Run(test.name, func(t *testing.T) {
  223. if test.mapper == nil {
  224. test.mapper = func(item any, writer Writer, cancel func(error)) {
  225. v := item.(int)
  226. writer.Write(v * v)
  227. }
  228. }
  229. if test.reducer == nil {
  230. test.reducer = func(pipe <-chan any, writer Writer, cancel func(error)) {
  231. var result int
  232. for item := range pipe {
  233. result += item.(int)
  234. }
  235. writer.Write(result)
  236. }
  237. }
  238. source := make(chan any)
  239. go func() {
  240. for i := 1; i < 5; i++ {
  241. source <- i
  242. }
  243. close(source)
  244. }()
  245. value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
  246. assert.Equal(t, test.expectErr, err)
  247. assert.Equal(t, test.expectValue, value)
  248. })
  249. }
  250. })
  251. }
  252. func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
  253. defer goleak.VerifyNone(t)
  254. assert.Panics(t, func() {
  255. MapReduce(func(source chan<- any) {
  256. for i := 0; i < 10; i++ {
  257. source <- i
  258. }
  259. }, func(item any, writer Writer, cancel func(error)) {
  260. writer.Write(item)
  261. }, func(pipe <-chan any, writer Writer, cancel func(error)) {
  262. drain(pipe)
  263. writer.Write("one")
  264. writer.Write("two")
  265. })
  266. })
  267. }
  268. func TestMapReduceVoid(t *testing.T) {
  269. defer goleak.VerifyNone(t)
  270. var value uint32
  271. tests := []struct {
  272. name string
  273. mapper MapperFunc
  274. reducer VoidReducerFunc
  275. expectValue uint32
  276. expectErr error
  277. }{
  278. {
  279. name: "simple",
  280. expectValue: 30,
  281. expectErr: nil,
  282. },
  283. {
  284. name: "cancel with error",
  285. mapper: func(item any, writer Writer, cancel func(error)) {
  286. v := item.(int)
  287. if v%3 == 0 {
  288. cancel(errDummy)
  289. }
  290. writer.Write(v * v)
  291. },
  292. expectErr: errDummy,
  293. },
  294. {
  295. name: "cancel with nil",
  296. mapper: func(item any, writer Writer, cancel func(error)) {
  297. v := item.(int)
  298. if v%3 == 0 {
  299. cancel(nil)
  300. }
  301. writer.Write(v * v)
  302. },
  303. expectErr: ErrCancelWithNil,
  304. },
  305. {
  306. name: "cancel with more",
  307. reducer: func(pipe <-chan any, cancel func(error)) {
  308. for item := range pipe {
  309. result := atomic.AddUint32(&value, uint32(item.(int)))
  310. if result > 10 {
  311. cancel(errDummy)
  312. }
  313. }
  314. },
  315. expectErr: errDummy,
  316. },
  317. }
  318. for _, test := range tests {
  319. t.Run(test.name, func(t *testing.T) {
  320. atomic.StoreUint32(&value, 0)
  321. if test.mapper == nil {
  322. test.mapper = func(item any, writer Writer, cancel func(error)) {
  323. v := item.(int)
  324. writer.Write(v * v)
  325. }
  326. }
  327. if test.reducer == nil {
  328. test.reducer = func(pipe <-chan any, cancel func(error)) {
  329. for item := range pipe {
  330. atomic.AddUint32(&value, uint32(item.(int)))
  331. }
  332. }
  333. }
  334. err := MapReduceVoid(func(source chan<- any) {
  335. for i := 1; i < 5; i++ {
  336. source <- i
  337. }
  338. }, test.mapper, test.reducer)
  339. assert.Equal(t, test.expectErr, err)
  340. if err == nil {
  341. assert.Equal(t, test.expectValue, atomic.LoadUint32(&value))
  342. }
  343. })
  344. }
  345. }
  346. func TestMapReduceVoidWithDelay(t *testing.T) {
  347. defer goleak.VerifyNone(t)
  348. var result []int
  349. err := MapReduceVoid(func(source chan<- any) {
  350. source <- 0
  351. source <- 1
  352. }, func(item any, writer Writer, cancel func(error)) {
  353. i := item.(int)
  354. if i == 0 {
  355. time.Sleep(time.Millisecond * 50)
  356. }
  357. writer.Write(i)
  358. }, func(pipe <-chan any, cancel func(error)) {
  359. for item := range pipe {
  360. i := item.(int)
  361. result = append(result, i)
  362. }
  363. })
  364. assert.Nil(t, err)
  365. assert.Equal(t, 2, len(result))
  366. assert.Equal(t, 1, result[0])
  367. assert.Equal(t, 0, result[1])
  368. }
  369. func TestMapReducePanic(t *testing.T) {
  370. defer goleak.VerifyNone(t)
  371. assert.Panics(t, func() {
  372. _, _ = MapReduce(func(source chan<- any) {
  373. source <- 0
  374. source <- 1
  375. }, func(item any, writer Writer, cancel func(error)) {
  376. i := item.(int)
  377. writer.Write(i)
  378. }, func(pipe <-chan any, writer Writer, cancel func(error)) {
  379. for range pipe {
  380. panic("panic")
  381. }
  382. })
  383. })
  384. }
  385. func TestMapReducePanicOnce(t *testing.T) {
  386. defer goleak.VerifyNone(t)
  387. assert.Panics(t, func() {
  388. _, _ = MapReduce(func(source chan<- any) {
  389. for i := 0; i < 100; i++ {
  390. source <- i
  391. }
  392. }, func(item any, writer Writer, cancel func(error)) {
  393. i := item.(int)
  394. if i == 0 {
  395. panic("foo")
  396. }
  397. writer.Write(i)
  398. }, func(pipe <-chan any, writer Writer, cancel func(error)) {
  399. for range pipe {
  400. panic("bar")
  401. }
  402. })
  403. })
  404. }
  405. func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
  406. defer goleak.VerifyNone(t)
  407. assert.Panics(t, func() {
  408. _, _ = MapReduce(func(source chan<- any) {
  409. source <- 0
  410. source <- 1
  411. }, func(item any, writer Writer, cancel func(error)) {
  412. panic("foo")
  413. }, func(pipe <-chan any, writer Writer, cancel func(error)) {
  414. panic("bar")
  415. })
  416. })
  417. }
  418. func TestMapReduceVoidCancel(t *testing.T) {
  419. defer goleak.VerifyNone(t)
  420. var result []int
  421. err := MapReduceVoid(func(source chan<- any) {
  422. source <- 0
  423. source <- 1
  424. }, func(item any, writer Writer, cancel func(error)) {
  425. i := item.(int)
  426. if i == 1 {
  427. cancel(errors.New("anything"))
  428. }
  429. writer.Write(i)
  430. }, func(pipe <-chan any, cancel func(error)) {
  431. for item := range pipe {
  432. i := item.(int)
  433. result = append(result, i)
  434. }
  435. })
  436. assert.NotNil(t, err)
  437. assert.Equal(t, "anything", err.Error())
  438. }
  439. func TestMapReduceVoidCancelWithRemains(t *testing.T) {
  440. defer goleak.VerifyNone(t)
  441. var done int32
  442. var result []int
  443. err := MapReduceVoid(func(source chan<- any) {
  444. for i := 0; i < defaultWorkers*2; i++ {
  445. source <- i
  446. }
  447. atomic.AddInt32(&done, 1)
  448. }, func(item any, writer Writer, cancel func(error)) {
  449. i := item.(int)
  450. if i == defaultWorkers/2 {
  451. cancel(errors.New("anything"))
  452. }
  453. writer.Write(i)
  454. }, func(pipe <-chan any, cancel func(error)) {
  455. for item := range pipe {
  456. i := item.(int)
  457. result = append(result, i)
  458. }
  459. })
  460. assert.NotNil(t, err)
  461. assert.Equal(t, "anything", err.Error())
  462. assert.Equal(t, int32(1), done)
  463. }
  464. func TestMapReduceWithoutReducerWrite(t *testing.T) {
  465. defer goleak.VerifyNone(t)
  466. uids := []int{1, 2, 3}
  467. res, err := MapReduce(func(source chan<- any) {
  468. for _, uid := range uids {
  469. source <- uid
  470. }
  471. }, func(item any, writer Writer, cancel func(error)) {
  472. writer.Write(item)
  473. }, func(pipe <-chan any, writer Writer, cancel func(error)) {
  474. drain(pipe)
  475. // not calling writer.Write(...), should not panic
  476. })
  477. assert.Equal(t, ErrReduceNoOutput, err)
  478. assert.Nil(t, res)
  479. }
  480. func TestMapReduceVoidPanicInReducer(t *testing.T) {
  481. defer goleak.VerifyNone(t)
  482. const message = "foo"
  483. assert.Panics(t, func() {
  484. var done int32
  485. _ = MapReduceVoid(func(source chan<- any) {
  486. for i := 0; i < defaultWorkers*2; i++ {
  487. source <- i
  488. }
  489. atomic.AddInt32(&done, 1)
  490. }, func(item any, writer Writer, cancel func(error)) {
  491. i := item.(int)
  492. writer.Write(i)
  493. }, func(pipe <-chan any, cancel func(error)) {
  494. panic(message)
  495. }, WithWorkers(1))
  496. })
  497. }
  498. func TestForEachWithContext(t *testing.T) {
  499. defer goleak.VerifyNone(t)
  500. var done int32
  501. ctx, cancel := context.WithCancel(context.Background())
  502. ForEach(func(source chan<- any) {
  503. for i := 0; i < defaultWorkers*2; i++ {
  504. source <- i
  505. }
  506. atomic.AddInt32(&done, 1)
  507. }, func(item any) {
  508. i := item.(int)
  509. if i == defaultWorkers/2 {
  510. cancel()
  511. }
  512. }, WithContext(ctx))
  513. }
  514. func TestMapReduceWithContext(t *testing.T) {
  515. defer goleak.VerifyNone(t)
  516. var done int32
  517. var result []int
  518. ctx, cancel := context.WithCancel(context.Background())
  519. err := MapReduceVoid(func(source chan<- any) {
  520. for i := 0; i < defaultWorkers*2; i++ {
  521. source <- i
  522. }
  523. atomic.AddInt32(&done, 1)
  524. }, func(item any, writer Writer, c func(error)) {
  525. i := item.(int)
  526. if i == defaultWorkers/2 {
  527. cancel()
  528. }
  529. writer.Write(i)
  530. }, func(pipe <-chan any, cancel func(error)) {
  531. for item := range pipe {
  532. i := item.(int)
  533. result = append(result, i)
  534. }
  535. }, WithContext(ctx))
  536. assert.NotNil(t, err)
  537. assert.Equal(t, context.DeadlineExceeded, err)
  538. }
  539. func BenchmarkMapReduce(b *testing.B) {
  540. b.ReportAllocs()
  541. mapper := func(v any, writer Writer, cancel func(error)) {
  542. writer.Write(v.(int64) * v.(int64))
  543. }
  544. reducer := func(input <-chan any, writer Writer, cancel func(error)) {
  545. var result int64
  546. for v := range input {
  547. result += v.(int64)
  548. }
  549. writer.Write(result)
  550. }
  551. for i := 0; i < b.N; i++ {
  552. MapReduce(func(input chan<- any) {
  553. for j := 0; j < 2; j++ {
  554. input <- int64(j)
  555. }
  556. }, mapper, reducer)
  557. }
  558. }