Skip to content

Commit

Permalink
Add preliminary pipeline class structure and move functions to proces…
Browse files Browse the repository at this point in the history
…sing.

Not everything works, this is mostly a progress save
  • Loading branch information
f-PLT committed Nov 19, 2024
1 parent 8097ac2 commit d362f83
Show file tree
Hide file tree
Showing 12 changed files with 1,374 additions and 1,191 deletions.
1 change: 1 addition & 0 deletions climateset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
DATA_DIR = PROJECT_ROOT / "data"
RAW_DATA = DATA_DIR / "raw"
PROCESSED_DATA = DATA_DIR / "processed"
FINAL_DATA = DATA_DIR / "final"
LOAD_DATA = DATA_DIR / "load"
META_DATA = DATA_DIR / "meta"
SCRIPT_DIR = PROJECT_ROOT / "scripts"
Expand Down
13 changes: 13 additions & 0 deletions climateset/processing/abstract_processor_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from abc import ABC, abstractmethod


class AbstractProcessorStep(ABC):
def __init__(self):
self.results_directory = None

@abstractmethod
def execute(self, input_directory):
pass

def get_results_directory(self):
return self.results_directory
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
LOGGER = create_logger(__name__)


class AbstractRawProcesser(ABC):
class AbstractRawProcessor(ABC):
"""Abstract class for raw processing."""

def __init__(
Expand Down Expand Up @@ -53,7 +53,7 @@ def type_class_meta(self) -> str:
"""Returns the name tag of the subclass."""

@abstractmethod
def preprocess_subdir(
def process_directory(
self,
input_dir: Path,
cleaned_dir: Path,
Expand Down Expand Up @@ -99,6 +99,6 @@ def add_processing_step(self, steps: Union[str, list]):
self.processing_steps.append(step)

def list_available_steps(self):
step_list = [step for step in self.available_steps.keys() if not step.startswith("_")]
step_list = [step for step in self.available_steps if not step.startswith("_")]
LOGGER.info(f"Available steps: {step_list}")
return step_list
12 changes: 3 additions & 9 deletions climateset/processing/raw/checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,11 @@ def add_lonlat_check(self):
def add_variables_check(self):
self.checker_steps.append(VERIFY_VARIABLES)

def check_directory(self, log_file: Path = None, model: str = "") -> bool:
def check_directory(self) -> bool:
"""
Checking all files in a sub dir.
TAKES TIME.
Args:
sub_dir (Path): The dir that should be checked
log_file (Path): Where problematic files should be logged
mode (str): 'w' writes the logs into the file, 'a' appends it to
an existing file.
model (str): Default is "". Set this, if you want that only
the data of this model is checked.
Returns:
bool: True if all checks were successful, False if not
"""
Expand Down Expand Up @@ -374,7 +367,8 @@ def check_units_ok(self):
if found_unit != expected_unit:
self.results["units_ok"] = False
LOGGER.warning(
f"The following file has the unit {found_unit}, but the unit {expected_unit} was expected: \n{self.input_file}"
f"The following file has the unit {found_unit}, but the unit {expected_unit} "
f"was expected: \n{self.input_file}"
)
else:
self.results["units_ok"] = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import xmip.preprocessing as xmip_preprocessing
from tqdm import tqdm

from climateset.processing.raw.abstract_raw_processing import AbstractRawProcesser
from climateset.processing.raw.abstract_raw_processor import AbstractRawProcessor
from climateset.processing.raw.utils import create_generic_output_path


# Attention: we need to write _desired_units for our all our data
# Attention: Before you apply, make sure this is only applied to CMIP6 data!
class ClimateModelProcesser(AbstractRawProcesser):
class ClimateModelProcesser(AbstractRawProcessor):
"""
Can be called to apply xmip preprocessing.
Expand All @@ -36,7 +36,6 @@ def __init__(
correct_time_axis: bool = True,
correct_calendar: bool = True,
sum_levels: bool = True,
**kwargs,
):
"""Init function for xmip processer
Args:
Expand Down Expand Up @@ -111,7 +110,7 @@ def update_desired_units_dict(self, var: str, unit: str):
xmip.preprocessing._desired_units[var] = unit

# share this with input4mips?
def preprocess_subdir(
def process_directory(
self,
sub_dir: Path,
output_dir: Path,
Expand Down
Empty file.
Loading

0 comments on commit d362f83

Please sign in to comment.