timingwheel.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package collection
  2. import (
  3. "container/list"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "github.com/zeromicro/go-zero/core/lang"
  8. "github.com/zeromicro/go-zero/core/threading"
  9. "github.com/zeromicro/go-zero/core/timex"
  10. )
  11. const drainWorkers = 8
  12. var (
  13. ErrClosed = errors.New("TimingWheel is closed already")
  14. ErrArgument = errors.New("incorrect task argument")
  15. )
  16. type (
  17. // Execute defines the method to execute the task.
  18. Execute func(key, value interface{})
  19. // A TimingWheel is a timing wheel object to schedule tasks.
  20. TimingWheel struct {
  21. interval time.Duration
  22. ticker timex.Ticker
  23. slots []*list.List
  24. timers *SafeMap
  25. tickedPos int
  26. numSlots int
  27. execute Execute
  28. setChannel chan timingEntry
  29. moveChannel chan baseEntry
  30. removeChannel chan interface{}
  31. drainChannel chan func(key, value interface{})
  32. stopChannel chan lang.PlaceholderType
  33. }
  34. timingEntry struct {
  35. baseEntry
  36. value interface{}
  37. circle int
  38. diff int
  39. removed bool
  40. }
  41. baseEntry struct {
  42. delay time.Duration
  43. key interface{}
  44. }
  45. positionEntry struct {
  46. pos int
  47. item *timingEntry
  48. }
  49. timingTask struct {
  50. key interface{}
  51. value interface{}
  52. }
  53. )
  54. // NewTimingWheel returns a TimingWheel.
  55. func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*TimingWheel, error) {
  56. if interval <= 0 || numSlots <= 0 || execute == nil {
  57. return nil, fmt.Errorf("interval: %v, slots: %d, execute: %p", interval, numSlots, execute)
  58. }
  59. return newTimingWheelWithClock(interval, numSlots, execute, timex.NewTicker(interval))
  60. }
  61. func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute,
  62. ticker timex.Ticker) (*TimingWheel, error) {
  63. tw := &TimingWheel{
  64. interval: interval,
  65. ticker: ticker,
  66. slots: make([]*list.List, numSlots),
  67. timers: NewSafeMap(),
  68. tickedPos: numSlots - 1, // at previous virtual circle
  69. execute: execute,
  70. numSlots: numSlots,
  71. setChannel: make(chan timingEntry),
  72. moveChannel: make(chan baseEntry),
  73. removeChannel: make(chan interface{}),
  74. drainChannel: make(chan func(key, value interface{})),
  75. stopChannel: make(chan lang.PlaceholderType),
  76. }
  77. tw.initSlots()
  78. go tw.run()
  79. return tw, nil
  80. }
  81. // Drain drains all items and executes them.
  82. func (tw *TimingWheel) Drain(fn func(key, value interface{})) error {
  83. select {
  84. case tw.drainChannel <- fn:
  85. return nil
  86. case <-tw.stopChannel:
  87. return ErrClosed
  88. }
  89. }
  90. // MoveTimer moves the task with the given key to the given delay.
  91. func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration) error {
  92. if delay <= 0 || key == nil {
  93. return ErrArgument
  94. }
  95. select {
  96. case tw.moveChannel <- baseEntry{
  97. delay: delay,
  98. key: key,
  99. }:
  100. return nil
  101. case <-tw.stopChannel:
  102. return ErrClosed
  103. }
  104. }
  105. // RemoveTimer removes the task with the given key.
  106. func (tw *TimingWheel) RemoveTimer(key interface{}) error {
  107. if key == nil {
  108. return ErrArgument
  109. }
  110. select {
  111. case tw.removeChannel <- key:
  112. return nil
  113. case <-tw.stopChannel:
  114. return ErrClosed
  115. }
  116. }
  117. // SetTimer sets the task value with the given key to the delay.
  118. func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration) error {
  119. if delay <= 0 || key == nil {
  120. return ErrArgument
  121. }
  122. select {
  123. case tw.setChannel <- timingEntry{
  124. baseEntry: baseEntry{
  125. delay: delay,
  126. key: key,
  127. },
  128. value: value,
  129. }:
  130. return nil
  131. case <-tw.stopChannel:
  132. return ErrClosed
  133. }
  134. }
  135. // Stop stops tw. No more actions after stopping a TimingWheel.
  136. func (tw *TimingWheel) Stop() {
  137. close(tw.stopChannel)
  138. }
  139. func (tw *TimingWheel) drainAll(fn func(key, value interface{})) {
  140. runner := threading.NewTaskRunner(drainWorkers)
  141. for _, slot := range tw.slots {
  142. for e := slot.Front(); e != nil; {
  143. task := e.Value.(*timingEntry)
  144. next := e.Next()
  145. slot.Remove(e)
  146. e = next
  147. if !task.removed {
  148. runner.Schedule(func() {
  149. fn(task.key, task.value)
  150. })
  151. }
  152. }
  153. }
  154. }
  155. func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos, circle int) {
  156. steps := int(d / tw.interval)
  157. pos = (tw.tickedPos + steps) % tw.numSlots
  158. circle = (steps - 1) / tw.numSlots
  159. return
  160. }
  161. func (tw *TimingWheel) initSlots() {
  162. for i := 0; i < tw.numSlots; i++ {
  163. tw.slots[i] = list.New()
  164. }
  165. }
  166. func (tw *TimingWheel) moveTask(task baseEntry) {
  167. val, ok := tw.timers.Get(task.key)
  168. if !ok {
  169. return
  170. }
  171. timer := val.(*positionEntry)
  172. if task.delay < tw.interval {
  173. threading.GoSafe(func() {
  174. tw.execute(timer.item.key, timer.item.value)
  175. })
  176. return
  177. }
  178. pos, circle := tw.getPositionAndCircle(task.delay)
  179. if pos >= timer.pos {
  180. timer.item.circle = circle
  181. timer.item.diff = pos - timer.pos
  182. } else if circle > 0 {
  183. circle--
  184. timer.item.circle = circle
  185. timer.item.diff = tw.numSlots + pos - timer.pos
  186. } else {
  187. timer.item.removed = true
  188. newItem := &timingEntry{
  189. baseEntry: task,
  190. value: timer.item.value,
  191. }
  192. tw.slots[pos].PushBack(newItem)
  193. tw.setTimerPosition(pos, newItem)
  194. }
  195. }
  196. func (tw *TimingWheel) onTick() {
  197. tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
  198. l := tw.slots[tw.tickedPos]
  199. tw.scanAndRunTasks(l)
  200. }
  201. func (tw *TimingWheel) removeTask(key interface{}) {
  202. val, ok := tw.timers.Get(key)
  203. if !ok {
  204. return
  205. }
  206. timer := val.(*positionEntry)
  207. timer.item.removed = true
  208. tw.timers.Del(key)
  209. }
  210. func (tw *TimingWheel) run() {
  211. for {
  212. select {
  213. case <-tw.ticker.Chan():
  214. tw.onTick()
  215. case task := <-tw.setChannel:
  216. tw.setTask(&task)
  217. case key := <-tw.removeChannel:
  218. tw.removeTask(key)
  219. case task := <-tw.moveChannel:
  220. tw.moveTask(task)
  221. case fn := <-tw.drainChannel:
  222. tw.drainAll(fn)
  223. case <-tw.stopChannel:
  224. tw.ticker.Stop()
  225. return
  226. }
  227. }
  228. }
  229. func (tw *TimingWheel) runTasks(tasks []timingTask) {
  230. if len(tasks) == 0 {
  231. return
  232. }
  233. go func() {
  234. for i := range tasks {
  235. threading.RunSafe(func() {
  236. tw.execute(tasks[i].key, tasks[i].value)
  237. })
  238. }
  239. }()
  240. }
  241. func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
  242. var tasks []timingTask
  243. for e := l.Front(); e != nil; {
  244. task := e.Value.(*timingEntry)
  245. if task.removed {
  246. next := e.Next()
  247. l.Remove(e)
  248. e = next
  249. continue
  250. } else if task.circle > 0 {
  251. task.circle--
  252. e = e.Next()
  253. continue
  254. } else if task.diff > 0 {
  255. next := e.Next()
  256. l.Remove(e)
  257. // (tw.tickedPos+task.diff)%tw.numSlots
  258. // cannot be the same value of tw.tickedPos
  259. pos := (tw.tickedPos + task.diff) % tw.numSlots
  260. tw.slots[pos].PushBack(task)
  261. tw.setTimerPosition(pos, task)
  262. task.diff = 0
  263. e = next
  264. continue
  265. }
  266. tasks = append(tasks, timingTask{
  267. key: task.key,
  268. value: task.value,
  269. })
  270. next := e.Next()
  271. l.Remove(e)
  272. tw.timers.Del(task.key)
  273. e = next
  274. }
  275. tw.runTasks(tasks)
  276. }
  277. func (tw *TimingWheel) setTask(task *timingEntry) {
  278. if task.delay < tw.interval {
  279. task.delay = tw.interval
  280. }
  281. if val, ok := tw.timers.Get(task.key); ok {
  282. entry := val.(*positionEntry)
  283. entry.item.value = task.value
  284. tw.moveTask(task.baseEntry)
  285. } else {
  286. pos, circle := tw.getPositionAndCircle(task.delay)
  287. task.circle = circle
  288. tw.slots[pos].PushBack(task)
  289. tw.setTimerPosition(pos, task)
  290. }
  291. }
  292. func (tw *TimingWheel) setTimerPosition(pos int, task *timingEntry) {
  293. if val, ok := tw.timers.Get(task.key); ok {
  294. timer := val.(*positionEntry)
  295. timer.item = task
  296. timer.pos = pos
  297. } else {
  298. tw.timers.Set(task.key, &positionEntry{
  299. pos: pos,
  300. item: task,
  301. })
  302. }
  303. }