redisqueuepusher.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package internal
  2. import (
  3. "fmt"
  4. "time"
  5. "zero/core/jsonx"
  6. "zero/core/logx"
  7. "zero/core/queue"
  8. "zero/core/stores/redis"
  9. )
  10. type (
  11. PusherOption func(p queue.QueuePusher) queue.QueuePusher
  12. RedisQueuePusher struct {
  13. name string
  14. store *redis.Redis
  15. key string
  16. }
  17. )
  18. func NewPusher(store *redis.Redis, key string, opts ...PusherOption) queue.QueuePusher {
  19. var pusher queue.QueuePusher = &RedisQueuePusher{
  20. name: fmt.Sprintf("%s/%s/%s", store.Type, store.Addr, key),
  21. store: store,
  22. key: key,
  23. }
  24. for _, opt := range opts {
  25. pusher = opt(pusher)
  26. }
  27. return pusher
  28. }
  29. func (saver *RedisQueuePusher) Name() string {
  30. return saver.name
  31. }
  32. func (saver *RedisQueuePusher) Push(message string) error {
  33. _, err := saver.store.Rpush(saver.key, message)
  34. if nil != err {
  35. return err
  36. }
  37. logx.Infof("<= %s", message)
  38. return nil
  39. }
  40. func WithTime() PusherOption {
  41. return func(p queue.QueuePusher) queue.QueuePusher {
  42. return timedQueuePusher{
  43. pusher: p,
  44. }
  45. }
  46. }
  47. type timedQueuePusher struct {
  48. pusher queue.QueuePusher
  49. }
  50. func (p timedQueuePusher) Name() string {
  51. return p.pusher.Name()
  52. }
  53. func (p timedQueuePusher) Push(message string) error {
  54. tm := TimedMessage{
  55. Time: time.Now().Unix(),
  56. Payload: message,
  57. }
  58. if content, err := jsonx.Marshal(tm); err != nil {
  59. return err
  60. } else {
  61. return p.pusher.Push(string(content))
  62. }
  63. }