redisqueue_test.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package internal
  2. import (
  3. "strconv"
  4. "sync"
  5. "testing"
  6. "zero/core/logx"
  7. "zero/core/queue"
  8. "zero/core/stores/redis"
  9. "github.com/alicebob/miniredis"
  10. "github.com/stretchr/testify/assert"
  11. )
  12. func init() {
  13. logx.Disable()
  14. }
  15. func TestRedisQueue(t *testing.T) {
  16. const (
  17. total = 1000
  18. key = "queue"
  19. )
  20. r, err := miniredis.Run()
  21. assert.Nil(t, err)
  22. c := RedisKeyConf{
  23. RedisConf: redis.RedisConf{
  24. Host: r.Addr(),
  25. Type: redis.NodeType,
  26. },
  27. Key: key,
  28. }
  29. pusher := NewPusher(c.NewRedis(), key, WithTime())
  30. assert.True(t, len(pusher.Name()) > 0)
  31. for i := 0; i < total; i++ {
  32. err := pusher.Push(strconv.Itoa(i))
  33. assert.Nil(t, err)
  34. }
  35. consumer := new(mockedConsumer)
  36. consumer.wait.Add(total)
  37. q := queue.NewQueue(func() (queue.Producer, error) {
  38. return c.NewProducer(TimeSensitive(5))
  39. }, func() (queue.Consumer, error) {
  40. return consumer, nil
  41. })
  42. q.SetNumProducer(1)
  43. q.SetNumConsumer(1)
  44. go func() {
  45. q.Start()
  46. }()
  47. consumer.wait.Wait()
  48. q.Stop()
  49. var expect int
  50. for i := 0; i < total; i++ {
  51. expect ^= i
  52. }
  53. assert.Equal(t, expect, consumer.xor)
  54. }
  55. type mockedConsumer struct {
  56. wait sync.WaitGroup
  57. xor int
  58. }
  59. func (c *mockedConsumer) Consume(s string) error {
  60. val, err := strconv.Atoi(s)
  61. if err != nil {
  62. return err
  63. }
  64. c.xor ^= val
  65. c.wait.Done()
  66. return nil
  67. }
  68. func (c *mockedConsumer) OnEvent(event interface{}) {
  69. }