Kevin Wan 1 год назад
Родитель
Сommit
32f78668db
1 измененных файлов с 82 добавлено и 7 удалено
  1. 82 7
      core/queue/queue_test.go

+ 82 - 7
core/queue/queue_test.go

@@ -1,6 +1,7 @@
 package queue
 
 import (
+	"errors"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -37,10 +38,82 @@ func TestQueue(t *testing.T) {
 	assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
 }
 
+func TestQueue_Broadcast(t *testing.T) {
+	producer := newMockedProducer(rounds)
+	consumer := newMockedConsumer()
+	consumer.wait.Add(consumers)
+	q := NewQueue(func() (Producer, error) {
+		return producer, nil
+	}, func() (Consumer, error) {
+		return consumer, nil
+	})
+	q.AddListener(new(mockedListener))
+	q.SetName("mockqueue")
+	q.SetNumConsumer(consumers)
+	q.SetNumProducer(1)
+	q.Broadcast("message")
+	go func() {
+		producer.wait.Wait()
+		q.Stop()
+	}()
+	q.Start()
+	consumer.wait.Wait()
+	assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
+	assert.Equal(t, int32(consumers), atomic.LoadInt32(&consumer.events))
+}
+
+func TestQueue_PauseResume(t *testing.T) {
+	producer := newMockedProducer(rounds)
+	consumer := newMockedConsumer()
+	consumer.wait.Add(consumers)
+	q := NewQueue(func() (Producer, error) {
+		return producer, nil
+	}, func() (Consumer, error) {
+		return consumer, nil
+	})
+	q.AddListener(new(mockedListener))
+	q.SetName("mockqueue")
+	q.SetNumConsumer(consumers)
+	q.SetNumProducer(1)
+	go func() {
+		producer.wait.Wait()
+		q.Stop()
+	}()
+	q.Start()
+	producer.listener.OnProducerPause()
+	assert.Equal(t, int32(0), atomic.LoadInt32(&q.active))
+	producer.listener.OnProducerResume()
+	assert.Equal(t, int32(1), atomic.LoadInt32(&q.active))
+	assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
+}
+
+func TestQueue_ConsumeError(t *testing.T) {
+	producer := newMockedProducer(rounds)
+	consumer := newMockedConsumer()
+	consumer.consumeErr = errors.New("consume error")
+	consumer.wait.Add(consumers)
+	q := NewQueue(func() (Producer, error) {
+		return producer, nil
+	}, func() (Consumer, error) {
+		return consumer, nil
+	})
+	q.AddListener(new(mockedListener))
+	q.SetName("mockqueue")
+	q.SetNumConsumer(consumers)
+	q.SetNumProducer(1)
+	go func() {
+		producer.wait.Wait()
+		q.Stop()
+	}()
+	q.Start()
+	assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
+}
+
 type mockedConsumer struct {
-	count  int32
-	events int32
-	wait   sync.WaitGroup
+	count      int32
+	events     int32
+	consumeErr error
+	wait       sync.WaitGroup
 }
 
 func newMockedConsumer() *mockedConsumer {
@@ -49,7 +122,7 @@ func newMockedConsumer() *mockedConsumer {
 
 func (c *mockedConsumer) Consume(string) error {
 	atomic.AddInt32(&c.count, 1)
-	return nil
+	return c.consumeErr
 }
 
 func (c *mockedConsumer) OnEvent(any) {
@@ -59,9 +132,10 @@ func (c *mockedConsumer) OnEvent(any) {
 }
 
 type mockedProducer struct {
-	total int32
-	count int32
-	wait  sync.WaitGroup
+	total    int32
+	count    int32
+	listener ProduceListener
+	wait     sync.WaitGroup
 }
 
 func newMockedProducer(total int32) *mockedProducer {
@@ -72,6 +146,7 @@ func newMockedProducer(total int32) *mockedProducer {
 }
 
 func (p *mockedProducer) AddListener(listener ProduceListener) {
+	p.listener = listener
 }
 
 func (p *mockedProducer) Produce() (string, bool) {