mapreduce_fuzzcase_test.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. //go:build fuzz
  2. package mr
  3. import (
  4. "fmt"
  5. "math/rand"
  6. "runtime"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "testing"
  11. "time"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/wuntsong-org/go-zero-plus/core/threading"
  14. "gopkg.in/cheggaaa/pb.v1"
  15. )
  16. // If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
  17. // so we need to simulate the fuzz test in test mode.
  18. func TestMapReduceRandom(t *testing.T) {
  19. rand.Seed(time.Now().UnixNano())
  20. const (
  21. times = 10000
  22. nRange = 500
  23. mega = 1024 * 1024
  24. )
  25. bar := pb.New(times).Start()
  26. runner := threading.NewTaskRunner(runtime.NumCPU())
  27. var wg sync.WaitGroup
  28. wg.Add(times)
  29. for i := 0; i < times; i++ {
  30. runner.Schedule(func() {
  31. start := time.Now()
  32. defer func() {
  33. if time.Since(start) > time.Minute {
  34. t.Fatal("timeout")
  35. }
  36. wg.Done()
  37. }()
  38. t.Run(strconv.Itoa(i), func(t *testing.T) {
  39. n := rand.Int63n(nRange)%nRange + nRange
  40. workers := rand.Int()%50 + runtime.NumCPU()/2
  41. genPanic := rand.Intn(100) == 0
  42. mapperPanic := rand.Intn(100) == 0
  43. reducerPanic := rand.Intn(100) == 0
  44. genIdx := rand.Int63n(n)
  45. mapperIdx := rand.Int63n(n)
  46. reducerIdx := rand.Int63n(n)
  47. squareSum := (n - 1) * n * (2*n - 1) / 6
  48. fn := func() (int64, error) {
  49. return MapReduce(func(source chan<- int64) {
  50. for i := int64(0); i < n; i++ {
  51. source <- i
  52. if genPanic && i == genIdx {
  53. panic("foo")
  54. }
  55. }
  56. }, func(v int64, writer Writer[int64], cancel func(error)) {
  57. if mapperPanic && v == mapperIdx {
  58. panic("bar")
  59. }
  60. writer.Write(v * v)
  61. }, func(pipe <-chan int64, writer Writer[int64], cancel func(error)) {
  62. var idx int64
  63. var total int64
  64. for v := range pipe {
  65. if reducerPanic && idx == reducerIdx {
  66. panic("baz")
  67. }
  68. total += v
  69. idx++
  70. }
  71. writer.Write(total)
  72. }, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
  73. }
  74. if genPanic || mapperPanic || reducerPanic {
  75. var buf strings.Builder
  76. buf.WriteString(fmt.Sprintf("n: %d", n))
  77. buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
  78. buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
  79. buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
  80. buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
  81. buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
  82. buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
  83. assert.Panicsf(t, func() { fn() }, buf.String())
  84. } else {
  85. val, err := fn()
  86. assert.Nil(t, err)
  87. assert.Equal(t, squareSum, val)
  88. }
  89. bar.Increment()
  90. })
  91. })
  92. }
  93. wg.Wait()
  94. bar.Finish()
  95. }