cachedmodel.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package monc
  2. import (
  3. "context"
  4. "log"
  5. "github.com/zeromicro/go-zero/core/stores/cache"
  6. "github.com/zeromicro/go-zero/core/stores/mon"
  7. "github.com/zeromicro/go-zero/core/stores/redis"
  8. "github.com/zeromicro/go-zero/core/syncx"
  9. "go.mongodb.org/mongo-driver/mongo"
  10. mopt "go.mongodb.org/mongo-driver/mongo/options"
  11. )
  12. var (
  13. // ErrNotFound is an alias of mongo.ErrNoDocuments.
  14. ErrNotFound = mongo.ErrNoDocuments
  15. // can't use one SingleFlight per conn, because multiple conns may share the same cache key.
  16. singleFlight = syncx.NewSingleFlight()
  17. stats = cache.NewStat("monc")
  18. )
  19. // A Model is a mongo model that built with cache capability.
  20. type Model struct {
  21. *mon.Model
  22. cache cache.Cache
  23. }
  24. // MustNewModel returns a Model with a cache cluster, exists on errors.
  25. func MustNewModel(uri, db, collection string, c cache.CacheConf, opts ...cache.Option) *Model {
  26. model, err := NewModel(uri, db, collection, c, opts...)
  27. if err != nil {
  28. log.Fatal(err)
  29. }
  30. return model
  31. }
  32. // MustNewNodeModel returns a Model with a cache node, exists on errors.
  33. func MustNewNodeModel(uri, db, collection string, rds *redis.Redis, opts ...cache.Option) *Model {
  34. model, err := NewNodeModel(uri, db, collection, rds, opts...)
  35. if err != nil {
  36. log.Fatal(err)
  37. }
  38. return model
  39. }
  40. // NewModel returns a Model with a cache cluster.
  41. func NewModel(uri, db, collection string, conf cache.CacheConf, opts ...cache.Option) (*Model, error) {
  42. c := cache.New(conf, singleFlight, stats, mongo.ErrNoDocuments, opts...)
  43. return NewModelWithCache(uri, db, collection, c)
  44. }
  45. // NewModelWithCache returns a Model with a custom cache.
  46. func NewModelWithCache(uri, db, collection string, c cache.Cache) (*Model, error) {
  47. return newModel(uri, db, collection, c)
  48. }
  49. // NewNodeModel returns a Model with a cache node.
  50. func NewNodeModel(uri, db, collection string, rds *redis.Redis, opts ...cache.Option) (*Model, error) {
  51. c := cache.NewNode(rds, singleFlight, stats, mongo.ErrNoDocuments, opts...)
  52. return NewModelWithCache(uri, db, collection, c)
  53. }
  54. // newModel returns a Model with the given cache.
  55. func newModel(uri, db, collection string, c cache.Cache) (*Model, error) {
  56. model, err := mon.NewModel(uri, db, collection)
  57. if err != nil {
  58. return nil, err
  59. }
  60. return &Model{
  61. Model: model,
  62. cache: c,
  63. }, nil
  64. }
  65. // DelCache deletes the cache with given keys.
  66. func (mm *Model) DelCache(ctx context.Context, keys ...string) error {
  67. return mm.cache.DelCtx(ctx, keys...)
  68. }
  69. // DeleteOne deletes the document with given filter, and remove it from cache.
  70. func (mm *Model) DeleteOne(ctx context.Context, key string, filter interface{},
  71. opts ...*mopt.DeleteOptions) (int64, error) {
  72. val, err := mm.Model.DeleteOne(ctx, filter, opts...)
  73. if err != nil {
  74. return 0, err
  75. }
  76. if err := mm.DelCache(ctx, key); err != nil {
  77. return 0, err
  78. }
  79. return val, nil
  80. }
  81. // DeleteOneNoCache deletes the document with given filter.
  82. func (mm *Model) DeleteOneNoCache(ctx context.Context, filter interface{},
  83. opts ...*mopt.DeleteOptions) (int64, error) {
  84. return mm.Model.DeleteOne(ctx, filter, opts...)
  85. }
  86. // FindOne unmarshals a record into v with given key and query.
  87. func (mm *Model) FindOne(ctx context.Context, key string, v, filter interface{},
  88. opts ...*mopt.FindOneOptions) error {
  89. return mm.cache.TakeCtx(ctx, v, key, func(v interface{}) error {
  90. return mm.Model.FindOne(ctx, v, filter, opts...)
  91. })
  92. }
  93. // FindOneNoCache unmarshals a record into v with query, without cache.
  94. func (mm *Model) FindOneNoCache(ctx context.Context, v, filter interface{},
  95. opts ...*mopt.FindOneOptions) error {
  96. return mm.Model.FindOne(ctx, v, filter, opts...)
  97. }
  98. // FindOneAndDelete deletes the document with given filter, and unmarshals it into v.
  99. func (mm *Model) FindOneAndDelete(ctx context.Context, key string, v, filter interface{},
  100. opts ...*mopt.FindOneAndDeleteOptions) error {
  101. if err := mm.Model.FindOneAndDelete(ctx, v, filter, opts...); err != nil {
  102. return err
  103. }
  104. return mm.DelCache(ctx, key)
  105. }
  106. // FindOneAndDeleteNoCache deletes the document with given filter, and unmarshals it into v.
  107. func (mm *Model) FindOneAndDeleteNoCache(ctx context.Context, v, filter interface{},
  108. opts ...*mopt.FindOneAndDeleteOptions) error {
  109. return mm.Model.FindOneAndDelete(ctx, v, filter, opts...)
  110. }
  111. // FindOneAndReplace replaces the document with given filter with replacement, and unmarshals it into v.
  112. func (mm *Model) FindOneAndReplace(ctx context.Context, key string, v, filter interface{},
  113. replacement interface{}, opts ...*mopt.FindOneAndReplaceOptions) error {
  114. if err := mm.Model.FindOneAndReplace(ctx, v, filter, replacement, opts...); err != nil {
  115. return err
  116. }
  117. return mm.DelCache(ctx, key)
  118. }
  119. // FindOneAndReplaceNoCache replaces the document with given filter with replacement, and unmarshals it into v.
  120. func (mm *Model) FindOneAndReplaceNoCache(ctx context.Context, v, filter interface{},
  121. replacement interface{}, opts ...*mopt.FindOneAndReplaceOptions) error {
  122. return mm.Model.FindOneAndReplace(ctx, v, filter, replacement, opts...)
  123. }
  124. // FindOneAndUpdate updates the document with given filter with update, and unmarshals it into v.
  125. func (mm *Model) FindOneAndUpdate(ctx context.Context, key string, v, filter interface{},
  126. update interface{}, opts ...*mopt.FindOneAndUpdateOptions) error {
  127. if err := mm.Model.FindOneAndUpdate(ctx, v, filter, update, opts...); err != nil {
  128. return err
  129. }
  130. return mm.DelCache(ctx, key)
  131. }
  132. // FindOneAndUpdateNoCache updates the document with given filter with update, and unmarshals it into v.
  133. func (mm *Model) FindOneAndUpdateNoCache(ctx context.Context, v, filter interface{},
  134. update interface{}, opts ...*mopt.FindOneAndUpdateOptions) error {
  135. return mm.Model.FindOneAndUpdate(ctx, v, filter, update, opts...)
  136. }
  137. // GetCache unmarshal the cache into v with given key.
  138. func (mm *Model) GetCache(key string, v interface{}) error {
  139. return mm.cache.Get(key, v)
  140. }
  141. // InsertOne inserts a single document into the collection, and remove the cache placeholder.
  142. func (mm *Model) InsertOne(ctx context.Context, key string, document interface{},
  143. opts ...*mopt.InsertOneOptions) (*mongo.InsertOneResult, error) {
  144. res, err := mm.Model.InsertOne(ctx, document, opts...)
  145. if err != nil {
  146. return nil, err
  147. }
  148. if err = mm.DelCache(ctx, key); err != nil {
  149. return nil, err
  150. }
  151. return res, nil
  152. }
  153. // InsertOneNoCache inserts a single document into the collection.
  154. func (mm *Model) InsertOneNoCache(ctx context.Context, document interface{},
  155. opts ...*mopt.InsertOneOptions) (*mongo.InsertOneResult, error) {
  156. return mm.Model.InsertOne(ctx, document, opts...)
  157. }
  158. // ReplaceOne replaces a single document in the collection, and remove the cache.
  159. func (mm *Model) ReplaceOne(ctx context.Context, key string, filter interface{}, replacement interface{},
  160. opts ...*mopt.ReplaceOptions) (*mongo.UpdateResult, error) {
  161. res, err := mm.Model.ReplaceOne(ctx, filter, replacement, opts...)
  162. if err != nil {
  163. return nil, err
  164. }
  165. if err = mm.DelCache(ctx, key); err != nil {
  166. return nil, err
  167. }
  168. return res, nil
  169. }
  170. // ReplaceOneNoCache replaces a single document in the collection.
  171. func (mm *Model) ReplaceOneNoCache(ctx context.Context, filter interface{}, replacement interface{},
  172. opts ...*mopt.ReplaceOptions) (*mongo.UpdateResult, error) {
  173. return mm.Model.ReplaceOne(ctx, filter, replacement, opts...)
  174. }
  175. // SetCache sets the cache with given key and value.
  176. func (mm *Model) SetCache(key string, v interface{}) error {
  177. return mm.cache.Set(key, v)
  178. }
  179. // UpdateByID updates the document with given id with update, and remove the cache.
  180. func (mm *Model) UpdateByID(ctx context.Context, key string, id interface{}, update interface{},
  181. opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) {
  182. res, err := mm.Model.UpdateByID(ctx, id, update, opts...)
  183. if err != nil {
  184. return nil, err
  185. }
  186. if err = mm.DelCache(ctx, key); err != nil {
  187. return nil, err
  188. }
  189. return res, nil
  190. }
  191. // UpdateByIDNoCache updates the document with given id with update.
  192. func (mm *Model) UpdateByIDNoCache(ctx context.Context, id interface{}, update interface{},
  193. opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) {
  194. return mm.Model.UpdateByID(ctx, id, update, opts...)
  195. }
  196. // UpdateMany updates the documents that match filter with update, and remove the cache.
  197. func (mm *Model) UpdateMany(ctx context.Context, keys []string, filter interface{}, update interface{},
  198. opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) {
  199. res, err := mm.Model.UpdateMany(ctx, filter, update, opts...)
  200. if err != nil {
  201. return nil, err
  202. }
  203. if err = mm.DelCache(ctx, keys...); err != nil {
  204. return nil, err
  205. }
  206. return res, nil
  207. }
  208. // UpdateManyNoCache updates the documents that match filter with update.
  209. func (mm *Model) UpdateManyNoCache(ctx context.Context, filter interface{}, update interface{},
  210. opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) {
  211. return mm.Model.UpdateMany(ctx, filter, update, opts...)
  212. }
  213. // UpdateOne updates the first document that matches filter with update, and remove the cache.
  214. func (mm *Model) UpdateOne(ctx context.Context, key string, filter interface{}, update interface{},
  215. opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) {
  216. res, err := mm.Model.UpdateOne(ctx, filter, update, opts...)
  217. if err != nil {
  218. return nil, err
  219. }
  220. if err = mm.DelCache(ctx, key); err != nil {
  221. return nil, err
  222. }
  223. return res, nil
  224. }
  225. // UpdateOneNoCache updates the first document that matches filter with update.
  226. func (mm *Model) UpdateOneNoCache(ctx context.Context, filter interface{}, update interface{},
  227. opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) {
  228. return mm.Model.UpdateOne(ctx, filter, update, opts...)
  229. }