hook.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package redis
  2. import (
  3. "context"
  4. "strings"
  5. "time"
  6. red "github.com/go-redis/redis/v8"
  7. "github.com/zeromicro/go-zero/core/errorx"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "github.com/zeromicro/go-zero/core/mapping"
  10. "github.com/zeromicro/go-zero/core/timex"
  11. "github.com/zeromicro/go-zero/core/trace"
  12. "go.opentelemetry.io/otel"
  13. "go.opentelemetry.io/otel/attribute"
  14. "go.opentelemetry.io/otel/codes"
  15. oteltrace "go.opentelemetry.io/otel/trace"
  16. )
  17. // spanName is the span name of the redis calls.
  18. const spanName = "redis"
  19. var (
  20. startTimeKey = contextKey("startTime")
  21. durationHook = hook{tracer: otel.GetTracerProvider().Tracer(trace.TraceName)}
  22. redisCmdsAttributeKey = attribute.Key("redis.cmds")
  23. )
  24. type (
  25. contextKey string
  26. hook struct {
  27. tracer oteltrace.Tracer
  28. }
  29. )
  30. func (h hook) BeforeProcess(ctx context.Context, cmd red.Cmder) (context.Context, error) {
  31. return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmd), nil
  32. }
  33. func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error {
  34. err := cmd.Err()
  35. h.endSpan(ctx, err)
  36. val := ctx.Value(startTimeKey)
  37. if val == nil {
  38. return nil
  39. }
  40. start, ok := val.(time.Duration)
  41. if !ok {
  42. return nil
  43. }
  44. duration := timex.Since(start)
  45. if duration > slowThreshold.Load() {
  46. logDuration(ctx, cmd, duration)
  47. }
  48. return nil
  49. }
  50. func (h hook) BeforeProcessPipeline(ctx context.Context, cmds []red.Cmder) (context.Context, error) {
  51. if len(cmds) == 0 {
  52. return ctx, nil
  53. }
  54. return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmds...), nil
  55. }
  56. func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error {
  57. if len(cmds) == 0 {
  58. return nil
  59. }
  60. batchError := errorx.BatchError{}
  61. for _, cmd := range cmds {
  62. err := cmd.Err()
  63. if err == nil {
  64. continue
  65. }
  66. batchError.Add(err)
  67. }
  68. h.endSpan(ctx, batchError.Err())
  69. val := ctx.Value(startTimeKey)
  70. if val == nil {
  71. return nil
  72. }
  73. start, ok := val.(time.Duration)
  74. if !ok {
  75. return nil
  76. }
  77. duration := timex.Since(start)
  78. if duration > slowThreshold.Load()*time.Duration(len(cmds)) {
  79. logDuration(ctx, cmds[0], duration)
  80. }
  81. return nil
  82. }
  83. func logDuration(ctx context.Context, cmd red.Cmder, duration time.Duration) {
  84. var buf strings.Builder
  85. for i, arg := range cmd.Args() {
  86. if i > 0 {
  87. buf.WriteByte(' ')
  88. }
  89. buf.WriteString(mapping.Repr(arg))
  90. }
  91. logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String())
  92. }
  93. func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) context.Context {
  94. ctx, span := h.tracer.Start(ctx,
  95. spanName,
  96. oteltrace.WithSpanKind(oteltrace.SpanKindClient),
  97. )
  98. cmdStrs := make([]string, 0, len(cmds))
  99. for _, cmd := range cmds {
  100. cmdStrs = append(cmdStrs, cmd.Name())
  101. }
  102. span.SetAttributes(redisCmdsAttributeKey.StringSlice(cmdStrs))
  103. return ctx
  104. }
  105. func (h hook) endSpan(ctx context.Context, err error) {
  106. span := oteltrace.SpanFromContext(ctx)
  107. defer span.End()
  108. if err == nil || err == red.Nil {
  109. span.SetStatus(codes.Ok, "")
  110. return
  111. }
  112. span.SetStatus(codes.Error, err.Error())
  113. span.RecordError(err)
  114. }