adaptiveshedder.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. package load
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "sync/atomic"
  7. "time"
  8. "github.com/wuntsong-org/go-zero-plus/core/collection"
  9. "github.com/wuntsong-org/go-zero-plus/core/logx"
  10. "github.com/wuntsong-org/go-zero-plus/core/stat"
  11. "github.com/wuntsong-org/go-zero-plus/core/syncx"
  12. "github.com/wuntsong-org/go-zero-plus/core/timex"
  13. )
  // Defaults and tuning knobs for the adaptive shedder.
  const (
  	// defaultBuckets is the number of rolling-window buckets used unless
  	// WithBuckets overrides it.
  	defaultBuckets = 50
  	// defaultWindow is the rolling-window length used unless WithWindow
  	// overrides it.
  	defaultWindow = 5 * time.Second
  	// defaultCpuThreshold uses 1000m notation, so 900m is like 90%.
  	defaultCpuThreshold = 900
  	// defaultMinRt is the response time, in milliseconds, that minRt reports
  	// when no bucket holds a smaller average.
  	defaultMinRt = float64(time.Second / time.Millisecond)
  	// flyingBeta is the moving average hyperparameter beta for calculating
  	// requests on the fly.
  	flyingBeta = 0.9
  	// coolOffDuration bounds how long after the last recorded overload time
  	// stillHot may keep reporting true.
  	coolOffDuration = time.Second
  )
  24. var (
  25. // ErrServiceOverloaded is returned by Shedder.Allow when the service is overloaded.
  26. ErrServiceOverloaded = errors.New("service overloaded")
  27. // default to be enabled
  28. enabled = syncx.ForAtomicBool(true)
  29. // default to be enabled
  30. logEnabled = syncx.ForAtomicBool(true)
  31. // make it a variable for unit test
  32. systemOverloadChecker = func(cpuThreshold int64) bool {
  33. return stat.CpuUsage() >= cpuThreshold
  34. }
  35. )
  36. type (
  37. // A Promise interface is returned by Shedder.Allow to let callers tell
  38. // whether the processing request is successful or not.
  39. Promise interface {
  40. // Pass lets the caller tell that the call is successful.
  41. Pass()
  42. // Fail lets the caller tell that the call is failed.
  43. Fail()
  44. }
  45. // Shedder is the interface that wraps the Allow method.
  46. Shedder interface {
  47. // Allow returns the Promise if allowed, otherwise ErrServiceOverloaded.
  48. Allow() (Promise, error)
  49. }
  50. // ShedderOption lets caller customize the Shedder.
  51. ShedderOption func(opts *shedderOptions)
  52. shedderOptions struct {
  53. window time.Duration
  54. buckets int
  55. cpuThreshold int64
  56. }
  57. adaptiveShedder struct {
  58. cpuThreshold int64
  59. windows int64
  60. flying int64
  61. avgFlying float64
  62. avgFlyingLock syncx.SpinLock
  63. overloadTime *syncx.AtomicDuration
  64. droppedRecently *syncx.AtomicBool
  65. passCounter *collection.RollingWindow
  66. rtCounter *collection.RollingWindow
  67. }
  68. )
  69. // Disable lets callers disable load shedding.
  70. func Disable() {
  71. enabled.Set(false)
  72. }
  73. // DisableLog disables the stat logs for load shedding.
  74. func DisableLog() {
  75. logEnabled.Set(false)
  76. }
  77. // NewAdaptiveShedder returns an adaptive shedder.
  78. // opts can be used to customize the Shedder.
  79. func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
  80. if !enabled.True() {
  81. return newNopShedder()
  82. }
  83. options := shedderOptions{
  84. window: defaultWindow,
  85. buckets: defaultBuckets,
  86. cpuThreshold: defaultCpuThreshold,
  87. }
  88. for _, opt := range opts {
  89. opt(&options)
  90. }
  91. bucketDuration := options.window / time.Duration(options.buckets)
  92. return &adaptiveShedder{
  93. cpuThreshold: options.cpuThreshold,
  94. windows: int64(time.Second / bucketDuration),
  95. overloadTime: syncx.NewAtomicDuration(),
  96. droppedRecently: syncx.NewAtomicBool(),
  97. passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
  98. collection.IgnoreCurrentBucket()),
  99. rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
  100. collection.IgnoreCurrentBucket()),
  101. }
  102. }
  103. // Allow implements Shedder.Allow.
  104. func (as *adaptiveShedder) Allow() (Promise, error) {
  105. if as.shouldDrop() {
  106. as.droppedRecently.Set(true)
  107. return nil, ErrServiceOverloaded
  108. }
  109. as.addFlying(1)
  110. return &promise{
  111. start: timex.Now(),
  112. shedder: as,
  113. }, nil
  114. }
  115. func (as *adaptiveShedder) addFlying(delta int64) {
  116. flying := atomic.AddInt64(&as.flying, delta)
  117. // update avgFlying when the request is finished.
  118. // this strategy makes avgFlying have a little bit lag against flying, and smoother.
  119. // when the flying requests increase rapidly, avgFlying increase slower, accept more requests.
  120. // when the flying requests drop rapidly, avgFlying drop slower, accept less requests.
  121. // it makes the service to serve as more requests as possible.
  122. if delta < 0 {
  123. as.avgFlyingLock.Lock()
  124. as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta)
  125. as.avgFlyingLock.Unlock()
  126. }
  127. }
  128. func (as *adaptiveShedder) highThru() bool {
  129. as.avgFlyingLock.Lock()
  130. avgFlying := as.avgFlying
  131. as.avgFlyingLock.Unlock()
  132. maxFlight := as.maxFlight()
  133. return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight
  134. }
  135. func (as *adaptiveShedder) maxFlight() int64 {
  136. // windows = buckets per second
  137. // maxQPS = maxPASS * windows
  138. // minRT = min average response time in milliseconds
  139. // maxQPS * minRT / milliseconds_per_second
  140. return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
  141. }
  142. func (as *adaptiveShedder) maxPass() int64 {
  143. var result float64 = 1
  144. as.passCounter.Reduce(func(b *collection.Bucket) {
  145. if b.Sum > result {
  146. result = b.Sum
  147. }
  148. })
  149. return int64(result)
  150. }
  151. func (as *adaptiveShedder) minRt() float64 {
  152. result := defaultMinRt
  153. as.rtCounter.Reduce(func(b *collection.Bucket) {
  154. if b.Count <= 0 {
  155. return
  156. }
  157. avg := math.Round(b.Sum / float64(b.Count))
  158. if avg < result {
  159. result = avg
  160. }
  161. })
  162. return result
  163. }
  164. func (as *adaptiveShedder) shouldDrop() bool {
  165. if as.systemOverloaded() || as.stillHot() {
  166. if as.highThru() {
  167. flying := atomic.LoadInt64(&as.flying)
  168. as.avgFlyingLock.Lock()
  169. avgFlying := as.avgFlying
  170. as.avgFlyingLock.Unlock()
  171. msg := fmt.Sprintf(
  172. "dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",
  173. stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)
  174. logx.Error(msg)
  175. stat.Report(msg)
  176. return true
  177. }
  178. }
  179. return false
  180. }
  181. func (as *adaptiveShedder) stillHot() bool {
  182. if !as.droppedRecently.True() {
  183. return false
  184. }
  185. overloadTime := as.overloadTime.Load()
  186. if overloadTime == 0 {
  187. return false
  188. }
  189. if timex.Since(overloadTime) < coolOffDuration {
  190. return true
  191. }
  192. as.droppedRecently.Set(false)
  193. return false
  194. }
  195. func (as *adaptiveShedder) systemOverloaded() bool {
  196. if !systemOverloadChecker(as.cpuThreshold) {
  197. return false
  198. }
  199. as.overloadTime.Set(timex.Now())
  200. return true
  201. }
  202. // WithBuckets customizes the Shedder with given number of buckets.
  203. func WithBuckets(buckets int) ShedderOption {
  204. return func(opts *shedderOptions) {
  205. opts.buckets = buckets
  206. }
  207. }
  208. // WithCpuThreshold customizes the Shedder with given cpu threshold.
  209. func WithCpuThreshold(threshold int64) ShedderOption {
  210. return func(opts *shedderOptions) {
  211. opts.cpuThreshold = threshold
  212. }
  213. }
  214. // WithWindow customizes the Shedder with given
  215. func WithWindow(window time.Duration) ShedderOption {
  216. return func(opts *shedderOptions) {
  217. opts.window = window
  218. }
  219. }
  220. type promise struct {
  221. start time.Duration
  222. shedder *adaptiveShedder
  223. }
  224. func (p *promise) Fail() {
  225. p.shedder.addFlying(-1)
  226. }
  227. func (p *promise) Pass() {
  228. rt := float64(timex.Since(p.start)) / float64(time.Millisecond)
  229. p.shedder.addFlying(-1)
  230. p.shedder.rtCounter.Add(math.Ceil(rt))
  231. p.shedder.passCounter.Add(1)
  232. }