Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kmesh restart with config change #640

Merged
merged 4 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/controller/workload/cache/service_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
List() []*workloadapi.Service
AddOrUpdateService(svc *workloadapi.Service)
DeleteService(resourceName string)
GetService(resourceName string) *workloadapi.Service
}

type serviceCache struct {
Expand Down Expand Up @@ -62,3 +63,9 @@

return out
}

func (s *serviceCache) GetService(resourceName string) *workloadapi.Service {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.servicesByResourceName[resourceName]

Check warning on line 70 in pkg/controller/workload/cache/service_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/cache/service_cache.go#L67-L70

Added lines #L67 - L70 were not covered by tests
}
196 changes: 124 additions & 72 deletions pkg/controller/workload/workload_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"kmesh.net/kmesh/api/v2/workloadapi/security"
"kmesh.net/kmesh/bpf/kmesh/bpf2go"
"kmesh.net/kmesh/pkg/auth"
kmeshbpf "kmesh.net/kmesh/pkg/bpf"
"kmesh.net/kmesh/pkg/constants"
"kmesh.net/kmesh/pkg/controller/config"
bpf "kmesh.net/kmesh/pkg/controller/workload/bpfcache"
Expand Down Expand Up @@ -141,6 +142,16 @@
}

func (p *Processor) removeWorkloadResource(removedResources []string) error {
for _, uid := range removedResources {
p.WorkloadCache.DeleteWorkload(uid)
if err := p.removeWorkloadResourceByUid(uid); err != nil {
return err

Check warning on line 148 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L146-L148

Added lines #L146 - L148 were not covered by tests
}
}
return nil
}

