Преглед на файлове

fix: etcd publisher reconnecting problem (#2710)

* fix: etcd publisher reconnecting problem

* chore: fix wrong call
Kevin Wan преди 2 години
родител
ревизия
c3756a8f1c
променени са 1 файла, в които са добавени 45 реда и са изтрити 11 реда
  1. 45 11
      core/discov/publisher.go

+ 45 - 11
core/discov/publisher.go

@@ -1,6 +1,8 @@
 package discov
 
 import (
+	"time"
+
 	"github.com/zeromicro/go-zero/core/discov/internal"
 	"github.com/zeromicro/go-zero/core/lang"
 	"github.com/zeromicro/go-zero/core/logx"
@@ -51,12 +53,7 @@ func NewPublisher(endpoints []string, key, value string, opts ...PubOption) *Pub
 
 // KeepAlive keeps key:value alive.
 func (p *Publisher) KeepAlive() error {
-	cli, err := internal.GetRegistry().GetConn(p.endpoints)
-	if err != nil {
-		return err
-	}
-
-	p.lease, err = p.register(cli)
+	cli, err := p.doRegister()
 	if err != nil {
 		return err
 	}
@@ -83,6 +80,43 @@ func (p *Publisher) Stop() {
 	p.quit.Close()
 }
 
+func (p *Publisher) doKeepAlive() error {
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		select {
+		case <-p.quit.Done():
+			return nil
+		default:
+			cli, err := p.doRegister()
+			if err != nil {
+				logx.Errorf("etcd publisher doRegister: %s", err.Error())
+				break
+			}
+
+			if err := p.keepAliveAsync(cli); err != nil {
+				logx.Errorf("etcd publisher keepAliveAsync: %s", err.Error())
+				break
+			}
+
+			return nil
+		}
+	}
+
+	return nil
+}
+
+func (p *Publisher) doRegister() (internal.EtcdClient, error) {
+	cli, err := internal.GetRegistry().GetConn(p.endpoints)
+	if err != nil {
+		return nil, err
+	}
+
+	p.lease, err = p.register(cli)
+	return cli, err
+}
+
 func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
 	ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
 	if err != nil {
@@ -95,8 +129,8 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
 			case _, ok := <-ch:
 				if !ok {
 					p.revoke(cli)
-					if err := p.KeepAlive(); err != nil {
-						logx.Errorf("KeepAlive: %s", err.Error())
+					if err := p.doKeepAlive(); err != nil {
+						logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
 					}
 					return
 				}
@@ -105,8 +139,8 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
 				p.revoke(cli)
 				select {
 				case <-p.resumeChan:
-					if err := p.KeepAlive(); err != nil {
-						logx.Errorf("KeepAlive: %s", err.Error())
+					if err := p.doKeepAlive(); err != nil {
+						logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
 					}
 					return
 				case <-p.quit.Done():
@@ -141,7 +175,7 @@ func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, erro
 
 func (p *Publisher) revoke(cli internal.EtcdClient) {
 	if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil {
-		logx.Error(err)
+		logx.Errorf("etcd publisher revoke: %s", err.Error())
 	}
 }