periodicalexecutor.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package executors
  2. import (
  3. "reflect"
  4. "sync"
  5. "time"
  6. "zero/core/proc"
  7. "zero/core/threading"
  8. "zero/core/timex"
  9. )
  10. const idleRound = 10
  11. type (
  12. // A type that satisfies executors.TaskContainer can be used as the underlying
  13. // container that used to do periodical executions.
  14. TaskContainer interface {
  15. // AddTask adds the task into the container.
  16. // Returns true if the container needs to be flushed after the addition.
  17. AddTask(task interface{}) bool
  18. // Execute handles the collected tasks by the container when flushing.
  19. Execute(tasks interface{})
  20. // RemoveAll removes the contained tasks, and return them.
  21. RemoveAll() interface{}
  22. }
  23. PeriodicalExecutor struct {
  24. commander chan interface{}
  25. interval time.Duration
  26. container TaskContainer
  27. waitGroup sync.WaitGroup
  28. guarded bool
  29. newTicker func(duration time.Duration) timex.Ticker
  30. lock sync.Mutex
  31. }
  32. )
  33. func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
  34. executor := &PeriodicalExecutor{
  35. // buffer 1 to let the caller go quickly
  36. commander: make(chan interface{}, 1),
  37. interval: interval,
  38. container: container,
  39. newTicker: func(d time.Duration) timex.Ticker {
  40. return timex.NewTicker(interval)
  41. },
  42. }
  43. proc.AddShutdownListener(func() {
  44. executor.Flush()
  45. })
  46. return executor
  47. }
  48. func (pe *PeriodicalExecutor) Add(task interface{}) {
  49. if vals, ok := pe.addAndCheck(task); ok {
  50. pe.commander <- vals
  51. }
  52. }
  53. func (pe *PeriodicalExecutor) Flush() bool {
  54. return pe.executeTasks(func() interface{} {
  55. pe.lock.Lock()
  56. defer pe.lock.Unlock()
  57. return pe.container.RemoveAll()
  58. }())
  59. }
  60. func (pe *PeriodicalExecutor) Sync(fn func()) {
  61. pe.lock.Lock()
  62. defer pe.lock.Unlock()
  63. fn()
  64. }
  65. func (pe *PeriodicalExecutor) Wait() {
  66. pe.waitGroup.Wait()
  67. }
  68. func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
  69. pe.lock.Lock()
  70. defer func() {
  71. var start bool
  72. if !pe.guarded {
  73. pe.guarded = true
  74. start = true
  75. }
  76. pe.lock.Unlock()
  77. if start {
  78. pe.backgroundFlush()
  79. }
  80. }()
  81. if pe.container.AddTask(task) {
  82. return pe.container.RemoveAll(), true
  83. }
  84. return nil, false
  85. }
  86. func (pe *PeriodicalExecutor) backgroundFlush() {
  87. threading.GoSafe(func() {
  88. ticker := pe.newTicker(pe.interval)
  89. defer ticker.Stop()
  90. var commanded bool
  91. last := timex.Now()
  92. for {
  93. select {
  94. case vals := <-pe.commander:
  95. commanded = true
  96. pe.executeTasks(vals)
  97. last = timex.Now()
  98. case <-ticker.Chan():
  99. if commanded {
  100. commanded = false
  101. } else if pe.Flush() {
  102. last = timex.Now()
  103. } else if timex.Since(last) > pe.interval*idleRound {
  104. pe.lock.Lock()
  105. pe.guarded = false
  106. pe.lock.Unlock()
  107. // flush again to avoid missing tasks
  108. pe.Flush()
  109. return
  110. }
  111. }
  112. }
  113. })
  114. }
  115. func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
  116. pe.waitGroup.Add(1)
  117. defer pe.waitGroup.Done()
  118. ok := pe.hasTasks(tasks)
  119. if ok {
  120. pe.container.Execute(tasks)
  121. }
  122. return ok
  123. }
  124. func (pe *PeriodicalExecutor) hasTasks(tasks interface{}) bool {
  125. if tasks == nil {
  126. return false
  127. }
  128. val := reflect.ValueOf(tasks)
  129. switch val.Kind() {
  130. case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice:
  131. return val.Len() > 0
  132. default:
  133. // unknown type, let caller execute it
  134. return true
  135. }
  136. }