main.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "os"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "zero/core/breaker"
  10. "zero/core/lang"
  11. "gopkg.in/cheggaaa/pb.v1"
  12. )
  13. const (
  14. duration = time.Minute * 5
  15. breakRange = 20
  16. workRange = 50
  17. requestInterval = time.Millisecond
  18. // multiply to make it visible in plot
  19. stateFator = float64(time.Second/requestInterval) / 2
  20. )
  21. type (
  22. server struct {
  23. state int32
  24. }
  25. metric struct {
  26. calls int64
  27. }
  28. )
  29. func (m *metric) addCall() {
  30. atomic.AddInt64(&m.calls, 1)
  31. }
  32. func (m *metric) reset() int64 {
  33. return atomic.SwapInt64(&m.calls, 0)
  34. }
  35. func newServer() *server {
  36. return &server{}
  37. }
  38. func (s *server) serve(m *metric) bool {
  39. m.addCall()
  40. return atomic.LoadInt32(&s.state) == 1
  41. }
  42. func (s *server) start() {
  43. go func() {
  44. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  45. var state int32
  46. for {
  47. var v int32
  48. if state == 0 {
  49. v = r.Int31n(breakRange)
  50. } else {
  51. v = r.Int31n(workRange)
  52. }
  53. time.Sleep(time.Second * time.Duration(v+1))
  54. state ^= 1
  55. atomic.StoreInt32(&s.state, state)
  56. }
  57. }()
  58. }
  59. func runBreaker(s *server, br breaker.Breaker, duration time.Duration, m *metric) {
  60. ticker := time.NewTicker(requestInterval)
  61. defer ticker.Stop()
  62. done := make(chan lang.PlaceholderType)
  63. go func() {
  64. time.Sleep(duration)
  65. close(done)
  66. }()
  67. for {
  68. select {
  69. case <-ticker.C:
  70. _ = br.Do(func() error {
  71. if s.serve(m) {
  72. return nil
  73. } else {
  74. return breaker.ErrServiceUnavailable
  75. }
  76. })
  77. case <-done:
  78. return
  79. }
  80. }
  81. }
  82. func main() {
  83. srv := newServer()
  84. srv.start()
  85. gb := breaker.NewBreaker()
  86. fp, err := os.Create("result.csv")
  87. lang.Must(err)
  88. defer fp.Close()
  89. fmt.Fprintln(fp, "seconds,state,googleCalls,netflixCalls")
  90. var gm, nm metric
  91. go func() {
  92. ticker := time.NewTicker(time.Second)
  93. defer ticker.Stop()
  94. var seconds int
  95. for range ticker.C {
  96. seconds++
  97. gcalls := gm.reset()
  98. ncalls := nm.reset()
  99. fmt.Fprintf(fp, "%d,%.2f,%d,%d\n",
  100. seconds, float64(atomic.LoadInt32(&srv.state))*stateFator, gcalls, ncalls)
  101. }
  102. }()
  103. var waitGroup sync.WaitGroup
  104. waitGroup.Add(1)
  105. go func() {
  106. runBreaker(srv, gb, duration, &gm)
  107. waitGroup.Done()
  108. }()
  109. go func() {
  110. bar := pb.New(int(duration / time.Second)).Start()
  111. ticker := time.NewTicker(time.Second)
  112. defer ticker.Stop()
  113. for range ticker.C {
  114. bar.Increment()
  115. }
  116. bar.Finish()
  117. }()
  118. waitGroup.Wait()
  119. }