Quellcode durchsuchen

feat: support WithStreamClientInterceptor for zrpc clients (#1907)

* feat: support WithStreamClientInterceptor for zrpc clients

* fix: data race
Kevin Wan vor 3 Jahren
Ursprung
Commit
e80a64fa67

+ 2 - 2
core/logx/logs_test.go

@@ -462,7 +462,7 @@ func TestStructedLogWithDuration(t *testing.T) {
 
 	WithDuration(time.Second).Info(message)
 	var entry logEntry
-	if err := json.Unmarshal([]byte(w.builder.String()), &entry); err != nil {
+	if err := json.Unmarshal([]byte(w.String()), &entry); err != nil {
 		t.Error(err)
 	}
 	assert.Equal(t, levelInfo, entry.Level)
@@ -515,7 +515,7 @@ func TestErrorfWithWrappedError(t *testing.T) {
 	defer writer.Store(old)
 
 	Errorf("hello %w", errors.New(message))
-	assert.True(t, strings.Contains(w.builder.String(), "hello there"))
+	assert.True(t, strings.Contains(w.String(), "hello there"))
 }
 
 func TestMustNil(t *testing.T) {

+ 1 - 1
core/logx/syslog_test.go

@@ -38,7 +38,7 @@ func captureOutput(f func()) string {
 	f()
 	SetLevel(prevLevel)
 
-	return w.builder.String()
+	return w.String()
 }
 
 func getContent(jsonStr string) string {

+ 2 - 0
zrpc/client.go

@@ -15,6 +15,8 @@ var (
 	WithDialOption = internal.WithDialOption
 	// WithNonBlock sets the dialing to be nonblock.
 	WithNonBlock = internal.WithNonBlock
+	// WithStreamClientInterceptor is an alias of internal.WithStreamClientInterceptor.
+	WithStreamClientInterceptor = internal.WithStreamClientInterceptor
 	// WithTimeout is an alias of internal.WithTimeout.
 	WithTimeout = internal.WithTimeout
 	// WithTransportCredentials return a func to make the gRPC calls secured with given credentials.

+ 7 - 0
zrpc/internal/client.go

@@ -131,6 +131,13 @@ func WithNonBlock() ClientOption {
 	}
 }
 
+// WithStreamClientInterceptor returns a func to customize a ClientOptions with given interceptor.
+func WithStreamClientInterceptor(interceptor grpc.StreamClientInterceptor) ClientOption {
+	return func(options *ClientOptions) {
+		options.DialOptions = append(options.DialOptions, WithStreamClientInterceptors(interceptor))
+	}
+}
+
 // WithTimeout returns a func to customize a ClientOptions with given timeout.
 func WithTimeout(timeout time.Duration) ClientOption {
 	return func(options *ClientOptions) {

+ 11 - 0
zrpc/internal/client_test.go

@@ -31,6 +31,17 @@ func TestWithNonBlock(t *testing.T) {
 	assert.True(t, options.NonBlock)
 }
 
+func TestWithStreamClientInterceptor(t *testing.T) {
+	var options ClientOptions
+	opt := WithStreamClientInterceptor(func(ctx context.Context, desc *grpc.StreamDesc,
+		cc *grpc.ClientConn, method string, streamer grpc.Streamer,
+		opts ...grpc.CallOption) (grpc.ClientStream, error) {
+		return nil, nil
+	})
+	opt(&options)
+	assert.Equal(t, 1, len(options.DialOptions))
+}
+
 func TestWithTransportCredentials(t *testing.T) {
 	var options ClientOptions
 	opt := WithTransportCredentials(nil)

+ 1 - 1
zrpc/internal/rpcserver.go

@@ -71,7 +71,7 @@ func (s *rpcServer) Start(register RegisterFn) error {
 		WithStreamServerInterceptors(streamInterceptors...))
 	server := grpc.NewServer(options...)
 	register(server)
-	// we need to make sure all others are wrapped up
+	// we need to make sure all others are wrapped up,
 	// so we do graceful stop at shutdown phase instead of wrap up phase
 	waitForCalled := proc.AddWrapUpListener(func() {
 		server.GracefulStop()