cache.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package cache
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "time"
  8. "github.com/wuntsong-org/go-zero-plus/core/errorx"
  9. "github.com/wuntsong-org/go-zero-plus/core/hash"
  10. "github.com/wuntsong-org/go-zero-plus/core/stores/redis"
  11. "github.com/wuntsong-org/go-zero-plus/core/syncx"
  12. )
  13. type (
  14. // Cache interface is used to define the cache implementation.
  15. Cache interface {
  16. // Del deletes cached values with keys.
  17. Del(keys ...string) error
  18. // DelCtx deletes cached values with keys.
  19. DelCtx(ctx context.Context, keys ...string) error
  20. // Get gets the cache with key and fills into v.
  21. Get(key string, val any) error
  22. // GetCtx gets the cache with key and fills into v.
  23. GetCtx(ctx context.Context, key string, val any) error
  24. // IsNotFound checks if the given error is the defined errNotFound.
  25. IsNotFound(err error) bool
  26. // Set sets the cache with key and v, using c.expiry.
  27. Set(key string, val any) error
  28. // SetCtx sets the cache with key and v, using c.expiry.
  29. SetCtx(ctx context.Context, key string, val any) error
  30. // SetWithExpire sets the cache with key and v, using given expire.
  31. SetWithExpire(key string, val any, expire time.Duration) error
  32. // SetWithExpireCtx sets the cache with key and v, using given expire.
  33. SetWithExpireCtx(ctx context.Context, key string, val any, expire time.Duration) error
  34. // Take takes the result from cache first, if not found,
  35. // query from DB and set cache using c.expiry, then return the result.
  36. Take(val any, key string, query func(val any) error) error
  37. // TakeCtx takes the result from cache first, if not found,
  38. // query from DB and set cache using c.expiry, then return the result.
  39. TakeCtx(ctx context.Context, val any, key string, query func(val any) error) error
  40. // TakeWithExpire takes the result from cache first, if not found,
  41. // query from DB and set cache using given expire, then return the result.
  42. TakeWithExpire(val any, key string, query func(val any, expire time.Duration) error) error
  43. // TakeWithExpireCtx takes the result from cache first, if not found,
  44. // query from DB and set cache using given expire, then return the result.
  45. TakeWithExpireCtx(ctx context.Context, val any, key string,
  46. query func(val any, expire time.Duration) error) error
  47. }
  48. cacheCluster struct {
  49. dispatcher *hash.ConsistentHash
  50. errNotFound error
  51. }
  52. )
  53. // New returns a Cache.
  54. func New(c ClusterConf, barrier syncx.SingleFlight, st *Stat, errNotFound error,
  55. opts ...Option) Cache {
  56. if len(c) == 0 || TotalWeights(c) <= 0 {
  57. log.Fatal("no cache nodes")
  58. }
  59. if len(c) == 1 {
  60. return NewNode(redis.MustNewRedis(c[0].RedisConf), barrier, st, errNotFound, opts...)
  61. }
  62. dispatcher := hash.NewConsistentHash()
  63. for _, node := range c {
  64. cn := NewNode(redis.MustNewRedis(node.RedisConf), barrier, st, errNotFound, opts...)
  65. dispatcher.AddWithWeight(cn, node.Weight)
  66. }
  67. return cacheCluster{
  68. dispatcher: dispatcher,
  69. errNotFound: errNotFound,
  70. }
  71. }
  72. // Del deletes cached values with keys.
  73. func (cc cacheCluster) Del(keys ...string) error {
  74. return cc.DelCtx(context.Background(), keys...)
  75. }
  76. // DelCtx deletes cached values with keys.
  77. func (cc cacheCluster) DelCtx(ctx context.Context, keys ...string) error {
  78. switch len(keys) {
  79. case 0:
  80. return nil
  81. case 1:
  82. key := keys[0]
  83. c, ok := cc.dispatcher.Get(key)
  84. if !ok {
  85. return cc.errNotFound
  86. }
  87. return c.(Cache).DelCtx(ctx, key)
  88. default:
  89. var be errorx.BatchError
  90. nodes := make(map[any][]string)
  91. for _, key := range keys {
  92. c, ok := cc.dispatcher.Get(key)
  93. if !ok {
  94. be.Add(fmt.Errorf("key %q not found", key))
  95. continue
  96. }
  97. nodes[c] = append(nodes[c], key)
  98. }
  99. for c, ks := range nodes {
  100. if err := c.(Cache).DelCtx(ctx, ks...); err != nil {
  101. be.Add(err)
  102. }
  103. }
  104. return be.Err()
  105. }
  106. }
  107. // Get gets the cache with key and fills into v.
  108. func (cc cacheCluster) Get(key string, val any) error {
  109. return cc.GetCtx(context.Background(), key, val)
  110. }
  111. // GetCtx gets the cache with key and fills into v.
  112. func (cc cacheCluster) GetCtx(ctx context.Context, key string, val any) error {
  113. c, ok := cc.dispatcher.Get(key)
  114. if !ok {
  115. return cc.errNotFound
  116. }
  117. return c.(Cache).GetCtx(ctx, key, val)
  118. }
  119. // IsNotFound checks if the given error is the defined errNotFound.
  120. func (cc cacheCluster) IsNotFound(err error) bool {
  121. return errors.Is(err, cc.errNotFound)
  122. }
  123. // Set sets the cache with key and v, using c.expiry.
  124. func (cc cacheCluster) Set(key string, val any) error {
  125. return cc.SetCtx(context.Background(), key, val)
  126. }
  127. // SetCtx sets the cache with key and v, using c.expiry.
  128. func (cc cacheCluster) SetCtx(ctx context.Context, key string, val any) error {
  129. c, ok := cc.dispatcher.Get(key)
  130. if !ok {
  131. return cc.errNotFound
  132. }
  133. return c.(Cache).SetCtx(ctx, key, val)
  134. }
  135. // SetWithExpire sets the cache with key and v, using given expire.
  136. func (cc cacheCluster) SetWithExpire(key string, val any, expire time.Duration) error {
  137. return cc.SetWithExpireCtx(context.Background(), key, val, expire)
  138. }
  139. // SetWithExpireCtx sets the cache with key and v, using given expire.
  140. func (cc cacheCluster) SetWithExpireCtx(ctx context.Context, key string, val any, expire time.Duration) error {
  141. c, ok := cc.dispatcher.Get(key)
  142. if !ok {
  143. return cc.errNotFound
  144. }
  145. return c.(Cache).SetWithExpireCtx(ctx, key, val, expire)
  146. }
  147. // Take takes the result from cache first, if not found,
  148. // query from DB and set cache using c.expiry, then return the result.
  149. func (cc cacheCluster) Take(val any, key string, query func(val any) error) error {
  150. return cc.TakeCtx(context.Background(), val, key, query)
  151. }
  152. // TakeCtx takes the result from cache first, if not found,
  153. // query from DB and set cache using c.expiry, then return the result.
  154. func (cc cacheCluster) TakeCtx(ctx context.Context, val any, key string, query func(val any) error) error {
  155. c, ok := cc.dispatcher.Get(key)
  156. if !ok {
  157. return cc.errNotFound
  158. }
  159. return c.(Cache).TakeCtx(ctx, val, key, query)
  160. }
  161. // TakeWithExpire takes the result from cache first, if not found,
  162. // query from DB and set cache using given expire, then return the result.
  163. func (cc cacheCluster) TakeWithExpire(val any, key string, query func(val any, expire time.Duration) error) error {
  164. return cc.TakeWithExpireCtx(context.Background(), val, key, query)
  165. }
  166. // TakeWithExpireCtx takes the result from cache first, if not found,
  167. // query from DB and set cache using given expire, then return the result.
  168. func (cc cacheCluster) TakeWithExpireCtx(ctx context.Context, val any, key string, query func(val any, expire time.Duration) error) error {
  169. c, ok := cc.dispatcher.Get(key)
  170. if !ok {
  171. return cc.errNotFound
  172. }
  173. return c.(Cache).TakeWithExpireCtx(ctx, val, key, query)
  174. }