cache.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package cache
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "time"
  8. "github.com/zeromicro/go-zero/core/errorx"
  9. "github.com/zeromicro/go-zero/core/hash"
  10. "github.com/zeromicro/go-zero/core/syncx"
  11. )
  12. type (
  13. // Cache interface is used to define the cache implementation.
  14. Cache interface {
  15. // Del deletes cached values with keys.
  16. Del(keys ...string) error
  17. // DelCtx deletes cached values with keys.
  18. DelCtx(ctx context.Context, keys ...string) error
  19. // Get gets the cache with key and fills into v.
  20. Get(key string, val interface{}) error
  21. // GetCtx gets the cache with key and fills into v.
  22. GetCtx(ctx context.Context, key string, val interface{}) error
  23. // IsNotFound checks if the given error is the defined errNotFound.
  24. IsNotFound(err error) bool
  25. // Set sets the cache with key and v, using c.expiry.
  26. Set(key string, val interface{}) error
  27. // SetCtx sets the cache with key and v, using c.expiry.
  28. SetCtx(ctx context.Context, key string, val interface{}) error
  29. // SetWithExpire sets the cache with key and v, using given expire.
  30. SetWithExpire(key string, val interface{}, expire time.Duration) error
  31. // SetWithExpireCtx sets the cache with key and v, using given expire.
  32. SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error
  33. // Take takes the result from cache first, if not found,
  34. // query from DB and set cache using c.expiry, then return the result.
  35. Take(val interface{}, key string, query func(val interface{}) error) error
  36. // TakeCtx takes the result from cache first, if not found,
  37. // query from DB and set cache using c.expiry, then return the result.
  38. TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error
  39. // TakeWithExpire takes the result from cache first, if not found,
  40. // query from DB and set cache using given expire, then return the result.
  41. TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error
  42. // TakeWithExpireCtx takes the result from cache first, if not found,
  43. // query from DB and set cache using given expire, then return the result.
  44. TakeWithExpireCtx(ctx context.Context, val interface{}, key string,
  45. query func(val interface{}, expire time.Duration) error) error
  46. }
  47. cacheCluster struct {
  48. dispatcher *hash.ConsistentHash
  49. errNotFound error
  50. }
  51. )
  52. // New returns a Cache.
  53. func New(c ClusterConf, barrier syncx.SingleFlight, st *Stat, errNotFound error,
  54. opts ...Option) Cache {
  55. if len(c) == 0 || TotalWeights(c) <= 0 {
  56. log.Fatal("no cache nodes")
  57. }
  58. if len(c) == 1 {
  59. return NewNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
  60. }
  61. dispatcher := hash.NewConsistentHash()
  62. for _, node := range c {
  63. cn := NewNode(node.NewRedis(), barrier, st, errNotFound, opts...)
  64. dispatcher.AddWithWeight(cn, node.Weight)
  65. }
  66. return cacheCluster{
  67. dispatcher: dispatcher,
  68. errNotFound: errNotFound,
  69. }
  70. }
  71. // Del deletes cached values with keys.
  72. func (cc cacheCluster) Del(keys ...string) error {
  73. return cc.DelCtx(context.Background(), keys...)
  74. }
  75. // DelCtx deletes cached values with keys.
  76. func (cc cacheCluster) DelCtx(ctx context.Context, keys ...string) error {
  77. switch len(keys) {
  78. case 0:
  79. return nil
  80. case 1:
  81. key := keys[0]
  82. c, ok := cc.dispatcher.Get(key)
  83. if !ok {
  84. return cc.errNotFound
  85. }
  86. return c.(Cache).DelCtx(ctx, key)
  87. default:
  88. var be errorx.BatchError
  89. nodes := make(map[interface{}][]string)
  90. for _, key := range keys {
  91. c, ok := cc.dispatcher.Get(key)
  92. if !ok {
  93. be.Add(fmt.Errorf("key %q not found", key))
  94. continue
  95. }
  96. nodes[c] = append(nodes[c], key)
  97. }
  98. for c, ks := range nodes {
  99. if err := c.(Cache).DelCtx(ctx, ks...); err != nil {
  100. be.Add(err)
  101. }
  102. }
  103. return be.Err()
  104. }
  105. }
  106. // Get gets the cache with key and fills into v.
  107. func (cc cacheCluster) Get(key string, val interface{}) error {
  108. return cc.GetCtx(context.Background(), key, val)
  109. }
  110. // GetCtx gets the cache with key and fills into v.
  111. func (cc cacheCluster) GetCtx(ctx context.Context, key string, val interface{}) error {
  112. c, ok := cc.dispatcher.Get(key)
  113. if !ok {
  114. return cc.errNotFound
  115. }
  116. return c.(Cache).GetCtx(ctx, key, val)
  117. }
  118. // IsNotFound checks if the given error is the defined errNotFound.
  119. func (cc cacheCluster) IsNotFound(err error) bool {
  120. return errors.Is(err, cc.errNotFound)
  121. }
  122. // Set sets the cache with key and v, using c.expiry.
  123. func (cc cacheCluster) Set(key string, val interface{}) error {
  124. return cc.SetCtx(context.Background(), key, val)
  125. }
  126. // SetCtx sets the cache with key and v, using c.expiry.
  127. func (cc cacheCluster) SetCtx(ctx context.Context, key string, val interface{}) error {
  128. c, ok := cc.dispatcher.Get(key)
  129. if !ok {
  130. return cc.errNotFound
  131. }
  132. return c.(Cache).SetCtx(ctx, key, val)
  133. }
  134. // SetWithExpire sets the cache with key and v, using given expire.
  135. func (cc cacheCluster) SetWithExpire(key string, val interface{}, expire time.Duration) error {
  136. return cc.SetWithExpireCtx(context.Background(), key, val, expire)
  137. }
  138. // SetWithExpireCtx sets the cache with key and v, using given expire.
  139. func (cc cacheCluster) SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error {
  140. c, ok := cc.dispatcher.Get(key)
  141. if !ok {
  142. return cc.errNotFound
  143. }
  144. return c.(Cache).SetWithExpireCtx(ctx, key, val, expire)
  145. }
  146. // Take takes the result from cache first, if not found,
  147. // query from DB and set cache using c.expiry, then return the result.
  148. func (cc cacheCluster) Take(val interface{}, key string, query func(val interface{}) error) error {
  149. return cc.TakeCtx(context.Background(), val, key, query)
  150. }
  151. // TakeCtx takes the result from cache first, if not found,
  152. // query from DB and set cache using c.expiry, then return the result.
  153. func (cc cacheCluster) TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error {
  154. c, ok := cc.dispatcher.Get(key)
  155. if !ok {
  156. return cc.errNotFound
  157. }
  158. return c.(Cache).TakeCtx(ctx, val, key, query)
  159. }
  160. // TakeWithExpire takes the result from cache first, if not found,
  161. // query from DB and set cache using given expire, then return the result.
  162. func (cc cacheCluster) TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
  163. return cc.TakeWithExpireCtx(context.Background(), val, key, query)
  164. }
  165. // TakeWithExpireCtx takes the result from cache first, if not found,
  166. // query from DB and set cache using given expire, then return the result.
  167. func (cc cacheCluster) TakeWithExpireCtx(ctx context.Context, val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
  168. c, ok := cc.dispatcher.Get(key)
  169. if !ok {
  170. return cc.errNotFound
  171. }
  172. return c.(Cache).TakeWithExpireCtx(ctx, val, key, query)
  173. }