cachedsql_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. package sqlc
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "runtime"
  11. "sync"
  12. "sync/atomic"
  13. "testing"
  14. "time"
  15. "github.com/alicebob/miniredis"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/tal-tech/go-zero/core/logx"
  18. "github.com/tal-tech/go-zero/core/stat"
  19. "github.com/tal-tech/go-zero/core/stores/cache"
  20. "github.com/tal-tech/go-zero/core/stores/redis"
  21. "github.com/tal-tech/go-zero/core/stores/sqlx"
  22. )
  23. func init() {
  24. logx.Disable()
  25. stat.SetReporter(nil)
  26. }
  27. func TestCachedConn_GetCache(t *testing.T) {
  28. resetStats()
  29. s, err := miniredis.Run()
  30. if err != nil {
  31. t.Error(err)
  32. }
  33. r := redis.NewRedis(s.Addr(), redis.NodeType)
  34. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  35. var value string
  36. err = c.GetCache("any", &value)
  37. assert.Equal(t, ErrNotFound, err)
  38. s.Set("any", `"value"`)
  39. err = c.GetCache("any", &value)
  40. assert.Nil(t, err)
  41. assert.Equal(t, "value", value)
  42. }
  43. func TestStat(t *testing.T) {
  44. resetStats()
  45. s, err := miniredis.Run()
  46. if err != nil {
  47. t.Error(err)
  48. }
  49. r := redis.NewRedis(s.Addr(), redis.NodeType)
  50. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  51. for i := 0; i < 10; i++ {
  52. var str string
  53. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  54. *v.(*string) = "zero"
  55. return nil
  56. })
  57. if err != nil {
  58. t.Error(err)
  59. }
  60. }
  61. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  62. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  63. }
  64. func TestCachedConn_QueryRowIndex_NoCache(t *testing.T) {
  65. resetStats()
  66. s, err := miniredis.Run()
  67. if err != nil {
  68. t.Error(err)
  69. }
  70. r := redis.NewRedis(s.Addr(), redis.NodeType)
  71. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  72. var str string
  73. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  74. return fmt.Sprintf("%s/1234", s)
  75. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  76. *v.(*string) = "zero"
  77. return "primary", nil
  78. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  79. assert.Equal(t, "primary", pri)
  80. *v.(*string) = "xin"
  81. return nil
  82. })
  83. assert.Nil(t, err)
  84. assert.Equal(t, "zero", str)
  85. val, err := r.Get("index")
  86. assert.Nil(t, err)
  87. assert.Equal(t, `"primary"`, val)
  88. val, err = r.Get("primary/1234")
  89. assert.Nil(t, err)
  90. assert.Equal(t, `"zero"`, val)
  91. }
  92. func TestCachedConn_QueryRowIndex_HasCache(t *testing.T) {
  93. resetStats()
  94. s, err := miniredis.Run()
  95. if err != nil {
  96. t.Error(err)
  97. }
  98. r := redis.NewRedis(s.Addr(), redis.NodeType)
  99. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  100. cache.WithNotFoundExpiry(time.Second))
  101. var str string
  102. r.Set("index", `"primary"`)
  103. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  104. return fmt.Sprintf("%s/1234", s)
  105. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  106. assert.Fail(t, "should not go here")
  107. return "primary", nil
  108. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  109. *v.(*string) = "xin"
  110. assert.Equal(t, "primary", primary)
  111. return nil
  112. })
  113. assert.Nil(t, err)
  114. assert.Equal(t, "xin", str)
  115. val, err := r.Get("index")
  116. assert.Nil(t, err)
  117. assert.Equal(t, `"primary"`, val)
  118. val, err = r.Get("primary/1234")
  119. assert.Nil(t, err)
  120. assert.Equal(t, `"xin"`, val)
  121. }
  122. func TestCachedConn_QueryRowIndex_HasWrongCache(t *testing.T) {
  123. caches := map[string]string{
  124. "index": "primary",
  125. "primary/1234": "xin",
  126. }
  127. for k, v := range caches {
  128. t.Run(k+"/"+v, func(t *testing.T) {
  129. resetStats()
  130. s, err := miniredis.Run()
  131. if err != nil {
  132. t.Error(err)
  133. }
  134. r := redis.NewRedis(s.Addr(), redis.NodeType)
  135. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  136. cache.WithNotFoundExpiry(time.Second))
  137. var str string
  138. r.Set(k, v)
  139. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  140. return fmt.Sprintf("%s/1234", s)
  141. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  142. *v.(*string) = "xin"
  143. return "primary", nil
  144. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  145. *v.(*string) = "xin"
  146. assert.Equal(t, "primary", primary)
  147. return nil
  148. })
  149. assert.Nil(t, err)
  150. assert.Equal(t, "xin", str)
  151. val, err := r.Get("index")
  152. assert.Nil(t, err)
  153. assert.Equal(t, `"primary"`, val)
  154. val, err = r.Get("primary/1234")
  155. assert.Nil(t, err)
  156. assert.Equal(t, `"xin"`, val)
  157. })
  158. }
  159. }
  160. func TestStatCacheFails(t *testing.T) {
  161. resetStats()
  162. log.SetOutput(ioutil.Discard)
  163. defer log.SetOutput(os.Stdout)
  164. r := redis.NewRedis("localhost:59999", redis.NodeType)
  165. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  166. for i := 0; i < 20; i++ {
  167. var str string
  168. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  169. return errors.New("db failed")
  170. })
  171. assert.NotNil(t, err)
  172. }
  173. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  174. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  175. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Miss))
  176. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.DbFails))
  177. }
  178. func TestStatDbFails(t *testing.T) {
  179. resetStats()
  180. s, err := miniredis.Run()
  181. if err != nil {
  182. t.Error(err)
  183. }
  184. r := redis.NewRedis(s.Addr(), redis.NodeType)
  185. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  186. for i := 0; i < 20; i++ {
  187. var str string
  188. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  189. return errors.New("db failed")
  190. })
  191. assert.NotNil(t, err)
  192. }
  193. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  194. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  195. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.DbFails))
  196. }
  197. func TestStatFromMemory(t *testing.T) {
  198. resetStats()
  199. s, err := miniredis.Run()
  200. if err != nil {
  201. t.Error(err)
  202. }
  203. r := redis.NewRedis(s.Addr(), redis.NodeType)
  204. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  205. var all sync.WaitGroup
  206. var wait sync.WaitGroup
  207. all.Add(10)
  208. wait.Add(4)
  209. go func() {
  210. var str string
  211. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  212. *v.(*string) = "zero"
  213. return nil
  214. })
  215. if err != nil {
  216. t.Error(err)
  217. }
  218. wait.Wait()
  219. runtime.Gosched()
  220. all.Done()
  221. }()
  222. for i := 0; i < 4; i++ {
  223. go func() {
  224. var str string
  225. wait.Done()
  226. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  227. *v.(*string) = "zero"
  228. return nil
  229. })
  230. if err != nil {
  231. t.Error(err)
  232. }
  233. all.Done()
  234. }()
  235. }
  236. for i := 0; i < 5; i++ {
  237. go func() {
  238. var str string
  239. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  240. *v.(*string) = "zero"
  241. return nil
  242. })
  243. if err != nil {
  244. t.Error(err)
  245. }
  246. all.Done()
  247. }()
  248. }
  249. all.Wait()
  250. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  251. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  252. }
  253. func TestCachedConnQueryRow(t *testing.T) {
  254. s, err := miniredis.Run()
  255. if err != nil {
  256. t.Error(err)
  257. }
  258. const (
  259. key = "user"
  260. value = "any"
  261. )
  262. var conn trackedConn
  263. var user string
  264. var ran bool
  265. r := redis.NewRedis(s.Addr(), redis.NodeType)
  266. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  267. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  268. ran = true
  269. user = value
  270. return nil
  271. })
  272. assert.Nil(t, err)
  273. actualValue, err := s.Get(key)
  274. assert.Nil(t, err)
  275. var actual string
  276. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  277. assert.Equal(t, value, actual)
  278. assert.Equal(t, value, user)
  279. assert.True(t, ran)
  280. }
  281. func TestCachedConnQueryRowFromCache(t *testing.T) {
  282. s, err := miniredis.Run()
  283. if err != nil {
  284. t.Error(err)
  285. }
  286. const (
  287. key = "user"
  288. value = "any"
  289. )
  290. var conn trackedConn
  291. var user string
  292. var ran bool
  293. r := redis.NewRedis(s.Addr(), redis.NodeType)
  294. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  295. assert.Nil(t, c.SetCache(key, value))
  296. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  297. ran = true
  298. user = value
  299. return nil
  300. })
  301. assert.Nil(t, err)
  302. actualValue, err := s.Get(key)
  303. assert.Nil(t, err)
  304. var actual string
  305. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  306. assert.Equal(t, value, actual)
  307. assert.Equal(t, value, user)
  308. assert.False(t, ran)
  309. }
  310. func TestQueryRowNotFound(t *testing.T) {
  311. s, err := miniredis.Run()
  312. if err != nil {
  313. t.Error(err)
  314. }
  315. const key = "user"
  316. var conn trackedConn
  317. var user string
  318. var ran int
  319. r := redis.NewRedis(s.Addr(), redis.NodeType)
  320. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  321. for i := 0; i < 20; i++ {
  322. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  323. ran++
  324. return sql.ErrNoRows
  325. })
  326. assert.Exactly(t, sqlx.ErrNotFound, err)
  327. }
  328. assert.Equal(t, 1, ran)
  329. }
  330. func TestCachedConnExec(t *testing.T) {
  331. s, err := miniredis.Run()
  332. if err != nil {
  333. t.Error(err)
  334. }
  335. var conn trackedConn
  336. r := redis.NewRedis(s.Addr(), redis.NodeType)
  337. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  338. _, err = c.ExecNoCache("delete from user_table where id='kevin'")
  339. assert.Nil(t, err)
  340. assert.True(t, conn.execValue)
  341. }
  342. func TestCachedConnExecDropCache(t *testing.T) {
  343. s, err := miniredis.Run()
  344. if err != nil {
  345. t.Error(err)
  346. }
  347. const (
  348. key = "user"
  349. value = "any"
  350. )
  351. var conn trackedConn
  352. r := redis.NewRedis(s.Addr(), redis.NodeType)
  353. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  354. assert.Nil(t, c.SetCache(key, value))
  355. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  356. return conn.Exec("delete from user_table where id='kevin'")
  357. }, key)
  358. assert.Nil(t, err)
  359. assert.True(t, conn.execValue)
  360. _, err = s.Get(key)
  361. assert.Exactly(t, miniredis.ErrKeyNotFound, err)
  362. }
  363. func TestCachedConnExecDropCacheFailed(t *testing.T) {
  364. const key = "user"
  365. var conn trackedConn
  366. r := redis.NewRedis("anyredis:8888", redis.NodeType)
  367. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  368. _, err := c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  369. return conn.Exec("delete from user_table where id='kevin'")
  370. }, key)
  371. // async background clean, retry logic
  372. assert.Nil(t, err)
  373. }
  374. func TestCachedConnQueryRows(t *testing.T) {
  375. s, err := miniredis.Run()
  376. if err != nil {
  377. t.Error(err)
  378. }
  379. var conn trackedConn
  380. r := redis.NewRedis(s.Addr(), redis.NodeType)
  381. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  382. var users []string
  383. err = c.QueryRowsNoCache(&users, "select user from user_table where id='kevin'")
  384. assert.Nil(t, err)
  385. assert.True(t, conn.queryRowsValue)
  386. }
  387. func TestCachedConnTransact(t *testing.T) {
  388. s, err := miniredis.Run()
  389. if err != nil {
  390. t.Error(err)
  391. }
  392. var conn trackedConn
  393. r := redis.NewRedis(s.Addr(), redis.NodeType)
  394. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  395. err = c.Transact(func(session sqlx.Session) error {
  396. return nil
  397. })
  398. assert.Nil(t, err)
  399. assert.True(t, conn.transactValue)
  400. }
  401. func TestQueryRowNoCache(t *testing.T) {
  402. s, err := miniredis.Run()
  403. if err != nil {
  404. t.Error(err)
  405. }
  406. const (
  407. key = "user"
  408. value = "any"
  409. )
  410. var user string
  411. var ran bool
  412. r := redis.NewRedis(s.Addr(), redis.NodeType)
  413. conn := dummySqlConn{queryRow: func(v interface{}, q string, args ...interface{}) error {
  414. user = value
  415. ran = true
  416. return nil
  417. }}
  418. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  419. err = c.QueryRowNoCache(&user, key)
  420. assert.Nil(t, err)
  421. assert.Equal(t, value, user)
  422. assert.True(t, ran)
  423. }
  424. func TestFloatKeyer(t *testing.T) {
  425. primaries := []interface{}{
  426. float32(1),
  427. float64(1),
  428. }
  429. for _, primary := range primaries {
  430. val := floatKeyer(func(i interface{}) string {
  431. return fmt.Sprint(i)
  432. })(primary)
  433. assert.Equal(t, "1", val)
  434. }
  435. }
  436. func resetStats() {
  437. atomic.StoreUint64(&stats.Total, 0)
  438. atomic.StoreUint64(&stats.Hit, 0)
  439. atomic.StoreUint64(&stats.Miss, 0)
  440. atomic.StoreUint64(&stats.DbFails, 0)
  441. }
  442. type dummySqlConn struct {
  443. queryRow func(interface{}, string, ...interface{}) error
  444. }
  445. func (d dummySqlConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  446. return nil, nil
  447. }
  448. func (d dummySqlConn) Prepare(query string) (sqlx.StmtSession, error) {
  449. return nil, nil
  450. }
  451. func (d dummySqlConn) QueryRow(v interface{}, query string, args ...interface{}) error {
  452. if d.queryRow != nil {
  453. return d.queryRow(v, query, args...)
  454. }
  455. return nil
  456. }
  457. func (d dummySqlConn) QueryRowPartial(v interface{}, query string, args ...interface{}) error {
  458. return nil
  459. }
  460. func (d dummySqlConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  461. return nil
  462. }
  463. func (d dummySqlConn) QueryRowsPartial(v interface{}, query string, args ...interface{}) error {
  464. return nil
  465. }
  466. func (d dummySqlConn) Transact(func(session sqlx.Session) error) error {
  467. return nil
  468. }
  469. type trackedConn struct {
  470. dummySqlConn
  471. execValue bool
  472. queryRowsValue bool
  473. transactValue bool
  474. }
  475. func (c *trackedConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  476. c.execValue = true
  477. return c.dummySqlConn.Exec(query, args...)
  478. }
  479. func (c *trackedConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  480. c.queryRowsValue = true
  481. return c.dummySqlConn.QueryRows(v, query, args...)
  482. }
  483. func (c *trackedConn) Transact(fn func(session sqlx.Session) error) error {
  484. c.transactValue = true
  485. return c.dummySqlConn.Transact(fn)
  486. }