123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- package internal
- import (
- "fmt"
- "sync"
- "time"
- "zero/core/jsonx"
- "zero/core/logx"
- "zero/core/queue"
- "zero/core/stores/redis"
- )
- const (
- logIntervalMillis = 1000
- retryRedisInterval = time.Second
- )
- type (
- ProducerOption func(p queue.Producer) queue.Producer
- RedisQueueProducer struct {
- name string
- store *redis.Redis
- key string
- redisNode redis.ClosableNode
- listeners []queue.ProduceListener
- }
- )
- func NewProducerFactory(store *redis.Redis, key string, opts ...ProducerOption) queue.ProducerFactory {
- return func() (queue.Producer, error) {
- return newProducer(store, key, opts...)
- }
- }
- func (p *RedisQueueProducer) AddListener(listener queue.ProduceListener) {
- p.listeners = append(p.listeners, listener)
- }
- func (p *RedisQueueProducer) Name() string {
- return p.name
- }
- func (p *RedisQueueProducer) Produce() (string, bool) {
- lessLogger := logx.NewLessLogger(logIntervalMillis)
- for {
- value, ok, err := p.store.BlpopEx(p.redisNode, p.key)
- if err == nil {
- return value, ok
- } else if err == redis.Nil {
- // timed out without elements popped
- continue
- } else {
- lessLogger.Errorf("Error on blpop: %v", err)
- p.waitForRedisAvailable()
- }
- }
- }
- func newProducer(store *redis.Redis, key string, opts ...ProducerOption) (queue.Producer, error) {
- redisNode, err := redis.CreateBlockingNode(store)
- if err != nil {
- return nil, err
- }
- var producer queue.Producer = &RedisQueueProducer{
- name: fmt.Sprintf("%s/%s/%s", store.Type, store.Addr, key),
- store: store,
- key: key,
- redisNode: redisNode,
- }
- for _, opt := range opts {
- producer = opt(producer)
- }
- return producer, nil
- }
- func (p *RedisQueueProducer) resetRedisConnection() error {
- if p.redisNode != nil {
- p.redisNode.Close()
- p.redisNode = nil
- }
- redisNode, err := redis.CreateBlockingNode(p.store)
- if err != nil {
- return err
- }
- p.redisNode = redisNode
- return nil
- }
- func (p *RedisQueueProducer) waitForRedisAvailable() {
- var paused bool
- var pauseOnce sync.Once
- for {
- if err := p.resetRedisConnection(); err != nil {
- pauseOnce.Do(func() {
- paused = true
- for _, listener := range p.listeners {
- listener.OnProducerPause()
- }
- })
- logx.Errorf("Error occurred while connect to redis: %v", err)
- time.Sleep(retryRedisInterval)
- } else {
- break
- }
- }
- if paused {
- for _, listener := range p.listeners {
- listener.OnProducerResume()
- }
- }
- }
- func TimeSensitive(seconds int64) ProducerOption {
- return func(p queue.Producer) queue.Producer {
- if seconds > 0 {
- return autoDropQueueProducer{
- seconds: seconds,
- producer: p,
- }
- }
- return p
- }
- }
- type autoDropQueueProducer struct {
- seconds int64 // seconds before to drop
- producer queue.Producer
- }
- func (p autoDropQueueProducer) AddListener(listener queue.ProduceListener) {
- p.producer.AddListener(listener)
- }
- func (p autoDropQueueProducer) Produce() (string, bool) {
- lessLogger := logx.NewLessLogger(logIntervalMillis)
- for {
- content, ok := p.producer.Produce()
- if !ok {
- return "", false
- }
- var timedMsg TimedMessage
- if err := jsonx.UnmarshalFromString(content, &timedMsg); err != nil {
- lessLogger.Errorf("invalid timedMessage: %s, error: %s", content, err.Error())
- continue
- }
- if timedMsg.Time+p.seconds < time.Now().Unix() {
- lessLogger.Errorf("expired timedMessage: %s", content)
- }
- return timedMsg.Payload, true
- }
- }
|