Add locality loadbalance to kmesh workload mode. #900

Open · wants to merge 5 commits into main

Conversation

@derekwin (Contributor) commented on Sep 26, 2024:

What type of PR is this?
/kind enhancement

This PR is not ready to be merged; some features are not working properly.
1. The priority of the service Pod written to the BPF map is observed to be correct in user space, but the behavior of locality load balancing in kernel space is random. (This issue does not exist in the test version of the code at this link.)
2. The code for the workload has undergone significant changes in the past one to two months. The current PR's code attempts to adapt to the new logic of the workload as much as possible. However, a lot of redundant deletion behaviors have been observed. These parts of the code need to be optimized in conjunction with the existing workload logic.

This PR is now OK.

However, I found a bug when the new versions of kubectl and Istio work with Kmesh (#910).

This PR's enhancement needs those new versions of kubectl and Istio.

@kmesh-bot added the kind/enhancement (New feature or request) label on Sep 26, 2024.
@kmesh-bot (Collaborator) commented:

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign kevin-wangzefeng for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

// notice: s should set by lb.GetRoutingPreference()
if len(s) > 0 {
l.RoutingPreference = s
l.LbStrictIndex = uint32(len(s))
@hzxuzhonghu (Member) commented on Sep 29, 2024:

Not sure I understand this; previously I thought this was a bool flag for strict mode.

@derekwin (Contributor, Author) replied:

In strict mode, only the workloads that exactly match every item in the routePreference are considered. In other words, only the workloads with a priority equal to len(routePreference) are taken into account. We set LbStrictIndex so that the kernel BPF program filters out priorities other than lbStrictIndex in strict mode.
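
To make that concrete, here is a minimal user-space sketch of the relationship (hypothetical names, scopes simplified to strings; not this PR's code):

// Hypothetical sketch, not part of this PR: a backend passes the strict
// filter only when its pre-computed priority equals the strict index,
// i.e. it matched every routing-preference scope.
func passesStrictFilter(workloadPrio uint32, routingPreference []string) bool {
	lbStrictIndex := uint32(len(routingPreference))
	return workloadPrio == lbStrictIndex
}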

@hzxuzhonghu (Member) left a comment:

I only looked at the Go code part.

@@ -179,6 +181,7 @@ func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
wpkDelete = bpf.WorkloadPolicyKey{}
)

log.Warnf("== removeWorkloadFromBpfMap: workload uid: %#v, backendUid: %#v", uid, p.hashName.Hash(uid))
Member:

spammy, please use debug

@derekwin (Contributor, Author) replied:

ok

ekDelete := bpf.EndpointKey{
ServiceId: serviceId,
BackendIndex: i,
var j uint32
Member:

nit: you do not need to define j here; move it below:

for j:=0; j <= bpf.MaxPrio; j++ {

@derekwin (Contributor, Author) replied:

ok

@@ -290,16 +300,17 @@ func (p *Processor) removeServiceResourceFromBpfMap(svc *workloadapi.Service, na
}

// addWorkloadToService update service & endpoint bpf map when a workload has new bound services
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32) error {
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, Prio uint32) error {
Member:

Suggested change
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, Prio uint32) error {
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, priority uint32) error {

@derekwin (Contributor, Author) replied:

ok

@@ -359,6 +382,7 @@ func (p *Processor) updateWorkload(workload *workloadapi.Workload) error {
)

uid := p.hashName.Hash(workload.GetUid())
log.Warnf("=in= updateWorkload: workload uid: %#v, backendUid: %#v", workload.GetUid(), uid)
Member:

nit: spammy

@derekwin (Contributor, Author) replied:

ok

@@ -401,6 +425,11 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error {
p.WorkloadCache.AddOrUpdateWorkload(workload)
p.storeWorkloadPolicies(workload.GetUid(), workload.GetAuthorizationPolicies())

// update kmesh localityCache
if p.nodeName == workload.GetNode() {
p.locality.SetLocality(p.nodeName, workload.GetClusterId(), workload.GetNetwork(), workload.GetLocality())
Member:

Not sure I understand why SetLocality is only called when p.nodeName == workload.GetNode().

@derekwin (Contributor, Author) replied:

Because the kmesh process retrieves information about all workloads from xDS, and the locality associated with the kmesh process should be carried by the workloads on the same node, we only update locality information from workloads that have the same nodeName as the current one.

newValue.LbPolicy = uint32(lb.GetMode()) // set loadbalance mode
p.locality.SetRoutingPreference(lb.GetRoutingPreference())
p.locality.LbPolicy = newValue.LbPolicy
log.Debugf("lbPolicy:%#v, routingPreference:%#v, strictIndex:%#v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)
Member:

Suggested change
log.Debugf("lbPolicy:%#v, routingPreference:%#v, strictIndex:%#v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)
log.Debugf("lbPolicy:%v, routingPreference:%v, strictIndex:%v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)

@derekwin (Contributor, Author) replied:

ok

return l.isLocalityInfoSet && l.isRoutingPreferenceSet
}

func (l *LocalityCache) CalcuLocalityLBPrio(wl *workloadapi.Workload) uint32 {
Member:

Not sure I understand this. From my perspective, the workload priority has to do with the originating node's info.

If the node and the workload both reside in the same RoutingPreference [], the workload has the highest priority.

@derekwin (Contributor, Author) replied:

Yes, in the earliest design version, it was designed this way. During a kmesh community meeting, you suggested considering the current design format. Given that the kmesh process exists on every node as a DaemonSet, it is possible to pre-calculate the priority of each workload within a service relative to that node on each node. This way, when subsequent traffic arrives and accesses the service, the backend to serve can be selected based on the pre-calculated priorities of the workloads for that service.
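
A rough sketch of that pre-computation (assumed names and a simplified locality representation; not the PR's exact CalcuLocalityLBPrio):

// Hypothetical sketch of the per-node pre-computation, with assumed names
// and localities simplified to scope -> value maps. The priority is the
// number of consecutive routing-preference scopes on which the workload
// matches the local node.
func calcLocalityPrio(nodeLocality, workloadLocality map[string]string, routingPreference []string) uint32 {
	var prio uint32
	for _, scope := range routingPreference {
		nodeVal, ok := nodeLocality[scope]
		if !ok || nodeVal == "" || nodeVal != workloadLocality[scope] {
			break // stop at the first scope that does not match
		}
		prio++
	}
	return prio
}

With this per-node priority stored alongside each endpoint in the BPF map, the kernel program only has to walk endpoints by priority when traffic arrives, instead of comparing localities on the data path.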

@derekwin (Contributor, Author) commented:

After resolving the issue mentioned in #910, functional testing was conducted on the current code, and the functionality was found to be working correctly.

@hzxuzhonghu (Member) left a comment:

Will try to review the control plane part later

@@ -10,6 +10,8 @@
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)
#define MIN_PRIO 6
Member:

redundant with PRIO_COUNT

endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
endpoint_k.prio = MIN_PRIO; // for random handle,all endpoints are saved in MIN_PRIO

rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[MIN_PRIO] + 1;
Member:

We can return fast by checking for 0 here, since you removed that check from the caller.

Member:

short circuit

__u32 prio_endpoint_count[PRIO_COUNT]; // endpoint count of current service with prio
__u32 lb_policy; // load balancing algorithm, currently supports random algorithm, locality loadbalance
// Failover/strict mode
__u32 lb_strict_prio; // for failover strict mode
Member:

Seems this is not needed; the LB_POLICY_FAILOVER and LB_POLICY_STRICT values are supported.

}
return 0; // find the backend successfully
}
if (is_strict && match_prio == service_v->lb_strict_prio) { // only match lb strict index
Member:

Can you explain a little bit more? I am confused about this.

return 0; // find the backend successfully
}
if (is_strict && match_prio == service_v->lb_strict_prio) { // only match lb strict index
return -ENOENT;
Member:

This does not align with ztunnel behavior: this return value would make the traffic go through as if directly accessing the ClusterIP, rather than blocking the traffic.

@@ -388,7 +388,7 @@ func TestServer_dumpWorkloadBpfMap(t *testing.T) {
{ServiceId: 1}, {ServiceId: 2},
}
testServiceVals := []bpfcache.ServiceValue{
{EndpointCount: 1234}, {EndpointCount: 5678},
{EndpointCount: [7]uint32{1234, 1234, 1234, 1234, 1234, 1234, 1234}}, {EndpointCount: [7]uint32{5678, 5678, 5678, 5678, 5678, 5678, 5678}},
Member:

Not sure I understand.

@@ -179,6 +181,7 @@ func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
wpkDelete = bpf.WorkloadPolicyKey{}
)

log.Debugf("removeWorkloadFromBpfMap: workload %s, backendUid %d", uid, p.hashName.Hash(uid))
Member:

nit: move after L185

Member:

We do need test coverage for service lb policy updates.

log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err)
return err
if sv.LbPolicy == LbPolicyRandom { // random mode
if err = p.addWorkloadToService(&sk, &sv, workloadId, bpf.MinPrio); err != nil { // In random mode, we save all workload to minprio
Member:

I would suggest storing with the highest prio.

Labels: kind/enhancement (New feature or request), size/L