Investigate event, baseflow, rising limb filter algorithms #227

Open
mgdenno opened this issue Aug 20, 2024 · 4 comments

@mgdenno
Contributor

mgdenno commented Aug 20, 2024

Several of the proposed signature metrics we have identified as important require filtering the timeseries by identifying a pattern in it. That takes a holistic view of the timeseries, not just a check of a single value against some criterion (e.g., a threshold). We need to understand what is currently being done in this area (event, baseflow, rising limb) as well as what may be required in the future, so that we can generalize this process to handle future needs as well as current ones. That should be the aim anyway; in reality it can be hard to anticipate future needs.

@mgdenno
Contributor Author

mgdenno commented Sep 13, 2024

I think we should investigate the "Series to Series" pandas_udf() for this. See: https://spark.apache.org/docs/3.4.2/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html

If this works the way I think it does, it could be an easy way to add event detection with existing Python code (from HydroTools). We could then later see if doing it all natively in PySpark would be more performant.
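A minimal sketch of what a "Series to Series" function could look like, assuming a rising limb is simply any value greater than the previous one (a simplification for illustration, not the HydroTools algorithm). The names `rising_limb` and `as_spark_udf` are hypothetical. One caveat worth checking: as far as I understand it, a Series to Series `pandas_udf` receives arbitrary batches of rows rather than one whole timeseries, so a function that needs the full series may have to be applied per group instead.

```python
import pandas as pd


def rising_limb(values: pd.Series) -> pd.Series:
    """Flag each value that is greater than the value before it.

    Assumes `values` is already ordered in time.
    """
    # diff() is NaN for the first element, so it is flagged False
    return values.diff().gt(0)


def as_spark_udf():
    """Wrap the pandas function as a Series to Series pandas_udf.

    pyspark is imported lazily so the pandas logic above can be
    exercised without a Spark installation.
    """
    from pyspark.sql.functions import pandas_udf

    return pandas_udf(rising_limb, returnType="boolean")
```

Keeping the logic in a plain pandas function means it can be unit-tested on its own before deciding how it gets wired into Spark.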

@mgdenno
Contributor Author

mgdenno commented Sep 27, 2024

These types of holistic-view, Series to Series operations will be added to the create_joined_timeseries script for an Evaluation.

@mgdenno
Contributor Author

mgdenno commented Oct 23, 2024

I think there are actually two ways we want this to be able to work.

  1. Add event detection (boolean) and event_id (continuous events) to the joined timeseries table.
  2. Add event detection (boolean) and event_id (continuous events) as a chainable step that can be run on-the-fly prior to metrics.
    For example: ev.metrics.add_event_ids(some args).query(group_by=..., include_metrics=...)
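Either way, the two columns could be derived roughly as follows for a single timeseries. This is a pandas-only sketch under stated assumptions: an event is any value above a quantile threshold, and the column names `value` and `value_time` as well as the function name are hypothetical defaults, not an existing API.

```python
import pandas as pd


def add_event_columns(
    pdf: pd.DataFrame,
    value_field: str = "value",
    time_field: str = "value_time",
    quantile: float = 0.9,
) -> pd.DataFrame:
    """Add a boolean `event` column and an integer `event_id` column."""
    out = pdf.sort_values(time_field).reset_index(drop=True)

    # Assumption: "event" means the value exceeds the series quantile
    is_event = out[value_field] > out[value_field].quantile(quantile)

    # An event starts where the flag is True and the previous flag was not
    starts = is_event & ~is_event.shift(fill_value=False)

    # Running count of starts numbers the events; blank it outside events
    event_id = starts.cumsum().where(is_event)

    out["event"] = is_event
    out["event_id"] = event_id.astype("Int64")  # nullable integer
    return out
```

Rows outside any event get a missing `event_id`, so a later `group_by` on `event_id` can drop them or treat them as their own bucket.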

@mgdenno
Contributor Author

mgdenno commented Nov 15, 2024

Dropping this here so we don't lose it.

import pandas as pd
from pyspark.sql.types import BooleanType, StringType, StructField, StructType


def add_is_above_quantile(
    sdf,
    output_field,
    input_field,
    quantile=0.90,
    return_type=BooleanType(),
    group_by=[
        'reference_time',
        'primary_location_id',
        'configuration_name',
        'variable_name',
        'unit_name'
    ]
):
    # Output schema = input schema plus the new (nullable) flag column
    output_schema = StructType(
        sdf.schema.fields + [StructField(output_field, return_type, True)]
    )

    def is_above_quantile(pdf, output_field, input_field, quantile) -> pd.DataFrame:
        pvs = pdf[input_field]

        # Compute the threshold for this group at the requested quantile
        threshold = pvs.quantile(quantile)

        # Flag each value that is strictly above the group's threshold
        pdf[output_field] = pvs > threshold

        return pdf

    # Group the data and apply the pandas function to each group
    sdf = sdf.groupby(group_by).applyInPandas(
        lambda pdf: is_above_quantile(pdf, output_field, input_field, quantile),
        schema=output_schema
    )

    return sdf


def add_segment_ids(
    sdf,
    output_field,
    input_field,
    time_field,
    return_type=StringType(),
    group_by=[
        'reference_time',
        'primary_location_id',
        'configuration_name',
        'variable_name',
        'unit_name'
    ]
):
    # Output schema = input schema plus the new (nullable) segment label column
    output_schema = StructType(
        sdf.schema.fields + [StructField(output_field, return_type, True)]
    )

    def segment(pdf: pd.DataFrame, output_field, input_field, time_field) -> pd.DataFrame:
        # Row order within a group is not guaranteed after groupby, so sort here
        pdf = pdf.sort_values(time_field).reset_index(drop=True)

        # Number each run of consecutive identical values in the boolean input_field
        pdf['segment'] = (pdf[input_field] != pdf[input_field].shift()).cumsum()

        # Keep only the rows where input_field is True (e.g. above the quantile)
        segments = pdf[pdf[input_field]]

        # Find the first and last timestamp of each True segment
        segment_ranges = segments.groupby('segment').agg(
            startdate=(time_field, 'min'),
            enddate=(time_field, 'max')
        ).reset_index()

        # Merge the segment ranges back onto the original rows
        pdf = pdf.merge(segment_ranges[['segment', 'startdate', 'enddate']], on='segment', how='left')

        # Build the "startdate-enddate" label; rows outside a segment get None
        pdf[output_field] = pdf.apply(
            lambda row: f"{row['startdate']}-{row['enddate']}" if pd.notnull(row['startdate']) else None,
            axis=1
        )

        # Drop the helper columns so the result matches output_schema
        pdf.drop(columns=['segment', 'startdate', 'enddate'], inplace=True)

        return pdf

    # Group the data and apply the pandas function to each group
    # (sorting happens inside segment(), so no orderBy is needed here)
    sdf = sdf.groupby(group_by).applyInPandas(
        lambda pdf: segment(pdf, output_field, input_field, time_field),
        schema=output_schema
    )

    return sdf
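The per-group labelling could possibly also be done without the row-wise `apply`, which may matter for long timeseries. The following is a pandas-only sketch, assuming the time field is a datetime column and the input field is a boolean flag with no nulls; `label_segments` is a hypothetical name and this has not been run through `applyInPandas`.

```python
import pandas as pd


def label_segments(
    pdf: pd.DataFrame,
    input_field: str,
    time_field: str,
    output_field: str,
) -> pd.DataFrame:
    """Label each run of True values with a 'start-end' string."""
    out = pdf.sort_values(time_field).reset_index(drop=True)
    flag = out[input_field].astype(bool)

    # A new run starts wherever the flag differs from the previous row
    run_id = (flag != flag.shift()).cumsum()

    # First and last timestamp of the run each row belongs to
    fmt = "%Y-%m-%d %H:%M:%S"
    start = out.groupby(run_id)[time_field].transform("min").dt.strftime(fmt)
    end = out.groupby(run_id)[time_field].transform("max").dt.strftime(fmt)

    # Keep the label only on rows where the flag is True
    out[output_field] = (start + "-" + end).where(flag)
    return out
```

Because it is a plain pandas function, it can be checked on a small DataFrame before being handed to Spark.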

@mgdenno mgdenno modified the milestones: v0.4 Release, v0.5 Release Nov 19, 2024