123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package kq
- import (
- "context"
- "strconv"
- "time"
- "zero/core/executors"
- "zero/core/logx"
- "github.com/segmentio/kafka-go"
- "github.com/segmentio/kafka-go/snappy"
- )
- type (
- PushOption func(options *chunkOptions)
- Pusher struct {
- produer *kafka.Writer
- topic string
- executor *executors.ChunkExecutor
- }
- chunkOptions struct {
- chunkSize int
- flushInterval time.Duration
- }
- )
- func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
- producer := kafka.NewWriter(kafka.WriterConfig{
- Brokers: addrs,
- Topic: topic,
- Balancer: &kafka.LeastBytes{},
- CompressionCodec: snappy.NewCompressionCodec(),
- })
- pusher := &Pusher{
- produer: producer,
- topic: topic,
- }
- pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
- chunk := make([]kafka.Message, len(tasks))
- for i := range tasks {
- chunk[i] = tasks[i].(kafka.Message)
- }
- if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
- logx.Error(err)
- }
- }, newOptions(opts)...)
- return pusher
- }
- func (p *Pusher) Close() error {
- return p.produer.Close()
- }
- func (p *Pusher) Name() string {
- return p.topic
- }
- func (p *Pusher) Push(v string) error {
- msg := kafka.Message{
- Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
- Value: []byte(v),
- }
- if p.executor != nil {
- return p.executor.Add(msg, len(v))
- } else {
- return p.produer.WriteMessages(context.Background(), msg)
- }
- }
- func WithChunkSize(chunkSize int) PushOption {
- return func(options *chunkOptions) {
- options.chunkSize = chunkSize
- }
- }
- func WithFlushInterval(interval time.Duration) PushOption {
- return func(options *chunkOptions) {
- options.flushInterval = interval
- }
- }
- func newOptions(opts []PushOption) []executors.ChunkOption {
- var options chunkOptions
- for _, opt := range opts {
- opt(&options)
- }
- var chunkOpts []executors.ChunkOption
- if options.chunkSize > 0 {
- chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
- }
- if options.flushInterval > 0 {
- chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
- }
- return chunkOpts
- }
|