1
0

registry_test.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package internal
  2. import (
  3. "context"
  4. "sync"
  5. "testing"
  6. "github.com/golang/mock/gomock"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/tal-tech/go-zero/core/contextx"
  9. "github.com/tal-tech/go-zero/core/lang"
  10. "github.com/tal-tech/go-zero/core/logx"
  11. "github.com/tal-tech/go-zero/core/stringx"
  12. "go.etcd.io/etcd/api/v3/mvccpb"
  13. clientv3 "go.etcd.io/etcd/client/v3"
  14. )
  15. var mockLock sync.Mutex
  16. func init() {
  17. logx.Disable()
  18. }
  19. func setMockClient(cli EtcdClient) func() {
  20. mockLock.Lock()
  21. NewClient = func([]string) (EtcdClient, error) {
  22. return cli, nil
  23. }
  24. return func() {
  25. NewClient = DialClient
  26. mockLock.Unlock()
  27. }
  28. }
  29. func TestGetCluster(t *testing.T) {
  30. AddAccount([]string{"first"}, "foo", "bar")
  31. c1, _ := GetRegistry().getCluster([]string{"first"})
  32. c2, _ := GetRegistry().getCluster([]string{"second"})
  33. c3, _ := GetRegistry().getCluster([]string{"first"})
  34. assert.Equal(t, c1, c3)
  35. assert.NotEqual(t, c1, c2)
  36. }
  37. func TestGetClusterKey(t *testing.T) {
  38. assert.Equal(t, getClusterKey([]string{"localhost:1234", "remotehost:5678"}),
  39. getClusterKey([]string{"remotehost:5678", "localhost:1234"}))
  40. }
  41. func TestCluster_HandleChanges(t *testing.T) {
  42. ctrl := gomock.NewController(t)
  43. l := NewMockUpdateListener(ctrl)
  44. l.EXPECT().OnAdd(KV{
  45. Key: "first",
  46. Val: "1",
  47. })
  48. l.EXPECT().OnAdd(KV{
  49. Key: "second",
  50. Val: "2",
  51. })
  52. l.EXPECT().OnDelete(KV{
  53. Key: "first",
  54. Val: "1",
  55. })
  56. l.EXPECT().OnDelete(KV{
  57. Key: "second",
  58. Val: "2",
  59. })
  60. l.EXPECT().OnAdd(KV{
  61. Key: "third",
  62. Val: "3",
  63. })
  64. l.EXPECT().OnAdd(KV{
  65. Key: "fourth",
  66. Val: "4",
  67. })
  68. c := newCluster([]string{"any"})
  69. c.listeners["any"] = []UpdateListener{l}
  70. c.handleChanges("any", []KV{
  71. {
  72. Key: "first",
  73. Val: "1",
  74. },
  75. {
  76. Key: "second",
  77. Val: "2",
  78. },
  79. })
  80. assert.EqualValues(t, map[string]string{
  81. "first": "1",
  82. "second": "2",
  83. }, c.values["any"])
  84. c.handleChanges("any", []KV{
  85. {
  86. Key: "third",
  87. Val: "3",
  88. },
  89. {
  90. Key: "fourth",
  91. Val: "4",
  92. },
  93. })
  94. assert.EqualValues(t, map[string]string{
  95. "third": "3",
  96. "fourth": "4",
  97. }, c.values["any"])
  98. }
  99. func TestCluster_Load(t *testing.T) {
  100. ctrl := gomock.NewController(t)
  101. defer ctrl.Finish()
  102. cli := NewMockEtcdClient(ctrl)
  103. restore := setMockClient(cli)
  104. defer restore()
  105. cli.EXPECT().Get(gomock.Any(), "any/", gomock.Any()).Return(&clientv3.GetResponse{
  106. Kvs: []*mvccpb.KeyValue{
  107. {
  108. Key: []byte("hello"),
  109. Value: []byte("world"),
  110. },
  111. },
  112. }, nil)
  113. cli.EXPECT().Ctx().Return(context.Background())
  114. c := &cluster{
  115. values: make(map[string]map[string]string),
  116. }
  117. c.load(cli, "any")
  118. }
  119. func TestCluster_Watch(t *testing.T) {
  120. tests := []struct {
  121. name string
  122. method int
  123. eventType mvccpb.Event_EventType
  124. }{
  125. {
  126. name: "add",
  127. eventType: clientv3.EventTypePut,
  128. },
  129. {
  130. name: "delete",
  131. eventType: clientv3.EventTypeDelete,
  132. },
  133. }
  134. for _, test := range tests {
  135. t.Run(test.name, func(t *testing.T) {
  136. ctrl := gomock.NewController(t)
  137. defer ctrl.Finish()
  138. cli := NewMockEtcdClient(ctrl)
  139. restore := setMockClient(cli)
  140. defer restore()
  141. ch := make(chan clientv3.WatchResponse)
  142. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  143. cli.EXPECT().Ctx().Return(context.Background())
  144. var wg sync.WaitGroup
  145. wg.Add(1)
  146. c := &cluster{
  147. listeners: make(map[string][]UpdateListener),
  148. values: make(map[string]map[string]string),
  149. }
  150. listener := NewMockUpdateListener(ctrl)
  151. c.listeners["any"] = []UpdateListener{listener}
  152. listener.EXPECT().OnAdd(gomock.Any()).Do(func(kv KV) {
  153. assert.Equal(t, "hello", kv.Key)
  154. assert.Equal(t, "world", kv.Val)
  155. wg.Done()
  156. }).MaxTimes(1)
  157. listener.EXPECT().OnDelete(gomock.Any()).Do(func(_ interface{}) {
  158. wg.Done()
  159. }).MaxTimes(1)
  160. go c.watch(cli, "any")
  161. ch <- clientv3.WatchResponse{
  162. Events: []*clientv3.Event{
  163. {
  164. Type: test.eventType,
  165. Kv: &mvccpb.KeyValue{
  166. Key: []byte("hello"),
  167. Value: []byte("world"),
  168. },
  169. },
  170. },
  171. }
  172. wg.Wait()
  173. })
  174. }
  175. }
  176. func TestClusterWatch_RespFailures(t *testing.T) {
  177. resps := []clientv3.WatchResponse{
  178. {
  179. Canceled: true,
  180. },
  181. {
  182. // cause resp.Err() != nil
  183. CompactRevision: 1,
  184. },
  185. }
  186. for _, resp := range resps {
  187. t.Run(stringx.Rand(), func(t *testing.T) {
  188. ctrl := gomock.NewController(t)
  189. defer ctrl.Finish()
  190. cli := NewMockEtcdClient(ctrl)
  191. restore := setMockClient(cli)
  192. defer restore()
  193. ch := make(chan clientv3.WatchResponse)
  194. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
  195. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  196. c := new(cluster)
  197. c.done = make(chan lang.PlaceholderType)
  198. go func() {
  199. ch <- resp
  200. close(c.done)
  201. }()
  202. c.watch(cli, "any")
  203. })
  204. }
  205. }
  206. func TestClusterWatch_CloseChan(t *testing.T) {
  207. ctrl := gomock.NewController(t)
  208. defer ctrl.Finish()
  209. cli := NewMockEtcdClient(ctrl)
  210. restore := setMockClient(cli)
  211. defer restore()
  212. ch := make(chan clientv3.WatchResponse)
  213. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
  214. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  215. c := new(cluster)
  216. c.done = make(chan lang.PlaceholderType)
  217. go func() {
  218. close(ch)
  219. close(c.done)
  220. }()
  221. c.watch(cli, "any")
  222. }
  223. func TestValueOnlyContext(t *testing.T) {
  224. ctx := contextx.ValueOnlyFrom(context.Background())
  225. ctx.Done()
  226. assert.Nil(t, ctx.Err())
  227. }