registry_test.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package internal
  2. import (
  3. "context"
  4. "sync"
  5. "testing"
  6. "github.com/golang/mock/gomock"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/tal-tech/go-zero/core/contextx"
  9. "github.com/tal-tech/go-zero/core/lang"
  10. "github.com/tal-tech/go-zero/core/logx"
  11. "github.com/tal-tech/go-zero/core/stringx"
  12. "go.etcd.io/etcd/api/v3/mvccpb"
  13. clientv3 "go.etcd.io/etcd/client/v3"
  14. )
  15. var mockLock sync.Mutex
  16. func init() {
  17. logx.Disable()
  18. }
  19. func setMockClient(cli EtcdClient) func() {
  20. mockLock.Lock()
  21. NewClient = func([]string) (EtcdClient, error) {
  22. return cli, nil
  23. }
  24. return func() {
  25. NewClient = DialClient
  26. mockLock.Unlock()
  27. }
  28. }
  29. func TestGetCluster(t *testing.T) {
  30. c1 := GetRegistry().getCluster([]string{"first"})
  31. c2 := GetRegistry().getCluster([]string{"second"})
  32. c3 := GetRegistry().getCluster([]string{"first"})
  33. assert.Equal(t, c1, c3)
  34. assert.NotEqual(t, c1, c2)
  35. }
  36. func TestGetClusterKey(t *testing.T) {
  37. assert.Equal(t, getClusterKey([]string{"localhost:1234", "remotehost:5678"}),
  38. getClusterKey([]string{"remotehost:5678", "localhost:1234"}))
  39. }
  40. func TestCluster_HandleChanges(t *testing.T) {
  41. ctrl := gomock.NewController(t)
  42. l := NewMockUpdateListener(ctrl)
  43. l.EXPECT().OnAdd(KV{
  44. Key: "first",
  45. Val: "1",
  46. })
  47. l.EXPECT().OnAdd(KV{
  48. Key: "second",
  49. Val: "2",
  50. })
  51. l.EXPECT().OnDelete(KV{
  52. Key: "first",
  53. Val: "1",
  54. })
  55. l.EXPECT().OnDelete(KV{
  56. Key: "second",
  57. Val: "2",
  58. })
  59. l.EXPECT().OnAdd(KV{
  60. Key: "third",
  61. Val: "3",
  62. })
  63. l.EXPECT().OnAdd(KV{
  64. Key: "fourth",
  65. Val: "4",
  66. })
  67. c := newCluster([]string{"any"})
  68. c.listeners["any"] = []UpdateListener{l}
  69. c.handleChanges("any", []KV{
  70. {
  71. Key: "first",
  72. Val: "1",
  73. },
  74. {
  75. Key: "second",
  76. Val: "2",
  77. },
  78. })
  79. assert.EqualValues(t, map[string]string{
  80. "first": "1",
  81. "second": "2",
  82. }, c.values["any"])
  83. c.handleChanges("any", []KV{
  84. {
  85. Key: "third",
  86. Val: "3",
  87. },
  88. {
  89. Key: "fourth",
  90. Val: "4",
  91. },
  92. })
  93. assert.EqualValues(t, map[string]string{
  94. "third": "3",
  95. "fourth": "4",
  96. }, c.values["any"])
  97. }
  98. func TestCluster_Load(t *testing.T) {
  99. ctrl := gomock.NewController(t)
  100. defer ctrl.Finish()
  101. cli := NewMockEtcdClient(ctrl)
  102. restore := setMockClient(cli)
  103. defer restore()
  104. cli.EXPECT().Get(gomock.Any(), "any/", gomock.Any()).Return(&clientv3.GetResponse{
  105. Kvs: []*mvccpb.KeyValue{
  106. {
  107. Key: []byte("hello"),
  108. Value: []byte("world"),
  109. },
  110. },
  111. }, nil)
  112. cli.EXPECT().Ctx().Return(context.Background())
  113. c := &cluster{
  114. values: make(map[string]map[string]string),
  115. }
  116. c.load(cli, "any")
  117. }
  118. func TestCluster_Watch(t *testing.T) {
  119. tests := []struct {
  120. name string
  121. method int
  122. eventType mvccpb.Event_EventType
  123. }{
  124. {
  125. name: "add",
  126. eventType: clientv3.EventTypePut,
  127. },
  128. {
  129. name: "delete",
  130. eventType: clientv3.EventTypeDelete,
  131. },
  132. }
  133. for _, test := range tests {
  134. t.Run(test.name, func(t *testing.T) {
  135. ctrl := gomock.NewController(t)
  136. defer ctrl.Finish()
  137. cli := NewMockEtcdClient(ctrl)
  138. restore := setMockClient(cli)
  139. defer restore()
  140. ch := make(chan clientv3.WatchResponse)
  141. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  142. cli.EXPECT().Ctx().Return(context.Background())
  143. var wg sync.WaitGroup
  144. wg.Add(1)
  145. c := &cluster{
  146. listeners: make(map[string][]UpdateListener),
  147. values: make(map[string]map[string]string),
  148. }
  149. listener := NewMockUpdateListener(ctrl)
  150. c.listeners["any"] = []UpdateListener{listener}
  151. listener.EXPECT().OnAdd(gomock.Any()).Do(func(kv KV) {
  152. assert.Equal(t, "hello", kv.Key)
  153. assert.Equal(t, "world", kv.Val)
  154. wg.Done()
  155. }).MaxTimes(1)
  156. listener.EXPECT().OnDelete(gomock.Any()).Do(func(_ interface{}) {
  157. wg.Done()
  158. }).MaxTimes(1)
  159. go c.watch(cli, "any")
  160. ch <- clientv3.WatchResponse{
  161. Events: []*clientv3.Event{
  162. {
  163. Type: test.eventType,
  164. Kv: &mvccpb.KeyValue{
  165. Key: []byte("hello"),
  166. Value: []byte("world"),
  167. },
  168. },
  169. },
  170. }
  171. wg.Wait()
  172. })
  173. }
  174. }
  175. func TestClusterWatch_RespFailures(t *testing.T) {
  176. resps := []clientv3.WatchResponse{
  177. {
  178. Canceled: true,
  179. },
  180. {
  181. // cause resp.Err() != nil
  182. CompactRevision: 1,
  183. },
  184. }
  185. for _, resp := range resps {
  186. t.Run(stringx.Rand(), func(t *testing.T) {
  187. ctrl := gomock.NewController(t)
  188. defer ctrl.Finish()
  189. cli := NewMockEtcdClient(ctrl)
  190. restore := setMockClient(cli)
  191. defer restore()
  192. ch := make(chan clientv3.WatchResponse)
  193. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
  194. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  195. c := new(cluster)
  196. c.done = make(chan lang.PlaceholderType)
  197. go func() {
  198. ch <- resp
  199. close(c.done)
  200. }()
  201. c.watch(cli, "any")
  202. })
  203. }
  204. }
  205. func TestClusterWatch_CloseChan(t *testing.T) {
  206. ctrl := gomock.NewController(t)
  207. defer ctrl.Finish()
  208. cli := NewMockEtcdClient(ctrl)
  209. restore := setMockClient(cli)
  210. defer restore()
  211. ch := make(chan clientv3.WatchResponse)
  212. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
  213. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  214. c := new(cluster)
  215. c.done = make(chan lang.PlaceholderType)
  216. go func() {
  217. close(ch)
  218. close(c.done)
  219. }()
  220. c.watch(cli, "any")
  221. }
  222. func TestValueOnlyContext(t *testing.T) {
  223. ctx := contextx.ValueOnlyFrom(context.Background())
  224. ctx.Done()
  225. assert.Nil(t, ctx.Err())
  226. }