p2c.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package p2c
  2. import (
  3. "fmt"
  4. "math"
  5. "math/rand"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/wuntsong-org/go-zero-plus/core/logx"
  11. "github.com/wuntsong-org/go-zero-plus/core/syncx"
  12. "github.com/wuntsong-org/go-zero-plus/core/timex"
  13. "github.com/wuntsong-org/go-zero-plus/zrpc/internal/codes"
  14. "google.golang.org/grpc/balancer"
  15. "google.golang.org/grpc/balancer/base"
  16. "google.golang.org/grpc/resolver"
  17. )
  18. const (
  19. // Name is the name of p2c balancer.
  20. Name = "p2c_ewma"
  21. decayTime = int64(time.Second * 10) // default value from finagle
  22. forcePick = int64(time.Second)
  23. initSuccess = 1000
  24. throttleSuccess = initSuccess / 2
  25. penalty = int64(math.MaxInt32)
  26. pickTimes = 3
  27. logInterval = time.Minute
  28. )
  29. var emptyPickResult balancer.PickResult
  30. func init() {
  31. balancer.Register(newBuilder())
  32. }
  33. type p2cPickerBuilder struct{}
  34. func (b *p2cPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
  35. readySCs := info.ReadySCs
  36. if len(readySCs) == 0 {
  37. return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
  38. }
  39. var conns []*subConn
  40. for conn, connInfo := range readySCs {
  41. conns = append(conns, &subConn{
  42. addr: connInfo.Address,
  43. conn: conn,
  44. success: initSuccess,
  45. })
  46. }
  47. return &p2cPicker{
  48. conns: conns,
  49. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  50. stamp: syncx.NewAtomicDuration(),
  51. }
  52. }
  53. func newBuilder() balancer.Builder {
  54. return base.NewBalancerBuilder(Name, new(p2cPickerBuilder), base.Config{HealthCheck: true})
  55. }
  56. type p2cPicker struct {
  57. conns []*subConn
  58. r *rand.Rand
  59. stamp *syncx.AtomicDuration
  60. lock sync.Mutex
  61. }
  62. func (p *p2cPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
  63. p.lock.Lock()
  64. defer p.lock.Unlock()
  65. var chosen *subConn
  66. switch len(p.conns) {
  67. case 0:
  68. return emptyPickResult, balancer.ErrNoSubConnAvailable
  69. case 1:
  70. chosen = p.choose(p.conns[0], nil)
  71. case 2:
  72. chosen = p.choose(p.conns[0], p.conns[1])
  73. default:
  74. var node1, node2 *subConn
  75. for i := 0; i < pickTimes; i++ {
  76. a := p.r.Intn(len(p.conns))
  77. b := p.r.Intn(len(p.conns) - 1)
  78. if b >= a {
  79. b++
  80. }
  81. node1 = p.conns[a]
  82. node2 = p.conns[b]
  83. if node1.healthy() && node2.healthy() {
  84. break
  85. }
  86. }
  87. chosen = p.choose(node1, node2)
  88. }
  89. atomic.AddInt64(&chosen.inflight, 1)
  90. atomic.AddInt64(&chosen.requests, 1)
  91. return balancer.PickResult{
  92. SubConn: chosen.conn,
  93. Done: p.buildDoneFunc(chosen),
  94. }, nil
  95. }
  96. func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) {
  97. start := int64(timex.Now())
  98. return func(info balancer.DoneInfo) {
  99. atomic.AddInt64(&c.inflight, -1)
  100. now := timex.Now()
  101. last := atomic.SwapInt64(&c.last, int64(now))
  102. td := int64(now) - last
  103. if td < 0 {
  104. td = 0
  105. }
  106. w := math.Exp(float64(-td) / float64(decayTime))
  107. lag := int64(now) - start
  108. if lag < 0 {
  109. lag = 0
  110. }
  111. olag := atomic.LoadUint64(&c.lag)
  112. if olag == 0 {
  113. w = 0
  114. }
  115. atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w)))
  116. success := initSuccess
  117. if info.Err != nil && !codes.Acceptable(info.Err) {
  118. success = 0
  119. }
  120. osucc := atomic.LoadUint64(&c.success)
  121. atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
  122. stamp := p.stamp.Load()
  123. if now-stamp >= logInterval {
  124. if p.stamp.CompareAndSwap(stamp, now) {
  125. p.logStats()
  126. }
  127. }
  128. }
  129. }
  130. func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
  131. start := int64(timex.Now())
  132. if c2 == nil {
  133. atomic.StoreInt64(&c1.pick, start)
  134. return c1
  135. }
  136. if c1.load() > c2.load() {
  137. c1, c2 = c2, c1
  138. }
  139. pick := atomic.LoadInt64(&c2.pick)
  140. if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
  141. return c2
  142. }
  143. atomic.StoreInt64(&c1.pick, start)
  144. return c1
  145. }
  146. func (p *p2cPicker) logStats() {
  147. var stats []string
  148. p.lock.Lock()
  149. defer p.lock.Unlock()
  150. for _, conn := range p.conns {
  151. stats = append(stats, fmt.Sprintf("conn: %s, load: %d, reqs: %d",
  152. conn.addr.Addr, conn.load(), atomic.SwapInt64(&conn.requests, 0)))
  153. }
  154. logx.Statf("p2c - %s", strings.Join(stats, "; "))
  155. }
  156. type subConn struct {
  157. lag uint64
  158. inflight int64
  159. success uint64
  160. requests int64
  161. last int64
  162. pick int64
  163. addr resolver.Address
  164. conn balancer.SubConn
  165. }
  166. func (c *subConn) healthy() bool {
  167. return atomic.LoadUint64(&c.success) > throttleSuccess
  168. }
  169. func (c *subConn) load() int64 {
  170. // plus one to avoid multiply zero
  171. lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
  172. load := lag * (atomic.LoadInt64(&c.inflight) + 1)
  173. if load == 0 {
  174. return penalty
  175. }
  176. return load
  177. }