cachenode.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package cache
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "sync"
  8. "time"
  9. "github.com/zeromicro/go-zero/core/jsonx"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. "github.com/zeromicro/go-zero/core/mathx"
  12. "github.com/zeromicro/go-zero/core/stat"
  13. "github.com/zeromicro/go-zero/core/stores/redis"
  14. "github.com/zeromicro/go-zero/core/syncx"
  15. )
  16. const (
  17. notFoundPlaceholder = "*"
  18. // make the expiry unstable to avoid lots of cached items expire at the same time
  19. // make the unstable expiry to be [0.95, 1.05] * seconds
  20. expiryDeviation = 0.05
  21. )
  22. // indicates there is no such value associate with the key
  23. var errPlaceholder = errors.New("placeholder")
  24. type cacheNode struct {
  25. rds *redis.Redis
  26. expiry time.Duration
  27. notFoundExpiry time.Duration
  28. barrier syncx.SingleFlight
  29. r *rand.Rand
  30. lock *sync.Mutex
  31. unstableExpiry mathx.Unstable
  32. stat *Stat
  33. errNotFound error
  34. }
  35. // NewNode returns a cacheNode.
  36. // rds is the underlying redis node or cluster.
  37. // barrier is the barrier that maybe shared with other cache nodes on cache cluster.
  38. // st is used to stat the cache.
  39. // errNotFound defines the error that returned on cache not found.
  40. // opts are the options that customize the cacheNode.
  41. func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat,
  42. errNotFound error, opts ...Option) Cache {
  43. o := newOptions(opts...)
  44. return cacheNode{
  45. rds: rds,
  46. expiry: o.Expiry,
  47. notFoundExpiry: o.NotFoundExpiry,
  48. barrier: barrier,
  49. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  50. lock: new(sync.Mutex),
  51. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  52. stat: st,
  53. errNotFound: errNotFound,
  54. }
  55. }
  56. // Del deletes cached values with keys.
  57. func (c cacheNode) Del(keys ...string) error {
  58. return c.DelCtx(context.Background(), keys...)
  59. }
  60. // DelCtx deletes cached values with keys.
  61. func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error {
  62. if len(keys) == 0 {
  63. return nil
  64. }
  65. logger := logx.WithContext(ctx)
  66. if len(keys) > 1 && c.rds.Type == redis.ClusterType {
  67. for _, key := range keys {
  68. if _, err := c.rds.DelCtx(ctx, key); err != nil {
  69. logger.Errorf("failed to clear cache with key: %q, error: %v", key, err)
  70. c.asyncRetryDelCache(key)
  71. }
  72. }
  73. } else if _, err := c.rds.DelCtx(ctx, keys...); err != nil {
  74. logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
  75. c.asyncRetryDelCache(keys...)
  76. }
  77. return nil
  78. }
  79. // Get gets the cache with key and fills into v.
  80. func (c cacheNode) Get(key string, val interface{}) error {
  81. return c.GetCtx(context.Background(), key, val)
  82. }
  83. // GetCtx gets the cache with key and fills into v.
  84. func (c cacheNode) GetCtx(ctx context.Context, key string, val interface{}) error {
  85. err := c.doGetCache(ctx, key, val)
  86. if err == errPlaceholder {
  87. return c.errNotFound
  88. }
  89. return err
  90. }
  91. // IsNotFound checks if the given error is the defined errNotFound.
  92. func (c cacheNode) IsNotFound(err error) bool {
  93. return errors.Is(err, c.errNotFound)
  94. }
  95. // Set sets the cache with key and v, using c.expiry.
  96. func (c cacheNode) Set(key string, val interface{}) error {
  97. return c.SetCtx(context.Background(), key, val)
  98. }
  99. // SetCtx sets the cache with key and v, using c.expiry.
  100. func (c cacheNode) SetCtx(ctx context.Context, key string, val interface{}) error {
  101. return c.SetWithExpireCtx(ctx, key, val, c.aroundDuration(c.expiry))
  102. }
  103. // SetWithExpire sets the cache with key and v, using given expire.
  104. func (c cacheNode) SetWithExpire(key string, val interface{}, expire time.Duration) error {
  105. return c.SetWithExpireCtx(context.Background(), key, val, expire)
  106. }
  107. // SetWithExpireCtx sets the cache with key and v, using given expire.
  108. func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error {
  109. data, err := jsonx.Marshal(val)
  110. if err != nil {
  111. return err
  112. }
  113. return c.rds.SetexCtx(ctx, key, string(data), int(expire.Seconds()))
  114. }
  115. // String returns a string that represents the cacheNode.
  116. func (c cacheNode) String() string {
  117. return c.rds.Addr
  118. }
  119. // Take takes the result from cache first, if not found,
  120. // query from DB and set cache using c.expiry, then return the result.
  121. func (c cacheNode) Take(val interface{}, key string, query func(val interface{}) error) error {
  122. return c.TakeCtx(context.Background(), val, key, query)
  123. }
  124. // TakeCtx takes the result from cache first, if not found,
  125. // query from DB and set cache using c.expiry, then return the result.
  126. func (c cacheNode) TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error {
  127. return c.doTake(ctx, val, key, query, func(v interface{}) error {
  128. return c.SetCtx(ctx, key, v)
  129. })
  130. }
  131. // TakeWithExpire takes the result from cache first, if not found,
  132. // query from DB and set cache using given expire, then return the result.
  133. func (c cacheNode) TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
  134. return c.TakeWithExpireCtx(context.Background(), val, key, query)
  135. }
  136. // TakeWithExpireCtx takes the result from cache first, if not found,
  137. // query from DB and set cache using given expire, then return the result.
  138. func (c cacheNode) TakeWithExpireCtx(ctx context.Context, val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
  139. expire := c.aroundDuration(c.expiry)
  140. return c.doTake(ctx, val, key, func(v interface{}) error {
  141. return query(v, expire)
  142. }, func(v interface{}) error {
  143. return c.SetWithExpireCtx(ctx, key, v, expire)
  144. })
  145. }
  146. func (c cacheNode) aroundDuration(duration time.Duration) time.Duration {
  147. return c.unstableExpiry.AroundDuration(duration)
  148. }
  149. func (c cacheNode) asyncRetryDelCache(keys ...string) {
  150. AddCleanTask(func() error {
  151. _, err := c.rds.Del(keys...)
  152. return err
  153. }, keys...)
  154. }
  155. func (c cacheNode) doGetCache(ctx context.Context, key string, v interface{}) error {
  156. c.stat.IncrementTotal()
  157. data, err := c.rds.GetCtx(ctx, key)
  158. if err != nil {
  159. c.stat.IncrementMiss()
  160. return err
  161. }
  162. if len(data) == 0 {
  163. c.stat.IncrementMiss()
  164. return c.errNotFound
  165. }
  166. c.stat.IncrementHit()
  167. if data == notFoundPlaceholder {
  168. return errPlaceholder
  169. }
  170. return c.processCache(ctx, key, data, v)
  171. }
  172. func (c cacheNode) doTake(ctx context.Context, v interface{}, key string,
  173. query func(v interface{}) error, cacheVal func(v interface{}) error) error {
  174. logger := logx.WithContext(ctx)
  175. val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) {
  176. if err := c.doGetCache(ctx, key, v); err != nil {
  177. if err == errPlaceholder {
  178. return nil, c.errNotFound
  179. } else if err != c.errNotFound {
  180. // why we just return the error instead of query from db,
  181. // because we don't allow the disaster pass to the dbs.
  182. // fail fast, in case we bring down the dbs.
  183. return nil, err
  184. }
  185. if err = query(v); err == c.errNotFound {
  186. if err = c.setCacheWithNotFound(ctx, key); err != nil {
  187. logger.Error(err)
  188. }
  189. return nil, c.errNotFound
  190. } else if err != nil {
  191. c.stat.IncrementDbFails()
  192. return nil, err
  193. }
  194. if err = cacheVal(v); err != nil {
  195. logger.Error(err)
  196. }
  197. }
  198. return jsonx.Marshal(v)
  199. })
  200. if err != nil {
  201. return err
  202. }
  203. if fresh {
  204. return nil
  205. }
  206. // got the result from previous ongoing query
  207. c.stat.IncrementTotal()
  208. c.stat.IncrementHit()
  209. return jsonx.Unmarshal(val.([]byte), v)
  210. }
  211. func (c cacheNode) processCache(ctx context.Context, key, data string, v interface{}) error {
  212. err := jsonx.Unmarshal([]byte(data), v)
  213. if err == nil {
  214. return nil
  215. }
  216. report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
  217. c.rds.Addr, key, data, err)
  218. logger := logx.WithContext(ctx)
  219. logger.Error(report)
  220. stat.Report(report)
  221. if _, e := c.rds.DelCtx(ctx, key); e != nil {
  222. logger.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
  223. c.rds.Addr, key, data, e)
  224. }
  225. // returns errNotFound to reload the value by the given queryFn
  226. return c.errNotFound
  227. }
  228. func (c cacheNode) setCacheWithNotFound(ctx context.Context, key string) error {
  229. return c.rds.SetexCtx(ctx, key, notFoundPlaceholder, int(c.aroundDuration(c.notFoundExpiry).Seconds()))
  230. }