mapreduce_fuzzcase_test.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. //go:build fuzz
  2. // +build fuzz
  3. package mr
  4. import (
  5. "fmt"
  6. "math/rand"
  7. "runtime"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "testing"
  12. "time"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/zeromicro/go-zero/core/threading"
  15. "gopkg.in/cheggaaa/pb.v1"
  16. )
  17. // If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
  18. // so we need to simulate the fuzz test in test mode.
  19. func TestMapReduceRandom(t *testing.T) {
  20. rand.Seed(time.Now().UnixNano())
  21. const (
  22. times = 10000
  23. nRange = 500
  24. mega = 1024 * 1024
  25. )
  26. bar := pb.New(times).Start()
  27. runner := threading.NewTaskRunner(runtime.NumCPU())
  28. var wg sync.WaitGroup
  29. wg.Add(times)
  30. for i := 0; i < times; i++ {
  31. runner.Schedule(func() {
  32. start := time.Now()
  33. defer func() {
  34. if time.Since(start) > time.Minute {
  35. t.Fatal("timeout")
  36. }
  37. wg.Done()
  38. }()
  39. t.Run(strconv.Itoa(i), func(t *testing.T) {
  40. n := rand.Int63n(nRange)%nRange + nRange
  41. workers := rand.Int()%50 + runtime.NumCPU()/2
  42. genPanic := rand.Intn(100) == 0
  43. mapperPanic := rand.Intn(100) == 0
  44. reducerPanic := rand.Intn(100) == 0
  45. genIdx := rand.Int63n(n)
  46. mapperIdx := rand.Int63n(n)
  47. reducerIdx := rand.Int63n(n)
  48. squareSum := (n - 1) * n * (2*n - 1) / 6
  49. fn := func() (any, error) {
  50. return MapReduce(func(source chan<- any) {
  51. for i := int64(0); i < n; i++ {
  52. source <- i
  53. if genPanic && i == genIdx {
  54. panic("foo")
  55. }
  56. }
  57. }, func(item any, writer Writer, cancel func(error)) {
  58. v := item.(int64)
  59. if mapperPanic && v == mapperIdx {
  60. panic("bar")
  61. }
  62. writer.Write(v * v)
  63. }, func(pipe <-chan any, writer Writer, cancel func(error)) {
  64. var idx int64
  65. var total int64
  66. for v := range pipe {
  67. if reducerPanic && idx == reducerIdx {
  68. panic("baz")
  69. }
  70. total += v.(int64)
  71. idx++
  72. }
  73. writer.Write(total)
  74. }, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
  75. }
  76. if genPanic || mapperPanic || reducerPanic {
  77. var buf strings.Builder
  78. buf.WriteString(fmt.Sprintf("n: %d", n))
  79. buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
  80. buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
  81. buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
  82. buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
  83. buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
  84. buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
  85. assert.Panicsf(t, func() { fn() }, buf.String())
  86. } else {
  87. val, err := fn()
  88. assert.Nil(t, err)
  89. assert.Equal(t, squareSum, val.(int64))
  90. }
  91. bar.Increment()
  92. })
  93. })
  94. }
  95. wg.Wait()
  96. bar.Finish()
  97. }