model.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package mongo
  2. import (
  3. "log"
  4. "time"
  5. "github.com/globalsign/mgo"
  6. )
  7. type (
  8. options struct {
  9. timeout time.Duration
  10. }
  11. Option func(opts *options)
  12. Model struct {
  13. session *concurrentSession
  14. db *mgo.Database
  15. collection string
  16. opts []Option
  17. }
  18. )
  19. func MustNewModel(url, database, collection string, opts ...Option) *Model {
  20. model, err := NewModel(url, database, collection, opts...)
  21. if err != nil {
  22. log.Fatal(err)
  23. }
  24. return model
  25. }
  26. func NewModel(url, database, collection string, opts ...Option) (*Model, error) {
  27. session, err := getConcurrentSession(url)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return &Model{
  32. session: session,
  33. db: session.DB(database),
  34. collection: collection,
  35. opts: opts,
  36. }, nil
  37. }
  38. func (mm *Model) Find(query interface{}) (Query, error) {
  39. return mm.query(func(c Collection) Query {
  40. return c.Find(query)
  41. })
  42. }
  43. func (mm *Model) FindId(id interface{}) (Query, error) {
  44. return mm.query(func(c Collection) Query {
  45. return c.FindId(id)
  46. })
  47. }
  48. func (mm *Model) GetCollection(session *mgo.Session) Collection {
  49. return newCollection(mm.db.C(mm.collection).With(session))
  50. }
  51. func (mm *Model) Insert(docs ...interface{}) error {
  52. return mm.execute(func(c Collection) error {
  53. return c.Insert(docs...)
  54. })
  55. }
  56. func (mm *Model) Pipe(pipeline interface{}) (Pipe, error) {
  57. return mm.pipe(func(c Collection) Pipe {
  58. return c.Pipe(pipeline)
  59. })
  60. }
  61. func (mm *Model) PutSession(session *mgo.Session) {
  62. mm.session.putSession(session)
  63. }
  64. func (mm *Model) Remove(selector interface{}) error {
  65. return mm.execute(func(c Collection) error {
  66. return c.Remove(selector)
  67. })
  68. }
  69. func (mm *Model) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) {
  70. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  71. return c.RemoveAll(selector)
  72. })
  73. }
  74. func (mm *Model) RemoveId(id interface{}) error {
  75. return mm.execute(func(c Collection) error {
  76. return c.RemoveId(id)
  77. })
  78. }
  79. func (mm *Model) TakeSession() (*mgo.Session, error) {
  80. return mm.session.takeSession(mm.opts...)
  81. }
  82. func (mm *Model) Update(selector, update interface{}) error {
  83. return mm.execute(func(c Collection) error {
  84. return c.Update(selector, update)
  85. })
  86. }
  87. func (mm *Model) UpdateId(id, update interface{}) error {
  88. return mm.execute(func(c Collection) error {
  89. return c.UpdateId(id, update)
  90. })
  91. }
  92. func (mm *Model) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) {
  93. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  94. return c.Upsert(selector, update)
  95. })
  96. }
  97. func (mm *Model) change(fn func(c Collection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) {
  98. session, err := mm.TakeSession()
  99. if err != nil {
  100. return nil, err
  101. }
  102. defer mm.PutSession(session)
  103. return fn(mm.GetCollection(session))
  104. }
  105. func (mm *Model) execute(fn func(c Collection) error) error {
  106. session, err := mm.TakeSession()
  107. if err != nil {
  108. return err
  109. }
  110. defer mm.PutSession(session)
  111. return fn(mm.GetCollection(session))
  112. }
  113. func (mm *Model) pipe(fn func(c Collection) Pipe) (Pipe, error) {
  114. session, err := mm.TakeSession()
  115. if err != nil {
  116. return nil, err
  117. }
  118. defer mm.PutSession(session)
  119. return fn(mm.GetCollection(session)), nil
  120. }
  121. func (mm *Model) query(fn func(c Collection) Query) (Query, error) {
  122. session, err := mm.TakeSession()
  123. if err != nil {
  124. return nil, err
  125. }
  126. defer mm.PutSession(session)
  127. return fn(mm.GetCollection(session)), nil
  128. }
  129. func WithTimeout(timeout time.Duration) Option {
  130. return func(opts *options) {
  131. opts.timeout = timeout
  132. }
  133. }