123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- package retry
- import (
- "context"
- "strconv"
- "time"
- "github.com/tal-tech/go-zero/core/logx"
- "github.com/tal-tech/go-zero/core/retry/backoff"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
- )
// AttemptMetadataKey is the metadata key under which the retry attempt
// number is attached to the outgoing context of retried calls.
const (
	AttemptMetadataKey = "x-retry-attempt"
)
- var (
- // DefaultRetriableCodes default retry code
- DefaultRetriableCodes = []codes.Code{codes.ResourceExhausted, codes.Unavailable}
- // defaultRetryOptions default retry configuration
- defaultRetryOptions = &options{
- max: 0, // disabled
- perCallTimeout: 0, // disabled
- includeRetryHeader: true,
- codes: DefaultRetriableCodes,
- backoffFunc: backoff.LinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
- }
- )
- type (
- // options retry the configuration
- options struct {
- max int
- perCallTimeout time.Duration
- includeRetryHeader bool
- codes []codes.Code
- backoffFunc backoff.Func
- }
- // CallOption is a grpc.CallOption that is local to grpc retry.
- CallOption struct {
- grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
- apply func(opt *options)
- }
- )
- func waitRetryBackoff(logger logx.Logger, attempt int, ctx context.Context, retryOptions *options) error {
- var waitTime time.Duration = 0
- if attempt > 0 {
- waitTime = retryOptions.backoffFunc(attempt)
- }
- if waitTime > 0 {
- timer := time.NewTimer(waitTime)
- defer timer.Stop()
- logger.Infof("grpc retry attempt: %d, backoff for %v", attempt, waitTime)
- select {
- case <-ctx.Done():
- return status.FromContextError(ctx.Err()).Err()
- case <-timer.C:
- // double check
- err := ctx.Err()
- if err != nil {
- return status.FromContextError(err).Err()
- }
- }
- }
- return nil
- }
- func isRetriable(err error, retryOptions *options) bool {
- errCode := status.Code(err)
- if isContextError(err) {
- return false
- }
- for _, code := range retryOptions.codes {
- if code == errCode {
- return true
- }
- }
- return false
- }
- func isContextError(err error) bool {
- code := status.Code(err)
- return code == codes.DeadlineExceeded || code == codes.Canceled
- }
- func reuseOrNewWithCallOptions(opt *options, retryCallOptions []*CallOption) *options {
- if len(retryCallOptions) == 0 {
- return opt
- }
- return parseRetryCallOptions(opt, retryCallOptions...)
- }
- func parseRetryCallOptions(opt *options, opts ...*CallOption) *options {
- for _, option := range opts {
- option.apply(opt)
- }
- return opt
- }
- func perCallContext(ctx context.Context, callOpts *options, attempt int) context.Context {
- if attempt > 0 {
- if callOpts.perCallTimeout != 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, callOpts.perCallTimeout)
- _ = cancel
- }
- if callOpts.includeRetryHeader {
- cloneMd := extractIncomingAndClone(ctx)
- cloneMd.Set(AttemptMetadataKey, strconv.Itoa(attempt))
- ctx = metadata.NewOutgoingContext(ctx, cloneMd)
- }
- }
- return ctx
- }
- func extractIncomingAndClone(ctx context.Context) metadata.MD {
- md, ok := metadata.FromIncomingContext(ctx)
- if !ok {
- return metadata.MD{}
- }
- return md.Copy()
- }
- func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []*CallOption) {
- for _, opt := range callOptions {
- if co, ok := opt.(*CallOption); ok {
- retryOptions = append(retryOptions, co)
- } else {
- grpcOptions = append(grpcOptions, opt)
- }
- }
- return grpcOptions, retryOptions
- }
- func Do(ctx context.Context, call func(ctx context.Context, opts ...grpc.CallOption) error, opts ...grpc.CallOption) error {
- logger := logx.WithContext(ctx)
- grpcOpts, retryOpts := filterCallOptions(opts)
- callOpts := reuseOrNewWithCallOptions(defaultRetryOptions, retryOpts)
- if callOpts.max == 0 {
- return call(ctx, opts...)
- }
- var lastErr error
- for attempt := 0; attempt <= callOpts.max; attempt++ {
- if err := waitRetryBackoff(logger, attempt, ctx, callOpts); err != nil {
- return err
- }
- callCtx := perCallContext(ctx, callOpts, attempt)
- lastErr = call(callCtx, grpcOpts...)
- if lastErr == nil {
- return nil
- }
- if attempt == 0 {
- logger.Errorf("grpc call failed, got err: %v", lastErr)
- } else {
- logger.Errorf("grpc retry attempt: %d, got err: %v", attempt, lastErr)
- }
- if isContextError(lastErr) {
- if ctx.Err() != nil {
- logger.Errorf("grpc retry attempt: %d, parent context error: %v", attempt, ctx.Err())
- return lastErr
- } else if callOpts.perCallTimeout != 0 {
- logger.Errorf("grpc retry attempt: %d, context error from retry call", attempt)
- continue
- }
- }
- if !isRetriable(lastErr, callOpts) {
- return lastErr
- }
- }
- return lastErr
- }
|