cachedsql_test.go 16 KB


  1. package sqlc
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "runtime"
  11. "sync"
  12. "sync/atomic"
  13. "testing"
  14. "time"
  15. "github.com/alicebob/miniredis"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/tal-tech/go-zero/core/logx"
  18. "github.com/tal-tech/go-zero/core/stat"
  19. "github.com/tal-tech/go-zero/core/stores/cache"
  20. "github.com/tal-tech/go-zero/core/stores/redis"
  21. "github.com/tal-tech/go-zero/core/stores/sqlx"
  22. )
  23. func init() {
  24. logx.Disable()
  25. stat.SetReporter(nil)
  26. }
  27. func TestCachedConn_GetCache(t *testing.T) {
  28. resetStats()
  29. s, err := miniredis.Run()
  30. if err != nil {
  31. t.Error(err)
  32. }
  33. r := redis.NewRedis(s.Addr(), redis.NodeType)
  34. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  35. var value string
  36. err = c.GetCache("any", &value)
  37. assert.Equal(t, ErrNotFound, err)
  38. s.Set("any", `"value"`)
  39. err = c.GetCache("any", &value)
  40. assert.Nil(t, err)
  41. assert.Equal(t, "value", value)
  42. }
  43. func TestStat(t *testing.T) {
  44. resetStats()
  45. s, err := miniredis.Run()
  46. if err != nil {
  47. t.Error(err)
  48. }
  49. r := redis.NewRedis(s.Addr(), redis.NodeType)
  50. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  51. for i := 0; i < 10; i++ {
  52. var str string
  53. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  54. *v.(*string) = "zero"
  55. return nil
  56. })
  57. if err != nil {
  58. t.Error(err)
  59. }
  60. }
  61. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  62. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  63. }
  64. func TestCachedConn_QueryRowIndex_NoCache(t *testing.T) {
  65. resetStats()
  66. s, err := miniredis.Run()
  67. if err != nil {
  68. t.Error(err)
  69. }
  70. r := redis.NewRedis(s.Addr(), redis.NodeType)
  71. c := NewConn(dummySqlConn{}, cache.CacheConf{
  72. {
  73. RedisConf: redis.RedisConf{
  74. Host: s.Addr(),
  75. Type: redis.NodeType,
  76. },
  77. Weight: 100,
  78. },
  79. }, cache.WithExpiry(time.Second*10))
  80. var str string
  81. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  82. return fmt.Sprintf("%s/1234", s)
  83. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  84. *v.(*string) = "zero"
  85. return "primary", errors.New("foo")
  86. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  87. assert.Equal(t, "primary", pri)
  88. *v.(*string) = "xin"
  89. return nil
  90. })
  91. assert.NotNil(t, err)
  92. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  93. return fmt.Sprintf("%s/1234", s)
  94. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  95. *v.(*string) = "zero"
  96. return "primary", nil
  97. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  98. assert.Equal(t, "primary", pri)
  99. *v.(*string) = "xin"
  100. return nil
  101. })
  102. assert.Nil(t, err)
  103. assert.Equal(t, "zero", str)
  104. val, err := r.Get("index")
  105. assert.Nil(t, err)
  106. assert.Equal(t, `"primary"`, val)
  107. val, err = r.Get("primary/1234")
  108. assert.Nil(t, err)
  109. assert.Equal(t, `"zero"`, val)
  110. }
  111. func TestCachedConn_QueryRowIndex_HasCache(t *testing.T) {
  112. resetStats()
  113. s, err := miniredis.Run()
  114. if err != nil {
  115. t.Error(err)
  116. }
  117. r := redis.NewRedis(s.Addr(), redis.NodeType)
  118. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  119. cache.WithNotFoundExpiry(time.Second))
  120. var str string
  121. r.Set("index", `"primary"`)
  122. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  123. return fmt.Sprintf("%s/1234", s)
  124. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  125. assert.Fail(t, "should not go here")
  126. return "primary", nil
  127. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  128. *v.(*string) = "xin"
  129. assert.Equal(t, "primary", primary)
  130. return nil
  131. })
  132. assert.Nil(t, err)
  133. assert.Equal(t, "xin", str)
  134. val, err := r.Get("index")
  135. assert.Nil(t, err)
  136. assert.Equal(t, `"primary"`, val)
  137. val, err = r.Get("primary/1234")
  138. assert.Nil(t, err)
  139. assert.Equal(t, `"xin"`, val)
  140. }
  141. func TestCachedConn_QueryRowIndex_HasCache_IntPrimary(t *testing.T) {
  142. const (
  143. primaryInt8 int8 = 100
  144. primaryInt16 int16 = 10000
  145. primaryInt32 int32 = 10000000
  146. primaryInt64 int64 = 10000000
  147. primaryUint8 uint8 = 100
  148. primaryUint16 uint16 = 10000
  149. primaryUint32 uint32 = 10000000
  150. primaryUint64 uint64 = 10000000
  151. )
  152. tests := []struct {
  153. name string
  154. primary interface{}
  155. primaryCache string
  156. }{
  157. {
  158. name: "int8 primary",
  159. primary: primaryInt8,
  160. primaryCache: fmt.Sprint(primaryInt8),
  161. },
  162. {
  163. name: "int16 primary",
  164. primary: primaryInt16,
  165. primaryCache: fmt.Sprint(primaryInt16),
  166. },
  167. {
  168. name: "int32 primary",
  169. primary: primaryInt32,
  170. primaryCache: fmt.Sprint(primaryInt32),
  171. },
  172. {
  173. name: "int64 primary",
  174. primary: primaryInt64,
  175. primaryCache: fmt.Sprint(primaryInt64),
  176. },
  177. {
  178. name: "uint8 primary",
  179. primary: primaryUint8,
  180. primaryCache: fmt.Sprint(primaryUint8),
  181. },
  182. {
  183. name: "uint16 primary",
  184. primary: primaryUint16,
  185. primaryCache: fmt.Sprint(primaryUint16),
  186. },
  187. {
  188. name: "uint32 primary",
  189. primary: primaryUint32,
  190. primaryCache: fmt.Sprint(primaryUint32),
  191. },
  192. {
  193. name: "uint64 primary",
  194. primary: primaryUint64,
  195. primaryCache: fmt.Sprint(primaryUint64),
  196. },
  197. }
  198. s, err := miniredis.Run()
  199. if err != nil {
  200. t.Error(err)
  201. }
  202. defer s.Close()
  203. for _, test := range tests {
  204. t.Run(test.name, func(t *testing.T) {
  205. resetStats()
  206. s.FlushAll()
  207. r := redis.NewRedis(s.Addr(), redis.NodeType)
  208. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  209. cache.WithNotFoundExpiry(time.Second))
  210. var str string
  211. r.Set("index", test.primaryCache)
  212. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  213. return fmt.Sprintf("%v/1234", s)
  214. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  215. assert.Fail(t, "should not go here")
  216. return test.primary, nil
  217. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  218. *v.(*string) = "xin"
  219. assert.Equal(t, primary, primary)
  220. return nil
  221. })
  222. assert.Nil(t, err)
  223. assert.Equal(t, "xin", str)
  224. val, err := r.Get("index")
  225. assert.Nil(t, err)
  226. assert.Equal(t, test.primaryCache, val)
  227. val, err = r.Get(test.primaryCache + "/1234")
  228. assert.Nil(t, err)
  229. assert.Equal(t, `"xin"`, val)
  230. })
  231. }
  232. }
  233. func TestCachedConn_QueryRowIndex_HasWrongCache(t *testing.T) {
  234. caches := map[string]string{
  235. "index": "primary",
  236. "primary/1234": "xin",
  237. }
  238. for k, v := range caches {
  239. t.Run(k+"/"+v, func(t *testing.T) {
  240. resetStats()
  241. s, err := miniredis.Run()
  242. if err != nil {
  243. t.Error(err)
  244. }
  245. s.FlushAll()
  246. defer s.Close()
  247. r := redis.NewRedis(s.Addr(), redis.NodeType)
  248. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  249. cache.WithNotFoundExpiry(time.Second))
  250. var str string
  251. r.Set(k, v)
  252. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  253. return fmt.Sprintf("%s/1234", s)
  254. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  255. *v.(*string) = "xin"
  256. return "primary", nil
  257. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  258. *v.(*string) = "xin"
  259. assert.Equal(t, "primary", primary)
  260. return nil
  261. })
  262. assert.Nil(t, err)
  263. assert.Equal(t, "xin", str)
  264. val, err := r.Get("index")
  265. assert.Nil(t, err)
  266. assert.Equal(t, `"primary"`, val)
  267. val, err = r.Get("primary/1234")
  268. assert.Nil(t, err)
  269. assert.Equal(t, `"xin"`, val)
  270. })
  271. }
  272. }
  273. func TestStatCacheFails(t *testing.T) {
  274. resetStats()
  275. log.SetOutput(ioutil.Discard)
  276. defer log.SetOutput(os.Stdout)
  277. r := redis.NewRedis("localhost:59999", redis.NodeType)
  278. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  279. for i := 0; i < 20; i++ {
  280. var str string
  281. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  282. return errors.New("db failed")
  283. })
  284. assert.NotNil(t, err)
  285. }
  286. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  287. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  288. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Miss))
  289. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.DbFails))
  290. }
  291. func TestStatDbFails(t *testing.T) {
  292. resetStats()
  293. s, err := miniredis.Run()
  294. if err != nil {
  295. t.Error(err)
  296. }
  297. r := redis.NewRedis(s.Addr(), redis.NodeType)
  298. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  299. for i := 0; i < 20; i++ {
  300. var str string
  301. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  302. return errors.New("db failed")
  303. })
  304. assert.NotNil(t, err)
  305. }
  306. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  307. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  308. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.DbFails))
  309. }
  310. func TestStatFromMemory(t *testing.T) {
  311. resetStats()
  312. s, err := miniredis.Run()
  313. if err != nil {
  314. t.Error(err)
  315. }
  316. r := redis.NewRedis(s.Addr(), redis.NodeType)
  317. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  318. var all sync.WaitGroup
  319. var wait sync.WaitGroup
  320. all.Add(10)
  321. wait.Add(4)
  322. go func() {
  323. var str string
  324. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  325. *v.(*string) = "zero"
  326. return nil
  327. })
  328. if err != nil {
  329. t.Error(err)
  330. }
  331. wait.Wait()
  332. runtime.Gosched()
  333. all.Done()
  334. }()
  335. for i := 0; i < 4; i++ {
  336. go func() {
  337. var str string
  338. wait.Done()
  339. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  340. *v.(*string) = "zero"
  341. return nil
  342. })
  343. if err != nil {
  344. t.Error(err)
  345. }
  346. all.Done()
  347. }()
  348. }
  349. for i := 0; i < 5; i++ {
  350. go func() {
  351. var str string
  352. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  353. *v.(*string) = "zero"
  354. return nil
  355. })
  356. if err != nil {
  357. t.Error(err)
  358. }
  359. all.Done()
  360. }()
  361. }
  362. all.Wait()
  363. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  364. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  365. }
  366. func TestCachedConnQueryRow(t *testing.T) {
  367. s, err := miniredis.Run()
  368. if err != nil {
  369. t.Error(err)
  370. }
  371. const (
  372. key = "user"
  373. value = "any"
  374. )
  375. var conn trackedConn
  376. var user string
  377. var ran bool
  378. r := redis.NewRedis(s.Addr(), redis.NodeType)
  379. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  380. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  381. ran = true
  382. user = value
  383. return nil
  384. })
  385. assert.Nil(t, err)
  386. actualValue, err := s.Get(key)
  387. assert.Nil(t, err)
  388. var actual string
  389. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  390. assert.Equal(t, value, actual)
  391. assert.Equal(t, value, user)
  392. assert.True(t, ran)
  393. }
  394. func TestCachedConnQueryRowFromCache(t *testing.T) {
  395. s, err := miniredis.Run()
  396. if err != nil {
  397. t.Error(err)
  398. }
  399. const (
  400. key = "user"
  401. value = "any"
  402. )
  403. var conn trackedConn
  404. var user string
  405. var ran bool
  406. r := redis.NewRedis(s.Addr(), redis.NodeType)
  407. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  408. assert.Nil(t, c.SetCache(key, value))
  409. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  410. ran = true
  411. user = value
  412. return nil
  413. })
  414. assert.Nil(t, err)
  415. actualValue, err := s.Get(key)
  416. assert.Nil(t, err)
  417. var actual string
  418. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  419. assert.Equal(t, value, actual)
  420. assert.Equal(t, value, user)
  421. assert.False(t, ran)
  422. }
  423. func TestQueryRowNotFound(t *testing.T) {
  424. s, err := miniredis.Run()
  425. if err != nil {
  426. t.Error(err)
  427. }
  428. const key = "user"
  429. var conn trackedConn
  430. var user string
  431. var ran int
  432. r := redis.NewRedis(s.Addr(), redis.NodeType)
  433. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  434. for i := 0; i < 20; i++ {
  435. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  436. ran++
  437. return sql.ErrNoRows
  438. })
  439. assert.Exactly(t, sqlx.ErrNotFound, err)
  440. }
  441. assert.Equal(t, 1, ran)
  442. }
  443. func TestCachedConnExec(t *testing.T) {
  444. s, err := miniredis.Run()
  445. if err != nil {
  446. t.Error(err)
  447. }
  448. var conn trackedConn
  449. r := redis.NewRedis(s.Addr(), redis.NodeType)
  450. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  451. _, err = c.ExecNoCache("delete from user_table where id='kevin'")
  452. assert.Nil(t, err)
  453. assert.True(t, conn.execValue)
  454. }
  455. func TestCachedConnExecDropCache(t *testing.T) {
  456. s, err := miniredis.Run()
  457. if err != nil {
  458. t.Error(err)
  459. }
  460. const (
  461. key = "user"
  462. value = "any"
  463. )
  464. var conn trackedConn
  465. r := redis.NewRedis(s.Addr(), redis.NodeType)
  466. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  467. assert.Nil(t, c.SetCache(key, value))
  468. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  469. return conn.Exec("delete from user_table where id='kevin'")
  470. }, key)
  471. assert.Nil(t, err)
  472. assert.True(t, conn.execValue)
  473. _, err = s.Get(key)
  474. assert.Exactly(t, miniredis.ErrKeyNotFound, err)
  475. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  476. return nil, errors.New("foo")
  477. }, key)
  478. assert.NotNil(t, err)
  479. }
  480. func TestCachedConnExecDropCacheFailed(t *testing.T) {
  481. const key = "user"
  482. var conn trackedConn
  483. r := redis.NewRedis("anyredis:8888", redis.NodeType)
  484. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  485. _, err := c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  486. return conn.Exec("delete from user_table where id='kevin'")
  487. }, key)
  488. // async background clean, retry logic
  489. assert.Nil(t, err)
  490. }
  491. func TestCachedConnQueryRows(t *testing.T) {
  492. s, err := miniredis.Run()
  493. if err != nil {
  494. t.Error(err)
  495. }
  496. var conn trackedConn
  497. r := redis.NewRedis(s.Addr(), redis.NodeType)
  498. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  499. var users []string
  500. err = c.QueryRowsNoCache(&users, "select user from user_table where id='kevin'")
  501. assert.Nil(t, err)
  502. assert.True(t, conn.queryRowsValue)
  503. }
  504. func TestCachedConnTransact(t *testing.T) {
  505. s, err := miniredis.Run()
  506. if err != nil {
  507. t.Error(err)
  508. }
  509. var conn trackedConn
  510. r := redis.NewRedis(s.Addr(), redis.NodeType)
  511. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  512. err = c.Transact(func(session sqlx.Session) error {
  513. return nil
  514. })
  515. assert.Nil(t, err)
  516. assert.True(t, conn.transactValue)
  517. }
  518. func TestQueryRowNoCache(t *testing.T) {
  519. s, err := miniredis.Run()
  520. if err != nil {
  521. t.Error(err)
  522. }
  523. const (
  524. key = "user"
  525. value = "any"
  526. )
  527. var user string
  528. var ran bool
  529. r := redis.NewRedis(s.Addr(), redis.NodeType)
  530. conn := dummySqlConn{queryRow: func(v interface{}, q string, args ...interface{}) error {
  531. user = value
  532. ran = true
  533. return nil
  534. }}
  535. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  536. err = c.QueryRowNoCache(&user, key)
  537. assert.Nil(t, err)
  538. assert.Equal(t, value, user)
  539. assert.True(t, ran)
  540. }
  541. func TestFloatKeyer(t *testing.T) {
  542. primaries := []interface{}{
  543. float32(1),
  544. float64(1),
  545. }
  546. for _, primary := range primaries {
  547. val := floatKeyer(func(i interface{}) string {
  548. return fmt.Sprint(i)
  549. })(primary)
  550. assert.Equal(t, "1", val)
  551. }
  552. }
  553. func resetStats() {
  554. atomic.StoreUint64(&stats.Total, 0)
  555. atomic.StoreUint64(&stats.Hit, 0)
  556. atomic.StoreUint64(&stats.Miss, 0)
  557. atomic.StoreUint64(&stats.DbFails, 0)
  558. }
  559. type dummySqlConn struct {
  560. queryRow func(interface{}, string, ...interface{}) error
  561. }
  562. func (d dummySqlConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  563. return nil, nil
  564. }
  565. func (d dummySqlConn) Prepare(query string) (sqlx.StmtSession, error) {
  566. return nil, nil
  567. }
  568. func (d dummySqlConn) QueryRow(v interface{}, query string, args ...interface{}) error {
  569. if d.queryRow != nil {
  570. return d.queryRow(v, query, args...)
  571. }
  572. return nil
  573. }
  574. func (d dummySqlConn) QueryRowPartial(v interface{}, query string, args ...interface{}) error {
  575. return nil
  576. }
  577. func (d dummySqlConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  578. return nil
  579. }
  580. func (d dummySqlConn) QueryRowsPartial(v interface{}, query string, args ...interface{}) error {
  581. return nil
  582. }
  583. func (d dummySqlConn) Transact(func(session sqlx.Session) error) error {
  584. return nil
  585. }
  586. type trackedConn struct {
  587. dummySqlConn
  588. execValue bool
  589. queryRowsValue bool
  590. transactValue bool
  591. }
  592. func (c *trackedConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  593. c.execValue = true
  594. return c.dummySqlConn.Exec(query, args...)
  595. }
  596. func (c *trackedConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  597. c.queryRowsValue = true
  598. return c.dummySqlConn.QueryRows(v, query, args...)
  599. }
  600. func (c *trackedConn) Transact(fn func(session sqlx.Session) error) error {
  601. c.transactValue = true
  602. return c.dummySqlConn.Transact(fn)
  603. }