@@ -11,6 +11,7 @@
 import logging
 import time
 import os
+import subprocess

 from support import *

@@ -22,40 +23,34 @@
 @pytest.mark.kind
 class TestRayLocalInteractiveOauth:
     def setup_method(self):
-        logger.info("Setting up test environment...")
         initialize_kubernetes_client(self)
-        logger.info("Kubernetes client initialized ")
+        logger.info("Kubernetes client initialized")

     def teardown_method(self):
-        logger.info("Cleaning up test environment...")
         delete_namespace(self)
         delete_kueue_resources(self)
-        logger.info("Cleanup completed")

     def test_local_interactives(self):
-        logger.info("Starting test_local_interactives...")
         self.setup_method()
         create_namespace(self)
         create_kueue_resources(self)
         self.run_local_interactives()
-        logger.info("test_local_interactives completed")

     @pytest.mark.nvidia_gpu
     def test_local_interactives_nvidia_gpu(self):
-        logger.info("Starting test_local_interactives_nvidia_gpu...")
         self.setup_method()
         create_namespace(self)
         create_kueue_resources(self)
         self.run_local_interactives(number_of_gpus=1)
-        logger.info("test_local_interactives_nvidia_gpu completed")

     def run_local_interactives(
         self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
     ):
         cluster_name = "test-ray-cluster-li"
         logger.info(f"Starting run_local_interactives with {number_of_gpus} GPUs")

-        logger.info("Creating cluster configuration...")
+        ray.shutdown()
+
         cluster = Cluster(
             ClusterConfiguration(
                 name=cluster_name,
@@ -66,87 +61,185 @@ def run_local_interactives(
                 head_memory_requests=2,
                 head_memory_limits=2,
                 worker_cpu_requests="500m",
-                worker_cpu_limits=1,
+                worker_cpu_limits="500m",
                 worker_memory_requests=1,
                 worker_memory_limits=4,
                 worker_extended_resource_requests={gpu_resource_name: number_of_gpus},
                 write_to_file=True,
                 verify_tls=False,
             )
         )
-        logger.info("Cluster configuration created")

-        logger.info("Starting cluster deployment...")
         cluster.up()
         logger.info("Cluster deployment initiated")

-        logger.info("Waiting for cluster to be ready...")
         cluster.wait_ready()
+        cluster.status()
         logger.info("Cluster is ready")

-        logger.info("Generating TLS certificates...")
-        generate_cert.generate_tls_cert(cluster_name, self.namespace)
-        logger.info("TLS certificates generated")
+        logger.info("Waiting for head and worker pods to be fully ready...")
+        TIMEOUT = 300  # 5 minutes timeout
+        END = time.time() + TIMEOUT

-        logger.info("Exporting environment variables...")
-        generate_cert.export_env(cluster_name, self.namespace)
-        logger.info("Environment variables exported")
+        head_pod_name = None
+        worker_pod_name = None

-        client_url = cluster.local_client_url()
-        logger.info(f"Ray client URL: {client_url}")
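+        # Poll the head and worker pods until both are discovered and report
+        # Ready, or until the timeout expires.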
+        while time.time() < END:
+            # Dynamically find pod names using substrings
+            if not head_pod_name:
+                head_pod_name = kubectl_get_pod_name_by_substring(
+                    self.namespace, cluster_name, "head"
+                )
+                if head_pod_name:
+                    logger.info(f"Discovered head pod by substring: {head_pod_name}")
+                else:
+                    logger.info(
+                        f"Head pod not yet found by searching for '{cluster_name}' and 'head' in pod names. Retrying..."
+                    )

-        logger.info("Checking cluster status...")
-        status = cluster.status()
-        logger.info(f"Cluster status: {status}")
+            if not worker_pod_name:
+                worker_pod_name = kubectl_get_pod_name_by_substring(
+                    self.namespace, cluster_name, "worker"
+                )
+                if worker_pod_name:
+                    logger.info(
+                        f"Discovered worker pod by substring: {worker_pod_name}"
+                    )
+                else:
+                    logger.info(
+                        f"Worker pod not yet found by searching for '{cluster_name}' and 'worker' in pod names. Retrying..."
+                    )

-        logger.info("Checking cluster dashboard URI...")
-        dashboard_uri = cluster.cluster_dashboard_uri()
-        logger.info(f"Dashboard URI: {dashboard_uri}")
+            head_status = "NotFound"
+            worker_status = "NotFound"

-        logger.info("Checking cluster URI...")
-        cluster_uri = cluster.cluster_uri()
-        logger.info(f"Cluster URI: {cluster_uri}")
+            if head_pod_name:
+                head_status = kubectl_get_pod_status(self.namespace, head_pod_name)
+            if worker_pod_name:
+                worker_status = kubectl_get_pod_status(self.namespace, worker_pod_name)

-        logger.info("Shutting down any existing Ray connections...")
-        ray.shutdown()
-        logger.info("Ray shutdown completed")
+            logger.info(f"Head pod ({head_pod_name or 'N/A'}) status: {head_status}")
+            logger.info(
+                f"Worker pod ({worker_pod_name or 'N/A'}) status: {worker_status}"
+            )
+
+            if (
+                head_pod_name
+                and worker_pod_name
+                and "Running" in head_status
+                and "Running" in worker_status
+            ):
+                head_ready = kubectl_get_pod_ready(self.namespace, head_pod_name)
+                worker_ready = kubectl_get_pod_ready(self.namespace, worker_pod_name)
+
+                if head_ready and worker_ready:
+                    logger.info("All discovered pods and containers are ready!")
+                    break
+                else:
+                    logger.info(
+                        "Discovered pods are running but containers are not all ready yet..."
+                    )
+                    if not head_ready and head_pod_name:
+                        head_container_status = kubectl_get_pod_container_status(
+                            self.namespace, head_pod_name
+                        )
+                        logger.info(
+                            f"Head pod ({head_pod_name}) container status: {head_container_status}"
+                        )
+                    if not worker_ready and worker_pod_name:
+                        worker_container_status = kubectl_get_pod_container_status(
+                            self.namespace, worker_pod_name
+                        )
+                        logger.info(
+                            f"Worker pod ({worker_pod_name}) container status: {worker_container_status}"
+                        )
+            elif (head_pod_name and "Error" in head_status) or (
+                worker_pod_name and "Error" in worker_status
+            ):
+                logger.error(
+                    "Error getting pod status for one or more pods, retrying..."
+                )
+            else:
+                logger.info(
+                    f"Waiting for pods to be discovered and running... Current status - Head ({head_pod_name or 'N/A'}): {head_status}, Worker ({worker_pod_name or 'N/A'}): {worker_status}"
+                )
+
+            time.sleep(10)
+
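+        # If the loop exited via timeout rather than break, dump diagnostics
+        # (pod listing and container statuses) before failing the test.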
+        if time.time() >= END:
+            logger.error("Timeout waiting for pods to be ready or discovered")
+            if not head_pod_name or not worker_pod_name:
+                logger.error(
+                    "Could not discover head and/or worker pods by name substring. Listing all pods in namespace for debugging:"
+                )
+                try:
+                    all_pods_result = subprocess.run(
+                        ["kubectl", "get", "pods", "-n", self.namespace, "-o", "wide"],
+                        capture_output=True,
+                        text=True,
+                        check=False,
+                    )
+                    logger.error(
+                        f"Pods in namespace '{self.namespace}':\n{all_pods_result.stdout}"
+                    )
+                    if all_pods_result.stderr:
+                        logger.error(f"Error listing pods: {all_pods_result.stderr}")
+                except Exception as e_pods:
+                    logger.error(f"Exception while trying to list all pods: {e_pods}")
+
+            if head_pod_name:
+                logger.error(
+                    f"Final head pod ({head_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, head_pod_name)}"
+                )
+            else:
+                logger.error(
+                    f"Final head pod status: Not Discovered by searching for '{cluster_name}' and 'head' in pod names."
+                )
+
+            if worker_pod_name:
+                logger.error(
+                    f"Final worker pod ({worker_pod_name}) status: {kubectl_get_pod_container_status(self.namespace, worker_pod_name)}"
+                )
+            else:
+                logger.error(
+                    f"Final worker pod status: Not Discovered by searching for '{cluster_name}' and 'worker' in pod names."
+                )
+            raise TimeoutError(
+                "Pods did not become ready (or were not discovered by name substring) within the timeout period"
+            )
+
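+        # Pods are ready: generate TLS certificates and export the client
+        # environment variables before connecting to the Ray cluster.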
+        generate_cert.generate_tls_cert(cluster_name, self.namespace)
+        generate_cert.export_env(cluster_name, self.namespace)
+
+        client_url = cluster.local_client_url()
+        cluster.status()

         logger.info("Initializing Ray connection...")
         try:
-            ray.init(address=client_url, logging_level="DEBUG")
+            ray.init(address=client_url, logging_level="INFO")
             logger.info("Ray initialization successful")
         except Exception as e:
             logger.error(f"Ray initialization failed: {str(e)}")
             logger.error(f"Error type: {type(e)}")
             raise

-        logger.info("Defining Ray remote functions...")
-
         @ray.remote(num_gpus=number_of_gpus / 2)
         def heavy_calculation_part(num_iterations):
-            logger.info(
-                f"Starting heavy_calculation_part with {num_iterations} iterations"
-            )
             result = 0.0
             for i in range(num_iterations):
                 for j in range(num_iterations):
                     for k in range(num_iterations):
                         result += math.sin(i) * math.cos(j) * math.tan(k)
-            logger.info("heavy_calculation_part completed")
             return result

         @ray.remote(num_gpus=number_of_gpus / 2)
         def heavy_calculation(num_iterations):
-            logger.info(f"Starting heavy_calculation with {num_iterations} iterations")
             results = ray.get(
                 [heavy_calculation_part.remote(num_iterations // 30) for _ in range(30)]
             )
-            logger.info("heavy_calculation completed")
             return sum(results)

-        logger.info("Submitting calculation task...")
         ref = heavy_calculation.remote(3000)
-        logger.info("Task submitted, waiting for result...")

         try:
             result = ray.get(ref)
@@ -161,10 +254,5 @@ def heavy_calculation(num_iterations):
             ray.cancel(ref)
             logger.info("Task cancelled")

-        logger.info("Shutting down Ray...")
         ray.shutdown()
-        logger.info("Ray shutdown completed")
-
-        logger.info("Tearing down cluster...")
         cluster.down()
-        logger.info("Cluster teardown completed")