client.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package internal
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/tal-tech/go-zero/zrpc/internal/balancer/p2c"
  7. "github.com/tal-tech/go-zero/zrpc/internal/clientinterceptors"
  8. "github.com/tal-tech/go-zero/zrpc/internal/resolver"
  9. "google.golang.org/grpc"
  10. )
  11. const dialTimeout = time.Second * 3
  12. func init() {
  13. resolver.RegisterResolver()
  14. }
  15. type (
  16. ClientOptions struct {
  17. Timeout time.Duration
  18. DialOptions []grpc.DialOption
  19. }
  20. ClientOption func(options *ClientOptions)
  21. client struct {
  22. conn *grpc.ClientConn
  23. }
  24. )
  25. func NewClient(target string, opts ...ClientOption) (*client, error) {
  26. opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
  27. conn, err := dial(target, opts...)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return &client{conn: conn}, nil
  32. }
  33. func (c *client) Conn() *grpc.ClientConn {
  34. return c.conn
  35. }
  36. func WithDialOption(opt grpc.DialOption) ClientOption {
  37. return func(options *ClientOptions) {
  38. options.DialOptions = append(options.DialOptions, opt)
  39. }
  40. }
  41. func WithTimeout(timeout time.Duration) ClientOption {
  42. return func(options *ClientOptions) {
  43. options.Timeout = timeout
  44. }
  45. }
  46. func buildDialOptions(opts ...ClientOption) []grpc.DialOption {
  47. var clientOptions ClientOptions
  48. for _, opt := range opts {
  49. opt(&clientOptions)
  50. }
  51. options := []grpc.DialOption{
  52. grpc.WithInsecure(),
  53. grpc.WithBlock(),
  54. WithUnaryClientInterceptors(
  55. clientinterceptors.TracingInterceptor,
  56. clientinterceptors.DurationInterceptor,
  57. clientinterceptors.BreakerInterceptor,
  58. clientinterceptors.PromMetricInterceptor,
  59. clientinterceptors.TimeoutInterceptor(clientOptions.Timeout),
  60. ),
  61. }
  62. return append(options, clientOptions.DialOptions...)
  63. }
  64. func dial(server string, opts ...ClientOption) (*grpc.ClientConn, error) {
  65. options := buildDialOptions(opts...)
  66. timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
  67. defer cancel()
  68. conn, err := grpc.DialContext(timeCtx, server, options...)
  69. if err != nil {
  70. return nil, fmt.Errorf("rpc dial: %s, error: %s", server, err.Error())
  71. }
  72. return conn, nil
  73. }