collection.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. package mon
import (
	"context"
	"encoding/json"
	"errors"
	"time"

	"github.com/zeromicro/go-zero/core/breaker"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/timex"
	"go.mongodb.org/mongo-driver/mongo"
	mopt "go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
)
  13. const (
  14. defaultSlowThreshold = time.Millisecond * 500
  15. // spanName is the span name of the mongo calls.
  16. spanName = "mongo"
  17. // mongodb method names
  18. aggregate = "Aggregate"
  19. bulkWrite = "BulkWrite"
  20. countDocuments = "CountDocuments"
  21. deleteMany = "DeleteMany"
  22. deleteOne = "DeleteOne"
  23. distinct = "Distinct"
  24. estimatedDocumentCount = "EstimatedDocumentCount"
  25. find = "Find"
  26. findOne = "FindOne"
  27. findOneAndDelete = "FindOneAndDelete"
  28. findOneAndReplace = "FindOneAndReplace"
  29. findOneAndUpdate = "FindOneAndUpdate"
  30. insertMany = "InsertMany"
  31. insertOne = "InsertOne"
  32. replaceOne = "ReplaceOne"
  33. updateByID = "UpdateByID"
  34. updateMany = "UpdateMany"
  35. updateOne = "UpdateOne"
  36. )
// ErrNotFound is an alias of mongo.ErrNoDocuments.
// The acceptable function in this file counts it as a non-failure for the breaker.
var ErrNotFound = mongo.ErrNoDocuments
  39. type (
  40. // Collection defines a MongoDB collection.
  41. Collection interface {
  42. // Aggregate executes an aggregation pipeline.
  43. Aggregate(ctx context.Context, pipeline any, opts ...*mopt.AggregateOptions) (
  44. *mongo.Cursor, error)
  45. // BulkWrite performs a bulk write operation.
  46. BulkWrite(ctx context.Context, models []mongo.WriteModel, opts ...*mopt.BulkWriteOptions) (
  47. *mongo.BulkWriteResult, error)
  48. // Clone creates a copy of this collection with the same settings.
  49. Clone(opts ...*mopt.CollectionOptions) (*mongo.Collection, error)
  50. // CountDocuments returns the number of documents in the collection that match the filter.
  51. CountDocuments(ctx context.Context, filter any, opts ...*mopt.CountOptions) (int64, error)
  52. // Database returns the database that this collection is a part of.
  53. Database() *mongo.Database
  54. // DeleteMany deletes documents from the collection that match the filter.
  55. DeleteMany(ctx context.Context, filter any, opts ...*mopt.DeleteOptions) (
  56. *mongo.DeleteResult, error)
  57. // DeleteOne deletes at most one document from the collection that matches the filter.
  58. DeleteOne(ctx context.Context, filter any, opts ...*mopt.DeleteOptions) (
  59. *mongo.DeleteResult, error)
  60. // Distinct returns a list of distinct values for the given key across the collection.
  61. Distinct(ctx context.Context, fieldName string, filter any,
  62. opts ...*mopt.DistinctOptions) ([]any, error)
  63. // Drop drops this collection from database.
  64. Drop(ctx context.Context) error
  65. // EstimatedDocumentCount returns an estimate of the count of documents in a collection
  66. // using collection metadata.
  67. EstimatedDocumentCount(ctx context.Context, opts ...*mopt.EstimatedDocumentCountOptions) (int64, error)
  68. // Find finds the documents matching the provided filter.
  69. Find(ctx context.Context, filter any, opts ...*mopt.FindOptions) (*mongo.Cursor, error)
  70. // FindOne returns up to one document that matches the provided filter.
  71. FindOne(ctx context.Context, filter any, opts ...*mopt.FindOneOptions) (
  72. *mongo.SingleResult, error)
  73. // FindOneAndDelete returns at most one document that matches the filter. If the filter
  74. // matches multiple documents, only the first document is deleted.
  75. FindOneAndDelete(ctx context.Context, filter any, opts ...*mopt.FindOneAndDeleteOptions) (
  76. *mongo.SingleResult, error)
  77. // FindOneAndReplace returns at most one document that matches the filter. If the filter
  78. // matches multiple documents, FindOneAndReplace returns the first document in the
  79. // collection that matches the filter.
  80. FindOneAndReplace(ctx context.Context, filter, replacement any,
  81. opts ...*mopt.FindOneAndReplaceOptions) (*mongo.SingleResult, error)
  82. // FindOneAndUpdate returns at most one document that matches the filter. If the filter
  83. // matches multiple documents, FindOneAndUpdate returns the first document in the
  84. // collection that matches the filter.
  85. FindOneAndUpdate(ctx context.Context, filter, update any,
  86. opts ...*mopt.FindOneAndUpdateOptions) (*mongo.SingleResult, error)
  87. // Indexes returns the index view for this collection.
  88. Indexes() mongo.IndexView
  89. // InsertMany inserts the provided documents.
  90. InsertMany(ctx context.Context, documents []any, opts ...*mopt.InsertManyOptions) (
  91. *mongo.InsertManyResult, error)
  92. // InsertOne inserts the provided document.
  93. InsertOne(ctx context.Context, document any, opts ...*mopt.InsertOneOptions) (
  94. *mongo.InsertOneResult, error)
  95. // ReplaceOne replaces at most one document that matches the filter.
  96. ReplaceOne(ctx context.Context, filter, replacement any,
  97. opts ...*mopt.ReplaceOptions) (*mongo.UpdateResult, error)
  98. // UpdateByID updates a single document matching the provided filter.
  99. UpdateByID(ctx context.Context, id, update any,
  100. opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error)
  101. // UpdateMany updates the provided documents.
  102. UpdateMany(ctx context.Context, filter, update any,
  103. opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error)
  104. // UpdateOne updates a single document matching the provided filter.
  105. UpdateOne(ctx context.Context, filter, update any,
  106. opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error)
  107. // Watch returns a change stream cursor used to receive notifications of changes to the collection.
  108. Watch(ctx context.Context, pipeline any, opts ...*mopt.ChangeStreamOptions) (
  109. *mongo.ChangeStream, error)
  110. }
  111. decoratedCollection struct {
  112. *mongo.Collection
  113. name string
  114. brk breaker.Breaker
  115. }
  116. keepablePromise struct {
  117. promise breaker.Promise
  118. log func(error)
  119. }
  120. )
  121. func newCollection(collection *mongo.Collection, brk breaker.Breaker) Collection {
  122. return &decoratedCollection{
  123. Collection: collection,
  124. name: collection.Name(),
  125. brk: brk,
  126. }
  127. }
  128. func (c *decoratedCollection) Aggregate(ctx context.Context, pipeline any,
  129. opts ...*mopt.AggregateOptions) (cur *mongo.Cursor, err error) {
  130. ctx, span := startSpan(ctx, aggregate)
  131. defer func() {
  132. endSpan(span, err)
  133. }()
  134. err = c.brk.DoWithAcceptable(func() error {
  135. starTime := timex.Now()
  136. defer func() {
  137. c.logDurationSimple(ctx, aggregate, starTime, err)
  138. }()
  139. cur, err = c.Collection.Aggregate(ctx, pipeline, opts...)
  140. return err
  141. }, acceptable)
  142. return
  143. }
  144. func (c *decoratedCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel,
  145. opts ...*mopt.BulkWriteOptions) (res *mongo.BulkWriteResult, err error) {
  146. ctx, span := startSpan(ctx, bulkWrite)
  147. defer func() {
  148. endSpan(span, err)
  149. }()
  150. err = c.brk.DoWithAcceptable(func() error {
  151. startTime := timex.Now()
  152. defer func() {
  153. c.logDurationSimple(ctx, bulkWrite, startTime, err)
  154. }()
  155. res, err = c.Collection.BulkWrite(ctx, models, opts...)
  156. return err
  157. }, acceptable)
  158. return
  159. }
  160. func (c *decoratedCollection) CountDocuments(ctx context.Context, filter any,
  161. opts ...*mopt.CountOptions) (count int64, err error) {
  162. ctx, span := startSpan(ctx, countDocuments)
  163. defer func() {
  164. endSpan(span, err)
  165. }()
  166. err = c.brk.DoWithAcceptable(func() error {
  167. startTime := timex.Now()
  168. defer func() {
  169. c.logDurationSimple(ctx, countDocuments, startTime, err)
  170. }()
  171. count, err = c.Collection.CountDocuments(ctx, filter, opts...)
  172. return err
  173. }, acceptable)
  174. return
  175. }
  176. func (c *decoratedCollection) DeleteMany(ctx context.Context, filter any,
  177. opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) {
  178. ctx, span := startSpan(ctx, deleteMany)
  179. defer func() {
  180. endSpan(span, err)
  181. }()
  182. err = c.brk.DoWithAcceptable(func() error {
  183. startTime := timex.Now()
  184. defer func() {
  185. c.logDurationSimple(ctx, deleteMany, startTime, err)
  186. }()
  187. res, err = c.Collection.DeleteMany(ctx, filter, opts...)
  188. return err
  189. }, acceptable)
  190. return
  191. }
  192. func (c *decoratedCollection) DeleteOne(ctx context.Context, filter any,
  193. opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) {
  194. ctx, span := startSpan(ctx, deleteOne)
  195. defer func() {
  196. endSpan(span, err)
  197. }()
  198. err = c.brk.DoWithAcceptable(func() error {
  199. startTime := timex.Now()
  200. defer func() {
  201. c.logDuration(ctx, deleteOne, startTime, err, filter)
  202. }()
  203. res, err = c.Collection.DeleteOne(ctx, filter, opts...)
  204. return err
  205. }, acceptable)
  206. return
  207. }
  208. func (c *decoratedCollection) Distinct(ctx context.Context, fieldName string, filter any,
  209. opts ...*mopt.DistinctOptions) (val []any, err error) {
  210. ctx, span := startSpan(ctx, distinct)
  211. defer func() {
  212. endSpan(span, err)
  213. }()
  214. err = c.brk.DoWithAcceptable(func() error {
  215. startTime := timex.Now()
  216. defer func() {
  217. c.logDurationSimple(ctx, distinct, startTime, err)
  218. }()
  219. val, err = c.Collection.Distinct(ctx, fieldName, filter, opts...)
  220. return err
  221. }, acceptable)
  222. return
  223. }
  224. func (c *decoratedCollection) EstimatedDocumentCount(ctx context.Context,
  225. opts ...*mopt.EstimatedDocumentCountOptions) (val int64, err error) {
  226. ctx, span := startSpan(ctx, estimatedDocumentCount)
  227. defer func() {
  228. endSpan(span, err)
  229. }()
  230. err = c.brk.DoWithAcceptable(func() error {
  231. startTime := timex.Now()
  232. defer func() {
  233. c.logDurationSimple(ctx, estimatedDocumentCount, startTime, err)
  234. }()
  235. val, err = c.Collection.EstimatedDocumentCount(ctx, opts...)
  236. return err
  237. }, acceptable)
  238. return
  239. }
  240. func (c *decoratedCollection) Find(ctx context.Context, filter any,
  241. opts ...*mopt.FindOptions) (cur *mongo.Cursor, err error) {
  242. ctx, span := startSpan(ctx, find)
  243. defer func() {
  244. endSpan(span, err)
  245. }()
  246. err = c.brk.DoWithAcceptable(func() error {
  247. startTime := timex.Now()
  248. defer func() {
  249. c.logDuration(ctx, find, startTime, err, filter)
  250. }()
  251. cur, err = c.Collection.Find(ctx, filter, opts...)
  252. return err
  253. }, acceptable)
  254. return
  255. }
  256. func (c *decoratedCollection) FindOne(ctx context.Context, filter any,
  257. opts ...*mopt.FindOneOptions) (res *mongo.SingleResult, err error) {
  258. ctx, span := startSpan(ctx, findOne)
  259. defer func() {
  260. endSpan(span, err)
  261. }()
  262. err = c.brk.DoWithAcceptable(func() error {
  263. startTime := timex.Now()
  264. defer func() {
  265. c.logDuration(ctx, findOne, startTime, err, filter)
  266. }()
  267. res = c.Collection.FindOne(ctx, filter, opts...)
  268. err = res.Err()
  269. return err
  270. }, acceptable)
  271. return
  272. }
  273. func (c *decoratedCollection) FindOneAndDelete(ctx context.Context, filter any,
  274. opts ...*mopt.FindOneAndDeleteOptions) (res *mongo.SingleResult, err error) {
  275. ctx, span := startSpan(ctx, findOneAndDelete)
  276. defer func() {
  277. endSpan(span, err)
  278. }()
  279. err = c.brk.DoWithAcceptable(func() error {
  280. startTime := timex.Now()
  281. defer func() {
  282. c.logDuration(ctx, findOneAndDelete, startTime, err, filter)
  283. }()
  284. res = c.Collection.FindOneAndDelete(ctx, filter, opts...)
  285. err = res.Err()
  286. return err
  287. }, acceptable)
  288. return
  289. }
  290. func (c *decoratedCollection) FindOneAndReplace(ctx context.Context, filter any,
  291. replacement any, opts ...*mopt.FindOneAndReplaceOptions) (
  292. res *mongo.SingleResult, err error) {
  293. ctx, span := startSpan(ctx, findOneAndReplace)
  294. defer func() {
  295. endSpan(span, err)
  296. }()
  297. err = c.brk.DoWithAcceptable(func() error {
  298. startTime := timex.Now()
  299. defer func() {
  300. c.logDuration(ctx, findOneAndReplace, startTime, err, filter, replacement)
  301. }()
  302. res = c.Collection.FindOneAndReplace(ctx, filter, replacement, opts...)
  303. err = res.Err()
  304. return err
  305. }, acceptable)
  306. return
  307. }
  308. func (c *decoratedCollection) FindOneAndUpdate(ctx context.Context, filter, update any,
  309. opts ...*mopt.FindOneAndUpdateOptions) (res *mongo.SingleResult, err error) {
  310. ctx, span := startSpan(ctx, findOneAndUpdate)
  311. defer func() {
  312. endSpan(span, err)
  313. }()
  314. err = c.brk.DoWithAcceptable(func() error {
  315. startTime := timex.Now()
  316. defer func() {
  317. c.logDuration(ctx, findOneAndUpdate, startTime, err, filter, update)
  318. }()
  319. res = c.Collection.FindOneAndUpdate(ctx, filter, update, opts...)
  320. err = res.Err()
  321. return err
  322. }, acceptable)
  323. return
  324. }
  325. func (c *decoratedCollection) InsertMany(ctx context.Context, documents []any,
  326. opts ...*mopt.InsertManyOptions) (res *mongo.InsertManyResult, err error) {
  327. ctx, span := startSpan(ctx, insertMany)
  328. defer func() {
  329. endSpan(span, err)
  330. }()
  331. err = c.brk.DoWithAcceptable(func() error {
  332. startTime := timex.Now()
  333. defer func() {
  334. c.logDurationSimple(ctx, insertMany, startTime, err)
  335. }()
  336. res, err = c.Collection.InsertMany(ctx, documents, opts...)
  337. return err
  338. }, acceptable)
  339. return
  340. }
  341. func (c *decoratedCollection) InsertOne(ctx context.Context, document any,
  342. opts ...*mopt.InsertOneOptions) (res *mongo.InsertOneResult, err error) {
  343. ctx, span := startSpan(ctx, insertOne)
  344. defer func() {
  345. endSpan(span, err)
  346. }()
  347. err = c.brk.DoWithAcceptable(func() error {
  348. startTime := timex.Now()
  349. defer func() {
  350. c.logDuration(ctx, insertOne, startTime, err, document)
  351. }()
  352. res, err = c.Collection.InsertOne(ctx, document, opts...)
  353. return err
  354. }, acceptable)
  355. return
  356. }
  357. func (c *decoratedCollection) ReplaceOne(ctx context.Context, filter, replacement any,
  358. opts ...*mopt.ReplaceOptions) (res *mongo.UpdateResult, err error) {
  359. ctx, span := startSpan(ctx, replaceOne)
  360. defer func() {
  361. endSpan(span, err)
  362. }()
  363. err = c.brk.DoWithAcceptable(func() error {
  364. startTime := timex.Now()
  365. defer func() {
  366. c.logDuration(ctx, replaceOne, startTime, err, filter, replacement)
  367. }()
  368. res, err = c.Collection.ReplaceOne(ctx, filter, replacement, opts...)
  369. return err
  370. }, acceptable)
  371. return
  372. }
  373. func (c *decoratedCollection) UpdateByID(ctx context.Context, id, update any,
  374. opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) {
  375. ctx, span := startSpan(ctx, updateByID)
  376. defer func() {
  377. endSpan(span, err)
  378. }()
  379. err = c.brk.DoWithAcceptable(func() error {
  380. startTime := timex.Now()
  381. defer func() {
  382. c.logDuration(ctx, updateByID, startTime, err, id, update)
  383. }()
  384. res, err = c.Collection.UpdateByID(ctx, id, update, opts...)
  385. return err
  386. }, acceptable)
  387. return
  388. }
  389. func (c *decoratedCollection) UpdateMany(ctx context.Context, filter, update any,
  390. opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) {
  391. ctx, span := startSpan(ctx, updateMany)
  392. defer func() {
  393. endSpan(span, err)
  394. }()
  395. err = c.brk.DoWithAcceptable(func() error {
  396. startTime := timex.Now()
  397. defer func() {
  398. c.logDurationSimple(ctx, updateMany, startTime, err)
  399. }()
  400. res, err = c.Collection.UpdateMany(ctx, filter, update, opts...)
  401. return err
  402. }, acceptable)
  403. return
  404. }
  405. func (c *decoratedCollection) UpdateOne(ctx context.Context, filter, update any,
  406. opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) {
  407. ctx, span := startSpan(ctx, updateOne)
  408. defer func() {
  409. endSpan(span, err)
  410. }()
  411. err = c.brk.DoWithAcceptable(func() error {
  412. startTime := timex.Now()
  413. defer func() {
  414. c.logDuration(ctx, updateOne, startTime, err, filter, update)
  415. }()
  416. res, err = c.Collection.UpdateOne(ctx, filter, update, opts...)
  417. return err
  418. }, acceptable)
  419. return
  420. }
  421. func (c *decoratedCollection) logDuration(ctx context.Context, method string,
  422. startTime time.Duration, err error, docs ...any) {
  423. duration := timex.Since(startTime)
  424. logger := logx.WithContext(ctx).WithDuration(duration)
  425. content, jerr := json.Marshal(docs)
  426. // jerr should not be non-nil, but we don't care much on this,
  427. // if non-nil, we just log without docs.
  428. if jerr != nil {
  429. if err != nil {
  430. if duration > slowThreshold.Load() {
  431. logger.Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s)", c.name, method, err.Error())
  432. } else {
  433. logger.Infof("mongo(%s) - %s - fail(%s)", c.name, method, err.Error())
  434. }
  435. } else {
  436. if duration > slowThreshold.Load() {
  437. logger.Slowf("[MONGO] mongo(%s) - slowcall - %s - ok", c.name, method)
  438. } else {
  439. logger.Infof("mongo(%s) - %s - ok", c.name, method)
  440. }
  441. }
  442. } else if err != nil {
  443. if duration > slowThreshold.Load() {
  444. logger.Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s",
  445. c.name, method, err.Error(), string(content))
  446. } else {
  447. logger.Infof("mongo(%s) - %s - fail(%s) - %s",
  448. c.name, method, err.Error(), string(content))
  449. }
  450. } else {
  451. if duration > slowThreshold.Load() {
  452. logger.Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s",
  453. c.name, method, string(content))
  454. } else {
  455. logger.Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content))
  456. }
  457. }
  458. }
  459. func (c *decoratedCollection) logDurationSimple(ctx context.Context, method string, startTime time.Duration, err error) {
  460. logDuration(ctx, c.name, method, startTime, err)
  461. }
  462. func (p keepablePromise) accept(err error) error {
  463. p.promise.Accept()
  464. p.log(err)
  465. return err
  466. }
  467. func (p keepablePromise) keep(err error) error {
  468. if acceptable(err) {
  469. p.promise.Accept()
  470. } else {
  471. p.promise.Reject(err.Error())
  472. }
  473. p.log(err)
  474. return err
  475. }
  476. func acceptable(err error) bool {
  477. return err == nil || err == mongo.ErrNoDocuments || err == mongo.ErrNilValue ||
  478. err == mongo.ErrNilDocument || err == mongo.ErrNilCursor || err == mongo.ErrEmptySlice ||
  479. // session errors
  480. err == session.ErrSessionEnded || err == session.ErrNoTransactStarted ||
  481. err == session.ErrTransactInProgress || err == session.ErrAbortAfterCommit ||
  482. err == session.ErrAbortTwice || err == session.ErrCommitAfterAbort ||
  483. err == session.ErrUnackWCUnsupported || err == session.ErrSnapshotTransaction
  484. }