periodicalexecutor.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package executors
  2. import (
  3. "reflect"
  4. "sync"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/proc"
  7. "github.com/tal-tech/go-zero/core/syncx"
  8. "github.com/tal-tech/go-zero/core/threading"
  9. "github.com/tal-tech/go-zero/core/timex"
  10. )
  // idleRound is the number of flush intervals that may elapse without any
  // command or non-empty flush before the background flushing loop exits.
  const (
  	idleRound = 10
  )
  12. type (
  13. // A type that satisfies executors.TaskContainer can be used as the underlying
  14. // container that used to do periodical executions.
  15. TaskContainer interface {
  16. // AddTask adds the task into the container.
  17. // Returns true if the container needs to be flushed after the addition.
  18. AddTask(task interface{}) bool
  19. // Execute handles the collected tasks by the container when flushing.
  20. Execute(tasks interface{})
  21. // RemoveAll removes the contained tasks, and return them.
  22. RemoveAll() interface{}
  23. }
  24. PeriodicalExecutor struct {
  25. commander chan interface{}
  26. interval time.Duration
  27. container TaskContainer
  28. waitGroup sync.WaitGroup
  29. // avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
  30. wgBarrier syncx.Barrier
  31. guarded bool
  32. newTicker func(duration time.Duration) timex.Ticker
  33. lock sync.Mutex
  34. }
  35. )
  36. func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
  37. executor := &PeriodicalExecutor{
  38. // buffer 1 to let the caller go quickly
  39. commander: make(chan interface{}, 1),
  40. interval: interval,
  41. container: container,
  42. newTicker: func(d time.Duration) timex.Ticker {
  43. return timex.NewTicker(interval)
  44. },
  45. }
  46. proc.AddShutdownListener(func() {
  47. executor.Flush()
  48. })
  49. return executor
  50. }
  51. func (pe *PeriodicalExecutor) Add(task interface{}) {
  52. if vals, ok := pe.addAndCheck(task); ok {
  53. pe.commander <- vals
  54. }
  55. }
  56. func (pe *PeriodicalExecutor) Flush() bool {
  57. return pe.executeTasks(func() interface{} {
  58. pe.lock.Lock()
  59. defer pe.lock.Unlock()
  60. return pe.container.RemoveAll()
  61. }())
  62. }
  63. func (pe *PeriodicalExecutor) Sync(fn func()) {
  64. pe.lock.Lock()
  65. defer pe.lock.Unlock()
  66. fn()
  67. }
  68. func (pe *PeriodicalExecutor) Wait() {
  69. pe.wgBarrier.Guard(func() {
  70. pe.waitGroup.Wait()
  71. })
  72. }
  73. func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
  74. pe.lock.Lock()
  75. defer func() {
  76. var start bool
  77. if !pe.guarded {
  78. pe.guarded = true
  79. start = true
  80. }
  81. pe.lock.Unlock()
  82. if start {
  83. pe.backgroundFlush()
  84. }
  85. }()
  86. if pe.container.AddTask(task) {
  87. return pe.container.RemoveAll(), true
  88. }
  89. return nil, false
  90. }
  91. func (pe *PeriodicalExecutor) backgroundFlush() {
  92. threading.GoSafe(func() {
  93. ticker := pe.newTicker(pe.interval)
  94. defer ticker.Stop()
  95. var commanded bool
  96. last := timex.Now()
  97. for {
  98. select {
  99. case vals := <-pe.commander:
  100. commanded = true
  101. pe.executeTasks(vals)
  102. last = timex.Now()
  103. case <-ticker.Chan():
  104. if commanded {
  105. commanded = false
  106. } else if pe.Flush() {
  107. last = timex.Now()
  108. } else if timex.Since(last) > pe.interval*idleRound {
  109. pe.lock.Lock()
  110. pe.guarded = false
  111. pe.lock.Unlock()
  112. // flush again to avoid missing tasks
  113. pe.Flush()
  114. return
  115. }
  116. }
  117. }
  118. })
  119. }
  120. func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
  121. pe.wgBarrier.Guard(func() {
  122. pe.waitGroup.Add(1)
  123. })
  124. defer pe.wgBarrier.Guard(func() {
  125. pe.waitGroup.Done()
  126. })
  127. ok := pe.hasTasks(tasks)
  128. if ok {
  129. pe.container.Execute(tasks)
  130. }
  131. return ok
  132. }
  133. func (pe *PeriodicalExecutor) hasTasks(tasks interface{}) bool {
  134. if tasks == nil {
  135. return false
  136. }
  137. val := reflect.ValueOf(tasks)
  138. switch val.Kind() {
  139. case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice:
  140. return val.Len() > 0
  141. default:
  142. // unknown type, let caller execute it
  143. return true
  144. }
  145. }