Explorar o código

remove packages

kevin %!s(int64=4) %!d(string=hai) anos
pai
achega
6d9602fa35
Modificáronse 47 ficheiros con 0 adicións e 3590 borrados
  1. 0 44
      core/queue/balancedqueuepusher.go
  2. 0 43
      core/queue/balancedqueuepusher_test.go
  3. 0 10
      core/queue/consumer.go
  4. 0 6
      core/queue/messagequeue.go
  5. 0 31
      core/queue/multiqueuepusher.go
  6. 0 39
      core/queue/multiqueuepusher_test.go
  7. 0 15
      core/queue/producer.go
  8. 0 239
      core/queue/queue.go
  9. 0 94
      core/queue/queue_test.go
  10. 0 12
      core/queue/util.go
  11. 0 78
      core/queue/util_test.go
  12. 0 15
      dq/config.go
  13. 0 65
      dq/connection.go
  14. 0 100
      dq/consumer.go
  15. 0 95
      dq/consumernode.go
  16. 0 156
      dq/producer.go
  17. 0 98
      dq/producernode.go
  18. 0 15
      dq/vars.go
  19. 0 42
      example/beanstalk/consumer/consumer.go
  20. 0 40
      example/beanstalk/producer/producer.go
  21. 0 11
      example/jobqueue/jobqueue.go
  22. 0 12
      example/kmq/consumer/config.json
  23. 0 20
      example/kmq/consumer/queue.go
  24. 0 51
      example/kmq/producer/produce.go
  25. 0 86
      example/queue/poll/poller.go
  26. 0 31
      example/queue/push/pusher.go
  27. 0 62
      example/redis/cluster.go
  28. 0 142
      example/sqlc/user.go
  29. 0 5
      go.mod
  30. 0 18
      go.sum
  31. 0 21
      kq/config.go
  32. 0 101
      kq/pusher.go
  33. 0 229
      kq/queue.go
  34. 0 18
      rq/config.go
  35. 0 19
      rq/etc/config.json
  36. 0 19
      rq/internal/conf.go
  37. 0 7
      rq/internal/const.go
  38. 0 39
      rq/internal/hashchange.go
  39. 0 6
      rq/internal/message.go
  40. 0 82
      rq/internal/redisqueue_test.go
  41. 0 166
      rq/internal/redisqueueproducer.go
  42. 0 78
      rq/internal/redisqueuepusher.go
  43. 0 179
      rq/internal/update/incrementalupdater.go
  44. 0 106
      rq/internal/update/serverchange.go
  45. 0 445
      rq/pusher.go
  46. 0 338
      rq/queue.go
  47. 0 62
      rq/queue_test.go

+ 0 - 44
core/queue/balancedqueuepusher.go

@@ -1,44 +0,0 @@
-package queue
-
-import (
-	"errors"
-	"sync/atomic"
-
-	"zero/core/logx"
-)
-
-var ErrNoAvailablePusher = errors.New("no available pusher")
-
-type BalancedQueuePusher struct {
-	name    string
-	pushers []QueuePusher
-	index   uint64
-}
-
-func NewBalancedQueuePusher(pushers []QueuePusher) QueuePusher {
-	return &BalancedQueuePusher{
-		name:    generateName(pushers),
-		pushers: pushers,
-	}
-}
-
-func (pusher *BalancedQueuePusher) Name() string {
-	return pusher.name
-}
-
-func (pusher *BalancedQueuePusher) Push(message string) error {
-	size := len(pusher.pushers)
-
-	for i := 0; i < size; i++ {
-		index := atomic.AddUint64(&pusher.index, 1) % uint64(size)
-		target := pusher.pushers[index]
-
-		if err := target.Push(message); err != nil {
-			logx.Error(err)
-		} else {
-			return nil
-		}
-	}
-
-	return ErrNoAvailablePusher
-}

+ 0 - 43
core/queue/balancedqueuepusher_test.go

