From 4edbeff8b85cd27eb36eabc1cdace4b849a4d195 Mon Sep 17 00:00:00 2001 From: shantanu156 Date: Wed, 24 Apr 2024 19:31:30 +0200 Subject: [PATCH] adding personalization model --- .../sk_preprocess-encoder_new.py | 470 ++++++++++++++ .../personalization/utils/requirements.txt | 10 + .../utils/transform_features.py | 501 +++++++++++++++ .../xgboost_evaluate-encoder.py | 115 ++++ training_pipeline/training_pipeline.py | 591 +++++++++++------- 5 files changed, 1475 insertions(+), 212 deletions(-) create mode 100644 training_pipeline/src/personalization/sk_preprocess-encoder_new.py create mode 100644 training_pipeline/src/personalization/utils/requirements.txt create mode 100644 training_pipeline/src/personalization/utils/transform_features.py create mode 100644 training_pipeline/src/personalization/xgboost_evaluate-encoder.py diff --git a/training_pipeline/src/personalization/sk_preprocess-encoder_new.py b/training_pipeline/src/personalization/sk_preprocess-encoder_new.py new file mode 100644 index 0000000..4c67865 --- /dev/null +++ b/training_pipeline/src/personalization/sk_preprocess-encoder_new.py @@ -0,0 +1,470 @@ +import sys +import subprocess + +def install(package): + subprocess.check_call([sys.executable, "-q", "-m", "pip", "install", package]) + +install('gensim==4.3.2') +install('rapidfuzz==3.6.1') +install('category_encoders==2.6.3') + +import argparse +import pathlib +import boto3 +import logging +import os +import json +import joblib +import re +import tarfile +import pandas as pd +import numpy as np +import datetime +import warnings +from io import StringIO +from sklearn.compose import ColumnTransformer +from sklearn.pipeline import Pipeline +from sklearn.impute import SimpleImputer +from sagemaker_containers.beta.framework import worker +from category_encoders import OrdinalEncoder +from utils.transform_features import * + +try: + from sagemaker_containers.beta.framework import ( + encoders, + worker, + ) +except ImportError: + pass + +# suppress only the 'Mean of empty slice' RuntimeWarning: +warnings.filterwarnings(action='ignore', message='Mean of empty slice', category=RuntimeWarning) + +logger = logging.getLogger() +logger.setLevel(logging.WARN) +logger.addHandler(logging.StreamHandler()) + + +cat_cols_imp_ns = [ + # 'last_browser_family', + # 'last_device_type', + # 'job_product_type', + 'jobseeker_preference_job_type_codename', + # 'jobseeker_preference_salary_period', + # 'job_salary_period', + # 'job_origin', + # 'job_link_out_type', + 'job_required_experience', + 'job_required_education', + # 'job_shift', + 'job_company_uid' +] + +additonal_cols = ['job_allows_easy_apply', + 'job_open_for_career_changers', + 'jobseeker_preference_salary_min', + 'job_salary_amount', + 'job_salary_range_min', + 'job_salary_range_max' + ] + +# Create a list of columns to drop from the dataset +columns_to_drop = ['jdp_view_count', + 'job_bookmark_count', + 'application_start_count', + 'application_sent_count', + 'application_submit_count', + 'gold_application_sent_count'] + +label_features = ['target', + 'target_w2v'] + +# these features are for random negative sample creation +job_features = ['job_uid', 'job_product_type', 'job_company_uid', 'job_title', + 'job_description', 'job_salary_amount', 'job_salary_currency', + 'job_salary_period', 'job_origin', 'job_working_hours', + 'job_employment_types', 'job_city', 'job_lat', 'job_lng', + 'job_allows_easy_apply', 'job_salary_range_min', 'job_salary_range_max', + 'job_salary_incentive', 'job_link_out_type', 'job_working_from_home', + 
'job_required_experience', 'job_required_education', + 'job_open_for_career_changers', 'job_schedule_type', 'job_shift', + 'job_type_de_name', 'job_kldb_code'] + + +def find_latest_training_data(s3_client, bucket_name, prefix, date_format="%Y-%m-%d"): + file_list = [] + interactions = {'positive_interactions': {}, 'negative_interactions': {}} + + paginator = s3_client.get_paginator("list_objects_v2") + for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix): + for obj in page.get("Contents", []): + file_list.append(obj.get("Key")) + + for file_path in file_list: + parts = file_path.split("/") + interaction_type = parts[3] + date_str = parts[4].split("=")[1] + date = datetime.datetime.strptime(date_str, date_format) + + if date not in interactions[interaction_type]: + interactions[interaction_type][date] = file_path + + + common_dates = set(interactions['positive_interactions']).intersection(set(interactions['negative_interactions'])) + + if not common_dates: + return [] + + latest_date = max(common_dates) + return [ + interactions['positive_interactions'][latest_date], + interactions['negative_interactions'][latest_date] + ] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--train-validation-ratio", type=float, default=0.9) + parser.add_argument("--test-day-span", type=int, default=5) + parser.add_argument("--negative-sample-ratio", type=int, default=2) + parser.add_argument("--random-negative-sample-multiplier", type=int, default=2) + parser.add_argument("--include-jobseeker-uid-in-training", type=str, default="no") + parser.add_argument("--include-jdp-view-as-target", type=str, default="no") + args, _ = parser.parse_known_args() + + train_val_ratio = args.train_validation_ratio + test_day_span = args.test_day_span + negative_sample_ratio = args.negative_sample_ratio + random_negative_sample_multiplier = args.random_negative_sample_multiplier + include_jobseeker_uid_in_training = args.include_jobseeker_uid_in_training + include_jdp_view_as_target = args.include_jdp_view_as_target + + logger.info("Received arguments {}".format(args)) + + # Set local path prefix in the processing container + local_dir = "/opt/ml/processing" + raw_data_positive = "positive.csv" + raw_data_negative = "negative.csv" + + s3_client = boto3.client("s3") + bucket_name = "inventory-sls-production-two-way-dp-bridge" + latest_data = find_latest_training_data( + s3_client, + bucket_name=bucket_name, + prefix="from_dp_to_inv/inv_job_recommendations_training_data_export/search_feed" + ) + logger.info(latest_data) + if len(latest_data) > 0: + s3_client.download_file(bucket_name, latest_data[0], raw_data_positive) + s3_client.download_file(bucket_name, latest_data[1], raw_data_negative) + logger.info("Downloaded latest data to local") + else: + logger.error("New data not found") + + logger.info("Reading data from {} & {}".format(raw_data_positive, raw_data_negative)) + negative_df = pd.read_csv(raw_data_negative) + positive_df = pd.read_csv(raw_data_positive) + + # convert to date format + positive_df['derived_tstamp'] = pd.to_datetime(positive_df['derived_tstamp'], + format="%Y-%m-%d %H:%M:%S.%f", + errors='coerce').dt.date + negative_df['derived_tstamp'] = pd.to_datetime(negative_df['derived_tstamp'], + format="%Y-%m-%d %H:%M:%S.%f", + errors='coerce').dt.date + + # Drop rows with missing derived_tstamp + positive_df = positive_df.dropna(subset=['derived_tstamp']) + negative_df = negative_df.dropna(subset=['derived_tstamp']) + + logger.info(f"Generating test dataset 
by including last {test_day_span} days from main dataset") + + # split sample into train-validation & test + n_days_ago = (positive_df['derived_tstamp'].max() - datetime.timedelta(days=test_day_span)) + + positive_df_test = positive_df[positive_df['derived_tstamp'] > n_days_ago] + negative_df_test = negative_df[negative_df['derived_tstamp'] > n_days_ago] + + # creating test data + df_test = pd.concat([positive_df_test, negative_df_test], ignore_index=True) + + if include_jdp_view_as_target == "yes": + logger.info("Including jdp_view_count as part of the test target labels") + df_test.loc[:,'target'] = np.where(((df_test['jdp_view_count']>0) | + (df_test['job_bookmark_count']>0) | + (df_test['application_start_count']>0) | + (df_test['application_sent_count']>0) | + (df_test['application_submit_count']>0) | + (df_test['gold_application_sent_count']>0)) + ,1,0) + else: + logger.info("Excluding jdp_view_count as part of the test target labels") + df_test.loc[:,'target'] = np.where(((df_test['job_bookmark_count']>0) | + (df_test['application_start_count']>0) | + (df_test['application_sent_count']>0) | + (df_test['application_submit_count']>0) | + (df_test['gold_application_sent_count']>0)) + ,1,0) + + logger.info(f"Generating train-validation dataset by filtering out last {test_day_span} days from main dataset") + positive_df = positive_df[positive_df['derived_tstamp'] <= n_days_ago] + negative_df = negative_df[negative_df['derived_tstamp'] <= n_days_ago] + + # getting a pool of all the jobs + combined_df = pd.concat([positive_df, negative_df], ignore_index=True) + + # create target variable in positive samples + if include_jdp_view_as_target == "yes": + logger.info("Including jdp_view_count as part of the train target labels") + positive_df.loc[:,'target'] = np.where(((positive_df['jdp_view_count']>0) | + (positive_df['job_bookmark_count']>0) | + (positive_df['application_start_count']>0) | + (positive_df['application_sent_count']>0) | + (positive_df['application_submit_count']>0) | + (positive_df['gold_application_sent_count']>0)) + ,1,0) + else: + logger.info("Excluding jdp_view_count as part of the train target labels") + positive_df.loc[:,'target'] = np.where(((positive_df['job_bookmark_count']>0) | + (positive_df['application_start_count']>0) | + (positive_df['application_sent_count']>0) | + (positive_df['application_submit_count']>0) | + (positive_df['gold_application_sent_count']>0)) + ,1,0) + + # creating extra target_w2v which will be used by word2vec model + positive_df.loc[:,'target_w2v'] = np.where(((positive_df['jdp_view_count']>0) | + (positive_df['job_bookmark_count']>0) | + (positive_df['application_start_count']>0) | + (positive_df['application_sent_count']>0) | + (positive_df['application_submit_count']>0) | + (positive_df['gold_application_sent_count']>0)) + ,1,0) + + # Drop the columns from the DataFrame + positive_df = positive_df.drop(columns=columns_to_drop, axis=1) + + logger.info(f"Generating negative samples by taking at least {negative_sample_ratio} negative per positive sample per user per session from the respective positive session") + # Filter negative sample just to take + negative_df = negative_df.groupby(['jobseeker_uid', 'derived_tstamp', 'session_key'] + ).head(negative_sample_ratio).reset_index(drop=True) + negative_df = negative_df.drop(columns=columns_to_drop, axis=1) + + # directly assigning target and target_w2v as 0 being negative sample + negative_df['target'] = 0 + negative_df['target_w2v'] = 0 + + # filtering out only negative samples from a 
positively interacted session + negative_df = pd.merge(positive_df.loc[positive_df['target']>0, + ['jobseeker_uid', + 'derived_tstamp', + 'session_key']].drop_duplicates(), + negative_df, + on=['jobseeker_uid', + 'derived_tstamp', + 'session_key'], + how='inner') + + # combining the positve and negative samples + df_with_neg_samples = pd.concat([positive_df, negative_df], ignore_index=True) + + # Keep only the last occurrence for each unique user_id and job_uid combination (positive first) + df_with_neg_samples.sort_values(by=['jobseeker_uid', 'job_uid', 'target', 'derived_tstamp'], + ascending=[False, False, False, False], + inplace=True) + df_with_neg_samples = df_with_neg_samples.drop_duplicates(subset=['jobseeker_uid', 'job_uid'], + keep='first') + + logger.info(f"Generating {random_negative_sample_multiplier} time random negative sample for each unique user interaction (postive + negative)") + # generating random negative samples of the size of the positive sample data + user_features = df_with_neg_samples.drop(job_features + label_features, + axis=1).drop_duplicates().reset_index(drop=True) + user_features = [user_features] * random_negative_sample_multiplier + user_features = pd.concat(user_features, ignore_index=True) + + # Shuffle the rows of the large job features DataFrame based on N times + # the number of unique user-sessions + # removing the jobs which are part of the test dataset + combined_df = combined_df.sample(n=len(user_features)).reset_index(drop=True) + combined_df = combined_df[job_features] + + combined_df['target'] = 0 + combined_df['target_w2v'] = 0 + + # pairing up random user sessions with random user jobs + df_with_random_neg = pd.concat([user_features, combined_df], axis=1) + + # concatenating the dataframe with the random negative samples + df_with_random_neg = pd.concat([df_with_neg_samples, + df_with_random_neg], + ignore_index=True) + + # Keep only the last occurrence for each unique user_id and job_uid combination (positive first) + df_with_random_neg.sort_values(by=['jobseeker_uid', 'job_uid', 'target', 'derived_tstamp'], + ascending=[False, False, False, False], + inplace=True) + df_with_random_neg = df_with_random_neg.drop_duplicates(subset=['jobseeker_uid', 'job_uid'], + keep='first') + + logger.info(f"Shape of positive & negative dataset i.e. df_with_neg_samples is: {df_with_neg_samples.shape}") + logger.info(f"Distribution of positive & negative dataset i.e. df_with_neg_samples is:: {df_with_neg_samples.target.value_counts()}") + logger.info(f"Shape of positive & negative dataset with random negative samples i.e. df_with_random_neg is: {df_with_random_neg.shape}") + logger.info(f"Distribution of positive & negative dataset with random negative samples i.e. 
df_with_random_neg is: {df_with_random_neg.target.value_counts()}") + + logger.info(f"Building sklearn column transformer") + # defining sklearn preprocessing pipeline + cat_cols_pipeline_ns = Pipeline(steps=[ + ('encoders', OrdinalEncoder(handle_unknown='value', handle_missing='value')) + ]) + + # Pipeline for categorical columns with 'no' imputation + cat_cols_pipeline_no = Pipeline(steps=[ + ('imputer', SimpleImputer(strategy='constant', fill_value='no')), + ('encoders', OrdinalEncoder(handle_unknown='value', handle_missing='value')) + ]) + + # Pipeline for categorical columns with 'fixed_working_hours' imputation + cat_cols_pipeline_fwh = Pipeline(steps=[ + ('imputer', SimpleImputer(strategy='constant', fill_value='fixed_working_hours')), + ('encoders', OrdinalEncoder(handle_unknown='value', handle_missing='value')) + ]) + + if include_jobseeker_uid_in_training == "yes": + logger.info("Including jobseeker_uid as part of the training features") + cat_cols_imp_ns = cat_cols_imp_ns + ['jobseeker_uid'] + + # Define column transformer + preprocessor = ColumnTransformer( + transformers=[ + ('cat_cols_ns', cat_cols_pipeline_ns, cat_cols_imp_ns), + ('cat_cols_no', cat_cols_pipeline_no, ['job_working_from_home']), + ('cat_cols_fwh', cat_cols_pipeline_fwh, ['job_schedule_type']), + ('distance_calculator', DistanceCalculator(), ['jobseeker_preference_lng', + 'jobseeker_preference_lat', + 'job_lng', + 'job_lat']), + ('job_employment_types', MultiLabelBinarizerTransformer(), ['jobseeker_preference_employment_types', + 'common_employment_types', + 'jobseeker_preference_working_hours', + 'common_working_hours']), + ('similarity_score_calculator', SimilarityScoreCalculator(), ['jobseeker_uid', + 'jobseeker_preference_job_title_b2c', + 'last_feed_search_query', + 'job_title']), + ], + verbose_feature_names_out=False + ) + + logger.info(f"Building train and validation dataset and spliting based on {train_val_ratio} ratio based on derived_tstamp") + df_with_random_neg = df_with_random_neg[df_with_random_neg['target'] == df_with_random_neg['target_w2v']].reset_index(drop=True) + + # Identify users with past interactions + df_with_random_neg =df_with_random_neg.sort_values(by='derived_tstamp', ascending=True) + + # perform general preprocessing of the dataframe + df_with_random_neg = process_dataframe(df_with_random_neg) + + split_point = int(len(df_with_random_neg) * train_val_ratio) + train_data = df_with_random_neg[:split_point] + train_data = process_traindata_only(train_data) # selecting only the top 50 company names + validation_data = df_with_random_neg[split_point:] + + # filtering validation and test data + train_users = set(train_data['jobseeker_uid']) + validation_data = validation_data[validation_data['jobseeker_uid'].isin(train_users)] + test_data = df_test[df_test['jobseeker_uid'].isin(train_users)] + + train_data = train_data.reset_index(drop=True) + validation_data = validation_data.reset_index(drop=True) + test_data = test_data.reset_index(drop=True) + + logger.info(f"Shape of train dataset is: {train_data.shape}") + logger.info(f"Distribution of train dataset is: {train_data.target.value_counts()}") + logger.info(f"Shape of validation dataset is: {validation_data.shape}") + logger.info(f"Distribution of validation dataset is: {validation_data.target.value_counts()}") + logger.info(f"Shape of test dataset is: {test_data.shape}") + logger.info(f"Distribution of test dataset is: {test_data.target.value_counts()}") + + logger.info(f"Transforming train, and validation dataset") + # perform 
preprocessing and fit transform on train_data + X_train = pd.DataFrame(preprocessor.fit_transform(train_data, train_data['target_w2v'])) + combined_train = pd.concat([train_data['target'], + X_train, + train_data[additonal_cols]], + axis=1, ignore_index=True) + + # perform preprocessing and transform on validation_data + X_val = pd.DataFrame(preprocessor.transform(validation_data)) + # Concatenate X_train and y_train + combined_val = pd.concat([validation_data['target'], + X_val, + validation_data[additonal_cols]], + axis=1, ignore_index=True) + + # Save processed datasets to the local paths in the processing container. + # SageMaker will upload the contents of these paths to S3 bucket + logger.debug("Writing processed datasets to container local path.") + + train_output_path = os.path.join(f"{local_dir}/train", "train.csv") + validation_output_path = os.path.join(f"{local_dir}/val", "validation.csv") + test_output_path = os.path.join(f"{local_dir}/test", "test.parquet") + + preprocess_pipeline_joblib_path = os.path.join(f"{local_dir}/preprocess_pipeline", "preprocess_pipeline.joblib") + preprocess_pipeline_tar_path = os.path.join(f"{local_dir}/preprocess_pipeline", "preprocess_pipeline.tar.gz") + + logger.info("Saving preprocessor to {}".format(preprocess_pipeline_tar_path)) + joblib.dump(preprocessor, preprocess_pipeline_joblib_path) + with tarfile.open(preprocess_pipeline_tar_path, "w:gz") as tar: + tar.add(preprocess_pipeline_joblib_path, + arcname=os.path.basename(preprocess_pipeline_joblib_path)) + + logger.info("Saving train data to {}".format(train_output_path)) + combined_train.to_csv(train_output_path, index=False) + + logger.info("Saving validation data to {}".format(validation_output_path)) + combined_val.to_csv(validation_output_path, index=False) + + logger.info("Saving test data to {}".format(test_output_path)) + test_data.to_parquet(test_output_path, index=False) + + +def input_fn(input_data, content_type): + try: + if content_type == "text/csv": + return pd.read_csv(StringIO(input_data), header=0) + else: + raise ValueError(f"Input fn: {content_type} not supported by script!") + except Exception as e: + logger.error(f"Input fn: An error occurred: {e} and the data is {input_data}") + + +def output_fn(prediction, accept): + try: + if accept == "text/csv": + return worker.Response(encoders.encode(prediction, accept), mimetype=accept) + else: + raise RuntimeError(f"Output fn: {accept} accept type is not supported by this script.") + except Exception as e: + logger.error(f"Output fn: An error occurred: {e} and the data is: {pd.DataFrame(prediction).to_csv(index=False)}") + + +def predict_fn(input_data, model): + try: + list_columns = ['common_working_hours', + 'common_employment_types', + 'jobseeker_preference_employment_types', + 'jobseeker_preference_working_hours'] + for col in list_columns: + input_data[col] = input_data[col].str.replace("'", "\"").map(json.loads) + input_data.loc[:,['job_allows_easy_apply', 'job_open_for_career_changers']] = input_data.loc[:,['job_allows_easy_apply', 'job_open_for_career_changers']].astype(float) + return np.concatenate([model.transform(input_data), input_data[additonal_cols].to_numpy()], axis=1) + except Exception as e: + logger.error(f"Predict fn: An error occurred: {e} and the data is: {input_data.to_csv(index=False)}") + + +def model_fn(model_dir): + try: + preprocess_pipeline = joblib.load(os.path.join(model_dir, "preprocess_pipeline.joblib")) + return preprocess_pipeline + except Exception as e: + logger.error(f"Model fn: An error 
occurred while loading the preprocess_pipeline: {e}") diff --git a/training_pipeline/src/personalization/utils/requirements.txt b/training_pipeline/src/personalization/utils/requirements.txt new file mode 100644 index 0000000..85b1625 --- /dev/null +++ b/training_pipeline/src/personalization/utils/requirements.txt @@ -0,0 +1,10 @@ +gensim==4.3.2 +sagemaker-training==4.7.4 +numpy==1.24 +pandas==2.0 +pyarrow==14.0.2 +nltk==3.7 +scikit-learn==1.2.1 +joblib==1.3.2 +beautifulsoup4==4.12.2 +sagemaker_containers==2.8.6 \ No newline at end of file diff --git a/training_pipeline/src/personalization/utils/transform_features.py b/training_pipeline/src/personalization/utils/transform_features.py new file mode 100644 index 0000000..fca6ad2 --- /dev/null +++ b/training_pipeline/src/personalization/utils/transform_features.py @@ -0,0 +1,501 @@ +import re +import json +import datetime +import numpy as np +import pandas as pd +from gensim.models import Word2Vec +from rapidfuzz import fuzz +from sklearn.base import BaseEstimator, TransformerMixin +from sklearn.preprocessing import MultiLabelBinarizer, MinMaxScaler + +employment_mapping = { + "apprenticeship": "apprenticeship", + "azubi": "apprenticeship", + "trainee": "trainee", + "permanent employment": "permanent_employment", + "festanstellung": "permanent_employment", + "self employment": "self_employment", + "selbständig": "self_employment", + "internship": "internship", + "praktikum": "internship", + "working student": "working_student", + "werkstudent": "working_student", + "dual study": "dual_study", + "duales studium": "dual_study", + "marginal employment": "marginal_employment", + "geringfügige beschäftigung": "marginal_employment", + "minijob": "mini_job", + "secondary activity": "secondary_activity", + "nebentätigkeit": "secondary_activity", + "temporary help": "temporary_help", + "aushilfe": "temporary_help", + "other": "other", + "andere": "other" +} + +# Mapping from German and English phrases to desired English forms +working_hours_mapping = { + "teilzeit": "part_time", + "vollzeit": "full_time", + "part-time": "part_time", + "full-time": "full_time", + "part": "part_time", + "full": "full_time", + "time": "part_time" +} + +num_cols = ["jobseeker_preference_salary_min", + "job_salary_amount", + "job_salary_range_min", + "job_salary_range_max" + ] + +def dict_to_list(x): + if pd.isna(x): # Check if the value is NaN + return [] + result = [f'{elem.strip()}' for elem in str(x).strip('{}').split(',') if elem.strip() != ""] + return result + +# Adjusted function to convert a string list of employment types to a list of enum values +def convert_to_enum(string_list): + list_of_strings = json.loads(string_list.replace("'", "\"")) + return [employment_mapping[item.lower()] for item in list_of_strings] + +# Function to convert list of German/English phrases to list of desired English forms +def convert_to_english(string_list): + list_of_strings = json.loads(string_list.replace("'", "\"")) + result = [working_hours_mapping[item.lower()] for item in list_of_strings] + return result if result else [] + +def handle_nan_and_split(series): + return series.fillna('').str.replace('full_time_part_time', + 'full_time,part_time').str.split(',').apply(lambda x: [] if x == [''] else x) + +def process_dataframe(df): + + df.loc[:,'job_allows_easy_apply'] = (df.loc[:,'job_allows_easy_apply'] == 't') + df.loc[:,'job_open_for_career_changers'] = (df.loc[:,'job_open_for_career_changers'] == True) + + df.loc[:,'job_kldb_code'] = 
df.loc[:,'job_kldb_code'].fillna(0).astype(int).astype(str).str.zfill(4) + df.loc[:,'jobseeker_preference_job_title_kldb_code'] = df.loc[:,'jobseeker_preference_job_title_kldb_code'].fillna(0).astype(int).astype(str).str.zfill(4) + df.loc[:,'job_product_type'] = df.loc[:,'job_product_type'].apply(lambda x: x.lower() if isinstance(x, str) else x) + df.loc[:,'job_origin'] = df.loc[:,'job_origin'].apply(lambda x: x.lower() if isinstance(x, str) else x) + + df.loc[:,'jobseeker_preference_employment_types'] = df.loc[:,'jobseeker_preference_employment_types'].apply(dict_to_list) + df.loc[:,'job_employment_types'] = df.loc[:,'job_employment_types'].apply(dict_to_list) + + df.loc[:,'last_feed_employment_type_filter'] = df.loc[:,'last_feed_employment_type_filter'].apply(convert_to_enum) + df.loc[:,'last_feed_working_hours_filter'] = df.loc[:,'last_feed_working_hours_filter'].apply(convert_to_english) + + df.loc[:,'jobseeker_preference_working_hours'] = handle_nan_and_split(df.loc[:,'jobseeker_preference_working_hours']) + df.loc[:,'job_working_hours'] = handle_nan_and_split(df.loc[:,'job_working_hours']) + + df.loc[:,'common_employment_types'] = df.apply(lambda row: list(set(row['job_employment_types'] + row['last_feed_employment_type_filter'])), axis=1) + df.loc[:,'common_working_hours'] = df.apply(lambda row: list(set(row['job_working_hours'] + row['last_feed_working_hours_filter'])), axis=1) + + max_date = datetime.date.today() + scale = 60 + df.loc[:,'gauss_recency'] = df.loc[:,'derived_tstamp'].apply(lambda date: np.round(np.exp(- ((max_date - date).days ** 2 / (2 * scale ** 2))),4)) + + df.loc[:, num_cols] = df.loc[:, num_cols].fillna(0.0) + + return df + + +def process_traindata_only(df): + + three_months_ago = df.loc[:,'derived_tstamp'].max() - datetime.timedelta(days=90) + top_50_company_uids = df[df['derived_tstamp'] >= three_months_ago]['job_company_uid'].value_counts().nlargest(50).index + + df.loc[~df['job_company_uid'].isin(top_50_company_uids), 'job_company_uid'] = 'other' + return df + + +class SimilarityScoreCalculator(BaseEstimator, TransformerMixin): + def __init__(self): + self.legal_entity = [ + "a.g", + "a. g", + "ag", + "co", + "de", + "e.k", + "e. k", + "ek", + "e.Kfr", + "e.u", + "e. u", + "eu", + "e.v", + "e. v", + "ev", + "e.g", + "e. g", + "eg", + "ewiv", + "gag", + "gbr", + "germany", + "gesbr", + "ges. mb h", + "gmbh", + "ggmbh", + "group", + "gesbr", + "gesmbh", + "ges. 
mb h", + "g-reit", + "holding", + "i.g", + "inc", + "invag", + "keg", + "kdör", + "kg", + "k.i.e.ö.r", + "kgaa", + "ohg", + "mbh", + "partg", + "plc", + "sce", + "se", + "ug", + "vvag", + ] + + self.stop_words = [ + "ab", + "und", + "e", + "mit", + "auch", + "am", + "ohne", + "die", + "zum", + "b", + "für", + "im", + "in", + "gn", + "der", + "all genders", + "als", + "area", + "basis", + "befristet", + "bei", + "bewerben", + "bonus", + "branchenzuschlägen", + "brutto", + "bundesweit", + "chance neu durchzustarten", + "company confidential", + "da", + "daz", + "dazuv", + "dazuver", + "dazuverdie", + "dazuverdien", + "dazuverdiene", + "dazuverdienen", + "dein", + "direktvermittlung", + "dringend", + "dtm2FULL-TIME", + "eur", + "eur/h", + "euro", + "fahrgeld", + "festanstellung", + "für die region", + "gehalt", + "gesucht", + "großer", + "großraum", + "hamburg", + "im raum", + "in 3 minuten erfolgreich", + "in teilzeit", + "inhouse", + "jahresdurchschnitt", + "jetzt", + "job", + "m w d", + "mgl", + "mind", + "monat", + "mwd", + "neu", + "neuer", + "oder", + "pro", + "pro stunde", + "pro", + "prämie", + "ref", + "refnr", + "remote", + "schichten", + "schichtzuschlägen", + "sehr gutes", + "signing", + "sofort", + "st", + "standort", + "standorte", + "startprämie", + "std", + "std br beginnend", + "stunde", + "stunde", + "stundenlohn", + "tage", + "team", + "top", + "unbefristet", + "unbefristete", + "up to", + "urlaub", + "verdienst", + "von", + "wechseln", + "with growth opportunities", + ] + + self.size = 100 + self.window = 25 + self.min_count = 1 + self.workers = 4 + self.epochs = 10 + + # remove alphanumeric words like J123, 123, 123-123, 1,000 from text + self.pattern0 = re.compile(r"(\b[a-zA-Z]?\d+(,|\.|-)?(?!:|th|[rn]d|st)(\w+)?)", re.IGNORECASE) + # remove m/f | m/f/d | m/f/div patterns from text + self.pattern1 = re.compile(r"\b(\w{1})([\/|\||\*])(\w{1})(([\/|\||\*]))?(\b\w{0,7})?(([\/|\||\*]))?(\b\w{1})?\b") + # remove circular brackets from text + self.pattern2 = re.compile(r"\&|\"|\,|\(|\)") + # remove multiple continuous white spaces from text + self.pattern3 = re.compile(r" +") + # remove special characters other than alphabets from pre / suffix of the text + self.pattern4 = re.compile(r"^\W+|\W+$|\s+\.\s+|\-\s+\-") + # remove words at the enf of the text + self.pattern5 = re.compile( + r"((? str: + """ + This function identifies patterns and removes it from the text + :param text: + :param regex_patterns: + :return: clean text + """ + for pattern in regex_patterns: + if re.findall(pattern, text): + text = re.sub(pattern, " ", text) + return text.strip() + + def clean_text(self, text: str) -> str: + """ + preprocesing function to remove special chars, numbers & some unwanted text like (m/f/d) combinations from text + The function retains "/" & "-" in the text and removes multiple white spaces. 
It also removes the stopwords + and german company legal entities from text + :param text: + :return: + """ + + clean_txt = self.remove_patterns(text, self.title_regex_preprocess) + clean_txt = self.remove_patterns(clean_txt, self.title_remove_stopwords) + clean_txt = self.remove_patterns(clean_txt, self.company_legal_entity_regex) + clean_txt = self.remove_patterns(clean_txt, self.title_regex_postprocess) + return clean_txt.lower().split(" ") + + def pre_calculate_vector(self, words): + words = words.split() + vectors = [self.word_vectors_dict.get(word) for word in words if self.word_vectors_dict.get(word) is not None] + return np.nanmean(vectors, axis=0) if vectors else np.zeros(self.size) + + def fit(self, X, y=None): + + X['target_w2v'] = y + X = X[(X['target_w2v'] == 1)] + X.drop(['target_w2v'], axis=1, inplace=True) + X = X.fillna("") + + X['job_title'] = X['job_title'].astype(str).apply(self.clean_text) + X['jobseeker_preference_job_title_b2c'] = X['jobseeker_preference_job_title_b2c'].apply(lambda row: [val.lower().split('/')[0] + for val in str(row).split() + if len(val) > 1]) + X['last_feed_search_query'] = X['last_feed_search_query'].apply(lambda row: [val.lower().split('/')[0] + for val in str(row).split() + if len(val) > 1]) + X['job_title'] = X['job_title'].apply(lambda row: [val.lower() for val in row if len(val) > 1]) + X['combined'] = X.apply(lambda row: list(set(i for sublist in [row['jobseeker_preference_job_title_b2c'], + row['last_feed_search_query'], + row['job_title']] for i in sublist if i)), axis=1) + + df = X.groupby('jobseeker_uid').agg({'combined': 'sum'}).reset_index() + self.model = Word2Vec(sentences=df['combined'].tolist(), + vector_size=self.size, + window=self.window, + min_count=self.min_count, + workers=self.workers, + epochs=self.epochs) + + unique_b2c_queries = X.apply(lambda row: ' '.join(row['jobseeker_preference_job_title_b2c'] + row['last_feed_search_query']), axis=1).unique() + # Pre-calculate the vectors for unique values of jobseekers and all the words + self.word_vectors_dict = {word: self.model.wv[word] for word in self.model.wv.index_to_key} + self.precalculated_b2c_query_vectors = {value: self.pre_calculate_vector(value) for value in unique_b2c_queries} + return self + + def process_row(self, row): + return " ".join([val.lower().split('/')[0] for val in str(row).split() if len(val) > 1]) + + def calculate_similarity(self, row): + if np.isnan(np.sum(row['jobseeker_b2c_queries_vec'])) or np.isnan(np.sum(row['job_title_vec'])): + return 0.0 + norm_product = np.linalg.norm(row['jobseeker_b2c_queries_vec']) * np.linalg.norm(row['job_title_vec']) + return np.round(np.dot(row['jobseeker_b2c_queries_vec'], row['job_title_vec']) / norm_product, 4) if norm_product > 0 else 0.0 + + def transform(self, X): + X = X.fillna("") + X['job_title'] = X['job_title'].str.lower() + X['jobseeker_preference_job_title_b2c'] = X['jobseeker_preference_job_title_b2c'].apply(self.process_row) + X['last_feed_search_query'] = X['last_feed_search_query'].apply(self.process_row) + X['jobseeker_b2c_queries'] = X['jobseeker_preference_job_title_b2c'] + " " + X['last_feed_search_query'] + X['jobseeker_b2c_queries'] = X['jobseeker_b2c_queries'].str.strip() + + X['jobseeker_b2c_queries_vec'] = [self.precalculated_b2c_query_vectors.get( + query, + np.nanmean([ + self.word_vectors_dict.get(word) + for word in query.split(" ") + if self.word_vectors_dict.get(word) is not None + ], axis=0) if query.split() else np.zeros(self.size)) + for query in X['jobseeker_b2c_queries'] + ] + 
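+        # The query vectors above come from the cache pre-built in fit(), with an
+        # np.nanmean fallback for queries not seen during fitting; the same averaging is
+        # applied to each job title below before the cosine (w2v) and rapidfuzz
+        # partial_ratio (fuzziness) similarity scores are derived from the two vectors.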
X['job_title_vec'] = [np.nanmean([ + self.word_vectors_dict.get(word) + for word in query.split(" ") + if self.word_vectors_dict.get(word) is not None + ], axis=0) if query.split() else np.zeros(self.size) + for query in X['job_title'] + ] + + X['w2v_similarity_score'] = X[['jobseeker_b2c_queries_vec','job_title_vec']].apply(self.calculate_similarity, axis=1) + X['fuzziness_similarity_score'] = [round(fuzz.partial_ratio(query1, query2)/100, 4) for query1, query2 in zip(X['jobseeker_b2c_queries'], X['job_title'])] + return X[['w2v_similarity_score', 'fuzziness_similarity_score']].to_numpy() + + def get_feature_names_out(self, input_features=None): + # Generate and return feature names based on input_features or any other logic + return ['w2v_similarity_score', 'fuzziness_similarity_score'] + + +class MultiLabelBinarizerTransformer(BaseEstimator, TransformerMixin): + """ + Wraps `MultiLabelBinarizer` in a form that can work with `ColumnTransformer` + """ + + def __init__(self): + self.mlbs = {} + + def fit(self, X, y=None): + for col in X.columns: + mlb = MultiLabelBinarizer() + mlb.fit(X[col]) + self.mlbs[col] = mlb + return self + + def transform(self, X): + transformed_cols = [self.mlbs[col].transform(X[col]) for col in X.columns] + return np.concatenate(transformed_cols, axis=1) + + def get_feature_names_out(self, input_features=None): + feature_names_out = [] + for col in self.mlbs: + feature_names_out.extend([f'{col}_{class_}' for class_ in self.mlbs[col].classes_]) + return feature_names_out + + +class DistanceCalculator(BaseEstimator, TransformerMixin): + def __init__(self): + self.scaler = MinMaxScaler() + + def fit(self, X, y=None): + # compute the Euclidean distance + X['distance'] = X.apply(lambda row: self.euclidean_distance(row['jobseeker_preference_lng'], + row['jobseeker_preference_lat'], + row['job_lng'], + row['job_lat']), axis=1) + + # if any column is 0, set distance to a large number (e.g., the maximum distance observed) + X.loc[(X['jobseeker_preference_lat'] == 0) | (X['jobseeker_preference_lng'] == 0) | (X['job_lat'] == 0) | (X['job_lng'] == 0), 'distance'] = X['distance'].max() + + # fit the MinMaxScaler to the distances + self.scaler.fit(X[['distance']]) + + return self + + def euclidean_distance(self, lon1, lat1, lon2, lat2): + """ + Calculate the Euclidean distance between two points + on the earth (specified in decimal degrees) + """ + return ((lon2 - lon1)**2 + (lat2 - lat1)**2)**0.5 + + def transform(self, X): + # compute the Euclidean distance + X['distance'] = X.apply(lambda row: self.euclidean_distance(row['jobseeker_preference_lng'], + row['jobseeker_preference_lat'], + row['job_lng'], + row['job_lat']), axis=1) + max_distance = X['distance'].max() + # if any column is 0, set distance to a large number (e.g., the maximum distance observed) + X.loc[(X['jobseeker_preference_lat'] == 0) | (X['jobseeker_preference_lng'] == 0) | (X['job_lat'] == 0) | (X['job_lng'] == 0), 'distance'] = max_distance + + # normalize between 0 and 1 using the MinMaxScaler fitted during the fit method + X['distance_normalized'] = self.scaler.transform(X[['distance']]) + + return X[['distance_normalized']].to_numpy() + + def get_feature_names_out(self, input_features=None): + return ["distance"] + + +class KldbTransformer(BaseEstimator, TransformerMixin): + """Preprocesses and extracts the first N digits from a Kldb code""" + def fit(self, X, y=None): + # store the input feature names + self.input_features_ = X.columns + return self + + def transform(self, X): + X_transformed = 
pd.DataFrame() + for col in X.columns: + X_1_digit = X[col].str.slice(0, 1) # first digit + X_2_digits = X[col].str.slice(0, 2) # first two digits + X_transformed = pd.concat([X_transformed, + pd.DataFrame({f'{col}_{n}_digit': X_n_digits for n, X_n_digits in zip(range(2, 0, -1), [X_2_digits, X_1_digit])})], axis=1) + return X_transformed + + def get_feature_names_out(self, input_features=None): + # use the stored feature names if input_features is not provided + return [f'{col}_{n}_digit' for col in self.input_features_ for n in range(2, 0, -1)] diff --git a/training_pipeline/src/personalization/xgboost_evaluate-encoder.py b/training_pipeline/src/personalization/xgboost_evaluate-encoder.py new file mode 100644 index 0000000..a2257d8 --- /dev/null +++ b/training_pipeline/src/personalization/xgboost_evaluate-encoder.py @@ -0,0 +1,115 @@ +import sys +import subprocess + +def install(package): + subprocess.check_call([sys.executable, "-q", "-m", "pip", "install", package]) + +install('gensim==4.3.2') +install('rapidfuzz==3.6.1') +install('scikit-learn==1.0.2') +install('category_encoders==2.6.3') + + +import argparse +import pathlib +import boto3 +import logging +import os +import json +import joblib +import pickle +import re +import ast +import tarfile +import pandas as pd +import numpy as np +import datetime +import xgboost as xgb +from io import StringIO +from ast import literal_eval +from sklearn.compose import ColumnTransformer +from sklearn.pipeline import Pipeline +from sklearn.impute import SimpleImputer +from sagemaker_containers.beta.framework import worker +from sklearn.metrics import roc_auc_score +from category_encoders import OrdinalEncoder +from utils.transform_features import * + +additonal_cols = ['job_allows_easy_apply', + 'job_open_for_career_changers', + 'jobseeker_preference_salary_min', + 'job_salary_amount', + 'job_salary_range_min', + 'job_salary_range_max' + ] + +# Create a list of columns to drop from the dataset +columns_to_drop = ['jdp_view_count', + 'job_bookmark_count', + 'application_start_count', + 'application_sent_count', + 'application_submit_count', + 'gold_application_sent_count'] + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +logger.addHandler(logging.StreamHandler()) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--include-jobseeker-uid-in-training", type=str, default="yes") + + args = parser.parse_args() + logger.info("Loaded parser arguments") + + model_path = "/opt/ml/processing/model/model.tar.gz" + with tarfile.open(model_path) as tar: + tar.extractall(path=".") + + logger.debug("Loading xgboost model.") + # The name of the file should match how the model was saved in the training script + # model = pickle.load(open("xgboost-model", "rb")) + model = xgb.Booster() + model.load_model('xgboost-model') + + logger.debug("Loading preprocess transformer.") + # The name of the file should match how the model was saved in the training script + preprocess_transformer_path = "/opt/ml/processing/preprocess_pipeline" + preprocessor = joblib.load(os.path.join(preprocess_transformer_path, "preprocess_pipeline.joblib")) + + logger.debug("Reading test data.") + test_local_path = "/opt/ml/processing/test/test.parquet" + data_test = pd.read_parquet(test_local_path) + + data_test = process_dataframe(data_test) + data_test = data_test.drop(columns=columns_to_drop, axis=1) + + # Extract test set target column + y_test = data_test['target'].values + X_test = pd.DataFrame(preprocessor.transform(data_test)) + # 
Concatenate X_train and y_train + X_test = pd.concat([X_test, data_test[additonal_cols]], + axis=1, ignore_index=True) + X_test = X_test.to_numpy() + + X_test = xgb.DMatrix(X_test) + + logger.info("Generating predictions for test data.") + pred = model.predict(X_test) + + # Calculate model evaluation score + logger.debug("Calculating ROC-AUC score.") + auc = roc_auc_score(y_test, pred) + metric_dict = { + "classification_metrics": {"roc_auc": {"value": auc}} + } + + # Save model evaluation metrics + output_dir = "/opt/ml/processing/evaluation" + pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) + + logger.info("Writing evaluation report with ROC-AUC: %f", auc) + evaluation_path = f"{output_dir}/evaluation.json" + with open(evaluation_path, "w") as f: + f.write(json.dumps(metric_dict)) diff --git a/training_pipeline/training_pipeline.py b/training_pipeline/training_pipeline.py index 3b17fdf..6d74f7f 100644 --- a/training_pipeline/training_pipeline.py +++ b/training_pipeline/training_pipeline.py @@ -8,322 +8,489 @@ import os import argparse from datetime import datetime +from aws_profiles import UserProfiles + + +import pandas as pd +import json import boto3 -from sagemaker.processing import ScriptProcessor -from sagemaker.pytorch.processing import PyTorchProcessor -from sagemaker.workflow.steps import ProcessingStep, TrainingStep -from sagemaker.processing import ProcessingInput, ProcessingOutput -from sagemaker.workflow.properties import PropertyFile -from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat -from sagemaker.model_metrics import MetricsSource, ModelMetrics +import pathlib +import io +import sagemaker +import time + +from sagemaker.model import Model +from sagemaker.sklearn import SKLearn, SKLearnModel +from sagemaker.xgboost import XGBoostModel +from sagemaker import PipelineModel + +from sagemaker.deserializers import CSVDeserializer +from sagemaker.serializers import CSVSerializer + +from sagemaker.xgboost.estimator import XGBoost +from sagemaker.estimator import Estimator +from sagemaker.sklearn.processing import SKLearnProcessor +from sagemaker.processing import ( + ProcessingInput, + ProcessingOutput, + ScriptProcessor +) +from sagemaker.inputs import TrainingInput + +from sagemaker.workflow.pipeline import Pipeline +from sagemaker.workflow.model_step import ModelStep +from sagemaker.workflow.functions import Join + +from sagemaker.workflow.steps import ( + ProcessingStep, + TrainingStep, + CreateModelStep, + CacheConfig +) +from sagemaker.workflow.check_job_config import CheckJobConfig +from sagemaker.workflow.parameters import ( + ParameterInteger, + ParameterFloat, + ParameterString, + ParameterBoolean +) +from sagemaker.workflow.clarify_check_step import ( + ModelBiasCheckConfig, + ClarifyCheckStep, + ModelExplainabilityCheckConfig +) +from sagemaker.workflow.step_collections import RegisterModel from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo +from sagemaker.workflow.properties import PropertyFile from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet -from sagemaker.workflow.pipeline import Pipeline -from sagemaker.inputs import TrainingInput -from sagemaker.huggingface import HuggingFaceProcessor, HuggingFace -from sagemaker.huggingface.model import HuggingFaceModel -from sagemaker.workflow.model_step import ModelStep -from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig -from sagemaker.workflow.pipeline_context import 
PipelineSession -from sagemaker.workflow.steps import CacheConfig -from aws_profiles import UserProfiles +from sagemaker.workflow.lambda_step import ( + LambdaStep, + LambdaOutput, + LambdaOutputTypeEnum, +) +from sagemaker.lambda_helper import Lambda + +from sagemaker.model_metrics import ( + MetricsSource, + ModelMetrics, + FileSource +) +from sagemaker.drift_check_baselines import DriftCheckBaselines + +from sagemaker.image_uris import retrieve + def get_pipeline(pipeline_name: str, profile_name: str, region: str) -> Pipeline: - session = ( + sess = ( boto3.Session(profile_name=profile_name) if profile_name else boto3.Session() ) - sagemaker_session = PipelineSession(boto_session=session) - account_id = session.client("sts").get_caller_identity().get("Account") + iam = sess.client("iam") + + # Fetch SageMaker execution role + # sagemaker_role = sagemaker.get_execution_role() + account_id = sess.client("sts").get_caller_identity().get("Account") + sagemaker_role = iam.get_role(RoleName=f"{account_id}-sagemaker-exec")["Role"]["Arn"] + + + # sagemaker_session = PipelineSession(boto_session=session) + # account_id = session.client("sts").get_caller_identity().get("Account") + + # iam = session.client("iam") + # role = iam.get_role(RoleName=f"{account_id}-sagemaker-exec")["Role"]["Arn"] + + # default_bucket = sagemaker_session.default_bucket() - iam = session.client("iam") - role = iam.get_role(RoleName=f"{account_id}-sagemaker-exec")["Role"]["Arn"] + # Docker images are located in ECR in 'operations' account + # operations_id = UserProfiles().get_profile_id("operations") + # custom_image_uri = ( + # f"{operations_id}.dkr.ecr.{region}.amazonaws.com/training-image:latest" + # ) - default_bucket = sagemaker_session.default_bucket() + # Set names of pipeline objects + pipeline_name = "Job-Personalization-Pipeline" - # Docker images are located in ECR in 'operations' account - operations_id = UserProfiles().get_profile_id("operations") - custom_image_uri = ( - f"{operations_id}.dkr.ecr.{region}.amazonaws.com/training-image:latest" - ) + # for xgboost + pipeline_model_name = "job-personalization-contextual-xgb" + model_package_group_name = "job-personalization-contextual-xgb-group" + base_job_name_prefix = "job-personalization" + endpoint_config_name = f"{pipeline_model_name}-endpoint-config" + endpoint_name = 'job-personalization-model-contextual-xgb' - model_path = f"s3://{default_bucket}/model" - data_path = f"s3://{default_bucket}/data" - model_package_group_name = f"{pipeline_name}ModelGroup" - model_package_group_arn = ( - f"arn:aws:sagemaker:{region}:{account_id}:" - f"model-package/{model_package_group_name}" - ) + # Set data parameters + target_col = "target" + + # important default training parameters (in sprint format) + train_validation_ratio = "0.95" + test_day_span = "3" + negative_sample_ratio = "2" + random_negative_sample_multiplier = "2" + include_jobseeker_uid_in_training = "no" + include_jdp_view_as_target = "no" - gpu_instance_type = "ml.g4dn.xlarge" - cpu_instance_type = "ml.m5.large" - pytorch_version = "1.9.0" - transformers_version = "4.11.0" - py_version = "py38" - requirement_dependencies = ["images/train/requirements.txt"] + # Set instance types and counts + process_instance_type = "ml.t3.2xlarge" + train_instance_count = 1 + train_instance_type = "ml.m5.4xlarge" - cache_config = CacheConfig(enable_caching=False, expire_after="30d") + # enable caching + cache_config = CacheConfig(enable_caching=False, expire_after="PT5H") - trial_name = "trial-run-" + 
datetime.now().strftime("%d-%m-%Y--%H-%M-%S") - pipeline_experiment_config = PipelineExperimentConfig(pipeline_name, trial_name) # ====================================================== # Define Pipeline Parameters # ====================================================== - epoch_count = ParameterInteger(name="epochs", default_value=1) - batch_size = ParameterInteger(name="batch_size", default_value=10) - learning_rate = ParameterFloat(name="learning_rate", default_value=1e-5) + # Set up pipeline input parameters + + # Set timestamp param + timestamp_param = ParameterString( + name="timestamp", default_value="2024-03-11T11:11:11Z" + ) + + # Set timestamp param + bucket_param = ParameterString( + name="bucket", default_value="heyjobs-job-recommendations-production" + ) + + # Setup important parameters + train_validation_ratio_param = ParameterString( + name="train_validation_ratio", + default_value=train_validation_ratio, + ) + + test_day_span_param = ParameterString( + name="test_day_span", + default_value=test_day_span, + ) + + negative_sample_ratio_param = ParameterString( + name="negative_sample_ratio", + default_value=negative_sample_ratio, + ) + + random_negative_sample_multiplier_param = ParameterString( + name="random_negative_sample_multiplier", + default_value=random_negative_sample_multiplier, + ) + + include_jobseeker_uid_in_training_param = ParameterString( + name="include_jobseeker_uid_in_training", + default_value=include_jobseeker_uid_in_training, + ) + + include_jdp_view_as_target_param = ParameterString( + name="include_jdp_view_as_target", + default_value=include_jdp_view_as_target, + ) + + # Set processing instance type + process_instance_type_param = ParameterString( + name="ProcessingInstanceType", + default_value=process_instance_type, + ) + + # Set training instance type + train_instance_type_param = ParameterString( + name="TrainingInstanceType", + default_value=train_instance_type, + ) + + # Set training instance count + train_instance_count_param = ParameterInteger( + name="TrainingInstanceCount", + default_value=train_instance_count + ) + + # Set model approval param + model_approval_status_param = ParameterString( + name="ModelApprovalStatus", default_value="Approved" + ) + + # Set model deployment param + model_deployment_param = ParameterString( + name="ModelDeploymentlStatus", default_value="no" + ) # ====================================================== # Step 1: Load and preprocess the data # ====================================================== - script_preprocess = PyTorchProcessor( - framework_version="1.8", - instance_type=cpu_instance_type, - image_uri=custom_image_uri, + # Define the SKLearnProcessor configuration + sklearn_processor = SKLearnProcessor( + framework_version="1.0-1", + role=sagemaker_role, instance_count=1, - base_job_name="preprocess-script", - role=role, - sagemaker_session=sagemaker_session, + instance_type=process_instance_type, + base_job_name=f"{base_job_name_prefix}-processing", ) - preprocess_step_args = script_preprocess.run( + # Define pipeline processing step + process_step = ProcessingStep( + name="DataProcessing", + processor=sklearn_processor, inputs=[ - ProcessingInput( - source=os.path.join(data_path, "train.csv"), - destination="/opt/ml/processing/input/train", - ), - ProcessingInput( - source=os.path.join(data_path, "test.csv"), - destination="/opt/ml/processing/input/test", - ), - ProcessingInput( - source=os.path.join(data_path, "val.csv"), - destination="/opt/ml/processing/input/val", - ), + 
ProcessingInput(source='personalization/src/utils/', destination="/opt/ml/processing/input/code/utils/") ], outputs=[ ProcessingOutput( - output_name="train", source="/opt/ml/processing/output/train" + destination=Join(on='/', values=["s3:/", bucket_param, "job-personalization/processing_jobs", timestamp_param, "train_data"]), + output_name="train_data", + source="/opt/ml/processing/train" ), ProcessingOutput( - output_name="test", source="/opt/ml/processing/output/test" + destination=Join(on='/', values=["s3:/", bucket_param, "job-personalization/processing_jobs", timestamp_param, "validation_data"]), + output_name="validation_data", + source="/opt/ml/processing/val") + , + ProcessingOutput( + destination=Join(on='/', values=["s3:/", bucket_param, "job-personalization/processing_jobs", timestamp_param, "test_data"]), + output_name="test_data", + source="/opt/ml/processing/test" ), - ProcessingOutput(output_name="val", source="/opt/ml/processing/output/val"), + ProcessingOutput( + destination=Join(on='/', values=["s3:/", bucket_param, "job-personalization/processing_jobs", timestamp_param, "preprocess_pipeline"]), + output_name="preprocess_pipeline", + source="/opt/ml/processing/preprocess_pipeline" + ) + ], + job_arguments=[ + "--train-validation-ratio", train_validation_ratio_param, + "--test-day-span", test_day_span_param, + "--negative-sample-ratio", negative_sample_ratio_param, + "--random-negative-sample-multiplier", random_negative_sample_multiplier_param, + "--include-jobseeker-uid-in-training", include_jobseeker_uid_in_training_param, + "--include-jdp-view-as-target", include_jdp_view_as_target_param ], - code="preprocess.py", - source_dir="src", + code="personalization/src/sk_preprocess-encoder_new.py", + cache_config=cache_config ) + # ====================================================== + # Step 2: Train model + # ====================================================== - step_preprocess = ProcessingStep( - name="preprocess-data", - step_args=preprocess_step_args, - cache_config=cache_config, + # Retrieve training image + xgboost_training_image = retrieve(framework="xgboost", + region=region, + version="1.7-1", + image_scope="training") + + # Set XGBoost model hyperparameters + hyperparams = { + "eval_metric" : "auc", + "objective": "binary:logistic", + "num_round": "250", + "max_depth": "15", + "subsample": "0.9", + "colsample_bytree": "0.9", + "eta": "0.1" + } + + estimator_output_uri = Join(on='/', values=["s3:/", bucket_param, "job-personalization/training_jobs", timestamp_param]) + + xgb_estimator = Estimator( + image_uri=xgboost_training_image, + instance_type=train_instance_type, + instance_count=train_instance_count, + output_path=estimator_output_uri, + code_location=estimator_output_uri, + role=sagemaker_role, ) + xgb_estimator.set_hyperparameters(**hyperparams) + + # Access the location where the preceding processing step saved train and validation datasets + # Pipeline step properties can give access to outputs which can be used in succeeding steps + s3_input_train = TrainingInput( + s3_data=process_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri, + content_type="text/csv" + ) + s3_input_validation = TrainingInput( + s3_data=process_step.properties.ProcessingOutputConfig.Outputs["validation_data"].S3Output.S3Uri, + content_type="text/csv" + ) + + # Set pipeline training step + train_step = TrainingStep( + name="XGBModelTraining", + estimator=xgb_estimator, + inputs={ + "train":s3_input_train, # Train channel + "validation": 
s3_input_validation # Validation channel + }, + cache_config=cache_config + ) # ====================================================== - # Step 2: Train Huggingface model and optionally finetune hyperparameter + # Step 3: Building sklearn pipeline model # ====================================================== - estimator = HuggingFace( - instance_type=gpu_instance_type, - instance_count=1, - source_dir="src", - entry_point="train.py", - sagemaker_session=sagemaker_session, - role=role, - output_path=model_path, - transformers_version=transformers_version, - pytorch_version=pytorch_version, - py_version=py_version, - dependencies=requirement_dependencies, - ) - - estimator.set_hyperparameters( - epoch_count=epoch_count, - batch_size=batch_size, - learning_rate=learning_rate, + scaler_model = SKLearnModel( + name="SKLearnPipelineModelXGB", + model_data=Join(on='/', values=["s3:/", bucket_param, "job-personalization/processing_jobs", timestamp_param, "preprocess_pipeline", "preprocess_pipeline.tar.gz"]), + role=sagemaker_role, + sagemaker_session=sess, + source_dir="personalization/src/", + entry_point="sk_preprocess-encoder_new.py", + framework_version="1.0-1", + ) + + scaler_model.env = {"SAGEMAKER_DEFAULT_INVOCATIONS_ACCEPT":"text/csv"} + + xgboost_inference_image = retrieve(framework="xgboost", + region=region, + version="1.7-1", + image_scope="inference") + + model_artifacts = train_step.properties.ModelArtifacts.S3ModelArtifacts + + xgboost_model = Model( + image_uri=xgboost_inference_image, + model_data=model_artifacts, + sagemaker_session=sess, + role=sagemaker_role, ) - step_train = TrainingStep( - name="train-model", - estimator=estimator, - cache_config=cache_config, - inputs={ - "train": TrainingInput( - s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs[ - "train" - ].S3Output.S3Uri, - content_type="text/csv", - ), - "test": TrainingInput( - s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs[ - "test" - ].S3Output.S3Uri, - content_type="text/csv", - ), - }, + pipeline_model = PipelineModel( + models=[scaler_model, xgboost_model], role=sagemaker_role, sagemaker_session=sess ) # ====================================================== - # Step 3: Evaluate model + # Step 4: Evaluate model # ====================================================== - script_eval = HuggingFaceProcessor( - instance_type=gpu_instance_type, - image_uri=custom_image_uri, + eval_processor = ScriptProcessor( + image_uri=xgboost_training_image, + command=["python3"], + instance_type="ml.m5.xlarge", instance_count=1, - base_job_name="eval-script", - role=role, - sagemaker_session=sagemaker_session, + base_job_name=f"{base_job_name_prefix}-model-eval", + sagemaker_session=sess, + role=sagemaker_role, ) - evaluation_report = PropertyFile( - name="EvaluationReport", output_name="evaluation", path="evaluation.json" + name="EvaluationReport", + output_name="evaluation", + path="evaluation.json", ) - eval_step_args = script_eval.run( + # Set model evaluation step + evaluation_step = ProcessingStep( + name="XGBModelEvaluate", + processor=eval_processor, inputs=[ ProcessingInput( - source=step_preprocess.properties.ProcessingOutputConfig.Outputs[ - "val" - ].S3Output.S3Uri, - destination="/opt/ml/processing/val", + # Fetch S3 location where train step saved model artifacts + source=train_step.properties.ModelArtifacts.S3ModelArtifacts, + destination="/opt/ml/processing/model", ), ProcessingInput( - source=step_train.properties.ModelArtifacts.S3ModelArtifacts, - 
destination="/opt/ml/processing/model", + source='personalization/src/utils/', + destination="/opt/ml/processing/input/code/utils/", + ), + ProcessingInput( + source=process_step.properties.ProcessingOutputConfig.Outputs["preprocess_pipeline"].S3Output.S3Uri, + destination="/opt/ml/processing/preprocess_pipeline", + ), + ProcessingInput( + # Fetch S3 location where processing step saved test data + source=process_step.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri, + destination="/opt/ml/processing/test", ), ], outputs=[ - ProcessingOutput( - output_name="evaluation", source="/opt/ml/processing/evaluation" - ), + ProcessingOutput(destination=Join(on='/', values=["s3:/", bucket_param, "job-personalization/model_eval", timestamp_param]), + output_name="evaluation", + source="/opt/ml/processing/evaluation"), ], - code="eval.py", - source_dir="src", - ) - - step_eval = ProcessingStep( - name="eval-model", - step_args=eval_step_args, + job_arguments=[ + "--include-jobseeker-uid-in-training", include_jobseeker_uid_in_training_param + ], + code="personalization/src/xgboost_evaluate-encoder.py", property_files=[evaluation_report], - cache_config=cache_config, + cache_config=cache_config ) # ====================================================== - # Step 4: Register model + # Step 5: Register model # ====================================================== - evaluation_s3_uri = "{}/evaluation.json".format( - step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] - ) - model_metrics = ModelMetrics( model_statistics=MetricsSource( - s3_uri=evaluation_s3_uri, + s3_uri=Join(on='/', values=["s3:/", bucket_param, "job-personalization/model_eval", timestamp_param, "evaluation.json"]), content_type="application/json", ) ) - model = HuggingFaceModel( - name="text-classification-model", - model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, - sagemaker_session=sagemaker_session, - source_dir="src", - entry_point="model.py", - dependencies=requirement_dependencies, - role=role, - transformers_version=transformers_version, - pytorch_version=pytorch_version, - py_version=py_version, + register_step = RegisterModel( + name="XGBRegisterModel", + model=pipeline_model, + content_types=["text/csv"], + response_types=["text/csv"], + inference_instances=["ml.t2.medium", "ml.t2.large", "ml.t2.xlarge", "ml.m5.large", "ml.m6g.large"], + model_metrics=model_metrics, + model_package_group_name=model_package_group_name, + approval_status=model_approval_status_param, ) - - step_register = ModelStep( - name="register-model", - step_args=model.register( - content_types=["text/csv"], - response_types=["text/csv"], - inference_instances=[gpu_instance_type, "ml.m5.large"], - transform_instances=[gpu_instance_type, "ml.m5.large"], - model_package_group_name=model_package_group_name, - model_metrics=model_metrics, - ), - ) - - # ====================================================== - # Step 5: Approve model - # ====================================================== - - script_approve = ScriptProcessor( - command=["python3"], - image_uri=custom_image_uri, - instance_type=cpu_instance_type, - instance_count=1, - base_job_name="script-approve", - role=role, - env={ - "model_package_version": step_register.properties.ModelPackageVersion.to_string(), - "model_package_group_arn": model_package_group_arn, - }, - sagemaker_session=sagemaker_session, - ) - - step_approve = ProcessingStep( - name="approve-model", - step_args=script_approve.run( - code="src/approve.py", - ), + 
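+    # RegisterModel publishes the two-container PipelineModel (sklearn preprocessing
+    # transformer followed by the XGBoost booster) to the model package group, with the
+    # ROC-AUC metrics produced by the evaluation step attached as model metrics.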
pipeline_model.register( + content_types=["text/csv"], + response_types=["text/csv"], + inference_instances=["ml.t2.medium", "ml.t2.large", "ml.t2.xlarge", "ml.m5.large", "ml.m6g.large"], + model_package_group_name=model_package_group_name, + approval_status=model_approval_status_param, ) # ====================================================== # Step 6: Condition for model approval status # ====================================================== + # Evaluate model performance on test set cond_gte = ConditionGreaterThanOrEqualTo( left=JsonGet( - step_name=step_eval.name, + step_name=evaluation_step.name, property_file=evaluation_report, - json_path="metrics.accuracy.value", + json_path="classification_metrics.roc_auc.value", ), - right=0.1, + right=0.6, # Threshold to compare model performance against ) - step_cond = ConditionStep( - name="accuracy-check", + condition_step = ConditionStep( + name="CheckPersonalizationModelXGBEvaluation", conditions=[cond_gte], - if_steps=[step_approve], - else_steps=[], + if_steps=[register_step], + else_steps=[] ) # ====================================================== # Final Step: Define Pipeline # ====================================================== + # Create the Pipeline with all component steps and parameters pipeline = Pipeline( name=pipeline_name, - parameters=[ - epoch_count, - batch_size, - learning_rate, - ], + parameters=[timestamp_param, + bucket_param, + process_instance_type_param, + train_instance_type_param, + train_instance_count_param, + model_approval_status_param, + train_validation_ratio_param, + test_day_span_param, + negative_sample_ratio_param, + random_negative_sample_multiplier_param, + include_jobseeker_uid_in_training_param, + include_jdp_view_as_target_param], steps=[ - step_preprocess, - step_train, - step_register, - step_eval, - step_cond, + process_step, + train_step, + evaluation_step, + condition_step ], - sagemaker_session=sagemaker_session, - pipeline_experiment_config=pipeline_experiment_config, + sagemaker_session=sess ) - return pipeline
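
Minimal usage sketch (not part of the patch) showing how this pipeline definition could be upserted and started. The role ARN lookup mirrors get_pipeline() above; the region and the parameter values passed to start() are placeholder assumptions.

    import boto3
    from datetime import datetime, timezone

    region = "eu-central-1"  # assumption: replace with the target region
    pipeline = get_pipeline(pipeline_name="Job-Personalization-Pipeline",
                            profile_name=None, region=region)

    account_id = boto3.client("sts").get_caller_identity()["Account"]
    role_arn = f"arn:aws:iam::{account_id}:role/{account_id}-sagemaker-exec"

    pipeline.upsert(role_arn=role_arn)
    execution = pipeline.start(parameters={
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "train_validation_ratio": "0.95",
        "test_day_span": "3",
    })
    execution.wait()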