123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- package internal
- import (
- "zero/core/hash"
- "zero/core/logx"
- )
- type consistentBalancer struct {
- *baseBalancer
- conns map[string]interface{}
- buckets *hash.ConsistentHash
- bucketKey func(KV) string
- }
- func NewConsistentBalancer(dialFn DialFn, closeFn CloseFn, keyer func(kv KV) string) *consistentBalancer {
- // we don't support exclusive mode for consistent Balancer, to avoid complexity,
- // because there are few scenarios, use it on your own risks.
- balancer := &consistentBalancer{
- conns: make(map[string]interface{}),
- buckets: hash.NewConsistentHash(),
- bucketKey: keyer,
- }
- balancer.baseBalancer = newBaseBalancer(dialFn, closeFn, false)
- return balancer
- }
- func (b *consistentBalancer) AddConn(kv KV) error {
- // not adding kv and conn within a transaction, but it doesn't matter
- // we just rollback the kv addition if dial failed
- var conn interface{}
- prev, found := b.addKv(kv.Key, kv.Val)
- if found {
- conn = b.handlePrevious(prev)
- }
- if conn == nil {
- var err error
- conn, err = b.dialFn(kv.Val)
- if err != nil {
- b.removeKv(kv.Key)
- return err
- }
- }
- bucketKey := b.bucketKey(kv)
- b.lock.Lock()
- defer b.lock.Unlock()
- b.conns[bucketKey] = conn
- b.buckets.Add(bucketKey)
- b.notify(bucketKey)
- logx.Infof("added server, key: %s, server: %s", bucketKey, kv.Val)
- return nil
- }
- func (b *consistentBalancer) getConn(key string) (interface{}, bool) {
- b.lock.Lock()
- conn, ok := b.conns[key]
- b.lock.Unlock()
- return conn, ok
- }
- func (b *consistentBalancer) handlePrevious(prev []string) interface{} {
- if len(prev) == 0 {
- return nil
- }
- b.lock.Lock()
- defer b.lock.Unlock()
- // if not exclusive, only need to randomly find one connection
- for key, conn := range b.conns {
- if key == prev[0] {
- return conn
- }
- }
- return nil
- }
- func (b *consistentBalancer) initialize() {
- }
- func (b *consistentBalancer) notify(key string) {
- if b.listener == nil {
- return
- }
- var keys []string
- var values []string
- for k := range b.conns {
- keys = append(keys, k)
- }
- for _, v := range b.mapping {
- values = append(values, v)
- }
- b.listener.OnUpdate(keys, values, key)
- }
- func (b *consistentBalancer) RemoveKey(key string) {
- kv := KV{Key: key}
- server, keep := b.removeKv(key)
- kv.Val = server
- bucketKey := b.bucketKey(kv)
- b.buckets.Remove(b.bucketKey(kv))
- // wrap the query & removal in a function to make sure the quick lock/unlock
- conn, ok := func() (interface{}, bool) {
- b.lock.Lock()
- defer b.lock.Unlock()
- conn, ok := b.conns[bucketKey]
- if ok {
- delete(b.conns, bucketKey)
- }
- return conn, ok
- }()
- if ok && !keep {
- logx.Infof("removing server, key: %s", kv.Key)
- if err := b.closeFn(server, conn); err != nil {
- logx.Error(err)
- }
- }
- // notify without new key
- b.notify("")
- }
- func (b *consistentBalancer) IsEmpty() bool {
- b.lock.Lock()
- empty := len(b.conns) == 0
- b.lock.Unlock()
- return empty
- }
- func (b *consistentBalancer) Next(keys ...string) (interface{}, bool) {
- if len(keys) != 1 {
- return nil, false
- }
- key := keys[0]
- if node, ok := b.buckets.Get(key); !ok {
- return nil, false
- } else {
- return b.getConn(node.(string))
- }
- }
|