timingwheel_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. package collection
  2. import (
  3. "sort"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/tal-tech/go-zero/core/lang"
  10. "github.com/tal-tech/go-zero/core/stringx"
  11. "github.com/tal-tech/go-zero/core/syncx"
  12. "github.com/tal-tech/go-zero/core/timex"
  13. )
  14. const (
  15. testStep = time.Minute
  16. waitTime = time.Second
  17. )
  18. func TestNewTimingWheel(t *testing.T) {
  19. _, err := NewTimingWheel(0, 10, func(key, value interface{}) {})
  20. assert.NotNil(t, err)
  21. }
  22. func TestTimingWheel_Drain(t *testing.T) {
  23. ticker := timex.NewFakeTicker()
  24. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {
  25. }, ticker)
  26. defer tw.Stop()
  27. tw.SetTimer("first", 3, testStep*4)
  28. tw.SetTimer("second", 5, testStep*7)
  29. tw.SetTimer("third", 7, testStep*7)
  30. var keys []string
  31. var vals []int
  32. var lock sync.Mutex
  33. var wg sync.WaitGroup
  34. wg.Add(3)
  35. tw.Drain(func(key, value interface{}) {
  36. lock.Lock()
  37. defer lock.Unlock()
  38. keys = append(keys, key.(string))
  39. vals = append(vals, value.(int))
  40. wg.Done()
  41. })
  42. wg.Wait()
  43. sort.Strings(keys)
  44. sort.Ints(vals)
  45. assert.Equal(t, 3, len(keys))
  46. assert.EqualValues(t, []string{"first", "second", "third"}, keys)
  47. assert.EqualValues(t, []int{3, 5, 7}, vals)
  48. var count int
  49. tw.Drain(func(key, value interface{}) {
  50. count++
  51. })
  52. time.Sleep(time.Millisecond * 100)
  53. assert.Equal(t, 0, count)
  54. }
  55. func TestTimingWheel_SetTimerSoon(t *testing.T) {
  56. run := syncx.NewAtomicBool()
  57. ticker := timex.NewFakeTicker()
  58. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {
  59. assert.True(t, run.CompareAndSwap(false, true))
  60. assert.Equal(t, "any", k)
  61. assert.Equal(t, 3, v.(int))
  62. ticker.Done()
  63. }, ticker)
  64. defer tw.Stop()
  65. tw.SetTimer("any", 3, testStep>>1)
  66. ticker.Tick()
  67. assert.Nil(t, ticker.Wait(waitTime))
  68. assert.True(t, run.True())
  69. }
  70. func TestTimingWheel_SetTimerTwice(t *testing.T) {
  71. run := syncx.NewAtomicBool()
  72. ticker := timex.NewFakeTicker()
  73. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {
  74. assert.True(t, run.CompareAndSwap(false, true))
  75. assert.Equal(t, "any", k)
  76. assert.Equal(t, 5, v.(int))
  77. ticker.Done()
  78. }, ticker)
  79. defer tw.Stop()
  80. tw.SetTimer("any", 3, testStep*4)
  81. tw.SetTimer("any", 5, testStep*7)
  82. for i := 0; i < 8; i++ {
  83. ticker.Tick()
  84. }
  85. assert.Nil(t, ticker.Wait(waitTime))
  86. assert.True(t, run.True())
  87. }
  88. func TestTimingWheel_SetTimerWrongDelay(t *testing.T) {
  89. ticker := timex.NewFakeTicker()
  90. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {}, ticker)
  91. defer tw.Stop()
  92. assert.NotPanics(t, func() {
  93. tw.SetTimer("any", 3, -testStep)
  94. })
  95. }
  96. func TestTimingWheel_MoveTimer(t *testing.T) {
  97. run := syncx.NewAtomicBool()
  98. ticker := timex.NewFakeTicker()
  99. tw, _ := newTimingWheelWithClock(testStep, 3, func(k, v interface{}) {
  100. assert.True(t, run.CompareAndSwap(false, true))
  101. assert.Equal(t, "any", k)
  102. assert.Equal(t, 3, v.(int))
  103. ticker.Done()
  104. }, ticker)
  105. defer tw.Stop()
  106. tw.SetTimer("any", 3, testStep*4)
  107. tw.MoveTimer("any", testStep*7)
  108. tw.MoveTimer("any", -testStep)
  109. tw.MoveTimer("none", testStep)
  110. for i := 0; i < 5; i++ {
  111. ticker.Tick()
  112. }
  113. assert.False(t, run.True())
  114. for i := 0; i < 3; i++ {
  115. ticker.Tick()
  116. }
  117. assert.Nil(t, ticker.Wait(waitTime))
  118. assert.True(t, run.True())
  119. }
  120. func TestTimingWheel_MoveTimerSoon(t *testing.T) {
  121. run := syncx.NewAtomicBool()
  122. ticker := timex.NewFakeTicker()
  123. tw, _ := newTimingWheelWithClock(testStep, 3, func(k, v interface{}) {
  124. assert.True(t, run.CompareAndSwap(false, true))
  125. assert.Equal(t, "any", k)
  126. assert.Equal(t, 3, v.(int))
  127. ticker.Done()
  128. }, ticker)
  129. defer tw.Stop()
  130. tw.SetTimer("any", 3, testStep*4)
  131. tw.MoveTimer("any", testStep>>1)
  132. assert.Nil(t, ticker.Wait(waitTime))
  133. assert.True(t, run.True())
  134. }
  135. func TestTimingWheel_MoveTimerEarlier(t *testing.T) {
  136. run := syncx.NewAtomicBool()
  137. ticker := timex.NewFakeTicker()
  138. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {
  139. assert.True(t, run.CompareAndSwap(false, true))
  140. assert.Equal(t, "any", k)
  141. assert.Equal(t, 3, v.(int))
  142. ticker.Done()
  143. }, ticker)
  144. defer tw.Stop()
  145. tw.SetTimer("any", 3, testStep*4)
  146. tw.MoveTimer("any", testStep*2)
  147. for i := 0; i < 3; i++ {
  148. ticker.Tick()
  149. }
  150. assert.Nil(t, ticker.Wait(waitTime))
  151. assert.True(t, run.True())
  152. }
  153. func TestTimingWheel_RemoveTimer(t *testing.T) {
  154. ticker := timex.NewFakeTicker()
  155. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {}, ticker)
  156. tw.SetTimer("any", 3, testStep)
  157. assert.NotPanics(t, func() {
  158. tw.RemoveTimer("any")
  159. tw.RemoveTimer("none")
  160. tw.RemoveTimer(nil)
  161. })
  162. for i := 0; i < 5; i++ {
  163. ticker.Tick()
  164. }
  165. tw.Stop()
  166. }
  167. func TestTimingWheel_SetTimer(t *testing.T) {
  168. tests := []struct {
  169. slots int
  170. setAt time.Duration
  171. }{
  172. {
  173. slots: 5,
  174. setAt: 5,
  175. },
  176. {
  177. slots: 5,
  178. setAt: 7,
  179. },
  180. {
  181. slots: 5,
  182. setAt: 10,
  183. },
  184. {
  185. slots: 5,
  186. setAt: 12,
  187. },
  188. {
  189. slots: 5,
  190. setAt: 7,
  191. },
  192. {
  193. slots: 5,
  194. setAt: 10,
  195. },
  196. {
  197. slots: 5,
  198. setAt: 12,
  199. },
  200. }
  201. for _, test := range tests {
  202. t.Run(stringx.RandId(), func(t *testing.T) {
  203. var count int32
  204. ticker := timex.NewFakeTicker()
  205. tick := func() {
  206. atomic.AddInt32(&count, 1)
  207. ticker.Tick()
  208. time.Sleep(time.Millisecond)
  209. }
  210. var actual int32
  211. done := make(chan lang.PlaceholderType)
  212. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  213. assert.Equal(t, 1, key.(int))
  214. assert.Equal(t, 2, value.(int))
  215. actual = atomic.LoadInt32(&count)
  216. close(done)
  217. }, ticker)
  218. assert.Nil(t, err)
  219. defer tw.Stop()
  220. tw.SetTimer(1, 2, testStep*test.setAt)
  221. for {
  222. select {
  223. case <-done:
  224. assert.Equal(t, int32(test.setAt), actual)
  225. return
  226. default:
  227. tick()
  228. }
  229. }
  230. })
  231. }
  232. }
  233. func TestTimingWheel_SetAndMoveThenStart(t *testing.T) {
  234. tests := []struct {
  235. slots int
  236. setAt time.Duration
  237. moveAt time.Duration
  238. }{
  239. {
  240. slots: 5,
  241. setAt: 3,
  242. moveAt: 5,
  243. },
  244. {
  245. slots: 5,
  246. setAt: 3,
  247. moveAt: 7,
  248. },
  249. {
  250. slots: 5,
  251. setAt: 3,
  252. moveAt: 10,
  253. },
  254. {
  255. slots: 5,
  256. setAt: 3,
  257. moveAt: 12,
  258. },
  259. {
  260. slots: 5,
  261. setAt: 5,
  262. moveAt: 7,
  263. },
  264. {
  265. slots: 5,
  266. setAt: 5,
  267. moveAt: 10,
  268. },
  269. {
  270. slots: 5,
  271. setAt: 5,
  272. moveAt: 12,
  273. },
  274. }
  275. for _, test := range tests {
  276. t.Run(stringx.RandId(), func(t *testing.T) {
  277. var count int32
  278. ticker := timex.NewFakeTicker()
  279. tick := func() {
  280. atomic.AddInt32(&count, 1)
  281. ticker.Tick()
  282. time.Sleep(time.Millisecond * 10)
  283. }
  284. var actual int32
  285. done := make(chan lang.PlaceholderType)
  286. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  287. actual = atomic.LoadInt32(&count)
  288. close(done)
  289. }, ticker)
  290. assert.Nil(t, err)
  291. defer tw.Stop()
  292. tw.SetTimer(1, 2, testStep*test.setAt)
  293. tw.MoveTimer(1, testStep*test.moveAt)
  294. for {
  295. select {
  296. case <-done:
  297. assert.Equal(t, int32(test.moveAt), actual)
  298. return
  299. default:
  300. tick()
  301. }
  302. }
  303. })
  304. }
  305. }
  306. func TestTimingWheel_SetAndMoveTwice(t *testing.T) {
  307. tests := []struct {
  308. slots int
  309. setAt time.Duration
  310. moveAt time.Duration
  311. moveAgainAt time.Duration
  312. }{
  313. {
  314. slots: 5,
  315. setAt: 3,
  316. moveAt: 5,
  317. moveAgainAt: 10,
  318. },
  319. {
  320. slots: 5,
  321. setAt: 3,
  322. moveAt: 7,
  323. moveAgainAt: 12,
  324. },
  325. {
  326. slots: 5,
  327. setAt: 3,
  328. moveAt: 10,
  329. moveAgainAt: 15,
  330. },
  331. {
  332. slots: 5,
  333. setAt: 3,
  334. moveAt: 12,
  335. moveAgainAt: 17,
  336. },
  337. {
  338. slots: 5,
  339. setAt: 5,
  340. moveAt: 7,
  341. moveAgainAt: 12,
  342. },
  343. {
  344. slots: 5,
  345. setAt: 5,
  346. moveAt: 10,
  347. moveAgainAt: 17,
  348. },
  349. {
  350. slots: 5,
  351. setAt: 5,
  352. moveAt: 12,
  353. moveAgainAt: 17,
  354. },
  355. }
  356. for _, test := range tests {
  357. t.Run(stringx.RandId(), func(t *testing.T) {
  358. var count int32
  359. ticker := timex.NewFakeTicker()
  360. tick := func() {
  361. atomic.AddInt32(&count, 1)
  362. ticker.Tick()
  363. time.Sleep(time.Millisecond * 10)
  364. }
  365. var actual int32
  366. done := make(chan lang.PlaceholderType)
  367. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  368. actual = atomic.LoadInt32(&count)
  369. close(done)
  370. }, ticker)
  371. assert.Nil(t, err)
  372. defer tw.Stop()
  373. tw.SetTimer(1, 2, testStep*test.setAt)
  374. tw.MoveTimer(1, testStep*test.moveAt)
  375. tw.MoveTimer(1, testStep*test.moveAgainAt)
  376. for {
  377. select {
  378. case <-done:
  379. assert.Equal(t, int32(test.moveAgainAt), actual)
  380. return
  381. default:
  382. tick()
  383. }
  384. }
  385. })
  386. }
  387. }
  388. func TestTimingWheel_ElapsedAndSet(t *testing.T) {
  389. tests := []struct {
  390. slots int
  391. elapsed time.Duration
  392. setAt time.Duration
  393. }{
  394. {
  395. slots: 5,
  396. elapsed: 3,
  397. setAt: 5,
  398. },
  399. {
  400. slots: 5,
  401. elapsed: 3,
  402. setAt: 7,
  403. },
  404. {
  405. slots: 5,
  406. elapsed: 3,
  407. setAt: 10,
  408. },
  409. {
  410. slots: 5,
  411. elapsed: 3,
  412. setAt: 12,
  413. },
  414. {
  415. slots: 5,
  416. elapsed: 5,
  417. setAt: 7,
  418. },
  419. {
  420. slots: 5,
  421. elapsed: 5,
  422. setAt: 10,
  423. },
  424. {
  425. slots: 5,
  426. elapsed: 5,
  427. setAt: 12,
  428. },
  429. }
  430. for _, test := range tests {
  431. t.Run(stringx.RandId(), func(t *testing.T) {
  432. var count int32
  433. ticker := timex.NewFakeTicker()
  434. tick := func() {
  435. atomic.AddInt32(&count, 1)
  436. ticker.Tick()
  437. time.Sleep(time.Millisecond * 10)
  438. }
  439. var actual int32
  440. done := make(chan lang.PlaceholderType)
  441. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  442. actual = atomic.LoadInt32(&count)
  443. close(done)
  444. }, ticker)
  445. assert.Nil(t, err)
  446. defer tw.Stop()
  447. for i := 0; i < int(test.elapsed); i++ {
  448. tick()
  449. }
  450. tw.SetTimer(1, 2, testStep*test.setAt)
  451. for {
  452. select {
  453. case <-done:
  454. assert.Equal(t, int32(test.elapsed+test.setAt), actual)
  455. return
  456. default:
  457. tick()
  458. }
  459. }
  460. })
  461. }
  462. }
  463. func TestTimingWheel_ElapsedAndSetThenMove(t *testing.T) {
  464. tests := []struct {
  465. slots int
  466. elapsed time.Duration
  467. setAt time.Duration
  468. moveAt time.Duration
  469. }{
  470. {
  471. slots: 5,
  472. elapsed: 3,
  473. setAt: 5,
  474. moveAt: 10,
  475. },
  476. {
  477. slots: 5,
  478. elapsed: 3,
  479. setAt: 7,
  480. moveAt: 12,
  481. },
  482. {
  483. slots: 5,
  484. elapsed: 3,
  485. setAt: 10,
  486. moveAt: 15,
  487. },
  488. {
  489. slots: 5,
  490. elapsed: 3,
  491. setAt: 12,
  492. moveAt: 16,
  493. },
  494. {
  495. slots: 5,
  496. elapsed: 5,
  497. setAt: 7,
  498. moveAt: 12,
  499. },
  500. {
  501. slots: 5,
  502. elapsed: 5,
  503. setAt: 10,
  504. moveAt: 15,
  505. },
  506. {
  507. slots: 5,
  508. elapsed: 5,
  509. setAt: 12,
  510. moveAt: 17,
  511. },
  512. }
  513. for _, test := range tests {
  514. t.Run(stringx.RandId(), func(t *testing.T) {
  515. var count int32
  516. ticker := timex.NewFakeTicker()
  517. tick := func() {
  518. atomic.AddInt32(&count, 1)
  519. ticker.Tick()
  520. time.Sleep(time.Millisecond * 10)
  521. }
  522. var actual int32
  523. done := make(chan lang.PlaceholderType)
  524. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  525. actual = atomic.LoadInt32(&count)
  526. close(done)
  527. }, ticker)
  528. assert.Nil(t, err)
  529. defer tw.Stop()
  530. for i := 0; i < int(test.elapsed); i++ {
  531. tick()
  532. }
  533. tw.SetTimer(1, 2, testStep*test.setAt)
  534. tw.MoveTimer(1, testStep*test.moveAt)
  535. for {
  536. select {
  537. case <-done:
  538. assert.Equal(t, int32(test.elapsed+test.moveAt), actual)
  539. return
  540. default:
  541. tick()
  542. }
  543. }
  544. })
  545. }
  546. }
  547. func BenchmarkTimingWheel(b *testing.B) {
  548. b.ReportAllocs()
  549. tw, _ := NewTimingWheel(time.Second, 100, func(k, v interface{}) {})
  550. for i := 0; i < b.N; i++ {
  551. tw.SetTimer(i, i, time.Second)
  552. tw.SetTimer(b.N+i, b.N+i, time.Second)
  553. tw.MoveTimer(i, time.Second*time.Duration(i))
  554. tw.RemoveTimer(i)
  555. }
  556. }