adaptiveshedder.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package load
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "sync/atomic"
  7. "time"
  8. "github.com/tal-tech/go-zero/core/collection"
  9. "github.com/tal-tech/go-zero/core/logx"
  10. "github.com/tal-tech/go-zero/core/stat"
  11. "github.com/tal-tech/go-zero/core/syncx"
  12. "github.com/tal-tech/go-zero/core/timex"
  13. )
  const (
  	// defaultBuckets is the number of buckets each rolling window is split into.
  	defaultBuckets = 50
  	// defaultWindow is the total time span covered by the rolling windows.
  	defaultWindow = time.Second * 5
  	// defaultCpuThreshold uses the 1000m notation of stat.CpuUsage:
  	// 900m means 90% CPU usage.
  	defaultCpuThreshold = 900
  	// defaultMinRt is the starting (and therefore largest) value minRt can
  	// return, in milliseconds: 1000ms, i.e. one second.
  	defaultMinRt = float64(time.Second / time.Millisecond)
  	// flyingBeta is the moving-average weight kept from the previous
  	// avgFlying when smoothing the number of in-flight requests.
  	flyingBeta = 0.9
  	// coolOffDuration is how long the shedder stays "hot" after a drop.
  	coolOffDuration = time.Second
  )
  24. var (
  25. // ErrServiceOverloaded is returned by Shedder.Allow when the service is overloaded.
  26. ErrServiceOverloaded = errors.New("service overloaded")
  27. // default to be enabled
  28. enabled = syncx.ForAtomicBool(true)
  29. // default to be enabled
  30. logEnabled = syncx.ForAtomicBool(true)
  31. // make it a variable for unit test
  32. systemOverloadChecker = func(cpuThreshold int64) bool {
  33. return stat.CpuUsage() >= cpuThreshold
  34. }
  35. )
  36. type (
  37. // A Promise interface is returned by Shedder.Allow to let callers tell
  38. // whether the processing request is successful or not.
  39. Promise interface {
  40. // Pass lets the caller tell that the call is successful.
  41. Pass()
  42. // Fail lets the caller tell that the call is failed.
  43. Fail()
  44. }
  45. // Shedder is the interface that wraps the Allow method.
  46. Shedder interface {
  47. // Allow returns the Promise if allowed, otherwise ErrServiceOverloaded.
  48. Allow() (Promise, error)
  49. }
  50. // ShedderOption lets caller customize the Shedder.
  51. ShedderOption func(opts *shedderOptions)
  52. shedderOptions struct {
  53. window time.Duration
  54. buckets int
  55. cpuThreshold int64
  56. }
  57. adaptiveShedder struct {
  58. cpuThreshold int64
  59. windows int64
  60. flying int64
  61. avgFlying float64
  62. avgFlyingLock syncx.SpinLock
  63. dropTime *syncx.AtomicDuration
  64. droppedRecently *syncx.AtomicBool
  65. passCounter *collection.RollingWindow
  66. rtCounter *collection.RollingWindow
  67. }
  68. )
  69. // Disable lets callers disable load shedding.
  70. func Disable() {
  71. enabled.Set(false)
  72. }
  73. // DisableLog disables the stat logs for load shedding.
  74. func DisableLog() {
  75. logEnabled.Set(false)
  76. }
  77. // NewAdaptiveShedder returns an adaptive shedder.
  78. // opts can be used to customize the Shedder.
  79. func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
  80. if !enabled.True() {
  81. return newNopShedder()
  82. }
  83. options := shedderOptions{
  84. window: defaultWindow,
  85. buckets: defaultBuckets,
  86. cpuThreshold: defaultCpuThreshold,
  87. }
  88. for _, opt := range opts {
  89. opt(&options)
  90. }
  91. bucketDuration := options.window / time.Duration(options.buckets)
  92. return &adaptiveShedder{
  93. cpuThreshold: options.cpuThreshold,
  94. windows: int64(time.Second / bucketDuration),
  95. dropTime: syncx.NewAtomicDuration(),
  96. droppedRecently: syncx.NewAtomicBool(),
  97. passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
  98. collection.IgnoreCurrentBucket()),
  99. rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
  100. collection.IgnoreCurrentBucket()),
  101. }
  102. }
  103. // Allow implements Shedder.Allow.
  104. func (as *adaptiveShedder) Allow() (Promise, error) {
  105. if as.shouldDrop() {
  106. as.dropTime.Set(timex.Now())
  107. as.droppedRecently.Set(true)
  108. return nil, ErrServiceOverloaded
  109. }
  110. as.addFlying(1)
  111. return &promise{
  112. start: timex.Now(),
  113. shedder: as,
  114. }, nil
  115. }
  116. func (as *adaptiveShedder) addFlying(delta int64) {
  117. flying := atomic.AddInt64(&as.flying, delta)
  118. // update avgFlying when the request is finished.
  119. // this strategy makes avgFlying have a little bit lag against flying, and smoother.
  120. // when the flying requests increase rapidly, avgFlying increase slower, accept more requests.
  121. // when the flying requests drop rapidly, avgFlying drop slower, accept less requests.
  122. // it makes the service to serve as more requests as possible.
  123. if delta < 0 {
  124. as.avgFlyingLock.Lock()
  125. as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta)
  126. as.avgFlyingLock.Unlock()
  127. }
  128. }
  129. func (as *adaptiveShedder) highThru() bool {
  130. as.avgFlyingLock.Lock()
  131. avgFlying := as.avgFlying
  132. as.avgFlyingLock.Unlock()
  133. maxFlight := as.maxFlight()
  134. return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight
  135. }
  136. func (as *adaptiveShedder) maxFlight() int64 {
  137. // windows = buckets per second
  138. // maxQPS = maxPASS * windows
  139. // minRT = min average response time in milliseconds
  140. // maxQPS * minRT / milliseconds_per_second
  141. return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
  142. }
  143. func (as *adaptiveShedder) maxPass() int64 {
  144. var result float64 = 1
  145. as.passCounter.Reduce(func(b *collection.Bucket) {
  146. if b.Sum > result {
  147. result = b.Sum
  148. }
  149. })
  150. return int64(result)
  151. }
  152. func (as *adaptiveShedder) minRt() float64 {
  153. result := defaultMinRt
  154. as.rtCounter.Reduce(func(b *collection.Bucket) {
  155. if b.Count <= 0 {
  156. return
  157. }
  158. avg := math.Round(b.Sum / float64(b.Count))
  159. if avg < result {
  160. result = avg
  161. }
  162. })
  163. return result
  164. }
  165. func (as *adaptiveShedder) shouldDrop() bool {
  166. if as.systemOverloaded() || as.stillHot() {
  167. if as.highThru() {
  168. flying := atomic.LoadInt64(&as.flying)
  169. as.avgFlyingLock.Lock()
  170. avgFlying := as.avgFlying
  171. as.avgFlyingLock.Unlock()
  172. msg := fmt.Sprintf(
  173. "dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",
  174. stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)
  175. logx.Error(msg)
  176. stat.Report(msg)
  177. return true
  178. }
  179. }
  180. return false
  181. }
  182. func (as *adaptiveShedder) stillHot() bool {
  183. if !as.droppedRecently.True() {
  184. return false
  185. }
  186. dropTime := as.dropTime.Load()
  187. if dropTime == 0 {
  188. return false
  189. }
  190. hot := timex.Since(dropTime) < coolOffDuration
  191. if !hot {
  192. as.droppedRecently.Set(false)
  193. }
  194. return hot
  195. }
  196. func (as *adaptiveShedder) systemOverloaded() bool {
  197. return systemOverloadChecker(as.cpuThreshold)
  198. }
  199. // WithBuckets customizes the Shedder with given number of buckets.
  200. func WithBuckets(buckets int) ShedderOption {
  201. return func(opts *shedderOptions) {
  202. opts.buckets = buckets
  203. }
  204. }
  205. // WithCpuThreshold customizes the Shedder with given cpu threshold.
  206. func WithCpuThreshold(threshold int64) ShedderOption {
  207. return func(opts *shedderOptions) {
  208. opts.cpuThreshold = threshold
  209. }
  210. }
  211. // WithWindow customizes the Shedder with given
  212. func WithWindow(window time.Duration) ShedderOption {
  213. return func(opts *shedderOptions) {
  214. opts.window = window
  215. }
  216. }
  217. type promise struct {
  218. start time.Duration
  219. shedder *adaptiveShedder
  220. }
  221. func (p *promise) Fail() {
  222. p.shedder.addFlying(-1)
  223. }
  224. func (p *promise) Pass() {
  225. rt := float64(timex.Since(p.start)) / float64(time.Millisecond)
  226. p.shedder.addFlying(-1)
  227. p.shedder.rtCounter.Add(math.Ceil(rt))
  228. p.shedder.passCounter.Add(1)
  229. }