Merge pull request #137 from sentinel-hub/develop

Release version 1.3.3

zigaLuksic authored Nov 17, 2022
2 parents 1ae4e2b + e78dad0 commit f06ec3c
Showing 54 changed files with 1,409 additions and 526 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
tests/test_project/*
!tests/test_project/input-data/
tests/test_project/input-data/import_test.tiff

# Byte-compiled / optimized / DLL files
__pycache__/
2 changes: 1 addition & 1 deletion eogrow/__init__.py
@@ -2,4 +2,4 @@
The main init module
"""

__version__ = "1.3.2"
__version__ = "1.3.3"
8 changes: 3 additions & 5 deletions eogrow/cli.py
@@ -10,7 +10,7 @@
import click

from .core.config import collect_configs_from_path, decode_config_list, encode_config_list, interpret_config_from_dict
from .core.schemas import build_minimal_template, build_schema_template
from .core.schemas import build_schema_template
from .pipelines.testing import TestPipeline
from .utils.general import jsonify
from .utils.meta import collect_schema, import_object, load_pipeline_class
@@ -180,7 +180,7 @@ def ray(
@click.option(
"--template-format",
"template_format",
type=click.Choice(["minimal", "open-api", "full"], case_sensitive=False),
type=click.Choice(["minimal", "open-api"], case_sensitive=False),
help="Specifies which template format to use. The default is `minimal`",
default="minimal",
)
@@ -223,10 +223,8 @@ def make_template(

if template_format == "open-api":
template = schema.schema()
elif template_format == "full":
template = build_schema_template(schema)
else:
template = build_minimal_template(
template = build_schema_template(
schema,
pipeline_import_path=import_path,
required_only=required_only,
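With the `full` format gone, the remaining dispatch is a single `if`/`else` over two formats. A minimal, self-contained sketch of that shape, assuming a stub schema in place of eo-grow's collected pipeline schema and an inlined dict in place of `build_schema_template`:

```python
import json

import click
from pydantic import BaseModel


class StubSchema(BaseModel):
    """Stands in for a collected pipeline schema."""

    pipeline: str = "eogrow.pipelines.download.DownloadPipeline"


@click.command()
@click.option(
    "--template-format",
    "template_format",
    type=click.Choice(["minimal", "open-api"], case_sensitive=False),
    default="minimal",
    help="Specifies which template format to use. The default is `minimal`",
)
def make_template(template_format: str) -> None:
    if template_format == "open-api":
        template = StubSchema.schema()  # raw OpenAPI-style schema
    else:
        template = {"pipeline": "<< str >>"}  # stand-in for build_schema_template(schema)
    click.echo(json.dumps(template, indent=2))


if __name__ == "__main__":
    make_template()
```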
20 changes: 14 additions & 6 deletions eogrow/core/area/base.py
@@ -67,8 +67,8 @@ def get_area_dataframe(self, *, ignore_region_filter: bool = False) -> gpd.GeoDa
:return: A GeoDataFrame containing the unmodified area shape
"""
filename = fs.path.join(self.storage.get_input_data_folder(), self.config.area_filename)
with self.storage.filesystem.openbin(filename, "r") as file_handle:
area_df = gpd.read_file(file_handle)
with LocalFile(filename, mode="r", filesystem=self.storage.filesystem) as local_file:
area_df = gpd.read_file(local_file.path, engine=self.storage.config.geopandas_backend)

if self.has_region_filter() and not ignore_region_filter:
area_df = area_df[area_df[self.config.region_column_name].isin(self.config.region_names)]
@@ -174,7 +174,7 @@ def _load_area_geometry(self, filename: str) -> Geometry:
LOGGER.info("Loading cached area geometry from %s", filename)

with LocalFile(filename, mode="r", filesystem=self.storage.filesystem) as local_file:
area_gdf = gpd.read_file(local_file.path)
area_gdf = gpd.read_file(local_file.path, engine=self.storage.config.geopandas_backend)

return Geometry(area_gdf.geometry[0], CRS(area_gdf.crs))

@@ -185,7 +185,9 @@ def _save_area_geometry(self, area_geometry: Geometry, filename: str) -> None:
area_gdf = gpd.GeoDataFrame(geometry=[area_geometry.geometry], crs=area_geometry.crs.pyproj_crs())

with LocalFile(filename, mode="w", filesystem=self.storage.filesystem) as local_file:
area_gdf.to_file(local_file.path, driver="GPKG", encoding="utf-8")
area_gdf.to_file(
local_file.path, driver="GPKG", encoding="utf-8", engine=self.storage.config.geopandas_backend
)

def _create_and_save_grid(self, grid_filename: str, ignore_region_filter: bool) -> List[gpd.GeoDataFrame]:
"""Defines a new grid and saves it."""
@@ -208,7 +210,9 @@ def _load_grid(self, filename: str) -> List[gpd.GeoDataFrame]:
grid = []
with LocalFile(filename, mode="r", filesystem=self.storage.filesystem) as local_file:
for crs_layer in fiona.listlayers(local_file.path):
grid.append(gpd.read_file(local_file.path, layer=crs_layer))
grid.append(
gpd.read_file(local_file.path, layer=crs_layer, engine=self.storage.config.geopandas_backend)
)

return grid

@@ -219,7 +223,11 @@ def _save_grid(self, grid: List[gpd.GeoDataFrame], filename: str) -> None:
with LocalFile(filename, mode="w", filesystem=self.storage.filesystem) as local_file:
for crs_grid in grid:
crs_grid.to_file(
local_file.path, driver="GPKG", encoding="utf-8", layer=f"Grid EPSG:{crs_grid.crs.to_epsg()}"
local_file.path,
driver="GPKG",
encoding="utf-8",
layer=f"Grid EPSG:{crs_grid.crs.to_epsg()}",
engine=self.storage.config.geopandas_backend,
)

def _construct_file_path(self, *, prefix: str, suffix: str = "gpkg", ignore_region_filter: bool = False) -> str:
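Every geopandas call in this file now threads a configurable `engine` through, and file access goes via `LocalFile` so the selected backend always receives a plain local path instead of an open file handle. A standalone sketch of the multi-layer round trip used by `_save_grid`/`_load_grid`, assuming geopandas >= 0.11 for the `engine` keyword (the file name is illustrative):

```python
import fiona
import geopandas as gpd
from shapely.geometry import box

backend = "pyogrio"  # or "fiona"; mirrors the new `geopandas_backend` setting

# Write one layer per CRS into a single GeoPackage, as _save_grid does.
grid = [gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1)], crs=f"EPSG:{epsg}") for epsg in (32633, 32634)]
for crs_grid in grid:
    crs_grid.to_file(
        "grid.gpkg", driver="GPKG", layer=f"Grid EPSG:{crs_grid.crs.to_epsg()}", engine=backend
    )

# Read every layer back, as _load_grid does.
loaded = [gpd.read_file("grid.gpkg", layer=layer, engine=backend) for layer in fiona.listlayers("grid.gpkg")]
```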
7 changes: 5 additions & 2 deletions eogrow/core/eopatch.py
@@ -35,6 +35,10 @@ def __init__(self, config: Schema, area_manager: AreaManager):

self._area_manager = area_manager

# temporary, until area manager and eopatch manager are merged into a single manager
if isinstance(area_manager, BatchAreaManager) and not isinstance(self, BatchTileManager):
raise ValueError("To use `BatchAreaManager` you should use the `BatchTileManager` eopatch manager.")

self._name_to_id_map: Optional[bidict] = None
self._name_to_bbox_map: Optional[dict] = None

@@ -60,7 +64,7 @@ def _prepare_names(self) -> Tuple[bidict, Dict[str, BBox]]:
"""
bbox_grid = self._area_manager.get_grid(add_bbox_column=True)

bbox_df: DataFrame = pandas.concat(bbox_grid, ignore_index=True)
bbox_df: DataFrame = pandas.concat([gdf.drop(columns="geometry") for gdf in bbox_grid], ignore_index=True)

prepared_name_to_id_map = self.generate_names(bbox_df)
prepared_name_to_bbox_map = dict(zip(prepared_name_to_id_map, bbox_df["BBOX"]))
@@ -88,7 +92,6 @@ def is_eopatch_name(self, name: str) -> bool:
"""Checks if the given name (could be entire file path) is the name of an EOPatch
:param name: A name or a file path of a folder which could be one of EOPatches
:type name: str
"""
return os.path.basename(name) in self.name_to_id_map

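The `_prepare_names` change concatenates plain DataFrames rather than GeoDataFrames: since only the `BBOX` column is needed, dropping the geometry column first spares pandas/geopandas from having to reconcile geometry columns coming from grids in different CRSs. A toy illustration of the pattern (the `BBOX` values are stand-in strings here):

```python
import geopandas as gpd
import pandas as pd
from shapely.geometry import box

# Two per-CRS grids, as returned by the area manager's get_grid(add_bbox_column=True).
bbox_grid = [
    gpd.GeoDataFrame({"BBOX": ["patch-0"]}, geometry=[box(0, 0, 1, 1)], crs="EPSG:32633"),
    gpd.GeoDataFrame({"BBOX": ["patch-1"]}, geometry=[box(0, 0, 1, 1)], crs="EPSG:32634"),
]

# Dropping geometry first yields plain DataFrames, so concat never sees mixed-CRS geometry columns.
bbox_df: pd.DataFrame = pd.concat([gdf.drop(columns="geometry") for gdf in bbox_grid], ignore_index=True)
print(bbox_df["BBOX"].tolist())  # ['patch-0', 'patch-1']
```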
2 changes: 1 addition & 1 deletion eogrow/core/pipeline.py
@@ -275,7 +275,7 @@ def run_procedure(self) -> Tuple[List[str], List[str]]:
raise NotImplementedError(
"Default implementation of run_procedure method requires implementation of build_workflow method"
)
workflow = self.build_workflow() # type: ignore[attr-defined]
workflow = self.build_workflow()
exec_args = self.get_execution_arguments(workflow)

finished, failed, _ = self.run_execution(workflow, exec_args)
79 changes: 4 additions & 75 deletions eogrow/core/schemas.py
@@ -5,7 +5,7 @@
as an internal class of the implemented pipeline class
"""
from inspect import isclass
from typing import Dict, List, Optional, Type
from typing import List, Optional, Type

from pydantic import BaseModel, Field
from pydantic.fields import ModelField
@@ -64,78 +64,7 @@ class PipelineSchema(BaseSchema):
)


def build_schema_template(schema: Type[BaseModel]) -> dict:
"""From a given schema class it creates a template of a config file. It does that by modifying
OpenAPI-style schema.
"""
openapi_schema = schema.schema()

template_mapping: dict = {}
model_schemas = openapi_schema.get("definitions", {})
for name, model_schema in model_schemas.items():
model_template = _process_model_schema(model_schema, template_mapping)
template_mapping[name] = model_template

return _process_model_schema(openapi_schema, template_mapping)


def _process_model_schema(openapi_schema: dict, template_mapping: Dict[str, dict]) -> dict:
"""Processes schema for a single model into a template"""
template = _get_basic_template(openapi_schema)

params = openapi_schema.get("properties", {})
required_params = set(openapi_schema.get("required", []))

for param_name, param_schema in params.items():
param_template = _get_basic_template(param_schema)

if param_name in required_params:
param_template["#required"] = True

referred_model_name = _get_referred_model_name(param_schema)
if referred_model_name:
# In case param_template and referred_model have some parameters in common the following prioritizes the
# ones from param_template
for model_param_name, model_param_value in template_mapping[referred_model_name].items():
if model_param_name not in param_template:
param_template[model_param_name] = model_param_value

template[param_name] = param_template

return template


_SUPPORTED_OPENAPI_FIELDS = {"title", "properties", "required", "definitions", "$ref", "allOf"}


def _get_basic_template(openapi_schema: dict) -> dict:
"""For an OpenAPI parameter schema it prepares a template with basic fields"""
template = {}
for key, value in openapi_schema.items(): # get metadata fields
if key in _SUPPORTED_OPENAPI_FIELDS or (key == "type" and value == "object"):
continue

template[f"#{key}"] = value

return template


def _get_referred_model_name(openapi_schema: dict) -> Optional[str]:
"""In a parameter schema it finds a reference to another model schema. If it doesn't exist it returns None"""
referred_path = openapi_schema.get("$ref")
if not referred_path:
schema_items = openapi_schema.get("items", {}) # items can be a list
referred_path = schema_items.get("$ref") if isinstance(schema_items, dict) else None

if not referred_path and "allOf" in openapi_schema:
referred_path = openapi_schema["allOf"][0].get("$ref")

if referred_path:
return referred_path.rsplit("/", 1)[-1]
return referred_path


def build_minimal_template(
def build_schema_template(
schema: Type[BaseModel],
required_only: bool = False,
pipeline_import_path: Optional[str] = None,
@@ -155,14 +84,14 @@ def build_minimal_template(
elif isclass(field.type_) and issubclass(field.type_, BaseModel):
# Contains a subschema in the nesting
if isclass(field.outer_type_) and issubclass(field.outer_type_, BaseModel):
template[name] = build_minimal_template(field.type_, **rec_flags)
template[name] = build_schema_template(field.type_, **rec_flags)
if description:
template[name]["<< description >>"] = description
else:
template[name] = {
"<< type >>": repr(field._type_display()),
"<< nested schema >>": str(field.type_),
"<< sub-template >>": build_minimal_template(field.type_, **rec_flags),
"<< sub-template >>": build_schema_template(field.type_, **rec_flags),
}
else:
template[name] = _field_description(field, description)
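The deleted OpenAPI-walking helpers are replaced by the single field-walking builder (renamed from `build_minimal_template`). A pared-down sketch of that approach, using pydantic v1's field metadata as the diff does (`_type_display` is a private pydantic helper, and the leaf rendering here is simplified):

```python
from inspect import isclass
from typing import List, Optional, Type

from pydantic import BaseModel


def build_template(schema: Type[BaseModel], required_only: bool = False) -> dict:
    template: dict = {}
    for name, field in schema.__fields__.items():
        if required_only and not field.required:
            continue
        if isclass(field.type_) and issubclass(field.type_, BaseModel):
            template[name] = build_template(field.type_, required_only)  # recurse into nested models
        else:
            template[name] = f"<< {field._type_display()} >>"  # leaf: show the declared type
    return template


class Inner(BaseModel):
    threads: int = 4


class Pipeline(BaseModel):
    name: str
    workers: Optional[List[int]] = None
    inner: Inner = Inner()


print(build_template(Pipeline))
```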
41 changes: 8 additions & 33 deletions eogrow/core/storage.py
@@ -1,8 +1,7 @@
"""
This module handles everything regarding storage of the data
"""
from io import StringIO
from typing import Dict, List, Optional
from typing import Dict, Literal, Optional

import fs
from fs.base import FS
@@ -43,6 +42,9 @@ class Schema(ManagerSchema, BaseSettings):
default_factory=dict,
description="A flat key: value store mapping each key to a path in the project.",
)
geopandas_backend: Literal["fiona", "pyogrio"] = Field(
"fiona", description="Which backend is used for IO operations when using geopandas."
)

class Config(ManagerSchema.Config):
case_sensitive = True
@@ -71,15 +73,15 @@ def _prepare_sh_config(self) -> SHConfig:
return sh_config

def _prepare_filesystem(self) -> FS:
"""Prepares the main instance of filesystem object which contains all additional configuration parameters."""
"""Prepares a filesystem object with the configuration parameters."""
fs_kwargs: Dict[str, str] = {}
if is_s3_path(self.config.project_folder) and self.config.aws_acl:
fs_kwargs["acl"] = self.config.aws_acl

return get_filesystem(self.config.project_folder, create=True, config=self.sh_config, **fs_kwargs)

def get_folder(self, key: str, full_path: bool = False) -> str:
"""Returns the path associated with a key in the structure config."""
"""Returns the path associated with the given key in the structure config."""
folder_path = self.config.structure[key]
self.filesystem.makedirs(folder_path, recreate=True)

@@ -88,9 +90,7 @@ def get_folder(self, key: str, full_path: bool = False) -> str:
return folder_path

def get_logs_folder(self, full_path: bool = False) -> str:
"""Method for obtaining the logs folder. Will store logs to the current folder.
Temporary solution until the logging to AWS is handled properly
"""
"""Method for obtaining the logs folder."""
return self.get_folder("logs", full_path=full_path)

def get_cache_folder(self, full_path: bool = False) -> str:
@@ -102,30 +102,5 @@ def get_input_data_folder(self, full_path: bool = False) -> str:
return self.get_folder("input_data", full_path=full_path)

def is_on_aws(self) -> bool:
"""Returns True if the project_folder is on S3, False otherwise."""
"""Returns True if the project_folder is on S3, False otherwise."""
return is_s3_path(self.config.project_folder)

def show_folder_structure(
self, show_files: bool = False, return_str: bool = False, exclude: Optional[List[str]] = None
) -> Optional[str]:
"""Shows how folder structure looks like at the moment. It will show all folders except EOPatch folders and
EOExecution report folders
:param show_files: If `True` it will show also files inside the folders. Note that the number of files may be
huge. By default, this is set to `False`.
:param return_str: If `True` it will return folder structure as a string. If `False` it will just print the
visualization to stdout.
:param exclude: A list of grep folder paths to exclude from the structure return.
Defaults to ['eopatch*', 'eoexecution-report*']
:return: Depending on return_str it will either return a string or None
"""
if exclude is None:
exclude = ["eopatch*", "eoexecution-report*"]

file_filter = None if show_files else [""]
io_object = StringIO() if return_str else None
self.filesystem.tree(max_levels=10, with_color=True, exclude=exclude, filter=file_filter, file=io_object)
if io_object:
io_object.seek(0)
return io_object.getvalue()
return None
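The new `geopandas_backend` option added above is a `Literal`-typed pydantic field, so a typo in the backend name fails at config-parsing time rather than deep inside an IO call. A self-contained sketch of that validation behaviour (plain `BaseModel` here instead of the manager's `ManagerSchema`/`BaseSettings` base):

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError


class StorageSchema(BaseModel):
    geopandas_backend: Literal["fiona", "pyogrio"] = Field(
        "fiona", description="Which backend is used for IO operations when using geopandas."
    )


print(StorageSchema().geopandas_backend)  # fiona
print(StorageSchema(geopandas_backend="pyogrio").geopandas_backend)  # pyogrio

try:
    StorageSchema(geopandas_backend="ogr")  # typo: rejected at parse time
except ValidationError as error:
    print(error)
```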
4 changes: 2 additions & 2 deletions eogrow/pipelines/byoc.py
@@ -74,7 +74,7 @@ def _parse_sensing_time(cls, value: Optional[str], values: Dict[str, object]) ->

def __init__(self, config: Schema, raw_config: Optional[RawConfig] = None):
super().__init__(config, raw_config)
if not self.storage.is_on_aws:
if not self.storage.is_on_aws():
raise ValueError("Can only ingest for projects based on S3 storage.")
project_folder = self.storage.config.project_folder
self.bucket_name = project_folder.replace("s3://", "").split("/")[0]
@@ -156,7 +156,7 @@ def _get_cover_geometry(self, crs: CRS) -> Optional[Geometry]:
folder_path = self.storage.get_folder(self.config.cover_geometry_folder_key)
file_path = fs.path.join(folder_path, self.config.cover_geometry)
with self.storage.filesystem.openbin(file_path, "r") as file_handle:
self._cover_geometry_df = gpd.read_file(file_handle)
self._cover_geometry_df = gpd.read_file(file_handle, engine=self.storage.config.geopandas_backend)

return self._cover_geometry_df.to_crs(crs.pyproj_crs()).unary_union

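The one-character fix in `__init__` matters because `self.storage.is_on_aws` without parentheses is a bound method and therefore always truthy, so the `not` guard could never trigger. The neighbouring bucket-name extraction reduces to the following (hypothetical helper name, same expression as the source):

```python
def bucket_name_from_s3_path(project_folder: str) -> str:
    # "s3://my-bucket/some/prefix" -> "my-bucket"
    return project_folder.replace("s3://", "").split("/")[0]


assert bucket_name_from_s3_path("s3://my-bucket/some/prefix") == "my-bucket"
```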
2 changes: 1 addition & 1 deletion eogrow/pipelines/download.py
@@ -174,7 +174,7 @@ def get_execution_arguments(self, workflow: EOWorkflow) -> List[Dict[EONode, Dic
for index, bbox in enumerate(bbox_list):
exec_args[index][download_node] = {"bbox": bbox}
if hasattr(self.config, "time_period"):
exec_args[index][download_node]["time_interval"] = self.config.time_period # type: ignore[attr-defined]
exec_args[index][download_node]["time_interval"] = self.config.time_period

return exec_args

3 changes: 3 additions & 0 deletions eogrow/pipelines/download_batch.py
@@ -162,6 +162,9 @@ def run_procedure(self) -> Tuple[List[str], List[str]]:

processed = self._get_tile_names_from_results(results, BatchTileStatus.PROCESSED)
failed = self._get_tile_names_from_results(results, BatchTileStatus.FAILED)
log_msg = f"Successfully downloaded {len(processed)} tiles"
log_msg += f", but {len(failed)} tiles failed." if failed else "."
LOGGER.info(log_msg)
return processed, failed

def _create_or_collect_batch_request(self) -> BatchRequest:
(Diff for the remaining changed files not shown.)