clientstream.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package opentelemetry
  2. import (
  3. "context"
  4. "io"
  5. "google.golang.org/grpc"
  6. "google.golang.org/grpc/metadata"
  7. "google.golang.org/protobuf/proto"
  8. )
  9. type streamEventType int
  10. type streamEvent struct {
  11. Type streamEventType
  12. Err error
  13. }
  14. const (
  15. receiveEndEvent streamEventType = iota
  16. errorEvent
  17. )
  18. type clientStream struct {
  19. grpc.ClientStream
  20. desc *grpc.StreamDesc
  21. events chan streamEvent
  22. eventsDone chan struct{}
  23. Finished chan error
  24. receivedMessageID int
  25. sentMessageID int
  26. }
  27. var _ = proto.Marshal
  28. func (w *clientStream) RecvMsg(m interface{}) error {
  29. err := w.ClientStream.RecvMsg(m)
  30. if err == nil && !w.desc.ServerStreams {
  31. w.sendStreamEvent(receiveEndEvent, nil)
  32. } else if err == io.EOF {
  33. w.sendStreamEvent(receiveEndEvent, nil)
  34. } else if err != nil {
  35. w.sendStreamEvent(errorEvent, err)
  36. } else {
  37. w.receivedMessageID++
  38. MessageReceived.Event(w.Context(), w.receivedMessageID, m)
  39. }
  40. return err
  41. }
  42. func (w *clientStream) SendMsg(m interface{}) error {
  43. err := w.ClientStream.SendMsg(m)
  44. w.sentMessageID++
  45. MessageSent.Event(w.Context(), w.sentMessageID, m)
  46. if err != nil {
  47. w.sendStreamEvent(errorEvent, err)
  48. }
  49. return err
  50. }
  51. func (w *clientStream) Header() (metadata.MD, error) {
  52. md, err := w.ClientStream.Header()
  53. if err != nil {
  54. w.sendStreamEvent(errorEvent, err)
  55. }
  56. return md, err
  57. }
  58. func (w *clientStream) CloseSend() error {
  59. err := w.ClientStream.CloseSend()
  60. if err != nil {
  61. w.sendStreamEvent(errorEvent, err)
  62. }
  63. return err
  64. }
  65. func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
  66. select {
  67. case <-w.eventsDone:
  68. case w.events <- streamEvent{Type: eventType, Err: err}:
  69. }
  70. }
  71. func WrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
  72. events := make(chan streamEvent)
  73. eventsDone := make(chan struct{})
  74. finished := make(chan error)
  75. go func() {
  76. defer close(eventsDone)
  77. for {
  78. select {
  79. case event := <-events:
  80. switch event.Type {
  81. case receiveEndEvent:
  82. finished <- nil
  83. return
  84. case errorEvent:
  85. finished <- event.Err
  86. return
  87. }
  88. case <-ctx.Done():
  89. finished <- ctx.Err()
  90. return
  91. }
  92. }
  93. }()
  94. return &clientStream{
  95. ClientStream: s,
  96. desc: desc,
  97. events: events,
  98. eventsDone: eventsDone,
  99. Finished: finished,
  100. }
  101. }