@@ -1,43 +0,0 @@
-package queue
-
-import (
-	"fmt"
-	"strconv"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestBalancedQueuePusher(t *testing.T) {
-	const numPushers = 100
-	var pushers []QueuePusher
-	var mockedPushers []*mockedPusher
-	for i := 0; i < numPushers; i++ {
-		p := &mockedPusher{
-			name: "pusher:" + strconv.Itoa(i),
-		}
-		pushers = append(pushers, p)
-		mockedPushers = append(mockedPushers, p)
-	}
-
-	pusher := NewBalancedQueuePusher(pushers)
-	assert.True(t, len(pusher.Name()) > 0)
-
-	for i := 0; i < numPushers*1000; i++ {
-		assert.Nil(t, pusher.Push("item"))
-	}
-
-	var counts []int
-	for _, p := range mockedPushers {
-		counts = append(counts, p.count)
-	}
-	mean := calcMean(counts)
-	variance := calcVariance(mean, counts)
-	assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
-}
-
-func TestBalancedQueuePusher_NoAvailable(t *testing.T) {
-	pusher := NewBalancedQueuePusher(nil)
-	assert.True(t, len(pusher.Name()) == 0)
-	assert.Equal(t, ErrNoAvailablePusher, pusher.Push("item"))
-}

+ 0 - 10
core/queue/consumer.go

@@ -1,10 +0,0 @@
-package queue
-
-type (
-	Consumer interface {
-		Consume(string) error
-		OnEvent(event interface{})
-	}
-
-	ConsumerFactory func() (Consumer, error)
-)

+ 0 - 6
core/queue/messagequeue.go

@@ -1,6 +0,0 @@
-package queue
-
-type MessageQueue interface {
-	Start()
-	Stop()
-}

+ 0 - 31
core/queue/multiqueuepusher.go

@@ -1,31 +0,0 @@
-package queue
-
-import "zero/core/errorx"
-
-type MultiQueuePusher struct {
-	name    string
-	pushers []QueuePusher
-}
-
-func NewMultiQueuePusher(pushers []QueuePusher) QueuePusher {
-	return &MultiQueuePusher{
-		name:    generateName(pushers),
-		pushers: pushers,
-	}
-}
-
-func (pusher *MultiQueuePusher) Name() string {
-	return pusher.name
-}
-
-func (pusher *MultiQueuePusher) Push(message string) error {
-	var batchError errorx.BatchError
-
-	for _, each := range pusher.pushers {
-		if err := each.Push(message); err != nil {
-			batchError.Add(err)
-		}
-	}
-
-	return batchError.Err()
-}

+ 0 - 39
core/queue/multiqueuepusher_test.go

@@ -1,39 +0,0 @@
-package queue
-
-import (
-	"fmt"
-	"math"
-	"strconv"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestMultiQueuePusher(t *testing.T) {
-	const numPushers = 100
-	var pushers []QueuePusher
-	var mockedPushers []*mockedPusher
-	for i := 0; i < numPushers; i++ {
-		p := &mockedPusher{
-			name: "pusher:" + strconv.Itoa(i),
-		}
-		pushers = append(pushers, p)
-		mockedPushers = append(mockedPushers, p)
-	}
-
-	pusher := NewMultiQueuePusher(pushers)
-	assert.True(t, len(pusher.Name()) > 0)
-
-	for i := 0; i < 1000; i++ {
-		_ = pusher.Push("item")
-	}
-
-	var counts []int
-	for _, p := range mockedPushers {
-		counts = append(counts, p.count)
-	}
-	mean := calcMean(counts)
-	variance := calcVariance(mean, counts)
-	assert.True(t, math.Abs(mean-1000*(1-failProba)) < 10)
-	assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
-}

+ 0 - 15
core/queue/producer.go

@@ -1,15 +0,0 @@
-package queue
-
-type (
-	Producer interface {
-		AddListener(listener ProduceListener)
-		Produce() (string, bool)
-	}
-
-	ProduceListener interface {
-		OnProducerPause()
-		OnProducerResume()
-	}
-
-	ProducerFactory func() (Producer, error)
-)

+ 0 - 239
core/queue/queue.go

@@ -1,239 +0,0 @@
-package queue
-
-import (
-	"runtime"
-	"sync"
-	"sync/atomic"
-	"time"
-
-	"zero/core/logx"
-	"zero/core/rescue"
-	"zero/core/stat"
-	"zero/core/threading"
-	"zero/core/timex"
-)
-
-const queueName = "queue"
-
-type (
-	Queue struct {
-		name                 string
-		metrics              *stat.Metrics
-		producerFactory      ProducerFactory
-		producerRoutineGroup *threading.RoutineGroup
-		consumerFactory      ConsumerFactory
-		consumerRoutineGroup *threading.RoutineGroup
-		producerCount        int
-		consumerCount        int
-		active               int32
-		channel              chan string
-		quit                 chan struct{}
-		listeners            []QueueListener
-		eventLock            sync.Mutex
-		eventChannels        []chan interface{}
-	}
-
-	QueueListener interface {
-		OnPause()
-		OnResume()
-	}
-
-	QueuePoller interface {
-		Name() string
-		Poll() string
-	}
-
-	QueuePusher interface {
-		Name() string
-		Push(string) error
-	}
-)
-
-func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue {
-	queue := &Queue{
-		metrics:              stat.NewMetrics(queueName),
-		producerFactory:      producerFactory,
-		producerRoutineGroup: threading.NewRoutineGroup(),
-		consumerFactory:      consumerFactory,
-		consumerRoutineGroup: threading.NewRoutineGroup(),
-		producerCount:        runtime.NumCPU(),
-		consumerCount:        runtime.NumCPU() << 1,
-		channel:              make(chan string),
-		quit:                 make(chan struct{}),
-	}
-	queue.SetName(queueName)
-
-	return queue
-}
-
-func (queue *Queue) AddListener(listener QueueListener) {
-	queue.listeners = append(queue.listeners, listener)
-}
-
-func (queue *Queue) Broadcast(message interface{}) {
-	go func() {
-		queue.eventLock.Lock()
-		defer queue.eventLock.Unlock()
-
-		for _, channel := range queue.eventChannels {
-			channel <- message
-		}
-	}()
-}
-
-func (queue *Queue) SetName(name string) {
-	queue.name = name
-	queue.metrics.SetName(name)
-}
-
-func (queue *Queue) SetNumConsumer(count int) {
-	queue.consumerCount = count
-}
-
-func (queue *Queue) SetNumProducer(count int) {
-	queue.producerCount = count
-}
-
-func (queue *Queue) Start() {
-	queue.startProducers(queue.producerCount)
-	queue.startConsumers(queue.consumerCount)
-
-	queue.producerRoutineGroup.Wait()
-	close(queue.channel)
-	queue.consumerRoutineGroup.Wait()
-}
-
-func (queue *Queue) Stop() {
-	close(queue.quit)
-}
-
-func (queue *Queue) consume(eventChan chan interface{}) {
-	var consumer Consumer
-
-	for {
-		var err error
-		if consumer, err = queue.consumerFactory(); err != nil {
-			logx.Errorf("Error on creating consumer: %v", err)
-			time.Sleep(time.Second)
-		} else {
-			break
-		}
-	}
-
-	for {
-		select {
-		case message, ok := <-queue.channel:
-			if ok {
-				queue.consumeOne(consumer, message)
-			} else {
-				logx.Info("Task channel was closed, quitting consumer...")
-				return
-			}
-		case event := <-eventChan:
-			consumer.OnEvent(event)
-		}
-	}
-}
-
-func (queue *Queue) consumeOne(consumer Consumer, message string) {
-	threading.RunSafe(func() {
-		startTime := timex.Now()
-		defer func() {
-			duration := timex.Since(startTime)
-			queue.metrics.Add(stat.Task{
-				Duration: duration,
-			})
-			logx.WithDuration(duration).Infof("%s", message)
-		}()
-
-		if err := consumer.Consume(message); err != nil {
-			logx.Errorf("Error occurred while consuming %v: %v", message, err)
-		}
-	})
-}
-
-func (queue *Queue) pause() {
-	for _, listener := range queue.listeners {
-		listener.OnPause()
-	}
-}
-
-func (queue *Queue) produce() {
-	var producer Producer
-
-	for {
-		var err error
-		if producer, err = queue.producerFactory(); err != nil {
-			logx.Errorf("Error on creating producer: %v", err)
-			time.Sleep(time.Second)
-		} else {
-			break
-		}
-	}
-
-	atomic.AddInt32(&queue.active, 1)
-	producer.AddListener(routineListener{
-		queue: queue,
-	})
-
-	for {
-		select {
-		case <-queue.quit:
-			logx.Info("Quitting producer")
-			return
-		default:
-			if v, ok := queue.produceOne(producer); ok {
-				queue.channel <- v
-			}
-		}
-	}
-}
-
-func (queue *Queue) produceOne(producer Producer) (string, bool) {
-	// avoid panic quit the producer, just log it and continue
-	defer rescue.Recover()
-
-	return producer.Produce()
-}
-
-func (queue *Queue) resume() {
-	for _, listener := range queue.listeners {
-		listener.OnResume()
-	}
-}
-
-func (queue *Queue) startConsumers(number int) {
-	for i := 0; i < number; i++ {
-		eventChan := make(chan interface{})
-		queue.eventLock.Lock()
-		queue.eventChannels = append(queue.eventChannels, eventChan)
-		queue.eventLock.Unlock()
-		queue.consumerRoutineGroup.Run(func() {
-			queue.consume(eventChan)
-		})
-	}
-}
-
-func (queue *Queue) startProducers(number int) {
-	for i := 0; i < number; i++ {
-		queue.producerRoutineGroup.Run(func() {
-			queue.produce()
-		})
-	}
-}
-
-type routineListener struct {
-	queue *Queue
-}
-
-func (rl routineListener) OnProducerPause() {
-	if atomic.AddInt32(&rl.queue.active, -1) <= 0 {
-		rl.queue.pause()
-	}
-}
-
-func (rl routineListener) OnProducerResume() {
-	if atomic.AddInt32(&rl.queue.active, 1) == 1 {
-		rl.queue.resume()
-	}
-}

+ 0 - 94
core/queue/queue_test.go

@@ -1,94 +0,0 @@
-package queue
-
-import (
-	"sync"
-	"sync/atomic"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-)
-
-const (
-	consumers = 4
-	rounds    = 100
-)
-
-func TestQueue(t *testing.T) {
-	producer := newMockedProducer(rounds)
-	consumer := newMockedConsumer()
-	consumer.wait.Add(consumers)
-	q := NewQueue(func() (Producer, error) {
-		return producer, nil
-	}, func() (Consumer, error) {
-		return consumer, nil
-	})
-	q.AddListener(new(mockedListener))
-	q.SetName("mockqueue")
-	q.SetNumConsumer(consumers)
-	q.SetNumProducer(1)
-	q.pause()
-	q.resume()
-	go func() {
-		producer.wait.Wait()
-		q.Stop()
-	}()
-	q.Start()
-	assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
-}
-
-type mockedConsumer struct {
-	count  int32
-	events int32
-	wait   sync.WaitGroup
-}
-
-func newMockedConsumer() *mockedConsumer {
-	return new(mockedConsumer)
-}
-
-func (c *mockedConsumer) Consume(string) error {
-	atomic.AddInt32(&c.count, 1)
-	return nil
-}
-
-func (c *mockedConsumer) OnEvent(interface{}) {
-	if atomic.AddInt32(&c.events, 1) <= consumers {
-		c.wait.Done()
-	}
-}
-
-type mockedProducer struct {
-	total int32
-	count int32
-	wait  sync.WaitGroup
-}
-
-func newMockedProducer(total int32) *mockedProducer {
-	p := new(mockedProducer)
-	p.total = total
-	p.wait.Add(int(total))
-	return p
-}
-
-func (p *mockedProducer) AddListener(listener ProduceListener) {
-}
-
-func (p *mockedProducer) Produce() (string, bool) {
-	if atomic.AddInt32(&p.count, 1) <= p.total {
-		p.wait.Done()
-		return "item", true
-	} else {
-		time.Sleep(time.Second)
-		return "", false
-	}
-}
-
-type mockedListener struct {
-}
-
-func (l *mockedListener) OnPause() {
-}
-
-func (l *mockedListener) OnResume() {
-}

+ 0 - 12
core/queue/util.go

@@ -1,12 +0,0 @@
-package queue
-
-import "strings"
-
-func generateName(pushers []QueuePusher) string {
-	names := make([]string, len(pushers))
-	for i, pusher := range pushers {
-		names[i] = pusher.Name()
-	}
-
-	return strings.Join(names, ",")
-}

+ 0 - 78
core/queue/util_test.go

@@ -1,78 +0,0 @@
-package queue
-
-import (
-	"errors"
-	"math"
-	"testing"
-
-	"zero/core/logx"
-	"zero/core/mathx"
-
-	"github.com/stretchr/testify/assert"
-)
-
-var (
-	proba     = mathx.NewProba()
-	failProba = 0.01
-)
-
-func init() {
-	logx.Disable()
-}
-
-func TestGenerateName(t *testing.T) {
-	pushers := []QueuePusher{
-		&mockedPusher{name: "first"},
-		&mockedPusher{name: "second"},
-		&mockedPusher{name: "third"},
-	}
-
-	assert.Equal(t, "first,second,third", generateName(pushers))
-}
-
-func TestGenerateNameNil(t *testing.T) {
-	var pushers []QueuePusher
-	assert.Equal(t, "", generateName(pushers))
-}
-
-func calcMean(vals []int) float64 {
-	if len(vals) == 0 {
-		return 0
-	}
-
-	var result float64
-	for _, val := range vals {
-		result += float64(val)
-	}
-	return result / float64(len(vals))
-}
-
-func calcVariance(mean float64, vals []int) float64 {
-	if len(vals) == 0 {
-		return 0
-	}
-
-	var result float64
-	for _, val := range vals {
-		result += math.Pow(float64(val)-mean, 2)
-	}
-	return result / float64(len(vals))
-}
-
-type mockedPusher struct {
-	name  string
-	count int
-}
-
-func (p *mockedPusher) Name() string {
-	return p.name
-}
-
-func (p *mockedPusher) Push(s string) error {
-	if proba.TrueOnProba(failProba) {
-		return errors.New("dummy")
-	}
-
-	p.count++
-	return nil
-}

+ 0 - 15
dq/config.go

@@ -1,15 +0,0 @@
-package dq
-
-import "zero/core/stores/redis"
-
-type (
-	Beanstalk struct {
-		Endpoint string
-		Tube     string
-	}
-
-	DqConf struct {
-		Beanstalks []Beanstalk
-		Redis      redis.RedisConf
-	}
-)

+ 0 - 65
dq/connection.go

@@ -1,65 +0,0 @@
-package dq
-
-import (
-	"sync"
-
-	"github.com/beanstalkd/go-beanstalk"
-)
-
-type connection struct {
-	lock     sync.RWMutex
-	endpoint string
-	tube     string
-	conn     *beanstalk.Conn
-}
-
-func newConnection(endpint, tube string) *connection {
-	return &connection{
-		endpoint: endpint,
-		tube:     tube,
-	}
-}
-
-func (c *connection) Close() error {
-	c.lock.Lock()
-	conn := c.conn
-	c.conn = nil
-	defer c.lock.Unlock()
-
-	if conn != nil {
-		return conn.Close()
-	}
-
-	return nil
-}
-
-func (c *connection) get() (*beanstalk.Conn, error) {
-	c.lock.RLock()
-	conn := c.conn
-	c.lock.RUnlock()
-	if conn != nil {
-		return conn, nil
-	}
-
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	var err error
-	c.conn, err = beanstalk.Dial("tcp", c.endpoint)
-	if err != nil {
-		return nil, err
-	}
-
-	c.conn.Tube.Name = c.tube
-	return c.conn, err
-}
-
-func (c *connection) reset() {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	if c.conn != nil {
-		c.conn.Close()
-		c.conn = nil
-	}
-}

+ 0 - 100
dq/consumer.go

@@ -1,100 +0,0 @@
-package dq
-
-import (
-	"strconv"
-	"time"
-
-	"zero/core/hash"
-	"zero/core/logx"
-	"zero/core/service"
-	"zero/core/stores/redis"
-)
-
-const (
-	expiration = 3600 // seconds
-	guardValue = "1"
-	tolerance  = time.Minute * 30
-)
-
-var maxCheckBytes = getMaxTimeLen()
-
-type (
-	Consume func(body []byte)
-
-	Consumer interface {
-		Consume(consume Consume)
-	}
-
-	consumerCluster struct {
-		nodes []*consumerNode
-		red   *redis.Redis
-	}
-)
-
-func NewConsumer(c DqConf) Consumer {
-	var nodes []*consumerNode
-	for _, node := range c.Beanstalks {
-		nodes = append(nodes, newConsumerNode(node.Endpoint, node.Tube))
-	}
-	return &consumerCluster{
-		nodes: nodes,
-		red:   c.Redis.NewRedis(),
-	}
-}
-
-func (c *consumerCluster) Consume(consume Consume) {
-	guardedConsume := func(body []byte) {
-		key := hash.Md5Hex(body)
-		body, ok := c.unwrap(body)
-		if !ok {
-			logx.Errorf("discarded: %q", string(body))
-			return
-		}
-
-		ok, err := c.red.SetnxEx(key, guardValue, expiration)
-		if err != nil {
-			logx.Error(err)
-		} else if ok {
-			consume(body)
-		}
-	}
-
-	group := service.NewServiceGroup()
-	for _, node := range c.nodes {
-		group.Add(consumeService{
-			c:       node,
-			consume: guardedConsume,
-		})
-	}
-	group.Start()
-}
-
-func (c *consumerCluster) unwrap(body []byte) ([]byte, bool) {
-	var pos = -1
-	for i := 0; i < maxCheckBytes; i++ {
-		if body[i] == timeSep {
-			pos = i
-			break
-		}
-	}
-	if pos < 0 {
-		return nil, false
-	}
-
-	val, err := strconv.ParseInt(string(body[:pos]), 10, 64)
-	if err != nil {
-		logx.Error(err)
-		return nil, false
-	}
-
-	t := time.Unix(0, val)
-	if t.Add(tolerance).Before(time.Now()) {
-		return nil, false
-	}
-
-	return body[pos+1:], true
-}
-
-func getMaxTimeLen() int {
-	return len(strconv.FormatInt(time.Now().UnixNano(), 10)) + 2
-}

+ 0 - 95
dq/consumernode.go

@@ -1,95 +0,0 @@
-package dq
-
-import (
-	"time"
-
-	"zero/core/logx"
-	"zero/core/syncx"
-
-	"github.com/beanstalkd/go-beanstalk"
-)
-
-type (
-	consumerNode struct {
-		conn *connection
-		tube string
-		on   *syncx.AtomicBool
-	}
-
-	consumeService struct {
-		c       *consumerNode
-		consume Consume
-	}
-)
-
-func newConsumerNode(endpoint, tube string) *consumerNode {
-	return &consumerNode{
-		conn: newConnection(endpoint, tube),
-		tube: tube,
-		on:   syncx.ForAtomicBool(true),
-	}
-}
-
-func (c *consumerNode) dispose() {
-	c.on.Set(false)
-}
-
-func (c *consumerNode) consumeEvents(consume Consume) {
-	for c.on.True() {
-		conn, err := c.conn.get()
-		if err != nil {
-			logx.Error(err)
-			time.Sleep(time.Second)
-			continue
-		}
-
-		// because getting conn takes at most one second, reserve tasks at most 5 seconds,
-		// if don't check on/off here, the conn might not be closed due to
-		// graceful shutdon waits at most 5.5 seconds.
-		if !c.on.True() {
-			break
-		}
-
-		conn.Tube.Name = c.tube
-		conn.TubeSet.Name[c.tube] = true
-		id, body, err := conn.Reserve(reserveTimeout)
-		if err == nil {
-			conn.Delete(id)
-			consume(body)
-			continue
-		}
-
-		// the error can only be beanstalk.NameError or beanstalk.ConnError
-		switch cerr := err.(type) {
-		case beanstalk.ConnError:
-			switch cerr.Err {
-			case beanstalk.ErrTimeout:
-				// timeout error on timeout, just continue the loop
-			case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
-				beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
-				beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
-				// won't reset
-				logx.Error(err)
-			default:
-				// beanstalk.ErrOOM, beanstalk.ErrUnknown and other errors
-				logx.Error(err)
-				c.conn.reset()
-				time.Sleep(time.Second)
-			}
-		default:
-			logx.Error(err)
-		}
-	}
-
-	if err := c.conn.Close(); err != nil {
-		logx.Error(err)
-	}
-}
-
-func (cs consumeService) Start() {
-	cs.c.consumeEvents(cs.consume)
-}
-
-func (cs consumeService) Stop() {
-	cs.c.dispose()
-}

+ 0 - 156
dq/producer.go

@@ -1,156 +0,0 @@
-package dq
-
-import (
-	"bytes"
-	"log"
-	"math/rand"
-	"strconv"
-	"strings"
-	"time"
-
-	"zero/core/errorx"
-	"zero/core/fx"
-	"zero/core/logx"
-)
-
-const (
-	replicaNodes    = 3
-	minWrittenNodes = 2
-)
-
-type (
-	Producer interface {
-		At(body []byte, at time.Time) (string, error)
-		Close() error
-		Delay(body []byte, delay time.Duration) (string, error)
-		Revoke(ids string) error
-	}
-
-	producerCluster struct {
-		nodes []Producer
-	}
-)
-
-func init() {
-	rand.Seed(time.Now().UnixNano())
-}
-
-func NewProducer(beanstalks []Beanstalk) Producer {
-	if len(beanstalks) < minWrittenNodes {
-		log.Fatalf("nodes must be equal or greater than %d", minWrittenNodes)
-	}
-
-	var nodes []Producer
-	for _, node := range beanstalks {
-		nodes = append(nodes, NewProducerNode(node.Endpoint, node.Tube))
-	}
-	return &producerCluster{nodes: nodes}
-}
-
-func (p *producerCluster) At(body []byte, at time.Time) (string, error) {
-	return p.insert(func(node Producer) (string, error) {
-		return node.At(p.wrap(body, at), at)
-	})
-}
-
-func (p *producerCluster) Close() error {
-	var be errorx.BatchError
-	for _, node := range p.nodes {
-		if err := node.Close(); err != nil {
-			be.Add(err)
-		}
-	}
-	return be.Err()
-}
-
-func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) {
-	return p.insert(func(node Producer) (string, error) {
-		return node.Delay(p.wrap(body, time.Now().Add(delay)), delay)
-	})
-}
-
-func (p *producerCluster) Revoke(ids string) error {
-	var be errorx.BatchError
-
-	fx.From(func(source chan<- interface{}) {
-		for _, node := range p.nodes {
-			source <- node
-		}
-	}).Map(func(item interface{}) interface{} {
-		node := item.(Producer)
-		return node.Revoke(ids)
-	}).ForEach(func(item interface{}) {
-		if item != nil {
-			be.Add(item.(error))
-		}
-	})
-
-	return be.Err()
-}
-
-func (p *producerCluster) cloneNodes() []Producer {
-	return append([]Producer(nil), p.nodes...)
-}
-
-func (p *producerCluster) getWriteNodes() []Producer {
-	if len(p.nodes) <= replicaNodes {
-		return p.nodes
-	}
-
-	nodes := p.cloneNodes()
-	rand.Shuffle(len(nodes), func(i, j int) {
-		nodes[i], nodes[j] = nodes[j], nodes[i]
-	})
-	return nodes[:replicaNodes]
-}
-
-func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string, error) {
-	type idErr struct {
-		id  string
-		err error
-	}
-	var ret []idErr
-	fx.From(func(source chan<- interface{}) {
-		for _, node := range p.getWriteNodes() {
-			source <- node
-		}
-	}).Map(func(item interface{}) interface{} {
-		node := item.(Producer)
-		id, err := fn(node)
-		return idErr{
-			id:  id,
-			err: err,
-		}
-	}).ForEach(func(item interface{}) {
-		ret = append(ret, item.(idErr))
-	})
-
-	var ids []string
-	var be errorx.BatchError
-	for _, val := range ret {
-		if val.err != nil {
-			be.Add(val.err)
-		} else {
-			ids = append(ids, val.id)
-		}
-	}
-
-	jointId := strings.Join(ids, idSep)
-	if len(ids) >= minWrittenNodes {
-		return jointId, nil
-	}
-
-	if err := p.Revoke(jointId); err != nil {
-		logx.Error(err)
-	}
-
-	return "", be.Err()
-}
-
-func (p *producerCluster) wrap(body []byte, at time.Time) []byte {
-	var builder bytes.Buffer
-	builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
-	builder.WriteByte(timeSep)
-	builder.Write(body)
-	return builder.Bytes()
-}

