p2c.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package p2c
  2. import (
  3. "context"
  4. "math"
  5. "math/rand"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "zero/core/timex"
  10. "zero/rpcx/internal/codes"
  11. "google.golang.org/grpc/balancer"
  12. "google.golang.org/grpc/balancer/base"
  13. "google.golang.org/grpc/resolver"
  14. )
  15. const (
  16. Name = "p2c_ewma"
  17. decayTime = int64(time.Millisecond * 600)
  18. forcePick = int64(time.Second)
  19. initSuccess = 1000
  20. throttleSuccess = initSuccess / 2
  21. penalty = int64(math.MaxInt32)
  22. pickTimes = 3
  23. )
  24. func init() {
  25. balancer.Register(newBuilder())
  26. }
  27. type p2cPickerBuilder struct {
  28. }
  29. func newBuilder() balancer.Builder {
  30. return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
  31. }
  32. func (b *p2cPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
  33. if len(readySCs) == 0 {
  34. return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
  35. }
  36. var conns []*subConn
  37. for addr, conn := range readySCs {
  38. conns = append(conns, &subConn{
  39. addr: addr,
  40. conn: conn,
  41. success: initSuccess,
  42. })
  43. }
  44. return &p2cPicker{
  45. conns: conns,
  46. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  47. }
  48. }
  49. type p2cPicker struct {
  50. conns []*subConn
  51. r *rand.Rand
  52. lock sync.Mutex
  53. }
  54. func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
  55. conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
  56. p.lock.Lock()
  57. defer p.lock.Unlock()
  58. var chosen *subConn
  59. switch len(p.conns) {
  60. case 0:
  61. return nil, nil, balancer.ErrNoSubConnAvailable
  62. case 1:
  63. chosen = p.choose(p.conns[0], nil)
  64. case 2:
  65. chosen = p.choose(p.conns[0], p.conns[1])
  66. default:
  67. var node1, node2 *subConn
  68. for i := 0; i < pickTimes; i++ {
  69. a := p.r.Intn(len(p.conns))
  70. b := p.r.Intn(len(p.conns) - 1)
  71. if b >= a {
  72. b++
  73. }
  74. node1 = p.conns[a]
  75. node2 = p.conns[b]
  76. if node1.healthy() && node2.healthy() {
  77. break
  78. }
  79. }
  80. chosen = p.choose(node1, node2)
  81. }
  82. atomic.AddInt64(&chosen.inflight, 1)
  83. return chosen.conn, p.buildDoneFunc(chosen), nil
  84. }
  85. func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) {
  86. start := int64(timex.Now())
  87. return func(info balancer.DoneInfo) {
  88. atomic.AddInt64(&c.inflight, -1)
  89. now := int64(timex.Now())
  90. last := atomic.SwapInt64(&c.last, int64(now))
  91. td := now - last
  92. if td < 0 {
  93. td = 0
  94. }
  95. w := math.Exp(float64(-td) / float64(decayTime))
  96. lag := now - start
  97. if lag < 0 {
  98. lag = 0
  99. }
  100. olag := atomic.LoadUint64(&c.lag)
  101. if olag == 0 {
  102. w = 0
  103. }
  104. atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w)))
  105. success := initSuccess
  106. if info.Err != nil && !codes.Acceptable(info.Err) {
  107. success = 0
  108. }
  109. osucc := atomic.LoadUint64(&c.success)
  110. atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
  111. }
  112. }
  113. func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
  114. start := int64(timex.Now())
  115. if c2 == nil {
  116. atomic.StoreInt64(&c1.pick, start)
  117. return c1
  118. }
  119. if c1.load() > c2.load() {
  120. c1, c2 = c2, c1
  121. }
  122. pick := atomic.LoadInt64(&c2.pick)
  123. if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
  124. return c2
  125. } else {
  126. atomic.StoreInt64(&c1.pick, start)
  127. return c1
  128. }
  129. }
  130. type subConn struct {
  131. addr resolver.Address
  132. conn balancer.SubConn
  133. lag uint64
  134. inflight int64
  135. success uint64
  136. last int64
  137. pick int64
  138. }
  139. func (c *subConn) healthy() bool {
  140. return atomic.LoadUint64(&c.success) > throttleSuccess
  141. }
  142. func (c *subConn) load() int64 {
  143. lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
  144. load := lag * atomic.LoadInt64(&c.inflight)
  145. if load == 0 {
  146. return penalty
  147. } else {
  148. return load
  149. }
  150. }