sqlconn.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package sqlx
  2. import (
  3. "database/sql"
  4. "github.com/tal-tech/go-zero/core/breaker"
  5. "github.com/tal-tech/go-zero/core/logx"
  6. )
  7. // ErrNotFound is an alias of sql.ErrNoRows
  8. var ErrNotFound = sql.ErrNoRows
  9. type (
  10. // Session stands for raw connections or transaction sessions
  11. Session interface {
  12. Exec(query string, args ...interface{}) (sql.Result, error)
  13. Prepare(query string) (StmtSession, error)
  14. QueryRow(v interface{}, query string, args ...interface{}) error
  15. QueryRowPartial(v interface{}, query string, args ...interface{}) error
  16. QueryRows(v interface{}, query string, args ...interface{}) error
  17. QueryRowsPartial(v interface{}, query string, args ...interface{}) error
  18. }
  19. // SqlConn only stands for raw connections, so Transact method can be called.
  20. SqlConn interface {
  21. Session
  22. // RawDB is for other ORM to operate with, use it with caution.
  23. RawDB() (*sql.DB, error)
  24. Transact(func(session Session) error) error
  25. }
  26. // SqlOption defines the method to customize a sql connection.
  27. SqlOption func(*commonSqlConn)
  28. // StmtSession interface represents a session that can be used to execute statements.
  29. StmtSession interface {
  30. Close() error
  31. Exec(args ...interface{}) (sql.Result, error)
  32. QueryRow(v interface{}, args ...interface{}) error
  33. QueryRowPartial(v interface{}, args ...interface{}) error
  34. QueryRows(v interface{}, args ...interface{}) error
  35. QueryRowsPartial(v interface{}, args ...interface{}) error
  36. }
  37. // thread-safe
  38. // Because CORBA doesn't support PREPARE, so we need to combine the
  39. // query arguments into one string and do underlying query without arguments
  40. commonSqlConn struct {
  41. connProv connProvider
  42. onError func(error)
  43. beginTx beginnable
  44. brk breaker.Breaker
  45. accept func(error) bool
  46. }
  47. connProvider func() (*sql.DB, error)
  48. sessionConn interface {
  49. Exec(query string, args ...interface{}) (sql.Result, error)
  50. Query(query string, args ...interface{}) (*sql.Rows, error)
  51. }
  52. statement struct {
  53. query string
  54. stmt *sql.Stmt
  55. }
  56. stmtConn interface {
  57. Exec(args ...interface{}) (sql.Result, error)
  58. Query(args ...interface{}) (*sql.Rows, error)
  59. }
  60. )
  61. // NewSqlConn returns a SqlConn with given driver name and datasource.
  62. func NewSqlConn(driverName, datasource string, opts ...SqlOption) SqlConn {
  63. conn := &commonSqlConn{
  64. connProv: func() (*sql.DB, error) {
  65. return getSqlConn(driverName, datasource)
  66. },
  67. onError: func(err error) {
  68. logInstanceError(datasource, err)
  69. },
  70. beginTx: begin,
  71. brk: breaker.NewBreaker(),
  72. }
  73. for _, opt := range opts {
  74. opt(conn)
  75. }
  76. return conn
  77. }
  78. // NewSqlConnFromDB returns a SqlConn with the given sql.DB.
  79. // Use it with caution, it's provided for other ORM to interact with.
  80. func NewSqlConnFromDB(db *sql.DB, opts ...SqlOption) SqlConn {
  81. conn := &commonSqlConn{
  82. connProv: func() (*sql.DB, error) {
  83. return db, nil
  84. },
  85. onError: func(err error) {
  86. logx.Errorf("Error on getting sql instance: %v", err)
  87. },
  88. beginTx: begin,
  89. brk: breaker.NewBreaker(),
  90. }
  91. for _, opt := range opts {
  92. opt(conn)
  93. }
  94. return conn
  95. }
  96. func (db *commonSqlConn) Exec(q string, args ...interface{}) (result sql.Result, err error) {
  97. err = db.brk.DoWithAcceptable(func() error {
  98. var conn *sql.DB
  99. conn, err = db.connProv()
  100. if err != nil {
  101. db.onError(err)
  102. return err
  103. }
  104. result, err = exec(conn, q, args...)
  105. return err
  106. }, db.acceptable)
  107. return
  108. }
  109. func (db *commonSqlConn) Prepare(query string) (stmt StmtSession, err error) {
  110. err = db.brk.DoWithAcceptable(func() error {
  111. var conn *sql.DB
  112. conn, err = db.connProv()
  113. if err != nil {
  114. db.onError(err)
  115. return err
  116. }
  117. st, err := conn.Prepare(query)
  118. if err != nil {
  119. return err
  120. }
  121. stmt = statement{
  122. query: query,
  123. stmt: st,
  124. }
  125. return nil
  126. }, db.acceptable)
  127. return
  128. }
  129. func (db *commonSqlConn) QueryRow(v interface{}, q string, args ...interface{}) error {
  130. return db.queryRows(func(rows *sql.Rows) error {
  131. return unmarshalRow(v, rows, true)
  132. }, q, args...)
  133. }
  134. func (db *commonSqlConn) QueryRowPartial(v interface{}, q string, args ...interface{}) error {
  135. return db.queryRows(func(rows *sql.Rows) error {
  136. return unmarshalRow(v, rows, false)
  137. }, q, args...)
  138. }
  139. func (db *commonSqlConn) QueryRows(v interface{}, q string, args ...interface{}) error {
  140. return db.queryRows(func(rows *sql.Rows) error {
  141. return unmarshalRows(v, rows, true)
  142. }, q, args...)
  143. }
  144. func (db *commonSqlConn) QueryRowsPartial(v interface{}, q string, args ...interface{}) error {
  145. return db.queryRows(func(rows *sql.Rows) error {
  146. return unmarshalRows(v, rows, false)
  147. }, q, args...)
  148. }
  149. func (db *commonSqlConn) RawDB() (*sql.DB, error) {
  150. return db.connProv()
  151. }
  152. func (db *commonSqlConn) Transact(fn func(Session) error) error {
  153. return db.brk.DoWithAcceptable(func() error {
  154. return transact(db, db.beginTx, fn)
  155. }, db.acceptable)
  156. }
  157. func (db *commonSqlConn) acceptable(err error) bool {
  158. ok := err == nil || err == sql.ErrNoRows || err == sql.ErrTxDone
  159. if db.accept == nil {
  160. return ok
  161. }
  162. return ok || db.accept(err)
  163. }
  164. func (db *commonSqlConn) queryRows(scanner func(*sql.Rows) error, q string, args ...interface{}) error {
  165. var qerr error
  166. return db.brk.DoWithAcceptable(func() error {
  167. conn, err := db.connProv()
  168. if err != nil {
  169. db.onError(err)
  170. return err
  171. }
  172. return query(conn, func(rows *sql.Rows) error {
  173. qerr = scanner(rows)
  174. return qerr
  175. }, q, args...)
  176. }, func(err error) bool {
  177. return qerr == err || db.acceptable(err)
  178. })
  179. }
  180. func (s statement) Close() error {
  181. return s.stmt.Close()
  182. }
  183. func (s statement) Exec(args ...interface{}) (sql.Result, error) {
  184. return execStmt(s.stmt, s.query, args...)
  185. }
  186. func (s statement) QueryRow(v interface{}, args ...interface{}) error {
  187. return queryStmt(s.stmt, func(rows *sql.Rows) error {
  188. return unmarshalRow(v, rows, true)
  189. }, s.query, args...)
  190. }
  191. func (s statement) QueryRowPartial(v interface{}, args ...interface{}) error {
  192. return queryStmt(s.stmt, func(rows *sql.Rows) error {
  193. return unmarshalRow(v, rows, false)
  194. }, s.query, args...)
  195. }
  196. func (s statement) QueryRows(v interface{}, args ...interface{}) error {
  197. return queryStmt(s.stmt, func(rows *sql.Rows) error {
  198. return unmarshalRows(v, rows, true)
  199. }, s.query, args...)
  200. }
  201. func (s statement) QueryRowsPartial(v interface{}, args ...interface{}) error {
  202. return queryStmt(s.stmt, func(rows *sql.Rows) error {
  203. return unmarshalRows(v, rows, false)
  204. }, s.query, args...)
  205. }