consumer.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package dq
  2. import (
  3. "strconv"
  4. "time"
  5. "zero/core/hash"
  6. "zero/core/logx"
  7. "zero/core/service"
  8. "zero/core/stores/redis"
  9. )
  10. const (
  11. expiration = 3600 // seconds
  12. guardValue = "1"
  13. tolerance = time.Minute * 30
  14. )
  15. var maxCheckBytes = getMaxTimeLen()
  16. type (
  17. Consume func(body []byte)
  18. Consumer interface {
  19. Consume(consume Consume)
  20. }
  21. consumerCluster struct {
  22. nodes []*consumerNode
  23. red *redis.Redis
  24. }
  25. )
  26. func NewConsumer(c DqConf) Consumer {
  27. var nodes []*consumerNode
  28. for _, node := range c.Beanstalks {
  29. nodes = append(nodes, newConsumerNode(node.Endpoint, node.Tube))
  30. }
  31. return &consumerCluster{
  32. nodes: nodes,
  33. red: c.Redis.NewRedis(),
  34. }
  35. }
  36. func (c *consumerCluster) Consume(consume Consume) {
  37. guardedConsume := func(body []byte) {
  38. key := hash.Md5Hex(body)
  39. body, ok := c.unwrap(body)
  40. if !ok {
  41. logx.Errorf("discarded: %q", string(body))
  42. return
  43. }
  44. ok, err := c.red.SetnxEx(key, guardValue, expiration)
  45. if err != nil {
  46. logx.Error(err)
  47. } else if ok {
  48. consume(body)
  49. }
  50. }
  51. group := service.NewServiceGroup()
  52. for _, node := range c.nodes {
  53. group.Add(consumeService{
  54. c: node,
  55. consume: guardedConsume,
  56. })
  57. }
  58. group.Start()
  59. }
  60. func (c *consumerCluster) unwrap(body []byte) ([]byte, bool) {
  61. var pos = -1
  62. for i := 0; i < maxCheckBytes; i++ {
  63. if body[i] == timeSep {
  64. pos = i
  65. break
  66. }
  67. }
  68. if pos < 0 {
  69. return nil, false
  70. }
  71. val, err := strconv.ParseInt(string(body[:pos]), 10, 64)
  72. if err != nil {
  73. logx.Error(err)
  74. return nil, false
  75. }
  76. t := time.Unix(0, val)
  77. if t.Add(tolerance).Before(time.Now()) {
  78. return nil, false
  79. }
  80. return body[pos+1:], true
  81. }
  82. func getMaxTimeLen() int {
  83. return len(strconv.FormatInt(time.Now().UnixNano(), 10)) + 2
  84. }