Commit d51ea1d

pr feedback
Signed-off-by: Geoff Flarity <[email protected]>
1 parent: 5230fcc

20 files changed: +334 -185 lines

operator/e2e_testing/setup/helm.go renamed to operator/e2e/setup/helm.go

Lines changed: 2 additions & 2 deletions

```diff
@@ -24,7 +24,7 @@ import (
 	"path/filepath"
 	"strings"
 
-	"github.com/sirupsen/logrus"
+	"github.com/ai-dynamo/grove/operator/e2e/utils"
 	"helm.sh/helm/v3/pkg/action"
 	"helm.sh/helm/v3/pkg/chart/loader"
 	"helm.sh/helm/v3/pkg/cli"
@@ -57,7 +57,7 @@ type HelmInstallConfig struct {
 	// HelmLoggerFunc is called for Helm operation logging.
 	HelmLoggerFunc func(format string, v ...interface{})
 	// Logger is the full logger for component operations.
-	Logger *logrus.Logger
+	Logger *utils.Logger
 	// RepoURL is the base URL of the Helm repository (optional, for direct chart downloads).
 	RepoURL string
 }
```
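The hunks in this commit exercise a small surface on the new utils.Logger type: GetLevel, WriterLevel, NewTestLogger, the DebugLevel/InfoLevel constants, and the usual leveled printers (Debug, Debugf, Info, Warnf). The wrapper itself is not part of this diff; a minimal sketch of one shape it could take, assuming it delegates to logrus (the field names, level values, and mapping below are assumptions, not the committed code):

```go
package utils

import (
	"io"

	"github.com/sirupsen/logrus"
)

// Level is the wrapper's own level type (assumed), so e2e code stops
// importing logrus directly.
type Level uint32

const (
	DebugLevel Level = iota
	InfoLevel
)

// Logger embeds logrus, which promotes Debug, Debugf, Info, Warnf, etc.
type Logger struct {
	*logrus.Logger
	level Level
}

// NewTestLogger builds a logger at the given level.
func NewTestLogger(level Level) *Logger {
	inner := logrus.New()
	inner.SetLevel(toLogrusLevel(level))
	return &Logger{Logger: inner, level: level}
}

// GetLevel shadows the promoted logrus method and reports the wrapper's level.
func (l *Logger) GetLevel() Level { return l.level }

// WriterLevel shadows the promoted method so call sites can pass utils.Level;
// logrus's pipe writer does the actual line-by-line logging.
func (l *Logger) WriterLevel(level Level) *io.PipeWriter {
	return l.Logger.WriterLevel(toLogrusLevel(level))
}

// toLogrusLevel maps the wrapper's levels onto logrus levels.
func toLogrusLevel(level Level) logrus.Level {
	if level == DebugLevel {
		return logrus.DebugLevel
	}
	return logrus.InfoLevel
}
```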

operator/e2e_testing/setup/k8s_clusters.go renamed to operator/e2e/setup/k8s_clusters.go

Lines changed: 11 additions & 10 deletions

```diff
@@ -24,7 +24,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/ai-dynamo/grove/operator/e2e_testing/utils"
+	"github.com/ai-dynamo/grove/operator/e2e/utils"
 	"github.com/docker/docker/api/types/container"
 	dockerclient "github.com/docker/docker/client"
 	"github.com/k3d-io/k3d/v5/pkg/client"
@@ -95,7 +95,7 @@ func DefaultClusterConfig() ClusterConfig {
 }
 
 // SetupCompleteK3DCluster creates a complete k3d cluster with Grove, Kai Scheduler, and NVIDIA GPU Operator
-func SetupCompleteK3DCluster(ctx context.Context, cfg ClusterConfig, skaffoldYAMLPath string, logger *logrus.Logger) (*rest.Config, func(), error) {
+func SetupCompleteK3DCluster(ctx context.Context, cfg ClusterConfig, skaffoldYAMLPath string, logger *utils.Logger) (*rest.Config, func(), error) {
 	restConfig, cleanup, err := SetupK3DCluster(ctx, cfg, logger)
 	if err != nil {
 		return nil, cleanup, err
@@ -216,9 +216,10 @@ func SetupCompleteK3DCluster(ctx context.Context, cfg ClusterConfig, skaffoldYAM
 }
 
 // SetupK3DCluster creates a k3d cluster and returns a REST config
-func SetupK3DCluster(ctx context.Context, cfg ClusterConfig, logger *logrus.Logger) (*rest.Config, func(), error) {
+func SetupK3DCluster(ctx context.Context, cfg ClusterConfig, logger *utils.Logger) (*rest.Config, func(), error) {
 	// k3d is very verbose, we don't want the INFO level logs unless the logger is set to DEBUG
-	if logger.GetLevel() == logrus.DebugLevel {
+	// k3d uses logrus internally, so we need to convert our level to logrus level
+	if logger.GetLevel() == utils.DebugLevel {
 		k3dlogger.Log().SetLevel(logrus.DebugLevel)
 	} else {
 		k3dlogger.Log().SetLevel(logrus.ErrorLevel)
@@ -336,7 +337,7 @@ func SetupK3DCluster(ctx context.Context, cfg ClusterConfig, logger *logrus.Logg
 }
 
 // InstallCoreComponents installs the core components (Grove via Skaffold, Kai Scheduler and NVIDIA GPU Operator via Helm)
-func InstallCoreComponents(ctx context.Context, restConfig *rest.Config, kaiConfig *HelmInstallConfig, nvidiaConfig *HelmInstallConfig, skaffoldYAMLPath string, registryPort string, logger *logrus.Logger) error {
+func InstallCoreComponents(ctx context.Context, restConfig *rest.Config, kaiConfig *HelmInstallConfig, nvidiaConfig *HelmInstallConfig, skaffoldYAMLPath string, registryPort string, logger *utils.Logger) error {
 	// use wait group to wait for all installations to complete
 	var wg sync.WaitGroup
 
@@ -443,7 +444,7 @@ func InstallCoreComponents(ctx context.Context, restConfig *rest.Config, kaiConf
 }
 
 // retryInstallation retries an installation function up to maxRetries times with a delay between attempts
-func retryInstallation(installFunc func() error, componentName string, maxRetries int, retryDelay time.Duration, logger *logrus.Logger) error {
+func retryInstallation(installFunc func() error, componentName string, maxRetries int, retryDelay time.Duration, logger *utils.Logger) error {
 	var lastErr error
 
 	for attempt := 1; attempt <= maxRetries; attempt++ {
@@ -481,7 +482,7 @@ func retryInstallation(installFunc func() error, componentName string, maxRetrie
 // 3. Deletes the not ready node from Kubernetes
 // 4. Finds and restarts the corresponding Docker container (node names match container names exactly)
 // 5. The restarted container will rejoin the cluster as a new node
-func StartNodeMonitoring(ctx context.Context, clientset *kubernetes.Clientset, logger *logrus.Logger) func() {
+func StartNodeMonitoring(ctx context.Context, clientset *kubernetes.Clientset, logger *utils.Logger) func() {
 	logger.Debug("🔍 Starting node monitoring for not ready nodes...")
 
 	// Create a context that can be cancelled to stop the monitoring
@@ -512,7 +513,7 @@ func StartNodeMonitoring(ctx context.Context, clientset *kubernetes.Clientset, l
 }
 
 // checkAndReplaceNotReadyNodes checks for nodes that are not ready and replaces them
-func checkAndReplaceNotReadyNodes(ctx context.Context, clientset *kubernetes.Clientset, logger *logrus.Logger) error {
+func checkAndReplaceNotReadyNodes(ctx context.Context, clientset *kubernetes.Clientset, logger *utils.Logger) error {
 	// List all nodes
 	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
 	if err != nil {
@@ -554,7 +555,7 @@ func isNodeReady(node *v1.Node) bool {
 }
 
 // replaceNotReadyNode handles the process of replacing a not ready node
-func replaceNotReadyNode(ctx context.Context, node *v1.Node, clientset *kubernetes.Clientset, logger *logrus.Logger) error {
+func replaceNotReadyNode(ctx context.Context, node *v1.Node, clientset *kubernetes.Clientset, logger *utils.Logger) error {
 	nodeName := node.Name
 
 	// Step 1: Delete the node from Kubernetes
@@ -573,7 +574,7 @@ func replaceNotReadyNode(ctx context.Context, node *v1.Node, clientset *kubernet
 }
 
 // restartNodeContainer finds and restarts the Docker container corresponding to a k3d node
-func restartNodeContainer(ctx context.Context, nodeName string, logger *logrus.Logger) error {
+func restartNodeContainer(ctx context.Context, nodeName string, logger *utils.Logger) error {
 	// Create Docker client
 	dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
 	if err != nil {
```
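The retryInstallation hunk only swaps the logger type, but its doc comment and the visible context lines (var lastErr error, the attempt loop header) pin down its behavior. A sketch consistent with those lines; everything past them is an assumption:

```go
package setup

import (
	"fmt"
	"time"

	"github.com/ai-dynamo/grove/operator/e2e/utils"
)

// retryInstallation retries an installation function up to maxRetries times
// with a delay between attempts. Only the first two lines of the body appear
// in the diff; the rest is a plausible reconstruction.
func retryInstallation(installFunc func() error, componentName string, maxRetries int, retryDelay time.Duration, logger *utils.Logger) error {
	var lastErr error

	for attempt := 1; attempt <= maxRetries; attempt++ {
		err := installFunc()
		if err == nil {
			return nil
		}
		lastErr = err
		logger.Debugf("attempt %d/%d installing %s failed: %v", attempt, maxRetries, componentName, err)
		if attempt < maxRetries {
			time.Sleep(retryDelay)
		}
	}
	return fmt.Errorf("failed to install %s after %d attempts: %w", componentName, maxRetries, lastErr)
}
```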

operator/e2e_testing/setup/kai_scheduler.go renamed to operator/e2e/setup/kai_scheduler.go

Lines changed: 3 additions & 10 deletions

```diff
@@ -19,12 +19,11 @@ package setup
 import (
 	"context"
 	"fmt"
-	"os"
 	"path/filepath"
 	"runtime"
 	"time"
 
-	"github.com/ai-dynamo/grove/operator/e2e_testing/utils"
+	"github.com/ai-dynamo/grove/operator/e2e/utils"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -95,14 +94,8 @@ func CreateDefaultKaiQueues(ctx context.Context, config *HelmInstallConfig) erro
 	_, currentFile, _, _ := runtime.Caller(0)
 	queuesPath := filepath.Join(filepath.Dir(currentFile), "../yaml/queues.yaml")
 
-	// Read the queues YAML file content
-	yamlContent, err := os.ReadFile(queuesPath)
-	if err != nil {
-		return fmt.Errorf("failed to read queues YAML file %s: %w", queuesPath, err)
-	}
-
-	// Apply the YAML content using the k8s client
-	appliedResources, err := utils.ApplyYAMLContent(ctx, string(yamlContent), "", config.RestConfig, config.Logger)
+	// Apply the YAML file using the k8s client
+	appliedResources, err := utils.ApplyYAMLFile(ctx, queuesPath, "", config.RestConfig, config.Logger)
 	if err != nil {
 		return fmt.Errorf("failed to apply queues YAML: %w", err)
 	}
```
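The replacement call folds the removed os.ReadFile step into a new utils.ApplyYAMLFile helper whose body is not shown in this commit. A plausible sketch, assuming it simply delegates to the existing ApplyYAMLContent; the AppliedResource type and the ApplyYAMLContent stub below are stand-ins, since the diff only shows the call sites:

```go
package utils

import (
	"context"
	"fmt"
	"os"

	"k8s.io/client-go/rest"
)

// AppliedResource stands in for whatever ApplyYAMLContent actually returns;
// the diff only shows that it returns a slice named "appliedResources".
type AppliedResource struct {
	Kind, Name, Namespace string
}

// ApplyYAMLContent already exists in the repo (the removed lines called it);
// stubbed here only so the sketch is self-contained.
func ApplyYAMLContent(ctx context.Context, content, namespace string, restConfig *rest.Config, logger *Logger) ([]AppliedResource, error) {
	// real implementation lives in operator/e2e/utils
	return nil, nil
}

// ApplyYAMLFile is plausibly just ApplyYAMLContent plus the file read that
// this commit deleted from kai_scheduler.go.
func ApplyYAMLFile(ctx context.Context, path, namespace string, restConfig *rest.Config, logger *Logger) ([]AppliedResource, error) {
	content, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read YAML file %s: %w", path, err)
	}
	return ApplyYAMLContent(ctx, string(content), namespace, restConfig, logger)
}
```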

operator/e2e_testing/setup/shared_cluster.go renamed to operator/e2e/setup/shared_cluster.go

Lines changed: 61 additions & 66 deletions

```diff
@@ -23,10 +23,9 @@ import (
 	"sync"
 	"time"
 
-	"github.com/ai-dynamo/grove/operator/e2e_testing/utils"
+	"github.com/ai-dynamo/grove/operator/e2e/utils"
 	"github.com/docker/docker/api/types/image"
-	"github.com/docker/docker/client"
-	"github.com/sirupsen/logrus"
+	dockerclient "github.com/docker/docker/client"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -35,17 +34,23 @@ import (
 	"k8s.io/client-go/rest"
 )
 
+const (
+	// relativeSkaffoldYAMLPath is the path to the skaffold.yaml file relative to the e2e/tests directory
+	relativeSkaffoldYAMLPath = "../../skaffold.yaml"
+
+	defaulPollInterval = 1 * time.Second
+)
+
 // SharedClusterManager manages a shared (singleton) k3d cluster for E2E tests
 type SharedClusterManager struct {
-	clientset                *kubernetes.Clientset
-	restConfig               *rest.Config
-	dynamicClient            dynamic.Interface
-	cleanup                  func()
-	logger                   *logrus.Logger
-	isSetup                  bool
-	workerNodes              []string
-	registryPort             string
-	relativeSkaffoldYAMLPath string
+	clientset     *kubernetes.Clientset
+	restConfig    *rest.Config
+	dynamicClient dynamic.Interface
+	cleanup       func()
+	logger        *utils.Logger
+	isSetup       bool
+	workerNodes   []string
+	registryPort  string
 }
 
 var (
@@ -54,11 +59,10 @@ var (
 )
 
 // SharedCluster returns the singleton shared cluster manager
-func SharedCluster(logger *logrus.Logger, skaffoldYAMLPath string) *SharedClusterManager {
+func SharedCluster(logger *utils.Logger) *SharedClusterManager {
 	once.Do(func() {
 		sharedCluster = &SharedClusterManager{
-			logger:                   logger,
-			relativeSkaffoldYAMLPath: skaffoldYAMLPath,
+			logger: logger,
 		}
 	})
 	return sharedCluster
@@ -110,7 +114,7 @@ func (scm *SharedClusterManager) Setup(ctx context.Context, testImages []string)
 
 	scm.logger.Info("🚀 Setting up shared k3d cluster for all e2e tests...")
 
-	restConfig, cleanup, err := SetupCompleteK3DCluster(ctx, customCfg, scm.relativeSkaffoldYAMLPath, scm.logger)
+	restConfig, cleanup, err := SetupCompleteK3DCluster(ctx, customCfg, relativeSkaffoldYAMLPath, scm.logger)
 	if err != nil {
 		return fmt.Errorf("failed to setup shared k3d cluster: %w", err)
 	}
@@ -198,8 +202,8 @@ func (scm *SharedClusterManager) CleanupWorkloads(ctx context.Context) error {
 		scm.logger.Warnf("failed to delete PodCliqueSets: %v", err)
 	}
 
-	// Step 2: Poll for all resources and pods to be cleaned up (max 15 seconds)
-	if err := scm.waitForAllResourcesAndPodsDeleted(ctx, 15*time.Second); err != nil {
+	// Step 2: Poll for all resources and pods to be cleaned up
+	if err := scm.waitForAllResourcesAndPodsDeleted(ctx, defaulPollInterval); err != nil {
 		scm.logger.Warnf("timeout waiting for resources and pods to be deleted: %v", err)
 		// List remaining resources and pods for debugging
 		scm.listRemainingResources(ctx)
@@ -292,9 +296,6 @@ func (scm *SharedClusterManager) resetNodeStates(ctx context.Context) error {
 
 // waitForAllResourcesAndPodsDeleted waits for all Grove resources and pods to be deleted
 func (scm *SharedClusterManager) waitForAllResourcesAndPodsDeleted(ctx context.Context, timeout time.Duration) error {
-	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
-	defer cancel()
-
 	// Define all resource types to check
 	resourceTypes := []struct {
 		group   string
@@ -308,59 +309,53 @@ func (scm *SharedClusterManager) waitForAllResourcesAndPodsDeleted(ctx context.C
 		{"scheduler.grove.io", "v1alpha1", "podgangs", "PodGangs"},
 	}
 
-	ticker := time.NewTicker(1 * time.Second)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-timeoutCtx.Done():
-			return fmt.Errorf("timeout waiting for resources and pods to be deleted")
-		case <-ticker.C:
-			allResourcesDeleted := true
-			totalResources := 0
-
-			// Check Grove resources
-			for _, rt := range resourceTypes {
-				gvr := schema.GroupVersionResource{
-					Group:    rt.group,
-					Version:  rt.version,
-					Resource: rt.resource,
-				}
-
-				resourceList, err := scm.dynamicClient.Resource(gvr).List(ctx, metav1.ListOptions{})
-				if err != nil {
-					// If we can't list the resource type, assume it doesn't exist or is being deleted
-					continue
-				}
-
-				if len(resourceList.Items) > 0 {
-					allResourcesDeleted = false
-					totalResources += len(resourceList.Items)
-				}
-			}
-
-			// Check pods
-			allPodsDeleted := true
-			nonSystemPods := 0
-			pods, err := scm.clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
-			if err == nil {
-				for _, pod := range pods.Items {
-					if !isSystemPod(&pod) {
-						allPodsDeleted = false
-						nonSystemPods++
-					}
-				}
-			}
-
-			if allResourcesDeleted && allPodsDeleted {
-				return nil
-			}
-
-			if totalResources > 0 || nonSystemPods > 0 {
-				scm.logger.Debugf("⏳ Waiting for %d Grove resources and %d pods to be deleted...", totalResources, nonSystemPods)
-			}
-		}
-	}
+	return utils.PollForCondition(ctx, timeout, defaulPollInterval, func() (bool, error) {
+		allResourcesDeleted := true
+		totalResources := 0
+
+		// Check Grove resources
+		for _, rt := range resourceTypes {
+			gvr := schema.GroupVersionResource{
+				Group:    rt.group,
+				Version:  rt.version,
+				Resource: rt.resource,
+			}
+
+			resourceList, err := scm.dynamicClient.Resource(gvr).List(ctx, metav1.ListOptions{})
+			if err != nil {
+				// If we can't list the resource type, assume it doesn't exist or is being deleted
+				continue
+			}
+
+			if len(resourceList.Items) > 0 {
+				allResourcesDeleted = false
+				totalResources += len(resourceList.Items)
+			}
+		}
+
+		// Check pods
+		allPodsDeleted := true
+		nonSystemPods := 0
+		pods, err := scm.clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
+		if err == nil {
+			for _, pod := range pods.Items {
+				if !isSystemPod(&pod) {
+					allPodsDeleted = false
+					nonSystemPods++
+				}
+			}
+		}
+
+		if allResourcesDeleted && allPodsDeleted {
+			return true, nil
+		}
+
+		if totalResources > 0 || nonSystemPods > 0 {
+			scm.logger.Debugf("⏳ Waiting for %d Grove resources and %d pods to be deleted...", totalResources, nonSystemPods)
+		}
+
+		return false, nil
+	})
 }
 
 // listRemainingResources lists remaining Grove resources for debugging
@@ -436,7 +431,7 @@ func setupRegistryTestImages(registryPort string, images []string) error {
 	ctx := context.Background()
 
 	// Initialize Docker client
-	cli, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
+	cli, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
 	if err != nil {
 		return fmt.Errorf("failed to create Docker client: %w", err)
 	}
```
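The refactor above replaces a hand-rolled ticker/timeout loop with utils.PollForCondition. Only its call site appears in this commit, so the following is a reconstruction of the helper from the loop it deleted; the signature is taken from the call site, the body is assumed:

```go
package utils

import (
	"context"
	"fmt"
	"time"
)

// PollForCondition ticks at the given interval until the condition reports
// done, the condition returns an error, or the timeout elapses. This mirrors
// the removed inline loop; it is not the committed implementation.
func PollForCondition(ctx context.Context, timeout, interval time.Duration, condition func() (bool, error)) error {
	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-timeoutCtx.Done():
			return fmt.Errorf("timeout after %s waiting for condition", timeout)
		case <-ticker.C:
			done, err := condition()
			if err != nil {
				return err
			}
			if done {
				return nil
			}
		}
	}
}
```

One call-site detail worth tracking: CleanupWorkloads now passes defaulPollInterval (one second) as the timeout argument where it previously passed 15*time.Second.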

operator/e2e_testing/setup/skaffold.go renamed to operator/e2e/setup/skaffold.go

Lines changed: 7 additions & 7 deletions

```diff
@@ -26,7 +26,7 @@ import (
 	"path/filepath"
 	"strings"
 
-	"github.com/sirupsen/logrus"
+	"github.com/ai-dynamo/grove/operator/e2e/utils"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
@@ -49,7 +49,7 @@ type SkaffoldInstallConfig struct {
 	// Env are environment variables required by skaffold.yaml (e.g., VERSION, LD_FLAGS).
 	Env map[string]string
 	// Logger is the logger for operations (optional, will use default if nil).
-	Logger *logrus.Logger
+	Logger *utils.Logger
 }
 
 // Validate validates the configuration.
@@ -69,7 +69,7 @@ func (c *SkaffoldInstallConfig) Validate() error {
 
 	// Set defaults
 	if c.Logger == nil {
-		c.Logger = logrus.New()
+		c.Logger = utils.NewTestLogger(utils.InfoLevel)
 	}
 	if c.Env == nil {
 		c.Env = make(map[string]string)
@@ -161,7 +161,7 @@ func runSkaffoldBuild(ctx context.Context, absSkaffoldPath, skaffoldDir, kubecon
 	cmd.Stdout = &stdout
 
 	// cmd.Stderr is logging, it'll get passed on at debug level only
-	cmd.Stderr = config.Logger.WriterLevel(logrus.DebugLevel)
+	cmd.Stderr = config.Logger.WriterLevel(utils.DebugLevel)
 
 	config.Logger.Debugf("  Running: skaffold %v", args)
 	if err := cmd.Run(); err != nil {
@@ -232,16 +232,16 @@ func runSkaffoldDeploy(ctx context.Context, absSkaffoldPath, skaffoldDir, kubeco
 	cmd.Env = append(cmd.Env, fmt.Sprintf("CONTAINER_REGISTRY=%s", config.PullRepo))
 
 	// log cmd.Stdout and cmd.Stderr at debug level only
-	cmd.Stdout = config.Logger.WriterLevel(logrus.DebugLevel)
-	cmd.Stderr = config.Logger.WriterLevel(logrus.DebugLevel)
+	cmd.Stdout = config.Logger.WriterLevel(utils.DebugLevel)
+	cmd.Stderr = config.Logger.WriterLevel(utils.DebugLevel)
 
 	config.Logger.Debugf("  Running: skaffold %v", args)
 	return cmd.Run()
 }
 
 // writeTemporaryKubeconfig converts a rest.Config to a kubeconfig file and writes it to a temporary location.
 // Returns the path to the temporary file and a cleanup function.
-func writeTemporaryKubeconfig(restConfig *rest.Config, logger *logrus.Logger) (string, func(), error) {
+func writeTemporaryKubeconfig(restConfig *rest.Config, logger *utils.Logger) (string, func(), error) {
 	// Create a temporary file for the kubeconfig
 	tmpFile, err := os.CreateTemp("", "kubeconfig-*.yaml")
 	if err != nil {
```
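The WriterLevel changes wire skaffold's stdout/stderr into the logger so subprocess output only surfaces at debug level. A compact usage sketch of that pattern; the helper name below is hypothetical, and utils.Logger is the wrapper assumed earlier:

```go
package setup

import (
	"context"
	"os/exec"

	"github.com/ai-dynamo/grove/operator/e2e/utils"
)

// runWithDebugOutput streams a subprocess's combined output through the
// logger at debug level; a hypothetical helper illustrating the hunks above.
func runWithDebugOutput(ctx context.Context, logger *utils.Logger, name string, args ...string) error {
	cmd := exec.CommandContext(ctx, name, args...)
	w := logger.WriterLevel(utils.DebugLevel)
	defer w.Close() // close the pipe writer so buffered lines are flushed
	cmd.Stdout = w
	cmd.Stderr = w
	return cmd.Run()
}
```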

operator/e2e_testing/tests/gang_scheduling_test.go renamed to operator/e2e/tests/gang_scheduling_test.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -22,7 +22,7 @@ import (
 	"context"
 	"testing"
 
-	"github.com/ai-dynamo/grove/operator/e2e_testing/utils"
+	"github.com/ai-dynamo/grove/operator/e2e/utils"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
```