+ 0 - 98
dq/producernode.go

@@ -1,98 +0,0 @@
-package dq
-
-import (
-	"errors"
-	"fmt"
-	"strconv"
-	"strings"
-	"time"
-
-	"github.com/beanstalkd/go-beanstalk"
-)
-
-var ErrTimeBeforeNow = errors.New("can't schedule task to past time")
-
-type producerNode struct {
-	endpoint string
-	tube     string
-	conn     *connection
-}
-
-func NewProducerNode(endpoint, tube string) Producer {
-	return &producerNode{
-		endpoint: endpoint,
-		tube:     tube,
-		conn:     newConnection(endpoint, tube),
-	}
-}
-
-func (p *producerNode) At(body []byte, at time.Time) (string, error) {
-	now := time.Now()
-	if at.Before(now) {
-		return "", ErrTimeBeforeNow
-	}
-
-	duration := at.Sub(now)
-	return p.Delay(body, duration)
-}
-
-func (p *producerNode) Close() error {
-	return p.conn.Close()
-}
-
-func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
-	conn, err := p.conn.get()
-	if err != nil {
-		return "", err
-	}
-
-	id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
-	if err == nil {
-		return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
-	}
-
-	// the error can only be beanstalk.NameError or beanstalk.ConnError
-	// just return when the error is beanstalk.NameError, don't reset
-	switch cerr := err.(type) {
-	case beanstalk.ConnError:
-		switch cerr.Err {
-		case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
-			beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
-			beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
-			// won't reset
-		default:
-			// beanstalk.ErrOOM, beanstalk.ErrTimeout, beanstalk.ErrUnknown and other errors
-			p.conn.reset()
-		}
-	}
-
-	return "", err
-}
-
-func (p *producerNode) Revoke(jointId string) error {
-	ids := strings.Split(jointId, idSep)
-	for _, id := range ids {
-		fields := strings.Split(id, "/")
-		if len(fields) < 3 {
-			continue
-		}
-		if fields[0] != p.endpoint || fields[1] != p.tube {
-			continue
-		}
-
-		conn, err := p.conn.get()
-		if err != nil {
-			return err
-		}
-
-		n, err := strconv.ParseUint(fields[2], 10, 64)
-		if err != nil {
-			return err
-		}
-
-		return conn.Delete(n)
-	}
-
-	// if not in this beanstalk, ignore
-	return nil
-}

+ 0 - 15
dq/vars.go

@@ -1,15 +0,0 @@
-package dq
-
-import "time"
-
-const (
-	PriHigh   = 1
-	PriNormal = 2
-	PriLow    = 3
-
-	defaultTimeToRun = time.Second * 5
-	reserveTimeout   = time.Second * 5
-
-	idSep   = ","
-	timeSep = '/'
-)

+ 0 - 42
example/beanstalk/consumer/consumer.go

@@ -1,42 +0,0 @@
-package main
-
-import (
-	"fmt"
-
-	"zero/core/stores/redis"
-	"zero/dq"
-)
-
-func main() {
-	consumer := dq.NewConsumer(dq.DqConf{
-		Beanstalks: []dq.Beanstalk{
-			{
-				Endpoint: "localhost:11300",
-				Tube:     "tube",
-			},
-			{
-				Endpoint: "localhost:11301",
-				Tube:     "tube",
-			},
-			{
-				Endpoint: "localhost:11302",
-				Tube:     "tube",
-			},
-			{
-				Endpoint: "localhost:11303",
-				Tube:     "tube",
-			},
-			{
-				Endpoint: "localhost:11304",
-				Tube:     "tube",
-			},
-		},
-		Redis: redis.RedisConf{
-			Host: "localhost:6379",
-			Type: redis.NodeType,
-		},
-	})
-	consumer.Consume(func(body []byte) {
-		fmt.Println(string(body))
-	})
-}

+ 0 - 40
example/beanstalk/producer/producer.go

@@ -1,40 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"strconv"
-	"time"
-
-	"zero/dq"
-)
-
-func main() {
-	producer := dq.NewProducer([]dq.Beanstalk{
-		{
-			Endpoint: "localhost:11300",
-			Tube:     "tube",
-		},
-		{
-			Endpoint: "localhost:11301",
-			Tube:     "tube",
-		},
-		{
-			Endpoint: "localhost:11302",
-			Tube:     "tube",
-		},
-		{
-			Endpoint: "localhost:11303",
-			Tube:     "tube",
-		},
-		{
-			Endpoint: "localhost:11304",
-			Tube:     "tube",
-		},
-	})
-	for i := 0; i < 5; i++ {
-		_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*10))
-		if err != nil {
-			fmt.Println(err)
-		}
-	}
-}

+ 0 - 11
example/jobqueue/jobqueue.go

@@ -1,11 +0,0 @@
-package main
-
-import "zero/core/threading"
-
-func main() {
-	q := threading.NewTaskRunner(5)
-	q.Schedule(func() {
-		panic("hello")
-	})
-	select {}
-}

+ 0 - 12
example/kmq/consumer/config.json

@@ -1,12 +0,0 @@
-{
-    "Name": "kmq",
-    "Brokers": [
-        "172.16.56.64:19092",
-        "172.16.56.65:19092",
-        "172.16.56.66:19092"
-    ],
-    "Group": "adhoc",
-    "Topic": "kevin",
-    "Offset": "first",
-    "NumProducers": 1
-}

+ 0 - 20
example/kmq/consumer/queue.go

@@ -1,20 +0,0 @@
-package main
-
-import (
-	"fmt"
-
-	"zero/core/conf"
-	"zero/kq"
-)
-
-func main() {
-	var c kq.KqConf
-	conf.MustLoad("config.json", &c)
-
-	q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
-		fmt.Printf("=> %s\n", v)
-		return nil
-	}))
-	defer q.Stop()
-	q.Start()
-}

+ 0 - 51
example/kmq/producer/produce.go

@@ -1,51 +0,0 @@
-package main
-
-import (
-	"encoding/json"
-	"fmt"
-	"log"
-	"math/rand"
-	"strconv"
-	"time"
-
-	"zero/core/cmdline"
-	"zero/kq"
-)
-
-type message struct {
-	Key     string `json:"key"`
-	Value   string `json:"value"`
-	Payload string `json:"message"`
-}
-
-func main() {
-	pusher := kq.NewPusher([]string{
-		"172.16.56.64:19092",
-		"172.16.56.65:19092",
-		"172.16.56.66:19092",
-	}, "kevin")
-
-	ticker := time.NewTicker(time.Millisecond)
-	for round := 0; round < 3; round++ {
-		select {
-		case <-ticker.C:
-			count := rand.Intn(100)
-			m := message{
-				Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
-				Value:   fmt.Sprintf("%d,%d", round, count),
-				Payload: fmt.Sprintf("%d,%d", round, count),
-			}
-			body, err := json.Marshal(m)
-			if err != nil {
-				log.Fatal(err)
-			}
-
-			fmt.Println(string(body))
-			if err := pusher.Push(string(body)); err != nil {
-				log.Fatal(err)
-			}
-		}
-	}
-
-	cmdline.EnterToContinue()
-}

+ 0 - 86
example/queue/poll/poller.go

@@ -1,86 +0,0 @@
-package main
-
-import (
-	"flag"
-	"fmt"
-	"log"
-	"sync"
-	"time"
-
-	"zero/core/discov"
-	"zero/core/lang"
-	"zero/core/logx"
-	"zero/core/service"
-	"zero/core/stores/redis"
-	"zero/rq"
-)
-
-var (
-	redisHost  = flag.String("redis", "localhost:6379", "")
-	redisType  = flag.String("type", "node", "")
-	redisKey   = flag.String("key", "queue", "")
-	producers  = flag.Int("producers", 1, "")
-	dropBefore = flag.Int64("drop", 0, "messages before seconds to drop")
-)
-
-type Consumer struct {
-	lock      sync.Mutex
-	resources map[string]interface{}
-}
-
-func NewConsumer() *Consumer {
-	return &Consumer{
-		resources: make(map[string]interface{}),
-	}
-}
-
-func (c *Consumer) Consume(msg string) error {
-	fmt.Println("=>", msg)
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	c.resources[msg] = lang.Placeholder
-
-	return nil
-}
-
-func (c *Consumer) OnEvent(event interface{}) {
-	fmt.Printf("event: %+v\n", event)
-}
-
-func main() {
-	flag.Parse()
-
-	consumer := NewConsumer()
-	q, err := rq.NewMessageQueue(rq.RmqConf{
-		ServiceConf: service.ServiceConf{
-			Name: "queue",
-			Log: logx.LogConf{
-				Path:     "logs",
-				KeepDays: 3,
-				Compress: true,
-			},
-		},
-		Redis: redis.RedisKeyConf{
-			RedisConf: redis.RedisConf{
-				Host: *redisHost,
-				Type: *redisType,
-			},
-			Key: *redisKey,
-		},
-		Etcd: discov.EtcdConf{
-			Hosts: []string{
-				"localhost:2379",
-			},
-			Key: "queue",
-		},
-		DropBefore:   *dropBefore,
-		NumProducers: *producers,
-	}, rq.WithHandler(consumer), rq.WithRenewId(time.Now().UnixNano()))
-	if err != nil {
-		log.Fatal(err)
-	}
-	defer q.Stop()
-
-	q.Start()
-}

