subscriber.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package discov
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "zero/core/discov/internal"
  6. "zero/core/syncx"
  7. )
  8. type (
  9. subOptions struct {
  10. exclusive bool
  11. }
  12. SubOption func(opts *subOptions)
  13. Subscriber struct {
  14. items *container
  15. }
  16. )
  17. func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
  18. var subOpts subOptions
  19. for _, opt := range opts {
  20. opt(&subOpts)
  21. }
  22. sub := &Subscriber{
  23. items: newContainer(subOpts.exclusive),
  24. }
  25. if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil {
  26. return nil, err
  27. }
  28. return sub, nil
  29. }
  30. func (s *Subscriber) Values() []string {
  31. return s.items.getValues()
  32. }
  33. // exclusive means that key value can only be 1:1,
  34. // which means later added value will remove the keys associated with the same value previously.
  35. func Exclusive() SubOption {
  36. return func(opts *subOptions) {
  37. opts.exclusive = true
  38. }
  39. }
  40. type container struct {
  41. exclusive bool
  42. values map[string][]string
  43. mapping map[string]string
  44. snapshot atomic.Value
  45. dirty *syncx.AtomicBool
  46. lock sync.Mutex
  47. }
  48. func newContainer(exclusive bool) *container {
  49. return &container{
  50. exclusive: exclusive,
  51. values: make(map[string][]string),
  52. mapping: make(map[string]string),
  53. dirty: syncx.ForAtomicBool(true),
  54. }
  55. }
  56. func (c *container) OnAdd(kv internal.KV) {
  57. c.addKv(kv.Key, kv.Val)
  58. }
  59. func (c *container) OnDelete(kv internal.KV) {
  60. c.removeKey(kv.Key)
  61. }
  62. // addKv adds the kv, returns if there are already other keys associate with the value
  63. func (c *container) addKv(key, value string) ([]string, bool) {
  64. c.lock.Lock()
  65. defer c.lock.Unlock()
  66. c.dirty.Set(true)
  67. keys := c.values[value]
  68. previous := append([]string(nil), keys...)
  69. early := len(keys) > 0
  70. if c.exclusive && early {
  71. for _, each := range keys {
  72. c.doRemoveKey(each)
  73. }
  74. }
  75. c.values[value] = append(c.values[value], key)
  76. c.mapping[key] = value
  77. if early {
  78. return previous, true
  79. } else {
  80. return nil, false
  81. }
  82. }
  83. func (c *container) doRemoveKey(key string) {
  84. server, ok := c.mapping[key]
  85. if !ok {
  86. return
  87. }
  88. delete(c.mapping, key)
  89. keys := c.values[server]
  90. remain := keys[:0]
  91. for _, k := range keys {
  92. if k != key {
  93. remain = append(remain, k)
  94. }
  95. }
  96. if len(remain) > 0 {
  97. c.values[server] = remain
  98. } else {
  99. delete(c.values, server)
  100. }
  101. }
  102. func (c *container) getValues() []string {
  103. if !c.dirty.True() {
  104. return c.snapshot.Load().([]string)
  105. }
  106. c.lock.Lock()
  107. defer c.lock.Unlock()
  108. var vals []string
  109. for each := range c.values {
  110. vals = append(vals, each)
  111. }
  112. c.snapshot.Store(vals)
  113. c.dirty.Set(false)
  114. return vals
  115. }
  116. // removeKey removes the kv, returns true if there are still other keys associate with the value
  117. func (c *container) removeKey(key string) {
  118. c.lock.Lock()
  119. defer c.lock.Unlock()
  120. c.dirty.Set(true)
  121. c.doRemoveKey(key)
  122. }