subscriber.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package discov
  2. import (
  3. "sync"
  4. "zero/core/discov/internal"
  5. )
  6. type (
  7. subOptions struct {
  8. exclusive bool
  9. }
  10. SubOption func(opts *subOptions)
  11. Subscriber struct {
  12. items *container
  13. }
  14. )
  15. func NewSubscriber(endpoints []string, key string, opts ...SubOption) *Subscriber {
  16. var subOpts subOptions
  17. for _, opt := range opts {
  18. opt(&subOpts)
  19. }
  20. subscriber := &Subscriber{
  21. items: newContainer(subOpts.exclusive),
  22. }
  23. internal.GetRegistry().Monitor(endpoints, key, subscriber.items)
  24. return subscriber
  25. }
  26. func (s *Subscriber) Values() []string {
  27. return s.items.getValues()
  28. }
  29. // exclusive means that key value can only be 1:1,
  30. // which means later added value will remove the keys associated with the same value previously.
  31. func Exclusive() SubOption {
  32. return func(opts *subOptions) {
  33. opts.exclusive = true
  34. }
  35. }
  36. type container struct {
  37. exclusive bool
  38. values map[string][]string
  39. mapping map[string]string
  40. lock sync.Mutex
  41. }
  42. func newContainer(exclusive bool) *container {
  43. return &container{
  44. exclusive: exclusive,
  45. values: make(map[string][]string),
  46. mapping: make(map[string]string),
  47. }
  48. }
  49. func (c *container) OnAdd(kv internal.KV) {
  50. c.addKv(kv.Key, kv.Val)
  51. }
  52. func (c *container) OnDelete(kv internal.KV) {
  53. c.removeKey(kv.Key)
  54. }
  55. // addKv adds the kv, returns if there are already other keys associate with the value
  56. func (c *container) addKv(key, value string) ([]string, bool) {
  57. c.lock.Lock()
  58. defer c.lock.Unlock()
  59. keys := c.values[value]
  60. previous := append([]string(nil), keys...)
  61. early := len(keys) > 0
  62. if c.exclusive && early {
  63. for _, each := range keys {
  64. c.doRemoveKey(each)
  65. }
  66. }
  67. c.values[value] = append(c.values[value], key)
  68. c.mapping[key] = value
  69. if early {
  70. return previous, true
  71. } else {
  72. return nil, false
  73. }
  74. }
  75. func (c *container) doRemoveKey(key string) {
  76. server, ok := c.mapping[key]
  77. if !ok {
  78. return
  79. }
  80. delete(c.mapping, key)
  81. keys := c.values[server]
  82. remain := keys[:0]
  83. for _, k := range keys {
  84. if k != key {
  85. remain = append(remain, k)
  86. }
  87. }
  88. if len(remain) > 0 {
  89. c.values[server] = remain
  90. } else {
  91. delete(c.values, server)
  92. }
  93. }
  94. func (c *container) getValues() []string {
  95. c.lock.Lock()
  96. defer c.lock.Unlock()
  97. var vs []string
  98. for each := range c.values {
  99. vs = append(vs, each)
  100. }
  101. return vs
  102. }
  103. // removeKey removes the kv, returns true if there are still other keys associate with the value
  104. func (c *container) removeKey(key string) {
  105. c.lock.Lock()
  106. defer c.lock.Unlock()
  107. c.doRemoveKey(key)
  108. }
  109. func (c *container) removeVal(val string) (empty bool) {
  110. c.lock.Lock()
  111. defer c.lock.Unlock()
  112. for k := range c.values {
  113. if k == val {
  114. delete(c.values, k)
  115. }
  116. }
  117. for k, v := range c.mapping {
  118. if v == val {
  119. delete(c.mapping, k)
  120. }
  121. }
  122. return len(c.values) == 0
  123. }