redisblockingnode.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package redis
  2. import (
  3. "fmt"
  4. "strings"
  5. red "github.com/go-redis/redis/v8"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. )
  8. // ClosableNode interface represents a closable redis node.
  9. type ClosableNode interface {
  10. RedisNode
  11. Close()
  12. }
  13. // CreateBlockingNode returns a ClosableNode.
  14. func CreateBlockingNode(r *Redis) (ClosableNode, error) {
  15. timeout := readWriteTimeout + blockingQueryTimeout
  16. switch r.Type {
  17. case NodeType:
  18. client := red.NewClient(&red.Options{
  19. Addr: r.Addr,
  20. Password: r.Pass,
  21. DB: defaultDatabase,
  22. MaxRetries: maxRetries,
  23. PoolSize: 1,
  24. MinIdleConns: 1,
  25. ReadTimeout: timeout,
  26. })
  27. return &clientBridge{client}, nil
  28. case ClusterType:
  29. client := red.NewClusterClient(&red.ClusterOptions{
  30. Addrs: strings.Split(r.Addr, ","),
  31. Password: r.Pass,
  32. MaxRetries: maxRetries,
  33. PoolSize: 1,
  34. MinIdleConns: 1,
  35. ReadTimeout: timeout,
  36. })
  37. return &clusterBridge{client}, nil
  38. default:
  39. return nil, fmt.Errorf("unknown redis type: %s", r.Type)
  40. }
  41. }
  42. type (
  43. clientBridge struct {
  44. *red.Client
  45. }
  46. clusterBridge struct {
  47. *red.ClusterClient
  48. }
  49. )
  50. func (bridge *clientBridge) Close() {
  51. if err := bridge.Client.Close(); err != nil {
  52. logx.Errorf("Error occurred on close redis client: %s", err)
  53. }
  54. }
  55. func (bridge *clusterBridge) Close() {
  56. if err := bridge.ClusterClient.Close(); err != nil {
  57. logx.Errorf("Error occurred on close redis cluster: %s", err)
  58. }
  59. }