server.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package gateway
  2. import (
  3. "context"
  4. "net/http"
  5. "strings"
  6. "time"
  7. "github.com/fullstorydev/grpcurl"
  8. "github.com/golang/protobuf/jsonpb"
  9. "github.com/jhump/protoreflect/grpcreflect"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. "github.com/zeromicro/go-zero/core/mr"
  12. "github.com/zeromicro/go-zero/rest"
  13. "github.com/zeromicro/go-zero/rest/httpx"
  14. "github.com/zeromicro/go-zero/zrpc"
  15. "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
  16. )
  17. // Server is a gateway server.
  18. type Server struct {
  19. *rest.Server
  20. upstreams []upstream
  21. timeout time.Duration
  22. }
  23. // MustNewServer creates a new gateway server.
  24. func MustNewServer(c GatewayConf) *Server {
  25. return &Server{
  26. Server: rest.MustNewServer(c.RestConf),
  27. upstreams: c.Upstreams,
  28. timeout: c.Timeout,
  29. }
  30. }
  31. // Start starts the gateway server.
  32. func (s *Server) Start() {
  33. logx.Must(s.build())
  34. s.Server.Start()
  35. }
  36. // Stop stops the gateway server.
  37. func (s *Server) Stop() {
  38. s.Server.Stop()
  39. }
  40. func (s *Server) build() error {
  41. return mr.MapReduceVoid(func(source chan<- interface{}) {
  42. for _, up := range s.upstreams {
  43. source <- up
  44. }
  45. }, func(item interface{}, writer mr.Writer, cancel func(error)) {
  46. up := item.(upstream)
  47. cli := zrpc.MustNewClient(up.Grpc)
  48. source, err := s.createDescriptorSource(cli, up)
  49. if err != nil {
  50. cancel(err)
  51. return
  52. }
  53. resolver := grpcurl.AnyResolverFromDescriptorSource(source)
  54. for _, m := range up.Mapping {
  55. writer.Write(rest.Route{
  56. Method: strings.ToUpper(m.Method),
  57. Path: m.Path,
  58. Handler: s.buildHandler(source, resolver, cli, m),
  59. })
  60. }
  61. }, func(pipe <-chan interface{}, cancel func(error)) {
  62. for item := range pipe {
  63. route := item.(rest.Route)
  64. s.Server.AddRoute(route)
  65. }
  66. })
  67. }
  68. func (s *Server) buildHandler(source grpcurl.DescriptorSource, resolver jsonpb.AnyResolver,
  69. cli zrpc.Client, m mapping) func(http.ResponseWriter, *http.Request) {
  70. return func(w http.ResponseWriter, r *http.Request) {
  71. handler := &grpcurl.DefaultEventHandler{
  72. Out: w,
  73. Formatter: grpcurl.NewJSONFormatter(true,
  74. grpcurl.AnyResolverFromDescriptorSource(source)),
  75. }
  76. parser, err := newRequestParser(r, resolver)
  77. if err != nil {
  78. httpx.Error(w, err)
  79. return
  80. }
  81. ctx, can := context.WithTimeout(r.Context(), s.timeout)
  82. defer can()
  83. if err := grpcurl.InvokeRPC(ctx, source, cli.Conn(), m.Rpc, buildHeaders(r.Header),
  84. handler, parser.Next); err != nil {
  85. httpx.Error(w, err)
  86. }
  87. }
  88. }
  89. func (s *Server) createDescriptorSource(cli zrpc.Client, up upstream) (grpcurl.DescriptorSource, error) {
  90. var source grpcurl.DescriptorSource
  91. var err error
  92. if len(up.ProtoSet) > 0 {
  93. source, err = grpcurl.DescriptorSourceFromProtoSets(up.ProtoSet)
  94. if err != nil {
  95. return nil, err
  96. }
  97. } else {
  98. refCli := grpc_reflection_v1alpha.NewServerReflectionClient(cli.Conn())
  99. client := grpcreflect.NewClient(context.Background(), refCli)
  100. source = grpcurl.DescriptorSourceFromServer(context.Background(), client)
  101. }
  102. return source, nil
  103. }