+ 0 - 31
example/queue/push/pusher.go

@@ -1,31 +0,0 @@
-package main
-
-import (
-	"log"
-	"strconv"
-	"time"
-
-	"zero/core/discov"
-	"zero/rq"
-
-	"github.com/google/gops/agent"
-)
-
-func main() {
-	if err := agent.Listen(agent.Options{}); err != nil {
-		log.Fatal(err)
-	}
-
-	pusher, err := rq.NewPusher([]string{"localhost:2379"}, "queue", rq.WithConsistentStrategy(
-		func(msg string) (string, string, error) {
-			return msg, msg, nil
-		}, discov.BalanceWithId()), rq.WithServerSensitive())
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	for i := 0; ; i++ {
-		pusher.Push(strconv.Itoa(i))
-		time.Sleep(time.Second)
-	}
-}

+ 0 - 62
example/redis/cluster.go

@@ -1,62 +0,0 @@
-package main
-
-import (
-	"flag"
-	"log"
-
-	"zero/core/logx"
-	"zero/core/queue"
-	"zero/core/service"
-	"zero/core/stores/redis"
-	"zero/rq"
-)
-
-var (
-	host = flag.String("s", "10.24.232.63:7002", "server address")
-	mode = flag.String("m", "queue", "cluster test mode")
-)
-
-type bridgeHandler struct {
-	pusher queue.QueuePusher
-}
-
-func newBridgeHandler() rq.ConsumeHandler {
-	return bridgeHandler{}
-}
-
-func (h bridgeHandler) Consume(str string) error {
-	logx.Info("=>", str)
-	return nil
-}
-
-func main() {
-	flag.Parse()
-
-	if *mode == "queue" {
-		mq, err := rq.NewMessageQueue(rq.RmqConf{
-			ServiceConf: service.ServiceConf{
-				Log: logx.LogConf{
-					Path: "logs",
-				},
-			},
-			Redis: redis.RedisKeyConf{
-				RedisConf: redis.RedisConf{
-					Host: *host,
-					Type: "cluster",
-				},
-				Key: "notexist",
-			},
-			NumProducers: 1,
-		}, rq.WithHandler(newBridgeHandler()))
-		if err != nil {
-			log.Fatal(err)
-		}
-		defer mq.Stop()
-
-		mq.Start()
-	} else {
-		rds := redis.NewRedis(*host, "cluster")
-		rds.Llen("notexist")
-		select {}
-	}
-}

+ 0 - 142
example/sqlc/user.go

@@ -1,142 +0,0 @@
-package main
-
-import (
-	"database/sql"
-	"fmt"
-
-	"zero/core/stores/cache"
-	"zero/core/stores/sqlc"
-	"zero/core/stores/sqlx"
-	"zero/kq"
-)
-
-var (
-	userRows = "id, mobile, name, sex"
-
-	cacheUserMobilePrefix = "cache#user#mobile#"
-	cacheUserIdPrefix     = "cache#user#id#"
-
-	ErrNotFound = sqlc.ErrNotFound
-)
-
-type (
-	User struct {
-		Id     int64  `db:"id" json:"id,omitempty"`
-		Mobile string `db:"mobile" json:"mobile,omitempty"`
-		Name   string `db:"name" json:"name,omitempty"`
-		Sex    int    `db:"sex" json:"sex,omitempty"`
-	}
-
-	UserModel struct {
-		sqlc.CachedConn
-		// sqlx.SqlConn
-		table string
-
-		// kafka use kq not kmq
-		push *kq.Pusher
-	}
-)
-
-func NewUserModel(db sqlx.SqlConn, c cache.CacheConf, table string, pusher *kq.Pusher) *UserModel {
-	return &UserModel{
-		CachedConn: sqlc.NewConn(db, c),
-		table:      table,
-		push:       pusher,
-	}
-}
-
-func (um *UserModel) FindOne(id int64) (*User, error) {
-	key := fmt.Sprintf("%s%d", cacheUserIdPrefix, id)
-	var user User
-	err := um.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
-		query := fmt.Sprintf("SELECT %s FROM user WHERE id=?", userRows)
-		return conn.QueryRow(v, query, id)
-	})
-	switch err {
-	case nil:
-		return &user, nil
-	case sqlc.ErrNotFound:
-		return nil, ErrNotFound
-	default:
-		return nil, err
-	}
-}
-
-func (um *UserModel) FindByMobile(mobile string) (*User, error) {
-	var user User
-	key := fmt.Sprintf("%s%s", cacheUserMobilePrefix, mobile)
-	err := um.QueryRowIndex(&user, key, func(primary interface{}) string {
-		return fmt.Sprintf("%s%d", cacheUserIdPrefix, primary.(int64))
-	}, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
-		query := fmt.Sprintf("SELECT %s FROM user WHERE mobile=?", userRows)
-		if err := conn.QueryRow(&user, query, mobile); err != nil {
-			return nil, err
-		}
-		return user.Id, nil
-	}, func(conn sqlx.SqlConn, v interface{}, primary interface{}) error {
-		return conn.QueryRow(v, "SELECT * FROM user WHERE id=?", primary)
-	})
-	switch err {
-	case nil:
-		return &user, nil
-	case sqlc.ErrNotFound:
-		return nil, ErrNotFound
-	default:
-		return nil, err
-	}
-}
-
-// Count for no cache
-func (um *UserModel) Count() (int64, error) {
-	var count int64
-	err := um.QueryRowNoCache(&count, "SELECT count(1) FROM user")
-	if err != nil {
-		return 0, err
-	}
-	return count, nil
-}
-
-// Query rows
-func (um *UserModel) FindByName(name string) ([]*User, error) {
-	var users []*User
-	query := fmt.Sprintf("SELECT %s FROM user WHERE name=?", userRows)
-	err := um.QueryRowsNoCache(&userRows, query, name)
-	if err != nil {
-		return nil, err
-	}
-	return users, nil
-}
-
-func (um *UserModel) UpdateSexById(sex int, id int64) error {
-	key := fmt.Sprintf("%s%d", cacheUserIdPrefix, id)
-	_, err := um.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
-		query := fmt.Sprintf("UPDATE user SET sex=? WHERE id=?")
-		return conn.Exec(query, sex, id)
-	}, key)
-	return err
-}
-
-func (um *UserModel) UpdateMobileById(mobile string, id int64) error {
-	idKey := fmt.Sprintf("%s%d", cacheUserIdPrefix, id)
-	mobileKey := fmt.Sprintf("%s%s", cacheUserMobilePrefix, mobile)
-	_, err := um.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
-		query := fmt.Sprintf("UPDATE user SET mobile=? WHERE id=?")
-		return conn.Exec(query, mobile, id)
-	}, idKey, mobileKey)
-	return err
-}
-
-func (um *UserModel) Update(u *User) error {
-	oldUser, err := um.FindOne(u.Id)
-	if err != nil {
-		return err
-	}
-
-	idKey := fmt.Sprintf("%s%d", cacheUserIdPrefix, oldUser.Id)
-	mobileKey := fmt.Sprintf("%s%s", cacheUserMobilePrefix, oldUser.Mobile)
-	_, err = um.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
-		query := fmt.Sprintf("UPDATE user SET mobile=?, name=?, sex=? WHERE id=?")
-		return conn.Exec(query, u.Mobile, u.Name, u.Sex, u.Id)
-	}, idKey, mobileKey)
-	return err
-}

+ 0 - 5
go.mod

@@ -4,10 +4,8 @@ go 1.14
 
 require (
 	github.com/DATA-DOG/go-sqlmock v1.4.1
-	github.com/DataDog/zstd v1.4.5 // indirect
 	github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect
 	github.com/alicebob/miniredis v2.5.0+incompatible
-	github.com/beanstalkd/go-beanstalk v0.1.0
 	github.com/coreos/bbolt v1.3.1-coreos.6 // indirect
 	github.com/coreos/etcd v3.3.18+incompatible
 	github.com/coreos/go-semver v0.2.0 // indirect
@@ -24,7 +22,6 @@ require (
 	github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
 	github.com/golang/mock v1.4.3
 	github.com/golang/protobuf v1.4.2
-	github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf // indirect
 	github.com/gomodule/redigo v2.0.0+incompatible // indirect
 	github.com/google/btree v1.0.0 // indirect
 	github.com/google/gops v0.3.7
@@ -49,13 +46,11 @@ require (
 	github.com/pierrec/lz4 v2.5.1+incompatible // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/prometheus/client_golang v1.5.1
-	github.com/segmentio/kafka-go v0.3.5
 	github.com/soheilhy/cmux v0.1.4 // indirect
 	github.com/spaolacci/murmur3 v1.1.0
 	github.com/stretchr/testify v1.5.1
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
 	github.com/urfave/cli v1.22.4
-	github.com/xdg/stringprep v1.0.1-0.20180714160509-73f8eece6fdc // indirect
 	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
 	github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb // indirect
 	go.etcd.io/etcd v3.3.17+incompatible

+ 0 - 18
go.sum

@@ -2,9 +2,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1MRDJM=
 github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
-github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
-github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
-github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/StackExchange/wmi v0.0.0-20170410192909-ea383cf3ba6e/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
@@ -16,8 +13,6 @@ github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGn
 github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
 github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
 github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
-github.com/beanstalkd/go-beanstalk v0.1.0 h1:IiNwYbAoVBDs5xEOmleGoX+DRD3Moz99EpATbl8672w=
-github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -52,8 +47,6 @@ github.com/dchest/siphash v1.2.1 h1:4cLinnzVJDKxTCl9B01807Yiy+W7ZzVHj/KIroQRvT4=
 github.com/dchest/siphash v1.2.1/go.mod h1:q+IRvb2gOSrUnYoPqHiyHXS0FOBBOdl6tONBlVnOnt4=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
-github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
-github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
@@ -100,9 +93,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
 github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
 github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
-github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf h1:gFVkHXmVAhEbxZVDln5V9GKrLaluNoFHDbrZwAWZgws=
-github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
 github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
 github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
@@ -222,8 +212,6 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
-github.com/segmentio/kafka-go v0.3.5 h1:2JVT1inno7LxEASWj+HflHh5sWGfM0gkRiLAxkXhGG4=
-github.com/segmentio/kafka-go v0.3.5/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4=
 github.com/shirou/gopsutil v0.0.0-20180427012116-c95755e4bcd7/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 h1:udFKJ0aHUL60LboW/A+DfgoHVedieIzIXE8uylPue0U=
 github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
@@ -248,11 +236,6 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVD
 github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/urfave/cli v1.22.4 h1:u7tSpNPPswAFymm8IehJhy4uJMlUuU/GmqSkvJ1InXA=
 github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
-github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
-github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
-github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
-github.com/xdg/stringprep v1.0.1-0.20180714160509-73f8eece6fdc h1:vIp1tjhVogU0yBy7w96P027ewvNPeH6gzuNcoc+NReU=
-github.com/xdg/stringprep v1.0.1-0.20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6 h1:YdYsPAZ2pC6Tow/nPZOPQ96O3hm/ToAkGsPLzedXERk=
@@ -275,7 +258,6 @@ go.uber.org/zap v1.12.0 h1:dySoUQPFBGj6xwjmBzageVL8jGi8uxc6bEmJQjA06bw=
 go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=

+ 0 - 21
kq/config.go

@@ -1,21 +0,0 @@
-package kq
-
-import "zero/core/service"
-
-const (
-	firstOffset = "first"
-	lastOffset  = "last"
-)
-
-type KqConf struct {
-	service.ServiceConf
-	Brokers      []string
-	Group        string
-	Topic        string
-	Offset       string `json:",options=first|last,default=last"`
-	NumConns     int    `json:",default=1"`
-	NumProducers int    `json:",default=8"`
-	NumConsumers int    `json:",default=8"`
-	MinBytes     int    `json:",default=10240"`    // 10K
-	MaxBytes     int    `json:",default=10485760"` // 10M
-}

+ 0 - 101
kq/pusher.go

@@ -1,101 +0,0 @@
-package kq
-
-import (
-	"context"
-	"strconv"
-	"time"
-
-	"zero/core/executors"
-	"zero/core/logx"
-
-	"github.com/segmentio/kafka-go"
-	"github.com/segmentio/kafka-go/snappy"
-)
-
-type (
-	PushOption func(options *chunkOptions)
-
-	Pusher struct {
-		produer  *kafka.Writer
-		topic    string
-		executor *executors.ChunkExecutor
-	}
-
-	chunkOptions struct {
-		chunkSize     int
-		flushInterval time.Duration
-	}
-)
-
-func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
-	producer := kafka.NewWriter(kafka.WriterConfig{
-		Brokers:          addrs,
-		Topic:            topic,
-		Balancer:         &kafka.LeastBytes{},
-		CompressionCodec: snappy.NewCompressionCodec(),
-	})
-
-	pusher := &Pusher{
-		produer: producer,
-		topic:   topic,
-	}
-	pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
-		chunk := make([]kafka.Message, len(tasks))
-		for i := range tasks {
-			chunk[i] = tasks[i].(kafka.Message)
-		}
-		if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
-			logx.Error(err)
-		}
-	}, newOptions(opts)...)
-
-	return pusher
-}
-
-func (p *Pusher) Close() error {
-	return p.produer.Close()
-}
-
-func (p *Pusher) Name() string {
-	return p.topic
-}
-
-func (p *Pusher) Push(v string) error {
-	msg := kafka.Message{
-		Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
-		Value: []byte(v),
-	}
-	if p.executor != nil {
-		return p.executor.Add(msg, len(v))
-	} else {
-		return p.produer.WriteMessages(context.Background(), msg)
-	}
-}
-
-func WithChunkSize(chunkSize int) PushOption {
-	return func(options *chunkOptions) {
-		options.chunkSize = chunkSize
-	}
-}
-
-func WithFlushInterval(interval time.Duration) PushOption {
-	return func(options *chunkOptions) {
-		options.flushInterval = interval
-	}
-}
-
-func newOptions(opts []PushOption) []executors.ChunkOption {
-	var options chunkOptions
-	for _, opt := range opts {
-		opt(&options)
-	}
-
-	var chunkOpts []executors.ChunkOption
-	if options.chunkSize > 0 {
-		chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
-	}
-	if options.flushInterval > 0 {
-		chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
-	}
-	return chunkOpts
-}

