rpcpubserver.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package internal
  2. import (
  3. "os"
  4. "strings"
  5. "github.com/zeromicro/go-zero/core/discov"
  6. "github.com/zeromicro/go-zero/core/netx"
  7. )
  8. const (
  9. allEths = "0.0.0.0"
  10. envPodIp = "POD_IP"
  11. )
  12. // NewRpcPubServer returns a Server.
  13. func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, middlewares ServerMiddlewaresConf,
  14. opts ...ServerOption) (Server, error) {
  15. registerEtcd := func() error {
  16. pubListenOn := figureOutListenOn(listenOn)
  17. var pubOpts []discov.PubOption
  18. if etcd.HasAccount() {
  19. pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
  20. }
  21. if etcd.HasTLS() {
  22. pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
  23. etcd.CACertFile, etcd.InsecureSkipVerify))
  24. }
  25. pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
  26. return pubClient.KeepAlive()
  27. }
  28. server := keepAliveServer{
  29. registerEtcd: registerEtcd,
  30. Server: NewRpcServer(listenOn, middlewares, opts...),
  31. }
  32. return server, nil
  33. }
  34. type keepAliveServer struct {
  35. registerEtcd func() error
  36. Server
  37. }
  38. func (s keepAliveServer) Start(fn RegisterFn) error {
  39. if err := s.registerEtcd(); err != nil {
  40. return err
  41. }
  42. return s.Server.Start(fn)
  43. }
  44. func figureOutListenOn(listenOn string) string {
  45. fields := strings.Split(listenOn, ":")
  46. if len(fields) == 0 {
  47. return listenOn
  48. }
  49. host := fields[0]
  50. if len(host) > 0 && host != allEths {
  51. return listenOn
  52. }
  53. ip := os.Getenv(envPodIp)
  54. if len(ip) == 0 {
  55. ip = netx.InternalIp()
  56. }
  57. if len(ip) == 0 {
  58. return listenOn
  59. }
  60. return strings.Join(append([]string{ip}, fields[1:]...), ":")
  61. }