controller.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. // Copyright 2025 BackendServerTemplate Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package controller
  5. import (
  6. "errors"
  7. "fmt"
  8. "github.com/SongZihuan/BackendServerTemplate/src/logger"
  9. "github.com/SongZihuan/BackendServerTemplate/src/server/servercontext"
  10. "github.com/SongZihuan/BackendServerTemplate/src/server/serverinterface"
  11. "github.com/SongZihuan/BackendServerTemplate/src/utils/strconvutils"
  12. "reflect"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. type Controller struct {
  18. server map[string]serverinterface.Server
  19. context map[string]*servercontext.ServerContext
  20. ctx *servercontext.ServerContext
  21. running atomic.Bool
  22. name string
  23. stopWaitTime time.Duration
  24. wg *sync.WaitGroup
  25. }
  26. type ControllerOption struct {
  27. StopWaitTime time.Duration
  28. }
  29. func NewController(opt *ControllerOption) (*Controller, error) {
  30. ctx := servercontext.NewServerContext()
  31. if opt == nil {
  32. opt = &ControllerOption{
  33. StopWaitTime: 10 * time.Second,
  34. }
  35. } else {
  36. if opt.StopWaitTime == 0 {
  37. opt.StopWaitTime = 10 * time.Second
  38. }
  39. }
  40. controller := &Controller{
  41. server: make(map[string]serverinterface.Server, 10),
  42. context: make(map[string]*servercontext.ServerContext, 10),
  43. ctx: ctx,
  44. name: serverinterface.ControllerName,
  45. wg: new(sync.WaitGroup),
  46. stopWaitTime: opt.StopWaitTime,
  47. }
  48. {
  49. controller.server[controller.name] = controller
  50. controller.context[controller.name] = ctx
  51. }
  52. return controller, nil
  53. }
  54. func (s *Controller) AddServer(ser serverinterface.Server) error {
  55. if s.running.Load() {
  56. return fmt.Errorf("controller is running")
  57. }
  58. name := ser.Name()
  59. if name == serverinterface.ControllerName {
  60. return fmt.Errorf("name can not be %s", serverinterface.ControllerName)
  61. } else if name == "" {
  62. return fmt.Errorf("name can not be empty")
  63. }
  64. _, ok := s.server[name]
  65. if ok {
  66. return fmt.Errorf("server is exists")
  67. }
  68. s.server[name] = ser
  69. s.context[name] = ser.GetCtx()
  70. return nil
  71. }
  72. func (s *Controller) DelServer(ser serverinterface.Server) error {
  73. if s.running.Load() {
  74. return fmt.Errorf("controller is running")
  75. }
  76. name := ser.Name()
  77. if name == serverinterface.ControllerName {
  78. return fmt.Errorf("name can not be %s", serverinterface.ControllerName)
  79. } else if name == "" {
  80. return fmt.Errorf("name can not be empty")
  81. }
  82. _, ok := s.server[name]
  83. if !ok {
  84. return fmt.Errorf("server is not exists")
  85. }
  86. delete(s.server, name)
  87. delete(s.server, name)
  88. return nil
  89. }
  90. func (s *Controller) Name() string {
  91. return s.name
  92. }
  93. func (s *Controller) GetCtx() *servercontext.ServerContext {
  94. return s.ctx
  95. }
  96. func (s *Controller) Run() {
  97. if s.running.Swap(true) {
  98. return
  99. }
  100. defer func() {
  101. s.running.Store(false)
  102. }()
  103. s.wg = new(sync.WaitGroup)
  104. s.wg.Add(1)
  105. defer s.wg.Done()
  106. for name, server := range s.server {
  107. if name == s.name {
  108. continue
  109. }
  110. _, ok := s.context[name]
  111. if !ok {
  112. logger.Errorf("server %s context not found", name)
  113. continue
  114. }
  115. logger.Infof("start to run server: %s", name)
  116. s.wg.Add(1)
  117. go func() {
  118. defer s.wg.Done()
  119. server.Run()
  120. }()
  121. }
  122. SelectCycle:
  123. for {
  124. cases := make([]reflect.SelectCase, 0, len(s.context))
  125. serverName := make([]string, 0, len(s.context))
  126. for name, ctx := range s.context {
  127. if name == s.name {
  128. continue
  129. }
  130. cases = append(cases, reflect.SelectCase{
  131. Dir: reflect.SelectRecv,
  132. Chan: reflect.ValueOf(ctx.Listen()),
  133. })
  134. serverName = append(serverName, name)
  135. }
  136. cases = append(cases, reflect.SelectCase{
  137. Dir: reflect.SelectRecv,
  138. Chan: reflect.ValueOf(s.ctx.Listen()),
  139. })
  140. serverName = append(serverName, s.name)
  141. chosen, _, _ := reflect.Select(cases)
  142. stopServerName := serverName[chosen]
  143. if stopServerName != s.name {
  144. ctx, ok := s.context[stopServerName]
  145. if !ok {
  146. logger.Errorf("unknown server stop: %s", stopServerName)
  147. break SelectCycle
  148. } else {
  149. switch ctx.Reason() {
  150. default:
  151. fallthrough
  152. case servercontext.StopReasonStop:
  153. logger.Infof("server %s stop", stopServerName)
  154. break SelectCycle
  155. case servercontext.StopReasonFinish:
  156. // 不会停止其他任务
  157. logger.Infof("server %s run finished", stopServerName)
  158. delete(s.context, stopServerName)
  159. delete(s.server, stopServerName)
  160. continue SelectCycle
  161. case servercontext.StopReasonError:
  162. err := ctx.Error()
  163. if errors.Is(err, servercontext.StopAllTask) {
  164. logger.Infof("server %s run finished (stop all task)", stopServerName)
  165. s.ctx.RunError(err)
  166. } else if err != nil {
  167. logger.Infof("server %s stop with error: %s", stopServerName, err.Error())
  168. s.ctx.FinishAndStopAllTask()
  169. } else {
  170. logger.Infof("server %s stop with error: unknown", stopServerName)
  171. s.ctx.RunError(err)
  172. }
  173. break SelectCycle
  174. }
  175. }
  176. } else {
  177. break SelectCycle
  178. }
  179. }
  180. for name, ctx := range s.context {
  181. if name == s.name {
  182. continue
  183. }
  184. ctx.StopTask()
  185. }
  186. }
  187. func (s *Controller) Stop() {
  188. s.ctx.StopTask()
  189. if s.wg != nil {
  190. wgchan := make(chan any)
  191. go func() {
  192. s.wg.Wait()
  193. close(wgchan)
  194. }()
  195. select {
  196. case <-time.After(s.stopWaitTime):
  197. logger.Errorf("%s - 退出清理超时... (%s)", s.name, strconvutils.TimeDurationToString(s.stopWaitTime))
  198. case <-wgchan:
  199. // pass
  200. }
  201. }
  202. }
  203. func (s *Controller) IsRunning() bool {
  204. return s.running.Load()
  205. }
  206. func _test() {
  207. var a serverinterface.Server
  208. var b *Controller
  209. a = b
  210. _ = a
  211. }