pusher.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package kq
  2. import (
  3. "context"
  4. "strconv"
  5. "time"
  6. "zero/core/executors"
  7. "zero/core/logx"
  8. "github.com/segmentio/kafka-go"
  9. "github.com/segmentio/kafka-go/snappy"
  10. )
  11. type (
  12. PushOption func(options *chunkOptions)
  13. Pusher struct {
  14. produer *kafka.Writer
  15. topic string
  16. executor *executors.ChunkExecutor
  17. }
  18. chunkOptions struct {
  19. chunkSize int
  20. flushInterval time.Duration
  21. }
  22. )
  23. func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
  24. producer := kafka.NewWriter(kafka.WriterConfig{
  25. Brokers: addrs,
  26. Topic: topic,
  27. Balancer: &kafka.LeastBytes{},
  28. CompressionCodec: snappy.NewCompressionCodec(),
  29. })
  30. pusher := &Pusher{
  31. produer: producer,
  32. topic: topic,
  33. }
  34. pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
  35. chunk := make([]kafka.Message, len(tasks))
  36. for i := range tasks {
  37. chunk[i] = tasks[i].(kafka.Message)
  38. }
  39. if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
  40. logx.Error(err)
  41. }
  42. }, newOptions(opts)...)
  43. return pusher
  44. }
  45. func (p *Pusher) Close() error {
  46. return p.produer.Close()
  47. }
  48. func (p *Pusher) Name() string {
  49. return p.topic
  50. }
  51. func (p *Pusher) Push(v string) error {
  52. msg := kafka.Message{
  53. Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
  54. Value: []byte(v),
  55. }
  56. if p.executor != nil {
  57. return p.executor.Add(msg, len(v))
  58. } else {
  59. return p.produer.WriteMessages(context.Background(), msg)
  60. }
  61. }
  62. func WithChunkSize(chunkSize int) PushOption {
  63. return func(options *chunkOptions) {
  64. options.chunkSize = chunkSize
  65. }
  66. }
  67. func WithFlushInterval(interval time.Duration) PushOption {
  68. return func(options *chunkOptions) {
  69. options.flushInterval = interval
  70. }
  71. }
  72. func newOptions(opts []PushOption) []executors.ChunkOption {
  73. var options chunkOptions
  74. for _, opt := range opts {
  75. opt(&options)
  76. }
  77. var chunkOpts []executors.ChunkOption
  78. if options.chunkSize > 0 {
  79. chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
  80. }
  81. if options.flushInterval > 0 {
  82. chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
  83. }
  84. return chunkOpts
  85. }