瀏覽代碼

fix:etcd get&watch not atomic (#2321)

maizige 2 年之前
父節點
當前提交
422f401153
共有 2 個文件被更改,包括 22 次插入12 次删除
  1. 17 9
      core/discov/internal/registry.go
  2. 5 3
      core/discov/internal/registry_test.go

+ 17 - 9
core/discov/internal/registry.go

@@ -208,7 +208,7 @@ func (c *cluster) handleWatchEvents(key string, events []*clientv3.Event) {
 	}
 	}
 }
 }
 
 
-func (c *cluster) load(cli EtcdClient, key string) {
+func (c *cluster) load(cli EtcdClient, key string) int64 {
 	var resp *clientv3.GetResponse
 	var resp *clientv3.GetResponse
 	for {
 	for {
 		var err error
 		var err error
@@ -232,6 +232,8 @@ func (c *cluster) load(cli EtcdClient, key string) {
 	}
 	}
 
 
 	c.handleChanges(key, kvs)
 	c.handleChanges(key, kvs)
+
+	return resp.Header.Revision
 }
 }
 
 
 func (c *cluster) monitor(key string, l UpdateListener) error {
 func (c *cluster) monitor(key string, l UpdateListener) error {
@@ -244,9 +246,9 @@ func (c *cluster) monitor(key string, l UpdateListener) error {
 		return err
 		return err
 	}
 	}
 
 
-	c.load(cli, key)
+	rev := c.load(cli, key)
 	c.watchGroup.Run(func() {
 	c.watchGroup.Run(func() {
-		c.watch(cli, key)
+		c.watch(cli, key, rev)
 	})
 	})
 
 
 	return nil
 	return nil
@@ -278,22 +280,28 @@ func (c *cluster) reload(cli EtcdClient) {
 	for _, key := range keys {
 	for _, key := range keys {
 		k := key
 		k := key
 		c.watchGroup.Run(func() {
 		c.watchGroup.Run(func() {
-			c.load(cli, k)
-			c.watch(cli, k)
+			rev := c.load(cli, k)
+			c.watch(cli, k, rev)
 		})
 		})
 	}
 	}
 }
 }
 
 
-func (c *cluster) watch(cli EtcdClient, key string) {
+func (c *cluster) watch(cli EtcdClient, key string, rev int64) {
 	for {
 	for {
-		if c.watchStream(cli, key) {
+		if c.watchStream(cli, key, rev) {
 			return
 			return
 		}
 		}
 	}
 	}
 }
 }
 
 
-func (c *cluster) watchStream(cli EtcdClient, key string) bool {
-	rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
+func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool {
+	var rch clientv3.WatchChan
+	if rev != 0 {
+		rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix(), clientv3.WithRev(rev+1))
+	} else {
+		rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
+	}
+
 	for {
 	for {
 		select {
 		select {
 		case wresp, ok := <-rch:
 		case wresp, ok := <-rch:

+ 5 - 3
core/discov/internal/registry_test.go

@@ -2,6 +2,7 @@ package internal
 
 
 import (
 import (
 	"context"
 	"context"
+	"go.etcd.io/etcd/api/v3/etcdserverpb"
 	"sync"
 	"sync"
 	"testing"
 	"testing"
 
 
@@ -112,6 +113,7 @@ func TestCluster_Load(t *testing.T) {
 	restore := setMockClient(cli)
 	restore := setMockClient(cli)
 	defer restore()
 	defer restore()
 	cli.EXPECT().Get(gomock.Any(), "any/", gomock.Any()).Return(&clientv3.GetResponse{
 	cli.EXPECT().Get(gomock.Any(), "any/", gomock.Any()).Return(&clientv3.GetResponse{
+		Header: &etcdserverpb.ResponseHeader{},
 		Kvs: []*mvccpb.KeyValue{
 		Kvs: []*mvccpb.KeyValue{
 			{
 			{
 				Key:   []byte("hello"),
 				Key:   []byte("hello"),
@@ -168,7 +170,7 @@ func TestCluster_Watch(t *testing.T) {
 			listener.EXPECT().OnDelete(gomock.Any()).Do(func(_ interface{}) {
 			listener.EXPECT().OnDelete(gomock.Any()).Do(func(_ interface{}) {
 				wg.Done()
 				wg.Done()
 			}).MaxTimes(1)
 			}).MaxTimes(1)
-			go c.watch(cli, "any")
+			go c.watch(cli, "any", 0)
 			ch <- clientv3.WatchResponse{
 			ch <- clientv3.WatchResponse{
 				Events: []*clientv3.Event{
 				Events: []*clientv3.Event{
 					{
 					{
@@ -212,7 +214,7 @@ func TestClusterWatch_RespFailures(t *testing.T) {
 				ch <- resp
 				ch <- resp
 				close(c.done)
 				close(c.done)
 			}()
 			}()
-			c.watch(cli, "any")
+			c.watch(cli, "any", 0)
 		})
 		})
 	}
 	}
 }
 }
@@ -232,7 +234,7 @@ func TestClusterWatch_CloseChan(t *testing.T) {
 		close(ch)
 		close(ch)
 		close(c.done)
 		close(c.done)
 	}()
 	}()
-	c.watch(cli, "any")
+	c.watch(cli, "any", 0)
 }
 }
 
 
 func TestValueOnlyContext(t *testing.T) {
 func TestValueOnlyContext(t *testing.T) {