+ 0 - 229
kq/queue.go

@@ -1,229 +0,0 @@
-package kq
-
-import (
-	"context"
-	"io"
-	"log"
-	"time"
-
-	"zero/core/logx"
-	"zero/core/queue"
-	"zero/core/service"
-	"zero/core/stat"
-	"zero/core/threading"
-	"zero/core/timex"
-
-	"github.com/segmentio/kafka-go"
-	_ "github.com/segmentio/kafka-go/gzip"
-	_ "github.com/segmentio/kafka-go/lz4"
-	_ "github.com/segmentio/kafka-go/snappy"
-)
-
-const (
-	defaultCommitInterval = time.Second
-	defaultMaxWait        = time.Second
-)
-
-type (
-	ConsumeHandle func(key, value string) error
-
-	ConsumeHandler interface {
-		Consume(key, value string) error
-	}
-
-	queueOptions struct {
-		commitInterval time.Duration
-		maxWait        time.Duration
-		metrics        *stat.Metrics
-	}
-
-	QueueOption func(*queueOptions)
-
-	kafkaQueue struct {
-		c                KqConf
-		consumer         *kafka.Reader
-		handler          ConsumeHandler
-		channel          chan kafka.Message
-		producerRoutines *threading.RoutineGroup
-		consumerRoutines *threading.RoutineGroup
-		metrics          *stat.Metrics
-	}
-
-	kafkaQueues struct {
-		queues []queue.MessageQueue
-		group  *service.ServiceGroup
-	}
-)
-
-func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue {
-	q, err := NewQueue(c, handler, opts...)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	return q
-}
-
-func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) {
-	if err := c.SetUp(); err != nil {
-		return nil, err
-	}
-
-	var options queueOptions
-	for _, opt := range opts {
-		opt(&options)
-	}
-	ensureQueueOptions(c, &options)
-
-	if c.NumConns < 1 {
-		c.NumConns = 1
-	}
-	q := kafkaQueues{
-		group: service.NewServiceGroup(),
-	}
-	for i := 0; i < c.NumConns; i++ {
-		q.queues = append(q.queues, newKafkaQueue(c, handler, options))
-	}
-
-	return q, nil
-}
-
-func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
-	var offset int64
-	if c.Offset == firstOffset {
-		offset = kafka.FirstOffset
-	} else {
-		offset = kafka.LastOffset
-	}
-	consumer := kafka.NewReader(kafka.ReaderConfig{
-		Brokers:        c.Brokers,
-		GroupID:        c.Group,
-		Topic:          c.Topic,
-		StartOffset:    offset,
-		MinBytes:       c.MinBytes, // 10KB
-		MaxBytes:       c.MaxBytes, // 10MB
-		MaxWait:        options.maxWait,
-		CommitInterval: options.commitInterval,
-	})
-
-	return &kafkaQueue{
-		c:                c,
-		consumer:         consumer,
-		handler:          handler,
-		channel:          make(chan kafka.Message),
-		producerRoutines: threading.NewRoutineGroup(),
-		consumerRoutines: threading.NewRoutineGroup(),
-		metrics:          options.metrics,
-	}
-}
-
-func (q *kafkaQueue) Start() {
-	q.startConsumers()
-	q.startProducers()
-
-	q.producerRoutines.Wait()
-	close(q.channel)
-	q.consumerRoutines.Wait()
-}
-
-func (q *kafkaQueue) Stop() {
-	q.consumer.Close()
-	logx.Close()
-}
-
-func (q *kafkaQueue) consumeOne(key, val string) error {
-	startTime := timex.Now()
-	err := q.handler.Consume(key, val)
-	q.metrics.Add(stat.Task{
-		Duration: timex.Since(startTime),
-	})
-	return err
-}
-
-func (q *kafkaQueue) startConsumers() {
-	for i := 0; i < q.c.NumConsumers; i++ {
-		q.consumerRoutines.Run(func() {
-			for msg := range q.channel {
-				if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
-					logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err)
-				}
-			}
-		})
-	}
-}
-
-func (q *kafkaQueue) startProducers() {
-	for i := 0; i < q.c.NumProducers; i++ {
-		q.producerRoutines.Run(func() {
-			for {
-				msg, err := q.consumer.ReadMessage(context.Background())
-				// io.EOF means consumer closed
-				// io.ErrClosedPipe means committing messages on the consumer,
-				// kafka will refire the messages on uncommitted messages, ignore
-				if err == io.EOF || err == io.ErrClosedPipe {
-					return
-				}
-				if err != nil {
-					logx.Errorf("Error on reading mesage, %q", err.Error())
-					continue
-				}
-				q.channel <- msg
-			}
-		})
-	}
-}
-
-func (q kafkaQueues) Start() {
-	for _, each := range q.queues {
-		q.group.Add(each)
-	}
-	q.group.Start()
-}
-
-func (q kafkaQueues) Stop() {
-	q.group.Stop()
-}
-
-func WithCommitInterval(interval time.Duration) QueueOption {
-	return func(options *queueOptions) {
-		options.commitInterval = interval
-	}
-}
-
-func WithHandle(handle ConsumeHandle) ConsumeHandler {
-	return innerConsumeHandler{
-		handle: handle,
-	}
-}
-
-func WithMaxWait(wait time.Duration) QueueOption {
-	return func(options *queueOptions) {
-		options.maxWait = wait
-	}
-}
-
-func WithMetrics(metrics *stat.Metrics) QueueOption {
-	return func(options *queueOptions) {
-		options.metrics = metrics
-	}
-}
-
-type innerConsumeHandler struct {
-	handle ConsumeHandle
-}
-
-func (ch innerConsumeHandler) Consume(k, v string) error {
-	return ch.handle(k, v)
-}
-
-func ensureQueueOptions(c KqConf, options *queueOptions) {
-	if options.commitInterval == 0 {
-		options.commitInterval = defaultCommitInterval
-	}
-	if options.maxWait == 0 {
-		options.maxWait = defaultMaxWait
-	}
-	if options.metrics == nil {
-		options.metrics = stat.NewMetrics(c.Name)
-	}
-}

+ 0 - 18
rq/config.go

@@ -1,18 +0,0 @@
-package rq
-
-import (
-	"zero/core/discov"
-	"zero/core/service"
-	"zero/core/stores/redis"
-)
-
-type RmqConf struct {
-	service.ServiceConf
-	Redis           redis.RedisKeyConf
-	Etcd            discov.EtcdConf `json:",optional"`
-	NumProducers    int             `json:",optional"`
-	NumConsumers    int             `json:",optional"`
-	Timeout         int64           `json:",optional"`
-	DropBefore      int64           `json:",optional"`
-	ServerSensitive bool            `json:",default=false"`
-}

+ 0 - 19
rq/etc/config.json

@@ -1,19 +0,0 @@
-{
-    "Log": {
-        "Access": "logs/access.log",
-        "Error": "logs/error.log",
-        "Stat": "logs/stat.log"
-    },
-    "MetricsUrl": "http://localhost:2222/add",
-    "Redis": {
-        "Host": "localhost:6379",
-        "Type": "node",
-        "Key": "reqs"
-    },
-    "Etcd": {
-        "Hosts": [
-            "localhost:2379"
-        ],
-        "EtcdKey": "rq"
-    }
-}

+ 0 - 19
rq/internal/conf.go

@@ -1,19 +0,0 @@
-package internal
-
-import (
-	"zero/core/queue"
-	"zero/core/stores/redis"
-)
-
-type RedisKeyConf struct {
-	redis.RedisConf
-	Key string `json:",optional"`
-}
-
-func (rkc RedisKeyConf) NewProducer(opts ...ProducerOption) (queue.Producer, error) {
-	return newProducer(rkc.NewRedis(), rkc.Key, opts...)
-}
-
-func (rkc RedisKeyConf) NewPusher(opts ...PusherOption) queue.QueuePusher {
-	return NewPusher(rkc.NewRedis(), rkc.Key, opts...)
-}

+ 0 - 7
rq/internal/const.go

@@ -1,7 +0,0 @@
-package internal
-
-const (
-	Delimeter             = "/"
-	ServerSensitivePrefix = '*'
-	TimedQueueType        = "timed"
-)

+ 0 - 39
rq/internal/hashchange.go

@@ -1,39 +0,0 @@
-package internal
-
-import (
-	"math/rand"
-
-	"zero/core/hash"
-)
-
-type HashChange struct {
-	id      int64
-	oldHash *hash.ConsistentHash
-	newHash *hash.ConsistentHash
-}
-
-func NewHashChange(oldHash, newHash *hash.ConsistentHash) HashChange {
-	return HashChange{
-		id:      rand.Int63(),
-		oldHash: oldHash,
-		newHash: newHash,
-	}
-}
-
-func (hc HashChange) GetId() int64 {
-	return hc.id
-}
-
-func (hc HashChange) ShallEvict(key interface{}) bool {
-	oldTarget, oldOk := hc.oldHash.Get(key)
-	if !oldOk {
-		return false
-	}
-
-	newTarget, newOk := hc.newHash.Get(key)
-	if !newOk {
-		return false
-	}
-
-	return oldTarget != newTarget
-}

+ 0 - 6
rq/internal/message.go

@@ -1,6 +0,0 @@
-package internal
-
-type TimedMessage struct {
-	Time    int64  `json:"time"`
-	Payload string `json:"payload"`
-}

+ 0 - 82
rq/internal/redisqueue_test.go

