Kevin Wan 4 years ago
parent
commit
fc43876cc5
2 changed files with 36 additions and 2 deletions
  1. 17 2
      core/executors/periodicalexecutor.go
  2. 19 0
      core/executors/periodicalexecutor_test.go

+ 17 - 2
core/executors/periodicalexecutor.go

@@ -134,8 +134,7 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
 					pe.guarded = false
 					pe.lock.Unlock()
 
-					// flush again to avoid missing tasks
-					pe.Flush()
+					pe.cleanup()
 					return
 				}
 			}
@@ -143,6 +142,22 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
 	})
 }
 
+func (pe *PeriodicalExecutor) cleanup() {
+	// avoid deadlock in Add()
+	for {
+		select {
+		case vals := <-pe.commander:
+			pe.enterExecution()
+			pe.confirmChan <- lang.Placeholder
+			pe.executeTasks(vals)
+		default:
+			// flush again to avoid missing tasks
+			pe.Flush()
+			return
+		}
+	}
+}
+
 func (pe *PeriodicalExecutor) doneExecution() {
 	pe.waitGroup.Done()
 }

+ 19 - 0
core/executors/periodicalexecutor_test.go

@@ -140,6 +140,25 @@ func TestPeriodicalExecutor_WaitFast(t *testing.T) {
 	assert.Equal(t, total, cnt)
 }
 
+func TestPeriodicalExecutor_Deadlock(t *testing.T) {
+	ticker := timex.NewFakeTicker()
+	defer ticker.Stop()
+
+	exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil))
+	exec.newTicker = func(d time.Duration) timex.Ticker {
+		return ticker
+	}
+	ticker.Tick()
+	exec.lock.Lock()
+	exec.backgroundFlush()
+	time.Sleep(20 * time.Millisecond)
+	ticker.Tick()
+	exec.commander <- 1
+	exec.lock.Unlock()
+	<-exec.confirmChan
+	exec.Wait()
+}
+
 // go test -benchtime 10s -bench .
 func BenchmarkExecutor(b *testing.B) {
 	b.ReportAllocs()