pusher.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. package rq
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "time"
  9. "zero/core/discov"
  10. "zero/core/errorx"
  11. "zero/core/jsonx"
  12. "zero/core/lang"
  13. "zero/core/logx"
  14. "zero/core/queue"
  15. "zero/core/stores/redis"
  16. "zero/core/threading"
  17. "zero/rq/internal"
  18. "zero/rq/internal/update"
  19. )
const (
	// retryTimes is how many attempts a strategy makes before reporting failure.
	retryTimes = 3
	// etcdRedisFields is the minimum number of fields in a server string:
	// ip:port/type/password/key, with an optional trailing queue option.
	etcdRedisFields = 4
)

// ErrPusherTypeError indicates a discovered connection is not a queue.QueuePusher.
var ErrPusherTypeError = errors.New("not a QueuePusher instance")
type (
	// KeyFn maps a message to its routing key and the payload to push.
	KeyFn func(string) (key, payload string, err error)
	// KeysFn extracts the batch of routing keys from a message,
	// along with a context passed through to AssembleFn.
	KeysFn func(string) (ctx context.Context, keys []string, err error)
	// AssembleFn builds the payload for the subset of keys routed to one server.
	AssembleFn func(context.Context, []string) (payload string, err error)
	// PusherOption customizes a Pusher during construction.
	PusherOption func(*Pusher) error

	// just push once or do it retryTimes, it's a choice.
	// because only when at least a server is alive, and
	// pushing to the server failed, we'll return with an error
	// if waken up, but the server is going down very quickly,
	// we're going to wait again. so it's safe to push once.
	pushStrategy interface {
		addListener(listener discov.Listener)
		push(string) error
	}

	// batchConsistentStrategy splits a message's keys across servers
	// via consistent hashing and pushes one assembled payload per server.
	batchConsistentStrategy struct {
		keysFn     KeysFn
		assembleFn AssembleFn
		subClient  *discov.BatchConsistentSubClient
	}

	// consistentStrategy routes each message to one server by consistent hashing.
	consistentStrategy struct {
		keyFn     KeyFn
		subClient *discov.ConsistentSubClient
	}

	// roundRobinStrategy spreads messages evenly over discovered servers.
	roundRobinStrategy struct {
		subClient *discov.RoundRobinSubClient
	}

	// serverListener forwards membership changes to an incremental updater,
	// which broadcasts the changes back to the servers (see update/broadcast).
	serverListener struct {
		updater *update.IncrementalUpdater
	}

	// Pusher pushes messages to redis-backed queues discovered through etcd.
	Pusher struct {
		name      string
		endpoints []string
		key       string
		// failovers tracks servers currently being drained; see failover/dial.
		failovers       sync.Map
		strategy        pushStrategy
		serverSensitive bool
	}
)
  63. func NewPusher(endpoints []string, key string, opts ...PusherOption) (*Pusher, error) {
  64. pusher := &Pusher{
  65. name: getName(key),
  66. endpoints: endpoints,
  67. key: key,
  68. }
  69. if len(opts) == 0 {
  70. opts = []PusherOption{WithRoundRobinStrategy()}
  71. }
  72. for _, opt := range opts {
  73. if err := opt(pusher); err != nil {
  74. return nil, err
  75. }
  76. }
  77. if pusher.serverSensitive {
  78. listener := new(serverListener)
  79. listener.updater = update.NewIncrementalUpdater(listener.update)
  80. pusher.strategy.addListener(listener)
  81. }
  82. return pusher, nil
  83. }
// Name returns the display name of the pusher, e.g. "etcd:<key>".
func (pusher *Pusher) Name() string {
	return pusher.name
}
// Push sends message through the configured strategy; the strategy blocks
// until at least one server is available.
func (pusher *Pusher) Push(message string) error {
	return pusher.strategy.push(message)
}
// close is the drop hook handed to the discov sub clients. conn is the
// dropped connection (unused; the server string identifies the node).
// It starts failover so messages queued on the dead node are re-pushed.
func (pusher *Pusher) close(server string, conn interface{}) error {
	logx.Errorf("dropped redis node: %s", server)
	return pusher.failover(server)
}
  94. func (pusher *Pusher) dial(server string) (interface{}, error) {
  95. pusher.failovers.Delete(server)
  96. p, err := newPusher(server)
  97. if err != nil {
  98. return nil, err
  99. }
  100. logx.Infof("new redis node: %s", server)
  101. return p, nil
  102. }
