Upload fixed set of metadata files. #940
EmileSonneveld committed Nov 25, 2024
1 parent 88ab283 commit 59e45ec
Showing 2 changed files with 19 additions and 20 deletions.
openeogeotrellis/deploy/batch_job.py (20 changes: 9 additions & 11 deletions)
@@ -270,6 +270,7 @@ def run_job(
):
result_metadata = {}

+ stac_hrefs = []
try:
# We actually expect type Path, but in reality paths as strings tend to
# slip in anyway, so we better catch them and convert them.
@@ -483,18 +484,18 @@ def run_job(

assert len(results) == len(assets_metadata)
for result, result_assets_metadata in zip(results, assets_metadata):
- _export_to_workspaces(
+ stac_hrefs += _export_to_workspaces(
result,
result_metadata,
result_assets_metadata=result_assets_metadata,
job_dir=job_dir,
remove_exported_assets=job_options.get("remove-exported-assets", False),
)
finally:
- write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)
+ write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir, stac_hrefs)


- def write_metadata(metadata, metadata_file, job_dir: Path):
+ def write_metadata(metadata, metadata_file, job_dir: Path, stac_hrefs: List[Union[str, Path]] = None):
with open(metadata_file, 'w') as f:
json.dump(metadata, f, default=json_default)
add_permissions(metadata_file, stat.S_IWGRP)
@@ -508,16 +509,12 @@ def write_metadata(metadata, metadata_file, job_dir: Path):
bucket = os.environ.get('SWIFT_BUCKET')
s3_instance = s3_client()

- paths = list(filter(lambda x: x.is_file(), job_dir.rglob("*")))
+ paths = [metadata_file] + (stac_hrefs or [])
+ # asset files are already uploaded by Scala code
logger.info(f"Writing results to object storage. paths={paths}")
for file_path in paths:
- # TODO: Get list of files to upload from metadata file.
- # AFAIK, nothing else is created locally and needs to be uploaded to s3 afterwards.
- if UDF_PYTHON_DEPENDENCIES_FOLDER_NAME in str(file_path):
- logger.warning(f"Omitting {file_path} as the executors will not be able to access it")
- else:
- full_path = str(file_path.absolute())
- s3_instance.upload_file(full_path, bucket, full_path.strip("/"))
+ file_path = urlparse(str(file_path)).path
+ s3_instance.upload_file(file_path, bucket, file_path.strip("/"))
else:
_convert_job_metadatafile_outputs_to_s3_urls(metadata_file)

@@ -588,6 +585,7 @@ def _export_to_workspaces(

if alternate:
result_metadata["assets"][asset_key]["alternate"] = alternate
+ return stac_hrefs


def _export_to_workspace(job_dir: str, source_uri: str, target: Workspace, merge: str, remove_original: bool) -> str:
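Side note, not part of the commit: a minimal sketch of what the reworked upload loop in write_metadata does with its inputs, assuming boto3's S3 client and hypothetical paths. urlparse() reduces a file:// href to a plain filesystem path and leaves a bare path unchanged, and stripping the leading "/" turns the absolute path into the object key.

from urllib.parse import urlparse

import boto3

s3 = boto3.client("s3")  # assumed: endpoint and credentials are configured via the environment
bucket = "swift-bucket"  # hypothetical value standing in for os.environ["SWIFT_BUCKET"]

# Hypothetical inputs: the job metadata file plus STAC hrefs returned by _export_to_workspaces.
paths = ["/batch_jobs/j-123/job_metadata.json", "file:///batch_jobs/j-123/collection.json"]

for file_path in paths:
    file_path = urlparse(str(file_path)).path  # "file:///x/y.json" -> "/x/y.json"; a bare "/x/y.json" stays the same
    s3.upload_file(file_path, bucket, file_path.strip("/"))  # object key is the absolute path without the leading "/"

With these inputs both files end up under the batch_jobs/j-123/ key prefix, which is also the pattern the updated test below checks against the bucket listing.
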
tests/test_batch_result.py (19 changes: 10 additions & 9 deletions)
@@ -1286,15 +1286,16 @@ def write_line(message):
job_dir_files_s3 = [
o["Key"] for o in s3_instance.list_objects(Bucket=get_backend_config().s3_bucket_name)["Contents"]
]
- assert job_dir_files_s3 == ListSubSet(
- [
- f"{merge}/collection.json",
- f"{merge}/folder1/lon.tif",
- f"{merge}/folder1/lon.tif.json",
- f"{merge}/lat.tif",
- f"{merge}/lat.tif.json",
- ]
- )
+ for prefix in [merge, tmp_path.relative_to("/")]:
+ assert job_dir_files_s3 == ListSubSet(
+ [
+ f"{prefix}/collection.json",
+ f"{prefix}/folder1/lon.tif",
+ f"{prefix}/folder1/lon.tif.json",
+ f"{prefix}/lat.tif",
+ f"{prefix}/lat.tif.json",
+ ]
+ )

else:
workspace_dir = Path(f"{workspace.root_directory}/{merge}")
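Reading of the updated assertion (an assumption drawn from the diff, not stated in the commit): because object keys are absolute paths with the leading "/" stripped, files written into the job directory show up under a second prefix, the pytest tmp_path relative to "/", next to the workspace's merge prefix. A small sketch with a hypothetical tmp_path:

from pathlib import Path

tmp_path = Path("/tmp/pytest-of-user/test_export0")  # hypothetical pytest tmp_path used as the job dir
prefix = tmp_path.relative_to("/")                    # PosixPath("tmp/pytest-of-user/test_export0")
print(f"{prefix}/collection.json")                    # key expected in the listing next to f"{merge}/collection.json"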
