queue.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. package rq
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "strings"
  7. "sync"
  8. "time"
  9. "zero/core/discov"
  10. "zero/core/logx"
  11. "zero/core/queue"
  12. "zero/core/service"
  13. "zero/core/stores/redis"
  14. "zero/core/stringx"
  15. "zero/core/threading"
  16. "zero/rq/internal"
  17. "zero/rq/internal/update"
  18. )
  19. const keyLen = 6
  20. var (
  21. ErrTimeout = errors.New("timeout error")
  22. eventHandlerPlaceholder = dummyEventHandler(0)
  23. )
  24. type (
  25. ConsumeHandle func(string) error
  26. ConsumeHandler interface {
  27. Consume(string) error
  28. }
  29. EventHandler interface {
  30. OnEvent(event interface{})
  31. }
  32. QueueOption func(queue *MessageQueue)
  33. queueOptions struct {
  34. renewId int64
  35. }
  36. MessageQueue struct {
  37. c RmqConf
  38. redisQueue *queue.Queue
  39. consumerFactory queue.ConsumerFactory
  40. options queueOptions
  41. eventLock sync.Mutex
  42. lastEvent string
  43. }
  44. )
  45. func MustNewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) queue.MessageQueue {
  46. q, err := NewMessageQueue(c, factory, opts...)
  47. if err != nil {
  48. log.Fatal(err)
  49. }
  50. return q
  51. }
  52. func NewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) (queue.MessageQueue, error) {
  53. if err := c.SetUp(); err != nil {
  54. return nil, err
  55. }
  56. q := &MessageQueue{
  57. c: c,
  58. }
  59. if len(q.c.Redis.Key) == 0 {
  60. if len(q.c.Name) == 0 {
  61. q.c.Redis.Key = stringx.Randn(keyLen)
  62. } else {
  63. q.c.Redis.Key = fmt.Sprintf("%s-%s", q.c.Name, stringx.Randn(keyLen))
  64. }
  65. }
  66. if q.c.Timeout > 0 {
  67. factory = wrapWithTimeout(factory, time.Duration(q.c.Timeout)*time.Millisecond)
  68. }
  69. factory = wrapWithServerSensitive(q, factory)
  70. q.consumerFactory = factory
  71. q.redisQueue = q.buildQueue()
  72. for _, opt := range opts {
  73. opt(q)
  74. }
  75. return q, nil
  76. }
  77. func (q *MessageQueue) Start() {
  78. serviceGroup := service.NewServiceGroup()
  79. serviceGroup.Add(q.redisQueue)
  80. q.maybeAppendRenewer(serviceGroup, q.redisQueue)
  81. serviceGroup.Start()
  82. }
  83. func (q *MessageQueue) Stop() {
  84. logx.Close()
  85. }
  86. func (q *MessageQueue) buildQueue() *queue.Queue {
  87. inboundStore := redis.NewRedis(q.c.Redis.Host, q.c.Redis.Type, q.c.Redis.Pass)
  88. producerFactory := internal.NewProducerFactory(inboundStore, q.c.Redis.Key,
  89. internal.TimeSensitive(q.c.DropBefore))
  90. mq := queue.NewQueue(producerFactory, q.consumerFactory)
  91. if len(q.c.Name) > 0 {
  92. mq.SetName(q.c.Name)
  93. }
  94. if q.c.NumConsumers > 0 {
  95. mq.SetNumConsumer(q.c.NumConsumers)
  96. }
  97. if q.c.NumProducers > 0 {
  98. mq.SetNumProducer(q.c.NumProducers)
  99. }
  100. return mq
  101. }
  102. func (q *MessageQueue) compareAndSetEvent(event string) bool {
  103. q.eventLock.Lock()
  104. defer q.eventLock.Unlock()
  105. if q.lastEvent == event {
  106. return false
  107. }
  108. q.lastEvent = event
  109. return true
  110. }
  111. func (q *MessageQueue) maybeAppendRenewer(group *service.ServiceGroup, mq *queue.Queue) {
  112. if len(q.c.Etcd.Hosts) > 0 || len(q.c.Etcd.Key) > 0 {
  113. etcdValue := MarshalQueue(q.c.Redis)
  114. if q.c.DropBefore > 0 {
  115. etcdValue = strings.Join([]string{etcdValue, internal.TimedQueueType}, internal.Delimeter)
  116. }
  117. keepAliver := discov.NewRenewer(q.c.Etcd.Hosts, q.c.Etcd.Key, etcdValue, q.options.renewId)
  118. mq.AddListener(pauseResumeHandler{
  119. Renewer: keepAliver,
  120. })
  121. group.Add(keepAliver)
  122. }
  123. }
  124. func MarshalQueue(rds redis.RedisKeyConf) string {
  125. return strings.Join([]string{
  126. rds.Host,
  127. rds.Type,
  128. rds.Pass,
  129. rds.Key,
  130. }, internal.Delimeter)
  131. }
  132. func WithHandle(handle ConsumeHandle) queue.ConsumerFactory {
  133. return WithHandler(innerConsumerHandler{handle})
  134. }
  135. func WithHandler(handler ConsumeHandler, eventHandlers ...EventHandler) queue.ConsumerFactory {
  136. return func() (queue.Consumer, error) {
  137. if len(eventHandlers) < 1 {
  138. return eventConsumer{
  139. consumeHandler: handler,
  140. eventHandler: eventHandlerPlaceholder,
  141. }, nil
  142. } else {
  143. return eventConsumer{
  144. consumeHandler: handler,
  145. eventHandler: eventHandlers[0],
  146. }, nil
  147. }
  148. }
  149. }
  150. func WithHandlerFactory(factory func() (ConsumeHandler, error)) queue.ConsumerFactory {
  151. return func() (queue.Consumer, error) {
  152. if handler, err := factory(); err != nil {
  153. return nil, err
  154. } else {
  155. return eventlessHandler{handler}, nil
  156. }
  157. }
  158. }
  159. func WithRenewId(id int64) QueueOption {
  160. return func(mq *MessageQueue) {
  161. mq.options.renewId = id
  162. }
  163. }
  164. func wrapWithServerSensitive(mq *MessageQueue, factory queue.ConsumerFactory) queue.ConsumerFactory {
  165. return func() (queue.Consumer, error) {
  166. consumer, err := factory()
  167. if err != nil {
  168. return nil, err
  169. }
  170. return &serverSensitiveConsumer{
  171. mq: mq,
  172. consumer: consumer,
  173. }, nil
  174. }
  175. }
  176. func wrapWithTimeout(factory queue.ConsumerFactory, dt time.Duration) queue.ConsumerFactory {
  177. return func() (queue.Consumer, error) {
  178. consumer, err := factory()
  179. if err != nil {
  180. return nil, err
  181. }
  182. return &timeoutConsumer{
  183. consumer: consumer,
  184. dt: dt,
  185. timer: time.NewTimer(dt),
  186. }, nil
  187. }
  188. }
  189. type innerConsumerHandler struct {
  190. handle ConsumeHandle
  191. }
  192. func (h innerConsumerHandler) Consume(v string) error {
  193. return h.handle(v)
  194. }
  195. type serverSensitiveConsumer struct {
  196. mq *MessageQueue
  197. consumer queue.Consumer
  198. }
  199. func (c *serverSensitiveConsumer) Consume(msg string) error {
  200. if update.IsServerChange(msg) {
  201. change, err := update.UnmarshalServerChange(msg)
  202. if err != nil {
  203. return err
  204. }
  205. code := change.GetCode()
  206. if !c.mq.compareAndSetEvent(code) {
  207. return nil
  208. }
  209. oldHash := change.CreatePrevHash()
  210. newHash := change.CreateCurrentHash()
  211. hashChange := internal.NewHashChange(oldHash, newHash)
  212. c.mq.redisQueue.Broadcast(hashChange)
  213. return nil
  214. }
  215. return c.consumer.Consume(msg)
  216. }
  217. func (c *serverSensitiveConsumer) OnEvent(event interface{}) {
  218. c.consumer.OnEvent(event)
  219. }
  220. type timeoutConsumer struct {
  221. consumer queue.Consumer
  222. dt time.Duration
  223. timer *time.Timer
  224. }
  225. func (c *timeoutConsumer) Consume(msg string) error {
  226. done := make(chan error)
  227. threading.GoSafe(func() {
  228. if err := c.consumer.Consume(msg); err != nil {
  229. done <- err
  230. }
  231. close(done)
  232. })
  233. c.timer.Reset(c.dt)
  234. select {
  235. case err, ok := <-done:
  236. c.timer.Stop()
  237. if ok {
  238. return err
  239. } else {
  240. return nil
  241. }
  242. case <-c.timer.C:
  243. return ErrTimeout
  244. }
  245. }
  246. func (c *timeoutConsumer) OnEvent(event interface{}) {
  247. c.consumer.OnEvent(event)
  248. }
  249. type pauseResumeHandler struct {
  250. discov.Renewer
  251. }
  252. func (pr pauseResumeHandler) OnPause() {
  253. pr.Pause()
  254. }
  255. func (pr pauseResumeHandler) OnResume() {
  256. pr.Resume()
  257. }
  258. type eventConsumer struct {
  259. consumeHandler ConsumeHandler
  260. eventHandler EventHandler
  261. }
  262. func (ec eventConsumer) Consume(msg string) error {
  263. return ec.consumeHandler.Consume(msg)
  264. }
  265. func (ec eventConsumer) OnEvent(event interface{}) {
  266. ec.eventHandler.OnEvent(event)
  267. }
  268. type eventlessHandler struct {
  269. handler ConsumeHandler
  270. }
  271. func (h eventlessHandler) Consume(msg string) error {
  272. return h.handler.Consume(msg)
  273. }
  274. func (h eventlessHandler) OnEvent(event interface{}) {
  275. }
  276. type dummyEventHandler int
  277. func (eh dummyEventHandler) OnEvent(event interface{}) {
  278. }