rpcsubclient.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package internal
  2. import (
  3. "time"
  4. "zero/core/discov"
  5. "zero/core/logx"
  6. "zero/core/threading"
  7. "google.golang.org/grpc"
  8. "google.golang.org/grpc/connectivity"
  9. )
  // Tunables used by closeConn and next in this file.
  const (
  	// coolOffTime is how long closeConn sleeps before it actually closes a
  	// connection.
  	coolOffTime = 5 * time.Second

  	// retryTimes is the upper bound on how many times next calls its picker
  	// function in a single invocation.
  	retryTimes = 3
  )
  14. type (
  15. RoundRobinSubClient struct {
  16. *discov.RoundRobinSubClient
  17. }
  18. ConsistentSubClient struct {
  19. *discov.ConsistentSubClient
  20. }
  21. )
  22. func NewRoundRobinRpcClient(endpoints []string, key string, opts ...ClientOption) (*RoundRobinSubClient, error) {
  23. subClient, err := discov.NewRoundRobinSubClient(endpoints, key, func(server string) (interface{}, error) {
  24. return dial(server, opts...)
  25. }, func(server string, conn interface{}) error {
  26. return closeConn(conn.(*grpc.ClientConn))
  27. }, discov.Exclusive())
  28. if err != nil {
  29. return nil, err
  30. } else {
  31. return &RoundRobinSubClient{subClient}, nil
  32. }
  33. }
  34. func NewConsistentRpcClient(endpoints []string, key string, opts ...ClientOption) (*ConsistentSubClient, error) {
  35. subClient, err := discov.NewConsistentSubClient(endpoints, key, func(server string) (interface{}, error) {
  36. return dial(server, opts...)
  37. }, func(server string, conn interface{}) error {
  38. return closeConn(conn.(*grpc.ClientConn))
  39. })
  40. if err != nil {
  41. return nil, err
  42. } else {
  43. return &ConsistentSubClient{subClient}, nil
  44. }
  45. }
  46. func (cli *RoundRobinSubClient) Next() (*grpc.ClientConn, bool) {
  47. return next(func() (interface{}, bool) {
  48. return cli.RoundRobinSubClient.Next()
  49. })
  50. }
  51. func (cli *ConsistentSubClient) Next(key string) (*grpc.ClientConn, bool) {
  52. return next(func() (interface{}, bool) {
  53. return cli.ConsistentSubClient.Next(key)
  54. })
  55. }
  56. func closeConn(conn *grpc.ClientConn) error {
  57. // why to close the conn asynchronously is because maybe another goroutine
  58. // is using the same conn, we can wait the coolOffTime to let the other
  59. // goroutine to finish using the conn.
  60. // after the conn unregistered, the balancer will not assign the conn,
  61. // but maybe the already assigned tasks are still using it.
  62. threading.GoSafe(func() {
  63. time.Sleep(coolOffTime)
  64. if err := conn.Close(); err != nil {
  65. logx.Error(err)
  66. }
  67. })
  68. return nil
  69. }
  70. func next(nextFn func() (interface{}, bool)) (*grpc.ClientConn, bool) {
  71. for i := 0; i < retryTimes; i++ {
  72. v, ok := nextFn()
  73. if !ok {
  74. break
  75. }
  76. conn, yes := v.(*grpc.ClientConn)
  77. if !yes {
  78. break
  79. }
  80. switch conn.GetState() {
  81. case connectivity.Ready:
  82. return conn, true
  83. }
  84. }
  85. return nil, false
  86. }