queue_test.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package queue
  2. import (
  3. "errors"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. )
  10. const (
  11. consumers = 4
  12. rounds = 100
  13. )
  14. func TestQueue(t *testing.T) {
  15. producer := newMockedProducer(rounds)
  16. consumer := newMockedConsumer()
  17. consumer.wait.Add(consumers)
  18. q := NewQueue(func() (Producer, error) {
  19. return producer, nil
  20. }, func() (Consumer, error) {
  21. return consumer, nil
  22. })
  23. q.AddListener(new(mockedListener))
  24. q.SetName("mockqueue")
  25. q.SetNumConsumer(consumers)
  26. q.SetNumProducer(1)
  27. q.pause()
  28. q.resume()
  29. go func() {
  30. producer.wait.Wait()
  31. q.Stop()
  32. }()
  33. q.Start()
  34. assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
  35. }
  36. func TestQueue_Broadcast(t *testing.T) {
  37. producer := newMockedProducer(rounds)
  38. consumer := newMockedConsumer()
  39. consumer.wait.Add(consumers)
  40. q := NewQueue(func() (Producer, error) {
  41. return producer, nil
  42. }, func() (Consumer, error) {
  43. return consumer, nil
  44. })
  45. q.AddListener(new(mockedListener))
  46. q.SetName("mockqueue")
  47. q.SetNumConsumer(consumers)
  48. q.SetNumProducer(1)
  49. q.Broadcast("message")
  50. go func() {
  51. producer.wait.Wait()
  52. q.Stop()
  53. }()
  54. q.Start()
  55. consumer.wait.Wait()
  56. assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
  57. assert.Equal(t, int32(consumers), atomic.LoadInt32(&consumer.events))
  58. }
  59. func TestQueue_PauseResume(t *testing.T) {
  60. producer := newMockedProducer(rounds)
  61. consumer := newMockedConsumer()
  62. consumer.wait.Add(consumers)
  63. q := NewQueue(func() (Producer, error) {
  64. return producer, nil
  65. }, func() (Consumer, error) {
  66. return consumer, nil
  67. })
  68. q.AddListener(new(mockedListener))
  69. q.SetName("mockqueue")
  70. q.SetNumConsumer(consumers)
  71. q.SetNumProducer(1)
  72. go func() {
  73. producer.wait.Wait()
  74. q.Stop()
  75. }()
  76. q.Start()
  77. producer.listener.OnProducerPause()
  78. assert.Equal(t, int32(0), atomic.LoadInt32(&q.active))
  79. producer.listener.OnProducerResume()
  80. assert.Equal(t, int32(1), atomic.LoadInt32(&q.active))
  81. assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
  82. }
  83. func TestQueue_ConsumeError(t *testing.T) {
  84. producer := newMockedProducer(rounds)
  85. consumer := newMockedConsumer()
  86. consumer.consumeErr = errors.New("consume error")
  87. consumer.wait.Add(consumers)
  88. q := NewQueue(func() (Producer, error) {
  89. return producer, nil
  90. }, func() (Consumer, error) {
  91. return consumer, nil
  92. })
  93. q.AddListener(new(mockedListener))
  94. q.SetName("mockqueue")
  95. q.SetNumConsumer(consumers)
  96. q.SetNumProducer(1)
  97. go func() {
  98. producer.wait.Wait()
  99. q.Stop()
  100. }()
  101. q.Start()
  102. assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
  103. }
  104. type mockedConsumer struct {
  105. count int32
  106. events int32
  107. consumeErr error
  108. wait sync.WaitGroup
  109. }
  110. func newMockedConsumer() *mockedConsumer {
  111. return new(mockedConsumer)
  112. }
  113. func (c *mockedConsumer) Consume(string) error {
  114. atomic.AddInt32(&c.count, 1)
  115. return c.consumeErr
  116. }
  117. func (c *mockedConsumer) OnEvent(any) {
  118. if atomic.AddInt32(&c.events, 1) <= consumers {
  119. c.wait.Done()
  120. }
  121. }
  122. type mockedProducer struct {
  123. total int32
  124. count int32
  125. listener ProduceListener
  126. wait sync.WaitGroup
  127. }
  128. func newMockedProducer(total int32) *mockedProducer {
  129. p := new(mockedProducer)
  130. p.total = total
  131. p.wait.Add(int(total))
  132. return p
  133. }
  134. func (p *mockedProducer) AddListener(listener ProduceListener) {
  135. p.listener = listener
  136. }
  137. func (p *mockedProducer) Produce() (string, bool) {
  138. if atomic.AddInt32(&p.count, 1) <= p.total {
  139. p.wait.Done()
  140. return "item", true
  141. }
  142. time.Sleep(time.Second)
  143. return "", false
  144. }
  145. type mockedListener struct{}
  146. func (l *mockedListener) OnPause() {
  147. }
  148. func (l *mockedListener) OnResume() {
  149. }