@@ -1,82 +0,0 @@
-package internal
-
-import (
-	"strconv"
-	"sync"
-	"testing"
-
-	"zero/core/logx"
-	"zero/core/queue"
-	"zero/core/stores/redis"
-
-	"github.com/alicebob/miniredis"
-	"github.com/stretchr/testify/assert"
-)
-
-func init() {
-	logx.Disable()
-}
-
-func TestRedisQueue(t *testing.T) {
-	const (
-		total = 1000
-		key   = "queue"
-	)
-	r, err := miniredis.Run()
-	assert.Nil(t, err)
-
-	c := RedisKeyConf{
-		RedisConf: redis.RedisConf{
-			Host: r.Addr(),
-			Type: redis.NodeType,
-		},
-		Key: key,
-	}
-
-	pusher := NewPusher(c.NewRedis(), key, WithTime())
-	assert.True(t, len(pusher.Name()) > 0)
-	for i := 0; i < total; i++ {
-		err := pusher.Push(strconv.Itoa(i))
-		assert.Nil(t, err)
-	}
-
-	consumer := new(mockedConsumer)
-	consumer.wait.Add(total)
-	q := queue.NewQueue(func() (queue.Producer, error) {
-		return c.NewProducer(TimeSensitive(5))
-	}, func() (queue.Consumer, error) {
-		return consumer, nil
-	})
-	q.SetNumProducer(1)
-	q.SetNumConsumer(1)
-	go func() {
-		q.Start()
-	}()
-	consumer.wait.Wait()
-	q.Stop()
-
-	var expect int
-	for i := 0; i < total; i++ {
-		expect ^= i
-	}
-	assert.Equal(t, expect, consumer.xor)
-}
-
-type mockedConsumer struct {
-	wait sync.WaitGroup
-	xor  int
-}
-
-func (c *mockedConsumer) Consume(s string) error {
-	val, err := strconv.Atoi(s)
-	if err != nil {
-		return err
-	}
-
-	c.xor ^= val
-	c.wait.Done()
-	return nil
-}
-
-func (c *mockedConsumer) OnEvent(event interface{}) {
-}

+ 0 - 166
rq/internal/redisqueueproducer.go

@@ -1,166 +0,0 @@
-package internal
-
-import (
-	"fmt"
-	"sync"
-	"time"
-
-	"zero/core/jsonx"
-	"zero/core/logx"
-	"zero/core/queue"
-	"zero/core/stores/redis"
-)
-
-const (
-	logIntervalMillis  = 1000
-	retryRedisInterval = time.Second
-)
-
-type (
-	ProducerOption func(p queue.Producer) queue.Producer
-
-	RedisQueueProducer struct {
-		name      string
-		store     *redis.Redis
-		key       string
-		redisNode redis.ClosableNode
-		listeners []queue.ProduceListener
-	}
-)
-
-func NewProducerFactory(store *redis.Redis, key string, opts ...ProducerOption) queue.ProducerFactory {
-	return func() (queue.Producer, error) {
-		return newProducer(store, key, opts...)
-	}
-}
-
-func (p *RedisQueueProducer) AddListener(listener queue.ProduceListener) {
-	p.listeners = append(p.listeners, listener)
-}
-
-func (p *RedisQueueProducer) Name() string {
-	return p.name
-}
-
-func (p *RedisQueueProducer) Produce() (string, bool) {
-	lessLogger := logx.NewLessLogger(logIntervalMillis)
-
-	for {
-		value, ok, err := p.store.BlpopEx(p.redisNode, p.key)
-		if err == nil {
-			return value, ok
-		} else if err == redis.Nil {
-			// timed out without elements popped
-			continue
-		} else {
-			lessLogger.Errorf("Error on blpop: %v", err)
-			p.waitForRedisAvailable()
-		}
-	}
-}
-
-func newProducer(store *redis.Redis, key string, opts ...ProducerOption) (queue.Producer, error) {
-	redisNode, err := redis.CreateBlockingNode(store)
-	if err != nil {
-		return nil, err
-	}
-
-	var producer queue.Producer = &RedisQueueProducer{
-		name:      fmt.Sprintf("%s/%s/%s", store.Type, store.Addr, key),
-		store:     store,
-		key:       key,
-		redisNode: redisNode,
-	}
-
-	for _, opt := range opts {
-		producer = opt(producer)
-	}
-
-	return producer, nil
-}
-
-func (p *RedisQueueProducer) resetRedisConnection() error {
-	if p.redisNode != nil {
-		p.redisNode.Close()
-		p.redisNode = nil
-	}
-
-	redisNode, err := redis.CreateBlockingNode(p.store)
-	if err != nil {
-		return err
-	}
-
-	p.redisNode = redisNode
-	return nil
-}
-
-func (p *RedisQueueProducer) waitForRedisAvailable() {
-	var paused bool
-	var pauseOnce sync.Once
-
-	for {
-		if err := p.resetRedisConnection(); err != nil {
-			pauseOnce.Do(func() {
-				paused = true
-				for _, listener := range p.listeners {
-					listener.OnProducerPause()
-				}
-			})
-			logx.Errorf("Error occurred while connect to redis: %v", err)
-			time.Sleep(retryRedisInterval)
-		} else {
-			break
-		}
-	}
-
-	if paused {
-		for _, listener := range p.listeners {
-			listener.OnProducerResume()
-		}
-	}
-}
-
-func TimeSensitive(seconds int64) ProducerOption {
-	return func(p queue.Producer) queue.Producer {
-		if seconds > 0 {
-			return autoDropQueueProducer{
-				seconds:  seconds,
-				producer: p,
-			}
-		}
-
-		return p
-	}
-}
-
-type autoDropQueueProducer struct {
-	seconds  int64 // seconds before to drop
-	producer queue.Producer
-}
-
-func (p autoDropQueueProducer) AddListener(listener queue.ProduceListener) {
-	p.producer.AddListener(listener)
-}
-
-func (p autoDropQueueProducer) Produce() (string, bool) {
-	lessLogger := logx.NewLessLogger(logIntervalMillis)
-
-	for {
-		content, ok := p.producer.Produce()
-		if !ok {
-			return "", false
-		}
-
-		var timedMsg TimedMessage
-		if err := jsonx.UnmarshalFromString(content, &timedMsg); err != nil {
-			lessLogger.Errorf("invalid timedMessage: %s, error: %s", content, err.Error())
-			continue
-		}
-
-		if timedMsg.Time+p.seconds < time.Now().Unix() {
-			lessLogger.Errorf("expired timedMessage: %s", content)
-		}
-
-		return timedMsg.Payload, true
-	}
-}

+ 0 - 78
rq/internal/redisqueuepusher.go

@@ -1,78 +0,0 @@
-package internal
-
-import (
-	"fmt"
-	"time"
-
-	"zero/core/jsonx"
-	"zero/core/logx"
-	"zero/core/queue"
-	"zero/core/stores/redis"
-)
-
-type (
-	PusherOption func(p queue.QueuePusher) queue.QueuePusher
-
-	RedisQueuePusher struct {
-		name  string
-		store *redis.Redis
-		key   string
-	}
-)
-
-func NewPusher(store *redis.Redis, key string, opts ...PusherOption) queue.QueuePusher {
-	var pusher queue.QueuePusher = &RedisQueuePusher{
-		name:  fmt.Sprintf("%s/%s/%s", store.Type, store.Addr, key),
-		store: store,
-		key:   key,
-	}
-
-	for _, opt := range opts {
-		pusher = opt(pusher)
-	}
-
-	return pusher
-}
-
-func (saver *RedisQueuePusher) Name() string {
-	return saver.name
-}
-
-func (saver *RedisQueuePusher) Push(message string) error {
-	_, err := saver.store.Rpush(saver.key, message)
-	if nil != err {
-		return err
-	}
-
-	logx.Infof("<= %s", message)
-	return nil
-}
-
-func WithTime() PusherOption {
-	return func(p queue.QueuePusher) queue.QueuePusher {
-		return timedQueuePusher{
-			pusher: p,
-		}
-	}
-}
-
-type timedQueuePusher struct {
-	pusher queue.QueuePusher
-}
-
-func (p timedQueuePusher) Name() string {
-	return p.pusher.Name()
-}
-
-func (p timedQueuePusher) Push(message string) error {
-	tm := TimedMessage{
-		Time:    time.Now().Unix(),
-		Payload: message,
-	}
-
-	if content, err := jsonx.Marshal(tm); err != nil {
-		return err
-	} else {
-		return p.pusher.Push(string(content))
-	}
-}

+ 0 - 179
rq/internal/update/incrementalupdater.go

@@ -1,179 +0,0 @@
-package update
-
-import (
-	"sync"
-	"time"
-
-	"zero/core/hash"
-	"zero/core/stringx"
-)
-
-const (
-	incrementalStep = 5
-	stepDuration    = time.Second * 3
-)
-
-type (
-	updateEvent struct {
-		keys    []string
-		newKey  string
-		servers []string
-	}
-
-	UpdateFunc func(change ServerChange)
-
-	IncrementalUpdater struct {
-		lock          sync.Mutex
-		started       bool
-		taskChan      chan updateEvent
-		updates       ServerChange
-		updateFn      UpdateFunc
-		pendingEvents []updateEvent
-	}
-)
-
-func NewIncrementalUpdater(updateFn UpdateFunc) *IncrementalUpdater {
-	return &IncrementalUpdater{
-		taskChan: make(chan updateEvent),
-		updates: ServerChange{
-			Current: Snapshot{
-				Keys:         make([]string, 0),
-				WeightedKeys: make([]weightedKey, 0),
-			},
-			Servers: make([]string, 0),
-		},
-		updateFn: updateFn,
-	}
-}
-
-func (ru *IncrementalUpdater) Update(keys []string, servers []string, newKey string) {
-	ru.lock.Lock()
-	defer ru.lock.Unlock()
-
-	if !ru.started {
-		go ru.run()
-		ru.started = true
-	}
-
-	ru.taskChan <- updateEvent{
-		keys:    keys,
-		newKey:  newKey,
-		servers: servers,
-	}
-}
-
-// Return true if incremental update is done
-func (ru *IncrementalUpdater) advance() bool {
-	previous := ru.updates.Current
-	keys := make([]string, 0)
-	weightedKeys := make([]weightedKey, 0)
-	servers := ru.updates.Servers
-	for _, key := range ru.updates.Current.Keys {
-		keys = append(keys, key)
-	}
-	for _, wkey := range ru.updates.Current.WeightedKeys {
-		weight := wkey.Weight + incrementalStep
-		if weight >= hash.TopWeight {
-			keys = append(keys, wkey.Key)
-		} else {
-			weightedKeys = append(weightedKeys, weightedKey{
-				Key:    wkey.Key,
-				Weight: weight,
-			})
-		}
-	}
-
-	for _, event := range ru.pendingEvents {
-		// ignore reload events
-		if len(event.newKey) == 0 || len(event.servers) == 0 {
-			continue
-		}
-
-		// anyway, add the servers, just to avoid missing notify any server
-		servers = stringx.Union(servers, event.servers)
-		if keyExists(keys, weightedKeys, event.newKey) {
-			continue
-		}
-
-		weightedKeys = append(weightedKeys, weightedKey{
-			Key:    event.newKey,
-			Weight: incrementalStep,
-		})
-	}
-
-	// clear pending events
-	ru.pendingEvents = ru.pendingEvents[:0]
-
-	change := ServerChange{
-		Previous: previous,
-		Current: Snapshot{
-			Keys:         keys,
-			WeightedKeys: weightedKeys,
-		},
-		Servers: servers,
-	}
-	ru.updates = change
-	ru.updateFn(change)
-
-	return len(weightedKeys) == 0
-}
-
-func (ru *IncrementalUpdater) run() {
-	defer func() {
-		ru.lock.Lock()
-		ru.started = false
-		ru.lock.Unlock()
-	}()
-
-	ticker := time.NewTicker(stepDuration)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ticker.C:
-			if ru.advance() {
-				return
-			}
-		case event := <-ru.taskChan:
-			ru.updateKeys(event)
-		}
-	}
-}
-
-func (ru *IncrementalUpdater) updateKeys(event updateEvent) {
-	isWeightedKey := func(key string) bool {
-		for _, wkey := range ru.updates.Current.WeightedKeys {
-			if wkey.Key == key {
-				return true
-			}
-		}
-
-		return false
-	}
-
-	keys := make([]string, 0, len(event.keys))
-	for _, key := range event.keys {
-		if !isWeightedKey(key) {
-			keys = append(keys, key)
-		}
-	}
-
-	ru.updates.Current.Keys = keys
-	ru.pendingEvents = append(ru.pendingEvents, event)
-}
-
-func keyExists(keys []string, weightedKeys []weightedKey, key string) bool {
-	for _, each := range keys {
-		if key == each {
-			return true
-		}
-	}
-
-	for _, wkey := range weightedKeys {
-		if wkey.Key == key {
-			return true
-		}
-	}
-
-	return false
-}

+ 0 - 106
rq/internal/update/serverchange.go

