123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- package kq
- import (
- "context"
- "io"
- "log"
- "time"
- "zero/core/logx"
- "zero/core/queue"
- "zero/core/service"
- "zero/core/stat"
- "zero/core/threading"
- "zero/core/timex"
- "github.com/segmentio/kafka-go"
- _ "github.com/segmentio/kafka-go/gzip"
- _ "github.com/segmentio/kafka-go/lz4"
- _ "github.com/segmentio/kafka-go/snappy"
- )
- const (
- defaultCommitInterval = time.Second
- defaultMaxWait = time.Second
- )
- type (
- ConsumeHandle func(key, value string) error
- ConsumeHandler interface {
- Consume(key, value string) error
- }
- queueOptions struct {
- commitInterval time.Duration
- maxWait time.Duration
- metrics *stat.Metrics
- }
- QueueOption func(*queueOptions)
- kafkaQueue struct {
- c KqConf
- consumer *kafka.Reader
- handler ConsumeHandler
- channel chan kafka.Message
- producerRoutines *threading.RoutineGroup
- consumerRoutines *threading.RoutineGroup
- metrics *stat.Metrics
- }
- kafkaQueues struct {
- queues []queue.MessageQueue
- group *service.ServiceGroup
- }
- )
- func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue {
- q, err := NewQueue(c, handler, opts...)
- if err != nil {
- log.Fatal(err)
- }
- return q
- }
- func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) {
- if err := c.SetUp(); err != nil {
- return nil, err
- }
- var options queueOptions
- for _, opt := range opts {
- opt(&options)
- }
- ensureQueueOptions(c, &options)
- if c.NumConns < 1 {
- c.NumConns = 1
- }
- q := kafkaQueues{
- group: service.NewServiceGroup(),
- }
- for i := 0; i < c.NumConns; i++ {
- q.queues = append(q.queues, newKafkaQueue(c, handler, options))
- }
- return q, nil
- }
- func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
- var offset int64
- if c.Offset == firstOffset {
- offset = kafka.FirstOffset
- } else {
- offset = kafka.LastOffset
- }
- consumer := kafka.NewReader(kafka.ReaderConfig{
- Brokers: c.Brokers,
- GroupID: c.Group,
- Topic: c.Topic,
- StartOffset: offset,
- MinBytes: c.MinBytes, // 10KB
- MaxBytes: c.MaxBytes, // 10MB
- MaxWait: options.maxWait,
- CommitInterval: options.commitInterval,
- })
- return &kafkaQueue{
- c: c,
- consumer: consumer,
- handler: handler,
- channel: make(chan kafka.Message),
- producerRoutines: threading.NewRoutineGroup(),
- consumerRoutines: threading.NewRoutineGroup(),
- metrics: options.metrics,
- }
- }
- func (q *kafkaQueue) Start() {
- q.startConsumers()
- q.startProducers()
- q.producerRoutines.Wait()
- close(q.channel)
- q.consumerRoutines.Wait()
- }
- func (q *kafkaQueue) Stop() {
- q.consumer.Close()
- logx.Close()
- }
- func (q *kafkaQueue) consumeOne(key, val string) error {
- startTime := timex.Now()
- err := q.handler.Consume(key, val)
- q.metrics.Add(stat.Task{
- Duration: timex.Since(startTime),
- })
- return err
- }
- func (q *kafkaQueue) startConsumers() {
- for i := 0; i < q.c.NumConsumers; i++ {
- q.consumerRoutines.Run(func() {
- for msg := range q.channel {
- if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
- logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err)
- }
- }
- })
- }
- }
- func (q *kafkaQueue) startProducers() {
- for i := 0; i < q.c.NumProducers; i++ {
- q.producerRoutines.Run(func() {
- for {
- msg, err := q.consumer.ReadMessage(context.Background())
- // io.EOF means consumer closed
- // io.ErrClosedPipe means committing messages on the consumer,
- // kafka will refire the messages on uncommitted messages, ignore
- if err == io.EOF || err == io.ErrClosedPipe {
- return
- }
- if err != nil {
- logx.Errorf("Error on reading mesage, %q", err.Error())
- continue
- }
- q.channel <- msg
- }
- })
- }
- }
- func (q kafkaQueues) Start() {
- for _, each := range q.queues {
- q.group.Add(each)
- }
- q.group.Start()
- }
- func (q kafkaQueues) Stop() {
- q.group.Stop()
- }
- func WithCommitInterval(interval time.Duration) QueueOption {
- return func(options *queueOptions) {
- options.commitInterval = interval
- }
- }
- func WithHandle(handle ConsumeHandle) ConsumeHandler {
- return innerConsumeHandler{
- handle: handle,
- }
- }
- func WithMaxWait(wait time.Duration) QueueOption {
- return func(options *queueOptions) {
- options.maxWait = wait
- }
- }
- func WithMetrics(metrics *stat.Metrics) QueueOption {
- return func(options *queueOptions) {
- options.metrics = metrics
- }
- }
- type innerConsumeHandler struct {
- handle ConsumeHandle
- }
- func (ch innerConsumeHandler) Consume(k, v string) error {
- return ch.handle(k, v)
- }
- func ensureQueueOptions(c KqConf, options *queueOptions) {
- if options.commitInterval == 0 {
- options.commitInterval = defaultCommitInterval
- }
- if options.maxWait == 0 {
- options.maxWait = defaultMaxWait
- }
- if options.metrics == nil {
- options.metrics = stat.NewMetrics(c.Name)
- }
- }
|