hook.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package redis
  2. import (
  3. "context"
  4. "strings"
  5. "time"
  6. red "github.com/go-redis/redis/v8"
  7. "github.com/zeromicro/go-zero/core/errorx"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "github.com/zeromicro/go-zero/core/mapping"
  10. "github.com/zeromicro/go-zero/core/timex"
  11. "github.com/zeromicro/go-zero/core/trace"
  12. "go.opentelemetry.io/otel"
  13. "go.opentelemetry.io/otel/attribute"
  14. "go.opentelemetry.io/otel/codes"
  15. oteltrace "go.opentelemetry.io/otel/trace"
  16. )
  17. // spanName is the span name of the redis calls.
  18. const spanName = "redis"
  19. var (
  20. startTimeKey = contextKey("startTime")
  21. durationHook = hook{tracer: otel.GetTracerProvider().Tracer(trace.TraceName)}
  22. redisCmdsAttributeKey = attribute.Key("redis.cmds")
  23. )
  24. type (
  25. contextKey string
  26. hook struct {
  27. tracer oteltrace.Tracer
  28. }
  29. )
  30. func (h hook) BeforeProcess(ctx context.Context, cmd red.Cmder) (context.Context, error) {
  31. return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmd), nil
  32. }
  33. func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error {
  34. err := cmd.Err()
  35. h.endSpan(ctx, err)
  36. val := ctx.Value(startTimeKey)
  37. if val == nil {
  38. return nil
  39. }
  40. start, ok := val.(time.Duration)
  41. if !ok {
  42. return nil
  43. }
  44. duration := timex.Since(start)
  45. if duration > slowThreshold.Load() {
  46. logDuration(ctx, cmd, duration)
  47. }
  48. metricReqDur.Observe(int64(duration/time.Millisecond), cmd.Name())
  49. if msg := formatError(err); len(msg) > 0 {
  50. metricReqErr.Inc(cmd.Name(), msg)
  51. }
  52. return nil
  53. }
  54. func (h hook) BeforeProcessPipeline(ctx context.Context, cmds []red.Cmder) (context.Context, error) {
  55. if len(cmds) == 0 {
  56. return ctx, nil
  57. }
  58. return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmds...), nil
  59. }
  60. func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error {
  61. if len(cmds) == 0 {
  62. return nil
  63. }
  64. batchError := errorx.BatchError{}
  65. for _, cmd := range cmds {
  66. err := cmd.Err()
  67. if err == nil {
  68. continue
  69. }
  70. batchError.Add(err)
  71. }
  72. h.endSpan(ctx, batchError.Err())
  73. val := ctx.Value(startTimeKey)
  74. if val == nil {
  75. return nil
  76. }
  77. start, ok := val.(time.Duration)
  78. if !ok {
  79. return nil
  80. }
  81. duration := timex.Since(start)
  82. if duration > slowThreshold.Load()*time.Duration(len(cmds)) {
  83. logDuration(ctx, cmds[0], duration)
  84. }
  85. metricReqDur.Observe(int64(duration/time.Millisecond), "Pipeline")
  86. if msg := formatError(batchError.Err()); len(msg) > 0 {
  87. metricReqErr.Inc("Pipeline", msg)
  88. }
  89. return nil
  90. }
  91. func formatError(err error) string {
  92. if err == nil || err == red.Nil {
  93. return ""
  94. }
  95. es := err.Error()
  96. switch {
  97. case strings.HasPrefix(es, "read"):
  98. return "read timeout"
  99. case strings.HasPrefix(es, "dial"):
  100. if strings.Contains(es, "connection refused") {
  101. return "connection refused"
  102. }
  103. return "dial timeout"
  104. case strings.HasPrefix(es, "write"):
  105. return "write timeout"
  106. case strings.Contains(es, "EOF"):
  107. return "eof"
  108. case strings.Contains(es, "reset"):
  109. return "reset"
  110. case strings.Contains(es, "broken"):
  111. return "broken pipe"
  112. case strings.Contains(es, "breaker"):
  113. return "breaker"
  114. default:
  115. return "unexpected error"
  116. }
  117. }
  118. func logDuration(ctx context.Context, cmd red.Cmder, duration time.Duration) {
  119. var buf strings.Builder
  120. for i, arg := range cmd.Args() {
  121. if i > 0 {
  122. buf.WriteByte(' ')
  123. }
  124. buf.WriteString(mapping.Repr(arg))
  125. }
  126. logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String())
  127. }
  128. func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) context.Context {
  129. ctx, span := h.tracer.Start(ctx,
  130. spanName,
  131. oteltrace.WithSpanKind(oteltrace.SpanKindClient),
  132. )
  133. cmdStrs := make([]string, 0, len(cmds))
  134. for _, cmd := range cmds {
  135. cmdStrs = append(cmdStrs, cmd.Name())
  136. }
  137. span.SetAttributes(redisCmdsAttributeKey.StringSlice(cmdStrs))
  138. return ctx
  139. }
  140. func (h hook) endSpan(ctx context.Context, err error) {
  141. span := oteltrace.SpanFromContext(ctx)
  142. defer span.End()
  143. if err == nil || err == red.Nil {
  144. span.SetStatus(codes.Ok, "")
  145. return
  146. }
  147. span.SetStatus(codes.Error, err.Error())
  148. span.RecordError(err)
  149. }