123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- package rq
- import (
- "errors"
- "fmt"
- "log"
- "strings"
- "sync"
- "time"
- "zero/core/discov"
- "zero/core/logx"
- "zero/core/queue"
- "zero/core/service"
- "zero/core/stores/redis"
- "zero/core/stringx"
- "zero/core/threading"
- "zero/rq/internal"
- "zero/rq/internal/update"
- )
- const keyLen = 6
- var (
- ErrTimeout = errors.New("timeout error")
- eventHandlerPlaceholder = dummyEventHandler(0)
- )
- type (
- ConsumeHandle func(string) error
- ConsumeHandler interface {
- Consume(string) error
- }
- EventHandler interface {
- OnEvent(event interface{})
- }
- QueueOption func(queue *MessageQueue)
- queueOptions struct {
- renewId int64
- }
- MessageQueue struct {
- c RmqConf
- redisQueue *queue.Queue
- consumerFactory queue.ConsumerFactory
- options queueOptions
- eventLock sync.Mutex
- lastEvent string
- }
- )
- func MustNewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) queue.MessageQueue {
- q, err := NewMessageQueue(c, factory, opts...)
- if err != nil {
- log.Fatal(err)
- }
- return q
- }
- func NewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) (queue.MessageQueue, error) {
- if err := c.SetUp(); err != nil {
- return nil, err
- }
- q := &MessageQueue{
- c: c,
- }
- if len(q.c.Redis.Key) == 0 {
- if len(q.c.Name) == 0 {
- q.c.Redis.Key = stringx.Randn(keyLen)
- } else {
- q.c.Redis.Key = fmt.Sprintf("%s-%s", q.c.Name, stringx.Randn(keyLen))
- }
- }
- if q.c.Timeout > 0 {
- factory = wrapWithTimeout(factory, time.Duration(q.c.Timeout)*time.Millisecond)
- }
- factory = wrapWithServerSensitive(q, factory)
- q.consumerFactory = factory
- q.redisQueue = q.buildQueue()
- for _, opt := range opts {
- opt(q)
- }
- return q, nil
- }
- func (q *MessageQueue) Start() {
- serviceGroup := service.NewServiceGroup()
- serviceGroup.Add(q.redisQueue)
- q.maybeAppendRenewer(serviceGroup, q.redisQueue)
- serviceGroup.Start()
- }
- func (q *MessageQueue) Stop() {
- logx.Close()
- }
- func (q *MessageQueue) buildQueue() *queue.Queue {
- inboundStore := redis.NewRedis(q.c.Redis.Host, q.c.Redis.Type, q.c.Redis.Pass)
- producerFactory := internal.NewProducerFactory(inboundStore, q.c.Redis.Key,
- internal.TimeSensitive(q.c.DropBefore))
- mq := queue.NewQueue(producerFactory, q.consumerFactory)
- if len(q.c.Name) > 0 {
- mq.SetName(q.c.Name)
- }
- if q.c.NumConsumers > 0 {
- mq.SetNumConsumer(q.c.NumConsumers)
- }
- if q.c.NumProducers > 0 {
- mq.SetNumProducer(q.c.NumProducers)
- }
- return mq
- }
- func (q *MessageQueue) compareAndSetEvent(event string) bool {
- q.eventLock.Lock()
- defer q.eventLock.Unlock()
- if q.lastEvent == event {
- return false
- }
- q.lastEvent = event
- return true
- }
- func (q *MessageQueue) maybeAppendRenewer(group *service.ServiceGroup, mq *queue.Queue) {
- if len(q.c.Etcd.Hosts) > 0 || len(q.c.Etcd.Key) > 0 {
- etcdValue := MarshalQueue(q.c.Redis)
- if q.c.DropBefore > 0 {
- etcdValue = strings.Join([]string{etcdValue, internal.TimedQueueType}, internal.Delimeter)
- }
- keepAliver := discov.NewRenewer(q.c.Etcd.Hosts, q.c.Etcd.Key, etcdValue, q.options.renewId)
- mq.AddListener(pauseResumeHandler{
- Renewer: keepAliver,
- })
- group.Add(keepAliver)
- }
- }
- func MarshalQueue(rds redis.RedisKeyConf) string {
- return strings.Join([]string{
- rds.Host,
- rds.Type,
- rds.Pass,
- rds.Key,
- }, internal.Delimeter)
- }
- func WithHandle(handle ConsumeHandle) queue.ConsumerFactory {
- return WithHandler(innerConsumerHandler{handle})
- }
- func WithHandler(handler ConsumeHandler, eventHandlers ...EventHandler) queue.ConsumerFactory {
- return func() (queue.Consumer, error) {
- if len(eventHandlers) < 1 {
- return eventConsumer{
- consumeHandler: handler,
- eventHandler: eventHandlerPlaceholder,
- }, nil
- } else {
- return eventConsumer{
- consumeHandler: handler,
- eventHandler: eventHandlers[0],
- }, nil
- }
- }
- }
- func WithHandlerFactory(factory func() (ConsumeHandler, error)) queue.ConsumerFactory {
- return func() (queue.Consumer, error) {
- if handler, err := factory(); err != nil {
- return nil, err
- } else {
- return eventlessHandler{handler}, nil
- }
- }
- }
- func WithRenewId(id int64) QueueOption {
- return func(mq *MessageQueue) {
- mq.options.renewId = id
- }
- }
- func wrapWithServerSensitive(mq *MessageQueue, factory queue.ConsumerFactory) queue.ConsumerFactory {
- return func() (queue.Consumer, error) {
- consumer, err := factory()
- if err != nil {
- return nil, err
- }
- return &serverSensitiveConsumer{
- mq: mq,
- consumer: consumer,
- }, nil
- }
- }
- func wrapWithTimeout(factory queue.ConsumerFactory, dt time.Duration) queue.ConsumerFactory {
- return func() (queue.Consumer, error) {
- consumer, err := factory()
- if err != nil {
- return nil, err
- }
- return &timeoutConsumer{
- consumer: consumer,
- dt: dt,
- timer: time.NewTimer(dt),
- }, nil
- }
- }
- type innerConsumerHandler struct {
- handle ConsumeHandle
- }
- func (h innerConsumerHandler) Consume(v string) error {
- return h.handle(v)
- }
- type serverSensitiveConsumer struct {
- mq *MessageQueue
- consumer queue.Consumer
- }
- func (c *serverSensitiveConsumer) Consume(msg string) error {
- if update.IsServerChange(msg) {
- change, err := update.UnmarshalServerChange(msg)
- if err != nil {
- return err
- }
- code := change.GetCode()
- if !c.mq.compareAndSetEvent(code) {
- return nil
- }
- oldHash := change.CreatePrevHash()
- newHash := change.CreateCurrentHash()
- hashChange := internal.NewHashChange(oldHash, newHash)
- c.mq.redisQueue.Broadcast(hashChange)
- return nil
- }
- return c.consumer.Consume(msg)
- }
- func (c *serverSensitiveConsumer) OnEvent(event interface{}) {
- c.consumer.OnEvent(event)
- }
- type timeoutConsumer struct {
- consumer queue.Consumer
- dt time.Duration
- timer *time.Timer
- }
- func (c *timeoutConsumer) Consume(msg string) error {
- done := make(chan error)
- threading.GoSafe(func() {
- if err := c.consumer.Consume(msg); err != nil {
- done <- err
- }
- close(done)
- })
- c.timer.Reset(c.dt)
- select {
- case err, ok := <-done:
- c.timer.Stop()
- if ok {
- return err
- } else {
- return nil
- }
- case <-c.timer.C:
- return ErrTimeout
- }
- }
- func (c *timeoutConsumer) OnEvent(event interface{}) {
- c.consumer.OnEvent(event)
- }
- type pauseResumeHandler struct {
- discov.Renewer
- }
- func (pr pauseResumeHandler) OnPause() {
- pr.Pause()
- }
- func (pr pauseResumeHandler) OnResume() {
- pr.Resume()
- }
- type eventConsumer struct {
- consumeHandler ConsumeHandler
- eventHandler EventHandler
- }
- func (ec eventConsumer) Consume(msg string) error {
- return ec.consumeHandler.Consume(msg)
- }
- func (ec eventConsumer) OnEvent(event interface{}) {
- ec.eventHandler.OnEvent(event)
- }
- type eventlessHandler struct {
- handler ConsumeHandler
- }
- func (h eventlessHandler) Consume(msg string) error {
- return h.handler.Consume(msg)
- }
- func (h eventlessHandler) OnEvent(event interface{}) {
- }
- type dummyEventHandler int
- func (eh dummyEventHandler) OnEvent(event interface{}) {
- }
|