Investigate event, baseflow, rising limb filter algorithms #227
I think we should investigate the "Series to Series" pandas_udf() for this. See: https://spark.apache.org/docs/3.4.2/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html

If this works the way I think it does, it could be an easy way to add event detection with existing Python code (from HydroTools). We could then later see if doing it all natively in PySpark would be more performant.
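For illustration, here is a minimal sketch of the Series-to-Series form; the column names, the sample data, and the hard-coded 0.90 quantile are placeholders, not anything from the existing codebase:

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import BooleanType


# Series-to-Series pandas UDF: each invocation receives one Arrow batch of
# the column as a pandas Series and returns a Series of the same length.
@pandas_udf(BooleanType())
def above_q90(values: pd.Series) -> pd.Series:
    # NOTE: the quantile is computed per batch, not per timeseries.
    return values > values.quantile(0.90)


spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(
    [("gage-A", float(v)) for v in range(10)],
    ["primary_location_id", "value"],
)
sdf.withColumn("above_q90", above_q90("value")).show()
```

One caveat worth noting: a Series-to-Series UDF is fed arbitrary Arrow batches rather than whole groups, so any statistic computed inside it is per batch, not per timeseries. Holistic, per-timeseries metrics need the grouped applyInPandas pattern used in the code below.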
These types of holistic-view, Series-to-Series operations will be added to the
I think there are actually two ways we want this to be able to work.
Dropping this here so we don't lose it.

```python
import pandas as pd
from pyspark.sql.types import StructType, StructField, BooleanType, StringType


def add_is_above_quantile(
    sdf,
    output_field,
    input_field,
    quantile=0.90,
    return_type=BooleanType(),
    group_by=[
        'reference_time',
        'primary_location_id',
        'configuration_name',
        'variable_name',
        'unit_name'
    ]
):
    # Get the schema of the input DataFrame
    input_schema = sdf.schema

    # Create a copy of the schema and add the new column
    output_schema = StructType(
        input_schema.fields + [StructField(output_field, return_type, True)]
    )

    def is_above_quantile(pdf, output_field, input_field, quantile) -> pd.DataFrame:
        pvs = pdf[input_field]
        # Calculate the requested quantile for this group
        percentile = pvs.quantile(quantile)
        # Create a new column indicating whether each value is above that quantile
        pdf[output_field] = pvs > percentile
        return pdf

    def wrapper(pdf, output_field, input_field, quantile):
        return is_above_quantile(pdf, output_field, input_field, quantile)

    # Group the data and apply the UDF to each group
    sdf = sdf.groupby(group_by).applyInPandas(
        lambda pdf: wrapper(pdf, output_field, input_field, quantile),
        schema=output_schema
    )
    return sdf


def add_segment_ids(
    sdf,
    output_field,
    input_field,
    time_field,
    return_type=StringType(),
    group_by=[
        'reference_time',
        'primary_location_id',
        'configuration_name',
        'variable_name',
        'unit_name'
    ]
):
    # Get the schema of the input DataFrame
    input_schema = sdf.schema

    # Create a copy of the schema and add the new column
    output_schema = StructType(
        input_schema.fields + [StructField(output_field, return_type, True)]
    )

    def segment(pdf: pd.DataFrame, output_field, input_field, time_field) -> pd.DataFrame:
        # Sort by time within the group; Spark does not guarantee row order
        # inside each applyInPandas batch, and the cumsum below depends on it
        pdf = pdf.sort_values(time_field)
        # Label runs of consecutive equal values as continuous segments
        pdf['segment'] = (pdf[input_field] != pdf[input_field].shift()).cumsum()
        # Keep only the segments where the (Boolean) input field is True
        segments = pdf[pdf[input_field]]
        # Group by segment and compute each segment's start and end timestamps
        segment_ranges = segments.groupby('segment').agg(
            startdate=(time_field, 'min'),
            enddate=(time_field, 'max')
        ).reset_index()
        # Merge the segment ranges back to the original DataFrame
        pdf = pdf.merge(
            segment_ranges[['segment', 'startdate', 'enddate']],
            on='segment', how='left'
        )
        # Create the startdate-enddate string column
        pdf[output_field] = pdf.apply(
            lambda row: f"{row['startdate']}-{row['enddate']}"
            if pd.notnull(row['startdate']) else None,
            axis=1
        )
        # Drop the helper columns before returning
        pdf.drop(columns=['segment', 'startdate', 'enddate'], inplace=True)
        return pdf

    def wrapper(pdf, output_field, input_field, time_field):
        return segment(pdf, output_field, input_field, time_field)

    # Group the data and apply the UDF to each group
    sdf = sdf.orderBy(*group_by, time_field).groupby(group_by).applyInPandas(
        lambda pdf: wrapper(pdf, output_field, input_field, time_field),
        schema=output_schema
    )
    return sdf
```
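For reference, a hypothetical usage of the two helpers chained together. The input path, the "value" and "value_time" column names, and the output field names are illustrative; the input DataFrame is assumed to contain the default group_by fields above:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Placeholder path; any Spark DataFrame with the expected columns works.
timeseries_sdf = spark.read.parquet("timeseries.parquet")

# Flag values above the per-timeseries 90th percentile...
flagged = add_is_above_quantile(
    timeseries_sdf,
    output_field="is_above_q90",
    input_field="value",
    quantile=0.90,
)
# ...then label each contiguous above-threshold run with a startdate-enddate id.
segmented = add_segment_ids(
    flagged,
    output_field="event_id",
    input_field="is_above_q90",
    time_field="value_time",
)
segmented.show()
```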
Several of the proposed signature metrics we have identified as important require filtering the timeseries based on some pattern in it, which demands a holistic view of the timeseries rather than checking each value against a criterion (e.g., a threshold). We need to understand what is currently being done in this area (event, baseflow, rising limb), as well as what may be required in the future, so that we can generalize this process to handle not only current needs but future ones too (that should be the aim anyway; in reality it can be hard to anticipate future needs).
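To sketch the distinction: a per-value check is a plain sdf.filter(...), whereas something like a rising-limb filter needs each timeseries in full. A minimal, hypothetical pandas implementation of the latter, using the same assumed column names as above, that could be dropped into the groupby(...).applyInPandas(...) pattern:

```python
import pandas as pd


def flag_rising_limb(pdf: pd.DataFrame,
                     value_field: str = "value",
                     time_field: str = "value_time") -> pd.DataFrame:
    # Holistic view: a point is on a rising limb if flow increased since the
    # previous timestep, which requires the whole series in time order.
    pdf = pdf.sort_values(time_field)
    pdf["is_rising"] = pdf[value_field].diff() > 0
    return pdf
```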