incrementalupdater.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package update
  2. import (
  3. "sync"
  4. "time"
  5. "zero/core/hash"
  6. "zero/core/stringx"
  7. )
  8. const (
  9. incrementalStep = 5
  10. stepDuration = time.Second * 3
  11. )
  12. type (
  13. updateEvent struct {
  14. keys []string
  15. newKey string
  16. servers []string
  17. }
  18. UpdateFunc func(change ServerChange)
  19. IncrementalUpdater struct {
  20. lock sync.Mutex
  21. started bool
  22. taskChan chan updateEvent
  23. updates ServerChange
  24. updateFn UpdateFunc
  25. pendingEvents []updateEvent
  26. }
  27. )
  28. func NewIncrementalUpdater(updateFn UpdateFunc) *IncrementalUpdater {
  29. return &IncrementalUpdater{
  30. taskChan: make(chan updateEvent),
  31. updates: ServerChange{
  32. Current: Snapshot{
  33. Keys: make([]string, 0),
  34. WeightedKeys: make([]weightedKey, 0),
  35. },
  36. Servers: make([]string, 0),
  37. },
  38. updateFn: updateFn,
  39. }
  40. }
  41. func (ru *IncrementalUpdater) Update(keys []string, servers []string, newKey string) {
  42. ru.lock.Lock()
  43. defer ru.lock.Unlock()
  44. if !ru.started {
  45. go ru.run()
  46. ru.started = true
  47. }
  48. ru.taskChan <- updateEvent{
  49. keys: keys,
  50. newKey: newKey,
  51. servers: servers,
  52. }
  53. }
  54. // Return true if incremental update is done
  55. func (ru *IncrementalUpdater) advance() bool {
  56. previous := ru.updates.Current
  57. keys := make([]string, 0)
  58. weightedKeys := make([]weightedKey, 0)
  59. servers := ru.updates.Servers
  60. for _, key := range ru.updates.Current.Keys {
  61. keys = append(keys, key)
  62. }
  63. for _, wkey := range ru.updates.Current.WeightedKeys {
  64. weight := wkey.Weight + incrementalStep
  65. if weight >= hash.TopWeight {
  66. keys = append(keys, wkey.Key)
  67. } else {
  68. weightedKeys = append(weightedKeys, weightedKey{
  69. Key: wkey.Key,
  70. Weight: weight,
  71. })
  72. }
  73. }
  74. for _, event := range ru.pendingEvents {
  75. // ignore reload events
  76. if len(event.newKey) == 0 || len(event.servers) == 0 {
  77. continue
  78. }
  79. // anyway, add the servers, just to avoid missing notify any server
  80. servers = stringx.Union(servers, event.servers)
  81. if keyExists(keys, weightedKeys, event.newKey) {
  82. continue
  83. }
  84. weightedKeys = append(weightedKeys, weightedKey{
  85. Key: event.newKey,
  86. Weight: incrementalStep,
  87. })
  88. }
  89. // clear pending events
  90. ru.pendingEvents = ru.pendingEvents[:0]
  91. change := ServerChange{
  92. Previous: previous,
  93. Current: Snapshot{
  94. Keys: keys,
  95. WeightedKeys: weightedKeys,
  96. },
  97. Servers: servers,
  98. }
  99. ru.updates = change
  100. ru.updateFn(change)
  101. return len(weightedKeys) == 0
  102. }
  103. func (ru *IncrementalUpdater) run() {
  104. defer func() {
  105. ru.lock.Lock()
  106. ru.started = false
  107. ru.lock.Unlock()
  108. }()
  109. ticker := time.NewTicker(stepDuration)
  110. defer ticker.Stop()
  111. for {
  112. select {
  113. case <-ticker.C:
  114. if ru.advance() {
  115. return
  116. }
  117. case event := <-ru.taskChan:
  118. ru.updateKeys(event)
  119. }
  120. }
  121. }
  122. func (ru *IncrementalUpdater) updateKeys(event updateEvent) {
  123. isWeightedKey := func(key string) bool {
  124. for _, wkey := range ru.updates.Current.WeightedKeys {
  125. if wkey.Key == key {
  126. return true
  127. }
  128. }
  129. return false
  130. }
  131. keys := make([]string, 0, len(event.keys))
  132. for _, key := range event.keys {
  133. if !isWeightedKey(key) {
  134. keys = append(keys, key)
  135. }
  136. }
  137. ru.updates.Current.Keys = keys
  138. ru.pendingEvents = append(ru.pendingEvents, event)
  139. }
  140. func keyExists(keys []string, weightedKeys []weightedKey, key string) bool {
  141. for _, each := range keys {
  142. if key == each {
  143. return true
  144. }
  145. }
  146. for _, wkey := range weightedKeys {
  147. if wkey.Key == key {
  148. return true
  149. }
  150. }
  151. return false
  152. }