serverstream.go 988 B

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package opentelemetry
  2. import (
  3. "context"
  4. "google.golang.org/grpc"
  5. )
  6. // serverStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and
  7. // SendMsg method call.
  8. type serverStream struct {
  9. grpc.ServerStream
  10. ctx context.Context
  11. receivedMessageID int
  12. sentMessageID int
  13. }
  14. func (w *serverStream) Context() context.Context {
  15. return w.ctx
  16. }
  17. func (w *serverStream) RecvMsg(m interface{}) error {
  18. err := w.ServerStream.RecvMsg(m)
  19. if err == nil {
  20. w.receivedMessageID++
  21. MessageReceived.Event(w.Context(), w.receivedMessageID, m)
  22. }
  23. return err
  24. }
  25. func (w *serverStream) SendMsg(m interface{}) error {
  26. err := w.ServerStream.SendMsg(m)
  27. w.sentMessageID++
  28. MessageSent.Event(w.Context(), w.sentMessageID, m)
  29. return err
  30. }
  31. // WrapServerStream wraps the given grpc.ServerStream with the given context.
  32. func WrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
  33. return &serverStream{
  34. ServerStream: ss,
  35. ctx: ctx,
  36. }
  37. }