server.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package ngin
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "zero/core/codec"
  8. "zero/core/load"
  9. "zero/core/stat"
  10. "zero/ngin/handler"
  11. "zero/ngin/internal"
  12. "zero/ngin/internal/router"
  13. "github.com/justinas/alice"
  14. )
  15. // use 1000m to represent 100%
  16. const topCpuUsage = 1000
  17. var ErrSignatureConfig = errors.New("bad config for Signature")
  18. type (
  19. Middleware func(next http.HandlerFunc) http.HandlerFunc
  20. server struct {
  21. conf NgConf
  22. routes []featuredRoutes
  23. unauthorizedCallback handler.UnauthorizedCallback
  24. unsignedCallback handler.UnsignedCallback
  25. middlewares []Middleware
  26. shedder load.Shedder
  27. priorityShedder load.Shedder
  28. }
  29. )
  30. func newServer(c NgConf) *server {
  31. srv := &server{
  32. conf: c,
  33. }
  34. if c.CpuThreshold > 0 {
  35. srv.shedder = load.NewAdaptiveShedder(load.WithCpuThreshold(c.CpuThreshold))
  36. srv.priorityShedder = load.NewAdaptiveShedder(load.WithCpuThreshold(
  37. (c.CpuThreshold + topCpuUsage) >> 1))
  38. }
  39. return srv
  40. }
  41. func (s *server) AddRoutes(r featuredRoutes) {
  42. s.routes = append(s.routes, r)
  43. }
  44. func (s *server) SetUnauthorizedCallback(callback handler.UnauthorizedCallback) {
  45. s.unauthorizedCallback = callback
  46. }
  47. func (s *server) SetUnsignedCallback(callback handler.UnsignedCallback) {
  48. s.unsignedCallback = callback
  49. }
  50. func (s *server) Start() error {
  51. return s.StartWithRouter(router.NewPatRouter())
  52. }
  53. func (s *server) StartWithRouter(router router.Router) error {
  54. if err := s.bindRoutes(router); err != nil {
  55. return err
  56. }
  57. return internal.StartHttp(s.conf.Host, s.conf.Port, router)
  58. }
  59. func (s *server) appendAuthHandler(fr featuredRoutes, chain alice.Chain,
  60. verifier func(alice.Chain) alice.Chain) alice.Chain {
  61. if fr.jwt.enabled {
  62. if len(fr.jwt.prevSecret) == 0 {
  63. chain = chain.Append(handler.Authorize(fr.jwt.secret,
  64. handler.WithUnauthorizedCallback(s.unauthorizedCallback)))
  65. } else {
  66. chain = chain.Append(handler.Authorize(fr.jwt.secret,
  67. handler.WithPrevSecret(fr.jwt.prevSecret),
  68. handler.WithUnauthorizedCallback(s.unauthorizedCallback)))
  69. }
  70. }
  71. return verifier(chain)
  72. }
  73. func (s *server) bindFeaturedRoutes(router router.Router, fr featuredRoutes, metrics *stat.Metrics) error {
  74. verifier, err := s.signatureVerifier(fr.signature)
  75. if err != nil {
  76. return err
  77. }
  78. for _, route := range fr.routes {
  79. if err := s.bindRoute(fr, router, metrics, route, verifier); err != nil {
  80. return err
  81. }
  82. }
  83. return nil
  84. }
  85. func (s *server) bindRoute(fr featuredRoutes, router router.Router, metrics *stat.Metrics,
  86. route Route, verifier func(chain alice.Chain) alice.Chain) error {
  87. chain := alice.New(
  88. handler.TracingHandler,
  89. s.getLogHandler(),
  90. handler.MaxConns(s.conf.MaxConns),
  91. handler.BreakerHandler(route.Method, route.Path, metrics),
  92. handler.SheddingHandler(s.getShedder(fr.priority), metrics),
  93. handler.TimeoutHandler(time.Duration(s.conf.Timeout)*time.Millisecond),
  94. handler.RecoverHandler,
  95. handler.MetricHandler(metrics),
  96. handler.PromMetricHandler(route.Path),
  97. handler.MaxBytesHandler(s.conf.MaxBytes),
  98. handler.GunzipHandler,
  99. )
  100. chain = s.appendAuthHandler(fr, chain, verifier)
  101. for _, middleware := range s.middlewares {
  102. chain = chain.Append(convertMiddleware(middleware))
  103. }
  104. handle := chain.ThenFunc(route.Handler)
  105. return router.Handle(route.Method, route.Path, handle)
  106. }
  107. func (s *server) bindRoutes(router router.Router) error {
  108. metrics := s.createMetrics()
  109. for _, fr := range s.routes {
  110. if err := s.bindFeaturedRoutes(router, fr, metrics); err != nil {
  111. return err
  112. }
  113. }
  114. return nil
  115. }
  116. func (s *server) createMetrics() *stat.Metrics {
  117. var metrics *stat.Metrics
  118. if len(s.conf.Name) > 0 {
  119. metrics = stat.NewMetrics(s.conf.Name)
  120. } else {
  121. metrics = stat.NewMetrics(fmt.Sprintf("%s:%d", s.conf.Host, s.conf.Port))
  122. }
  123. return metrics
  124. }
  125. func (s *server) getLogHandler() func(http.Handler) http.Handler {
  126. if s.conf.Verbose {
  127. return handler.DetailedLogHandler
  128. } else {
  129. return handler.LogHandler
  130. }
  131. }
  132. func (s *server) getShedder(priority bool) load.Shedder {
  133. if priority && s.priorityShedder != nil {
  134. return s.priorityShedder
  135. }
  136. return s.shedder
  137. }
  138. func (s *server) signatureVerifier(signature signatureSetting) (func(chain alice.Chain) alice.Chain, error) {
  139. if !signature.enabled {
  140. return func(chain alice.Chain) alice.Chain {
  141. return chain
  142. }, nil
  143. }
  144. if len(signature.PrivateKeys) == 0 {
  145. if signature.Strict {
  146. return nil, ErrSignatureConfig
  147. } else {
  148. return func(chain alice.Chain) alice.Chain {
  149. return chain
  150. }, nil
  151. }
  152. }
  153. decrypters := make(map[string]codec.RsaDecrypter)
  154. for _, key := range signature.PrivateKeys {
  155. fingerprint := key.Fingerprint
  156. file := key.KeyFile
  157. decrypter, err := codec.NewRsaDecrypter(file)
  158. if err != nil {
  159. return nil, err
  160. }
  161. decrypters[fingerprint] = decrypter
  162. }
  163. return func(chain alice.Chain) alice.Chain {
  164. if s.unsignedCallback != nil {
  165. return chain.Append(handler.ContentSecurityHandler(
  166. decrypters, signature.Expiry, signature.Strict, s.unsignedCallback))
  167. } else {
  168. return chain.Append(handler.ContentSecurityHandler(
  169. decrypters, signature.Expiry, signature.Strict))
  170. }
  171. }, nil
  172. }
  173. func (s *server) use(middleware Middleware) {
  174. s.middlewares = append(s.middlewares, middleware)
  175. }
  176. func convertMiddleware(ware Middleware) func(http.Handler) http.Handler {
  177. return func(next http.Handler) http.Handler {
  178. return http.HandlerFunc(ware(next.ServeHTTP))
  179. }
  180. }