Fixed upload metadata #946

Merged
merged 4 commits into from Nov 27, 2024
Changes from 2 commits
24 changes: 11 additions & 13 deletions openeogeotrellis/deploy/batch_job.py
@@ -270,6 +270,7 @@ def run_job(
 ):
     result_metadata = {}

+    stac_hrefs = []
     try:
         # We actually expect type Path, but in reality paths as strings tend to
         # slip in anyway, so we better catch them and convert them.
@@ -362,7 +363,7 @@ def run_job(
             ml_model_metadata=ml_model_metadata,
         )
         # perform a first metadata write _before_ actually computing the result. This provides a bit more info, even if the job fails.
-        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)
+        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)

         for result in results:
             result.options["batch_mode"] = True
@@ -464,7 +465,7 @@ def run_job(
             ml_model_metadata=ml_model_metadata,
         )

-        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)
+        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)
         logger.debug("Starting GDAL-based retrieval of asset metadata")
         result_metadata = _assemble_result_metadata(
             tracer=tracer,
@@ -483,18 +484,18 @@ def run_job(

         assert len(results) == len(assets_metadata)
         for result, result_assets_metadata in zip(results, assets_metadata):
-            _export_to_workspaces(
+            stac_hrefs += _export_to_workspaces(
                 result,
                 result_metadata,
                 result_assets_metadata=result_assets_metadata,
                 job_dir=job_dir,
                 remove_exported_assets=job_options.get("remove-exported-assets", False),
             )
     finally:
-        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)
+        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, stac_hrefs)


-def write_metadata(metadata, metadata_file, job_dir: Path):
+def write_metadata(metadata, metadata_file, stac_hrefs: List[Union[str, Path]] = None):
     with open(metadata_file, 'w') as f:
         json.dump(metadata, f, default=json_default)
     add_permissions(metadata_file, stat.S_IWGRP)
@@ -508,16 +509,12 @@ def write_metadata(metadata, metadata_file, job_dir: Path):
         bucket = os.environ.get('SWIFT_BUCKET')
         s3_instance = s3_client()

-        paths = list(filter(lambda x: x.is_file(), job_dir.rglob("*")))
+        paths = [metadata_file] + (stac_hrefs or [])
+        # asset files are already uploaded by Scala code
         logger.info(f"Writing results to object storage. paths={paths}")
         for file_path in paths:
-            # TODO: Get list of files to upload from metadata file.
-            # AFAIK, nothing else is created locally and needs to be uploaded to s3 afterwards.
-            if UDF_PYTHON_DEPENDENCIES_FOLDER_NAME in str(file_path):
-                logger.warning(f"Omitting {file_path} as the executors will not be able to access it")
-            else:
-                full_path = str(file_path.absolute())
-                s3_instance.upload_file(full_path, bucket, full_path.strip("/"))
+            file_path = urlparse(str(file_path)).path
+            s3_instance.upload_file(file_path, bucket, file_path.strip("/"))
     else:
         _convert_job_metadatafile_outputs_to_s3_urls(metadata_file)
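For context (commentary, not part of the diff): the upload list now contains only the job metadata file plus the STAC hrefs returned by the workspace exports, since asset files are already uploaded by the Scala side. Those hrefs may arrive as Path objects, plain path strings, or file:// URIs, which is presumably why each entry goes through urlparse before being turned into an S3 key. A minimal standalone sketch of that normalization (file paths are hypothetical):

```python
from pathlib import Path
from urllib.parse import urlparse

def s3_keys(metadata_file, stac_hrefs=None):
    # Mirror the new write_metadata logic: metadata file first, then STAC hrefs.
    paths = [metadata_file] + (stac_hrefs or [])
    # urlparse reduces "file:///a/b", "/a/b" and Path("/a/b") alike to "/a/b";
    # stripping the leading "/" yields the object key passed to upload_file.
    return [urlparse(str(p)).path.strip("/") for p in paths]

print(s3_keys(Path("/batch_jobs/j-123/job_metadata.json"),
              ["file:///batch_jobs/j-123/collection.json"]))
# ['batch_jobs/j-123/job_metadata.json', 'batch_jobs/j-123/collection.json']
```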

@@ -588,6 +585,7 @@ def _export_to_workspaces(

     if alternate:
         result_metadata["assets"][asset_key]["alternate"] = alternate
+    return stac_hrefs


 def _export_to_workspace(job_dir: str, source_uri: str, target: Workspace, merge: str, remove_original: bool) -> str:
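Taken together, the run_job changes replace the old "upload everything under job_dir" behavior with an explicit hand-off: each workspace export returns the hrefs of the STAC documents it wrote, run_job accumulates them, and the finally-block passes exactly that list to write_metadata. A simplified, runnable sketch of the flow, with stubs standing in for the real functions (names follow the diff, bodies are illustrative only):

```python
from pathlib import Path
from typing import List, Union

def _export_to_workspaces(result, result_metadata) -> List[str]:
    # Stub: the real function exports assets to workspaces and returns the
    # hrefs of the STAC documents it wrote (e.g. file:// URIs).
    return [f"file:///batch_jobs/j-123/{result}/collection.json"]

def write_metadata(metadata, metadata_file, stac_hrefs: List[Union[str, Path]] = None):
    # Stub: the real function dumps the metadata and uploads metadata_file
    # plus the given stac_hrefs to object storage.
    print("upload:", [metadata_file] + (stac_hrefs or []))

def run_job(results, metadata, metadata_file):
    stac_hrefs: List[Union[str, Path]] = []
    try:
        for result in results:
            stac_hrefs += _export_to_workspaces(result, metadata)
    finally:
        # The final write hands over exactly the hrefs collected above,
        # instead of letting write_metadata rescan job_dir.
        write_metadata(metadata, metadata_file, stac_hrefs)

run_job(["r0"], {}, Path("/batch_jobs/j-123/job_metadata.json"))
```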
19 changes: 10 additions & 9 deletions tests/test_batch_result.py
@@ -1286,15 +1286,16 @@ def write_line(message):
         job_dir_files_s3 = [
             o["Key"] for o in s3_instance.list_objects(Bucket=get_backend_config().s3_bucket_name)["Contents"]
         ]
-        assert job_dir_files_s3 == ListSubSet(
-            [
-                f"{merge}/collection.json",
-                f"{merge}/folder1/lon.tif",
-                f"{merge}/folder1/lon.tif.json",
-                f"{merge}/lat.tif",
-                f"{merge}/lat.tif.json",
-            ]
-        )
+        for prefix in [merge, tmp_path.relative_to("/")]:
+            assert job_dir_files_s3 == ListSubSet(
+                [
+                    f"{prefix}/collection.json",
+                    f"{prefix}/folder1/lon.tif",
+                    f"{prefix}/folder1/lon.tif.json",
+                    f"{prefix}/lat.tif",
+                    f"{prefix}/lat.tif.json",
+                ]
+            )

     else:
         workspace_dir = Path(f"{workspace.root_directory}/{merge}")
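A reading of the strengthened assertion (commentary, not part of the diff): the bucket is now expected to contain the files under two prefixes, the workspace merge prefix (assets exported to the workspace) and the job directory prefix (the STAC documents uploaded alongside the job metadata). Assuming ListSubSet is the test helper whose equality holds when the expected items are a subset of the actual list, the loop is equivalent to this plain-Python check (fixture values are hypothetical):

```python
from pathlib import Path

# Hypothetical values standing in for the test fixtures:
merge = "some/target"
tmp_path = Path("/tmp/job-123")
names = ["collection.json", "folder1/lon.tif", "folder1/lon.tif.json",
         "lat.tif", "lat.tif.json"]

# A bucket listing that would satisfy the assertion: every file under both prefixes.
job_dir_files_s3 = [
    f"{p}/{name}" for p in [merge, tmp_path.relative_to("/")] for name in names
]

for prefix in [merge, tmp_path.relative_to("/")]:
    assert all(f"{prefix}/{name}" in job_dir_files_s3 for name in names)
```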