sheddingstat.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package load
  2. import (
  3. "sync/atomic"
  4. "time"
  5. "github.com/wuntsong-org/go-zero-plus/core/logx"
  6. "github.com/wuntsong-org/go-zero-plus/core/stat"
  7. )
  8. type (
  9. // A SheddingStat is used to store the statistics for load shedding.
  10. SheddingStat struct {
  11. name string
  12. total int64
  13. pass int64
  14. drop int64
  15. }
  16. snapshot struct {
  17. Total int64
  18. Pass int64
  19. Drop int64
  20. }
  21. )
  22. // NewSheddingStat returns a SheddingStat.
  23. func NewSheddingStat(name string) *SheddingStat {
  24. st := &SheddingStat{
  25. name: name,
  26. }
  27. go st.run()
  28. return st
  29. }
  30. // IncrementTotal increments the total requests.
  31. func (s *SheddingStat) IncrementTotal() {
  32. atomic.AddInt64(&s.total, 1)
  33. }
  34. // IncrementPass increments the passed requests.
  35. func (s *SheddingStat) IncrementPass() {
  36. atomic.AddInt64(&s.pass, 1)
  37. }
  38. // IncrementDrop increments the dropped requests.
  39. func (s *SheddingStat) IncrementDrop() {
  40. atomic.AddInt64(&s.drop, 1)
  41. }
  42. func (s *SheddingStat) loop(c <-chan time.Time) {
  43. for range c {
  44. st := s.reset()
  45. if !logEnabled.True() {
  46. continue
  47. }
  48. c := stat.CpuUsage()
  49. if st.Drop == 0 {
  50. logx.Statf("(%s) shedding_stat [1m], cpu: %d, total: %d, pass: %d, drop: %d",
  51. s.name, c, st.Total, st.Pass, st.Drop)
  52. } else {
  53. logx.Statf("(%s) shedding_stat_drop [1m], cpu: %d, total: %d, pass: %d, drop: %d",
  54. s.name, c, st.Total, st.Pass, st.Drop)
  55. }
  56. }
  57. }
  58. func (s *SheddingStat) reset() snapshot {
  59. return snapshot{
  60. Total: atomic.SwapInt64(&s.total, 0),
  61. Pass: atomic.SwapInt64(&s.pass, 0),
  62. Drop: atomic.SwapInt64(&s.drop, 0),
  63. }
  64. }
  65. func (s *SheddingStat) run() {
  66. ticker := time.NewTicker(time.Minute)
  67. defer ticker.Stop()
  68. s.loop(ticker.C)
  69. }