opentracinginterceptor.go 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package clientinterceptors
  2. import (
  3. "context"
  4. "github.com/tal-tech/go-zero/core/trace/opentelemetry"
  5. "go.opentelemetry.io/otel"
  6. "go.opentelemetry.io/otel/codes"
  7. "go.opentelemetry.io/otel/trace"
  8. "google.golang.org/grpc"
  9. gcodes "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/metadata"
  11. "google.golang.org/grpc/status"
  12. )
  13. // UnaryOpenTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry.
  14. func UnaryOpenTracingInterceptor() grpc.UnaryClientInterceptor {
  15. propagator := otel.GetTextMapPropagator()
  16. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
  17. invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  18. if !opentelemetry.Enabled() {
  19. return invoker(ctx, method, req, reply, cc, opts...)
  20. }
  21. requestMetadata, _ := metadata.FromOutgoingContext(ctx)
  22. metadataCopy := requestMetadata.Copy()
  23. tr := otel.Tracer(opentelemetry.TraceName)
  24. name, attr := opentelemetry.SpanInfo(method, cc.Target())
  25. ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
  26. trace.WithAttributes(attr...))
  27. defer span.End()
  28. opentelemetry.Inject(ctx, propagator, &metadataCopy)
  29. ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
  30. opentelemetry.MessageSent.Event(ctx, 1, req)
  31. opentelemetry.MessageReceived.Event(ctx, 1, reply)
  32. if err := invoker(ctx, method, req, reply, cc, opts...); err != nil {
  33. s, _ := status.FromError(err)
  34. span.SetStatus(codes.Error, s.Message())
  35. span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
  36. return err
  37. }
  38. span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
  39. return nil
  40. }
  41. }
  42. // StreamOpenTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry.
  43. func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor {
  44. propagator := otel.GetTextMapPropagator()
  45. return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
  46. streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  47. if !opentelemetry.Enabled() {
  48. return streamer(ctx, desc, cc, method, opts...)
  49. }
  50. requestMetadata, _ := metadata.FromOutgoingContext(ctx)
  51. metadataCopy := requestMetadata.Copy()
  52. tr := otel.Tracer(opentelemetry.TraceName)
  53. name, attr := opentelemetry.SpanInfo(method, cc.Target())
  54. ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
  55. trace.WithAttributes(attr...))
  56. opentelemetry.Inject(ctx, propagator, &metadataCopy)
  57. ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
  58. s, err := streamer(ctx, desc, cc, method, opts...)
  59. if err != nil {
  60. grpcStatus, _ := status.FromError(err)
  61. span.SetStatus(codes.Error, grpcStatus.Message())
  62. span.SetAttributes(opentelemetry.StatusCodeAttr(grpcStatus.Code()))
  63. span.End()
  64. return s, err
  65. }
  66. stream := opentelemetry.WrapClientStream(ctx, s, desc)
  67. go func() {
  68. if err := <-stream.Finished; err != nil {
  69. s, _ := status.FromError(err)
  70. span.SetStatus(codes.Error, s.Message())
  71. span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
  72. } else {
  73. span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
  74. }
  75. span.End()
  76. }()
  77. return stream, nil
  78. }
  79. }