1
0

kubebuilder.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package internal
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/wuntsong-org/go-zero-plus/core/logx"
  7. "github.com/wuntsong-org/go-zero-plus/core/proc"
  8. "github.com/wuntsong-org/go-zero-plus/core/threading"
  9. "github.com/wuntsong-org/go-zero-plus/zrpc/resolver/internal/kube"
  10. "google.golang.org/grpc/resolver"
  11. v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  12. "k8s.io/client-go/informers"
  13. "k8s.io/client-go/kubernetes"
  14. "k8s.io/client-go/rest"
  15. )
  16. const (
  17. resyncInterval = 5 * time.Minute
  18. nameSelector = "metadata.name="
  19. )
// kubeBuilder is a stateless gRPC resolver builder: its Build method resolves
// a target by watching Kubernetes Endpoints, and its Scheme method reports
// KubernetesScheme.
type kubeBuilder struct{}
  21. func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn,
  22. _ resolver.BuildOptions) (resolver.Resolver, error) {
  23. svc, err := kube.ParseTarget(target)
  24. if err != nil {
  25. return nil, err
  26. }
  27. config, err := rest.InClusterConfig()
  28. if err != nil {
  29. return nil, err
  30. }
  31. cs, err := kubernetes.NewForConfig(config)
  32. if err != nil {
  33. return nil, err
  34. }
  35. if svc.Port == 0 {
  36. endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, v1.GetOptions{})
  37. if err != nil {
  38. return nil, err
  39. }
  40. svc.Port = int(endpoints.Subsets[0].Ports[0].Port)
  41. }
  42. handler := kube.NewEventHandler(func(endpoints []string) {
  43. var addrs []resolver.Address
  44. for _, val := range subset(endpoints, subsetSize) {
  45. addrs = append(addrs, resolver.Address{
  46. Addr: fmt.Sprintf("%s:%d", val, svc.Port),
  47. })
  48. }
  49. if err := cc.UpdateState(resolver.State{
  50. Addresses: addrs,
  51. }); err != nil {
  52. logx.Error(err)
  53. }
  54. })
  55. inf := informers.NewSharedInformerFactoryWithOptions(cs, resyncInterval,
  56. informers.WithNamespace(svc.Namespace),
  57. informers.WithTweakListOptions(func(options *v1.ListOptions) {
  58. options.FieldSelector = nameSelector + svc.Name
  59. }))
  60. in := inf.Core().V1().Endpoints()
  61. _, err = in.Informer().AddEventHandler(handler)
  62. if err != nil {
  63. return nil, err
  64. }
  65. threading.GoSafe(func() {
  66. inf.Start(proc.Done())
  67. })
  68. endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, v1.GetOptions{})
  69. if err != nil {
  70. return nil, err
  71. }
  72. handler.Update(endpoints)
  73. return &nopResolver{cc: cc}, nil
  74. }
  75. func (b *kubeBuilder) Scheme() string {
  76. return KubernetesScheme
  77. }