@@ -1,106 +0,0 @@
-package update
-
-import (
-	"crypto/md5"
-	"errors"
-	"fmt"
-	"io"
-	"sort"
-
-	"zero/core/hash"
-	"zero/core/jsonx"
-	"zero/rq/internal"
-)
-
-var ErrInvalidServerChange = errors.New("not a server change message")
-
-type (
-	weightedKey struct {
-		Key    string
-		Weight int
-	}
-
-	Snapshot struct {
-		Keys         []string
-		WeightedKeys []weightedKey
-	}
-
-	ServerChange struct {
-		Previous Snapshot
-		Current  Snapshot
-		Servers  []string
-	}
-)
-
-func (s Snapshot) GetCode() string {
-	keys := append([]string(nil), s.Keys...)
-	sort.Strings(keys)
-	weightedKeys := append([]weightedKey(nil), s.WeightedKeys...)
-	sort.SliceStable(weightedKeys, func(i, j int) bool {
-		return weightedKeys[i].Key < weightedKeys[j].Key
-	})
-
-	digest := md5.New()
-	for _, key := range keys {
-		io.WriteString(digest, fmt.Sprintf("%s\n", key))
-	}
-	for _, wkey := range weightedKeys {
-		io.WriteString(digest, fmt.Sprintf("%s:%d\n", wkey.Key, wkey.Weight))
-	}
-
-	return fmt.Sprintf("%x", digest.Sum(nil))
-}
-
-func (sc ServerChange) CreateCurrentHash() *hash.ConsistentHash {
-	curHash := hash.NewConsistentHash()
-
-	for _, key := range sc.Current.Keys {
-		curHash.Add(key)
-	}
-	for _, wkey := range sc.Current.WeightedKeys {
-		curHash.AddWithWeight(wkey.Key, wkey.Weight)
-	}
-
-	return curHash
-}
-
-func (sc ServerChange) CreatePrevHash() *hash.ConsistentHash {
-	prevHash := hash.NewConsistentHash()
-
-	for _, key := range sc.Previous.Keys {
-		prevHash.Add(key)
-	}
-	for _, wkey := range sc.Previous.WeightedKeys {
-		prevHash.AddWithWeight(wkey.Key, wkey.Weight)
-	}
-
-	return prevHash
-}
-
-func (sc ServerChange) GetCode() string {
-	return sc.Current.GetCode()
-}
-
-func IsServerChange(message string) bool {
-	return len(message) > 0 && message[0] == internal.ServerSensitivePrefix
-}
-
-func (sc ServerChange) Marshal() (string, error) {
-	body, err := jsonx.Marshal(sc)
-	if err != nil {
-		return "", err
-	}
-
-	return string(append([]byte{internal.ServerSensitivePrefix}, body...)), nil
-}
-
-func UnmarshalServerChange(body string) (ServerChange, error) {
-	if len(body) == 0 {
-		return ServerChange{}, ErrInvalidServerChange
-	}
-
-	var change ServerChange
-	err := jsonx.UnmarshalFromString(body[1:], &change)
-
-	return change, err
-}

+ 0 - 445
rq/pusher.go

@@ -1,445 +0,0 @@
-package rq
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"strings"
-	"sync"
-	"time"
-
-	"zero/core/discov"
-	"zero/core/errorx"
-	"zero/core/jsonx"
-	"zero/core/lang"
-	"zero/core/logx"
-	"zero/core/queue"
-	"zero/core/stores/redis"
-	"zero/core/threading"
-	"zero/rq/internal"
-	"zero/rq/internal/update"
-)
-
-const (
-	retryTimes      = 3
-	etcdRedisFields = 4
-)
-
-var ErrPusherTypeError = errors.New("not a QueuePusher instance")
-
-type (
-	KeyFn        func(string) (key, payload string, err error)
-	KeysFn       func(string) (ctx context.Context, keys []string, err error)
-	AssembleFn   func(context.Context, []string) (payload string, err error)
-	PusherOption func(*Pusher) error
-
-	// just push once or do it retryTimes, it's a choice.
-	// because only when at least a server is alive, and
-	// pushing to the server failed, we'll return with an error
-	// if waken up, but the server is going down very quickly,
-	// we're going to wait again. so it's safe to push once.
-	pushStrategy interface {
-		addListener(listener discov.Listener)
-		push(string) error
-	}
-
-	batchConsistentStrategy struct {
-		keysFn     KeysFn
-		assembleFn AssembleFn
-		subClient  *discov.BatchConsistentSubClient
-	}
-
-	consistentStrategy struct {
-		keyFn     KeyFn
-		subClient *discov.ConsistentSubClient
-	}
-
-	roundRobinStrategy struct {
-		subClient *discov.RoundRobinSubClient
-	}
-
-	serverListener struct {
-		updater *update.IncrementalUpdater
-	}
-
-	Pusher struct {
-		name            string
-		endpoints       []string
-		key             string
-		failovers       sync.Map
-		strategy        pushStrategy
-		serverSensitive bool
-	}
-)
-
-func NewPusher(endpoints []string, key string, opts ...PusherOption) (*Pusher, error) {
-	pusher := &Pusher{
-		name:      getName(key),
-		endpoints: endpoints,
-		key:       key,
-	}
-
-	if len(opts) == 0 {
-		opts = []PusherOption{WithRoundRobinStrategy()}
-	}
-
-	for _, opt := range opts {
-		if err := opt(pusher); err != nil {
-			return nil, err
-		}
-	}
-
-	if pusher.serverSensitive {
-		listener := new(serverListener)
-		listener.updater = update.NewIncrementalUpdater(listener.update)
-		pusher.strategy.addListener(listener)
-	}
-
-	return pusher, nil
-}
-
-func (pusher *Pusher) Name() string {
-	return pusher.name
-}
-
-func (pusher *Pusher) Push(message string) error {
-	return pusher.strategy.push(message)
-}
-
-func (pusher *Pusher) close(server string, conn interface{}) error {
-	logx.Errorf("dropped redis node: %s", server)
-
-	return pusher.failover(server)
-}
-
-func (pusher *Pusher) dial(server string) (interface{}, error) {
-	pusher.failovers.Delete(server)
-
-	p, err := newPusher(server)
-	if err != nil {
-		return nil, err
-	}
-
-	logx.Infof("new redis node: %s", server)
-
-	return p, nil
-}
-
-func (pusher *Pusher) failover(server string) error {
-	pusher.failovers.Store(server, lang.Placeholder)
-
-	rds, key, option, err := newRedisWithKey(server)
-	if err != nil {
-		return err
-	}
-
-	threading.GoSafe(func() {
-		defer pusher.failovers.Delete(server)
-
-		for {
-			_, ok := pusher.failovers.Load(server)
-			if !ok {
-				logx.Infof("redis queue (%s) revived", server)
-				return
-			}
-
-			message, err := rds.Lpop(key)
-			if err != nil {
-				logx.Error(err)
-				return
-			}
-
-			if len(message) == 0 {
-				logx.Infof("repush redis queue (%s) done", server)
-				return
-			}
-
-			if option == internal.TimedQueueType {
-				message, err = unwrapTimedMessage(message)
-				if err != nil {
-					logx.Errorf("invalid timedMessage: %s, error: %s", message, err.Error())
-					return
-				}
-			}
-
-			if err = pusher.strategy.push(message); err != nil {
-				logx.Error(err)
-				return
-			}
-		}
-	})
-
-	return nil
-}
-
-func UnmarshalPusher(server string) (queue.QueuePusher, error) {
-	store, key, option, err := newRedisWithKey(server)
-	if err != nil {
-		return nil, err
-	}
-
-	if option == internal.TimedQueueType {
-		return internal.NewPusher(store, key, internal.WithTime()), nil
-	}
-
-	return internal.NewPusher(store, key), nil
-}
-
-func WithBatchConsistentStrategy(keysFn KeysFn, assembleFn AssembleFn, opts ...discov.BalanceOption) PusherOption {
-	return func(pusher *Pusher) error {
-		subClient, err := discov.NewBatchConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial,
-			pusher.close, opts...)
-		if err != nil {
-			return err
-		}
-
-		pusher.strategy = batchConsistentStrategy{
-			keysFn:     keysFn,
-			assembleFn: assembleFn,
-			subClient:  subClient,
-		}
-
-		return nil
-	}
-}
-
-func WithConsistentStrategy(keyFn KeyFn, opts ...discov.BalanceOption) PusherOption {
-	return func(pusher *Pusher) error {
-		subClient, err := discov.NewConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close, opts...)
-		if err != nil {
-			return err
-		}
-
-		pusher.strategy = consistentStrategy{
-			keyFn:     keyFn,
-			subClient: subClient,
-		}
-
-		return nil
-	}
-}
-
-func WithRoundRobinStrategy() PusherOption {
-	return func(pusher *Pusher) error {
-		subClient, err := discov.NewRoundRobinSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close)
-		if err != nil {
-			return err
-		}
-
-		pusher.strategy = roundRobinStrategy{
-			subClient: subClient,
-		}
-
-		return nil
-	}
-}
-
-func WithServerSensitive() PusherOption {
-	return func(pusher *Pusher) error {
-		pusher.serverSensitive = true
-		return nil
-	}
-}
-
-func (bcs batchConsistentStrategy) addListener(listener discov.Listener) {
-	bcs.subClient.AddListener(listener)
-}
-
-func (bcs batchConsistentStrategy) balance(keys []string) map[interface{}][]string {
-	// we need to make sure the servers are available, otherwise wait forever
-	for {
-		if mapping, ok := bcs.subClient.Next(keys); ok {
-			return mapping
-		} else {
-			bcs.subClient.WaitForServers()
-			// make sure we don't flood logs too much in extreme conditions
-			time.Sleep(time.Second)
-		}
-	}
-}
-
-func (bcs batchConsistentStrategy) push(message string) error {
-	ctx, keys, err := bcs.keysFn(message)
-	if err != nil {
-		return err
-	}
-
-	var batchError errorx.BatchError
-	mapping := bcs.balance(keys)
-	for conn, connKeys := range mapping {
-		payload, err := bcs.assembleFn(ctx, connKeys)
-		if err != nil {
-			batchError.Add(err)
-			continue
-		}
-
-		for i := 0; i < retryTimes; i++ {
-			if err = bcs.pushOnce(conn, payload); err != nil {
-				batchError.Add(err)
-			} else {
-				break
-			}
-		}
-	}
-
-	return batchError.Err()
-}
-
-func (bcs batchConsistentStrategy) pushOnce(server interface{}, payload string) error {
-	pusher, ok := server.(queue.QueuePusher)
-	if ok {
-		return pusher.Push(payload)
-	} else {
-		return ErrPusherTypeError
-	}
-}
-
-func (cs consistentStrategy) addListener(listener discov.Listener) {
-	cs.subClient.AddListener(listener)
-}
-
-func (cs consistentStrategy) push(message string) error {
-	var batchError errorx.BatchError
-
-	key, payload, err := cs.keyFn(message)
-	if err != nil {
-		return err
-	}
-
-	for i := 0; i < retryTimes; i++ {
-		if err = cs.pushOnce(key, payload); err != nil {
-			batchError.Add(err)
-		} else {
-			return nil
-		}
-	}
-
-	return batchError.Err()
-}
-
-func (cs consistentStrategy) pushOnce(key, payload string) error {
-	// we need to make sure the servers are available, otherwise wait forever
-	for {
-		if server, ok := cs.subClient.Next(key); ok {
-			pusher, ok := server.(queue.QueuePusher)
-			if ok {
-				return pusher.Push(payload)
-			} else {
-				return ErrPusherTypeError
-			}
-		} else {
-			cs.subClient.WaitForServers()
-			// make sure we don't flood logs too much in extreme conditions
-			time.Sleep(time.Second)
-		}
-	}
-}
-
-func (rrs roundRobinStrategy) addListener(listener discov.Listener) {
-	rrs.subClient.AddListener(listener)
-}
-
-func (rrs roundRobinStrategy) push(message string) error {
-	var batchError errorx.BatchError
-
-	for i := 0; i < retryTimes; i++ {
-		if err := rrs.pushOnce(message); err != nil {
-			batchError.Add(err)
-		} else {
-			return nil
-		}
-	}
-
-	return batchError.Err()
-}
-
-func (rrs roundRobinStrategy) pushOnce(message string) error {
-	if server, ok := rrs.subClient.Next(); ok {
-		pusher, ok := server.(queue.QueuePusher)
-		if ok {
-			return pusher.Push(message)
-		} else {
-			return ErrPusherTypeError
-		}
-	} else {
-		rrs.subClient.WaitForServers()
-		return rrs.pushOnce(message)
-	}
-}
-
-func getName(key string) string {
-	return fmt.Sprintf("etcd:%s", key)
-}
-
-func newPusher(server string) (queue.QueuePusher, error) {
-	if rds, key, option, err := newRedisWithKey(server); err != nil {
-		return nil, err
-	} else if option == internal.TimedQueueType {
-		return internal.NewPusher(rds, key, internal.WithTime()), nil
-	} else {
-		return internal.NewPusher(rds, key), nil
-	}
-}
-
-func newRedisWithKey(server string) (rds *redis.Redis, key, option string, err error) {
-	fields := strings.Split(server, internal.Delimeter)
-	if len(fields) < etcdRedisFields {
-		err = fmt.Errorf("wrong redis queue: %s, should be ip:port/type/password/key/[option]", server)
-		return
-	}
-
-	addr := fields[0]
-	tp := fields[1]
-	pass := fields[2]
-	key = fields[3]
-
-	if len(fields) > etcdRedisFields {
-		option = fields[4]
-	}
-
-	rds = redis.NewRedis(addr, tp, pass)
-	return
-}
-
-func (sl *serverListener) OnUpdate(keys []string, servers []string, newKey string) {
-	sl.updater.Update(keys, servers, newKey)
-}
-
-func (sl *serverListener) OnReload() {
-	sl.updater.Update(nil, nil, "")
-}
-
-func (sl *serverListener) update(change update.ServerChange) {
-	content, err := change.Marshal()
-	if err != nil {
-		logx.Error(err)
-	}
-
-	if err = broadcast(change.Servers, content); err != nil {
-		logx.Error(err)
-	}
-}
-
-func broadcast(servers []string, message string) error {
-	var be errorx.BatchError
-
-	for _, server := range servers {
-		q, err := UnmarshalPusher(server)
-		if err != nil {
-			be.Add(err)
-		} else {
-			q.Push(message)
-		}
-	}
-
-	return be.Err()
-}
-
-func unwrapTimedMessage(message string) (string, error) {
-	var tm internal.TimedMessage
-	if err := jsonx.UnmarshalFromString(message, &tm); err != nil {
-		return "", err
-	}
-
-	return tm.Payload, nil
-}

