redisqueueproducer.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package internal
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "zero/core/jsonx"
  7. "zero/core/logx"
  8. "zero/core/queue"
  9. "zero/core/stores/redis"
  10. )
  // logIntervalMillis is the value handed to logx.NewLessLogger by the Produce
  // loops in this file (presumably the minimum gap between log lines, in
  // milliseconds; confirm against logx).
  const logIntervalMillis = 1000

  // retryRedisInterval is how long waitForRedisAvailable sleeps between two
  // failed attempts to re-create the redis node.
  const retryRedisInterval = time.Second
  15. type (
  16. ProducerOption func(p queue.Producer) queue.Producer
  17. RedisQueueProducer struct {
  18. name string
  19. store *redis.Redis
  20. key string
  21. redisNode redis.ClosableNode
  22. listeners []queue.ProduceListener
  23. }
  24. )
  25. func NewProducerFactory(store *redis.Redis, key string, opts ...ProducerOption) queue.ProducerFactory {
  26. return func() (queue.Producer, error) {
  27. return newProducer(store, key, opts...)
  28. }
  29. }
  30. func (p *RedisQueueProducer) AddListener(listener queue.ProduceListener) {
  31. p.listeners = append(p.listeners, listener)
  32. }
  33. func (p *RedisQueueProducer) Name() string {
  34. return p.name
  35. }
  36. func (p *RedisQueueProducer) Produce() (string, bool) {
  37. lessLogger := logx.NewLessLogger(logIntervalMillis)
  38. for {
  39. value, ok, err := p.store.BlpopEx(p.redisNode, p.key)
  40. if err == nil {
  41. return value, ok
  42. } else if err == redis.Nil {
  43. // timed out without elements popped
  44. continue
  45. } else {
  46. lessLogger.Errorf("Error on blpop: %v", err)
  47. p.waitForRedisAvailable()
  48. }
  49. }
  50. }
  51. func newProducer(store *redis.Redis, key string, opts ...ProducerOption) (queue.Producer, error) {
  52. redisNode, err := redis.CreateBlockingNode(store)
  53. if err != nil {
  54. return nil, err
  55. }
  56. var producer queue.Producer = &RedisQueueProducer{
  57. name: fmt.Sprintf("%s/%s/%s", store.Type, store.Addr, key),
  58. store: store,
  59. key: key,
  60. redisNode: redisNode,
  61. }
  62. for _, opt := range opts {
  63. producer = opt(producer)
  64. }
  65. return producer, nil
  66. }
  67. func (p *RedisQueueProducer) resetRedisConnection() error {
  68. if p.redisNode != nil {
  69. p.redisNode.Close()
  70. p.redisNode = nil
  71. }
  72. redisNode, err := redis.CreateBlockingNode(p.store)
  73. if err != nil {
  74. return err
  75. }
  76. p.redisNode = redisNode
  77. return nil
  78. }
  79. func (p *RedisQueueProducer) waitForRedisAvailable() {
  80. var paused bool
  81. var pauseOnce sync.Once
  82. for {
  83. if err := p.resetRedisConnection(); err != nil {
  84. pauseOnce.Do(func() {
  85. paused = true
  86. for _, listener := range p.listeners {
  87. listener.OnProducerPause()
  88. }
  89. })
  90. logx.Errorf("Error occurred while connect to redis: %v", err)
  91. time.Sleep(retryRedisInterval)
  92. } else {
  93. break
  94. }
  95. }
  96. if paused {
  97. for _, listener := range p.listeners {
  98. listener.OnProducerResume()
  99. }
  100. }
  101. }
  102. func TimeSensitive(seconds int64) ProducerOption {
  103. return func(p queue.Producer) queue.Producer {
  104. if seconds > 0 {
  105. return autoDropQueueProducer{
  106. seconds: seconds,
  107. producer: p,
  108. }
  109. }
  110. return p
  111. }
  112. }
  113. type autoDropQueueProducer struct {
  114. seconds int64 // seconds before to drop
  115. producer queue.Producer
  116. }
  117. func (p autoDropQueueProducer) AddListener(listener queue.ProduceListener) {
  118. p.producer.AddListener(listener)
  119. }
  120. func (p autoDropQueueProducer) Produce() (string, bool) {
  121. lessLogger := logx.NewLessLogger(logIntervalMillis)
  122. for {
  123. content, ok := p.producer.Produce()
  124. if !ok {
  125. return "", false
  126. }
  127. var timedMsg TimedMessage
  128. if err := jsonx.UnmarshalFromString(content, &timedMsg); err != nil {
  129. lessLogger.Errorf("invalid timedMessage: %s, error: %s", content, err.Error())
  130. continue
  131. }
  132. if timedMsg.Time+p.seconds < time.Now().Unix() {
  133. lessLogger.Errorf("expired timedMessage: %s", content)
  134. }
  135. return timedMsg.Payload, true
  136. }
  137. }