queue.go 4.8 KB


  1. package kq
  2. import (
  3. "context"
  4. "io"
  5. "log"
  6. "time"
  7. "zero/core/logx"
  8. "zero/core/queue"
  9. "zero/core/service"
  10. "zero/core/stat"
  11. "zero/core/threading"
  12. "zero/core/timex"
  13. "github.com/segmentio/kafka-go"
  14. _ "github.com/segmentio/kafka-go/gzip"
  15. _ "github.com/segmentio/kafka-go/lz4"
  16. _ "github.com/segmentio/kafka-go/snappy"
  17. )
  18. const (
  19. defaultCommitInterval = time.Second
  20. defaultMaxWait = time.Second
  21. )
  22. type (
  23. ConsumeHandle func(key, value string) error
  24. ConsumeHandler interface {
  25. Consume(key, value string) error
  26. }
  27. queueOptions struct {
  28. commitInterval time.Duration
  29. maxWait time.Duration
  30. metrics *stat.Metrics
  31. }
  32. QueueOption func(*queueOptions)
  33. kafkaQueue struct {
  34. c KqConf
  35. consumer *kafka.Reader
  36. handler ConsumeHandler
  37. channel chan kafka.Message
  38. producerRoutines *threading.RoutineGroup
  39. consumerRoutines *threading.RoutineGroup
  40. metrics *stat.Metrics
  41. }
  42. kafkaQueues struct {
  43. queues []queue.MessageQueue
  44. group *service.ServiceGroup
  45. }
  46. )
  47. func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue {
  48. q, err := NewQueue(c, handler, opts...)
  49. if err != nil {
  50. log.Fatal(err)
  51. }
  52. return q
  53. }
  54. func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) {
  55. if err := c.SetUp(); err != nil {
  56. return nil, err
  57. }
  58. var options queueOptions
  59. for _, opt := range opts {
  60. opt(&options)
  61. }
  62. ensureQueueOptions(c, &options)
  63. if c.NumConns < 1 {
  64. c.NumConns = 1
  65. }
  66. q := kafkaQueues{
  67. group: service.NewServiceGroup(),
  68. }
  69. for i := 0; i < c.NumConns; i++ {
  70. q.queues = append(q.queues, newKafkaQueue(c, handler, options))
  71. }
  72. return q, nil
  73. }
  74. func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
  75. var offset int64
  76. if c.Offset == firstOffset {
  77. offset = kafka.FirstOffset
  78. } else {
  79. offset = kafka.LastOffset
  80. }
  81. consumer := kafka.NewReader(kafka.ReaderConfig{
  82. Brokers: c.Brokers,
  83. GroupID: c.Group,
  84. Topic: c.Topic,
  85. StartOffset: offset,
  86. MinBytes: c.MinBytes, // 10KB
  87. MaxBytes: c.MaxBytes, // 10MB
  88. MaxWait: options.maxWait,
  89. CommitInterval: options.commitInterval,
  90. })
  91. return &kafkaQueue{
  92. c: c,
  93. consumer: consumer,
  94. handler: handler,
  95. channel: make(chan kafka.Message),
  96. producerRoutines: threading.NewRoutineGroup(),
  97. consumerRoutines: threading.NewRoutineGroup(),
  98. metrics: options.metrics,
  99. }
  100. }
  101. func (q *kafkaQueue) Start() {
  102. q.startConsumers()
  103. q.startProducers()
  104. q.producerRoutines.Wait()
  105. close(q.channel)
  106. q.consumerRoutines.Wait()
  107. }
  108. func (q *kafkaQueue) Stop() {
  109. q.consumer.Close()
  110. logx.Close()
  111. }
  112. func (q *kafkaQueue) consumeOne(key, val string) error {
  113. startTime := timex.Now()
  114. err := q.handler.Consume(key, val)
  115. q.metrics.Add(stat.Task{
  116. Duration: timex.Since(startTime),
  117. })
  118. return err
  119. }
  120. func (q *kafkaQueue) startConsumers() {
  121. for i := 0; i < q.c.NumConsumers; i++ {
  122. q.consumerRoutines.Run(func() {
  123. for msg := range q.channel {
  124. if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
  125. logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err)
  126. }
  127. }
  128. })
  129. }
  130. }
  131. func (q *kafkaQueue) startProducers() {
  132. for i := 0; i < q.c.NumProducers; i++ {
  133. q.producerRoutines.Run(func() {
  134. for {
  135. msg, err := q.consumer.ReadMessage(context.Background())
  136. // io.EOF means consumer closed
  137. // io.ErrClosedPipe means committing messages on the consumer,
  138. // kafka will refire the messages on uncommitted messages, ignore
  139. if err == io.EOF || err == io.ErrClosedPipe {
  140. return
  141. }
  142. if err != nil {
  143. logx.Errorf("Error on reading mesage, %q", err.Error())
  144. continue
  145. }
  146. q.channel <- msg
  147. }
  148. })
  149. }
  150. }
  151. func (q kafkaQueues) Start() {
  152. for _, each := range q.queues {
  153. q.group.Add(each)
  154. }
  155. q.group.Start()
  156. }
  157. func (q kafkaQueues) Stop() {
  158. q.group.Stop()
  159. }
  160. func WithCommitInterval(interval time.Duration) QueueOption {
  161. return func(options *queueOptions) {
  162. options.commitInterval = interval
  163. }
  164. }
  165. func WithHandle(handle ConsumeHandle) ConsumeHandler {
  166. return innerConsumeHandler{
  167. handle: handle,
  168. }
  169. }
  170. func WithMaxWait(wait time.Duration) QueueOption {
  171. return func(options *queueOptions) {
  172. options.maxWait = wait
  173. }
  174. }
  175. func WithMetrics(metrics *stat.Metrics) QueueOption {
  176. return func(options *queueOptions) {
  177. options.metrics = metrics
  178. }
  179. }
  180. type innerConsumeHandler struct {
  181. handle ConsumeHandle
  182. }
  183. func (ch innerConsumeHandler) Consume(k, v string) error {
  184. return ch.handle(k, v)
  185. }
  186. func ensureQueueOptions(c KqConf, options *queueOptions) {
  187. if options.commitInterval == 0 {
  188. options.commitInterval = defaultCommitInterval
  189. }
  190. if options.maxWait == 0 {
  191. options.maxWait = defaultMaxWait
  192. }
  193. if options.metrics == nil {
  194. options.metrics = stat.NewMetrics(c.Name)
  195. }
  196. }