Add locality loadbalance to kmesh workload mode. #900
Conversation
Force-pushed from e37cf23 to aea4eec
Signed-off-by: derekwin <[email protected]>
// notice: s should set by lb.GetRoutingPreference()
if len(s) > 0 {
	l.RoutingPreference = s
	l.LbStrictIndex = uint32(len(s))
Not sure I understand this; previously I thought this was a bool flag for strict mode.
In strict mode, only the workloads that exactly match every item in the routePreference are considered. In other words, only the workloads with a priority equal to len(routePreference) are taken into account. We set LbStrictIndex so that the kernel BPF program filters out priorities other than lbStrictIndex in strict mode.
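For illustration only, here is a minimal Go sketch of that rule; the names routingPreference and workloadRank are stand-ins, not the PR's actual fields:

package main

import "fmt"

// strictIndex mirrors the idea above: in strict mode the only acceptable
// priority is the full length of the routing preference list.
func strictIndex(routingPreference []string) uint32 {
	return uint32(len(routingPreference))
}

// eligibleInStrictMode reports whether a workload whose rank is the number
// of leading scopes it matched may receive traffic in strict mode.
func eligibleInStrictMode(workloadRank uint32, routingPreference []string) bool {
	return workloadRank == strictIndex(routingPreference)
}

func main() {
	rp := []string{"REGION", "ZONE", "SUBZONE"}
	fmt.Println(eligibleInStrictMode(3, rp)) // true: matched every item
	fmt.Println(eligibleInStrictMode(2, rp)) // false: filtered out by the BPF program
}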
I only looked at the Go code part.
@@ -179,6 +181,7 @@ func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
	wpkDelete = bpf.WorkloadPolicyKey{}
)

log.Warnf("== removeWorkloadFromBpfMap: workload uid: %#v, backendUid: %#v", uid, p.hashName.Hash(uid))
spammy, please use debug
ok
ekDelete := bpf.EndpointKey{
	ServiceId:    serviceId,
	BackendIndex: i,
var j uint32
nit: you do not need to define j here, move it below:
for j := 0; j <= bpf.MaxPrio; j++ {
ok
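To make the shape of this concrete: because endpoints are now keyed by (service, priority, index), removal has to walk every priority group. The sketch below is illustrative only; EndpointKey follows the diff above, while the MaxPrio value, endpointCount, and deleteEndpoint are assumptions.

package bpfcache

// EndpointKey mirrors the key layout in the diff above.
type EndpointKey struct {
	ServiceId    uint32
	Prio         uint32
	BackendIndex uint32
}

// MaxPrio is assumed to be 6, i.e. seven priority groups 0..6.
const MaxPrio uint32 = 6

// deleteServiceEndpoints walks every priority group and removes each
// endpoint key; the loop index lives inside the for statement, per the nit.
func deleteServiceEndpoints(serviceId uint32, endpointCount [MaxPrio + 1]uint32,
	deleteEndpoint func(EndpointKey) error) error {
	for prio := uint32(0); prio <= MaxPrio; prio++ {
		for i := uint32(1); i <= endpointCount[prio]; i++ {
			ek := EndpointKey{ServiceId: serviceId, Prio: prio, BackendIndex: i}
			if err := deleteEndpoint(ek); err != nil {
				return err
			}
		}
	}
	return nil
}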
@@ -290,16 +300,17 @@ func (p *Processor) removeServiceResourceFromBpfMap(svc *workloadapi.Service, na
}

// addWorkloadToService update service & endpoint bpf map when a workload has new bound services
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32) error {
func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, Prio uint32) error {
Suggested change:
- func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, Prio uint32) error {
+ func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, priority uint32) error {
ok
@@ -359,6 +382,7 @@ func (p *Processor) updateWorkload(workload *workloadapi.Workload) error {
)

uid := p.hashName.Hash(workload.GetUid())
log.Warnf("=in= updateWorkload: workload uid: %#v, backendUid: %#v", workload.GetUid(), uid)
nit: spammy
ok
newValue.LbPolicy = uint32(lb.GetMode()) // set loadbalance mode
p.locality.SetRoutingPreference(lb.GetRoutingPreference())
p.locality.LbPolicy = newValue.LbPolicy
log.Debugf("lbPolicy:%#v, routingPreference:%#v, strictIndex:%#v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)
log.Debugf("lbPolicy:%#v, routingPreference:%#v, strictIndex:%#v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex) | |
log.Debugf("lbPolicy:%v, routingPreference:%v, strictIndex:%v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex) |
ok
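For reference, the practical difference between the two verbs (self-contained example; the real field is a slice of workloadapi.LoadBalancing_Scope values, strings are used here only to show the output shapes):

package main

import "fmt"

func main() {
	routingPreference := []string{"REGION", "ZONE"}
	fmt.Printf("%#v\n", routingPreference) // []string{"REGION", "ZONE"} (Go-syntax form, verbose in logs)
	fmt.Printf("%v\n", routingPreference)  // [REGION ZONE] (compact default form)
}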
Force-pushed from 606db1a to 1be65fb
After resolving the issue mentioned in #910, functional testing was conducted on the current code, and the functionality was found to be working correctly.
Will try to review the control plane part later
@@ -10,6 +10,8 @@
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)
#define MIN_PRIO 6
redundant with PRIO_COUNT
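The redundancy, restated as a Go-side sketch (PRIO_COUNT is not visible in this hunk, so the value 7 is an assumption): the lowest priority can be derived instead of being defined as a second independent constant.

package bpfcache

// PrioCount mirrors PRIO_COUNT on the BPF side; 7 is assumed here.
const PrioCount = 7

// MinPrio is the lowest-priority group; deriving it from PrioCount keeps
// the two constants from drifting apart.
const MinPrio = PrioCount - 1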
bpf/kmesh/workload/include/service.h (outdated)
endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
endpoint_k.prio = MIN_PRIO; // for random handle, all endpoints are saved in MIN_PRIO

rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[MIN_PRIO] + 1;
We can return fast by checking for 0 here, since you removed that check from the caller.
short circuit
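A small sketch of the short circuit being asked for, written in Go rather than the BPF C of this hunk; pickRandomBackend and its zero return value are illustrative:

package workload

import "math/rand"

// pickRandomBackend returns a 1-based backend index within a priority group,
// or 0 when the group is empty. Checking for 0 first is the short circuit:
// it avoids feeding a zero divisor into the modulo and lets the caller bail
// out immediately.
func pickRandomBackend(endpointCount uint32) uint32 {
	if endpointCount == 0 {
		return 0
	}
	return rand.Uint32()%endpointCount + 1
}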
__u32 prio_endpoint_count[PRIO_COUNT]; // endpoint count of current service with prio
__u32 lb_policy;                       // load balancing algorithm, currently supports random algorithm, locality loadbalance Failover/strict mode
__u32 lb_strict_prio;                  // for failover strict mode
Seems this is not needed, since the LB_POLICY_FAILOVER and LB_POLICY_STRICT values are supported.
bpf/kmesh/workload/include/service.h (outdated)
	}
	return 0; // find the backend successfully
}
if (is_strict && match_prio == service_v->lb_strict_prio) { // only match lb strict index
Can you explain a little bit more? I am confused about this.
@@ -388,7 +388,7 @@ func TestServer_dumpWorkloadBpfMap(t *testing.T) {
	{ServiceId: 1}, {ServiceId: 2},
}
testServiceVals := []bpfcache.ServiceValue{
	{EndpointCount: 1234}, {EndpointCount: 5678},
	{EndpointCount: [7]uint32{1234, 1234, 1234, 1234, 1234, 1234, 1234}}, {EndpointCount: [7]uint32{5678, 5678, 5678, 5678, 5678, 5678, 5678}},
Not sure I understand.
I think this is just dummy code.
I mean, in this test I do not see you set the routing preference, so why expect 7 priority groups?
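For what it is worth, a test without a routing preference could assert that only a single priority group carries endpoints, roughly like the sketch below. ServiceValue is redefined locally to keep the sketch self-contained; which group random mode uses is exactly the point debated further down, so index 0 is only an assumption.

package bpfcache_test

import "testing"

// ServiceValue mirrors the per-priority layout from the diff above.
type ServiceValue struct {
	EndpointCount [7]uint32
	LbPolicy      uint32
}

func TestServiceValueWithoutRoutingPreference(t *testing.T) {
	sv := ServiceValue{}
	sv.EndpointCount[0] = 1234 // all endpoints land in one group

	nonEmpty := 0
	for _, c := range sv.EndpointCount {
		if c > 0 {
			nonEmpty++
		}
	}
	if nonEmpty != 1 {
		t.Fatalf("expected endpoints in exactly one priority group, got %d", nonEmpty)
	}
}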
@@ -179,6 +181,7 @@ func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
	wpkDelete = bpf.WorkloadPolicyKey{}
)

log.Debugf("removeWorkloadFromBpfMap: workload %s, backendUid %d", uid, p.hashName.Hash(uid))
nit: move after L185
We do need test coverage for the service lb policy update.
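A rough outline of the missing coverage; the policy values and the wiring to the processor are placeholders, only the shape of the cases is suggested:

package workload_test

import "testing"

// Assumed policy values for the sketch; the real constants live in the
// processor package.
const (
	lbPolicyRandom   uint32 = 0
	lbPolicyLocality uint32 = 1
)

func TestServiceLbPolicyUpdate(t *testing.T) {
	cases := []struct {
		name        string
		oldPolicy   uint32
		newPolicy   uint32
		wantRegroup bool // should endpoints move between priority groups?
	}{
		{"random to locality", lbPolicyRandom, lbPolicyLocality, true},
		{"locality back to random", lbPolicyLocality, lbPolicyRandom, true},
		{"policy unchanged", lbPolicyRandom, lbPolicyRandom, false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Skip("placeholder: drive the processor's service update path and check the endpoint map layout")
		})
	}
}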
log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err) | ||
return err | ||
if sv.LbPolicy == LbPolicyRandom { // random mode | ||
if err = p.addWorkloadToService(&sk, &sv, workloadId, bpf.MinPrio); err != nil { // In random mode, we save all workload to minprio |
I would suggest storing with the highest prio.
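In a sketch (the 0-is-best, 6-is-lowest layout is assumed from MIN_PRIO above), the suggestion amounts to picking the other end of the range for random mode:

package bpfcache

// Assumed layout: priority 0 is the best-matching group, MinPrio the lowest.
const (
	HighestPrio uint32 = 0
	MinPrio     uint32 = 6
)

// randomModePrio is the group that random-mode endpoints are stored in.
// The diff uses MinPrio; the review suggests HighestPrio instead (one
// plausible upside: lookups can then always start from group 0).
func randomModePrio() uint32 {
	return HighestPrio
}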
if p.locality.CanLocalityLB() {
	prio := p.locality.CalcuLocalityLBPrio(workload)
	if err = p.addWorkloadToService(&sk, &sv, workloadId, prio); err != nil {
		log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err)
log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err) | |
log.Errorf("addWorkloadToService workload %d service %d pirority %d failed: %v", workloadId, sk.ServiceId, prio, err) |
	return MinPrio - rank
}

func (l *LocalityCache) SaveToWaitQueue(wl *workloadapi.Workload) {
Doesn't need a lock?
Seems not necessary; the workload processor works serially.
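Under that assumption (a single, serial processor goroutine), a wait queue without a lock could look like the sketch below; the Workload stand-in and the map layout are illustrative, not the PR's actual LocalityCache:

package cache

// Workload stands in for *workloadapi.Workload in this sketch.
type Workload struct {
	Uid string
}

// LocalityCache holds workloads whose locality priority cannot be computed
// yet, e.g. because the local node's locality is not known when they arrive.
type LocalityCache struct {
	// No mutex: the workload processor handles events serially, so this map
	// is only ever touched from a single goroutine.
	waitQueue map[string]*Workload
}

func NewLocalityCache() *LocalityCache {
	return &LocalityCache{waitQueue: make(map[string]*Workload)}
}

func (l *LocalityCache) SaveToWaitQueue(wl *Workload) {
	l.waitQueue[wl.Uid] = wl
}

// DrainWaitQueue hands back everything that was parked and resets the queue.
func (l *LocalityCache) DrainWaitQueue() []*Workload {
	out := make([]*Workload, 0, len(l.waitQueue))
	for _, wl := range l.waitQueue {
		out = append(out, wl)
	}
	l.waitQueue = make(map[string]*Workload)
	return out
}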
Signed-off-by: derekwin <[email protected]>
How are the endpoints deleted in batch? I cannot understand.
		}
	}
}
return uint32(len(rp)) - rank
This method is not right. Keep in mind, we should match rp []workloadapi.LoadBalancing_Scope in order; if one does not match, we should stop. So I am quite sure this result is not right now.
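Concretely, the rank has to be the length of the matching prefix of rp, not the count of scopes that happen to match anywhere in the list. A sketch under that reading; Scope and the matcher are simplified stand-ins for the workloadapi types:

package cache

// Scope stands in for workloadapi.LoadBalancing_Scope in this sketch.
type Scope int

// rankWorkload walks the routing preference in order and stops at the first
// scope that does not match, so the returned rank is the matching prefix
// length. Priority is then len(rp) - rank, making rank == len(rp) the best
// group (priority 0).
func rankWorkload(rp []Scope, matches func(Scope) bool) uint32 {
	var rank uint32
	for _, s := range rp {
		if !matches(s) {
			break // first mismatch ends the scan; later matches must not count
		}
		rank++
	}
	return rank
}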
fixed it
There is no test coverage; I will write a test later to test your function. I feel this function is still not correct.
Signed-off-by: derekwin <[email protected]>
In function
/retest
@derekwin: Cannot trigger testing until a trusted user reviews the PR and leaves an /ok-to-test message. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
/ok-to-test
Signed-off-by: derekwin <[email protected]>
eksDelete := []bpf.EndpointKey{}
backendUids := []uint32{}
for _, ep := range p.EndpointCache.List(serviceId) {
func (p *Processor) updateEndpointOneByOne(serviceId uint32, epsDelete []cache.Endpoint, toLLb bool) error {
epsDelete may look confusing; indeed we are trying to re-organize them.
Using epsUpdate now.
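What the re-organization amounts to, sketched below; the Endpoint shape and the add/delete callbacks are placeholders for the PR's cache and bpf map helpers:

package workload

// Endpoint stands in for the endpoint cache entry in this sketch.
type Endpoint struct {
	ServiceId  uint32
	BackendUid uint32
	Prio       uint32
}

// reorganizeEndpoints moves existing endpoints into the priority group the
// new lb mode calls for: each entry is removed under its old key and added
// back under a freshly computed priority. Nothing is permanently deleted,
// which is why epsUpdate reads better than epsDelete.
func reorganizeEndpoints(epsUpdate []Endpoint, newPrio func(Endpoint) uint32,
	deleteEp func(Endpoint) error, addEp func(Endpoint) error) error {
	for _, ep := range epsUpdate {
		if err := deleteEp(ep); err != nil {
			return err
		}
		ep.Prio = newPrio(ep)
		if err := addEp(ep); err != nil {
			return err
		}
	}
	return nil
}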
	return nil
}

for i := len(epsDelete) - 1; i >= 0; i-- {
nit: please document why we do this in reverse order
updated it
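For readers of this thread, one common reason for the reverse walk with an index-keyed map (stated as an assumption about this PR, not a quote from it): deletion is usually implemented as moving the last endpoint into the freed slot and shrinking the count, and walking the marked indices from the back keeps every not-yet-processed index valid. A self-contained sketch:

package workload

// deleteByIndex removes entry i by moving the last entry into slot i and
// shrinking the slice, mirroring how an index-keyed bpf map is compacted.
func deleteByIndex(eps []uint32, i int) []uint32 {
	eps[i] = eps[len(eps)-1]
	return eps[:len(eps)-1]
}

// removeMarked deletes the entries at the given indices (assumed sorted in
// ascending order). Iterating from the back means the swap-with-last trick
// never moves an element into a position that has already been processed,
// so nothing is skipped or deleted twice.
func removeMarked(eps []uint32, marked []int) []uint32 {
	for i := len(marked) - 1; i >= 0; i-- {
		eps = deleteByIndex(eps, marked[i])
	}
	return eps
}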
LGTM.
Signed-off-by: derekwin <[email protected]>
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: hzxuzhonghu. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.
What type of PR is this?
/kind enhancement
This PR is not ready to be merged; some features are not working properly.
1. The priority of the service Pod written to the BPF map is observed to be correct in user space, but the behavior of locality load balancing in kernel space is random. (This issue does not exist in the test version of the code at this link.)
2. The code for the workload has undergone significant changes in the past one to two months. The current PR's code attempts to adapt to the new workload logic as much as possible. However, a lot of redundant deletion behaviors have been observed; these parts of the code need to be optimized in conjunction with the existing workload logic.
This PR is OK now. But I found a bug when the new versions of kubectl and Istio work with Kmesh (#910).