Skip to content

Commit

Permalink
Refactor: Extract pipeline_test.py into a shared file for reuse acros…
Browse files Browse the repository at this point in the history
…s workflows (#2972)

* Refactor: Extract pipeline_test.py into a shared file for reuse across workflows

Signed-off-by: Your Name <[email protected]>

* fix

Signed-off-by: Your Name <[email protected]>

* fix lint

Signed-off-by: Your Name <[email protected]>

---------

Signed-off-by: Your Name <[email protected]>
  • Loading branch information
akagami-harsh authored Feb 4, 2025
1 parent 550e0b3 commit f007cf7
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 82 deletions.
32 changes: 1 addition & 31 deletions .github/workflows/pipeline_swfs_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,34 +92,4 @@ jobs:
pip3 install kfp==2.11.0
KF_PROFILE=kubeflow-user-example-com
TOKEN="$(kubectl -n $KF_PROFILE create token default-editor)"
python -c '
from time import sleep
import kfp
import sys
token = sys.argv[1]
namespace = sys.argv[2]
client = kfp.Client(host="http://localhost:8080/pipeline", existing_token=token)
pipeline = client.list_pipelines().pipelines[0]
pipeline_name = pipeline.display_name
pipeline_id = pipeline.pipeline_id
pipeline_version_id = client.list_pipeline_versions(pipeline_id).pipeline_versions[0].pipeline_version_id
experiment_id = client.create_experiment("seaweedfs-test", namespace=namespace).experiment_id
print(f"Starting pipeline {pipeline_name}.")
run_id = client.run_pipeline(experiment_id=experiment_id, job_name="m2m-test", pipeline_id=pipeline_id, version_id=pipeline_version_id).run_id
while True:
status = client.get_run(run_id=run_id).state
if status in ["PENDING", "RUNNING"]:
print(f"Waiting for run_id: {run_id}, status: {status}.")
sleep(10)
else:
print(f"Run with id {run_id} finished with status: {status}.")
if status != "SUCCEEDED":
print("Pipeline failed")
raise SystemExit(1)
break
' "${TOKEN}" "${KF_PROFILE}"
python3 tests/gh-actions/pipeline_test.py run_pipeline "${TOKEN}" "${KF_PROFILE}"
55 changes: 5 additions & 50 deletions .github/workflows/pipeline_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ on:
- tests/gh-actions/install_oauth2-proxy.sh
- common/cert-manager/**
- common/oauth2-proxy/**
- common/istio*/**
- common/istio/**
- tests/gh-actions/pipeline_test.py

jobs:
build:
Expand Down Expand Up @@ -85,58 +86,12 @@ jobs:
pip3 install kfp==2.11.0
KF_PROFILE=kubeflow-user-example-com
TOKEN="$(kubectl -n $KF_PROFILE create token default-editor)"
python -c '
from time import sleep
import kfp
import sys
token = sys.argv[1]
namespace = sys.argv[2]
client = kfp.Client(host="http://localhost:8080/pipeline", existing_token=token)
pipeline = client.list_pipelines().pipelines[0]
pipeline_name = pipeline.display_name
pipeline_id = pipeline.pipeline_id
pipeline_version_id = client.list_pipeline_versions(pipeline_id).pipeline_versions[0].pipeline_version_id
experiment_id = client.create_experiment("m2m-test", namespace=namespace).experiment_id
print(f"Starting pipeline {pipeline_name}.")
run_id = client.run_pipeline(experiment_id=experiment_id, job_name="m2m-test", pipeline_id=pipeline_id, version_id=pipeline_version_id).run_id
while True:
status = client.get_run(run_id=run_id).state
if status in ["PENDING", "RUNNING"]:
print(f"Waiting for run_id: {run_id}, status: {status}.")
sleep(10)
else:
print(f"Run with id {run_id} finished with status: {status}.")
if status != "SUCCEEDED":
print("Pipeline failed")
raise SystemExit(1)
break
' "${TOKEN}" "${KF_PROFILE}"
python3 tests/gh-actions/pipeline_test.py run_pipeline "${TOKEN}" "${KF_PROFILE}"
- name: Fail to list pipelines with unauthorized ServiceAccount Token
run: |
pip3 install kfp==2.11.0
KF_PROFILE=kubeflow-user-example-com
TOKEN="$(kubectl -n default create token default)"
python -c '
import kfp
import sys
from kfp_server_api.exceptions import ApiException
token = sys.argv[1]
namespace = sys.argv[2]
client = kfp.Client(host="http://localhost:8080/pipeline", existing_token=token)
try:
pipeline = client.list_runs(namespace=namespace)
except ApiException as e:
assert e.status == 403, "This API Call should return unauthorized/forbidden error."
' "${TOKEN}" "${KF_PROFILE}"
echo "Test succeeded. Token from unauthorized ServiceAccount cannot list \
piplines in $KF_PROFILE namespace."
python3 tests/gh-actions/pipeline_test.py test_unauthorized_access "${TOKEN}" "${KF_PROFILE}"
echo "Test succeeded. Token from unauthorized ServiceAccount cannot list pipelines in $KF_PROFILE namespace."
4 changes: 3 additions & 1 deletion .yamllint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ rules:
indentation:
indent-sequences: false
line-length:
max: 400
max: 400
truthy:
allowed-values: ['on', 'off']
62 changes: 62 additions & 0 deletions tests/gh-actions/pipeline_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from time import sleep
import kfp
import sys
from kfp_server_api.exceptions import ApiException


def run_pipeline(token, namespace):
client = kfp.Client(host="http://localhost:8080/pipeline", existing_token=token)

pipeline = client.list_pipelines().pipelines[0]
pipeline_name = pipeline.display_name
pipeline_id = pipeline.pipeline_id
pipeline_version_id = (
client.list_pipeline_versions(pipeline_id)
.pipeline_versions[0]
.pipeline_version_id
)
experiment_id = client.create_experiment(
"m2m-test", namespace=namespace
).experiment_id

print(f"Starting pipeline {pipeline_name}.")
run_id = client.run_pipeline(
experiment_id=experiment_id,
job_name="m2m-test",
pipeline_id=pipeline_id,
version_id=pipeline_version_id,
).run_id

while True:
status = client.get_run(run_id=run_id).state
if status in ["PENDING", "RUNNING"]:
print(f"Waiting for run_id: {run_id}, status: {status}.")
sleep(10)
else:
print(f"Run with id {run_id} finished with status: {status}.")
if status != "SUCCEEDED":
print("Pipeline failed")
raise SystemExit(1)
break


def test_unauthorized_access(token, namespace):
client = kfp.Client(host="http://localhost:8080/pipeline", existing_token=token)

try:
pipeline = client.list_runs(namespace=namespace)
except ApiException as e:
assert (
e.status == 403
), "This API Call should return unauthorized/forbidden error."


if __name__ == "__main__":
action = sys.argv[1]
token = sys.argv[2]
namespace = sys.argv[3]

if action == "run_pipeline":
run_pipeline(token, namespace)
elif action == "test_unauthorized_access":
test_unauthorized_access(token, namespace)

0 comments on commit f007cf7

Please sign in to comment.