|
@@ -6,6 +6,7 @@ import (
|
|
"time"
|
|
"time"
|
|
|
|
|
|
"zero/core/proc"
|
|
"zero/core/proc"
|
|
|
|
+ "zero/core/syncx"
|
|
"zero/core/threading"
|
|
"zero/core/threading"
|
|
"zero/core/timex"
|
|
"zero/core/timex"
|
|
)
|
|
)
|
|
@@ -30,6 +31,8 @@ type (
|
|
interval time.Duration
|
|
interval time.Duration
|
|
container TaskContainer
|
|
container TaskContainer
|
|
waitGroup sync.WaitGroup
|
|
waitGroup sync.WaitGroup
|
|
|
|
+ // avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
|
|
|
|
+ wgBarrier syncx.Barrier
|
|
guarded bool
|
|
guarded bool
|
|
newTicker func(duration time.Duration) timex.Ticker
|
|
newTicker func(duration time.Duration) timex.Ticker
|
|
lock sync.Mutex
|
|
lock sync.Mutex
|
|
@@ -74,7 +77,9 @@ func (pe *PeriodicalExecutor) Sync(fn func()) {
|
|
}
|
|
}
|
|
|
|
|
|
func (pe *PeriodicalExecutor) Wait() {
|
|
func (pe *PeriodicalExecutor) Wait() {
|
|
- pe.waitGroup.Wait()
|
|
|
|
|
|
+ pe.wgBarrier.Guard(func() {
|
|
|
|
+ pe.waitGroup.Wait()
|
|
|
|
+ })
|
|
}
|
|
}
|
|
|
|
|
|
func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
|
|
func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
|
|
@@ -131,8 +136,12 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
|
|
}
|
|
}
|
|
|
|
|
|
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
|
|
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
|
|
- pe.waitGroup.Add(1)
|
|
|
|
- defer pe.waitGroup.Done()
|
|
|
|
|
|
+ pe.wgBarrier.Guard(func() {
|
|
|
|
+ pe.waitGroup.Add(1)
|
|
|
|
+ })
|
|
|
|
+ defer pe.wgBarrier.Guard(func() {
|
|
|
|
+ pe.waitGroup.Done()
|
|
|
|
+ })
|
|
|
|
|
|
ok := pe.hasTasks(tasks)
|
|
ok := pe.hasTasks(tasks)
|
|
if ok {
|
|
if ok {
|