From c77afcd3dc50bd331bfe9763869d4c20c5f922ec Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Fri, 4 Oct 2024 18:18:19 -0700 Subject: [PATCH 01/22] Deployment Engine --- echodataflow/deployment/deployment_demo.yaml | 159 +++++++++++++++++++ echodataflow/deployment/deployment_engine.py | 52 ++++++ echodataflow/models/deployment.py | 33 ++++ 3 files changed, 244 insertions(+) create mode 100644 echodataflow/deployment/deployment_demo.yaml create mode 100644 echodataflow/deployment/deployment_engine.py create mode 100644 echodataflow/models/deployment.py diff --git a/echodataflow/deployment/deployment_demo.yaml b/echodataflow/deployment/deployment_demo.yaml new file mode 100644 index 0000000..e56fc32 --- /dev/null +++ b/echodataflow/deployment/deployment_demo.yaml @@ -0,0 +1,159 @@ +# If provided, all the path use this as base path, else respective paths are used +out_path: s3://bucket-name/path/to/output +storage_options: + block_name: ABCD + block_type: AWS + +# All services configuration +services: +- name: service_raw_to_Sv + tags: [] + description: null #Optional description + schedule: + anchor_date: D{YYYYmmdd}T{HHMMSS} + interval_mins: 10 + logging: + handler: sv_log_handler + kafka: + topic: echodataflow_logs + servers: + - localhost:9092 + worker: + number_of_workers: 1 + stages: + - name: edf_Sv_pipeline + module: echodataflow.stages.subflows.Sv_pipeline # Optional module name, echodataflow to create a default flow for all the stages within it + tasks: + - name: echodataflow_open_raw + module: echodataflow.stages.subflows.open_raw + task_params: # All external parameters for the task, echodataflow will handle the mapping and validation + sonar_model: EK80 + xml_path: s3// + - name: echodataflow_compute_Sv + module: echodataflow.stages.subflows.compute_Sv + - name: echodataflow_add_depth + module: echodataflow.stages.subflows.add_depth + - name: echodataflow_add_location + module: echodataflow.stages.subflows.add_location + stage_params: + key: value + source: + urlpath: some_url # Single path, list or paths or a dict with some key and value as list of paths + parameters: + # Jinja support + window_options: + time_travel_hours: 20 + time_travel_mins: 0 + window_hours: 0 + window_mins: 40 + number_of_windows: 3 + storage_options: + block_name: ABCD + block_type: AWS + group: + file: s3:// + grouping_regex: # pattern for grouping files based on filename + storage_options: + block_name: ABCD + block_type: AWS + destination: + path: s3:// + storage_options: + block_name: ABCD + block_type: AWS + data_quality: + checks: + - name: NULL_CHECK + - name: MIN_MAX_CHECK + - name: echodataflow.data_quality.checks.custom_check + block_downstream: false + out_path: s3:// + storage_options: + block_name: ABCD + block_type: AWS + options: # Echodataflow master flow configuration options + save_offline: true + use_offline: true # Skip this process if zarr files are already present in the output directory. + group: False # Group Converted files based on Transect + prefect_config: # Configure any prefect related settings for a flow. For an exhaustive list of configurations refer . 
Task based configurations are optimized and handled by echodataflow + retries: 3 # Number of retries before failing the flow + task_runner: DaskTaskRunner(address=tcp://127.0.0.1:59487) # Configure Runner setting for this specific stage + +- name: service_produce_mvbs + tags: [] + description: null + schedule: + anchor_date: D{YYYYmmdd}T{HHMMSS} + interval_mins: 10 + logging: + handler: mvbs_log_handler + stage: + - name: stage_produce_mvbs + module: echodataflow.stages.subflows.compute_MVBS + source: + urlpath: s3:// + parameters: + # Jinja support + window_options: + time_travel_hours: 20 + time_travel_mins: 0 + window_hours: 0 + window_mins: 40 + number_of_windows: 3 + storage_options: + block_name: ABCD + block_type: AWS + detination: + path: s3:// + storage_options: + block_name: ABCD + block_type: AWS + +- name: service_prediction_to_nasc + tags: [] + description: null + schedule: + anchor_date: D{YYYYmmdd}T{HHMMSS} + interval_mins: 10 + stages: + - name: stage_prediction_to_nasc + module: echodataflow.stages.subflows.prediction_to_nasc + task: + - name: echodataflow_mask_prediction + module: echodataflow.stages.subflows.mask_prediction + - name: echodataflow_apply_mask + module: echodataflow.stages.subflows.apply_mask + - name: echodataflow_compute_NASC + module: echodataflow.stages.subflows.compute_NASC + source: + urlpath: s3:// + parameters: + # Jinja support + window_options: + time_travel_hours: 20 + time_travel_mins: 0 + window_hours: 0 + window_mins: 40 + number_of_windows: 3 + storage_options: + block_name: ABCD + block_type: AWS + destination: + path: s3:// + storage_options: + block_name: ABCD + block_type: AWS + external_params: # All external parameters for the stages, echodataflow will handle the mapping and validation + model_path: /Users/wu-junglee/HakeSurvey2024/models/binary_hake_model_1.0m_bottom_offset_1.0m_depth_2017_2019_ver_1.ckpt + +- name: service_file_transfer + tags: [] + description: null + schedule: + anchor_date: D{YYYYmmdd}T{HHMMSS} + interval_mins: 10 + stage: + - name: stage_file_transfer + module: echodataflow.stages.subflows.file_transfer + stage_params: + command: rclone copy --max-age 120m --no-traverse /Users/wu-junglee/SH24_replay_processed/ s3_hake:sh24replay --exclude /Sv_with_location/** diff --git a/echodataflow/deployment/deployment_engine.py b/echodataflow/deployment/deployment_engine.py new file mode 100644 index 0000000..89c851e --- /dev/null +++ b/echodataflow/deployment/deployment_engine.py @@ -0,0 +1,52 @@ +from pathlib import Path +from typing import Optional, Union + +from prefect import Flow + +from echodataflow.models.deployment import Deployment, Service +from echodataflow.utils.config_utils import extract_config, handle_storage_options +from echodataflow.utils.function_utils import dynamic_function_call + + +def deploy_echodataflow( + deployment_yaml: Union[dict, str, Path], + pipeline_yaml: Union[dict, str, Path], + datastore_yaml: Union[dict, str, Path], + logging_yaml: Optional[Union[dict, str, Path]] = None, + storage_options: Optional[dict] = None, +): + + storage_options = handle_storage_options(storage_options) + + if isinstance(deployment_yaml, Path): + deployment_yaml = str(deployment_yaml) + + if isinstance(deployment_yaml, str): + if not deployment_yaml.endswith((".yaml", ".yml")): + raise ValueError("Configuration file must be a YAML!") + deployment_yaml_dict = extract_config(deployment_yaml, storage_options) + elif isinstance(deployment_yaml, dict): + deployment_yaml_dict = deployment_yaml + + deployment = 
Deployment(**deployment_yaml_dict) + + for service in deployment.services: + _deploy_service(service, pipeline_yaml, datastore_yaml, logging_yaml, storage_options) + +def _deploy_service( + service: Service, + pipeline_yaml: Union[dict, str, Path], + datastore_yaml: Union[dict, str, Path], + logging_yaml: Optional[Union[dict, str, Path]] = None, + storage_options: Optional[dict] = None, +): + if service.name is None: + raise ValueError("Service name must be specified!") + + pipeline, datastore, logging = extract_service_details(service.name, pipeline_yaml, datastore_yaml, logging_yaml, storage_options) + + flow: Flow = dynamic_function_call(service.module, service.name) + flow.to_deployment() + + + pass \ No newline at end of file diff --git a/echodataflow/models/deployment.py b/echodataflow/models/deployment.py new file mode 100644 index 0000000..1e3b51e --- /dev/null +++ b/echodataflow/models/deployment.py @@ -0,0 +1,33 @@ +from typing import Dict, List, Optional +from pydantic import BaseModel, Field, field_validator + +class DeploymentSchedule(BaseModel): + anchor_date: str = Field(..., description="The start date from which the schedule will compute its intervals.") + interval: int = Field(..., gt=0, description="The number of minutes between each scheduled run.") + timezone: str = Field(..., description="Timezone for the schedule, e.g., 'UTC', 'America/New_York'.") + cron: str = Field(..., description="Cron expression for more complex scheduling.") + + @field_validator('cron') + def validate_cron(cls, value): + if not value.startswith('*/'): + raise ValueError("Invalid cron format") + return value + +class WorkPool(BaseModel): + name: str = Field(..., description="Name of the workpool.") + work_queue_name: Optional[str] = Field('default', description="Name of the work queue to use.") + job_variables: Dict = Field(default_factory=dict, description="Optional variables for jobs in the workpool.") + +class Service(BaseModel): + name: str = Field(..., description="Name of the service.") + module: str = Field(..., description="Python module containing the service definitions.") + schedule: Optional[DeploymentSchedule] = Field(None, description="Scheduling details for the service.") + tags: List[str] = Field(default_factory=list, description="List of tags associated with the service.") + description: Optional[str] = Field(None, description="Description of the service.") + version: Optional[str] = Field(None, description="Version of the service.") + entrypoint: Optional[str] = Field(None, description="Entrypoint command or script for the service.") + parameters: Dict = Field(default_factory=dict, description="Parameters for the service.") + workpool: Optional[WorkPool] = Field(None, description="WorkPool configuration for the service.") + +class Deployment(BaseModel): + services: List[Service] = Field(..., description="List of services included in the deployment.") From f0871dd8dea65029db3c4cd7b5f90f6024416395 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Tue, 8 Oct 2024 09:48:08 -0700 Subject: [PATCH 02/22] Deployment Model --- echodataflow/models/deployment.py | 33 ------- .../models/deployment/data_quality.py | 16 ++++ echodataflow/models/deployment/deployment.py | 92 +++++++++++++++++++ .../models/deployment/deployment_schedule.py | 26 ++++++ echodataflow/models/deployment/source.py | 78 ++++++++++++++++ echodataflow/models/deployment/stage.py | 78 ++++++++++++++++ .../models/deployment/storage_options.py | 56 +++++++++++ 7 files changed, 346 insertions(+), 33 deletions(-) delete 
mode 100644 echodataflow/models/deployment.py create mode 100644 echodataflow/models/deployment/data_quality.py create mode 100644 echodataflow/models/deployment/deployment.py create mode 100644 echodataflow/models/deployment/deployment_schedule.py create mode 100644 echodataflow/models/deployment/source.py create mode 100644 echodataflow/models/deployment/stage.py create mode 100644 echodataflow/models/deployment/storage_options.py diff --git a/echodataflow/models/deployment.py b/echodataflow/models/deployment.py deleted file mode 100644 index 1e3b51e..0000000 --- a/echodataflow/models/deployment.py +++ /dev/null @@ -1,33 +0,0 @@ -from typing import Dict, List, Optional -from pydantic import BaseModel, Field, field_validator - -class DeploymentSchedule(BaseModel): - anchor_date: str = Field(..., description="The start date from which the schedule will compute its intervals.") - interval: int = Field(..., gt=0, description="The number of minutes between each scheduled run.") - timezone: str = Field(..., description="Timezone for the schedule, e.g., 'UTC', 'America/New_York'.") - cron: str = Field(..., description="Cron expression for more complex scheduling.") - - @field_validator('cron') - def validate_cron(cls, value): - if not value.startswith('*/'): - raise ValueError("Invalid cron format") - return value - -class WorkPool(BaseModel): - name: str = Field(..., description="Name of the workpool.") - work_queue_name: Optional[str] = Field('default', description="Name of the work queue to use.") - job_variables: Dict = Field(default_factory=dict, description="Optional variables for jobs in the workpool.") - -class Service(BaseModel): - name: str = Field(..., description="Name of the service.") - module: str = Field(..., description="Python module containing the service definitions.") - schedule: Optional[DeploymentSchedule] = Field(None, description="Scheduling details for the service.") - tags: List[str] = Field(default_factory=list, description="List of tags associated with the service.") - description: Optional[str] = Field(None, description="Description of the service.") - version: Optional[str] = Field(None, description="Version of the service.") - entrypoint: Optional[str] = Field(None, description="Entrypoint command or script for the service.") - parameters: Dict = Field(default_factory=dict, description="Parameters for the service.") - workpool: Optional[WorkPool] = Field(None, description="WorkPool configuration for the service.") - -class Deployment(BaseModel): - services: List[Service] = Field(..., description="List of services included in the deployment.") diff --git a/echodataflow/models/deployment/data_quality.py b/echodataflow/models/deployment/data_quality.py new file mode 100644 index 0000000..2bff63d --- /dev/null +++ b/echodataflow/models/deployment/data_quality.py @@ -0,0 +1,16 @@ +from typing import List, Optional +from pydantic import BaseModel, Field, StrictBool + +from echodataflow.models.deployment.storage_options import StorageOptions + + + +class DataQualityCheck(BaseModel): + name: str = Field(..., description="Name of the data quality check.") + + +class DataQuality(BaseModel): + checks: Optional[List[DataQualityCheck]] = Field(None, description="List of data quality checks.") + block_downstream: StrictBool = Field(False, description="Block downstream processes on failure.") + out_path: Optional[str] = Field(None, description="Output path for data quality results.") + storage_options: Optional[StorageOptions] = Field(None, description="Storage options for data 
quality results.") \ No newline at end of file diff --git a/echodataflow/models/deployment/deployment.py b/echodataflow/models/deployment/deployment.py new file mode 100644 index 0000000..fc5ca8d --- /dev/null +++ b/echodataflow/models/deployment/deployment.py @@ -0,0 +1,92 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional +from pydantic import BaseModel, Field, field_validator + +# Assuming these imports exist in your module +from echodataflow.models.deployment.deployment_schedule import DeploymentSchedule +from echodataflow.models.deployment.stage import Stage +from echodataflow.models.deployment.storage_options import StorageOptions + + +class EDFLogging(BaseModel): + """ + Model for defining logging configuration for a service. + + Attributes: + handler (str): The name of the logging handler. + kafka (Optional[Dict[str, Any]]): Kafka configuration details for logging. + """ + handler: str = Field(..., description="Logging handler name.") + kafka: Optional[Dict[str, Any]] = Field(None, description="Kafka logging configuration details.") + + @field_validator("handler", mode="before") + def validate_handler(cls, v): + if not v or not isinstance(v, str) or v.strip() == "": + raise ValueError("Logging handler must be a non-empty string.") + return v + + +class Service(BaseModel): + """ + Model for defining a service in the deployment pipeline. + + Attributes: + name (str): Name of the service. + tags (Optional[List[str]]): List of tags associated with the service. + description (Optional[str]): Description of the service. + schedule (Optional[DeploymentSchedule]): Scheduling details for the service. + stages (Optional[List[Stage]]): List of stages included in the service. + logging (Optional[EDFLogging]): Logging configuration for the service. + workpool (Optional[str]): WorkPool configuration for the service. + workqueue (Optional[str]): WorkQueue configuration for the service. + """ + name: str = Field(..., description="Name of the service.") + tags: Optional[List[str]] = Field(default_factory=list, description="List of tags associated with the service. Tags must be unique.") + description: Optional[str] = Field(None, description="Description of the service.") + schedule: Optional[DeploymentSchedule] = Field(None, description="Scheduling details for the service.") + stages: List[Stage] = Field(None, description="List of stages included in the service.") + logging: Optional[EDFLogging] = Field(None, description="Logging configuration for the service.") + workpool: Optional[str] = Field('Echodataflow-Workpool', description="WorkPool configuration for the service.") + workqueue: Optional[str] = Field('default', description="WorkQueue configuration for the service.") + + # Validators + @field_validator("name", mode="before") + def validate_service_name(cls, v): + if not v or not isinstance(v, str) or v.strip() == "": + raise ValueError("Service name must be a non-empty string.") + return v + + @field_validator("tags", mode="before") + def validate_tags(cls, v): + if v: + if not isinstance(v, list): + raise ValueError("Tags must be a list of strings.") + # Remove duplicates and ensure all tags are non-empty strings + unique_tags = set() + for tag in v: + if not isinstance(tag, str) or tag.strip() == "": + raise ValueError("Each tag must be a non-empty string.") + unique_tags.add(tag.strip()) + return list(unique_tags) + return v + + +class Deployment(BaseModel): + """ + Model for defining a deployment with multiple services. 
+ + Attributes: + out_path (Optional[str]): Base path for all services. + storage_options (Optional[StorageOptions]): Base storage options applied to all paths. + services (List[Service]): List of services included in the deployment. + """ + out_path: Optional[str] = Field(None, description="Base path for all services. This path will be used if no specific service path is defined.") + storage_options: Optional[StorageOptions] = Field(None, description="Base Storage options, applied to all paths.") + services: List[Service] = Field(..., description="List of services included in the deployment.") + + # Validators + @field_validator("services", mode="before") + def validate_services(cls, v): + if not isinstance(v, list) or not v: + raise ValueError("Services must be a non-empty list of Service objects.") + return v diff --git a/echodataflow/models/deployment/deployment_schedule.py b/echodataflow/models/deployment/deployment_schedule.py new file mode 100644 index 0000000..def64f7 --- /dev/null +++ b/echodataflow/models/deployment/deployment_schedule.py @@ -0,0 +1,26 @@ +from abc import ABC +from datetime import datetime +from pydantic import BaseModel, Field, field_validator + +class DeploymentSchedule(BaseModel): + anchor_date: datetime = Field(default_factory=datetime.now, description="The start date from which the schedule will compute its intervals in format DYYYYMMDD-THHMMSS.") + interval_mins: int = Field(..., gt=0, description="The number of minutes between each scheduled run.") + + @field_validator('anchor_date', mode='before') + def validate_and_convert_anchor_date(cls, v): + # Check if the value is already a datetime object + if isinstance(v, datetime): + return v + + # Expecting anchor_date in the format 'DYYYYMMDD-THHMMSS' + if not isinstance(v, str) or not v.startswith("D"): + raise ValueError("anchor_date must be a string in format 'DYYYYMMDD-THHMMSS'") + + # Remove the leading 'D' and split the date and time parts + try: + date_part, time_part = v[1:].split("-T") + # Convert to datetime object + formatted_date = datetime.strptime(f"{date_part} {time_part}", "%Y%m%d %H%M%S") + return formatted_date + except ValueError as e: + raise ValueError(f"Invalid anchor_date format: {v}. Error: {e}") \ No newline at end of file diff --git a/echodataflow/models/deployment/source.py b/echodataflow/models/deployment/source.py new file mode 100644 index 0000000..f97fcb1 --- /dev/null +++ b/echodataflow/models/deployment/source.py @@ -0,0 +1,78 @@ +from typing import Optional, Dict, Any, Union, List +import jinja2 +from pydantic import BaseModel, Field + +from echodataflow.models.deployment.storage_options import StorageOptions + +class Parameters(BaseModel): + """ + Model for defining parameters. + + Attributes: + ship_name (Optional[str]): The name of the ship. + survey_name (Optional[str]): The name of the survey. + sonar_model (Optional[str]): The model of the sonar. + file_name (Optional[Union[List[str], str]]): The name of the file(s). + """ + ship_name: Optional[str] = Field(None, description="The name of the ship conducting the survey.") + survey_name: Optional[str] = Field(None, description="The name of the survey or project.") + sonar_model: Optional[str] = Field(None, description="The sonar model used for data acquisition.") + file_name: Optional[Union[List[str], str]] = Field(None, description="Name of the file or a list of file names.") + +class Source(BaseModel): + """ + Model for defining the source of the data. 
+ + Attributes: + urlpath (Optional[Union[str, Dict[str, List[str]]]]): URL path or path pattern for the source data. + parameters (Optional[Parameters]): Parameters to apply to the source path. + window_options (Optional[Dict[str, Any]]): Time window options for slicing the source data. + storage_options (Optional[StorageOptions]): Storage options for accessing the source data. + """ + urlpath: Union[str, Dict[str, List[str]]] = Field(..., description="Source URL path or folder structure of the data.") + parameters: Optional[Parameters] = Field(None, description="Parameters to apply to the source.") + window_options: Optional[Dict[str, Any]] = Field(None, description="Window options for the source.") + storage_options: Optional[StorageOptions] = Field(None, description="Storage options for the source.") + + def render_urlpath(self) -> Union[str, Dict[str, List[str]]]: + """ + Render the URL path using the provided parameters. + + Returns: + Union[str, Dict[str, List[str]]]: Rendered URL path or a dictionary of rendered paths. + """ + # Initialize a Jinja environment + env = jinja2.Environment() + + # If `urlpath` is a string, render it with parameters + if isinstance(self.urlpath, str): + return self._render_template(self.urlpath, env) + + # If `urlpath` is a dictionary, render each value in the dictionary + elif isinstance(self.urlpath, dict): + rendered_dict = {} + for key, value in self.urlpath.items(): + # Assume value is a list of strings that need rendering + rendered_list = [self._render_template(v, env) for v in value] + rendered_dict[key] = rendered_list + return rendered_dict + + return self.urlpath + + def _render_template(self, template_str: str, env: jinja2.Environment) -> str: + """ + Render a single template string using Jinja2. + + Args: + template_str (str): Template string to be rendered. + env (jinja2.Environment): Jinja2 environment for rendering. + + Returns: + str: Rendered template string. + """ + template = env.from_string(template_str) + return template.render(self.parameters.dict() if self.parameters else {}) + + class Config: + # Allow arbitrary field types and definitions in nested dictionaries + arbitrary_types_allowed = True \ No newline at end of file diff --git a/echodataflow/models/deployment/stage.py b/echodataflow/models/deployment/stage.py new file mode 100644 index 0000000..f79c8d4 --- /dev/null +++ b/echodataflow/models/deployment/stage.py @@ -0,0 +1,78 @@ +from typing import Optional, Dict, Any, List +from pydantic import BaseModel, Field + +# Import models assuming they are defined in the respective module paths +from echodataflow.models.deployment.data_quality import DataQuality +from echodataflow.models.deployment.source import Source +from echodataflow.models.deployment.storage_options import StorageOptions + + +# Define additional models required for Stage +class Destination(BaseModel): + """ + Model for defining the destination of the data. + + Attributes: + path (Optional[str]): The path where the data should be stored. + storage_options (Optional[StorageOptions]): Storage options for the destination path. + """ + path: Optional[str] = Field(None, description="Destination path of the data.") + storage_options: Optional[StorageOptions] = Field(None, description="Storage options for the destination.") + + +class Group(BaseModel): + """ + Model for defining the grouping of data in the pipeline. + + Attributes: + file (Optional[str]): The file path used for grouping operations. 
+ grouping_regex (Optional[str]): Regex pattern for grouping files based on filenames. + storage_options (Optional[StorageOptions]): Storage options for grouping operations. + """ + file: Optional[str] = Field(None, description="File path for grouping operations.") + grouping_regex: Optional[str] = Field(None, description="Regex pattern for grouping files based on filename.") + storage_options: Optional[StorageOptions] = Field(None, description="Storage options for grouping.") + + +class Task(BaseModel): + """ + Model for defining a task in the data pipeline. + + Attributes: + name (str): Name of the task. This is a required field. + module (Optional[str]): Python module containing the task definition. + task_params (Optional[Dict[str, Any]]): Additional parameters for configuring the task. + """ + name: str = Field(..., description="Name of the task.") + module: str = Field(..., description="Python module containing the task definition.") + task_params: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Parameters for the task.") + + +class Stage(BaseModel): + """ + Model for defining a stage in a data pipeline. + + Attributes: + name (str): Name of the stage. + module (str): Python module containing the service definitions. + stage_params (Optional[Dict[str, Any]]): Parameters for configuring the stage. + source (Optional[Source]): Source of the data. + group (Optional[Group]): Grouping of the data. + destination (Optional[Destination]): Destination of the data. + data_quality (Optional[DataQuality]): Data quality checks configuration. + options (Optional[Dict[str, Any]]): Additional options for configuring the stage. + prefect_config (Optional[Dict[str, Any]]): Prefect configuration for the stage. + tasks (Optional[List[Task]]): List of tasks to be executed in the stage. + """ + name: str = Field(..., description="Name of the stage. This is a required field and should be unique.") + module: str = Field(..., description="Python module containing the service definitions. E.g., 'echodataflow.stages.subflows'.") + stage_params: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Dictionary of parameters to configure the stage.") + source: Source = Field(..., description="Source of the data. Must be a valid Source object or None.") + group: Optional[Group] = Field(None, description="Grouping of the data. Must be a valid Group object or None.") + destination: Optional[Destination] = Field(None, description="Destination of the data. Must be a valid Destination object or None.") + data_quality: Optional[DataQuality] = Field(None, description="Data quality checks configuration.") + options: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional options for the stage. Used for stage-specific configuration.") + prefect_config: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Prefect configuration for managing flow control in the stage.") + tasks: Optional[List[Task]] = Field(default_factory=list, description="List of tasks to be executed in the stage.") + + \ No newline at end of file diff --git a/echodataflow/models/deployment/storage_options.py b/echodataflow/models/deployment/storage_options.py new file mode 100644 index 0000000..15af560 --- /dev/null +++ b/echodataflow/models/deployment/storage_options.py @@ -0,0 +1,56 @@ +from enum import Enum +from typing import Optional +from pydantic import BaseModel, Field, StrictBool, model_validator + + +class StorageType(str, Enum): + """ + Enumeration for storage types. 
+ + Attributes: + AWS: Amazon Web Services storage type. + AZCosmos: Azure storage type. + GCP: Google Cloud Platform storage type. + ECHODATAFLOW: Echodataflow-specific storage type. + EDFRUN: Echodataflow runtime storage type. + """ + AWS = "AWS" + AZCosmos = "AZCosmos" + GCP = "GCP" + ECHODATAFLOW = "ECHODATAFLOW" + EDFRUN = "EDFRUN" + + +class StorageOptions(BaseModel): + """ + Model for defining storage options. + + Attributes: + block_type (Optional[StorageType]): The type of storage. Must be one of the defined StorageType enumeration values. + block_name (Optional[str]): The name of the block. Must be between 3 and 100 characters. + anon (Optional[StrictBool]): Whether to use anonymous access. Default is False. + """ + block_type: Optional[StorageType] = Field(None, description="The type of storage. Must be one of the defined StorageType enumeration values.") + block_name: Optional[str] = Field(None, description="The name of the storage block.") + anon: StrictBool = Field(False, description="Whether to use anonymous access. Default is False.") + + # Model-wide validator to ensure logical dependencies between fields + @model_validator(mode='before') + def validate_storage_options(cls, values): + storage_type = values.get("block_type") + block_name = values.get("block_name") + + if storage_type is None and block_name: + raise ValueError("block_name cannot be set if type is None.") + + if storage_type and not block_name: + raise ValueError(f"A block_name must be provided when storage type is set to '{storage_type}'.") + + return values + + class Config: + use_enum_values = True + + def __repr__(self): + return f"StorageOptions(type={self.block_type}, block_name={self.block_name}, anon={self.anon})" + From 57c0811e6f0b9113ca29e29f288b93ce513c6b9b Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Tue, 8 Oct 2024 09:49:49 -0700 Subject: [PATCH 03/22] Prefect 3.0 Upgrade --- deployment/docker_trigger.py | 4 ++-- echodataflow/docker_trigger.py | 4 ++-- echodataflow/stages/echodataflow_trigger.py | 4 ++-- echodataflow/stages/subflows/initialization_flow.py | 4 ++-- echodataflow/stages/subflows/slice_store.py | 1 - echodataflow/stages/subflows/write_output.py | 4 ++-- requirements.txt | 4 ++-- 7 files changed, 12 insertions(+), 13 deletions(-) diff --git a/deployment/docker_trigger.py b/deployment/docker_trigger.py index 1729115..4bcd63f 100644 --- a/deployment/docker_trigger.py +++ b/deployment/docker_trigger.py @@ -4,10 +4,10 @@ from echoflow import echoflow_start from echoflow.stages.echoflow_trigger import echoflow_trigger from prefect import flow -from prefect.task_runners import SequentialTaskRunner +from prefect.task_runners import ThreadPoolTaskRunner from typing import Any, Dict, Optional, Union -@flow(name="Docker-Trigger", task_runner=SequentialTaskRunner()) +@flow(name="Docker-Trigger", task_runner=ThreadPoolTaskRunner(max_workers=1)) def docker_trigger( dataset_config: Union[dict, str, Path], pipeline_config: Union[dict, str, Path], diff --git a/echodataflow/docker_trigger.py b/echodataflow/docker_trigger.py index fe259b0..f1c9d67 100644 --- a/echodataflow/docker_trigger.py +++ b/echodataflow/docker_trigger.py @@ -1,10 +1,10 @@ from pathlib import Path from echodataflow.stages.echodataflow_trigger import echodataflow_trigger from prefect import flow -from prefect.task_runners import SequentialTaskRunner +from prefect.task_runners import ThreadPoolTaskRunner from typing import Any, Dict, Optional, Union -@flow(name="docker-trigger-latest", task_runner=SequentialTaskRunner()) 
+@flow(name="docker-trigger-latest", task_runner=ThreadPoolTaskRunner(max_workers=1)) def docker_trigger( dataset_config: Union[dict, str, Path], pipeline_config: Union[dict, str, Path], diff --git a/echodataflow/stages/echodataflow_trigger.py b/echodataflow/stages/echodataflow_trigger.py index adfa04b..81fa90b 100644 --- a/echodataflow/stages/echodataflow_trigger.py +++ b/echodataflow/stages/echodataflow_trigger.py @@ -20,7 +20,7 @@ from fastapi.encoders import jsonable_encoder from prefect import flow -from prefect.task_runners import SequentialTaskRunner +from prefect.task_runners import ThreadPoolTaskRunner from prefect.blocks.core import Block from prefect.variables import Variable @@ -38,7 +38,7 @@ from .subflows.initialization_flow import init_flow -@flow(name="Echodataflow", task_runner=SequentialTaskRunner()) +@flow(name="Echodataflow", task_runner=ThreadPoolTaskRunner(max_workers=1)) def echodataflow_trigger( dataset_config: Union[dict, str, Path], pipeline_config: Union[dict, str, Path], diff --git a/echodataflow/stages/subflows/initialization_flow.py b/echodataflow/stages/subflows/initialization_flow.py index a8eaa7c..5e61626 100644 --- a/echodataflow/stages/subflows/initialization_flow.py +++ b/echodataflow/stages/subflows/initialization_flow.py @@ -27,7 +27,7 @@ from distributed import Client, LocalCluster from fastapi.encoders import jsonable_encoder from prefect import flow -from prefect.task_runners import SequentialTaskRunner +from prefect.task_runners import ThreadPoolTaskRunner from prefect_dask import DaskTaskRunner from echodataflow.aspects.echodataflow_aspect import echodataflow @@ -51,7 +51,7 @@ -@flow(name="Initialization", task_runner=SequentialTaskRunner()) +@flow(name="Initialization", task_runner=ThreadPoolTaskRunner(max_workers=1)) @echodataflow(type="FLOW") def init_flow(pipeline: Recipe, config: Dataset, json_data_path: Optional[str] = None): """ diff --git a/echodataflow/stages/subflows/slice_store.py b/echodataflow/stages/subflows/slice_store.py index 4d7f1cc..04f0768 100644 --- a/echodataflow/stages/subflows/slice_store.py +++ b/echodataflow/stages/subflows/slice_store.py @@ -19,7 +19,6 @@ from typing import Dict, Optional from prefect import flow -from prefect.task_runners import SequentialTaskRunner from prefect.variables import Variable import xarray as xr import pandas as pd diff --git a/echodataflow/stages/subflows/write_output.py b/echodataflow/stages/subflows/write_output.py index 3f596fb..2e7f513 100644 --- a/echodataflow/stages/subflows/write_output.py +++ b/echodataflow/stages/subflows/write_output.py @@ -1,7 +1,7 @@ from typing import Dict, Optional from prefect import flow import xarray as xr -from prefect.task_runners import SequentialTaskRunner +from prefect.task_runners import ThreadPoolTaskRunner import zarr.sync from echodataflow.models.datastore import Dataset from echodataflow.models.output_model import ErrorObject, Group @@ -11,7 +11,7 @@ from numcodecs import Zlib import zarr.storage -@flow(task_runner=SequentialTaskRunner()) +@flow(task_runner=ThreadPoolTaskRunner(max_workers=1)) def write_output(groups: Dict[str, Group], config: Dataset, stage: Stage, prev_stage: Optional[Stage]): log_util.log( msg={ diff --git a/requirements.txt b/requirements.txt index 3ff9b9b..1954c17 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ requests -prefect<3 -echopype>=0.6.3 +prefect +echopype jinja2 prefect-dask pydantic From bdd0bd4e1c3ded0943a2fcde8965dacaaa6e4f01 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Tue, 8 Oct 2024 
09:51:02 -0700 Subject: [PATCH 04/22] Deployment Engine --- echodataflow/deployment/deployment_engine.py | 73 ++++++++++---------- echodataflow/deployment/service.py | 11 +++ echodataflow/utils/config_utils.py | 16 ++++- 3 files changed, 61 insertions(+), 39 deletions(-) create mode 100644 echodataflow/deployment/service.py diff --git a/echodataflow/deployment/deployment_engine.py b/echodataflow/deployment/deployment_engine.py index 89c851e..1ef6ae7 100644 --- a/echodataflow/deployment/deployment_engine.py +++ b/echodataflow/deployment/deployment_engine.py @@ -1,52 +1,49 @@ from pathlib import Path from typing import Optional, Union +from echodataflow.models.deployment.deployment import Deployment, Service +from echodataflow.deployment.service import edf_service +from echodataflow.utils.config_utils import handle_storage_options, parse_yaml_config +from prefect.client.schemas.schedules import IntervalSchedule +from prefect.client.schemas.actions import DeploymentScheduleCreate +from datetime import timedelta -from prefect import Flow -from echodataflow.models.deployment import Deployment, Service -from echodataflow.utils.config_utils import extract_config, handle_storage_options -from echodataflow.utils.function_utils import dynamic_function_call - - -def deploy_echodataflow( +async def deploy_echodataflow( deployment_yaml: Union[dict, str, Path], - pipeline_yaml: Union[dict, str, Path], - datastore_yaml: Union[dict, str, Path], logging_yaml: Optional[Union[dict, str, Path]] = None, - storage_options: Optional[dict] = None, + storage_options: Optional[dict] = None ): - storage_options = handle_storage_options(storage_options) - - if isinstance(deployment_yaml, Path): - deployment_yaml = str(deployment_yaml) - - if isinstance(deployment_yaml, str): - if not deployment_yaml.endswith((".yaml", ".yml")): - raise ValueError("Configuration file must be a YAML!") - deployment_yaml_dict = extract_config(deployment_yaml, storage_options) - elif isinstance(deployment_yaml, dict): - deployment_yaml_dict = deployment_yaml - - deployment = Deployment(**deployment_yaml_dict) - - for service in deployment.services: - _deploy_service(service, pipeline_yaml, datastore_yaml, logging_yaml, storage_options) - -def _deploy_service( + deployment_dict = parse_yaml_config(deployment_yaml, storage_options=storage_options) + logging_dict = parse_yaml_config(logging_yaml, storage_options=storage_options) if logging_yaml else None + + deployment = Deployment(**deployment_dict) + + for service in deployment.services: + await _deploy_service(service, logging_dict) + +async def _deploy_service( service: Service, - pipeline_yaml: Union[dict, str, Path], - datastore_yaml: Union[dict, str, Path], - logging_yaml: Optional[Union[dict, str, Path]] = None, - storage_options: Optional[dict] = None, + logging_dict: Optional[dict] = None ): - if service.name is None: + if not service.name: raise ValueError("Service name must be specified!") - pipeline, datastore, logging = extract_service_details(service.name, pipeline_yaml, datastore_yaml, logging_yaml, storage_options) + edf_service_fn = edf_service.with_options(name=service.name, description=service.description) + + schedule = [DeploymentScheduleCreate(schedule=IntervalSchedule(interval=timedelta(minutes=service.schedule.interval_mins), anchor_date=service.schedule.anchor_date))] + + # TODO: Create workpool and workqueue if they don't exist + + deployment = await edf_service_fn.to_deployment( + name=service.name, + parameters={"stages": service.stages, "edf_logger": 
logging_dict}, + work_queue_name=service.workqueue, + work_pool_name=service.workpool, + tags=service.tags, + schedules=schedule + ) - flow: Flow = dynamic_function_call(service.module, service.name) - flow.to_deployment() + uuid = await deployment.apply() - - pass \ No newline at end of file + print(f"Successfully deployed service: {service.name} with deployment name: {uuid}") \ No newline at end of file diff --git a/echodataflow/deployment/service.py b/echodataflow/deployment/service.py new file mode 100644 index 0000000..6116ee8 --- /dev/null +++ b/echodataflow/deployment/service.py @@ -0,0 +1,11 @@ +from typing import List, Dict, Any, Optional +from prefect import flow + +from echodataflow.models.deployment.stage import Stage + +@flow(name="Echodataflow-Service") +def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]]): + """ + Service for EDF + """ + pass \ No newline at end of file diff --git a/echodataflow/utils/config_utils.py b/echodataflow/utils/config_utils.py index 54853ab..7deef7c 100644 --- a/echodataflow/utils/config_utils.py +++ b/echodataflow/utils/config_utils.py @@ -33,6 +33,7 @@ import itertools as it import json import os +from pathlib import Path import re from typing import Any, Coroutine, Dict, List, Literal, Optional, Union from zipfile import ZipFile @@ -55,7 +56,6 @@ nest_asyncio.apply() -@task def extract_config( config: Union[Dict[str, Any], str], storage_options: Dict[str, Any] = {} ) -> Dict[str, Any]: @@ -585,6 +585,20 @@ def load_block(name: str = None, type: StorageType = None): block = coro return block +def parse_yaml_config(config: Union[dict, str, Path], storage_options: Dict[str, Any]) -> Dict: + if isinstance(config, Path) or isinstance(config, str): + config = convert_path_to_str(config) + validate_yaml_file(config) + return extract_config(config, storage_options) + return config + +def convert_path_to_str(config: Union[str, Path]) -> str: + return str(config) if isinstance(config, Path) else config + +def validate_yaml_file(config_str) -> None: + if not config_str.endswith((".yaml", ".yml")): + raise ValueError("Configuration file must be a YAML!") + def sanitize_external_params(config: Dataset, external_params: Dict[str, Any]): """ Validates external parameters to ensure they do not contain invalid file paths. 
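
The refactored engine above is now a coroutine, so it is awaited rather than called directly. A minimal invocation sketch, assuming a local checkout (the YAML path below is illustrative and should point at a deployment configuration such as deployment_demo.yaml):

    import asyncio

    from echodataflow.deployment.deployment_engine import deploy_echodataflow

    # Illustrative path; replace with your own deployment YAML.
    config_file = "echodataflow/deployment/deployment_demo.yaml"

    # Inside a notebook or other async context this is simply:
    #     await deploy_echodataflow(deployment_yaml=config_file)
    # In a plain script, drive the coroutine on an event loop:
    asyncio.run(deploy_echodataflow(deployment_yaml=config_file))
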
From 42102fa46c5bee0012b10636347f2aa593a8ab04 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Wed, 9 Oct 2024 11:24:45 -0700 Subject: [PATCH 05/22] Updated sample deployment.yaml --- echodataflow/deployment/deployment_demo.yaml | 107 ++++++++++--------- 1 file changed, 58 insertions(+), 49 deletions(-) diff --git a/echodataflow/deployment/deployment_demo.yaml b/echodataflow/deployment/deployment_demo.yaml index e56fc32..d300a53 100644 --- a/echodataflow/deployment/deployment_demo.yaml +++ b/echodataflow/deployment/deployment_demo.yaml @@ -1,5 +1,5 @@ # If provided, all the path use this as base path, else respective paths are used -out_path: s3://bucket-name/path/to/output +base_path: s3://bucket-name/path/to/output storage_options: block_name: ABCD block_type: AWS @@ -7,10 +7,11 @@ storage_options: # All services configuration services: - name: service_raw_to_Sv - tags: [] + tags: + - raw2Sv description: null #Optional description schedule: - anchor_date: D{YYYYmmdd}T{HHMMSS} + anchor_date: D20241007-T035100 interval_mins: 10 logging: handler: sv_log_handler @@ -18,8 +19,7 @@ services: topic: echodataflow_logs servers: - localhost:9092 - worker: - number_of_workers: 1 + stages: - name: edf_Sv_pipeline module: echodataflow.stages.subflows.Sv_pipeline # Optional module name, echodataflow to create a default flow for all the stages within it @@ -36,7 +36,15 @@ services: - name: echodataflow_add_location module: echodataflow.stages.subflows.add_location stage_params: + prefect_config: # Configure any prefect related settings for a flow. For an exhaustive list of configurations refer . Task based configurations are optimized and handled by echodataflow + retries: 3 # Number of retries before failing the flow + task_runner: DaskTaskRunner(address=tcp://127.0.0.1:59487) # Configure Runner setting for this specific stage key: value + ky2: value2 + options: # Echodataflow master flow configuration options + save_offline: true + use_offline: true # Skip this process if zarr files are already present in the output directory. + group: False # Group Converted files based on Transect source: urlpath: some_url # Single path, list or paths or a dict with some key and value as list of paths parameters: @@ -71,23 +79,23 @@ services: storage_options: block_name: ABCD block_type: AWS - options: # Echodataflow master flow configuration options - save_offline: true - use_offline: true # Skip this process if zarr files are already present in the output directory. - group: False # Group Converted files based on Transect - prefect_config: # Configure any prefect related settings for a flow. For an exhaustive list of configurations refer . 
Task based configurations are optimized and handled by echodataflow - retries: 3 # Number of retries before failing the flow - task_runner: DaskTaskRunner(address=tcp://127.0.0.1:59487) # Configure Runner setting for this specific stage + + + + + + - name: service_produce_mvbs - tags: [] - description: null + tags: + - mvbs + description: MVBS schedule: - anchor_date: D{YYYYmmdd}T{HHMMSS} + anchor_date: D20241007-T035100 interval_mins: 10 logging: handler: mvbs_log_handler - stage: + stages: - name: stage_produce_mvbs module: echodataflow.stages.subflows.compute_MVBS source: @@ -103,57 +111,58 @@ services: storage_options: block_name: ABCD block_type: AWS - detination: + destination: path: s3:// storage_options: block_name: ABCD block_type: AWS - name: service_prediction_to_nasc - tags: [] - description: null + tags: + - nasc + description: NASC schedule: - anchor_date: D{YYYYmmdd}T{HHMMSS} + anchor_date: D20241007-T035100 interval_mins: 10 stages: - name: stage_prediction_to_nasc module: echodataflow.stages.subflows.prediction_to_nasc - task: + tasks: - name: echodataflow_mask_prediction module: echodataflow.stages.subflows.mask_prediction + task_params: + model_path: /Users/wu-junglee/HakeSurvey2024/models/binary_hake_model_1.0m_bottom_offset_1.0m_depth_2017_2019_ver_1.ckpt - name: echodataflow_apply_mask module: echodataflow.stages.subflows.apply_mask - name: echodataflow_compute_NASC module: echodataflow.stages.subflows.compute_NASC - source: - urlpath: s3:// - parameters: - # Jinja support - window_options: - time_travel_hours: 20 - time_travel_mins: 0 - window_hours: 0 - window_mins: 40 - number_of_windows: 3 - storage_options: - block_name: ABCD - block_type: AWS - destination: + source: + urlpath: s3:// + parameters: + # Jinja support + window_options: + time_travel_hours: 20 + time_travel_mins: 0 + window_hours: 0 + window_mins: 40 + number_of_windows: 3 + storage_options: + block_name: ABCD + block_type: AWS + destination: path: s3:// storage_options: block_name: ABCD block_type: AWS - external_params: # All external parameters for the stages, echodataflow will handle the mapping and validation - model_path: /Users/wu-junglee/HakeSurvey2024/models/binary_hake_model_1.0m_bottom_offset_1.0m_depth_2017_2019_ver_1.ckpt - -- name: service_file_transfer - tags: [] - description: null - schedule: - anchor_date: D{YYYYmmdd}T{HHMMSS} - interval_mins: 10 - stage: - - name: stage_file_transfer - module: echodataflow.stages.subflows.file_transfer - stage_params: - command: rclone copy --max-age 120m --no-traverse /Users/wu-junglee/SH24_replay_processed/ s3_hake:sh24replay --exclude /Sv_with_location/** + +# - name: service_file_transfer +# tags: [] +# description: null +# schedule: +# anchor_date: D{YYYYmmdd}T{HHMMSS} +# interval_mins: 10 +# stage: +# - name: stage_file_transfer +# module: echodataflow.stages.subflows.file_transfer +# stage_params: +# command: rclone copy --max-age 120m --no-traverse /Users/wu-junglee/SH24_replay_processed/ s3_hake:sh24replay --exclude /Sv_with_location/** From c10b3699b06a397d4fa61942625a9df28b660d9a Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Wed, 9 Oct 2024 14:52:08 -0700 Subject: [PATCH 06/22] Deployment Demo Notebook --- .../deployment/deployment_engine_demo.ipynb | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 echodataflow/deployment/deployment_engine_demo.ipynb diff --git a/echodataflow/deployment/deployment_engine_demo.ipynb b/echodataflow/deployment/deployment_engine_demo.ipynb new file mode 100644 index 
0000000..8a3728e --- /dev/null +++ b/echodataflow/deployment/deployment_engine_demo.ipynb @@ -0,0 +1,62 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "c:\\Users\\soham\\anaconda3\\envs\\echodataflow_redesign\\lib\\site-packages\\pydantic\\json_schema.py:2191: PydanticJsonSchemaWarning: Default value defaultdict(, {}) is not JSON serializable; excluding default from JSON schema [non-serializable-default]\n", + " warnings.warn(message, PydanticJsonSchemaWarning)\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Successfully deployed service: service_raw_to_Sv with deployment name: cae76655-9784-482a-8205-9df0c8c5ec1a\n", + "Successfully deployed service: service_produce_mvbs with deployment name: cfc64a9e-aa22-46bc-902a-5269f678e7f9\n", + "Successfully deployed service: service_prediction_to_nasc with deployment name: a6d605c4-2ee6-4183-9d79-436ae6bd2bd4\n" + ] + } + ], + "source": [ + "from echodataflow.deployment.deployment_engine import deploy_echodataflow\n", + "\n", + "config_file = 'C:/Users/soham/Desktop/Soham/Projects/OSO/temp/echodataflow/echodataflow/deployment/deployment_demo.yaml'\n", + "d = await deploy_echodataflow(deployment_yaml=config_file)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "echodataflow_redesign", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.15" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From 44f1e77dcdd2b99e74fdef6c764246b4128b2ab8 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Thu, 10 Oct 2024 11:04:06 -0700 Subject: [PATCH 07/22] Refactored FileSystem Utils --- echodataflow/__init__.py | 3 +- echodataflow/extensions/file_monitor.py | 4 +- echodataflow/stages/echodataflow.py | 2 +- echodataflow/stages/echodataflow_trigger.py | 3 +- echodataflow/utils/config_utils.py | 75 -------------- echodataflow/utils/filesystem_utils.py | 105 ++++++++++++++++++++ 6 files changed, 111 insertions(+), 81 deletions(-) create mode 100644 echodataflow/utils/filesystem_utils.py diff --git a/echodataflow/__init__.py b/echodataflow/__init__.py index d02aaaf..96298f5 100644 --- a/echodataflow/__init__.py +++ b/echodataflow/__init__.py @@ -7,9 +7,10 @@ echodataflow_create_prefect_profile, echodataflow_start, get_active_profile, load_profile) -from .utils.config_utils import extract_fs, glob_url, load_block +from .utils.config_utils import extract_fs, glob_url from .utils.file_utils import get_ed_list, get_last_run_output, get_zarr_list from .docker_trigger import docker_trigger +from .utils.filesystem_utils import load_block try: VERSION = get_distribution(__name__).version diff --git a/echodataflow/extensions/file_monitor.py b/echodataflow/extensions/file_monitor.py index 22f4ae3..6079579 100644 --- a/echodataflow/extensions/file_monitor.py +++ b/echodataflow/extensions/file_monitor.py @@ -20,8 +20,8 @@ from echodataflow.models.datastore import StorageType from echodataflow.models.output_model import Output from echodataflow.models.run import EDFRun, FileDetails -from echodataflow.utils.config_utils import (glob_url, handle_storage_options, - 
load_block) +from echodataflow.utils.config_utils import glob_url +from echodataflow.utils.filesystem_utils import load_block, handle_storage_options from echodataflow.utils.file_utils import extract_fs diff --git a/echodataflow/stages/echodataflow.py b/echodataflow/stages/echodataflow.py index b08aca1..5806627 100644 --- a/echodataflow/stages/echodataflow.py +++ b/echodataflow/stages/echodataflow.py @@ -47,7 +47,7 @@ ) import echopype as ep -from echodataflow.utils.config_utils import load_block +from echodataflow.utils.filesystem_utils import load_block from echodataflow.stages.echodataflow_trigger import echodataflow_trigger diff --git a/echodataflow/stages/echodataflow_trigger.py b/echodataflow/stages/echodataflow_trigger.py index 81fa90b..5e22677 100644 --- a/echodataflow/stages/echodataflow_trigger.py +++ b/echodataflow/stages/echodataflow_trigger.py @@ -31,9 +31,8 @@ from echodataflow.utils.config_utils import ( check_config, extract_config, - get_storage_options, - load_block, ) +from echodataflow.utils.filesystem_utils import load_block, get_storage_options from .subflows.initialization_flow import init_flow diff --git a/echodataflow/utils/config_utils.py b/echodataflow/utils/config_utils.py index 7deef7c..c187552 100644 --- a/echodataflow/utils/config_utils.py +++ b/echodataflow/utils/config_utils.py @@ -510,81 +510,6 @@ def club_raw_files( return all_files -def get_storage_options(storage_options: Block = None) -> Dict[str, Any]: - """ - Get storage options from a Block. - - Parameters: - storage_options (Block, optional): A block containing storage options. - - Returns: - Dict[str, Any]: Dictionary containing storage options. - - Example: - aws_credentials = AwsCredentials(...) - storage_opts = get_storage_options(aws_credentials) - """ - storage_options_dict: Dict[str, Any] = {} - if storage_options is not None: - if isinstance(storage_options, AwsCredentials): - storage_options_dict["key"] = storage_options.aws_access_key_id - storage_options_dict[ - "secret" - ] = storage_options.aws_secret_access_key.get_secret_value() - if storage_options.aws_session_token: - storage_options_dict["token"] = storage_options.aws_session_token - - return storage_options_dict - - -def handle_storage_options(storage_options: Union[Dict[str, Any], Block] = None) -> Dict: - if storage_options: - if isinstance(storage_options, Block): - return get_storage_options(storage_options=storage_options) - elif isinstance(storage_options, dict) and storage_options.get("block_name"): - block = load_block( - name=storage_options.get("block_name"), type=storage_options.get("type") - ) - return get_storage_options(block) - else: - return storage_options if storage_options and len(storage_options.keys()) > 0 else {} - return {} - -def load_block(name: str = None, type: StorageType = None): - """ - Load a block of a specific type by name. - - Parameters: - name (str, optional): The name of the block to load. - type (StorageType, optional): The type of the block to load. - - Returns: - block: The loaded block. - - Raises: - ValueError: If name or type is not provided. 
- - Example: - loaded_aws_credentials = load_block(name="my-aws-creds", type=StorageType.AWS) - """ - if name is None or type is None: - raise ValueError("Cannot load block without name") - - if type == StorageType.AWS or type == StorageType.AWS.value: - coro = AwsCredentials.load(name=name) - elif type == StorageType.AZCosmos or type == StorageType.AZCosmos.value: - coro = AzureCosmosDbCredentials.load(name=name) - elif type == StorageType.ECHODATAFLOW or type == StorageType.ECHODATAFLOW.value: - coro = EchodataflowConfig.load(name=name) - elif type == StorageType.EDFRUN or type == StorageType.EDFRUN.value: - coro = EDFRun.load(name=name) - - if isinstance(coro, Coroutine): - block = nest_asyncio.asyncio.run(coro) - else: - block = coro - return block - def parse_yaml_config(config: Union[dict, str, Path], storage_options: Dict[str, Any]) -> Dict: if isinstance(config, Path) or isinstance(config, str): config = convert_path_to_str(config) diff --git a/echodataflow/utils/filesystem_utils.py b/echodataflow/utils/filesystem_utils.py new file mode 100644 index 0000000..18cb252 --- /dev/null +++ b/echodataflow/utils/filesystem_utils.py @@ -0,0 +1,105 @@ +from typing import Any, Dict, Optional, Union + +import nest_asyncio +from prefect.filesystems import Block +from prefect_aws import AwsCredentials +from prefect_azure import AzureCosmosDbCredentials + +from echodataflow.models.deployment.storage_options import StorageOptions, StorageType +from echodataflow.models.echodataflow_config import (BaseConfig, + EchodataflowConfig) +from echodataflow.models.run import EDFRun + + +def handle_storage_options(storage_options: Optional[Union[Dict, StorageOptions, Block, BaseConfig]] = None) -> Dict: + if isinstance(storage_options, Block): + return _handle_block(storage_options) + elif isinstance(storage_options, dict): + return _handle_dict_options(storage_options) + elif isinstance(storage_options, StorageOptions): + return _handle_storage_options_class(storage_options) + elif isinstance(storage_options, BaseConfig): + return _handle_baseconfig_options_class(storage_options) + else: + return _handle_default(storage_options) + +def _handle_block(block: Block) -> Dict: + return get_storage_options(storage_options=block) + +def _handle_dict_options(options: Dict[str, Any]) -> Dict: + if "block_name" in options: + block = load_block(name=options["block_name"], stype=options.get("type", None)) + return get_storage_options(block) + return options if options else {} + +def _handle_storage_options_class(options: StorageOptions) -> Dict: + if not options.anon: + block = load_block(name=options.block_name, stype=options.block_type) + return get_storage_options(block) + return {"anon": options.anon} + +def _handle_baseconfig_options_class(options: BaseConfig) -> Dict: + block = load_block(name=options.name, stype=options.type) + return dict(block) + +def _handle_default(options: Dict[str, Any]): + return options if options else {} + + +def get_storage_options(storage_options: Block = None) -> Dict[str, Any]: + """ + Get storage options from a Block. + + Parameters: + storage_options (Block, optional): A block containing storage options. + + Returns: + Dict[str, Any]: Dictionary containing storage options. + + Example: + aws_credentials = AwsCredentials(...) 
+ storage_opts = get_storage_options(aws_credentials) + """ + storage_options_dict: Dict[str, Any] = {} + if storage_options is not None: + if isinstance(storage_options, AwsCredentials): + storage_options_dict["key"] = storage_options.aws_access_key_id + storage_options_dict[ + "secret" + ] = storage_options.aws_secret_access_key.get_secret_value() + if storage_options.aws_session_token: + storage_options_dict["token"] = storage_options.aws_session_token + + return storage_options_dict + +def load_block(name: str, stype: StorageType): + """ + Load a block of a specific type by name. + + Parameters: + name (str, optional): The name of the block to load. + stype (StorageType, optional): The type of the block to load. + + Returns: + block: The loaded block. + + Raises: + ValueError: If name or type is not provided. + + Example: + loaded_aws_credentials = load_block(name="my-aws-creds", stype=StorageType.AWS) + """ + if name is None or stype is None: + raise ValueError("Cannot load block without name or type") + + loader_map = { + StorageType.AWS: AwsCredentials, + StorageType.AZCosmos: AzureCosmosDbCredentials, + StorageType.ECHODATAFLOW: EchodataflowConfig, + StorageType.EDFRUN: EDFRun + } + + if stype in loader_map: + return nest_asyncio.asyncio.run(loader_map[stype].load(name=name)) + else: + raise ValueError(f"Unsupported storage type: {stype}") \ No newline at end of file From bd0c1e81eebbe0146e5890308468d5a2d6ad94b7 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Thu, 10 Oct 2024 11:05:04 -0700 Subject: [PATCH 08/22] Added Logging and Workpool support --- echodataflow/deployment/deployment_engine.py | 40 ++++++--- .../models/deployment/storage_options.py | 16 ++++ echodataflow/utils/prefect_utils.py | 86 +++++++++++++++++++ 3 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 echodataflow/utils/prefect_utils.py diff --git a/echodataflow/deployment/deployment_engine.py b/echodataflow/deployment/deployment_engine.py index 1ef6ae7..23bb5e0 100644 --- a/echodataflow/deployment/deployment_engine.py +++ b/echodataflow/deployment/deployment_engine.py @@ -1,11 +1,16 @@ +from datetime import timedelta from pathlib import Path from typing import Optional, Union -from echodataflow.models.deployment.deployment import Deployment, Service -from echodataflow.deployment.service import edf_service -from echodataflow.utils.config_utils import handle_storage_options, parse_yaml_config -from prefect.client.schemas.schedules import IntervalSchedule + from prefect.client.schemas.actions import DeploymentScheduleCreate -from datetime import timedelta +from prefect.client.schemas.schedules import IntervalSchedule +from prefect.deployments.runner import RunnerDeployment + +from echodataflow.deployment.service import edf_service +from echodataflow.models.deployment.deployment import Deployment, Service +from echodataflow.utils.config_utils import parse_yaml_config +from echodataflow.utils.filesystem_utils import handle_storage_options +from echodataflow.utils.prefect_utils import create_work_pool_and_queue async def deploy_echodataflow( @@ -17,10 +22,22 @@ async def deploy_echodataflow( deployment_dict = parse_yaml_config(deployment_yaml, storage_options=storage_options) logging_dict = parse_yaml_config(logging_yaml, storage_options=storage_options) if logging_yaml else None + default_logging = None + if logging_dict: + default_logging = logging_dict.get('default', None) + deployment = Deployment(**deployment_dict) - - for service in deployment.services: - await _deploy_service(service, 
logging_dict) + + for service in deployment.services: + log_dict = None + if service.logging is not None and logging_dict is not None: + log_dict = logging_dict.get(service.logging.handler, default_logging) + else: + log_dict = default_logging + + await _deploy_service(service, log_dict) + + return deployment async def _deploy_service( service: Service, @@ -34,8 +51,9 @@ async def _deploy_service( schedule = [DeploymentScheduleCreate(schedule=IntervalSchedule(interval=timedelta(minutes=service.schedule.interval_mins), anchor_date=service.schedule.anchor_date))] # TODO: Create workpool and workqueue if they don't exist - - deployment = await edf_service_fn.to_deployment( + await create_work_pool_and_queue(service.workpool, service.workqueue) + + deployment: RunnerDeployment = await edf_service_fn.to_deployment( name=service.name, parameters={"stages": service.stages, "edf_logger": logging_dict}, work_queue_name=service.workqueue, @@ -45,5 +63,5 @@ async def _deploy_service( ) uuid = await deployment.apply() - + print(f"Successfully deployed service: {service.name} with deployment name: {uuid}") \ No newline at end of file diff --git a/echodataflow/models/deployment/storage_options.py b/echodataflow/models/deployment/storage_options.py index 15af560..95b7953 100644 --- a/echodataflow/models/deployment/storage_options.py +++ b/echodataflow/models/deployment/storage_options.py @@ -1,4 +1,5 @@ from enum import Enum +import functools from typing import Optional from pydantic import BaseModel, Field, StrictBool, model_validator @@ -33,6 +34,19 @@ class StorageOptions(BaseModel): block_type: Optional[StorageType] = Field(None, description="The type of storage. Must be one of the defined StorageType enumeration values.") block_name: Optional[str] = Field(None, description="The name of the storage block.") anon: StrictBool = Field(False, description="Whether to use anonymous access. Default is False.") + + @property + @functools.lru_cache() + def _storage_options_dict(self): + """ + Extracts storage options using the handle_storage_options function. + + Returns: + dict: A dictionary representation of the storage options. + """ + from echodataflow.utils.filesystem_utils import handle_storage_options + # Pass the StorageOptions instance itself to handle_storage_options to extract the dictionary + return handle_storage_options(self) # Model-wide validator to ensure logical dependencies between fields @model_validator(mode='before') @@ -48,6 +62,8 @@ def validate_storage_options(cls, values): return values + + class Config: use_enum_values = True diff --git a/echodataflow/utils/prefect_utils.py b/echodataflow/utils/prefect_utils.py new file mode 100644 index 0000000..02f7aec --- /dev/null +++ b/echodataflow/utils/prefect_utils.py @@ -0,0 +1,86 @@ +import logging +from prefect import get_client, task +from prefect.exceptions import ObjectNotFound +from prefect.client.schemas.actions import WorkPoolCreate +from prefect.client.schemas.objects import WorkPool, WorkQueue +from prefect.client.orchestration import PrefectClient + +# Get a logger instance to log messages throughout the code +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Function to get or create a work pool +async def _get_or_create_work_pool(client: PrefectClient, work_pool_name: str) -> WorkPool: + """ + Retrieve an existing work pool or create a new one if it doesn't exist. + + Args: + client (PrefectClient): The Prefect client instance to interact with the API. 
+ work_pool_name (str): The name of the work pool to retrieve or create. + + Returns: + WorkPool: The existing or newly created work pool. + """ + try: + # Attempt to read the existing work pool by name + work_pool = await client.read_work_pool(work_pool_name=work_pool_name) + logger.info(f"Work Pool '{work_pool_name}' already exists.") + except ObjectNotFound: + # If the work pool does not exist, create a new one + try: + work_pool = await client.create_work_pool(WorkPoolCreate(name=work_pool_name, type='Process')) + logger.info(f"Created Work Pool '{work_pool_name}'.") + except Exception as e: + # Log an error message if work pool creation fails and re-raise the exception + logger.error(f"Failed to create work pool '{work_pool_name}': {e}") + raise e + + return work_pool + +# Function to get or create a work queue +async def _get_or_create_work_queue(client: PrefectClient, work_queue_name: str, work_pool_name: str) -> WorkQueue: + """ + Retrieve an existing work queue or create a new one if it doesn't exist. + + Args: + client (PrefectClient): The Prefect client instance to interact with the API. + work_queue_name (str): The name of the work queue to retrieve or create. + work_pool_name (str): The name of the work pool to associate with the work queue. + + Returns: + WorkQueue: The existing or newly created work queue. + """ + try: + # Attempt to read the existing work queue by name within the specified work pool + work_queue = await client.read_work_queue_by_name(name=work_queue_name, work_pool_name=work_pool_name) + logger.info(f"Work Queue '{work_queue_name}' already exists.") + except ObjectNotFound: + # If the work queue does not exist, create a new one + try: + work_queue = await client.create_work_queue(name=work_queue_name, work_pool_name=work_pool_name) + logger.info(f"Created Work Queue '{work_queue_name}' in Work Pool '{work_pool_name}'.") + except Exception as e: + # Log an error message if work queue creation fails and re-raise the exception + logger.error(f"Failed to create work queue '{work_queue_name}' in work pool '{work_pool_name}': {e}") + raise e + return work_queue + + +# Main function to create both work pool and work queue +@task +async def create_work_pool_and_queue(work_pool_name: str, work_queue_name: str): + """ + Ensure that the specified work pool and work queue are created. + + Args: + work_pool_name (str): The name of the work pool to create. + work_queue_name (str): The name of the work queue to create within the work pool. 
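# Hedged usage sketch (pool and queue names are illustrative): the deployment
# engine is expected to await this task from its async deploy routine so that
# the pool and queue exist before a deployment is registered against them, e.g.
#
#     await create_work_pool_and_queue(
#         work_pool_name="echodataflow-pool",
#         work_queue_name="raw-to-sv",
#     )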
+ """ + # Use the Prefect client asynchronously to interact with the API + async with get_client() as client: + # Get or create the work pool + work_pool = await _get_or_create_work_pool(client=client, work_pool_name=work_pool_name) + + # If the work pool exists, proceed to get or create the work queue + if work_pool: + _ = await _get_or_create_work_queue(client=client, work_queue_name=work_queue_name, work_pool_name=work_pool_name) \ No newline at end of file From 83da57e7e04df3267cb0d8e77c89278682d40bc3 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Thu, 17 Oct 2024 07:43:10 -0700 Subject: [PATCH 09/22] Deployment Engine and Module Changes --- echodataflow/__init__.py | 4 +- .../aspects/singleton_echodataflow.py | 77 +------------------ echodataflow/deployment/__init__.py | 0 echodataflow/deployment/deployment_demo.yaml | 23 +++--- echodataflow/deployment/deployment_engine.py | 10 +-- echodataflow/models/deployment/__init__.py | 0 6 files changed, 23 insertions(+), 91 deletions(-) create mode 100644 echodataflow/deployment/__init__.py create mode 100644 echodataflow/models/deployment/__init__.py diff --git a/echodataflow/__init__.py b/echodataflow/__init__.py index 96298f5..a0da106 100644 --- a/echodataflow/__init__.py +++ b/echodataflow/__init__.py @@ -11,6 +11,7 @@ from .utils.file_utils import get_ed_list, get_last_run_output, get_zarr_list from .docker_trigger import docker_trigger from .utils.filesystem_utils import load_block +from .models.deployment.stage import Stage try: VERSION = get_distribution(__name__).version @@ -40,5 +41,6 @@ "get_last_run_output", "get_ed_list", "get_zarr_list", - "docker_trigger" + "docker_trigger", + "Stage" ] diff --git a/echodataflow/aspects/singleton_echodataflow.py b/echodataflow/aspects/singleton_echodataflow.py index 30aee5d..2a54e2b 100644 --- a/echodataflow/aspects/singleton_echodataflow.py +++ b/echodataflow/aspects/singleton_echodataflow.py @@ -28,40 +28,26 @@ """ import logging import os -from datetime import datetime from typing import Dict, Union import psutil import yaml -from echodataflow.models.datastore import Dataset -from echodataflow.models.db_log_model import DB_Log, Log_Data, Process -from echodataflow.models.pipeline import Recipe +from echodataflow.models.db_log_model import DB_Log from echodataflow.rule_engine.dependency_engine import DependencyEngine -from echodataflow.utils.database_utils import ( - create_log_table, - get_connection, - get_last_log, - insert_log_data_by_conn, - update_log_data_by_conn, -) class Singleton_Echodataflow: _instance: "Singleton_Echodataflow" = None rengine: DependencyEngine = DependencyEngine() - pipeline: Recipe - dataset: Dataset db_log: DB_Log logger: logging.Logger = None log_level: int = 0 def __new__( cls, - log_file: Union[Dict[str, str], str] = None, - pipeline: Recipe = None, - dataset: Dataset = None + log_file: Union[Dict[str, str], str] = None ) -> "Singleton_Echodataflow": if cls._instance is None: @@ -70,8 +56,6 @@ def __new__( cls._instance.logger = cls._instance.logger_init(log_file) cls._instance.log_level = cls._instance.logger.level - cls._instance.pipeline = pipeline - cls._instance.dataset = dataset # cls._instance.db_log = cls._instance.setup_echodataflow_db() cls._instance.rengine = cls._instance.load() return cls._instance @@ -111,29 +95,6 @@ def logger_init(self, log_file: Union[Dict[str, str], str]) -> None: logging.config.dictConfig(log_file) return logging.getLogger("echodataflow") - def setup_echodataflow_db(self): - """ - Setup the echodataflow database and return 
a DB_Log instance. - - Returns: - DB_Log: The initialized DB_Log instance. - """ - last_log = DB_Log() - try: - conn = get_connection(self.pipeline.database_path) - create_log_table(conn=conn) - - if self.pipeline.use_previous_recipe == True: - last_log = get_last_log(conn) - - last_log.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") - last_log.run_id = None - - return last_log - except Exception as e: - print("Failed to create Database with below error") - print(e) - def log(self, msg, level, extra): """ Log a message at the specified log levels. @@ -148,40 +109,6 @@ def log(self, msg, level, extra): else: print(f"{extra} : {msg}") - def add_new_process(self, process: Process, name: str): - """ - Add a new process to the DB_Log instance. - - Args: - process (Process): The process to be added. - name (str): The name of the process. - """ - process.end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") - if name not in self.db_log.data: - self.db_log.data[name] = Log_Data(name=name) - self.db_log.data[name].process_stack.append(process) - - def insert_log_data(self): - """ - Insert or update log data in the database. - - Returns: - int: The inserted or updated log entry ID. - """ - self.db_log.end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") - try: - conn = get_connection(self._instance.pipeline.database_path) - if self.db_log.run_id is None: - return insert_log_data_by_conn(conn, log=self.db_log) - else: - update_log_data_by_conn(conn, log=self.db_log) - return self.db_log.run_id - except Exception as e: - print(e) - finally: - conn.close() - - def log_memory_usage(self): """ Get the memory usage of the current process. diff --git a/echodataflow/deployment/__init__.py b/echodataflow/deployment/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/echodataflow/deployment/deployment_demo.yaml b/echodataflow/deployment/deployment_demo.yaml index d300a53..7bc8a8e 100644 --- a/echodataflow/deployment/deployment_demo.yaml +++ b/echodataflow/deployment/deployment_demo.yaml @@ -6,7 +6,7 @@ storage_options: # All services configuration services: -- name: service_raw_to_Sv +- name: service_raw_to_Sv tags: - raw2Sv description: null #Optional description @@ -14,12 +14,20 @@ services: anchor_date: D20241007-T035100 interval_mins: 10 logging: - handler: sv_log_handler + handler: service_raw_to_Sv kafka: topic: echodataflow_logs servers: - localhost:9092 - + infrastructure: + workpool: + name: sv + workqueue: default + type: Process + cluster: + workers: 5 + address: tcp:// + stages: - name: edf_Sv_pipeline module: echodataflow.stages.subflows.Sv_pipeline # Optional module name, echodataflow to create a default flow for all the stages within it @@ -46,7 +54,7 @@ services: use_offline: true # Skip this process if zarr files are already present in the output directory. 
group: False # Group Converted files based on Transect source: - urlpath: some_url # Single path, list or paths or a dict with some key and value as list of paths + path: some_url # Single path, list or paths or a dict with some key and value as list of paths parameters: # Jinja support window_options: @@ -59,7 +67,7 @@ services: block_name: ABCD block_type: AWS group: - file: s3:// + path: s3:// grouping_regex: # pattern for grouping files based on filename storage_options: block_name: ABCD @@ -79,11 +87,6 @@ services: storage_options: block_name: ABCD block_type: AWS - - - - - - name: service_produce_mvbs diff --git a/echodataflow/deployment/deployment_engine.py b/echodataflow/deployment/deployment_engine.py index 23bb5e0..a915359 100644 --- a/echodataflow/deployment/deployment_engine.py +++ b/echodataflow/deployment/deployment_engine.py @@ -17,7 +17,7 @@ async def deploy_echodataflow( deployment_yaml: Union[dict, str, Path], logging_yaml: Optional[Union[dict, str, Path]] = None, storage_options: Optional[dict] = None -): +) -> Deployment: storage_options = handle_storage_options(storage_options) deployment_dict = parse_yaml_config(deployment_yaml, storage_options=storage_options) logging_dict = parse_yaml_config(logging_yaml, storage_options=storage_options) if logging_yaml else None @@ -30,6 +30,7 @@ async def deploy_echodataflow( for service in deployment.services: log_dict = None + if service.logging is not None and logging_dict is not None: log_dict = logging_dict.get(service.logging.handler, default_logging) else: @@ -50,14 +51,13 @@ async def _deploy_service( schedule = [DeploymentScheduleCreate(schedule=IntervalSchedule(interval=timedelta(minutes=service.schedule.interval_mins), anchor_date=service.schedule.anchor_date))] - # TODO: Create workpool and workqueue if they don't exist - await create_work_pool_and_queue(service.workpool, service.workqueue) + await create_work_pool_and_queue(service.workpool) deployment: RunnerDeployment = await edf_service_fn.to_deployment( name=service.name, parameters={"stages": service.stages, "edf_logger": logging_dict}, - work_queue_name=service.workqueue, - work_pool_name=service.workpool, + work_queue_name=service.workpool.name, + work_pool_name=service.workpool.workqueue.name, tags=service.tags, schedules=schedule ) diff --git a/echodataflow/models/deployment/__init__.py b/echodataflow/models/deployment/__init__.py new file mode 100644 index 0000000..e69de29 From e484cf33cffc959d0087658610e6af543f8bd9c6 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Thu, 17 Oct 2024 07:44:59 -0700 Subject: [PATCH 10/22] Service and Task Lib Draft --- echodataflow/deployment/service.py | 61 +++++++++++++++++++++++++++++- echodataflow/tasklib/echopype.py | 45 ++++++++++++++++++++++ 2 files changed, 104 insertions(+), 2 deletions(-) create mode 100644 echodataflow/tasklib/echopype.py diff --git a/echodataflow/deployment/service.py b/echodataflow/deployment/service.py index 6116ee8..14735ea 100644 --- a/echodataflow/deployment/service.py +++ b/echodataflow/deployment/service.py @@ -1,11 +1,68 @@ from typing import List, Dict, Any, Optional +from distributed import Client from prefect import flow +from prefect_dask import get_dask_client +from echodataflow.aspects.singleton_echodataflow import Singleton_Echodataflow from echodataflow.models.deployment.stage import Stage +from echodataflow.utils import log_util +from echodataflow.utils.config_utils import get_prefect_config_dict +from echodataflow.utils.function_utils import dynamic_function_call + +from 
echodataflow.deployment.flow import edf_flow @flow(name="Echodataflow-Service") -def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]]): +def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]]) -> bool: """ Service for EDF """ - pass \ No newline at end of file + + # Add optional support to create a cluster which stays the same for all stages + # For specific stages to be executed on dask cluster add the config to prefect config of the stage. + client: Client = None + + #Initiate Logging + Singleton_Echodataflow(log_file=edf_logger) + gea = Singleton_Echodataflow().get_instance() + + for stage in stages: + + if stage.stage_params: + stage.prefect_config = stage.stage_params.get("prefect_config", None) + stage.options = stage.stage_params.get("options", None) + + if stage.module: + master_flow = dynamic_function_call(stage.module, stage.name) + else: + master_flow = edf_flow + + prefect_config_dict = get_prefect_config_dict(stage) + + prefect_config_dict["name"] = stage.name + prefect_config_dict["flow_run_name"] = stage.name + + if prefect_config_dict.get("task_runner", None): + stage.options["use_dask"] = True + if client is not None: + client.subscribe_topic("echodataflow", lambda event: log_util.log_event(event)) + else: + with get_dask_client() as cl: + cl.subscribe_topic("echodataflow", lambda event: log_util.log_event(event)) + + # master_flow is expected to store the output to the destination + # Source for the next flows to match with the destination of previous flows if two stages are connected + master_flow.with_options(**prefect_config_dict)(stage) + + for stage in stages: + if not stage.options.get("save_offline", True): + # cleanup + stage.destination.cleanup() + + # close cluster if required + if client: + client.close() + + + + return True + \ No newline at end of file diff --git a/echodataflow/tasklib/echopype.py b/echodataflow/tasklib/echopype.py new file mode 100644 index 0000000..3d87c6a --- /dev/null +++ b/echodataflow/tasklib/echopype.py @@ -0,0 +1,45 @@ +from typing import Any, Dict, Optional +from echopype import open_raw +from echopype.commongrid import compute_MVBS +from echopype.calibrate import compute_Sv +from prefect import task + +from echodataflow.models.deployment.stage import Task +from echodataflow.models.deployment.storage_options import StorageOptions + + +@task +def edf_open_raw(task: Task, data: Any, storage_options: Optional[Dict[str, Any]] = {}): + + if task.task_params is not None: + + # Validate task params if required + + ed = open_raw( + raw_file=data, + sonar_model=task.task_params.get('sonar_model', None), + storage_options=storage_options, + ) + else: + raise ValueError("task_params are required for edf_open_raw") + + return {'data': ed} + +@task +def edf_sv(task: Task, data: Dict[str, Any], storage_options: Optional[Dict[str, Any]] = {}): + + if task.task_params is not None: + + # Validate task params if required + + ed = compute_Sv( + Sv=data.get('data'), + sonar_model=task.task_params.get('sonar_model', None), + storage_options=storage_options, + ) + else: + raise ValueError("task_params are required for edf_open_raw") + + return {'data': ed, 'output1': ed} + + From d0b606569ca62788c3cc6aea98e157f678cf4b1e Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Thu, 17 Oct 2024 07:46:41 -0700 Subject: [PATCH 11/22] Pydantic Model Changes --- echodataflow/models/deployment/deployment.py | 42 +++++++++++++++++--- echodataflow/models/deployment/source.py | 35 +++++++++++----- 
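A short hedged sketch of the chain these task wrappers are meant to express, assuming echopype's open_raw/compute_Sv signatures (the raw path and sonar model are illustrative; compute_Sv operates on the in-memory EchoData object returned by open_raw):

from echopype import open_raw
from echopype.calibrate import compute_Sv

ed = open_raw(
    raw_file="s3://bucket/raw/D20241007-T035100.raw",  # hypothetical raw file
    sonar_model="EK80",
    storage_options={"anon": True},
)
sv = compute_Sv(ed)  # calibrated volume backscatter as an xarray Dataset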
echodataflow/models/deployment/stage.py | 26 ++++++++++-- 3 files changed, 84 insertions(+), 19 deletions(-) diff --git a/echodataflow/models/deployment/deployment.py b/echodataflow/models/deployment/deployment.py index fc5ca8d..9e4de2b 100644 --- a/echodataflow/models/deployment/deployment.py +++ b/echodataflow/models/deployment/deployment.py @@ -1,6 +1,7 @@ from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from pydantic import BaseModel, Field, field_validator +from prefect.client.schemas.objects import WorkPool, WorkQueue # Assuming these imports exist in your module from echodataflow.models.deployment.deployment_schedule import DeploymentSchedule @@ -25,7 +26,16 @@ def validate_handler(cls, v): raise ValueError("Logging handler must be a non-empty string.") return v - +class EDFWorkPool(WorkPool): + name: str = Field("Echodataflow-WorkPool", description="Name of the WorkPool.") + workqueue: Union[str, WorkQueue] = Field("default", description="WorkQueue associated with the WorkPool.") + + @property + def workqueue_name(self) -> str: + if isinstance(self.workqueue, WorkQueue): + return self.workqueue.name + return self.workqueue + class Service(BaseModel): """ Model for defining a service in the deployment pipeline. @@ -46,8 +56,7 @@ class Service(BaseModel): schedule: Optional[DeploymentSchedule] = Field(None, description="Scheduling details for the service.") stages: List[Stage] = Field(None, description="List of stages included in the service.") logging: Optional[EDFLogging] = Field(None, description="Logging configuration for the service.") - workpool: Optional[str] = Field('Echodataflow-Workpool', description="WorkPool configuration for the service.") - workqueue: Optional[str] = Field('default', description="WorkQueue configuration for the service.") + workpool: Optional[EDFWorkPool] = Field(WorkPool(name="Echodataflow-WorkPool", type="Process"), description="WorkPool configuration for the service.") # Validators @field_validator("name", mode="before") @@ -80,7 +89,7 @@ class Deployment(BaseModel): storage_options (Optional[StorageOptions]): Base storage options applied to all paths. services (List[Service]): List of services included in the deployment. """ - out_path: Optional[str] = Field(None, description="Base path for all services. This path will be used if no specific service path is defined.") + base_path: Optional[str] = Field(None, description="Base path for all services. 
This path will be used if no specific service path is defined.") storage_options: Optional[StorageOptions] = Field(None, description="Base Storage options, applied to all paths.") services: List[Service] = Field(..., description="List of services included in the deployment.") @@ -90,3 +99,26 @@ def validate_services(cls, v): if not isinstance(v, list) or not v: raise ValueError("Services must be a non-empty list of Service objects.") return v + + @field_validator("base_path", mode="after") + def construct_path(cls, v): + + if v is not None: + for service in cls.services: + for stage in service.stages: + stage.source.path = v + stage.source.path + stage.destination.path = v + stage.destination.path + stage.group.path = v + stage.group.path + return v + + @field_validator("storage_options", mode="after") + def apply_storage_options(cls, v): + + if v is not None: + for service in cls.services: + for stage in service.stages: + stage.source.storage_options = v + stage.destination.storage_options = v + stage.group.storage_options = v + return v + diff --git a/echodataflow/models/deployment/source.py b/echodataflow/models/deployment/source.py index f97fcb1..b665999 100644 --- a/echodataflow/models/deployment/source.py +++ b/echodataflow/models/deployment/source.py @@ -24,17 +24,17 @@ class Source(BaseModel): Model for defining the source of the data. Attributes: - urlpath (Optional[Union[str, Dict[str, List[str]]]]): URL path or path pattern for the source data. + path (Optional[Union[str, Dict[str, List[str]]]]): URL path or path pattern for the source data. parameters (Optional[Parameters]): Parameters to apply to the source path. window_options (Optional[Dict[str, Any]]): Time window options for slicing the source data. storage_options (Optional[StorageOptions]): Storage options for accessing the source data. """ - urlpath: Union[str, Dict[str, List[str]]] = Field(..., description="Source URL path or folder structure of the data.") + path: Union[str, Dict[str, List[str]]] = Field(..., description="Source URL path or folder structure of the data.") parameters: Optional[Parameters] = Field(None, description="Parameters to apply to the source.") window_options: Optional[Dict[str, Any]] = Field(None, description="Window options for the source.") storage_options: Optional[StorageOptions] = Field(None, description="Storage options for the source.") - def render_urlpath(self) -> Union[str, Dict[str, List[str]]]: + def render_path(self) -> Union[str, Dict[str, List[str]]]: """ Render the URL path using the provided parameters. 
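# Hedged illustration of what render_path()/_render_template() do with a
# Jinja-templated source path; the template string and parameter value are
# hypothetical.
import jinja2

env = jinja2.Environment()
template = env.from_string("s3://bucket/raw/{{ survey }}/*.raw")
print(template.render({"survey": "SH1707"}))  # -> s3://bucket/raw/SH1707/*.raw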
@@ -45,19 +45,19 @@ def render_urlpath(self) -> Union[str, Dict[str, List[str]]]: env = jinja2.Environment() # If `urlpath` is a string, render it with parameters - if isinstance(self.urlpath, str): - return self._render_template(self.urlpath, env) + if isinstance(self.path, str): + return self._render_template(self.path, env) # If `urlpath` is a dictionary, render each value in the dictionary - elif isinstance(self.urlpath, dict): + elif isinstance(self.path, dict): rendered_dict = {} - for key, value in self.urlpath.items(): + for key, value in self.path.items(): # Assume value is a list of strings that need rendering rendered_list = [self._render_template(v, env) for v in value] rendered_dict[key] = rendered_list return rendered_dict - return self.urlpath + return self.path def _render_template(self, template_str: str, env: jinja2.Environment) -> str: """ @@ -72,7 +72,22 @@ def _render_template(self, template_str: str, env: jinja2.Environment) -> str: """ template = env.from_string(template_str) return template.render(self.parameters.dict() if self.parameters else {}) - + + def extract_source(self): + path = self.render_path() + + if self.window_options is not None: + + # Treat source as a folder and iterate over files to collect and group relevant files + pass + else: + # return single path + return path + + pass + + class Config: # Allow arbitrary field types and definitions in nested dictionaries - arbitrary_types_allowed = True \ No newline at end of file + arbitrary_types_allowed = True + diff --git a/echodataflow/models/deployment/stage.py b/echodataflow/models/deployment/stage.py index f79c8d4..7a81193 100644 --- a/echodataflow/models/deployment/stage.py +++ b/echodataflow/models/deployment/stage.py @@ -5,6 +5,7 @@ from echodataflow.models.deployment.data_quality import DataQuality from echodataflow.models.deployment.source import Source from echodataflow.models.deployment.storage_options import StorageOptions +from echodataflow.utils.file_utils import extract_fs # Define additional models required for Stage @@ -18,6 +19,23 @@ class Destination(BaseModel): """ path: Optional[str] = Field(None, description="Destination path of the data.") storage_options: Optional[StorageOptions] = Field(None, description="Storage options for the destination.") + + def store_result(self, data: Any, engine: str): + if engine == "zarr": + data.to_zarr( + store=self.path, + mode="w", + consolidated=True, + storage_options=self.storage_options._storage_options_dict + ) + + def cleanup(self): + try: + fs = extract_fs(self.path, storage_options=self.storage_options._storage_options_dict) + fs.rm(self.path, recursive=True) + except Exception as e: + print('') + pass class Group(BaseModel): @@ -25,11 +43,11 @@ class Group(BaseModel): Model for defining the grouping of data in the pipeline. Attributes: - file (Optional[str]): The file path used for grouping operations. + path (Optional[str]): The file path used for grouping operations. grouping_regex (Optional[str]): Regex pattern for grouping files based on filenames. storage_options (Optional[StorageOptions]): Storage options for grouping operations. 
""" - file: Optional[str] = Field(None, description="File path for grouping operations.") + path: Optional[str] = Field(None, description="File path for grouping operations.") grouping_regex: Optional[str] = Field(None, description="Regex pattern for grouping files based on filename.") storage_options: Optional[StorageOptions] = Field(None, description="Storage options for grouping.") @@ -65,11 +83,11 @@ class Stage(BaseModel): tasks (Optional[List[Task]]): List of tasks to be executed in the stage. """ name: str = Field(..., description="Name of the stage. This is a required field and should be unique.") - module: str = Field(..., description="Python module containing the service definitions. E.g., 'echodataflow.stages.subflows'.") + module: Optional[str] = Field(None, description="Python module containing the service definitions. E.g., 'echodataflow.stages.subflows'.") stage_params: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Dictionary of parameters to configure the stage.") source: Source = Field(..., description="Source of the data. Must be a valid Source object or None.") group: Optional[Group] = Field(None, description="Grouping of the data. Must be a valid Group object or None.") - destination: Optional[Destination] = Field(None, description="Destination of the data. Must be a valid Destination object or None.") + destination: Destination = Field(..., description="Destination of the data. Must be a valid Destination object or None.") data_quality: Optional[DataQuality] = Field(None, description="Data quality checks configuration.") options: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional options for the stage. Used for stage-specific configuration.") prefect_config: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Prefect configuration for managing flow control in the stage.") From 99e7abe23c3ed788fb58ce1f8ea3a7d515aa4971 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Thu, 17 Oct 2024 07:47:09 -0700 Subject: [PATCH 12/22] Refactor --- echodataflow/utils/config_utils.py | 2 +- echodataflow/utils/prefect_utils.py | 62 +++++++++++++++++++---------- 2 files changed, 43 insertions(+), 21 deletions(-) diff --git a/echodataflow/utils/config_utils.py b/echodataflow/utils/config_utils.py index c187552..f5a6f67 100644 --- a/echodataflow/utils/config_utils.py +++ b/echodataflow/utils/config_utils.py @@ -298,7 +298,7 @@ def parse_file_path(raw_file: str, fname_pattern: str) -> Dict[str, Any]: @task -def get_prefect_config_dict(stage: Stage): +def get_prefect_config_dict(stage: Stage) -> Dict[str, Any]: """ Gets the updated Prefect configuration dictionary. diff --git a/echodataflow/utils/prefect_utils.py b/echodataflow/utils/prefect_utils.py index 02f7aec..0c42248 100644 --- a/echodataflow/utils/prefect_utils.py +++ b/echodataflow/utils/prefect_utils.py @@ -5,73 +5,91 @@ from prefect.client.schemas.objects import WorkPool, WorkQueue from prefect.client.orchestration import PrefectClient +from echodataflow.models.deployment.deployment import EDFWorkPool + # Get a logger instance to log messages throughout the code logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) + # Function to get or create a work pool -async def _get_or_create_work_pool(client: PrefectClient, work_pool_name: str) -> WorkPool: +async def _get_or_create_work_pool(client: PrefectClient, work_pool: WorkPool) -> WorkPool: """ Retrieve an existing work pool or create a new one if it doesn't exist. 
- + Args: client (PrefectClient): The Prefect client instance to interact with the API. work_pool_name (str): The name of the work pool to retrieve or create. - + Returns: WorkPool: The existing or newly created work pool. """ try: # Attempt to read the existing work pool by name - work_pool = await client.read_work_pool(work_pool_name=work_pool_name) - logger.info(f"Work Pool '{work_pool_name}' already exists.") + work_pool = await client.read_work_pool(work_pool_name=work_pool.name) + logger.info(f"Work Pool '{work_pool.name}' already exists.") except ObjectNotFound: # If the work pool does not exist, create a new one try: - work_pool = await client.create_work_pool(WorkPoolCreate(name=work_pool_name, type='Process')) - logger.info(f"Created Work Pool '{work_pool_name}'.") + work_pool = await client.create_work_pool( + WorkPoolCreate( + name=work_pool.name, + type=work_pool.type, + concurrency_limit=work_pool.concurrency_limit, + ) + ) + logger.info(f"Created Work Pool '{work_pool.name}'.") except Exception as e: # Log an error message if work pool creation fails and re-raise the exception - logger.error(f"Failed to create work pool '{work_pool_name}': {e}") + logger.error(f"Failed to create work pool '{work_pool.name}': {e}") raise e - + return work_pool + # Function to get or create a work queue -async def _get_or_create_work_queue(client: PrefectClient, work_queue_name: str, work_pool_name: str) -> WorkQueue: +async def _get_or_create_work_queue( + client: PrefectClient, work_queue_name: str, work_pool_name: str +) -> WorkQueue: """ Retrieve an existing work queue or create a new one if it doesn't exist. - + Args: client (PrefectClient): The Prefect client instance to interact with the API. work_queue_name (str): The name of the work queue to retrieve or create. work_pool_name (str): The name of the work pool to associate with the work queue. - + Returns: WorkQueue: The existing or newly created work queue. """ try: # Attempt to read the existing work queue by name within the specified work pool - work_queue = await client.read_work_queue_by_name(name=work_queue_name, work_pool_name=work_pool_name) + work_queue = await client.read_work_queue_by_name( + name=work_queue_name, work_pool_name=work_pool_name + ) logger.info(f"Work Queue '{work_queue_name}' already exists.") except ObjectNotFound: # If the work queue does not exist, create a new one try: - work_queue = await client.create_work_queue(name=work_queue_name, work_pool_name=work_pool_name) + work_queue = await client.create_work_queue( + name=work_queue_name, work_pool_name=work_pool_name + ) logger.info(f"Created Work Queue '{work_queue_name}' in Work Pool '{work_pool_name}'.") except Exception as e: # Log an error message if work queue creation fails and re-raise the exception - logger.error(f"Failed to create work queue '{work_queue_name}' in work pool '{work_pool_name}': {e}") + logger.error( + f"Failed to create work queue '{work_queue_name}' in work pool '{work_pool_name}': {e}" + ) raise e return work_queue # Main function to create both work pool and work queue @task -async def create_work_pool_and_queue(work_pool_name: str, work_queue_name: str): +async def create_work_pool_and_queue(edf_work_pool: EDFWorkPool): """ Ensure that the specified work pool and work queue are created. - + Args: work_pool_name (str): The name of the work pool to create. work_queue_name (str): The name of the work queue to create within the work pool. 
@@ -79,8 +97,12 @@ async def create_work_pool_and_queue(work_pool_name: str, work_queue_name: str): # Use the Prefect client asynchronously to interact with the API async with get_client() as client: # Get or create the work pool - work_pool = await _get_or_create_work_pool(client=client, work_pool_name=work_pool_name) - + work_pool = await _get_or_create_work_pool(client=client, work_pool_name=edf_work_pool.name) + # If the work pool exists, proceed to get or create the work queue if work_pool: - _ = await _get_or_create_work_queue(client=client, work_queue_name=work_queue_name, work_pool_name=work_pool_name) \ No newline at end of file + _ = await _get_or_create_work_queue( + client=client, + work_queue_name=edf_work_pool.workqueue_name, + work_pool_name=edf_work_pool.name, + ) From 4931bdb49ffe66ae662d1a98abee49f26ae048fd Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Thu, 17 Oct 2024 07:47:25 -0700 Subject: [PATCH 13/22] Flow Draft --- echodataflow/deployment/flow.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 echodataflow/deployment/flow.py diff --git a/echodataflow/deployment/flow.py b/echodataflow/deployment/flow.py new file mode 100644 index 0000000..82defe3 --- /dev/null +++ b/echodataflow/deployment/flow.py @@ -0,0 +1,31 @@ +from typing import Dict, List +from prefect import flow +from prefect_dask import get_dask_client +from echodataflow.models.deployment.stage import Stage +from echodataflow.models.output_model import EchodataflowObject +from echodataflow.utils import log_util +from echodataflow.utils.function_utils import dynamic_function_call + +@flow(name='Sv_flow') +def Sv_flow(stage: Stage): + + # call source extraction, which converts source into intermediate data representation + # puts in dict of dict of list of Object + def_metadata: Dict[str, Dict[str, List[EchodataflowObject]]] = stage.source.extract_source(stage.options) + + # Group data based on given file(s) + + # For each file, call tasks + + for file in def_metadata: + # TODO: Logic to determine when and where to call the grouping and super grouping logic + for task in stage.tasks: + + task_fn = dynamic_function_call(task.module, task.name) + + data = task_fn.submit(task, data, stage.source.storage_options._storage_options_dict) + + + stage.destination.store_result(data=data, engine="zarr") + + return True \ No newline at end of file From 091585cf9e1926c592e62d79b9fa35d199029779 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Thu, 17 Oct 2024 07:49:59 -0700 Subject: [PATCH 14/22] Redesign Notes --- echodataflow/deployment/Redesign.md | 32 +++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 echodataflow/deployment/Redesign.md diff --git a/echodataflow/deployment/Redesign.md b/echodataflow/deployment/Redesign.md new file mode 100644 index 0000000..5d4da2a --- /dev/null +++ b/echodataflow/deployment/Redesign.md @@ -0,0 +1,32 @@ +1. Deployment Engine +Iterate through deployment.yaml to create services in Prefect. +Leverage the Echodataflow Master Service to deploy services, renaming them as specified in deployment.yaml, and deploy the master service to Prefect. +Manage schedules, tags, and descriptions associated with each service. +Handle pipeline parameters, such as adding base paths and storage options when specified globally. +Automatically create work pools and work queues if not already present, using a default pool for all services and individual queues for each service. + +2. 
Infrastructure Engine +Manage worker deployment using systemd, Docker, or Kubernetes. +Utilize Terraform to provision infrastructure. +Support adding workers as systemd processes. +Enable flow deployments from GitHub or local files via Docker. When a flow is triggered, it pulls the necessary Docker image and runs the flow within a container. + +3. Echodataflow Master Service +Each service manages a set of pipelines or flows. +Parse the source and destination of each pipeline and convert them into an intermediate, JSON-serializable format. +Extract Prefect flow configuration and execute flows sequentially. +After each flow completes, pass data quality (DQ) checks to the AOP (After Operation Processor) for execution. +Run DQ checks on the flow's output after its execution. +Parse and configure logging. +Use a Singleton pattern to handle missing logging configurations, with new functions such as echodataflow.get_logger() and EDFLogger.log(). +Store cleanup information received from the AOP and execute cleanup operations after all flows finish. + +4. Echodataflow Master Flow +Handle a sequence of tasks to be executed. +Perform parameter sanitization by validating each task’s signature. +Use options to configure flow behavior. +Retrieve inputs from the intermediate representation. +Group files when grouping is specified. +Execute tasks for each file or group, either concurrently or in parallel. +Return the output directly without converting it back to an intermediate representation but with cleanup information attached. +AOP runs DQ checks on the output and returns the final result, including cleanup details, to the intermediate representation. \ No newline at end of file From 8ab8b2b9857a608d90d2f3f893e26e8573fe6c6c Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Fri, 18 Oct 2024 18:27:56 -0700 Subject: [PATCH 15/22] Ship deployment general instructions --- .../ship_deployment_instructions.md | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 deployment_config/ship_deployment_instructions.md diff --git a/deployment_config/ship_deployment_instructions.md b/deployment_config/ship_deployment_instructions.md new file mode 100644 index 0000000..8da9a03 --- /dev/null +++ b/deployment_config/ship_deployment_instructions.md @@ -0,0 +1,102 @@ +# Deployment Instructions + +## Setting up the Environment + +### Python Environment + +1. Create and Activate Environment + + Start by creating a new conda, venv, or mamba environment and activate it. + +2. Clone Echodataflow + + Clone the echodataflow repository to your local machine: + ```bash + git clone https://github.com/OSOceanAcoustics/echodataflow + ``` +3. Install Echodataflow in Editable Mode + + Navigate to the cloned repository and install the package in editable mode: + ```bash + cd echodataflow + pip install -e . + ``` + +### Echodataflow configuration +1. Connect to Prefect Account + + Ensure your machine is connected to a Prefect account, which could be either a local Prefect server or a [Prefect Cloud account](https://docs.prefect.io/3.0/manage/cloud/connect-to-cloud). +2. Initialize Echodataflow + + In your environment, run the following command to set up the basic configuration for Echodataflow: + + ```bash + echodataflow init + ``` +3. Create YAML Configuration Files + + Set up the YAML configuration files for each service to be deployed. 
Reference the [sample config files](https://drive.google.com/drive/u/2/folders/1C2Hs3-SxWbYaE3xTo7RRqAg4I7fzponW) for guidance and check the [documentation](https://echodataflow.readthedocs.io/en/latest/configuration/datastore.html) for additional information. +4. Add Required YAML Files + + Place the following YAML files in a directory. These files are required for the current deployment on Lasker or Shimada, if your use case is different feel free to modify the files accordingly: + + ```bash + df_Sv_pipeline + datastore.yaml + pipeline.yaml + datastore_MVBS.yaml + pipeline_MVBS.yaml + datastore_prediction.yaml + pipeline_prediction.yaml + ``` + +## Deploying the flows + +1. Run Initial Scripts + + In the extensions folder of your environment, run `file_monitor.py` and `file_downloader.py`: + + ```bash + python path/extensions/file_monitor.py + python path/extensions/file_downloader.py + ``` + Wait for the message "Your flow is being served and polling for scheduled runs!" to confirm that deployments have been created in your Prefect account. + +2. Configure File Monitoring and File Transfer + + Configure the source for file monitoring and set up the `rclone` command for file transfer. + +3. Run Main Deployment Script + + Run `main.py` from the deployment folder to create additional deployments: + + ```bash + python deployment/main.py + ``` +4. View and Edit Deployments in Prefect UI + + Go to the Prefect UI and check the Deployments tab to view the created deployments. You can duplicate deployments, modify schedules, and update datastore and pipeline configuration files directly from the UI. + +5. Duplicate Deployments for Different Flows + + Create separate deployments for the Sv, MVBS, and prediction flows by duplicating the existing ones and customizing the schedule and configurations as needed. + +6. Add Path to Configuration Files + + Update the deployment to include the correct paths to the YAML configuration files. If you're using S3 to manage config files, ensure to add the appropriate [block configuration](https://echodataflow.readthedocs.io/en/latest/configuration/blocks.html). + +## Creating Work Pools and Work Queues +### Create Work Pools +In the Prefect UI, navigate to the Work Pools tab and create new pools. These pools can be distributed logically, such as one pool per service or per instance. + +### Create Work Queues +Similarly, create work queues and assign them to the work pools. Distribute the queues in a way that optimizes load distribution across the available workers. + +## Spinning Up Workers +### Start Workers +Once the work pools and queues are set up, you can start the workers. Prefect will provide commands for each pool and queue, which you can run to spin up workers on the instance. + +### Run Workers in Parallel +Each worker command should be executed in a separate terminal session. This will allow multiple workers to run in parallel, processing tasks across different flows simultaneously. 
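A hedged example of such a command, using illustrative pool and queue names (copy the exact command the Prefect UI shows for your pool):

```bash
prefect worker start --pool "echodataflow-pool" --work-queue "raw-to-sv"
```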
+ + From 2197ba7cdd471388fa0e7dcc3ca3f631c5ce86ae Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Wed, 23 Oct 2024 16:03:05 -0700 Subject: [PATCH 16/22] Deployment folder rename --- {deployment => deployment_config}/Linux/FileDownloader.service | 0 {deployment => deployment_config}/Linux/FileMonitor.service | 0 {deployment => deployment_config}/Linux/echodataflow.service | 0 {deployment => deployment_config}/MacOS/FileDownloader.plist | 0 {deployment => deployment_config}/MacOS/FileMonitor.plist | 0 {deployment => deployment_config}/MacOS/echodataflow.plist | 0 {deployment => deployment_config}/deploy_echodataflow_worker.sh | 0 {deployment => deployment_config}/docker_trigger.py | 0 {deployment => deployment_config}/main.py | 0 9 files changed, 0 insertions(+), 0 deletions(-) rename {deployment => deployment_config}/Linux/FileDownloader.service (100%) rename {deployment => deployment_config}/Linux/FileMonitor.service (100%) rename {deployment => deployment_config}/Linux/echodataflow.service (100%) rename {deployment => deployment_config}/MacOS/FileDownloader.plist (100%) rename {deployment => deployment_config}/MacOS/FileMonitor.plist (100%) rename {deployment => deployment_config}/MacOS/echodataflow.plist (100%) rename {deployment => deployment_config}/deploy_echodataflow_worker.sh (100%) rename {deployment => deployment_config}/docker_trigger.py (100%) rename {deployment => deployment_config}/main.py (100%) diff --git a/deployment/Linux/FileDownloader.service b/deployment_config/Linux/FileDownloader.service similarity index 100% rename from deployment/Linux/FileDownloader.service rename to deployment_config/Linux/FileDownloader.service diff --git a/deployment/Linux/FileMonitor.service b/deployment_config/Linux/FileMonitor.service similarity index 100% rename from deployment/Linux/FileMonitor.service rename to deployment_config/Linux/FileMonitor.service diff --git a/deployment/Linux/echodataflow.service b/deployment_config/Linux/echodataflow.service similarity index 100% rename from deployment/Linux/echodataflow.service rename to deployment_config/Linux/echodataflow.service diff --git a/deployment/MacOS/FileDownloader.plist b/deployment_config/MacOS/FileDownloader.plist similarity index 100% rename from deployment/MacOS/FileDownloader.plist rename to deployment_config/MacOS/FileDownloader.plist diff --git a/deployment/MacOS/FileMonitor.plist b/deployment_config/MacOS/FileMonitor.plist similarity index 100% rename from deployment/MacOS/FileMonitor.plist rename to deployment_config/MacOS/FileMonitor.plist diff --git a/deployment/MacOS/echodataflow.plist b/deployment_config/MacOS/echodataflow.plist similarity index 100% rename from deployment/MacOS/echodataflow.plist rename to deployment_config/MacOS/echodataflow.plist diff --git a/deployment/deploy_echodataflow_worker.sh b/deployment_config/deploy_echodataflow_worker.sh similarity index 100% rename from deployment/deploy_echodataflow_worker.sh rename to deployment_config/deploy_echodataflow_worker.sh diff --git a/deployment/docker_trigger.py b/deployment_config/docker_trigger.py similarity index 100% rename from deployment/docker_trigger.py rename to deployment_config/docker_trigger.py diff --git a/deployment/main.py b/deployment_config/main.py similarity index 100% rename from deployment/main.py rename to deployment_config/main.py From a8d8ced04386e2307dfb0e824f7597185c93910d Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Wed, 23 Oct 2024 16:04:11 -0700 Subject: [PATCH 17/22] Infrastructure config changes --- 
echodataflow/deployment/deployment_demo.yaml | 28 ++++--------- echodataflow/deployment/deployment_engine.py | 8 ++-- echodataflow/models/deployment/deployment.py | 44 +++++++++++++++----- echodataflow/utils/prefect_utils.py | 2 +- 4 files changed, 48 insertions(+), 34 deletions(-) diff --git a/echodataflow/deployment/deployment_demo.yaml b/echodataflow/deployment/deployment_demo.yaml index 7bc8a8e..1e625f6 100644 --- a/echodataflow/deployment/deployment_demo.yaml +++ b/echodataflow/deployment/deployment_demo.yaml @@ -26,11 +26,10 @@ services: type: Process cluster: workers: 5 - address: tcp:// - + address: tcp:// stages: - - name: edf_Sv_pipeline - module: echodataflow.stages.subflows.Sv_pipeline # Optional module name, echodataflow to create a default flow for all the stages within it + - name: Sv_flow + module: echodataflow.deployment.flow # Optional module name, echodataflow to create a default flow for all the stages within it tasks: - name: echodataflow_open_raw module: echodataflow.stages.subflows.open_raw @@ -54,20 +53,11 @@ services: use_offline: true # Skip this process if zarr files are already present in the output directory. group: False # Group Converted files based on Transect source: - path: some_url # Single path, list or paths or a dict with some key and value as list of paths - parameters: - # Jinja support - window_options: - time_travel_hours: 20 - time_travel_mins: 0 - window_hours: 0 - window_mins: 40 - number_of_windows: 3 + path: s3://ncei-wcsd-archive/data/raw/Bell_M._Shimada/SH1707/EK60/*.raw # Single path, list or paths or a dict with some key and value as list of paths storage_options: - block_name: ABCD - block_type: AWS + anon: true group: - path: s3:// + file: s3:// grouping_regex: # pattern for grouping files based on filename storage_options: block_name: ABCD @@ -102,7 +92,7 @@ services: - name: stage_produce_mvbs module: echodataflow.stages.subflows.compute_MVBS source: - urlpath: s3:// + path: s3:// parameters: # Jinja support window_options: @@ -140,7 +130,7 @@ services: - name: echodataflow_compute_NASC module: echodataflow.stages.subflows.compute_NASC source: - urlpath: s3:// + path: s3:// parameters: # Jinja support window_options: @@ -153,7 +143,7 @@ services: block_name: ABCD block_type: AWS destination: - path: s3:// + path: s3:// storage_options: block_name: ABCD block_type: AWS diff --git a/echodataflow/deployment/deployment_engine.py b/echodataflow/deployment/deployment_engine.py index a915359..5244053 100644 --- a/echodataflow/deployment/deployment_engine.py +++ b/echodataflow/deployment/deployment_engine.py @@ -51,13 +51,13 @@ async def _deploy_service( schedule = [DeploymentScheduleCreate(schedule=IntervalSchedule(interval=timedelta(minutes=service.schedule.interval_mins), anchor_date=service.schedule.anchor_date))] - await create_work_pool_and_queue(service.workpool) + await create_work_pool_and_queue(service.infrastructure.workpool) deployment: RunnerDeployment = await edf_service_fn.to_deployment( name=service.name, - parameters={"stages": service.stages, "edf_logger": logging_dict}, - work_queue_name=service.workpool.name, - work_pool_name=service.workpool.workqueue.name, + parameters={"stages": service.stages, "edf_logger": logging_dict, "cluster": service.infrastructure.cluster}, + work_pool_name=service.infrastructure.workpool.name, + work_queue_name=service.infrastructure.workpool.workqueue_name, tags=service.tags, schedules=schedule ) diff --git a/echodataflow/models/deployment/deployment.py 
b/echodataflow/models/deployment/deployment.py index 9e4de2b..27ed424 100644 --- a/echodataflow/models/deployment/deployment.py +++ b/echodataflow/models/deployment/deployment.py @@ -1,9 +1,8 @@ from datetime import datetime from typing import Any, Dict, List, Optional, Union -from pydantic import BaseModel, Field, field_validator +from pydantic import BaseModel, Field, ValidationInfo, field_validator from prefect.client.schemas.objects import WorkPool, WorkQueue -# Assuming these imports exist in your module from echodataflow.models.deployment.deployment_schedule import DeploymentSchedule from echodataflow.models.deployment.stage import Stage from echodataflow.models.deployment.storage_options import StorageOptions @@ -28,6 +27,7 @@ def validate_handler(cls, v): class EDFWorkPool(WorkPool): name: str = Field("Echodataflow-WorkPool", description="Name of the WorkPool.") + type: str = Field("Process", description="Type of WorkPool.") workqueue: Union[str, WorkQueue] = Field("default", description="WorkQueue associated with the WorkPool.") @property @@ -36,6 +36,20 @@ def workqueue_name(self) -> str: return self.workqueue.name return self.workqueue + class config: + arbitrary_types_allowed=True + +class Cluster(BaseModel): + address: Optional[str] = Field(None, description="Dask scheduler address") + workers: Optional[int] = Field(3, description="Number of workers") + +class Infrastructure(BaseModel): + workpool: Optional[EDFWorkPool] = Field(EDFWorkPool(), description="WorkPool configuration for the service.") + cluster: Optional[Cluster] = Field(None, description="Dask Cluster Configuration") + + class Config: + arbitrary_types_allowed = True + class Service(BaseModel): """ Model for defining a service in the deployment pipeline. @@ -56,7 +70,7 @@ class Service(BaseModel): schedule: Optional[DeploymentSchedule] = Field(None, description="Scheduling details for the service.") stages: List[Stage] = Field(None, description="List of stages included in the service.") logging: Optional[EDFLogging] = Field(None, description="Logging configuration for the service.") - workpool: Optional[EDFWorkPool] = Field(WorkPool(name="Echodataflow-WorkPool", type="Process"), description="WorkPool configuration for the service.") + infrastructure: Optional[Infrastructure] = Field(Infrastructure(), description="Infrastructure configuration for the service.") # Validators @field_validator("name", mode="before") @@ -79,6 +93,8 @@ def validate_tags(cls, v): return list(unique_tags) return v + class Config: + arbitrary_types_allowed = True class Deployment(BaseModel): """ @@ -101,10 +117,13 @@ def validate_services(cls, v): return v @field_validator("base_path", mode="after") - def construct_path(cls, v): - + def construct_path(cls, v, info: ValidationInfo): + """ + This validator ensures that if a base_path is provided, it is applied to all services. + """ + services = info.data.get("services", []) if v is not None: - for service in cls.services: + for service in services: for stage in service.stages: stage.source.path = v + stage.source.path stage.destination.path = v + stage.destination.path @@ -112,13 +131,18 @@ def construct_path(cls, v): return v @field_validator("storage_options", mode="after") - def apply_storage_options(cls, v): - + def apply_storage_options(cls, v, info: ValidationInfo): + """ + This validator ensures that if storage options are provided, they are applied to all services. 
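# Hedged sketch: the same base_path/storage_options propagation expressed as a
# single pydantic v2 model-level validator on Deployment, which runs after all
# fields (base_path, storage_options, services) have been parsed. This is one
# possible arrangement, not the package's implementation; group handling is
# elided for brevity.
#
#     from pydantic import model_validator
#
#     @model_validator(mode="after")
#     def _propagate_defaults(self) -> "Deployment":
#         for service in self.services:
#             for stage in service.stages:
#                 if self.base_path:
#                     stage.source.path = self.base_path + stage.source.path
#                     stage.destination.path = self.base_path + stage.destination.path
#                 if self.storage_options is not None:
#                     stage.source.storage_options = self.storage_options
#                     stage.destination.storage_options = self.storage_options
#         return self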
+ """ + services = info.data.get("services", []) if v is not None: - for service in cls.services: + for service in services: for stage in service.stages: stage.source.storage_options = v stage.destination.storage_options = v stage.group.storage_options = v return v - + + class Config: + arbitrary_types_allowed = True \ No newline at end of file diff --git a/echodataflow/utils/prefect_utils.py b/echodataflow/utils/prefect_utils.py index 0c42248..241a963 100644 --- a/echodataflow/utils/prefect_utils.py +++ b/echodataflow/utils/prefect_utils.py @@ -97,7 +97,7 @@ async def create_work_pool_and_queue(edf_work_pool: EDFWorkPool): # Use the Prefect client asynchronously to interact with the API async with get_client() as client: # Get or create the work pool - work_pool = await _get_or_create_work_pool(client=client, work_pool_name=edf_work_pool.name) + work_pool = await _get_or_create_work_pool(client=client, work_pool=edf_work_pool) # If the work pool exists, proceed to get or create the work queue if work_pool: From 36d56c81fbf72033219f7ffc68747af00bde3696 Mon Sep 17 00:00:00 2001 From: Soham Butala Date: Wed, 23 Oct 2024 16:09:59 -0700 Subject: [PATCH 18/22] Service and flow execution logic --- echodataflow/deployment/flow.py | 56 ++++++++++++--- echodataflow/deployment/service.py | 69 ++++++++++--------- echodataflow/models/deployment/source.py | 12 ++-- .../models/deployment/storage_options.py | 14 ++-- echodataflow/tasklib/echopype.py | 27 ++++---- echodataflow/utils/config_utils.py | 66 ++++++++++++------ echodataflow/utils/function_utils.py | 6 +- 7 files changed, 160 insertions(+), 90 deletions(-) diff --git a/echodataflow/deployment/flow.py b/echodataflow/deployment/flow.py index 82defe3..a62b81f 100644 --- a/echodataflow/deployment/flow.py +++ b/echodataflow/deployment/flow.py @@ -1,9 +1,13 @@ +import json +import os from typing import Dict, List -from prefect import flow +from fastapi.encoders import jsonable_encoder +from prefect import Task, flow from prefect_dask import get_dask_client from echodataflow.models.deployment.stage import Stage from echodataflow.models.output_model import EchodataflowObject from echodataflow.utils import log_util +from echodataflow.utils.config_utils import club_raw_files, parse_raw_paths from echodataflow.utils.function_utils import dynamic_function_call @flow(name='Sv_flow') @@ -11,21 +15,51 @@ def Sv_flow(stage: Stage): # call source extraction, which converts source into intermediate data representation # puts in dict of dict of list of Object - def_metadata: Dict[str, Dict[str, List[EchodataflowObject]]] = stage.source.extract_source(stage.options) - + raw_files: List[str] = stage.source.extract_source() + # Group data based on given file(s) + file_dicts = parse_raw_paths(all_raw_files=raw_files, config=stage.source, group=stage.group) + + log_util.log( + msg={ + "msg": f"Files To Be Processed", + "mod_name": __file__, + "func_name": "Init Flow", + } + ) + log_util.log( + msg={ + "msg": json.dumps(jsonable_encoder(file_dicts)), + "mod_name": __file__, + "func_name": "Init Flow", + } + ) + edf_metadata = club_raw_files( + config=stage.group, + raw_dicts=file_dicts, + raw_url_file=None, + json_storage_options=None, + ) # For each file, call tasks - - for file in def_metadata: - # TODO: Logic to determine when and where to call the grouping and super grouping logic - for task in stage.tasks: + for group in edf_metadata: + for fdict in group: - task_fn = dynamic_function_call(task.module, task.name) + file_info = 
os.path.basename(fdict.get("file_path")).split(".", maxsplit=1) - data = task_fn.submit(task, data, stage.source.storage_options._storage_options_dict) + data_dict = {"source_file_path": fdict.get("file_path"), + "filename": file_info[0], + "file_extension": file_info[-1]} + + for task in stage.tasks: - - stage.destination.store_result(data=data, engine="zarr") + task_fn = dynamic_function_call(task.module, task.name) + if not isinstance(task_fn, Task): + raise ValueError(f"Task {task.name} is not a valid Task. Annotate tasks with @task decorator") + + data_dict = task_fn.submit(task, data_dict, stage.source.storage_options._storage_options_dict) + + data_dict = data_dict.result() + stage.destination.store_result(data=data_dict, engine="zarr") return True \ No newline at end of file diff --git a/echodataflow/deployment/service.py b/echodataflow/deployment/service.py index 14735ea..fcb5bbe 100644 --- a/echodataflow/deployment/service.py +++ b/echodataflow/deployment/service.py @@ -1,18 +1,20 @@ from typing import List, Dict, Any, Optional -from distributed import Client +from distributed import Client, LocalCluster from prefect import flow from prefect_dask import get_dask_client +from prefect_dask.task_runners import DaskTaskRunner from echodataflow.aspects.singleton_echodataflow import Singleton_Echodataflow +from echodataflow.models.deployment.deployment import Cluster from echodataflow.models.deployment.stage import Stage from echodataflow.utils import log_util from echodataflow.utils.config_utils import get_prefect_config_dict from echodataflow.utils.function_utils import dynamic_function_call -from echodataflow.deployment.flow import edf_flow +from echodataflow.deployment.flow import Sv_flow @flow(name="Echodataflow-Service") -def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]]) -> bool: +def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]] = None, cluster: Optional[Cluster] = None) -> bool: """ Service for EDF """ @@ -25,33 +27,41 @@ def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]]) -> bo Singleton_Echodataflow(log_file=edf_logger) gea = Singleton_Echodataflow().get_instance() + global_dask = False + close_cluster_allowed = True + + if cluster: + global_dask = True + if cluster.address is None: + client = Client(LocalCluster(n_workers=cluster.workers, nanny=True).address) + else: + client = Client(cluster.address) + close_cluster_allowed = False + for stage in stages: - if stage.stage_params: - stage.prefect_config = stage.stage_params.get("prefect_config", None) - stage.options = stage.stage_params.get("options", None) - - if stage.module: - master_flow = dynamic_function_call(stage.module, stage.name) - else: - master_flow = edf_flow - - prefect_config_dict = get_prefect_config_dict(stage) + if stage.module: + master_flow = dynamic_function_call(stage.module, stage.name) + else: + master_flow = Sv_flow + + prefect_config_dict = get_prefect_config_dict(stage) + + prefect_config_dict["name"] = stage.name + prefect_config_dict["flow_run_name"] = stage.name + + if global_dask: + prefect_config_dict["task_runner"] = DaskTaskRunner(address=client.scheduler.address) + + if client is not None: + client.subscribe_topic("echodataflow", lambda event: log_util.log_event(event)) - prefect_config_dict["name"] = stage.name - prefect_config_dict["flow_run_name"] = stage.name + if prefect_config_dict.get("task_runner", None): + stage.options["use_dask"] = True - if prefect_config_dict.get("task_runner", None): - 
stage.options["use_dask"] = True - if client is not None: - client.subscribe_topic("echodataflow", lambda event: log_util.log_event(event)) - else: - with get_dask_client() as cl: - cl.subscribe_topic("echodataflow", lambda event: log_util.log_event(event)) - - # master_flow is expected to store the output to the destination - # Source for the next flows to match with the destination of previous flows if two stages are connected - master_flow.with_options(**prefect_config_dict)(stage) + # master_flow is expected to store the output to the destination + # Source for the next flows to match with the destination of previous flows if two stages are connected + master_flow.with_options(**prefect_config_dict)(stage) for stage in stages: if not stage.options.get("save_offline", True): @@ -59,10 +69,7 @@ def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]]) -> bo stage.destination.cleanup() # close cluster if required - if client: + if client and close_cluster_allowed: client.close() - - - return True - \ No newline at end of file + return True \ No newline at end of file diff --git a/echodataflow/models/deployment/source.py b/echodataflow/models/deployment/source.py index b665999..faf70bb 100644 --- a/echodataflow/models/deployment/source.py +++ b/echodataflow/models/deployment/source.py @@ -33,6 +33,7 @@ class Source(BaseModel): parameters: Optional[Parameters] = Field(None, description="Parameters to apply to the source.") window_options: Optional[Dict[str, Any]] = Field(None, description="Window options for the source.") storage_options: Optional[StorageOptions] = Field(None, description="Storage options for the source.") + raw_regex: Optional[str] = Field("(.*)-?D(?P\w{1,8})-T(?P
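A minimal sketch of a user-defined processing step that would plug into the Sv_flow loop introduced in flow.py above. It assumes only what the patch itself establishes: the flow resolves task.module / task.name with dynamic_function_call, rejects anything that is not a Prefect Task, and calls task_fn.submit(task, data_dict, storage_options), where data_dict carries "source_file_path", "filename" and "file_extension" for each raw file. The function name, module placement and the "custom_step_done" key below are hypothetical illustrations, not part of the patch.

from typing import Any, Dict, Optional

from prefect import task


@task(name="echodataflow_custom_step")
def echodataflow_custom_step(
    task_config: Any,
    data_dict: Dict[str, Any],
    storage_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    # data_dict is built per raw file by Sv_flow with the keys
    # "source_file_path", "filename" and "file_extension".
    source_file = data_dict.get("source_file_path")

    # ... open and process source_file here, using storage_options for any
    # remote (e.g. S3) access ...
    data_dict["custom_step_done"] = True

    # Return the (possibly enriched) dict so the next task in stage.tasks and,
    # eventually, stage.destination.store_result() can consume it.
    return data_dict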