Browse Source

chore: better shedding algorithm, make sure recover from shedding (#2476)

* backup

* chore: better shedding algorithm, make sure recover from shedding
Kevin Wan 2 years ago
parent
commit
4b9066eda6
2 changed files with 22 additions and 14 deletions
  1. 15 11
      core/load/adaptiveshedder.go
  2. 7 3
      core/load/adaptiveshedder_test.go

+ 15 - 11
core/load/adaptiveshedder.go

@@ -17,7 +17,7 @@ import (
 const (
 const (
 	defaultBuckets = 50
 	defaultBuckets = 50
 	defaultWindow  = time.Second * 5
 	defaultWindow  = time.Second * 5
-	// using 1000m notation, 900m is like 80%, keep it as var for unit test
+	// using 1000m notation, 900m is like 90%, keep it as var for unit test
 	defaultCpuThreshold = 900
 	defaultCpuThreshold = 900
 	defaultMinRt        = float64(time.Second / time.Millisecond)
 	defaultMinRt        = float64(time.Second / time.Millisecond)
 	// moving average hyperparameter beta for calculating requests on the fly
 	// moving average hyperparameter beta for calculating requests on the fly
@@ -70,7 +70,7 @@ type (
 		flying          int64
 		flying          int64
 		avgFlying       float64
 		avgFlying       float64
 		avgFlyingLock   syncx.SpinLock
 		avgFlyingLock   syncx.SpinLock
-		dropTime        *syncx.AtomicDuration
+		overloadTime    *syncx.AtomicDuration
 		droppedRecently *syncx.AtomicBool
 		droppedRecently *syncx.AtomicBool
 		passCounter     *collection.RollingWindow
 		passCounter     *collection.RollingWindow
 		rtCounter       *collection.RollingWindow
 		rtCounter       *collection.RollingWindow
@@ -106,7 +106,7 @@ func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
 	return &adaptiveShedder{
 	return &adaptiveShedder{
 		cpuThreshold:    options.cpuThreshold,
 		cpuThreshold:    options.cpuThreshold,
 		windows:         int64(time.Second / bucketDuration),
 		windows:         int64(time.Second / bucketDuration),
-		dropTime:        syncx.NewAtomicDuration(),
+		overloadTime:    syncx.NewAtomicDuration(),
 		droppedRecently: syncx.NewAtomicBool(),
 		droppedRecently: syncx.NewAtomicBool(),
 		passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
 		passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
 			collection.IgnoreCurrentBucket()),
 			collection.IgnoreCurrentBucket()),
@@ -118,7 +118,6 @@ func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
 // Allow implements Shedder.Allow.
 // Allow implements Shedder.Allow.
 func (as *adaptiveShedder) Allow() (Promise, error) {
 func (as *adaptiveShedder) Allow() (Promise, error) {
 	if as.shouldDrop() {
 	if as.shouldDrop() {
-		as.dropTime.Set(timex.Now())
 		as.droppedRecently.Set(true)
 		as.droppedRecently.Set(true)
 
 
 		return nil, ErrServiceOverloaded
 		return nil, ErrServiceOverloaded
@@ -215,21 +214,26 @@ func (as *adaptiveShedder) stillHot() bool {
 		return false
 		return false
 	}
 	}
 
 
-	dropTime := as.dropTime.Load()
-	if dropTime == 0 {
+	overloadTime := as.overloadTime.Load()
+	if overloadTime == 0 {
 		return false
 		return false
 	}
 	}
 
 
-	hot := timex.Since(dropTime) < coolOffDuration
-	if !hot {
-		as.droppedRecently.Set(false)
+	if timex.Since(overloadTime) < coolOffDuration {
+		return true
 	}
 	}
 
 
-	return hot
+	as.droppedRecently.Set(false)
+	return false
 }
 }
 
 
 func (as *adaptiveShedder) systemOverloaded() bool {
 func (as *adaptiveShedder) systemOverloaded() bool {
-	return systemOverloadChecker(as.cpuThreshold)
+	if !systemOverloadChecker(as.cpuThreshold) {
+		return false
+	}
+
+	as.overloadTime.Set(timex.Now())
+	return true
 }
 }
 
 
 // WithBuckets customizes the Shedder with given number of buckets.
 // WithBuckets customizes the Shedder with given number of buckets.

+ 7 - 3
core/load/adaptiveshedder_test.go

@@ -13,6 +13,7 @@ import (
 	"github.com/zeromicro/go-zero/core/mathx"
 	"github.com/zeromicro/go-zero/core/mathx"
 	"github.com/zeromicro/go-zero/core/stat"
 	"github.com/zeromicro/go-zero/core/stat"
 	"github.com/zeromicro/go-zero/core/syncx"
 	"github.com/zeromicro/go-zero/core/syncx"
+	"github.com/zeromicro/go-zero/core/timex"
 )
 )
 
 
 const (
 const (
@@ -136,7 +137,7 @@ func TestAdaptiveShedderShouldDrop(t *testing.T) {
 		passCounter:     passCounter,
 		passCounter:     passCounter,
 		rtCounter:       rtCounter,
 		rtCounter:       rtCounter,
 		windows:         buckets,
 		windows:         buckets,
-		dropTime:        syncx.NewAtomicDuration(),
+		overloadTime:    syncx.NewAtomicDuration(),
 		droppedRecently: syncx.NewAtomicBool(),
 		droppedRecently: syncx.NewAtomicBool(),
 	}
 	}
 	// cpu >=  800, inflight < maxPass
 	// cpu >=  800, inflight < maxPass
@@ -190,12 +191,15 @@ func TestAdaptiveShedderStillHot(t *testing.T) {
 		passCounter:     passCounter,
 		passCounter:     passCounter,
 		rtCounter:       rtCounter,
 		rtCounter:       rtCounter,
 		windows:         buckets,
 		windows:         buckets,
-		dropTime:        syncx.NewAtomicDuration(),
+		overloadTime:    syncx.NewAtomicDuration(),
 		droppedRecently: syncx.ForAtomicBool(true),
 		droppedRecently: syncx.ForAtomicBool(true),
 	}
 	}
 	assert.False(t, shedder.stillHot())
 	assert.False(t, shedder.stillHot())
-	shedder.dropTime.Set(-coolOffDuration * 2)
+	shedder.overloadTime.Set(-coolOffDuration * 2)
 	assert.False(t, shedder.stillHot())
 	assert.False(t, shedder.stillHot())
+	shedder.droppedRecently.Set(true)
+	shedder.overloadTime.Set(timex.Now())
+	assert.True(t, shedder.stillHot())
 }
 }
 
 
 func BenchmarkAdaptiveShedder_Allow(b *testing.B) {
 func BenchmarkAdaptiveShedder_Allow(b *testing.B) {