// failover marks server as down and starts a background drain: messages
// left in the dead node's redis list are popped one by one and re-pushed
// through the current strategy so they are not lost. Returns immediately;
// the drain runs in its own goroutine.
func (pusher *Pusher) failover(server string) error {
	pusher.failovers.Store(server, lang.Placeholder)
	rds, key, option, err := newRedisWithKey(server)
	if err != nil {
		return err
	}

	threading.GoSafe(func() {
		defer pusher.failovers.Delete(server)
		for {
			// dial() deletes the marker when the node revives; stop draining then
			_, ok := pusher.failovers.Load(server)
			if !ok {
				logx.Infof("redis queue (%s) revived", server)
				return
			}

			message, err := rds.Lpop(key)
			if err != nil {
				logx.Error(err)
				return
			}
			// empty message means the list is drained
			if len(message) == 0 {
				logx.Infof("repush redis queue (%s) done", server)
				return
			}

			// timed queues wrap payloads; unwrap before re-pushing
			if option == internal.TimedQueueType {
				message, err = unwrapTimedMessage(message)
				if err != nil {
					logx.Errorf("invalid timedMessage: %s, error: %s", message, err.Error())
					return
				}
			}

			if err = pusher.strategy.push(message); err != nil {
				logx.Error(err)
				return
			}
		}
	})

	return nil
}
  141. func UnmarshalPusher(server string) (queue.QueuePusher, error) {
  142. store, key, option, err := newRedisWithKey(server)
  143. if err != nil {
  144. return nil, err
  145. }
  146. if option == internal.TimedQueueType {
  147. return internal.NewPusher(store, key, internal.WithTime()), nil
  148. }
  149. return internal.NewPusher(store, key), nil
  150. }
  151. func WithBatchConsistentStrategy(keysFn KeysFn, assembleFn AssembleFn, opts ...discov.BalanceOption) PusherOption {
  152. return func(pusher *Pusher) error {
  153. subClient, err := discov.NewBatchConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial,
  154. pusher.close, opts...)
  155. if err != nil {
  156. return err
  157. }
  158. pusher.strategy = batchConsistentStrategy{
  159. keysFn: keysFn,
  160. assembleFn: assembleFn,
  161. subClient: subClient,
  162. }
  163. return nil
  164. }
  165. }
  166. func WithConsistentStrategy(keyFn KeyFn, opts ...discov.BalanceOption) PusherOption {
  167. return func(pusher *Pusher) error {
  168. subClient, err := discov.NewConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close, opts...)
  169. if err != nil {
  170. return err
  171. }
  172. pusher.strategy = consistentStrategy{
  173. keyFn: keyFn,
  174. subClient: subClient,
  175. }
  176. return nil
  177. }
  178. }
  179. func WithRoundRobinStrategy() PusherOption {
  180. return func(pusher *Pusher) error {
  181. subClient, err := discov.NewRoundRobinSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close)
  182. if err != nil {
  183. return err
  184. }
  185. pusher.strategy = roundRobinStrategy{
  186. subClient: subClient,
  187. }
  188. return nil
  189. }
  190. }
  191. func WithServerSensitive() PusherOption {
  192. return func(pusher *Pusher) error {
  193. pusher.serverSensitive = true
  194. return nil
  195. }
  196. }
// addListener registers listener for server membership change notifications.
func (bcs batchConsistentStrategy) addListener(listener discov.Listener) {
	bcs.subClient.AddListener(listener)
}
  200. func (bcs batchConsistentStrategy) balance(keys []string) map[interface{}][]string {
  201. // we need to make sure the servers are available, otherwise wait forever
  202. for {
  203. if mapping, ok := bcs.subClient.Next(keys); ok {
  204. return mapping
  205. } else {
  206. bcs.subClient.WaitForServers()
  207. // make sure we don't flood logs too much in extreme conditions
  208. time.Sleep(time.Second)
  209. }
  210. }
  211. }
  212. func (bcs batchConsistentStrategy) push(message string) error {
  213. ctx, keys, err := bcs.keysFn(message)
  214. if err != nil {
  215. return err
  216. }
  217. var batchError errorx.BatchError
  218. mapping := bcs.balance(keys)
  219. for conn, connKeys := range mapping {
  220. payload, err := bcs.assembleFn(ctx, connKeys)
  221. if err != nil {
  222. batchError.Add(err)
  223. continue
  224. }
  225. for i := 0; i < retryTimes; i++ {
  226. if err = bcs.pushOnce(conn, payload); err != nil {
  227. batchError.Add(err)
  228. } else {
  229. break
  230. }
  231. }
  232. }
  233. return batchError.Err()
  234. }
  235. func (bcs batchConsistentStrategy) pushOnce(server interface{}, payload string) error {
  236. pusher, ok := server.(queue.QueuePusher)
  237. if ok {
  238. return pusher.Push(payload)
  239. } else {
  240. return ErrPusherTypeError
  241. }
  242. }
