periodicalexecutor.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package executors
  2. import (
  3. "reflect"
  4. "sync"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/proc"
  8. "github.com/tal-tech/go-zero/core/syncx"
  9. "github.com/tal-tech/go-zero/core/threading"
  10. "github.com/tal-tech/go-zero/core/timex"
  11. )
  12. const idleRound = 10
  13. type (
  14. // A type that satisfies executors.TaskContainer can be used as the underlying
  15. // container that used to do periodical executions.
  16. TaskContainer interface {
  17. // AddTask adds the task into the container.
  18. // Returns true if the container needs to be flushed after the addition.
  19. AddTask(task interface{}) bool
  20. // Execute handles the collected tasks by the container when flushing.
  21. Execute(tasks interface{})
  22. // RemoveAll removes the contained tasks, and return them.
  23. RemoveAll() interface{}
  24. }
  25. PeriodicalExecutor struct {
  26. commander chan interface{}
  27. interval time.Duration
  28. container TaskContainer
  29. waitGroup sync.WaitGroup
  30. // avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
  31. wgBarrier syncx.Barrier
  32. confirmChan chan lang.PlaceholderType
  33. guarded bool
  34. newTicker func(duration time.Duration) timex.Ticker
  35. lock sync.Mutex
  36. }
  37. )
  38. func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
  39. executor := &PeriodicalExecutor{
  40. // buffer 1 to let the caller go quickly
  41. commander: make(chan interface{}, 1),
  42. interval: interval,
  43. container: container,
  44. confirmChan: make(chan lang.PlaceholderType),
  45. newTicker: func(d time.Duration) timex.Ticker {
  46. return timex.NewTicker(interval)
  47. },
  48. }
  49. proc.AddShutdownListener(func() {
  50. executor.Flush()
  51. })
  52. return executor
  53. }
  54. func (pe *PeriodicalExecutor) Add(task interface{}) {
  55. if vals, ok := pe.addAndCheck(task); ok {
  56. pe.commander <- vals
  57. <-pe.confirmChan
  58. }
  59. }
  60. func (pe *PeriodicalExecutor) Flush() bool {
  61. pe.enterExecution()
  62. return pe.executeTasks(func() interface{} {
  63. pe.lock.Lock()
  64. defer pe.lock.Unlock()
  65. return pe.container.RemoveAll()
  66. }())
  67. }
  68. func (pe *PeriodicalExecutor) Sync(fn func()) {
  69. pe.lock.Lock()
  70. defer pe.lock.Unlock()
  71. fn()
  72. }
  73. func (pe *PeriodicalExecutor) Wait() {
  74. pe.Flush()
  75. pe.wgBarrier.Guard(func() {
  76. pe.waitGroup.Wait()
  77. })
  78. }
  79. func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
  80. pe.lock.Lock()
  81. defer func() {
  82. var start bool
  83. if !pe.guarded {
  84. pe.guarded = true
  85. start = true
  86. }
  87. pe.lock.Unlock()
  88. if start {
  89. pe.backgroundFlush()
  90. }
  91. }()
  92. if pe.container.AddTask(task) {
  93. return pe.container.RemoveAll(), true
  94. }
  95. return nil, false
  96. }
  97. func (pe *PeriodicalExecutor) backgroundFlush() {
  98. threading.GoSafe(func() {
  99. ticker := pe.newTicker(pe.interval)
  100. defer ticker.Stop()
  101. var commanded bool
  102. last := timex.Now()
  103. for {
  104. select {
  105. case vals := <-pe.commander:
  106. commanded = true
  107. pe.enterExecution()
  108. pe.confirmChan <- lang.Placeholder
  109. pe.executeTasks(vals)
  110. last = timex.Now()
  111. case <-ticker.Chan():
  112. if commanded {
  113. commanded = false
  114. } else if pe.Flush() {
  115. last = timex.Now()
  116. } else if timex.Since(last) > pe.interval*idleRound {
  117. pe.lock.Lock()
  118. pe.guarded = false
  119. pe.lock.Unlock()
  120. // flush again to avoid missing tasks
  121. pe.Flush()
  122. return
  123. }
  124. }
  125. }
  126. })
  127. }
  128. func (pe *PeriodicalExecutor) doneExecution() {
  129. pe.waitGroup.Done()
  130. }
  131. func (pe *PeriodicalExecutor) enterExecution() {
  132. pe.wgBarrier.Guard(func() {
  133. pe.waitGroup.Add(1)
  134. })
  135. }
  136. func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
  137. defer pe.doneExecution()
  138. ok := pe.hasTasks(tasks)
  139. if ok {
  140. pe.container.Execute(tasks)
  141. }
  142. return ok
  143. }
  144. func (pe *PeriodicalExecutor) hasTasks(tasks interface{}) bool {
  145. if tasks == nil {
  146. return false
  147. }
  148. val := reflect.ValueOf(tasks)
  149. switch val.Kind() {
  150. case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice:
  151. return val.Len() > 0
  152. default:
  153. // unknown type, let caller execute it
  154. return true
  155. }
  156. }