cachedsql_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. package sqlc
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "runtime"
  11. "sync"
  12. "sync/atomic"
  13. "testing"
  14. "time"
  15. "github.com/alicebob/miniredis"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/tal-tech/go-zero/core/logx"
  18. "github.com/tal-tech/go-zero/core/stat"
  19. "github.com/tal-tech/go-zero/core/stores/cache"
  20. "github.com/tal-tech/go-zero/core/stores/redis"
  21. "github.com/tal-tech/go-zero/core/stores/sqlx"
  22. )
  23. func init() {
  24. logx.Disable()
  25. stat.SetReporter(nil)
  26. }
  27. func TestCachedConn_GetCache(t *testing.T) {
  28. resetStats()
  29. s, err := miniredis.Run()
  30. if err != nil {
  31. t.Error(err)
  32. }
  33. r := redis.NewRedis(s.Addr(), redis.NodeType)
  34. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  35. var value string
  36. err = c.GetCache("any", &value)
  37. assert.Equal(t, ErrNotFound, err)
  38. s.Set("any", `"value"`)
  39. err = c.GetCache("any", &value)
  40. assert.Nil(t, err)
  41. assert.Equal(t, "value", value)
  42. }
  43. func TestStat(t *testing.T) {
  44. resetStats()
  45. s, err := miniredis.Run()
  46. if err != nil {
  47. t.Error(err)
  48. }
  49. r := redis.NewRedis(s.Addr(), redis.NodeType)
  50. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  51. for i := 0; i < 10; i++ {
  52. var str string
  53. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  54. *v.(*string) = "zero"
  55. return nil
  56. })
  57. if err != nil {
  58. t.Error(err)
  59. }
  60. }
  61. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  62. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  63. }
  64. func TestCachedConn_QueryRowIndex_NoCache(t *testing.T) {
  65. resetStats()
  66. s, err := miniredis.Run()
  67. if err != nil {
  68. t.Error(err)
  69. }
  70. r := redis.NewRedis(s.Addr(), redis.NodeType)
  71. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  72. var str string
  73. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  74. return fmt.Sprintf("%s/1234", s)
  75. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  76. *v.(*string) = "zero"
  77. return "primary", nil
  78. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  79. assert.Equal(t, "primary", pri)
  80. *v.(*string) = "xin"
  81. return nil
  82. })
  83. assert.Nil(t, err)
  84. assert.Equal(t, "zero", str)
  85. val, err := r.Get("index")
  86. assert.Nil(t, err)
  87. assert.Equal(t, `"primary"`, val)
  88. val, err = r.Get("primary/1234")
  89. assert.Nil(t, err)
  90. assert.Equal(t, `"zero"`, val)
  91. }
  92. func TestCachedConn_QueryRowIndex_HasCache(t *testing.T) {
  93. resetStats()
  94. s, err := miniredis.Run()
  95. if err != nil {
  96. t.Error(err)
  97. }
  98. r := redis.NewRedis(s.Addr(), redis.NodeType)
  99. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  100. cache.WithNotFoundExpiry(time.Second))
  101. var str string
  102. r.Set("index", `"primary"`)
  103. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  104. return fmt.Sprintf("%s/1234", s)
  105. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  106. assert.Fail(t, "should not go here")
  107. return "primary", nil
  108. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  109. *v.(*string) = "xin"
  110. assert.Equal(t, "primary", primary)
  111. return nil
  112. })
  113. assert.Nil(t, err)
  114. assert.Equal(t, "xin", str)
  115. val, err := r.Get("index")
  116. assert.Nil(t, err)
  117. assert.Equal(t, `"primary"`, val)
  118. val, err = r.Get("primary/1234")
  119. assert.Nil(t, err)
  120. assert.Equal(t, `"xin"`, val)
  121. }
  122. func TestCachedConn_QueryRowIndex_HasCache_IntPrimary(t *testing.T) {
  123. const (
  124. primaryInt8 int8 = 100
  125. primaryInt16 int16 = 10000
  126. primaryInt32 int32 = 10000000
  127. primaryInt64 int64 = 10000000
  128. primaryUint8 uint8 = 100
  129. primaryUint16 uint16 = 10000
  130. primaryUint32 uint32 = 10000000
  131. primaryUint64 uint64 = 10000000
  132. )
  133. tests := []struct {
  134. name string
  135. primary interface{}
  136. primaryCache string
  137. }{
  138. {
  139. name: "int8 primary",
  140. primary: primaryInt8,
  141. primaryCache: fmt.Sprint(primaryInt8),
  142. },
  143. {
  144. name: "int16 primary",
  145. primary: primaryInt16,
  146. primaryCache: fmt.Sprint(primaryInt16),
  147. },
  148. {
  149. name: "int32 primary",
  150. primary: primaryInt32,
  151. primaryCache: fmt.Sprint(primaryInt32),
  152. },
  153. {
  154. name: "int64 primary",
  155. primary: primaryInt64,
  156. primaryCache: fmt.Sprint(primaryInt64),
  157. },
  158. {
  159. name: "uint8 primary",
  160. primary: primaryUint8,
  161. primaryCache: fmt.Sprint(primaryUint8),
  162. },
  163. {
  164. name: "uint16 primary",
  165. primary: primaryUint16,
  166. primaryCache: fmt.Sprint(primaryUint16),
  167. },
  168. {
  169. name: "uint32 primary",
  170. primary: primaryUint32,
  171. primaryCache: fmt.Sprint(primaryUint32),
  172. },
  173. {
  174. name: "uint64 primary",
  175. primary: primaryUint64,
  176. primaryCache: fmt.Sprint(primaryUint64),
  177. },
  178. }
  179. for _, test := range tests {
  180. t.Run(test.name, func(t *testing.T) {
  181. resetStats()
  182. s, err := miniredis.Run()
  183. if err != nil {
  184. t.Error(err)
  185. }
  186. r := redis.NewRedis(s.Addr(), redis.NodeType)
  187. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  188. cache.WithNotFoundExpiry(time.Second))
  189. var str string
  190. r.Set("index", test.primaryCache)
  191. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  192. return fmt.Sprintf("%v/1234", s)
  193. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  194. assert.Fail(t, "should not go here")
  195. return test.primary, nil
  196. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  197. *v.(*string) = "xin"
  198. assert.Equal(t, primary, primary)
  199. return nil
  200. })
  201. assert.Nil(t, err)
  202. assert.Equal(t, "xin", str)
  203. val, err := r.Get("index")
  204. assert.Nil(t, err)
  205. assert.Equal(t, test.primaryCache, val)
  206. val, err = r.Get(test.primaryCache + "/1234")
  207. assert.Nil(t, err)
  208. assert.Equal(t, `"xin"`, val)
  209. })
  210. }
  211. }
  212. func TestCachedConn_QueryRowIndex_HasWrongCache(t *testing.T) {
  213. caches := map[string]string{
  214. "index": "primary",
  215. "primary/1234": "xin",
  216. }
  217. for k, v := range caches {
  218. t.Run(k+"/"+v, func(t *testing.T) {
  219. resetStats()
  220. s, err := miniredis.Run()
  221. if err != nil {
  222. t.Error(err)
  223. }
  224. r := redis.NewRedis(s.Addr(), redis.NodeType)
  225. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  226. cache.WithNotFoundExpiry(time.Second))
  227. var str string
  228. r.Set(k, v)
  229. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  230. return fmt.Sprintf("%s/1234", s)
  231. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  232. *v.(*string) = "xin"
  233. return "primary", nil
  234. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  235. *v.(*string) = "xin"
  236. assert.Equal(t, "primary", primary)
  237. return nil
  238. })
  239. assert.Nil(t, err)
  240. assert.Equal(t, "xin", str)
  241. val, err := r.Get("index")
  242. assert.Nil(t, err)
  243. assert.Equal(t, `"primary"`, val)
  244. val, err = r.Get("primary/1234")
  245. assert.Nil(t, err)
  246. assert.Equal(t, `"xin"`, val)
  247. })
  248. }
  249. }
  250. func TestStatCacheFails(t *testing.T) {
  251. resetStats()
  252. log.SetOutput(ioutil.Discard)
  253. defer log.SetOutput(os.Stdout)
  254. r := redis.NewRedis("localhost:59999", redis.NodeType)
  255. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  256. for i := 0; i < 20; i++ {
  257. var str string
  258. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  259. return errors.New("db failed")
  260. })
  261. assert.NotNil(t, err)
  262. }
  263. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  264. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  265. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Miss))
  266. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.DbFails))
  267. }
  268. func TestStatDbFails(t *testing.T) {
  269. resetStats()
  270. s, err := miniredis.Run()
  271. if err != nil {
  272. t.Error(err)
  273. }
  274. r := redis.NewRedis(s.Addr(), redis.NodeType)
  275. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  276. for i := 0; i < 20; i++ {
  277. var str string
  278. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  279. return errors.New("db failed")
  280. })
  281. assert.NotNil(t, err)
  282. }
  283. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  284. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  285. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.DbFails))
  286. }
  287. func TestStatFromMemory(t *testing.T) {
  288. resetStats()
  289. s, err := miniredis.Run()
  290. if err != nil {
  291. t.Error(err)
  292. }
  293. r := redis.NewRedis(s.Addr(), redis.NodeType)
  294. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  295. var all sync.WaitGroup
  296. var wait sync.WaitGroup
  297. all.Add(10)
  298. wait.Add(4)
  299. go func() {
  300. var str string
  301. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  302. *v.(*string) = "zero"
  303. return nil
  304. })
  305. if err != nil {
  306. t.Error(err)
  307. }
  308. wait.Wait()
  309. runtime.Gosched()
  310. all.Done()
  311. }()
  312. for i := 0; i < 4; i++ {
  313. go func() {
  314. var str string
  315. wait.Done()
  316. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  317. *v.(*string) = "zero"
  318. return nil
  319. })
  320. if err != nil {
  321. t.Error(err)
  322. }
  323. all.Done()
  324. }()
  325. }
  326. for i := 0; i < 5; i++ {
  327. go func() {
  328. var str string
  329. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  330. *v.(*string) = "zero"
  331. return nil
  332. })
  333. if err != nil {
  334. t.Error(err)
  335. }
  336. all.Done()
  337. }()
  338. }
  339. all.Wait()
  340. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  341. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  342. }
  343. func TestCachedConnQueryRow(t *testing.T) {
  344. s, err := miniredis.Run()
  345. if err != nil {
  346. t.Error(err)
  347. }
  348. const (
  349. key = "user"
  350. value = "any"
  351. )
  352. var conn trackedConn
  353. var user string
  354. var ran bool
  355. r := redis.NewRedis(s.Addr(), redis.NodeType)
  356. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  357. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  358. ran = true
  359. user = value
  360. return nil
  361. })
  362. assert.Nil(t, err)
  363. actualValue, err := s.Get(key)
  364. assert.Nil(t, err)
  365. var actual string
  366. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  367. assert.Equal(t, value, actual)
  368. assert.Equal(t, value, user)
  369. assert.True(t, ran)
  370. }
  371. func TestCachedConnQueryRowFromCache(t *testing.T) {
  372. s, err := miniredis.Run()
  373. if err != nil {
  374. t.Error(err)
  375. }
  376. const (
  377. key = "user"
  378. value = "any"
  379. )
  380. var conn trackedConn
  381. var user string
  382. var ran bool
  383. r := redis.NewRedis(s.Addr(), redis.NodeType)
  384. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  385. assert.Nil(t, c.SetCache(key, value))
  386. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  387. ran = true
  388. user = value
  389. return nil
  390. })
  391. assert.Nil(t, err)
  392. actualValue, err := s.Get(key)
  393. assert.Nil(t, err)
  394. var actual string
  395. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  396. assert.Equal(t, value, actual)
  397. assert.Equal(t, value, user)
  398. assert.False(t, ran)
  399. }
  400. func TestQueryRowNotFound(t *testing.T) {
  401. s, err := miniredis.Run()
  402. if err != nil {
  403. t.Error(err)
  404. }
  405. const key = "user"
  406. var conn trackedConn
  407. var user string
  408. var ran int
  409. r := redis.NewRedis(s.Addr(), redis.NodeType)
  410. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  411. for i := 0; i < 20; i++ {
  412. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  413. ran++
  414. return sql.ErrNoRows
  415. })
  416. assert.Exactly(t, sqlx.ErrNotFound, err)
  417. }
  418. assert.Equal(t, 1, ran)
  419. }
  420. func TestCachedConnExec(t *testing.T) {
  421. s, err := miniredis.Run()
  422. if err != nil {
  423. t.Error(err)
  424. }
  425. var conn trackedConn
  426. r := redis.NewRedis(s.Addr(), redis.NodeType)
  427. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  428. _, err = c.ExecNoCache("delete from user_table where id='kevin'")
  429. assert.Nil(t, err)
  430. assert.True(t, conn.execValue)
  431. }
  432. func TestCachedConnExecDropCache(t *testing.T) {
  433. s, err := miniredis.Run()
  434. if err != nil {
  435. t.Error(err)
  436. }
  437. const (
  438. key = "user"
  439. value = "any"
  440. )
  441. var conn trackedConn
  442. r := redis.NewRedis(s.Addr(), redis.NodeType)
  443. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  444. assert.Nil(t, c.SetCache(key, value))
  445. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  446. return conn.Exec("delete from user_table where id='kevin'")
  447. }, key)
  448. assert.Nil(t, err)
  449. assert.True(t, conn.execValue)
  450. _, err = s.Get(key)
  451. assert.Exactly(t, miniredis.ErrKeyNotFound, err)
  452. }
  453. func TestCachedConnExecDropCacheFailed(t *testing.T) {
  454. const key = "user"
  455. var conn trackedConn
  456. r := redis.NewRedis("anyredis:8888", redis.NodeType)
  457. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  458. _, err := c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  459. return conn.Exec("delete from user_table where id='kevin'")
  460. }, key)
  461. // async background clean, retry logic
  462. assert.Nil(t, err)
  463. }
  464. func TestCachedConnQueryRows(t *testing.T) {
  465. s, err := miniredis.Run()
  466. if err != nil {
  467. t.Error(err)
  468. }
  469. var conn trackedConn
  470. r := redis.NewRedis(s.Addr(), redis.NodeType)
  471. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  472. var users []string
  473. err = c.QueryRowsNoCache(&users, "select user from user_table where id='kevin'")
  474. assert.Nil(t, err)
  475. assert.True(t, conn.queryRowsValue)
  476. }
  477. func TestCachedConnTransact(t *testing.T) {
  478. s, err := miniredis.Run()
  479. if err != nil {
  480. t.Error(err)
  481. }
  482. var conn trackedConn
  483. r := redis.NewRedis(s.Addr(), redis.NodeType)
  484. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  485. err = c.Transact(func(session sqlx.Session) error {
  486. return nil
  487. })
  488. assert.Nil(t, err)
  489. assert.True(t, conn.transactValue)
  490. }
  491. func TestQueryRowNoCache(t *testing.T) {
  492. s, err := miniredis.Run()
  493. if err != nil {
  494. t.Error(err)
  495. }
  496. const (
  497. key = "user"
  498. value = "any"
  499. )
  500. var user string
  501. var ran bool
  502. r := redis.NewRedis(s.Addr(), redis.NodeType)
  503. conn := dummySqlConn{queryRow: func(v interface{}, q string, args ...interface{}) error {
  504. user = value
  505. ran = true
  506. return nil
  507. }}
  508. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  509. err = c.QueryRowNoCache(&user, key)
  510. assert.Nil(t, err)
  511. assert.Equal(t, value, user)
  512. assert.True(t, ran)
  513. }
  514. func TestFloatKeyer(t *testing.T) {
  515. primaries := []interface{}{
  516. float32(1),
  517. float64(1),
  518. }
  519. for _, primary := range primaries {
  520. val := floatKeyer(func(i interface{}) string {
  521. return fmt.Sprint(i)
  522. })(primary)
  523. assert.Equal(t, "1", val)
  524. }
  525. }
  526. func resetStats() {
  527. atomic.StoreUint64(&stats.Total, 0)
  528. atomic.StoreUint64(&stats.Hit, 0)
  529. atomic.StoreUint64(&stats.Miss, 0)
  530. atomic.StoreUint64(&stats.DbFails, 0)
  531. }
  // dummySqlConn is a stub SQL connection for tests. Its methods return zero
  // results; only QueryRow can be customised.
  type dummySqlConn struct {
  	// queryRow, when non-nil, is called by the QueryRow method with the same
  	// arguments.
  	queryRow func(interface{}, string, ...interface{}) error
  }
  535. func (d dummySqlConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  536. return nil, nil
  537. }
  538. func (d dummySqlConn) Prepare(query string) (sqlx.StmtSession, error) {
  539. return nil, nil
  540. }
  541. func (d dummySqlConn) QueryRow(v interface{}, query string, args ...interface{}) error {
  542. if d.queryRow != nil {
  543. return d.queryRow(v, query, args...)
  544. }
  545. return nil
  546. }
  547. func (d dummySqlConn) QueryRowPartial(v interface{}, query string, args ...interface{}) error {
  548. return nil
  549. }
  550. func (d dummySqlConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  551. return nil
  552. }
  553. func (d dummySqlConn) QueryRowsPartial(v interface{}, query string, args ...interface{}) error {
  554. return nil
  555. }
  556. func (d dummySqlConn) Transact(func(session sqlx.Session) error) error {
  557. return nil
  558. }
  559. type trackedConn struct {
  560. dummySqlConn
  561. execValue bool
  562. queryRowsValue bool
  563. transactValue bool
  564. }
  565. func (c *trackedConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  566. c.execValue = true
  567. return c.dummySqlConn.Exec(query, args...)
  568. }
  569. func (c *trackedConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  570. c.queryRowsValue = true
  571. return c.dummySqlConn.QueryRows(v, query, args...)
  572. }
  573. func (c *trackedConn) Transact(fn func(session sqlx.Session) error) error {
  574. c.transactValue = true
  575. return c.dummySqlConn.Transact(fn)
  576. }