subscriber.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package discov
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/tal-tech/go-zero/core/discov/internal"
  6. "github.com/tal-tech/go-zero/core/syncx"
  7. )
  8. type (
  9. // SubOption defines the method to customize a Subscriber.
  10. SubOption func(sub *Subscriber)
  11. // A Subscriber is used to subscribe the given key on a etcd cluster.
  12. Subscriber struct {
  13. endpoints []string
  14. exclusive bool
  15. items *container
  16. }
  17. )
  18. // NewSubscriber returns a Subscriber.
  19. // endpoints is the hosts of the etcd cluster.
  20. // key is the key to subscribe.
  21. // opts are used to customize the Subscriber.
  22. func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
  23. sub := &Subscriber{
  24. endpoints: endpoints,
  25. }
  26. for _, opt := range opts {
  27. opt(sub)
  28. }
  29. sub.items = newContainer(sub.exclusive)
  30. if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil {
  31. return nil, err
  32. }
  33. return sub, nil
  34. }
  35. // AddListener adds listener to s.
  36. func (s *Subscriber) AddListener(listener func()) {
  37. s.items.addListener(listener)
  38. }
  39. // Values returns all the subscription values.
  40. func (s *Subscriber) Values() []string {
  41. return s.items.getValues()
  42. }
  43. // Exclusive means that key value can only be 1:1,
  44. // which means later added value will remove the keys associated with the same value previously.
  45. func Exclusive() SubOption {
  46. return func(sub *Subscriber) {
  47. sub.exclusive = true
  48. }
  49. }
  50. // WithSubEtcdAccount customizes the Subscriber with given etcd username/password.
  51. func WithSubEtcdAccount(user, pass string) SubOption {
  52. return func(sub *Subscriber) {
  53. internal.AddAccount(sub.endpoints, user, pass)
  54. }
  55. }
  56. type container struct {
  57. exclusive bool
  58. values map[string][]string
  59. mapping map[string]string
  60. snapshot atomic.Value
  61. dirty *syncx.AtomicBool
  62. listeners []func()
  63. lock sync.Mutex
  64. }
  65. func newContainer(exclusive bool) *container {
  66. return &container{
  67. exclusive: exclusive,
  68. values: make(map[string][]string),
  69. mapping: make(map[string]string),
  70. dirty: syncx.ForAtomicBool(true),
  71. }
  72. }
  73. func (c *container) OnAdd(kv internal.KV) {
  74. c.addKv(kv.Key, kv.Val)
  75. c.notifyChange()
  76. }
  77. func (c *container) OnDelete(kv internal.KV) {
  78. c.removeKey(kv.Key)
  79. c.notifyChange()
  80. }
  81. // addKv adds the kv, returns if there are already other keys associate with the value
  82. func (c *container) addKv(key, value string) ([]string, bool) {
  83. c.lock.Lock()
  84. defer c.lock.Unlock()
  85. c.dirty.Set(true)
  86. keys := c.values[value]
  87. previous := append([]string(nil), keys...)
  88. early := len(keys) > 0
  89. if c.exclusive && early {
  90. for _, each := range keys {
  91. c.doRemoveKey(each)
  92. }
  93. }
  94. c.values[value] = append(c.values[value], key)
  95. c.mapping[key] = value
  96. if early {
  97. return previous, true
  98. }
  99. return nil, false
  100. }
  101. func (c *container) addListener(listener func()) {
  102. c.lock.Lock()
  103. c.listeners = append(c.listeners, listener)
  104. c.lock.Unlock()
  105. }
  106. func (c *container) doRemoveKey(key string) {
  107. server, ok := c.mapping[key]
  108. if !ok {
  109. return
  110. }
  111. delete(c.mapping, key)
  112. keys := c.values[server]
  113. remain := keys[:0]
  114. for _, k := range keys {
  115. if k != key {
  116. remain = append(remain, k)
  117. }
  118. }
  119. if len(remain) > 0 {
  120. c.values[server] = remain
  121. } else {
  122. delete(c.values, server)
  123. }
  124. }
  125. func (c *container) getValues() []string {
  126. if !c.dirty.True() {
  127. return c.snapshot.Load().([]string)
  128. }
  129. c.lock.Lock()
  130. defer c.lock.Unlock()
  131. var vals []string
  132. for each := range c.values {
  133. vals = append(vals, each)
  134. }
  135. c.snapshot.Store(vals)
  136. c.dirty.Set(false)
  137. return vals
  138. }
  139. func (c *container) notifyChange() {
  140. c.lock.Lock()
  141. listeners := append(([]func())(nil), c.listeners...)
  142. c.lock.Unlock()
  143. for _, listener := range listeners {
  144. listener()
  145. }
  146. }
  147. // removeKey removes the kv, returns true if there are still other keys associate with the value
  148. func (c *container) removeKey(key string) {
  149. c.lock.Lock()
  150. defer c.lock.Unlock()
  151. c.dirty.Set(true)
  152. c.doRemoveKey(key)
  153. }