From 40e4b2065b72b95ab1f87f71bf71f61f376a0101 Mon Sep 17 00:00:00 2001 From: let-bit Date: Sat, 20 Jul 2024 09:31:25 +0000 Subject: [PATCH 1/4] kmesh restart with config change Signed-off-by: let-bit --- .../workload/cache/service_cache.go | 7 + pkg/controller/workload/workload_processor.go | 196 +++++++++++------- 2 files changed, 131 insertions(+), 72 deletions(-) diff --git a/pkg/controller/workload/cache/service_cache.go b/pkg/controller/workload/cache/service_cache.go index 8ec056aca..c12c1de80 100644 --- a/pkg/controller/workload/cache/service_cache.go +++ b/pkg/controller/workload/cache/service_cache.go @@ -26,6 +26,7 @@ type ServiceCache interface { List() []*workloadapi.Service AddOrUpdateService(svc *workloadapi.Service) DeleteService(resourceName string) + GetService(resourceName string) *workloadapi.Service } type serviceCache struct { @@ -62,3 +63,9 @@ func (s *serviceCache) List() []*workloadapi.Service { return out } + +func (s *serviceCache) GetService(resourceName string) *workloadapi.Service { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.servicesByResourceName[resourceName] +} diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 23af5391a..6111315f1 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -29,6 +29,7 @@ import ( "kmesh.net/kmesh/api/v2/workloadapi/security" "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/pkg/auth" + kmeshbpf "kmesh.net/kmesh/pkg/bpf" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/config" bpf "kmesh.net/kmesh/pkg/controller/workload/bpfcache" @@ -141,6 +142,16 @@ func (p *Processor) storePodFrontendData(uid uint32, ip []byte) error { } func (p *Processor) removeWorkloadResource(removedResources []string) error { + for _, uid := range removedResources { + p.WorkloadCache.DeleteWorkload(uid) + if err := p.removeWorkloadResourceByUid(uid); err != nil { + return err + } + } + return nil +} + +func (p *Processor) removeWorkloadResourceByUid(uid string) error { var ( err error skUpdate = bpf.ServiceKey{} @@ -150,66 +161,63 @@ func (p *Processor) removeWorkloadResource(removedResources []string) error { bkDelete = bpf.BackendKey{} ) - for _, uid := range removedResources { - p.WorkloadCache.DeleteWorkload(uid) - backendUid := p.hashName.StrToNum(uid) - // for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records - if err = p.deletePodFrontendData(backendUid); err != nil { - log.Errorf("deletePodFrontendData failed: %s", err) - goto failed - } + backendUid := p.hashName.StrToNum(uid) + // for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records + if err = p.deletePodFrontendData(backendUid); err != nil { + log.Errorf("deletePodFrontendData failed: %s", err) + return err + } - // 1. find all endpoint keys related to this workload - if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 { - for _, ek := range eks { - log.Debugf("Find EndpointKey: [%#v]", ek) - // 2. find the service - skUpdate.ServiceId = ek.ServiceId - if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil { - log.Debugf("Find ServiceValue: [%#v]", svUpdate) - // 3. find the last indexed endpoint of the service - lastEndpointKey.ServiceId = skUpdate.ServiceId - lastEndpointKey.BackendIndex = svUpdate.EndpointCount - if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil { - log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue) - // 4. switch the index of the last with the current removed endpoint - if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil { - log.Errorf("EndpointUpdate failed: %s", err) - goto failed - } - if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil { - log.Errorf("EndpointDelete failed: %s", err) - goto failed - } - svUpdate.EndpointCount = svUpdate.EndpointCount - 1 - if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil { - log.Errorf("ServiceUpdate failed: %s", err) - goto failed - } - } else { - // last indexed endpoint not exists, this should not occur - // we should delete the endpoint just in case leak - if err = p.bpf.EndpointDelete(&ek); err != nil { - log.Errorf("EndpointDelete failed: %s", err) - goto failed - } + // 1. find all endpoint keys related to this workload + if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 { + for _, ek := range eks { + log.Debugf("Find EndpointKey: [%#v]", ek) + // 2. find the service + skUpdate.ServiceId = ek.ServiceId + if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil { + log.Debugf("Find ServiceValue: [%#v]", svUpdate) + // 3. find the last indexed endpoint of the service + lastEndpointKey.ServiceId = skUpdate.ServiceId + lastEndpointKey.BackendIndex = svUpdate.EndpointCount + if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil { + log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue) + // 4. switch the index of the last with the current removed endpoint + if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil { + log.Errorf("EndpointUpdate failed: %s", err) + goto failed } - } else { // service not exist, we should delete the endpoint + if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil { + log.Errorf("EndpointDelete failed: %s", err) + goto failed + } + svUpdate.EndpointCount = svUpdate.EndpointCount - 1 + if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil { + log.Errorf("ServiceUpdate failed: %s", err) + goto failed + } + } else { + // last indexed endpoint not exists, this should not occur + // we should delete the endpoint just in case leak if err = p.bpf.EndpointDelete(&ek); err != nil { log.Errorf("EndpointDelete failed: %s", err) goto failed } } + } else { // service not exist, we should delete the endpoint + if err = p.bpf.EndpointDelete(&ek); err != nil { + log.Errorf("EndpointDelete failed: %s", err) + goto failed + } } } + } - bkDelete.BackendUid = backendUid - if err = p.bpf.BackendDelete(&bkDelete); err != nil { - log.Errorf("BackendDelete failed: %s", err) - goto failed - } - p.hashName.Delete(uid) + bkDelete.BackendUid = backendUid + if err = p.bpf.BackendDelete(&bkDelete); err != nil { + log.Errorf("BackendDelete failed: %s", err) + goto failed } + p.hashName.Delete(uid) failed: return err @@ -235,6 +243,17 @@ func (p *Processor) deleteFrontendData(id uint32) error { } func (p *Processor) removeServiceResource(resources []string) error { + var err error + for _, name := range resources { + p.ServiceCache.DeleteService(name) + if err = p.removeServiceResourceByUid(name); err != nil { + return err + } + } + return err +} + +func (p *Processor) removeServiceResourceByUid(name string) error { var ( err error skDelete = bpf.ServiceKey{} @@ -242,34 +261,31 @@ func (p *Processor) removeServiceResource(resources []string) error { ekDelete = bpf.EndpointKey{} ) - for _, name := range resources { - p.ServiceCache.DeleteService(name) - serviceId := p.hashName.StrToNum(name) - skDelete.ServiceId = serviceId - if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil { - if err = p.deleteFrontendData(serviceId); err != nil { - log.Errorf("deleteFrontendData failed: %s", err) - goto failed - } + p.ServiceCache.DeleteService(name) + serviceId := p.hashName.StrToNum(name) + skDelete.ServiceId = serviceId + if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil { + if err = p.deleteFrontendData(serviceId); err != nil { + log.Errorf("deleteFrontendData failed: %s", err) + goto failed + } - if err = p.bpf.ServiceDelete(&skDelete); err != nil { - log.Errorf("ServiceDelete failed: %s", err) - goto failed - } + if err = p.bpf.ServiceDelete(&skDelete); err != nil { + log.Errorf("ServiceDelete failed: %s", err) + goto failed + } - var i uint32 - for i = 1; i <= svDelete.EndpointCount; i++ { - ekDelete.ServiceId = serviceId - ekDelete.BackendIndex = i - if err = p.bpf.EndpointDelete(&ekDelete); err != nil { - log.Errorf("EndpointDelete failed: %s", err) - goto failed - } + var i uint32 + for i = 1; i <= svDelete.EndpointCount; i++ { + ekDelete.ServiceId = serviceId + ekDelete.BackendIndex = i + if err = p.bpf.EndpointDelete(&ekDelete); err != nil { + log.Errorf("EndpointDelete failed: %s", err) + goto failed } } - p.hashName.Delete(name) } - + p.hashName.Delete(name) failed: return err } @@ -587,10 +603,46 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis } _ = p.handleRemovedAddresses(rsp.RemovedResources) + p.compareWorkloadAndService() return err } +// When processing the workload's response for the first time, +// fetch the data from the /mnt/workload_hash_name.yaml file +// and compare it with the data in the cache. +func (p *Processor) compareWorkloadAndService() { + var ( + bk = bpf.BackendKey{} + bv = bpf.BackendValue{} + ) + + if kmeshbpf.GetStartType() == kmeshbpf.Normal { + return + } + + log.Infof("reload workload config from last start") + kmeshbpf.SetStartType(kmeshbpf.Normal) + for str, num := range p.hashName.strToNum { + if p.WorkloadCache.GetWorkloadByUid(str) == nil && p.ServiceCache.GetService(str) == nil { + log.Debugf("GetWorkloadByUid and GetService nil:%v", str) + + bk.BackendUid = num + if err := p.bpf.BackendLookup(&bk, &bv); err == nil { + log.Debugf("Find BackendValue: [%#v] RemoveWorkloadResource", bv) + if err := p.removeWorkloadResourceByUid(str); err != nil { + log.Errorf("RemoveWorkloadResource failed: %v", err) + } + } else { + log.Debugf("RemoveServiceResource") + if err := p.removeServiceResourceByUid(str); err != nil { + log.Errorf("RemoveServiceResource failed: %v", err) + } + } + } + } +} + func (p *Processor) handleAuthorizationTypeResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) error { if rbac == nil { return fmt.Errorf("Rbac module uninitialized") From 7a37b8c09ed00a88cf2af998e28fbc8aab71a7ce Mon Sep 17 00:00:00 2001 From: let-bit Date: Sat, 27 Jul 2024 08:20:53 +0000 Subject: [PATCH 2/4] ut for config update with kmesh restart Signed-off-by: let-bit --- pkg/controller/workload/workload_processor.go | 4 +- .../workload/workload_processor_test.go | 51 +++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 6111315f1..e4bacf2da 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -603,7 +603,7 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis } _ = p.handleRemovedAddresses(rsp.RemovedResources) - p.compareWorkloadAndService() + p.compareWorkloadAndServiceWithHashName() return err } @@ -611,7 +611,7 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis // When processing the workload's response for the first time, // fetch the data from the /mnt/workload_hash_name.yaml file // and compare it with the data in the cache. -func (p *Processor) compareWorkloadAndService() { +func (p *Processor) compareWorkloadAndServiceWithHashName() { var ( bk = bpf.BackendKey{} bv = bpf.BackendValue{} diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index 0fe2c333a..6b836deb5 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -25,6 +25,7 @@ import ( "kmesh.net/kmesh/api/v2/workloadapi" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/workload/bpfcache" "kmesh.net/kmesh/pkg/nets" @@ -90,6 +91,7 @@ func Test_handleWorkload(t *testing.T) { // 3.2 check service map contains service checkServiceMap(t, p, svcID, fakeSvc, 2) + hashNameClean(p) } func checkServiceMap(t *testing.T, p *Processor, svcId uint32, fakeSvc *workloadapi.Service, endpointCount uint32) { @@ -251,3 +253,52 @@ func createFakeService(name, ip, waypoint string) *workloadapi.Service { }, } } + +func Test_deleteWorkloadWithRestart(t *testing.T) { + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := newProcessor(workloadMap) + + // 1. handle workload with service, but service not handled yet + // In this case, only frontend map and backend map should be updated. + wl := createTestWorkloadWithService() + _ = p.handleDataWithService(createTestWorkloadWithService()) + + workloadID := checkFrontEndMap(t, wl.Addresses[0], p) + checkBackendMap(t, p, workloadID, wl) + + epKeys := p.bpf.EndpointIterFindKey(workloadID) + assert.Equal(t, len(epKeys), 0) + for svcName := range wl.Services { + endpoints := p.endpointsByService[svcName] + assert.Len(t, endpoints, 1) + if _, ok := endpoints[wl.Uid]; ok { + assert.True(t, ok) + } + } + + // Set a restart label and simulate missing data in the cache + bpf.SetStartType(bpf.Restart) + for key := range wl.GetServices() { + p.ServiceCache.DeleteService(key) + } + + p.compareWorkloadAndServiceWithHashName() + hashNameClean(p) +} + +// The hashname will be saved as a file by default. +// If it is not cleaned, it will affect other use cases. +func hashNameClean(p *Processor) { + for str := range p.hashName.strToNum { + if err := p.removeWorkloadResourceByUid(str); err != nil { + log.Errorf("RemoveWorkloadResource failed: %v", err) + } + + if err := p.removeServiceResourceByUid(str); err != nil { + log.Errorf("RemoveServiceResource failed: %v", err) + } + p.hashName.Delete(str) + } +} From d21b8f4a86e2fea9b46532f214786b51ac59535f Mon Sep 17 00:00:00 2001 From: let-bit Date: Sat, 3 Aug 2024 08:49:29 +0000 Subject: [PATCH 3/4] update Signed-off-by: let-bit --- pkg/controller/workload/workload_processor.go | 24 ++++++++++++------- .../workload/workload_processor_test.go | 4 ++-- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index e4bacf2da..d2fcf254f 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -144,14 +144,14 @@ func (p *Processor) storePodFrontendData(uid uint32, ip []byte) error { func (p *Processor) removeWorkloadResource(removedResources []string) error { for _, uid := range removedResources { p.WorkloadCache.DeleteWorkload(uid) - if err := p.removeWorkloadResourceByUid(uid); err != nil { + if err := p.removeWorkloadFromBpfMap(uid); err != nil { return err } } return nil } -func (p *Processor) removeWorkloadResourceByUid(uid string) error { +func (p *Processor) removeWorkloadFromBpfMap(uid string) error { var ( err error skUpdate = bpf.ServiceKey{} @@ -246,14 +246,14 @@ func (p *Processor) removeServiceResource(resources []string) error { var err error for _, name := range resources { p.ServiceCache.DeleteService(name) - if err = p.removeServiceResourceByUid(name); err != nil { + if err = p.removeServiceResourceFromBpfMap(name); err != nil { return err } } return err } -func (p *Processor) removeServiceResourceByUid(name string) error { +func (p *Processor) removeServiceResourceFromBpfMap(name string) error { var ( err error skDelete = bpf.ServiceKey{} @@ -615,27 +615,33 @@ func (p *Processor) compareWorkloadAndServiceWithHashName() { var ( bk = bpf.BackendKey{} bv = bpf.BackendValue{} + sk = bpf.ServiceKey{} + sv = bpf.ServiceValue{} ) - if kmeshbpf.GetStartType() == kmeshbpf.Normal { + if kmeshbpf.GetStartType() != kmeshbpf.Restart { return } log.Infof("reload workload config from last start") kmeshbpf.SetStartType(kmeshbpf.Normal) + + // The record exists in the hashName file, exists in Backend or Service bpfmap, + // and does not exist in cache. for str, num := range p.hashName.strToNum { if p.WorkloadCache.GetWorkloadByUid(str) == nil && p.ServiceCache.GetService(str) == nil { log.Debugf("GetWorkloadByUid and GetService nil:%v", str) bk.BackendUid = num + sk.ServiceId = num if err := p.bpf.BackendLookup(&bk, &bv); err == nil { log.Debugf("Find BackendValue: [%#v] RemoveWorkloadResource", bv) - if err := p.removeWorkloadResourceByUid(str); err != nil { + if err := p.removeWorkloadFromBpfMap(str); err != nil { log.Errorf("RemoveWorkloadResource failed: %v", err) } - } else { - log.Debugf("RemoveServiceResource") - if err := p.removeServiceResourceByUid(str); err != nil { + } else if err := p.bpf.ServiceLookup(&sk, &sv); err == nil { + log.Debugf("Find ServiceValue: [%#v] RemoveServiceResource", sv) + if err := p.removeServiceResourceFromBpfMap(str); err != nil { log.Errorf("RemoveServiceResource failed: %v", err) } } diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index 6b836deb5..afb20ea81 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -292,11 +292,11 @@ func Test_deleteWorkloadWithRestart(t *testing.T) { // If it is not cleaned, it will affect other use cases. func hashNameClean(p *Processor) { for str := range p.hashName.strToNum { - if err := p.removeWorkloadResourceByUid(str); err != nil { + if err := p.removeWorkloadFromBpfMap(str); err != nil { log.Errorf("RemoveWorkloadResource failed: %v", err) } - if err := p.removeServiceResourceByUid(str); err != nil { + if err := p.removeServiceResourceFromBpfMap(str); err != nil { log.Errorf("RemoveServiceResource failed: %v", err) } p.hashName.Delete(str) From 2d1dc685e53799f2b5d2dd0d129f7b3db0ba004f Mon Sep 17 00:00:00 2001 From: let-bit Date: Mon, 5 Aug 2024 06:24:16 +0000 Subject: [PATCH 4/4] update notes and logs Signed-off-by: let-bit --- pkg/controller/workload/workload_processor.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index d2fcf254f..19bcf53db 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -623,11 +623,14 @@ func (p *Processor) compareWorkloadAndServiceWithHashName() { return } - log.Infof("reload workload config from last start") + log.Infof("reload workload config from last epoch") kmeshbpf.SetStartType(kmeshbpf.Normal) - // The record exists in the hashName file, exists in Backend or Service bpfmap, - // and does not exist in cache. + /* We traverse hashName, if there is a record exists in bpf map + * but not in usercache, that means the data in the bpf map load + * from the last epoch is inconsistent with the data that should + * actually be stored now. then we should delete it from bpf map + */ for str, num := range p.hashName.strToNum { if p.WorkloadCache.GetWorkloadByUid(str) == nil && p.ServiceCache.GetService(str) == nil { log.Debugf("GetWorkloadByUid and GetService nil:%v", str)