model.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package mongo
  2. import (
  3. "log"
  4. "time"
  5. "github.com/globalsign/mgo"
  6. "github.com/zeromicro/go-zero/core/breaker"
  7. )
  8. // A Model is a mongo model.
  9. type Model struct {
  10. session *concurrentSession
  11. db *mgo.Database
  12. collection string
  13. brk breaker.Breaker
  14. opts []Option
  15. }
  16. // MustNewModel returns a Model, exits on errors.
  17. func MustNewModel(url, collection string, opts ...Option) *Model {
  18. model, err := NewModel(url, collection, opts...)
  19. if err != nil {
  20. log.Fatal(err)
  21. }
  22. return model
  23. }
  24. // NewModel returns a Model.
  25. func NewModel(url, collection string, opts ...Option) (*Model, error) {
  26. session, err := getConcurrentSession(url)
  27. if err != nil {
  28. return nil, err
  29. }
  30. return &Model{
  31. session: session,
  32. // If name is empty, the database name provided in the dialed URL is used instead
  33. db: session.DB(""),
  34. collection: collection,
  35. brk: breaker.GetBreaker(url),
  36. opts: opts,
  37. }, nil
  38. }
  39. // Find finds a record with given query.
  40. func (mm *Model) Find(query interface{}) (Query, error) {
  41. return mm.query(func(c Collection) Query {
  42. return c.Find(query)
  43. })
  44. }
  45. // FindId finds a record with given id.
  46. func (mm *Model) FindId(id interface{}) (Query, error) {
  47. return mm.query(func(c Collection) Query {
  48. return c.FindId(id)
  49. })
  50. }
  51. // GetCollection returns a Collection with given session.
  52. func (mm *Model) GetCollection(session *mgo.Session) Collection {
  53. return newCollection(mm.db.C(mm.collection).With(session), mm.brk)
  54. }
  55. // Insert inserts docs into mm.
  56. func (mm *Model) Insert(docs ...interface{}) error {
  57. return mm.execute(func(c Collection) error {
  58. return c.Insert(docs...)
  59. })
  60. }
  61. // Pipe returns a Pipe with given pipeline.
  62. func (mm *Model) Pipe(pipeline interface{}) (Pipe, error) {
  63. return mm.pipe(func(c Collection) Pipe {
  64. return c.Pipe(pipeline)
  65. })
  66. }
  67. // PutSession returns the given session.
  68. func (mm *Model) PutSession(session *mgo.Session) {
  69. mm.session.putSession(session)
  70. }
  71. // Remove removes the records with given selector.
  72. func (mm *Model) Remove(selector interface{}) error {
  73. return mm.execute(func(c Collection) error {
  74. return c.Remove(selector)
  75. })
  76. }
  77. // RemoveAll removes all with given selector and returns a mgo.ChangeInfo.
  78. func (mm *Model) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) {
  79. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  80. return c.RemoveAll(selector)
  81. })
  82. }
  83. // RemoveId removes a record with given id.
  84. func (mm *Model) RemoveId(id interface{}) error {
  85. return mm.execute(func(c Collection) error {
  86. return c.RemoveId(id)
  87. })
  88. }
  89. // TakeSession gets a session.
  90. func (mm *Model) TakeSession() (*mgo.Session, error) {
  91. return mm.session.takeSession(mm.opts...)
  92. }
  93. // Update updates a record with given selector.
  94. func (mm *Model) Update(selector, update interface{}) error {
  95. return mm.execute(func(c Collection) error {
  96. return c.Update(selector, update)
  97. })
  98. }
  99. // UpdateId updates a record with given id.
  100. func (mm *Model) UpdateId(id, update interface{}) error {
  101. return mm.execute(func(c Collection) error {
  102. return c.UpdateId(id, update)
  103. })
  104. }
  105. // Upsert upserts a record with given selector, and returns a mgo.ChangeInfo.
  106. func (mm *Model) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) {
  107. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  108. return c.Upsert(selector, update)
  109. })
  110. }
  111. func (mm *Model) change(fn func(c Collection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) {
  112. session, err := mm.TakeSession()
  113. if err != nil {
  114. return nil, err
  115. }
  116. defer mm.PutSession(session)
  117. return fn(mm.GetCollection(session))
  118. }
  119. func (mm *Model) execute(fn func(c Collection) error) error {
  120. session, err := mm.TakeSession()
  121. if err != nil {
  122. return err
  123. }
  124. defer mm.PutSession(session)
  125. return fn(mm.GetCollection(session))
  126. }
  127. func (mm *Model) pipe(fn func(c Collection) Pipe) (Pipe, error) {
  128. session, err := mm.TakeSession()
  129. if err != nil {
  130. return nil, err
  131. }
  132. defer mm.PutSession(session)
  133. return fn(mm.GetCollection(session)), nil
  134. }
  135. func (mm *Model) query(fn func(c Collection) Query) (Query, error) {
  136. session, err := mm.TakeSession()
  137. if err != nil {
  138. return nil, err
  139. }
  140. defer mm.PutSession(session)
  141. return fn(mm.GetCollection(session)), nil
  142. }
  143. // WithTimeout customizes an operation with given timeout.
  144. func WithTimeout(timeout time.Duration) Option {
  145. return func(opts *options) {
  146. opts.timeout = timeout
  147. }
  148. }