Blackified according to main branch
GriffinBabe committed Mar 7, 2024
1 parent 9a9ab08 commit 1388d54
Showing 7 changed files with 20 additions and 61 deletions.
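All seven files change only in layout: calls that were previously wrapped across several lines are collapsed onto one, which is what Black produces when it runs with a longer line-length than the code was last formatted with. A minimal sketch of the effect via Black's Python API (the line length of 100 is an assumption for illustration; the commit itself does not show the repository's setting):

import black

# Hypothetical input in the style of this diff; line_length=100 is assumed.
src = 'raise ValueError(\n    "Please provide a valid URL."\n)\n'
print(black.format_str(src, mode=black.Mode(line_length=100)), end="")
# Output: raise ValueError("Please provide a valid URL.")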
4 changes: 1 addition & 3 deletions src/openeo_gfmap/fetching/commons.py
@@ -120,9 +120,7 @@ def load_collection(
             "http://"
         ), "Please provide a valid URL or a path to a GeoJSON file."
     else:
-        raise ValueError(
-            "Please provide a valid URL to a GeoParquet or GeoJSON file."
-        )
+        raise ValueError("Please provide a valid URL to a GeoParquet or GeoJSON file.")
     cube = connection.load_collection(
         collection_id=collection_name,
         temporal_extent=[temporal_extent.start_date, temporal_extent.end_date],
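For context, the reformatted branch is the tail of an extent-validation check in load_collection. A self-contained sketch of the same pattern (the function name and signature here are illustrative, not the module's actual API):

def _check_extent_ref(url: str) -> None:
    # Illustrative: accept an http(s) URL or a GeoJSON path, otherwise fail loudly.
    if url.startswith(("https://", "http://")) or url.endswith(".geojson"):
        return
    raise ValueError("Please provide a valid URL to a GeoParquet or GeoJSON file.")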
4 changes: 1 addition & 3 deletions src/openeo_gfmap/fetching/s1.py
@@ -132,9 +132,7 @@ def s1_grd_default_processor(cube: openeo.DataCube, **params):
     },
     Backend.CDSE_STAGING: {
         "default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
-        "preprocessor": partial(
-            get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"
-        ),
+        "preprocessor": partial(get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"),
     },
 }
 
4 changes: 1 addition & 3 deletions src/openeo_gfmap/fetching/s2.py
@@ -188,9 +188,7 @@ def s2_l2a_default_processor(cube: openeo.DataCube, **params):
     },
     Backend.CDSE_STAGING: {
         "fetch": partial(get_s2_l2a_default_fetcher, collection_name="SENTINEL2_L2A"),
-        "preprocessor": partial(
-            get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"
-        ),
+        "preprocessor": partial(get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"),
     },
 }
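In both fetching modules the backend registries pre-bind the collection name with functools.partial, so callers only supply extents and parameters. A minimal, self-contained sketch of that registry pattern (the fetcher body is a stand-in, not the module's real implementation):

from enum import Enum
from functools import partial


class Backend(Enum):
    CDSE = "cdse"
    CDSE_STAGING = "cdse-staging"


def get_default_fetcher(spatial_extent, temporal_extent, collection_name: str) -> str:
    # Stand-in for get_s1_grd_default_fetcher / get_s2_l2a_default_fetcher.
    return f"load {collection_name} over {spatial_extent} during {temporal_extent}"


FETCHERS = {
    Backend.CDSE_STAGING: partial(get_default_fetcher, collection_name="SENTINEL1_GRD"),
}

# The caller supplies only the extents; the collection name is already bound.
print(FETCHERS[Backend.CDSE_STAGING]("bbox", ("2020-01-01", "2020-12-31")))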
44 changes: 11 additions & 33 deletions src/openeo_gfmap/manager/job_manager.py
@@ -79,9 +79,7 @@ def _post_job_worker(self):
             except queue.Empty:
                 continue
             except KeyboardInterrupt:
-                _log.debug(
-                    f"Worker thread {threading.current_thread().name} interrupted."
-                )
+                _log.debug(f"Worker thread {threading.current_thread().name} interrupted.")
                 return
 
     def _update_statuses(self, df: pd.DataFrame):
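The hunk above reformats the consumer side of a queue handoff; the next hunk, in _update_statuses, is the producer that pushes finished and failed jobs onto the same queue. A reduced, runnable sketch of the pattern (PostJobStatus and the wiring are simplified from what the diff shows; DataFrame bookkeeping and openEO calls are omitted):

import queue
import threading
from enum import Enum


class PostJobStatus(Enum):
    FINISHED = "finished"
    ERROR = "error"


finished_job_queue: queue.Queue = queue.Queue()


def post_job_worker() -> None:
    # Like _post_job_worker: poll with a timeout and loop when the queue is empty.
    while True:
        try:
            status, job_id = finished_job_queue.get(timeout=1)
        except queue.Empty:
            continue
        print(f"Job {job_id} -> {status.name}")
        finished_job_queue.task_done()


threading.Thread(target=post_job_worker, daemon=True).start()
finished_job_queue.put((PostJobStatus.FINISHED, "job-123"))  # producer side
finished_job_queue.join()  # wait until the worker has processed the entry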
@@ -106,19 +104,13 @@ def _update_statuses(self, df: pd.DataFrame):
             if (df.loc[idx, "status"] in ["created", "queued", "running"]) and (
                 job_metadata["status"] == "finished"
             ):
-                _log.info(
-                    f"Job {job.job_id} finished successfully, queueing on_job_done..."
-                )
+                _log.info(f"Job {job.job_id} finished successfully, queueing on_job_done...")
                 self._finished_job_queue.put((PostJobStatus.FINISHED, job, row))
                 df.loc[idx, "costs"] = job_metadata["costs"]
 
             # Case in which it failed
-            if (df.loc[idx, "status"] != "error") and (
-                job_metadata["status"] == "error"
-            ):
-                _log.info(
-                    f"Job {job.job_id} finished with error, queueing on_job_error..."
-                )
+            if (df.loc[idx, "status"] != "error") and (job_metadata["status"] == "error"):
+                _log.info(f"Job {job.job_id} finished with error, queueing on_job_error...")
                 self._finished_job_queue.put((PostJobStatus.ERROR, job, row))
                 df.loc[idx, "costs"] = job_metadata["costs"]
 
@@ -141,9 +133,7 @@ def on_job_error(self, job: BatchJob, row: pd.Series):
         title = job_metadata["title"]
         job_id = job_metadata["id"]
 
-        output_log_path = (
-            Path(self._output_dir) / "failed_jobs" / f"{title}_{job_id}.log"
-        )
+        output_log_path = Path(self._output_dir) / "failed_jobs" / f"{title}_{job_id}.log"
         output_log_path.parent.mkdir(parents=True, exist_ok=True)
 
         if len(error_logs) > 0:
@@ -168,9 +158,7 @@ def on_job_done(self, job: BatchJob, row: pd.Series):
             _log.debug(
                 f"Generating output path for asset {asset.name} from job {job.job_id}..."
             )
-            output_path = self._output_path_gen(
-                self._output_dir, temp_file.name, idx, row
-            )
+            output_path = self._output_path_gen(self._output_dir, temp_file.name, idx, row)
             _log.debug(
                 f"Generated path for asset {asset.name} from job {job.job_id} -> {output_path}"
             )
@@ -180,13 +168,9 @@ def on_job_done(self, job: BatchJob, row: pd.Series):
                 shutil.move(temp_file.name, output_path)
                 # Add to the list of downloaded products
                 job_products[f"{job.job_id}_{asset.name}"] = [output_path]
-                _log.info(
-                    f"Downloaded asset {asset.name} from job {job.job_id} -> {output_path}"
-                )
+                _log.info(f"Downloaded asset {asset.name} from job {job.job_id} -> {output_path}")
             except Exception as e:
-                _log.exception(
-                    f"Error downloading asset {asset.name} from job {job.job_id}", e
-                )
+                _log.exception(f"Error downloading asset {asset.name} from job {job.job_id}", e)
                 raise e
             finally:
                 shutil.rmtree(temp_file.name, ignore_errors=True)
@@ -200,9 +184,7 @@ def on_job_done(self, job: BatchJob, row: pd.Series):
             item = pystac.read_file(item_metadata.get_self_href())
             asset_path = job_products[f"{job.job_id}_{item.id}"][0]
 
-            assert (
-                len(item.assets.values()) == 1
-            ), "Each item should only contain one asset"
+            assert len(item.assets.values()) == 1, "Each item should only contain one asset"
             for asset in item.assets.values():
                 asset.href = str(
                     asset_path
@@ -249,18 +231,14 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
             ("description", None),
             ("costs", None),
         ]
-        new_columns = {
-            col: val for (col, val) in required_with_default if col not in df.columns
-        }
+        new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
         df = df.assign(**new_columns)
 
         _log.debug(f"Normalizing dataframe. Columns: {df.columns}")
 
         return df
 
-    def run_jobs(
-        self, df: pd.DataFrame, start_job: Callable, output_file: Union[str, Path]
-    ):
+    def run_jobs(self, df: pd.DataFrame, start_job: Callable, output_file: Union[str, Path]):
         """Starts the jobs defined in the dataframe and runs the `start_job` function on each job.
         Parameters
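The _normalize_df change in the last hunk keeps a handy idiom on one line: build a dict of only the missing columns, then add them all in a single assign call. The same idiom in isolation (the "not_started" default is assumed; the diff only shows the description and costs entries):

import pandas as pd

required_with_default = [
    ("status", "not_started"),  # assumed default, not visible in the diff
    ("description", None),
    ("costs", None),
]

df = pd.DataFrame({"location": ["A", "B"]})
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)
print(df.columns.tolist())  # ['location', 'status', 'description', 'costs']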
4 changes: 1 addition & 3 deletions src/openeo_gfmap/manager/job_splitters.py
@@ -7,9 +7,7 @@
 import h3
 
 
-def _resplit_group(
-    polygons: gpd.GeoDataFrame, max_points: int
-) -> List[gpd.GeoDataFrame]:
+def _resplit_group(polygons: gpd.GeoDataFrame, max_points: int) -> List[gpd.GeoDataFrame]:
     """Performs re-splitting of a dataset of polygons in a list of datasets"""
     for i in range(0, len(polygons), max_points):
         yield polygons.iloc[i : i + max_points].reset_index(drop=True)
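Note that _resplit_group is a generator (it yields), so despite the List[gpd.GeoDataFrame] annotation a caller receives the chunks lazily. A toy usage sketch of the same chunking:

import geopandas as gpd
from shapely.geometry import Point


def resplit_group(polygons: gpd.GeoDataFrame, max_points: int):
    # Same chunking as _resplit_group: fixed-size slices, each with a fresh index.
    for i in range(0, len(polygons), max_points):
        yield polygons.iloc[i : i + max_points].reset_index(drop=True)


points = gpd.GeoDataFrame({"geometry": [Point(i, i) for i in range(5)]})
print([len(group) for group in resplit_group(points, max_points=2)])  # [2, 2, 1]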
17 changes: 4 additions & 13 deletions src/openeo_gfmap/preprocessing/compositing.py
@@ -7,26 +7,17 @@
 import openeo
 
 
-def median_compositing(
-    cube: openeo.DataCube, period: Union[str, list]
-) -> openeo.DataCube:
+def median_compositing(cube: openeo.DataCube, period: Union[str, list]) -> openeo.DataCube:
     """Perfrom median compositing on the given datacube."""
     if isinstance(period, str):
-        return cube.aggregate_temporal_period(
-            period=period, reducer="median", dimension="t"
-        )
+        return cube.aggregate_temporal_period(period=period, reducer="median", dimension="t")
     elif isinstance(period, list):
-        return cube.aggregate_temporal(
-            intervals=period, reducer="median", dimension="t"
-        )
-
+        return cube.aggregate_temporal(intervals=period, reducer="median", dimension="t")
 
 
 def mean_compositing(cube: openeo.DataCube, period: str) -> openeo.DataCube:
     """Perfrom mean compositing on the given datacube."""
     if isinstance(period, str):
-        return cube.aggregate_temporal_period(
-            period=period, reducer="mean", dimension="t"
-        )
+        return cube.aggregate_temporal_period(period=period, reducer="mean", dimension="t")
     elif isinstance(period, list):
         return cube.aggregate_temporal(intervals=period, reducer="mean", dimension="t")
4 changes: 1 addition & 3 deletions src/openeo_gfmap/stac/constants.py
@@ -217,9 +217,7 @@ def create_spatial_dimension(
         "type": "application/x-netcdf",
         "roles": ["data"],
         "proj:shape": [64, 64],
-        "raster:bands": [
-            {"name": "CROPTYPE", "data_type": "uint16", "bits_per_sample": 16}
-        ],
+        "raster:bands": [{"name": "CROPTYPE", "data_type": "uint16", "bits_per_sample": 16}],
     }
 )
 
