tracinginterceptor.go 4.9 KB


  1. package clientinterceptors
  2. import (
  3. "context"
  4. "io"
  5. ztrace "github.com/zeromicro/go-zero/core/trace"
  6. "go.opentelemetry.io/otel"
  7. "go.opentelemetry.io/otel/codes"
  8. "go.opentelemetry.io/otel/trace"
  9. "google.golang.org/grpc"
  10. gcodes "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/metadata"
  12. "google.golang.org/grpc/status"
  13. )
  14. const (
  15. receiveEndEvent streamEventType = iota
  16. errorEvent
  17. )
  18. // UnaryTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry.
  19. func UnaryTracingInterceptor(ctx context.Context, method string, req, reply interface{},
  20. cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  21. ctx, span := startSpan(ctx, method, cc.Target())
  22. defer span.End()
  23. ztrace.MessageSent.Event(ctx, 1, req)
  24. err := invoker(ctx, method, req, reply, cc, opts...)
  25. ztrace.MessageReceived.Event(ctx, 1, reply)
  26. if err != nil {
  27. s, ok := status.FromError(err)
  28. if ok {
  29. span.SetStatus(codes.Error, s.Message())
  30. span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
  31. } else {
  32. span.SetStatus(codes.Error, err.Error())
  33. }
  34. return err
  35. }
  36. span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
  37. return nil
  38. }
  39. // StreamTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry.
  40. func StreamTracingInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
  41. method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  42. ctx, span := startSpan(ctx, method, cc.Target())
  43. s, err := streamer(ctx, desc, cc, method, opts...)
  44. if err != nil {
  45. st, ok := status.FromError(err)
  46. if ok {
  47. span.SetStatus(codes.Error, st.Message())
  48. span.SetAttributes(ztrace.StatusCodeAttr(st.Code()))
  49. } else {
  50. span.SetStatus(codes.Error, err.Error())
  51. }
  52. span.End()
  53. return s, err
  54. }
  55. stream := wrapClientStream(ctx, s, desc)
  56. go func() {
  57. if err := <-stream.Finished; err != nil {
  58. s, ok := status.FromError(err)
  59. if ok {
  60. span.SetStatus(codes.Error, s.Message())
  61. span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
  62. } else {
  63. span.SetStatus(codes.Error, err.Error())
  64. }
  65. } else {
  66. span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
  67. }
  68. span.End()
  69. }()
  70. return stream, nil
  71. }
  72. type (
  73. streamEventType int
  74. streamEvent struct {
  75. Type streamEventType
  76. Err error
  77. }
  78. clientStream struct {
  79. grpc.ClientStream
  80. Finished chan error
  81. desc *grpc.StreamDesc
  82. events chan streamEvent
  83. eventsDone chan struct{}
  84. receivedMessageID int
  85. sentMessageID int
  86. }
  87. )
  88. func (w *clientStream) CloseSend() error {
  89. err := w.ClientStream.CloseSend()
  90. if err != nil {
  91. w.sendStreamEvent(errorEvent, err)
  92. }
  93. return err
  94. }
  95. func (w *clientStream) Header() (metadata.MD, error) {
  96. md, err := w.ClientStream.Header()
  97. if err != nil {
  98. w.sendStreamEvent(errorEvent, err)
  99. }
  100. return md, err
  101. }
  102. func (w *clientStream) RecvMsg(m interface{}) error {
  103. err := w.ClientStream.RecvMsg(m)
  104. if err == nil && !w.desc.ServerStreams {
  105. w.sendStreamEvent(receiveEndEvent, nil)
  106. } else if err == io.EOF {
  107. w.sendStreamEvent(receiveEndEvent, nil)
  108. } else if err != nil {
  109. w.sendStreamEvent(errorEvent, err)
  110. } else {
  111. w.receivedMessageID++
  112. ztrace.MessageReceived.Event(w.Context(), w.receivedMessageID, m)
  113. }
  114. return err
  115. }
  116. func (w *clientStream) SendMsg(m interface{}) error {
  117. err := w.ClientStream.SendMsg(m)
  118. w.sentMessageID++
  119. ztrace.MessageSent.Event(w.Context(), w.sentMessageID, m)
  120. if err != nil {
  121. w.sendStreamEvent(errorEvent, err)
  122. }
  123. return err
  124. }
  125. func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
  126. select {
  127. case <-w.eventsDone:
  128. case w.events <- streamEvent{Type: eventType, Err: err}:
  129. }
  130. }
  131. func startSpan(ctx context.Context, method, target string) (context.Context, trace.Span) {
  132. var md metadata.MD
  133. requestMetadata, ok := metadata.FromOutgoingContext(ctx)
  134. if ok {
  135. md = requestMetadata.Copy()
  136. } else {
  137. md = metadata.MD{}
  138. }
  139. tr := otel.Tracer(ztrace.TraceName)
  140. name, attr := ztrace.SpanInfo(method, target)
  141. ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
  142. trace.WithAttributes(attr...))
  143. ztrace.Inject(ctx, otel.GetTextMapPropagator(), &md)
  144. ctx = metadata.NewOutgoingContext(ctx, md)
  145. return ctx, span
  146. }
  147. // wrapClientStream wraps s with given ctx and desc.
  148. func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
  149. events := make(chan streamEvent)
  150. eventsDone := make(chan struct{})
  151. finished := make(chan error)
  152. go func() {
  153. defer close(eventsDone)
  154. for {
  155. select {
  156. case event := <-events:
  157. switch event.Type {
  158. case receiveEndEvent:
  159. finished <- nil
  160. return
  161. case errorEvent:
  162. finished <- event.Err
  163. return
  164. }
  165. case <-ctx.Done():
  166. finished <- ctx.Err()
  167. return
  168. }
  169. }
  170. }()
  171. return &clientStream{
  172. ClientStream: s,
  173. desc: desc,
  174. events: events,
  175. eventsDone: eventsDone,
  176. Finished: finished,
  177. }
  178. }