1
0

tracinginterceptor.go 4.8 KB


  1. package clientinterceptors
  2. import (
  3. "context"
  4. "io"
  5. ztrace "github.com/wuntsong-org/go-zero-plus/core/trace"
  6. "go.opentelemetry.io/otel"
  7. "go.opentelemetry.io/otel/codes"
  8. "go.opentelemetry.io/otel/trace"
  9. "google.golang.org/grpc"
  10. gcodes "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/metadata"
  12. "google.golang.org/grpc/status"
  13. )
  14. const (
  15. receiveEndEvent streamEventType = iota
  16. errorEvent
  17. )
  18. // UnaryTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry.
  19. func UnaryTracingInterceptor(ctx context.Context, method string, req, reply any,
  20. cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  21. ctx, span := startSpan(ctx, method, cc.Target())
  22. defer span.End()
  23. ztrace.MessageSent.Event(ctx, 1, req)
  24. err := invoker(ctx, method, req, reply, cc, opts...)
  25. ztrace.MessageReceived.Event(ctx, 1, reply)
  26. if err != nil {
  27. s, ok := status.FromError(err)
  28. if ok {
  29. span.SetStatus(codes.Error, s.Message())
  30. span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
  31. } else {
  32. span.SetStatus(codes.Error, err.Error())
  33. }
  34. return err
  35. }
  36. span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
  37. return nil
  38. }
  39. // StreamTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry.
  40. func StreamTracingInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
  41. method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  42. ctx, span := startSpan(ctx, method, cc.Target())
  43. s, err := streamer(ctx, desc, cc, method, opts...)
  44. if err != nil {
  45. st, ok := status.FromError(err)
  46. if ok {
  47. span.SetStatus(codes.Error, st.Message())
  48. span.SetAttributes(ztrace.StatusCodeAttr(st.Code()))
  49. } else {
  50. span.SetStatus(codes.Error, err.Error())
  51. }
  52. span.End()
  53. return s, err
  54. }
  55. stream := wrapClientStream(ctx, s, desc)
  56. go func() {
  57. if err := <-stream.Finished; err != nil {
  58. s, ok := status.FromError(err)
  59. if ok {
  60. span.SetStatus(codes.Error, s.Message())
  61. span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
  62. } else {
  63. span.SetStatus(codes.Error, err.Error())
  64. }
  65. } else {
  66. span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
  67. }
  68. span.End()
  69. }()
  70. return stream, nil
  71. }
  72. type (
  73. streamEventType int
  74. streamEvent struct {
  75. Type streamEventType
  76. Err error
  77. }
  78. clientStream struct {
  79. grpc.ClientStream
  80. Finished chan error
  81. desc *grpc.StreamDesc
  82. events chan streamEvent
  83. eventsDone chan struct{}
  84. receivedMessageID int
  85. sentMessageID int
  86. }
  87. )
  88. func (w *clientStream) CloseSend() error {
  89. err := w.ClientStream.CloseSend()
  90. if err != nil {
  91. w.sendStreamEvent(errorEvent, err)
  92. }
  93. return err
  94. }
  95. func (w *clientStream) Header() (metadata.MD, error) {
  96. md, err := w.ClientStream.Header()
  97. if err != nil {
  98. w.sendStreamEvent(errorEvent, err)
  99. }
  100. return md, err
  101. }
  102. func (w *clientStream) RecvMsg(m any) error {
  103. err := w.ClientStream.RecvMsg(m)
  104. if err == nil && !w.desc.ServerStreams {
  105. w.sendStreamEvent(receiveEndEvent, nil)
  106. } else if err == io.EOF {
  107. w.sendStreamEvent(receiveEndEvent, nil)
  108. } else if err != nil {
  109. w.sendStreamEvent(errorEvent, err)
  110. } else {
  111. w.receivedMessageID++
  112. ztrace.MessageReceived.Event(w.Context(), w.receivedMessageID, m)
  113. }
  114. return err
  115. }
  116. func (w *clientStream) SendMsg(m any) error {
  117. err := w.ClientStream.SendMsg(m)
  118. w.sentMessageID++
  119. ztrace.MessageSent.Event(w.Context(), w.sentMessageID, m)
  120. if err != nil {
  121. w.sendStreamEvent(errorEvent, err)
  122. }
  123. return err
  124. }
  125. func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
  126. select {
  127. case <-w.eventsDone:
  128. case w.events <- streamEvent{Type: eventType, Err: err}:
  129. }
  130. }
  131. func startSpan(ctx context.Context, method, target string) (context.Context, trace.Span) {
  132. md, ok := metadata.FromOutgoingContext(ctx)
  133. if !ok {
  134. md = metadata.MD{}
  135. }
  136. tr := otel.Tracer(ztrace.TraceName)
  137. name, attr := ztrace.SpanInfo(method, target)
  138. ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
  139. trace.WithAttributes(attr...))
  140. ztrace.Inject(ctx, otel.GetTextMapPropagator(), &md)
  141. ctx = metadata.NewOutgoingContext(ctx, md)
  142. return ctx, span
  143. }
  144. // wrapClientStream wraps s with given ctx and desc.
  145. func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
  146. events := make(chan streamEvent)
  147. eventsDone := make(chan struct{})
  148. finished := make(chan error)
  149. go func() {
  150. defer close(eventsDone)
  151. for {
  152. select {
  153. case event := <-events:
  154. switch event.Type {
  155. case receiveEndEvent:
  156. finished <- nil
  157. return
  158. case errorEvent:
  159. finished <- event.Err
  160. return
  161. }
  162. case <-ctx.Done():
  163. finished <- ctx.Err()
  164. return
  165. }
  166. }
  167. }()
  168. return &clientStream{
  169. ClientStream: s,
  170. desc: desc,
  171. events: events,
  172. eventsDone: eventsDone,
  173. Finished: finished,
  174. }
  175. }