opentracinginterceptor.go 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package serverinterceptors
  2. import (
  3. "context"
  4. "github.com/tal-tech/go-zero/core/trace/opentelemetry"
  5. "go.opentelemetry.io/otel"
  6. "go.opentelemetry.io/otel/baggage"
  7. "go.opentelemetry.io/otel/codes"
  8. "go.opentelemetry.io/otel/trace"
  9. "google.golang.org/grpc"
  10. gcodes "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/metadata"
  12. "google.golang.org/grpc/status"
  13. )
  14. // UnaryOpenTracingInterceptor returns a grpc.UnaryServerInterceptor for opentelemetry.
  15. func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor {
  16. propagator := otel.GetTextMapPropagator()
  17. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
  18. handler grpc.UnaryHandler) (interface{}, error) {
  19. if !opentelemetry.Enabled() {
  20. return handler(ctx, req)
  21. }
  22. requestMetadata, _ := metadata.FromIncomingContext(ctx)
  23. metadataCopy := requestMetadata.Copy()
  24. bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy)
  25. ctx = baggage.ContextWithBaggage(ctx, bags)
  26. tr := otel.Tracer(opentelemetry.TraceName)
  27. name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx))
  28. ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name,
  29. trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
  30. defer span.End()
  31. opentelemetry.MessageReceived.Event(ctx, 1, req)
  32. resp, err := handler(ctx, req)
  33. if err != nil {
  34. s, _ := status.FromError(err)
  35. span.SetStatus(codes.Error, s.Message())
  36. span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
  37. opentelemetry.MessageSent.Event(ctx, 1, s.Proto())
  38. return nil, err
  39. }
  40. span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
  41. opentelemetry.MessageSent.Event(ctx, 1, resp)
  42. return resp, nil
  43. }
  44. }
  45. // StreamOpenTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry.
  46. func StreamOpenTracingInterceptor() grpc.StreamServerInterceptor {
  47. propagator := otel.GetTextMapPropagator()
  48. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  49. ctx := ss.Context()
  50. if !opentelemetry.Enabled() {
  51. return handler(srv, opentelemetry.WrapServerStream(ctx, ss))
  52. }
  53. requestMetadata, _ := metadata.FromIncomingContext(ctx)
  54. metadataCopy := requestMetadata.Copy()
  55. bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy)
  56. ctx = baggage.ContextWithBaggage(ctx, bags)
  57. tr := otel.Tracer(opentelemetry.TraceName)
  58. name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx))
  59. ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name,
  60. trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
  61. defer span.End()
  62. if err := handler(srv, opentelemetry.WrapServerStream(ctx, ss)); err != nil {
  63. s, _ := status.FromError(err)
  64. span.SetStatus(codes.Error, s.Message())
  65. span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
  66. return err
  67. }
  68. span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
  69. return nil
  70. }
  71. }