123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- package redis
- import (
- "context"
- "io"
- "net"
- "strings"
- "time"
- red "github.com/go-redis/redis/v8"
- "github.com/zeromicro/go-zero/core/breaker"
- "github.com/zeromicro/go-zero/core/errorx"
- "github.com/zeromicro/go-zero/core/logx"
- "github.com/zeromicro/go-zero/core/mapping"
- "github.com/zeromicro/go-zero/core/timex"
- "github.com/zeromicro/go-zero/core/trace"
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/codes"
- oteltrace "go.opentelemetry.io/otel/trace"
- )
- // spanName is the span name of the redis calls.
- const spanName = "redis"
- var (
- startTimeKey = contextKey("startTime")
- durationHook = hook{}
- redisCmdsAttributeKey = attribute.Key("redis.cmds")
- )
- type (
- contextKey string
- hook struct{}
- )
- func (h hook) BeforeProcess(ctx context.Context, cmd red.Cmder) (context.Context, error) {
- return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmd), nil
- }
- func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error {
- err := cmd.Err()
- h.endSpan(ctx, err)
- val := ctx.Value(startTimeKey)
- if val == nil {
- return nil
- }
- start, ok := val.(time.Duration)
- if !ok {
- return nil
- }
- duration := timex.Since(start)
- if duration > slowThreshold.Load() {
- logDuration(ctx, []red.Cmder{cmd}, duration)
- metricSlowCount.Inc(cmd.Name())
- }
- metricReqDur.ObserveFloat(float64(duration)/float64(time.Millisecond), cmd.Name())
- if msg := formatError(err); len(msg) > 0 {
- metricReqErr.Inc(cmd.Name(), msg)
- }
- return nil
- }
- func (h hook) BeforeProcessPipeline(ctx context.Context, cmds []red.Cmder) (context.Context, error) {
- if len(cmds) == 0 {
- return ctx, nil
- }
- return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmds...), nil
- }
- func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error {
- if len(cmds) == 0 {
- return nil
- }
- batchError := errorx.BatchError{}
- for _, cmd := range cmds {
- err := cmd.Err()
- if err == nil {
- continue
- }
- batchError.Add(err)
- }
- h.endSpan(ctx, batchError.Err())
- val := ctx.Value(startTimeKey)
- if val == nil {
- return nil
- }
- start, ok := val.(time.Duration)
- if !ok {
- return nil
- }
- duration := timex.Since(start)
- if duration > slowThreshold.Load()*time.Duration(len(cmds)) {
- logDuration(ctx, cmds, duration)
- }
- metricReqDur.Observe(duration.Milliseconds(), "Pipeline")
- if msg := formatError(batchError.Err()); len(msg) > 0 {
- metricReqErr.Inc("Pipeline", msg)
- }
- return nil
- }
- func formatError(err error) string {
- if err == nil || err == red.Nil {
- return ""
- }
- opErr, ok := err.(*net.OpError)
- if ok && opErr.Timeout() {
- return "timeout"
- }
- switch err {
- case io.EOF:
- return "eof"
- case context.DeadlineExceeded:
- return "context deadline"
- case breaker.ErrServiceUnavailable:
- return "breaker"
- default:
- return "unexpected error"
- }
- }
- func logDuration(ctx context.Context, cmds []red.Cmder, duration time.Duration) {
- var buf strings.Builder
- for k, cmd := range cmds {
- if k > 0 {
- buf.WriteByte('\n')
- }
- var build strings.Builder
- for i, arg := range cmd.Args() {
- if i > 0 {
- build.WriteByte(' ')
- }
- build.WriteString(mapping.Repr(arg))
- }
- buf.WriteString(build.String())
- }
- logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String())
- }
- func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) context.Context {
- tracer := trace.TracerFromContext(ctx)
- ctx, span := tracer.Start(ctx,
- spanName,
- oteltrace.WithSpanKind(oteltrace.SpanKindClient),
- )
- cmdStrs := make([]string, 0, len(cmds))
- for _, cmd := range cmds {
- cmdStrs = append(cmdStrs, cmd.Name())
- }
- span.SetAttributes(redisCmdsAttributeKey.StringSlice(cmdStrs))
- return ctx
- }
- func (h hook) endSpan(ctx context.Context, err error) {
- span := oteltrace.SpanFromContext(ctx)
- defer span.End()
- if err == nil || err == red.Nil {
- span.SetStatus(codes.Ok, "")
- return
- }
- span.SetStatus(codes.Error, err.Error())
- span.RecordError(err)
- }
|