-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kmesh restart with config change #640
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -29,6 +29,7 @@ | |||||
"kmesh.net/kmesh/api/v2/workloadapi/security" | ||||||
"kmesh.net/kmesh/bpf/kmesh/bpf2go" | ||||||
"kmesh.net/kmesh/pkg/auth" | ||||||
kmeshbpf "kmesh.net/kmesh/pkg/bpf" | ||||||
"kmesh.net/kmesh/pkg/constants" | ||||||
"kmesh.net/kmesh/pkg/controller/config" | ||||||
bpf "kmesh.net/kmesh/pkg/controller/workload/bpfcache" | ||||||
|
@@ -141,6 +142,16 @@ | |||||
} | ||||||
|
||||||
func (p *Processor) removeWorkloadResource(removedResources []string) error { | ||||||
for _, uid := range removedResources { | ||||||
p.WorkloadCache.DeleteWorkload(uid) | ||||||
if err := p.removeWorkloadResourceByUid(uid); err != nil { | ||||||
return err | ||||||
} | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
func (p *Processor) removeWorkloadResourceByUid(uid string) error { | ||||||
var ( | ||||||
err error | ||||||
skUpdate = bpf.ServiceKey{} | ||||||
|
@@ -150,66 +161,63 @@ | |||||
bkDelete = bpf.BackendKey{} | ||||||
) | ||||||
|
||||||
for _, uid := range removedResources { | ||||||
p.WorkloadCache.DeleteWorkload(uid) | ||||||
backendUid := p.hashName.StrToNum(uid) | ||||||
// for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records | ||||||
if err = p.deletePodFrontendData(backendUid); err != nil { | ||||||
log.Errorf("deletePodFrontendData failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
backendUid := p.hashName.StrToNum(uid) | ||||||
// for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records | ||||||
if err = p.deletePodFrontendData(backendUid); err != nil { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. cc @nlgwcy is fixing this, but I think either one can merge first. |
||||||
log.Errorf("deletePodFrontendData failed: %s", err) | ||||||
return err | ||||||
} | ||||||
|
||||||
// 1. find all endpoint keys related to this workload | ||||||
if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 { | ||||||
for _, ek := range eks { | ||||||
log.Debugf("Find EndpointKey: [%#v]", ek) | ||||||
// 2. find the service | ||||||
skUpdate.ServiceId = ek.ServiceId | ||||||
if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil { | ||||||
log.Debugf("Find ServiceValue: [%#v]", svUpdate) | ||||||
// 3. find the last indexed endpoint of the service | ||||||
lastEndpointKey.ServiceId = skUpdate.ServiceId | ||||||
lastEndpointKey.BackendIndex = svUpdate.EndpointCount | ||||||
if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil { | ||||||
log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue) | ||||||
// 4. switch the index of the last with the current removed endpoint | ||||||
if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil { | ||||||
log.Errorf("EndpointUpdate failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil { | ||||||
log.Errorf("EndpointDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
svUpdate.EndpointCount = svUpdate.EndpointCount - 1 | ||||||
if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil { | ||||||
log.Errorf("ServiceUpdate failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
} else { | ||||||
// last indexed endpoint not exists, this should not occur | ||||||
// we should delete the endpoint just in case leak | ||||||
if err = p.bpf.EndpointDelete(&ek); err != nil { | ||||||
log.Errorf("EndpointDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
// 1. find all endpoint keys related to this workload | ||||||
if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 { | ||||||
for _, ek := range eks { | ||||||
log.Debugf("Find EndpointKey: [%#v]", ek) | ||||||
// 2. find the service | ||||||
skUpdate.ServiceId = ek.ServiceId | ||||||
if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil { | ||||||
log.Debugf("Find ServiceValue: [%#v]", svUpdate) | ||||||
// 3. find the last indexed endpoint of the service | ||||||
lastEndpointKey.ServiceId = skUpdate.ServiceId | ||||||
lastEndpointKey.BackendIndex = svUpdate.EndpointCount | ||||||
if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil { | ||||||
log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue) | ||||||
// 4. switch the index of the last with the current removed endpoint | ||||||
if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil { | ||||||
log.Errorf("EndpointUpdate failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
} else { // service not exist, we should delete the endpoint | ||||||
if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil { | ||||||
log.Errorf("EndpointDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
svUpdate.EndpointCount = svUpdate.EndpointCount - 1 | ||||||
if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil { | ||||||
log.Errorf("ServiceUpdate failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
} else { | ||||||
// last indexed endpoint not exists, this should not occur | ||||||
// we should delete the endpoint just in case leak | ||||||
if err = p.bpf.EndpointDelete(&ek); err != nil { | ||||||
log.Errorf("EndpointDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
} | ||||||
} else { // service not exist, we should delete the endpoint | ||||||
if err = p.bpf.EndpointDelete(&ek); err != nil { | ||||||
log.Errorf("EndpointDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
bkDelete.BackendUid = backendUid | ||||||
if err = p.bpf.BackendDelete(&bkDelete); err != nil { | ||||||
log.Errorf("BackendDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
p.hashName.Delete(uid) | ||||||
bkDelete.BackendUid = backendUid | ||||||
if err = p.bpf.BackendDelete(&bkDelete); err != nil { | ||||||
log.Errorf("BackendDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
p.hashName.Delete(uid) | ||||||
|
||||||
failed: | ||||||
return err | ||||||
|
@@ -235,41 +243,49 @@ | |||||
} | ||||||
|
||||||
func (p *Processor) removeServiceResource(resources []string) error { | ||||||
var err error | ||||||
for _, name := range resources { | ||||||
p.ServiceCache.DeleteService(name) | ||||||
if err = p.removeServiceResourceByUid(name); err != nil { | ||||||
return err | ||||||
} | ||||||
} | ||||||
return err | ||||||
} | ||||||
|
||||||
func (p *Processor) removeServiceResourceByUid(name string) error { | ||||||
var ( | ||||||
err error | ||||||
skDelete = bpf.ServiceKey{} | ||||||
svDelete = bpf.ServiceValue{} | ||||||
ekDelete = bpf.EndpointKey{} | ||||||
) | ||||||
|
||||||
for _, name := range resources { | ||||||
p.ServiceCache.DeleteService(name) | ||||||
serviceId := p.hashName.StrToNum(name) | ||||||
skDelete.ServiceId = serviceId | ||||||
if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil { | ||||||
if err = p.deleteFrontendData(serviceId); err != nil { | ||||||
log.Errorf("deleteFrontendData failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
p.ServiceCache.DeleteService(name) | ||||||
serviceId := p.hashName.StrToNum(name) | ||||||
skDelete.ServiceId = serviceId | ||||||
if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil { | ||||||
if err = p.deleteFrontendData(serviceId); err != nil { | ||||||
log.Errorf("deleteFrontendData failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
|
||||||
if err = p.bpf.ServiceDelete(&skDelete); err != nil { | ||||||
log.Errorf("ServiceDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
if err = p.bpf.ServiceDelete(&skDelete); err != nil { | ||||||
log.Errorf("ServiceDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
|
||||||
var i uint32 | ||||||
for i = 1; i <= svDelete.EndpointCount; i++ { | ||||||
ekDelete.ServiceId = serviceId | ||||||
ekDelete.BackendIndex = i | ||||||
if err = p.bpf.EndpointDelete(&ekDelete); err != nil { | ||||||
log.Errorf("EndpointDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
var i uint32 | ||||||
for i = 1; i <= svDelete.EndpointCount; i++ { | ||||||
ekDelete.ServiceId = serviceId | ||||||
ekDelete.BackendIndex = i | ||||||
if err = p.bpf.EndpointDelete(&ekDelete); err != nil { | ||||||
log.Errorf("EndpointDelete failed: %s", err) | ||||||
goto failed | ||||||
} | ||||||
} | ||||||
p.hashName.Delete(name) | ||||||
} | ||||||
|
||||||
p.hashName.Delete(name) | ||||||
failed: | ||||||
return err | ||||||
} | ||||||
|
@@ -587,10 +603,46 @@ | |||||
} | ||||||
|
||||||
_ = p.handleRemovedAddresses(rsp.RemovedResources) | ||||||
p.compareWorkloadAndServiceWithHashName() | ||||||
|
||||||
return err | ||||||
} | ||||||
|
||||||
// When processing the workload's response for the first time, | ||||||
// fetch the data from the /mnt/workload_hash_name.yaml file | ||||||
// and compare it with the data in the cache. | ||||||
func (p *Processor) compareWorkloadAndServiceWithHashName() { | ||||||
var ( | ||||||
bk = bpf.BackendKey{} | ||||||
bv = bpf.BackendValue{} | ||||||
) | ||||||
|
||||||
if kmeshbpf.GetStartType() == kmeshbpf.Normal { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since we don't support upgrade yet, to be safer shall we compare with […]? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I will change it. |
||||||
return | ||||||
} | ||||||
|
||||||
log.Infof("reload workload config from last start") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
kmeshbpf.SetStartType(kmeshbpf.Normal) | ||||||
for str, num := range p.hashName.strToNum { | ||||||
if p.WorkloadCache.GetWorkloadByUid(str) == nil && p.ServiceCache.GetService(str) == nil { | ||||||
log.Debugf("GetWorkloadByUid and GetService nil:%v", str) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We cannot assume all the strs are services or pods, can we? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I was careless here. I should add a check in bpf_map. |
||||||
|
||||||
bk.BackendUid = num | ||||||
if err := p.bpf.BackendLookup(&bk, &bv); err == nil { | ||||||
log.Debugf("Find BackendValue: [%#v] RemoveWorkloadResource", bv) | ||||||
if err := p.removeWorkloadResourceByUid(str); err != nil { | ||||||
log.Errorf("RemoveWorkloadResource failed: %v", err) | ||||||
} | ||||||
} else { | ||||||
log.Debugf("RemoveServiceResource") | ||||||
if err := p.removeServiceResourceByUid(str); err != nil { | ||||||
log.Errorf("RemoveServiceResource failed: %v", err) | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
func (p *Processor) handleAuthorizationTypeResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) error { | ||||||
if rbac == nil { | ||||||
return fmt.Errorf("Rbac module uninitialized") | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest renaming to removeWorkloadFromBpfMap to distinguish it from removal from the local cache.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, the same interface is called when removing from the local cache; I split it out and reused it.