cachenode.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package cache
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. "github.com/tal-tech/go-zero/core/jsonx"
  9. "github.com/tal-tech/go-zero/core/logx"
  10. "github.com/tal-tech/go-zero/core/mathx"
  11. "github.com/tal-tech/go-zero/core/stat"
  12. "github.com/tal-tech/go-zero/core/stores/redis"
  13. "github.com/tal-tech/go-zero/core/syncx"
  14. )
  15. const (
  16. notFoundPlaceholder = "*"
  17. // make the expiry unstable to avoid lots of cached items expire at the same time
  18. // make the unstable expiry to be [0.95, 1.05] * seconds
  19. expiryDeviation = 0.05
  20. )
  21. // indicates there is no such value associate with the key
  22. var errPlaceholder = errors.New("placeholder")
  23. type cacheNode struct {
  24. rds *redis.Redis
  25. expiry time.Duration
  26. notFoundExpiry time.Duration
  27. barrier syncx.SingleFlight
  28. r *rand.Rand
  29. lock *sync.Mutex
  30. unstableExpiry mathx.Unstable
  31. stat *Stat
  32. errNotFound error
  33. }
  34. // NewNode returns a cacheNode.
  35. // rds is the underlying redis node or cluster.
  36. // barrier is the barrier that maybe shared with other cache nodes on cache cluster.
  37. // st is used to stat the cache.
  38. // errNotFound defines the error that returned on cache not found.
  39. // opts are the options that customize the cacheNode.
  40. func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat,
  41. errNotFound error, opts ...Option) Cache {
  42. o := newOptions(opts...)
  43. return cacheNode{
  44. rds: rds,
  45. expiry: o.Expiry,
  46. notFoundExpiry: o.NotFoundExpiry,
  47. barrier: barrier,
  48. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  49. lock: new(sync.Mutex),
  50. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  51. stat: st,
  52. errNotFound: errNotFound,
  53. }
  54. }
  55. // Del deletes cached values with keys.
  56. func (c cacheNode) Del(keys ...string) error {
  57. if len(keys) == 0 {
  58. return nil
  59. }
  60. if len(keys) > 1 && c.rds.Type == redis.ClusterType {
  61. for _, key := range keys {
  62. if _, err := c.rds.Del(key); err != nil {
  63. logx.Errorf("failed to clear cache with key: %q, error: %v", key, err)
  64. c.asyncRetryDelCache(key)
  65. }
  66. }
  67. } else {
  68. if _, err := c.rds.Del(keys...); err != nil {
  69. logx.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
  70. c.asyncRetryDelCache(keys...)
  71. }
  72. }
  73. return nil
  74. }
  75. // Get gets the cache with key and fills into v.
  76. func (c cacheNode) Get(key string, v interface{}) error {
  77. err := c.doGetCache(key, v)
  78. if err == errPlaceholder {
  79. return c.errNotFound
  80. }
  81. return err
  82. }
  83. // IsNotFound checks if the given error is the defined errNotFound.
  84. func (c cacheNode) IsNotFound(err error) bool {
  85. return err == c.errNotFound
  86. }
  87. // Set sets the cache with key and v, using c.expiry.
  88. func (c cacheNode) Set(key string, v interface{}) error {
  89. return c.SetWithExpire(key, v, c.aroundDuration(c.expiry))
  90. }
  91. // SetWithExpire sets the cache with key and v, using given expire.
  92. func (c cacheNode) SetWithExpire(key string, v interface{}, expire time.Duration) error {
  93. data, err := jsonx.Marshal(v)
  94. if err != nil {
  95. return err
  96. }
  97. return c.rds.Setex(key, string(data), int(expire.Seconds()))
  98. }
  99. // String returns a string that represents the cacheNode.
  100. func (c cacheNode) String() string {
  101. return c.rds.Addr
  102. }
  103. // Take takes the result from cache first, if not found,
  104. // query from DB and set cache using c.expiry, then return the result.
  105. func (c cacheNode) Take(v interface{}, key string, query func(v interface{}) error) error {
  106. return c.doTake(v, key, query, func(v interface{}) error {
  107. return c.Set(key, v)
  108. })
  109. }
  110. // TakeWithExpire takes the result from cache first, if not found,
  111. // query from DB and set cache using given expire, then return the result.
  112. func (c cacheNode) TakeWithExpire(v interface{}, key string, query func(v interface{},
  113. expire time.Duration) error) error {
  114. expire := c.aroundDuration(c.expiry)
  115. return c.doTake(v, key, func(v interface{}) error {
  116. return query(v, expire)
  117. }, func(v interface{}) error {
  118. return c.SetWithExpire(key, v, expire)
  119. })
  120. }
  121. func (c cacheNode) aroundDuration(duration time.Duration) time.Duration {
  122. return c.unstableExpiry.AroundDuration(duration)
  123. }
  124. func (c cacheNode) asyncRetryDelCache(keys ...string) {
  125. AddCleanTask(func() error {
  126. _, err := c.rds.Del(keys...)
  127. return err
  128. }, keys...)
  129. }
  130. func (c cacheNode) doGetCache(key string, v interface{}) error {
  131. c.stat.IncrementTotal()
  132. data, err := c.rds.Get(key)
  133. if err != nil {
  134. c.stat.IncrementMiss()
  135. return err
  136. }
  137. if len(data) == 0 {
  138. c.stat.IncrementMiss()
  139. return c.errNotFound
  140. }
  141. c.stat.IncrementHit()
  142. if data == notFoundPlaceholder {
  143. return errPlaceholder
  144. }
  145. return c.processCache(key, data, v)
  146. }
  147. func (c cacheNode) doTake(v interface{}, key string, query func(v interface{}) error,
  148. cacheVal func(v interface{}) error) error {
  149. val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) {
  150. if err := c.doGetCache(key, v); err != nil {
  151. if err == errPlaceholder {
  152. return nil, c.errNotFound
  153. } else if err != c.errNotFound {
  154. // why we just return the error instead of query from db,
  155. // because we don't allow the disaster pass to the dbs.
  156. // fail fast, in case we bring down the dbs.
  157. return nil, err
  158. }
  159. if err = query(v); err == c.errNotFound {
  160. if err = c.setCacheWithNotFound(key); err != nil {
  161. logx.Error(err)
  162. }
  163. return nil, c.errNotFound
  164. } else if err != nil {
  165. c.stat.IncrementDbFails()
  166. return nil, err
  167. }
  168. if err = cacheVal(v); err != nil {
  169. logx.Error(err)
  170. }
  171. }
  172. return jsonx.Marshal(v)
  173. })
  174. if err != nil {
  175. return err
  176. }
  177. if fresh {
  178. return nil
  179. }
  180. // got the result from previous ongoing query
  181. c.stat.IncrementTotal()
  182. c.stat.IncrementHit()
  183. return jsonx.Unmarshal(val.([]byte), v)
  184. }
  185. func (c cacheNode) processCache(key, data string, v interface{}) error {
  186. err := jsonx.Unmarshal([]byte(data), v)
  187. if err == nil {
  188. return nil
  189. }
  190. report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
  191. c.rds.Addr, key, data, err)
  192. logx.Error(report)
  193. stat.Report(report)
  194. if _, e := c.rds.Del(key); e != nil {
  195. logx.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
  196. c.rds.Addr, key, data, e)
  197. }
  198. // returns errNotFound to reload the value by the given queryFn
  199. return c.errNotFound
  200. }
  201. func (c cacheNode) setCacheWithNotFound(key string) error {
  202. return c.rds.Setex(key, notFoundPlaceholder, int(c.aroundDuration(c.notFoundExpiry).Seconds()))
  203. }