Bläddra i källkod

set guarded to false only on quitting background flush (#342)

* set guarded to false only on quitting background flush

* set guarded to false only on quitting background flush, cont.
Kevin Wan 4 år sedan
förälder
incheckning
5bc01e4bfd
1 ändrade filer med 14 tillägg och 18 borttagningar
  1. 14 18
      core/executors/periodicalexecutor.go

+ 14 - 18
core/executors/periodicalexecutor.go

@@ -111,6 +111,9 @@ func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool)
 
 func (pe *PeriodicalExecutor) backgroundFlush() {
 	threading.GoSafe(func() {
+		// flush before quit goroutine to avoid missing tasks
+		defer pe.Flush()
+
 		ticker := pe.newTicker(pe.interval)
 		defer ticker.Stop()
 
@@ -130,30 +133,17 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
 					commanded = false
 				} else if pe.Flush() {
 					last = timex.Now()
-				} else if timex.Since(last) > pe.interval*idleRound {
-					if pe.cleanup() {
-						return
-					}
+				} else if pe.shallQuit(last) {
+					pe.lock.Lock()
+					pe.guarded = false
+					pe.lock.Unlock()
+					return
 				}
 			}
 		}
 	})
 }
 
-func (pe *PeriodicalExecutor) cleanup() (stop bool) {
-	pe.lock.Lock()
-	pe.guarded = false
-	if atomic.LoadInt32(&pe.inflight) == 0 {
-		stop = true
-		// defer to unlock quickly
-		// flush again to avoid missing tasks
-		defer pe.Flush()
-	}
-	pe.lock.Unlock()
-
-	return
-}
-
 func (pe *PeriodicalExecutor) doneExecution() {
 	pe.waitGroup.Done()
 }
@@ -189,3 +179,9 @@ func (pe *PeriodicalExecutor) hasTasks(tasks interface{}) bool {
 		return true
 	}
 }
+
+func (pe *PeriodicalExecutor) shallQuit(last time.Duration) bool {
+	idleEnough := timex.Since(last) > pe.interval*idleRound
+	noPending := atomic.LoadInt32(&pe.inflight) == 0
+	return idleEnough && noPending
+}