Skip to content

Commit

Permalink
feat(spark): model quality and drift for current - multiclass classification (#74)
Browse files Browse the repository at this point in the history

* feat: add multiclass model quality calc for current

* feat: add model quality in job

* feat: add drift and refactoring

* fix: remove strange timezone from files
  • Loading branch information
rivamarco committed Jul 2, 2024
1 parent aed740b commit 14d2cb9
Show file tree
Hide file tree
Showing 36 changed files with 2,015 additions and 910 deletions.
22 changes: 10 additions & 12 deletions spark/jobs/current_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from metrics.statistics import calculate_statistics_current
from models.current_dataset import CurrentDataset
from models.reference_dataset import ReferenceDataset
from utils.reference_regression import ReferenceMetricsRegressionService
from utils.current_binary import CurrentMetricsService
from utils.current_multiclass import CurrentMetricsMulticlassService
from utils.models import JobStatus, ModelOut, ModelType
Expand Down Expand Up @@ -57,9 +56,8 @@ def main(
case ModelType.BINARY:
metrics_service = CurrentMetricsService(
spark_session=spark_session,
current=current_dataset.current,
reference=reference_dataset.reference,
model=model,
current=current_dataset,
reference=reference_dataset,
)
statistics = calculate_statistics_current(current_dataset)
data_quality = metrics_service.calculate_data_quality()
Expand All @@ -80,25 +78,25 @@ def main(
case ModelType.MULTI_CLASS:
metrics_service = CurrentMetricsMulticlassService(
spark_session=spark_session,
current=current_dataset.current,
reference=reference_dataset.reference,
model=model,
current=current_dataset,
reference=reference_dataset,
)
statistics = calculate_statistics_current(current_dataset)
data_quality = metrics_service.calculate_data_quality()
model_quality = metrics_service.calculate_model_quality()
drift = metrics_service.calculate_drift()
complete_record["STATISTICS"] = statistics.model_dump_json(
serialize_as_any=True
)
complete_record["DATA_QUALITY"] = data_quality.model_dump_json(
serialize_as_any=True
)
case ModelType.REGRESSION:
metrics_service = ReferenceMetricsRegressionService(
reference=reference_dataset
complete_record["MODEL_QUALITY"] = orjson.dumps(model_quality).decode(
"utf-8"
)
complete_record["DRIFT"] = orjson.dumps(drift).decode("utf-8")
case ModelType.REGRESSION:
statistics = calculate_statistics_current(current_dataset)
data_quality = metrics_service.calculate_data_quality()

complete_record["STATISTICS"] = statistics.model_dump_json(
serialize_as_any=True
)
Expand Down
File renamed without changes.
79 changes: 79 additions & 0 deletions spark/jobs/metrics/drift_calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from pyspark.sql import SparkSession

from metrics.chi2 import Chi2Test
from metrics.ks import KolmogorovSmirnovTest
from models.current_dataset import CurrentDataset
from models.reference_dataset import ReferenceDataset


class DriftCalculator:
    """Computes feature-level drift between a reference and a current dataset.

    Categorical features are compared with a chi-square test; numerical
    features with a Kolmogorov-Smirnov test.
    """

    # Minimum row count required on BOTH datasets before the chi-square test
    # is run; below this the metric is reported as value=None, has_drift=False.
    _MIN_CHI2_ROWS = 5
    # Significance level: chi-square flags drift when pValue <= _ALPHA; the
    # same alpha is handed to the KS test.
    _ALPHA = 0.05

    @staticmethod
    def calculate_drift(
        spark_session: SparkSession,
        reference_dataset: ReferenceDataset,
        current_dataset: CurrentDataset,
    ):
        """Return ``{"feature_metrics": [...]}`` with one entry per feature.

        Each entry has the shape::

            {"feature_name": <str>,
             "drift_calc": {"type": "CHI2" | "KS",
                            "value": <float | None>,
                            "has_drift": <bool>}}
        """
        feature_metrics = []

        chi2 = Chi2Test(
            spark_session=spark_session,
            reference_data=reference_dataset.reference,
            current_data=current_dataset.current,
        )
        # Loop-invariant: both counts are fixed for the whole run, so the
        # sample-size guard is evaluated once instead of per feature.
        has_enough_rows = (
            reference_dataset.reference_count > DriftCalculator._MIN_CHI2_ROWS
            and current_dataset.current_count > DriftCalculator._MIN_CHI2_ROWS
        )
        for categorical in reference_dataset.model.get_categorical_features():
            if has_enough_rows:
                p_value = float(chi2.test(categorical.name, categorical.name)["pValue"])
                drift_calc = {
                    "type": "CHI2",
                    "value": p_value,
                    "has_drift": bool(p_value <= DriftCalculator._ALPHA),
                }
            else:
                # Too few rows for a meaningful chi-square result.
                drift_calc = {"type": "CHI2", "value": None, "has_drift": False}
            feature_metrics.append(
                {"feature_name": categorical.name, "drift_calc": drift_calc}
            )

        ks = KolmogorovSmirnovTest(
            reference_data=reference_dataset.reference,
            current_data=current_dataset.current,
            alpha=DriftCalculator._ALPHA,
            phi=0.004,
        )
        for numerical in reference_dataset.model.get_numerical_features():
            result = ks.test(numerical.name, numerical.name)
            statistic = float(result["ks_statistic"])
            feature_metrics.append(
                {
                    "feature_name": numerical.name,
                    "drift_calc": {
                        "type": "KS",
                        "value": statistic,
                        # Drift when the KS statistic exceeds the critical value.
                        "has_drift": bool(statistic > result["critical_value"]),
                    },
                }
            )

        return {"feature_metrics": feature_metrics}
File renamed without changes.
46 changes: 46 additions & 0 deletions spark/jobs/models/current_dataset.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import List

from pyspark.ml.feature import StringIndexer
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType, StructField, StructType

from models.reference_dataset import ReferenceDataset
from utils.models import ModelOut, ModelType, ColumnDefinition
from utils.spark import apply_schema_to_dataframe

Expand Down Expand Up @@ -95,3 +97,47 @@ def get_all_variables(self) -> List[ColumnDefinition]:
+ [self.model.timestamp]
+ self.model.outputs.output
)

def get_string_indexed_dataframe(self, reference: ReferenceDataset):
    """
    Source: https://stackoverflow.com/questions/65911146/how-to-transform-multiple-categorical-columns-to-integers-maintaining-shared-val
    Current dataset will be indexed with columns from both reference and current in order to have complete data

    Returns a tuple ``(index_label_map, indexed_target_df)``:

    * ``index_label_map``: dict mapping the stringified float index
      (e.g. ``"0.0"``) to the original class label.
    * ``indexed_target_df``: the current dataframe with two extra columns,
      ``"<prediction>-idx"`` and ``"<target>-idx"``.
    """
    # Collect prediction and target values from BOTH current and reference
    # into a single one-column ("classes") dataframe, so the indexer's
    # vocabulary covers every class seen in either dataset.
    predictions_df_current = self.current.select(
        self.model.outputs.prediction.name
    ).withColumnRenamed(self.model.outputs.prediction.name, "classes")
    target_df_current = self.current.select(
        self.model.target.name
    ).withColumnRenamed(self.model.target.name, "classes")
    predictions_df_reference = reference.reference.select(
        self.model.outputs.prediction.name
    ).withColumnRenamed(self.model.outputs.prediction.name, "classes")
    target_df_reference = reference.reference.select(
        self.model.target.name
    ).withColumnRenamed(self.model.target.name, "classes")
    prediction_target_df = (
        predictions_df_current.union(target_df_current)
        .union(predictions_df_reference)
        .union(target_df_reference)
    )
    # alphabetAsc makes the label -> index assignment deterministic, so the
    # enumerate() over labelsArray below reproduces the same ordering.
    indexer = StringIndexer(
        inputCol="classes",
        outputCol="classes_index",
        stringOrderType="alphabetAsc",
        handleInvalid="skip",
    )
    indexer_model = indexer.fit(prediction_target_df)
    # NOTE(review): setInputCol/setOutputCol mutate and return the same
    # fitted model instance, so the one model is re-pointed first at the
    # prediction column and then at the target column — confirm this is
    # intentional rather than two independent copies.
    indexer_prediction = indexer_model.setInputCol(
        self.model.outputs.prediction.name
    ).setOutputCol(f"{self.model.outputs.prediction.name}-idx")
    indexed_prediction_df = indexer_prediction.transform(self.current)
    indexer_target = indexer_model.setInputCol(self.model.target.name).setOutputCol(
        f"{self.model.target.name}-idx"
    )
    indexed_target_df = indexer_target.transform(indexed_prediction_df)

    # StringIndexer emits double-valued indices; key the map by their string
    # form ("0.0", "1.0", ...) so keys match the indexed column values.
    index_label_map = {
        str(float(index)): str(label)
        for index, label in enumerate(indexer_model.labelsArray[0])
    }
    return index_label_map, indexed_target_df
4 changes: 1 addition & 3 deletions spark/jobs/reference_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ def main(

match model.model_type:
case ModelType.BINARY:
metrics_service = ReferenceMetricsService(
reference_dataset.reference, model=model
)
metrics_service = ReferenceMetricsService(reference=reference_dataset)
model_quality = metrics_service.calculate_model_quality()
statistics = calculate_statistics_reference(reference_dataset)
data_quality = metrics_service.calculate_data_quality()
Expand Down
Loading

0 comments on commit 14d2cb9

Please sign in to comment.