Skip to content

Commit 996f216

Browse files
authored
Merge pull request #640 from lec-bit/kmesh-reboot
kmesh restart with config change
2 parents 76d557a + 2d1dc68 commit 996f216

File tree

3 files changed

+191
-72
lines changed

3 files changed

+191
-72
lines changed

pkg/controller/workload/cache/service_cache.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type ServiceCache interface {
2626
List() []*workloadapi.Service
2727
AddOrUpdateService(svc *workloadapi.Service)
2828
DeleteService(resourceName string)
29+
GetService(resourceName string) *workloadapi.Service
2930
}
3031

3132
type serviceCache struct {
@@ -62,3 +63,9 @@ func (s *serviceCache) List() []*workloadapi.Service {
6263

6364
return out
6465
}
66+
67+
func (s *serviceCache) GetService(resourceName string) *workloadapi.Service {
68+
s.mutex.RLock()
69+
defer s.mutex.RUnlock()
70+
return s.servicesByResourceName[resourceName]
71+
}

pkg/controller/workload/workload_processor.go

Lines changed: 133 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"kmesh.net/kmesh/api/v2/workloadapi/security"
3030
"kmesh.net/kmesh/bpf/kmesh/bpf2go"
3131
"kmesh.net/kmesh/pkg/auth"
32+
kmeshbpf "kmesh.net/kmesh/pkg/bpf"
3233
"kmesh.net/kmesh/pkg/constants"
3334
"kmesh.net/kmesh/pkg/controller/config"
3435
bpf "kmesh.net/kmesh/pkg/controller/workload/bpfcache"
@@ -141,6 +142,16 @@ func (p *Processor) storePodFrontendData(uid uint32, ip []byte) error {
141142
}
142143

143144
func (p *Processor) removeWorkloadResource(removedResources []string) error {
145+
for _, uid := range removedResources {
146+
p.WorkloadCache.DeleteWorkload(uid)
147+
if err := p.removeWorkloadFromBpfMap(uid); err != nil {
148+
return err
149+
}
150+
}
151+
return nil
152+
}
153+
154+
func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
144155
var (
145156
err error
146157
skUpdate = bpf.ServiceKey{}
@@ -150,66 +161,63 @@ func (p *Processor) removeWorkloadResource(removedResources []string) error {
150161
bkDelete = bpf.BackendKey{}
151162
)
152163

153-
for _, uid := range removedResources {
154-
p.WorkloadCache.DeleteWorkload(uid)
155-
backendUid := p.hashName.StrToNum(uid)
156-
// for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records
157-
if err = p.deletePodFrontendData(backendUid); err != nil {
158-
log.Errorf("deletePodFrontendData failed: %s", err)
159-
goto failed
160-
}
164+
backendUid := p.hashName.StrToNum(uid)
165+
// For Pod-to-Pod access, Pod info is stored in the frontend map; when a Pod goes offline, we need to delete the related records.
166+
if err = p.deletePodFrontendData(backendUid); err != nil {
167+
log.Errorf("deletePodFrontendData failed: %s", err)
168+
return err
169+
}
161170

162-
// 1. find all endpoint keys related to this workload
163-
if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 {
164-
for _, ek := range eks {
165-
log.Debugf("Find EndpointKey: [%#v]", ek)
166-
// 2. find the service
167-
skUpdate.ServiceId = ek.ServiceId
168-
if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil {
169-
log.Debugf("Find ServiceValue: [%#v]", svUpdate)
170-
// 3. find the last indexed endpoint of the service
171-
lastEndpointKey.ServiceId = skUpdate.ServiceId
172-
lastEndpointKey.BackendIndex = svUpdate.EndpointCount
173-
if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil {
174-
log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue)
175-
// 4. switch the index of the last with the current removed endpoint
176-
if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil {
177-
log.Errorf("EndpointUpdate failed: %s", err)
178-
goto failed
179-
}
180-
if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil {
181-
log.Errorf("EndpointDelete failed: %s", err)
182-
goto failed
183-
}
184-
svUpdate.EndpointCount = svUpdate.EndpointCount - 1
185-
if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil {
186-
log.Errorf("ServiceUpdate failed: %s", err)
187-
goto failed
188-
}
189-
} else {
190-
// last indexed endpoint not exists, this should not occur
191-
// we should delete the endpoint just in case leak
192-
if err = p.bpf.EndpointDelete(&ek); err != nil {
193-
log.Errorf("EndpointDelete failed: %s", err)
194-
goto failed
195-
}
171+
// 1. find all endpoint keys related to this workload
172+
if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 {
173+
for _, ek := range eks {
174+
log.Debugf("Find EndpointKey: [%#v]", ek)
175+
// 2. find the service
176+
skUpdate.ServiceId = ek.ServiceId
177+
if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil {
178+
log.Debugf("Find ServiceValue: [%#v]", svUpdate)
179+
// 3. find the last indexed endpoint of the service
180+
lastEndpointKey.ServiceId = skUpdate.ServiceId
181+
lastEndpointKey.BackendIndex = svUpdate.EndpointCount
182+
if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil {
183+
log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue)
184+
// 4. switch the index of the last with the current removed endpoint
185+
if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil {
186+
log.Errorf("EndpointUpdate failed: %s", err)
187+
goto failed
196188
}
197-
} else { // service not exist, we should delete the endpoint
189+
if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil {
190+
log.Errorf("EndpointDelete failed: %s", err)
191+
goto failed
192+
}
193+
svUpdate.EndpointCount = svUpdate.EndpointCount - 1
194+
if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil {
195+
log.Errorf("ServiceUpdate failed: %s", err)
196+
goto failed
197+
}
198+
} else {
199+
// the last indexed endpoint does not exist; this should not occur,
200+
// but we delete the endpoint anyway to avoid leaking it
198201
if err = p.bpf.EndpointDelete(&ek); err != nil {
199202
log.Errorf("EndpointDelete failed: %s", err)
200203
goto failed
201204
}
202205
}
206+
} else { // service not exist, we should delete the endpoint
207+
if err = p.bpf.EndpointDelete(&ek); err != nil {
208+
log.Errorf("EndpointDelete failed: %s", err)
209+
goto failed
210+
}
203211
}
204212
}
213+
}
205214

206-
bkDelete.BackendUid = backendUid
207-
if err = p.bpf.BackendDelete(&bkDelete); err != nil {
208-
log.Errorf("BackendDelete failed: %s", err)
209-
goto failed
210-
}
211-
p.hashName.Delete(uid)
215+
bkDelete.BackendUid = backendUid
216+
if err = p.bpf.BackendDelete(&bkDelete); err != nil {
217+
log.Errorf("BackendDelete failed: %s", err)
218+
goto failed
212219
}
220+
p.hashName.Delete(uid)
213221

214222
failed:
215223
return err
@@ -235,41 +243,49 @@ func (p *Processor) deleteFrontendData(id uint32) error {
235243
}
236244

237245
func (p *Processor) removeServiceResource(resources []string) error {
246+
var err error
247+
for _, name := range resources {
248+
p.ServiceCache.DeleteService(name)
249+
if err = p.removeServiceResourceFromBpfMap(name); err != nil {
250+
return err
251+
}
252+
}
253+
return err
254+
}
255+
256+
func (p *Processor) removeServiceResourceFromBpfMap(name string) error {
238257
var (
239258
err error
240259
skDelete = bpf.ServiceKey{}
241260
svDelete = bpf.ServiceValue{}
242261
ekDelete = bpf.EndpointKey{}
243262
)
244263

245-
for _, name := range resources {
246-
p.ServiceCache.DeleteService(name)
247-
serviceId := p.hashName.StrToNum(name)
248-
skDelete.ServiceId = serviceId
249-
if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil {
250-
if err = p.deleteFrontendData(serviceId); err != nil {
251-
log.Errorf("deleteFrontendData failed: %s", err)
252-
goto failed
253-
}
264+
p.ServiceCache.DeleteService(name)
265+
serviceId := p.hashName.StrToNum(name)
266+
skDelete.ServiceId = serviceId
267+
if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil {
268+
if err = p.deleteFrontendData(serviceId); err != nil {
269+
log.Errorf("deleteFrontendData failed: %s", err)
270+
goto failed
271+
}
254272

255-
if err = p.bpf.ServiceDelete(&skDelete); err != nil {
256-
log.Errorf("ServiceDelete failed: %s", err)
257-
goto failed
258-
}
273+
if err = p.bpf.ServiceDelete(&skDelete); err != nil {
274+
log.Errorf("ServiceDelete failed: %s", err)
275+
goto failed
276+
}
259277

260-
var i uint32
261-
for i = 1; i <= svDelete.EndpointCount; i++ {
262-
ekDelete.ServiceId = serviceId
263-
ekDelete.BackendIndex = i
264-
if err = p.bpf.EndpointDelete(&ekDelete); err != nil {
265-
log.Errorf("EndpointDelete failed: %s", err)
266-
goto failed
267-
}
278+
var i uint32
279+
for i = 1; i <= svDelete.EndpointCount; i++ {
280+
ekDelete.ServiceId = serviceId
281+
ekDelete.BackendIndex = i
282+
if err = p.bpf.EndpointDelete(&ekDelete); err != nil {
283+
log.Errorf("EndpointDelete failed: %s", err)
284+
goto failed
268285
}
269286
}
270-
p.hashName.Delete(name)
271287
}
272-
288+
p.hashName.Delete(name)
273289
failed:
274290
return err
275291
}
@@ -587,10 +603,55 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis
587603
}
588604

589605
_ = p.handleRemovedAddresses(rsp.RemovedResources)
606+
p.compareWorkloadAndServiceWithHashName()
590607

591608
return err
592609
}
593610

611+
// When processing the workload's response for the first time,
612+
// fetch the data from the /mnt/workload_hash_name.yaml file
613+
// and compare it with the data in the cache.
614+
func (p *Processor) compareWorkloadAndServiceWithHashName() {
615+
var (
616+
bk = bpf.BackendKey{}
617+
bv = bpf.BackendValue{}
618+
sk = bpf.ServiceKey{}
619+
sv = bpf.ServiceValue{}
620+
)
621+
622+
if kmeshbpf.GetStartType() != kmeshbpf.Restart {
623+
return
624+
}
625+
626+
log.Infof("reload workload config from last epoch")
627+
kmeshbpf.SetStartType(kmeshbpf.Normal)
628+
629+
/* We traverse hashName. If a record exists in the bpf map
630+
* but not in the user cache, the data loaded into the bpf map
631+
* from the last epoch is inconsistent with the data that should
632+
* actually be stored now, so we delete it from the bpf map.
633+
*/
634+
for str, num := range p.hashName.strToNum {
635+
if p.WorkloadCache.GetWorkloadByUid(str) == nil && p.ServiceCache.GetService(str) == nil {
636+
log.Debugf("GetWorkloadByUid and GetService nil:%v", str)
637+
638+
bk.BackendUid = num
639+
sk.ServiceId = num
640+
if err := p.bpf.BackendLookup(&bk, &bv); err == nil {
641+
log.Debugf("Find BackendValue: [%#v] RemoveWorkloadResource", bv)
642+
if err := p.removeWorkloadFromBpfMap(str); err != nil {
643+
log.Errorf("RemoveWorkloadResource failed: %v", err)
644+
}
645+
} else if err := p.bpf.ServiceLookup(&sk, &sv); err == nil {
646+
log.Debugf("Find ServiceValue: [%#v] RemoveServiceResource", sv)
647+
if err := p.removeServiceResourceFromBpfMap(str); err != nil {
648+
log.Errorf("RemoveServiceResource failed: %v", err)
649+
}
650+
}
651+
}
652+
}
653+
}
654+
594655
func (p *Processor) handleAuthorizationTypeResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) error {
595656
if rbac == nil {
596657
return fmt.Errorf("Rbac module uninitialized")

pkg/controller/workload/workload_processor_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"kmesh.net/kmesh/api/v2/workloadapi"
2727
"kmesh.net/kmesh/daemon/options"
28+
"kmesh.net/kmesh/pkg/bpf"
2829
"kmesh.net/kmesh/pkg/constants"
2930
"kmesh.net/kmesh/pkg/controller/workload/bpfcache"
3031
"kmesh.net/kmesh/pkg/nets"
@@ -90,6 +91,7 @@ func Test_handleWorkload(t *testing.T) {
9091

9192
// 3.2 check service map contains service
9293
checkServiceMap(t, p, svcID, fakeSvc, 2)
94+
hashNameClean(p)
9395
}
9496

9597
func checkServiceMap(t *testing.T, p *Processor, svcId uint32, fakeSvc *workloadapi.Service, endpointCount uint32) {
@@ -251,3 +253,52 @@ func createFakeService(name, ip, waypoint string) *workloadapi.Service {
251253
},
252254
}
253255
}
256+
257+
func Test_deleteWorkloadWithRestart(t *testing.T) {
258+
workloadMap := bpfcache.NewFakeWorkloadMap(t)
259+
defer bpfcache.CleanupFakeWorkloadMap(workloadMap)
260+
261+
p := newProcessor(workloadMap)
262+
263+
// 1. handle workload with service, but service not handled yet
264+
// In this case, only frontend map and backend map should be updated.
265+
wl := createTestWorkloadWithService()
266+
_ = p.handleDataWithService(createTestWorkloadWithService())
267+
268+
workloadID := checkFrontEndMap(t, wl.Addresses[0], p)
269+
checkBackendMap(t, p, workloadID, wl)
270+
271+
epKeys := p.bpf.EndpointIterFindKey(workloadID)
272+
assert.Equal(t, len(epKeys), 0)
273+
for svcName := range wl.Services {
274+
endpoints := p.endpointsByService[svcName]
275+
assert.Len(t, endpoints, 1)
276+
if _, ok := endpoints[wl.Uid]; ok {
277+
assert.True(t, ok)
278+
}
279+
}
280+
281+
// Set a restart label and simulate missing data in the cache
282+
bpf.SetStartType(bpf.Restart)
283+
for key := range wl.GetServices() {
284+
p.ServiceCache.DeleteService(key)
285+
}
286+
287+
p.compareWorkloadAndServiceWithHashName()
288+
hashNameClean(p)
289+
}
290+
291+
// The hashName is saved to a file by default.
292+
// If it is not cleaned up, it will affect other test cases.
293+
func hashNameClean(p *Processor) {
294+
for str := range p.hashName.strToNum {
295+
if err := p.removeWorkloadFromBpfMap(str); err != nil {
296+
log.Errorf("RemoveWorkloadResource failed: %v", err)
297+
}
298+
299+
if err := p.removeServiceResourceFromBpfMap(str); err != nil {
300+
log.Errorf("RemoveServiceResource failed: %v", err)
301+
}
302+
p.hashName.Delete(str)
303+
}
304+
}

0 commit comments

Comments
 (0)