Skip to content

Commit 40e4b20

Browse files
committed
kmesh restart with config change
Signed-off-by: let-bit <[email protected]>
1 parent cd60b90 commit 40e4b20

File tree

2 files changed

+131
-72
lines changed

2 files changed

+131
-72
lines changed

pkg/controller/workload/cache/service_cache.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type ServiceCache interface {
2626
List() []*workloadapi.Service
2727
AddOrUpdateService(svc *workloadapi.Service)
2828
DeleteService(resourceName string)
29+
GetService(resourceName string) *workloadapi.Service
2930
}
3031

3132
type serviceCache struct {
@@ -62,3 +63,9 @@ func (s *serviceCache) List() []*workloadapi.Service {
6263

6364
return out
6465
}
66+
67+
func (s *serviceCache) GetService(resourceName string) *workloadapi.Service {
68+
s.mutex.RLock()
69+
defer s.mutex.RUnlock()
70+
return s.servicesByResourceName[resourceName]
71+
}

pkg/controller/workload/workload_processor.go

Lines changed: 124 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"kmesh.net/kmesh/api/v2/workloadapi/security"
3030
"kmesh.net/kmesh/bpf/kmesh/bpf2go"
3131
"kmesh.net/kmesh/pkg/auth"
32+
kmeshbpf "kmesh.net/kmesh/pkg/bpf"
3233
"kmesh.net/kmesh/pkg/constants"
3334
"kmesh.net/kmesh/pkg/controller/config"
3435
bpf "kmesh.net/kmesh/pkg/controller/workload/bpfcache"
@@ -141,6 +142,16 @@ func (p *Processor) storePodFrontendData(uid uint32, ip []byte) error {
141142
}
142143

143144
func (p *Processor) removeWorkloadResource(removedResources []string) error {
145+
for _, uid := range removedResources {
146+
p.WorkloadCache.DeleteWorkload(uid)
147+
if err := p.removeWorkloadResourceByUid(uid); err != nil {
148+
return err
149+
}
150+
}
151+
return nil
152+
}
153+
154+
func (p *Processor) removeWorkloadResourceByUid(uid string) error {
144155
var (
145156
err error
146157
skUpdate = bpf.ServiceKey{}
@@ -150,66 +161,63 @@ func (p *Processor) removeWorkloadResource(removedResources []string) error {
150161
bkDelete = bpf.BackendKey{}
151162
)
152163

153-
for _, uid := range removedResources {
154-
p.WorkloadCache.DeleteWorkload(uid)
155-
backendUid := p.hashName.StrToNum(uid)
156-
// for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records
157-
if err = p.deletePodFrontendData(backendUid); err != nil {
158-
log.Errorf("deletePodFrontendData failed: %s", err)
159-
goto failed
160-
}
164+
backendUid := p.hashName.StrToNum(uid)
165+
// for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records
166+
if err = p.deletePodFrontendData(backendUid); err != nil {
167+
log.Errorf("deletePodFrontendData failed: %s", err)
168+
return err
169+
}
161170

162-
// 1. find all endpoint keys related to this workload
163-
if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 {
164-
for _, ek := range eks {
165-
log.Debugf("Find EndpointKey: [%#v]", ek)
166-
// 2. find the service
167-
skUpdate.ServiceId = ek.ServiceId
168-
if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil {
169-
log.Debugf("Find ServiceValue: [%#v]", svUpdate)
170-
// 3. find the last indexed endpoint of the service
171-
lastEndpointKey.ServiceId = skUpdate.ServiceId
172-
lastEndpointKey.BackendIndex = svUpdate.EndpointCount
173-
if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil {
174-
log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue)
175-
// 4. switch the index of the last with the current removed endpoint
176-
if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil {
177-
log.Errorf("EndpointUpdate failed: %s", err)
178-
goto failed
179-
}
180-
if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil {
181-
log.Errorf("EndpointDelete failed: %s", err)
182-
goto failed
183-
}
184-
svUpdate.EndpointCount = svUpdate.EndpointCount - 1
185-
if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil {
186-
log.Errorf("ServiceUpdate failed: %s", err)
187-
goto failed
188-
}
189-
} else {
190-
// last indexed endpoint not exists, this should not occur
191-
// we should delete the endpoint just in case leak
192-
if err = p.bpf.EndpointDelete(&ek); err != nil {
193-
log.Errorf("EndpointDelete failed: %s", err)
194-
goto failed
195-
}
171+
// 1. find all endpoint keys related to this workload
172+
if eks := p.bpf.EndpointIterFindKey(backendUid); len(eks) != 0 {
173+
for _, ek := range eks {
174+
log.Debugf("Find EndpointKey: [%#v]", ek)
175+
// 2. find the service
176+
skUpdate.ServiceId = ek.ServiceId
177+
if err = p.bpf.ServiceLookup(&skUpdate, &svUpdate); err == nil {
178+
log.Debugf("Find ServiceValue: [%#v]", svUpdate)
179+
// 3. find the last indexed endpoint of the service
180+
lastEndpointKey.ServiceId = skUpdate.ServiceId
181+
lastEndpointKey.BackendIndex = svUpdate.EndpointCount
182+
if err = p.bpf.EndpointLookup(&lastEndpointKey, &lastEndpointValue); err == nil {
183+
log.Debugf("Find EndpointValue: [%#v]", lastEndpointValue)
184+
// 4. switch the index of the last with the current removed endpoint
185+
if err = p.bpf.EndpointUpdate(&ek, &lastEndpointValue); err != nil {
186+
log.Errorf("EndpointUpdate failed: %s", err)
187+
goto failed
196188
}
197-
} else { // service not exist, we should delete the endpoint
189+
if err = p.bpf.EndpointDelete(&lastEndpointKey); err != nil {
190+
log.Errorf("EndpointDelete failed: %s", err)
191+
goto failed
192+
}
193+
svUpdate.EndpointCount = svUpdate.EndpointCount - 1
194+
if err = p.bpf.ServiceUpdate(&skUpdate, &svUpdate); err != nil {
195+
log.Errorf("ServiceUpdate failed: %s", err)
196+
goto failed
197+
}
198+
} else {
199+
// last indexed endpoint not exists, this should not occur
200+
// we should delete the endpoint just in case leak
198201
if err = p.bpf.EndpointDelete(&ek); err != nil {
199202
log.Errorf("EndpointDelete failed: %s", err)
200203
goto failed
201204
}
202205
}
206+
} else { // service not exist, we should delete the endpoint
207+
if err = p.bpf.EndpointDelete(&ek); err != nil {
208+
log.Errorf("EndpointDelete failed: %s", err)
209+
goto failed
210+
}
203211
}
204212
}
213+
}
205214

206-
bkDelete.BackendUid = backendUid
207-
if err = p.bpf.BackendDelete(&bkDelete); err != nil {
208-
log.Errorf("BackendDelete failed: %s", err)
209-
goto failed
210-
}
211-
p.hashName.Delete(uid)
215+
bkDelete.BackendUid = backendUid
216+
if err = p.bpf.BackendDelete(&bkDelete); err != nil {
217+
log.Errorf("BackendDelete failed: %s", err)
218+
goto failed
212219
}
220+
p.hashName.Delete(uid)
213221

214222
failed:
215223
return err
@@ -235,41 +243,49 @@ func (p *Processor) deleteFrontendData(id uint32) error {
235243
}
236244

237245
func (p *Processor) removeServiceResource(resources []string) error {
246+
var err error
247+
for _, name := range resources {
248+
p.ServiceCache.DeleteService(name)
249+
if err = p.removeServiceResourceByUid(name); err != nil {
250+
return err
251+
}
252+
}
253+
return err
254+
}
255+
256+
func (p *Processor) removeServiceResourceByUid(name string) error {
238257
var (
239258
err error
240259
skDelete = bpf.ServiceKey{}
241260
svDelete = bpf.ServiceValue{}
242261
ekDelete = bpf.EndpointKey{}
243262
)
244263

245-
for _, name := range resources {
246-
p.ServiceCache.DeleteService(name)
247-
serviceId := p.hashName.StrToNum(name)
248-
skDelete.ServiceId = serviceId
249-
if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil {
250-
if err = p.deleteFrontendData(serviceId); err != nil {
251-
log.Errorf("deleteFrontendData failed: %s", err)
252-
goto failed
253-
}
264+
p.ServiceCache.DeleteService(name)
265+
serviceId := p.hashName.StrToNum(name)
266+
skDelete.ServiceId = serviceId
267+
if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil {
268+
if err = p.deleteFrontendData(serviceId); err != nil {
269+
log.Errorf("deleteFrontendData failed: %s", err)
270+
goto failed
271+
}
254272

255-
if err = p.bpf.ServiceDelete(&skDelete); err != nil {
256-
log.Errorf("ServiceDelete failed: %s", err)
257-
goto failed
258-
}
273+
if err = p.bpf.ServiceDelete(&skDelete); err != nil {
274+
log.Errorf("ServiceDelete failed: %s", err)
275+
goto failed
276+
}
259277

260-
var i uint32
261-
for i = 1; i <= svDelete.EndpointCount; i++ {
262-
ekDelete.ServiceId = serviceId
263-
ekDelete.BackendIndex = i
264-
if err = p.bpf.EndpointDelete(&ekDelete); err != nil {
265-
log.Errorf("EndpointDelete failed: %s", err)
266-
goto failed
267-
}
278+
var i uint32
279+
for i = 1; i <= svDelete.EndpointCount; i++ {
280+
ekDelete.ServiceId = serviceId
281+
ekDelete.BackendIndex = i
282+
if err = p.bpf.EndpointDelete(&ekDelete); err != nil {
283+
log.Errorf("EndpointDelete failed: %s", err)
284+
goto failed
268285
}
269286
}
270-
p.hashName.Delete(name)
271287
}
272-
288+
p.hashName.Delete(name)
273289
failed:
274290
return err
275291
}
@@ -587,10 +603,46 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis
587603
}
588604

589605
_ = p.handleRemovedAddresses(rsp.RemovedResources)
606+
p.compareWorkloadAndService()
590607

591608
return err
592609
}
593610

611+
// When processing the workload's response for the first time,
612+
// fetch the data from the /mnt/workload_hash_name.yaml file
613+
// and compare it with the data in the cache.
614+
func (p *Processor) compareWorkloadAndService() {
615+
var (
616+
bk = bpf.BackendKey{}
617+
bv = bpf.BackendValue{}
618+
)
619+
620+
if kmeshbpf.GetStartType() == kmeshbpf.Normal {
621+
return
622+
}
623+
624+
log.Infof("reload workload config from last start")
625+
kmeshbpf.SetStartType(kmeshbpf.Normal)
626+
for str, num := range p.hashName.strToNum {
627+
if p.WorkloadCache.GetWorkloadByUid(str) == nil && p.ServiceCache.GetService(str) == nil {
628+
log.Debugf("GetWorkloadByUid and GetService nil:%v", str)
629+
630+
bk.BackendUid = num
631+
if err := p.bpf.BackendLookup(&bk, &bv); err == nil {
632+
log.Debugf("Find BackendValue: [%#v] RemoveWorkloadResource", bv)
633+
if err := p.removeWorkloadResourceByUid(str); err != nil {
634+
log.Errorf("RemoveWorkloadResource failed: %v", err)
635+
}
636+
} else {
637+
log.Debugf("RemoveServiceResource")
638+
if err := p.removeServiceResourceByUid(str); err != nil {
639+
log.Errorf("RemoveServiceResource failed: %v", err)
640+
}
641+
}
642+
}
643+
}
644+
}
645+
594646
func (p *Processor) handleAuthorizationTypeResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) error {
595647
if rbac == nil {
596648
return fmt.Errorf("Rbac module uninitialized")

0 commit comments

Comments
 (0)