model.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package mongo
  2. import (
  3. "log"
  4. "time"
  5. "github.com/globalsign/mgo"
  6. )
  7. type (
  8. options struct {
  9. timeout time.Duration
  10. }
  11. // Option defines the method to customize a mongo model.
  12. Option func(opts *options)
  13. // A Model is a mongo model.
  14. Model struct {
  15. session *concurrentSession
  16. db *mgo.Database
  17. collection string
  18. opts []Option
  19. }
  20. )
  21. // MustNewModel returns a Model, exits on errors.
  22. func MustNewModel(url, collection string, opts ...Option) *Model {
  23. model, err := NewModel(url, collection, opts...)
  24. if err != nil {
  25. log.Fatal(err)
  26. }
  27. return model
  28. }
  29. // NewModel returns a Model.
  30. func NewModel(url, collection string, opts ...Option) (*Model, error) {
  31. session, err := getConcurrentSession(url)
  32. if err != nil {
  33. return nil, err
  34. }
  35. return &Model{
  36. session: session,
  37. // If name is empty, the database name provided in the dialed URL is used instead
  38. db: session.DB(""),
  39. collection: collection,
  40. opts: opts,
  41. }, nil
  42. }
  43. // Find finds a record with given query.
  44. func (mm *Model) Find(query interface{}) (Query, error) {
  45. return mm.query(func(c Collection) Query {
  46. return c.Find(query)
  47. })
  48. }
  49. // FindId finds a record with given id.
  50. func (mm *Model) FindId(id interface{}) (Query, error) {
  51. return mm.query(func(c Collection) Query {
  52. return c.FindId(id)
  53. })
  54. }
  55. // GetCollection returns a Collection with given session.
  56. func (mm *Model) GetCollection(session *mgo.Session) Collection {
  57. return newCollection(mm.db.C(mm.collection).With(session))
  58. }
  59. // Insert inserts docs into mm.
  60. func (mm *Model) Insert(docs ...interface{}) error {
  61. return mm.execute(func(c Collection) error {
  62. return c.Insert(docs...)
  63. })
  64. }
  65. // Pipe returns a Pipe with given pipeline.
  66. func (mm *Model) Pipe(pipeline interface{}) (Pipe, error) {
  67. return mm.pipe(func(c Collection) Pipe {
  68. return c.Pipe(pipeline)
  69. })
  70. }
  71. // PutSession returns the given session.
  72. func (mm *Model) PutSession(session *mgo.Session) {
  73. mm.session.putSession(session)
  74. }
  75. // Remove removes the records with given selector.
  76. func (mm *Model) Remove(selector interface{}) error {
  77. return mm.execute(func(c Collection) error {
  78. return c.Remove(selector)
  79. })
  80. }
  81. // RemoveAll removes all with given selector and returns a mgo.ChangeInfo.
  82. func (mm *Model) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) {
  83. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  84. return c.RemoveAll(selector)
  85. })
  86. }
  87. // RemoveId removes a record with given id.
  88. func (mm *Model) RemoveId(id interface{}) error {
  89. return mm.execute(func(c Collection) error {
  90. return c.RemoveId(id)
  91. })
  92. }
  93. // TakeSession gets a session.
  94. func (mm *Model) TakeSession() (*mgo.Session, error) {
  95. return mm.session.takeSession(mm.opts...)
  96. }
  97. // Update updates a record with given selector.
  98. func (mm *Model) Update(selector, update interface{}) error {
  99. return mm.execute(func(c Collection) error {
  100. return c.Update(selector, update)
  101. })
  102. }
  103. // UpdateId updates a record with given id.
  104. func (mm *Model) UpdateId(id, update interface{}) error {
  105. return mm.execute(func(c Collection) error {
  106. return c.UpdateId(id, update)
  107. })
  108. }
  109. // Upsert upserts a record with given selector, and returns a mgo.ChangeInfo.
  110. func (mm *Model) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) {
  111. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  112. return c.Upsert(selector, update)
  113. })
  114. }
  115. func (mm *Model) change(fn func(c Collection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) {
  116. session, err := mm.TakeSession()
  117. if err != nil {
  118. return nil, err
  119. }
  120. defer mm.PutSession(session)
  121. return fn(mm.GetCollection(session))
  122. }
  123. func (mm *Model) execute(fn func(c Collection) error) error {
  124. session, err := mm.TakeSession()
  125. if err != nil {
  126. return err
  127. }
  128. defer mm.PutSession(session)
  129. return fn(mm.GetCollection(session))
  130. }
  131. func (mm *Model) pipe(fn func(c Collection) Pipe) (Pipe, error) {
  132. session, err := mm.TakeSession()
  133. if err != nil {
  134. return nil, err
  135. }
  136. defer mm.PutSession(session)
  137. return fn(mm.GetCollection(session)), nil
  138. }
  139. func (mm *Model) query(fn func(c Collection) Query) (Query, error) {
  140. session, err := mm.TakeSession()
  141. if err != nil {
  142. return nil, err
  143. }
  144. defer mm.PutSession(session)
  145. return fn(mm.GetCollection(session)), nil
  146. }
  147. // WithTimeout customizes an operation with given timeout.
  148. func WithTimeout(timeout time.Duration) Option {
  149. return func(opts *options) {
  150. opts.timeout = timeout
  151. }
  152. }