1
0

publisher_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package discov
  2. import (
  3. "context"
  4. "errors"
  5. "net"
  6. "os"
  7. "sync"
  8. "testing"
  9. "time"
  10. "github.com/golang/mock/gomock"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/wuntsong-org/go-zero-plus/core/discov/internal"
  13. "github.com/wuntsong-org/go-zero-plus/core/lang"
  14. "github.com/wuntsong-org/go-zero-plus/core/logx"
  15. "github.com/wuntsong-org/go-zero-plus/core/stringx"
  16. clientv3 "go.etcd.io/etcd/client/v3"
  17. "golang.org/x/net/http2"
  18. "google.golang.org/grpc"
  19. "google.golang.org/grpc/credentials/insecure"
  20. "google.golang.org/grpc/resolver"
  21. "google.golang.org/grpc/resolver/manual"
  22. )
  23. const (
  24. certContent = `-----BEGIN CERTIFICATE-----
  25. MIIDazCCAlOgAwIBAgIUEg9GVO2oaPn+YSmiqmFIuAo10WIwDQYJKoZIhvcNAQEM
  26. BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
  27. GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yMzAzMTExMzIxMjNaGA8yMTIz
  28. MDIxNTEzMjEyM1owRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
  29. ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN
  30. AQEBBQADggEPADCCAQoCggEBALplXlWsIf0O/IgnIplmiZHKGnxyfyufyE2FBRNk
  31. OofRqbKuPH8GNqbkvZm7N29fwTDAQ+mViAggCkDht4hOzoWJMA7KYJt8JnTSWL48
  32. M1lcrpc9DL2gszC/JF/FGvyANbBtLklkZPFBGdHUX14pjrT937wqPtm+SqUHSvRT
  33. B7bmwmm2drRcmhpVm98LSlV7uQ2EgnJgsLjBPITKUejLmVLHfgX0RwQ2xIpX9pS4
  34. FCe1BTacwl2gGp7Mje7y4Mfv3o0ArJW6Tuwbjx59ZXwb1KIP71b7bT04AVS8ZeYO
  35. UMLKKuB5UR9x9Rn6cLXOTWBpcMVyzDgrAFLZjnE9LPUolZMCAwEAAaNRME8wHwYD
  36. VR0jBBgwFoAUeW8w8pmhncbRgTsl48k4/7wnfx8wCQYDVR0TBAIwADALBgNVHQ8E
  37. BAMCBPAwFAYDVR0RBA0wC4IJbG9jYWxob3N0MA0GCSqGSIb3DQEBDAUAA4IBAQAI
  38. y9xaoS88CLPBsX6mxfcTAFVfGNTRW9VN9Ng1cCnUR+YGoXGM/l+qP4f7p8ocdGwK
  39. iYZErVTzXYIn+D27//wpY3klJk3gAnEUBT3QRkStBw7XnpbeZ2oPBK+cmDnCnZPS
  40. BIF1wxPX7vIgaxs5Zsdqwk3qvZ4Djr2wP7LabNWTLSBKgQoUY45Liw6pffLwcGF9
  41. UKlu54bvGze2SufISCR3ib+I+FLvqpvJhXToZWYb/pfI/HccuCL1oot1x8vx6DQy
  42. U+TYxlZsKS5mdNxAX3dqEkEMsgEi+g/tzDPXJImfeCGGBhIOXLm8SRypiuGdEbc9
  43. xkWYxRPegajuEZGvCqVs
  44. -----END CERTIFICATE-----`
  45. keyContent = `-----BEGIN RSA PRIVATE KEY-----
  46. MIIEowIBAAKCAQEAumVeVawh/Q78iCcimWaJkcoafHJ/K5/ITYUFE2Q6h9Gpsq48
  47. fwY2puS9mbs3b1/BMMBD6ZWICCAKQOG3iE7OhYkwDspgm3wmdNJYvjwzWVyulz0M
  48. vaCzML8kX8Ua/IA1sG0uSWRk8UEZ0dRfXimOtP3fvCo+2b5KpQdK9FMHtubCabZ2
  49. tFyaGlWb3wtKVXu5DYSCcmCwuME8hMpR6MuZUsd+BfRHBDbEilf2lLgUJ7UFNpzC
  50. XaAansyN7vLgx+/ejQCslbpO7BuPHn1lfBvUog/vVvttPTgBVLxl5g5Qwsoq4HlR
  51. H3H1Gfpwtc5NYGlwxXLMOCsAUtmOcT0s9SiVkwIDAQABAoIBAD5meTJNMgO55Kjg
  52. ESExxpRcCIno+tHr5+6rvYtEXqPheOIsmmwb9Gfi4+Z3WpOaht5/Pz0Ppj6yGzyl
  53. U//6AgGKb+BDuBvVcDpjwPnOxZIBCSHwejdxeQu0scSuA97MPS0XIAvJ5FEv7ijk
  54. 5Bht6SyGYURpECltHygoTNuGgGqmO+McCJRLE9L09lTBI6UQ/JQwWJqSr7wx6iPU
  55. M1Ze/srIV+7cyEPu6i0DGjS1gSQKkX68Lqn1w6oE290O+OZvleO0gZ02fLDWCZke
  56. aeD9+EU/Pw+rqm3H6o0szOFIpzhRp41FUdW9sybB3Yp3u7c/574E+04Z/e30LMKs
  57. TCtE1QECgYEA3K7KIpw0NH2HXL5C3RHcLmr204xeBfS70riBQQuVUgYdmxak2ima
  58. 80RInskY8hRhSGTg0l+VYIH8cmjcUyqMSOELS5XfRH99r4QPiK8AguXg80T4VumY
  59. W3Pf+zEC2ssgP/gYthV0g0Xj5m2QxktOF9tRw5nkg739ZR4dI9lm/iECgYEA2Dnf
  60. uwEDGqHiQRF6/fh5BG/nGVMvrefkqx6WvTJQ3k/M/9WhxB+lr/8yH46TuS8N2b29
  61. FoTf3Mr9T7pr/PWkOPzoY3P56nYbKU8xSwCim9xMzhBMzj8/N9ukJvXy27/VOz56
  62. eQaKqnvdXNGtPJrIMDGHps2KKWlKLyAlapzjVTMCgYAA/W++tACv85g13EykfT4F
  63. n0k4LbsGP9DP4zABQLIMyiY72eAncmRVjwrcW36XJ2xATOONTgx3gF3HjZzfaqNy
  64. eD/6uNNllUTVEryXGmHgNHPL45VRnn6memCY2eFvZdXhM5W4y2PYaunY0MkDercA
  65. +GTngbs6tBF88KOk04bYwQKBgFl68cRgsdkmnwwQYNaTKfmVGYzYaQXNzkqmWPko
  66. xmCJo6tHzC7ubdG8iRCYHzfmahPuuj6EdGPZuSRyYFgJi5Ftz/nAN+84OxtIQ3zn
  67. YWOgskQgaLh9YfsKsQ7Sf1NDOsnOnD5TX7UXl07fEpLe9vNCvAFiU8e5Y9LGudU5
  68. 4bYTAoGBAMdX3a3bXp4cZvXNBJ/QLVyxC6fP1Q4haCR1Od3m+T00Jth2IX2dk/fl
  69. p6xiJT1av5JtYabv1dFKaXOS5s1kLGGuCCSKpkvFZm826aQ2AFm0XGqEQDLeei5b
  70. A52Kpy/YJ+RkG4BTFtAooFq6DmA0cnoP6oPvG2h6XtDJwDTPInJb
  71. -----END RSA PRIVATE KEY-----`
  72. caContent = `-----BEGIN CERTIFICATE-----
  73. MIIDbTCCAlWgAwIBAgIUBJvFoCowKich7MMfseJ+DYzzirowDQYJKoZIhvcNAQEM
  74. BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
  75. GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yMzAzMTExMzIxMDNaGA8yMTIz
  76. MDIxNTEzMjEwM1owRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
  77. ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN
  78. AQEBBQADggEPADCCAQoCggEBAO4to2YMYj0bxgr2FCiweSTSFuPx33zSw2x/s9Wf
  79. OR41bm2DFsyYT5f3sOIKlXZEdLmOKty2e3ho3yC0EyNpVHdykkkHT3aDI17quZax
  80. kYi/URqqtl1Z08A22txolc04hAZisg2BypGi3vql81UW1t3zyloGnJoIAeXR9uca
  81. ljP6Bk3bwsxoVBLi1JtHrO0hHLQaeHmKhAyrys06X0LRdn7Px48yRZlt6FaLSa8X
  82. YiRM0G44bVy/h6BkoQjMYGwVmCVk6zjJ9U7ZPFqdnDMNxAfR+hjDnYodqdLDMTTR
  83. 1NPVrnEnNwFx0AMLvgt/ba/45vZCEAmSZnFXFAJJcM7ai9ECAwEAAaNTMFEwHQYD
  84. VR0OBBYEFHlvMPKZoZ3G0YE7JePJOP+8J38fMB8GA1UdIwQYMBaAFHlvMPKZoZ3G
  85. 0YE7JePJOP+8J38fMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEMBQADggEB
  86. AMX8dNulADOo9uQgBMyFb9TVra7iY0zZjzv4GY5XY7scd52n6CnfAPvYBBDnTr/O
  87. BgNp5jaujb4+9u/2qhV3f9n+/3WOb2CmPehBgVSzlXqHeQ9lshmgwZPeem2T+8Tm
  88. Nnc/xQnsUfCFszUDxpkr55+aLVM22j02RWqcZ4q7TAaVYL+kdFVMc8FoqG/0ro6A
  89. BjE/Qn0Nn7ciX1VUjDt8l+k7ummPJTmzdi6i6E4AwO9dzrGNgGJ4aWL8cC6xYcIX
  90. goVIRTFeONXSDno/oPjWHpIPt7L15heMpKBHNuzPkKx2YVqPHE5QZxWfS+Lzgx+Q
  91. E2oTTM0rYKOZ8p6000mhvKI=
  92. -----END CERTIFICATE-----`
  93. )
  94. func init() {
  95. logx.Disable()
  96. }
  97. func TestPublisher_register(t *testing.T) {
  98. ctrl := gomock.NewController(t)
  99. defer ctrl.Finish()
  100. const id = 1
  101. cli := internal.NewMockEtcdClient(ctrl)
  102. restore := setMockClient(cli)
  103. defer restore()
  104. cli.EXPECT().Ctx().AnyTimes()
  105. cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{
  106. ID: id,
  107. }, nil)
  108. cli.EXPECT().Put(gomock.Any(), makeEtcdKey("thekey", id), "thevalue", gomock.Any())
  109. pub := NewPublisher(nil, "thekey", "thevalue",
  110. WithPubEtcdAccount(stringx.Rand(), "bar"))
  111. _, err := pub.register(cli)
  112. assert.Nil(t, err)
  113. }
  114. func TestPublisher_registerWithOptions(t *testing.T) {
  115. ctrl := gomock.NewController(t)
  116. defer ctrl.Finish()
  117. const id = 2
  118. cli := internal.NewMockEtcdClient(ctrl)
  119. restore := setMockClient(cli)
  120. defer restore()
  121. cli.EXPECT().Ctx().AnyTimes()
  122. cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{
  123. ID: 1,
  124. }, nil)
  125. cli.EXPECT().Put(gomock.Any(), makeEtcdKey("thekey", id), "thevalue", gomock.Any())
  126. certFile := createTempFile(t, []byte(certContent))
  127. defer os.Remove(certFile)
  128. keyFile := createTempFile(t, []byte(keyContent))
  129. defer os.Remove(keyFile)
  130. caFile := createTempFile(t, []byte(caContent))
  131. defer os.Remove(caFile)
  132. pub := NewPublisher(nil, "thekey", "thevalue", WithId(id),
  133. WithPubEtcdTLS(certFile, keyFile, caFile, true))
  134. _, err := pub.register(cli)
  135. assert.Nil(t, err)
  136. }
  137. func TestPublisher_registerError(t *testing.T) {
  138. ctrl := gomock.NewController(t)
  139. defer ctrl.Finish()
  140. cli := internal.NewMockEtcdClient(ctrl)
  141. restore := setMockClient(cli)
  142. defer restore()
  143. cli.EXPECT().Ctx().AnyTimes()
  144. cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(nil, errors.New("error"))
  145. pub := NewPublisher(nil, "thekey", "thevalue")
  146. val, err := pub.register(cli)
  147. assert.NotNil(t, err)
  148. assert.Equal(t, clientv3.NoLease, val)
  149. }
  150. func TestPublisher_revoke(t *testing.T) {
  151. ctrl := gomock.NewController(t)
  152. defer ctrl.Finish()
  153. const id clientv3.LeaseID = 1
  154. cli := internal.NewMockEtcdClient(ctrl)
  155. restore := setMockClient(cli)
  156. defer restore()
  157. cli.EXPECT().Ctx().AnyTimes()
  158. cli.EXPECT().Revoke(gomock.Any(), id)
  159. pub := NewPublisher(nil, "thekey", "thevalue")
  160. pub.lease = id
  161. pub.revoke(cli)
  162. }
  163. func TestPublisher_revokeError(t *testing.T) {
  164. ctrl := gomock.NewController(t)
  165. defer ctrl.Finish()
  166. const id clientv3.LeaseID = 1
  167. cli := internal.NewMockEtcdClient(ctrl)
  168. restore := setMockClient(cli)
  169. defer restore()
  170. cli.EXPECT().Ctx().AnyTimes()
  171. cli.EXPECT().Revoke(gomock.Any(), id).Return(nil, errors.New("error"))
  172. pub := NewPublisher(nil, "thekey", "thevalue")
  173. pub.lease = id
  174. pub.revoke(cli)
  175. }
  176. func TestPublisher_keepAliveAsyncError(t *testing.T) {
  177. ctrl := gomock.NewController(t)
  178. defer ctrl.Finish()
  179. const id clientv3.LeaseID = 1
  180. cli := internal.NewMockEtcdClient(ctrl)
  181. restore := setMockClient(cli)
  182. defer restore()
  183. cli.EXPECT().Ctx().AnyTimes()
  184. cli.EXPECT().KeepAlive(gomock.Any(), id).Return(nil, errors.New("error"))
  185. pub := NewPublisher(nil, "thekey", "thevalue")
  186. pub.lease = id
  187. assert.NotNil(t, pub.keepAliveAsync(cli))
  188. }
  189. func TestPublisher_keepAliveAsyncQuit(t *testing.T) {
  190. ctrl := gomock.NewController(t)
  191. defer ctrl.Finish()
  192. const id clientv3.LeaseID = 1
  193. cli := internal.NewMockEtcdClient(ctrl)
  194. cli.EXPECT().ActiveConnection()
  195. cli.EXPECT().Close()
  196. defer cli.Close()
  197. cli.ActiveConnection()
  198. restore := setMockClient(cli)
  199. defer restore()
  200. cli.EXPECT().Ctx().AnyTimes()
  201. cli.EXPECT().KeepAlive(gomock.Any(), id)
  202. var wg sync.WaitGroup
  203. wg.Add(1)
  204. cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
  205. wg.Done()
  206. })
  207. pub := NewPublisher(nil, "thekey", "thevalue")
  208. pub.lease = id
  209. pub.Stop()
  210. assert.Nil(t, pub.keepAliveAsync(cli))
  211. wg.Wait()
  212. }
  213. func TestPublisher_keepAliveAsyncPause(t *testing.T) {
  214. ctrl := gomock.NewController(t)
  215. defer ctrl.Finish()
  216. const id clientv3.LeaseID = 1
  217. cli := internal.NewMockEtcdClient(ctrl)
  218. restore := setMockClient(cli)
  219. defer restore()
  220. cli.EXPECT().Ctx().AnyTimes()
  221. cli.EXPECT().KeepAlive(gomock.Any(), id)
  222. pub := NewPublisher(nil, "thekey", "thevalue")
  223. var wg sync.WaitGroup
  224. wg.Add(1)
  225. cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
  226. pub.Stop()
  227. wg.Done()
  228. })
  229. pub.lease = id
  230. assert.Nil(t, pub.keepAliveAsync(cli))
  231. pub.Pause()
  232. wg.Wait()
  233. }
  234. func TestPublisher_Resume(t *testing.T) {
  235. publisher := new(Publisher)
  236. publisher.resumeChan = make(chan lang.PlaceholderType)
  237. go func() {
  238. publisher.Resume()
  239. }()
  240. go func() {
  241. time.Sleep(time.Minute)
  242. t.Fail()
  243. }()
  244. <-publisher.resumeChan
  245. }
  246. func TestPublisher_keepAliveAsync(t *testing.T) {
  247. ctrl := gomock.NewController(t)
  248. defer ctrl.Finish()
  249. const id clientv3.LeaseID = 1
  250. conn := createMockConn(t)
  251. defer conn.Close()
  252. cli := internal.NewMockEtcdClient(ctrl)
  253. cli.EXPECT().ActiveConnection().Return(conn).AnyTimes()
  254. cli.EXPECT().Close()
  255. defer cli.Close()
  256. cli.ActiveConnection()
  257. restore := setMockClient(cli)
  258. defer restore()
  259. cli.EXPECT().Ctx().AnyTimes()
  260. cli.EXPECT().KeepAlive(gomock.Any(), id)
  261. cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{
  262. ID: 1,
  263. }, nil)
  264. cli.EXPECT().Put(gomock.Any(), makeEtcdKey("thekey", int64(id)), "thevalue", gomock.Any())
  265. var wg sync.WaitGroup
  266. wg.Add(1)
  267. cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
  268. wg.Done()
  269. })
  270. pub := NewPublisher([]string{"the-endpoint"}, "thekey", "thevalue")
  271. pub.lease = id
  272. assert.Nil(t, pub.KeepAlive())
  273. pub.Stop()
  274. wg.Wait()
  275. }
  276. func createMockConn(t *testing.T) *grpc.ClientConn {
  277. lis, err := net.Listen("tcp", "localhost:0")
  278. if err != nil {
  279. t.Fatalf("Error while listening. Err: %v", err)
  280. }
  281. defer lis.Close()
  282. lisAddr := resolver.Address{Addr: lis.Addr().String()}
  283. lisDone := make(chan struct{})
  284. dialDone := make(chan struct{})
  285. // 1st listener accepts the connection and then does nothing
  286. go func() {
  287. defer close(lisDone)
  288. conn, err := lis.Accept()
  289. if err != nil {
  290. t.Errorf("Error while accepting. Err: %v", err)
  291. return
  292. }
  293. framer := http2.NewFramer(conn, conn)
  294. if err := framer.WriteSettings(http2.Setting{}); err != nil {
  295. t.Errorf("Error while writing settings. Err: %v", err)
  296. return
  297. }
  298. <-dialDone // Close conn only after dial returns.
  299. }()
  300. r := manual.NewBuilderWithScheme("whatever")
  301. r.InitialState(resolver.State{Addresses: []resolver.Address{lisAddr}})
  302. client, err := grpc.DialContext(context.Background(), r.Scheme()+":///test.server",
  303. grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
  304. close(dialDone)
  305. if err != nil {
  306. t.Fatalf("Dial failed. Err: %v", err)
  307. }
  308. timeout := time.After(1 * time.Second)
  309. select {
  310. case <-timeout:
  311. t.Fatal("timed out waiting for server to finish")
  312. case <-lisDone:
  313. }
  314. return client
  315. }
  316. func createTempFile(t *testing.T, body []byte) string {
  317. tmpFile, err := os.CreateTemp(os.TempDir(), "go-unit-*.tmp")
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. tmpFile.Close()
  322. if err = os.WriteFile(tmpFile.Name(), body, os.ModePerm); err != nil {
  323. t.Fatal(err)
  324. }
  325. return tmpFile.Name()
  326. }