diff --git a/dpdispatcher/contexts/openapi_context.py b/dpdispatcher/contexts/openapi_context.py index 9251ce48..8346fcd5 100644 --- a/dpdispatcher/contexts/openapi_context.py +++ b/dpdispatcher/contexts/openapi_context.py @@ -1,18 +1,20 @@ +import glob import os import shutil import uuid +from zipfile import ZipFile import tqdm try: - from bohriumsdk.client import Client - from bohriumsdk.job import Job - from bohriumsdk.storage import Storage - from bohriumsdk.util import Util -except ModuleNotFoundError: + from bohrium import Bohrium + from bohrium.resources import Job, Tiefblue +except ModuleNotFoundError as e: found_bohriumsdk = False + import_bohrium_error = e else: found_bohriumsdk = True + import_bohrium_error = None from dpdispatcher.base_context import BaseContext from dpdispatcher.dlog import dlog @@ -23,6 +25,36 @@ ) +def unzip_file(zip_file, out_dir="./"): + obj = ZipFile(zip_file, "r") + for item in obj.namelist(): + obj.extract(item, out_dir) + + +def zip_file_list(root_path, zip_filename, file_list=[]): + out_zip_file = os.path.join(root_path, zip_filename) + # print('debug: file_list', file_list) + zip_obj = ZipFile(out_zip_file, "w") + for f in file_list: + matched_files = os.path.join(root_path, f) + for ii in glob.glob(matched_files): + # print('debug: matched_files:ii', ii) + if os.path.isdir(ii): + arcname = os.path.relpath(ii, start=root_path) + zip_obj.write(ii, arcname) + for root, dirs, files in os.walk(ii): + for file in files: + filename = os.path.join(root, file) + arcname = os.path.relpath(filename, start=root_path) + # print('debug: filename:arcname:root_path', filename, arcname, root_path) + zip_obj.write(filename, arcname) + else: + arcname = os.path.relpath(ii, start=root_path) + zip_obj.write(ii, arcname) + zip_obj.close() + return out_zip_file + + class OpenAPIContext(BaseContext): def __init__( self, @@ -35,15 +67,39 @@ def __init__( if not found_bohriumsdk: raise ModuleNotFoundError( "bohriumsdk not installed. Install dpdispatcher with `pip install dpdispatcher[bohrium]`" - ) + ) from import_bohrium_error self.init_local_root = local_root self.init_remote_root = remote_root self.temp_local_root = os.path.abspath(local_root) self.remote_profile = remote_profile - self.client = Client() - self.storage = Storage(client=self.client) + access_key = ( + remote_profile.get("access_key", None) + or os.getenv("BOHRIUM_ACCESS_KEY", None) + or os.getenv("ACCESS_KEY", None) + ) + project_id = ( + remote_profile.get("project_id", None) + or os.getenv("BOHRIUM_PROJECT_ID", None) + or os.getenv("PROJECT_ID", None) + ) + app_key = ( + remote_profile.get("app_key", None) + or os.getenv("BOHRIUM_APP_KEY", None) + or os.getenv("APP_KEY", None) + ) + if access_key is None: + raise ValueError( + "remote_profile must contain 'access_key' or set environment variable 'BOHRIUM_ACCESS_KEY'" + ) + if project_id is None: + raise ValueError( + "remote_profile must contain 'project_id' or set environment variable 'BOHRIUM_PROJECT_ID'" + ) + self.client = Bohrium( + access_key=access_key, project_id=project_id, app_key=app_key + ) + self.storage = Tiefblue() self.job = Job(client=self.client) - self.util = Util() self.jgid = None @classmethod @@ -97,7 +153,7 @@ def upload_job(self, job, common_files=None): for file in task.forward_files: upload_file_list.append(os.path.join(task.task_work_path, file)) - upload_zip = Util.zip_file_list( + upload_zip = zip_file_list( self.local_root, zip_task_file, file_list=upload_file_list ) project_id = self.remote_profile.get("project_id", 0) @@ -189,7 +245,7 @@ def download(self, submission): ): continue self.storage.download_from_url(info["resultUrl"], target_result_zip) - Util.unzip_file(target_result_zip, out_dir=self.local_root) + unzip_file(target_result_zip, out_dir=self.local_root) self._backup(self.local_root, target_result_zip) self._clean_backup( self.local_root, keep_backup=self.remote_profile.get("keep_backup", True) diff --git a/dpdispatcher/machines/openapi.py b/dpdispatcher/machines/openapi.py index 83ba165d..3073c2f9 100644 --- a/dpdispatcher/machines/openapi.py +++ b/dpdispatcher/machines/openapi.py @@ -1,14 +1,13 @@ import os import shutil import time +from zipfile import ZipFile from dpdispatcher.utils.utils import customized_script_header_template try: - from bohriumsdk.client import Client - from bohriumsdk.job import Job - from bohriumsdk.storage import Storage - from bohriumsdk.util import Util + from bohrium import Bohrium + from bohrium.resources import Job, Tiefblue except ModuleNotFoundError: found_bohriumsdk = False else: @@ -23,6 +22,12 @@ """ +def unzip_file(zip_file, out_dir="./"): + obj = ZipFile(zip_file, "r") + for item in obj.namelist(): + obj.extract(item, out_dir) + + class OpenAPI(Machine): def __init__(self, context): if not found_bohriumsdk: @@ -35,9 +40,35 @@ def __init__(self, context): self.grouped = self.remote_profile.get("grouped", True) self.retry_count = self.remote_profile.get("retry_count", 3) self.ignore_exit_code = context.remote_profile.get("ignore_exit_code", True) - self.client = Client() + + access_key = ( + self.remote_profile.get("access_key", None) + or os.getenv("BOHRIUM_ACCESS_KEY", None) + or os.getenv("ACCESS_KEY", None) + ) + project_id = ( + self.remote_profile.get("project_id", None) + or os.getenv("BOHRIUM_PROJECT_ID", None) + or os.getenv("PROJECT_ID", None) + ) + app_key = ( + self.remote_profile.get("app_key", None) + or os.getenv("BOHRIUM_APP_KEY", None) + or os.getenv("APP_KEY", None) + ) + if access_key is None: + raise ValueError( + "remote_profile must contain 'access_key' or set environment variable 'BOHRIUM_ACCESS_KEY'" + ) + if project_id is None: + raise ValueError( + "remote_profile must contain 'project_id' or set environment variable 'BOHRIUM_PROJECT_ID'" + ) + self.client = Bohrium( + access_key=access_key, project_id=project_id, app_key=app_key + ) + self.storage = Tiefblue() self.job = Job(client=self.client) - self.storage = Storage(client=self.client) self.group_id = None def gen_script(self, job): @@ -102,7 +133,6 @@ def do_submit(self, job): } if job.job_state == JobStatus.unsubmitted: openapi_params["job_id"] = job.job_id - data = self.job.insert(**openapi_params) job.job_id = data.get("jobId", 0) # type: ignore @@ -170,7 +200,7 @@ def _download_job(self, job): result_filename = job_hash + "_back.zip" target_result_zip = os.path.join(self.context.local_root, result_filename) self.storage.download_from_url(job_url, target_result_zip) - Util.unzip_file(target_result_zip, out_dir=self.context.local_root) + unzip_file(target_result_zip, out_dir=self.context.local_root) try: os.makedirs(os.path.join(self.context.local_root, "backup"), exist_ok=True) shutil.move(