cachenode.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package cache
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "math/rand"
  8. "sync"
  9. "time"
  10. "github.com/zeromicro/go-zero/core/jsonx"
  11. "github.com/zeromicro/go-zero/core/logx"
  12. "github.com/zeromicro/go-zero/core/mathx"
  13. "github.com/zeromicro/go-zero/core/stat"
  14. "github.com/zeromicro/go-zero/core/stores/redis"
  15. "github.com/zeromicro/go-zero/core/syncx"
  16. )
  17. const (
  18. notFoundPlaceholder = "*"
  19. // make the expiry unstable to avoid lots of cached items expire at the same time
  20. // make the unstable expiry to be [0.95, 1.05] * seconds
  21. expiryDeviation = 0.05
  22. )
  23. // indicates there is no such value associate with the key
  24. var errPlaceholder = errors.New("placeholder")
  25. type cacheNode struct {
  26. rds *redis.Redis
  27. expiry time.Duration
  28. notFoundExpiry time.Duration
  29. barrier syncx.SingleFlight
  30. r *rand.Rand
  31. lock *sync.Mutex
  32. unstableExpiry mathx.Unstable
  33. stat *Stat
  34. errNotFound error
  35. }
  36. // NewNode returns a cacheNode.
  37. // rds is the underlying redis node or cluster.
  38. // barrier is the barrier that maybe shared with other cache nodes on cache cluster.
  39. // st is used to stat the cache.
  40. // errNotFound defines the error that returned on cache not found.
  41. // opts are the options that customize the cacheNode.
  42. func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat,
  43. errNotFound error, opts ...Option) Cache {
  44. o := newOptions(opts...)
  45. return cacheNode{
  46. rds: rds,
  47. expiry: o.Expiry,
  48. notFoundExpiry: o.NotFoundExpiry,
  49. barrier: barrier,
  50. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  51. lock: new(sync.Mutex),
  52. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  53. stat: st,
  54. errNotFound: errNotFound,
  55. }
  56. }
  57. // Del deletes cached values with keys.
  58. func (c cacheNode) Del(keys ...string) error {
  59. return c.DelCtx(context.Background(), keys...)
  60. }
  61. // DelCtx deletes cached values with keys.
  62. func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error {
  63. if len(keys) == 0 {
  64. return nil
  65. }
  66. logger := logx.WithContext(ctx)
  67. if len(keys) > 1 && c.rds.Type == redis.ClusterType {
  68. for _, key := range keys {
  69. if _, err := c.rds.DelCtx(ctx, key); err != nil {
  70. logger.Errorf("failed to clear cache with key: %q, error: %v", key, err)
  71. c.asyncRetryDelCache(key)
  72. }
  73. }
  74. } else if _, err := c.rds.DelCtx(ctx, keys...); err != nil {
  75. logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
  76. c.asyncRetryDelCache(keys...)
  77. }
  78. return nil
  79. }
  80. // Get gets the cache with key and fills into v.
  81. func (c cacheNode) Get(key string, val interface{}) error {
  82. return c.GetCtx(context.Background(), key, val)
  83. }
  84. // GetCtx gets the cache with key and fills into v.
  85. func (c cacheNode) GetCtx(ctx context.Context, key string, val interface{}) error {
  86. err := c.doGetCache(ctx, key, val)
  87. if err == errPlaceholder {
  88. return c.errNotFound
  89. }
  90. return err
  91. }
  92. // IsNotFound checks if the given error is the defined errNotFound.
  93. func (c cacheNode) IsNotFound(err error) bool {
  94. return errors.Is(err, c.errNotFound)
  95. }
  96. // Set sets the cache with key and v, using c.expiry.
  97. func (c cacheNode) Set(key string, val interface{}) error {
  98. return c.SetCtx(context.Background(), key, val)
  99. }
  100. // SetCtx sets the cache with key and v, using c.expiry.
  101. func (c cacheNode) SetCtx(ctx context.Context, key string, val interface{}) error {
  102. return c.SetWithExpireCtx(ctx, key, val, c.aroundDuration(c.expiry))
  103. }
  104. // SetWithExpire sets the cache with key and v, using given expire.
  105. func (c cacheNode) SetWithExpire(key string, val interface{}, expire time.Duration) error {
  106. return c.SetWithExpireCtx(context.Background(), key, val, expire)
  107. }
  108. // SetWithExpireCtx sets the cache with key and v, using given expire.
  109. func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, val interface{},
  110. expire time.Duration) error {
  111. data, err := jsonx.Marshal(val)
  112. if err != nil {
  113. return err
  114. }
  115. return c.rds.SetexCtx(ctx, key, string(data), int(math.Ceil(expire.Seconds())))
  116. }
  117. // String returns a string that represents the cacheNode.
  118. func (c cacheNode) String() string {
  119. return c.rds.Addr
  120. }
  121. // Take takes the result from cache first, if not found,
  122. // query from DB and set cache using c.expiry, then return the result.
  123. func (c cacheNode) Take(val interface{}, key string, query func(val interface{}) error) error {
  124. return c.TakeCtx(context.Background(), val, key, query)
  125. }
  126. // TakeCtx takes the result from cache first, if not found,
  127. // query from DB and set cache using c.expiry, then return the result.
  128. func (c cacheNode) TakeCtx(ctx context.Context, val interface{}, key string,
  129. query func(val interface{}) error) error {
  130. return c.doTake(ctx, val, key, query, func(v interface{}) error {
  131. return c.SetCtx(ctx, key, v)
  132. })
  133. }
  134. // TakeWithExpire takes the result from cache first, if not found,
  135. // query from DB and set cache using given expire, then return the result.
  136. func (c cacheNode) TakeWithExpire(val interface{}, key string, query func(val interface{},
  137. expire time.Duration) error) error {
  138. return c.TakeWithExpireCtx(context.Background(), val, key, query)
  139. }
  140. // TakeWithExpireCtx takes the result from cache first, if not found,
  141. // query from DB and set cache using given expire, then return the result.
  142. func (c cacheNode) TakeWithExpireCtx(ctx context.Context, val interface{}, key string,
  143. query func(val interface{}, expire time.Duration) error) error {
  144. expire := c.aroundDuration(c.expiry)
  145. return c.doTake(ctx, val, key, func(v interface{}) error {
  146. return query(v, expire)
  147. }, func(v interface{}) error {
  148. return c.SetWithExpireCtx(ctx, key, v, expire)
  149. })
  150. }
  151. func (c cacheNode) aroundDuration(duration time.Duration) time.Duration {
  152. return c.unstableExpiry.AroundDuration(duration)
  153. }
  154. func (c cacheNode) asyncRetryDelCache(keys ...string) {
  155. AddCleanTask(func() error {
  156. _, err := c.rds.Del(keys...)
  157. return err
  158. }, keys...)
  159. }
  160. func (c cacheNode) doGetCache(ctx context.Context, key string, v interface{}) error {
  161. c.stat.IncrementTotal()
  162. data, err := c.rds.GetCtx(ctx, key)
  163. if err != nil {
  164. c.stat.IncrementMiss()
  165. return err
  166. }
  167. if len(data) == 0 {
  168. c.stat.IncrementMiss()
  169. return c.errNotFound
  170. }
  171. c.stat.IncrementHit()
  172. if data == notFoundPlaceholder {
  173. return errPlaceholder
  174. }
  175. return c.processCache(ctx, key, data, v)
  176. }
  177. func (c cacheNode) doTake(ctx context.Context, v interface{}, key string,
  178. query func(v interface{}) error, cacheVal func(v interface{}) error) error {
  179. logger := logx.WithContext(ctx)
  180. val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) {
  181. if err := c.doGetCache(ctx, key, v); err != nil {
  182. if err == errPlaceholder {
  183. return nil, c.errNotFound
  184. } else if err != c.errNotFound {
  185. // why we just return the error instead of query from db,
  186. // because we don't allow the disaster pass to the dbs.
  187. // fail fast, in case we bring down the dbs.
  188. return nil, err
  189. }
  190. if err = query(v); err == c.errNotFound {
  191. if err = c.setCacheWithNotFound(ctx, key); err != nil {
  192. logger.Error(err)
  193. }
  194. return nil, c.errNotFound
  195. } else if err != nil {
  196. c.stat.IncrementDbFails()
  197. return nil, err
  198. }
  199. if err = cacheVal(v); err != nil {
  200. logger.Error(err)
  201. }
  202. }
  203. return jsonx.Marshal(v)
  204. })
  205. if err != nil {
  206. return err
  207. }
  208. if fresh {
  209. return nil
  210. }
  211. // got the result from previous ongoing query.
  212. // why not call IncrementTotal at the beginning of this function?
  213. // because a shared error is returned, and we don't want to count.
  214. // for example, if the db is down, the query will be failed, we count
  215. // the shared errors with one db failure.
  216. c.stat.IncrementTotal()
  217. c.stat.IncrementHit()
  218. return jsonx.Unmarshal(val.([]byte), v)
  219. }
  220. func (c cacheNode) processCache(ctx context.Context, key, data string, v interface{}) error {
  221. err := jsonx.Unmarshal([]byte(data), v)
  222. if err == nil {
  223. return nil
  224. }
  225. report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
  226. c.rds.Addr, key, data, err)
  227. logger := logx.WithContext(ctx)
  228. logger.Error(report)
  229. stat.Report(report)
  230. if _, e := c.rds.DelCtx(ctx, key); e != nil {
  231. logger.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
  232. c.rds.Addr, key, data, e)
  233. }
  234. // returns errNotFound to reload the value by the given queryFn
  235. return c.errNotFound
  236. }
  237. func (c cacheNode) setCacheWithNotFound(ctx context.Context, key string) error {
  238. seconds := int(math.Ceil(c.aroundDuration(c.notFoundExpiry).Seconds()))
  239. return c.rds.SetexCtx(ctx, key, notFoundPlaceholder, seconds)
  240. }