rpcserver.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package internal
  2. import (
  3. "net"
  4. "github.com/tal-tech/go-zero/core/proc"
  5. "github.com/tal-tech/go-zero/core/stat"
  6. "github.com/tal-tech/go-zero/zrpc/internal/serverinterceptors"
  7. "google.golang.org/grpc"
  8. )
  9. type (
  10. // ServerOption defines the method to customize a rpcServerOptions.
  11. ServerOption func(options *rpcServerOptions)
  12. rpcServerOptions struct {
  13. metrics *stat.Metrics
  14. MaxRetries int
  15. }
  16. rpcServer struct {
  17. name string
  18. *baseRpcServer
  19. }
  20. )
// init calls InitLogger when the package is loaded.
func init() {
	InitLogger()
}
  24. // NewRpcServer returns a Server.
  25. func NewRpcServer(address string, opts ...ServerOption) Server {
  26. var options rpcServerOptions
  27. for _, opt := range opts {
  28. opt(&options)
  29. }
  30. if options.metrics == nil {
  31. options.metrics = stat.NewMetrics(address)
  32. }
  33. return &rpcServer{
  34. baseRpcServer: newBaseRpcServer(address, &options),
  35. }
  36. }
// SetName records name on the rpcServer and forwards it to the embedded
// baseRpcServer.
func (s *rpcServer) SetName(name string) {
	s.name = name
	s.baseRpcServer.SetName(name)
}
  41. func (s *rpcServer) Start(register RegisterFn) error {
  42. lis, err := net.Listen("tcp", s.address)
  43. if err != nil {
  44. return err
  45. }
  46. unaryInterceptors := []grpc.UnaryServerInterceptor{
  47. serverinterceptors.UnaryTracingInterceptor,
  48. serverinterceptors.RetryInterceptor(s.maxRetries),
  49. serverinterceptors.UnaryCrashInterceptor,
  50. serverinterceptors.UnaryStatInterceptor(s.metrics),
  51. serverinterceptors.UnaryPrometheusInterceptor,
  52. serverinterceptors.UnaryBreakerInterceptor,
  53. }
  54. unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
  55. streamInterceptors := []grpc.StreamServerInterceptor{
  56. serverinterceptors.StreamTracingInterceptor,
  57. serverinterceptors.StreamCrashInterceptor,
  58. serverinterceptors.StreamBreakerInterceptor,
  59. }
  60. streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
  61. options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...),
  62. WithStreamServerInterceptors(streamInterceptors...))
  63. server := grpc.NewServer(options...)
  64. register(server)
  65. // we need to make sure all others are wrapped up
  66. // so we do graceful stop at shutdown phase instead of wrap up phase
  67. waitForCalled := proc.AddWrapUpListener(func() {
  68. server.GracefulStop()
  69. })
  70. defer waitForCalled()
  71. return server.Serve(lis)
  72. }
  73. // WithMetrics returns a func that sets metrics to a Server.
  74. func WithMetrics(metrics *stat.Metrics) ServerOption {
  75. return func(options *rpcServerOptions) {
  76. options.metrics = metrics
  77. }
  78. }
  79. // WithMaxRetries returns a func that sets a max retries to a Server.
  80. func WithMaxRetries(maxRetries int) ServerOption {
  81. return func(options *rpcServerOptions) {
  82. options.MaxRetries = maxRetries
  83. }
  84. }