subscriber.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package discov
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/wuntsong-org/go-zero-plus/core/discov/internal"
  6. "github.com/wuntsong-org/go-zero-plus/core/logx"
  7. "github.com/wuntsong-org/go-zero-plus/core/syncx"
  8. )
  9. type (
  10. // SubOption defines the method to customize a Subscriber.
  11. SubOption func(sub *Subscriber)
  12. // A Subscriber is used to subscribe the given key on an etcd cluster.
  13. Subscriber struct {
  14. endpoints []string
  15. exclusive bool
  16. items *container
  17. }
  18. )
  19. // NewSubscriber returns a Subscriber.
  20. // endpoints is the hosts of the etcd cluster.
  21. // key is the key to subscribe.
  22. // opts are used to customize the Subscriber.
  23. func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
  24. sub := &Subscriber{
  25. endpoints: endpoints,
  26. }
  27. for _, opt := range opts {
  28. opt(sub)
  29. }
  30. sub.items = newContainer(sub.exclusive)
  31. if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil {
  32. return nil, err
  33. }
  34. return sub, nil
  35. }
  36. // AddListener adds listener to s.
  37. func (s *Subscriber) AddListener(listener func()) {
  38. s.items.addListener(listener)
  39. }
  40. // Values returns all the subscription values.
  41. func (s *Subscriber) Values() []string {
  42. return s.items.getValues()
  43. }
  44. // Exclusive means that key value can only be 1:1,
  45. // which means later added value will remove the keys associated with the same value previously.
  46. func Exclusive() SubOption {
  47. return func(sub *Subscriber) {
  48. sub.exclusive = true
  49. }
  50. }
  51. // WithSubEtcdAccount provides the etcd username/password.
  52. func WithSubEtcdAccount(user, pass string) SubOption {
  53. return func(sub *Subscriber) {
  54. RegisterAccount(sub.endpoints, user, pass)
  55. }
  56. }
  57. // WithSubEtcdTLS provides the etcd CertFile/CertKeyFile/CACertFile.
  58. func WithSubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify bool) SubOption {
  59. return func(sub *Subscriber) {
  60. logx.Must(RegisterTLS(sub.endpoints, certFile, certKeyFile, caFile, insecureSkipVerify))
  61. }
  62. }
  63. type container struct {
  64. exclusive bool
  65. values map[string][]string
  66. mapping map[string]string
  67. snapshot atomic.Value
  68. dirty *syncx.AtomicBool
  69. listeners []func()
  70. lock sync.Mutex
  71. }
  72. func newContainer(exclusive bool) *container {
  73. return &container{
  74. exclusive: exclusive,
  75. values: make(map[string][]string),
  76. mapping: make(map[string]string),
  77. dirty: syncx.ForAtomicBool(true),
  78. }
  79. }
  80. func (c *container) OnAdd(kv internal.KV) {
  81. c.addKv(kv.Key, kv.Val)
  82. c.notifyChange()
  83. }
  84. func (c *container) OnDelete(kv internal.KV) {
  85. c.removeKey(kv.Key)
  86. c.notifyChange()
  87. }
  88. // addKv adds the kv, returns if there are already other keys associate with the value
  89. func (c *container) addKv(key, value string) ([]string, bool) {
  90. c.lock.Lock()
  91. defer c.lock.Unlock()
  92. c.dirty.Set(true)
  93. keys := c.values[value]
  94. previous := append([]string(nil), keys...)
  95. early := len(keys) > 0
  96. if c.exclusive && early {
  97. for _, each := range keys {
  98. c.doRemoveKey(each)
  99. }
  100. }
  101. c.values[value] = append(c.values[value], key)
  102. c.mapping[key] = value
  103. if early {
  104. return previous, true
  105. }
  106. return nil, false
  107. }
  108. func (c *container) addListener(listener func()) {
  109. c.lock.Lock()
  110. c.listeners = append(c.listeners, listener)
  111. c.lock.Unlock()
  112. }
  113. func (c *container) doRemoveKey(key string) {
  114. server, ok := c.mapping[key]
  115. if !ok {
  116. return
  117. }
  118. delete(c.mapping, key)
  119. keys := c.values[server]
  120. remain := keys[:0]
  121. for _, k := range keys {
  122. if k != key {
  123. remain = append(remain, k)
  124. }
  125. }
  126. if len(remain) > 0 {
  127. c.values[server] = remain
  128. } else {
  129. delete(c.values, server)
  130. }
  131. }
  132. func (c *container) getValues() []string {
  133. if !c.dirty.True() {
  134. return c.snapshot.Load().([]string)
  135. }
  136. c.lock.Lock()
  137. defer c.lock.Unlock()
  138. var vals []string
  139. for each := range c.values {
  140. vals = append(vals, each)
  141. }
  142. c.snapshot.Store(vals)
  143. c.dirty.Set(false)
  144. return vals
  145. }
  146. func (c *container) notifyChange() {
  147. c.lock.Lock()
  148. listeners := append(([]func())(nil), c.listeners...)
  149. c.lock.Unlock()
  150. for _, listener := range listeners {
  151. listener()
  152. }
  153. }
  154. // removeKey removes the kv, returns true if there are still other keys associate with the value
  155. func (c *container) removeKey(key string) {
  156. c.lock.Lock()
  157. defer c.lock.Unlock()
  158. c.dirty.Set(true)
  159. c.doRemoveKey(key)
  160. }