123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- package kube
- import (
- "sync"
- "github.com/wuntsong-org/go-zero-plus/core/lang"
- "github.com/wuntsong-org/go-zero-plus/core/logx"
- v1 "k8s.io/api/core/v1"
- "k8s.io/client-go/tools/cache"
- )
- var _ cache.ResourceEventHandler = (*EventHandler)(nil)
- // EventHandler is ResourceEventHandler implementation.
- type EventHandler struct {
- update func([]string)
- endpoints map[string]lang.PlaceholderType
- lock sync.Mutex
- }
- // NewEventHandler returns an EventHandler.
- func NewEventHandler(update func([]string)) *EventHandler {
- return &EventHandler{
- update: update,
- endpoints: make(map[string]lang.PlaceholderType),
- }
- }
- // OnAdd handles the endpoints add events.
- func (h *EventHandler) OnAdd(obj any, _ bool) {
- endpoints, ok := obj.(*v1.Endpoints)
- if !ok {
- logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
- return
- }
- h.lock.Lock()
- defer h.lock.Unlock()
- var changed bool
- for _, sub := range endpoints.Subsets {
- for _, point := range sub.Addresses {
- if _, ok := h.endpoints[point.IP]; !ok {
- h.endpoints[point.IP] = lang.Placeholder
- changed = true
- }
- }
- }
- if changed {
- h.notify()
- }
- }
- // OnDelete handles the endpoints delete events.
- func (h *EventHandler) OnDelete(obj any) {
- endpoints, ok := obj.(*v1.Endpoints)
- if !ok {
- logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
- return
- }
- h.lock.Lock()
- defer h.lock.Unlock()
- var changed bool
- for _, sub := range endpoints.Subsets {
- for _, point := range sub.Addresses {
- if _, ok := h.endpoints[point.IP]; ok {
- delete(h.endpoints, point.IP)
- changed = true
- }
- }
- }
- if changed {
- h.notify()
- }
- }
- // OnUpdate handles the endpoints update events.
- func (h *EventHandler) OnUpdate(oldObj, newObj any) {
- oldEndpoints, ok := oldObj.(*v1.Endpoints)
- if !ok {
- logx.Errorf("%v is not an object with type *v1.Endpoints", oldObj)
- return
- }
- newEndpoints, ok := newObj.(*v1.Endpoints)
- if !ok {
- logx.Errorf("%v is not an object with type *v1.Endpoints", newObj)
- return
- }
- if oldEndpoints.ResourceVersion == newEndpoints.ResourceVersion {
- return
- }
- h.Update(newEndpoints)
- }
- // Update updates the endpoints.
- func (h *EventHandler) Update(endpoints *v1.Endpoints) {
- h.lock.Lock()
- defer h.lock.Unlock()
- old := h.endpoints
- h.endpoints = make(map[string]lang.PlaceholderType)
- for _, sub := range endpoints.Subsets {
- for _, point := range sub.Addresses {
- h.endpoints[point.IP] = lang.Placeholder
- }
- }
- if diff(old, h.endpoints) {
- h.notify()
- }
- }
- func (h *EventHandler) notify() {
- targets := make([]string, 0, len(h.endpoints))
- for k := range h.endpoints {
- targets = append(targets, k)
- }
- h.update(targets)
- }
- func diff(o, n map[string]lang.PlaceholderType) bool {
- if len(o) != len(n) {
- return true
- }
- for k := range o {
- if _, ok := n[k]; !ok {
- return true
- }
- }
- return false
- }
|