Add locality loadbalance to kmesh workload mode. #900
base: main
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED.
This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
Force-pushed from e37cf23 to aea4eec
Signed-off-by: derekwin <[email protected]>
// notice: s should be set by lb.GetRoutingPreference()
if len(s) > 0 {
    l.RoutingPreference = s
    l.LbStrictIndex = uint32(len(s))
Not sure I understand this; previously I thought this was a bool flag for strict mode.
In strict mode, only the workloads that exactly match every item in the routePreference are considered. In other words, only the workloads with a priority equal to len(routePreference) are taken into account. We set LbStrictIndex so that the kernel BPF program filters out priorities other than lbStrictIndex in strict mode.
I only looked at the Go code part.
@@ -179,6 +181,7 @@ func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
	wpkDelete = bpf.WorkloadPolicyKey{}
)

log.Warnf("== removeWorkloadFromBpfMap: workload uid: %#v, backendUid: %#v", uid, p.hashName.Hash(uid))
spammy, please use debug
ok
ekDelete := bpf.EndpointKey{
    ServiceId:    serviceId,
    BackendIndex: i,
var j uint32
nit: you do not need to define j here; move it below:
for j := 0; j <= bpf.MaxPrio; j++ {
ok
@@ -290,16 +300,17 @@ func (p *Processor) removeServiceResourceFromBpfMap(svc *workloadapi.Service, na
}

// addWorkloadToService update service & endpoint bpf map when a workload has new bound services
- func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32) error {
+ func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, Prio uint32) error {
Suggested change:
- func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, Prio uint32) error {
+ func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, priority uint32) error {
ok
@@ -359,6 +382,7 @@ func (p *Processor) updateWorkload(workload *workloadapi.Workload) error {
)

uid := p.hashName.Hash(workload.GetUid())
log.Warnf("=in= updateWorkload: workload uid: %#v, backendUid: %#v", workload.GetUid(), uid)
nit: spammy
ok
@@ -401,6 +425,11 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error {
p.WorkloadCache.AddOrUpdateWorkload(workload)
p.storeWorkloadPolicies(workload.GetUid(), workload.GetAuthorizationPolicies())

// update kmesh localityCache
if p.nodeName == workload.GetNode() {
    p.locality.SetLocality(p.nodeName, workload.GetClusterId(), workload.GetNetwork(), workload.GetLocality())
Not sure I understand why we only call SetLocality when p.nodeName == workload.GetNode().
Because the kmesh process retrieves information about all workloads from xDS, and the locality associated with the kmesh process is carried by the workloads on the same node, we only update locality information from workloads whose node name matches the current node's.
newValue.LbPolicy = uint32(lb.GetMode()) // set loadbalance mode
p.locality.SetRoutingPreference(lb.GetRoutingPreference())
p.locality.LbPolicy = newValue.LbPolicy
log.Debugf("lbPolicy:%#v, routingPreference:%#v, strictIndex:%#v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)
Suggested change:
- log.Debugf("lbPolicy:%#v, routingPreference:%#v, strictIndex:%#v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)
+ log.Debugf("lbPolicy:%v, routingPreference:%v, strictIndex:%v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)
ok
return l.isLocalityInfoSet && l.isRoutingPreferenceSet
}

func (l *LocalityCache) CalcuLocalityLBPrio(wl *workloadapi.Workload) uint32 {
Not sure I understand this: from my perspective, the workload priority has to do with the originating node's info. If the node and the workload both reside in the same RoutingPreference [], the workload has the highest priority.
Yes, in the earliest design version, it was designed this way. During a kmesh community meeting, you suggested considering the current design format. Given that the kmesh process exists on every node as a DaemonSet, it is possible to pre-calculate the priority of each workload within a service relative to that node on each node. This way, when subsequent traffic arrives and accesses the service, the backend to serve can be selected based on the pre-calculated priorities of the workloads for that service.
Signed-off-by: derekwin <[email protected]>
After resolving the issue mentioned in #910, functional testing was conducted on the current code, and the functionality was found to be working correctly.
Will try to review the control-plane part later.
@@ -10,6 +10,8 @@
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)
#define MIN_PRIO 6
redundant with PRIO_COUNT
endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
endpoint_k.prio = MIN_PRIO; // for random handling, all endpoints are saved in MIN_PRIO

rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[MIN_PRIO] + 1;
We can return fast by checking for 0 here, since you removed that check from the caller.
short circuit
__u32 prio_endpoint_count[PRIO_COUNT]; // endpoint count of current service with prio
__u32 lb_policy;      // load balancing algorithm, currently supports random algorithm, locality loadbalance
                      // Failover/strict mode
__u32 lb_strict_prio; // for failover strict mode
Seems this is not needed; the LB_POLICY_FAILOVER and LB_POLICY_STRICT values are already supported.
}
    return 0; // find the backend successfully
}
if (is_strict && match_prio == service_v->lb_strict_prio) { // only match lb strict index
Can you explain a little bit more? I am confused about this.
    return 0; // find the backend successfully
}
if (is_strict && match_prio == service_v->lb_strict_prio) { // only match lb strict index
    return -ENOENT;
This is not aligned with ztunnel behavior: this return value would make the traffic go through as if directly accessing the clusterIP, rather than blocking the traffic.
@@ -388,7 +388,7 @@ func TestServer_dumpWorkloadBpfMap(t *testing.T) {
	{ServiceId: 1}, {ServiceId: 2},
}
testServiceVals := []bpfcache.ServiceValue{
- {EndpointCount: 1234}, {EndpointCount: 5678},
+ {EndpointCount: [7]uint32{1234, 1234, 1234, 1234, 1234, 1234, 1234}}, {EndpointCount: [7]uint32{5678, 5678, 5678, 5678, 5678, 5678, 5678}},
Not sure I understand this.
@@ -179,6 +181,7 @@ func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
	wpkDelete = bpf.WorkloadPolicyKey{}
)

log.Debugf("removeWorkloadFromBpfMap: workload %s, backendUid %d", uid, p.hashName.Hash(uid))
nit: move after L185
We do need test coverage for the service lb policy update.
log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err)
return err
if sv.LbPolicy == LbPolicyRandom { // random mode
    if err = p.addWorkloadToService(&sk, &sv, workloadId, bpf.MinPrio); err != nil { // in random mode, we save all workloads to MinPrio
I would suggest storing with the highest prio.
What type of PR is this?
/kind enhancement
This PR is not ready to be merged; some features are not working properly.
1. The priority of the service Pod written to the BPF map is observed to be correct in user space, but the behavior of locality load balancing in kernel space is random. (This issue does not exist in the test version of the code at this link.)
2. The workload code has undergone significant changes in the past one to two months. The current PR's code attempts to adapt to the new workload logic as much as possible. However, many redundant deletion behaviors have been observed; these parts of the code need to be optimized in conjunction with the existing workload logic.

This PR is now OK.
But I found a bug when the new versions of kubectl and Istio work with Kmesh (#910).