1
0

mapreduce_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. package mr
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "log"
  7. "runtime"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/stretchr/testify/assert"
  12. "go.uber.org/goleak"
  13. )
  14. var errDummy = errors.New("dummy")
  15. func init() {
  16. log.SetOutput(io.Discard)
  17. }
  18. func TestFinish(t *testing.T) {
  19. defer goleak.VerifyNone(t)
  20. var total uint32
  21. err := Finish(func() error {
  22. atomic.AddUint32(&total, 2)
  23. return nil
  24. }, func() error {
  25. atomic.AddUint32(&total, 3)
  26. return nil
  27. }, func() error {
  28. atomic.AddUint32(&total, 5)
  29. return nil
  30. })
  31. assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
  32. assert.Nil(t, err)
  33. }
  34. func TestFinishNone(t *testing.T) {
  35. defer goleak.VerifyNone(t)
  36. assert.Nil(t, Finish())
  37. }
  38. func TestFinishVoidNone(t *testing.T) {
  39. defer goleak.VerifyNone(t)
  40. FinishVoid()
  41. }
  42. func TestFinishErr(t *testing.T) {
  43. defer goleak.VerifyNone(t)
  44. var total uint32
  45. err := Finish(func() error {
  46. atomic.AddUint32(&total, 2)
  47. return nil
  48. }, func() error {
  49. atomic.AddUint32(&total, 3)
  50. return errDummy
  51. }, func() error {
  52. atomic.AddUint32(&total, 5)
  53. return nil
  54. })
  55. assert.Equal(t, errDummy, err)
  56. }
  57. func TestFinishVoid(t *testing.T) {
  58. defer goleak.VerifyNone(t)
  59. var total uint32
  60. FinishVoid(func() {
  61. atomic.AddUint32(&total, 2)
  62. }, func() {
  63. atomic.AddUint32(&total, 3)
  64. }, func() {
  65. atomic.AddUint32(&total, 5)
  66. })
  67. assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
  68. }
  69. func TestForEach(t *testing.T) {
  70. const tasks = 1000
  71. t.Run("all", func(t *testing.T) {
  72. defer goleak.VerifyNone(t)
  73. var count uint32
  74. ForEach(func(source chan<- int) {
  75. for i := 0; i < tasks; i++ {
  76. source <- i
  77. }
  78. }, func(item int) {
  79. atomic.AddUint32(&count, 1)
  80. }, WithWorkers(-1))
  81. assert.Equal(t, tasks, int(count))
  82. })
  83. t.Run("odd", func(t *testing.T) {
  84. defer goleak.VerifyNone(t)
  85. var count uint32
  86. ForEach(func(source chan<- int) {
  87. for i := 0; i < tasks; i++ {
  88. source <- i
  89. }
  90. }, func(item int) {
  91. if item%2 == 0 {
  92. atomic.AddUint32(&count, 1)
  93. }
  94. })
  95. assert.Equal(t, tasks/2, int(count))
  96. })
  97. t.Run("all", func(t *testing.T) {
  98. defer goleak.VerifyNone(t)
  99. assert.PanicsWithValue(t, "foo", func() {
  100. ForEach(func(source chan<- int) {
  101. for i := 0; i < tasks; i++ {
  102. source <- i
  103. }
  104. }, func(item int) {
  105. panic("foo")
  106. })
  107. })
  108. })
  109. }
  110. func TestGeneratePanic(t *testing.T) {
  111. defer goleak.VerifyNone(t)
  112. t.Run("all", func(t *testing.T) {
  113. assert.PanicsWithValue(t, "foo", func() {
  114. ForEach(func(source chan<- int) {
  115. panic("foo")
  116. }, func(item int) {
  117. })
  118. })
  119. })
  120. }
  121. func TestMapperPanic(t *testing.T) {
  122. defer goleak.VerifyNone(t)
  123. const tasks = 1000
  124. var run int32
  125. t.Run("all", func(t *testing.T) {
  126. assert.PanicsWithValue(t, "foo", func() {
  127. _, _ = MapReduce(func(source chan<- int) {
  128. for i := 0; i < tasks; i++ {
  129. source <- i
  130. }
  131. }, func(item int, writer Writer[int], cancel func(error)) {
  132. atomic.AddInt32(&run, 1)
  133. panic("foo")
  134. }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
  135. })
  136. })
  137. assert.True(t, atomic.LoadInt32(&run) < tasks/2)
  138. })
  139. }
  140. func TestMapReduce(t *testing.T) {
  141. defer goleak.VerifyNone(t)
  142. tests := []struct {
  143. name string
  144. mapper MapperFunc[int, int]
  145. reducer ReducerFunc[int, int]
  146. expectErr error
  147. expectValue int
  148. }{
  149. {
  150. name: "simple",
  151. expectErr: nil,
  152. expectValue: 30,
  153. },
  154. {
  155. name: "cancel with error",
  156. mapper: func(v int, writer Writer[int], cancel func(error)) {
  157. if v%3 == 0 {
  158. cancel(errDummy)
  159. }
  160. writer.Write(v * v)
  161. },
  162. expectErr: errDummy,
  163. },
  164. {
  165. name: "cancel with nil",
  166. mapper: func(v int, writer Writer[int], cancel func(error)) {
  167. if v%3 == 0 {
  168. cancel(nil)
  169. }
  170. writer.Write(v * v)
  171. },
  172. expectErr: ErrCancelWithNil,
  173. },
  174. {
  175. name: "cancel with more",
  176. reducer: func(pipe <-chan int, writer Writer[int], cancel func(error)) {
  177. var result int
  178. for item := range pipe {
  179. result += item
  180. if result > 10 {
  181. cancel(errDummy)
  182. }
  183. }
  184. writer.Write(result)
  185. },
  186. expectErr: errDummy,
  187. },
  188. }
  189. t.Run("MapReduce", func(t *testing.T) {
  190. for _, test := range tests {
  191. t.Run(test.name, func(t *testing.T) {
  192. if test.mapper == nil {
  193. test.mapper = func(v int, writer Writer[int], cancel func(error)) {
  194. writer.Write(v * v)
  195. }
  196. }
  197. if test.reducer == nil {
  198. test.reducer = func(pipe <-chan int, writer Writer[int], cancel func(error)) {
  199. var result int
  200. for item := range pipe {
  201. result += item
  202. }
  203. writer.Write(result)
  204. }
  205. }
  206. value, err := MapReduce(func(source chan<- int) {
  207. for i := 1; i < 5; i++ {
  208. source <- i
  209. }
  210. }, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
  211. assert.Equal(t, test.expectErr, err)
  212. assert.Equal(t, test.expectValue, value)
  213. })
  214. }
  215. })
  216. t.Run("MapReduce", func(t *testing.T) {
  217. for _, test := range tests {
  218. t.Run(test.name, func(t *testing.T) {
  219. if test.mapper == nil {
  220. test.mapper = func(v int, writer Writer[int], cancel func(error)) {
  221. writer.Write(v * v)
  222. }
  223. }
  224. if test.reducer == nil {
  225. test.reducer = func(pipe <-chan int, writer Writer[int], cancel func(error)) {
  226. var result int
  227. for item := range pipe {
  228. result += item
  229. }
  230. writer.Write(result)
  231. }
  232. }
  233. source := make(chan int)
  234. go func() {
  235. for i := 1; i < 5; i++ {
  236. source <- i
  237. }
  238. close(source)
  239. }()
  240. value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
  241. assert.Equal(t, test.expectErr, err)
  242. assert.Equal(t, test.expectValue, value)
  243. })
  244. }
  245. })
  246. }
  247. func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
  248. defer goleak.VerifyNone(t)
  249. assert.Panics(t, func() {
  250. MapReduce(func(source chan<- int) {
  251. for i := 0; i < 10; i++ {
  252. source <- i
  253. }
  254. }, func(item int, writer Writer[int], cancel func(error)) {
  255. writer.Write(item)
  256. }, func(pipe <-chan int, writer Writer[string], cancel func(error)) {
  257. drain(pipe)
  258. writer.Write("one")
  259. writer.Write("two")
  260. })
  261. })
  262. }
  263. func TestMapReduceVoid(t *testing.T) {
  264. defer goleak.VerifyNone(t)
  265. var value uint32
  266. tests := []struct {
  267. name string
  268. mapper MapperFunc[int, int]
  269. reducer VoidReducerFunc[int]
  270. expectValue uint32
  271. expectErr error
  272. }{
  273. {
  274. name: "simple",
  275. expectValue: 30,
  276. expectErr: nil,
  277. },
  278. {
  279. name: "cancel with error",
  280. mapper: func(v int, writer Writer[int], cancel func(error)) {
  281. if v%3 == 0 {
  282. cancel(errDummy)
  283. }
  284. writer.Write(v * v)
  285. },
  286. expectErr: errDummy,
  287. },
  288. {
  289. name: "cancel with nil",
  290. mapper: func(v int, writer Writer[int], cancel func(error)) {
  291. if v%3 == 0 {
  292. cancel(nil)
  293. }
  294. writer.Write(v * v)
  295. },
  296. expectErr: ErrCancelWithNil,
  297. },
  298. {
  299. name: "cancel with more",
  300. reducer: func(pipe <-chan int, cancel func(error)) {
  301. for item := range pipe {
  302. result := atomic.AddUint32(&value, uint32(item))
  303. if result > 10 {
  304. cancel(errDummy)
  305. }
  306. }
  307. },
  308. expectErr: errDummy,
  309. },
  310. }
  311. for _, test := range tests {
  312. t.Run(test.name, func(t *testing.T) {
  313. atomic.StoreUint32(&value, 0)
  314. if test.mapper == nil {
  315. test.mapper = func(v int, writer Writer[int], cancel func(error)) {
  316. writer.Write(v * v)
  317. }
  318. }
  319. if test.reducer == nil {
  320. test.reducer = func(pipe <-chan int, cancel func(error)) {
  321. for item := range pipe {
  322. atomic.AddUint32(&value, uint32(item))
  323. }
  324. }
  325. }
  326. err := MapReduceVoid(func(source chan<- int) {
  327. for i := 1; i < 5; i++ {
  328. source <- i
  329. }
  330. }, test.mapper, test.reducer)
  331. assert.Equal(t, test.expectErr, err)
  332. if err == nil {
  333. assert.Equal(t, test.expectValue, atomic.LoadUint32(&value))
  334. }
  335. })
  336. }
  337. }
  338. func TestMapReduceVoidWithDelay(t *testing.T) {
  339. defer goleak.VerifyNone(t)
  340. var result []int
  341. err := MapReduceVoid(func(source chan<- int) {
  342. source <- 0
  343. source <- 1
  344. }, func(i int, writer Writer[int], cancel func(error)) {
  345. if i == 0 {
  346. time.Sleep(time.Millisecond * 50)
  347. }
  348. writer.Write(i)
  349. }, func(pipe <-chan int, cancel func(error)) {
  350. for item := range pipe {
  351. i := item
  352. result = append(result, i)
  353. }
  354. })
  355. assert.Nil(t, err)
  356. assert.Equal(t, 2, len(result))
  357. assert.Equal(t, 1, result[0])
  358. assert.Equal(t, 0, result[1])
  359. }
  360. func TestMapReducePanic(t *testing.T) {
  361. defer goleak.VerifyNone(t)
  362. assert.Panics(t, func() {
  363. _, _ = MapReduce(func(source chan<- int) {
  364. source <- 0
  365. source <- 1
  366. }, func(i int, writer Writer[int], cancel func(error)) {
  367. writer.Write(i)
  368. }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
  369. for range pipe {
  370. panic("panic")
  371. }
  372. })
  373. })
  374. }
  375. func TestMapReducePanicOnce(t *testing.T) {
  376. defer goleak.VerifyNone(t)
  377. assert.Panics(t, func() {
  378. _, _ = MapReduce(func(source chan<- int) {
  379. for i := 0; i < 100; i++ {
  380. source <- i
  381. }
  382. }, func(i int, writer Writer[int], cancel func(error)) {
  383. if i == 0 {
  384. panic("foo")
  385. }
  386. writer.Write(i)
  387. }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
  388. for range pipe {
  389. panic("bar")
  390. }
  391. })
  392. })
  393. }
  394. func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
  395. defer goleak.VerifyNone(t)
  396. assert.Panics(t, func() {
  397. _, _ = MapReduce(func(source chan<- int) {
  398. source <- 0
  399. source <- 1
  400. }, func(item int, writer Writer[int], cancel func(error)) {
  401. panic("foo")
  402. }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
  403. panic("bar")
  404. })
  405. })
  406. }
  407. func TestMapReduceVoidCancel(t *testing.T) {
  408. defer goleak.VerifyNone(t)
  409. var result []int
  410. err := MapReduceVoid(func(source chan<- int) {
  411. source <- 0
  412. source <- 1
  413. }, func(i int, writer Writer[int], cancel func(error)) {
  414. if i == 1 {
  415. cancel(errors.New("anything"))
  416. }
  417. writer.Write(i)
  418. }, func(pipe <-chan int, cancel func(error)) {
  419. for item := range pipe {
  420. i := item
  421. result = append(result, i)
  422. }
  423. })
  424. assert.NotNil(t, err)
  425. assert.Equal(t, "anything", err.Error())
  426. }
  427. func TestMapReduceVoidCancelWithRemains(t *testing.T) {
  428. defer goleak.VerifyNone(t)
  429. var done int32
  430. var result []int
  431. err := MapReduceVoid(func(source chan<- int) {
  432. for i := 0; i < defaultWorkers*2; i++ {
  433. source <- i
  434. }
  435. atomic.AddInt32(&done, 1)
  436. }, func(i int, writer Writer[int], cancel func(error)) {
  437. if i == defaultWorkers/2 {
  438. cancel(errors.New("anything"))
  439. }
  440. writer.Write(i)
  441. }, func(pipe <-chan int, cancel func(error)) {
  442. for item := range pipe {
  443. result = append(result, item)
  444. }
  445. })
  446. assert.NotNil(t, err)
  447. assert.Equal(t, "anything", err.Error())
  448. assert.Equal(t, int32(1), done)
  449. }
  450. func TestMapReduceWithoutReducerWrite(t *testing.T) {
  451. defer goleak.VerifyNone(t)
  452. uids := []int{1, 2, 3}
  453. res, err := MapReduce(func(source chan<- int) {
  454. for _, uid := range uids {
  455. source <- uid
  456. }
  457. }, func(item int, writer Writer[int], cancel func(error)) {
  458. writer.Write(item)
  459. }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
  460. drain(pipe)
  461. // not calling writer.Write(...), should not panic
  462. })
  463. assert.Equal(t, ErrReduceNoOutput, err)
  464. assert.Equal(t, 0, res)
  465. }
  466. func TestMapReduceVoidPanicInReducer(t *testing.T) {
  467. defer goleak.VerifyNone(t)
  468. const message = "foo"
  469. assert.Panics(t, func() {
  470. var done int32
  471. _ = MapReduceVoid(func(source chan<- int) {
  472. for i := 0; i < defaultWorkers*2; i++ {
  473. source <- i
  474. }
  475. atomic.AddInt32(&done, 1)
  476. }, func(i int, writer Writer[int], cancel func(error)) {
  477. writer.Write(i)
  478. }, func(pipe <-chan int, cancel func(error)) {
  479. panic(message)
  480. }, WithWorkers(1))
  481. })
  482. }
  483. func TestForEachWithContext(t *testing.T) {
  484. defer goleak.VerifyNone(t)
  485. var done int32
  486. ctx, cancel := context.WithCancel(context.Background())
  487. ForEach(func(source chan<- int) {
  488. for i := 0; i < defaultWorkers*2; i++ {
  489. source <- i
  490. }
  491. atomic.AddInt32(&done, 1)
  492. }, func(i int) {
  493. if i == defaultWorkers/2 {
  494. cancel()
  495. }
  496. }, WithContext(ctx))
  497. }
  498. func TestMapReduceWithContext(t *testing.T) {
  499. defer goleak.VerifyNone(t)
  500. var done int32
  501. var result []int
  502. ctx, cancel := context.WithCancel(context.Background())
  503. err := MapReduceVoid(func(source chan<- int) {
  504. for i := 0; i < defaultWorkers*2; i++ {
  505. source <- i
  506. }
  507. atomic.AddInt32(&done, 1)
  508. }, func(i int, writer Writer[int], c func(error)) {
  509. if i == defaultWorkers/2 {
  510. cancel()
  511. }
  512. writer.Write(i)
  513. time.Sleep(time.Millisecond)
  514. }, func(pipe <-chan int, cancel func(error)) {
  515. for item := range pipe {
  516. i := item
  517. result = append(result, i)
  518. }
  519. }, WithContext(ctx))
  520. assert.NotNil(t, err)
  521. assert.Equal(t, context.DeadlineExceeded, err)
  522. }
  523. func BenchmarkMapReduce(b *testing.B) {
  524. b.ReportAllocs()
  525. mapper := func(v int64, writer Writer[int64], cancel func(error)) {
  526. writer.Write(v * v)
  527. }
  528. reducer := func(input <-chan int64, writer Writer[int64], cancel func(error)) {
  529. var result int64
  530. for v := range input {
  531. result += v
  532. }
  533. writer.Write(result)
  534. }
  535. for i := 0; i < b.N; i++ {
  536. MapReduce(func(input chan<- int64) {
  537. for j := 0; j < 2; j++ {
  538. input <- int64(j)
  539. }
  540. }, mapper, reducer)
  541. }
  542. }