瀏覽代碼

avoid multiply zero on calculation load

kevin 4 年之前
父節點
當前提交
69ea425000
共有 1 個文件被更改,包括 36 次插入4 次删除
  1. 36 4
      rpcx/internal/balancer/p2c/p2c.go

+ 36 - 4
rpcx/internal/balancer/p2c/p2c.go

@@ -2,12 +2,16 @@ package p2c
 
 import (
 	"context"
+	"fmt"
 	"math"
 	"math/rand"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"zero/core/logx"
+	"zero/core/syncx"
 	"zero/core/timex"
 	"zero/rpcx/internal/codes"
 
@@ -24,6 +28,7 @@ const (
 	throttleSuccess = initSuccess / 2
 	penalty         = int64(math.MaxInt32)
 	pickTimes       = 3
+	logInterval     = time.Minute
 )
 
 func init() {
@@ -54,12 +59,14 @@ func (b *p2cPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn)
 	return &p2cPicker{
 		conns: conns,
 		r:     rand.New(rand.NewSource(time.Now().UnixNano())),
+		stamp: syncx.NewAtomicDuration(),
 	}
 }
 
 type p2cPicker struct {
 	conns []*subConn
 	r     *rand.Rand
+	stamp *syncx.AtomicDuration
 	lock  sync.Mutex
 }
 
@@ -95,6 +102,7 @@ func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
 	}
 
 	atomic.AddInt64(&chosen.inflight, 1)
+	atomic.AddInt64(&chosen.requests, 1)
 	return chosen.conn, p.buildDoneFunc(chosen), nil
 }
 
@@ -102,14 +110,14 @@ func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) {
 	start := int64(timex.Now())
 	return func(info balancer.DoneInfo) {
 		atomic.AddInt64(&c.inflight, -1)
-		now := int64(timex.Now())
+		now := timex.Now()
 		last := atomic.SwapInt64(&c.last, int64(now))
-		td := now - last
+		td := int64(now) - last
 		if td < 0 {
 			td = 0
 		}
 		w := math.Exp(float64(-td) / float64(decayTime))
-		lag := now - start
+		lag := int64(now) - start
 		if lag < 0 {
 			lag = 0
 		}
@@ -124,6 +132,13 @@ func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) {
 		}
 		osucc := atomic.LoadUint64(&c.success)
 		atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
+
+		stamp := p.stamp.Load()
+		if now-stamp >= logInterval {
+			if p.stamp.CompareAndSwap(stamp, now) {
+				p.logStats()
+			}
+		}
 	}
 }
 
@@ -147,12 +162,28 @@ func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
 	}
 }
 
+func (p *p2cPicker) logStats() {
+	var stats []string
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	for _, conn := range p.conns {
+		fmt.Println(conn.lag, conn.inflight)
+		stats = append(stats, fmt.Sprintf("conn: %s, load: %d, reqs: %d",
+			conn.addr.Addr, conn.load(), atomic.SwapInt64(&conn.requests, 0)))
+	}
+
+	logx.Statf("p2c - %s", strings.Join(stats, "; "))
+}
+
 type subConn struct {
 	addr     resolver.Address
 	conn     balancer.SubConn
 	lag      uint64
 	inflight int64
 	success  uint64
+	requests int64
 	last     int64
 	pick     int64
 }
@@ -162,8 +193,9 @@ func (c *subConn) healthy() bool {
 }
 
 func (c *subConn) load() int64 {
+	// plus one to avoid multiply zero
 	lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
-	load := lag * atomic.LoadInt64(&c.inflight)
+	load := lag * (atomic.LoadInt64(&c.inflight) + 1)
 	if load == 0 {
 		return penalty
 	} else {