1
0

adaptiveshedder_test.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package load
  2. import (
  3. "math/rand"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/wuntsong-org/go-zero-plus/core/collection"
  10. "github.com/wuntsong-org/go-zero-plus/core/logx"
  11. "github.com/wuntsong-org/go-zero-plus/core/mathx"
  12. "github.com/wuntsong-org/go-zero-plus/core/stat"
  13. "github.com/wuntsong-org/go-zero-plus/core/syncx"
  14. "github.com/wuntsong-org/go-zero-plus/core/timex"
  15. )
  16. const (
  17. buckets = 10
  18. bucketDuration = time.Millisecond * 50
  19. )
  20. func init() {
  21. stat.SetReporter(nil)
  22. }
  23. func TestAdaptiveShedder(t *testing.T) {
  24. DisableLog()
  25. shedder := NewAdaptiveShedder(WithWindow(bucketDuration), WithBuckets(buckets), WithCpuThreshold(100))
  26. var wg sync.WaitGroup
  27. var drop int64
  28. proba := mathx.NewProba()
  29. for i := 0; i < 100; i++ {
  30. wg.Add(1)
  31. go func() {
  32. defer wg.Done()
  33. for i := 0; i < 30; i++ {
  34. promise, err := shedder.Allow()
  35. if err != nil {
  36. atomic.AddInt64(&drop, 1)
  37. } else {
  38. count := rand.Intn(5)
  39. time.Sleep(time.Millisecond * time.Duration(count))
  40. if proba.TrueOnProba(0.01) {
  41. promise.Fail()
  42. } else {
  43. promise.Pass()
  44. }
  45. }
  46. }
  47. }()
  48. }
  49. wg.Wait()
  50. }
  51. func TestAdaptiveShedderMaxPass(t *testing.T) {
  52. passCounter := newRollingWindow()
  53. for i := 1; i <= 10; i++ {
  54. passCounter.Add(float64(i * 100))
  55. time.Sleep(bucketDuration)
  56. }
  57. shedder := &adaptiveShedder{
  58. passCounter: passCounter,
  59. droppedRecently: syncx.NewAtomicBool(),
  60. }
  61. assert.Equal(t, int64(1000), shedder.maxPass())
  62. // default max pass is equal to 1.
  63. passCounter = newRollingWindow()
  64. shedder = &adaptiveShedder{
  65. passCounter: passCounter,
  66. droppedRecently: syncx.NewAtomicBool(),
  67. }
  68. assert.Equal(t, int64(1), shedder.maxPass())
  69. }
  70. func TestAdaptiveShedderMinRt(t *testing.T) {
  71. rtCounter := newRollingWindow()
  72. for i := 0; i < 10; i++ {
  73. if i > 0 {
  74. time.Sleep(bucketDuration)
  75. }
  76. for j := i*10 + 1; j <= i*10+10; j++ {
  77. rtCounter.Add(float64(j))
  78. }
  79. }
  80. shedder := &adaptiveShedder{
  81. rtCounter: rtCounter,
  82. }
  83. assert.Equal(t, float64(6), shedder.minRt())
  84. // default max min rt is equal to maxFloat64.
  85. rtCounter = newRollingWindow()
  86. shedder = &adaptiveShedder{
  87. rtCounter: rtCounter,
  88. droppedRecently: syncx.NewAtomicBool(),
  89. }
  90. assert.Equal(t, defaultMinRt, shedder.minRt())
  91. }
  92. func TestAdaptiveShedderMaxFlight(t *testing.T) {
  93. passCounter := newRollingWindow()
  94. rtCounter := newRollingWindow()
  95. for i := 0; i < 10; i++ {
  96. if i > 0 {
  97. time.Sleep(bucketDuration)
  98. }
  99. passCounter.Add(float64((i + 1) * 100))
  100. for j := i*10 + 1; j <= i*10+10; j++ {
  101. rtCounter.Add(float64(j))
  102. }
  103. }
  104. shedder := &adaptiveShedder{
  105. passCounter: passCounter,
  106. rtCounter: rtCounter,
  107. windows: buckets,
  108. droppedRecently: syncx.NewAtomicBool(),
  109. }
  110. assert.Equal(t, int64(54), shedder.maxFlight())
  111. }
  112. func TestAdaptiveShedderShouldDrop(t *testing.T) {
  113. logx.Disable()
  114. passCounter := newRollingWindow()
  115. rtCounter := newRollingWindow()
  116. for i := 0; i < 10; i++ {
  117. if i > 0 {
  118. time.Sleep(bucketDuration)
  119. }
  120. passCounter.Add(float64((i + 1) * 100))
  121. for j := i*10 + 1; j <= i*10+10; j++ {
  122. rtCounter.Add(float64(j))
  123. }
  124. }
  125. shedder := &adaptiveShedder{
  126. passCounter: passCounter,
  127. rtCounter: rtCounter,
  128. windows: buckets,
  129. overloadTime: syncx.NewAtomicDuration(),
  130. droppedRecently: syncx.NewAtomicBool(),
  131. }
  132. // cpu >= 800, inflight < maxPass
  133. systemOverloadChecker = func(int64) bool {
  134. return true
  135. }
  136. shedder.avgFlying = 50
  137. assert.False(t, shedder.shouldDrop())
  138. // cpu >= 800, inflight > maxPass
  139. shedder.avgFlying = 80
  140. shedder.flying = 50
  141. assert.False(t, shedder.shouldDrop())
  142. // cpu >= 800, inflight > maxPass
  143. shedder.avgFlying = 80
  144. shedder.flying = 80
  145. assert.True(t, shedder.shouldDrop())
  146. // cpu < 800, inflight > maxPass
  147. systemOverloadChecker = func(int64) bool {
  148. return false
  149. }
  150. shedder.avgFlying = 80
  151. assert.False(t, shedder.shouldDrop())
  152. // cpu >= 800, inflight < maxPass
  153. systemOverloadChecker = func(int64) bool {
  154. return true
  155. }
  156. shedder.avgFlying = 80
  157. shedder.flying = 80
  158. _, err := shedder.Allow()
  159. assert.NotNil(t, err)
  160. }
  161. func TestAdaptiveShedderStillHot(t *testing.T) {
  162. logx.Disable()
  163. passCounter := newRollingWindow()
  164. rtCounter := newRollingWindow()
  165. for i := 0; i < 10; i++ {
  166. if i > 0 {
  167. time.Sleep(bucketDuration)
  168. }
  169. passCounter.Add(float64((i + 1) * 100))
  170. for j := i*10 + 1; j <= i*10+10; j++ {
  171. rtCounter.Add(float64(j))
  172. }
  173. }
  174. shedder := &adaptiveShedder{
  175. passCounter: passCounter,
  176. rtCounter: rtCounter,
  177. windows: buckets,
  178. overloadTime: syncx.NewAtomicDuration(),
  179. droppedRecently: syncx.ForAtomicBool(true),
  180. }
  181. assert.False(t, shedder.stillHot())
  182. shedder.overloadTime.Set(-coolOffDuration * 2)
  183. assert.False(t, shedder.stillHot())
  184. shedder.droppedRecently.Set(true)
  185. shedder.overloadTime.Set(timex.Now())
  186. assert.True(t, shedder.stillHot())
  187. }
  188. func BenchmarkAdaptiveShedder_Allow(b *testing.B) {
  189. logx.Disable()
  190. bench := func(b *testing.B) {
  191. shedder := NewAdaptiveShedder()
  192. proba := mathx.NewProba()
  193. for i := 0; i < 6000; i++ {
  194. p, err := shedder.Allow()
  195. if err == nil {
  196. time.Sleep(time.Millisecond)
  197. if proba.TrueOnProba(0.01) {
  198. p.Fail()
  199. } else {
  200. p.Pass()
  201. }
  202. }
  203. }
  204. b.ResetTimer()
  205. for i := 0; i < b.N; i++ {
  206. p, err := shedder.Allow()
  207. if err == nil {
  208. p.Pass()
  209. }
  210. }
  211. }
  212. systemOverloadChecker = func(int64) bool {
  213. return true
  214. }
  215. b.Run("high load", bench)
  216. systemOverloadChecker = func(int64) bool {
  217. return false
  218. }
  219. b.Run("low load", bench)
  220. }
  221. func newRollingWindow() *collection.RollingWindow {
  222. return collection.NewRollingWindow(buckets, bucketDuration, collection.IgnoreCurrentBucket())
  223. }