123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- package internal
- import (
- "math/rand"
- "time"
- "zero/core/logx"
- )
- type roundRobinBalancer struct {
- *baseBalancer
- conns []serverConn
- index int
- }
- func NewRoundRobinBalancer(dialFn DialFn, closeFn CloseFn, exclusive bool) *roundRobinBalancer {
- balancer := new(roundRobinBalancer)
- balancer.baseBalancer = newBaseBalancer(dialFn, closeFn, exclusive)
- return balancer
- }
- func (b *roundRobinBalancer) AddConn(kv KV) error {
- var conn interface{}
- prev, found := b.addKv(kv.Key, kv.Val)
- if found {
- conn = b.handlePrevious(prev, kv.Val)
- }
- if conn == nil {
- var err error
- conn, err = b.dialFn(kv.Val)
- if err != nil {
- b.removeKv(kv.Key)
- return err
- }
- }
- b.lock.Lock()
- defer b.lock.Unlock()
- b.conns = append(b.conns, serverConn{
- key: kv.Key,
- conn: conn,
- })
- b.notify(kv.Key)
- return nil
- }
- func (b *roundRobinBalancer) handlePrevious(prev []string, server string) interface{} {
- if len(prev) == 0 {
- return nil
- }
- b.lock.Lock()
- defer b.lock.Unlock()
- if b.exclusive {
- for _, item := range prev {
- conns := b.conns[:0]
- for _, each := range b.conns {
- if each.key == item {
- if err := b.closeFn(server, each.conn); err != nil {
- logx.Error(err)
- }
- } else {
- conns = append(conns, each)
- }
- }
- b.conns = conns
- }
- } else {
- for _, each := range b.conns {
- if each.key == prev[0] {
- return each.conn
- }
- }
- }
- return nil
- }
- func (b *roundRobinBalancer) initialize() {
- rand.Seed(time.Now().UnixNano())
- if len(b.conns) > 0 {
- b.index = rand.Intn(len(b.conns))
- }
- }
- func (b *roundRobinBalancer) IsEmpty() bool {
- b.lock.Lock()
- empty := len(b.conns) == 0
- b.lock.Unlock()
- return empty
- }
- func (b *roundRobinBalancer) Next(...string) (interface{}, bool) {
- b.lock.Lock()
- defer b.lock.Unlock()
- if len(b.conns) == 0 {
- return nil, false
- }
- b.index = (b.index + 1) % len(b.conns)
- return b.conns[b.index].conn, true
- }
- func (b *roundRobinBalancer) notify(key string) {
- if b.listener == nil {
- return
- }
- // b.servers has the format of map[conn][]key
- var keys []string
- var values []string
- for k, v := range b.servers {
- values = append(values, k)
- keys = append(keys, v...)
- }
- b.listener.OnUpdate(keys, values, key)
- }
- func (b *roundRobinBalancer) RemoveKey(key string) {
- server, keep := b.removeKv(key)
- b.lock.Lock()
- defer b.lock.Unlock()
- conns := b.conns[:0]
- for _, conn := range b.conns {
- if conn.key == key {
- // there are other keys assocated with the conn, don't close the conn.
- if keep {
- continue
- }
- if err := b.closeFn(server, conn.conn); err != nil {
- logx.Error(err)
- }
- } else {
- conns = append(conns, conn)
- }
- }
- b.conns = conns
- // notify without new key
- b.notify("")
- }
|