瀏覽代碼

fix(executors): periodicalexecutor should handle crash correctly (#3043)

cong 2 年之前
父節點
當前提交
18d163c4f7
共有 2 個文件被更改,包括 63 次插入9 次删除
  1. 5 3
      core/executors/periodicalexecutor.go
  2. 58 6
      core/executors/periodicalexecutor_test.go

+ 5 - 3
core/executors/periodicalexecutor.go

@@ -116,7 +116,7 @@ func (pe *PeriodicalExecutor) addAndCheck(task any) (any, bool) {
 }
 
 func (pe *PeriodicalExecutor) backgroundFlush() {
-	threading.GoSafe(func() {
+	go func() {
 		// flush before quit goroutine to avoid missing tasks
 		defer pe.Flush()
 
@@ -144,7 +144,7 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
 				}
 			}
 		}
-	})
+	}()
 }
 
 func (pe *PeriodicalExecutor) doneExecution() {
@@ -162,7 +162,9 @@ func (pe *PeriodicalExecutor) executeTasks(tasks any) bool {
 
 	ok := pe.hasTasks(tasks)
 	if ok {
-		pe.container.Execute(tasks)
+		threading.RunSafe(func() {
+			pe.container.Execute(tasks)
+		})
 	}
 
 	return ok

+ 58 - 6
core/executors/periodicalexecutor_test.go

@@ -108,6 +108,64 @@ func TestPeriodicalExecutor_Bulk(t *testing.T) {
 	lock.Unlock()
 }
 
+func TestPeriodicalExecutor_Panic(t *testing.T) {
+	// avoid data race
+	var lock sync.Mutex
+	ticker := timex.NewFakeTicker()
+
+	var (
+		executedTasks []int
+		expected      []int
+	)
+	executor := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks any) {
+		tt := tasks.([]int)
+		lock.Lock()
+		executedTasks = append(executedTasks, tt...)
+		lock.Unlock()
+		if tt[0] == 0 {
+			panic("test")
+		}
+	}))
+	executor.newTicker = func(duration time.Duration) timex.Ticker {
+		return ticker
+	}
+	for i := 0; i < 30; i++ {
+		executor.Add(i)
+		expected = append(expected, i)
+	}
+	ticker.Tick()
+	ticker.Tick()
+	time.Sleep(time.Millisecond)
+	lock.Lock()
+	assert.Equal(t, expected, executedTasks)
+	lock.Unlock()
+}
+
+func TestPeriodicalExecutor_FlushPanic(t *testing.T) {
+	var (
+		executedTasks []int
+		expected      []int
+		lock          sync.Mutex
+	)
+	executor := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks any) {
+		tt := tasks.([]int)
+		lock.Lock()
+		executedTasks = append(executedTasks, tt...)
+		lock.Unlock()
+		if tt[0] == 0 {
+			panic("flush panic")
+		}
+	}))
+	for i := 0; i < 8; i++ {
+		executor.Add(i)
+		expected = append(expected, i)
+	}
+	executor.Flush()
+	lock.Lock()
+	assert.Equal(t, expected, executedTasks)
+	lock.Unlock()
+}
+
 func TestPeriodicalExecutor_Wait(t *testing.T) {
 	var lock sync.Mutex
 	executer := NewBulkExecutor(func(tasks []any) {
@@ -151,13 +209,7 @@ func TestPeriodicalExecutor_Deadlock(t *testing.T) {
 }
 
 func TestPeriodicalExecutor_hasTasks(t *testing.T) {
-	ticker := timex.NewFakeTicker()
-	defer ticker.Stop()
-
 	exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil))
-	exec.newTicker = func(d time.Duration) timex.Ticker {
-		return ticker
-	}
 	assert.False(t, exec.hasTasks(nil))
 	assert.True(t, exec.hasTasks(1))
 }