123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- package discov
- import (
- "context"
- "errors"
- "net"
- "os"
- "sync"
- "testing"
- "time"
- "github.com/golang/mock/gomock"
- "github.com/stretchr/testify/assert"
- "github.com/wuntsong-org/go-zero-plus/core/discov/internal"
- "github.com/wuntsong-org/go-zero-plus/core/lang"
- "github.com/wuntsong-org/go-zero-plus/core/logx"
- "github.com/wuntsong-org/go-zero-plus/core/stringx"
- clientv3 "go.etcd.io/etcd/client/v3"
- "golang.org/x/net/http2"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/resolver/manual"
- )
- const (
- certContent = `-----BEGIN CERTIFICATE-----
- MIIDazCCAlOgAwIBAgIUEg9GVO2oaPn+YSmiqmFIuAo10WIwDQYJKoZIhvcNAQEM
- BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
- GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yMzAzMTExMzIxMjNaGA8yMTIz
- MDIxNTEzMjEyM1owRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
- ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN
- AQEBBQADggEPADCCAQoCggEBALplXlWsIf0O/IgnIplmiZHKGnxyfyufyE2FBRNk
- OofRqbKuPH8GNqbkvZm7N29fwTDAQ+mViAggCkDht4hOzoWJMA7KYJt8JnTSWL48
- M1lcrpc9DL2gszC/JF/FGvyANbBtLklkZPFBGdHUX14pjrT937wqPtm+SqUHSvRT
- B7bmwmm2drRcmhpVm98LSlV7uQ2EgnJgsLjBPITKUejLmVLHfgX0RwQ2xIpX9pS4
- FCe1BTacwl2gGp7Mje7y4Mfv3o0ArJW6Tuwbjx59ZXwb1KIP71b7bT04AVS8ZeYO
- UMLKKuB5UR9x9Rn6cLXOTWBpcMVyzDgrAFLZjnE9LPUolZMCAwEAAaNRME8wHwYD
- VR0jBBgwFoAUeW8w8pmhncbRgTsl48k4/7wnfx8wCQYDVR0TBAIwADALBgNVHQ8E
- BAMCBPAwFAYDVR0RBA0wC4IJbG9jYWxob3N0MA0GCSqGSIb3DQEBDAUAA4IBAQAI
- y9xaoS88CLPBsX6mxfcTAFVfGNTRW9VN9Ng1cCnUR+YGoXGM/l+qP4f7p8ocdGwK
- iYZErVTzXYIn+D27//wpY3klJk3gAnEUBT3QRkStBw7XnpbeZ2oPBK+cmDnCnZPS
- BIF1wxPX7vIgaxs5Zsdqwk3qvZ4Djr2wP7LabNWTLSBKgQoUY45Liw6pffLwcGF9
- UKlu54bvGze2SufISCR3ib+I+FLvqpvJhXToZWYb/pfI/HccuCL1oot1x8vx6DQy
- U+TYxlZsKS5mdNxAX3dqEkEMsgEi+g/tzDPXJImfeCGGBhIOXLm8SRypiuGdEbc9
- xkWYxRPegajuEZGvCqVs
- -----END CERTIFICATE-----`
- keyContent = `-----BEGIN RSA PRIVATE KEY-----
- MIIEowIBAAKCAQEAumVeVawh/Q78iCcimWaJkcoafHJ/K5/ITYUFE2Q6h9Gpsq48
- fwY2puS9mbs3b1/BMMBD6ZWICCAKQOG3iE7OhYkwDspgm3wmdNJYvjwzWVyulz0M
- vaCzML8kX8Ua/IA1sG0uSWRk8UEZ0dRfXimOtP3fvCo+2b5KpQdK9FMHtubCabZ2
- tFyaGlWb3wtKVXu5DYSCcmCwuME8hMpR6MuZUsd+BfRHBDbEilf2lLgUJ7UFNpzC
- XaAansyN7vLgx+/ejQCslbpO7BuPHn1lfBvUog/vVvttPTgBVLxl5g5Qwsoq4HlR
- H3H1Gfpwtc5NYGlwxXLMOCsAUtmOcT0s9SiVkwIDAQABAoIBAD5meTJNMgO55Kjg
- ESExxpRcCIno+tHr5+6rvYtEXqPheOIsmmwb9Gfi4+Z3WpOaht5/Pz0Ppj6yGzyl
- U//6AgGKb+BDuBvVcDpjwPnOxZIBCSHwejdxeQu0scSuA97MPS0XIAvJ5FEv7ijk
- 5Bht6SyGYURpECltHygoTNuGgGqmO+McCJRLE9L09lTBI6UQ/JQwWJqSr7wx6iPU
- M1Ze/srIV+7cyEPu6i0DGjS1gSQKkX68Lqn1w6oE290O+OZvleO0gZ02fLDWCZke
- aeD9+EU/Pw+rqm3H6o0szOFIpzhRp41FUdW9sybB3Yp3u7c/574E+04Z/e30LMKs
- TCtE1QECgYEA3K7KIpw0NH2HXL5C3RHcLmr204xeBfS70riBQQuVUgYdmxak2ima
- 80RInskY8hRhSGTg0l+VYIH8cmjcUyqMSOELS5XfRH99r4QPiK8AguXg80T4VumY
- W3Pf+zEC2ssgP/gYthV0g0Xj5m2QxktOF9tRw5nkg739ZR4dI9lm/iECgYEA2Dnf
- uwEDGqHiQRF6/fh5BG/nGVMvrefkqx6WvTJQ3k/M/9WhxB+lr/8yH46TuS8N2b29
- FoTf3Mr9T7pr/PWkOPzoY3P56nYbKU8xSwCim9xMzhBMzj8/N9ukJvXy27/VOz56
- eQaKqnvdXNGtPJrIMDGHps2KKWlKLyAlapzjVTMCgYAA/W++tACv85g13EykfT4F
- n0k4LbsGP9DP4zABQLIMyiY72eAncmRVjwrcW36XJ2xATOONTgx3gF3HjZzfaqNy
- eD/6uNNllUTVEryXGmHgNHPL45VRnn6memCY2eFvZdXhM5W4y2PYaunY0MkDercA
- +GTngbs6tBF88KOk04bYwQKBgFl68cRgsdkmnwwQYNaTKfmVGYzYaQXNzkqmWPko
- xmCJo6tHzC7ubdG8iRCYHzfmahPuuj6EdGPZuSRyYFgJi5Ftz/nAN+84OxtIQ3zn
- YWOgskQgaLh9YfsKsQ7Sf1NDOsnOnD5TX7UXl07fEpLe9vNCvAFiU8e5Y9LGudU5
- 4bYTAoGBAMdX3a3bXp4cZvXNBJ/QLVyxC6fP1Q4haCR1Od3m+T00Jth2IX2dk/fl
- p6xiJT1av5JtYabv1dFKaXOS5s1kLGGuCCSKpkvFZm826aQ2AFm0XGqEQDLeei5b
- A52Kpy/YJ+RkG4BTFtAooFq6DmA0cnoP6oPvG2h6XtDJwDTPInJb
- -----END RSA PRIVATE KEY-----`
- caContent = `-----BEGIN CERTIFICATE-----
- MIIDbTCCAlWgAwIBAgIUBJvFoCowKich7MMfseJ+DYzzirowDQYJKoZIhvcNAQEM
- BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
- GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yMzAzMTExMzIxMDNaGA8yMTIz
- MDIxNTEzMjEwM1owRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
- ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN
- AQEBBQADggEPADCCAQoCggEBAO4to2YMYj0bxgr2FCiweSTSFuPx33zSw2x/s9Wf
- OR41bm2DFsyYT5f3sOIKlXZEdLmOKty2e3ho3yC0EyNpVHdykkkHT3aDI17quZax
- kYi/URqqtl1Z08A22txolc04hAZisg2BypGi3vql81UW1t3zyloGnJoIAeXR9uca
- ljP6Bk3bwsxoVBLi1JtHrO0hHLQaeHmKhAyrys06X0LRdn7Px48yRZlt6FaLSa8X
- YiRM0G44bVy/h6BkoQjMYGwVmCVk6zjJ9U7ZPFqdnDMNxAfR+hjDnYodqdLDMTTR
- 1NPVrnEnNwFx0AMLvgt/ba/45vZCEAmSZnFXFAJJcM7ai9ECAwEAAaNTMFEwHQYD
- VR0OBBYEFHlvMPKZoZ3G0YE7JePJOP+8J38fMB8GA1UdIwQYMBaAFHlvMPKZoZ3G
- 0YE7JePJOP+8J38fMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEMBQADggEB
- AMX8dNulADOo9uQgBMyFb9TVra7iY0zZjzv4GY5XY7scd52n6CnfAPvYBBDnTr/O
- BgNp5jaujb4+9u/2qhV3f9n+/3WOb2CmPehBgVSzlXqHeQ9lshmgwZPeem2T+8Tm
- Nnc/xQnsUfCFszUDxpkr55+aLVM22j02RWqcZ4q7TAaVYL+kdFVMc8FoqG/0ro6A
- BjE/Qn0Nn7ciX1VUjDt8l+k7ummPJTmzdi6i6E4AwO9dzrGNgGJ4aWL8cC6xYcIX
- goVIRTFeONXSDno/oPjWHpIPt7L15heMpKBHNuzPkKx2YVqPHE5QZxWfS+Lzgx+Q
- E2oTTM0rYKOZ8p6000mhvKI=
- -----END CERTIFICATE-----`
- )
- func init() {
- logx.Disable()
- }
- func TestPublisher_register(t *testing.T) {
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- const id = 1
- cli := internal.NewMockEtcdClient(ctrl)
- restore := setMockClient(cli)
- defer restore()
- cli.EXPECT().Ctx().AnyTimes()
- cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{
- ID: id,
- }, nil)
- cli.EXPECT().Put(gomock.Any(), makeEtcdKey("thekey", id), "thevalue", gomock.Any())
- pub := NewPublisher(nil, "thekey", "thevalue",
- WithPubEtcdAccount(stringx.Rand(), "bar"))
- _, err := pub.register(cli)
- assert.Nil(t, err)
- }
- func TestPublisher_registerWithOptions(t *testing.T) {
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- const id = 2
- cli := internal.NewMockEtcdClient(ctrl)
- restore := setMockClient(cli)
- defer restore()
- cli.EXPECT().Ctx().AnyTimes()
- cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{
- ID: 1,
- }, nil)
- cli.EXPECT().Put(gomock.Any(), makeEtcdKey("thekey", id), "thevalue", gomock.Any())
- certFile := createTempFile(t, []byte(certContent))
- defer os.Remove(certFile)
- keyFile := createTempFile(t, []byte(keyContent))
- defer os.Remove(keyFile)
- caFile := createTempFile(t, []byte(caContent))
- defer os.Remove(caFile)
- pub := NewPublisher(nil, "thekey", "thevalue", WithId(id),
- WithPubEtcdTLS(certFile, keyFile, caFile, true))
- _, err := pub.register(cli)
- assert.Nil(t, err)
- }
- func TestPublisher_registerError(t *testing.T) {
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- cli := internal.NewMockEtcdClient(ctrl)
- restore := setMockClient(cli)
- defer restore()
- cli.EXPECT().Ctx().AnyTimes()
- cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(nil, errors.New("error"))
- pub := NewPublisher(nil, "thekey", "thevalue")
- val, err := pub.register(cli)
- assert.NotNil(t, err)
- assert.Equal(t, clientv3.NoLease, val)
- }
- func TestPublisher_revoke(t *testing.T) {
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- const id clientv3.LeaseID = 1
- cli := internal.NewMockEtcdClient(ctrl)
- restore := setMockClient(cli)
- defer restore()
- cli.EXPECT().Ctx().AnyTimes()
- cli.EXPECT().Revoke(gomock.Any(), id)
- pub := NewPublisher(nil, "thekey", "thevalue")
- pub.lease = id
- pub.revoke(cli)
- }
- func TestPublisher_revokeError(t *testing.T) {
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- const id clientv3.LeaseID = 1
- cli := internal.NewMockEtcdClient(ctrl)
- restore := setMockClient(cli)
- defer restore()
- cli.EXPECT().Ctx().AnyTimes()
- cli.EXPECT().Revoke(gomock.Any(), id).Return(nil, errors.New("error"))
- pub := NewPublisher(nil, "thekey", "thevalue")
- pub.lease = id
- pub.revoke(cli)
- }
- func TestPublisher_keepAliveAsyncError(t *testing.T) {
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- const id clientv3.LeaseID = 1
- cli := internal.NewMockEtcdClient(ctrl)
- restore := setMockClient(cli)
- defer restore()
- cli.EXPECT().Ctx().AnyTimes()
- cli.EXPECT().KeepAlive(gomock.Any(), id).Return(nil, errors.New("error"))
- pub := NewPublisher(nil, "thekey", "thevalue")
- pub.lease = id
- assert.NotNil(t, pub.keepAliveAsync(cli))
- }
- func TestPublisher_keepAliveAsyncQuit(t *testing.T) {
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- const id clientv3.LeaseID = 1
- cli := internal.NewMockEtcdClient(ctrl)
- cli.EXPECT().ActiveConnection()
- cli.EXPECT().Close()
- defer cli.Close()
- cli.ActiveConnection()
- restore := setMockClient(cli)
- defer restore()
- cli.EXPECT().Ctx().AnyTimes()
- cli.EXPECT().KeepAlive(gomock.Any(), id)
- var wg sync.WaitGroup
- wg.Add(1)
- cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
- wg.Done()
- })
- pub := NewPublisher(nil, "thekey", "thevalue")
- pub.lease = id
- pub.Stop()
- assert.Nil(t, pub.keepAliveAsync(cli))
- wg.Wait()
- }
- func TestPublisher_keepAliveAsyncPause(t *testing.T) {
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- const id clientv3.LeaseID = 1
- cli := internal.NewMockEtcdClient(ctrl)
- restore := setMockClient(cli)
- defer restore()
- cli.EXPECT().Ctx().AnyTimes()
- cli.EXPECT().KeepAlive(gomock.Any(), id)
- pub := NewPublisher(nil, "thekey", "thevalue")
- var wg sync.WaitGroup
- wg.Add(1)
- cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
- pub.Stop()
- wg.Done()
- })
- pub.lease = id
- assert.Nil(t, pub.keepAliveAsync(cli))
- pub.Pause()
- wg.Wait()
- }
- func TestPublisher_Resume(t *testing.T) {
- publisher := new(Publisher)
- publisher.resumeChan = make(chan lang.PlaceholderType)
- go func() {
- publisher.Resume()
- }()
- go func() {
- time.Sleep(time.Minute)
- t.Fail()
- }()
- <-publisher.resumeChan
- }
- func TestPublisher_keepAliveAsync(t *testing.T) {
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- const id clientv3.LeaseID = 1
- conn := createMockConn(t)
- defer conn.Close()
- cli := internal.NewMockEtcdClient(ctrl)
- cli.EXPECT().ActiveConnection().Return(conn).AnyTimes()
- cli.EXPECT().Close()
- defer cli.Close()
- cli.ActiveConnection()
- restore := setMockClient(cli)
- defer restore()
- cli.EXPECT().Ctx().AnyTimes()
- cli.EXPECT().KeepAlive(gomock.Any(), id)
- cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{
- ID: 1,
- }, nil)
- cli.EXPECT().Put(gomock.Any(), makeEtcdKey("thekey", int64(id)), "thevalue", gomock.Any())
- var wg sync.WaitGroup
- wg.Add(1)
- cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
- wg.Done()
- })
- pub := NewPublisher([]string{"the-endpoint"}, "thekey", "thevalue")
- pub.lease = id
- assert.Nil(t, pub.KeepAlive())
- pub.Stop()
- wg.Wait()
- }
- func createMockConn(t *testing.T) *grpc.ClientConn {
- lis, err := net.Listen("tcp", "localhost:0")
- if err != nil {
- t.Fatalf("Error while listening. Err: %v", err)
- }
- defer lis.Close()
- lisAddr := resolver.Address{Addr: lis.Addr().String()}
- lisDone := make(chan struct{})
- dialDone := make(chan struct{})
- // 1st listener accepts the connection and then does nothing
- go func() {
- defer close(lisDone)
- conn, err := lis.Accept()
- if err != nil {
- t.Errorf("Error while accepting. Err: %v", err)
- return
- }
- framer := http2.NewFramer(conn, conn)
- if err := framer.WriteSettings(http2.Setting{}); err != nil {
- t.Errorf("Error while writing settings. Err: %v", err)
- return
- }
- <-dialDone // Close conn only after dial returns.
- }()
- r := manual.NewBuilderWithScheme("whatever")
- r.InitialState(resolver.State{Addresses: []resolver.Address{lisAddr}})
- client, err := grpc.DialContext(context.Background(), r.Scheme()+":///test.server",
- grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
- close(dialDone)
- if err != nil {
- t.Fatalf("Dial failed. Err: %v", err)
- }
- timeout := time.After(1 * time.Second)
- select {
- case <-timeout:
- t.Fatal("timed out waiting for server to finish")
- case <-lisDone:
- }
- return client
- }
- func createTempFile(t *testing.T, body []byte) string {
- tmpFile, err := os.CreateTemp(os.TempDir(), "go-unit-*.tmp")
- if err != nil {
- t.Fatal(err)
- }
- tmpFile.Close()
- if err = os.WriteFile(tmpFile.Name(), body, os.ModePerm); err != nil {
- t.Fatal(err)
- }
- return tmpFile.Name()
- }
|