Przeglądaj źródła

feat: add etcd resolver scheme, fix discov minor issue (#1281)

Kevin Wan 3 lat temu
rodzic
commit
d828c3f37e

+ 31 - 8
core/discov/internal/registry.go

@@ -37,25 +37,35 @@ func GetRegistry() *Registry {
 
 // GetConn returns an etcd client connection associated with given endpoints.
 func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
-	return r.getCluster(endpoints).getClient()
+	c, _ := r.getCluster(endpoints)
+	return c.getClient()
 }
 
 // Monitor monitors the key on given etcd endpoints, notify with the given UpdateListener.
 func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error {
-	return r.getCluster(endpoints).monitor(key, l)
+	c, exists := r.getCluster(endpoints)
+	// if exists, the existing values should be updated to the listener.
+	if exists {
+		kvs := c.getCurrent(key)
+		for _, kv := range kvs {
+			l.OnAdd(kv)
+		}
+	}
+
+	return c.monitor(key, l)
 }
 
-func (r *Registry) getCluster(endpoints []string) *cluster {
+func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
 	clusterKey := getClusterKey(endpoints)
 	r.lock.Lock()
 	defer r.lock.Unlock()
-	c, ok := r.clusters[clusterKey]
-	if !ok {
+	c, exists = r.clusters[clusterKey]
+	if !exists {
 		c = newCluster(endpoints)
 		r.clusters[clusterKey] = c
 	}
 
-	return c
+	return
 }
 
 type cluster struct {
@@ -94,6 +104,21 @@ func (c *cluster) getClient() (EtcdClient, error) {
 	return val.(EtcdClient), nil
 }
 
+func (c *cluster) getCurrent(key string) []KV {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	var kvs []KV
+	for k, v := range c.values[key] {
+		kvs = append(kvs, KV{
+			Key: k,
+			Val: v,
+		})
+	}
+
+	return kvs
+}
+
 func (c *cluster) handleChanges(key string, kvs []KV) {
 	var add []KV
 	var remove []KV
@@ -197,14 +222,12 @@ func (c *cluster) load(cli EtcdClient, key string) {
 	}
 
 	var kvs []KV
-	c.lock.Lock()
 	for _, ev := range resp.Kvs {
 		kvs = append(kvs, KV{
 			Key: string(ev.Key),
 			Val: string(ev.Value),
 		})
 	}
-	c.lock.Unlock()
 
 	c.handleChanges(key, kvs)
 }

+ 3 - 3
core/discov/internal/registry_test.go

@@ -34,9 +34,9 @@ func setMockClient(cli EtcdClient) func() {
 
 func TestGetCluster(t *testing.T) {
 	AddAccount([]string{"first"}, "foo", "bar")
-	c1 := GetRegistry().getCluster([]string{"first"})
-	c2 := GetRegistry().getCluster([]string{"second"})
-	c3 := GetRegistry().getCluster([]string{"first"})
+	c1, _ := GetRegistry().getCluster([]string{"first"})
+	c2, _ := GetRegistry().getCluster([]string{"second"})
+	c3, _ := GetRegistry().getCluster([]string{"first"})
 	assert.Equal(t, c1, c3)
 	assert.NotEqual(t, c1, c2)
 }

+ 1 - 1
zrpc/internal/resolver/discovbuilder.go

@@ -10,7 +10,7 @@ import (
 
 type discovBuilder struct{}
 
-func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
+func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
 	resolver.Resolver, error) {
 	hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
 		return r == EndpointSepChar

+ 9 - 0
zrpc/internal/resolver/etcdbuilder.go

@@ -0,0 +1,9 @@
+package resolver
+
+type etcdBuilder struct {
+	discovBuilder
+}
+
+func (b *etcdBuilder) Scheme() string {
+	return EtcdScheme
+}

+ 10 - 6
zrpc/internal/resolver/resolver.go

@@ -11,6 +11,8 @@ const (
 	DirectScheme = "direct"
 	// DiscovScheme stands for discov scheme.
 	DiscovScheme = "discov"
+	// EtcdScheme stands for etcd scheme.
+	EtcdScheme = "etcd"
 	// KubernetesScheme stands for k8s scheme.
 	KubernetesScheme = "k8s"
 	// EndpointSepChar is the separator cha in endpoints.
@@ -23,16 +25,18 @@ var (
 	// EndpointSep is the separator string in endpoints.
 	EndpointSep = fmt.Sprintf("%c", EndpointSepChar)
 
-	dirBuilder directBuilder
-	disBuilder discovBuilder
-	k8sBuilder kubeBuilder
+	directResolverBuilder directBuilder
+	discovResolverBuilder discovBuilder
+	etcdResolverBuilder   etcdBuilder
+	k8sResolverBuilder    kubeBuilder
 )
 
 // RegisterResolver registers the direct and discov schemes to the resolver.
 func RegisterResolver() {
-	resolver.Register(&dirBuilder)
-	resolver.Register(&disBuilder)
-	resolver.Register(&k8sBuilder)
+	resolver.Register(&directResolverBuilder)
+	resolver.Register(&discovResolverBuilder)
+	resolver.Register(&etcdResolverBuilder)
+	resolver.Register(&k8sResolverBuilder)
 }
 
 type nopResolver struct {