
Add locality loadbalance to kmesh workload mode. #900


Merged
13 commits merged into kmesh-net:main on Oct 30, 2024

Conversation

@derekwin (Contributor) commented Sep 26, 2024:

What type of PR is this?
/kind enhancement

This PR is not ready to be merged; some features are not working properly.
1. The priority of the service Pod written to the BPF map is observed to be correct in user space, but the behavior of locality load balancing in kernel space is random. (This issue does not exist in the test version of the code at this link.)
2. The code for the workload has undergone significant changes in the past one to two months. The current PR's code attempts to adapt to the new logic of the workload as much as possible. However, a lot of redundant deletion behaviors have been observed. These parts of the code need to be optimized in conjunction with the existing workload logic.

This PR is OK now.

But I found a bug when the new versions of kubectl and istio work with Kmesh (#910).

This PR's enhancement needs the new versions of kubectl and istio.

// notice: s should set by lb.GetRoutingPreference()
if len(s) > 0 {
l.RoutingPreference = s
l.LbStrictIndex = uint32(len(s))
@hzxuzhonghu (Member) commented Sep 29, 2024:

Not sure I understand this; previously I thought this was a bool flag for strict mode.

Contributor Author:

In strict mode, only the workloads that exactly match every item in the routePreference are considered. In other words, only the workloads with a priority equal to len(routePreference) are taken into account. We set LbStrictIndex so that the kernel BPF program filters out priorities other than lbStrictIndex in strict mode.
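For illustration, a minimal Go sketch of this idea with simplified, assumed types (the real code uses workloadapi scope types and the Kmesh locality cache, which differ; only the role of LbStrictIndex is the point):

```go
package sketch

// Illustrative sketch only; RoutingPreference is a []string here instead of
// []workloadapi.LoadBalancing_Scope, and other fields are omitted.
type LocalityCache struct {
	RoutingPreference []string
	LbStrictIndex     uint32 // in strict mode, the only eligible priority
}

// SetRoutingPreference records the preference and derives the strict index:
// a workload that matches every scope is stored at priority len(s).
func (l *LocalityCache) SetRoutingPreference(s []string) {
	if len(s) > 0 {
		l.RoutingPreference = s
		l.LbStrictIndex = uint32(len(s))
	}
}

// eligibleInStrictMode mirrors the kernel-side filter described above:
// only endpoints stored at exactly LbStrictIndex are considered.
func (l *LocalityCache) eligibleInStrictMode(matchPrio uint32) bool {
	return matchPrio == l.LbStrictIndex
}
```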

@hzxuzhonghu (Member) left a review comment:

I only looked at the Go code part.

@@ -179,6 +181,7 @@ func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
wpkDelete = bpf.WorkloadPolicyKey{}
)

log.Warnf("== removeWorkloadFromBpfMap: workload uid: %#v, backendUid: %#v", uid, p.hashName.Hash(uid))
Member:

spammy, please use debug

Contributor Author:

ok

ekDelete := bpf.EndpointKey{
ServiceId: serviceId,
BackendIndex: i,
var j uint32
Member:

nit: you do not need to define j here; move it below:

for j:=0; j <= bpf.MaxPrio; j++ {

Contributor Author:

ok

@@ -290,16 +300,17 @@ func (p *Processor) removeServiceResourceFromBpfMap(svc *workloadapi.Service, na
}

// addWorkloadToService update service & endpoint bpf map when a workload has new bound services
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32) error {
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, Prio uint32) error {
Member:

Suggested change
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, Prio uint32) error {
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, priority uint32) error {

Contributor Author:

ok

@@ -359,6 +382,7 @@ func (p *Processor) updateWorkload(workload *workloadapi.Workload) error {
)

uid := p.hashName.Hash(workload.GetUid())
log.Warnf("=in= updateWorkload: workload uid: %#v, backendUid: %#v", workload.GetUid(), uid)
Member:

nit: spammy

Contributor Author:

ok

newValue.LbPolicy = uint32(lb.GetMode()) // set loadbalance mode
p.locality.SetRoutingPreference(lb.GetRoutingPreference())
p.locality.LbPolicy = newValue.LbPolicy
log.Debugf("lbPolicy:%#v, routingPreference:%#v, strictIndex:%#v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)
Member:

Suggested change
log.Debugf("lbPolicy:%#v, routingPreference:%#v, strictIndex:%#v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)
log.Debugf("lbPolicy:%v, routingPreference:%v, strictIndex:%v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)

Contributor Author:

ok

@derekwin force-pushed the lb-dev-09 branch 4 times, most recently from 606db1a to 1be65fb, on October 11, 2024 02:36

@derekwin (Contributor Author):
After resolving the issue mentioned in #910, functional testing was conducted on the current code, and the functionality was found to be working correctly.

@hzxuzhonghu (Member) left a review comment:

Will try to review the control plane part later

@@ -10,6 +10,8 @@
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)
#define MIN_PRIO 6
Member:

redundant with PRIO_COUNT

endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
endpoint_k.prio = MIN_PRIO; // for random handle,all endpoints are saved in MIN_PRIO

rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[MIN_PRIO] + 1;
Member:

We can return fast by checking for 0 here, since you removed it from the caller.

Member:

short circuit
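As a userspace illustration of the requested check (the actual change is in the BPF C program; the function and parameter names here are hypothetical):

```go
package sketch

import "math/rand"

// pickRandomBackendIndex returns a 1-based backend index for the given
// priority bucket, short-circuiting when the bucket is empty so the caller
// never computes a modulo by zero.
func pickRandomBackendIndex(endpointCount uint32) (uint32, bool) {
	if endpointCount == 0 {
		return 0, false // no endpoints at this priority: fail fast
	}
	return rand.Uint32()%endpointCount + 1, true
}
```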

__u32 prio_endpoint_count[PRIO_COUNT]; // endpoint count of current service with prio
__u32 lb_policy; // load balancing algorithm, currently supports random algorithm, locality loadbalance
// Failover/strict mode
__u32 lb_strict_prio; // for failover strict mode
Member:

Seems this is not needed; LB_POLICY_FAILOVER and LB_POLICY_STRICT values are supported.

}
return 0; // find the backend successfully
}
if (is_strict && match_prio == service_v->lb_strict_prio) { // only match lb strict index
Member:

Can you explain a little bit more? I am confused about this.

@@ -388,7 +388,7 @@ func TestServer_dumpWorkloadBpfMap(t *testing.T) {
{ServiceId: 1}, {ServiceId: 2},
}
testServiceVals := []bpfcache.ServiceValue{
{EndpointCount: 1234}, {EndpointCount: 5678},
{EndpointCount: [7]uint32{1234, 1234, 1234, 1234, 1234, 1234, 1234}}, {EndpointCount: [7]uint32{5678, 5678, 5678, 5678, 5678, 5678, 5678}},
Member:

Not sure I understand.

Contributor Author:

I think this is just dummy code.

Member:

I mean, in this test I do not see you set the routing preference, so why expect 7 priority groups?
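For context, a minimal sketch of what the field change in this test implies, assuming seven priority buckets (bucket 0 through MIN_PRIO 6) and a simplified ServiceValue: the old scalar endpoint count becomes a per-priority array whose sum is the total.

```go
package sketch

// Illustrative sketch only; the real ServiceValue carries more fields.
const PrioCount = 7 // assumed: priority buckets 0..MIN_PRIO (6)

type ServiceValue struct {
	EndpointCount [PrioCount]uint32 // endpoints stored per priority bucket
}

// totalEndpoints recovers the old scalar endpoint count by summing buckets.
func totalEndpoints(sv ServiceValue) uint32 {
	var total uint32
	for _, c := range sv.EndpointCount {
		total += c
	}
	return total
}
```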

@@ -179,6 +181,7 @@ func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
wpkDelete = bpf.WorkloadPolicyKey{}
)

log.Debugf("removeWorkloadFromBpfMap: workload %s, backendUid %d", uid, p.hashName.Hash(uid))
Member:

nit: move after L185

Member:

We do need test coverage for the service lb policy update.

log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err)
return err
if sv.LbPolicy == LbPolicyRandom { // random mode
if err = p.addWorkloadToService(&sk, &sv, workloadId, bpf.MinPrio); err != nil { // In random mode, we save all workload to minprio
Member:

I would suggest storing them with the highest prio.

if p.locality.CanLocalityLB() {
prio := p.locality.CalcuLocalityLBPrio(workload)
if err = p.addWorkloadToService(&sk, &sv, workloadId, prio); err != nil {
log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err)
Member:

Suggested change
log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err)
log.Errorf("addWorkloadToService workload %d service %d priority %d failed: %v", workloadId, sk.ServiceId, prio, err)

return MinPrio - rank
}

func (l *LocalityCache) SaveToWaitQueue(wl *workloadapi.Workload) {
Member:

Doesn't need a lock?

Member:

Seems not necessary; the workload processor works serially.

Signed-off-by: derekwin <[email protected]>
@hzxuzhonghu (Member):

> When trying to delete multiple endpoints that all belong to the same service, the BackendIndex dynamically changes, leading to errors.

How could we delete endpoints belonging to the same service? The caller is actually deleting a workload.

> When updating a service, it's likely that the associated endpoints will be deleted in batches. We should not rule out the possibility of batch deletions in the future. Additionally, the function name deleteEndpointRecords suggests a general operation and should have a more robust implementation.

How are the endpoints deleted in batch? I cannot understand.

}
}
}
return uint32(len(rp)) - rank
Member:

This method is not right. Keep in mind, we should match rp []workloadapi.LoadBalancing_Scope in order; if one does not match, we should stop.

So I am quite sure this result is not right now.

Contributor Author:

fixed it
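For reference, a minimal sketch of the in-order matching the reviewer describes, with simplified assumed types (plain strings instead of workloadapi.LoadBalancing_Scope; the map-based locality lookup is also an assumption). Under the revised formula, a full match yields bucket 0.

```go
package sketch

// calcLocalityLBPrio walks the routing-preference scopes in order and stops
// at the first scope where the local node and the workload disagree. With
// rank = number of leading scopes that match, the bucket returned is
// len(rp) - rank, so a workload matching every scope lands in bucket 0.
func calcLocalityLBPrio(rp []string, nodeInfo, workloadInfo map[string]string) uint32 {
	var rank uint32
	for _, scope := range rp {
		if nodeInfo[scope] == "" || nodeInfo[scope] != workloadInfo[scope] {
			break // first mismatch: stop, per the review comment above
		}
		rank++
	}
	return uint32(len(rp)) - rank
}
```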

Member:

There is no test coverage; I will write a test later to test your function. I have a feeling this function is still not correct.

Signed-off-by: derekwin <[email protected]>
@derekwin (Contributor Author):

> When trying to delete multiple endpoints that all belong to the same service, the BackendIndex dynamically changes, leading to errors.
>
> How could we delete endpoints belonging to the same service? The caller is actually deleting a workload.
>
> When updating a service, it's likely that the associated endpoints will be deleted in batches. We should not rule out the possibility of batch deletions in the future. Additionally, the function name deleteEndpointRecords suggests a general operation and should have a more robust implementation.
>
> How are the endpoints deleted in batch? I cannot understand.

In the function func (p *Processor) updateEndpoint(serviceId uint32, toLLb bool) [line 500], we collect all endpoints that need to be deleted and delete them in batch using deleteEndpointRecords [line 520].
P.S. In the previous meeting, we agreed on the plan to update the endpoints one by one. However, during the subsequent implementation, I found that deleting one endpoint causes the indices of the other endpoints to change, leading to confusion during deletion. Therefore, I believe we should delete all the endpoints to be removed at once and then re-add each new endpoint.
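A hedged sketch of why index-based deletion is order sensitive, assuming the freed slot is backfilled from the last index (a common compaction scheme; the helper names here are hypothetical, not the PR's deleteEndpointRecords):

```go
package sketch

// deleteByIndexes removes the endpoints at the given 1-based indexes.
// Assumption: deleteAt(i) moves the endpoint at the current last index into
// slot i and shrinks the count, so deleting in descending index order keeps
// every not-yet-deleted target index valid; ascending order would not.
func deleteByIndexes(ascIndexes []uint32, deleteAt func(idx uint32) error) error {
	for i := len(ascIndexes) - 1; i >= 0; i-- {
		if err := deleteAt(ascIndexes[i]); err != nil {
			return err
		}
	}
	return nil
}
```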

@derekwin (Contributor Author):

/retest

@kmesh-bot (Collaborator):

@derekwin: Cannot trigger testing until a trusted user reviews the PR and leaves an /ok-to-test message.

In response to this:

/retest

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@hzxuzhonghu (Member):

/ok-to-test

@hzxuzhonghu changed the title from "WIP: Add locality loadbalance to kmesh workload mode." to "Add locality loadbalance to kmesh workload mode." on Oct 29, 2024
eksDelete := []bpf.EndpointKey{}
backendUids := []uint32{}
for _, ep := range p.EndpointCache.List(serviceId) {
func (p *Processor) updateEndpointOneByOne(serviceId uint32, epsDelete []cache.Endpoint, toLLb bool) error {
Member:

epsDelete may look confusing; indeed, we are trying to reorganize them.

Contributor Author:

using epsUpdate now

return nil
}

for i := len(epsDelete) - 1; i >= 0; i-- {
Member:

nit: please document why we do this in reverse order

Contributor Author:

updated it

hzxuzhonghu previously approved these changes on Oct 29, 2024

@hzxuzhonghu (Member) left a review comment:

LGTM.

Signed-off-by: derekwin <[email protected]>
@hzxuzhonghu (Member) left a review comment:

/lgtm

@kmesh-bot (Collaborator):

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: hzxuzhonghu

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@kmesh-bot kmesh-bot merged commit 6e56261 into kmesh-net:main Oct 30, 2024
9 checks passed