From ddbf208dcd6f61b17f4f7ec648e6f3e532d4eef9 Mon Sep 17 00:00:00 2001
From: Matej Aleksandrov
Date: Wed, 27 Jul 2022 14:48:36 +0200
Subject: [PATCH] Filesystem updates to support AWS ACL (#78)

* Added AWS ACL support to Storage
* Updated logging with the new way of filesystem handling
* Upgraded BatchToEOPatchPipeline with filesystem changes
* Filesystem updates in pipelines
* Filesystem updated in prediction pipelines
* Fixed logging related to filesystem change
* Minor update in logging handlers
---
 eogrow/core/logging.py               | 25 +++++++++-------
 eogrow/core/pipeline.py              |  3 +-
 eogrow/core/storage.py               | 13 +++++++-
 eogrow/pipelines/batch_to_eopatch.py | 19 +++++++-----
 eogrow/pipelines/download.py         |  4 +--
 eogrow/pipelines/export_maps.py      | 10 +++----
 eogrow/pipelines/features.py         |  8 ++---
 eogrow/pipelines/mapping.py          |  8 ++---
 eogrow/pipelines/merge_samples.py    |  4 +--
 eogrow/pipelines/prediction.py       | 25 ++++++++--------
 eogrow/pipelines/rasterize.py        | 24 +++++++--------
 eogrow/pipelines/sampling.py         |  8 ++---
 eogrow/pipelines/switch_grids.py     |  9 +++---
 eogrow/pipelines/testing.py          |  4 +--
 eogrow/tasks/batch_to_eopatch.py     | 32 ++++++++++----------
 eogrow/tasks/prediction.py           | 44 ++++++++++++++++------------
 eogrow/utils/types.py                | 11 +++++++
 tests/test_core/test_storage.py     | 20 ++++++++++++-
 18 files changed, 162 insertions(+), 109 deletions(-)

diff --git a/eogrow/core/logging.py b/eogrow/core/logging.py
index eb2b7fd6..a5ccc722 100644
--- a/eogrow/core/logging.py
+++ b/eogrow/core/logging.py
@@ -13,8 +13,7 @@
 from fs.errors import FilesystemClosed
 from pydantic import Field
 
-from eolearn.core.utils.fs import join_path
-from sentinelhub import SHConfig
+from eolearn.core.utils.fs import join_path, unpickle_fs
 
 from ..utils.fs import LocalFile
 from ..utils.general import jsonify
@@ -246,17 +245,18 @@ class FilesystemHandler(FileHandler):
     process stuck waiting for a thread lock release.
     """
 
-    def __init__(self, path: str, filesystem: Optional[FS] = None, config: Optional[SHConfig] = None, **kwargs: Any):
+    def __init__(self, path: str, filesystem: Union[FS, bytes], encoding: Optional[str] = "utf-8", **kwargs: Any):
         """
-        :param path: A path to a log file. It should be an absolute path if filesystem object is not given and relative
-            otherwise.
-        :param filesystem: A filesystem to where logs will be written.
-        :param config: A config object holding credentials.
+        :param path: A path to a log file that is relative to the given `filesystem` object.
+        :param filesystem: A filesystem to which logs will be written. It can either be an instance of a filesystem
+            object or its pickled copy.
+        :param encoding: Encoding used to write log files.
         :param kwargs: Keyword arguments that will be propagated to FileHandler.
         """
-        self.local_file = LocalFile(path, mode="w", filesystem=filesystem, config=config)
+        filesystem_object = unpickle_fs(filesystem) if isinstance(filesystem, bytes) else filesystem
+        self.local_file = LocalFile(path, mode="w", filesystem=filesystem_object)
 
-        super().__init__(self.local_file.path, **kwargs)
+        super().__init__(self.local_file.path, encoding=encoding, **kwargs)
 
         self.addFilter(FilesystemFilter())
@@ -272,12 +272,15 @@ def close(self) -> None:
 class RegularBackupHandler(FilesystemHandler):
     """A customized FilesystemHandler that makes a copy to a remote location regularly after given amount of time."""
 
-    def __init__(self, *args: Any, backup_interval: Union[float, int], **kwargs: Any):
+    def __init__(self, path: str, filesystem: Union[FS, bytes], backup_interval: Union[float, int], **kwargs: Any):
         """
+        :param path: A path to a log file that is relative to the given `filesystem` object.
+        :param filesystem: A filesystem to which logs will be written. It can either be an instance of a filesystem
+            object or its pickled copy.
         :param backup_interval: A minimal number of seconds before handler will back up the log file to the remote
             location. The backup will only happen when the next log record will be emitted.
         """
-        super().__init__(*args, **kwargs)
+        super().__init__(path=path, filesystem=filesystem, **kwargs)
 
         self.backup_interval = backup_interval
         self._last_backup_time = time.monotonic()
diff --git a/eogrow/core/pipeline.py b/eogrow/core/pipeline.py
index 837de228..9111d9af 100644
--- a/eogrow/core/pipeline.py
+++ b/eogrow/core/pipeline.py
@@ -2,7 +2,6 @@
 Module where base Pipeline class is implemented
 """
 import datetime as dt
-import functools
 import logging
 import time
 import uuid
@@ -200,7 +199,7 @@ def run_execution(
             logs_folder=self.logging_manager.get_pipeline_logs_folder(self.current_execution_name),
             filesystem=self.storage.filesystem,
             logs_filter=EOExecutionFilter(ignore_packages=self.logging_manager.config.eoexecution_ignore_packages),
-            logs_handler_factory=functools.partial(EOExecutionHandler, config=self.sh_config, encoding="utf-8"),
+            logs_handler_factory=EOExecutionHandler,
         )
 
         execution_results = executor.run(**executor_run_params)
diff --git a/eogrow/core/storage.py b/eogrow/core/storage.py
index 0a230afe..21cfb44d 100644
--- a/eogrow/core/storage.py
+++ b/eogrow/core/storage.py
@@ -10,6 +10,7 @@
 from eolearn.core.utils.fs import get_aws_credentials, get_filesystem, is_s3_path
 from sentinelhub import SHConfig
 
+from ..utils.types import AwsAclType
 from .base import EOGrowObject
 from .schemas import ManagerSchema
@@ -27,6 +28,12 @@ class Schema(ManagerSchema):
         aws_profile: Optional[str] = Field(
             description="The AWS profile with credentials needed to access the S3 bucket"
         )
+        aws_acl: Optional[AwsAclType] = Field(
+            description=(
+                "An optional parameter specifying the access control list (ACL) under which objects should be"
+                " saved to an AWS S3 bucket."
+            )
+        )
         structure: Dict[str, str] = Field(
             default_factory=dict, description="A flat key: value store mapping each key to a path in the project."
         )
@@ -44,7 +51,11 @@ def __init__(self, config: Schema):
         if self.is_on_aws() and self.config.aws_profile:
             self.sh_config = get_aws_credentials(aws_profile=self.config.aws_profile, config=self.sh_config)
 
-        self.filesystem = get_filesystem(self.config.project_folder, create=True, config=self.sh_config)
+        fs_kwargs: Dict[str, str] = {}
+        if is_s3_path(self.config.project_folder) and self.config.aws_acl:
+            fs_kwargs["acl"] = self.config.aws_acl
+
+        self.filesystem = get_filesystem(self.config.project_folder, create=True, config=self.sh_config, **fs_kwargs)
 
     def get_folder(self, key: str, full_path: bool = False) -> str:
         """Returns the path associated with a key in the structure config."""
diff --git a/eogrow/pipelines/batch_to_eopatch.py b/eogrow/pipelines/batch_to_eopatch.py
index 5c66da9f..c53cc9b2 100644
--- a/eogrow/pipelines/batch_to_eopatch.py
+++ b/eogrow/pipelines/batch_to_eopatch.py
@@ -81,7 +81,7 @@ def __init__(self, *args: Any, **kwargs: Any):
         """Additionally sets some basic parameters calculated from config parameters"""
         super().__init__(*args, **kwargs)
 
-        self._input_folder = self.storage.get_folder(self.config.input_folder_key, full_path=True)
+        self._input_folder = self.storage.get_folder(self.config.input_folder_key)
         self._has_userdata = self.config.userdata_feature_name or self.config.userdata_timestamp_reader
         self._all_batch_files = self._get_all_batch_files()
@@ -117,9 +117,9 @@ def build_workflow(self) -> EOWorkflow:
             userdata_node = EONode(
                 LoadUserDataTask(
                     path=self._input_folder,
+                    filesystem=self.storage.filesystem,
                     userdata_feature_name=self.config.userdata_feature_name,
                     userdata_timestamp_reader=self.config.userdata_timestamp_reader,
-                    config=self.sh_config,
                 )
             )
@@ -128,7 +128,6 @@
         ]
 
         last_node = userdata_node
-
         if len(mapping_nodes) == 1:
             last_node = mapping_nodes[0]
         elif len(mapping_nodes) > 1:
@@ -143,18 +142,20 @@
         processing_node = self.get_processing_node(last_node)
 
         save_task = SaveTask(
-            path=self.storage.get_folder(self.config.output_folder_key, full_path=True),
+            path=self.storage.get_folder(self.config.output_folder_key),
+            filesystem=self.storage.filesystem,
             features=self._get_output_features(),
             compress_level=1,
             overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
-            config=self.sh_config,
         )
         save_node = EONode(save_task, inputs=([processing_node] if processing_node else []))
 
         cleanup_node = None
         if self.config.remove_batch_data:
             delete_task = DeleteFilesTask(
-                path=self._input_folder, filenames=self._all_batch_files, config=self.sh_config
+                path=self._input_folder,
+                filesystem=self.storage.filesystem,
+                filenames=self._all_batch_files,
             )
             cleanup_node = EONode(delete_task, inputs=[save_node], name="Delete batch data")
@@ -179,7 +180,11 @@ def _get_tiff_mapping_node(self, mapping: FeatureMappingSchema, previous_node: O
             FeatureType.MASK_TIMELESS if feature_type.is_discrete() else FeatureType.DATA_TIMELESS
         ), feature[1]
 
-        import_task = ImportFromTiffTask(tmp_timeless_feature, folder=self._input_folder, config=self.sh_config)
+        import_task = ImportFromTiffTask(
+            tmp_timeless_feature,
+            folder=self._input_folder,
+            filesystem=self.storage.filesystem,
+        )
         # Filename is written into the dependency name to be used later for execution arguments:
         import_node = EONode(
             import_task,
diff --git a/eogrow/pipelines/download.py b/eogrow/pipelines/download.py
index cd5d3a0a..677b5120 100644
--- a/eogrow/pipelines/download.py
+++ b/eogrow/pipelines/download.py
@@ -146,11 +146,11 @@ def build_workflow(self, session_loader: SessionLoaderType) -> EOWorkflow:
 
         end_node = EONode(
             SaveTask(
-                self.storage.get_folder(self.config.output_folder_key, full_path=True),
+                self.storage.get_folder(self.config.output_folder_key),
+                filesystem=self.storage.filesystem,
                 features=self._get_output_features(),
                 compress_level=self.config.compress_level,
                 overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
-                config=self.sh_config,
             ),
             inputs=[postprocessing_node or download_node],
         )
diff --git a/eogrow/pipelines/export_maps.py b/eogrow/pipelines/export_maps.py
index bbd73548..41054047 100644
--- a/eogrow/pipelines/export_maps.py
+++ b/eogrow/pipelines/export_maps.py
@@ -12,7 +12,7 @@
 from pydantic import Field
 
 from eolearn.core import EONode, EOTask, EOWorkflow, FeatureType, LoadTask, linearly_connect_tasks
-from eolearn.core.utils.fs import get_full_path, join_path
+from eolearn.core.utils.fs import get_full_path
 from eolearn.core.utils.parallelize import parallelize
 from eolearn.features import LinearFunctionTask
 from eolearn.io import ExportToTiffTask
@@ -74,9 +74,9 @@ def run_procedure(self) -> Tuple[List[str], List[str]]:
 
     def build_workflow(self) -> EOWorkflow:
         """Method where workflow is constructed"""
         load_task = LoadTask(
-            self.storage.get_folder(self.config.input_folder_key, full_path=True),
+            self.storage.get_folder(self.config.input_folder_key),
+            filesystem=self.storage.filesystem,
             features=[self.config.feature, FeatureType.BBOX],
-            config=self.sh_config,
         )
         task_list: List[EOTask] = [load_task]
@@ -85,14 +85,14 @@
             task_list.append(rescale_task)
 
         feature_name = self.config.feature[1]
-        folder = join_path(self.storage.get_folder(self.config.output_folder_key, full_path=True), feature_name)
+        folder = fs.path.join(self.storage.get_folder(self.config.output_folder_key), feature_name)
         export_to_tiff_task = ExportToTiffTask(
             self.config.feature,
             folder=folder,
+            filesystem=self.storage.filesystem,
             no_data_value=self.config.no_data_value,
             image_dtype=np.dtype(self.config.map_dtype),
             band_indices=self.config.band_indices,
-            config=self.sh_config,
         )
         task_list.append(export_to_tiff_task)
diff --git a/eogrow/pipelines/features.py b/eogrow/pipelines/features.py
index 788f3364..6015c610 100644
--- a/eogrow/pipelines/features.py
+++ b/eogrow/pipelines/features.py
@@ -114,11 +114,11 @@ def build_workflow(self) -> EOWorkflow:
         postprocessing_node = self.get_postprocessing_node(ndi_node)
 
         save_task = SaveTask(
-            self.storage.get_folder(self.config.output_folder_key, full_path=True),
+            self.storage.get_folder(self.config.output_folder_key),
+            filesystem=self.storage.filesystem,
             features=self._get_output_features(),
             compress_level=self.config.compress_level,
             overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
-            config=self.sh_config,
         )
         save_node = EONode(save_task, inputs=[postprocessing_node])
@@ -132,9 +132,9 @@ def get_data_preparation_node(self) -> EONode:
         filtering_config = self.config.data_preparation
 
         load_task = LoadTask(
-            self.storage.get_folder(self.config.input_folder_key, full_path=True),
+            self.storage.get_folder(self.config.input_folder_key),
+            filesystem=self.storage.filesystem,
             lazy_loading=True,
-            config=self.sh_config,
         )
         end_node = EONode(load_task)
diff --git a/eogrow/pipelines/mapping.py b/eogrow/pipelines/mapping.py
index ed31b03f..6ebf273b 100644
--- a/eogrow/pipelines/mapping.py
+++ b/eogrow/pipelines/mapping.py
@@ -38,18 +38,18 @@ def build_workflow(self) -> EOWorkflow:
             output_features.append(FeatureType.BBOX)
 
         load_task = LoadTask(
-            self.storage.get_folder(self.config.input_folder_key, full_path=True),
+            self.storage.get_folder(self.config.input_folder_key),
+            filesystem=self.storage.filesystem,
             features=input_features,
-            config=self.sh_config,
         )
 
         mapping_task = MappingTask(input_feature, output_feature, self.config.mapping_dictionary)
 
         save_task = SaveTask(
-            self.storage.get_folder(self.config.output_folder_key, full_path=True),
+            self.storage.get_folder(self.config.output_folder_key),
+            filesystem=self.storage.filesystem,
             features=output_features,
             overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
-            config=self.sh_config,
             compress_level=self.config.compress_level,
         )
diff --git a/eogrow/pipelines/merge_samples.py b/eogrow/pipelines/merge_samples.py
index 264daaa9..b50b4b70 100644
--- a/eogrow/pipelines/merge_samples.py
+++ b/eogrow/pipelines/merge_samples.py
@@ -62,9 +62,9 @@ def build_workflow(self) -> EOWorkflow:
         features_to_load: List[FeatureSpec] = [FeatureType.TIMESTAMP] if self.config.include_timestamp else []
         features_to_load.extend(self.config.features_to_merge)
         load_task = LoadTask(
-            self.storage.get_folder(self.config.input_folder_key, full_path=True),
+            self.storage.get_folder(self.config.input_folder_key),
+            filesystem=self.storage.filesystem,
             features=features_to_load,
-            config=self.sh_config,
         )
         output_task = OutputTask(name=self._OUTPUT_NAME)
         return EOWorkflow(linearly_connect_tasks(load_task, output_task))
diff --git a/eogrow/pipelines/prediction.py b/eogrow/pipelines/prediction.py
index 87217eb7..922558d2 100644
--- a/eogrow/pipelines/prediction.py
+++ b/eogrow/pipelines/prediction.py
@@ -4,6 +4,7 @@
 import abc
 from typing import List, Optional, Tuple
 
+import fs
 import numpy as np
 from pydantic import Field, root_validator
@@ -98,9 +99,9 @@ def _get_data_preparation_node(self) -> EONode:
         """Returns nodes containing for loading and preparing the data as well as the endpoint tasks"""
         features_load_node = EONode(
             LoadTask(
-                self.storage.get_folder(self.config.input_folder_key, full_path=True),
+                self.storage.get_folder(self.config.input_folder_key),
+                filesystem=self.storage.filesystem,
                 features=[FeatureType.BBOX, FeatureType.TIMESTAMP, *self.config.input_features],
-                config=self.sh_config,
             )
         )
@@ -109,9 +110,9 @@
 
         mask_load_node = EONode(
             LoadTask(
-                self.storage.get_folder(self.config.prediction_mask_folder_key, full_path=True),
+                self.storage.get_folder(self.config.prediction_mask_folder_key),
+                filesystem=self.storage.filesystem,
                 features=[(FeatureType.MASK_TIMELESS, self.config.prediction_mask_feature_name)],
-                config=self.sh_config,
             )
         )
@@ -124,10 +125,10 @@ def _get_prediction_node(self, previous_node: EONode) -> EONode:
 
     def _get_saving_node(self, previous_node: EONode) -> EONode:
         """Returns nodes for finalizing and saving features"""
         save_task = SaveTask(
-            self.storage.get_folder(self.config.output_folder_key, full_path=True),
+            self.storage.get_folder(self.config.output_folder_key),
+            filesystem=self.storage.filesystem,
             features=self._get_output_features(),
             overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
-            config=self.sh_config,
             compress_level=self.config.compress_level,
         )
@@ -148,15 +149,15 @@ def _get_output_features(self) -> List[FeatureSpec]:
         return [FeatureType.BBOX, (FeatureType.DATA_TIMELESS, self.config.output_feature_name)]
 
     def _get_prediction_node(self, previous_node: EONode) -> EONode:
+        model_path = fs.path.join(self.storage.get_folder(self.config.model_folder_key), self.config.model_filename)
         prediction_task = RegressionPredictionTask(
-            model_folder=self.storage.get_folder(self.config.model_folder_key, full_path=True),
-            model_filename=self.config.model_filename,
+            model_path=model_path,
+            filesystem=self.storage.filesystem,
             input_features=self.config.input_features,
             mask_feature=_optional_typed_feature(FeatureType.MASK_TIMELESS, self.config.prediction_mask_feature_name),
             output_feature=(FeatureType.DATA_TIMELESS, self.config.output_feature_name),
             output_dtype=self.config.dtype,
             mp_lock=self._is_mp_lock_needed,
-            sh_config=self.sh_config,
             clip_predictions=self.config.clip_predictions,
         )
         return EONode(prediction_task, inputs=[previous_node])
@@ -183,9 +184,10 @@ def _get_output_features(self) -> List[FeatureSpec]:
         return features
 
     def _get_prediction_node(self, previous_node: EONode) -> EONode:
+        model_path = fs.path.join(self.storage.get_folder(self.config.model_folder_key), self.config.model_filename)
         prediction_task = ClassificationPredictionTask(
-            model_folder=self.storage.get_folder(self.config.model_folder_key, full_path=True),
-            model_filename=self.config.model_filename,
+            model_path=model_path,
+            filesystem=self.storage.filesystem,
             input_features=self.config.input_features,
             mask_feature=_optional_typed_feature(FeatureType.MASK_TIMELESS, self.config.prediction_mask_feature_name),
             output_feature=(FeatureType.MASK_TIMELESS, self.config.output_feature_name),
             output_probability_feature=_optional_typed_feature(
                 FeatureType.DATA_TIMELESS, self.config.output_probability_feature_name
             ),
             output_dtype=self.config.dtype,
             mp_lock=self._is_mp_lock_needed,
-            sh_config=self.sh_config,
             label_encoder_filename=self.config.label_encoder_filename,
         )
         return EONode(prediction_task, inputs=[previous_node])
diff --git a/eogrow/pipelines/rasterize.py b/eogrow/pipelines/rasterize.py
index 675fc1fd..78a152a7 100644
--- a/eogrow/pipelines/rasterize.py
+++ b/eogrow/pipelines/rasterize.py
@@ -22,7 +22,6 @@
     OverwritePermission,
     SaveTask,
 )
-from eolearn.core.utils.fs import join_path
 from eolearn.geometry import VectorToRasterTask
 from eolearn.io import VectorImportTask
@@ -150,7 +149,7 @@ def run_dataset_preprocessing(self, filename: str, preprocess_config: Preprocess
 
         dataset_gdf = self.preprocess_dataset(dataset_gdf)
 
-        dataset_path = self._get_dataset_path(filename, full_path=False)
+        dataset_path = self._get_dataset_path(filename)
         with LocalFile(dataset_path, mode="w", filesystem=self.storage.filesystem) as local_file:
             dataset_gdf.to_file(local_file.path, encoding="utf-8", driver="GPKG")
@@ -167,14 +166,17 @@ def build_workflow(self) -> EOWorkflow:
             create_node = EONode(CreateEOPatchTask())
 
             path = self._get_dataset_path(self.filename)
             import_task = VectorImportTask(
-                self.vector_feature, path=path, layer=self.config.dataset_layer, config=self.sh_config
+                self.vector_feature,
+                path=path,
+                filesystem=self.storage.filesystem,
+                layer=self.config.dataset_layer,
             )
             data_preparation_node = EONode(import_task, inputs=[create_node])
         else:
             input_task = LoadTask(
-                self.storage.get_folder(self.config.input_folder_key, full_path=True),
+                self.storage.get_folder(self.config.input_folder_key),
+                filesystem=self.storage.filesystem,
                 features=[self.vector_feature, FeatureType.BBOX],
-                config=self.sh_config,
             )
             data_preparation_node = EONode(input_task)
@@ -185,10 +187,10 @@
         postprocess_node = self.get_postrasterization_node(rasterization_node)
 
         save_task = SaveTask(
-            self.storage.get_folder(self.config.output_folder_key, full_path=True),
+            self.storage.get_folder(self.config.output_folder_key),
+            filesystem=self.storage.filesystem,
             features=self._get_output_features(),
             overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
-            config=self.sh_config,
         )
         save_node = EONode(save_task, inputs=[postprocess_node])
@@ -238,17 +240,15 @@ def _parse_input_file(value: str) -> str:
             raise ValueError(f"Input file path {value} should be a GeoJSON, Shapefile, GeoPackage or GeoDataBase.")
         return value
 
-    def _get_dataset_path(self, filename: str, full_path: bool = True) -> str:
+    def _get_dataset_path(self, filename: str) -> str:
         """Provides a path from where dataset should be loaded into the workflow"""
         if self.config.preprocess_dataset is not None:
-            folder = self.storage.get_cache_folder(full_path=full_path)
+            folder = self.storage.get_cache_folder()
             filename = f"preprocessed_{filename}"
             filename = (os.path.splitext(filename))[0] + ".gpkg"
         else:
-            folder = self.storage.get_input_data_folder(full_path=full_path)
+            folder = self.storage.get_input_data_folder()
 
-        if full_path:
-            return join_path(folder, filename)
         return fs.path.combine(folder, filename)
 
     def _get_output_features(self) -> List[FeatureSpec]:
diff --git a/eogrow/pipelines/sampling.py b/eogrow/pipelines/sampling.py
index 2cf9c7ed..b60cceb4 100644
--- a/eogrow/pipelines/sampling.py
+++ b/eogrow/pipelines/sampling.py
@@ -67,11 +67,11 @@ def build_workflow(self) -> EOWorkflow:
         sampling_node = self._get_sampling_node(preprocessing_node)
 
         save_task = SaveTask(
-            self.storage.get_folder(self.config.output_folder_key, full_path=True),
+            self.storage.get_folder(self.config.output_folder_key),
+            filesystem=self.storage.filesystem,
             features=self._get_output_features(),
             compress_level=self.config.compress_level,
             overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
-            config=self.sh_config,
         )
 
         return EOWorkflow.from_endnodes(EONode(save_task, inputs=[sampling_node]))
@@ -97,10 +97,10 @@ def _get_loading_node(self) -> EONode:
             load_features.append(FeatureType.TIMESTAMP)
 
             load_task = LoadTask(
-                self.storage.get_folder(folder_name, full_path=True),
+                self.storage.get_folder(folder_name),
+                filesystem=self.storage.filesystem,
                 lazy_loading=True,
                 features=load_features,
-                config=self.sh_config,
             )
             load_nodes.append(EONode(load_task, name=f"Load from {folder_name}"))
diff --git a/eogrow/pipelines/switch_grids.py b/eogrow/pipelines/switch_grids.py
index bc3817e3..f7a7d018 100644
--- a/eogrow/pipelines/switch_grids.py
+++ b/eogrow/pipelines/switch_grids.py
@@ -132,10 +132,11 @@ def build_workflow(self, transformations: List[GridTransformation]) -> EOWorkflo
         saving them."""
         features = self._get_features()
 
-        input_path = self.storage.get_folder(self.config.input_folder_key, full_path=True)
+        input_path = self.storage.get_folder(self.config.input_folder_key)
         max_input_patch_num = max(len(transformation.source_bboxes) for transformation in transformations)
         load_nodes = [
-            EONode(LoadTask(input_path, features=features, config=self.sh_config)) for _ in range(max_input_patch_num)
+            EONode(LoadTask(input_path, filesystem=self.storage.filesystem, features=features))
+            for _ in range(max_input_patch_num)
         ]
 
         join_task = SpatialJoinTask(
@@ -147,7 +148,7 @@
         join_node = EONode(join_task, inputs=load_nodes)
 
         save_nodes = []
-        output_path = self.storage.get_folder(self.config.output_folder_key, full_path=True)
+        output_path = self.storage.get_folder(self.config.output_folder_key)
         max_output_patch_num = max(len(transformation.target_bboxes) for transformation in transformations)
         for _ in range(max_output_patch_num):
             slice_task = SpatialSliceTask(features, raise_misaligned=self.config.raise_misaligned)
@@ -155,9 +156,9 @@
 
             save_task = SaveTask(
                 output_path,
+                filesystem=self.storage.filesystem,
                 features=features,
                 overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
-                config=self.sh_config,
             )
             save_node = EONode(save_task, inputs=[slice_node])
             save_nodes.append(save_node)
diff --git a/eogrow/pipelines/testing.py b/eogrow/pipelines/testing.py
index afa4678b..18350f6f 100644
--- a/eogrow/pipelines/testing.py
+++ b/eogrow/pipelines/testing.py
@@ -138,9 +138,9 @@ def build_workflow(self) -> EOWorkflow:
             previous_node = start_node
 
         save_task = SaveTask(
-            self.storage.get_folder(self.config.output_folder_key, full_path=True),
+            self.storage.get_folder(self.config.output_folder_key),
+            filesystem=self.storage.filesystem,
             overwrite_permission=OverwritePermission.OVERWRITE_PATCH,
-            config=self.sh_config,
         )
         save_node = EONode(save_task, inputs=[previous_node])
diff --git a/eogrow/tasks/batch_to_eopatch.py b/eogrow/tasks/batch_to_eopatch.py
index def25663..0ce1a506 100644
--- a/eogrow/tasks/batch_to_eopatch.py
+++ b/eogrow/tasks/batch_to_eopatch.py
@@ -7,10 +7,11 @@
 
 import fs
 import numpy as np
+from fs.base import FS
 
 from eolearn.core import EOPatch, EOTask
-from eolearn.core.utils.fs import get_base_filesystem_and_path
-from sentinelhub import SHConfig, parse_time
+from eolearn.core.utils.fs import pickle_fs, unpickle_fs
+from sentinelhub import parse_time
 
 from ..utils.meta import import_object
 from ..utils.types import Feature
@@ -22,26 +23,26 @@ class LoadUserDataTask(EOTask):
     def __init__(
         self,
         path: str,
+        filesystem: FS,
         userdata_feature_name: Optional[str] = None,
         userdata_timestamp_reader: Optional[str] = None,
-        config: Optional[SHConfig] = None,
     ):
         """
-        :param path: Path to folder containing the tiles
+        :param path: A path to the folder containing the tiles, relative to the filesystem object.
+        :param filesystem: A filesystem object.
         :param userdata_feature_name: A name of a META_INFO feature in which userdata.json content could be stored
         :param userdata_timestamp_reader: A reference to a Python function or a Python code that collects timestamps
            from loaded userdata.json
-        :param config: A configuration object with AWS credentials
         """
         self.path = path
+        self.pickled_filesystem = pickle_fs(filesystem)
         self.userdata_feature_name = userdata_feature_name
         self.userdata_timestamp_reader = userdata_timestamp_reader
-        self.config = config
 
     def _load_userdata_file(self, folder: str, filename: str = "userdata.json") -> dict:
         """Loads a content of a JSON file"""
-        filesystem, relative_path = get_base_filesystem_and_path(self.path, config=self.config)
-        full_path = fs.path.join(relative_path, folder, filename)
+        filesystem = unpickle_fs(self.pickled_filesystem)
+        full_path = fs.path.join(self.path, folder, filename)
 
         userdata_text = filesystem.readtext(full_path, encoding="utf-8")
         return json.loads(userdata_text)
@@ -118,27 +119,24 @@ def execute(self, eopatch: EOPatch) -> EOPatch:
 class DeleteFilesTask(EOTask):
     """Delete files"""
 
-    def __init__(self, path: str, filenames: List[str], config: Optional[SHConfig] = None):
+    def __init__(self, path: str, filesystem: FS, filenames: List[str]):
         """
-        :param path: Path to folder containing the files to be deleted
-        :type path: str
+        :param path: A path to the folder containing the files to be deleted, relative to the filesystem object.
+        :param filesystem: A filesystem object.
         :param filenames: A list of filenames to delete
-        :type filenames: list(str)
-        :param config: A configuration object with AWS credentials
-        :type config: SHConfig
         """
         self.path = path
+        self.pickled_filesystem = pickle_fs(filesystem)
         self.filenames = filenames
-        self.config = config
 
     def execute(self, *_: EOPatch, folder: str) -> None:
         """Execute method to delete files relative to the specified tile
 
         :param folder: A folder containing files
         """
-        filesystem, relative_path = get_base_filesystem_and_path(self.path, config=self.config)
+        filesystem = unpickle_fs(self.pickled_filesystem)
+        file_paths = [fs.path.join(self.path, folder, filename) for filename in self.filenames]
 
-        file_paths = [fs.path.join(relative_path, folder, filename) for filename in self.filenames]
         with concurrent.futures.ThreadPoolExecutor() as executor:
             # The following is intentionally wrapped in a list in order to get back potential exceptions
             list(executor.map(filesystem.remove, file_paths))
diff --git a/eogrow/tasks/prediction.py b/eogrow/tasks/prediction.py
index eec5c611..795ebbbf 100644
--- a/eogrow/tasks/prediction.py
+++ b/eogrow/tasks/prediction.py
@@ -4,11 +4,13 @@
 import abc
 from typing import Any, Callable, List, Optional, Tuple, Union, cast
 
+import fs
 import joblib
 import numpy as np
+from fs.base import FS
 
-from eolearn.core import EOPatch, EOTask, execute_with_mp_lock, get_filesystem
-from sentinelhub import SHConfig
+from eolearn.core import EOPatch, EOTask, execute_with_mp_lock
+from eolearn.core.utils.fs import pickle_fs, unpickle_fs
 
 from ..utils.types import Feature
@@ -19,28 +21,25 @@ class BasePredictionTask(EOTask, metaclass=abc.ABCMeta):
     def __init__(
         self,
         *,
-        model_folder: str,
-        model_filename: str,
+        model_path: str,
+        filesystem: FS,
         input_features: List[Feature],
         mask_feature: Feature,
         output_feature: Feature,
         output_dtype: Optional[np.dtype],
         mp_lock: bool,
-        sh_config: SHConfig,
     ):
         """
-        :param model_folder: Path to the folder where model is stored
-        :param model_filename: Name of file containing the model
+        :param model_path: A file path to the model. The path is relative to the filesystem object.
+        :param filesystem: A filesystem object.
         :param input_features: List of features containing input for the model, which are concatenated in given order
         :param mask_feature: Mask specifying which points are to be predicted
         :param output_feature: Feature into which predictions are saved
         :param mp_lock: If predictions should be executed with a multiprocessing lock
-        :param sh_config: SentinelHub config
         """
-
-        self.model_folder = model_folder
-        self.model_filename = model_filename
+        self.model_path = model_path
         self._model = None
+        self.pickled_filesystem = pickle_fs(filesystem)
 
         self.input_features = input_features
         self.mask_feature = mask_feature
@@ -48,7 +47,6 @@ def __init__(
         self.output_dtype = output_dtype
 
         self.mp_lock = mp_lock
-        self.sh_config = sh_config
 
     def process_data(self, eopatch: EOPatch, mask: np.ndarray) -> np.ndarray:
         """Masks and reshapes data into a form suitable for the model"""
@@ -68,11 +66,13 @@ def process_data(self, eopatch: EOPatch, mask: np.ndarray) -> np.ndarray:
 
     @property
     def model(self) -> Any:
-        """Implements lazy loading that gets around file-system issues"""
+        """Implements lazy loading that gets around filesystem issues"""
         if self._model is None:
-            file_system = get_filesystem(self.model_folder, config=self.sh_config)
-            with file_system.openbin(self.model_filename, "r") as file_handle:
+            filesystem = unpickle_fs(self.pickled_filesystem)
+
+            with filesystem.openbin(self.model_path, "r") as file_handle:
                 self._model = joblib.load(file_handle)
+
         return self._model
 
     def apply_predictor(
@@ -127,7 +127,8 @@ def __init__(
         **kwargs: Any,
     ):
         """
-        :param label_encoder_filename: Name of file containing the label encoder with which to decode predictions
+        :param label_encoder_filename: Name of file containing the label encoder with which to decode predictions. The
+            file should be in the same folder as the model.
         :param output_probability_feature: If specified saves pseudo-probabilities into given feature.
         :param kwargs: Parameters of `BasePredictionTask`
         """
@@ -138,11 +139,16 @@ def __init__(
 
     @property
     def label_encoder(self) -> Any:
-        """Implements lazy loading that gets around file-system issues"""
+        """Implements lazy loading that gets around filesystem issues"""
         if self._label_encoder is None and self.label_encoder_filename is not None:
-            file_system = get_filesystem(self.model_folder, config=self.sh_config)
-            with file_system.openbin(self.label_encoder_filename, "r") as file_handle:
+            filesystem = unpickle_fs(self.pickled_filesystem)
+
+            model_folder = fs.path.dirname(self.model_path)
+            label_encoder_path = fs.path.join(model_folder, self.label_encoder_filename)
+
+            with filesystem.openbin(label_encoder_path, "r") as file_handle:
                 self._label_encoder = joblib.load(file_handle)
+
         return self._label_encoder
 
     def add_predictions(self, eopatch: EOPatch, processed_features: np.ndarray, mask: np.ndarray) -> EOPatch:
diff --git a/eogrow/utils/types.py b/eogrow/utils/types.py
index f6e472bd..e7131d95 100644
--- a/eogrow/utils/types.py
+++ b/eogrow/utils/types.py
@@ -19,6 +19,17 @@
 JsonDict = Dict[str, Any]
 RawSchemaDict = Dict[str, Any]
 
+AwsAclType = Literal[
+    "private",
+    "public-read",
+    "public-read-write",
+    "aws-exec-read",
+    "authenticated-read",
+    "bucket-owner-read",
+    "bucket-owner-full-control",
+    "log-delivery-write",
+]
+
 
 class ProcessingType(Enum):
     RAY = "ray"
diff --git a/tests/test_core/test_storage.py b/tests/test_core/test_storage.py
index a7eef3e9..0d474a96 100644
--- a/tests/test_core/test_storage.py
+++ b/tests/test_core/test_storage.py
@@ -4,8 +4,9 @@
 
 import pytest
 from fs.osfs import OSFS
+from fs_s3fs import S3FS
 
-from eogrow.core.config import interpret_config_from_path
+from eogrow.core.config import RawConfig, interpret_config_from_path
 from eogrow.core.storage import StorageManager
 
 pytestmark = pytest.mark.fast
@@ -65,3 +66,20 @@ def test_get_custom_folder(local_storage_manager: StorageManager, project_folder
     abs_path = os.path.join(project_folder, "path", "to", "eopatches")
 
     assert local_storage_manager.get_folder("eopatches", full_path=True) == abs_path
+
+
+@pytest.mark.parametrize(
+    "config",
+    [
+        {"project_folder": "s3://fake-bucket/", "aws_acl": "bucket-owner-full-control"},
+        {"project_folder": "s3://fake-bucket/"},
+        {"project_folder": ".", "aws_acl": "public-read"},
+    ],
+)
+def test_aws_acl(config: RawConfig):
+    storage = StorageManager.from_raw_config(config)
+
+    if isinstance(storage.filesystem, S3FS):
+        config_acl = config.get("aws_acl")
+        filesystem_acl = None if storage.filesystem.upload_args is None else storage.filesystem.upload_args.get("ACL")
+        assert config_acl == filesystem_acl
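
Illustrative usage (not part of the patch; the bucket name and paths below are made up): with the
`aws_acl` field added to the storage schema above, a project config can request an ACL for every
object written to an S3 bucket. A minimal sketch of how this is meant to be wired together:

    from eogrow.core.storage import StorageManager

    # Hypothetical raw config. `aws_acl` must be one of the AwsAclType literals
    # from eogrow/utils/types.py; StorageManager forwards it to get_filesystem
    # as the `acl` keyword, so the S3 filesystem applies it to every upload.
    config = {
        "project_folder": "s3://my-bucket/project",  # assumed bucket name
        "aws_acl": "bucket-owner-full-control",
    }
    storage = StorageManager.from_raw_config(config)

    # Mirrors the assertion in test_aws_acl: for S3-backed projects the ACL
    # lands in the filesystem's upload arguments, e.g.
    # storage.filesystem.upload_args.get("ACL") == "bucket-owner-full-control"

The same storage object also drives the reworked logging: `FilesystemHandler` now accepts either a
filesystem instance or its pickled copy (the pickled form is what travels to worker processes). A
sketch assuming a hypothetical log path:

    import logging

    from eolearn.core.utils.fs import pickle_fs

    from eogrow.core.logging import FilesystemHandler

    # Either storage.filesystem itself or pickle_fs(storage.filesystem) works;
    # the handler unpickles bytes before opening its local copy of the log.
    handler = FilesystemHandler("logs/run.log", filesystem=pickle_fs(storage.filesystem))
    logging.getLogger("eogrow").addHandler(handler)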