hook.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package redis
  import (
  	"context"
  	"errors"
  	"io"
  	"net"
  	"strings"
  	"time"

  	red "github.com/go-redis/redis/v8"
  	"github.com/zeromicro/go-zero/core/breaker"
  	"github.com/zeromicro/go-zero/core/errorx"
  	"github.com/zeromicro/go-zero/core/logx"
  	"github.com/zeromicro/go-zero/core/mapping"
  	"github.com/zeromicro/go-zero/core/timex"
  	"github.com/zeromicro/go-zero/core/trace"
  	"go.opentelemetry.io/otel"
  	"go.opentelemetry.io/otel/attribute"
  	"go.opentelemetry.io/otel/codes"
  	oteltrace "go.opentelemetry.io/otel/trace"
  )
  // spanName is the name given to every span that startSpan opens for redis calls.
  const spanName = "redis"
  22. var (
  23. startTimeKey = contextKey("startTime")
  24. durationHook = hook{tracer: otel.GetTracerProvider().Tracer(trace.TraceName)}
  25. redisCmdsAttributeKey = attribute.Key("redis.cmds")
  26. )
  27. type (
  28. contextKey string
  29. hook struct {
  30. tracer oteltrace.Tracer
  31. }
  32. )
  33. func (h hook) BeforeProcess(ctx context.Context, cmd red.Cmder) (context.Context, error) {
  34. return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmd), nil
  35. }
  36. func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error {
  37. err := cmd.Err()
  38. h.endSpan(ctx, err)
  39. val := ctx.Value(startTimeKey)
  40. if val == nil {
  41. return nil
  42. }
  43. start, ok := val.(time.Duration)
  44. if !ok {
  45. return nil
  46. }
  47. duration := timex.Since(start)
  48. if duration > slowThreshold.Load() {
  49. logDuration(ctx, cmd, duration)
  50. }
  51. metricReqDur.Observe(int64(duration/time.Millisecond), cmd.Name())
  52. if msg := formatError(err); len(msg) > 0 {
  53. metricReqErr.Inc(cmd.Name(), msg)
  54. }
  55. return nil
  56. }
  57. func (h hook) BeforeProcessPipeline(ctx context.Context, cmds []red.Cmder) (context.Context, error) {
  58. if len(cmds) == 0 {
  59. return ctx, nil
  60. }
  61. return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmds...), nil
  62. }
  63. func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error {
  64. if len(cmds) == 0 {
  65. return nil
  66. }
  67. batchError := errorx.BatchError{}
  68. for _, cmd := range cmds {
  69. err := cmd.Err()
  70. if err == nil {
  71. continue
  72. }
  73. batchError.Add(err)
  74. }
  75. h.endSpan(ctx, batchError.Err())
  76. val := ctx.Value(startTimeKey)
  77. if val == nil {
  78. return nil
  79. }
  80. start, ok := val.(time.Duration)
  81. if !ok {
  82. return nil
  83. }
  84. duration := timex.Since(start)
  85. if duration > slowThreshold.Load()*time.Duration(len(cmds)) {
  86. logDuration(ctx, cmds[0], duration)
  87. }
  88. metricReqDur.Observe(int64(duration/time.Millisecond), "Pipeline")
  89. if msg := formatError(batchError.Err()); len(msg) > 0 {
  90. metricReqErr.Inc("Pipeline", msg)
  91. }
  92. return nil
  93. }
  94. func formatError(err error) string {
  95. if err == nil || err == red.Nil {
  96. return ""
  97. }
  98. opErr, ok := err.(*net.OpError)
  99. if ok && opErr.Timeout() {
  100. return "timeout"
  101. }
  102. switch err {
  103. case io.EOF:
  104. return "eof"
  105. case context.DeadlineExceeded:
  106. return "context deadline"
  107. case breaker.ErrServiceUnavailable:
  108. return "breaker"
  109. default:
  110. return "unexpected error"
  111. }
  112. }
  113. func logDuration(ctx context.Context, cmd red.Cmder, duration time.Duration) {
  114. var buf strings.Builder
  115. for i, arg := range cmd.Args() {
  116. if i > 0 {
  117. buf.WriteByte(' ')
  118. }
  119. buf.WriteString(mapping.Repr(arg))
  120. }
  121. logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String())
  122. }
  123. func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) context.Context {
  124. ctx, span := h.tracer.Start(ctx,
  125. spanName,
  126. oteltrace.WithSpanKind(oteltrace.SpanKindClient),
  127. )
  128. cmdStrs := make([]string, 0, len(cmds))
  129. for _, cmd := range cmds {
  130. cmdStrs = append(cmdStrs, cmd.Name())
  131. }
  132. span.SetAttributes(redisCmdsAttributeKey.StringSlice(cmdStrs))
  133. return ctx
  134. }
  135. func (h hook) endSpan(ctx context.Context, err error) {
  136. span := oteltrace.SpanFromContext(ctx)
  137. defer span.End()
  138. if err == nil || err == red.Nil {
  139. span.SetStatus(codes.Ok, "")
  140. return
  141. }
  142. span.SetStatus(codes.Error, err.Error())
  143. span.RecordError(err)
  144. }