func (p *Processor) removeWorkloadResourceByUid(uid string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest renaming to removeWorkloadFromBpfMap to distinguish it from removal from the local cache.

Copy link
Contributor Author

@lec-bit lec-bit Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, the same interface is also called when removing from the local cache; I split it out and reused it.

var (
err error
skUpdate = bpf.ServiceKey{}
Expand All @@ -150,66 +161,63 @@
bkDelete = bpf.BackendKey{}
)

for _, uid := range removedResources {
p.WorkloadCache.DeleteWorkload(uid)
backendUid := p.hashName.StrToNum(uid)
// for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records
if err = p.deletePodFrontendData(backendUid); err != nil {
log.Errorf("deletePodFrontendData failed: %s", err)
goto failed
}
backendUid := p.hashName.StrToNum(uid)
// for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records
if err = p.deletePodFrontendData(backendUid); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @nlgwcy is fixing this, but I think either one can merge first

log.Errorf("deletePodFrontendData failed: %s", err)
return err

Check warning on line 168 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L167-L168

Added lines #L167 - L168 were not covered by tests
}

// 1. find all endpoint keys related to this workload
if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 {
for _, ek := range eks {
log.Debugf("Find EndpointKey: [%#v]", ek)
// 2. find the service
skUpdate.ServiceId = ek.ServiceId
if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil {
log.Debugf("Find ServiceValue: [%#v]", svUpdate)
// 3. find the last indexed endpoint of the service
lastEndpointKey.ServiceId = skUpdate.ServiceId
lastEndpointKey.BackendIndex = svUpdate.EndpointCount
if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil {
log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue)
// 4. switch the index of the last with the current removed endpoint
if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil {
log.Errorf("EndpointUpdate failed: %s", err)
goto failed
}
if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil {
log.Errorf("EndpointDelete failed: %s", err)
goto failed
}
svUpdate.EndpointCount = svUpdate.EndpointCount - 1
if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil {
log.Errorf("ServiceUpdate failed: %s", err)
goto failed
}
} else {
// last indexed endpoint not exists, this should not occur
// we should delete the endpoint just in case leak
if err = p.bpf.EndpointDelete(&ek); err != nil {
log.Errorf("EndpointDelete failed: %s", err)
goto failed
}
// 1. find all endpoint keys related to this workload
if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 {
for _, ek := range eks {
log.Debugf("Find EndpointKey: [%#v]", ek)
// 2. find the service
skUpdate.ServiceId = ek.ServiceId
if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil {
log.Debugf("Find ServiceValue: [%#v]", svUpdate)
// 3. find the last indexed endpoint of the service
lastEndpointKey.ServiceId = skUpdate.ServiceId
lastEndpointKey.BackendIndex = svUpdate.EndpointCount
if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil {
log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue)
// 4. switch the index of the last with the current removed endpoint
if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil {
log.Errorf("EndpointUpdate failed: %s", err)
goto failed

Check warning on line 187 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L186-L187

Added lines #L186 - L187 were not covered by tests
}
} else { // service not exist, we should delete the endpoint
if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil {
log.Errorf("EndpointDelete failed: %s", err)
goto failed

Check warning on line 191 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L190-L191

Added lines #L190 - L191 were not covered by tests
}
svUpdate.EndpointCount = svUpdate.EndpointCount - 1
if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil {
log.Errorf("ServiceUpdate failed: %s", err)
goto failed

Check warning on line 196 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L195-L196

Added lines #L195 - L196 were not covered by tests
}
} else {

Check warning on line 198 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L198

Added line #L198 was not covered by tests
// last indexed endpoint not exists, this should not occur
// we should delete the endpoint just in case leak
if err = p.bpf.EndpointDelete(&ek); err != nil {
log.Errorf("EndpointDelete failed: %s", err)
goto failed
}
}
} else { // service not exist, we should delete the endpoint
if err = p.bpf.EndpointDelete(&ek); err != nil {
log.Errorf("EndpointDelete failed: %s", err)
goto failed

Check warning on line 209 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L206-L209

Added lines #L206 - L209 were not covered by tests
}
}
}
}

bkDelete.BackendUid = backendUid
if err = p.bpf.BackendDelete(&bkDelete); err != nil {
log.Errorf("BackendDelete failed: %s", err)
goto failed
}
p.hashName.Delete(uid)
bkDelete.BackendUid = backendUid
if err = p.bpf.BackendDelete(&bkDelete); err != nil {
log.Errorf("BackendDelete failed: %s", err)
goto failed
}
p.hashName.Delete(uid)

failed:
return err
Expand All @@ -235,41 +243,49 @@
}

func (p *Processor) removeServiceResource(resources []string) error {
var err error
for _, name := range resources {
p.ServiceCache.DeleteService(name)
if err = p.removeServiceResourceByUid(name); err != nil {
return err

Check warning on line 250 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L248-L250

Added lines #L248 - L250 were not covered by tests
}
}
return err
}

func (p *Processor) removeServiceResourceByUid(name string) error {
var (
err error
skDelete = bpf.ServiceKey{}
svDelete = bpf.ServiceValue{}
ekDelete = bpf.EndpointKey{}
)

for _, name := range resources {
p.ServiceCache.DeleteService(name)
serviceId := p.hashName.StrToNum(name)
skDelete.ServiceId = serviceId
if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil {
if err = p.deleteFrontendData(serviceId); err != nil {
log.Errorf("deleteFrontendData failed: %s", err)
goto failed
}
p.ServiceCache.DeleteService(name)
serviceId := p.hashName.StrToNum(name)
skDelete.ServiceId = serviceId
if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil {
if err = p.deleteFrontendData(serviceId); err != nil {
log.Errorf("deleteFrontendData failed: %s", err)
goto failed

Check warning on line 270 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L269-L270

Added lines #L269 - L270 were not covered by tests
}

if err = p.bpf.ServiceDelete(&skDelete); err != nil {
log.Errorf("ServiceDelete failed: %s", err)
goto failed
}
if err = p.bpf.ServiceDelete(&skDelete); err != nil {
log.Errorf("ServiceDelete failed: %s", err)
goto failed

Check warning on line 275 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L274-L275

Added lines #L274 - L275 were not covered by tests
}

var i uint32
for i = 1; i <= svDelete.EndpointCount; i++ {
ekDelete.ServiceId = serviceId
ekDelete.BackendIndex = i
if err = p.bpf.EndpointDelete(&ekDelete); err != nil {
log.Errorf("EndpointDelete failed: %s", err)
goto failed
}
var i uint32
for i = 1; i <= svDelete.EndpointCount; i++ {
ekDelete.ServiceId = serviceId
ekDelete.BackendIndex = i
if err = p.bpf.EndpointDelete(&ekDelete); err != nil {
log.Errorf("EndpointDelete failed: %s", err)
goto failed

Check warning on line 284 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L283-L284

Added lines #L283 - L284 were not covered by tests
}
}
p.hashName.Delete(name)
}

p.hashName.Delete(name)
failed:
return err
}
Expand Down Expand Up @@ -587,10 +603,46 @@
}

_ = p.handleRemovedAddresses(rsp.RemovedResources)
p.compareWorkloadAndServiceWithHashName()

return err
}

// When processing the workload's response for the first time,
// fetch the data from the /mnt/workload_hash_name.yaml file
// and compare it with the data in the cache.
func (p *Processor) compareWorkloadAndServiceWithHashName() {
var (
bk = bpf.BackendKey{}
bv = bpf.BackendValue{}
)

if kmeshbpf.GetStartType() == kmeshbpf.Normal {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't support upgrade yet, to be safer shall we compare with RESTART only?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i will change it

return
}

log.Infof("reload workload config from last start")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Infof("reload workload config from last start")
log.Infof("reload workload config from last epoch")

kmeshbpf.SetStartType(kmeshbpf.Normal)
for str, num := range p.hashName.strToNum {
if p.WorkloadCache.GetWorkloadByUid(str) == nil && p.ServiceCache.GetService(str) == nil {
log.Debugf("GetWorkloadByUid and GetService nil:%v", str)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot assume all the strs are services or pods, can we?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was careless here. I should add a check in bpf_map.


bk.BackendUid = num
if err := p.bpf.BackendLookup(&bk, &bv); err == nil {
log.Debugf("Find BackendValue: [%#v] RemoveWorkloadResource", bv)
if err := p.removeWorkloadResourceByUid(str); err != nil {
log.Errorf("RemoveWorkloadResource failed: %v", err)

Check warning on line 634 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L634

Added line #L634 was not covered by tests
}
} else {
log.Debugf("RemoveServiceResource")
if err := p.removeServiceResourceByUid(str); err != nil {
log.Errorf("RemoveServiceResource failed: %v", err)
}
}
}
}
}

func (p *Processor) handleAuthorizationTypeResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) error {
if rbac == nil {
return fmt.Errorf("Rbac module uninitialized")
Expand Down
51 changes: 51 additions & 0 deletions pkg/controller/workload/workload_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"kmesh.net/kmesh/api/v2/workloadapi"
"kmesh.net/kmesh/daemon/options"
"kmesh.net/kmesh/pkg/bpf"
"kmesh.net/kmesh/pkg/constants"
"kmesh.net/kmesh/pkg/controller/workload/bpfcache"
"kmesh.net/kmesh/pkg/nets"
Expand Down Expand Up @@ -90,6 +91,7 @@ func Test_handleWorkload(t *testing.T) {

// 3.2 check service map contains service
checkServiceMap(t, p, svcID, fakeSvc, 2)
hashNameClean(p)
}

func checkServiceMap(t *testing.T, p *Processor, svcId uint32, fakeSvc *workloadapi.Service, endpointCount uint32) {
Expand Down Expand Up @@ -251,3 +253,52 @@ func createFakeService(name, ip, waypoint string) *workloadapi.Service {
},
}
}

// Test_deleteWorkloadWithRestart exercises compareWorkloadAndServiceWithHashName
// after a simulated restart: a workload that references services is handled,
// the start type is switched to Restart, the referenced services are dropped
// from the service cache, and the comparison is then run.
func Test_deleteWorkloadWithRestart(t *testing.T) {
	workloadMap := bpfcache.NewFakeWorkloadMap(t)
	defer bpfcache.CleanupFakeWorkloadMap(workloadMap)

	p := newProcessor(workloadMap)

	// 1. handle workload with service, but service not handled yet
	// In this case, only frontend map and backend map should be updated.
	wl := createTestWorkloadWithService()
	// Feed the processor the very same workload the assertions below inspect,
	// instead of a second, separately constructed instance.
	// NOTE(review): the returned error is discarded, as in the original test;
	// confirm whether it is expected to be nil here and assert on it if so.
	_ = p.handleDataWithService(wl)

	workloadID := checkFrontEndMap(t, wl.Addresses[0], p)
	checkBackendMap(t, p, workloadID, wl)

	// No endpoint keys are expected for this workload at this point.
	epKeys := p.bpf.EndpointIterFindKey(workloadID)
	assert.Empty(t, epKeys)
	for svcName := range wl.Services {
		endpoints := p.endpointsByService[svcName]
		assert.Len(t, endpoints, 1)
		// The single endpoint recorded for the service must belong to this
		// workload; the previous conditional assertion could never fail.
		assert.Contains(t, endpoints, wl.Uid)
	}

	// Set a restart label and simulate missing data in the cache
	bpf.SetStartType(bpf.Restart)
	for key := range wl.GetServices() {
		p.ServiceCache.DeleteService(key)
	}

	p.compareWorkloadAndServiceWithHashName()
	hashNameClean(p)
}

// hashNameClean walks every name currently held in p.hashName.strToNum and,
// for each one, calls removeWorkloadResourceByUid and
// removeServiceResourceByUid (logging any error instead of failing) before
// deleting the name from the hash-name table.
//
// The hashname will be saved as a file by default.
// If it is not cleaned, it will affect other use cases.
func hashNameClean(p *Processor) {
	for name := range p.hashName.strToNum {
		err := p.removeWorkloadResourceByUid(name)
		if err != nil {
			log.Errorf("RemoveWorkloadResource failed: %v", err)
		}

		err = p.removeServiceResourceByUid(name)
		if err != nil {
			log.Errorf("RemoveServiceResource failed: %v", err)
		}

		p.hashName.Delete(name)
	}
}
Loading