123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- package rq
- import (
- "context"
- "errors"
- "fmt"
- "strings"
- "sync"
- "time"
- "zero/core/discov"
- "zero/core/errorx"
- "zero/core/jsonx"
- "zero/core/lang"
- "zero/core/logx"
- "zero/core/queue"
- "zero/core/redisqueue"
- "zero/core/stores/redis"
- "zero/core/threading"
- "zero/rq/constant"
- "zero/rq/update"
- )
- const (
- retryTimes = 3
- etcdRedisFields = 4
- )
- var ErrPusherTypeError = errors.New("not a QueuePusher instance")
- type (
- KeyFn func(string) (key, payload string, err error)
- KeysFn func(string) (ctx context.Context, keys []string, err error)
- AssembleFn func(context.Context, []string) (payload string, err error)
- PusherOption func(*Pusher) error
- // just push once or do it retryTimes, it's a choice.
- // because only when at least a server is alive, and
- // pushing to the server failed, we'll return with an error
- // if waken up, but the server is going down very quickly,
- // we're going to wait again. so it's safe to push once.
- pushStrategy interface {
- addListener(listener discov.Listener)
- push(string) error
- }
- batchConsistentStrategy struct {
- keysFn KeysFn
- assembleFn AssembleFn
- subClient *discov.BatchConsistentSubClient
- }
- consistentStrategy struct {
- keyFn KeyFn
- subClient *discov.ConsistentSubClient
- }
- roundRobinStrategy struct {
- subClient *discov.RoundRobinSubClient
- }
- serverListener struct {
- updater *update.IncrementalUpdater
- }
- Pusher struct {
- name string
- endpoints []string
- key string
- failovers sync.Map
- strategy pushStrategy
- serverSensitive bool
- }
- )
- func NewPusher(endpoints []string, key string, opts ...PusherOption) (*Pusher, error) {
- pusher := &Pusher{
- name: getName(key),
- endpoints: endpoints,
- key: key,
- }
- if len(opts) == 0 {
- opts = []PusherOption{WithRoundRobinStrategy()}
- }
- for _, opt := range opts {
- if err := opt(pusher); err != nil {
- return nil, err
- }
- }
- if pusher.serverSensitive {
- listener := new(serverListener)
- listener.updater = update.NewIncrementalUpdater(listener.update)
- pusher.strategy.addListener(listener)
- }
- return pusher, nil
- }
- func (pusher *Pusher) Name() string {
- return pusher.name
- }
- func (pusher *Pusher) Push(message string) error {
- return pusher.strategy.push(message)
- }
- func (pusher *Pusher) close(server string, conn interface{}) error {
- logx.Errorf("dropped redis node: %s", server)
- return pusher.failover(server)
- }
- func (pusher *Pusher) dial(server string) (interface{}, error) {
- pusher.failovers.Delete(server)
- p, err := newPusher(server)
- if err != nil {
- return nil, err
- }
- logx.Infof("new redis node: %s", server)
- return p, nil
- }
- func (pusher *Pusher) failover(server string) error {
- pusher.failovers.Store(server, lang.Placeholder)
- rds, key, option, err := newRedisWithKey(server)
- if err != nil {
- return err
- }
- threading.GoSafe(func() {
- defer pusher.failovers.Delete(server)
- for {
- _, ok := pusher.failovers.Load(server)
- if !ok {
- logx.Infof("redis queue (%s) revived", server)
- return
- }
- message, err := rds.Lpop(key)
- if err != nil {
- logx.Error(err)
- return
- }
- if len(message) == 0 {
- logx.Infof("repush redis queue (%s) done", server)
- return
- }
- if option == constant.TimedQueueType {
- message, err = unwrapTimedMessage(message)
- if err != nil {
- logx.Errorf("invalid timedMessage: %s, error: %s", message, err.Error())
- return
- }
- }
- if err = pusher.strategy.push(message); err != nil {
- logx.Error(err)
- return
- }
- }
- })
- return nil
- }
- func UnmarshalPusher(server string) (queue.QueuePusher, error) {
- store, key, option, err := newRedisWithKey(server)
- if err != nil {
- return nil, err
- }
- if option == constant.TimedQueueType {
- return redisqueue.NewPusher(store, key, redisqueue.WithTime()), nil
- }
- return redisqueue.NewPusher(store, key), nil
- }
- func WithBatchConsistentStrategy(keysFn KeysFn, assembleFn AssembleFn, opts ...discov.BalanceOption) PusherOption {
- return func(pusher *Pusher) error {
- subClient, err := discov.NewBatchConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial,
- pusher.close, opts...)
- if err != nil {
- return err
- }
- pusher.strategy = batchConsistentStrategy{
- keysFn: keysFn,
- assembleFn: assembleFn,
- subClient: subClient,
- }
- return nil
- }
- }
- func WithConsistentStrategy(keyFn KeyFn, opts ...discov.BalanceOption) PusherOption {
- return func(pusher *Pusher) error {
- subClient, err := discov.NewConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close, opts...)
- if err != nil {
- return err
- }
- pusher.strategy = consistentStrategy{
- keyFn: keyFn,
- subClient: subClient,
- }
- return nil
- }
- }
- func WithRoundRobinStrategy() PusherOption {
- return func(pusher *Pusher) error {
- subClient, err := discov.NewRoundRobinSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close)
- if err != nil {
- return err
- }
- pusher.strategy = roundRobinStrategy{
- subClient: subClient,
- }
- return nil
- }
- }
- func WithServerSensitive() PusherOption {
- return func(pusher *Pusher) error {
- pusher.serverSensitive = true
- return nil
- }
- }
- func (bcs batchConsistentStrategy) addListener(listener discov.Listener) {
- bcs.subClient.AddListener(listener)
- }
- func (bcs batchConsistentStrategy) balance(keys []string) map[interface{}][]string {
- // we need to make sure the servers are available, otherwise wait forever
- for {
- if mapping, ok := bcs.subClient.Next(keys); ok {
- return mapping
- } else {
- bcs.subClient.WaitForServers()
- // make sure we don't flood logs too much in extreme conditions
- time.Sleep(time.Second)
- }
- }
- }
- func (bcs batchConsistentStrategy) push(message string) error {
- ctx, keys, err := bcs.keysFn(message)
- if err != nil {
- return err
- }
- var batchError errorx.BatchError
- mapping := bcs.balance(keys)
- for conn, connKeys := range mapping {
- payload, err := bcs.assembleFn(ctx, connKeys)
- if err != nil {
- batchError.Add(err)
- continue
- }
- for i := 0; i < retryTimes; i++ {
- if err = bcs.pushOnce(conn, payload); err != nil {
- batchError.Add(err)
- } else {
- break
- }
- }
- }
- return batchError.Err()
- }
- func (bcs batchConsistentStrategy) pushOnce(server interface{}, payload string) error {
- pusher, ok := server.(queue.QueuePusher)
- if ok {
- return pusher.Push(payload)
- } else {
- return ErrPusherTypeError
- }
- }
- func (cs consistentStrategy) addListener(listener discov.Listener) {
- cs.subClient.AddListener(listener)
- }
- func (cs consistentStrategy) push(message string) error {
- var batchError errorx.BatchError
- key, payload, err := cs.keyFn(message)
- if err != nil {
- return err
- }
- for i := 0; i < retryTimes; i++ {
- if err = cs.pushOnce(key, payload); err != nil {
- batchError.Add(err)
- } else {
- return nil
- }
- }
- return batchError.Err()
- }
- func (cs consistentStrategy) pushOnce(key, payload string) error {
- // we need to make sure the servers are available, otherwise wait forever
- for {
- if server, ok := cs.subClient.Next(key); ok {
- pusher, ok := server.(queue.QueuePusher)
- if ok {
- return pusher.Push(payload)
- } else {
- return ErrPusherTypeError
- }
- } else {
- cs.subClient.WaitForServers()
- // make sure we don't flood logs too much in extreme conditions
- time.Sleep(time.Second)
- }
- }
- }
- func (rrs roundRobinStrategy) addListener(listener discov.Listener) {
- rrs.subClient.AddListener(listener)
- }
- func (rrs roundRobinStrategy) push(message string) error {
- var batchError errorx.BatchError
- for i := 0; i < retryTimes; i++ {
- if err := rrs.pushOnce(message); err != nil {
- batchError.Add(err)
- } else {
- return nil
- }
- }
- return batchError.Err()
- }
- func (rrs roundRobinStrategy) pushOnce(message string) error {
- if server, ok := rrs.subClient.Next(); ok {
- pusher, ok := server.(queue.QueuePusher)
- if ok {
- return pusher.Push(message)
- } else {
- return ErrPusherTypeError
- }
- } else {
- rrs.subClient.WaitForServers()
- return rrs.pushOnce(message)
- }
- }
- func getName(key string) string {
- return fmt.Sprintf("etcd:%s", key)
- }
- func newPusher(server string) (queue.QueuePusher, error) {
- if rds, key, option, err := newRedisWithKey(server); err != nil {
- return nil, err
- } else if option == constant.TimedQueueType {
- return redisqueue.NewPusher(rds, key, redisqueue.WithTime()), nil
- } else {
- return redisqueue.NewPusher(rds, key), nil
- }
- }
- func newRedisWithKey(server string) (rds *redis.Redis, key, option string, err error) {
- fields := strings.Split(server, constant.Delimeter)
- if len(fields) < etcdRedisFields {
- err = fmt.Errorf("wrong redis queue: %s, should be ip:port/type/password/key/[option]", server)
- return
- }
- addr := fields[0]
- tp := fields[1]
- pass := fields[2]
- key = fields[3]
- if len(fields) > etcdRedisFields {
- option = fields[4]
- }
- rds = redis.NewRedis(addr, tp, pass)
- return
- }
- func (sl *serverListener) OnUpdate(keys []string, servers []string, newKey string) {
- sl.updater.Update(keys, servers, newKey)
- }
- func (sl *serverListener) OnReload() {
- sl.updater.Update(nil, nil, "")
- }
- func (sl *serverListener) update(change update.ServerChange) {
- content, err := change.Marshal()
- if err != nil {
- logx.Error(err)
- }
- if err = broadcast(change.Servers, content); err != nil {
- logx.Error(err)
- }
- }
- func broadcast(servers []string, message string) error {
- var be errorx.BatchError
- for _, server := range servers {
- q, err := UnmarshalPusher(server)
- if err != nil {
- be.Add(err)
- } else {
- q.Push(message)
- }
- }
- return be.Err()
- }
- func unwrapTimedMessage(message string) (string, error) {
- var tm redisqueue.TimedMessage
- if err := jsonx.UnmarshalFromString(message, &tm); err != nil {
- return "", err
- }
- return tm.Payload, nil
- }
|