12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- package internal
- import (
- "context"
- "fmt"
- "time"
- "github.com/wuntsong-org/go-zero-plus/core/logx"
- "github.com/wuntsong-org/go-zero-plus/core/proc"
- "github.com/wuntsong-org/go-zero-plus/core/threading"
- "github.com/wuntsong-org/go-zero-plus/zrpc/resolver/internal/kube"
- "google.golang.org/grpc/resolver"
- v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/rest"
- )
- const (
- resyncInterval = 5 * time.Minute
- nameSelector = "metadata.name="
- )
- type kubeBuilder struct{}
- func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn,
- _ resolver.BuildOptions) (resolver.Resolver, error) {
- svc, err := kube.ParseTarget(target)
- if err != nil {
- return nil, err
- }
- config, err := rest.InClusterConfig()
- if err != nil {
- return nil, err
- }
- cs, err := kubernetes.NewForConfig(config)
- if err != nil {
- return nil, err
- }
- if svc.Port == 0 {
- endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, v1.GetOptions{})
- if err != nil {
- return nil, err
- }
- svc.Port = int(endpoints.Subsets[0].Ports[0].Port)
- }
- handler := kube.NewEventHandler(func(endpoints []string) {
- var addrs []resolver.Address
- for _, val := range subset(endpoints, subsetSize) {
- addrs = append(addrs, resolver.Address{
- Addr: fmt.Sprintf("%s:%d", val, svc.Port),
- })
- }
- if err := cc.UpdateState(resolver.State{
- Addresses: addrs,
- }); err != nil {
- logx.Error(err)
- }
- })
- inf := informers.NewSharedInformerFactoryWithOptions(cs, resyncInterval,
- informers.WithNamespace(svc.Namespace),
- informers.WithTweakListOptions(func(options *v1.ListOptions) {
- options.FieldSelector = nameSelector + svc.Name
- }))
- in := inf.Core().V1().Endpoints()
- _, err = in.Informer().AddEventHandler(handler)
- if err != nil {
- return nil, err
- }
- threading.GoSafe(func() {
- inf.Start(proc.Done())
- })
- endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, v1.GetOptions{})
- if err != nil {
- return nil, err
- }
- handler.Update(endpoints)
- return &nopResolver{cc: cc}, nil
- }
- func (b *kubeBuilder) Scheme() string {
- return KubernetesScheme
- }
|