Browse Source

Added zrpc server custom serverID for custom registration Key when the service is registered on ETCD. (#3008)

Shyunn 2 years ago
parent
commit
544aa7c432

+ 6 - 0
core/discov/config.go

@@ -13,6 +13,7 @@ var (
 type EtcdConf struct {
 	Hosts              []string
 	Key                string
+	ServerID           int64  `json:",optional"`
 	User               string `json:",optional"`
 	Pass               string `json:",optional"`
 	CertFile           string `json:",optional"`
@@ -26,6 +27,11 @@ func (c EtcdConf) HasAccount() bool {
 	return len(c.User) > 0 && len(c.Pass) > 0
 }
 
+// HasServerID returns if ServerID provided.
+func (c EtcdConf) HasServerID() bool {
+	return c.ServerID > 0
+}
+
 // HasTLS returns if TLS CertFile/CertKeyFile/CACertFile are provided.
 func (c EtcdConf) HasTLS() bool {
 	return len(c.CertFile) > 0 && len(c.CertKeyFile) > 0 && len(c.CACertFile) > 0

+ 33 - 0
core/discov/config_test.go

@@ -80,3 +80,36 @@ func TestEtcdConf_HasAccount(t *testing.T) {
 		assert.Equal(t, test.hasAccount, test.EtcdConf.HasAccount())
 	}
 }
+
+func TestEtcdConf_HasServerID(t *testing.T) {
+	tests := []struct {
+		EtcdConf
+		hasServerID bool
+	}{
+		{
+			EtcdConf: EtcdConf{
+				Hosts:    []string{"any"},
+				ServerID: -1,
+			},
+			hasServerID: false,
+		},
+		{
+			EtcdConf: EtcdConf{
+				Hosts:    []string{"any"},
+				ServerID: 0,
+			},
+			hasServerID: false,
+		},
+		{
+			EtcdConf: EtcdConf{
+				Hosts:    []string{"any"},
+				ServerID: 10000,
+			},
+			hasServerID: true,
+		},
+	}
+
+	for _, test := range tests {
+		assert.Equal(t, test.hasServerID, test.EtcdConf.HasServerID())
+	}
+}

+ 2 - 1
zrpc/internal/clientinterceptors/tracinginterceptor.go

@@ -2,6 +2,7 @@ package clientinterceptors
 
 import (
 	"context"
+	"github.com/zeromicro/go-zero/core/lang"
 	"io"
 
 	ztrace "github.com/zeromicro/go-zero/core/trace"
@@ -94,7 +95,7 @@ type (
 		Finished          chan error
 		desc              *grpc.StreamDesc
 		events            chan streamEvent
-		eventsDone        chan struct{}
+		eventsDone        chan lang.PlaceholderType
 		receivedMessageID int
 		sentMessageID     int
 	}

+ 3 - 0
zrpc/internal/rpcpubserver.go

@@ -26,6 +26,9 @@ func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, middlewares ServerMi
 			pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
 				etcd.CACertFile, etcd.InsecureSkipVerify))
 		}
+		if etcd.HasServerID() {
+			pubOpts = append(pubOpts, discov.WithId(etcd.ServerID))
+		}
 		pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
 		return pubClient.KeepAlive()
 	}