cachedsql_test.go 16 KB


  1. package sqlc
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "log"
  10. "os"
  11. "runtime"
  12. "sync"
  13. "sync/atomic"
  14. "testing"
  15. "time"
  16. "github.com/alicebob/miniredis/v2"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/zeromicro/go-zero/core/fx"
  19. "github.com/zeromicro/go-zero/core/logx"
  20. "github.com/zeromicro/go-zero/core/stat"
  21. "github.com/zeromicro/go-zero/core/stores/cache"
  22. "github.com/zeromicro/go-zero/core/stores/redis"
  23. "github.com/zeromicro/go-zero/core/stores/redis/redistest"
  24. "github.com/zeromicro/go-zero/core/stores/sqlx"
  25. )
  26. func init() {
  27. logx.Disable()
  28. stat.SetReporter(nil)
  29. }
  30. func TestCachedConn_GetCache(t *testing.T) {
  31. resetStats()
  32. r := redistest.CreateRedis(t)
  33. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  34. var value string
  35. err := c.GetCache("any", &value)
  36. assert.Equal(t, ErrNotFound, err)
  37. r.Set("any", `"value"`)
  38. err = c.GetCache("any", &value)
  39. assert.Nil(t, err)
  40. assert.Equal(t, "value", value)
  41. }
  42. func TestStat(t *testing.T) {
  43. resetStats()
  44. r := redistest.CreateRedis(t)
  45. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  46. for i := 0; i < 10; i++ {
  47. var str string
  48. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v any) error {
  49. *v.(*string) = "zero"
  50. return nil
  51. })
  52. if err != nil {
  53. t.Error(err)
  54. }
  55. }
  56. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  57. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  58. }
  59. func TestCachedConn_QueryRowIndex_NoCache(t *testing.T) {
  60. resetStats()
  61. r := redistest.CreateRedis(t)
  62. c := NewConn(dummySqlConn{}, cache.CacheConf{
  63. {
  64. RedisConf: redis.RedisConf{
  65. Host: r.Addr,
  66. Type: redis.NodeType,
  67. },
  68. Weight: 100,
  69. },
  70. }, cache.WithExpiry(time.Second*10))
  71. var str string
  72. err := c.QueryRowIndex(&str, "index", func(s any) string {
  73. return fmt.Sprintf("%s/1234", s)
  74. }, func(conn sqlx.SqlConn, v any) (any, error) {
  75. *v.(*string) = "zero"
  76. return "primary", errors.New("foo")
  77. }, func(conn sqlx.SqlConn, v, pri any) error {
  78. assert.Equal(t, "primary", pri)
  79. *v.(*string) = "xin"
  80. return nil
  81. })
  82. assert.NotNil(t, err)
  83. err = c.QueryRowIndex(&str, "index", func(s any) string {
  84. return fmt.Sprintf("%s/1234", s)
  85. }, func(conn sqlx.SqlConn, v any) (any, error) {
  86. *v.(*string) = "zero"
  87. return "primary", nil
  88. }, func(conn sqlx.SqlConn, v, pri any) error {
  89. assert.Equal(t, "primary", pri)
  90. *v.(*string) = "xin"
  91. return nil
  92. })
  93. assert.Nil(t, err)
  94. assert.Equal(t, "zero", str)
  95. val, err := r.Get("index")
  96. assert.Nil(t, err)
  97. assert.Equal(t, `"primary"`, val)
  98. val, err = r.Get("primary/1234")
  99. assert.Nil(t, err)
  100. assert.Equal(t, `"zero"`, val)
  101. }
  102. func TestCachedConn_QueryRowIndex_HasCache(t *testing.T) {
  103. resetStats()
  104. r := redistest.CreateRedis(t)
  105. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  106. cache.WithNotFoundExpiry(time.Second))
  107. var str string
  108. r.Set("index", `"primary"`)
  109. err := c.QueryRowIndex(&str, "index", func(s any) string {
  110. return fmt.Sprintf("%s/1234", s)
  111. }, func(conn sqlx.SqlConn, v any) (any, error) {
  112. assert.Fail(t, "should not go here")
  113. return "primary", nil
  114. }, func(conn sqlx.SqlConn, v, primary any) error {
  115. *v.(*string) = "xin"
  116. assert.Equal(t, "primary", primary)
  117. return nil
  118. })
  119. assert.Nil(t, err)
  120. assert.Equal(t, "xin", str)
  121. val, err := r.Get("index")
  122. assert.Nil(t, err)
  123. assert.Equal(t, `"primary"`, val)
  124. val, err = r.Get("primary/1234")
  125. assert.Nil(t, err)
  126. assert.Equal(t, `"xin"`, val)
  127. }
  128. func TestCachedConn_QueryRowIndex_HasCache_IntPrimary(t *testing.T) {
  129. const (
  130. primaryInt8 int8 = 100
  131. primaryInt16 int16 = 10000
  132. primaryInt32 int32 = 10000000
  133. primaryInt64 int64 = 10000000
  134. primaryUint8 uint8 = 100
  135. primaryUint16 uint16 = 10000
  136. primaryUint32 uint32 = 10000000
  137. primaryUint64 uint64 = 10000000
  138. )
  139. tests := []struct {
  140. name string
  141. primary any
  142. primaryCache string
  143. }{
  144. {
  145. name: "int8 primary",
  146. primary: primaryInt8,
  147. primaryCache: fmt.Sprint(primaryInt8),
  148. },
  149. {
  150. name: "int16 primary",
  151. primary: primaryInt16,
  152. primaryCache: fmt.Sprint(primaryInt16),
  153. },
  154. {
  155. name: "int32 primary",
  156. primary: primaryInt32,
  157. primaryCache: fmt.Sprint(primaryInt32),
  158. },
  159. {
  160. name: "int64 primary",
  161. primary: primaryInt64,
  162. primaryCache: fmt.Sprint(primaryInt64),
  163. },
  164. {
  165. name: "uint8 primary",
  166. primary: primaryUint8,
  167. primaryCache: fmt.Sprint(primaryUint8),
  168. },
  169. {
  170. name: "uint16 primary",
  171. primary: primaryUint16,
  172. primaryCache: fmt.Sprint(primaryUint16),
  173. },
  174. {
  175. name: "uint32 primary",
  176. primary: primaryUint32,
  177. primaryCache: fmt.Sprint(primaryUint32),
  178. },
  179. {
  180. name: "uint64 primary",
  181. primary: primaryUint64,
  182. primaryCache: fmt.Sprint(primaryUint64),
  183. },
  184. }
  185. for _, test := range tests {
  186. t.Run(test.name, func(t *testing.T) {
  187. resetStats()
  188. r := redistest.CreateRedis(t)
  189. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  190. cache.WithNotFoundExpiry(time.Second))
  191. var str string
  192. r.Set("index", test.primaryCache)
  193. err := c.QueryRowIndex(&str, "index", func(s any) string {
  194. return fmt.Sprintf("%v/1234", s)
  195. }, func(conn sqlx.SqlConn, v any) (any, error) {
  196. assert.Fail(t, "should not go here")
  197. return test.primary, nil
  198. }, func(conn sqlx.SqlConn, v, primary any) error {
  199. *v.(*string) = "xin"
  200. assert.Equal(t, primary, primary)
  201. return nil
  202. })
  203. assert.Nil(t, err)
  204. assert.Equal(t, "xin", str)
  205. val, err := r.Get("index")
  206. assert.Nil(t, err)
  207. assert.Equal(t, test.primaryCache, val)
  208. val, err = r.Get(test.primaryCache + "/1234")
  209. assert.Nil(t, err)
  210. assert.Equal(t, `"xin"`, val)
  211. })
  212. }
  213. }
  214. func TestCachedConn_QueryRowIndex_HasWrongCache(t *testing.T) {
  215. caches := map[string]string{
  216. "index": "primary",
  217. "primary/1234": "xin",
  218. }
  219. for k, v := range caches {
  220. t.Run(k+"/"+v, func(t *testing.T) {
  221. resetStats()
  222. r := redistest.CreateRedis(t)
  223. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  224. cache.WithNotFoundExpiry(time.Second))
  225. var str string
  226. r.Set(k, v)
  227. err := c.QueryRowIndex(&str, "index", func(s any) string {
  228. return fmt.Sprintf("%s/1234", s)
  229. }, func(conn sqlx.SqlConn, v any) (any, error) {
  230. *v.(*string) = "xin"
  231. return "primary", nil
  232. }, func(conn sqlx.SqlConn, v, primary any) error {
  233. *v.(*string) = "xin"
  234. assert.Equal(t, "primary", primary)
  235. return nil
  236. })
  237. assert.Nil(t, err)
  238. assert.Equal(t, "xin", str)
  239. val, err := r.Get("index")
  240. assert.Nil(t, err)
  241. assert.Equal(t, `"primary"`, val)
  242. val, err = r.Get("primary/1234")
  243. assert.Nil(t, err)
  244. assert.Equal(t, `"xin"`, val)
  245. })
  246. }
  247. }
  248. func TestStatCacheFails(t *testing.T) {
  249. resetStats()
  250. log.SetOutput(io.Discard)
  251. defer log.SetOutput(os.Stdout)
  252. r := redis.New("localhost:59999")
  253. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  254. for i := 0; i < 20; i++ {
  255. var str string
  256. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v any) error {
  257. return errors.New("db failed")
  258. })
  259. assert.NotNil(t, err)
  260. }
  261. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  262. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  263. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Miss))
  264. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.DbFails))
  265. }
  266. func TestStatDbFails(t *testing.T) {
  267. resetStats()
  268. r := redistest.CreateRedis(t)
  269. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  270. for i := 0; i < 20; i++ {
  271. var str string
  272. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v any) error {
  273. return errors.New("db failed")
  274. })
  275. assert.NotNil(t, err)
  276. }
  277. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  278. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  279. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.DbFails))
  280. }
  281. func TestStatFromMemory(t *testing.T) {
  282. resetStats()
  283. r := redistest.CreateRedis(t)
  284. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  285. var all sync.WaitGroup
  286. var wait sync.WaitGroup
  287. all.Add(10)
  288. wait.Add(4)
  289. go func() {
  290. var str string
  291. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v any) error {
  292. *v.(*string) = "zero"
  293. return nil
  294. })
  295. if err != nil {
  296. t.Error(err)
  297. }
  298. wait.Wait()
  299. runtime.Gosched()
  300. all.Done()
  301. }()
  302. for i := 0; i < 4; i++ {
  303. go func() {
  304. var str string
  305. wait.Done()
  306. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v any) error {
  307. *v.(*string) = "zero"
  308. return nil
  309. })
  310. if err != nil {
  311. t.Error(err)
  312. }
  313. all.Done()
  314. }()
  315. }
  316. for i := 0; i < 5; i++ {
  317. go func() {
  318. var str string
  319. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v any) error {
  320. *v.(*string) = "zero"
  321. return nil
  322. })
  323. if err != nil {
  324. t.Error(err)
  325. }
  326. all.Done()
  327. }()
  328. }
  329. all.Wait()
  330. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  331. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  332. }
  333. func TestCachedConnQueryRow(t *testing.T) {
  334. r := redistest.CreateRedis(t)
  335. const (
  336. key = "user"
  337. value = "any"
  338. )
  339. var conn trackedConn
  340. var user string
  341. var ran bool
  342. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  343. err := c.QueryRow(&user, key, func(conn sqlx.SqlConn, v any) error {
  344. ran = true
  345. user = value
  346. return nil
  347. })
  348. assert.Nil(t, err)
  349. actualValue, err := r.Get(key)
  350. assert.Nil(t, err)
  351. var actual string
  352. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  353. assert.Equal(t, value, actual)
  354. assert.Equal(t, value, user)
  355. assert.True(t, ran)
  356. }
  357. func TestCachedConnQueryRowFromCache(t *testing.T) {
  358. r := redistest.CreateRedis(t)
  359. const (
  360. key = "user"
  361. value = "any"
  362. )
  363. var conn trackedConn
  364. var user string
  365. var ran bool
  366. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  367. assert.Nil(t, c.SetCache(key, value))
  368. err := c.QueryRow(&user, key, func(conn sqlx.SqlConn, v any) error {
  369. ran = true
  370. user = value
  371. return nil
  372. })
  373. assert.Nil(t, err)
  374. actualValue, err := r.Get(key)
  375. assert.Nil(t, err)
  376. var actual string
  377. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  378. assert.Equal(t, value, actual)
  379. assert.Equal(t, value, user)
  380. assert.False(t, ran)
  381. }
  382. func TestQueryRowNotFound(t *testing.T) {
  383. r := redistest.CreateRedis(t)
  384. const key = "user"
  385. var conn trackedConn
  386. var user string
  387. var ran int
  388. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  389. for i := 0; i < 20; i++ {
  390. err := c.QueryRow(&user, key, func(conn sqlx.SqlConn, v any) error {
  391. ran++
  392. return sql.ErrNoRows
  393. })
  394. assert.Exactly(t, sqlx.ErrNotFound, err)
  395. }
  396. assert.Equal(t, 1, ran)
  397. }
  398. func TestCachedConnExec(t *testing.T) {
  399. r := redistest.CreateRedis(t)
  400. var conn trackedConn
  401. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  402. _, err := c.ExecNoCache("delete from user_table where id='kevin'")
  403. assert.Nil(t, err)
  404. assert.True(t, conn.execValue)
  405. }
  406. func TestCachedConnExecDropCache(t *testing.T) {
  407. r, err := miniredis.Run()
  408. assert.Nil(t, err)
  409. defer fx.DoWithTimeout(func() error {
  410. r.Close()
  411. return nil
  412. }, time.Second)
  413. const (
  414. key = "user"
  415. value = "any"
  416. )
  417. var conn trackedConn
  418. c := NewNodeConn(&conn, redis.New(r.Addr()), cache.WithExpiry(time.Second*30))
  419. assert.Nil(t, c.SetCache(key, value))
  420. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  421. return conn.Exec("delete from user_table where id='kevin'")
  422. }, key)
  423. assert.Nil(t, err)
  424. assert.True(t, conn.execValue)
  425. _, err = r.Get(key)
  426. assert.Exactly(t, miniredis.ErrKeyNotFound, err)
  427. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  428. return nil, errors.New("foo")
  429. }, key)
  430. assert.NotNil(t, err)
  431. }
  432. func TestCachedConnExecDropCacheFailed(t *testing.T) {
  433. const key = "user"
  434. var conn trackedConn
  435. r := redis.New("anyredis:8888")
  436. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  437. _, err := c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  438. return conn.Exec("delete from user_table where id='kevin'")
  439. }, key)
  440. // async background clean, retry logic
  441. assert.Nil(t, err)
  442. }
  443. func TestCachedConnQueryRows(t *testing.T) {
  444. r := redistest.CreateRedis(t)
  445. var conn trackedConn
  446. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  447. var users []string
  448. err := c.QueryRowsNoCache(&users, "select user from user_table where id='kevin'")
  449. assert.Nil(t, err)
  450. assert.True(t, conn.queryRowsValue)
  451. }
  452. func TestCachedConnTransact(t *testing.T) {
  453. r := redistest.CreateRedis(t)
  454. var conn trackedConn
  455. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  456. err := c.Transact(func(session sqlx.Session) error {
  457. return nil
  458. })
  459. assert.Nil(t, err)
  460. assert.True(t, conn.transactValue)
  461. }
  462. func TestQueryRowNoCache(t *testing.T) {
  463. r := redistest.CreateRedis(t)
  464. const (
  465. key = "user"
  466. value = "any"
  467. )
  468. var user string
  469. var ran bool
  470. conn := dummySqlConn{queryRow: func(v any, q string, args ...any) error {
  471. user = value
  472. ran = true
  473. return nil
  474. }}
  475. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  476. err := c.QueryRowNoCache(&user, key)
  477. assert.Nil(t, err)
  478. assert.Equal(t, value, user)
  479. assert.True(t, ran)
  480. }
  481. func TestNewConnWithCache(t *testing.T) {
  482. r := redistest.CreateRedis(t)
  483. var conn trackedConn
  484. c := NewConnWithCache(&conn, cache.NewNode(r, singleFlights, stats, sql.ErrNoRows))
  485. _, err := c.ExecNoCache("delete from user_table where id='kevin'")
  486. assert.Nil(t, err)
  487. assert.True(t, conn.execValue)
  488. }
  489. func resetStats() {
  490. atomic.StoreUint64(&stats.Total, 0)
  491. atomic.StoreUint64(&stats.Hit, 0)
  492. atomic.StoreUint64(&stats.Miss, 0)
  493. atomic.StoreUint64(&stats.DbFails, 0)
  494. }
  495. type dummySqlConn struct {
  496. queryRow func(any, string, ...any) error
  497. }
  498. func (d dummySqlConn) ExecCtx(ctx context.Context, query string, args ...any) (sql.Result, error) {
  499. return nil, nil
  500. }
  501. func (d dummySqlConn) PrepareCtx(ctx context.Context, query string) (sqlx.StmtSession, error) {
  502. return nil, nil
  503. }
  504. func (d dummySqlConn) QueryRowPartialCtx(ctx context.Context, v any, query string, args ...any) error {
  505. return nil
  506. }
  507. func (d dummySqlConn) QueryRowsCtx(ctx context.Context, v any, query string, args ...any) error {
  508. return nil
  509. }
  510. func (d dummySqlConn) QueryRowsPartialCtx(ctx context.Context, v any, query string, args ...any) error {
  511. return nil
  512. }
  513. func (d dummySqlConn) TransactCtx(ctx context.Context, fn func(context.Context, sqlx.Session) error) error {
  514. return nil
  515. }
  516. func (d dummySqlConn) Exec(query string, args ...any) (sql.Result, error) {
  517. return nil, nil
  518. }
  519. func (d dummySqlConn) Prepare(query string) (sqlx.StmtSession, error) {
  520. return nil, nil
  521. }
  522. func (d dummySqlConn) QueryRow(v any, query string, args ...any) error {
  523. return d.QueryRowCtx(context.Background(), v, query, args...)
  524. }
  525. func (d dummySqlConn) QueryRowCtx(_ context.Context, v any, query string, args ...any) error {
  526. if d.queryRow != nil {
  527. return d.queryRow(v, query, args...)
  528. }
  529. return nil
  530. }
  531. func (d dummySqlConn) QueryRowPartial(v any, query string, args ...any) error {
  532. return nil
  533. }
  534. func (d dummySqlConn) QueryRows(v any, query string, args ...any) error {
  535. return nil
  536. }
  537. func (d dummySqlConn) QueryRowsPartial(v any, query string, args ...any) error {
  538. return nil
  539. }
  540. func (d dummySqlConn) RawDB() (*sql.DB, error) {
  541. return nil, nil
  542. }
  543. func (d dummySqlConn) Transact(func(session sqlx.Session) error) error {
  544. return nil
  545. }
  546. type trackedConn struct {
  547. dummySqlConn
  548. execValue bool
  549. queryRowsValue bool
  550. transactValue bool
  551. }
  552. func (c *trackedConn) Exec(query string, args ...any) (sql.Result, error) {
  553. return c.ExecCtx(context.Background(), query, args...)
  554. }
  555. func (c *trackedConn) ExecCtx(ctx context.Context, query string, args ...any) (sql.Result, error) {
  556. c.execValue = true
  557. return c.dummySqlConn.ExecCtx(ctx, query, args...)
  558. }
  559. func (c *trackedConn) QueryRows(v any, query string, args ...any) error {
  560. return c.QueryRowsCtx(context.Background(), v, query, args...)
  561. }
  562. func (c *trackedConn) QueryRowsCtx(ctx context.Context, v any, query string, args ...any) error {
  563. c.queryRowsValue = true
  564. return c.dummySqlConn.QueryRowsCtx(ctx, v, query, args...)
  565. }
  566. func (c *trackedConn) RawDB() (*sql.DB, error) {
  567. return nil, nil
  568. }
  569. func (c *trackedConn) Transact(fn func(session sqlx.Session) error) error {
  570. return c.TransactCtx(context.Background(), func(_ context.Context, session sqlx.Session) error {
  571. return fn(session)
  572. })
  573. }
  574. func (c *trackedConn) TransactCtx(ctx context.Context, fn func(context.Context, sqlx.Session) error) error {
  575. c.transactValue = true
  576. return c.dummySqlConn.TransactCtx(ctx, fn)
  577. }