periodicalexecutor_test.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package executors
  2. import (
  3. "runtime"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/tal-tech/go-zero/core/timex"
  10. )
  11. const threshold = 10
  12. type container struct {
  13. interval time.Duration
  14. tasks []int
  15. execute func(tasks interface{})
  16. }
  17. func newContainer(interval time.Duration, execute func(tasks interface{})) *container {
  18. return &container{
  19. interval: interval,
  20. execute: execute,
  21. }
  22. }
  23. func (c *container) AddTask(task interface{}) bool {
  24. c.tasks = append(c.tasks, task.(int))
  25. return len(c.tasks) > threshold
  26. }
  27. func (c *container) Execute(tasks interface{}) {
  28. if c.execute != nil {
  29. c.execute(tasks)
  30. } else {
  31. time.Sleep(c.interval)
  32. }
  33. }
  34. func (c *container) RemoveAll() interface{} {
  35. tasks := c.tasks
  36. c.tasks = nil
  37. return tasks
  38. }
  39. func TestPeriodicalExecutor_Sync(t *testing.T) {
  40. var done int32
  41. exec := NewPeriodicalExecutor(time.Second, newContainer(time.Millisecond*500, nil))
  42. exec.Sync(func() {
  43. atomic.AddInt32(&done, 1)
  44. })
  45. assert.Equal(t, int32(1), atomic.LoadInt32(&done))
  46. }
  47. func TestPeriodicalExecutor_QuitGoroutine(t *testing.T) {
  48. ticker := timex.NewFakeTicker()
  49. exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil))
  50. exec.newTicker = func(d time.Duration) timex.Ticker {
  51. return ticker
  52. }
  53. routines := runtime.NumGoroutine()
  54. exec.Add(1)
  55. ticker.Tick()
  56. ticker.Wait(time.Millisecond * idleRound * 2)
  57. ticker.Tick()
  58. ticker.Wait(time.Millisecond * idleRound)
  59. assert.Equal(t, routines, runtime.NumGoroutine())
  60. }
  61. func TestPeriodicalExecutor_Bulk(t *testing.T) {
  62. ticker := timex.NewFakeTicker()
  63. var vals []int
  64. // avoid data race
  65. var lock sync.Mutex
  66. exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks interface{}) {
  67. t := tasks.([]int)
  68. for _, each := range t {
  69. lock.Lock()
  70. vals = append(vals, each)
  71. lock.Unlock()
  72. }
  73. }))
  74. exec.newTicker = func(d time.Duration) timex.Ticker {
  75. return ticker
  76. }
  77. for i := 0; i < threshold*10; i++ {
  78. if i%threshold == 5 {
  79. time.Sleep(time.Millisecond * idleRound * 2)
  80. }
  81. exec.Add(i)
  82. }
  83. ticker.Tick()
  84. ticker.Wait(time.Millisecond * idleRound * 2)
  85. ticker.Tick()
  86. ticker.Tick()
  87. ticker.Wait(time.Millisecond * idleRound)
  88. var expect []int
  89. for i := 0; i < threshold*10; i++ {
  90. expect = append(expect, i)
  91. }
  92. lock.Lock()
  93. assert.EqualValues(t, expect, vals)
  94. lock.Unlock()
  95. }
  96. // go test -benchtime 10s -bench .
  97. func BenchmarkExecutor(b *testing.B) {
  98. b.ReportAllocs()
  99. executor := NewPeriodicalExecutor(time.Second, newContainer(time.Millisecond*500, nil))
  100. for i := 0; i < b.N; i++ {
  101. executor.Add(1)
  102. }
  103. }