server.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package zrpc
  2. import (
  3. "time"
  4. "github.com/wuntsong-org/go-zero-plus/core/load"
  5. "github.com/wuntsong-org/go-zero-plus/core/logx"
  6. "github.com/wuntsong-org/go-zero-plus/core/stat"
  7. "github.com/wuntsong-org/go-zero-plus/core/stores/redis"
  8. "github.com/wuntsong-org/go-zero-plus/zrpc/internal"
  9. "github.com/wuntsong-org/go-zero-plus/zrpc/internal/auth"
  10. "github.com/wuntsong-org/go-zero-plus/zrpc/internal/serverinterceptors"
  11. "google.golang.org/grpc"
  12. )
  13. // A RpcServer is a rpc server.
  14. type RpcServer struct {
  15. server internal.Server
  16. register internal.RegisterFn
  17. }
  18. // MustNewServer returns a RpcSever, exits on any error.
  19. func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
  20. server, err := NewServer(c, register)
  21. logx.Must(err)
  22. return server
  23. }
  24. // NewServer returns a RpcServer.
  25. func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error) {
  26. var err error
  27. if err = c.Validate(); err != nil {
  28. return nil, err
  29. }
  30. var server internal.Server
  31. metrics := stat.NewMetrics(c.ListenOn)
  32. serverOptions := []internal.ServerOption{
  33. internal.WithMetrics(metrics),
  34. internal.WithRpcHealth(c.Health),
  35. }
  36. if c.HasEtcd() {
  37. server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, c.Middlewares, serverOptions...)
  38. if err != nil {
  39. return nil, err
  40. }
  41. } else {
  42. server = internal.NewRpcServer(c.ListenOn, c.Middlewares, serverOptions...)
  43. }
  44. server.SetName(c.Name)
  45. if err = setupInterceptors(server, c, metrics); err != nil {
  46. return nil, err
  47. }
  48. rpcServer := &RpcServer{
  49. server: server,
  50. register: register,
  51. }
  52. if err = c.SetUp(); err != nil {
  53. return nil, err
  54. }
  55. return rpcServer, nil
  56. }
  57. // AddOptions adds given options.
  58. func (rs *RpcServer) AddOptions(options ...grpc.ServerOption) {
  59. rs.server.AddOptions(options...)
  60. }
  61. // AddStreamInterceptors adds given stream interceptors.
  62. func (rs *RpcServer) AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) {
  63. rs.server.AddStreamInterceptors(interceptors...)
  64. }
  65. // AddUnaryInterceptors adds given unary interceptors.
  66. func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
  67. rs.server.AddUnaryInterceptors(interceptors...)
  68. }
  69. // Start starts the RpcServer.
  70. // Graceful shutdown is enabled by default.
  71. // Use proc.SetTimeToForceQuit to customize the graceful shutdown period.
  72. func (rs *RpcServer) Start() {
  73. if err := rs.server.Start(rs.register); err != nil {
  74. logx.Error(err)
  75. panic(err)
  76. }
  77. }
  78. // Stop stops the RpcServer.
  79. func (rs *RpcServer) Stop() {
  80. logx.Close()
  81. }
  82. // DontLogContentForMethod disable logging content for given method.
  83. // Deprecated: use ServerMiddlewaresConf.IgnoreContentMethods instead.
  84. func DontLogContentForMethod(method string) {
  85. serverinterceptors.DontLogContentForMethod(method)
  86. }
  87. // SetServerSlowThreshold sets the slow threshold on server side.
  88. // Deprecated: use ServerMiddlewaresConf.SlowThreshold instead.
  89. func SetServerSlowThreshold(threshold time.Duration) {
  90. serverinterceptors.SetSlowThreshold(threshold)
  91. }
  92. func setupAuthInterceptors(svr internal.Server, c RpcServerConf) error {
  93. rds, err := redis.NewRedis(c.Redis.RedisConf)
  94. if err != nil {
  95. return err
  96. }
  97. authenticator, err := auth.NewAuthenticator(rds, c.Redis.Key, c.StrictControl)
  98. if err != nil {
  99. return err
  100. }
  101. svr.AddStreamInterceptors(serverinterceptors.StreamAuthorizeInterceptor(authenticator))
  102. svr.AddUnaryInterceptors(serverinterceptors.UnaryAuthorizeInterceptor(authenticator))
  103. return nil
  104. }
  105. func setupInterceptors(svr internal.Server, c RpcServerConf, metrics *stat.Metrics) error {
  106. if c.CpuThreshold > 0 {
  107. shedder := load.NewAdaptiveShedder(load.WithCpuThreshold(c.CpuThreshold))
  108. svr.AddUnaryInterceptors(serverinterceptors.UnarySheddingInterceptor(shedder, metrics))
  109. }
  110. if c.Timeout > 0 {
  111. svr.AddUnaryInterceptors(serverinterceptors.UnaryTimeoutInterceptor(
  112. time.Duration(c.Timeout)*time.Millisecond, c.MethodTimeouts...))
  113. }
  114. if c.Auth {
  115. if err := setupAuthInterceptors(svr, c); err != nil {
  116. return err
  117. }
  118. }
  119. return nil
  120. }