pusher.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package rq
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "time"
  9. "zero/core/discov"
  10. "zero/core/errorx"
  11. "zero/core/jsonx"
  12. "zero/core/lang"
  13. "zero/core/logx"
  14. "zero/core/queue"
  15. "zero/core/redisqueue"
  16. "zero/core/stores/redis"
  17. "zero/core/threading"
  18. "zero/rq/constant"
  19. "zero/rq/update"
  20. )
  21. const (
  22. retryTimes = 3
  23. etcdRedisFields = 4
  24. )
  25. var ErrPusherTypeError = errors.New("not a QueuePusher instance")
  26. type (
  27. KeyFn func(string) (key, payload string, err error)
  28. KeysFn func(string) (ctx context.Context, keys []string, err error)
  29. AssembleFn func(context.Context, []string) (payload string, err error)
  30. PusherOption func(*Pusher) error
  31. // just push once or do it retryTimes, it's a choice.
  32. // because only when at least a server is alive, and
  33. // pushing to the server failed, we'll return with an error
  34. // if waken up, but the server is going down very quickly,
  35. // we're going to wait again. so it's safe to push once.
  36. pushStrategy interface {
  37. addListener(listener discov.Listener)
  38. push(string) error
  39. }
  40. batchConsistentStrategy struct {
  41. keysFn KeysFn
  42. assembleFn AssembleFn
  43. subClient *discov.BatchConsistentSubClient
  44. }
  45. consistentStrategy struct {
  46. keyFn KeyFn
  47. subClient *discov.ConsistentSubClient
  48. }
  49. roundRobinStrategy struct {
  50. subClient *discov.RoundRobinSubClient
  51. }
  52. serverListener struct {
  53. updater *update.IncrementalUpdater
  54. }
  55. Pusher struct {
  56. name string
  57. endpoints []string
  58. key string
  59. failovers sync.Map
  60. strategy pushStrategy
  61. serverSensitive bool
  62. }
  63. )
  64. func NewPusher(endpoints []string, key string, opts ...PusherOption) (*Pusher, error) {
  65. pusher := &Pusher{
  66. name: getName(key),
  67. endpoints: endpoints,
  68. key: key,
  69. }
  70. if len(opts) == 0 {
  71. opts = []PusherOption{WithRoundRobinStrategy()}
  72. }
  73. for _, opt := range opts {
  74. if err := opt(pusher); err != nil {
  75. return nil, err
  76. }
  77. }
  78. if pusher.serverSensitive {
  79. listener := new(serverListener)
  80. listener.updater = update.NewIncrementalUpdater(listener.update)
  81. pusher.strategy.addListener(listener)
  82. }
  83. return pusher, nil
  84. }
  85. func (pusher *Pusher) Name() string {
  86. return pusher.name
  87. }
  88. func (pusher *Pusher) Push(message string) error {
  89. return pusher.strategy.push(message)
  90. }
  91. func (pusher *Pusher) close(server string, conn interface{}) error {
  92. logx.Errorf("dropped redis node: %s", server)
  93. return pusher.failover(server)
  94. }
  95. func (pusher *Pusher) dial(server string) (interface{}, error) {
  96. pusher.failovers.Delete(server)
  97. p, err := newPusher(server)
  98. if err != nil {
  99. return nil, err
  100. }
  101. logx.Infof("new redis node: %s", server)
  102. return p, nil
  103. }
  104. func (pusher *Pusher) failover(server string) error {
  105. pusher.failovers.Store(server, lang.Placeholder)
  106. rds, key, option, err := newRedisWithKey(server)
  107. if err != nil {
  108. return err
  109. }
  110. threading.GoSafe(func() {
  111. defer pusher.failovers.Delete(server)
  112. for {
  113. _, ok := pusher.failovers.Load(server)
  114. if !ok {
  115. logx.Infof("redis queue (%s) revived", server)
  116. return
  117. }
  118. message, err := rds.Lpop(key)
  119. if err != nil {
  120. logx.Error(err)
  121. return
  122. }
  123. if len(message) == 0 {
  124. logx.Infof("repush redis queue (%s) done", server)
  125. return
  126. }
  127. if option == constant.TimedQueueType {
  128. message, err = unwrapTimedMessage(message)
  129. if err != nil {
  130. logx.Errorf("invalid timedMessage: %s, error: %s", message, err.Error())
  131. return
  132. }
  133. }
  134. if err = pusher.strategy.push(message); err != nil {
  135. logx.Error(err)
  136. return
  137. }
  138. }
  139. })
  140. return nil
  141. }
  142. func UnmarshalPusher(server string) (queue.QueuePusher, error) {
  143. store, key, option, err := newRedisWithKey(server)
  144. if err != nil {
  145. return nil, err
  146. }
  147. if option == constant.TimedQueueType {
  148. return redisqueue.NewPusher(store, key, redisqueue.WithTime()), nil
  149. }
  150. return redisqueue.NewPusher(store, key), nil
  151. }
  152. func WithBatchConsistentStrategy(keysFn KeysFn, assembleFn AssembleFn, opts ...discov.BalanceOption) PusherOption {
  153. return func(pusher *Pusher) error {
  154. subClient, err := discov.NewBatchConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial,
  155. pusher.close, opts...)
  156. if err != nil {
  157. return err
  158. }
  159. pusher.strategy = batchConsistentStrategy{
  160. keysFn: keysFn,
  161. assembleFn: assembleFn,
  162. subClient: subClient,
  163. }
  164. return nil
  165. }
  166. }
  167. func WithConsistentStrategy(keyFn KeyFn, opts ...discov.BalanceOption) PusherOption {
  168. return func(pusher *Pusher) error {
  169. subClient, err := discov.NewConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close, opts...)
  170. if err != nil {
  171. return err
  172. }
  173. pusher.strategy = consistentStrategy{
  174. keyFn: keyFn,
  175. subClient: subClient,
  176. }
  177. return nil
  178. }
  179. }
  180. func WithRoundRobinStrategy() PusherOption {
  181. return func(pusher *Pusher) error {
  182. subClient, err := discov.NewRoundRobinSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close)
  183. if err != nil {
  184. return err
  185. }
  186. pusher.strategy = roundRobinStrategy{
  187. subClient: subClient,
  188. }
  189. return nil
  190. }
  191. }
  192. func WithServerSensitive() PusherOption {
  193. return func(pusher *Pusher) error {
  194. pusher.serverSensitive = true
  195. return nil
  196. }
  197. }
  198. func (bcs batchConsistentStrategy) addListener(listener discov.Listener) {
  199. bcs.subClient.AddListener(listener)
  200. }
  201. func (bcs batchConsistentStrategy) balance(keys []string) map[interface{}][]string {
  202. // we need to make sure the servers are available, otherwise wait forever
  203. for {
  204. if mapping, ok := bcs.subClient.Next(keys); ok {
  205. return mapping
  206. } else {
  207. bcs.subClient.WaitForServers()
  208. // make sure we don't flood logs too much in extreme conditions
  209. time.Sleep(time.Second)
  210. }
  211. }
  212. }
  213. func (bcs batchConsistentStrategy) push(message string) error {
  214. ctx, keys, err := bcs.keysFn(message)
  215. if err != nil {
  216. return err
  217. }
  218. var batchError errorx.BatchError
  219. mapping := bcs.balance(keys)
  220. for conn, connKeys := range mapping {
  221. payload, err := bcs.assembleFn(ctx, connKeys)
  222. if err != nil {
  223. batchError.Add(err)
  224. continue
  225. }
  226. for i := 0; i < retryTimes; i++ {
  227. if err = bcs.pushOnce(conn, payload); err != nil {
  228. batchError.Add(err)
  229. } else {
  230. break
  231. }
  232. }
  233. }
  234. return batchError.Err()
  235. }
  236. func (bcs batchConsistentStrategy) pushOnce(server interface{}, payload string) error {
  237. pusher, ok := server.(queue.QueuePusher)
  238. if ok {
  239. return pusher.Push(payload)
  240. } else {
  241. return ErrPusherTypeError
  242. }
  243. }
  244. func (cs consistentStrategy) addListener(listener discov.Listener) {
  245. cs.subClient.AddListener(listener)
  246. }
  247. func (cs consistentStrategy) push(message string) error {
  248. var batchError errorx.BatchError
  249. key, payload, err := cs.keyFn(message)
  250. if err != nil {
  251. return err
  252. }
  253. for i := 0; i < retryTimes; i++ {
  254. if err = cs.pushOnce(key, payload); err != nil {
  255. batchError.Add(err)
  256. } else {
  257. return nil
  258. }
  259. }
  260. return batchError.Err()
  261. }
  262. func (cs consistentStrategy) pushOnce(key, payload string) error {
  263. // we need to make sure the servers are available, otherwise wait forever
  264. for {
  265. if server, ok := cs.subClient.Next(key); ok {
  266. pusher, ok := server.(queue.QueuePusher)
  267. if ok {
  268. return pusher.Push(payload)
  269. } else {
  270. return ErrPusherTypeError
  271. }
  272. } else {
  273. cs.subClient.WaitForServers()
  274. // make sure we don't flood logs too much in extreme conditions
  275. time.Sleep(time.Second)
  276. }
  277. }
  278. }
  279. func (rrs roundRobinStrategy) addListener(listener discov.Listener) {
  280. rrs.subClient.AddListener(listener)
  281. }
  282. func (rrs roundRobinStrategy) push(message string) error {
  283. var batchError errorx.BatchError
  284. for i := 0; i < retryTimes; i++ {
  285. if err := rrs.pushOnce(message); err != nil {
  286. batchError.Add(err)
  287. } else {
  288. return nil
  289. }
  290. }
  291. return batchError.Err()
  292. }
  293. func (rrs roundRobinStrategy) pushOnce(message string) error {
  294. if server, ok := rrs.subClient.Next(); ok {
  295. pusher, ok := server.(queue.QueuePusher)
  296. if ok {
  297. return pusher.Push(message)
  298. } else {
  299. return ErrPusherTypeError
  300. }
  301. } else {
  302. rrs.subClient.WaitForServers()
  303. return rrs.pushOnce(message)
  304. }
  305. }
  306. func getName(key string) string {
  307. return fmt.Sprintf("etcd:%s", key)
  308. }
  309. func newPusher(server string) (queue.QueuePusher, error) {
  310. if rds, key, option, err := newRedisWithKey(server); err != nil {
  311. return nil, err
  312. } else if option == constant.TimedQueueType {
  313. return redisqueue.NewPusher(rds, key, redisqueue.WithTime()), nil
  314. } else {
  315. return redisqueue.NewPusher(rds, key), nil
  316. }
  317. }
  318. func newRedisWithKey(server string) (rds *redis.Redis, key, option string, err error) {
  319. fields := strings.Split(server, constant.Delimeter)
  320. if len(fields) < etcdRedisFields {
  321. err = fmt.Errorf("wrong redis queue: %s, should be ip:port/type/password/key/[option]", server)
  322. return
  323. }
  324. addr := fields[0]
  325. tp := fields[1]
  326. pass := fields[2]
  327. key = fields[3]
  328. if len(fields) > etcdRedisFields {
  329. option = fields[4]
  330. }
  331. rds = redis.NewRedis(addr, tp, pass)
  332. return
  333. }
  334. func (sl *serverListener) OnUpdate(keys []string, servers []string, newKey string) {
  335. sl.updater.Update(keys, servers, newKey)
  336. }
  337. func (sl *serverListener) OnReload() {
  338. sl.updater.Update(nil, nil, "")
  339. }
  340. func (sl *serverListener) update(change update.ServerChange) {
  341. content, err := change.Marshal()
  342. if err != nil {
  343. logx.Error(err)
  344. }
  345. if err = broadcast(change.Servers, content); err != nil {
  346. logx.Error(err)
  347. }
  348. }
  349. func broadcast(servers []string, message string) error {
  350. var be errorx.BatchError
  351. for _, server := range servers {
  352. q, err := UnmarshalPusher(server)
  353. if err != nil {
  354. be.Add(err)
  355. } else {
  356. q.Push(message)
  357. }
  358. }
  359. return be.Err()
  360. }
  361. func unwrapTimedMessage(message string) (string, error) {
  362. var tm redisqueue.TimedMessage
  363. if err := jsonx.UnmarshalFromString(message, &tm); err != nil {
  364. return "", err
  365. }
  366. return tm.Payload, nil
  367. }