clientstream.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package opentelemetry
  2. import (
  3. "context"
  4. "io"
  5. "google.golang.org/grpc"
  6. "google.golang.org/grpc/metadata"
  7. "google.golang.org/protobuf/proto"
  8. )
  9. const (
  10. receiveEndEvent streamEventType = iota
  11. errorEvent
  12. )
  13. var _ = proto.Marshal
  14. type streamEventType int
  15. type streamEvent struct {
  16. Type streamEventType
  17. Err error
  18. }
  19. type clientStream struct {
  20. grpc.ClientStream
  21. desc *grpc.StreamDesc
  22. events chan streamEvent
  23. eventsDone chan struct{}
  24. Finished chan error
  25. receivedMessageID int
  26. sentMessageID int
  27. }
  28. func (w *clientStream) RecvMsg(m interface{}) error {
  29. err := w.ClientStream.RecvMsg(m)
  30. if err == nil && !w.desc.ServerStreams {
  31. w.sendStreamEvent(receiveEndEvent, nil)
  32. } else if err == io.EOF {
  33. w.sendStreamEvent(receiveEndEvent, nil)
  34. } else if err != nil {
  35. w.sendStreamEvent(errorEvent, err)
  36. } else {
  37. w.receivedMessageID++
  38. MessageReceived.Event(w.Context(), w.receivedMessageID, m)
  39. }
  40. return err
  41. }
  42. func (w *clientStream) SendMsg(m interface{}) error {
  43. err := w.ClientStream.SendMsg(m)
  44. w.sentMessageID++
  45. MessageSent.Event(w.Context(), w.sentMessageID, m)
  46. if err != nil {
  47. w.sendStreamEvent(errorEvent, err)
  48. }
  49. return err
  50. }
  51. func (w *clientStream) Header() (metadata.MD, error) {
  52. md, err := w.ClientStream.Header()
  53. if err != nil {
  54. w.sendStreamEvent(errorEvent, err)
  55. }
  56. return md, err
  57. }
  58. func (w *clientStream) CloseSend() error {
  59. err := w.ClientStream.CloseSend()
  60. if err != nil {
  61. w.sendStreamEvent(errorEvent, err)
  62. }
  63. return err
  64. }
  65. func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
  66. select {
  67. case <-w.eventsDone:
  68. case w.events <- streamEvent{Type: eventType, Err: err}:
  69. }
  70. }
  71. // WrapClientStream wraps s with given ctx and desc.
  72. func WrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
  73. events := make(chan streamEvent)
  74. eventsDone := make(chan struct{})
  75. finished := make(chan error)
  76. go func() {
  77. defer close(eventsDone)
  78. for {
  79. select {
  80. case event := <-events:
  81. switch event.Type {
  82. case receiveEndEvent:
  83. finished <- nil
  84. return
  85. case errorEvent:
  86. finished <- event.Err
  87. return
  88. }
  89. case <-ctx.Done():
  90. finished <- ctx.Err()
  91. return
  92. }
  93. }
  94. }()
  95. return &clientStream{
  96. ClientStream: s,
  97. desc: desc,
  98. events: events,
  99. eventsDone: eventsDone,
  100. Finished: finished,
  101. }
  102. }