model.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package mon
  2. import (
  3. "context"
  4. "log"
  5. "strings"
  6. "github.com/zeromicro/go-zero/core/breaker"
  7. "github.com/zeromicro/go-zero/core/timex"
  8. "go.mongodb.org/mongo-driver/mongo"
  9. mopt "go.mongodb.org/mongo-driver/mongo/options"
  10. )
  11. // Model is a mongodb store model that represents a collection.
  12. type Model struct {
  13. Collection
  14. name string
  15. cli *mongo.Client
  16. brk breaker.Breaker
  17. opts []Option
  18. }
  19. // MustNewModel returns a Model, exits on errors.
  20. func MustNewModel(uri, db, collection string, opts ...Option) *Model {
  21. model, err := NewModel(uri, db, collection, opts...)
  22. if err != nil {
  23. log.Fatal(err)
  24. }
  25. return model
  26. }
  27. // NewModel returns a Model.
  28. func NewModel(uri, db, collection string, opts ...Option) (*Model, error) {
  29. cli, err := getClient(uri)
  30. if err != nil {
  31. return nil, err
  32. }
  33. name := strings.Join([]string{uri, collection}, "/")
  34. brk := breaker.GetBreaker(uri)
  35. coll := newCollection(cli.Database(db).Collection(collection), brk)
  36. return newModel(name, cli, coll, brk, opts...), nil
  37. }
  38. func newModel(name string, cli *mongo.Client, coll Collection, brk breaker.Breaker,
  39. opts ...Option) *Model {
  40. return &Model{
  41. name: name,
  42. Collection: coll,
  43. cli: cli,
  44. brk: brk,
  45. opts: opts,
  46. }
  47. }
  48. // StartSession starts a new session.
  49. func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session, err error) {
  50. err = m.brk.DoWithAcceptable(func() error {
  51. starTime := timex.Now()
  52. defer func() {
  53. logDuration(m.name, "StartSession", starTime, err)
  54. }()
  55. sess, err = m.cli.StartSession(opts...)
  56. return err
  57. }, acceptable)
  58. return
  59. }
  60. // Aggregate executes an aggregation pipeline.
  61. func (m *Model) Aggregate(ctx context.Context, v, pipeline interface{}, opts ...*mopt.AggregateOptions) error {
  62. cur, err := m.Collection.Aggregate(ctx, pipeline, opts...)
  63. if err != nil {
  64. return err
  65. }
  66. defer cur.Close(ctx)
  67. return cur.All(ctx, v)
  68. }
  69. // DeleteMany deletes documents that match the filter.
  70. func (m *Model) DeleteMany(ctx context.Context, filter interface{}, opts ...*mopt.DeleteOptions) (int64, error) {
  71. res, err := m.Collection.DeleteMany(ctx, filter, opts...)
  72. if err != nil {
  73. return 0, err
  74. }
  75. return res.DeletedCount, nil
  76. }
  77. // DeleteOne deletes the first document that matches the filter.
  78. func (m *Model) DeleteOne(ctx context.Context, filter interface{}, opts ...*mopt.DeleteOptions) (int64, error) {
  79. res, err := m.Collection.DeleteOne(ctx, filter, opts...)
  80. if err != nil {
  81. return 0, err
  82. }
  83. return res.DeletedCount, nil
  84. }
  85. // Find finds documents that match the filter.
  86. func (m *Model) Find(ctx context.Context, v, filter interface{}, opts ...*mopt.FindOptions) error {
  87. cur, err := m.Collection.Find(ctx, filter, opts...)
  88. if err != nil {
  89. return err
  90. }
  91. defer cur.Close(ctx)
  92. return cur.All(ctx, v)
  93. }
  94. // FindOne finds the first document that matches the filter.
  95. func (m *Model) FindOne(ctx context.Context, v, filter interface{}, opts ...*mopt.FindOneOptions) error {
  96. res, err := m.Collection.FindOne(ctx, filter, opts...)
  97. if err != nil {
  98. return err
  99. }
  100. return res.Decode(v)
  101. }
  102. // FindOneAndDelete finds a single document and deletes it.
  103. func (m *Model) FindOneAndDelete(ctx context.Context, v, filter interface{},
  104. opts ...*mopt.FindOneAndDeleteOptions) error {
  105. res, err := m.Collection.FindOneAndDelete(ctx, filter, opts...)
  106. if err != nil {
  107. return err
  108. }
  109. return res.Decode(v)
  110. }
  111. // FindOneAndReplace finds a single document and replaces it.
  112. func (m *Model) FindOneAndReplace(ctx context.Context, v, filter interface{}, replacement interface{},
  113. opts ...*mopt.FindOneAndReplaceOptions) error {
  114. res, err := m.Collection.FindOneAndReplace(ctx, filter, replacement, opts...)
  115. if err != nil {
  116. return err
  117. }
  118. return res.Decode(v)
  119. }
  120. // FindOneAndUpdate finds a single document and updates it.
  121. func (m *Model) FindOneAndUpdate(ctx context.Context, v, filter interface{}, update interface{},
  122. opts ...*mopt.FindOneAndUpdateOptions) error {
  123. res, err := m.Collection.FindOneAndUpdate(ctx, filter, update, opts...)
  124. if err != nil {
  125. return err
  126. }
  127. return res.Decode(v)
  128. }