瀏覽代碼

rename sharedcalls to singleflight (#1017)

Kevin Wan 3 年之前
父節點
當前提交
5cc9eb0de4

+ 2 - 2
core/collection/cache.go

@@ -34,7 +34,7 @@ type (
 		expire         time.Duration
 		timingWheel    *TimingWheel
 		lruCache       lru
-		barrier        syncx.SharedCalls
+		barrier        syncx.SingleFlight
 		unstableExpiry mathx.Unstable
 		stats          *cacheStat
 	}
@@ -46,7 +46,7 @@ func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
 		data:           make(map[string]interface{}),
 		expire:         expire,
 		lruCache:       emptyLruCache,
-		barrier:        syncx.NewSharedCalls(),
+		barrier:        syncx.NewSingleFlight(),
 		unstableExpiry: mathx.NewUnstable(expiryDeviation),
 	}
 

+ 1 - 1
core/stores/cache/cache.go

@@ -29,7 +29,7 @@ type (
 )
 
 // New returns a Cache.
-func New(c ClusterConf, barrier syncx.SharedCalls, st *Stat, errNotFound error,
+func New(c ClusterConf, barrier syncx.SingleFlight, st *Stat, errNotFound error,
 	opts ...Option) Cache {
 	if len(c) == 0 || TotalWeights(c) <= 0 {
 		log.Fatal("no cache nodes")

+ 2 - 2
core/stores/cache/cache_test.go

@@ -104,7 +104,7 @@ func TestCache_SetDel(t *testing.T) {
 			Weight: 100,
 		},
 	}
-	c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder)
+	c := New(conf, syncx.NewSingleFlight(), NewStat("mock"), errPlaceholder)
 	for i := 0; i < total; i++ {
 		if i%2 == 0 {
 			assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))
@@ -142,7 +142,7 @@ func TestCache_OneNode(t *testing.T) {
 			Weight: 100,
 		},
 	}
-	c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder)
+	c := New(conf, syncx.NewSingleFlight(), NewStat("mock"), errPlaceholder)
 	for i := 0; i < total; i++ {
 		if i%2 == 0 {
 			assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))

+ 2 - 2
core/stores/cache/cachenode.go

