cachenode.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package cache
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. "github.com/tal-tech/go-zero/core/jsonx"
  9. "github.com/tal-tech/go-zero/core/logx"
  10. "github.com/tal-tech/go-zero/core/mathx"
  11. "github.com/tal-tech/go-zero/core/stat"
  12. "github.com/tal-tech/go-zero/core/stores/redis"
  13. "github.com/tal-tech/go-zero/core/syncx"
  14. )
  15. const (
  16. notFoundPlaceholder = "*"
  17. // make the expiry unstable to avoid lots of cached items expire at the same time
  18. // make the unstable expiry to be [0.95, 1.05] * seconds
  19. expiryDeviation = 0.05
  20. )
  21. // indicates there is no such value associate with the key
  22. var errPlaceholder = errors.New("placeholder")
  23. type cacheNode struct {
  24. rds *redis.Redis
  25. expiry time.Duration
  26. notFoundExpiry time.Duration
  27. barrier syncx.SharedCalls
  28. r *rand.Rand
  29. lock *sync.Mutex
  30. unstableExpiry mathx.Unstable
  31. stat *CacheStat
  32. errNotFound error
  33. }
  34. // NewCacheNode returns a cacheNode.
  35. // rds is the underlying redis node or cluster.
  36. // barrier is the barrier that maybe shared with other cache nodes on cache cluster.
  37. // st is used to stat the cache.
  38. // errNotFound defines the error that returned on cache not found.
  39. // opts are the options that customize the cacheNode.
  40. func NewCacheNode(rds *redis.Redis, barrier syncx.SharedCalls, st *CacheStat,
  41. errNotFound error, opts ...Option) Cache {
  42. o := newOptions(opts...)
  43. return cacheNode{
  44. rds: rds,
  45. expiry: o.Expiry,
  46. notFoundExpiry: o.NotFoundExpiry,
  47. barrier: barrier,
  48. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  49. lock: new(sync.Mutex),
  50. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  51. stat: st,
  52. errNotFound: errNotFound,
  53. }
  54. }
  55. // DelCache deletes cached values with keys.
  56. func (c cacheNode) DelCache(keys ...string) error {
  57. if len(keys) == 0 {
  58. return nil
  59. }
  60. if _, err := c.rds.Del(keys...); err != nil {
  61. logx.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
  62. c.asyncRetryDelCache(keys...)
  63. }
  64. return nil
  65. }
  66. // GetCache gets the cache with key and fills into v.
  67. func (c cacheNode) GetCache(key string, v interface{}) error {
  68. if err := c.doGetCache(key, v); err == errPlaceholder {
  69. return c.errNotFound
  70. } else {
  71. return err
  72. }
  73. }
  74. // SetCache sets the cache with key and v, using c.expiry.
  75. func (c cacheNode) SetCache(key string, v interface{}) error {
  76. return c.SetCacheWithExpire(key, v, c.aroundDuration(c.expiry))
  77. }
  78. // SetCacheWithExpire sets the cache with key and v, using given expire.
  79. func (c cacheNode) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
  80. data, err := jsonx.Marshal(v)
  81. if err != nil {
  82. return err
  83. }
  84. return c.rds.Setex(key, string(data), int(expire.Seconds()))
  85. }
  86. // String returns a string that represents the cacheNode.
  87. func (c cacheNode) String() string {
  88. return c.rds.Addr
  89. }
  90. // TakeWithExpire takes the result from cache first, if not found,
  91. // query from DB and set cache using c.expiry, then return the result.
  92. func (c cacheNode) Take(v interface{}, key string, query func(v interface{}) error) error {
  93. return c.doTake(v, key, query, func(v interface{}) error {
  94. return c.SetCache(key, v)
  95. })
  96. }
  97. // TakeWithExpire takes the result from cache first, if not found,
  98. // query from DB and set cache using given expire, then return the result.
  99. func (c cacheNode) TakeWithExpire(v interface{}, key string, query func(v interface{},
  100. expire time.Duration) error) error {
  101. expire := c.aroundDuration(c.expiry)
  102. return c.doTake(v, key, func(v interface{}) error {
  103. return query(v, expire)
  104. }, func(v interface{}) error {
  105. return c.SetCacheWithExpire(key, v, expire)
  106. })
  107. }
  108. func (c cacheNode) aroundDuration(duration time.Duration) time.Duration {
  109. return c.unstableExpiry.AroundDuration(duration)
  110. }
  111. func (c cacheNode) asyncRetryDelCache(keys ...string) {
  112. AddCleanTask(func() error {
  113. _, err := c.rds.Del(keys...)
  114. return err
  115. }, keys...)
  116. }
  117. func (c cacheNode) doGetCache(key string, v interface{}) error {
  118. c.stat.IncrementTotal()
  119. data, err := c.rds.Get(key)
  120. if err != nil {
  121. c.stat.IncrementMiss()
  122. return err
  123. }
  124. if len(data) == 0 {
  125. c.stat.IncrementMiss()
  126. return c.errNotFound
  127. }
  128. c.stat.IncrementHit()
  129. if data == notFoundPlaceholder {
  130. return errPlaceholder
  131. }
  132. return c.processCache(key, data, v)
  133. }
  134. func (c cacheNode) doTake(v interface{}, key string, query func(v interface{}) error,
  135. cacheVal func(v interface{}) error) error {
  136. val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) {
  137. if err := c.doGetCache(key, v); err != nil {
  138. if err == errPlaceholder {
  139. return nil, c.errNotFound
  140. } else if err != c.errNotFound {
  141. // why we just return the error instead of query from db,
  142. // because we don't allow the disaster pass to the dbs.
  143. // fail fast, in case we bring down the dbs.
  144. return nil, err
  145. }
  146. if err = query(v); err == c.errNotFound {
  147. if err = c.setCacheWithNotFound(key); err != nil {
  148. logx.Error(err)
  149. }
  150. return nil, c.errNotFound
  151. } else if err != nil {
  152. c.stat.IncrementDbFails()
  153. return nil, err
  154. }
  155. if err = cacheVal(v); err != nil {
  156. logx.Error(err)
  157. }
  158. }
  159. return jsonx.Marshal(v)
  160. })
  161. if err != nil {
  162. return err
  163. }
  164. if fresh {
  165. return nil
  166. }
  167. // got the result from previous ongoing query
  168. c.stat.IncrementTotal()
  169. c.stat.IncrementHit()
  170. return jsonx.Unmarshal(val.([]byte), v)
  171. }
  172. func (c cacheNode) processCache(key string, data string, v interface{}) error {
  173. err := jsonx.Unmarshal([]byte(data), v)
  174. if err == nil {
  175. return nil
  176. }
  177. report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
  178. c.rds.Addr, key, data, err)
  179. logx.Error(report)
  180. stat.Report(report)
  181. if _, e := c.rds.Del(key); e != nil {
  182. logx.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
  183. c.rds.Addr, key, data, e)
  184. }
  185. // returns errNotFound to reload the value by the given queryFn
  186. return c.errNotFound
  187. }
  188. func (c cacheNode) setCacheWithNotFound(key string) error {
  189. return c.rds.Setex(key, notFoundPlaceholder, int(c.aroundDuration(c.notFoundExpiry).Seconds()))
  190. }