123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- package internal
- import (
- "fmt"
- "net"
- "github.com/wuntsong-org/go-zero-plus/core/proc"
- "github.com/wuntsong-org/go-zero-plus/core/stat"
- "github.com/wuntsong-org/go-zero-plus/internal/health"
- "github.com/wuntsong-org/go-zero-plus/zrpc/internal/serverinterceptors"
- "google.golang.org/grpc"
- "google.golang.org/grpc/health/grpc_health_v1"
- )
- const probeNamePrefix = "zrpc"
- type (
- // ServerOption defines the method to customize a rpcServerOptions.
- ServerOption func(options *rpcServerOptions)
- rpcServerOptions struct {
- metrics *stat.Metrics
- health bool
- }
- rpcServer struct {
- *baseRpcServer
- name string
- middlewares ServerMiddlewaresConf
- healthManager health.Probe
- }
- )
- // NewRpcServer returns a Server.
- func NewRpcServer(addr string, middlewares ServerMiddlewaresConf, opts ...ServerOption) Server {
- var options rpcServerOptions
- for _, opt := range opts {
- opt(&options)
- }
- if options.metrics == nil {
- options.metrics = stat.NewMetrics(addr)
- }
- return &rpcServer{
- baseRpcServer: newBaseRpcServer(addr, &options),
- middlewares: middlewares,
- healthManager: health.NewHealthManager(fmt.Sprintf("%s-%s", probeNamePrefix, addr)),
- }
- }
- func (s *rpcServer) SetName(name string) {
- s.name = name
- s.baseRpcServer.SetName(name)
- }
- func (s *rpcServer) Start(register RegisterFn) error {
- lis, err := net.Listen("tcp", s.address)
- if err != nil {
- return err
- }
- unaryInterceptorOption := grpc.ChainUnaryInterceptor(s.buildUnaryInterceptors()...)
- streamInterceptorOption := grpc.ChainStreamInterceptor(s.buildStreamInterceptors()...)
- options := append(s.options, unaryInterceptorOption, streamInterceptorOption)
- server := grpc.NewServer(options...)
- register(server)
- // register the health check service
- if s.health != nil {
- grpc_health_v1.RegisterHealthServer(server, s.health)
- s.health.Resume()
- }
- s.healthManager.MarkReady()
- health.AddProbe(s.healthManager)
- // we need to make sure all others are wrapped up,
- // so we do graceful stop at shutdown phase instead of wrap up phase
- waitForCalled := proc.AddShutdownListener(func() {
- if s.health != nil {
- s.health.Shutdown()
- }
- server.GracefulStop()
- })
- defer waitForCalled()
- return server.Serve(lis)
- }
- func (s *rpcServer) buildStreamInterceptors() []grpc.StreamServerInterceptor {
- var interceptors []grpc.StreamServerInterceptor
- if s.middlewares.Trace {
- interceptors = append(interceptors, serverinterceptors.StreamTracingInterceptor)
- }
- if s.middlewares.Recover {
- interceptors = append(interceptors, serverinterceptors.StreamRecoverInterceptor)
- }
- if s.middlewares.Breaker {
- interceptors = append(interceptors, serverinterceptors.StreamBreakerInterceptor)
- }
- return append(interceptors, s.streamInterceptors...)
- }
- func (s *rpcServer) buildUnaryInterceptors() []grpc.UnaryServerInterceptor {
- var interceptors []grpc.UnaryServerInterceptor
- if s.middlewares.Trace {
- interceptors = append(interceptors, serverinterceptors.UnaryTracingInterceptor)
- }
- if s.middlewares.Recover {
- interceptors = append(interceptors, serverinterceptors.UnaryRecoverInterceptor)
- }
- if s.middlewares.Stat {
- interceptors = append(interceptors,
- serverinterceptors.UnaryStatInterceptor(s.metrics, s.middlewares.StatConf))
- }
- if s.middlewares.Prometheus {
- interceptors = append(interceptors, serverinterceptors.UnaryPrometheusInterceptor)
- }
- if s.middlewares.Breaker {
- interceptors = append(interceptors, serverinterceptors.UnaryBreakerInterceptor)
- }
- return append(interceptors, s.unaryInterceptors...)
- }
- // WithMetrics returns a func that sets metrics to a Server.
- func WithMetrics(metrics *stat.Metrics) ServerOption {
- return func(options *rpcServerOptions) {
- options.metrics = metrics
- }
- }
- // WithRpcHealth returns a func that sets rpc health switch to a Server.
- func WithRpcHealth(health bool) ServerOption {
- return func(options *rpcServerOptions) {
- options.health = health
- }
- }
|