1
0

publisher.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package discov
  2. import (
  3. "time"
  4. "github.com/wuntsong-org/go-zero-plus/core/discov/internal"
  5. "github.com/wuntsong-org/go-zero-plus/core/lang"
  6. "github.com/wuntsong-org/go-zero-plus/core/logx"
  7. "github.com/wuntsong-org/go-zero-plus/core/proc"
  8. "github.com/wuntsong-org/go-zero-plus/core/syncx"
  9. "github.com/wuntsong-org/go-zero-plus/core/threading"
  10. clientv3 "go.etcd.io/etcd/client/v3"
  11. )
  12. type (
  13. // PubOption defines the method to customize a Publisher.
  14. PubOption func(client *Publisher)
  15. // A Publisher can be used to publish the value to an etcd cluster on the given key.
  16. Publisher struct {
  17. endpoints []string
  18. key string
  19. fullKey string
  20. id int64
  21. value string
  22. lease clientv3.LeaseID
  23. quit *syncx.DoneChan
  24. pauseChan chan lang.PlaceholderType
  25. resumeChan chan lang.PlaceholderType
  26. }
  27. )
  28. // NewPublisher returns a Publisher.
  29. // endpoints is the hosts of the etcd cluster.
  30. // key:value are a pair to be published.
  31. // opts are used to customize the Publisher.
  32. func NewPublisher(endpoints []string, key, value string, opts ...PubOption) *Publisher {
  33. publisher := &Publisher{
  34. endpoints: endpoints,
  35. key: key,
  36. value: value,
  37. quit: syncx.NewDoneChan(),
  38. pauseChan: make(chan lang.PlaceholderType),
  39. resumeChan: make(chan lang.PlaceholderType),
  40. }
  41. for _, opt := range opts {
  42. opt(publisher)
  43. }
  44. return publisher
  45. }
  46. // KeepAlive keeps key:value alive.
  47. func (p *Publisher) KeepAlive() error {
  48. cli, err := p.doRegister()
  49. if err != nil {
  50. return err
  51. }
  52. proc.AddWrapUpListener(func() {
  53. p.Stop()
  54. })
  55. return p.keepAliveAsync(cli)
  56. }
  57. // Pause pauses the renewing of key:value.
  58. func (p *Publisher) Pause() {
  59. p.pauseChan <- lang.Placeholder
  60. }
  61. // Resume resumes the renewing of key:value.
  62. func (p *Publisher) Resume() {
  63. p.resumeChan <- lang.Placeholder
  64. }
  65. // Stop stops the renewing and revokes the registration.
  66. func (p *Publisher) Stop() {
  67. p.quit.Close()
  68. }
  69. func (p *Publisher) doKeepAlive() error {
  70. ticker := time.NewTicker(time.Second)
  71. defer ticker.Stop()
  72. for range ticker.C {
  73. select {
  74. case <-p.quit.Done():
  75. return nil
  76. default:
  77. cli, err := p.doRegister()
  78. if err != nil {
  79. logx.Errorf("etcd publisher doRegister: %s", err.Error())
  80. break
  81. }
  82. if err := p.keepAliveAsync(cli); err != nil {
  83. logx.Errorf("etcd publisher keepAliveAsync: %s", err.Error())
  84. break
  85. }
  86. return nil
  87. }
  88. }
  89. return nil
  90. }
  91. func (p *Publisher) doRegister() (internal.EtcdClient, error) {
  92. cli, err := internal.GetRegistry().GetConn(p.endpoints)
  93. if err != nil {
  94. return nil, err
  95. }
  96. p.lease, err = p.register(cli)
  97. return cli, err
  98. }
  99. func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
  100. ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
  101. if err != nil {
  102. return err
  103. }
  104. threading.GoSafe(func() {
  105. for {
  106. select {
  107. case _, ok := <-ch:
  108. if !ok {
  109. p.revoke(cli)
  110. if err := p.doKeepAlive(); err != nil {
  111. logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
  112. }
  113. return
  114. }
  115. case <-p.pauseChan:
  116. logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
  117. p.revoke(cli)
  118. select {
  119. case <-p.resumeChan:
  120. if err := p.doKeepAlive(); err != nil {
  121. logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
  122. }
  123. return
  124. case <-p.quit.Done():
  125. return
  126. }
  127. case <-p.quit.Done():
  128. p.revoke(cli)
  129. return
  130. }
  131. }
  132. })
  133. return nil
  134. }
  135. func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
  136. resp, err := client.Grant(client.Ctx(), TimeToLive)
  137. if err != nil {
  138. return clientv3.NoLease, err
  139. }
  140. lease := resp.ID
  141. if p.id > 0 {
  142. p.fullKey = makeEtcdKey(p.key, p.id)
  143. } else {
  144. p.fullKey = makeEtcdKey(p.key, int64(lease))
  145. }
  146. _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
  147. return lease, err
  148. }
  149. func (p *Publisher) revoke(cli internal.EtcdClient) {
  150. if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil {
  151. logx.Errorf("etcd publisher revoke: %s", err.Error())
  152. }
  153. }
  154. // WithId customizes a Publisher with the id.
  155. func WithId(id int64) PubOption {
  156. return func(publisher *Publisher) {
  157. publisher.id = id
  158. }
  159. }
  160. // WithPubEtcdAccount provides the etcd username/password.
  161. func WithPubEtcdAccount(user, pass string) PubOption {
  162. return func(pub *Publisher) {
  163. RegisterAccount(pub.endpoints, user, pass)
  164. }
  165. }
  166. // WithPubEtcdTLS provides the etcd CertFile/CertKeyFile/CACertFile.
  167. func WithPubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify bool) PubOption {
  168. return func(pub *Publisher) {
  169. logx.Must(RegisterTLS(pub.endpoints, certFile, certKeyFile, caFile, insecureSkipVerify))
  170. }
  171. }