Bladeren bron

add queue package

kevin 4 jaren geleden
bovenliggende
commit
6fdee77fa9

+ 44 - 0
core/queue/balancedqueuepusher.go

@@ -0,0 +1,44 @@
+package queue
+
+import (
+	"errors"
+	"sync/atomic"
+
+	"github.com/tal-tech/go-zero/core/logx"
+)
+
+var ErrNoAvailablePusher = errors.New("no available pusher")
+
+type BalancedQueuePusher struct {
+	name    string
+	pushers []Pusher
+	index   uint64
+}
+
+func NewBalancedQueuePusher(pushers []Pusher) Pusher {
+	return &BalancedQueuePusher{
+		name:    generateName(pushers),
+		pushers: pushers,
+	}
+}
+
+func (pusher *BalancedQueuePusher) Name() string {
+	return pusher.name
+}
+
+func (pusher *BalancedQueuePusher) Push(message string) error {
+	size := len(pusher.pushers)
+
+	for i := 0; i < size; i++ {
+		index := atomic.AddUint64(&pusher.index, 1) % uint64(size)
+		target := pusher.pushers[index]
+
+		if err := target.Push(message); err != nil {
+			logx.Error(err)
+		} else {
+			return nil
+		}
+	}
+
+	return ErrNoAvailablePusher
+}

+ 43 - 0
core/queue/balancedqueuepusher_test.go

