Skip to content

Commit

Permalink
Filesystem updates to support AWS ACL (#78)
Browse files Browse the repository at this point in the history
* Added AWS ACL support to Storage

* Updated logging with the new way of filesystem handling

* Upgraded BatchToEOPatchPipeline with filesystem changes

* Filesystem updates in pipelines

* Filesystem updated in prediction pipelines

* Fixed logging related to filesystem change

* Minor update in logging handlers
  • Loading branch information
AleksMat authored Jul 27, 2022
1 parent 90eed65 commit ddbf208
Show file tree
Hide file tree
Showing 18 changed files with 162 additions and 109 deletions.
25 changes: 14 additions & 11 deletions eogrow/core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
from fs.errors import FilesystemClosed
from pydantic import Field

from eolearn.core.utils.fs import join_path
from sentinelhub import SHConfig
from eolearn.core.utils.fs import join_path, unpickle_fs

from ..utils.fs import LocalFile
from ..utils.general import jsonify
Expand Down Expand Up @@ -246,17 +245,18 @@ class FilesystemHandler(FileHandler):
process stuck waiting for a thread lock release.
"""

def __init__(self, path: str, filesystem: Optional[FS] = None, config: Optional[SHConfig] = None, **kwargs: Any):
def __init__(self, path: str, filesystem: Union[FS, bytes], encoding: Optional[str] = "utf-8", **kwargs: Any):
"""
:param path: A path to a log file. It should be an absolute path if filesystem object is not given and relative
otherwise.
:param filesystem: A filesystem to where logs will be written.
:param config: A config object holding credentials.
:param path: A path to a log file that is relative to the given `filesystem` object.
:param filesystem: A filesystem where logs will be written. It can be either an instance of a filesystem
    object or its pickled copy.
:param encoding: Encoding used to write log files.
:param kwargs: Keyword arguments that will be propagated to FileHandler.
"""
self.local_file = LocalFile(path, mode="w", filesystem=filesystem, config=config)
filesystem_object = unpickle_fs(filesystem) if isinstance(filesystem, bytes) else filesystem
self.local_file = LocalFile(path, mode="w", filesystem=filesystem_object)

super().__init__(self.local_file.path, **kwargs)
super().__init__(self.local_file.path, encoding=encoding, **kwargs)

self.addFilter(FilesystemFilter())

Expand All @@ -272,12 +272,15 @@ def close(self) -> None:
class RegularBackupHandler(FilesystemHandler):
"""A customized FilesystemHandler that makes a copy to a remote location regularly after given amount of time."""

def __init__(self, *args: Any, backup_interval: Union[float, int], **kwargs: Any):
def __init__(self, path: str, filesystem: Union[FS, bytes], backup_interval: Union[float, int], **kwargs: Any):
"""
:param path: A path to a log file that is relative to the given `filesystem` object.
:param filesystem: A filesystem where logs will be written. It can be either an instance of a filesystem
    object or its pickled copy.
:param backup_interval: The minimal number of seconds before the handler will back up the log file to the remote
    location. The backup will only happen when the next log record is emitted.
"""
super().__init__(*args, **kwargs)
super().__init__(path=path, filesystem=filesystem, **kwargs)

self.backup_interval = backup_interval
self._last_backup_time = time.monotonic()
Expand Down
3 changes: 1 addition & 2 deletions eogrow/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
Module where base Pipeline class is implemented
"""
import datetime as dt
import functools
import logging
import time
import uuid
Expand Down Expand Up @@ -200,7 +199,7 @@ def run_execution(
logs_folder=self.logging_manager.get_pipeline_logs_folder(self.current_execution_name),
filesystem=self.storage.filesystem,
logs_filter=EOExecutionFilter(ignore_packages=self.logging_manager.config.eoexecution_ignore_packages),
logs_handler_factory=functools.partial(EOExecutionHandler, config=self.sh_config, encoding="utf-8"),
logs_handler_factory=EOExecutionHandler,
)
execution_results = executor.run(**executor_run_params)

Expand Down
13 changes: 12 additions & 1 deletion eogrow/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from eolearn.core.utils.fs import get_aws_credentials, get_filesystem, is_s3_path
from sentinelhub import SHConfig

from ..utils.types import AwsAclType
from .base import EOGrowObject
from .schemas import ManagerSchema

Expand All @@ -27,6 +28,12 @@ class Schema(ManagerSchema):
aws_profile: Optional[str] = Field(
description="The AWS profile with credentials needed to access the S3 bucket"
)
aws_acl: Optional[AwsAclType] = Field(
description=(
"An optional parameter to specify under what kind of access control list (ACL) objects should be saved"
" to an AWS S3 bucket."
)
)
structure: Dict[str, str] = Field(
default_factory=dict, description="A flat key: value store mapping each key to a path in the project."
)
Expand All @@ -44,7 +51,11 @@ def __init__(self, config: Schema):
if self.is_on_aws() and self.config.aws_profile:
self.sh_config = get_aws_credentials(aws_profile=self.config.aws_profile, config=self.sh_config)

self.filesystem = get_filesystem(self.config.project_folder, create=True, config=self.sh_config)
fs_kwargs: Dict[str, str] = {}
if is_s3_path(self.config.project_folder) and self.config.aws_acl:
fs_kwargs["acl"] = self.config.aws_acl

self.filesystem = get_filesystem(self.config.project_folder, create=True, config=self.sh_config, **fs_kwargs)

def get_folder(self, key: str, full_path: bool = False) -> str:
"""Returns the path associated with a key in the structure config."""
Expand Down
19 changes: 12 additions & 7 deletions eogrow/pipelines/batch_to_eopatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(self, *args: Any, **kwargs: Any):
"""Additionally sets some basic parameters calculated from config parameters"""
super().__init__(*args, **kwargs)

self._input_folder = self.storage.get_folder(self.config.input_folder_key, full_path=True)
self._input_folder = self.storage.get_folder(self.config.input_folder_key)
self._has_userdata = self.config.userdata_feature_name or self.config.userdata_timestamp_reader
self._all_batch_files = self._get_all_batch_files()

Expand Down Expand Up @@ -117,9 +117,9 @@ def build_workflow(self) -> EOWorkflow:
userdata_node = EONode(
LoadUserDataTask(
path=self._input_folder,
filesystem=self.storage.filesystem,
userdata_feature_name=self.config.userdata_feature_name,
userdata_timestamp_reader=self.config.userdata_timestamp_reader,
config=self.sh_config,
)
)

Expand All @@ -128,7 +128,6 @@ def build_workflow(self) -> EOWorkflow:
]

