balancer.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package internal
  2. import "sync"
  3. type (
  4. DialFn func(server string) (interface{}, error)
  5. CloseFn func(server string, conn interface{}) error
  6. Balancer interface {
  7. AddConn(kv KV) error
  8. IsEmpty() bool
  9. Next(key ...string) (interface{}, bool)
  10. RemoveKey(key string)
  11. initialize()
  12. setListener(listener Listener)
  13. }
  14. serverConn struct {
  15. key string
  16. conn interface{}
  17. }
  18. baseBalancer struct {
  19. exclusive bool
  20. servers map[string][]string
  21. mapping map[string]string
  22. lock sync.Mutex
  23. dialFn DialFn
  24. closeFn CloseFn
  25. listener Listener
  26. }
  27. )
  28. func newBaseBalancer(dialFn DialFn, closeFn CloseFn, exclusive bool) *baseBalancer {
  29. return &baseBalancer{
  30. exclusive: exclusive,
  31. servers: make(map[string][]string),
  32. mapping: make(map[string]string),
  33. dialFn: dialFn,
  34. closeFn: closeFn,
  35. }
  36. }
  37. // addKv adds the kv, returns if there are already other keys associate with the server
  38. func (b *baseBalancer) addKv(key, value string) ([]string, bool) {
  39. b.lock.Lock()
  40. defer b.lock.Unlock()
  41. keys := b.servers[value]
  42. previous := append([]string(nil), keys...)
  43. early := len(keys) > 0
  44. if b.exclusive && early {
  45. for _, each := range keys {
  46. b.doRemoveKv(each)
  47. }
  48. }
  49. b.servers[value] = append(b.servers[value], key)
  50. b.mapping[key] = value
  51. if early {
  52. return previous, true
  53. } else {
  54. return nil, false
  55. }
  56. }
  57. func (b *baseBalancer) doRemoveKv(key string) (server string, keepConn bool) {
  58. server, ok := b.mapping[key]
  59. if !ok {
  60. return "", true
  61. }
  62. delete(b.mapping, key)
  63. keys := b.servers[server]
  64. remain := keys[:0]
  65. for _, k := range keys {
  66. if k != key {
  67. remain = append(remain, k)
  68. }
  69. }
  70. if len(remain) > 0 {
  71. b.servers[server] = remain
  72. return server, true
  73. } else {
  74. delete(b.servers, server)
  75. return server, false
  76. }
  77. }
  78. func (b *baseBalancer) removeKv(key string) (server string, keepConn bool) {
  79. b.lock.Lock()
  80. defer b.lock.Unlock()
  81. return b.doRemoveKv(key)
  82. }
  83. func (b *baseBalancer) setListener(listener Listener) {
  84. b.lock.Lock()
  85. b.listener = listener
  86. b.lock.Unlock()
  87. }