rpcserver.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package internal
  2. import (
  3. "net"
  4. "github.com/tal-tech/go-zero/core/proc"
  5. "github.com/tal-tech/go-zero/core/stat"
  6. "github.com/tal-tech/go-zero/zrpc/internal/serverinterceptors"
  7. "google.golang.org/grpc"
  8. )
  9. type (
  10. ServerOption func(options *rpcServerOptions)
  11. rpcServerOptions struct {
  12. metrics *stat.Metrics
  13. }
  14. rpcServer struct {
  15. *baseRpcServer
  16. }
  17. )
  18. func init() {
  19. InitLogger()
  20. }
  21. func NewRpcServer(address string, opts ...ServerOption) Server {
  22. var options rpcServerOptions
  23. for _, opt := range opts {
  24. opt(&options)
  25. }
  26. if options.metrics == nil {
  27. options.metrics = stat.NewMetrics(address)
  28. }
  29. return &rpcServer{
  30. baseRpcServer: newBaseRpcServer(address, options.metrics),
  31. }
  32. }
  33. func (s *rpcServer) SetName(name string) {
  34. s.baseRpcServer.SetName(name)
  35. }
  36. func (s *rpcServer) Start(register RegisterFn) error {
  37. lis, err := net.Listen("tcp", s.address)
  38. if err != nil {
  39. return err
  40. }
  41. unaryInterceptors := []grpc.UnaryServerInterceptor{
  42. serverinterceptors.UnaryCrashInterceptor(),
  43. serverinterceptors.UnaryStatInterceptor(s.metrics),
  44. serverinterceptors.UnaryPromMetricInterceptor(),
  45. }
  46. unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
  47. streamInterceptors := []grpc.StreamServerInterceptor{
  48. serverinterceptors.StreamCrashInterceptor,
  49. }
  50. streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
  51. options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...),
  52. WithStreamServerInterceptors(streamInterceptors...))
  53. server := grpc.NewServer(options...)
  54. register(server)
  55. // we need to make sure all others are wrapped up
  56. // so we do graceful stop at shutdown phase instead of wrap up phase
  57. shutdownCalled := proc.AddShutdownListener(func() {
  58. server.GracefulStop()
  59. })
  60. err = server.Serve(lis)
  61. shutdownCalled()
  62. return err
  63. }
  64. func WithMetrics(metrics *stat.Metrics) ServerOption {
  65. return func(options *rpcServerOptions) {
  66. options.metrics = metrics
  67. }
  68. }