123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- // Copyright 2025 BackendServerTemplate Authors. All rights reserved.
- // Use of this source code is governed by a MIT-style
- // license that can be found in the LICENSE file.
- package controller
- import (
- "errors"
- "fmt"
- "github.com/SongZihuan/BackendServerTemplate/src/logger"
- "github.com/SongZihuan/BackendServerTemplate/src/server/servercontext"
- "github.com/SongZihuan/BackendServerTemplate/src/server/serverinterface"
- "github.com/SongZihuan/BackendServerTemplate/src/utils/strconvutils"
- "reflect"
- "sync"
- "sync/atomic"
- "time"
- )
- type Controller struct {
- server map[string]serverinterface.Server
- context map[string]*servercontext.ServerContext
- ctx *servercontext.ServerContext
- running atomic.Bool
- name string
- stopWaitTime time.Duration
- wg *sync.WaitGroup
- }
- type ControllerOption struct {
- StopWaitTime time.Duration
- }
- func NewController(opt *ControllerOption) (*Controller, error) {
- ctx := servercontext.NewServerContext()
- if opt == nil {
- opt = &ControllerOption{
- StopWaitTime: 10 * time.Second,
- }
- } else {
- if opt.StopWaitTime == 0 {
- opt.StopWaitTime = 10 * time.Second
- }
- }
- controller := &Controller{
- server: make(map[string]serverinterface.Server, 10),
- context: make(map[string]*servercontext.ServerContext, 10),
- ctx: ctx,
- name: serverinterface.ControllerName,
- wg: new(sync.WaitGroup),
- stopWaitTime: opt.StopWaitTime,
- }
- {
- controller.server[controller.name] = controller
- controller.context[controller.name] = ctx
- }
- return controller, nil
- }
- func (s *Controller) AddServer(ser serverinterface.Server) error {
- if s.running.Load() {
- return fmt.Errorf("controller is running")
- }
- name := ser.Name()
- if name == serverinterface.ControllerName {
- return fmt.Errorf("name can not be %s", serverinterface.ControllerName)
- } else if name == "" {
- return fmt.Errorf("name can not be empty")
- }
- _, ok := s.server[name]
- if ok {
- return fmt.Errorf("server is exists")
- }
- s.server[name] = ser
- s.context[name] = ser.GetCtx()
- return nil
- }
- func (s *Controller) DelServer(ser serverinterface.Server) error {
- if s.running.Load() {
- return fmt.Errorf("controller is running")
- }
- name := ser.Name()
- if name == serverinterface.ControllerName {
- return fmt.Errorf("name can not be %s", serverinterface.ControllerName)
- } else if name == "" {
- return fmt.Errorf("name can not be empty")
- }
- _, ok := s.server[name]
- if !ok {
- return fmt.Errorf("server is not exists")
- }
- delete(s.server, name)
- delete(s.server, name)
- return nil
- }
- func (s *Controller) Name() string {
- return s.name
- }
- func (s *Controller) GetCtx() *servercontext.ServerContext {
- return s.ctx
- }
- func (s *Controller) Run() {
- if s.running.Swap(true) {
- return
- }
- defer func() {
- s.running.Store(false)
- }()
- s.wg = new(sync.WaitGroup)
- s.wg.Add(1)
- defer s.wg.Done()
- for name, server := range s.server {
- if name == s.name {
- continue
- }
- _, ok := s.context[name]
- if !ok {
- logger.Errorf("server %s context not found", name)
- continue
- }
- logger.Infof("start to run server: %s", name)
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
- server.Run()
- }()
- }
- SelectCycle:
- for {
- cases := make([]reflect.SelectCase, 0, len(s.context))
- serverName := make([]string, 0, len(s.context))
- for name, ctx := range s.context {
- if name == s.name {
- continue
- }
- cases = append(cases, reflect.SelectCase{
- Dir: reflect.SelectRecv,
- Chan: reflect.ValueOf(ctx.Listen()),
- })
- serverName = append(serverName, name)
- }
- cases = append(cases, reflect.SelectCase{
- Dir: reflect.SelectRecv,
- Chan: reflect.ValueOf(s.ctx.Listen()),
- })
- serverName = append(serverName, s.name)
- chosen, _, _ := reflect.Select(cases)
- stopServerName := serverName[chosen]
- if stopServerName != s.name {
- ctx, ok := s.context[stopServerName]
- if !ok {
- logger.Errorf("unknown server stop: %s", stopServerName)
- break SelectCycle
- } else {
- switch ctx.Reason() {
- default:
- fallthrough
- case servercontext.StopReasonStop:
- logger.Infof("server %s stop", stopServerName)
- break SelectCycle
- case servercontext.StopReasonFinish:
- // 不会停止其他任务
- logger.Infof("server %s run finished", stopServerName)
- delete(s.context, stopServerName)
- delete(s.server, stopServerName)
- continue SelectCycle
- case servercontext.StopReasonError:
- err := ctx.Error()
- if errors.Is(err, servercontext.StopAllTask) {
- logger.Infof("server %s run finished (stop all task)", stopServerName)
- s.ctx.RunError(err)
- } else if err != nil {
- logger.Infof("server %s stop with error: %s", stopServerName, err.Error())
- s.ctx.FinishAndStopAllTask()
- } else {
- logger.Infof("server %s stop with error: unknown", stopServerName)
- s.ctx.RunError(err)
- }
- break SelectCycle
- }
- }
- } else {
- break SelectCycle
- }
- }
- for name, ctx := range s.context {
- if name == s.name {
- continue
- }
- ctx.StopTask()
- }
- }
- func (s *Controller) Stop() {
- s.ctx.StopTask()
- if s.wg != nil {
- wgchan := make(chan any)
- go func() {
- s.wg.Wait()
- close(wgchan)
- }()
- select {
- case <-time.After(s.stopWaitTime):
- logger.Errorf("%s - 退出清理超时... (%s)", s.name, strconvutils.TimeDurationToString(s.stopWaitTime))
- case <-wgchan:
- // pass
- }
- }
- }
- func (s *Controller) IsRunning() bool {
- return s.running.Load()
- }
- func _test() {
- var a serverinterface.Server
- var b *Controller
- a = b
- _ = a
- }
|