consistentbalancer.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package internal
  2. import (
  3. "zero/core/hash"
  4. "zero/core/logx"
  5. )
  6. type consistentBalancer struct {
  7. *baseBalancer
  8. conns map[string]interface{}
  9. buckets *hash.ConsistentHash
  10. bucketKey func(KV) string
  11. }
  12. func NewConsistentBalancer(dialFn DialFn, closeFn CloseFn, keyer func(kv KV) string) *consistentBalancer {
  13. // we don't support exclusive mode for consistent Balancer, to avoid complexity,
  14. // because there are few scenarios, use it on your own risks.
  15. balancer := &consistentBalancer{
  16. conns: make(map[string]interface{}),
  17. buckets: hash.NewConsistentHash(),
  18. bucketKey: keyer,
  19. }
  20. balancer.baseBalancer = newBaseBalancer(dialFn, closeFn, false)
  21. return balancer
  22. }
  23. func (b *consistentBalancer) AddConn(kv KV) error {
  24. // not adding kv and conn within a transaction, but it doesn't matter
  25. // we just rollback the kv addition if dial failed
  26. var conn interface{}
  27. prev, found := b.addKv(kv.Key, kv.Val)
  28. if found {
  29. conn = b.handlePrevious(prev)
  30. }
  31. if conn == nil {
  32. var err error
  33. conn, err = b.dialFn(kv.Val)
  34. if err != nil {
  35. b.removeKv(kv.Key)
  36. return err
  37. }
  38. }
  39. bucketKey := b.bucketKey(kv)
  40. b.lock.Lock()
  41. defer b.lock.Unlock()
  42. b.conns[bucketKey] = conn
  43. b.buckets.Add(bucketKey)
  44. b.notify(bucketKey)
  45. logx.Infof("added server, key: %s, server: %s", bucketKey, kv.Val)
  46. return nil
  47. }
  48. func (b *consistentBalancer) getConn(key string) (interface{}, bool) {
  49. b.lock.Lock()
  50. conn, ok := b.conns[key]
  51. b.lock.Unlock()
  52. return conn, ok
  53. }
  54. func (b *consistentBalancer) handlePrevious(prev []string) interface{} {
  55. if len(prev) == 0 {
  56. return nil
  57. }
  58. b.lock.Lock()
  59. defer b.lock.Unlock()
  60. // if not exclusive, only need to randomly find one connection
  61. for key, conn := range b.conns {
  62. if key == prev[0] {
  63. return conn
  64. }
  65. }
  66. return nil
  67. }
  68. func (b *consistentBalancer) initialize() {
  69. }
  70. func (b *consistentBalancer) notify(key string) {
  71. if b.listener == nil {
  72. return
  73. }
  74. var keys []string
  75. var values []string
  76. for k := range b.conns {
  77. keys = append(keys, k)
  78. }
  79. for _, v := range b.mapping {
  80. values = append(values, v)
  81. }
  82. b.listener.OnUpdate(keys, values, key)
  83. }
  84. func (b *consistentBalancer) RemoveKey(key string) {
  85. kv := KV{Key: key}
  86. server, keep := b.removeKv(key)
  87. kv.Val = server
  88. bucketKey := b.bucketKey(kv)
  89. b.buckets.Remove(b.bucketKey(kv))
  90. // wrap the query & removal in a function to make sure the quick lock/unlock
  91. conn, ok := func() (interface{}, bool) {
  92. b.lock.Lock()
  93. defer b.lock.Unlock()
  94. conn, ok := b.conns[bucketKey]
  95. if ok {
  96. delete(b.conns, bucketKey)
  97. }
  98. return conn, ok
  99. }()
  100. if ok && !keep {
  101. logx.Infof("removing server, key: %s", kv.Key)
  102. if err := b.closeFn(server, conn); err != nil {
  103. logx.Error(err)
  104. }
  105. }
  106. // notify without new key
  107. b.notify("")
  108. }
  109. func (b *consistentBalancer) IsEmpty() bool {
  110. b.lock.Lock()
  111. empty := len(b.conns) == 0
  112. b.lock.Unlock()
  113. return empty
  114. }
  115. func (b *consistentBalancer) Next(keys ...string) (interface{}, bool) {
  116. if len(keys) != 1 {
  117. return nil, false
  118. }
  119. key := keys[0]
  120. if node, ok := b.buckets.Get(key); !ok {
  121. return nil, false
  122. } else {
  123. return b.getConn(node.(string))
  124. }
  125. }