123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545 |
- package mon
- import (
- "context"
- "errors"
- "time"
- "github.com/wuntsong-org/go-zero-plus/core/breaker"
- "github.com/wuntsong-org/go-zero-plus/core/timex"
- "go.mongodb.org/mongo-driver/mongo"
- mopt "go.mongodb.org/mongo-driver/mongo/options"
- "go.mongodb.org/mongo-driver/x/mongo/driver/session"
- )
// defaultSlowThreshold is 500ms.
// NOTE(review): presumably the duration above which a call is reported as
// slow; its use is not visible in this part of the file — confirm.
const defaultSlowThreshold = time.Millisecond * 500

// spanName is the span name of the mongo calls.
const spanName = "mongo"

// Names of the mongodb collection methods, passed to startSpan and to the
// duration-logging helpers by the decoratedCollection methods.
const (
	aggregate              = "Aggregate"
	bulkWrite              = "BulkWrite"
	countDocuments         = "CountDocuments"
	deleteMany             = "DeleteMany"
	deleteOne              = "DeleteOne"
	distinct               = "Distinct"
	estimatedDocumentCount = "EstimatedDocumentCount"
	find                   = "Find"
	findOne                = "FindOne"
	findOneAndDelete       = "FindOneAndDelete"
	findOneAndReplace      = "FindOneAndReplace"
	findOneAndUpdate       = "FindOneAndUpdate"
	insertMany             = "InsertMany"
	insertOne              = "InsertOne"
	replaceOne             = "ReplaceOne"
	updateByID             = "UpdateByID"
	updateMany             = "UpdateMany"
	updateOne              = "UpdateOne"
)
- // ErrNotFound is an alias of mongo.ErrNoDocuments
- var ErrNotFound = mongo.ErrNoDocuments
- type (
- // Collection defines a MongoDB collection.
- Collection interface {
- // Aggregate executes an aggregation pipeline.
- Aggregate(ctx context.Context, pipeline any, opts ...*mopt.AggregateOptions) (
- *mongo.Cursor, error)
- // BulkWrite performs a bulk write operation.
- BulkWrite(ctx context.Context, models []mongo.WriteModel, opts ...*mopt.BulkWriteOptions) (
- *mongo.BulkWriteResult, error)
- // Clone creates a copy of this collection with the same settings.
- Clone(opts ...*mopt.CollectionOptions) (*mongo.Collection, error)
- // CountDocuments returns the number of documents in the collection that match the filter.
- CountDocuments(ctx context.Context, filter any, opts ...*mopt.CountOptions) (int64, error)
- // Database returns the database that this collection is a part of.
- Database() *mongo.Database
- // DeleteMany deletes documents from the collection that match the filter.
- DeleteMany(ctx context.Context, filter any, opts ...*mopt.DeleteOptions) (
- *mongo.DeleteResult, error)
- // DeleteOne deletes at most one document from the collection that matches the filter.
- DeleteOne(ctx context.Context, filter any, opts ...*mopt.DeleteOptions) (
- *mongo.DeleteResult, error)
- // Distinct returns a list of distinct values for the given key across the collection.
- Distinct(ctx context.Context, fieldName string, filter any,
- opts ...*mopt.DistinctOptions) ([]any, error)
- // Drop drops this collection from database.
- Drop(ctx context.Context) error
- // EstimatedDocumentCount returns an estimate of the count of documents in a collection
- // using collection metadata.
- EstimatedDocumentCount(ctx context.Context, opts ...*mopt.EstimatedDocumentCountOptions) (int64, error)
- // Find finds the documents matching the provided filter.
- Find(ctx context.Context, filter any, opts ...*mopt.FindOptions) (*mongo.Cursor, error)
- // FindOne returns up to one document that matches the provided filter.
- FindOne(ctx context.Context, filter any, opts ...*mopt.FindOneOptions) (
- *mongo.SingleResult, error)
- // FindOneAndDelete returns at most one document that matches the filter. If the filter
- // matches multiple documents, only the first document is deleted.
- FindOneAndDelete(ctx context.Context, filter any, opts ...*mopt.FindOneAndDeleteOptions) (
- *mongo.SingleResult, error)
- // FindOneAndReplace returns at most one document that matches the filter. If the filter
- // matches multiple documents, FindOneAndReplace returns the first document in the
- // collection that matches the filter.
- FindOneAndReplace(ctx context.Context, filter, replacement any,
- opts ...*mopt.FindOneAndReplaceOptions) (*mongo.SingleResult, error)
- // FindOneAndUpdate returns at most one document that matches the filter. If the filter
- // matches multiple documents, FindOneAndUpdate returns the first document in the
- // collection that matches the filter.
- FindOneAndUpdate(ctx context.Context, filter, update any,
- opts ...*mopt.FindOneAndUpdateOptions) (*mongo.SingleResult, error)
- // Indexes returns the index view for this collection.
- Indexes() mongo.IndexView
- // InsertMany inserts the provided documents.
- InsertMany(ctx context.Context, documents []any, opts ...*mopt.InsertManyOptions) (
- *mongo.InsertManyResult, error)
- // InsertOne inserts the provided document.
- InsertOne(ctx context.Context, document any, opts ...*mopt.InsertOneOptions) (
- *mongo.InsertOneResult, error)
- // ReplaceOne replaces at most one document that matches the filter.
- ReplaceOne(ctx context.Context, filter, replacement any,
- opts ...*mopt.ReplaceOptions) (*mongo.UpdateResult, error)
- // UpdateByID updates a single document matching the provided filter.
- UpdateByID(ctx context.Context, id, update any,
- opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error)
- // UpdateMany updates the provided documents.
- UpdateMany(ctx context.Context, filter, update any,
- opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error)
- // UpdateOne updates a single document matching the provided filter.
- UpdateOne(ctx context.Context, filter, update any,
- opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error)
- // Watch returns a change stream cursor used to receive notifications of changes to the collection.
- Watch(ctx context.Context, pipeline any, opts ...*mopt.ChangeStreamOptions) (
- *mongo.ChangeStream, error)
- }
- decoratedCollection struct {
- *mongo.Collection
- name string
- brk breaker.Breaker
- }
- keepablePromise struct {
- promise breaker.Promise
- log func(error)
- }
- )
- func newCollection(collection *mongo.Collection, brk breaker.Breaker) Collection {
- return &decoratedCollection{
- Collection: collection,
- name: collection.Name(),
- brk: brk,
- }
- }
- func (c *decoratedCollection) Aggregate(ctx context.Context, pipeline any,
- opts ...*mopt.AggregateOptions) (cur *mongo.Cursor, err error) {
- ctx, span := startSpan(ctx, aggregate)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- starTime := timex.Now()
- defer func() {
- c.logDurationSimple(ctx, aggregate, starTime, err)
- }()
- cur, err = c.Collection.Aggregate(ctx, pipeline, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel,
- opts ...*mopt.BulkWriteOptions) (res *mongo.BulkWriteResult, err error) {
- ctx, span := startSpan(ctx, bulkWrite)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDurationSimple(ctx, bulkWrite, startTime, err)
- }()
- res, err = c.Collection.BulkWrite(ctx, models, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) CountDocuments(ctx context.Context, filter any,
- opts ...*mopt.CountOptions) (count int64, err error) {
- ctx, span := startSpan(ctx, countDocuments)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDurationSimple(ctx, countDocuments, startTime, err)
- }()
- count, err = c.Collection.CountDocuments(ctx, filter, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) DeleteMany(ctx context.Context, filter any,
- opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) {
- ctx, span := startSpan(ctx, deleteMany)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDurationSimple(ctx, deleteMany, startTime, err)
- }()
- res, err = c.Collection.DeleteMany(ctx, filter, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) DeleteOne(ctx context.Context, filter any,
- opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) {
- ctx, span := startSpan(ctx, deleteOne)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, deleteOne, startTime, err, filter)
- }()
- res, err = c.Collection.DeleteOne(ctx, filter, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) Distinct(ctx context.Context, fieldName string, filter any,
- opts ...*mopt.DistinctOptions) (val []any, err error) {
- ctx, span := startSpan(ctx, distinct)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDurationSimple(ctx, distinct, startTime, err)
- }()
- val, err = c.Collection.Distinct(ctx, fieldName, filter, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) EstimatedDocumentCount(ctx context.Context,
- opts ...*mopt.EstimatedDocumentCountOptions) (val int64, err error) {
- ctx, span := startSpan(ctx, estimatedDocumentCount)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDurationSimple(ctx, estimatedDocumentCount, startTime, err)
- }()
- val, err = c.Collection.EstimatedDocumentCount(ctx, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) Find(ctx context.Context, filter any,
- opts ...*mopt.FindOptions) (cur *mongo.Cursor, err error) {
- ctx, span := startSpan(ctx, find)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, find, startTime, err, filter)
- }()
- cur, err = c.Collection.Find(ctx, filter, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) FindOne(ctx context.Context, filter any,
- opts ...*mopt.FindOneOptions) (res *mongo.SingleResult, err error) {
- ctx, span := startSpan(ctx, findOne)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, findOne, startTime, err, filter)
- }()
- res = c.Collection.FindOne(ctx, filter, opts...)
- err = res.Err()
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) FindOneAndDelete(ctx context.Context, filter any,
- opts ...*mopt.FindOneAndDeleteOptions) (res *mongo.SingleResult, err error) {
- ctx, span := startSpan(ctx, findOneAndDelete)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, findOneAndDelete, startTime, err, filter)
- }()
- res = c.Collection.FindOneAndDelete(ctx, filter, opts...)
- err = res.Err()
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) FindOneAndReplace(ctx context.Context, filter any,
- replacement any, opts ...*mopt.FindOneAndReplaceOptions) (
- res *mongo.SingleResult, err error) {
- ctx, span := startSpan(ctx, findOneAndReplace)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, findOneAndReplace, startTime, err, filter, replacement)
- }()
- res = c.Collection.FindOneAndReplace(ctx, filter, replacement, opts...)
- err = res.Err()
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) FindOneAndUpdate(ctx context.Context, filter, update any,
- opts ...*mopt.FindOneAndUpdateOptions) (res *mongo.SingleResult, err error) {
- ctx, span := startSpan(ctx, findOneAndUpdate)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, findOneAndUpdate, startTime, err, filter, update)
- }()
- res = c.Collection.FindOneAndUpdate(ctx, filter, update, opts...)
- err = res.Err()
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) InsertMany(ctx context.Context, documents []any,
- opts ...*mopt.InsertManyOptions) (res *mongo.InsertManyResult, err error) {
- ctx, span := startSpan(ctx, insertMany)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDurationSimple(ctx, insertMany, startTime, err)
- }()
- res, err = c.Collection.InsertMany(ctx, documents, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) InsertOne(ctx context.Context, document any,
- opts ...*mopt.InsertOneOptions) (res *mongo.InsertOneResult, err error) {
- ctx, span := startSpan(ctx, insertOne)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, insertOne, startTime, err, document)
- }()
- res, err = c.Collection.InsertOne(ctx, document, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) ReplaceOne(ctx context.Context, filter, replacement any,
- opts ...*mopt.ReplaceOptions) (res *mongo.UpdateResult, err error) {
- ctx, span := startSpan(ctx, replaceOne)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, replaceOne, startTime, err, filter, replacement)
- }()
- res, err = c.Collection.ReplaceOne(ctx, filter, replacement, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) UpdateByID(ctx context.Context, id, update any,
- opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) {
- ctx, span := startSpan(ctx, updateByID)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, updateByID, startTime, err, id, update)
- }()
- res, err = c.Collection.UpdateByID(ctx, id, update, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) UpdateMany(ctx context.Context, filter, update any,
- opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) {
- ctx, span := startSpan(ctx, updateMany)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDurationSimple(ctx, updateMany, startTime, err)
- }()
- res, err = c.Collection.UpdateMany(ctx, filter, update, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) UpdateOne(ctx context.Context, filter, update any,
- opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) {
- ctx, span := startSpan(ctx, updateOne)
- defer func() {
- endSpan(span, err)
- }()
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- c.logDuration(ctx, updateOne, startTime, err, filter, update)
- }()
- res, err = c.Collection.UpdateOne(ctx, filter, update, opts...)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) logDuration(ctx context.Context, method string,
- startTime time.Duration, err error, docs ...any) {
- logDurationWithDocs(ctx, c.name, method, startTime, err, docs...)
- }
- func (c *decoratedCollection) logDurationSimple(ctx context.Context, method string,
- startTime time.Duration, err error) {
- logDuration(ctx, c.name, method, startTime, err)
- }
- func (p keepablePromise) accept(err error) error {
- p.promise.Accept()
- p.log(err)
- return err
- }
- func (p keepablePromise) keep(err error) error {
- if acceptable(err) {
- p.promise.Accept()
- } else {
- p.promise.Reject(err.Error())
- }
- p.log(err)
- return err
- }
- func acceptable(err error) bool {
- return err == nil ||
- errors.Is(err, mongo.ErrNoDocuments) ||
- errors.Is(err, mongo.ErrNilValue) ||
- errors.Is(err, mongo.ErrNilDocument) ||
- errors.Is(err, mongo.ErrNilCursor) ||
- errors.Is(err, mongo.ErrEmptySlice) ||
- // session errors
- errors.Is(err, session.ErrSessionEnded) ||
- errors.Is(err, session.ErrNoTransactStarted) ||
- errors.Is(err, session.ErrTransactInProgress) ||
- errors.Is(err, session.ErrAbortAfterCommit) ||
- errors.Is(err, session.ErrAbortTwice) ||
- errors.Is(err, session.ErrCommitAfterAbort) ||
- errors.Is(err, session.ErrUnackWCUnsupported) ||
- errors.Is(err, session.ErrSnapshotTransaction)
- }
|