From 070f1bea2b35e15adfa2c18a85c4976505634a89 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Tue, 20 May 2025 09:21:43 +0100 Subject: [PATCH 1/9] test running only failing test --- .github/workflows/e2e_tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index fca6d6e72..d100241c5 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -117,7 +117,7 @@ jobs: pip install poetry poetry install --with test,docs echo "Running e2e tests..." - poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 + poetry run pytest -v -s ./tests/e2e/local_interactive_sdk_kind_test.py > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 env: GRPC_DNS_RESOLVER: "native" From d6a07f3bd64b206176022b19379cd7b4ed272483 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Tue, 20 May 2025 09:54:14 +0100 Subject: [PATCH 2/9] add excessive amounts of logs --- tests/e2e/local_interactive_sdk_kind_test.py | 87 ++++++++++++++++++-- 1 file changed, 82 insertions(+), 5 deletions(-) diff --git a/tests/e2e/local_interactive_sdk_kind_test.py b/tests/e2e/local_interactive_sdk_kind_test.py index c20fd8793..3d2365aa4 100644 --- a/tests/e2e/local_interactive_sdk_kind_test.py +++ b/tests/e2e/local_interactive_sdk_kind_test.py @@ -8,37 +8,54 @@ import pytest import ray import math +import logging +import time +import os from support import * +# Configure logging +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + @pytest.mark.kind class TestRayLocalInteractiveOauth: def setup_method(self): + logger.info("Setting up test environment...") initialize_kubernetes_client(self) + logger.info("Kubernetes client initialized") def teardown_method(self): + logger.info("Cleaning up test environment...") delete_namespace(self) delete_kueue_resources(self) + logger.info("Cleanup completed") def test_local_interactives(self): + logger.info("Starting test_local_interactives...") self.setup_method() create_namespace(self) create_kueue_resources(self) self.run_local_interactives() + logger.info("test_local_interactives completed") @pytest.mark.nvidia_gpu def test_local_interactives_nvidia_gpu(self): + logger.info("Starting test_local_interactives_nvidia_gpu...") self.setup_method() create_namespace(self) create_kueue_resources(self) self.run_local_interactives(number_of_gpus=1) + logger.info("test_local_interactives_nvidia_gpu completed") def run_local_interactives( self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0 ): cluster_name = "test-ray-cluster-li" + logger.info(f"Starting run_local_interactives with {number_of_gpus} GPUs") + logger.info("Creating cluster configuration...") cluster = Cluster( ClusterConfiguration( name=cluster_name, @@ -57,37 +74,97 @@ def run_local_interactives( verify_tls=False, ) ) + logger.info("Cluster configuration created") + + logger.info("Starting cluster deployment...") cluster.up() + logger.info("Cluster deployment initiated") + + logger.info("Waiting for cluster to be ready...") cluster.wait_ready() + logger.info("Cluster is ready") + logger.info("Generating TLS certificates...") generate_cert.generate_tls_cert(cluster_name, self.namespace) + logger.info("TLS certificates generated") + + logger.info("Exporting environment variables...") generate_cert.export_env(cluster_name, self.namespace) + logger.info("Environment variables exported") + + client_url = cluster.local_client_url() + 
logger.info(f"Ray client URL: {client_url}") - print(cluster.local_client_url()) + logger.info("Checking cluster status...") + status = cluster.status() + logger.info(f"Cluster status: {status}") + logger.info("Checking cluster dashboard URI...") + dashboard_uri = cluster.cluster_dashboard_uri() + logger.info(f"Dashboard URI: {dashboard_uri}") + + logger.info("Checking cluster URI...") + cluster_uri = cluster.cluster_uri() + logger.info(f"Cluster URI: {cluster_uri}") + + logger.info("Shutting down any existing Ray connections...") ray.shutdown() - ray.init(address=cluster.local_client_url(), logging_level="DEBUG") + logger.info("Ray shutdown completed") + + logger.info("Initializing Ray connection...") + try: + ray.init(address=client_url, logging_level="DEBUG") + logger.info("Ray initialization successful") + except Exception as e: + logger.error(f"Ray initialization failed: {str(e)}") + logger.error(f"Error type: {type(e)}") + raise + + logger.info("Defining Ray remote functions...") @ray.remote(num_gpus=number_of_gpus / 2) def heavy_calculation_part(num_iterations): + logger.info( + f"Starting heavy_calculation_part with {num_iterations} iterations" + ) result = 0.0 for i in range(num_iterations): for j in range(num_iterations): for k in range(num_iterations): result += math.sin(i) * math.cos(j) * math.tan(k) + logger.info("heavy_calculation_part completed") return result @ray.remote(num_gpus=number_of_gpus / 2) def heavy_calculation(num_iterations): + logger.info(f"Starting heavy_calculation with {num_iterations} iterations") results = ray.get( [heavy_calculation_part.remote(num_iterations // 30) for _ in range(30)] ) + logger.info("heavy_calculation completed") return sum(results) + logger.info("Submitting calculation task...") ref = heavy_calculation.remote(3000) - result = ray.get(ref) - assert result == 1789.4644387076714 - ray.cancel(ref) + logger.info("Task submitted, waiting for result...") + + try: + result = ray.get(ref) + logger.info(f"Calculation completed with result: {result}") + assert result == 1789.4644387076714 + logger.info("Result assertion passed") + except Exception as e: + logger.error(f"Error during calculation: {str(e)}") + raise + finally: + logger.info("Cancelling task reference...") + ray.cancel(ref) + logger.info("Task cancelled") + + logger.info("Shutting down Ray...") ray.shutdown() + logger.info("Ray shutdown completed") + logger.info("Tearing down cluster...") cluster.down() + logger.info("Cluster teardown completed") From 6eeb49cdc7b3937481c90cb9eba33c3dfdb79a00 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Tue, 20 May 2025 10:51:41 +0100 Subject: [PATCH 3/9] test certificate changes --- .../common/utils/generate_cert.py | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/codeflare_sdk/common/utils/generate_cert.py b/src/codeflare_sdk/common/utils/generate_cert.py index 7c072da0e..3a98b4e57 100644 --- a/src/codeflare_sdk/common/utils/generate_cert.py +++ b/src/codeflare_sdk/common/utils/generate_cert.py @@ -240,12 +240,27 @@ def export_env(cluster_name, namespace): Environment Variables Set: - RAY_USE_TLS: Enables TLS for Ray. - - RAY_TLS_SERVER_CERT: Path to the TLS server certificate. - - RAY_TLS_SERVER_KEY: Path to the TLS server private key. - RAY_TLS_CA_CERT: Path to the CA certificate. 
""" + # Assuming logger is configured elsewhere or add basicConfig here for the module + # import logging + # logger = logging.getLogger(__name__) + # logging.basicConfig(level=logging.INFO) # Or use existing logger if available + tls_dir = os.path.join(os.getcwd(), f"tls-{cluster_name}-{namespace}") os.environ["RAY_USE_TLS"] = "1" - os.environ["RAY_TLS_SERVER_CERT"] = os.path.join(tls_dir, "tls.crt") - os.environ["RAY_TLS_SERVER_KEY"] = os.path.join(tls_dir, "tls.key") + # os.environ["RAY_TLS_SERVER_CERT"] = os.path.join(tls_dir, "tls.crt") # Client usually doesn't need to present a server cert + # os.environ["RAY_TLS_SERVER_KEY"] = os.path.join(tls_dir, "tls.key") # Client usually doesn't need to present a server key + if "RAY_TLS_SERVER_CERT" in os.environ: + del os.environ["RAY_TLS_SERVER_CERT"] + if "RAY_TLS_SERVER_KEY" in os.environ: + del os.environ["RAY_TLS_SERVER_KEY"] os.environ["RAY_TLS_CA_CERT"] = os.path.join(tls_dir, "ca.crt") + + # It's better to use a logger instance if this module has one, + # otherwise, these prints will go to stdout. + # For now, using print for visibility in test logs if logger isn't set up in this exact scope. + print(f"generate_cert.export_env: RAY_USE_TLS set to: {os.environ.get('RAY_USE_TLS')}") + print(f"generate_cert.export_env: RAY_TLS_CA_CERT set to: {os.environ.get('RAY_TLS_CA_CERT')}") + print(f"generate_cert.export_env: RAY_TLS_SERVER_CERT is: {os.environ.get('RAY_TLS_SERVER_CERT')}") + print(f"generate_cert.export_env: RAY_TLS_SERVER_KEY is: {os.environ.get('RAY_TLS_SERVER_KEY')}") From 440fac259e8534c9da1f1f472e3abc335c91b486 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Tue, 20 May 2025 11:04:02 +0100 Subject: [PATCH 4/9] Revert "test certificate changes" This reverts commit 6eeb49cdc7b3937481c90cb9eba33c3dfdb79a00. --- .github/workflows/e2e_tests.yaml | 2 +- .../common/utils/generate_cert.py | 22 +++++++------------ 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index d100241c5..26246c67b 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -117,7 +117,7 @@ jobs: pip install poetry poetry install --with test,docs echo "Running e2e tests..." - poetry run pytest -v -s ./tests/e2e/local_interactive_sdk_kind_test.py > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 + poetry run pytest -v -s --log-cli-level=INFO ./tests/e2e/local_interactive_sdk_kind_test.py > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 env: GRPC_DNS_RESOLVER: "native" diff --git a/src/codeflare_sdk/common/utils/generate_cert.py b/src/codeflare_sdk/common/utils/generate_cert.py index 3a98b4e57..a0b4f8cda 100644 --- a/src/codeflare_sdk/common/utils/generate_cert.py +++ b/src/codeflare_sdk/common/utils/generate_cert.py @@ -240,27 +240,21 @@ def export_env(cluster_name, namespace): Environment Variables Set: - RAY_USE_TLS: Enables TLS for Ray. + - RAY_TLS_SERVER_CERT: Path to the TLS server certificate. + - RAY_TLS_SERVER_KEY: Path to the TLS server private key. - RAY_TLS_CA_CERT: Path to the CA certificate. + - RAY_CLIENT_SKIP_TLS_VERIFY: Skips TLS verification by the client. 
""" - # Assuming logger is configured elsewhere or add basicConfig here for the module - # import logging - # logger = logging.getLogger(__name__) - # logging.basicConfig(level=logging.INFO) # Or use existing logger if available - tls_dir = os.path.join(os.getcwd(), f"tls-{cluster_name}-{namespace}") os.environ["RAY_USE_TLS"] = "1" - # os.environ["RAY_TLS_SERVER_CERT"] = os.path.join(tls_dir, "tls.crt") # Client usually doesn't need to present a server cert - # os.environ["RAY_TLS_SERVER_KEY"] = os.path.join(tls_dir, "tls.key") # Client usually doesn't need to present a server key - if "RAY_TLS_SERVER_CERT" in os.environ: - del os.environ["RAY_TLS_SERVER_CERT"] - if "RAY_TLS_SERVER_KEY" in os.environ: - del os.environ["RAY_TLS_SERVER_KEY"] + os.environ["RAY_TLS_SERVER_CERT"] = os.path.join(tls_dir, "tls.crt") + os.environ["RAY_TLS_SERVER_KEY"] = os.path.join(tls_dir, "tls.key") os.environ["RAY_TLS_CA_CERT"] = os.path.join(tls_dir, "ca.crt") + os.environ["RAY_CLIENT_SKIP_TLS_VERIFY"] = "1" # Skip verification for E2E - # It's better to use a logger instance if this module has one, - # otherwise, these prints will go to stdout. - # For now, using print for visibility in test logs if logger isn't set up in this exact scope. + # Optional: Add print statements here if you still want to log them for verification print(f"generate_cert.export_env: RAY_USE_TLS set to: {os.environ.get('RAY_USE_TLS')}") print(f"generate_cert.export_env: RAY_TLS_CA_CERT set to: {os.environ.get('RAY_TLS_CA_CERT')}") print(f"generate_cert.export_env: RAY_TLS_SERVER_CERT is: {os.environ.get('RAY_TLS_SERVER_CERT')}") print(f"generate_cert.export_env: RAY_TLS_SERVER_KEY is: {os.environ.get('RAY_TLS_SERVER_KEY')}") + print(f"generate_cert.export_env: RAY_CLIENT_SKIP_TLS_VERIFY is: {os.environ.get('RAY_CLIENT_SKIP_TLS_VERIFY')}") From 02bee7812ccf0ae25de1d3045bf4ad385c2df5d6 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Tue, 20 May 2025 11:59:39 +0100 Subject: [PATCH 5/9] add gpu checker --- .github/workflows/e2e_tests.yaml | 87 ++++ codeflare-kuberay.code-workspace | 13 + tests/e2e/local_interactive_sdk_kind_test.py | 186 ++++++--- tests/e2e/support.py | 394 +++++++++++++++++-- 4 files changed, 606 insertions(+), 74 deletions(-) create mode 100644 codeflare-kuberay.code-workspace diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index 26246c67b..2a12e1122 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -70,12 +70,99 @@ jobs: - name: Install NVidia GPU operator for KinD uses: ./common/github-actions/nvidia-gpu-operator + - name: Verify GPU availability in KinD + run: | + echo "Checking for available GPUs in the KinD cluster..." + + # Wait for GPU operator pods to be ready (with timeout) + echo "Waiting for GPU operator pods to be ready..." + TIMEOUT=300 # 5 minutes timeout + END=$((SECONDS + TIMEOUT)) + + while [ $SECONDS -lt $END ]; do + # Get total number of pods in the namespace + TOTAL_PODS=$(kubectl get pods -n gpu-operator --no-headers | wc -l) + + # Count pods that are either running and ready or completed successfully + # Exclude pods that are still initializing + READY_PODS=$(kubectl get pods -n gpu-operator --no-headers | grep -E 'Running|Completed' | grep -v 'PodInitializing' | wc -l) + + if [ "$READY_PODS" -eq "$TOTAL_PODS" ] && [ "$TOTAL_PODS" -gt 0 ]; then + echo "All GPU operator pods are ready or completed successfully!" + break + fi + + echo "Waiting for GPU operator pods to be ready... 
($READY_PODS/$TOTAL_PODS)" + echo "Pod status:" + kubectl get pods -n gpu-operator + sleep 10 + done + + if [ $SECONDS -ge $END ]; then + echo "::error::Timeout waiting for GPU operator pods to be ready" + echo "GPU operator pod status:" + kubectl get pods -n gpu-operator -o wide + echo "GPU operator pod logs:" + kubectl logs -n gpu-operator -l app.kubernetes.io/name=gpu-operator + echo "GPU operator pod events:" + kubectl get events -n gpu-operator + exit 1 + fi + + echo "Node details:" + kubectl describe nodes | grep -E 'nvidia.com/gpu|Allocatable:|Capacity:|Name:' + + # Check if GPU operator has labeled nodes + GPU_LABELS=$(kubectl describe nodes | grep -c "nvidia.com/gpu") + if [ "$GPU_LABELS" -eq 0 ]; then + echo "::error::No NVIDIA GPU labels found on nodes. GPU operator may not be running correctly." + echo "Full node descriptions for debugging:" + kubectl describe nodes + exit 1 + fi + + # Check if GPUs are actually allocatable + GPU_ALLOCATABLE=$(kubectl get nodes -o jsonpath='{.items[*].status.allocatable.nvidia\.com/gpu}' | tr ' ' '\n' | grep -v '^$' | wc -l) + if [ "$GPU_ALLOCATABLE" -eq 0 ]; then + echo "::error::GPU operator is running but no GPUs are allocatable. Check GPU operator logs." + echo "Checking GPU operator pods:" + kubectl get pods -n gpu-operator -o wide + echo "GPU operator pod logs:" + kubectl logs -n gpu-operator -l app.kubernetes.io/name=gpu-operator + echo "GPU operator pod events:" + kubectl get events -n gpu-operator + echo "GPU operator pod descriptions:" + kubectl describe pods -n gpu-operator + exit 1 + fi + + echo "Successfully found $GPU_ALLOCATABLE allocatable GPU(s) in the cluster." + - name: Deploy CodeFlare stack id: deploy run: | cd codeflare-operator echo Setting up CodeFlare stack make setup-e2e + + # Create ConfigMap to disable mTLS + echo "Creating ConfigMap to disable mTLS..." + cat <= END: + logger.error("Timeout waiting for pods to be ready or discovered") + if not head_pod_name or not worker_pod_name: + logger.error( + "Could not discover head and/or worker pods by name substring. Listing all pods in namespace for debugging:" + ) + try: + all_pods_result = subprocess.run( + ["kubectl", "get", "pods", "-n", self.namespace, "-o", "wide"], + capture_output=True, + text=True, + check=False, + ) + logger.error( + f"Pods in namespace '{self.namespace}':\n{all_pods_result.stdout}" + ) + if all_pods_result.stderr: + logger.error(f"Error listing pods: {all_pods_result.stderr}") + except Exception as e_pods: + logger.error(f"Exception while trying to list all pods: {e_pods}") + + if head_pod_name: + logger.error( + f"Final head pod ({head_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, head_pod_name)}" + ) + else: + logger.error( + f"Final head pod status: Not Discovered by searching for '{cluster_name}' and 'head' in pod names." + ) + + if worker_pod_name: + logger.error( + f"Final worker pod ({worker_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, worker_pod_name)}" + ) + else: + logger.error( + f"Final worker pod status: Not Discovered by searching for '{cluster_name}' and 'worker' in pod names." 
+ ) + raise TimeoutError( + "Pods did not become ready (or were not discovered by name substring) within the timeout period" + ) + + generate_cert.generate_tls_cert(cluster_name, self.namespace) + generate_cert.export_env(cluster_name, self.namespace) + + client_url = cluster.local_client_url() + cluster.status() logger.info("Initializing Ray connection...") try: - ray.init(address=client_url, logging_level="DEBUG") + ray.init(address=client_url, logging_level="INFO") logger.info("Ray initialization successful") except Exception as e: logger.error(f"Ray initialization failed: {str(e)}") logger.error(f"Error type: {type(e)}") raise - logger.info("Defining Ray remote functions...") - @ray.remote(num_gpus=number_of_gpus / 2) def heavy_calculation_part(num_iterations): - logger.info( - f"Starting heavy_calculation_part with {num_iterations} iterations" - ) result = 0.0 for i in range(num_iterations): for j in range(num_iterations): for k in range(num_iterations): result += math.sin(i) * math.cos(j) * math.tan(k) - logger.info("heavy_calculation_part completed") return result @ray.remote(num_gpus=number_of_gpus / 2) def heavy_calculation(num_iterations): - logger.info(f"Starting heavy_calculation with {num_iterations} iterations") results = ray.get( [heavy_calculation_part.remote(num_iterations // 30) for _ in range(30)] ) - logger.info("heavy_calculation completed") return sum(results) - logger.info("Submitting calculation task...") ref = heavy_calculation.remote(3000) - logger.info("Task submitted, waiting for result...") try: result = ray.get(ref) @@ -161,10 +254,5 @@ def heavy_calculation(num_iterations): ray.cancel(ref) logger.info("Task cancelled") - logger.info("Shutting down Ray...") ray.shutdown() - logger.info("Ray shutdown completed") - - logger.info("Tearing down cluster...") cluster.down() - logger.info("Cluster teardown completed") diff --git a/tests/e2e/support.py b/tests/e2e/support.py index d7bee8054..3df73655a 100644 --- a/tests/e2e/support.py +++ b/tests/e2e/support.py @@ -9,6 +9,7 @@ from codeflare_sdk.common.kubernetes_cluster.kube_api_helpers import ( _kube_api_error_handling, ) +import time def get_ray_cluster(cluster_name, namespace): @@ -299,31 +300,38 @@ def create_kueue_resources( def delete_kueue_resources(self): - # Delete if given cluster-queue exists - for cq in self.cluster_queues: - try: - self.custom_api.delete_cluster_custom_object( - group="kueue.x-k8s.io", - plural="clusterqueues", - version="v1beta1", - name=cq, - ) - print(f"\n'{cq}' cluster-queue deleted") - except Exception as e: - print(f"\nError deleting cluster-queue '{cq}' : {e}") - - # Delete if given resource-flavor exists - for flavor in self.resource_flavors: - try: - self.custom_api.delete_cluster_custom_object( - group="kueue.x-k8s.io", - plural="resourceflavors", - version="v1beta1", - name=flavor, - ) - print(f"'{flavor}' resource-flavor deleted") - except Exception as e: - print(f"\nError deleting resource-flavor '{flavor}': {e}") + try: + # Delete if given cluster-queue exists + for cq in getattr(self, "cluster_queues", []): + try: + self.custom_api.delete_cluster_custom_object( + group="kueue.x-k8s.io", + plural="clusterqueues", + version="v1beta1", + name=cq, + ) + print(f"\n'{cq}' cluster-queue deleted") + except Exception as e: + print(f"\nError deleting cluster-queue '{cq}' : {e}") + + # Delete if given resource-flavor exists + for flavor in getattr(self, "resource_flavors", []): + try: + self.custom_api.delete_cluster_custom_object( + group="kueue.x-k8s.io", + 
plural="resourceflavors", + version="v1beta1", + name=flavor, + ) + print(f"'{flavor}' resource-flavor deleted") + except Exception as e: + print(f"\nError deleting resource-flavor '{flavor}': {e}") + + # Wait for resources to be cleaned up + time.sleep(5) + except Exception as e: + print(f"Error during Kueue resource cleanup: {e}") + raise def get_pod_node(self, namespace, name): @@ -407,3 +415,339 @@ def assert_get_cluster_and_jobsubmit( assert job_list[0].submission_id == submission_id cluster.down() + + +def kubectl_get_pod_status(namespace, pod_name): + """Get the status of a pod.""" + try: + # First check if the pod exists + result = subprocess.run( + ["kubectl", "get", "pod", pod_name, "-n", namespace], + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + print(f"Pod {pod_name} not found in namespace {namespace}") + print(f"kubectl error output: {result.stderr}") + # Try to get events in the namespace to see if there are any issues + events = subprocess.run( + [ + "kubectl", + "get", + "events", + "-n", + namespace, + "--sort-by='.lastTimestamp'", + ], + capture_output=True, + text=True, + check=False, + ) + if events.returncode == 0: + print(f"Recent events in namespace {namespace}:") + print(events.stdout) + return "NotFound" + + # Get the pod phase + result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.phase}'", + ], + capture_output=True, + text=True, + check=True, + ) + status = result.stdout.strip("'") + + # Get pod conditions for more detailed status + conditions = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.conditions}'", + ], + capture_output=True, + text=True, + check=True, + ) + print(f"Pod {pod_name} conditions: {conditions.stdout}") + + # Get pod events for more context + events = subprocess.run( + ["kubectl", "describe", "pod", pod_name, "-n", namespace], + capture_output=True, + text=True, + check=False, + ) + if events.returncode == 0: + print(f"Pod {pod_name} details:") + print(events.stdout) + + return status + except subprocess.CalledProcessError as e: + print(f"Error getting pod status for {pod_name}: {e.stderr}") + return "Error" + + +def kubectl_get_pod_ready(namespace, pod_name): + """Check if all containers in a pod are ready.""" + try: + # Get container statuses + result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses}'", + ], + capture_output=True, + text=True, + check=True, + ) + print(f"Container statuses for {pod_name}: {result.stdout}") + + # Get ready status + result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].ready}'", + ], + capture_output=True, + text=True, + check=True, + ) + statuses = result.stdout.strip("'").split() + ready = all(status == "true" for status in statuses) + + if not ready: + # Get container names and their ready status + names_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].name}'", + ], + capture_output=True, + text=True, + check=True, + ) + container_names = names_result.stdout.strip("'").split() + + # Get container states for more detailed status + states_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + 
"jsonpath='{.status.containerStatuses[*].state}'", + ], + capture_output=True, + text=True, + check=True, + ) + states = states_result.stdout.strip("'").split() + + # Get container reasons if not ready + reasons_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].state.waiting.reason}'", + ], + capture_output=True, + text=True, + check=True, + ) + reasons = reasons_result.stdout.strip("'").split() + + for name, status, state, reason in zip( + container_names, statuses, states, reasons + ): + print(f"Container {name}:") + print(f" Ready status: {status}") + print(f" State: {state}") + if reason and reason != "": + print(f" Reason: {reason}") + + return ready + except subprocess.CalledProcessError as e: + print(f"Error checking pod readiness for {pod_name}: {e.stderr}") + return False + + +def kubectl_get_pod_container_status(namespace, pod_name): + """Get detailed container status for a pod.""" + try: + # Get container names + names_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].name}'", + ], + capture_output=True, + text=True, + check=True, + ) + container_names = names_result.stdout.strip("'").split() + + # Get container states + states_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].state}'", + ], + capture_output=True, + text=True, + check=True, + ) + states = states_result.stdout.strip("'").split() + + # Get container reasons if waiting + reasons_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].state.waiting.reason}'", + ], + capture_output=True, + text=True, + check=True, + ) + reasons = reasons_result.stdout.strip("'").split() + + # Get container messages if waiting + messages_result = subprocess.run( + [ + "kubectl", + "get", + "pod", + pod_name, + "-n", + namespace, + "-o", + "jsonpath='{.status.containerStatuses[*].state.waiting.message}'", + ], + capture_output=True, + text=True, + check=True, + ) + messages = messages_result.stdout.strip("'").split() + + # Combine all information + status = {} + for name, state, reason, message in zip( + container_names, states, reasons, messages + ): + status[name] = { + "state": state, + "reason": reason if reason != "" else None, + "message": message if message != "" else None, + } + + return status + except subprocess.CalledProcessError as e: + print(f"Error getting container status for {pod_name}: {e.stderr}") + return "Error" + + +def kubectl_get_pod_name_by_substring(namespace, cluster_name_part, type_substring): + """Get the name of the first pod in the namespace that contains both cluster_name_part and type_substring in its name.""" + try: + command = [ + "kubectl", + "get", + "pods", + "-n", + namespace, + "-o", + "jsonpath={.items[*].metadata.name}", + ] + result = subprocess.run(command, capture_output=True, text=True, check=False) + + if result.returncode != 0: + print( + f"kubectl command failed to list pods in {namespace}. 
stderr: {result.stderr}" + ) + return None + + pod_names_str = result.stdout.strip().strip("'") + if not pod_names_str: + # print(f"No pods found in namespace {namespace}") # Uncomment for debugging + return None + + pod_names = pod_names_str.split() + # print(f"Pods found in namespace {namespace}: {pod_names}") # Uncomment for debugging + + for pod_name in pod_names: + # Ensure both parts are present. Using lower() for case-insensitive matching of type_substring (e.g. Head vs head) + if ( + cluster_name_part.lower() in pod_name.lower() + and type_substring.lower() in pod_name.lower() + ): + # print(f"Found matching pod: {pod_name} for cluster part '{cluster_name_part}' and type '{type_substring}'") # Uncomment for debugging + return pod_name + + # print(f"No pod found containing '{cluster_name_part}' and '{type_substring}' in namespace {namespace}") # Uncomment for debugging + return None + except subprocess.CalledProcessError as e: + print( + f"Error listing pods in namespace {namespace} to find by substring: {e.stderr}" + ) + return None + except Exception as e: + print(f"An unexpected error occurred while getting pod name by substring: {e}") + return None From b147e93959302e2d1142ff39c7db30f090d70b62 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Tue, 20 May 2025 16:24:57 +0100 Subject: [PATCH 6/9] help me --- src/codeflare_sdk/ray/cluster/config.py | 38 +++++++ src/codeflare_sdk/ray/job_config.py | 100 +++++++++++++++++++ src/codeflare_sdk/ray/service_config.py | 55 ++++++++++ tests/e2e/local_interactive_sdk_kind_test.py | 8 +- tests/e2e/support.py | 13 --- 5 files changed, 194 insertions(+), 20 deletions(-) create mode 100644 src/codeflare_sdk/ray/job_config.py create mode 100644 src/codeflare_sdk/ray/service_config.py diff --git a/src/codeflare_sdk/ray/cluster/config.py b/src/codeflare_sdk/ray/cluster/config.py index 4f646baaa..2f954adb9 100644 --- a/src/codeflare_sdk/ray/cluster/config.py +++ b/src/codeflare_sdk/ray/cluster/config.py @@ -108,6 +108,16 @@ class ClusterConfiguration: Kubernetes secret reference containing Redis password. ex: {"name": "secret-name", "key": "password-key"} external_storage_namespace: The storage namespace to use for GCS fault tolerance. By default, KubeRay sets it to the UID of RayCluster. + worker_idle_timeout_seconds: + The idle timeout for worker nodes in seconds. + worker_num_of_hosts: + The number of hosts per worker replica for TPUs. + suspend: + A boolean indicating whether to suspend the cluster. + managed_by: + The managed by field value. + redis_username_secret: + Kubernetes secret reference containing Redis username. 
""" name: str @@ -134,6 +144,8 @@ class ClusterConfiguration: max_memory: Optional[Union[int, str]] = None # Deprecating num_gpus: Optional[int] = None # Deprecating worker_tolerations: Optional[List[V1Toleration]] = None + worker_idle_timeout_seconds: Optional[int] = None + worker_num_of_hosts: Optional[int] = None appwrapper: bool = False envs: Dict[str, str] = field(default_factory=dict) image: str = "" @@ -150,8 +162,11 @@ class ClusterConfiguration: annotations: Dict[str, str] = field(default_factory=dict) volumes: list[V1Volume] = field(default_factory=list) volume_mounts: list[V1VolumeMount] = field(default_factory=list) + suspend: Optional[bool] = None + managed_by: Optional[str] = None enable_gcs_ft: bool = False redis_address: Optional[str] = None + redis_username_secret: Optional[Dict[str, str]] = None redis_password_secret: Optional[Dict[str, str]] = None external_storage_namespace: Optional[str] = None @@ -181,6 +196,29 @@ def __post_init__(self): raise ValueError( "redis_password_secret must contain both 'name' and 'key' fields" ) + + if self.redis_username_secret and not isinstance( + self.redis_username_secret, dict + ): + raise ValueError( + "redis_username_secret must be a dictionary with 'name' and 'key' fields" + ) + + if self.redis_username_secret and ( + "name" not in self.redis_username_secret + or "key" not in self.redis_username_secret + ): + raise ValueError( + "redis_username_secret must contain both 'name' and 'key' fields" + ) + + if self.managed_by and self.managed_by not in [ + "ray.io/kuberay-operator", + "kueue.x-k8s.io/multikueue", + ]: + raise ValueError( + "managed_by field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'" + ) self._validate_types() self._memory_to_resource() diff --git a/src/codeflare_sdk/ray/job_config.py b/src/codeflare_sdk/ray/job_config.py new file mode 100644 index 000000000..212f08f01 --- /dev/null +++ b/src/codeflare_sdk/ray/job_config.py @@ -0,0 +1,100 @@ +""" +Defines the RayJobConfiguration dataclass for specifying KubeRay RayJob custom resources. +""" + +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Union + +from codeflare_sdk.ray.cluster.config import ClusterConfiguration +import corev1_client # Placeholder for kubernetes.client.models.V1PodTemplateSpec + +# Placeholder for V1PodTemplateSpec until actual import is resolved +# from kubernetes.client.models import V1PodTemplateSpec +# For now, using a generic Dict as a placeholder +V1PodTemplateSpec = Dict[str, Any] + + +@dataclass +class RayJobConfiguration: + """ + Configuration for a KubeRay RayJob. + + Args: + name: Name of the RayJob. + namespace: Namespace for the RayJob. + entrypoint: Command to execute for the job. + runtime_env_yaml: Runtime environment configuration as a YAML string. + job_id: Optional ID for the job. Auto-generated if not set. + active_deadline_seconds: Duration in seconds the job may be active. + backoff_limit: Number of retries before marking job as failed. + deletion_policy: Policy for resource deletion on job completion. + Valid values: "DeleteCluster", "DeleteWorkers", "DeleteSelf", "DeleteNone". + submission_mode: How the Ray job is submitted to the RayCluster. + Valid values: "K8sJobMode", "HTTPMode", "InteractiveMode". + managed_by: Controller managing the RayJob (e.g., "kueue.x-k8s.io/multikueue"). + ray_cluster_spec: Specification for the RayCluster if created by this RayJob. + cluster_selector: Labels to select an existing RayCluster. 
+ submitter_pod_template: Pod template for the job submitter (if K8sJobMode). + shutdown_after_job_finishes: Whether to delete the RayCluster after job completion. + ttl_seconds_after_finished: TTL for RayCluster cleanup after job completion. + suspend: Whether to suspend the RayJob (prevents RayCluster creation). + metadata: Metadata for the RayJob. + submitter_config_backoff_limit: BackoffLimit for the submitter Kubernetes Job. + """ + name: str + namespace: Optional[str] = None + entrypoint: str + runtime_env_yaml: Optional[str] = None + job_id: Optional[str] = None + active_deadline_seconds: Optional[int] = None + backoff_limit: int = 0 # KubeRay default is 0 + deletion_policy: Optional[str] = None # Needs validation: DeleteCluster, DeleteWorkers, DeleteSelf, DeleteNone + submission_mode: str = "K8sJobMode" # KubeRay default + managed_by: Optional[str] = None + ray_cluster_spec: Optional[ClusterConfiguration] = None + cluster_selector: Dict[str, str] = field(default_factory=dict) + submitter_pod_template: Optional[V1PodTemplateSpec] = None # Kubernetes V1PodTemplateSpec + shutdown_after_job_finishes: bool = True # Common default, KubeRay itself doesn't default this in RayJobSpec directly + ttl_seconds_after_finished: int = 0 # KubeRay default + suspend: bool = False + metadata: Dict[str, str] = field(default_factory=dict) + submitter_config_backoff_limit: Optional[int] = None + + + def __post_init__(self): + if self.deletion_policy and self.deletion_policy not in [ + "DeleteCluster", + "DeleteWorkers", + "DeleteSelf", + "DeleteNone", + ]: + raise ValueError( + "deletion_policy must be one of 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'" + ) + + if self.submission_mode not in ["K8sJobMode", "HTTPMode", "InteractiveMode"]: + raise ValueError( + "submission_mode must be one of 'K8sJobMode', 'HTTPMode', or 'InteractiveMode'" + ) + + if self.managed_by and self.managed_by not in [ + "ray.io/kuberay-operator", + "kueue.x-k8s.io/multikueue", + ]: + raise ValueError( + "managed_by field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'" + ) + + if self.ray_cluster_spec and self.cluster_selector: + raise ValueError("Only one of ray_cluster_spec or cluster_selector can be provided.") + + if not self.ray_cluster_spec and not self.cluster_selector and self.submission_mode != "InteractiveMode": + # In interactive mode, a cluster might already exist and the user connects to it. + # Otherwise, a RayJob needs either a spec to create a cluster or a selector to find one. + raise ValueError( + "Either ray_cluster_spec (to create a new cluster) or cluster_selector (to use an existing one) must be specified unless in InteractiveMode." + ) + + # TODO: Add validation for submitter_pod_template if submission_mode is K8sJobMode + # TODO: Add type validation for all fields + pass \ No newline at end of file diff --git a/src/codeflare_sdk/ray/service_config.py b/src/codeflare_sdk/ray/service_config.py new file mode 100644 index 000000000..ffea90263 --- /dev/null +++ b/src/codeflare_sdk/ray/service_config.py @@ -0,0 +1,55 @@ +""" +Defines the RayServiceConfiguration dataclass for specifying KubeRay RayService custom resources. 
+""" + +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +from codeflare_sdk.ray.cluster.config import ClusterConfiguration +import corev1_client # Placeholder for kubernetes.client.models.V1Service + +# Placeholder for V1Service until actual import is resolved +# from kubernetes.client.models import V1Service +# For now, using a generic Dict as a placeholder +V1Service = Dict[str, Any] + +@dataclass +class RayServiceConfiguration: + """ + Configuration for a KubeRay RayService. + + Args: + name: Name of the RayService. + namespace: Namespace for the RayService. + serve_config_v2: YAML string defining the applications and deployments to deploy. + ray_cluster_spec: Specification for the RayCluster underpinning the RayService. + upgrade_strategy_type: Strategy for upgrading the RayService ("NewCluster" or "None"). + serve_service: Optional Kubernetes service definition for the serve endpoints. + exclude_head_pod_from_serve_svc: If true, head pod won't be part of the K8s serve service. + metadata: Metadata for the RayService. + annotations: Annotations for the RayService. + """ + name: str + namespace: Optional[str] = None + serve_config_v2: str + ray_cluster_spec: ClusterConfiguration # A RayService always needs a RayClusterSpec + upgrade_strategy_type: Optional[str] = "NewCluster" # KubeRay default if not specified, but good to be explicit. + serve_service: Optional[V1Service] = None # Kubernetes V1Service + exclude_head_pod_from_serve_svc: bool = False + metadata: Dict[str, str] = field(default_factory=dict) + annotations: Dict[str, str] = field(default_factory=dict) + + def __post_init__(self): + if self.upgrade_strategy_type and self.upgrade_strategy_type not in [ + "NewCluster", + "None", + ]: + raise ValueError( + "upgrade_strategy_type must be one of 'NewCluster' or 'None'" + ) + + if not self.serve_config_v2: + raise ValueError("serve_config_v2 must be provided.") + + # TODO: Add type validation for all fields + pass \ No newline at end of file diff --git a/tests/e2e/local_interactive_sdk_kind_test.py b/tests/e2e/local_interactive_sdk_kind_test.py index 7de513503..bef396ce4 100644 --- a/tests/e2e/local_interactive_sdk_kind_test.py +++ b/tests/e2e/local_interactive_sdk_kind_test.py @@ -77,7 +77,6 @@ def run_local_interactives( cluster.status() logger.info("Cluster is ready") - logger.info("Waiting for head and worker pods to be fully ready...") TIMEOUT = 300 # 5 minutes timeout END = time.time() + TIMEOUT @@ -118,11 +117,6 @@ def run_local_interactives( if worker_pod_name: worker_status = kubectl_get_pod_status(self.namespace, worker_pod_name) - logger.info(f"Head pod ({head_pod_name or 'N/A'}) status: {head_status}") - logger.info( - f"Worker pod ({worker_pod_name or 'N/A'}) status: {worker_status}" - ) - if ( head_pod_name and worker_pod_name @@ -216,7 +210,7 @@ def run_local_interactives( logger.info("Initializing Ray connection...") try: - ray.init(address=client_url, logging_level="INFO") + ray.init(address=client_url, logging_level="INFO", local_mode=True) logger.info("Ray initialization successful") except Exception as e: logger.error(f"Ray initialization failed: {str(e)}") diff --git a/tests/e2e/support.py b/tests/e2e/support.py index 3df73655a..4063a8eaf 100644 --- a/tests/e2e/support.py +++ b/tests/e2e/support.py @@ -483,18 +483,6 @@ def kubectl_get_pod_status(namespace, pod_name): text=True, check=True, ) - print(f"Pod {pod_name} conditions: {conditions.stdout}") - - # Get pod events for more context - events = subprocess.run( - 
["kubectl", "describe", "pod", pod_name, "-n", namespace], - capture_output=True, - text=True, - check=False, - ) - if events.returncode == 0: - print(f"Pod {pod_name} details:") - print(events.stdout) return status except subprocess.CalledProcessError as e: @@ -521,7 +509,6 @@ def kubectl_get_pod_ready(namespace, pod_name): text=True, check=True, ) - print(f"Container statuses for {pod_name}: {result.stdout}") # Get ready status result = subprocess.run( From 57f958f19456f58a8d0dcad6e7bf0fd79a61f9b2 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Tue, 20 May 2025 21:33:53 +0100 Subject: [PATCH 7/9] test port forward --- tests/e2e/local_interactive_sdk_kind_test.py | 428 ++++++++++++------- 1 file changed, 274 insertions(+), 154 deletions(-) diff --git a/tests/e2e/local_interactive_sdk_kind_test.py b/tests/e2e/local_interactive_sdk_kind_test.py index bef396ce4..e08f543ef 100644 --- a/tests/e2e/local_interactive_sdk_kind_test.py +++ b/tests/e2e/local_interactive_sdk_kind_test.py @@ -12,6 +12,7 @@ import time import os import subprocess +import signal # For explicit signal sending from support import * @@ -25,8 +26,29 @@ class TestRayLocalInteractiveOauth: def setup_method(self): initialize_kubernetes_client(self) logger.info("Kubernetes client initalized") + self.port_forward_process = None # Initialize port_forward_process def teardown_method(self): + if self.port_forward_process: + logger.info( + f"Terminating port-forward process (PID: {self.port_forward_process.pid})..." + ) + self.port_forward_process.terminate() # Send SIGTERM + try: + self.port_forward_process.wait(timeout=10) # Wait for termination + logger.info( + f"Port-forward process (PID: {self.port_forward_process.pid}) terminated gracefully." + ) + except subprocess.TimeoutExpired: + logger.warning( + f"Port-forward process (PID: {self.port_forward_process.pid}) did not terminate in time, killing..." + ) + self.port_forward_process.kill() # Send SIGKILL if terminate fails + self.port_forward_process.wait() # Ensure it's dead + logger.info( + f"Port-forward process (PID: {self.port_forward_process.pid}) killed." + ) + self.port_forward_process = None delete_namespace(self) delete_kueue_resources(self) @@ -66,187 +88,285 @@ def run_local_interactives( worker_memory_limits=4, worker_extended_resource_requests={gpu_resource_name: number_of_gpus}, write_to_file=True, - verify_tls=False, + verify_tls=False, # This is for SDK's JobSubmissionClient, not ray.init directly ) ) - cluster.up() - logger.info("Cluster deployment initiated") + try: # Wrap main logic in try-finally to ensure port-forward cleanup + cluster.up() + logger.info("Cluster deployment initiated") - cluster.wait_ready() - cluster.status() - logger.info("Cluster is ready") + cluster.wait_ready() + cluster.status() + logger.info("Cluster is ready") - TIMEOUT = 300 # 5 minutes timeout - END = time.time() + TIMEOUT + TIMEOUT = 300 # 5 minutes timeout + END = time.time() + TIMEOUT - head_pod_name = None - worker_pod_name = None + head_pod_name = None + worker_pod_name = None - while time.time() < END: - # Dynamically find pod names using substrings - if not head_pod_name: - head_pod_name = kubectl_get_pod_name_by_substring( - self.namespace, cluster_name, "head" - ) - if head_pod_name: - logger.info(f"Discovered head pod by substring: {head_pod_name}") - else: - logger.info( - f"Head pod not yet found by searching for '{cluster_name}' and 'head' in pod names. Retrying..." 
+ while time.time() < END: + # Dynamically find pod names using substrings + if not head_pod_name: + head_pod_name = kubectl_get_pod_name_by_substring( + self.namespace, cluster_name, "head" ) + if head_pod_name: + logger.info( + f"Discovered head pod by substring: {head_pod_name}" + ) + else: + logger.info( + f"Head pod not yet found by searching for '{cluster_name}' and 'head' in pod names. Retrying..." + ) - if not worker_pod_name: - worker_pod_name = kubectl_get_pod_name_by_substring( - self.namespace, cluster_name, "worker" - ) - if worker_pod_name: - logger.info( - f"Discovered worker pod by substring: {worker_pod_name}" - ) - else: - logger.info( - f"Worker pod not yet found by searching for '{cluster_name}' and 'worker' in pod names. Retrying..." + if not worker_pod_name: + worker_pod_name = kubectl_get_pod_name_by_substring( + self.namespace, cluster_name, "worker" ) + if worker_pod_name: + logger.info( + f"Discovered worker pod by substring: {worker_pod_name}" + ) + else: + logger.info( + f"Worker pod not yet found by searching for '{cluster_name}' and 'worker' in pod names. Retrying..." + ) - head_status = "NotFound" - worker_status = "NotFound" + head_status = "NotFound" + worker_status = "NotFound" - if head_pod_name: - head_status = kubectl_get_pod_status(self.namespace, head_pod_name) - if worker_pod_name: - worker_status = kubectl_get_pod_status(self.namespace, worker_pod_name) + if head_pod_name: + head_status = kubectl_get_pod_status(self.namespace, head_pod_name) + if worker_pod_name: + worker_status = kubectl_get_pod_status( + self.namespace, worker_pod_name + ) - if ( - head_pod_name - and worker_pod_name - and "Running" in head_status - and "Running" in worker_status - ): - head_ready = kubectl_get_pod_ready(self.namespace, head_pod_name) - worker_ready = kubectl_get_pod_ready(self.namespace, worker_pod_name) + if ( + head_pod_name + and worker_pod_name + and "Running" in head_status + and "Running" in worker_status + ): + head_ready = kubectl_get_pod_ready(self.namespace, head_pod_name) + worker_ready = kubectl_get_pod_ready( + self.namespace, worker_pod_name + ) - if head_ready and worker_ready: - logger.info("All discovered pods and containers are ready!") - break + if head_ready and worker_ready: + logger.info("All discovered pods and containers are ready!") + break + else: + logger.info( + "Discovered pods are running but containers are not all ready yet..." + ) + if not head_ready and head_pod_name: + head_container_status = kubectl_get_pod_container_status( + self.namespace, head_pod_name + ) + logger.info( + f"Head pod ({head_pod_name}) container status: {head_container_status}" + ) + if not worker_ready and worker_pod_name: + worker_container_status = kubectl_get_pod_container_status( + self.namespace, worker_pod_name + ) + logger.info( + f"Worker pod ({worker_pod_name}) container status: {worker_container_status}" + ) + elif (head_pod_name and "Error" in head_status) or ( + worker_pod_name and "Error" in worker_status + ): + logger.error( + "Error getting pod status for one or more pods, retrying..." + ) else: logger.info( - "Discovered pods are running but containers are not all ready yet..." + f"Waiting for pods to be discovered and running... 
Current status - Head ({head_pod_name or 'N/A'}): {head_status}, Worker ({worker_pod_name or 'N/A'}): {worker_status}" ) - if not head_ready and head_pod_name: - head_container_status = kubectl_get_pod_container_status( - self.namespace, head_pod_name - ) - logger.info( - f"Head pod ({head_pod_name}) container status: {head_container_status}" + + time.sleep(10) + + if time.time() >= END: + logger.error("Timeout waiting for pods to be ready or discovered") + if not head_pod_name or not worker_pod_name: + logger.error( + "Could not discover head and/or worker pods by name substring. Listing all pods in namespace for debugging:" + ) + try: + all_pods_result = subprocess.run( + [ + "kubectl", + "get", + "pods", + "-n", + self.namespace, + "-o", + "wide", + ], + capture_output=True, + text=True, + check=False, ) - if not worker_ready and worker_pod_name: - worker_container_status = kubectl_get_pod_container_status( - self.namespace, worker_pod_name + logger.error( + f"Pods in namespace '{self.namespace}':\\n{all_pods_result.stdout}" ) - logger.info( - f"Worker pod ({worker_pod_name}) container status: {worker_container_status}" + if all_pods_result.stderr: + logger.error( + f"Error listing pods: {all_pods_result.stderr}" + ) + except Exception as e_pods: + logger.error( + f"Exception while trying to list all pods: {e_pods}" ) - elif (head_pod_name and "Error" in head_status) or ( - worker_pod_name and "Error" in worker_status - ): - logger.error( - "Error getting pod status for one or more pods, retrying..." - ) - else: - logger.info( - f"Waiting for pods to be discovered and running... Current status - Head ({head_pod_name or 'N/A'}): {head_status}, Worker ({worker_pod_name or 'N/A'}): {worker_status}" - ) - - time.sleep(10) - if time.time() >= END: - logger.error("Timeout waiting for pods to be ready or discovered") - if not head_pod_name or not worker_pod_name: - logger.error( - "Could not discover head and/or worker pods by name substring. Listing all pods in namespace for debugging:" - ) - try: - all_pods_result = subprocess.run( - ["kubectl", "get", "pods", "-n", self.namespace, "-o", "wide"], - capture_output=True, - text=True, - check=False, + if head_pod_name: + logger.error( + f"Final head pod ({head_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, head_pod_name)}" ) + else: logger.error( - f"Pods in namespace '{self.namespace}':\n{all_pods_result.stdout}" + f"Final head pod status: Not Discovered by searching for '{cluster_name}' and 'head' in pod names." ) - if all_pods_result.stderr: - logger.error(f"Error listing pods: {all_pods_result.stderr}") - except Exception as e_pods: - logger.error(f"Exception while trying to list all pods: {e_pods}") - if head_pod_name: - logger.error( - f"Final head pod ({head_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, head_pod_name)}" - ) - else: - logger.error( - f"Final head pod status: Not Discovered by searching for '{cluster_name}' and 'head' in pod names." + if worker_pod_name: + logger.error( + f"Final worker pod ({worker_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, worker_pod_name)}" + ) + else: + logger.error( + f"Final worker pod status: Not Discovered by searching for '{cluster_name}' and 'worker' in pod names." 
+ ) + raise TimeoutError( + "Pods did not become ready (or were not discovered by name substring) within the timeout period" ) - if worker_pod_name: - logger.error( - f"Final worker pod ({worker_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, worker_pod_name)}" - ) - else: - logger.error( - f"Final worker pod status: Not Discovered by searching for '{cluster_name}' and 'worker' in pod names." - ) - raise TimeoutError( - "Pods did not become ready (or were not discovered by name substring) within the timeout period" - ) + generate_cert.generate_tls_cert(cluster_name, self.namespace) + generate_cert.export_env(cluster_name, self.namespace) + + # Unset server cert/key for client mode if skip_verify is true, to avoid client trying to use them as its own identity. + if os.environ.get("RAY_CLIENT_SKIP_TLS_VERIFY") == "1": + if "RAY_TLS_SERVER_CERT" in os.environ: + del os.environ["RAY_TLS_SERVER_CERT"] + logger.info( + "Removed RAY_TLS_SERVER_CERT from env for client connection" + ) + if "RAY_TLS_SERVER_KEY" in os.environ: + del os.environ["RAY_TLS_SERVER_KEY"] + logger.info( + "Removed RAY_TLS_SERVER_KEY from env for client connection" + ) - generate_cert.generate_tls_cert(cluster_name, self.namespace) - generate_cert.export_env(cluster_name, self.namespace) - - client_url = cluster.local_client_url() - cluster.status() - - logger.info("Initializing Ray connection...") - try: - ray.init(address=client_url, logging_level="INFO", local_mode=True) - logger.info("Ray initialization successful") - except Exception as e: - logger.error(f"Ray initialization failed: {str(e)}") - logger.error(f"Error type: {type(e)}") - raise - - @ray.remote(num_gpus=number_of_gpus / 2) - def heavy_calculation_part(num_iterations): - result = 0.0 - for i in range(num_iterations): - for j in range(num_iterations): - for k in range(num_iterations): - result += math.sin(i) * math.cos(j) * math.tan(k) - return result - - @ray.remote(num_gpus=number_of_gpus / 2) - def heavy_calculation(num_iterations): - results = ray.get( - [heavy_calculation_part.remote(num_iterations // 30) for _ in range(30)] + # Start port forwarding + local_port = "20001" + ray_client_port = "10001" + head_service_name = f"{cluster_name}-head-svc" + + port_forward_cmd = [ + "kubectl", + "port-forward", + "-n", + self.namespace, + f"svc/{head_service_name}", + f"{local_port}:{ray_client_port}", + ] + logger.info(f"Starting port-forward: {' '.join(port_forward_cmd)}") + # Using preexec_fn=os.setsid to create a new session, so we can kill the whole process group later if needed. + # However, os.setsid is not available on Windows. For simplicity in a test, direct Popen is used. + # Proper cross-platform process group management can be more complex. 
+ self.port_forward_process = subprocess.Popen( + port_forward_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - return sum(results) - - ref = heavy_calculation.remote(3000) - - try: - result = ray.get(ref) - logger.info(f"Calculation completed with result: {result}") - assert result == 1789.4644387076714 - logger.info("Result assertion passed") - except Exception as e: - logger.error(f"Error during calculation: {str(e)}") - raise - finally: - logger.info("Cancelling task reference...") - ray.cancel(ref) - logger.info("Task cancelled") + logger.info( + f"Port-forward process started with PID: {self.port_forward_process.pid}" + ) + time.sleep(5) # Give port-forward a few seconds to establish + + client_url = f"ray://localhost:{local_port}" + # client_url = cluster.local_client_url() # Original line, now replaced + cluster.status() + + logger.info(f"Attempting to connect to Ray client at: {client_url}") + logger.info("Initializing Ray connection...") + try: + ray.init( + address=client_url, logging_level="INFO" + ) # Removed local_mode=True + logger.info("Ray initialization successful") + except Exception as e: + logger.error(f"Ray initialization failed: {str(e)}") + logger.error(f"Error type: {type(e)}") + # Log port-forward stdout/stderr if connection fails + if self.port_forward_process: + stdout, stderr = self.port_forward_process.communicate( + timeout=5 + ) # attempt to read + logger.error( + f"Port-forward stdout: {stdout.decode(errors='ignore')}" + ) + logger.error( + f"Port-forward stderr: {stderr.decode(errors='ignore')}" + ) + raise + + @ray.remote(num_gpus=number_of_gpus / 2) + def heavy_calculation_part(num_iterations): + result = 0.0 + for i in range(num_iterations): + for j in range(num_iterations): + for k in range(num_iterations): + result += math.sin(i) * math.cos(j) * math.tan(k) + return result + + @ray.remote(num_gpus=number_of_gpus / 2) + def heavy_calculation(num_iterations): + results = ray.get( + [ + heavy_calculation_part.remote(num_iterations // 30) + for _ in range(30) + ] + ) + return sum(results) + + ref = heavy_calculation.remote(3000) + + try: + result = ray.get(ref) + logger.info(f"Calculation completed with result: {result}") + assert result == 1789.4644387076714 + logger.info("Result assertion passed") + except Exception as e: + logger.error(f"Error during calculation: {str(e)}") + raise + finally: + logger.info("Cancelling task reference...") + ray.cancel(ref) + logger.info("Task cancelled") + + ray.shutdown() + # Port-forward process is stopped in finally block or teardown_method - ray.shutdown() - cluster.down() + finally: + if self.port_forward_process: + logger.info( + f"Stopping port-forward process (PID: {self.port_forward_process.pid}) in finally block..." + ) + self.port_forward_process.terminate() + try: + self.port_forward_process.wait(timeout=10) + logger.info( + f"Port-forward process (PID: {self.port_forward_process.pid}) terminated from finally." + ) + except subprocess.TimeoutExpired: + logger.warning( + f"Port-forward process (PID: {self.port_forward_process.pid}) did not terminate in time from finally, killing..." + ) + self.port_forward_process.kill() + self.port_forward_process.wait() + logger.info( + f"Port-forward process (PID: {self.port_forward_process.pid}) killed from finally." 
+ ) + self.port_forward_process = None + cluster.down() From 4c0d0727f621f7349d8417bef93ebca5e17a3248 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Tue, 20 May 2025 21:53:56 +0100 Subject: [PATCH 8/9] sdk user changes --- .github/workflows/e2e_tests.yaml | 30 ++++++++++++++++++++ tests/e2e/local_interactive_sdk_kind_test.py | 13 --------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index 2a12e1122..bdae26db4 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -173,6 +173,36 @@ jobs: with: user-name: sdk-user + - name: Grant sdk-user port-forwarding permissions + run: | + cat < Date: Tue, 20 May 2025 22:08:28 +0100 Subject: [PATCH 9/9] there's literally no way --- .github/workflows/e2e_tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index bdae26db4..65735bf1c 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -234,7 +234,7 @@ jobs: pip install poetry poetry install --with test,docs echo "Running e2e tests..." - poetry run pytest -v -s --log-cli-level=INFO ./tests/e2e/local_interactive_sdk_kind_test.py > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 + poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 env: GRPC_DNS_RESOLVER: "native"
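
Taken together, the later patches move the e2e test away from the ingress-based cluster.local_client_url() address and toward a kubectl port-forward tunnel against the head service, with the Ray client trusting only the generated CA certificate (or skipping verification for the throwaway KinD cluster). The snippet below is a minimal standalone sketch of that connection flow, not the SDK's own API: it assumes a RayCluster named test-ray-cluster-li in a "default" namespace, a head service following the "<cluster>-head-svc" naming convention with the Ray client on port 10001, and TLS material already written by generate_cert.generate_tls_cert under tls-<cluster>-<namespace>/ as shown in the patches above.

    import os
    import subprocess
    import time

    import ray

    cluster_name = "test-ray-cluster-li"
    namespace = "default"  # assumption: use the namespace the RayCluster actually runs in
    tls_dir = os.path.join(os.getcwd(), f"tls-{cluster_name}-{namespace}")

    # Mirror generate_cert.export_env(): the client only needs the CA certificate,
    # and for a disposable e2e cluster verification can be skipped entirely.
    os.environ["RAY_USE_TLS"] = "1"
    os.environ["RAY_TLS_CA_CERT"] = os.path.join(tls_dir, "ca.crt")
    os.environ["RAY_CLIENT_SKIP_TLS_VERIFY"] = "1"

    # Forward the head service's Ray client port (10001) to localhost:20001,
    # the same mapping used in patch 7/9.
    port_forward = subprocess.Popen(
        [
            "kubectl", "port-forward", "-n", namespace,
            f"svc/{cluster_name}-head-svc", "20001:10001",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    time.sleep(5)  # give kubectl a moment to establish the tunnel

    try:
        ray.init(address="ray://localhost:20001", logging_level="INFO")
        # ... submit remote tasks here, e.g. the heavy_calculation task from the test ...
        ray.shutdown()
    finally:
        port_forward.terminate()
        try:
            port_forward.wait(timeout=10)
        except subprocess.TimeoutExpired:
            port_forward.kill()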