kubebuilder.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package resolver
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/logx"
  7. "github.com/tal-tech/go-zero/core/proc"
  8. "github.com/tal-tech/go-zero/core/threading"
  9. "github.com/tal-tech/go-zero/zrpc/internal/resolver/kube"
  10. "google.golang.org/grpc/resolver"
  11. v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  12. "k8s.io/client-go/informers"
  13. "k8s.io/client-go/kubernetes"
  14. "k8s.io/client-go/rest"
  15. )
  16. const (
  17. resyncInterval = 5 * time.Minute
  18. nameSelector = "metadata.name="
  19. )
  20. type kubeBuilder struct{}
  21. func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn,
  22. opts resolver.BuildOptions) (resolver.Resolver, error) {
  23. svc, err := kube.ParseTarget(target)
  24. if err != nil {
  25. return nil, err
  26. }
  27. config, err := rest.InClusterConfig()
  28. if err != nil {
  29. return nil, err
  30. }
  31. cs, err := kubernetes.NewForConfig(config)
  32. if err != nil {
  33. return nil, err
  34. }
  35. handler := kube.NewEventHandler(func(endpoints []string) {
  36. var addrs []resolver.Address
  37. for _, val := range subset(endpoints, subsetSize) {
  38. addrs = append(addrs, resolver.Address{
  39. Addr: fmt.Sprintf("%s:%d", val, svc.Port),
  40. })
  41. }
  42. if err := cc.UpdateState(resolver.State{
  43. Addresses: addrs,
  44. }); err != nil {
  45. logx.Error(err)
  46. }
  47. })
  48. inf := informers.NewSharedInformerFactoryWithOptions(cs, resyncInterval,
  49. informers.WithNamespace(svc.Namespace),
  50. informers.WithTweakListOptions(func(options *v1.ListOptions) {
  51. options.FieldSelector = nameSelector + svc.Name
  52. }))
  53. in := inf.Core().V1().Endpoints()
  54. in.Informer().AddEventHandler(handler)
  55. threading.GoSafe(func() {
  56. inf.Start(proc.Done())
  57. })
  58. endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, v1.GetOptions{})
  59. if err != nil {
  60. return nil, err
  61. }
  62. handler.Update(endpoints)
  63. return &nopResolver{cc: cc}, nil
  64. }
  65. func (b *kubeBuilder) Scheme() string {
  66. return KubernetesScheme
  67. }