last_node = userdata_node

if len(mapping_nodes) == 1:
last_node = mapping_nodes[0]
elif len(mapping_nodes) > 1:
Expand All @@ -143,18 +142,20 @@ def build_workflow(self) -> EOWorkflow:
processing_node = self.get_processing_node(last_node)

save_task = SaveTask(
path=self.storage.get_folder(self.config.output_folder_key, full_path=True),
path=self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
features=self._get_output_features(),
compress_level=1,
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
config=self.sh_config,
)
save_node = EONode(save_task, inputs=([processing_node] if processing_node else []))

cleanup_node = None
if self.config.remove_batch_data:
delete_task = DeleteFilesTask(
path=self._input_folder, filenames=self._all_batch_files, config=self.sh_config
path=self._input_folder,
filesystem=self.storage.filesystem,
filenames=self._all_batch_files,
)
cleanup_node = EONode(delete_task, inputs=[save_node], name="Delete batch data")

Expand All @@ -179,7 +180,11 @@ def _get_tiff_mapping_node(self, mapping: FeatureMappingSchema, previous_node: O
FeatureType.MASK_TIMELESS if feature_type.is_discrete() else FeatureType.DATA_TIMELESS
), feature[1]

import_task = ImportFromTiffTask(tmp_timeless_feature, folder=self._input_folder, config=self.sh_config)
import_task = ImportFromTiffTask(
tmp_timeless_feature,
folder=self._input_folder,
filesystem=self.storage.filesystem,
)
# Filename is written into the dependency name to be used later for execution arguments:
import_node = EONode(
import_task,
Expand Down
4 changes: 2 additions & 2 deletions eogrow/pipelines/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ def build_workflow(self, session_loader: SessionLoaderType) -> EOWorkflow:

end_node = EONode(
SaveTask(
self.storage.get_folder(self.config.output_folder_key, full_path=True),
self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
features=self._get_output_features(),
compress_level=self.config.compress_level,
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
config=self.sh_config,
),
inputs=[postprocessing_node or download_node],
)
Expand Down
10 changes: 5 additions & 5 deletions eogrow/pipelines/export_maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pydantic import Field

from eolearn.core import EONode, EOTask, EOWorkflow, FeatureType, LoadTask, linearly_connect_tasks
from eolearn.core.utils.fs import get_full_path, join_path
from eolearn.core.utils.fs import get_full_path
from eolearn.core.utils.parallelize import parallelize
from eolearn.features import LinearFunctionTask
from eolearn.io import ExportToTiffTask
Expand Down Expand Up @@ -74,9 +74,9 @@ def run_procedure(self) -> Tuple[List[str], List[str]]:
def build_workflow(self) -> EOWorkflow:
"""Method where workflow is constructed"""
load_task = LoadTask(
self.storage.get_folder(self.config.input_folder_key, full_path=True),
self.storage.get_folder(self.config.input_folder_key),
filesystem=self.storage.filesystem,
features=[self.config.feature, FeatureType.BBOX],
config=self.sh_config,
)
task_list: List[EOTask] = [load_task]

Expand All @@ -85,14 +85,14 @@ def build_workflow(self) -> EOWorkflow:
task_list.append(rescale_task)

feature_name = self.config.feature[1]
folder = join_path(self.storage.get_folder(self.config.output_folder_key, full_path=True), feature_name)
folder = fs.path.join(self.storage.get_folder(self.config.output_folder_key), feature_name)
export_to_tiff_task = ExportToTiffTask(
self.config.feature,
folder=folder,
filesystem=self.storage.filesystem,
no_data_value=self.config.no_data_value,
image_dtype=np.dtype(self.config.map_dtype),
band_indices=self.config.band_indices,
config=self.sh_config,
)
task_list.append(export_to_tiff_task)

Expand Down
8 changes: 4 additions & 4 deletions eogrow/pipelines/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ def build_workflow(self) -> EOWorkflow:
postprocessing_node = self.get_postprocessing_node(ndi_node)

save_task = SaveTask(
self.storage.get_folder(self.config.output_folder_key, full_path=True),
self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
features=self._get_output_features(),
compress_level=self.config.compress_level,
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
config=self.sh_config,
)
save_node = EONode(save_task, inputs=[postprocessing_node])

Expand All @@ -132,9 +132,9 @@ def get_data_preparation_node(self) -> EONode:
filtering_config = self.config.data_preparation

load_task = LoadTask(
self.storage.get_folder(self.config.input_folder_key, full_path=True),
self.storage.get_folder(self.config.input_folder_key),
filesystem=self.storage.filesystem,
lazy_loading=True,
config=self.sh_config,
)
end_node = EONode(load_task)

Expand Down
8 changes: 4 additions & 4 deletions eogrow/pipelines/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ def build_workflow(self) -> EOWorkflow:
output_features.append(FeatureType.BBOX)

load_task = LoadTask(
self.storage.get_folder(self.config.input_folder_key, full_path=True),
self.storage.get_folder(self.config.input_folder_key),
filesystem=self.storage.filesystem,
features=input_features,
config=self.sh_config,
)

mapping_task = MappingTask(input_feature, output_feature, self.config.mapping_dictionary)

save_task = SaveTask(
self.storage.get_folder(self.config.output_folder_key, full_path=True),
self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
features=output_features,
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
config=self.sh_config,
compress_level=self.config.compress_level,
)

Expand Down
4 changes: 2 additions & 2 deletions eogrow/pipelines/merge_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ def build_workflow(self) -> EOWorkflow:
features_to_load: List[FeatureSpec] = [FeatureType.TIMESTAMP] if self.config.include_timestamp else []
features_to_load.extend(self.config.features_to_merge)
load_task = LoadTask(
self.storage.get_folder(self.config.input_folder_key, full_path=True),
self.storage.get_folder(self.config.input_folder_key),
filesystem=self.storage.filesystem,
features=features_to_load,
config=self.sh_config,
)
output_task = OutputTask(name=self._OUTPUT_NAME)
return EOWorkflow(linearly_connect_tasks(load_task, output_task))
Expand Down
25 changes: 13 additions & 12 deletions eogrow/pipelines/prediction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import abc
from typing import List, Optional, Tuple

import fs
import numpy as np
from pydantic import Field, root_validator

Expand Down Expand Up @@ -98,9 +99,9 @@ def _get_data_preparation_node(self) -> EONode:
"""Returns nodes containing for loading and preparing the data as well as the endpoint tasks"""
features_load_node = EONode(
LoadTask(
self.storage.get_folder(self.config.input_folder_key, full_path=True),
self.storage.get_folder(self.config.input_folder_key),
filesystem=self.storage.filesystem,
features=[FeatureType.BBOX, FeatureType.TIMESTAMP, *self.config.input_features],
config=self.sh_config,
)
)

Expand All @@ -109,9 +110,9 @@ def _get_data_preparation_node(self) -> EONode:

mask_load_node = EONode(
LoadTask(
self.storage.get_folder(self.config.prediction_mask_folder_key, full_path=True),
self.storage.get_folder(self.config.prediction_mask_folder_key),
filesystem=self.storage.filesystem,
features=[(FeatureType.MASK_TIMELESS, self.config.prediction_mask_feature_name)],
config=self.sh_config,
)
)

Expand All @@ -124,10 +125,10 @@ def _get_prediction_node(self, previous_node: EONode) -> EONode:
def _get_saving_node(self, previous_node: EONode) -> EONode:
"""Returns nodes for finalizing and saving features"""
save_task = SaveTask(
self.storage.get_folder(self.config.output_folder_key, full_path=True),
self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
features=self._get_output_features(),
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
config=self.sh_config,
compress_level=self.config.compress_level,
)

Expand All @@ -148,15 +149,15 @@ def _get_output_features(self) -> List[FeatureSpec]:
return [FeatureType.BBOX, (FeatureType.DATA_TIMELESS, self.config.output_feature_name)]

def _get_prediction_node(self, previous_node: EONode) -> EONode:
model_path = fs.path.join(self.storage.get_folder(self.config.model_folder_key), self.config.model_filename)
prediction_task = RegressionPredictionTask(
model_folder=self.storage.get_folder(self.config.model_folder_key, full_path=True),
model_filename=self.config.model_filename,
model_path=model_path,
filesystem=self.storage.filesystem,
input_features=self.config.input_features,
mask_feature=_optional_typed_feature(FeatureType.MASK_TIMELESS, self.config.prediction_mask_feature_name),
output_feature=(FeatureType.DATA_TIMELESS, self.config.output_feature_name),
output_dtype=self.config.dtype,
mp_lock=self._is_mp_lock_needed,
sh_config=self.sh_config,
clip_predictions=self.config.clip_predictions,
)
return EONode(prediction_task, inputs=[previous_node])
Expand All @@ -183,9 +184,10 @@ def _get_output_features(self) -> List[FeatureSpec]:
return features

def _get_prediction_node(self, previous_node: EONode) -> EONode:
model_path = fs.path.join(self.storage.get_folder(self.config.model_folder_key), self.config.model_filename)
prediction_task = ClassificationPredictionTask(
model_folder=self.storage.get_folder(self.config.model_folder_key, full_path=True),
model_filename=self.config.model_filename,
model_path=model_path,
filesystem=self.storage.filesystem,
input_features=self.config.input_features,
mask_feature=_optional_typed_feature(FeatureType.MASK_TIMELESS, self.config.prediction_mask_feature_name),
output_feature=(FeatureType.MASK_TIMELESS, self.config.output_feature_name),
Expand All @@ -194,7 +196,6 @@ def _get_prediction_node(self, previous_node: EONode) -> EONode:
),
output_dtype=self.config.dtype,
mp_lock=self._is_mp_lock_needed,
sh_config=self.sh_config,
label_encoder_filename=self.config.label_encoder_filename,
)
return EONode(prediction_task, inputs=[previous_node])
Expand Down
Loading

0 comments on commit ddbf208

Please sign in to comment.