subscriber.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package discov
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/tal-tech/go-zero/core/discov/internal"
  6. "github.com/tal-tech/go-zero/core/syncx"
  7. )
  8. type (
  9. // SubOption defines the method to customize a Subscriber.
  10. SubOption func(sub *Subscriber)
  11. // A Subscriber is used to subscribe the given key on a etcd cluster.
  12. Subscriber struct {
  13. endpoints []string
  14. exclusive bool
  15. items *container
  16. }
  17. )
  18. // NewSubscriber returns a Subscriber.
  19. // endpoints is the hosts of the etcd cluster.
  20. // key is the key to subscribe.
  21. // opts are used to customize the Subscriber.
  22. func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
  23. sub := &Subscriber{
  24. endpoints: endpoints,
  25. }
  26. for _, opt := range opts {
  27. opt(sub)
  28. }
  29. sub.items = newContainer(sub.exclusive)
  30. if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil {
  31. return nil, err
  32. }
  33. return sub, nil
  34. }
  35. // AddListener adds listener to s.
  36. func (s *Subscriber) AddListener(listener func()) {
  37. s.items.addListener(listener)
  38. }
  39. // Values returns all the subscription values.
  40. func (s *Subscriber) Values() []string {
  41. return s.items.getValues()
  42. }
  43. // Exclusive means that key value can only be 1:1,
  44. // which means later added value will remove the keys associated with the same value previously.
  45. func Exclusive() SubOption {
  46. return func(sub *Subscriber) {
  47. sub.exclusive = true
  48. }
  49. }
  50. func WithSubEtcdAccount(user, pass string) SubOption {
  51. return func(sub *Subscriber) {
  52. internal.AddAccount(sub.endpoints, user, pass)
  53. }
  54. }
  55. type container struct {
  56. exclusive bool
  57. values map[string][]string
  58. mapping map[string]string
  59. snapshot atomic.Value
  60. dirty *syncx.AtomicBool
  61. listeners []func()
  62. lock sync.Mutex
  63. }
  64. func newContainer(exclusive bool) *container {
  65. return &container{
  66. exclusive: exclusive,
  67. values: make(map[string][]string),
  68. mapping: make(map[string]string),
  69. dirty: syncx.ForAtomicBool(true),
  70. }
  71. }
  72. func (c *container) OnAdd(kv internal.KV) {
  73. c.addKv(kv.Key, kv.Val)
  74. c.notifyChange()
  75. }
  76. func (c *container) OnDelete(kv internal.KV) {
  77. c.removeKey(kv.Key)
  78. c.notifyChange()
  79. }
  80. // addKv adds the kv, returns if there are already other keys associate with the value
  81. func (c *container) addKv(key, value string) ([]string, bool) {
  82. c.lock.Lock()
  83. defer c.lock.Unlock()
  84. c.dirty.Set(true)
  85. keys := c.values[value]
  86. previous := append([]string(nil), keys...)
  87. early := len(keys) > 0
  88. if c.exclusive && early {
  89. for _, each := range keys {
  90. c.doRemoveKey(each)
  91. }
  92. }
  93. c.values[value] = append(c.values[value], key)
  94. c.mapping[key] = value
  95. if early {
  96. return previous, true
  97. }
  98. return nil, false
  99. }
  100. func (c *container) addListener(listener func()) {
  101. c.lock.Lock()
  102. c.listeners = append(c.listeners, listener)
  103. c.lock.Unlock()
  104. }
  105. func (c *container) doRemoveKey(key string) {
  106. server, ok := c.mapping[key]
  107. if !ok {
  108. return
  109. }
  110. delete(c.mapping, key)
  111. keys := c.values[server]
  112. remain := keys[:0]
  113. for _, k := range keys {
  114. if k != key {
  115. remain = append(remain, k)
  116. }
  117. }
  118. if len(remain) > 0 {
  119. c.values[server] = remain
  120. } else {
  121. delete(c.values, server)
  122. }
  123. }
  124. func (c *container) getValues() []string {
  125. if !c.dirty.True() {
  126. return c.snapshot.Load().([]string)
  127. }
  128. c.lock.Lock()
  129. defer c.lock.Unlock()
  130. var vals []string
  131. for each := range c.values {
  132. vals = append(vals, each)
  133. }
  134. c.snapshot.Store(vals)
  135. c.dirty.Set(false)
  136. return vals
  137. }
  138. func (c *container) notifyChange() {
  139. c.lock.Lock()
  140. listeners := append(([]func())(nil), c.listeners...)
  141. c.lock.Unlock()
  142. for _, listener := range listeners {
  143. listener()
  144. }
  145. }
  146. // removeKey removes the kv, returns true if there are still other keys associate with the value
  147. func (c *container) removeKey(key string) {
  148. c.lock.Lock()
  149. defer c.lock.Unlock()
  150. c.dirty.Set(true)
  151. c.doRemoveKey(key)
  152. }