@@ -29,7 +29,7 @@ type cacheNode struct {
 	rds            *redis.Redis
 	expiry         time.Duration
 	notFoundExpiry time.Duration
-	barrier        syncx.SharedCalls
+	barrier        syncx.SingleFlight
 	r              *rand.Rand
 	lock           *sync.Mutex
 	unstableExpiry mathx.Unstable
@@ -43,7 +43,7 @@ type cacheNode struct {
 // st is used to stat the cache.
 // errNotFound defines the error that returned on cache not found.
 // opts are the options that customize the cacheNode.
-func NewNode(rds *redis.Redis, barrier syncx.SharedCalls, st *Stat,
+func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat,
 	errNotFound error, opts ...Option) Cache {
 	o := newOptions(opts...)
 	return cacheNode{

+ 5 - 5
core/stores/cache/cachenode_test.go

@@ -96,7 +96,7 @@ func TestCacheNode_Take(t *testing.T) {
 	cn := cacheNode{
 		rds:            store,
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
-		barrier:        syncx.NewSharedCalls(),
+		barrier:        syncx.NewSingleFlight(),
 		lock:           new(sync.Mutex),
 		unstableExpiry: mathx.NewUnstable(expiryDeviation),
 		stat:           NewStat("any"),
@@ -123,7 +123,7 @@ func TestCacheNode_TakeNotFound(t *testing.T) {
 	cn := cacheNode{
 		rds:            store,
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
-		barrier:        syncx.NewSharedCalls(),
+		barrier:        syncx.NewSingleFlight(),
 		lock:           new(sync.Mutex),
 		unstableExpiry: mathx.NewUnstable(expiryDeviation),
 		stat:           NewStat("any"),
@@ -162,7 +162,7 @@ func TestCacheNode_TakeWithExpire(t *testing.T) {
 	cn := cacheNode{
 		rds:            store,
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
-		barrier:        syncx.NewSharedCalls(),
+		barrier:        syncx.NewSingleFlight(),
 		lock:           new(sync.Mutex),
 		unstableExpiry: mathx.NewUnstable(expiryDeviation),
 		stat:           NewStat("any"),
@@ -189,7 +189,7 @@ func TestCacheNode_String(t *testing.T) {
 	cn := cacheNode{
 		rds:            store,
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
-		barrier:        syncx.NewSharedCalls(),
+		barrier:        syncx.NewSingleFlight(),
 		lock:           new(sync.Mutex),
 		unstableExpiry: mathx.NewUnstable(expiryDeviation),
 		stat:           NewStat("any"),
@@ -206,7 +206,7 @@ func TestCacheValueWithBigInt(t *testing.T) {
 	cn := cacheNode{
 		rds:            store,
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
-		barrier:        syncx.NewSharedCalls(),
+		barrier:        syncx.NewSingleFlight(),
 		lock:           new(sync.Mutex),
 		unstableExpiry: mathx.NewUnstable(expiryDeviation),
 		stat:           NewStat("any"),

+ 2 - 2
core/stores/mongoc/cachedcollection.go

@@ -11,8 +11,8 @@ var (
 	// ErrNotFound is an alias of mgo.ErrNotFound.
 	ErrNotFound = mgo.ErrNotFound
 
-	// can't use one SharedCalls per conn, because multiple conns may share the same cache key.
-	sharedCalls = syncx.NewSharedCalls()
+	// can't use one SingleFlight per conn, because multiple conns may share the same cache key.
+	sharedCalls = syncx.NewSingleFlight()
 	stats       = cache.NewStat("mongoc")
 )
 

+ 2 - 2
core/stores/sqlc/cachedsql.go

@@ -17,8 +17,8 @@ var (
 	// ErrNotFound is an alias of sqlx.ErrNotFound.
 	ErrNotFound = sqlx.ErrNotFound
 
-	// can't use one SharedCalls per conn, because multiple conns may share the same cache key.
-	exclusiveCalls = syncx.NewSharedCalls()
+	// can't use one SingleFlight per conn, because multiple conns may share the same cache key.
+	exclusiveCalls = syncx.NewSingleFlight()
 	stats          = cache.NewStat("sqlc")
 )
 

+ 6 - 6
core/syncx/resourcemanager.go

@@ -9,16 +9,16 @@ import (
 
 // A ResourceManager is a manager that used to manage resources.
 type ResourceManager struct {
-	resources   map[string]io.Closer
-	sharedCalls SharedCalls
-	lock        sync.RWMutex
+	resources    map[string]io.Closer
+	singleFlight SingleFlight
+	lock         sync.RWMutex
 }
 
 // NewResourceManager returns a ResourceManager.
 func NewResourceManager() *ResourceManager {
 	return &ResourceManager{
-		resources:   make(map[string]io.Closer),
-		sharedCalls: NewSharedCalls(),
+		resources:    make(map[string]io.Closer),
+		singleFlight: NewSingleFlight(),
 	}
 }
 
@@ -39,7 +39,7 @@ func (manager *ResourceManager) Close() error {
 
 // GetResource returns the resource associated with given key.
 func (manager *ResourceManager) GetResource(key string, create func() (io.Closer, error)) (io.Closer, error) {
-	val, err := manager.sharedCalls.Do(key, func() (interface{}, error) {
+	val, err := manager.singleFlight.Do(key, func() (interface{}, error) {
 		manager.lock.RLock()
 		resource, ok := manager.resources[key]
 		manager.lock.RUnlock()

+ 20 - 10
core/syncx/sharedcalls.go → core/syncx/singleflight.go

@@ -3,13 +3,17 @@ package syncx
 import "sync"
 
 type (
-	// SharedCalls lets the concurrent calls with the same key to share the call result.
+	// SharedCalls is an alias of SingleFlight.
+	// Deprecated: use SingleFlight.
+	SharedCalls = SingleFlight
+
+	// SingleFlight lets the concurrent calls with the same key to share the call result.
 	// For example, A called F, before it's done, B called F. Then B would not execute F,
 	// and shared the result returned by F which called by A.
 	// The calls with the same key are dependent, concurrent calls share the returned values.
 	// A ------->calls F with key<------------------->returns val
 	// B --------------------->calls F with key------>returns val
-	SharedCalls interface {
+	SingleFlight interface {
 		Do(key string, fn func() (interface{}, error)) (interface{}, error)
 		DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
 	}
@@ -20,20 +24,26 @@ type (
 		err error
 	}
 
-	sharedGroup struct {
+	flightGroup struct {
 		calls map[string]*call
 		lock  sync.Mutex
 	}
 )
 
-// NewSharedCalls returns a SharedCalls.
-func NewSharedCalls() SharedCalls {
-	return &sharedGroup{
+// NewSingleFlight returns a SingleFlight.
+func NewSingleFlight() SingleFlight {
+	return &flightGroup{
 		calls: make(map[string]*call),
 	}
 }
 
-func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
+// NewSharedCalls returns a SingleFlight.
+// Deprecated: use NewSingleFlight.
+func NewSharedCalls() SingleFlight {
+	return NewSingleFlight()
+}
+
+func (g *flightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
 	c, done := g.createCall(key)
 	if done {
 		return c.val, c.err
@@ -43,7 +53,7 @@ func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{
 	return c.val, c.err
 }
 
-func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
+func (g *flightGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
 	c, done := g.createCall(key)
 	if done {
 		return c.val, false, c.err
@@ -53,7 +63,7 @@ func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val inte
 	return c.val, true, c.err
 }
 
-func (g *sharedGroup) createCall(key string) (c *call, done bool) {
+func (g *flightGroup) createCall(key string) (c *call, done bool) {
 	g.lock.Lock()
 	if c, ok := g.calls[key]; ok {
 		g.lock.Unlock()
@@ -69,7 +79,7 @@ func (g *sharedGroup) createCall(key string) (c *call, done bool) {
 	return c, false
 }
 
-func (g *sharedGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
+func (g *flightGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
 	defer func() {
 		g.lock.Lock()
 		delete(g.calls, key)

+ 5 - 5
core/syncx/sharedcalls_test.go → core/syncx/singleflight_test.go

@@ -10,7 +10,7 @@ import (
 )
 
 func TestExclusiveCallDo(t *testing.T) {
-	g := NewSharedCalls()
+	g := NewSingleFlight()
 	v, err := g.Do("key", func() (interface{}, error) {
 		return "bar", nil
 	})
@@ -23,7 +23,7 @@ func TestExclusiveCallDo(t *testing.T) {
 }
 
 func TestExclusiveCallDoErr(t *testing.T) {
-	g := NewSharedCalls()
+	g := NewSingleFlight()
 	someErr := errors.New("some error")
 	v, err := g.Do("key", func() (interface{}, error) {
 		return nil, someErr
@@ -37,7 +37,7 @@ func TestExclusiveCallDoErr(t *testing.T) {
 }
 
 func TestExclusiveCallDoDupSuppress(t *testing.T) {
-	g := NewSharedCalls()
+	g := NewSingleFlight()
 	c := make(chan string)
 	var calls int32
 	fn := func() (interface{}, error) {
@@ -69,7 +69,7 @@ func TestExclusiveCallDoDupSuppress(t *testing.T) {
 }
 
 func TestExclusiveCallDoDiffDupSuppress(t *testing.T) {
-	g := NewSharedCalls()
+	g := NewSingleFlight()
 	broadcast := make(chan struct{})
 	var calls int32
 	tests := []string{"e", "a", "e", "a", "b", "c", "b", "a", "c", "d", "b", "c", "d"}
@@ -102,7 +102,7 @@ func TestExclusiveCallDoDiffDupSuppress(t *testing.T) {
 }
 
 func TestExclusiveCallDoExDupSuppress(t *testing.T) {
-	g := NewSharedCalls()
+	g := NewSingleFlight()
 	c := make(chan string)
 	var calls int32
 	fn := func() (interface{}, error) {

+ 2 - 2
zrpc/proxy.go

@@ -15,7 +15,7 @@ type RpcProxy struct {
 	backend     string
 	clients     map[string]Client
 	options     []internal.ClientOption
-	sharedCalls syncx.SharedCalls
+	sharedCalls syncx.SingleFlight
 	lock        sync.Mutex
 }
 
@@ -25,7 +25,7 @@ func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy {
 		backend:     backend,
 		clients:     make(map[string]Client),
 		options:     opts,
-		sharedCalls: syncx.NewSharedCalls(),
+		sharedCalls: syncx.NewSingleFlight(),
 	}
 }