123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package internal
- import (
- "time"
- "zero/core/discov"
- "zero/core/logx"
- "zero/core/threading"
- "google.golang.org/grpc"
- "google.golang.org/grpc/connectivity"
- )
- const (
- coolOffTime = time.Second * 5
- retryTimes = 3
- )
- type (
- RoundRobinSubClient struct {
- *discov.RoundRobinSubClient
- }
- ConsistentSubClient struct {
- *discov.ConsistentSubClient
- }
- )
- func NewRoundRobinRpcClient(endpoints []string, key string, opts ...ClientOption) (*RoundRobinSubClient, error) {
- subClient, err := discov.NewRoundRobinSubClient(endpoints, key, func(server string) (interface{}, error) {
- return dial(server, opts...)
- }, func(server string, conn interface{}) error {
- return closeConn(conn.(*grpc.ClientConn))
- }, discov.Exclusive())
- if err != nil {
- return nil, err
- } else {
- return &RoundRobinSubClient{subClient}, nil
- }
- }
- func NewConsistentRpcClient(endpoints []string, key string, opts ...ClientOption) (*ConsistentSubClient, error) {
- subClient, err := discov.NewConsistentSubClient(endpoints, key, func(server string) (interface{}, error) {
- return dial(server, opts...)
- }, func(server string, conn interface{}) error {
- return closeConn(conn.(*grpc.ClientConn))
- })
- if err != nil {
- return nil, err
- } else {
- return &ConsistentSubClient{subClient}, nil
- }
- }
- func (cli *RoundRobinSubClient) Next() (*grpc.ClientConn, bool) {
- return next(func() (interface{}, bool) {
- return cli.RoundRobinSubClient.Next()
- })
- }
- func (cli *ConsistentSubClient) Next(key string) (*grpc.ClientConn, bool) {
- return next(func() (interface{}, bool) {
- return cli.ConsistentSubClient.Next(key)
- })
- }
- func closeConn(conn *grpc.ClientConn) error {
- // why to close the conn asynchronously is because maybe another goroutine
- // is using the same conn, we can wait the coolOffTime to let the other
- // goroutine to finish using the conn.
- // after the conn unregistered, the balancer will not assign the conn,
- // but maybe the already assigned tasks are still using it.
- threading.GoSafe(func() {
- time.Sleep(coolOffTime)
- if err := conn.Close(); err != nil {
- logx.Error(err)
- }
- })
- return nil
- }
- func next(nextFn func() (interface{}, bool)) (*grpc.ClientConn, bool) {
- for i := 0; i < retryTimes; i++ {
- v, ok := nextFn()
- if !ok {
- break
- }
- conn, yes := v.(*grpc.ClientConn)
- if !yes {
- break
- }
- switch conn.GetState() {
- case connectivity.Ready:
- return conn, true
- }
- }
- return nil, false
- }
|