@@ -21,6 +21,7 @@ source ./tests/integration/utils/azure-utils.sh
21
21
22
22
# Global Variables
NAMESPACE=tests
SERVICE_ACCOUNT=spark
ADMIN_POD_NAME=testpod-admin
# Unique bucket name per test run; the Azure container reuses the same name.
S3_BUCKET=spark-$(uuidgen)
AZURE_CONTAINER=$S3_BUCKET
@@ -154,153 +155,95 @@ run_example_job_in_pod() {
154
155
validate_pi_value $pi
155
156
}
156
157
158
setup_s3_properties_in_pod(){
  # Register the S3 (s3a) related Spark properties on the service account
  # from inside the admin test pod, so that later spark-submit calls pick
  # them up automatically.
  #
  # Reads globals: NAMESPACE, SERVICE_ACCOUNT, S3_BUCKET.
  # Credentials and endpoint are resolved on the host via the s3-utils
  # helpers and passed into the pod through the environment.

  kubectl -n $NAMESPACE exec testpod -- \
      env \
        UU="$SERVICE_ACCOUNT" \
        NN="$NAMESPACE" \
        ACCESS_KEY="$(get_s3_access_key)" \
        SECRET_KEY="$(get_s3_secret_key)" \
        S3_ENDPOINT="$(get_s3_endpoint)" \
        BUCKET="$S3_BUCKET" \
        /bin/bash -c ' \
        spark-client.service-account-registry add-config \
          --username $UU --namespace $NN \
          --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
          --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
          --conf spark.hadoop.fs.s3a.path.style.access=true \
          --conf spark.hadoop.fs.s3a.endpoint=$S3_ENDPOINT \
          --conf spark.hadoop.fs.s3a.access.key=$ACCESS_KEY \
          --conf spark.hadoop.fs.s3a.secret.key=$SECRET_KEY \
          --conf spark.sql.catalog.local.warehouse=s3a://$BUCKET/warehouse'
}
229
180
230
- # Fetch out the number of rows inserted
231
- # rev => Reverse the string
232
- # cut -d' ' -f1 => Split by spaces and pick the first part
233
- # rev => Reverse the string back
234
- NUM_ROWS_INSERTED=$( echo $OUTPUT_LOG_LINE | rev | cut -d' ' -f1 | rev)
181
setup_azure_storage_properties_in_pod(){
  # Register the Azure Storage (abfss) related Spark properties on the
  # service account from inside the admin test pod.
  #
  # Reads globals: NAMESPACE, SERVICE_ACCOUNT, AZURE_CONTAINER.
  # Account name/key are resolved on the host via the azure-utils helpers.

  warehouse_path=$(construct_resource_uri $AZURE_CONTAINER warehouse abfss)

  kubectl -n $NAMESPACE exec testpod -- \
      env \
        UU="$SERVICE_ACCOUNT" \
        NN="$NAMESPACE" \
        ACCOUNT_NAME="$(get_azure_storage_account_name)" \
        SECRET_KEY="$(get_azure_storage_secret_key)" \
        WAREHOUSE="$warehouse_path" \
        /bin/bash -c ' \
        spark-client.service-account-registry add-config \
          --username $UU --namespace $NN \
          --conf spark.hadoop.fs.azure.account.key.$ACCOUNT_NAME.dfs.core.windows.net=$SECRET_KEY \
          --conf spark.sql.catalog.local.warehouse=$WAREHOUSE'
}
242
199
243
200
244
- test_iceberg_example_in_pod_with_azure_using_abfss (){
245
- # Test Iceberg integration in Charmed Spark Rock with Azure Storage
246
-
247
- # First create S3 bucket named 'spark'
248
- create_azure_container $AZURE_CONTAINER
249
-
250
- # Copy 'test-iceberg.py' script to 'spark' bucket
251
- copy_file_to_azure_container $AZURE_CONTAINER ./tests/integration/resources/test-iceberg.py
201
test_iceberg_example_in_pod(){
  # Test Iceberg integration in Charmed Spark Rock.
  #
  # Storage-specific (S3 / Azure) properties must already have been
  # registered on the service account by one of the setup_* helpers.
  #
  # Arguments:
  #   $1: The path (cloud URI) of the test-iceberg.py script
  # Returns:
  #   0 on success, 1 when the job did not run or row count mismatches.
  echo "$0" "$1"

  # Number of rows that are to be inserted during the test.
  NUM_ROWS_TO_INSERT="4"
  script_path=$1

  # Number of driver pods that exist in the namespace already.
  PREVIOUS_DRIVER_PODS_COUNT=$(kubectl get pods --sort-by=.metadata.creationTimestamp -n ${NAMESPACE} | grep driver | wc -l)

  # Submit the job from inside 'testpod'.
  # FIX: the original used UU="$USERNAME", but the USERNAME="spark" local
  # assignment was removed in this refactor, leaving $USERNAME unset at
  # submit time; use the SERVICE_ACCOUNT global instead.
  kubectl -n $NAMESPACE exec testpod -- \
      env \
        UU="$SERVICE_ACCOUNT" \
        NN="$NAMESPACE" \
        IM="$(spark_image)" \
        NUM_ROWS="$NUM_ROWS_TO_INSERT" \
        SCRIPT="$script_path" \
        /bin/bash -c ' \
        spark-client.spark-submit \
          --username $UU --namespace $NN \
          --conf spark.kubernetes.driver.request.cores=100m \
          --conf spark.kubernetes.executor.request.cores=100m \
          --conf spark.kubernetes.container.image=$IM \
          --conf spark.jars.ivy=/tmp \
          --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
          --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
          --conf spark.sql.catalog.spark_catalog.type=hive \
          --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.local.type=hadoop \
          --conf spark.sql.defaultCatalog=local \
          $SCRIPT -n $NUM_ROWS'

  # Number of driver pods after the job is completed.
  DRIVER_PODS_COUNT=$(kubectl get pods --sort-by=.metadata.creationTimestamp -n ${NAMESPACE} | grep driver | wc -l)

  # If the number of driver pods is same as before, job has not been run at all!
  if [[ "${PREVIOUS_DRIVER_PODS_COUNT}" == "${DRIVER_PODS_COUNT}" ]]
  then
    echo "ERROR: Sample job has not run!"
    return 1
  fi

  # NOTE(review): the following extraction lines were elided between the
  # diff hunks; reconstructed from the identical removed S3 variant — confirm.
  # Find the ID of the driver pod that ran the job.
  # tail -n 1 => Filter out the last line
  # cut -d' ' -f1 => Split by spaces and pick the first part
  DRIVER_POD_ID=$(kubectl get pods --sort-by=.metadata.creationTimestamp -n ${NAMESPACE} | grep test-iceberg-.*-driver | tail -n 1 | cut -d' ' -f1)

  # Filter out the output log line
  OUTPUT_LOG_LINE=$(kubectl logs ${DRIVER_POD_ID} -n ${NAMESPACE} | grep 'Number of rows inserted:')

  # Fetch out the number of rows inserted
  # rev => Reverse the string
  # cut -d' ' -f1 => Split by spaces and pick the first part
  # rev => Reverse the string back
  NUM_ROWS_INSERTED=$(echo $OUTPUT_LOG_LINE | rev | cut -d' ' -f1 | rev)

  if [ "${NUM_ROWS_INSERTED}" != "${NUM_ROWS_TO_INSERT}" ]; then
    echo "ERROR: ${NUM_ROWS_TO_INSERT} were supposed to be inserted. Found ${NUM_ROWS_INSERTED} rows. Aborting with exit code 1."
    return 1
  fi

  return 0
}
270
+
271
+
272
test_iceberg_example_in_pod_using_s3(){
  # Test Iceberg integration in Charmed Spark Rock using S3:
  # provision a bucket, register S3 Spark properties, run the generic
  # Iceberg test, then tear the bucket down (even on failure).

  # First create the S3 bucket
  create_s3_bucket $S3_BUCKET

  # Now, setup S3 properties in the service account inside the pod
  setup_s3_properties_in_pod

  # Copy 'test-iceberg.py' script to the bucket
  copy_file_to_s3_bucket $S3_BUCKET ./tests/integration/resources/test-iceberg.py
  script_path="s3a://$S3_BUCKET/test-iceberg.py"

  test_iceberg_example_in_pod "$script_path"
  return_value=$?

  # Clean up the bucket before deciding the outcome.
  delete_s3_bucket $S3_BUCKET

  # FIX: fail on ANY non-zero status, not only on exactly 1.
  if [ $return_value -ne 0 ]; then
    exit 1
  fi
}
294
+
295
+
296
test_iceberg_example_in_pod_using_abfss(){
  # Test Iceberg integration in Charmed Spark Rock using Azure Storage
  # (abfss): provision a container, register Azure Spark properties, run
  # the generic Iceberg test, then tear the container down (even on failure).
  # (Comments fixed: the originals were copy-pasted from the S3 variant.)

  # First create the Azure Storage container
  create_azure_container $AZURE_CONTAINER

  # Now, setup Azure Storage properties in the service account inside the pod
  setup_azure_storage_properties_in_pod

  # Copy 'test-iceberg.py' script to the container
  copy_file_to_azure_container $AZURE_CONTAINER ./tests/integration/resources/test-iceberg.py
  script_path=$(construct_resource_uri $AZURE_CONTAINER test-iceberg.py abfss)

  test_iceberg_example_in_pod "$script_path"
  return_value=$?

  # Clean up the container before deciding the outcome.
  delete_azure_container $AZURE_CONTAINER

  # FIX: fail on ANY non-zero status, not only on exactly 1.
  if [ $return_value -ne 0 ]; then
    exit 1
  fi
}
326
318
327
319
@@ -642,13 +634,13 @@ echo -e "##################################"
642
634
echo -e "RUN EXAMPLE THAT USES ICEBERG LIBRARIES"
echo -e "##################################"

(setup_user_context && test_iceberg_example_in_pod_using_s3 && cleanup_user_success) || cleanup_user_failure_in_pod


echo -e "##################################"
echo -e "RUN EXAMPLE THAT USES AZURE STORAGE"
echo -e "##################################"

(setup_user_context && test_iceberg_example_in_pod_using_abfss && cleanup_user_success) || cleanup_user_failure_in_pod


echo -e "##################################"
echo -e "TEARDOWN TEST POD"
0 commit comments