@@ -0,0 +1,43 @@
+package queue
+
+import (
+	"fmt"
+	"strconv"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestBalancedQueuePusher(t *testing.T) {
+	const numPushers = 100
+	var pushers []Pusher
+	var mockedPushers []*mockedPusher
+	for i := 0; i < numPushers; i++ {
+		p := &mockedPusher{
+			name: "pusher:" + strconv.Itoa(i),
+		}
+		pushers = append(pushers, p)
+		mockedPushers = append(mockedPushers, p)
+	}
+
+	pusher := NewBalancedQueuePusher(pushers)
+	assert.True(t, len(pusher.Name()) > 0)
+
+	for i := 0; i < numPushers*1000; i++ {
+		assert.Nil(t, pusher.Push("item"))
+	}
+
+	var counts []int
+	for _, p := range mockedPushers {
+		counts = append(counts, p.count)
+	}
+	mean := calcMean(counts)
+	variance := calcVariance(mean, counts)
+	assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
+}
+
+func TestBalancedQueuePusher_NoAvailable(t *testing.T) {
+	pusher := NewBalancedQueuePusher(nil)
+	assert.True(t, len(pusher.Name()) == 0)
+	assert.Equal(t, ErrNoAvailablePusher, pusher.Push("item"))
+}

+ 10 - 0
core/queue/consumer.go

@@ -0,0 +1,10 @@
+package queue
+
+type (
+	Consumer interface {
+		Consume(string) error
+		OnEvent(event interface{})
+	}
+
+	ConsumerFactory func() (Consumer, error)
+)

+ 6 - 0
core/queue/messagequeue.go

@@ -0,0 +1,6 @@
+package queue
+
+type MessageQueue interface {
+	Start()
+	Stop()
+}

+ 31 - 0
core/queue/multiqueuepusher.go

@@ -0,0 +1,31 @@
+package queue
+
+import "github.com/tal-tech/go-zero/core/errorx"
+
+type MultiQueuePusher struct {
+	name    string
+	pushers []Pusher
+}
+
+func NewMultiQueuePusher(pushers []Pusher) Pusher {
+	return &MultiQueuePusher{
+		name:    generateName(pushers),
+		pushers: pushers,
+	}
+}
+
+func (pusher *MultiQueuePusher) Name() string {
+	return pusher.name
+}
+
+func (pusher *MultiQueuePusher) Push(message string) error {
+	var batchError errorx.BatchError
+
+	for _, each := range pusher.pushers {
+		if err := each.Push(message); err != nil {
+			batchError.Add(err)
+		}
+	}
+
+	return batchError.Err()
+}

+ 39 - 0
core/queue/multiqueuepusher_test.go

@@ -0,0 +1,39 @@
+package queue
+
+import (
+	"fmt"
+	"math"
+	"strconv"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestMultiQueuePusher(t *testing.T) {
+	const numPushers = 100
+	var pushers []Pusher
+	var mockedPushers []*mockedPusher
+	for i := 0; i < numPushers; i++ {
+		p := &mockedPusher{
+			name: "pusher:" + strconv.Itoa(i),
+		}
+		pushers = append(pushers, p)
+		mockedPushers = append(mockedPushers, p)
+	}
+
+	pusher := NewMultiQueuePusher(pushers)
+	assert.True(t, len(pusher.Name()) > 0)
+
+	for i := 0; i < 1000; i++ {
+		_ = pusher.Push("item")
+	}
+
+	var counts []int
+	for _, p := range mockedPushers {
+		counts = append(counts, p.count)
+	}
+	mean := calcMean(counts)
+	variance := calcVariance(mean, counts)
+	assert.True(t, math.Abs(mean-1000*(1-failProba)) < 10)
+	assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
+}

+ 15 - 0
core/queue/producer.go

@@ -0,0 +1,15 @@
+package queue
+
+type (
+	Producer interface {
+		AddListener(listener ProduceListener)
+		Produce() (string, bool)
+	}
+
+	ProduceListener interface {
+		OnProducerPause()
+		OnProducerResume()
+	}
+
+	ProducerFactory func() (Producer, error)
+)

+ 239 - 0
core/queue/queue.go

@@ -0,0 +1,239 @@
+package queue
+
+import (
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/tal-tech/go-zero/core/logx"
+	"github.com/tal-tech/go-zero/core/rescue"
+	"github.com/tal-tech/go-zero/core/stat"
+	"github.com/tal-tech/go-zero/core/threading"
+	"github.com/tal-tech/go-zero/core/timex"
+)
+
+const queueName = "queue"
+
+type (
+	Queue struct {
+		name                 string
+		metrics              *stat.Metrics
+		producerFactory      ProducerFactory
+		producerRoutineGroup *threading.RoutineGroup
+		consumerFactory      ConsumerFactory
+		consumerRoutineGroup *threading.RoutineGroup
+		producerCount        int
+		consumerCount        int
+		active               int32
+		channel              chan string
+		quit                 chan struct{}
+		listeners            []Listener
+		eventLock            sync.Mutex
+		eventChannels        []chan interface{}
+	}
+
+	Listener interface {
+		OnPause()
+		OnResume()
+	}
+
+	Poller interface {
+		Name() string
+		Poll() string
+	}
+
+	Pusher interface {
+		Name() string
+		Push(string) error
+	}
+)
+
+func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue {
+	queue := &Queue{
+		metrics:              stat.NewMetrics(queueName),
+		producerFactory:      producerFactory,
+		producerRoutineGroup: threading.NewRoutineGroup(),
+		consumerFactory:      consumerFactory,
+		consumerRoutineGroup: threading.NewRoutineGroup(),
+		producerCount:        runtime.NumCPU(),
+		consumerCount:        runtime.NumCPU() << 1,
+		channel:              make(chan string),
+		quit:                 make(chan struct{}),
+	}
+	queue.SetName(queueName)
+
+	return queue
+}
+
+func (queue *Queue) AddListener(listener Listener) {
+	queue.listeners = append(queue.listeners, listener)
+}
+
+func (queue *Queue) Broadcast(message interface{}) {
+	go func() {
+		queue.eventLock.Lock()
+		defer queue.eventLock.Unlock()
+
+		for _, channel := range queue.eventChannels {
+			channel <- message
+		}
+	}()
+}
+
+func (queue *Queue) SetName(name string) {
+	queue.name = name
+	queue.metrics.SetName(name)
+}
+
+func (queue *Queue) SetNumConsumer(count int) {
+	queue.consumerCount = count
+}
+
+func (queue *Queue) SetNumProducer(count int) {
+	queue.producerCount = count
+}
+
+func (queue *Queue) Start() {
+	queue.startProducers(queue.producerCount)
+	queue.startConsumers(queue.consumerCount)
+
+	queue.producerRoutineGroup.Wait()
+	close(queue.channel)
+	queue.consumerRoutineGroup.Wait()
+}
+
+func (queue *Queue) Stop() {
+	close(queue.quit)
+}
+
+func (queue *Queue) consume(eventChan chan interface{}) {
+	var consumer Consumer
+
+	for {
+		var err error
+		if consumer, err = queue.consumerFactory(); err != nil {
+			logx.Errorf("Error on creating consumer: %v", err)
+			time.Sleep(time.Second)
+		} else {
+			break
+		}
+	}
+
+	for {
+		select {
+		case message, ok := <-queue.channel:
+			if ok {
+				queue.consumeOne(consumer, message)
+			} else {
+				logx.Info("Task channel was closed, quitting consumer...")
+				return
+			}
+		case event := <-eventChan:
+			consumer.OnEvent(event)
+		}
+	}
+}
+
+func (queue *Queue) consumeOne(consumer Consumer, message string) {
+	threading.RunSafe(func() {
+		startTime := timex.Now()
+		defer func() {
+			duration := timex.Since(startTime)
+			queue.metrics.Add(stat.Task{
+				Duration: duration,
+			})
+			logx.WithDuration(duration).Infof("%s", message)
+		}()
+
+		if err := consumer.Consume(message); err != nil {
+			logx.Errorf("Error occurred while consuming %v: %v", message, err)
+		}
+	})
+}
+
+func (queue *Queue) pause() {
+	for _, listener := range queue.listeners {
+		listener.OnPause()
+	}
+}
+
+func (queue *Queue) produce() {
+	var producer Producer
+
+	for {
+		var err error
+		if producer, err = queue.producerFactory(); err != nil {
+			logx.Errorf("Error on creating producer: %v", err)
+			time.Sleep(time.Second)
+		} else {
+			break
+		}
+	}
+
+	atomic.AddInt32(&queue.active, 1)
+	producer.AddListener(routineListener{
+		queue: queue,
+	})
+
+	for {
+		select {
+		case <-queue.quit:
+			logx.Info("Quitting producer")
+			return
+		default:
+			if v, ok := queue.produceOne(producer); ok {
+				queue.channel <- v
+			}
+		}
+	}
+}
+
+func (queue *Queue) produceOne(producer Producer) (string, bool) {
+	// avoid panic quit the producer, just log it and continue
+	defer rescue.Recover()
+
+	return producer.Produce()
+}
+
+func (queue *Queue) resume() {
+	for _, listener := range queue.listeners {
+		listener.OnResume()
+	}
+}
+
+func (queue *Queue) startConsumers(number int) {
+	for i := 0; i < number; i++ {
+		eventChan := make(chan interface{})
+		queue.eventLock.Lock()
+		queue.eventChannels = append(queue.eventChannels, eventChan)
+		queue.eventLock.Unlock()
+		queue.consumerRoutineGroup.Run(func() {
+			queue.consume(eventChan)
+		})
+	}
+}
+
+func (queue *Queue) startProducers(number int) {
+	for i := 0; i < number; i++ {
+		queue.producerRoutineGroup.Run(func() {
+			queue.produce()
+		})
+	}
+}
+
+type routineListener struct {
+	queue *Queue
+}
+
+func (rl routineListener) OnProducerPause() {
+	if atomic.AddInt32(&rl.queue.active, -1) <= 0 {
+		rl.queue.pause()
+	}
+}
+
+func (rl routineListener) OnProducerResume() {
+	if atomic.AddInt32(&rl.queue.active, 1) == 1 {
+		rl.queue.resume()
+	}
+}

+ 94 - 0
core/queue/queue_test.go

@@ -0,0 +1,94 @@
+package queue
+
+import (
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+const (
+	consumers = 4
+	rounds    = 100
+)
+
+func TestQueue(t *testing.T) {
+	producer := newMockedProducer(rounds)
+	consumer := newMockedConsumer()
+	consumer.wait.Add(consumers)
+	q := NewQueue(func() (Producer, error) {
+		return producer, nil
+	}, func() (Consumer, error) {
+		return consumer, nil
+	})
+	q.AddListener(new(mockedListener))
+	q.SetName("mockqueue")
+	q.SetNumConsumer(consumers)
+	q.SetNumProducer(1)
+	q.pause()
+	q.resume()
+	go func() {
+		producer.wait.Wait()
+		q.Stop()
+	}()
+	q.Start()
+	assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
+}
+
+type mockedConsumer struct {
+	count  int32
+	events int32
+	wait   sync.WaitGroup
+}
+
+func newMockedConsumer() *mockedConsumer {
+	return new(mockedConsumer)
+}
+
+func (c *mockedConsumer) Consume(string) error {
+	atomic.AddInt32(&c.count, 1)
+	return nil
+}
+
+func (c *mockedConsumer) OnEvent(interface{}) {
+	if atomic.AddInt32(&c.events, 1) <= consumers {
+		c.wait.Done()
+	}
+}
+
+type mockedProducer struct {
+	total int32
+	count int32
+	wait  sync.WaitGroup
+}
+
+func newMockedProducer(total int32) *mockedProducer {
+	p := new(mockedProducer)
+	p.total = total
+	p.wait.Add(int(total))
+	return p
+}
+
+func (p *mockedProducer) AddListener(listener ProduceListener) {
+}
+
+func (p *mockedProducer) Produce() (string, bool) {
+	if atomic.AddInt32(&p.count, 1) <= p.total {
+		p.wait.Done()
+		return "item", true
+	} else {
+		time.Sleep(time.Second)
+		return "", false
+	}
+}
+
+type mockedListener struct {
+}
+
+func (l *mockedListener) OnPause() {
+}
+
+func (l *mockedListener) OnResume() {
+}

+ 12 - 0
core/queue/util.go

@@ -0,0 +1,12 @@
+package queue
+
+import "strings"
+
+func generateName(pushers []Pusher) string {
+	names := make([]string, len(pushers))
+	for i, pusher := range pushers {
+		names[i] = pusher.Name()
+	}
+
+	return strings.Join(names, ",")
+}

+ 77 - 0
core/queue/util_test.go

@@ -0,0 +1,77 @@
+package queue
+
+import (
+	"errors"
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/tal-tech/go-zero/core/logx"
+	"github.com/tal-tech/go-zero/core/mathx"
+)
+
+var (
+	proba     = mathx.NewProba()
+	failProba = 0.01
+)
+
+func init() {
+	logx.Disable()
+}
+
+func TestGenerateName(t *testing.T) {
+	pushers := []Pusher{
+		&mockedPusher{name: "first"},
+		&mockedPusher{name: "second"},
+		&mockedPusher{name: "third"},
+	}
+
+	assert.Equal(t, "first,second,third", generateName(pushers))
+}
+
+func TestGenerateNameNil(t *testing.T) {
+	var pushers []Pusher
+	assert.Equal(t, "", generateName(pushers))
+}
+
+func calcMean(vals []int) float64 {
+	if len(vals) == 0 {
+		return 0
+	}
+
+	var result float64
+	for _, val := range vals {
+		result += float64(val)
+	}
+	return result / float64(len(vals))
+}
+
+func calcVariance(mean float64, vals []int) float64 {
+	if len(vals) == 0 {
+		return 0
+	}
+
+	var result float64
+	for _, val := range vals {
+		result += math.Pow(float64(val)-mean, 2)
+	}
+	return result / float64(len(vals))
+}
+
+type mockedPusher struct {
+	name  string
+	count int
+}
+
+func (p *mockedPusher) Name() string {
+	return p.name
+}
+
+func (p *mockedPusher) Push(s string) error {
+	if proba.TrueOnProba(failProba) {
+		return errors.New("dummy")
+	}
+
+	p.count++
+	return nil
+}