|
@@ -2,6 +2,7 @@ package redis
|
|
|
|
|
|
import (
|
|
import (
|
|
"math/rand"
|
|
"math/rand"
|
|
|
|
+ "strconv"
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
"time"
|
|
"time"
|
|
|
|
|
|
@@ -11,19 +12,26 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
const (
|
|
const (
|
|
|
|
+ randomLen = 16
|
|
|
|
+ tolerance = 500 // milliseconds
|
|
|
|
+ millisPerSecond = 1000
|
|
|
|
+ lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
|
|
+ redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
|
|
|
|
+ return "OK"
|
|
|
|
+else
|
|
|
|
+ return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
|
|
|
|
+end`
|
|
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
return redis.call("DEL", KEYS[1])
|
|
return redis.call("DEL", KEYS[1])
|
|
else
|
|
else
|
|
return 0
|
|
return 0
|
|
end`
|
|
end`
|
|
- randomLen = 16
|
|
|
|
)
|
|
)
|
|
|
|
|
|
// A RedisLock is a redis lock.
|
|
// A RedisLock is a redis lock.
|
|
type RedisLock struct {
|
|
type RedisLock struct {
|
|
store *Redis
|
|
store *Redis
|
|
seconds uint32
|
|
seconds uint32
|
|
- count int32
|
|
|
|
key string
|
|
key string
|
|
id string
|
|
id string
|
|
}
|
|
}
|
|
@@ -43,35 +51,30 @@ func NewRedisLock(store *Redis, key string) *RedisLock {
|
|
|
|
|
|
// Acquire acquires the lock.
|
|
// Acquire acquires the lock.
|
|
func (rl *RedisLock) Acquire() (bool, error) {
|
|
func (rl *RedisLock) Acquire() (bool, error) {
|
|
- newCount := atomic.AddInt32(&rl.count, 1)
|
|
|
|
- if newCount > 1 {
|
|
|
|
- return true, nil
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
seconds := atomic.LoadUint32(&rl.seconds)
|
|
seconds := atomic.LoadUint32(&rl.seconds)
|
|
- ok, err := rl.store.SetnxEx(rl.key, rl.id, int(seconds+1)) // +1s for tolerance
|
|
|
|
|
|
+ resp, err := rl.store.Eval(lockCommand, []string{rl.key}, []string{
|
|
|
|
+ rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
|
|
|
|
+ })
|
|
if err == red.Nil {
|
|
if err == red.Nil {
|
|
- atomic.AddInt32(&rl.count, -1)
|
|
|
|
return false, nil
|
|
return false, nil
|
|
} else if err != nil {
|
|
} else if err != nil {
|
|
- atomic.AddInt32(&rl.count, -1)
|
|
|
|
logx.Errorf("Error on acquiring lock for %s, %s", rl.key, err.Error())
|
|
logx.Errorf("Error on acquiring lock for %s, %s", rl.key, err.Error())
|
|
return false, err
|
|
return false, err
|
|
- } else if !ok {
|
|
|
|
- atomic.AddInt32(&rl.count, -1)
|
|
|
|
|
|
+ } else if resp == nil {
|
|
return false, nil
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
|
|
- return true, nil
|
|
|
|
|
|
+ reply, ok := resp.(string)
|
|
|
|
+ if ok && reply == "OK" {
|
|
|
|
+ return true, nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ logx.Errorf("Unknown reply when acquiring lock for %s: %v", rl.key, resp)
|
|
|
|
+ return false, nil
|
|
}
|
|
}
|
|
|
|
|
|
// Release releases the lock.
|
|
// Release releases the lock.
|
|
func (rl *RedisLock) Release() (bool, error) {
|
|
func (rl *RedisLock) Release() (bool, error) {
|
|
- newCount := atomic.AddInt32(&rl.count, -1)
|
|
|
|
- if newCount > 0 {
|
|
|
|
- return true, nil
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
resp, err := rl.store.Eval(delCommand, []string{rl.key}, []string{rl.id})
|
|
resp, err := rl.store.Eval(delCommand, []string{rl.key}, []string{rl.id})
|
|
if err != nil {
|
|
if err != nil {
|
|
return false, err
|
|
return false, err
|