hook.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package redis
  import (
  	"context"
  	"errors"
  	"io"
  	"net"
  	"strings"
  	"time"

  	red "github.com/go-redis/redis/v8"
  	"github.com/wuntsong-org/go-zero-plus/core/breaker"
  	"github.com/wuntsong-org/go-zero-plus/core/errorx"
  	"github.com/wuntsong-org/go-zero-plus/core/logx"
  	"github.com/wuntsong-org/go-zero-plus/core/mapping"
  	"github.com/wuntsong-org/go-zero-plus/core/timex"
  	"github.com/wuntsong-org/go-zero-plus/core/trace"
  	"go.opentelemetry.io/otel/attribute"
  	"go.opentelemetry.io/otel/codes"
  	oteltrace "go.opentelemetry.io/otel/trace"
  )
  const (
  	// spanName is the name given to every span opened for a redis call.
  	spanName = "redis"
  )
  21. var (
  22. startTimeKey = contextKey("startTime")
  23. durationHook = hook{}
  24. redisCmdsAttributeKey = attribute.Key("redis.cmds")
  25. )
  // contextKey is the named string type used for this package's context keys
  // (see startTimeKey).
  type contextKey string

  // hook is a stateless type whose methods time, trace and report metrics for
  // single redis commands and pipelines.
  type hook struct{}
  30. func (h hook) BeforeProcess(ctx context.Context, cmd red.Cmder) (context.Context, error) {
  31. return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmd), nil
  32. }
  33. func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error {
  34. err := cmd.Err()
  35. h.endSpan(ctx, err)
  36. val := ctx.Value(startTimeKey)
  37. if val == nil {
  38. return nil
  39. }
  40. start, ok := val.(time.Duration)
  41. if !ok {
  42. return nil
  43. }
  44. duration := timex.Since(start)
  45. if duration > slowThreshold.Load() {
  46. logDuration(ctx, []red.Cmder{cmd}, duration)
  47. metricSlowCount.Inc(cmd.Name())
  48. }
  49. metricReqDur.ObserveFloat(float64(duration)/float64(time.Millisecond), cmd.Name())
  50. if msg := formatError(err); len(msg) > 0 {
  51. metricReqErr.Inc(cmd.Name(), msg)
  52. }
  53. return nil
  54. }
  55. func (h hook) BeforeProcessPipeline(ctx context.Context, cmds []red.Cmder) (context.Context, error) {
  56. if len(cmds) == 0 {
  57. return ctx, nil
  58. }
  59. return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmds...), nil
  60. }
  61. func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error {
  62. if len(cmds) == 0 {
  63. return nil
  64. }
  65. batchError := errorx.BatchError{}
  66. for _, cmd := range cmds {
  67. err := cmd.Err()
  68. if err == nil {
  69. continue
  70. }
  71. batchError.Add(err)
  72. }
  73. h.endSpan(ctx, batchError.Err())
  74. val := ctx.Value(startTimeKey)
  75. if val == nil {
  76. return nil
  77. }
  78. start, ok := val.(time.Duration)
  79. if !ok {
  80. return nil
  81. }
  82. duration := timex.Since(start)
  83. if duration > slowThreshold.Load()*time.Duration(len(cmds)) {
  84. logDuration(ctx, cmds, duration)
  85. }
  86. metricReqDur.Observe(duration.Milliseconds(), "Pipeline")
  87. if msg := formatError(batchError.Err()); len(msg) > 0 {
  88. metricReqErr.Inc("Pipeline", msg)
  89. }
  90. return nil
  91. }
  92. func formatError(err error) string {
  93. if err == nil || err == red.Nil {
  94. return ""
  95. }
  96. opErr, ok := err.(*net.OpError)
  97. if ok && opErr.Timeout() {
  98. return "timeout"
  99. }
  100. switch err {
  101. case io.EOF:
  102. return "eof"
  103. case context.DeadlineExceeded:
  104. return "context deadline"
  105. case breaker.ErrServiceUnavailable:
  106. return "breaker"
  107. default:
  108. return "unexpected error"
  109. }
  110. }
  111. func logDuration(ctx context.Context, cmds []red.Cmder, duration time.Duration) {
  112. var buf strings.Builder
  113. for k, cmd := range cmds {
  114. if k > 0 {
  115. buf.WriteByte('\n')
  116. }
  117. var build strings.Builder
  118. for i, arg := range cmd.Args() {
  119. if i > 0 {
  120. build.WriteByte(' ')
  121. }
  122. build.WriteString(mapping.Repr(arg))
  123. }
  124. buf.WriteString(build.String())
  125. }
  126. logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String())
  127. }
  128. func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) context.Context {
  129. tracer := trace.TracerFromContext(ctx)
  130. ctx, span := tracer.Start(ctx,
  131. spanName,
  132. oteltrace.WithSpanKind(oteltrace.SpanKindClient),
  133. )
  134. cmdStrs := make([]string, 0, len(cmds))
  135. for _, cmd := range cmds {
  136. cmdStrs = append(cmdStrs, cmd.Name())
  137. }
  138. span.SetAttributes(redisCmdsAttributeKey.StringSlice(cmdStrs))
  139. return ctx
  140. }
  141. func (h hook) endSpan(ctx context.Context, err error) {
  142. span := oteltrace.SpanFromContext(ctx)
  143. defer span.End()
  144. if err == nil || err == red.Nil {
  145. span.SetStatus(codes.Ok, "")
  146. return
  147. }
  148. span.SetStatus(codes.Error, err.Error())
  149. span.RecordError(err)
  150. }