1
0

timingwheel_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. package collection
  2. import (
  3. "sort"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/wuntsong-org/go-zero-plus/core/lang"
  10. "github.com/wuntsong-org/go-zero-plus/core/stringx"
  11. "github.com/wuntsong-org/go-zero-plus/core/syncx"
  12. "github.com/wuntsong-org/go-zero-plus/core/timex"
  13. )
  14. const (
  15. testStep = time.Minute
  16. waitTime = time.Second
  17. )
  18. func TestNewTimingWheel(t *testing.T) {
  19. _, err := NewTimingWheel(0, 10, func(key, value any) {})
  20. assert.NotNil(t, err)
  21. }
  22. func TestTimingWheel_Drain(t *testing.T) {
  23. ticker := timex.NewFakeTicker()
  24. tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v any) {
  25. }, ticker)
  26. tw.SetTimer("first", 3, testStep*4)
  27. tw.SetTimer("second", 5, testStep*7)
  28. tw.SetTimer("third", 7, testStep*7)
  29. var keys []string
  30. var vals []int
  31. var lock sync.Mutex
  32. var wg sync.WaitGroup
  33. wg.Add(3)
  34. tw.Drain(func(key, value any) {
  35. lock.Lock()
  36. defer lock.Unlock()
  37. keys = append(keys, key.(string))
  38. vals = append(vals, value.(int))
  39. wg.Done()
  40. })
  41. wg.Wait()
  42. sort.Strings(keys)
  43. sort.Ints(vals)
  44. assert.Equal(t, 3, len(keys))
  45. assert.EqualValues(t, []string{"first", "second", "third"}, keys)
  46. assert.EqualValues(t, []int{3, 5, 7}, vals)
  47. var count int
  48. tw.Drain(func(key, value any) {
  49. count++
  50. })
  51. time.Sleep(time.Millisecond * 100)
  52. assert.Equal(t, 0, count)
  53. tw.Stop()
  54. assert.Equal(t, ErrClosed, tw.Drain(func(key, value any) {}))
  55. }
  56. func TestTimingWheel_SetTimerSoon(t *testing.T) {
  57. run := syncx.NewAtomicBool()
  58. ticker := timex.NewFakeTicker()
  59. tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v any) {
  60. assert.True(t, run.CompareAndSwap(false, true))
  61. assert.Equal(t, "any", k)
  62. assert.Equal(t, 3, v.(int))
  63. ticker.Done()
  64. }, ticker)
  65. defer tw.Stop()
  66. tw.SetTimer("any", 3, testStep>>1)
  67. ticker.Tick()
  68. assert.Nil(t, ticker.Wait(waitTime))
  69. assert.True(t, run.True())
  70. }
  71. func TestTimingWheel_SetTimerTwice(t *testing.T) {
  72. run := syncx.NewAtomicBool()
  73. ticker := timex.NewFakeTicker()
  74. tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v any) {
  75. assert.True(t, run.CompareAndSwap(false, true))
  76. assert.Equal(t, "any", k)
  77. assert.Equal(t, 5, v.(int))
  78. ticker.Done()
  79. }, ticker)
  80. defer tw.Stop()
  81. tw.SetTimer("any", 3, testStep*4)
  82. tw.SetTimer("any", 5, testStep*7)
  83. for i := 0; i < 8; i++ {
  84. ticker.Tick()
  85. }
  86. assert.Nil(t, ticker.Wait(waitTime))
  87. assert.True(t, run.True())
  88. }
  89. func TestTimingWheel_SetTimerWrongDelay(t *testing.T) {
  90. ticker := timex.NewFakeTicker()
  91. tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v any) {}, ticker)
  92. defer tw.Stop()
  93. assert.NotPanics(t, func() {
  94. tw.SetTimer("any", 3, -testStep)
  95. })
  96. }
  97. func TestTimingWheel_SetTimerAfterClose(t *testing.T) {
  98. ticker := timex.NewFakeTicker()
  99. tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v any) {}, ticker)
  100. tw.Stop()
  101. assert.Equal(t, ErrClosed, tw.SetTimer("any", 3, testStep))
  102. }
  103. func TestTimingWheel_MoveTimer(t *testing.T) {
  104. run := syncx.NewAtomicBool()
  105. ticker := timex.NewFakeTicker()
  106. tw, _ := NewTimingWheelWithTicker(testStep, 3, func(k, v any) {
  107. assert.True(t, run.CompareAndSwap(false, true))
  108. assert.Equal(t, "any", k)
  109. assert.Equal(t, 3, v.(int))
  110. ticker.Done()
  111. }, ticker)
  112. tw.SetTimer("any", 3, testStep*4)
  113. tw.MoveTimer("any", testStep*7)
  114. tw.MoveTimer("any", -testStep)
  115. tw.MoveTimer("none", testStep)
  116. for i := 0; i < 5; i++ {
  117. ticker.Tick()
  118. }
  119. assert.False(t, run.True())
  120. for i := 0; i < 3; i++ {
  121. ticker.Tick()
  122. }
  123. assert.Nil(t, ticker.Wait(waitTime))
  124. assert.True(t, run.True())
  125. tw.Stop()
  126. assert.Equal(t, ErrClosed, tw.MoveTimer("any", time.Millisecond))
  127. }
  128. func TestTimingWheel_MoveTimerSoon(t *testing.T) {
  129. run := syncx.NewAtomicBool()
  130. ticker := timex.NewFakeTicker()
  131. tw, _ := NewTimingWheelWithTicker(testStep, 3, func(k, v any) {
  132. assert.True(t, run.CompareAndSwap(false, true))
  133. assert.Equal(t, "any", k)
  134. assert.Equal(t, 3, v.(int))
  135. ticker.Done()
  136. }, ticker)
  137. defer tw.Stop()
  138. tw.SetTimer("any", 3, testStep*4)
  139. tw.MoveTimer("any", testStep>>1)
  140. assert.Nil(t, ticker.Wait(waitTime))
  141. assert.True(t, run.True())
  142. }
  143. func TestTimingWheel_MoveTimerEarlier(t *testing.T) {
  144. run := syncx.NewAtomicBool()
  145. ticker := timex.NewFakeTicker()
  146. tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v any) {
  147. assert.True(t, run.CompareAndSwap(false, true))
  148. assert.Equal(t, "any", k)
  149. assert.Equal(t, 3, v.(int))
  150. ticker.Done()
  151. }, ticker)
  152. defer tw.Stop()
  153. tw.SetTimer("any", 3, testStep*4)
  154. tw.MoveTimer("any", testStep*2)
  155. for i := 0; i < 3; i++ {
  156. ticker.Tick()
  157. }
  158. assert.Nil(t, ticker.Wait(waitTime))
  159. assert.True(t, run.True())
  160. }
  161. func TestTimingWheel_RemoveTimer(t *testing.T) {
  162. ticker := timex.NewFakeTicker()
  163. tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v any) {}, ticker)
  164. tw.SetTimer("any", 3, testStep)
  165. assert.NotPanics(t, func() {
  166. tw.RemoveTimer("any")
  167. tw.RemoveTimer("none")
  168. tw.RemoveTimer(nil)
  169. })
  170. for i := 0; i < 5; i++ {
  171. ticker.Tick()
  172. }
  173. tw.Stop()
  174. assert.Equal(t, ErrClosed, tw.RemoveTimer("any"))
  175. }
  176. func TestTimingWheel_SetTimer(t *testing.T) {
  177. tests := []struct {
  178. slots int
  179. setAt time.Duration
  180. }{
  181. {
  182. slots: 5,
  183. setAt: 5,
  184. },
  185. {
  186. slots: 5,
  187. setAt: 7,
  188. },
  189. {
  190. slots: 5,
  191. setAt: 10,
  192. },
  193. {
  194. slots: 5,
  195. setAt: 12,
  196. },
  197. {
  198. slots: 5,
  199. setAt: 7,
  200. },
  201. {
  202. slots: 5,
  203. setAt: 10,
  204. },
  205. {
  206. slots: 5,
  207. setAt: 12,
  208. },
  209. }
  210. for _, test := range tests {
  211. test := test
  212. t.Run(stringx.RandId(), func(t *testing.T) {
  213. t.Parallel()
  214. var count int32
  215. ticker := timex.NewFakeTicker()
  216. tick := func() {
  217. atomic.AddInt32(&count, 1)
  218. ticker.Tick()
  219. time.Sleep(time.Millisecond)
  220. }
  221. var actual int32
  222. done := make(chan lang.PlaceholderType)
  223. tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value any) {
  224. assert.Equal(t, 1, key.(int))
  225. assert.Equal(t, 2, value.(int))
  226. actual = atomic.LoadInt32(&count)
  227. close(done)
  228. }, ticker)
  229. assert.Nil(t, err)
  230. defer tw.Stop()
  231. tw.SetTimer(1, 2, testStep*test.setAt)
  232. for {
  233. select {
  234. case <-done:
  235. assert.Equal(t, int32(test.setAt), actual)
  236. return
  237. default:
  238. tick()
  239. }
  240. }
  241. })
  242. }
  243. }
  244. func TestTimingWheel_SetAndMoveThenStart(t *testing.T) {
  245. tests := []struct {
  246. slots int
  247. setAt time.Duration
  248. moveAt time.Duration
  249. }{
  250. {
  251. slots: 5,
  252. setAt: 3,
  253. moveAt: 5,
  254. },
  255. {
  256. slots: 5,
  257. setAt: 3,
  258. moveAt: 7,
  259. },
  260. {
  261. slots: 5,
  262. setAt: 3,
  263. moveAt: 10,
  264. },
  265. {
  266. slots: 5,
  267. setAt: 3,
  268. moveAt: 12,
  269. },
  270. {
  271. slots: 5,
  272. setAt: 5,
  273. moveAt: 7,
  274. },
  275. {
  276. slots: 5,
  277. setAt: 5,
  278. moveAt: 10,
  279. },
  280. {
  281. slots: 5,
  282. setAt: 5,
  283. moveAt: 12,
  284. },
  285. }
  286. for _, test := range tests {
  287. test := test
  288. t.Run(stringx.RandId(), func(t *testing.T) {
  289. t.Parallel()
  290. var count int32
  291. ticker := timex.NewFakeTicker()
  292. tick := func() {
  293. atomic.AddInt32(&count, 1)
  294. ticker.Tick()
  295. time.Sleep(time.Millisecond * 10)
  296. }
  297. var actual int32
  298. done := make(chan lang.PlaceholderType)
  299. tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value any) {
  300. actual = atomic.LoadInt32(&count)
  301. close(done)
  302. }, ticker)
  303. assert.Nil(t, err)
  304. defer tw.Stop()
  305. tw.SetTimer(1, 2, testStep*test.setAt)
  306. tw.MoveTimer(1, testStep*test.moveAt)
  307. for {
  308. select {
  309. case <-done:
  310. assert.Equal(t, int32(test.moveAt), actual)
  311. return
  312. default:
  313. tick()
  314. }
  315. }
  316. })
  317. }
  318. }
  319. func TestTimingWheel_SetAndMoveTwice(t *testing.T) {
  320. tests := []struct {
  321. slots int
  322. setAt time.Duration
  323. moveAt time.Duration
  324. moveAgainAt time.Duration
  325. }{
  326. {
  327. slots: 5,
  328. setAt: 3,
  329. moveAt: 5,
  330. moveAgainAt: 10,
  331. },
  332. {
  333. slots: 5,
  334. setAt: 3,
  335. moveAt: 7,
  336. moveAgainAt: 12,
  337. },
  338. {
  339. slots: 5,
  340. setAt: 3,
  341. moveAt: 10,
  342. moveAgainAt: 15,
  343. },
  344. {
  345. slots: 5,
  346. setAt: 3,
  347. moveAt: 12,
  348. moveAgainAt: 17,
  349. },
  350. {
  351. slots: 5,
  352. setAt: 5,
  353. moveAt: 7,
  354. moveAgainAt: 12,
  355. },
  356. {
  357. slots: 5,
  358. setAt: 5,
  359. moveAt: 10,
  360. moveAgainAt: 17,
  361. },
  362. {
  363. slots: 5,
  364. setAt: 5,
  365. moveAt: 12,
  366. moveAgainAt: 17,
  367. },
  368. }
  369. for _, test := range tests {
  370. test := test
  371. t.Run(stringx.RandId(), func(t *testing.T) {
  372. t.Parallel()
  373. var count int32
  374. ticker := timex.NewFakeTicker()
  375. tick := func() {
  376. atomic.AddInt32(&count, 1)
  377. ticker.Tick()
  378. time.Sleep(time.Millisecond * 10)
  379. }
  380. var actual int32
  381. done := make(chan lang.PlaceholderType)
  382. tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value any) {
  383. actual = atomic.LoadInt32(&count)
  384. close(done)
  385. }, ticker)
  386. assert.Nil(t, err)
  387. defer tw.Stop()
  388. tw.SetTimer(1, 2, testStep*test.setAt)
  389. tw.MoveTimer(1, testStep*test.moveAt)
  390. tw.MoveTimer(1, testStep*test.moveAgainAt)
  391. for {
  392. select {
  393. case <-done:
  394. assert.Equal(t, int32(test.moveAgainAt), actual)
  395. return
  396. default:
  397. tick()
  398. }
  399. }
  400. })
  401. }
  402. }
  403. func TestTimingWheel_ElapsedAndSet(t *testing.T) {
  404. tests := []struct {
  405. slots int
  406. elapsed time.Duration
  407. setAt time.Duration
  408. }{
  409. {
  410. slots: 5,
  411. elapsed: 3,
  412. setAt: 5,
  413. },
  414. {
  415. slots: 5,
  416. elapsed: 3,
  417. setAt: 7,
  418. },
  419. {
  420. slots: 5,
  421. elapsed: 3,
  422. setAt: 10,
  423. },
  424. {
  425. slots: 5,
  426. elapsed: 3,
  427. setAt: 12,
  428. },
  429. {
  430. slots: 5,
  431. elapsed: 5,
  432. setAt: 7,
  433. },
  434. {
  435. slots: 5,
  436. elapsed: 5,
  437. setAt: 10,
  438. },
  439. {
  440. slots: 5,
  441. elapsed: 5,
  442. setAt: 12,
  443. },
  444. }
  445. for _, test := range tests {
  446. test := test
  447. t.Run(stringx.RandId(), func(t *testing.T) {
  448. t.Parallel()
  449. var count int32
  450. ticker := timex.NewFakeTicker()
  451. tick := func() {
  452. atomic.AddInt32(&count, 1)
  453. ticker.Tick()
  454. time.Sleep(time.Millisecond * 10)
  455. }
  456. var actual int32
  457. done := make(chan lang.PlaceholderType)
  458. tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value any) {
  459. actual = atomic.LoadInt32(&count)
  460. close(done)
  461. }, ticker)
  462. assert.Nil(t, err)
  463. defer tw.Stop()
  464. for i := 0; i < int(test.elapsed); i++ {
  465. tick()
  466. }
  467. tw.SetTimer(1, 2, testStep*test.setAt)
  468. for {
  469. select {
  470. case <-done:
  471. assert.Equal(t, int32(test.elapsed+test.setAt), actual)
  472. return
  473. default:
  474. tick()
  475. }
  476. }
  477. })
  478. }
  479. }
  480. func TestTimingWheel_ElapsedAndSetThenMove(t *testing.T) {
  481. tests := []struct {
  482. slots int
  483. elapsed time.Duration
  484. setAt time.Duration
  485. moveAt time.Duration
  486. }{
  487. {
  488. slots: 5,
  489. elapsed: 3,
  490. setAt: 5,
  491. moveAt: 10,
  492. },
  493. {
  494. slots: 5,
  495. elapsed: 3,
  496. setAt: 7,
  497. moveAt: 12,
  498. },
  499. {
  500. slots: 5,
  501. elapsed: 3,
  502. setAt: 10,
  503. moveAt: 15,
  504. },
  505. {
  506. slots: 5,
  507. elapsed: 3,
  508. setAt: 12,
  509. moveAt: 16,
  510. },
  511. {
  512. slots: 5,
  513. elapsed: 5,
  514. setAt: 7,
  515. moveAt: 12,
  516. },
  517. {
  518. slots: 5,
  519. elapsed: 5,
  520. setAt: 10,
  521. moveAt: 15,
  522. },
  523. {
  524. slots: 5,
  525. elapsed: 5,
  526. setAt: 12,
  527. moveAt: 17,
  528. },
  529. }
  530. for _, test := range tests {
  531. test := test
  532. t.Run(stringx.RandId(), func(t *testing.T) {
  533. t.Parallel()
  534. var count int32
  535. ticker := timex.NewFakeTicker()
  536. tick := func() {
  537. atomic.AddInt32(&count, 1)
  538. ticker.Tick()
  539. time.Sleep(time.Millisecond * 10)
  540. }
  541. var actual int32
  542. done := make(chan lang.PlaceholderType)
  543. tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value any) {
  544. actual = atomic.LoadInt32(&count)
  545. close(done)
  546. }, ticker)
  547. assert.Nil(t, err)
  548. defer tw.Stop()
  549. for i := 0; i < int(test.elapsed); i++ {
  550. tick()
  551. }
  552. tw.SetTimer(1, 2, testStep*test.setAt)
  553. tw.MoveTimer(1, testStep*test.moveAt)
  554. for {
  555. select {
  556. case <-done:
  557. assert.Equal(t, int32(test.elapsed+test.moveAt), actual)
  558. return
  559. default:
  560. tick()
  561. }
  562. }
  563. })
  564. }
  565. }
  566. func TestMoveAndRemoveTask(t *testing.T) {
  567. ticker := timex.NewFakeTicker()
  568. tick := func(v int) {
  569. for i := 0; i < v; i++ {
  570. ticker.Tick()
  571. }
  572. }
  573. var keys []int
  574. tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v any) {
  575. assert.Equal(t, "any", k)
  576. assert.Equal(t, 3, v.(int))
  577. keys = append(keys, v.(int))
  578. ticker.Done()
  579. }, ticker)
  580. defer tw.Stop()
  581. tw.SetTimer("any", 3, testStep*8)
  582. tick(6)
  583. tw.MoveTimer("any", testStep*7)
  584. tick(3)
  585. tw.RemoveTimer("any")
  586. tick(30)
  587. time.Sleep(time.Millisecond)
  588. assert.Equal(t, 0, len(keys))
  589. }
  590. func BenchmarkTimingWheel(b *testing.B) {
  591. b.ReportAllocs()
  592. tw, _ := NewTimingWheel(time.Second, 100, func(k, v any) {})
  593. for i := 0; i < b.N; i++ {
  594. tw.SetTimer(i, i, time.Second)
  595. tw.SetTimer(b.N+i, b.N+i, time.Second)
  596. tw.MoveTimer(i, time.Second*time.Duration(i))
  597. tw.RemoveTimer(i)
  598. }
  599. }