queue.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package rq
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "strings"
  7. "sync"
  8. "time"
  9. "zero/core/discov"
  10. "zero/core/logx"
  11. "zero/core/queue"
  12. "zero/core/redisqueue"
  13. "zero/core/service"
  14. "zero/core/stores/redis"
  15. "zero/core/stringx"
  16. "zero/core/threading"
  17. "zero/rq/constant"
  18. "zero/rq/update"
  19. )
  20. const keyLen = 6
  21. var (
  22. ErrTimeout = errors.New("timeout error")
  23. eventHandlerPlaceholder = dummyEventHandler(0)
  24. )
  25. type (
  26. ConsumeHandle func(string) error
  27. ConsumeHandler interface {
  28. Consume(string) error
  29. }
  30. EventHandler interface {
  31. OnEvent(event interface{})
  32. }
  33. QueueOption func(queue *MessageQueue)
  34. queueOptions struct {
  35. renewId int64
  36. }
  37. MessageQueue struct {
  38. c RmqConf
  39. redisQueue *queue.Queue
  40. consumerFactory queue.ConsumerFactory
  41. options queueOptions
  42. eventLock sync.Mutex
  43. lastEvent string
  44. }
  45. )
  46. func MustNewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) queue.MessageQueue {
  47. q, err := NewMessageQueue(c, factory, opts...)
  48. if err != nil {
  49. log.Fatal(err)
  50. }
  51. return q
  52. }
  53. func NewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) (queue.MessageQueue, error) {
  54. if err := c.SetUp(); err != nil {
  55. return nil, err
  56. }
  57. q := &MessageQueue{
  58. c: c,
  59. }
  60. if len(q.c.Redis.Key) == 0 {
  61. if len(q.c.Name) == 0 {
  62. q.c.Redis.Key = stringx.Randn(keyLen)
  63. } else {
  64. q.c.Redis.Key = fmt.Sprintf("%s-%s", q.c.Name, stringx.Randn(keyLen))
  65. }
  66. }
  67. if q.c.Timeout > 0 {
  68. factory = wrapWithTimeout(factory, time.Duration(q.c.Timeout)*time.Millisecond)
  69. }
  70. factory = wrapWithServerSensitive(q, factory)
  71. q.consumerFactory = factory
  72. q.redisQueue = q.buildQueue()
  73. for _, opt := range opts {
  74. opt(q)
  75. }
  76. return q, nil
  77. }
  78. func (q *MessageQueue) Start() {
  79. serviceGroup := service.NewServiceGroup()
  80. serviceGroup.Add(q.redisQueue)
  81. q.maybeAppendRenewer(serviceGroup, q.redisQueue)
  82. serviceGroup.Start()
  83. }
  84. func (q *MessageQueue) Stop() {
  85. logx.Close()
  86. }
  87. func (q *MessageQueue) buildQueue() *queue.Queue {
  88. inboundStore := redis.NewRedis(q.c.Redis.Host, q.c.Redis.Type, q.c.Redis.Pass)
  89. producerFactory := redisqueue.NewProducerFactory(inboundStore, q.c.Redis.Key,
  90. redisqueue.TimeSensitive(q.c.DropBefore))
  91. mq := queue.NewQueue(producerFactory, q.consumerFactory)
  92. if len(q.c.Name) > 0 {
  93. mq.SetName(q.c.Name)
  94. }
  95. if q.c.NumConsumers > 0 {
  96. mq.SetNumConsumer(q.c.NumConsumers)
  97. }
  98. if q.c.NumProducers > 0 {
  99. mq.SetNumProducer(q.c.NumProducers)
  100. }
  101. return mq
  102. }
  103. func (q *MessageQueue) compareAndSetEvent(event string) bool {
  104. q.eventLock.Lock()
  105. defer q.eventLock.Unlock()
  106. if q.lastEvent == event {
  107. return false
  108. }
  109. q.lastEvent = event
  110. return true
  111. }
  112. func (q *MessageQueue) maybeAppendRenewer(group *service.ServiceGroup, mq *queue.Queue) {
  113. if len(q.c.Etcd.Hosts) > 0 || len(q.c.Etcd.Key) > 0 {
  114. etcdValue := MarshalQueue(q.c.Redis)
  115. if q.c.DropBefore > 0 {
  116. etcdValue = strings.Join([]string{etcdValue, constant.TimedQueueType}, constant.Delimeter)
  117. }
  118. keepAliver := discov.NewRenewer(q.c.Etcd.Hosts, q.c.Etcd.Key, etcdValue, q.options.renewId)
  119. mq.AddListener(pauseResumeHandler{
  120. Renewer: keepAliver,
  121. })
  122. group.Add(keepAliver)
  123. }
  124. }
  125. func MarshalQueue(rds redis.RedisKeyConf) string {
  126. return strings.Join([]string{
  127. rds.Host,
  128. rds.Type,
  129. rds.Pass,
  130. rds.Key,
  131. }, constant.Delimeter)
  132. }
  133. func WithHandle(handle ConsumeHandle) queue.ConsumerFactory {
  134. return WithHandler(innerConsumerHandler{handle})
  135. }
  136. func WithHandler(handler ConsumeHandler, eventHandlers ...EventHandler) queue.ConsumerFactory {
  137. return func() (queue.Consumer, error) {
  138. if len(eventHandlers) < 1 {
  139. return eventConsumer{
  140. consumeHandler: handler,
  141. eventHandler: eventHandlerPlaceholder,
  142. }, nil
  143. } else {
  144. return eventConsumer{
  145. consumeHandler: handler,
  146. eventHandler: eventHandlers[0],
  147. }, nil
  148. }
  149. }
  150. }
  151. func WithHandlerFactory(factory func() (ConsumeHandler, error)) queue.ConsumerFactory {
  152. return func() (queue.Consumer, error) {
  153. if handler, err := factory(); err != nil {
  154. return nil, err
  155. } else {
  156. return eventlessHandler{handler}, nil
  157. }
  158. }
  159. }
  160. func WithRenewId(id int64) QueueOption {
  161. return func(mq *MessageQueue) {
  162. mq.options.renewId = id
  163. }
  164. }
  165. func wrapWithServerSensitive(mq *MessageQueue, factory queue.ConsumerFactory) queue.ConsumerFactory {
  166. return func() (queue.Consumer, error) {
  167. consumer, err := factory()
  168. if err != nil {
  169. return nil, err
  170. }
  171. return &serverSensitiveConsumer{
  172. mq: mq,
  173. consumer: consumer,
  174. }, nil
  175. }
  176. }
  177. func wrapWithTimeout(factory queue.ConsumerFactory, dt time.Duration) queue.ConsumerFactory {
  178. return func() (queue.Consumer, error) {
  179. consumer, err := factory()
  180. if err != nil {
  181. return nil, err
  182. }
  183. return &timeoutConsumer{
  184. consumer: consumer,
  185. dt: dt,
  186. timer: time.NewTimer(dt),
  187. }, nil
  188. }
  189. }
  190. type innerConsumerHandler struct {
  191. handle ConsumeHandle
  192. }
  193. func (h innerConsumerHandler) Consume(v string) error {
  194. return h.handle(v)
  195. }
  196. type serverSensitiveConsumer struct {
  197. mq *MessageQueue
  198. consumer queue.Consumer
  199. }
  200. func (c *serverSensitiveConsumer) Consume(msg string) error {
  201. if update.IsServerChange(msg) {
  202. change, err := update.UnmarshalServerChange(msg)
  203. if err != nil {
  204. return err
  205. }
  206. code := change.GetCode()
  207. if !c.mq.compareAndSetEvent(code) {
  208. return nil
  209. }
  210. oldHash := change.CreatePrevHash()
  211. newHash := change.CreateCurrentHash()
  212. hashChange := NewHashChange(oldHash, newHash)
  213. c.mq.redisQueue.Broadcast(hashChange)
  214. return nil
  215. }
  216. return c.consumer.Consume(msg)
  217. }
  218. func (c *serverSensitiveConsumer) OnEvent(event interface{}) {
  219. c.consumer.OnEvent(event)
  220. }
  221. type timeoutConsumer struct {
  222. consumer queue.Consumer
  223. dt time.Duration
  224. timer *time.Timer
  225. }
  226. func (c *timeoutConsumer) Consume(msg string) error {
  227. done := make(chan error)
  228. threading.GoSafe(func() {
  229. if err := c.consumer.Consume(msg); err != nil {
  230. done <- err
  231. }
  232. close(done)
  233. })
  234. c.timer.Reset(c.dt)
  235. select {
  236. case err, ok := <-done:
  237. c.timer.Stop()
  238. if ok {
  239. return err
  240. } else {
  241. return nil
  242. }
  243. case <-c.timer.C:
  244. return ErrTimeout
  245. }
  246. }
  247. func (c *timeoutConsumer) OnEvent(event interface{}) {
  248. c.consumer.OnEvent(event)
  249. }
  250. type pauseResumeHandler struct {
  251. discov.Renewer
  252. }
  253. func (pr pauseResumeHandler) OnPause() {
  254. pr.Pause()
  255. }
  256. func (pr pauseResumeHandler) OnResume() {
  257. pr.Resume()
  258. }
  259. type eventConsumer struct {
  260. consumeHandler ConsumeHandler
  261. eventHandler EventHandler
  262. }
  263. func (ec eventConsumer) Consume(msg string) error {
  264. return ec.consumeHandler.Consume(msg)
  265. }
  266. func (ec eventConsumer) OnEvent(event interface{}) {
  267. ec.eventHandler.OnEvent(event)
  268. }
  269. type eventlessHandler struct {
  270. handler ConsumeHandler
  271. }
  272. func (h eventlessHandler) Consume(msg string) error {
  273. return h.handler.Consume(msg)
  274. }
  275. func (h eventlessHandler) OnEvent(event interface{}) {
  276. }
  277. type dummyEventHandler int
  278. func (eh dummyEventHandler) OnEvent(event interface{}) {
  279. }