diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index fca6d6e72..65735bf1c 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -70,12 +70,99 @@ jobs: - name: Install NVidia GPU operator for KinD uses: ./common/github-actions/nvidia-gpu-operator + - name: Verify GPU availability in KinD + run: | + echo "Checking for available GPUs in the KinD cluster..." + + # Wait for GPU operator pods to be ready (with timeout) + echo "Waiting for GPU operator pods to be ready..." + TIMEOUT=300 # 5 minutes timeout + END=$((SECONDS + TIMEOUT)) + + while [ $SECONDS -lt $END ]; do + # Get total number of pods in the namespace + TOTAL_PODS=$(kubectl get pods -n gpu-operator --no-headers | wc -l) + + # Count pods that are either running and ready or completed successfully + # Exclude pods that are still initializing + READY_PODS=$(kubectl get pods -n gpu-operator --no-headers | grep -E 'Running|Completed' | grep -v 'PodInitializing' | wc -l) + + if [ "$READY_PODS" -eq "$TOTAL_PODS" ] && [ "$TOTAL_PODS" -gt 0 ]; then + echo "All GPU operator pods are ready or completed successfully!" + break + fi + + echo "Waiting for GPU operator pods to be ready... ($READY_PODS/$TOTAL_PODS)" + echo "Pod status:" + kubectl get pods -n gpu-operator + sleep 10 + done + + if [ $SECONDS -ge $END ]; then + echo "::error::Timeout waiting for GPU operator pods to be ready" + echo "GPU operator pod status:" + kubectl get pods -n gpu-operator -o wide + echo "GPU operator pod logs:" + kubectl logs -n gpu-operator -l app.kubernetes.io/name=gpu-operator + echo "GPU operator pod events:" + kubectl get events -n gpu-operator + exit 1 + fi + + echo "Node details:" + kubectl describe nodes | grep -E 'nvidia.com/gpu|Allocatable:|Capacity:|Name:' + + # Check if GPU operator has labeled nodes + GPU_LABELS=$(kubectl describe nodes | grep -c "nvidia.com/gpu") + if [ "$GPU_LABELS" -eq 0 ]; then + echo "::error::No NVIDIA GPU labels found on nodes. GPU operator may not be running correctly." + echo "Full node descriptions for debugging:" + kubectl describe nodes + exit 1 + fi + + # Check if GPUs are actually allocatable + GPU_ALLOCATABLE=$(kubectl get nodes -o jsonpath='{.items[*].status.allocatable.nvidia\.com/gpu}' | tr ' ' '\n' | grep -v '^$' | wc -l) + if [ "$GPU_ALLOCATABLE" -eq 0 ]; then + echo "::error::GPU operator is running but no GPUs are allocatable. Check GPU operator logs." + echo "Checking GPU operator pods:" + kubectl get pods -n gpu-operator -o wide + echo "GPU operator pod logs:" + kubectl logs -n gpu-operator -l app.kubernetes.io/name=gpu-operator + echo "GPU operator pod events:" + kubectl get events -n gpu-operator + echo "GPU operator pod descriptions:" + kubectl describe pods -n gpu-operator + exit 1 + fi + + echo "Successfully found $GPU_ALLOCATABLE allocatable GPU(s) in the cluster." + - name: Deploy CodeFlare stack id: deploy run: | cd codeflare-operator echo Setting up CodeFlare stack make setup-e2e + + # Create ConfigMap to disable mTLS + echo "Creating ConfigMap to disable mTLS..." + cat <= END: + logger.error("Timeout waiting for pods to be ready or discovered") + if not head_pod_name or not worker_pod_name: + logger.error( + "Could not discover head and/or worker pods by name substring. 
Listing all pods in namespace for debugging:" + ) + try: + all_pods_result = subprocess.run( + [ + "kubectl", + "get", + "pods", + "-n", + self.namespace, + "-o", + "wide", + ], + capture_output=True, + text=True, + check=False, + ) + logger.error( + f"Pods in namespace '{self.namespace}':\\n{all_pods_result.stdout}" + ) + if all_pods_result.stderr: + logger.error( + f"Error listing pods: {all_pods_result.stderr}" + ) + except Exception as e_pods: + logger.error( + f"Exception while trying to list all pods: {e_pods}" + ) + + if head_pod_name: + logger.error( + f"Final head pod ({head_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, head_pod_name)}" + ) + else: + logger.error( + f"Final head pod status: Not Discovered by searching for '{cluster_name}' and 'head' in pod names." + ) + + if worker_pod_name: + logger.error( + f"Final worker pod ({worker_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, worker_pod_name)}" + ) + else: + logger.error( + f"Final worker pod status: Not Discovered by searching for '{cluster_name}' and 'worker' in pod names." + ) + raise TimeoutError( + "Pods did not become ready (or were not discovered by name substring) within the timeout period" + ) + + generate_cert.generate_tls_cert(cluster_name, self.namespace) + generate_cert.export_env(cluster_name, self.namespace) + + # Start port forwarding + local_port = "20001" + ray_client_port = "10001" + head_service_name = f"{cluster_name}-head-svc" + + port_forward_cmd = [ + "kubectl", + "port-forward", + "-n", + self.namespace, + f"svc/{head_service_name}", + f"{local_port}:{ray_client_port}", + ] + logger.info(f"Starting port-forward: {' '.join(port_forward_cmd)}") + # Using preexec_fn=os.setsid to create a new session, so we can kill the whole process group later if needed. + # However, os.setsid is not available on Windows. For simplicity in a test, direct Popen is used. + # Proper cross-platform process group management can be more complex. 
+ self.port_forward_process = subprocess.Popen( + port_forward_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + logger.info( + f"Port-forward process started with PID: {self.port_forward_process.pid}" ) - return sum(results) + time.sleep(5) # Give port-forward a few seconds to establish - ref = heavy_calculation.remote(3000) - result = ray.get(ref) - assert result == 1789.4644387076714 - ray.cancel(ref) - ray.shutdown() + client_url = f"ray://localhost:{local_port}" + # client_url = cluster.local_client_url() # Original line, now replaced + cluster.status() + + logger.info(f"Attempting to connect to Ray client at: {client_url}") + logger.info("Initializing Ray connection...") + try: + ray.init( + address=client_url, logging_level="INFO" + ) # Removed local_mode=True + logger.info("Ray initialization successful") + except Exception as e: + logger.error(f"Ray initialization failed: {str(e)}") + logger.error(f"Error type: {type(e)}") + # Log port-forward stdout/stderr if connection fails + if self.port_forward_process: + stdout, stderr = self.port_forward_process.communicate( + timeout=5 + ) # attempt to read + logger.error( + f"Port-forward stdout: {stdout.decode(errors='ignore')}" + ) + logger.error( + f"Port-forward stderr: {stderr.decode(errors='ignore')}" + ) + raise + + @ray.remote(num_gpus=number_of_gpus / 2) + def heavy_calculation_part(num_iterations): + result = 0.0 + for i in range(num_iterations): + for j in range(num_iterations): + for k in range(num_iterations): + result += math.sin(i) * math.cos(j) * math.tan(k) + return result + + @ray.remote(num_gpus=number_of_gpus / 2) + def heavy_calculation(num_iterations): + results = ray.get( + [ + heavy_calculation_part.remote(num_iterations // 30) + for _ in range(30) + ] + ) + return sum(results) + + ref = heavy_calculation.remote(3000) + + try: + result = ray.get(ref) + logger.info(f"Calculation completed with result: {result}") + assert result == 1789.4644387076714 + logger.info("Result assertion passed") + except Exception as e: + logger.error(f"Error during calculation: {str(e)}") + raise + finally: + logger.info("Cancelling task reference...") + ray.cancel(ref) + logger.info("Task cancelled") + + ray.shutdown() + # Port-forward process is stopped in finally block or teardown_method - cluster.down() + finally: + if self.port_forward_process: + logger.info( + f"Stopping port-forward process (PID: {self.port_forward_process.pid}) in finally block..." + ) + self.port_forward_process.terminate() + try: + self.port_forward_process.wait(timeout=10) + logger.info( + f"Port-forward process (PID: {self.port_forward_process.pid}) terminated from finally." + ) + except subprocess.TimeoutExpired: + logger.warning( + f"Port-forward process (PID: {self.port_forward_process.pid}) did not terminate in time from finally, killing..." + ) + self.port_forward_process.kill() + self.port_forward_process.wait() + logger.info( + f"Port-forward process (PID: {self.port_forward_process.pid}) killed from finally." 
+ ) + self.port_forward_process = None + cluster.down() diff --git a/tests/e2e/support.py b/tests/e2e/support.py index d7bee8054..4063a8eaf 100644 --- a/tests/e2e/support.py +++ b/tests/e2e/support.py @@ -9,6 +9,7 @@ from codeflare_sdk.common.kubernetes_cluster.kube_api_helpers import ( _kube_api_error_handling, ) +import time def get_ray_cluster(cluster_name, namespace): @@ -299,31 +300,38 @@ def create_kueue_resources( def delete_kueue_resources(self): - # Delete if given cluster-queue exists - for cq in self.cluster_queues: - try: - self.custom_api.delete_cluster_custom_object( - group="kueue.x-k8s.io", - plural="clusterqueues", - version="v1beta1", - name=cq, - ) - print(f"\n'{cq}' cluster-queue deleted") - except Exception as e: - print(f"\nError deleting cluster-queue '{cq}' : {e}") - - # Delete if given resource-flavor exists - for flavor in self.resource_flavors: - try: - self.custom_api.delete_cluster_custom_object( - group="kueue.x-k8s.io", - plural="resourceflavors", - version="v1beta1", - name=flavor, - ) - print(f"'{flavor}' resource-flavor deleted") - except Exception as e: - print(f"\nError deleting resource-flavor '{flavor}': {e}") + try: + # Delete if given cluster-queue exists + for cq in getattr(self, "cluster_queues", []): + try: + self.custom_api.delete_cluster_custom_object( + group="kueue.x-k8s.io", + plural="clusterqueues", + version="v1beta1", + name=cq, + ) + print(f"\n'{cq}' cluster-queue deleted") + except Exception as e: + print(f"\nError deleting cluster-queue '{cq}' : {e}") + + # Delete if given resource-flavor exists + for flavor in getattr(self, "resource_flavors", []): + try: + self.custom_api.delete_cluster_custom_object( + group="kueue.x-k8s.io", + plural="resourceflavors", + version="v1beta1", + name=flavor, + ) + print(f"'{flavor}' resource-flavor deleted") + except Exception as e: + print(f"\nError deleting resource-flavor '{flavor}': {e}") + + # Wait for resources to be cleaned up + time.sleep(5) + except Exception as e: + print(f"Error during Kueue resource cleanup: {e}") + raise def get_pod_node(self, namespace, name): @@ -407,3 +415,326 @@ def assert_get_cluster_and_jobsubmit( assert job_list[0].submission_id == submission_id cluster.down() + + +def kubectl_get_pod_status(namespace, pod_name): + """Get the status of a pod.""" + try: + # First check if the pod exists + result = subprocess.run( + ["kubectl", "get", "pod", pod_name, "-n", namespace], + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + print(f"Pod {pod_name} not found in namespace {namespace}") + print(f"kubectl error output: {result.stderr}") + # Try to get events in the namespace to see if there are any issues + events = subprocess.run( + [ + "kubectl", + "get", + "events", + "-n", + namespace, + "--sort-by='.lastTimestamp'", + ], + capture_output=True, + text=True, + check=False, + ) + if events.returncode == 0: + print(f"Recent events in namespace {namespace}:") + print(events.stdout) + return "NotFound" + + # Get the pod phase + result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.phase}'", + ], + capture_output=True, + text=True, + check=True, + ) + status = result.stdout.strip("'") + + # Get pod conditions for more detailed status + conditions = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.conditions}'", + ], + capture_output=True, + text=True, + check=True, + ) + + return status + except 
subprocess.CalledProcessError as e: + print(f"Error getting pod status for {pod_name}: {e.stderr}") + return "Error" + + +def kubectl_get_pod_ready(namespace, pod_name): + """Check if all containers in a pod are ready.""" + try: + # Get container statuses + result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses}'", + ], + capture_output=True, + text=True, + check=True, + ) + + # Get ready status + result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].ready}'", + ], + capture_output=True, + text=True, + check=True, + ) + statuses = result.stdout.strip("'").split() + ready = all(status == "true" for status in statuses) + + if not ready: + # Get container names and their ready status + names_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].name}'", + ], + capture_output=True, + text=True, + check=True, + ) + container_names = names_result.stdout.strip("'").split() + + # Get container states for more detailed status + states_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].state}'", + ], + capture_output=True, + text=True, + check=True, + ) + states = states_result.stdout.strip("'").split() + + # Get container reasons if not ready + reasons_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].state.waiting.reason}'", + ], + capture_output=True, + text=True, + check=True, + ) + reasons = reasons_result.stdout.strip("'").split() + + for name, status, state, reason in zip( + container_names, statuses, states, reasons + ): + print(f"Container {name}:") + print(f" Ready status: {status}") + print(f" State: {state}") + if reason and reason != "": + print(f" Reason: {reason}") + + return ready + except subprocess.CalledProcessError as e: + print(f"Error checking pod readiness for {pod_name}: {e.stderr}") + return False + + +def kubectl_get_pod_container_status(namespace, pod_name): + """Get detailed container status for a pod.""" + try: + # Get container names + names_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].name}'", + ], + capture_output=True, + text=True, + check=True, + ) + container_names = names_result.stdout.strip("'").split() + + # Get container states + states_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].state}'", + ], + capture_output=True, + text=True, + check=True, + ) + states = states_result.stdout.strip("'").split() + + # Get container reasons if waiting + reasons_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].state.waiting.reason}'", + ], + capture_output=True, + text=True, + check=True, + ) + reasons = reasons_result.stdout.strip("'").split() + + # Get container messages if waiting + messages_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].state.waiting.message}'", + ], + capture_output=True, + text=True, + check=True, + ) + messages = 
messages_result.stdout.strip("'").split() + + # Combine all information + status = {} + for name, state, reason, message in zip( + container_names, states, reasons, messages + ): + status[name] = { + "state": state, + "reason": reason if reason != "" else None, + "message": message if message != "" else None, + } + + return status + except subprocess.CalledProcessError as e: + print(f"Error getting container status for {pod_name}: {e.stderr}") + return "Error" + + +def kubectl_get_pod_name_by_substring(namespace, cluster_name_part, type_substring): + """Get the name of the first pod in the namespace that contains both cluster_name_part and type_substring in its name.""" + try: + command = [ + "kubectl", + "get", + "pods", + "-n", + namespace, + "-o", + "jsonpath={.items[*].metadata.name}", + ] + result = subprocess.run(command, capture_output=True, text=True, check=False) + + if result.returncode != 0: + print( + f"kubectl command failed to list pods in {namespace}. stderr: {result.stderr}" + ) + return None + + pod_names_str = result.stdout.strip().strip("'") + if not pod_names_str: + # print(f"No pods found in namespace {namespace}") # Uncomment for debugging + return None + + pod_names = pod_names_str.split() + # print(f"Pods found in namespace {namespace}: {pod_names}") # Uncomment for debugging + + for pod_name in pod_names: + # Ensure both parts are present. Using lower() for case-insensitive matching of type_substring (e.g. Head vs head) + if ( + cluster_name_part.lower() in pod_name.lower() + and type_substring.lower() in pod_name.lower() + ): + # print(f"Found matching pod: {pod_name} for cluster part '{cluster_name_part}' and type '{type_substring}'") # Uncomment for debugging + return pod_name + + # print(f"No pod found containing '{cluster_name_part}' and '{type_substring}' in namespace {namespace}") # Uncomment for debugging + return None + except subprocess.CalledProcessError as e: + print( + f"Error listing pods in namespace {namespace} to find by substring: {e.stderr}" + ) + return None + except Exception as e: + print(f"An unexpected error occurred while getting pod name by substring: {e}") + return None
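
For reference, the workflow's GPU verification step boils down to counting allocatable `nvidia.com/gpu` resources on the cluster nodes. Below is a minimal sketch of that same check done with the official `kubernetes` Python client instead of `kubectl` plus jsonpath; the function name is illustrative, it assumes a reachable kubeconfig, and it is not part of this patch:

```python
# Illustrative helper, not part of this patch: count allocatable GPUs the same
# way the workflow's jsonpath query does, via the kubernetes Python client.
from kubernetes import client, config


def count_allocatable_gpus() -> int:
    """Sum allocatable nvidia.com/gpu across all nodes."""
    config.load_kube_config()  # assumes the same kubeconfig the KinD job uses
    v1 = client.CoreV1Api()
    total = 0
    for node in v1.list_node().items:
        allocatable = node.status.allocatable or {}
        # GPU quantities are plain integers (e.g. "1"); a missing key means no GPUs.
        total += int(allocatable.get("nvidia.com/gpu", "0"))
    return total


if __name__ == "__main__":
    gpus = count_allocatable_gpus()
    if gpus == 0:
        raise SystemExit("No allocatable nvidia.com/gpu resources found")
    print(f"Found {gpus} allocatable GPU(s)")
```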
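The test now opens its own `kubectl port-forward` to the head service and connects with `ray://localhost:20001` instead of using `cluster.local_client_url()`. A minimal sketch of that pattern as a context manager, so the forward is torn down even when `ray.init()` or the workload fails; the helper name, namespace, and ports are illustrative rather than taken from the patch:

```python
# Illustrative sketch, not part of this patch: kubectl port-forward wrapped in a
# context manager so the subprocess is always terminated.
import subprocess
import time
from contextlib import contextmanager


@contextmanager
def port_forward(namespace, service, local_port=20001, remote_port=10001):
    proc = subprocess.Popen(
        [
            "kubectl", "port-forward", "-n", namespace,
            f"svc/{service}", f"{local_port}:{remote_port}",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    try:
        time.sleep(5)  # crude wait for the tunnel, mirroring the test's own sleep
        yield f"ray://localhost:{local_port}"
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()


# Usage (cluster and namespace names are placeholders):
# with port_forward("default", "mycluster-head-svc") as url:
#     ray.init(address=url)
#     ...
#     ray.shutdown()
```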
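`delete_kueue_resources` now waits a fixed five seconds after issuing the deletes. A sketch of a stricter variant that polls until the ClusterQueues actually return 404, assuming the same `CustomObjectsApi` instance used elsewhere in `support.py`; the helper name is hypothetical and not part of this patch:

```python
# Hypothetical refinement, not part of this patch: poll for ClusterQueue deletion
# instead of a fixed sleep.
import time

from kubernetes.client.rest import ApiException


def wait_for_cluster_queue_deletion(custom_api, names, timeout=60):
    """Block until every named ClusterQueue returns 404, or raise on timeout."""
    deadline = time.time() + timeout
    remaining = set(names)
    while remaining and time.time() < deadline:
        for name in list(remaining):
            try:
                custom_api.get_cluster_custom_object(
                    group="kueue.x-k8s.io",
                    version="v1beta1",
                    plural="clusterqueues",
                    name=name,
                )
            except ApiException as e:
                if e.status == 404:  # object is gone
                    remaining.discard(name)
                else:
                    raise
        if remaining:
            time.sleep(2)
    if remaining:
        raise TimeoutError(f"ClusterQueues not deleted in time: {sorted(remaining)}")
```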
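The new `kubectl_get_pod_status`, `kubectl_get_pod_ready`, and `kubectl_get_pod_container_status` helpers shell out to `kubectl` and parse quoted jsonpath output. For comparison, a minimal sketch of the same readiness check through the kubernetes Python client, which returns typed container statuses directly; the function name is illustrative and not part of this patch:

```python
# Illustrative alternative, not part of this patch: per-container readiness via the
# kubernetes client instead of repeated kubectl jsonpath calls.
from kubernetes import client, config


def pod_is_ready(namespace: str, pod_name: str) -> bool:
    """Return True when every container in the pod reports ready=True."""
    config.load_kube_config()
    v1 = client.CoreV1Api()
    pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
    statuses = pod.status.container_statuses or []
    if not statuses:
        return False
    for cs in statuses:
        if not cs.ready:
            waiting = cs.state.waiting
            reason = waiting.reason if waiting else None
            print(f"Container {cs.name} not ready (reason: {reason})")
            return False
    return True
```

The same client would also cover `kubectl_get_pod_name_by_substring`: iterating `v1.list_namespaced_pod(namespace).items` yields pod names to match case-insensitively against the cluster name and the "head"/"worker" substring.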