123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- package discov
- import (
- "time"
- "github.com/wuntsong-org/go-zero-plus/core/discov/internal"
- "github.com/wuntsong-org/go-zero-plus/core/lang"
- "github.com/wuntsong-org/go-zero-plus/core/logx"
- "github.com/wuntsong-org/go-zero-plus/core/proc"
- "github.com/wuntsong-org/go-zero-plus/core/syncx"
- "github.com/wuntsong-org/go-zero-plus/core/threading"
- clientv3 "go.etcd.io/etcd/client/v3"
- )
- type (
- // PubOption defines the method to customize a Publisher.
- PubOption func(client *Publisher)
- // A Publisher can be used to publish the value to an etcd cluster on the given key.
- Publisher struct {
- endpoints []string
- key string
- fullKey string
- id int64
- value string
- lease clientv3.LeaseID
- quit *syncx.DoneChan
- pauseChan chan lang.PlaceholderType
- resumeChan chan lang.PlaceholderType
- }
- )
- // NewPublisher returns a Publisher.
- // endpoints is the hosts of the etcd cluster.
- // key:value are a pair to be published.
- // opts are used to customize the Publisher.
- func NewPublisher(endpoints []string, key, value string, opts ...PubOption) *Publisher {
- publisher := &Publisher{
- endpoints: endpoints,
- key: key,
- value: value,
- quit: syncx.NewDoneChan(),
- pauseChan: make(chan lang.PlaceholderType),
- resumeChan: make(chan lang.PlaceholderType),
- }
- for _, opt := range opts {
- opt(publisher)
- }
- return publisher
- }
- // KeepAlive keeps key:value alive.
- func (p *Publisher) KeepAlive() error {
- cli, err := p.doRegister()
- if err != nil {
- return err
- }
- proc.AddWrapUpListener(func() {
- p.Stop()
- })
- return p.keepAliveAsync(cli)
- }
- // Pause pauses the renewing of key:value.
- func (p *Publisher) Pause() {
- p.pauseChan <- lang.Placeholder
- }
- // Resume resumes the renewing of key:value.
- func (p *Publisher) Resume() {
- p.resumeChan <- lang.Placeholder
- }
- // Stop stops the renewing and revokes the registration.
- func (p *Publisher) Stop() {
- p.quit.Close()
- }
- func (p *Publisher) doKeepAlive() error {
- ticker := time.NewTicker(time.Second)
- defer ticker.Stop()
- for range ticker.C {
- select {
- case <-p.quit.Done():
- return nil
- default:
- cli, err := p.doRegister()
- if err != nil {
- logx.Errorf("etcd publisher doRegister: %s", err.Error())
- break
- }
- if err := p.keepAliveAsync(cli); err != nil {
- logx.Errorf("etcd publisher keepAliveAsync: %s", err.Error())
- break
- }
- return nil
- }
- }
- return nil
- }
- func (p *Publisher) doRegister() (internal.EtcdClient, error) {
- cli, err := internal.GetRegistry().GetConn(p.endpoints)
- if err != nil {
- return nil, err
- }
- p.lease, err = p.register(cli)
- return cli, err
- }
- func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
- ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
- if err != nil {
- return err
- }
- threading.GoSafe(func() {
- for {
- select {
- case _, ok := <-ch:
- if !ok {
- p.revoke(cli)
- if err := p.doKeepAlive(); err != nil {
- logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
- }
- return
- }
- case <-p.pauseChan:
- logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
- p.revoke(cli)
- select {
- case <-p.resumeChan:
- if err := p.doKeepAlive(); err != nil {
- logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
- }
- return
- case <-p.quit.Done():
- return
- }
- case <-p.quit.Done():
- p.revoke(cli)
- return
- }
- }
- })
- return nil
- }
- func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
- resp, err := client.Grant(client.Ctx(), TimeToLive)
- if err != nil {
- return clientv3.NoLease, err
- }
- lease := resp.ID
- if p.id > 0 {
- p.fullKey = makeEtcdKey(p.key, p.id)
- } else {
- p.fullKey = makeEtcdKey(p.key, int64(lease))
- }
- _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
- return lease, err
- }
- func (p *Publisher) revoke(cli internal.EtcdClient) {
- if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil {
- logx.Errorf("etcd publisher revoke: %s", err.Error())
- }
- }
- // WithId customizes a Publisher with the id.
- func WithId(id int64) PubOption {
- return func(publisher *Publisher) {
- publisher.id = id
- }
- }
- // WithPubEtcdAccount provides the etcd username/password.
- func WithPubEtcdAccount(user, pass string) PubOption {
- return func(pub *Publisher) {
- RegisterAccount(pub.endpoints, user, pass)
- }
- }
- // WithPubEtcdTLS provides the etcd CertFile/CertKeyFile/CACertFile.
- func WithPubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify bool) PubOption {
- return func(pub *Publisher) {
- logx.Must(RegisterTLS(pub.endpoints, certFile, certKeyFile, caFile, insecureSkipVerify))
- }
- }
|