cachenode.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package cache
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "sync"
  8. "time"
  9. "github.com/zeromicro/go-zero/core/jsonx"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. "github.com/zeromicro/go-zero/core/mathx"
  12. "github.com/zeromicro/go-zero/core/stat"
  13. "github.com/zeromicro/go-zero/core/stores/redis"
  14. "github.com/zeromicro/go-zero/core/syncx"
  15. )
  16. const (
  17. notFoundPlaceholder = "*"
  18. // make the expiry unstable to avoid lots of cached items expire at the same time
  19. // make the unstable expiry to be [0.95, 1.05] * seconds
  20. expiryDeviation = 0.05
  21. )
  22. // indicates there is no such value associate with the key
  23. var errPlaceholder = errors.New("placeholder")
  24. type cacheNode struct {
  25. rds *redis.Redis
  26. expiry time.Duration
  27. notFoundExpiry time.Duration
  28. barrier syncx.SingleFlight
  29. r *rand.Rand
  30. lock *sync.Mutex
  31. unstableExpiry mathx.Unstable
  32. stat *Stat
  33. errNotFound error
  34. }
  35. // NewNode returns a cacheNode.
  36. // rds is the underlying redis node or cluster.
  37. // barrier is the barrier that maybe shared with other cache nodes on cache cluster.
  38. // st is used to stat the cache.
  39. // errNotFound defines the error that returned on cache not found.
  40. // opts are the options that customize the cacheNode.
  41. func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat,
  42. errNotFound error, opts ...Option) Cache {
  43. o := newOptions(opts...)
  44. return cacheNode{
  45. rds: rds,
  46. expiry: o.Expiry,
  47. notFoundExpiry: o.NotFoundExpiry,
  48. barrier: barrier,
  49. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  50. lock: new(sync.Mutex),
  51. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  52. stat: st,
  53. errNotFound: errNotFound,
  54. }
  55. }
  56. // Del deletes cached values with keys.
  57. func (c cacheNode) Del(keys ...string) error {
  58. return c.DelCtx(context.Background(), keys...)
  59. }
  60. // DelCtx deletes cached values with keys.
  61. func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error {
  62. if len(keys) == 0 {
  63. return nil
  64. }
  65. logger := logx.WithContext(ctx)
  66. if len(keys) > 1 && c.rds.Type == redis.ClusterType {
  67. for _, key := range keys {
  68. if _, err := c.rds.DelCtx(ctx, key); err != nil {
  69. logger.Errorf("failed to clear cache with key: %q, error: %v", key, err)
  70. c.asyncRetryDelCache(key)
  71. }
  72. }
  73. } else if _, err := c.rds.DelCtx(ctx, keys...); err != nil {
  74. logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
  75. c.asyncRetryDelCache(keys...)
  76. }
  77. return nil
  78. }
  79. // Get gets the cache with key and fills into v.
  80. func (c cacheNode) Get(key string, val interface{}) error {
  81. return c.GetCtx(context.Background(), key, val)
  82. }
  83. // GetCtx gets the cache with key and fills into v.
  84. func (c cacheNode) GetCtx(ctx context.Context, key string, val interface{}) error {
  85. err := c.doGetCache(ctx, key, val)
  86. if err == errPlaceholder {
  87. return c.errNotFound
  88. }
  89. return err
  90. }
  91. // IsNotFound checks if the given error is the defined errNotFound.
  92. func (c cacheNode) IsNotFound(err error) bool {
  93. return errors.Is(err, c.errNotFound)
  94. }
  95. // Set sets the cache with key and v, using c.expiry.
  96. func (c cacheNode) Set(key string, val interface{}) error {
  97. return c.SetCtx(context.Background(), key, val)
  98. }
  99. // SetCtx sets the cache with key and v, using c.expiry.
  100. func (c cacheNode) SetCtx(ctx context.Context, key string, val interface{}) error {
  101. return c.SetWithExpireCtx(ctx, key, val, c.aroundDuration(c.expiry))
  102. }
  103. // SetWithExpire sets the cache with key and v, using given expire.
  104. func (c cacheNode) SetWithExpire(key string, val interface{}, expire time.Duration) error {
  105. return c.SetWithExpireCtx(context.Background(), key, val, expire)
  106. }
  107. // SetWithExpireCtx sets the cache with key and v, using given expire.
  108. func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, val interface{},
  109. expire time.Duration) error {
  110. data, err := jsonx.Marshal(val)
  111. if err != nil {
  112. return err
  113. }
  114. return c.rds.SetexCtx(ctx, key, string(data), int(expire.Seconds()))
  115. }
  116. // String returns a string that represents the cacheNode.
  117. func (c cacheNode) String() string {
  118. return c.rds.Addr
  119. }
  120. // Take takes the result from cache first, if not found,
  121. // query from DB and set cache using c.expiry, then return the result.
  122. func (c cacheNode) Take(val interface{}, key string, query func(val interface{}) error) error {
  123. return c.TakeCtx(context.Background(), val, key, query)
  124. }
  125. // TakeCtx takes the result from cache first, if not found,
  126. // query from DB and set cache using c.expiry, then return the result.
  127. func (c cacheNode) TakeCtx(ctx context.Context, val interface{}, key string,
  128. query func(val interface{}) error) error {
  129. return c.doTake(ctx, val, key, query, func(v interface{}) error {
  130. return c.SetCtx(ctx, key, v)
  131. })
  132. }
  133. // TakeWithExpire takes the result from cache first, if not found,
  134. // query from DB and set cache using given expire, then return the result.
  135. func (c cacheNode) TakeWithExpire(val interface{}, key string, query func(val interface{},
  136. expire time.Duration) error) error {
  137. return c.TakeWithExpireCtx(context.Background(), val, key, query)
  138. }
  139. // TakeWithExpireCtx takes the result from cache first, if not found,
  140. // query from DB and set cache using given expire, then return the result.
  141. func (c cacheNode) TakeWithExpireCtx(ctx context.Context, val interface{}, key string,
  142. query func(val interface{}, expire time.Duration) error) error {
  143. expire := c.aroundDuration(c.expiry)
  144. return c.doTake(ctx, val, key, func(v interface{}) error {
  145. return query(v, expire)
  146. }, func(v interface{}) error {
  147. return c.SetWithExpireCtx(ctx, key, v, expire)
  148. })
  149. }
  150. func (c cacheNode) aroundDuration(duration time.Duration) time.Duration {
  151. return c.unstableExpiry.AroundDuration(duration)
  152. }
  153. func (c cacheNode) asyncRetryDelCache(keys ...string) {
  154. AddCleanTask(func() error {
  155. _, err := c.rds.Del(keys...)
  156. return err
  157. }, keys...)
  158. }
  159. func (c cacheNode) doGetCache(ctx context.Context, key string, v interface{}) error {
  160. c.stat.IncrementTotal()
  161. data, err := c.rds.GetCtx(ctx, key)
  162. if err != nil {
  163. c.stat.IncrementMiss()
  164. return err
  165. }
  166. if len(data) == 0 {
  167. c.stat.IncrementMiss()
  168. return c.errNotFound
  169. }
  170. c.stat.IncrementHit()
  171. if data == notFoundPlaceholder {
  172. return errPlaceholder
  173. }
  174. return c.processCache(ctx, key, data, v)
  175. }
  176. func (c cacheNode) doTake(ctx context.Context, v interface{}, key string,
  177. query func(v interface{}) error, cacheVal func(v interface{}) error) error {
  178. logger := logx.WithContext(ctx)
  179. val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) {
  180. if err := c.doGetCache(ctx, key, v); err != nil {
  181. if err == errPlaceholder {
  182. return nil, c.errNotFound
  183. } else if err != c.errNotFound {
  184. // why we just return the error instead of query from db,
  185. // because we don't allow the disaster pass to the dbs.
  186. // fail fast, in case we bring down the dbs.
  187. return nil, err
  188. }
  189. if err = query(v); err == c.errNotFound {
  190. if err = c.setCacheWithNotFound(ctx, key); err != nil {
  191. logger.Error(err)
  192. }
  193. return nil, c.errNotFound
  194. } else if err != nil {
  195. c.stat.IncrementDbFails()
  196. return nil, err
  197. }
  198. if err = cacheVal(v); err != nil {
  199. logger.Error(err)
  200. }
  201. }
  202. return jsonx.Marshal(v)
  203. })
  204. if err != nil {
  205. return err
  206. }
  207. if fresh {
  208. return nil
  209. }
  210. // got the result from previous ongoing query.
  211. // why not call IncrementTotal at the beginning of this function?
  212. // because a shared error is returned, and we don't want to count.
  213. // for example, if the db is down, the query will be failed, we count
  214. // the shared errors with one db failure.
  215. c.stat.IncrementTotal()
  216. c.stat.IncrementHit()
  217. return jsonx.Unmarshal(val.([]byte), v)
  218. }
  219. func (c cacheNode) processCache(ctx context.Context, key, data string, v interface{}) error {
  220. err := jsonx.Unmarshal([]byte(data), v)
  221. if err == nil {
  222. return nil
  223. }
  224. report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
  225. c.rds.Addr, key, data, err)
  226. logger := logx.WithContext(ctx)
  227. logger.Error(report)
  228. stat.Report(report)
  229. if _, e := c.rds.DelCtx(ctx, key); e != nil {
  230. logger.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
  231. c.rds.Addr, key, data, e)
  232. }
  233. // returns errNotFound to reload the value by the given queryFn
  234. return c.errNotFound
  235. }
  236. func (c cacheNode) setCacheWithNotFound(ctx context.Context, key string) error {
  237. return c.rds.SetexCtx(ctx, key, notFoundPlaceholder, int(c.aroundDuration(c.notFoundExpiry).Seconds()))
  238. }