registry_test.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package internal
  2. import (
  3. "context"
  4. "sync"
  5. "testing"
  6. "github.com/golang/mock/gomock"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/zeromicro/go-zero/core/contextx"
  9. "github.com/zeromicro/go-zero/core/lang"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. "github.com/zeromicro/go-zero/core/stringx"
  12. "go.etcd.io/etcd/api/v3/etcdserverpb"
  13. "go.etcd.io/etcd/api/v3/mvccpb"
  14. clientv3 "go.etcd.io/etcd/client/v3"
  15. )
// mockLock is acquired by setMockClient and released by the restore function
// it returns, so only one test at a time runs with NewClient overridden.
var mockLock sync.Mutex
// init calls logx.Disable once, before any test in this package runs.
func init() {
	logx.Disable()
}
  20. func setMockClient(cli EtcdClient) func() {
  21. mockLock.Lock()
  22. NewClient = func([]string) (EtcdClient, error) {
  23. return cli, nil
  24. }
  25. return func() {
  26. NewClient = DialClient
  27. mockLock.Unlock()
  28. }
  29. }
  30. func TestGetCluster(t *testing.T) {
  31. AddAccount([]string{"first"}, "foo", "bar")
  32. c1, _ := GetRegistry().getCluster([]string{"first"})
  33. c2, _ := GetRegistry().getCluster([]string{"second"})
  34. c3, _ := GetRegistry().getCluster([]string{"first"})
  35. assert.Equal(t, c1, c3)
  36. assert.NotEqual(t, c1, c2)
  37. }
  38. func TestGetClusterKey(t *testing.T) {
  39. assert.Equal(t, getClusterKey([]string{"localhost:1234", "remotehost:5678"}),
  40. getClusterKey([]string{"remotehost:5678", "localhost:1234"}))
  41. }
  42. func TestCluster_HandleChanges(t *testing.T) {
  43. ctrl := gomock.NewController(t)
  44. l := NewMockUpdateListener(ctrl)
  45. l.EXPECT().OnAdd(KV{
  46. Key: "first",
  47. Val: "1",
  48. })
  49. l.EXPECT().OnAdd(KV{
  50. Key: "second",
  51. Val: "2",
  52. })
  53. l.EXPECT().OnDelete(KV{
  54. Key: "first",
  55. Val: "1",
  56. })
  57. l.EXPECT().OnDelete(KV{
  58. Key: "second",
  59. Val: "2",
  60. })
  61. l.EXPECT().OnAdd(KV{
  62. Key: "third",
  63. Val: "3",
  64. })
  65. l.EXPECT().OnAdd(KV{
  66. Key: "fourth",
  67. Val: "4",
  68. })
  69. c := newCluster([]string{"any"})
  70. c.listeners["any"] = []UpdateListener{l}
  71. c.handleChanges("any", []KV{
  72. {
  73. Key: "first",
  74. Val: "1",
  75. },
  76. {
  77. Key: "second",
  78. Val: "2",
  79. },
  80. })
  81. assert.EqualValues(t, map[string]string{
  82. "first": "1",
  83. "second": "2",
  84. }, c.values["any"])
  85. c.handleChanges("any", []KV{
  86. {
  87. Key: "third",
  88. Val: "3",
  89. },
  90. {
  91. Key: "fourth",
  92. Val: "4",
  93. },
  94. })
  95. assert.EqualValues(t, map[string]string{
  96. "third": "3",
  97. "fourth": "4",
  98. }, c.values["any"])
  99. }
  100. func TestCluster_Load(t *testing.T) {
  101. ctrl := gomock.NewController(t)
  102. defer ctrl.Finish()
  103. cli := NewMockEtcdClient(ctrl)
  104. restore := setMockClient(cli)
  105. defer restore()
  106. cli.EXPECT().Get(gomock.Any(), "any/", gomock.Any()).Return(&clientv3.GetResponse{
  107. Header: &etcdserverpb.ResponseHeader{},
  108. Kvs: []*mvccpb.KeyValue{
  109. {
  110. Key: []byte("hello"),
  111. Value: []byte("world"),
  112. },
  113. },
  114. }, nil)
  115. cli.EXPECT().Ctx().Return(context.Background())
  116. c := &cluster{
  117. values: make(map[string]map[string]string),
  118. }
  119. c.load(cli, "any")
  120. }
  121. func TestCluster_Watch(t *testing.T) {
  122. tests := []struct {
  123. name string
  124. method int
  125. eventType mvccpb.Event_EventType
  126. }{
  127. {
  128. name: "add",
  129. eventType: clientv3.EventTypePut,
  130. },
  131. {
  132. name: "delete",
  133. eventType: clientv3.EventTypeDelete,
  134. },
  135. }
  136. for _, test := range tests {
  137. t.Run(test.name, func(t *testing.T) {
  138. ctrl := gomock.NewController(t)
  139. defer ctrl.Finish()
  140. cli := NewMockEtcdClient(ctrl)
  141. restore := setMockClient(cli)
  142. defer restore()
  143. ch := make(chan clientv3.WatchResponse)
  144. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  145. cli.EXPECT().Ctx().Return(context.Background())
  146. var wg sync.WaitGroup
  147. wg.Add(1)
  148. c := &cluster{
  149. listeners: make(map[string][]UpdateListener),
  150. values: make(map[string]map[string]string),
  151. }
  152. listener := NewMockUpdateListener(ctrl)
  153. c.listeners["any"] = []UpdateListener{listener}
  154. listener.EXPECT().OnAdd(gomock.Any()).Do(func(kv KV) {
  155. assert.Equal(t, "hello", kv.Key)
  156. assert.Equal(t, "world", kv.Val)
  157. wg.Done()
  158. }).MaxTimes(1)
  159. listener.EXPECT().OnDelete(gomock.Any()).Do(func(_ any) {
  160. wg.Done()
  161. }).MaxTimes(1)
  162. go c.watch(cli, "any", 0)
  163. ch <- clientv3.WatchResponse{
  164. Events: []*clientv3.Event{
  165. {
  166. Type: test.eventType,
  167. Kv: &mvccpb.KeyValue{
  168. Key: []byte("hello"),
  169. Value: []byte("world"),
  170. },
  171. },
  172. },
  173. }
  174. wg.Wait()
  175. })
  176. }
  177. }
  178. func TestClusterWatch_RespFailures(t *testing.T) {
  179. resps := []clientv3.WatchResponse{
  180. {
  181. Canceled: true,
  182. },
  183. {
  184. // cause resp.Err() != nil
  185. CompactRevision: 1,
  186. },
  187. }
  188. for _, resp := range resps {
  189. t.Run(stringx.Rand(), func(t *testing.T) {
  190. ctrl := gomock.NewController(t)
  191. defer ctrl.Finish()
  192. cli := NewMockEtcdClient(ctrl)
  193. restore := setMockClient(cli)
  194. defer restore()
  195. ch := make(chan clientv3.WatchResponse)
  196. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
  197. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  198. c := new(cluster)
  199. c.done = make(chan lang.PlaceholderType)
  200. go func() {
  201. ch <- resp
  202. close(c.done)
  203. }()
  204. c.watch(cli, "any", 0)
  205. })
  206. }
  207. }
  208. func TestClusterWatch_CloseChan(t *testing.T) {
  209. ctrl := gomock.NewController(t)
  210. defer ctrl.Finish()
  211. cli := NewMockEtcdClient(ctrl)
  212. restore := setMockClient(cli)
  213. defer restore()
  214. ch := make(chan clientv3.WatchResponse)
  215. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
  216. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  217. c := new(cluster)
  218. c.done = make(chan lang.PlaceholderType)
  219. go func() {
  220. close(ch)
  221. close(c.done)
  222. }()
  223. c.watch(cli, "any", 0)
  224. }
  225. func TestValueOnlyContext(t *testing.T) {
  226. ctx := contextx.ValueOnlyFrom(context.Background())
  227. ctx.Done()
  228. assert.Nil(t, ctx.Err())
  229. }