cachedsql_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. package sqlc
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "runtime"
  11. "sync"
  12. "sync/atomic"
  13. "testing"
  14. "time"
  15. "github.com/alicebob/miniredis"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/tal-tech/go-zero/core/logx"
  18. "github.com/tal-tech/go-zero/core/stat"
  19. "github.com/tal-tech/go-zero/core/stores/cache"
  20. "github.com/tal-tech/go-zero/core/stores/redis"
  21. "github.com/tal-tech/go-zero/core/stores/redis/redistest"
  22. "github.com/tal-tech/go-zero/core/stores/sqlx"
  23. )
  24. func init() {
  25. logx.Disable()
  26. stat.SetReporter(nil)
  27. }
  28. func TestCachedConn_GetCache(t *testing.T) {
  29. resetStats()
  30. r, clean, err := redistest.CreateRedis()
  31. assert.Nil(t, err)
  32. defer clean()
  33. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  34. var value string
  35. err = c.GetCache("any", &value)
  36. assert.Equal(t, ErrNotFound, err)
  37. r.Set("any", `"value"`)
  38. err = c.GetCache("any", &value)
  39. assert.Nil(t, err)
  40. assert.Equal(t, "value", value)
  41. }
  42. func TestStat(t *testing.T) {
  43. resetStats()
  44. r, clean, err := redistest.CreateRedis()
  45. assert.Nil(t, err)
  46. defer clean()
  47. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  48. for i := 0; i < 10; i++ {
  49. var str string
  50. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  51. *v.(*string) = "zero"
  52. return nil
  53. })
  54. if err != nil {
  55. t.Error(err)
  56. }
  57. }
  58. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  59. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  60. }
  61. func TestCachedConn_QueryRowIndex_NoCache(t *testing.T) {
  62. resetStats()
  63. r, clean, err := redistest.CreateRedis()
  64. assert.Nil(t, err)
  65. defer clean()
  66. c := NewConn(dummySqlConn{}, cache.CacheConf{
  67. {
  68. RedisConf: redis.RedisConf{
  69. Host: r.Addr,
  70. Type: redis.NodeType,
  71. },
  72. Weight: 100,
  73. },
  74. }, cache.WithExpiry(time.Second*10))
  75. var str string
  76. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  77. return fmt.Sprintf("%s/1234", s)
  78. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  79. *v.(*string) = "zero"
  80. return "primary", errors.New("foo")
  81. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  82. assert.Equal(t, "primary", pri)
  83. *v.(*string) = "xin"
  84. return nil
  85. })
  86. assert.NotNil(t, err)
  87. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  88. return fmt.Sprintf("%s/1234", s)
  89. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  90. *v.(*string) = "zero"
  91. return "primary", nil
  92. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  93. assert.Equal(t, "primary", pri)
  94. *v.(*string) = "xin"
  95. return nil
  96. })
  97. assert.Nil(t, err)
  98. assert.Equal(t, "zero", str)
  99. val, err := r.Get("index")
  100. assert.Nil(t, err)
  101. assert.Equal(t, `"primary"`, val)
  102. val, err = r.Get("primary/1234")
  103. assert.Nil(t, err)
  104. assert.Equal(t, `"zero"`, val)
  105. }
  106. func TestCachedConn_QueryRowIndex_HasCache(t *testing.T) {
  107. resetStats()
  108. r, clean, err := redistest.CreateRedis()
  109. assert.Nil(t, err)
  110. defer clean()
  111. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  112. cache.WithNotFoundExpiry(time.Second))
  113. var str string
  114. r.Set("index", `"primary"`)
  115. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  116. return fmt.Sprintf("%s/1234", s)
  117. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  118. assert.Fail(t, "should not go here")
  119. return "primary", nil
  120. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  121. *v.(*string) = "xin"
  122. assert.Equal(t, "primary", primary)
  123. return nil
  124. })
  125. assert.Nil(t, err)
  126. assert.Equal(t, "xin", str)
  127. val, err := r.Get("index")
  128. assert.Nil(t, err)
  129. assert.Equal(t, `"primary"`, val)
  130. val, err = r.Get("primary/1234")
  131. assert.Nil(t, err)
  132. assert.Equal(t, `"xin"`, val)
  133. }
  134. func TestCachedConn_QueryRowIndex_HasCache_IntPrimary(t *testing.T) {
  135. const (
  136. primaryInt8 int8 = 100
  137. primaryInt16 int16 = 10000
  138. primaryInt32 int32 = 10000000
  139. primaryInt64 int64 = 10000000
  140. primaryUint8 uint8 = 100
  141. primaryUint16 uint16 = 10000
  142. primaryUint32 uint32 = 10000000
  143. primaryUint64 uint64 = 10000000
  144. )
  145. tests := []struct {
  146. name string
  147. primary interface{}
  148. primaryCache string
  149. }{
  150. {
  151. name: "int8 primary",
  152. primary: primaryInt8,
  153. primaryCache: fmt.Sprint(primaryInt8),
  154. },
  155. {
  156. name: "int16 primary",
  157. primary: primaryInt16,
  158. primaryCache: fmt.Sprint(primaryInt16),
  159. },
  160. {
  161. name: "int32 primary",
  162. primary: primaryInt32,
  163. primaryCache: fmt.Sprint(primaryInt32),
  164. },
  165. {
  166. name: "int64 primary",
  167. primary: primaryInt64,
  168. primaryCache: fmt.Sprint(primaryInt64),
  169. },
  170. {
  171. name: "uint8 primary",
  172. primary: primaryUint8,
  173. primaryCache: fmt.Sprint(primaryUint8),
  174. },
  175. {
  176. name: "uint16 primary",
  177. primary: primaryUint16,
  178. primaryCache: fmt.Sprint(primaryUint16),
  179. },
  180. {
  181. name: "uint32 primary",
  182. primary: primaryUint32,
  183. primaryCache: fmt.Sprint(primaryUint32),
  184. },
  185. {
  186. name: "uint64 primary",
  187. primary: primaryUint64,
  188. primaryCache: fmt.Sprint(primaryUint64),
  189. },
  190. }
  191. for _, test := range tests {
  192. t.Run(test.name, func(t *testing.T) {
  193. resetStats()
  194. r, clean, err := redistest.CreateRedis()
  195. assert.Nil(t, err)
  196. defer clean()
  197. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  198. cache.WithNotFoundExpiry(time.Second))
  199. var str string
  200. r.Set("index", test.primaryCache)
  201. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  202. return fmt.Sprintf("%v/1234", s)
  203. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  204. assert.Fail(t, "should not go here")
  205. return test.primary, nil
  206. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  207. *v.(*string) = "xin"
  208. assert.Equal(t, primary, primary)
  209. return nil
  210. })
  211. assert.Nil(t, err)
  212. assert.Equal(t, "xin", str)
  213. val, err := r.Get("index")
  214. assert.Nil(t, err)
  215. assert.Equal(t, test.primaryCache, val)
  216. val, err = r.Get(test.primaryCache + "/1234")
  217. assert.Nil(t, err)
  218. assert.Equal(t, `"xin"`, val)
  219. })
  220. }
  221. }
  222. func TestCachedConn_QueryRowIndex_HasWrongCache(t *testing.T) {
  223. caches := map[string]string{
  224. "index": "primary",
  225. "primary/1234": "xin",
  226. }
  227. for k, v := range caches {
  228. t.Run(k+"/"+v, func(t *testing.T) {
  229. resetStats()
  230. r, clean, err := redistest.CreateRedis()
  231. assert.Nil(t, err)
  232. defer clean()
  233. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  234. cache.WithNotFoundExpiry(time.Second))
  235. var str string
  236. r.Set(k, v)
  237. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  238. return fmt.Sprintf("%s/1234", s)
  239. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  240. *v.(*string) = "xin"
  241. return "primary", nil
  242. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  243. *v.(*string) = "xin"
  244. assert.Equal(t, "primary", primary)
  245. return nil
  246. })
  247. assert.Nil(t, err)
  248. assert.Equal(t, "xin", str)
  249. val, err := r.Get("index")
  250. assert.Nil(t, err)
  251. assert.Equal(t, `"primary"`, val)
  252. val, err = r.Get("primary/1234")
  253. assert.Nil(t, err)
  254. assert.Equal(t, `"xin"`, val)
  255. })
  256. }
  257. }
  258. func TestStatCacheFails(t *testing.T) {
  259. resetStats()
  260. log.SetOutput(ioutil.Discard)
  261. defer log.SetOutput(os.Stdout)
  262. r := redis.NewRedis("localhost:59999", redis.NodeType)
  263. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  264. for i := 0; i < 20; i++ {
  265. var str string
  266. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  267. return errors.New("db failed")
  268. })
  269. assert.NotNil(t, err)
  270. }
  271. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  272. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  273. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Miss))
  274. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.DbFails))
  275. }
  276. func TestStatDbFails(t *testing.T) {
  277. resetStats()
  278. r, clean, err := redistest.CreateRedis()
  279. assert.Nil(t, err)
  280. defer clean()
  281. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  282. for i := 0; i < 20; i++ {
  283. var str string
  284. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  285. return errors.New("db failed")
  286. })
  287. assert.NotNil(t, err)
  288. }
  289. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  290. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  291. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.DbFails))
  292. }
  293. func TestStatFromMemory(t *testing.T) {
  294. resetStats()
  295. r, clean, err := redistest.CreateRedis()
  296. assert.Nil(t, err)
  297. defer clean()
  298. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  299. var all sync.WaitGroup
  300. var wait sync.WaitGroup
  301. all.Add(10)
  302. wait.Add(4)
  303. go func() {
  304. var str string
  305. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  306. *v.(*string) = "zero"
  307. return nil
  308. })
  309. if err != nil {
  310. t.Error(err)
  311. }
  312. wait.Wait()
  313. runtime.Gosched()
  314. all.Done()
  315. }()
  316. for i := 0; i < 4; i++ {
  317. go func() {
  318. var str string
  319. wait.Done()
  320. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  321. *v.(*string) = "zero"
  322. return nil
  323. })
  324. if err != nil {
  325. t.Error(err)
  326. }
  327. all.Done()
  328. }()
  329. }
  330. for i := 0; i < 5; i++ {
  331. go func() {
  332. var str string
  333. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  334. *v.(*string) = "zero"
  335. return nil
  336. })
  337. if err != nil {
  338. t.Error(err)
  339. }
  340. all.Done()
  341. }()
  342. }
  343. all.Wait()
  344. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  345. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  346. }
  347. func TestCachedConnQueryRow(t *testing.T) {
  348. r, clean, err := redistest.CreateRedis()
  349. assert.Nil(t, err)
  350. defer clean()
  351. const (
  352. key = "user"
  353. value = "any"
  354. )
  355. var conn trackedConn
  356. var user string
  357. var ran bool
  358. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  359. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  360. ran = true
  361. user = value
  362. return nil
  363. })
  364. assert.Nil(t, err)
  365. actualValue, err := r.Get(key)
  366. assert.Nil(t, err)
  367. var actual string
  368. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  369. assert.Equal(t, value, actual)
  370. assert.Equal(t, value, user)
  371. assert.True(t, ran)
  372. }
  373. func TestCachedConnQueryRowFromCache(t *testing.T) {
  374. r, clean, err := redistest.CreateRedis()
  375. assert.Nil(t, err)
  376. defer clean()
  377. const (
  378. key = "user"
  379. value = "any"
  380. )
  381. var conn trackedConn
  382. var user string
  383. var ran bool
  384. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  385. assert.Nil(t, c.SetCache(key, value))
  386. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  387. ran = true
  388. user = value
  389. return nil
  390. })
  391. assert.Nil(t, err)
  392. actualValue, err := r.Get(key)
  393. assert.Nil(t, err)
  394. var actual string
  395. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  396. assert.Equal(t, value, actual)
  397. assert.Equal(t, value, user)
  398. assert.False(t, ran)
  399. }
  400. func TestQueryRowNotFound(t *testing.T) {
  401. r, clean, err := redistest.CreateRedis()
  402. assert.Nil(t, err)
  403. defer clean()
  404. const key = "user"
  405. var conn trackedConn
  406. var user string
  407. var ran int
  408. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  409. for i := 0; i < 20; i++ {
  410. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  411. ran++
  412. return sql.ErrNoRows
  413. })
  414. assert.Exactly(t, sqlx.ErrNotFound, err)
  415. }
  416. assert.Equal(t, 1, ran)
  417. }
  418. func TestCachedConnExec(t *testing.T) {
  419. r, clean, err := redistest.CreateRedis()
  420. assert.Nil(t, err)
  421. defer clean()
  422. var conn trackedConn
  423. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  424. _, err = c.ExecNoCache("delete from user_table where id='kevin'")
  425. assert.Nil(t, err)
  426. assert.True(t, conn.execValue)
  427. }
  428. func TestCachedConnExecDropCache(t *testing.T) {
  429. r, err := miniredis.Run()
  430. assert.Nil(t, err)
  431. defer r.Close()
  432. const (
  433. key = "user"
  434. value = "any"
  435. )
  436. var conn trackedConn
  437. c := NewNodeConn(&conn, redis.NewRedis(r.Addr(), redis.NodeType), cache.WithExpiry(time.Second*30))
  438. assert.Nil(t, c.SetCache(key, value))
  439. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  440. return conn.Exec("delete from user_table where id='kevin'")
  441. }, key)
  442. assert.Nil(t, err)
  443. assert.True(t, conn.execValue)
  444. _, err = r.Get(key)
  445. assert.Exactly(t, miniredis.ErrKeyNotFound, err)
  446. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  447. return nil, errors.New("foo")
  448. }, key)
  449. assert.NotNil(t, err)
  450. }
  451. func TestCachedConnExecDropCacheFailed(t *testing.T) {
  452. const key = "user"
  453. var conn trackedConn
  454. r := redis.NewRedis("anyredis:8888", redis.NodeType)
  455. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  456. _, err := c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  457. return conn.Exec("delete from user_table where id='kevin'")
  458. }, key)
  459. // async background clean, retry logic
  460. assert.Nil(t, err)
  461. }
  462. func TestCachedConnQueryRows(t *testing.T) {
  463. r, clean, err := redistest.CreateRedis()
  464. assert.Nil(t, err)
  465. defer clean()
  466. var conn trackedConn
  467. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  468. var users []string
  469. err = c.QueryRowsNoCache(&users, "select user from user_table where id='kevin'")
  470. assert.Nil(t, err)
  471. assert.True(t, conn.queryRowsValue)
  472. }
  473. func TestCachedConnTransact(t *testing.T) {
  474. r, clean, err := redistest.CreateRedis()
  475. assert.Nil(t, err)
  476. defer clean()
  477. var conn trackedConn
  478. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  479. err = c.Transact(func(session sqlx.Session) error {
  480. return nil
  481. })
  482. assert.Nil(t, err)
  483. assert.True(t, conn.transactValue)
  484. }
  485. func TestQueryRowNoCache(t *testing.T) {
  486. r, clean, err := redistest.CreateRedis()
  487. assert.Nil(t, err)
  488. defer clean()
  489. const (
  490. key = "user"
  491. value = "any"
  492. )
  493. var user string
  494. var ran bool
  495. conn := dummySqlConn{queryRow: func(v interface{}, q string, args ...interface{}) error {
  496. user = value
  497. ran = true
  498. return nil
  499. }}
  500. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  501. err = c.QueryRowNoCache(&user, key)
  502. assert.Nil(t, err)
  503. assert.Equal(t, value, user)
  504. assert.True(t, ran)
  505. }
  506. func resetStats() {
  507. atomic.StoreUint64(&stats.Total, 0)
  508. atomic.StoreUint64(&stats.Hit, 0)
  509. atomic.StoreUint64(&stats.Miss, 0)
  510. atomic.StoreUint64(&stats.DbFails, 0)
  511. }
  512. type dummySqlConn struct {
  513. queryRow func(interface{}, string, ...interface{}) error
  514. }
  515. func (d dummySqlConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  516. return nil, nil
  517. }
  518. func (d dummySqlConn) Prepare(query string) (sqlx.StmtSession, error) {
  519. return nil, nil
  520. }
  521. func (d dummySqlConn) QueryRow(v interface{}, query string, args ...interface{}) error {
  522. if d.queryRow != nil {
  523. return d.queryRow(v, query, args...)
  524. }
  525. return nil
  526. }
  527. func (d dummySqlConn) QueryRowPartial(v interface{}, query string, args ...interface{}) error {
  528. return nil
  529. }
  530. func (d dummySqlConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  531. return nil
  532. }
  533. func (d dummySqlConn) QueryRowsPartial(v interface{}, query string, args ...interface{}) error {
  534. return nil
  535. }
  536. func (d dummySqlConn) Transact(func(session sqlx.Session) error) error {
  537. return nil
  538. }
  539. type trackedConn struct {
  540. dummySqlConn
  541. execValue bool
  542. queryRowsValue bool
  543. transactValue bool
  544. }
  545. func (c *trackedConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  546. c.execValue = true
  547. return c.dummySqlConn.Exec(query, args...)
  548. }
  549. func (c *trackedConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  550. c.queryRowsValue = true
  551. return c.dummySqlConn.QueryRows(v, query, args...)
  552. }
  553. func (c *trackedConn) Transact(fn func(session sqlx.Session) error) error {
  554. c.transactValue = true
  555. return c.dummySqlConn.Transact(fn)
  556. }