sheddinginterceptor.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package serverinterceptors
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "github.com/wuntsong-org/go-zero-plus/core/load"
  7. "github.com/wuntsong-org/go-zero-plus/core/stat"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. )
  12. const serviceType = "rpc"
  13. var (
  14. sheddingStat *load.SheddingStat
  15. lock sync.Mutex
  16. )
  17. // UnarySheddingInterceptor returns a func that does load shedding on processing unary requests.
  18. func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor {
  19. ensureSheddingStat()
  20. return func(ctx context.Context, req any, info *grpc.UnaryServerInfo,
  21. handler grpc.UnaryHandler) (val any, err error) {
  22. sheddingStat.IncrementTotal()
  23. var promise load.Promise
  24. promise, err = shedder.Allow()
  25. if err != nil {
  26. metrics.AddDrop()
  27. sheddingStat.IncrementDrop()
  28. err = status.Error(codes.ResourceExhausted, err.Error())
  29. return
  30. }
  31. defer func() {
  32. if errors.Is(err, context.DeadlineExceeded) {
  33. promise.Fail()
  34. } else {
  35. sheddingStat.IncrementPass()
  36. promise.Pass()
  37. }
  38. }()
  39. return handler(ctx, req)
  40. }
  41. }
  42. func ensureSheddingStat() {
  43. lock.Lock()
  44. if sheddingStat == nil {
  45. sheddingStat = load.NewSheddingStat(serviceType)
  46. }
  47. lock.Unlock()
  48. }