From c892eba31661f3a36a145aef1dc0cd6bab6c49ee Mon Sep 17 00:00:00 2001 From: Andrea Vitali Date: Fri, 5 Apr 2024 16:37:54 +0200 Subject: [PATCH] complete working pipeline --- main.py | 38 +++++++++++++++++++++++----------- src/train_random_forest/run.py | 23 +++++++++++++------- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/main.py b/main.py index d19459460..2d41799c2 100644 --- a/main.py +++ b/main.py @@ -80,16 +80,16 @@ def go(config: DictConfig): if "data_split" in active_steps: - # Download file and load in W&B + # split data in train, validation and test set _ = mlflow.run( f"{config['main']['components_repository']}/train_val_test_split", "main", version='main', parameters={ "input": "clean_sample.csv:latest", - "test_size": config["modeling"]["test_size"], - "random_seed": config["modeling"]["random_seed"], - "stratify_by": config["modeling"]["stratify_by"] + "test_size": config['modeling']['test_size'], + "random_seed": config['modeling']['random_seed'], + "stratify_by": config['modeling']['stratify_by'] }, ) @@ -103,19 +103,33 @@ def go(config: DictConfig): # NOTE: use the rf_config we just created as the rf_config parameter for the train_random_forest # step - ################## - # Implement here # - ################## + _ = mlflow.run( + os.path.join(hydra.utils.get_original_cwd(), "src", "train_random_forest"), + "main", + parameters={ + "trainval_artifact": "trainval_data.csv:latest", + "val_size": config['modeling']['val_size'], + "random_seed": config['modeling']['random_seed'], + "stratify_by": config['modeling']['stratify_by'], + "rf_config": rf_config, + "max_tfidf_features":config['modeling']['max_tfidf_features'], + "output_artifact":"random_forest_export", + }, + ) pass if "test_regression_model" in active_steps: - ################## - # Implement here # - ################## - - pass + _ = mlflow.run( + f"{config['main']['components_repository']}/test_regression_model", + "main", + version='main', + parameters={ + "mlflow_model": "random_forest_export:prod", + "test_dataset": "test_data.csv:latest" + }, + ) if __name__ == "__main__": diff --git a/src/train_random_forest/run.py b/src/train_random_forest/run.py index d8f37d41b..a9cb0d87e 100644 --- a/src/train_random_forest/run.py +++ b/src/train_random_forest/run.py @@ -54,7 +54,7 @@ def go(args): ###################################### # Use run.use_artifact(...).file() to get the train and validation artifact (args.trainval_artifact) # and save the returned path in train_local_pat - trainval_local_path = # YOUR CODE HERE + trainval_local_path = run.use_artifact(args.trainval_artifact).file() ###################################### X = pd.read_csv(trainval_local_path) @@ -75,7 +75,7 @@ def go(args): ###################################### # Fit the pipeline sk_pipe by calling the .fit method on X_train and y_train - # YOUR CODE HERE + sk_pipe.fit(X_train, y_train) ###################################### # Compute r2 and MAE @@ -97,7 +97,7 @@ def go(args): ###################################### # Save the sk_pipe pipeline as a mlflow.sklearn model in the directory "random_forest_dir" # HINT: use mlflow.sklearn.save_model - # YOUR CODE HERE + mlflow.sklearn.save_model(sk_pipe, "random_forest_dir") ###################################### ###################################### @@ -106,7 +106,11 @@ def go(args): # type, provide a description and add rf_config as metadata. Then, use the .add_dir method of the artifact instance # you just created to add the "random_forest_dir" directory to the artifact, and finally use # run.log_artifact to log the artifact to the run - # YOUR CODE HERE + artifact = wandb.Artifact(args.output_artifact, "model_export", + description="sklearn pipeline containing the trained model", + metadata=rf_config) + artifact.add_dir("random_forest_dir") + run.log_artifact(artifact) ###################################### # Plot feature importance @@ -116,10 +120,10 @@ def go(args): # Here we save r_squared under the "r2" key run.summary['r2'] = r_squared # Now log the variable "mae" under the key "mae". - # YOUR CODE HERE + run.summary["mae"] = mae ###################################### - # Upload to W&B the feture importance visualization + # Upload to W&B the feature importance visualization run.log( { "feature_importance": wandb.Image(fig_feat_imp), @@ -158,7 +162,9 @@ def get_inference_pipeline(rf_config, max_tfidf_features): # Build a pipeline with two steps: # 1 - A SimpleImputer(strategy="most_frequent") to impute missing values # 2 - A OneHotEncoder() step to encode the variable - non_ordinal_categorical_preproc = # YOUR CODE HERE + non_ordinal_categorical_preproc = make_pipeline(SimpleImputer(strategy="most_frequent"), + OneHotEncoder()) + ###################################### # Let's impute the numerical columns to make sure we can handle missing values @@ -217,7 +223,8 @@ def get_inference_pipeline(rf_config, max_tfidf_features): # ColumnTransformer instance that we saved in the `preprocessor` variable, and a step called "random_forest" # with the random forest instance that we just saved in the `random_forest` variable. # HINT: Use the explicit Pipeline constructor so you can assign the names to the steps, do not use make_pipeline - sk_pipe = # YOUR CODE HERE + sk_pipe = Pipeline(steps=[("preprocessor",preprocessor), + ("random_forest",random_Forest)]) return sk_pipe, processed_features