cachenode.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package cache
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "math/rand"
  8. "sync"
  9. "time"
  10. "github.com/wuntsong-org/go-zero-plus/core/jsonx"
  11. "github.com/wuntsong-org/go-zero-plus/core/logx"
  12. "github.com/wuntsong-org/go-zero-plus/core/mathx"
  13. "github.com/wuntsong-org/go-zero-plus/core/stat"
  14. "github.com/wuntsong-org/go-zero-plus/core/stores/redis"
  15. "github.com/wuntsong-org/go-zero-plus/core/syncx"
  16. )
  const (
  	// notFoundPlaceholder is the value written to redis for a key whose
  	// query reported "not found", so that later lookups can tell a known
  	// miss apart from an absent cache entry.
  	notFoundPlaceholder = "*"

  	// expiryDeviation makes expiries unstable so that lots of cached items
  	// do not expire at the same time; the effective expiry falls within
  	// [0.95, 1.05] * seconds.
  	expiryDeviation = 0.05
  )
  // errPlaceholder indicates that there is no value associated with the key,
  // i.e. the cached entry is the not-found placeholder.
  var errPlaceholder = errors.New("placeholder")
  25. type cacheNode struct {
  26. rds *redis.Redis
  27. expiry time.Duration
  28. notFoundExpiry time.Duration
  29. barrier syncx.SingleFlight
  30. r *rand.Rand
  31. lock *sync.Mutex
  32. unstableExpiry mathx.Unstable
  33. stat *Stat
  34. errNotFound error
  35. }
  36. // NewNode returns a cacheNode.
  37. // rds is the underlying redis node or cluster.
  38. // barrier is the barrier that maybe shared with other cache nodes on cache cluster.
  39. // st is used to stat the cache.
  40. // errNotFound defines the error that returned on cache not found.
  41. // opts are the options that customize the cacheNode.
  42. func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat,
  43. errNotFound error, opts ...Option) Cache {
  44. o := newOptions(opts...)
  45. return cacheNode{
  46. rds: rds,
  47. expiry: o.Expiry,
  48. notFoundExpiry: o.NotFoundExpiry,
  49. barrier: barrier,
  50. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  51. lock: new(sync.Mutex),
  52. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  53. stat: st,
  54. errNotFound: errNotFound,
  55. }
  56. }
  57. // Del deletes cached values with keys.
  58. func (c cacheNode) Del(keys ...string) error {
  59. return c.DelCtx(context.Background(), keys...)
  60. }
  61. // DelCtx deletes cached values with keys.
  62. func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error {
  63. if len(keys) == 0 {
  64. return nil
  65. }
  66. logger := logx.WithContext(ctx)
  67. if len(keys) > 1 && c.rds.Type == redis.ClusterType {
  68. for _, key := range keys {
  69. if _, err := c.rds.DelCtx(ctx, key); err != nil {
  70. logger.Errorf("failed to clear cache with key: %q, error: %v", key, err)
  71. c.asyncRetryDelCache(key)
  72. }
  73. }
  74. } else if _, err := c.rds.DelCtx(ctx, keys...); err != nil {
  75. logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
  76. c.asyncRetryDelCache(keys...)
  77. }
  78. return nil
  79. }
  80. // Get gets the cache with key and fills into v.
  81. func (c cacheNode) Get(key string, val any) error {
  82. return c.GetCtx(context.Background(), key, val)
  83. }
  84. // GetCtx gets the cache with key and fills into v.
  85. func (c cacheNode) GetCtx(ctx context.Context, key string, val any) error {
  86. err := c.doGetCache(ctx, key, val)
  87. if errors.Is(err, errPlaceholder) {
  88. return c.errNotFound
  89. }
  90. return err
  91. }
  92. // IsNotFound checks if the given error is the defined errNotFound.
  93. func (c cacheNode) IsNotFound(err error) bool {
  94. return errors.Is(err, c.errNotFound)
  95. }
  96. // Set sets the cache with key and v, using c.expiry.
  97. func (c cacheNode) Set(key string, val any) error {
  98. return c.SetCtx(context.Background(), key, val)
  99. }
  100. // SetCtx sets the cache with key and v, using c.expiry.
  101. func (c cacheNode) SetCtx(ctx context.Context, key string, val any) error {
  102. return c.SetWithExpireCtx(ctx, key, val, c.aroundDuration(c.expiry))
  103. }
  104. // SetWithExpire sets the cache with key and v, using given expire.
  105. func (c cacheNode) SetWithExpire(key string, val any, expire time.Duration) error {
  106. return c.SetWithExpireCtx(context.Background(), key, val, expire)
  107. }
  108. // SetWithExpireCtx sets the cache with key and v, using given expire.
  109. func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, val any,
  110. expire time.Duration) error {
  111. data, err := jsonx.Marshal(val)
  112. if err != nil {
  113. return err
  114. }
  115. return c.rds.SetexCtx(ctx, key, string(data), int(math.Ceil(expire.Seconds())))
  116. }
  117. // String returns a string that represents the cacheNode.
  118. func (c cacheNode) String() string {
  119. return c.rds.Addr
  120. }
  121. // Take takes the result from cache first, if not found,
  122. // query from DB and set cache using c.expiry, then return the result.
  123. func (c cacheNode) Take(val any, key string, query func(val any) error) error {
  124. return c.TakeCtx(context.Background(), val, key, query)
  125. }
  126. // TakeCtx takes the result from cache first, if not found,
  127. // query from DB and set cache using c.expiry, then return the result.
  128. func (c cacheNode) TakeCtx(ctx context.Context, val any, key string,
  129. query func(val any) error) error {
  130. return c.doTake(ctx, val, key, query, func(v any) error {
  131. return c.SetCtx(ctx, key, v)
  132. })
  133. }
  134. // TakeWithExpire takes the result from cache first, if not found,
  135. // query from DB and set cache using given expire, then return the result.
  136. func (c cacheNode) TakeWithExpire(val any, key string, query func(val any,
  137. expire time.Duration) error) error {
  138. return c.TakeWithExpireCtx(context.Background(), val, key, query)
  139. }
  140. // TakeWithExpireCtx takes the result from cache first, if not found,
  141. // query from DB and set cache using given expire, then return the result.
  142. func (c cacheNode) TakeWithExpireCtx(ctx context.Context, val any, key string,
  143. query func(val any, expire time.Duration) error) error {
  144. expire := c.aroundDuration(c.expiry)
  145. return c.doTake(ctx, val, key, func(v any) error {
  146. return query(v, expire)
  147. }, func(v any) error {
  148. return c.SetWithExpireCtx(ctx, key, v, expire)
  149. })
  150. }
  151. func (c cacheNode) aroundDuration(duration time.Duration) time.Duration {
  152. return c.unstableExpiry.AroundDuration(duration)
  153. }
  154. func (c cacheNode) asyncRetryDelCache(keys ...string) {
  155. AddCleanTask(func() error {
  156. _, err := c.rds.Del(keys...)
  157. return err
  158. }, keys...)
  159. }
  160. func (c cacheNode) doGetCache(ctx context.Context, key string, v any) error {
  161. c.stat.IncrementTotal()
  162. data, err := c.rds.GetCtx(ctx, key)
  163. if err != nil {
  164. c.stat.IncrementMiss()
  165. return err
  166. }
  167. if len(data) == 0 {
  168. c.stat.IncrementMiss()
  169. return c.errNotFound
  170. }
  171. c.stat.IncrementHit()
  172. if data == notFoundPlaceholder {
  173. return errPlaceholder
  174. }
  175. return c.processCache(ctx, key, data, v)
  176. }
  177. func (c cacheNode) doTake(ctx context.Context, v any, key string,
  178. query func(v any) error, cacheVal func(v any) error) error {
  179. logger := logx.WithContext(ctx)
  180. val, fresh, err := c.barrier.DoEx(key, func() (any, error) {
  181. if err := c.doGetCache(ctx, key, v); err != nil {
  182. if errors.Is(err, errPlaceholder) {
  183. return nil, c.errNotFound
  184. } else if !errors.Is(err, c.errNotFound) {
  185. // why we just return the error instead of query from db,
  186. // because we don't allow the disaster pass to the dbs.
  187. // fail fast, in case we bring down the dbs.
  188. return nil, err
  189. }
  190. if err = query(v); errors.Is(err, c.errNotFound) {
  191. if err = c.setCacheWithNotFound(ctx, key); err != nil {
  192. logger.Error(err)
  193. }
  194. return nil, c.errNotFound
  195. } else if err != nil {
  196. c.stat.IncrementDbFails()
  197. return nil, err
  198. }
  199. if err = cacheVal(v); err != nil {
  200. logger.Error(err)
  201. }
  202. }
  203. return jsonx.Marshal(v)
  204. })
  205. if err != nil {
  206. return err
  207. }
  208. if fresh {
  209. return nil
  210. }
  211. // got the result from previous ongoing query.
  212. // why not call IncrementTotal at the beginning of this function?
  213. // because a shared error is returned, and we don't want to count.
  214. // for example, if the db is down, the query will be failed, we count
  215. // the shared errors with one db failure.
  216. c.stat.IncrementTotal()
  217. c.stat.IncrementHit()
  218. return jsonx.Unmarshal(val.([]byte), v)
  219. }
  220. func (c cacheNode) processCache(ctx context.Context, key, data string, v any) error {
  221. err := jsonx.Unmarshal([]byte(data), v)
  222. if err == nil {
  223. return nil
  224. }
  225. report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
  226. c.rds.Addr, key, data, err)
  227. logger := logx.WithContext(ctx)
  228. logger.Error(report)
  229. stat.Report(report)
  230. if _, e := c.rds.DelCtx(ctx, key); e != nil {
  231. logger.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
  232. c.rds.Addr, key, data, e)
  233. }
  234. // returns errNotFound to reload the value by the given queryFn
  235. return c.errNotFound
  236. }
  237. func (c cacheNode) setCacheWithNotFound(ctx context.Context, key string) error {
  238. seconds := int(math.Ceil(c.aroundDuration(c.notFoundExpiry).Seconds()))
  239. _, err := c.rds.SetnxExCtx(ctx, key, notFoundPlaceholder, seconds)
  240. return err
  241. }