tracinginterceptor.go 4.9 KB


  1. package clientinterceptors
  2. import (
  3. "context"
  4. "io"
  5. ztrace "github.com/zeromicro/go-zero/core/trace"
  6. "go.opentelemetry.io/otel"
  7. "go.opentelemetry.io/otel/codes"
  8. "go.opentelemetry.io/otel/trace"
  9. "google.golang.org/grpc"
  10. gcodes "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/metadata"
  12. "google.golang.org/grpc/status"
  13. )
  14. const (
  15. receiveEndEvent streamEventType = iota
  16. errorEvent
  17. )
  18. // UnaryTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry.
  19. func UnaryTracingInterceptor(ctx context.Context, method string, req, reply interface{},
  20. cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  21. ctx, span := startSpan(ctx, method, cc.Target())
  22. defer span.End()
  23. ztrace.MessageSent.Event(ctx, 1, req)
  24. err := invoker(ctx, method, req, reply, cc, opts...)
  25. // fix: https://github.com/zeromicro/go-zero/issues/1954
  26. ztrace.MessageReceived.Event(ctx, 1, reply)
  27. if err != nil {
  28. s, ok := status.FromError(err)
  29. if ok {
  30. span.SetStatus(codes.Error, s.Message())
  31. span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
  32. } else {
  33. span.SetStatus(codes.Error, err.Error())
  34. }
  35. return err
  36. }
  37. span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
  38. return nil
  39. }
  40. // StreamTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry.
  41. func StreamTracingInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
  42. method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  43. ctx, span := startSpan(ctx, method, cc.Target())
  44. s, err := streamer(ctx, desc, cc, method, opts...)
  45. if err != nil {
  46. st, ok := status.FromError(err)
  47. if ok {
  48. span.SetStatus(codes.Error, st.Message())
  49. span.SetAttributes(ztrace.StatusCodeAttr(st.Code()))
  50. } else {
  51. span.SetStatus(codes.Error, err.Error())
  52. }
  53. span.End()
  54. return s, err
  55. }
  56. stream := wrapClientStream(ctx, s, desc)
  57. go func() {
  58. if err := <-stream.Finished; err != nil {
  59. s, ok := status.FromError(err)
  60. if ok {
  61. span.SetStatus(codes.Error, s.Message())
  62. span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
  63. } else {
  64. span.SetStatus(codes.Error, err.Error())
  65. }
  66. } else {
  67. span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
  68. }
  69. span.End()
  70. }()
  71. return stream, nil
  72. }
  73. type (
  74. streamEventType int
  75. streamEvent struct {
  76. Type streamEventType
  77. Err error
  78. }
  79. clientStream struct {
  80. grpc.ClientStream
  81. Finished chan error
  82. desc *grpc.StreamDesc
  83. events chan streamEvent
  84. eventsDone chan struct{}
  85. receivedMessageID int
  86. sentMessageID int
  87. }
  88. )
  89. func (w *clientStream) CloseSend() error {
  90. err := w.ClientStream.CloseSend()
  91. if err != nil {
  92. w.sendStreamEvent(errorEvent, err)
  93. }
  94. return err
  95. }
  96. func (w *clientStream) Header() (metadata.MD, error) {
  97. md, err := w.ClientStream.Header()
  98. if err != nil {
  99. w.sendStreamEvent(errorEvent, err)
  100. }
  101. return md, err
  102. }
  103. func (w *clientStream) RecvMsg(m interface{}) error {
  104. err := w.ClientStream.RecvMsg(m)
  105. if err == nil && !w.desc.ServerStreams {
  106. w.sendStreamEvent(receiveEndEvent, nil)
  107. } else if err == io.EOF {
  108. w.sendStreamEvent(receiveEndEvent, nil)
  109. } else if err != nil {
  110. w.sendStreamEvent(errorEvent, err)
  111. } else {
  112. w.receivedMessageID++
  113. ztrace.MessageReceived.Event(w.Context(), w.receivedMessageID, m)
  114. }
  115. return err
  116. }
  117. func (w *clientStream) SendMsg(m interface{}) error {
  118. err := w.ClientStream.SendMsg(m)
  119. w.sentMessageID++
  120. ztrace.MessageSent.Event(w.Context(), w.sentMessageID, m)
  121. if err != nil {
  122. w.sendStreamEvent(errorEvent, err)
  123. }
  124. return err
  125. }
  126. func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
  127. select {
  128. case <-w.eventsDone:
  129. case w.events <- streamEvent{Type: eventType, Err: err}:
  130. }
  131. }
  132. func startSpan(ctx context.Context, method, target string) (context.Context, trace.Span) {
  133. var md metadata.MD
  134. requestMetadata, ok := metadata.FromOutgoingContext(ctx)
  135. if ok {
  136. md = requestMetadata.Copy()
  137. } else {
  138. md = metadata.MD{}
  139. }
  140. tr := otel.Tracer(ztrace.TraceName)
  141. name, attr := ztrace.SpanInfo(method, target)
  142. ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
  143. trace.WithAttributes(attr...))
  144. ztrace.Inject(ctx, otel.GetTextMapPropagator(), &md)
  145. ctx = metadata.NewOutgoingContext(ctx, md)
  146. return ctx, span
  147. }
  148. // wrapClientStream wraps s with given ctx and desc.
  149. func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
  150. events := make(chan streamEvent)
  151. eventsDone := make(chan struct{})
  152. finished := make(chan error)
  153. go func() {
  154. defer close(eventsDone)
  155. for {
  156. select {
  157. case event := <-events:
  158. switch event.Type {
  159. case receiveEndEvent:
  160. finished <- nil
  161. return
  162. case errorEvent:
  163. finished <- event.Err
  164. return
  165. }
  166. case <-ctx.Done():
  167. finished <- ctx.Err()
  168. return
  169. }
  170. }
  171. }()
  172. return &clientStream{
  173. ClientStream: s,
  174. desc: desc,
  175. events: events,
  176. eventsDone: eventsDone,
  177. Finished: finished,
  178. }
  179. }