Skip to content

Commit 078ec30

Browse files
authored
Add configurable QPS and burst settings for kube API client (#2411)
* Add configurable QPS and burst settings for kube API client Introduce new flags to configure `QPS` and `Burst` for the Kubernetes API client, enabling better control over API rate limits. Signed-off-by: R.K <[email protected]> * Set a token bucket rate limiter for Kubernetes client Replaced direct QPS and Burst configuration with a token bucket rate limiter using Kubernetes client-go's flowcontrol package. Signed-off-by: R.K <[email protected]> * Reorganize import for flowcontrol in main.go Signed-off-by: R.K <[email protected]> --------- Signed-off-by: R.K <[email protected]>
1 parent 778cd72 commit 078ec30

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

cmd/training-operator.v1/main.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3131
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3232
_ "k8s.io/client-go/plugin/pkg/client/auth"
33+
"k8s.io/client-go/util/flowcontrol"
3334
ctrl "sigs.k8s.io/controller-runtime"
3435
"sigs.k8s.io/controller-runtime/pkg/cache"
3536
"sigs.k8s.io/controller-runtime/pkg/healthz"
@@ -81,6 +82,8 @@ func main() {
8182
var webhookServerPort int
8283
var webhookServiceName string
8384
var webhookSecretName string
85+
var clientQps int
86+
var clientBurst int
8487

8588
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
8689
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
@@ -95,7 +98,8 @@ func main() {
9598
flag.StringVar(&namespace, "namespace", os.Getenv(EnvKubeflowNamespace), "The namespace to monitor kubeflow jobs. If unset, it monitors all namespaces cluster-wide."+
9699
"If set, it only monitors kubeflow jobs in the given namespace.")
97100
flag.IntVar(&controllerThreads, "controller-threads", 1, "Number of worker threads used by the controller.")
98-
101+
flag.IntVar(&clientQps, "kube-api-qps", 20, "QPS indicates the maximum QPS to the master from this client.")
102+
flag.IntVar(&clientBurst, "kube-api-burst", 30, "Maximum burst for throttle.")
99103
// PyTorch related flags
100104
flag.StringVar(&config.Config.PyTorchInitContainerImage, "pytorch-init-container-image",
101105
config.PyTorchInitContainerImageDefault, "The image for pytorch init container")
@@ -131,7 +135,10 @@ func main() {
131135
}
132136
}
133137

134-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
138+
cfg := ctrl.GetConfigOrDie()
139+
cfg.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(clientQps), clientBurst)
140+
141+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
135142
Scheme: scheme,
136143
Metrics: metricsserver.Options{
137144
BindAddress: metricsAddr,

0 commit comments

Comments
 (0)