periodicalexecutor_test.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package executors
  2. import (
  3. "runtime"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/wuntsong-org/go-zero-plus/core/proc"
  10. "github.com/wuntsong-org/go-zero-plus/core/timex"
  11. )
  12. const threshold = 10
  13. type container struct {
  14. interval time.Duration
  15. tasks []int
  16. execute func(tasks any)
  17. }
  18. func newContainer(interval time.Duration, execute func(tasks any)) *container {
  19. return &container{
  20. interval: interval,
  21. execute: execute,
  22. }
  23. }
  24. func (c *container) AddTask(task any) bool {
  25. c.tasks = append(c.tasks, task.(int))
  26. return len(c.tasks) > threshold
  27. }
  28. func (c *container) Execute(tasks any) {
  29. if c.execute != nil {
  30. c.execute(tasks)
  31. } else {
  32. time.Sleep(c.interval)
  33. }
  34. }
  35. func (c *container) RemoveAll() any {
  36. tasks := c.tasks
  37. c.tasks = nil
  38. return tasks
  39. }
  40. func TestPeriodicalExecutor_Sync(t *testing.T) {
  41. var done int32
  42. exec := NewPeriodicalExecutor(time.Second, newContainer(time.Millisecond*500, nil))
  43. exec.Sync(func() {
  44. atomic.AddInt32(&done, 1)
  45. })
  46. assert.Equal(t, int32(1), atomic.LoadInt32(&done))
  47. }
  48. func TestPeriodicalExecutor_QuitGoroutine(t *testing.T) {
  49. ticker := timex.NewFakeTicker()
  50. exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil))
  51. exec.newTicker = func(d time.Duration) timex.Ticker {
  52. return ticker
  53. }
  54. routines := runtime.NumGoroutine()
  55. exec.Add(1)
  56. ticker.Tick()
  57. ticker.Wait(time.Millisecond * idleRound * 2)
  58. ticker.Tick()
  59. ticker.Wait(time.Millisecond * idleRound)
  60. assert.Equal(t, routines, runtime.NumGoroutine())
  61. proc.Shutdown()
  62. }
  63. func TestPeriodicalExecutor_Bulk(t *testing.T) {
  64. ticker := timex.NewFakeTicker()
  65. var vals []int
  66. // avoid data race
  67. var lock sync.Mutex
  68. exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks any) {
  69. t := tasks.([]int)
  70. for _, each := range t {
  71. lock.Lock()
  72. vals = append(vals, each)
  73. lock.Unlock()
  74. }
  75. }))
  76. exec.newTicker = func(d time.Duration) timex.Ticker {
  77. return ticker
  78. }
  79. for i := 0; i < threshold*10; i++ {
  80. if i%threshold == 5 {
  81. time.Sleep(time.Millisecond * idleRound * 2)
  82. }
  83. exec.Add(i)
  84. }
  85. ticker.Tick()
  86. ticker.Wait(time.Millisecond * idleRound * 2)
  87. ticker.Tick()
  88. ticker.Tick()
  89. ticker.Wait(time.Millisecond * idleRound)
  90. var expect []int
  91. for i := 0; i < threshold*10; i++ {
  92. expect = append(expect, i)
  93. }
  94. lock.Lock()
  95. assert.EqualValues(t, expect, vals)
  96. lock.Unlock()
  97. }
  98. func TestPeriodicalExecutor_Panic(t *testing.T) {
  99. // avoid data race
  100. var lock sync.Mutex
  101. ticker := timex.NewFakeTicker()
  102. var (
  103. executedTasks []int
  104. expected []int
  105. )
  106. executor := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks any) {
  107. tt := tasks.([]int)
  108. lock.Lock()
  109. executedTasks = append(executedTasks, tt...)
  110. lock.Unlock()
  111. if tt[0] == 0 {
  112. panic("test")
  113. }
  114. }))
  115. executor.newTicker = func(duration time.Duration) timex.Ticker {
  116. return ticker
  117. }
  118. for i := 0; i < 30; i++ {
  119. executor.Add(i)
  120. expected = append(expected, i)
  121. }
  122. ticker.Tick()
  123. ticker.Tick()
  124. time.Sleep(time.Millisecond)
  125. lock.Lock()
  126. assert.Equal(t, expected, executedTasks)
  127. lock.Unlock()
  128. }
  129. func TestPeriodicalExecutor_FlushPanic(t *testing.T) {
  130. var (
  131. executedTasks []int
  132. expected []int
  133. lock sync.Mutex
  134. )
  135. executor := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks any) {
  136. tt := tasks.([]int)
  137. lock.Lock()
  138. executedTasks = append(executedTasks, tt...)
  139. lock.Unlock()
  140. if tt[0] == 0 {
  141. panic("flush panic")
  142. }
  143. }))
  144. for i := 0; i < 8; i++ {
  145. executor.Add(i)
  146. expected = append(expected, i)
  147. }
  148. executor.Flush()
  149. lock.Lock()
  150. assert.Equal(t, expected, executedTasks)
  151. lock.Unlock()
  152. }
  153. func TestPeriodicalExecutor_Wait(t *testing.T) {
  154. var lock sync.Mutex
  155. executor := NewBulkExecutor(func(tasks []any) {
  156. lock.Lock()
  157. defer lock.Unlock()
  158. time.Sleep(10 * time.Millisecond)
  159. }, WithBulkTasks(1), WithBulkInterval(time.Second))
  160. for i := 0; i < 10; i++ {
  161. executor.Add(1)
  162. }
  163. executor.Flush()
  164. executor.Wait()
  165. }
  166. func TestPeriodicalExecutor_WaitFast(t *testing.T) {
  167. const total = 3
  168. var cnt int
  169. var lock sync.Mutex
  170. executor := NewBulkExecutor(func(tasks []any) {
  171. defer func() {
  172. cnt++
  173. }()
  174. lock.Lock()
  175. defer lock.Unlock()
  176. time.Sleep(10 * time.Millisecond)
  177. }, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond))
  178. for i := 0; i < total; i++ {
  179. executor.Add(2)
  180. }
  181. executor.Flush()
  182. executor.Wait()
  183. assert.Equal(t, total, cnt)
  184. }
  185. func TestPeriodicalExecutor_Deadlock(t *testing.T) {
  186. executor := NewBulkExecutor(func(tasks []any) {
  187. }, WithBulkTasks(1), WithBulkInterval(time.Millisecond))
  188. for i := 0; i < 1e5; i++ {
  189. executor.Add(1)
  190. }
  191. }
  192. func TestPeriodicalExecutor_hasTasks(t *testing.T) {
  193. exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil))
  194. assert.False(t, exec.hasTasks(nil))
  195. assert.True(t, exec.hasTasks(1))
  196. }
  197. // go test -benchtime 10s -bench .
  198. func BenchmarkExecutor(b *testing.B) {
  199. b.ReportAllocs()
  200. executor := NewPeriodicalExecutor(time.Second, newContainer(time.Millisecond*500, nil))
  201. for i := 0; i < b.N; i++ {
  202. executor.Add(1)
  203. }
  204. }