tracinginterceptor.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package serverinterceptors
  2. import (
  3. "context"
  4. ztrace "github.com/zeromicro/go-zero/core/trace"
  5. "go.opentelemetry.io/otel"
  6. "go.opentelemetry.io/otel/baggage"
  7. "go.opentelemetry.io/otel/codes"
  8. "go.opentelemetry.io/otel/trace"
  9. "google.golang.org/grpc"
  10. gcodes "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/metadata"
  12. "google.golang.org/grpc/status"
  13. )
  14. // UnaryTracingInterceptor is a grpc.UnaryServerInterceptor for opentelemetry.
  15. func UnaryTracingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
  16. handler grpc.UnaryHandler) (interface{}, error) {
  17. ctx, span := startSpan(ctx, info.FullMethod)
  18. defer span.End()
  19. ztrace.MessageReceived.Event(ctx, 1, req)
  20. resp, err := handler(ctx, req)
  21. if err != nil {
  22. s, ok := status.FromError(err)
  23. if ok {
  24. span.SetStatus(codes.Error, s.Message())
  25. span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
  26. ztrace.MessageSent.Event(ctx, 1, s.Proto())
  27. } else {
  28. span.SetStatus(codes.Error, err.Error())
  29. }
  30. return nil, err
  31. }
  32. span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
  33. ztrace.MessageSent.Event(ctx, 1, resp)
  34. return resp, nil
  35. }
  36. // StreamTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry.
  37. func StreamTracingInterceptor(svr interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
  38. handler grpc.StreamHandler) error {
  39. ctx, span := startSpan(ss.Context(), info.FullMethod)
  40. defer span.End()
  41. if err := handler(svr, wrapServerStream(ctx, ss)); err != nil {
  42. s, ok := status.FromError(err)
  43. if ok {
  44. span.SetStatus(codes.Error, s.Message())
  45. span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
  46. } else {
  47. span.SetStatus(codes.Error, err.Error())
  48. }
  49. return err
  50. }
  51. span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
  52. return nil
  53. }
  54. // serverStream wraps around the embedded grpc.ServerStream,
  55. // and intercepts the RecvMsg and SendMsg method call.
  56. type serverStream struct {
  57. grpc.ServerStream
  58. ctx context.Context
  59. receivedMessageID int
  60. sentMessageID int
  61. }
  62. func (w *serverStream) Context() context.Context {
  63. return w.ctx
  64. }
  65. func (w *serverStream) RecvMsg(m interface{}) error {
  66. err := w.ServerStream.RecvMsg(m)
  67. if err == nil {
  68. w.receivedMessageID++
  69. ztrace.MessageReceived.Event(w.Context(), w.receivedMessageID, m)
  70. }
  71. return err
  72. }
  73. func (w *serverStream) SendMsg(m interface{}) error {
  74. err := w.ServerStream.SendMsg(m)
  75. w.sentMessageID++
  76. ztrace.MessageSent.Event(w.Context(), w.sentMessageID, m)
  77. return err
  78. }
  79. func startSpan(ctx context.Context, method string) (context.Context, trace.Span) {
  80. md, ok := metadata.FromIncomingContext(ctx)
  81. if !ok {
  82. md = metadata.MD{}
  83. }
  84. bags, spanCtx := ztrace.Extract(ctx, otel.GetTextMapPropagator(), &md)
  85. ctx = baggage.ContextWithBaggage(ctx, bags)
  86. tr := otel.Tracer(ztrace.TraceName)
  87. name, attr := ztrace.SpanInfo(method, ztrace.PeerFromCtx(ctx))
  88. return tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name,
  89. trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
  90. }
  91. // wrapServerStream wraps the given grpc.ServerStream with the given context.
  92. func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
  93. return &serverStream{
  94. ServerStream: ss,
  95. ctx: ctx,
  96. }
  97. }