registry_test.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package internal
  2. import (
  3. "context"
  4. "os"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/golang/mock/gomock"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/wuntsong-org/go-zero-plus/core/contextx"
  11. "github.com/wuntsong-org/go-zero-plus/core/lang"
  12. "github.com/wuntsong-org/go-zero-plus/core/logx"
  13. "github.com/wuntsong-org/go-zero-plus/core/stringx"
  14. "go.etcd.io/etcd/api/v3/etcdserverpb"
  15. "go.etcd.io/etcd/api/v3/mvccpb"
  16. clientv3 "go.etcd.io/etcd/client/v3"
  17. "go.etcd.io/etcd/client/v3/mock/mockserver"
  18. )
  19. var mockLock sync.Mutex
  20. func init() {
  21. logx.Disable()
  22. }
  23. func setMockClient(cli EtcdClient) func() {
  24. mockLock.Lock()
  25. NewClient = func([]string) (EtcdClient, error) {
  26. return cli, nil
  27. }
  28. return func() {
  29. NewClient = DialClient
  30. mockLock.Unlock()
  31. }
  32. }
  33. func TestGetCluster(t *testing.T) {
  34. AddAccount([]string{"first"}, "foo", "bar")
  35. c1, _ := GetRegistry().getCluster([]string{"first"})
  36. c2, _ := GetRegistry().getCluster([]string{"second"})
  37. c3, _ := GetRegistry().getCluster([]string{"first"})
  38. assert.Equal(t, c1, c3)
  39. assert.NotEqual(t, c1, c2)
  40. }
  41. func TestGetClusterKey(t *testing.T) {
  42. assert.Equal(t, getClusterKey([]string{"localhost:1234", "remotehost:5678"}),
  43. getClusterKey([]string{"remotehost:5678", "localhost:1234"}))
  44. }
  45. func TestCluster_HandleChanges(t *testing.T) {
  46. ctrl := gomock.NewController(t)
  47. l := NewMockUpdateListener(ctrl)
  48. l.EXPECT().OnAdd(KV{
  49. Key: "first",
  50. Val: "1",
  51. })
  52. l.EXPECT().OnAdd(KV{
  53. Key: "second",
  54. Val: "2",
  55. })
  56. l.EXPECT().OnDelete(KV{
  57. Key: "first",
  58. Val: "1",
  59. })
  60. l.EXPECT().OnDelete(KV{
  61. Key: "second",
  62. Val: "2",
  63. })
  64. l.EXPECT().OnAdd(KV{
  65. Key: "third",
  66. Val: "3",
  67. })
  68. l.EXPECT().OnAdd(KV{
  69. Key: "fourth",
  70. Val: "4",
  71. })
  72. c := newCluster([]string{"any"})
  73. c.listeners["any"] = []UpdateListener{l}
  74. c.handleChanges("any", []KV{
  75. {
  76. Key: "first",
  77. Val: "1",
  78. },
  79. {
  80. Key: "second",
  81. Val: "2",
  82. },
  83. })
  84. assert.EqualValues(t, map[string]string{
  85. "first": "1",
  86. "second": "2",
  87. }, c.values["any"])
  88. c.handleChanges("any", []KV{
  89. {
  90. Key: "third",
  91. Val: "3",
  92. },
  93. {
  94. Key: "fourth",
  95. Val: "4",
  96. },
  97. })
  98. assert.EqualValues(t, map[string]string{
  99. "third": "3",
  100. "fourth": "4",
  101. }, c.values["any"])
  102. }
  103. func TestCluster_Load(t *testing.T) {
  104. ctrl := gomock.NewController(t)
  105. defer ctrl.Finish()
  106. cli := NewMockEtcdClient(ctrl)
  107. restore := setMockClient(cli)
  108. defer restore()
  109. cli.EXPECT().Get(gomock.Any(), "any/", gomock.Any()).Return(&clientv3.GetResponse{
  110. Header: &etcdserverpb.ResponseHeader{},
  111. Kvs: []*mvccpb.KeyValue{
  112. {
  113. Key: []byte("hello"),
  114. Value: []byte("world"),
  115. },
  116. },
  117. }, nil)
  118. cli.EXPECT().Ctx().Return(context.Background())
  119. c := &cluster{
  120. values: make(map[string]map[string]string),
  121. }
  122. c.load(cli, "any")
  123. }
  124. func TestCluster_Watch(t *testing.T) {
  125. tests := []struct {
  126. name string
  127. method int
  128. eventType mvccpb.Event_EventType
  129. }{
  130. {
  131. name: "add",
  132. eventType: clientv3.EventTypePut,
  133. },
  134. {
  135. name: "delete",
  136. eventType: clientv3.EventTypeDelete,
  137. },
  138. }
  139. for _, test := range tests {
  140. t.Run(test.name, func(t *testing.T) {
  141. ctrl := gomock.NewController(t)
  142. defer ctrl.Finish()
  143. cli := NewMockEtcdClient(ctrl)
  144. restore := setMockClient(cli)
  145. defer restore()
  146. ch := make(chan clientv3.WatchResponse)
  147. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  148. cli.EXPECT().Ctx().Return(context.Background())
  149. var wg sync.WaitGroup
  150. wg.Add(1)
  151. c := &cluster{
  152. listeners: make(map[string][]UpdateListener),
  153. values: make(map[string]map[string]string),
  154. }
  155. listener := NewMockUpdateListener(ctrl)
  156. c.listeners["any"] = []UpdateListener{listener}
  157. listener.EXPECT().OnAdd(gomock.Any()).Do(func(kv KV) {
  158. assert.Equal(t, "hello", kv.Key)
  159. assert.Equal(t, "world", kv.Val)
  160. wg.Done()
  161. }).MaxTimes(1)
  162. listener.EXPECT().OnDelete(gomock.Any()).Do(func(_ any) {
  163. wg.Done()
  164. }).MaxTimes(1)
  165. go c.watch(cli, "any", 0)
  166. ch <- clientv3.WatchResponse{
  167. Events: []*clientv3.Event{
  168. {
  169. Type: test.eventType,
  170. Kv: &mvccpb.KeyValue{
  171. Key: []byte("hello"),
  172. Value: []byte("world"),
  173. },
  174. },
  175. },
  176. }
  177. wg.Wait()
  178. })
  179. }
  180. }
  181. func TestClusterWatch_RespFailures(t *testing.T) {
  182. resps := []clientv3.WatchResponse{
  183. {
  184. Canceled: true,
  185. },
  186. {
  187. // cause resp.Err() != nil
  188. CompactRevision: 1,
  189. },
  190. }
  191. for _, resp := range resps {
  192. t.Run(stringx.Rand(), func(t *testing.T) {
  193. ctrl := gomock.NewController(t)
  194. defer ctrl.Finish()
  195. cli := NewMockEtcdClient(ctrl)
  196. restore := setMockClient(cli)
  197. defer restore()
  198. ch := make(chan clientv3.WatchResponse)
  199. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
  200. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  201. c := new(cluster)
  202. c.done = make(chan lang.PlaceholderType)
  203. go func() {
  204. ch <- resp
  205. close(c.done)
  206. }()
  207. c.watch(cli, "any", 0)
  208. })
  209. }
  210. }
  211. func TestClusterWatch_CloseChan(t *testing.T) {
  212. ctrl := gomock.NewController(t)
  213. defer ctrl.Finish()
  214. cli := NewMockEtcdClient(ctrl)
  215. restore := setMockClient(cli)
  216. defer restore()
  217. ch := make(chan clientv3.WatchResponse)
  218. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
  219. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  220. c := new(cluster)
  221. c.done = make(chan lang.PlaceholderType)
  222. go func() {
  223. close(ch)
  224. close(c.done)
  225. }()
  226. c.watch(cli, "any", 0)
  227. }
  228. func TestValueOnlyContext(t *testing.T) {
  229. ctx := contextx.ValueOnlyFrom(context.Background())
  230. ctx.Done()
  231. assert.Nil(t, ctx.Err())
  232. }
  233. func TestDialClient(t *testing.T) {
  234. svr, err := mockserver.StartMockServers(1)
  235. assert.NoError(t, err)
  236. svr.StartAt(0)
  237. certFile := createTempFile(t, []byte(certContent))
  238. defer os.Remove(certFile)
  239. keyFile := createTempFile(t, []byte(keyContent))
  240. defer os.Remove(keyFile)
  241. caFile := createTempFile(t, []byte(caContent))
  242. defer os.Remove(caFile)
  243. endpoints := []string{svr.Servers[0].Address}
  244. AddAccount(endpoints, "foo", "bar")
  245. assert.NoError(t, AddTLS(endpoints, certFile, keyFile, caFile, false))
  246. old := DialTimeout
  247. DialTimeout = time.Millisecond
  248. defer func() {
  249. DialTimeout = old
  250. }()
  251. _, err = DialClient(endpoints)
  252. assert.Error(t, err)
  253. }
  254. func TestRegistry_Monitor(t *testing.T) {
  255. svr, err := mockserver.StartMockServers(1)
  256. assert.NoError(t, err)
  257. svr.StartAt(0)
  258. endpoints := []string{svr.Servers[0].Address}
  259. GetRegistry().lock.Lock()
  260. GetRegistry().clusters = map[string]*cluster{
  261. getClusterKey(endpoints): {
  262. listeners: map[string][]UpdateListener{},
  263. values: map[string]map[string]string{
  264. "foo": {
  265. "bar": "baz",
  266. },
  267. },
  268. },
  269. }
  270. GetRegistry().lock.Unlock()
  271. assert.Error(t, GetRegistry().Monitor(endpoints, "foo", new(mockListener)))
  272. }
  273. type mockListener struct {
  274. }
  275. func (m *mockListener) OnAdd(_ KV) {
  276. }
  277. func (m *mockListener) OnDelete(_ KV) {
  278. }