registry_test.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. package internal
  2. import (
  3. "context"
  4. "sync"
  5. "testing"
  6. "zero/core/contextx"
  7. "zero/core/stringx"
  8. "github.com/coreos/etcd/mvcc/mvccpb"
  9. "github.com/golang/mock/gomock"
  10. "github.com/stretchr/testify/assert"
  11. "go.etcd.io/etcd/clientv3"
  12. )
  13. var mockLock sync.Mutex
  14. func setMockClient(cli EtcdClient) func() {
  15. mockLock.Lock()
  16. NewClient = func([]string) (EtcdClient, error) {
  17. return cli, nil
  18. }
  19. return func() {
  20. NewClient = DialClient
  21. mockLock.Unlock()
  22. }
  23. }
  24. func TestGetCluster(t *testing.T) {
  25. c1 := GetRegistry().getCluster([]string{"first"})
  26. c2 := GetRegistry().getCluster([]string{"second"})
  27. c3 := GetRegistry().getCluster([]string{"first"})
  28. assert.Equal(t, c1, c3)
  29. assert.NotEqual(t, c1, c2)
  30. }
  31. func TestGetClusterKey(t *testing.T) {
  32. assert.Equal(t, getClusterKey([]string{"localhost:1234", "remotehost:5678"}),
  33. getClusterKey([]string{"remotehost:5678", "localhost:1234"}))
  34. }
  35. func TestCluster_HandleChanges(t *testing.T) {
  36. ctrl := gomock.NewController(t)
  37. l := NewMockUpdateListener(ctrl)
  38. l.EXPECT().OnAdd(KV{
  39. Key: "first",
  40. Val: "1",
  41. })
  42. l.EXPECT().OnAdd(KV{
  43. Key: "second",
  44. Val: "2",
  45. })
  46. l.EXPECT().OnDelete(KV{
  47. Key: "first",
  48. Val: "1",
  49. })
  50. l.EXPECT().OnDelete(KV{
  51. Key: "second",
  52. Val: "2",
  53. })
  54. l.EXPECT().OnAdd(KV{
  55. Key: "third",
  56. Val: "3",
  57. })
  58. l.EXPECT().OnAdd(KV{
  59. Key: "fourth",
  60. Val: "4",
  61. })
  62. c := newCluster([]string{"any"})
  63. c.listeners["any"] = []UpdateListener{l}
  64. c.handleChanges("any", []KV{
  65. {
  66. Key: "first",
  67. Val: "1",
  68. },
  69. {
  70. Key: "second",
  71. Val: "2",
  72. },
  73. })
  74. assert.EqualValues(t, map[string]string{
  75. "first": "1",
  76. "second": "2",
  77. }, c.values["any"])
  78. c.handleChanges("any", []KV{
  79. {
  80. Key: "third",
  81. Val: "3",
  82. },
  83. {
  84. Key: "fourth",
  85. Val: "4",
  86. },
  87. })
  88. assert.EqualValues(t, map[string]string{
  89. "third": "3",
  90. "fourth": "4",
  91. }, c.values["any"])
  92. }
  93. func TestCluster_Load(t *testing.T) {
  94. ctrl := gomock.NewController(t)
  95. defer ctrl.Finish()
  96. cli := NewMockEtcdClient(ctrl)
  97. restore := setMockClient(cli)
  98. defer restore()
  99. cli.EXPECT().Get(gomock.Any(), "any/", gomock.Any()).Return(&clientv3.GetResponse{
  100. Kvs: []*mvccpb.KeyValue{
  101. {
  102. Key: []byte("hello"),
  103. Value: []byte("world"),
  104. },
  105. },
  106. }, nil)
  107. cli.EXPECT().Ctx().Return(context.Background())
  108. c := &cluster{
  109. values: make(map[string]map[string]string),
  110. }
  111. c.load(cli, "any")
  112. }
  113. func TestCluster_Watch(t *testing.T) {
  114. tests := []struct {
  115. name string
  116. method int
  117. eventType mvccpb.Event_EventType
  118. }{
  119. {
  120. name: "add",
  121. eventType: clientv3.EventTypePut,
  122. },
  123. {
  124. name: "delete",
  125. eventType: clientv3.EventTypeDelete,
  126. },
  127. }
  128. for _, test := range tests {
  129. t.Run(test.name, func(t *testing.T) {
  130. ctrl := gomock.NewController(t)
  131. defer ctrl.Finish()
  132. cli := NewMockEtcdClient(ctrl)
  133. restore := setMockClient(cli)
  134. defer restore()
  135. ch := make(chan clientv3.WatchResponse)
  136. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  137. cli.EXPECT().Ctx().Return(context.Background())
  138. var wg sync.WaitGroup
  139. wg.Add(1)
  140. c := &cluster{
  141. listeners: make(map[string][]UpdateListener),
  142. values: make(map[string]map[string]string),
  143. }
  144. listener := NewMockUpdateListener(ctrl)
  145. c.listeners["any"] = []UpdateListener{listener}
  146. listener.EXPECT().OnAdd(gomock.Any()).Do(func(kv KV) {
  147. assert.Equal(t, "hello", kv.Key)
  148. assert.Equal(t, "world", kv.Val)
  149. wg.Done()
  150. }).MaxTimes(1)
  151. listener.EXPECT().OnDelete(gomock.Any()).Do(func(_ interface{}) {
  152. wg.Done()
  153. }).MaxTimes(1)
  154. go c.watch(cli, "any")
  155. ch <- clientv3.WatchResponse{
  156. Events: []*clientv3.Event{
  157. {
  158. Type: test.eventType,
  159. Kv: &mvccpb.KeyValue{
  160. Key: []byte("hello"),
  161. Value: []byte("world"),
  162. },
  163. },
  164. },
  165. }
  166. wg.Wait()
  167. })
  168. }
  169. }
  170. func TestClusterWatch_RespFailures(t *testing.T) {
  171. resps := []clientv3.WatchResponse{
  172. {
  173. Canceled: true,
  174. },
  175. {
  176. // cause resp.Err() != nil
  177. CompactRevision: 1,
  178. },
  179. }
  180. for _, resp := range resps {
  181. t.Run(stringx.Rand(), func(t *testing.T) {
  182. ctrl := gomock.NewController(t)
  183. defer ctrl.Finish()
  184. cli := NewMockEtcdClient(ctrl)
  185. restore := setMockClient(cli)
  186. defer restore()
  187. ch := make(chan clientv3.WatchResponse)
  188. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  189. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  190. c := new(cluster)
  191. go func() {
  192. ch <- resp
  193. }()
  194. c.watch(cli, "any")
  195. })
  196. }
  197. }
  198. func TestClusterWatch_CloseChan(t *testing.T) {
  199. ctrl := gomock.NewController(t)
  200. defer ctrl.Finish()
  201. cli := NewMockEtcdClient(ctrl)
  202. restore := setMockClient(cli)
  203. defer restore()
  204. ch := make(chan clientv3.WatchResponse)
  205. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  206. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  207. c := new(cluster)
  208. go func() {
  209. close(ch)
  210. }()
  211. c.watch(cli, "any")
  212. }
  213. func TestValueOnlyContext(t *testing.T) {
  214. ctx := contextx.ValueOnlyFrom(context.Background())
  215. ctx.Done()
  216. assert.Nil(t, ctx.Err())
  217. }
  218. type mockedSharedCalls struct {
  219. fn func() (interface{}, error)
  220. }
  221. func (c mockedSharedCalls) Do(_ string, fn func() (interface{}, error)) (interface{}, error) {
  222. return c.fn()
  223. }
  224. func (c mockedSharedCalls) DoEx(_ string, fn func() (interface{}, error)) (interface{}, bool, error) {
  225. val, err := c.fn()
  226. return val, true, err
  227. }