1
0

shutdown.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. //go:build linux || darwin
  2. package proc
  3. import (
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "syscall"
  8. "time"
  9. "github.com/wuntsong-org/go-zero-plus/core/logx"
  10. "github.com/wuntsong-org/go-zero-plus/core/threading"
  11. )
  const (
  	// wrapUpTime is the pause between notifying the wrap-up listeners and
  	// notifying the shutdown listeners.
  	wrapUpTime = time.Second
  	// waitTime is the default delay before force quitting. It is 5500
  	// milliseconds because most of our queues run in blocking mode with a
  	// 5 second timeout.
  	waitTime = 5*time.Second + 500*time.Millisecond
  )
  17. var (
  18. wrapUpListeners = new(listenerManager)
  19. shutdownListeners = new(listenerManager)
  20. delayTimeBeforeForceQuit = waitTime
  21. )
  22. // AddShutdownListener adds fn as a shutdown listener.
  23. // The returned func can be used to wait for fn getting called.
  24. func AddShutdownListener(fn func()) (waitForCalled func()) {
  25. return shutdownListeners.addListener(fn)
  26. }
  27. // AddWrapUpListener adds fn as a wrap up listener.
  28. // The returned func can be used to wait for fn getting called.
  29. func AddWrapUpListener(fn func()) (waitForCalled func()) {
  30. return wrapUpListeners.addListener(fn)
  31. }
  32. // SetTimeToForceQuit sets the waiting time before force quitting.
  33. func SetTimeToForceQuit(duration time.Duration) {
  34. delayTimeBeforeForceQuit = duration
  35. }
  36. // Shutdown calls the registered shutdown listeners, only for test purpose.
  37. func Shutdown() {
  38. shutdownListeners.notifyListeners()
  39. }
  40. // WrapUp wraps up the process, only for test purpose.
  41. func WrapUp() {
  42. wrapUpListeners.notifyListeners()
  43. }
  44. func gracefulStop(signals chan os.Signal, sig syscall.Signal) {
  45. signal.Stop(signals)
  46. logx.Infof("Got signal %d, shutting down...", sig)
  47. go wrapUpListeners.notifyListeners()
  48. time.Sleep(wrapUpTime)
  49. go shutdownListeners.notifyListeners()
  50. time.Sleep(delayTimeBeforeForceQuit - wrapUpTime)
  51. logx.Infof("Still alive after %v, going to force kill the process...", delayTimeBeforeForceQuit)
  52. _ = syscall.Kill(syscall.Getpid(), sig)
  53. }
  // listenerManager stores listener callbacks together with the
  // synchronization used to register them, run them and wait for them.
  type listenerManager struct {
  	// lock guards listeners.
  	lock sync.Mutex
  	// waitGroup is incremented for each registered listener and decremented
  	// when that listener returns.
  	waitGroup sync.WaitGroup
  	// listeners are the wrapped callbacks that have not been notified yet.
  	listeners []func()
  }
  59. func (lm *listenerManager) addListener(fn func()) (waitForCalled func()) {
  60. lm.waitGroup.Add(1)
  61. lm.lock.Lock()
  62. lm.listeners = append(lm.listeners, func() {
  63. defer lm.waitGroup.Done()
  64. fn()
  65. })
  66. lm.lock.Unlock()
  67. return func() {
  68. lm.waitGroup.Wait()
  69. }
  70. }
  71. func (lm *listenerManager) notifyListeners() {
  72. lm.lock.Lock()
  73. defer lm.lock.Unlock()
  74. group := threading.NewRoutineGroup()
  75. for _, listener := range lm.listeners {
  76. group.RunSafe(listener)
  77. }
  78. group.Wait()
  79. lm.listeners = nil
  80. }