subclient.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package discov
  2. import (
  3. "sync"
  4. "zero/core/discov/internal"
  5. "zero/core/logx"
  6. )
  // Balance types selectable through BalanceOption.
  //
  // The zero value is intentionally left without a name: it is the default
  // (key-based) balance type, so a zero-valued balanceOptions selects it.
  const (
  	// idBasedBalance has the value 1; NewConsistentSubClient checks for it
  	// when choosing how to derive the hashing key of a discovered entry.
  	idBasedBalance = iota + 1
  )
  11. type (
  12. Listener internal.Listener
  13. subClient struct {
  14. balancer internal.Balancer
  15. lock sync.Mutex
  16. cond *sync.Cond
  17. listeners []internal.Listener
  18. }
  19. balanceOptions struct {
  20. balanceType int
  21. }
  22. BalanceOption func(*balanceOptions)
  23. RoundRobinSubClient struct {
  24. *subClient
  25. }
  26. ConsistentSubClient struct {
  27. *subClient
  28. }
  29. BatchConsistentSubClient struct {
  30. *ConsistentSubClient
  31. }
  32. )
  33. func NewRoundRobinSubClient(endpoints []string, key string, dialFn internal.DialFn, closeFn internal.CloseFn,
  34. opts ...SubOption) (*RoundRobinSubClient, error) {
  35. var subOpts subOptions
  36. for _, opt := range opts {
  37. opt(&subOpts)
  38. }
  39. cli, err := newSubClient(endpoints, key, internal.NewRoundRobinBalancer(dialFn, closeFn, subOpts.exclusive))
  40. if err != nil {
  41. return nil, err
  42. }
  43. return &RoundRobinSubClient{
  44. subClient: cli,
  45. }, nil
  46. }
  47. func NewConsistentSubClient(endpoints []string, key string, dialFn internal.DialFn,
  48. closeFn internal.CloseFn, opts ...BalanceOption) (*ConsistentSubClient, error) {
  49. var balanceOpts balanceOptions
  50. for _, opt := range opts {
  51. opt(&balanceOpts)
  52. }
  53. var keyer func(internal.KV) string
  54. switch balanceOpts.balanceType {
  55. case idBasedBalance:
  56. keyer = func(kv internal.KV) string {
  57. if id, ok := extractId(kv.Key); ok {
  58. return id
  59. } else {
  60. return kv.Key
  61. }
  62. }
  63. default:
  64. keyer = func(kv internal.KV) string {
  65. return kv.Val
  66. }
  67. }
  68. cli, err := newSubClient(endpoints, key, internal.NewConsistentBalancer(dialFn, closeFn, keyer))
  69. if err != nil {
  70. return nil, err
  71. }
  72. return &ConsistentSubClient{
  73. subClient: cli,
  74. }, nil
  75. }
  76. func NewBatchConsistentSubClient(endpoints []string, key string, dialFn internal.DialFn, closeFn internal.CloseFn,
  77. opts ...BalanceOption) (*BatchConsistentSubClient, error) {
  78. cli, err := NewConsistentSubClient(endpoints, key, dialFn, closeFn, opts...)
  79. if err != nil {
  80. return nil, err
  81. }
  82. return &BatchConsistentSubClient{
  83. ConsistentSubClient: cli,
  84. }, nil
  85. }
  86. func newSubClient(endpoints []string, key string, balancer internal.Balancer) (*subClient, error) {
  87. client := &subClient{
  88. balancer: balancer,
  89. }
  90. client.cond = sync.NewCond(&client.lock)
  91. if err := internal.GetRegistry().Monitor(endpoints, key, client); err != nil {
  92. return nil, err
  93. }
  94. return client, nil
  95. }
  96. func (c *subClient) AddListener(listener internal.Listener) {
  97. c.lock.Lock()
  98. c.listeners = append(c.listeners, listener)
  99. c.lock.Unlock()
  100. }
  101. func (c *subClient) OnAdd(kv internal.KV) {
  102. c.lock.Lock()
  103. defer c.lock.Unlock()
  104. if err := c.balancer.AddConn(kv); err != nil {
  105. logx.Error(err)
  106. } else {
  107. c.cond.Broadcast()
  108. }
  109. }
  110. func (c *subClient) OnDelete(kv internal.KV) {
  111. c.balancer.RemoveKey(kv.Key)
  112. }
  113. func (c *subClient) WaitForServers() {
  114. logx.Error("Waiting for alive servers")
  115. c.lock.Lock()
  116. defer c.lock.Unlock()
  117. if c.balancer.IsEmpty() {
  118. c.cond.Wait()
  119. }
  120. }
  121. func (c *subClient) onAdd(keys []string, servers []string, newKey string) {
  122. // guarded by locked outside
  123. for _, listener := range c.listeners {
  124. listener.OnUpdate(keys, servers, newKey)
  125. }
  126. }
  127. func (c *RoundRobinSubClient) Next() (interface{}, bool) {
  128. return c.balancer.Next()
  129. }
  130. func (c *ConsistentSubClient) Next(key string) (interface{}, bool) {
  131. return c.balancer.Next(key)
  132. }
  133. func (bc *BatchConsistentSubClient) Next(keys []string) (map[interface{}][]string, bool) {
  134. if len(keys) == 0 {
  135. return nil, false
  136. }
  137. result := make(map[interface{}][]string)
  138. for _, key := range keys {
  139. dest, ok := bc.ConsistentSubClient.Next(key)
  140. if !ok {
  141. return nil, false
  142. }
  143. result[dest] = append(result[dest], key)
  144. }
  145. return result, true
  146. }
  147. func BalanceWithId() BalanceOption {
  148. return func(opts *balanceOptions) {
  149. opts.balanceType = idBasedBalance
  150. }
  151. }