cachedsql_test.go 16 KB


  1. package sqlc
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "runtime"
  11. "sync"
  12. "sync/atomic"
  13. "testing"
  14. "time"
  15. "github.com/alicebob/miniredis"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/tal-tech/go-zero/core/lang"
  18. "github.com/tal-tech/go-zero/core/logx"
  19. "github.com/tal-tech/go-zero/core/stat"
  20. "github.com/tal-tech/go-zero/core/stores/cache"
  21. "github.com/tal-tech/go-zero/core/stores/redis"
  22. "github.com/tal-tech/go-zero/core/stores/sqlx"
  23. )
  24. func init() {
  25. logx.Disable()
  26. stat.SetReporter(nil)
  27. }
  28. func TestCachedConn_GetCache(t *testing.T) {
  29. resetStats()
  30. s, clean, err := createMiniRedis()
  31. assert.Nil(t, err)
  32. defer clean()
  33. r := redis.NewRedis(s.Addr(), redis.NodeType)
  34. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  35. var value string
  36. err = c.GetCache("any", &value)
  37. assert.Equal(t, ErrNotFound, err)
  38. s.Set("any", `"value"`)
  39. err = c.GetCache("any", &value)
  40. assert.Nil(t, err)
  41. assert.Equal(t, "value", value)
  42. }
  43. func TestStat(t *testing.T) {
  44. resetStats()
  45. s, clean, err := createMiniRedis()
  46. assert.Nil(t, err)
  47. defer clean()
  48. r := redis.NewRedis(s.Addr(), redis.NodeType)
  49. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  50. for i := 0; i < 10; i++ {
  51. var str string
  52. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  53. *v.(*string) = "zero"
  54. return nil
  55. })
  56. if err != nil {
  57. t.Error(err)
  58. }
  59. }
  60. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  61. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  62. }
  63. func TestCachedConn_QueryRowIndex_NoCache(t *testing.T) {
  64. resetStats()
  65. s, clean, err := createMiniRedis()
  66. assert.Nil(t, err)
  67. defer clean()
  68. r := redis.NewRedis(s.Addr(), redis.NodeType)
  69. c := NewConn(dummySqlConn{}, cache.CacheConf{
  70. {
  71. RedisConf: redis.RedisConf{
  72. Host: s.Addr(),
  73. Type: redis.NodeType,
  74. },
  75. Weight: 100,
  76. },
  77. }, cache.WithExpiry(time.Second*10))
  78. var str string
  79. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  80. return fmt.Sprintf("%s/1234", s)
  81. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  82. *v.(*string) = "zero"
  83. return "primary", errors.New("foo")
  84. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  85. assert.Equal(t, "primary", pri)
  86. *v.(*string) = "xin"
  87. return nil
  88. })
  89. assert.NotNil(t, err)
  90. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  91. return fmt.Sprintf("%s/1234", s)
  92. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  93. *v.(*string) = "zero"
  94. return "primary", nil
  95. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  96. assert.Equal(t, "primary", pri)
  97. *v.(*string) = "xin"
  98. return nil
  99. })
  100. assert.Nil(t, err)
  101. assert.Equal(t, "zero", str)
  102. val, err := r.Get("index")
  103. assert.Nil(t, err)
  104. assert.Equal(t, `"primary"`, val)
  105. val, err = r.Get("primary/1234")
  106. assert.Nil(t, err)
  107. assert.Equal(t, `"zero"`, val)
  108. }
  109. func TestCachedConn_QueryRowIndex_HasCache(t *testing.T) {
  110. resetStats()
  111. s, clean, err := createMiniRedis()
  112. assert.Nil(t, err)
  113. defer clean()
  114. r := redis.NewRedis(s.Addr(), redis.NodeType)
  115. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  116. cache.WithNotFoundExpiry(time.Second))
  117. var str string
  118. r.Set("index", `"primary"`)
  119. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  120. return fmt.Sprintf("%s/1234", s)
  121. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  122. assert.Fail(t, "should not go here")
  123. return "primary", nil
  124. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  125. *v.(*string) = "xin"
  126. assert.Equal(t, "primary", primary)
  127. return nil
  128. })
  129. assert.Nil(t, err)
  130. assert.Equal(t, "xin", str)
  131. val, err := r.Get("index")
  132. assert.Nil(t, err)
  133. assert.Equal(t, `"primary"`, val)
  134. val, err = r.Get("primary/1234")
  135. assert.Nil(t, err)
  136. assert.Equal(t, `"xin"`, val)
  137. }
  138. func TestCachedConn_QueryRowIndex_HasCache_IntPrimary(t *testing.T) {
  139. const (
  140. primaryInt8 int8 = 100
  141. primaryInt16 int16 = 10000
  142. primaryInt32 int32 = 10000000
  143. primaryInt64 int64 = 10000000
  144. primaryUint8 uint8 = 100
  145. primaryUint16 uint16 = 10000
  146. primaryUint32 uint32 = 10000000
  147. primaryUint64 uint64 = 10000000
  148. )
  149. tests := []struct {
  150. name string
  151. primary interface{}
  152. primaryCache string
  153. }{
  154. {
  155. name: "int8 primary",
  156. primary: primaryInt8,
  157. primaryCache: fmt.Sprint(primaryInt8),
  158. },
  159. {
  160. name: "int16 primary",
  161. primary: primaryInt16,
  162. primaryCache: fmt.Sprint(primaryInt16),
  163. },
  164. {
  165. name: "int32 primary",
  166. primary: primaryInt32,
  167. primaryCache: fmt.Sprint(primaryInt32),
  168. },
  169. {
  170. name: "int64 primary",
  171. primary: primaryInt64,
  172. primaryCache: fmt.Sprint(primaryInt64),
  173. },
  174. {
  175. name: "uint8 primary",
  176. primary: primaryUint8,
  177. primaryCache: fmt.Sprint(primaryUint8),
  178. },
  179. {
  180. name: "uint16 primary",
  181. primary: primaryUint16,
  182. primaryCache: fmt.Sprint(primaryUint16),
  183. },
  184. {
  185. name: "uint32 primary",
  186. primary: primaryUint32,
  187. primaryCache: fmt.Sprint(primaryUint32),
  188. },
  189. {
  190. name: "uint64 primary",
  191. primary: primaryUint64,
  192. primaryCache: fmt.Sprint(primaryUint64),
  193. },
  194. }
  195. s, clean, err := createMiniRedis()
  196. assert.Nil(t, err)
  197. defer clean()
  198. for _, test := range tests {
  199. t.Run(test.name, func(t *testing.T) {
  200. resetStats()
  201. s.FlushAll()
  202. r := redis.NewRedis(s.Addr(), redis.NodeType)
  203. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  204. cache.WithNotFoundExpiry(time.Second))
  205. var str string
  206. r.Set("index", test.primaryCache)
  207. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  208. return fmt.Sprintf("%v/1234", s)
  209. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  210. assert.Fail(t, "should not go here")
  211. return test.primary, nil
  212. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  213. *v.(*string) = "xin"
  214. assert.Equal(t, primary, primary)
  215. return nil
  216. })
  217. assert.Nil(t, err)
  218. assert.Equal(t, "xin", str)
  219. val, err := r.Get("index")
  220. assert.Nil(t, err)
  221. assert.Equal(t, test.primaryCache, val)
  222. val, err = r.Get(test.primaryCache + "/1234")
  223. assert.Nil(t, err)
  224. assert.Equal(t, `"xin"`, val)
  225. })
  226. }
  227. }
  228. func TestCachedConn_QueryRowIndex_HasWrongCache(t *testing.T) {
  229. caches := map[string]string{
  230. "index": "primary",
  231. "primary/1234": "xin",
  232. }
  233. for k, v := range caches {
  234. t.Run(k+"/"+v, func(t *testing.T) {
  235. resetStats()
  236. s, clean, err := createMiniRedis()
  237. assert.Nil(t, err)
  238. defer clean()
  239. r := redis.NewRedis(s.Addr(), redis.NodeType)
  240. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  241. cache.WithNotFoundExpiry(time.Second))
  242. var str string
  243. r.Set(k, v)
  244. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  245. return fmt.Sprintf("%s/1234", s)
  246. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  247. *v.(*string) = "xin"
  248. return "primary", nil
  249. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  250. *v.(*string) = "xin"
  251. assert.Equal(t, "primary", primary)
  252. return nil
  253. })
  254. assert.Nil(t, err)
  255. assert.Equal(t, "xin", str)
  256. val, err := r.Get("index")
  257. assert.Nil(t, err)
  258. assert.Equal(t, `"primary"`, val)
  259. val, err = r.Get("primary/1234")
  260. assert.Nil(t, err)
  261. assert.Equal(t, `"xin"`, val)
  262. })
  263. }
  264. }
  265. func TestStatCacheFails(t *testing.T) {
  266. resetStats()
  267. log.SetOutput(ioutil.Discard)
  268. defer log.SetOutput(os.Stdout)
  269. r := redis.NewRedis("localhost:59999", redis.NodeType)
  270. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  271. for i := 0; i < 20; i++ {
  272. var str string
  273. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  274. return errors.New("db failed")
  275. })
  276. assert.NotNil(t, err)
  277. }
  278. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  279. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  280. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Miss))
  281. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.DbFails))
  282. }
  283. func TestStatDbFails(t *testing.T) {
  284. resetStats()
  285. s, clean, err := createMiniRedis()
  286. assert.Nil(t, err)
  287. defer clean()
  288. r := redis.NewRedis(s.Addr(), redis.NodeType)
  289. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  290. for i := 0; i < 20; i++ {
  291. var str string
  292. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  293. return errors.New("db failed")
  294. })
  295. assert.NotNil(t, err)
  296. }
  297. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  298. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  299. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.DbFails))
  300. }
  301. func TestStatFromMemory(t *testing.T) {
  302. resetStats()
  303. s, clean, err := createMiniRedis()
  304. assert.Nil(t, err)
  305. defer clean()
  306. r := redis.NewRedis(s.Addr(), redis.NodeType)
  307. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  308. var all sync.WaitGroup
  309. var wait sync.WaitGroup
  310. all.Add(10)
  311. wait.Add(4)
  312. go func() {
  313. var str string
  314. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  315. *v.(*string) = "zero"
  316. return nil
  317. })
  318. if err != nil {
  319. t.Error(err)
  320. }
  321. wait.Wait()
  322. runtime.Gosched()
  323. all.Done()
  324. }()
  325. for i := 0; i < 4; i++ {
  326. go func() {
  327. var str string
  328. wait.Done()
  329. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  330. *v.(*string) = "zero"
  331. return nil
  332. })
  333. if err != nil {
  334. t.Error(err)
  335. }
  336. all.Done()
  337. }()
  338. }
  339. for i := 0; i < 5; i++ {
  340. go func() {
  341. var str string
  342. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  343. *v.(*string) = "zero"
  344. return nil
  345. })
  346. if err != nil {
  347. t.Error(err)
  348. }
  349. all.Done()
  350. }()
  351. }
  352. all.Wait()
  353. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  354. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  355. }
  356. func TestCachedConnQueryRow(t *testing.T) {
  357. s, clean, err := createMiniRedis()
  358. assert.Nil(t, err)
  359. defer clean()
  360. const (
  361. key = "user"
  362. value = "any"
  363. )
  364. var conn trackedConn
  365. var user string
  366. var ran bool
  367. r := redis.NewRedis(s.Addr(), redis.NodeType)
  368. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  369. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  370. ran = true
  371. user = value
  372. return nil
  373. })
  374. assert.Nil(t, err)
  375. actualValue, err := s.Get(key)
  376. assert.Nil(t, err)
  377. var actual string
  378. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  379. assert.Equal(t, value, actual)
  380. assert.Equal(t, value, user)
  381. assert.True(t, ran)
  382. }
  383. func TestCachedConnQueryRowFromCache(t *testing.T) {
  384. s, clean, err := createMiniRedis()
  385. assert.Nil(t, err)
  386. defer clean()
  387. const (
  388. key = "user"
  389. value = "any"
  390. )
  391. var conn trackedConn
  392. var user string
  393. var ran bool
  394. r := redis.NewRedis(s.Addr(), redis.NodeType)
  395. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  396. assert.Nil(t, c.SetCache(key, value))
  397. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  398. ran = true
  399. user = value
  400. return nil
  401. })
  402. assert.Nil(t, err)
  403. actualValue, err := s.Get(key)
  404. assert.Nil(t, err)
  405. var actual string
  406. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  407. assert.Equal(t, value, actual)
  408. assert.Equal(t, value, user)
  409. assert.False(t, ran)
  410. }
  411. func TestQueryRowNotFound(t *testing.T) {
  412. s, clean, err := createMiniRedis()
  413. assert.Nil(t, err)
  414. defer clean()
  415. const key = "user"
  416. var conn trackedConn
  417. var user string
  418. var ran int
  419. r := redis.NewRedis(s.Addr(), redis.NodeType)
  420. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  421. for i := 0; i < 20; i++ {
  422. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  423. ran++
  424. return sql.ErrNoRows
  425. })
  426. assert.Exactly(t, sqlx.ErrNotFound, err)
  427. }
  428. assert.Equal(t, 1, ran)
  429. }
  430. func TestCachedConnExec(t *testing.T) {
  431. s, clean, err := createMiniRedis()
  432. assert.Nil(t, err)
  433. defer clean()
  434. var conn trackedConn
  435. r := redis.NewRedis(s.Addr(), redis.NodeType)
  436. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  437. _, err = c.ExecNoCache("delete from user_table where id='kevin'")
  438. assert.Nil(t, err)
  439. assert.True(t, conn.execValue)
  440. }
  441. func TestCachedConnExecDropCache(t *testing.T) {
  442. s, clean, err := createMiniRedis()
  443. assert.Nil(t, err)
  444. defer clean()
  445. const (
  446. key = "user"
  447. value = "any"
  448. )
  449. var conn trackedConn
  450. r := redis.NewRedis(s.Addr(), redis.NodeType)
  451. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  452. assert.Nil(t, c.SetCache(key, value))
  453. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  454. return conn.Exec("delete from user_table where id='kevin'")
  455. }, key)
  456. assert.Nil(t, err)
  457. assert.True(t, conn.execValue)
  458. _, err = s.Get(key)
  459. assert.Exactly(t, miniredis.ErrKeyNotFound, err)
  460. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  461. return nil, errors.New("foo")
  462. }, key)
  463. assert.NotNil(t, err)
  464. }
  465. func TestCachedConnExecDropCacheFailed(t *testing.T) {
  466. const key = "user"
  467. var conn trackedConn
  468. r := redis.NewRedis("anyredis:8888", redis.NodeType)
  469. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  470. _, err := c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  471. return conn.Exec("delete from user_table where id='kevin'")
  472. }, key)
  473. // async background clean, retry logic
  474. assert.Nil(t, err)
  475. }
  476. func TestCachedConnQueryRows(t *testing.T) {
  477. s, clean, err := createMiniRedis()
  478. assert.Nil(t, err)
  479. defer clean()
  480. var conn trackedConn
  481. r := redis.NewRedis(s.Addr(), redis.NodeType)
  482. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  483. var users []string
  484. err = c.QueryRowsNoCache(&users, "select user from user_table where id='kevin'")
  485. assert.Nil(t, err)
  486. assert.True(t, conn.queryRowsValue)
  487. }
  488. func TestCachedConnTransact(t *testing.T) {
  489. s, clean, err := createMiniRedis()
  490. assert.Nil(t, err)
  491. defer clean()
  492. var conn trackedConn
  493. r := redis.NewRedis(s.Addr(), redis.NodeType)
  494. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  495. err = c.Transact(func(session sqlx.Session) error {
  496. return nil
  497. })
  498. assert.Nil(t, err)
  499. assert.True(t, conn.transactValue)
  500. }
  501. func TestQueryRowNoCache(t *testing.T) {
  502. s, clean, err := createMiniRedis()
  503. assert.Nil(t, err)
  504. defer clean()
  505. const (
  506. key = "user"
  507. value = "any"
  508. )
  509. var user string
  510. var ran bool
  511. r := redis.NewRedis(s.Addr(), redis.NodeType)
  512. conn := dummySqlConn{queryRow: func(v interface{}, q string, args ...interface{}) error {
  513. user = value
  514. ran = true
  515. return nil
  516. }}
  517. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  518. err = c.QueryRowNoCache(&user, key)
  519. assert.Nil(t, err)
  520. assert.Equal(t, value, user)
  521. assert.True(t, ran)
  522. }
  523. func resetStats() {
  524. atomic.StoreUint64(&stats.Total, 0)
  525. atomic.StoreUint64(&stats.Hit, 0)
  526. atomic.StoreUint64(&stats.Miss, 0)
  527. atomic.StoreUint64(&stats.DbFails, 0)
  528. }
  529. type dummySqlConn struct {
  530. queryRow func(interface{}, string, ...interface{}) error
  531. }
  532. func (d dummySqlConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  533. return nil, nil
  534. }
  535. func (d dummySqlConn) Prepare(query string) (sqlx.StmtSession, error) {
  536. return nil, nil
  537. }
  538. func (d dummySqlConn) QueryRow(v interface{}, query string, args ...interface{}) error {
  539. if d.queryRow != nil {
  540. return d.queryRow(v, query, args...)
  541. }
  542. return nil
  543. }
  544. func (d dummySqlConn) QueryRowPartial(v interface{}, query string, args ...interface{}) error {
  545. return nil
  546. }
  547. func (d dummySqlConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  548. return nil
  549. }
  550. func (d dummySqlConn) QueryRowsPartial(v interface{}, query string, args ...interface{}) error {
  551. return nil
  552. }
  553. func (d dummySqlConn) Transact(func(session sqlx.Session) error) error {
  554. return nil
  555. }
  556. type trackedConn struct {
  557. dummySqlConn
  558. execValue bool
  559. queryRowsValue bool
  560. transactValue bool
  561. }
  562. func (c *trackedConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  563. c.execValue = true
  564. return c.dummySqlConn.Exec(query, args...)
  565. }
  566. func (c *trackedConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  567. c.queryRowsValue = true
  568. return c.dummySqlConn.QueryRows(v, query, args...)
  569. }
  570. func (c *trackedConn) Transact(fn func(session sqlx.Session) error) error {
  571. c.transactValue = true
  572. return c.dummySqlConn.Transact(fn)
  573. }
  574. func createMiniRedis() (r *miniredis.Miniredis, clean func(), err error) {
  575. r, err = miniredis.Run()
  576. if err != nil {
  577. return nil, nil, err
  578. }
  579. return r, func() {
  580. ch := make(chan lang.PlaceholderType)
  581. go func() {
  582. r.Close()
  583. close(ch)
  584. }()
  585. select {
  586. case <-ch:
  587. case <-time.After(time.Second):
  588. }
  589. }, nil
  590. }