subscriber.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package discov
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/tal-tech/go-zero/core/discov/internal"
  6. "github.com/tal-tech/go-zero/core/syncx"
  7. )
  8. type (
  9. subOptions struct {
  10. exclusive bool
  11. }
  12. SubOption func(opts *subOptions)
  13. Subscriber struct {
  14. items *container
  15. }
  16. )
  17. func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
  18. var subOpts subOptions
  19. for _, opt := range opts {
  20. opt(&subOpts)
  21. }
  22. sub := &Subscriber{
  23. items: newContainer(subOpts.exclusive),
  24. }
  25. if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil {
  26. return nil, err
  27. }
  28. return sub, nil
  29. }
  30. func (s *Subscriber) AddListener(listener func()) {
  31. s.items.addListener(listener)
  32. }
  33. func (s *Subscriber) Values() []string {
  34. return s.items.getValues()
  35. }
  36. // exclusive means that key value can only be 1:1,
  37. // which means later added value will remove the keys associated with the same value previously.
  38. func Exclusive() SubOption {
  39. return func(opts *subOptions) {
  40. opts.exclusive = true
  41. }
  42. }
  43. type container struct {
  44. exclusive bool
  45. values map[string][]string
  46. mapping map[string]string
  47. snapshot atomic.Value
  48. dirty *syncx.AtomicBool
  49. listeners []func()
  50. lock sync.Mutex
  51. }
  52. func newContainer(exclusive bool) *container {
  53. return &container{
  54. exclusive: exclusive,
  55. values: make(map[string][]string),
  56. mapping: make(map[string]string),
  57. dirty: syncx.ForAtomicBool(true),
  58. }
  59. }
  60. func (c *container) OnAdd(kv internal.KV) {
  61. c.addKv(kv.Key, kv.Val)
  62. c.notifyChange()
  63. }
  64. func (c *container) OnDelete(kv internal.KV) {
  65. c.removeKey(kv.Key)
  66. c.notifyChange()
  67. }
  68. // addKv adds the kv, returns if there are already other keys associate with the value
  69. func (c *container) addKv(key, value string) ([]string, bool) {
  70. c.lock.Lock()
  71. defer c.lock.Unlock()
  72. c.dirty.Set(true)
  73. keys := c.values[value]
  74. previous := append([]string(nil), keys...)
  75. early := len(keys) > 0
  76. if c.exclusive && early {
  77. for _, each := range keys {
  78. c.doRemoveKey(each)
  79. }
  80. }
  81. c.values[value] = append(c.values[value], key)
  82. c.mapping[key] = value
  83. if early {
  84. return previous, true
  85. }
  86. return nil, false
  87. }
  88. func (c *container) addListener(listener func()) {
  89. c.lock.Lock()
  90. c.listeners = append(c.listeners, listener)
  91. c.lock.Unlock()
  92. }
  93. func (c *container) doRemoveKey(key string) {
  94. server, ok := c.mapping[key]
  95. if !ok {
  96. return
  97. }
  98. delete(c.mapping, key)
  99. keys := c.values[server]
  100. remain := keys[:0]
  101. for _, k := range keys {
  102. if k != key {
  103. remain = append(remain, k)
  104. }
  105. }
  106. if len(remain) > 0 {
  107. c.values[server] = remain
  108. } else {
  109. delete(c.values, server)
  110. }
  111. }
  112. func (c *container) getValues() []string {
  113. if !c.dirty.True() {
  114. return c.snapshot.Load().([]string)
  115. }
  116. c.lock.Lock()
  117. defer c.lock.Unlock()
  118. var vals []string
  119. for each := range c.values {
  120. vals = append(vals, each)
  121. }
  122. c.snapshot.Store(vals)
  123. c.dirty.Set(false)
  124. return vals
  125. }
  126. func (c *container) notifyChange() {
  127. c.lock.Lock()
  128. listeners := append(([]func())(nil), c.listeners...)
  129. c.lock.Unlock()
  130. for _, listener := range listeners {
  131. listener()
  132. }
  133. }
  134. // removeKey removes the kv, returns true if there are still other keys associate with the value
  135. func (c *container) removeKey(key string) {
  136. c.lock.Lock()
  137. defer c.lock.Unlock()
  138. c.dirty.Set(true)
  139. c.doRemoveKey(key)
  140. }