diff --git a/openeogeotrellis/deploy/batch_job.py b/openeogeotrellis/deploy/batch_job.py
index 1736d5b2..0511caeb 100644
--- a/openeogeotrellis/deploy/batch_job.py
+++ b/openeogeotrellis/deploy/batch_job.py
@@ -270,6 +270,8 @@ def run_job(
 ):
     result_metadata = {}
 
+    # while creating the stac metadata, hrefs are temporary local paths.
+    stac_file_paths = []
     try:
         # We actually expect type Path, but in reality paths as strings tend to
         # slip in anyway, so we better catch them and convert them.
@@ -362,7 +364,7 @@ def run_job(
             ml_model_metadata=ml_model_metadata,
         )
         # perform a first metadata write _before_ actually computing the result. This provides a bit more info, even if the job fails.
-        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)
+        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)
 
         for result in results:
             result.options["batch_mode"] = True
@@ -464,7 +466,7 @@ def run_job(
             ml_model_metadata=ml_model_metadata,
         )
-        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)
+        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)
 
         logger.debug("Starting GDAL-based retrieval of asset metadata")
         result_metadata = _assemble_result_metadata(
             tracer=tracer,
@@ -483,7 +485,7 @@ def run_job(
         assert len(results) == len(assets_metadata)
 
         for result, result_assets_metadata in zip(results, assets_metadata):
-            _export_to_workspaces(
+            stac_file_paths += _export_to_workspaces(
                 result,
                 result_metadata,
                 result_assets_metadata=result_assets_metadata,
@@ -491,10 +493,10 @@ def run_job(
                 remove_exported_assets=job_options.get("remove-exported-assets", False),
             )
     finally:
-        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)
+        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, stac_file_paths)
 
 
-def write_metadata(metadata, metadata_file, job_dir: Path):
+def write_metadata(metadata, metadata_file, stac_file_paths: List[Union[str, Path]] = None):
     with open(metadata_file, 'w') as f:
         json.dump(metadata, f, default=json_default)
     add_permissions(metadata_file, stat.S_IWGRP)
@@ -508,16 +510,12 @@ def write_metadata(metadata, metadata_file, job_dir: Path):
             bucket = os.environ.get('SWIFT_BUCKET')
             s3_instance = s3_client()
 
-            paths = list(filter(lambda x: x.is_file(), job_dir.rglob("*")))
+            paths = [metadata_file] + (stac_file_paths or [])
+            # asset files are already uploaded by Scala code
             logger.info(f"Writing results to object storage. paths={paths}")
             for file_path in paths:
-                # TODO: Get list of files to upload from metadata file.
-                # AFAIK, nothing else is created locally and needs to be uploaded to s3 afterwards.
-                if UDF_PYTHON_DEPENDENCIES_FOLDER_NAME in str(file_path):
-                    logger.warning(f"Omitting {file_path} as the executors will not be able to access it")
-                else:
-                    full_path = str(file_path.absolute())
-                    s3_instance.upload_file(full_path, bucket, full_path.strip("/"))
+                file_path = urlparse(str(file_path)).path
+                s3_instance.upload_file(file_path, bucket, file_path.strip("/"))
         else:
             _convert_job_metadatafile_outputs_to_s3_urls(metadata_file)
 
@@ -588,6 +586,7 @@ def _export_to_workspaces(
         if alternate:
             result_metadata["assets"][asset_key]["alternate"] = alternate
 
+    return stac_hrefs
 
 
 def _export_to_workspace(job_dir: str, source_uri: str, target: Workspace, merge: str, remove_original: bool) -> str:
diff --git a/tests/test_batch_result.py b/tests/test_batch_result.py
index e3584130..ffe603d3 100644
--- a/tests/test_batch_result.py
+++ b/tests/test_batch_result.py
@@ -1286,15 +1286,16 @@ def write_line(message):
         job_dir_files_s3 = [
            o["Key"] for o in s3_instance.list_objects(Bucket=get_backend_config().s3_bucket_name)["Contents"]
         ]
 
-        assert job_dir_files_s3 == ListSubSet(
-            [
-                f"{merge}/collection.json",
-                f"{merge}/folder1/lon.tif",
-                f"{merge}/folder1/lon.tif.json",
-                f"{merge}/lat.tif",
-                f"{merge}/lat.tif.json",
-            ]
-        )
+        for prefix in [merge, tmp_path.relative_to("/")]:
+            assert job_dir_files_s3 == ListSubSet(
+                [
+                    f"{prefix}/collection.json",
+                    f"{prefix}/folder1/lon.tif",
+                    f"{prefix}/folder1/lon.tif.json",
+                    f"{prefix}/lat.tif",
+                    f"{prefix}/lat.tif.json",
+                ]
+            )
     else:
         workspace_dir = Path(f"{workspace.root_directory}/{merge}")
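Note on the new upload path in write_metadata: instead of walking job_dir for files to push to object storage, only the metadata file plus the STAC hrefs returned by _export_to_workspaces are uploaded (the asset files themselves are already uploaded by the Scala side). Since those hrefs may arrive as plain paths, Path objects, or file:// URIs, the loop normalizes them with urlparse before deriving the bucket key. A minimal, self-contained sketch of that key derivation, assuming file:// hrefs can occur (to_s3_key is a hypothetical helper name; the patch inlines this logic in the upload loop):

    from pathlib import Path
    from urllib.parse import urlparse

    def to_s3_key(href) -> str:
        # Plain paths ("/jobs/j-123/collection.json") and file:// URIs both
        # reduce to a filesystem path via urlparse(...).path; stripping the
        # leading "/" turns that path into a bucket-relative key.
        return urlparse(str(href)).path.strip("/")

    assert to_s3_key("/jobs/j-123/collection.json") == "jobs/j-123/collection.json"
    assert to_s3_key("file:///jobs/j-123/lat.tif.json") == "jobs/j-123/lat.tif.json"
    assert to_s3_key(Path("/jobs/j-123/lat.tif")) == "jobs/j-123/lat.tif"

This also explains the updated test expectation: the STAC files end up under both the workspace merge prefix and the job dir prefix (tmp_path relative to "/"), hence the loop over the two prefixes.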