+ 0 - 338
rq/queue.go

@@ -1,338 +0,0 @@
-package rq
-
-import (
-	"errors"
-	"fmt"
-	"log"
-	"strings"
-	"sync"
-	"time"
-
-	"zero/core/discov"
-	"zero/core/logx"
-	"zero/core/queue"
-	"zero/core/service"
-	"zero/core/stores/redis"
-	"zero/core/stringx"
-	"zero/core/threading"
-	"zero/rq/internal"
-	"zero/rq/internal/update"
-)
-
-const keyLen = 6
-
-var (
-	ErrTimeout = errors.New("timeout error")
-
-	eventHandlerPlaceholder = dummyEventHandler(0)
-)
-
-type (
-	ConsumeHandle func(string) error
-
-	ConsumeHandler interface {
-		Consume(string) error
-	}
-
-	EventHandler interface {
-		OnEvent(event interface{})
-	}
-
-	QueueOption func(queue *MessageQueue)
-
-	queueOptions struct {
-		renewId int64
-	}
-
-	MessageQueue struct {
-		c               RmqConf
-		redisQueue      *queue.Queue
-		consumerFactory queue.ConsumerFactory
-		options         queueOptions
-		eventLock       sync.Mutex
-		lastEvent       string
-	}
-)
-
-func MustNewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) queue.MessageQueue {
-	q, err := NewMessageQueue(c, factory, opts...)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	return q
-}
-
-func NewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) (queue.MessageQueue, error) {
-	if err := c.SetUp(); err != nil {
-		return nil, err
-	}
-
-	q := &MessageQueue{
-		c: c,
-	}
-
-	if len(q.c.Redis.Key) == 0 {
-		if len(q.c.Name) == 0 {
-			q.c.Redis.Key = stringx.Randn(keyLen)
-		} else {
-			q.c.Redis.Key = fmt.Sprintf("%s-%s", q.c.Name, stringx.Randn(keyLen))
-		}
-	}
-	if q.c.Timeout > 0 {
-		factory = wrapWithTimeout(factory, time.Duration(q.c.Timeout)*time.Millisecond)
-	}
-	factory = wrapWithServerSensitive(q, factory)
-	q.consumerFactory = factory
-	q.redisQueue = q.buildQueue()
-
-	for _, opt := range opts {
-		opt(q)
-	}
-
-	return q, nil
-}
-
-func (q *MessageQueue) Start() {
-	serviceGroup := service.NewServiceGroup()
-	serviceGroup.Add(q.redisQueue)
-	q.maybeAppendRenewer(serviceGroup, q.redisQueue)
-	serviceGroup.Start()
-}
-
-func (q *MessageQueue) Stop() {
-	logx.Close()
-}
-
-func (q *MessageQueue) buildQueue() *queue.Queue {
-	inboundStore := redis.NewRedis(q.c.Redis.Host, q.c.Redis.Type, q.c.Redis.Pass)
-	producerFactory := internal.NewProducerFactory(inboundStore, q.c.Redis.Key,
-		internal.TimeSensitive(q.c.DropBefore))
-	mq := queue.NewQueue(producerFactory, q.consumerFactory)
-
-	if len(q.c.Name) > 0 {
-		mq.SetName(q.c.Name)
-	}
-	if q.c.NumConsumers > 0 {
-		mq.SetNumConsumer(q.c.NumConsumers)
-	}
-	if q.c.NumProducers > 0 {
-		mq.SetNumProducer(q.c.NumProducers)
-	}
-
-	return mq
-}
-
-func (q *MessageQueue) compareAndSetEvent(event string) bool {
-	q.eventLock.Lock()
-	defer q.eventLock.Unlock()
-
-	if q.lastEvent == event {
-		return false
-	}
-
-	q.lastEvent = event
-	return true
-}
-
-func (q *MessageQueue) maybeAppendRenewer(group *service.ServiceGroup, mq *queue.Queue) {
-	if len(q.c.Etcd.Hosts) > 0 || len(q.c.Etcd.Key) > 0 {
-		etcdValue := MarshalQueue(q.c.Redis)
-		if q.c.DropBefore > 0 {
-			etcdValue = strings.Join([]string{etcdValue, internal.TimedQueueType}, internal.Delimeter)
-		}
-		keepAliver := discov.NewRenewer(q.c.Etcd.Hosts, q.c.Etcd.Key, etcdValue, q.options.renewId)
-		mq.AddListener(pauseResumeHandler{
-			Renewer: keepAliver,
-		})
-		group.Add(keepAliver)
-	}
-}
-
-func MarshalQueue(rds redis.RedisKeyConf) string {
-	return strings.Join([]string{
-		rds.Host,
-		rds.Type,
-		rds.Pass,
-		rds.Key,
-	}, internal.Delimeter)
-}
-
-func WithHandle(handle ConsumeHandle) queue.ConsumerFactory {
-	return WithHandler(innerConsumerHandler{handle})
-}
-
-func WithHandler(handler ConsumeHandler, eventHandlers ...EventHandler) queue.ConsumerFactory {
-	return func() (queue.Consumer, error) {
-		if len(eventHandlers) < 1 {
-			return eventConsumer{
-				consumeHandler: handler,
-				eventHandler:   eventHandlerPlaceholder,
-			}, nil
-		} else {
-			return eventConsumer{
-				consumeHandler: handler,
-				eventHandler:   eventHandlers[0],
-			}, nil
-		}
-	}
-}
-
-func WithHandlerFactory(factory func() (ConsumeHandler, error)) queue.ConsumerFactory {
-	return func() (queue.Consumer, error) {
-		if handler, err := factory(); err != nil {
-			return nil, err
-		} else {
-			return eventlessHandler{handler}, nil
-		}
-	}
-}
-
-func WithRenewId(id int64) QueueOption {
-	return func(mq *MessageQueue) {
-		mq.options.renewId = id
-	}
-}
-
-func wrapWithServerSensitive(mq *MessageQueue, factory queue.ConsumerFactory) queue.ConsumerFactory {
-	return func() (queue.Consumer, error) {
-		consumer, err := factory()
-		if err != nil {
-			return nil, err
-		}
-
-		return &serverSensitiveConsumer{
-			mq:       mq,
-			consumer: consumer,
-		}, nil
-	}
-}
-
-func wrapWithTimeout(factory queue.ConsumerFactory, dt time.Duration) queue.ConsumerFactory {
-	return func() (queue.Consumer, error) {
-		consumer, err := factory()
-		if err != nil {
-			return nil, err
-		}
-
-		return &timeoutConsumer{
-			consumer: consumer,
-			dt:       dt,
-			timer:    time.NewTimer(dt),
-		}, nil
-	}
-}
-
-type innerConsumerHandler struct {
-	handle ConsumeHandle
-}
-
-func (h innerConsumerHandler) Consume(v string) error {
-	return h.handle(v)
-}
-
-type serverSensitiveConsumer struct {
-	mq       *MessageQueue
-	consumer queue.Consumer
-}
-
-func (c *serverSensitiveConsumer) Consume(msg string) error {
-	if update.IsServerChange(msg) {
-		change, err := update.UnmarshalServerChange(msg)
-		if err != nil {
-			return err
-		}
-
-		code := change.GetCode()
-		if !c.mq.compareAndSetEvent(code) {
-			return nil
-		}
-
-		oldHash := change.CreatePrevHash()
-		newHash := change.CreateCurrentHash()
-		hashChange := internal.NewHashChange(oldHash, newHash)
-		c.mq.redisQueue.Broadcast(hashChange)
-
-		return nil
-	}
-
-	return c.consumer.Consume(msg)
-}
-
-func (c *serverSensitiveConsumer) OnEvent(event interface{}) {
-	c.consumer.OnEvent(event)
-}
-
-type timeoutConsumer struct {
-	consumer queue.Consumer
-	dt       time.Duration
-	timer    *time.Timer
-}
-
-func (c *timeoutConsumer) Consume(msg string) error {
-	done := make(chan error)
-	threading.GoSafe(func() {
-		if err := c.consumer.Consume(msg); err != nil {
-			done <- err
-		}
-		close(done)
-	})
-
-	c.timer.Reset(c.dt)
-	select {
-	case err, ok := <-done:
-		c.timer.Stop()
-		if ok {
-			return err
-		} else {
-			return nil
-		}
-	case <-c.timer.C:
-		return ErrTimeout
-	}
-}
-
-func (c *timeoutConsumer) OnEvent(event interface{}) {
-	c.consumer.OnEvent(event)
-}
-
-type pauseResumeHandler struct {
-	discov.Renewer
-}
-
-func (pr pauseResumeHandler) OnPause() {
-	pr.Pause()
-}
-
-func (pr pauseResumeHandler) OnResume() {
-	pr.Resume()
-}
-
-type eventConsumer struct {
-	consumeHandler ConsumeHandler
-	eventHandler   EventHandler
-}
-
-func (ec eventConsumer) Consume(msg string) error {
-	return ec.consumeHandler.Consume(msg)
-}
-
-func (ec eventConsumer) OnEvent(event interface{}) {
-	ec.eventHandler.OnEvent(event)
-}
-
-type eventlessHandler struct {
-	handler ConsumeHandler
-}
-
-func (h eventlessHandler) Consume(msg string) error {
-	return h.handler.Consume(msg)
-}
-
-func (h eventlessHandler) OnEvent(event interface{}) {
-}
-
-type dummyEventHandler int
-
-func (eh dummyEventHandler) OnEvent(event interface{}) {
-}

+ 0 - 62
rq/queue_test.go

@@ -1,62 +0,0 @@
-package rq
-
-import (
-	"strconv"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestQueueWithTimeout(t *testing.T) {
-	consumer, err := wrapWithTimeout(WithHandle(func(string) error {
-		time.Sleep(time.Minute)
-		return nil
-	}), 100)()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	assert.Equal(t, ErrTimeout, consumer.Consume("any"))
-}
-
-func TestQueueWithoutTimeout(t *testing.T) {
-	consumer, err := wrapWithTimeout(WithHandle(func(string) error {
-		return nil
-	}), 3600000)()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	assert.Nil(t, consumer.Consume("any"))
-}
-
-func BenchmarkQueue(b *testing.B) {
-	b.ReportAllocs()
-
-	consumer, err := WithHandle(func(string) error {
-		return nil
-	})()
-	if err != nil {
-		b.Fatal(err)
-	}
-
-	for i := 0; i < b.N; i++ {
-		consumer.Consume(strconv.Itoa(i))
-	}
-}
-
-func BenchmarkQueueWithTimeout(b *testing.B) {
-	b.ReportAllocs()
-
-	consumer, err := wrapWithTimeout(WithHandle(func(string) error {
-		return nil
-	}), 1000)()
-	if err != nil {
-		b.Fatal(err)
-	}
-
-	for i := 0; i < b.N; i++ {
-		consumer.Consume(strconv.Itoa(i))
-	}
-}