tracinginterceptor_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package clientinterceptors
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "sync/atomic"
  7. "testing"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/tal-tech/go-zero/core/trace"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/codes"
  12. "google.golang.org/grpc/metadata"
  13. "google.golang.org/grpc/status"
  14. )
  15. func TestOpenTracingInterceptor(t *testing.T) {
  16. trace.StartAgent(trace.Config{
  17. Name: "go-zero-test",
  18. Endpoint: "http://localhost:14268/api/traces",
  19. Batcher: "jaeger",
  20. Sampler: 1.0,
  21. })
  22. cc := new(grpc.ClientConn)
  23. err := UnaryTracingInterceptor(context.Background(), "/ListUser", nil, nil, cc,
  24. func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
  25. opts ...grpc.CallOption) error {
  26. return nil
  27. })
  28. assert.Nil(t, err)
  29. }
  30. func TestUnaryTracingInterceptor(t *testing.T) {
  31. var run int32
  32. var wg sync.WaitGroup
  33. wg.Add(1)
  34. cc := new(grpc.ClientConn)
  35. err := UnaryTracingInterceptor(context.Background(), "/foo", nil, nil, cc,
  36. func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
  37. opts ...grpc.CallOption) error {
  38. defer wg.Done()
  39. atomic.AddInt32(&run, 1)
  40. return nil
  41. })
  42. wg.Wait()
  43. assert.Nil(t, err)
  44. assert.Equal(t, int32(1), atomic.LoadInt32(&run))
  45. }
  46. func TestUnaryTracingInterceptor_WithError(t *testing.T) {
  47. var run int32
  48. var wg sync.WaitGroup
  49. wg.Add(1)
  50. cc := new(grpc.ClientConn)
  51. err := UnaryTracingInterceptor(context.Background(), "/foo", nil, nil, cc,
  52. func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
  53. opts ...grpc.CallOption) error {
  54. defer wg.Done()
  55. atomic.AddInt32(&run, 1)
  56. return errors.New("dummy")
  57. })
  58. wg.Wait()
  59. assert.NotNil(t, err)
  60. assert.Equal(t, int32(1), atomic.LoadInt32(&run))
  61. }
  62. func TestStreamTracingInterceptor(t *testing.T) {
  63. var run int32
  64. var wg sync.WaitGroup
  65. wg.Add(1)
  66. cc := new(grpc.ClientConn)
  67. _, err := StreamTracingInterceptor(context.Background(), nil, cc, "/foo",
  68. func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
  69. opts ...grpc.CallOption) (grpc.ClientStream, error) {
  70. defer wg.Done()
  71. atomic.AddInt32(&run, 1)
  72. return nil, nil
  73. })
  74. wg.Wait()
  75. assert.Nil(t, err)
  76. assert.Equal(t, int32(1), atomic.LoadInt32(&run))
  77. }
  78. func TestStreamTracingInterceptor_FinishWithNormalError(t *testing.T) {
  79. var wg sync.WaitGroup
  80. wg.Add(1)
  81. cc := new(grpc.ClientConn)
  82. ctx, cancel := context.WithCancel(context.Background())
  83. stream, err := StreamTracingInterceptor(ctx, nil, cc, "/foo",
  84. func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
  85. opts ...grpc.CallOption) (grpc.ClientStream, error) {
  86. defer wg.Done()
  87. return nil, nil
  88. })
  89. wg.Wait()
  90. assert.Nil(t, err)
  91. cancel()
  92. cs := stream.(*clientStream)
  93. <-cs.eventsDone
  94. }
  95. func TestStreamTracingInterceptor_FinishWithGrpcError(t *testing.T) {
  96. tests := []struct {
  97. name string
  98. event streamEventType
  99. err error
  100. }{
  101. {
  102. name: "receive event",
  103. event: receiveEndEvent,
  104. err: status.Error(codes.DataLoss, "dummy"),
  105. },
  106. {
  107. name: "error event",
  108. event: errorEvent,
  109. err: status.Error(codes.DataLoss, "dummy"),
  110. },
  111. }
  112. for _, test := range tests {
  113. test := test
  114. t.Run(test.name, func(t *testing.T) {
  115. t.Parallel()
  116. var wg sync.WaitGroup
  117. wg.Add(1)
  118. cc := new(grpc.ClientConn)
  119. stream, err := StreamTracingInterceptor(context.Background(), nil, cc, "/foo",
  120. func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
  121. opts ...grpc.CallOption) (grpc.ClientStream, error) {
  122. defer wg.Done()
  123. return &mockedClientStream{
  124. err: errors.New("dummy"),
  125. }, nil
  126. })
  127. wg.Wait()
  128. assert.Nil(t, err)
  129. cs := stream.(*clientStream)
  130. cs.sendStreamEvent(test.event, status.Error(codes.DataLoss, "dummy"))
  131. <-cs.eventsDone
  132. cs.sendStreamEvent(test.event, test.err)
  133. assert.NotNil(t, cs.CloseSend())
  134. })
  135. }
  136. }
  137. func TestStreamTracingInterceptor_WithError(t *testing.T) {
  138. tests := []struct {
  139. name string
  140. err error
  141. }{
  142. {
  143. name: "normal error",
  144. err: errors.New("dummy"),
  145. },
  146. {
  147. name: "grpc error",
  148. err: status.Error(codes.DataLoss, "dummy"),
  149. },
  150. }
  151. for _, test := range tests {
  152. test := test
  153. t.Run(test.name, func(t *testing.T) {
  154. t.Parallel()
  155. var run int32
  156. var wg sync.WaitGroup
  157. wg.Add(1)
  158. cc := new(grpc.ClientConn)
  159. _, err := StreamTracingInterceptor(context.Background(), nil, cc, "/foo",
  160. func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
  161. opts ...grpc.CallOption) (grpc.ClientStream, error) {
  162. defer wg.Done()
  163. atomic.AddInt32(&run, 1)
  164. return new(mockedClientStream), test.err
  165. })
  166. wg.Wait()
  167. assert.NotNil(t, err)
  168. assert.Equal(t, int32(1), atomic.LoadInt32(&run))
  169. })
  170. }
  171. }
  172. func TestUnaryTracingInterceptor_GrpcFormat(t *testing.T) {
  173. var run int32
  174. var wg sync.WaitGroup
  175. wg.Add(1)
  176. cc := new(grpc.ClientConn)
  177. err := UnaryTracingInterceptor(context.Background(), "/foo", nil, nil, cc,
  178. func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
  179. opts ...grpc.CallOption) error {
  180. defer wg.Done()
  181. atomic.AddInt32(&run, 1)
  182. return nil
  183. })
  184. wg.Wait()
  185. assert.Nil(t, err)
  186. assert.Equal(t, int32(1), atomic.LoadInt32(&run))
  187. }
  188. func TestStreamTracingInterceptor_GrpcFormat(t *testing.T) {
  189. var run int32
  190. var wg sync.WaitGroup
  191. wg.Add(1)
  192. cc := new(grpc.ClientConn)
  193. _, err := StreamTracingInterceptor(context.Background(), nil, cc, "/foo",
  194. func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
  195. opts ...grpc.CallOption) (grpc.ClientStream, error) {
  196. defer wg.Done()
  197. atomic.AddInt32(&run, 1)
  198. return nil, nil
  199. })
  200. wg.Wait()
  201. assert.Nil(t, err)
  202. assert.Equal(t, int32(1), atomic.LoadInt32(&run))
  203. }
  204. type mockedClientStream struct {
  205. md metadata.MD
  206. err error
  207. }
  208. func (m *mockedClientStream) Header() (metadata.MD, error) {
  209. return m.md, m.err
  210. }
  211. func (m *mockedClientStream) Trailer() metadata.MD {
  212. panic("implement me")
  213. }
  214. func (m *mockedClientStream) CloseSend() error {
  215. return m.err
  216. }
  217. func (m *mockedClientStream) Context() context.Context {
  218. panic("implement me")
  219. }
  220. func (m *mockedClientStream) SendMsg(v interface{}) error {
  221. panic("implement me")
  222. }
  223. func (m *mockedClientStream) RecvMsg(v interface{}) error {
  224. panic("implement me")
  225. }