retryinterceptor.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package retry
  2. import (
  3. "context"
  4. "strconv"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/logx"
  7. "github.com/tal-tech/go-zero/core/retry/backoff"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/metadata"
  11. "google.golang.org/grpc/status"
  12. )
  13. const AttemptMetadataKey = "x-retry-attempt"
  14. var (
  15. // DefaultRetriableCodes default retry code
  16. DefaultRetriableCodes = []codes.Code{codes.ResourceExhausted, codes.Unavailable}
  17. // defaultRetryOptions default retry configuration
  18. defaultRetryOptions = &options{
  19. max: 0, // disabled
  20. perCallTimeout: 0, // disabled
  21. includeRetryHeader: true,
  22. codes: DefaultRetriableCodes,
  23. backoffFunc: backoff.LinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
  24. }
  25. )
  26. type (
  27. // options retry the configuration
  28. options struct {
  29. max int
  30. perCallTimeout time.Duration
  31. includeRetryHeader bool
  32. codes []codes.Code
  33. backoffFunc backoff.Func
  34. }
  35. // CallOption is a grpc.CallOption that is local to grpc retry.
  36. CallOption struct {
  37. grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
  38. apply func(opt *options)
  39. }
  40. )
  41. func waitRetryBackoff(logger logx.Logger, attempt int, ctx context.Context, retryOptions *options) error {
  42. var waitTime time.Duration = 0
  43. if attempt > 0 {
  44. waitTime = retryOptions.backoffFunc(attempt)
  45. }
  46. if waitTime > 0 {
  47. timer := time.NewTimer(waitTime)
  48. defer timer.Stop()
  49. logger.Infof("grpc retry attempt: %d, backoff for %v", attempt, waitTime)
  50. select {
  51. case <-ctx.Done():
  52. return status.FromContextError(ctx.Err()).Err()
  53. case <-timer.C:
  54. // double check
  55. err := ctx.Err()
  56. if err != nil {
  57. return status.FromContextError(err).Err()
  58. }
  59. }
  60. }
  61. return nil
  62. }
  63. func isRetriable(err error, retryOptions *options) bool {
  64. errCode := status.Code(err)
  65. if isContextError(err) {
  66. return false
  67. }
  68. for _, code := range retryOptions.codes {
  69. if code == errCode {
  70. return true
  71. }
  72. }
  73. return false
  74. }
  75. func isContextError(err error) bool {
  76. code := status.Code(err)
  77. return code == codes.DeadlineExceeded || code == codes.Canceled
  78. }
  79. func reuseOrNewWithCallOptions(opt *options, retryCallOptions []*CallOption) *options {
  80. if len(retryCallOptions) == 0 {
  81. return opt
  82. }
  83. return parseRetryCallOptions(opt, retryCallOptions...)
  84. }
  85. func parseRetryCallOptions(opt *options, opts ...*CallOption) *options {
  86. for _, option := range opts {
  87. option.apply(opt)
  88. }
  89. return opt
  90. }
  91. func perCallContext(ctx context.Context, callOpts *options, attempt int) context.Context {
  92. if attempt > 0 {
  93. if callOpts.perCallTimeout != 0 {
  94. var cancel context.CancelFunc
  95. ctx, cancel = context.WithTimeout(ctx, callOpts.perCallTimeout)
  96. _ = cancel
  97. }
  98. if callOpts.includeRetryHeader {
  99. cloneMd := extractIncomingAndClone(ctx)
  100. cloneMd.Set(AttemptMetadataKey, strconv.Itoa(attempt))
  101. ctx = metadata.NewOutgoingContext(ctx, cloneMd)
  102. }
  103. }
  104. return ctx
  105. }
  106. func extractIncomingAndClone(ctx context.Context) metadata.MD {
  107. md, ok := metadata.FromIncomingContext(ctx)
  108. if !ok {
  109. return metadata.MD{}
  110. }
  111. return md.Copy()
  112. }
  113. func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []*CallOption) {
  114. for _, opt := range callOptions {
  115. if co, ok := opt.(*CallOption); ok {
  116. retryOptions = append(retryOptions, co)
  117. } else {
  118. grpcOptions = append(grpcOptions, opt)
  119. }
  120. }
  121. return grpcOptions, retryOptions
  122. }
  123. func Do(ctx context.Context, call func(ctx context.Context, opts ...grpc.CallOption) error, opts ...grpc.CallOption) error {
  124. logger := logx.WithContext(ctx)
  125. grpcOpts, retryOpts := filterCallOptions(opts)
  126. callOpts := reuseOrNewWithCallOptions(defaultRetryOptions, retryOpts)
  127. if callOpts.max == 0 {
  128. return call(ctx, opts...)
  129. }
  130. var lastErr error
  131. for attempt := 0; attempt <= callOpts.max; attempt++ {
  132. if err := waitRetryBackoff(logger, attempt, ctx, callOpts); err != nil {
  133. return err
  134. }
  135. callCtx := perCallContext(ctx, callOpts, attempt)
  136. lastErr = call(callCtx, grpcOpts...)
  137. if lastErr == nil {
  138. return nil
  139. }
  140. if attempt == 0 {
  141. logger.Errorf("grpc call failed, got err: %v", lastErr)
  142. } else {
  143. logger.Errorf("grpc retry attempt: %d, got err: %v", attempt, lastErr)
  144. }
  145. if isContextError(lastErr) {
  146. if ctx.Err() != nil {
  147. logger.Errorf("grpc retry attempt: %d, parent context error: %v", attempt, ctx.Err())
  148. return lastErr
  149. } else if callOpts.perCallTimeout != 0 {
  150. logger.Errorf("grpc retry attempt: %d, context error from retry call", attempt)
  151. continue
  152. }
  153. }
  154. if !isRetriable(lastErr, callOpts) {
  155. return lastErr
  156. }
  157. }
  158. return lastErr
  159. }