opentracinginterceptor.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package clientinterceptors
  2. import (
  3. "context"
  4. "github.com/tal-tech/go-zero/core/opentelemetry"
  5. "go.opentelemetry.io/otel"
  6. "go.opentelemetry.io/otel/codes"
  7. "go.opentelemetry.io/otel/trace"
  8. "google.golang.org/grpc"
  9. grpc_codes "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/metadata"
  11. "google.golang.org/grpc/status"
  12. )
  13. func OpenTracingInterceptor() grpc.UnaryClientInterceptor {
  14. propagator := otel.GetTextMapPropagator()
  15. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  16. if !opentelemetry.Enabled() {
  17. return invoker(ctx, method, req, reply, cc, opts...)
  18. }
  19. requestMetadata, _ := metadata.FromOutgoingContext(ctx)
  20. metadataCopy := requestMetadata.Copy()
  21. tr := otel.Tracer(opentelemetry.TraceName)
  22. name, attr := opentelemetry.SpanInfo(method, cc.Target())
  23. var span trace.Span
  24. ctx, span = tr.Start(ctx,
  25. name,
  26. trace.WithSpanKind(trace.SpanKindClient),
  27. trace.WithAttributes(attr...),
  28. )
  29. defer span.End()
  30. opentelemetry.Inject(ctx, propagator, &metadataCopy)
  31. ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
  32. opentelemetry.MessageSent.Event(ctx, 1, req)
  33. err := invoker(ctx, method, req, reply, cc, opts...)
  34. opentelemetry.MessageReceived.Event(ctx, 1, reply)
  35. if err != nil {
  36. s, _ := status.FromError(err)
  37. span.SetStatus(codes.Error, s.Message())
  38. span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
  39. } else {
  40. span.SetAttributes(opentelemetry.StatusCodeAttr(grpc_codes.OK))
  41. }
  42. return err
  43. }
  44. }
  45. func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor {
  46. propagator := otel.GetTextMapPropagator()
  47. return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  48. if !opentelemetry.Enabled() {
  49. return streamer(ctx, desc, cc, method, opts...)
  50. }
  51. requestMetadata, _ := metadata.FromOutgoingContext(ctx)
  52. metadataCopy := requestMetadata.Copy()
  53. tr := otel.Tracer(opentelemetry.TraceName)
  54. name, attr := opentelemetry.SpanInfo(method, cc.Target())
  55. var span trace.Span
  56. ctx, span = tr.Start(
  57. ctx,
  58. name,
  59. trace.WithSpanKind(trace.SpanKindClient),
  60. trace.WithAttributes(attr...),
  61. )
  62. opentelemetry.Inject(ctx, propagator, &metadataCopy)
  63. ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
  64. s, err := streamer(ctx, desc, cc, method, opts...)
  65. if err != nil {
  66. grpcStatus, _ := status.FromError(err)
  67. span.SetStatus(codes.Error, grpcStatus.Message())
  68. span.SetAttributes(opentelemetry.StatusCodeAttr(grpcStatus.Code()))
  69. span.End()
  70. return s, err
  71. }
  72. stream := opentelemetry.WrapClientStream(ctx, s, desc)
  73. go func() {
  74. err := <-stream.Finished
  75. if err != nil {
  76. s, _ := status.FromError(err)
  77. span.SetStatus(codes.Error, s.Message())
  78. span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
  79. } else {
  80. span.SetAttributes(opentelemetry.StatusCodeAttr(grpc_codes.OK))
  81. }
  82. span.End()
  83. }()
  84. return stream, nil
  85. }
  86. }