// addListener registers listener for server membership change notifications.
func (cs consistentStrategy) addListener(listener discov.Listener) {
	cs.subClient.AddListener(listener)
}
  246. func (cs consistentStrategy) push(message string) error {
  247. var batchError errorx.BatchError
  248. key, payload, err := cs.keyFn(message)
  249. if err != nil {
  250. return err
  251. }
  252. for i := 0; i < retryTimes; i++ {
  253. if err = cs.pushOnce(key, payload); err != nil {
  254. batchError.Add(err)
  255. } else {
  256. return nil
  257. }
  258. }
  259. return batchError.Err()
  260. }
  261. func (cs consistentStrategy) pushOnce(key, payload string) error {
  262. // we need to make sure the servers are available, otherwise wait forever
  263. for {
  264. if server, ok := cs.subClient.Next(key); ok {
  265. pusher, ok := server.(queue.QueuePusher)
  266. if ok {
  267. return pusher.Push(payload)
  268. } else {
  269. return ErrPusherTypeError
  270. }
  271. } else {
  272. cs.subClient.WaitForServers()
  273. // make sure we don't flood logs too much in extreme conditions
  274. time.Sleep(time.Second)
  275. }
  276. }
  277. }
// addListener registers listener for server membership change notifications.
func (rrs roundRobinStrategy) addListener(listener discov.Listener) {
	rrs.subClient.AddListener(listener)
}
  281. func (rrs roundRobinStrategy) push(message string) error {
  282. var batchError errorx.BatchError
  283. for i := 0; i < retryTimes; i++ {
  284. if err := rrs.pushOnce(message); err != nil {
  285. batchError.Add(err)
  286. } else {
  287. return nil
  288. }
  289. }
  290. return batchError.Err()
  291. }
  292. func (rrs roundRobinStrategy) pushOnce(message string) error {
  293. if server, ok := rrs.subClient.Next(); ok {
  294. pusher, ok := server.(queue.QueuePusher)
  295. if ok {
  296. return pusher.Push(message)
  297. } else {
  298. return ErrPusherTypeError
  299. }
  300. } else {
  301. rrs.subClient.WaitForServers()
  302. return rrs.pushOnce(message)
  303. }
  304. }
  305. func getName(key string) string {
  306. return fmt.Sprintf("etcd:%s", key)
  307. }
  308. func newPusher(server string) (queue.QueuePusher, error) {
  309. if rds, key, option, err := newRedisWithKey(server); err != nil {
  310. return nil, err
  311. } else if option == internal.TimedQueueType {
  312. return internal.NewPusher(rds, key, internal.WithTime()), nil
  313. } else {
  314. return internal.NewPusher(rds, key), nil
  315. }
  316. }
  317. func newRedisWithKey(server string) (rds *redis.Redis, key, option string, err error) {
  318. fields := strings.Split(server, internal.Delimeter)
  319. if len(fields) < etcdRedisFields {
  320. err = fmt.Errorf("wrong redis queue: %s, should be ip:port/type/password/key/[option]", server)
  321. return
  322. }
  323. addr := fields[0]
  324. tp := fields[1]
  325. pass := fields[2]
  326. key = fields[3]
  327. if len(fields) > etcdRedisFields {
  328. option = fields[4]
  329. }
  330. rds = redis.NewRedis(addr, tp, pass)
  331. return
  332. }
// OnUpdate implements discov.Listener; it feeds key/server membership
// changes into the incremental updater.
func (sl *serverListener) OnUpdate(keys []string, servers []string, newKey string) {
	sl.updater.Update(keys, servers, newKey)
}
// OnReload implements discov.Listener; it signals a full reload by passing
// empty arguments to the updater.
func (sl *serverListener) OnReload() {
	sl.updater.Update(nil, nil, "")
}
  339. func (sl *serverListener) update(change update.ServerChange) {
  340. content, err := change.Marshal()
  341. if err != nil {
  342. logx.Error(err)
  343. }
  344. if err = broadcast(change.Servers, content); err != nil {
  345. logx.Error(err)
  346. }
  347. }
  348. func broadcast(servers []string, message string) error {
  349. var be errorx.BatchError
  350. for _, server := range servers {
  351. q, err := UnmarshalPusher(server)
  352. if err != nil {
  353. be.Add(err)
  354. } else {
  355. q.Push(message)
  356. }
  357. }
  358. return be.Err()
  359. }
  360. func unwrapTimedMessage(message string) (string, error) {
  361. var tm internal.TimedMessage
  362. if err := jsonx.UnmarshalFromString(message, &tm); err != nil {
  363. return "", err
  364. }
  365. return tm.Payload, nil
  366. }