22 changes: 22 additions & 0 deletions examples/utils/file_utils.py
@@ -2,6 +2,11 @@
from typing import Optional
from typing import Union
import pandas as pd
from typing import Optional
from typing import Dict
from typing import Any
import yaml
import pkgutil


def get_root_dir() -> Path:
@@ -24,12 +29,29 @@ def read_csv(file_path: Path):
return df


def read_yaml(file_path: Path) -> Dict[str, Any]:
"""
Read a YAML file and return its contents as a dictionary.

:param file_path: Path to the YAML file.
:return data: Dictionary containing the YAML file contents.
"""
with open(file_path, "r") as file:
data = yaml.safe_load(file)
return data


def reader(extension: str) -> Optional[callable]:
"""
Get the appropriate reader function based on the file extension.

:param extension: File extension (e.g., '.csv').
:return reader_func: Function to read the file, or None if no reader is found.
"""
readers = {
".csv": read_csv,
".yaml": read_yaml,
".yml": read_yaml,
}

return readers.get(extension, None)
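Later files in this PR import a read_file helper from examples.utils.file_utils. Its body is not part of this diff; a minimal sketch of how such a helper could dispatch through the new reader() function is shown below (the name read_file is taken from the imports, everything else is an illustrative assumption):

from pathlib import Path
from typing import Any
from examples.utils.file_utils import reader  # extension-based dispatcher added above

def read_file(file_path: Path) -> Any:
    # Hypothetical sketch: resolve a reader from the file suffix and delegate to it.
    reader_func = reader(file_path.suffix)
    if reader_func is None:
        raise ValueError(f"No reader registered for extension '{file_path.suffix}'")
    return reader_func(file_path)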
52 changes: 27 additions & 25 deletions examples/walmart_sales_regression/base.py
@@ -19,13 +19,13 @@ class WalmartSalesRegressor(mlflow.pyfunc.PythonModel):
Custom MLflow model for sales regression.
"""

def __init__(self):
def __init__(self, config):
"""
Initialize the WalmartSalesRegressor.
"""
self.numerical_features = ["Holiday_Flag"]
self.categorical_features = ["Temperature", "Fuel_Price", "CPI", "Unemployment"]
self.target = "Weekly_Sales"
self.numerical_features = config["numerical_features"]
self.categorical_features = config["categorical_features"]
self.target = config["target"]
self.artifact_uris = {}

def load_context(self, context):
@@ -35,9 +35,11 @@ def load_context(self, context):
:param context: The context object containing the model.
:return: None
"""

model_artifacts = context.artifacts
self.models = {
store_id: mlflow.sklearn.load_model(uri)
for store_id, uri in self.artifact_uris.items()
for store_id, uri in model_artifacts.items()
}
print(f"Model artifact URIs loaded: {self.artifact_uris}")

@@ -77,10 +79,27 @@ def fit_model(self, x_train, y_train, store_id: int, run_id: str):
].iloc[0:1],
)
mlflow.log_params({"store_id": store_id})
self.artifact_uris[store_id] = (
self.artifact_uris[str(store_id)] = (
f"runs:/{run.info.run_id}/model_store_{store_id}"
)

def predict(self, context, model_input, params=None):
"""
Perform prediction using the model.

:param context: The context object containing the model.
:param model_input: Input data for prediction.
:param params: Additional parameters for prediction.
:return: Predicted values.
"""
if params is not None:
store_id = params.get("store_id", "1")
if store_id not in self.artifact_uris.keys():
raise ValueError(f"Model for store ID {store_id} not found.")
return self._predict(store_id, model_input)
else:
return self._predict(None, model_input)

def _get_model_signature(self) -> ModelSignature:
"""
Get the model signature for the MLflow model.
@@ -96,7 +115,7 @@ def _get_model_signature(self) -> ModelSignature:
]

param_specification = [
ParamSpec(dtype="integer", name="store_id", default=1),
ParamSpec(dtype="string", name="store_id", default="1"),
]
param_schema = ParamSchema(
params=param_specification,
@@ -140,24 +159,7 @@ def _get_sklearn_pipeline(

return pipeline

def predict(self, context, model_input, params=None):
"""
Perform prediction using the model.

:param context: The context object containing the model.
:param model_input: Input data for prediction.
:param params: Additional parameters for prediction.
:return: Predicted values.
"""
if params is not None:
store_id = params.get("store_id", 1)
if store_id not in self.artifact_uris.keys():
raise ValueError(f"Model for store ID {store_id} not found.")
return self._predict(store_id, model_input)
else:
return self._predict(None, model_input)

def _predict(self, store_id: Optional[int], x):
def _predict(self, store_id: Optional[str], x):
"""
Predicts the target variable using the fitted model.

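With store_id now declared as a string param in the model signature, calling a loaded pyfunc model looks like this (the model URI follows configs.yaml; the feature values are placeholders):

import mlflow
import pandas as pd

# Illustrative usage: load the registered pyfunc model and route to one store's pipeline.
model = mlflow.pyfunc.load_model("models:/walmart-store-sales-regressor@production")
sample = pd.DataFrame(
    {
        "Holiday_Flag": [0],
        "Temperature": [42.3],
        "Fuel_Price": [2.57],
        "CPI": [211.1],
        "Unemployment": [8.1],
    }
)
# store_id must be passed as a string to match ParamSpec(dtype="string", name="store_id").
predictions = model.predict(sample, params={"store_id": "2"})
print(predictions)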
14 changes: 14 additions & 0 deletions examples/walmart_sales_regression/configs.yaml
@@ -0,0 +1,14 @@
numerical_features:
- "Temperature"
- "Fuel_Price"
- "CPI"
- "Unemployment"

categorical_features:
- Holiday_Flag

target: "Weekly_Sales"
# Model configuration
registered_model_name: "walmart-store-sales-regressor"
run_name: "walmart-sales-regressors"
artifact_path: "store-sales-regressor"
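Loaded back with the read_yaml helper added earlier in this PR, the file becomes a plain dict keyed by these names, for example:

from examples.utils.file_utils import get_root_dir, read_yaml

# Read the new configuration file from the repository checkout.
configs = read_yaml(
    get_root_dir() / "examples" / "walmart_sales_regression" / "configs.yaml"
)
print(configs["numerical_features"])     # ['Temperature', 'Fuel_Price', 'CPI', 'Unemployment']
print(configs["registered_model_name"])  # 'walmart-store-sales-regressor'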
11 changes: 7 additions & 4 deletions examples/walmart_sales_regression/data.py
@@ -2,17 +2,20 @@
from typing import Union
import pandas as pd
from typing import Tuple
from typing import Dict
from typing import Any
from sklearn.model_selection import train_test_split


class SalesDataProcessor:
def __init__(self, path):
def __init__(self, path, configs: Dict[str, Any]):
"""
Initialize the SalesDataProcessor with the path to the data and the data itself.

:param path: Path to the data file.
"""
self.load_data(path)
self.configs = configs

def load_data(self, path: Union[str, Path]) -> None:
"""
@@ -36,9 +39,9 @@ def create_train_test_split(self, test_size: float = 0.2) -> Tuple:
:return: Tuple containing the training and testing sets.
"""
df = self.data.copy()
numerical_features = ["Holiday_Flag"]
categorical_features = ["Temperature", "Fuel_Price", "CPI", "Unemployment"]
target = "Weekly_Sales"
numerical_features = self.configs["numerical_features"]
categorical_features = self.configs["categorical_features"]
target = self.configs["target"]

# drop date
df = df.drop(columns=["Date"])
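A short usage sketch of the reworked processor (the CSV path is a placeholder for your local download):

from examples.utils.file_utils import get_root_dir, read_yaml
from examples.walmart_sales_regression.data import SalesDataProcessor

configs = read_yaml(
    get_root_dir() / "examples" / "walmart_sales_regression" / "configs.yaml"
)
# Placeholder path: point this at your local copy of Walmart_Sales.csv.
processor = SalesDataProcessor(path="Walmart_Sales.csv", configs=configs)
x_train, x_test, y_train, y_test = processor.create_train_test_split(test_size=0.2)
# The returned frames still carry the "Store" column, which train.py and inference.py
# use to filter rows per store before fitting or predicting.
print(x_train.columns.tolist())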
20 changes: 13 additions & 7 deletions examples/walmart_sales_regression/inference.py
@@ -1,6 +1,7 @@
import mlflow
import pandas as pd
from examples.utils.file_utils import get_root_dir
from examples.utils.file_utils import read_file
from examples.walmart_sales_regression.data import SalesDataProcessor


@@ -9,29 +10,34 @@ def main():
Perform inference using the Walmart sales regression model.
"""
root_dir = get_root_dir()

configs = read_file(
root_dir / "examples" / "walmart_sales_regression" / "configs.yaml"
)
registered_model_name = configs["registered_model_name"]
data_path = (
root_dir.parents[1] / "Downloads" / "sales-walmart" / "Walmart_Sales.csv"
) # change this to your data path

data_processor = SalesDataProcessor(path=data_path)
data_processor = SalesDataProcessor(path=data_path, configs=configs)
_, x_test, _, y_test = data_processor.create_train_test_split()

# load model
model_uri = "models:/walmart-store-sales-regressor@production"
model_uri = f"models:/{registered_model_name}@production"
model = mlflow.pyfunc.load_model(model_uri=model_uri)
print("Model loaded.")

# predicting for the store store_id
store_id = 3
x_test = x_test[x_test["Store"] == store_id]
y_test = y_test[y_test["Store"] == store_id]
# # predicting for the store store_id
store_id = "2"
x_test = x_test[x_test["Store"] == int(store_id)]
y_test = y_test[y_test["Store"] == int(store_id)]
x_test = x_test.drop(columns=["Store"])
y_test = y_test.drop(columns=["Store"])

# make predictions
predictions = model.predict(x_test, params={"store_id": store_id})

weekly_sales = y_test["Weekly_Sales"].values
weekly_sales = y_test[configs["target"]].values
print(
pd.DataFrame(
{
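Because the signature now encodes the feature columns and the string-typed store_id param, a quick way to confirm what the loaded model expects before predicting is to inspect its metadata (a sanity check, not part of the committed script):

import mlflow

model = mlflow.pyfunc.load_model("models:/walmart-store-sales-regressor@production")
# Shows the logged input schema and params, including the string-typed store_id default.
print(model.metadata.signature)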
11 changes: 8 additions & 3 deletions examples/walmart_sales_regression/online_inference.py
@@ -1,5 +1,6 @@
from examples.walmart_sales_regression.data import SalesDataProcessor
from examples.utils.file_utils import get_root_dir
from examples.utils.file_utils import read_file
import pandas as pd
import httpx

@@ -14,11 +15,15 @@ def main():

url = "http://localhost:5000/invocations"
root_dir = get_root_dir()
configs = read_file(
root_dir / "examples" / "walmart_sales_regression" / "configs.yaml"
)

data_path = (
root_dir.parents[1] / "Downloads" / "sales-walmart" / "Walmart_Sales.csv"
) # change this to your data path

data_processor = SalesDataProcessor(path=data_path)
data_processor = SalesDataProcessor(path=data_path, configs=configs)
_, x_test, _, y_test = data_processor.create_train_test_split()

# predicting for the store store_id
@@ -28,14 +33,14 @@

payload = {
"dataframe_split": x_test.to_dict(orient="split"),
"params": {"store_id": store_id},
"params": {"store_id": str(store_id)},
}
headers = {"Content-Type": "application/json"}
response = httpx.post(url, json=payload, headers=headers)
if response.status_code == 200:
predictions = response.json().get("predictions")
weekly_sales_pred = predictions
weekly_sales = y_test["Weekly_Sales"].values
weekly_sales = y_test[configs["target"]].values

print(
pd.DataFrame(
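This script assumes an MLflow scoring server is already listening on localhost:5000, for example one started with the mlflow models serve CLI pointed at the registered model (exact flags such as --env-manager may vary with your MLflow version). A small reachability check before posting the payload might look like this:

import httpx

# Illustrative check: the local MLflow scoring server exposes a /ping endpoint
# that returns 200 once the model is loaded and ready to serve /invocations.
response = httpx.get("http://localhost:5000/ping")
print("scoring server ready:", response.status_code == 200)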
19 changes: 13 additions & 6 deletions examples/walmart_sales_regression/train.py
@@ -1,10 +1,12 @@
from examples.walmart_sales_regression.data import SalesDataProcessor
from examples.walmart_sales_regression.base import WalmartSalesRegressor
from examples.utils.file_utils import get_root_dir
from examples.utils.file_utils import read_file
from examples.utils.decorators import mlflow_tracking_uri
from examples.utils.decorators import mlflow_client
from examples.utils.decorators import mlflow_experiment
from examples.utils.mlflow_utils import set_alias_to_latest_version

import mlflow


@@ -15,13 +17,16 @@ def main(**kwargs):
"""
Train the Walmart sales regression model.
"""
registered_model_name = "walmart-store-sales-regressor"
root = get_root_dir()
configs = read_file(root / "examples" / "walmart_sales_regression" / "configs.yaml")

registered_model_name = configs["registered_model_name"]
root_dir = get_root_dir()
data_path = (
root_dir.parents[1] / "Downloads" / "sales-walmart" / "Walmart_Sales.csv"
) # change this to your data path

data_processor = SalesDataProcessor(path=data_path)
data_processor = SalesDataProcessor(path=data_path, configs=configs)
x_train, x_test, y_train, y_test = data_processor.create_train_test_split()
print("Data loaded and split into training and testing sets.")

@@ -31,9 +36,9 @@
x_test = x_test[x_test["Store"].isin([1, 2, 3])]
y_test = y_test[y_test["Store"].isin([1, 2, 3])]

store_sales_regressor = WalmartSalesRegressor()
store_sales_regressor = WalmartSalesRegressor(config=configs)

with mlflow.start_run(run_name="walmart-sales-regressors") as run:
with mlflow.start_run(run_name=configs["run_name"]) as run:

for store_id in x_train["Store"].unique():
store_sales_regressor.fit_model(
@@ -47,21 +52,23 @@
signature = store_sales_regressor._get_model_signature()
# log model without code
mlflow.pyfunc.log_model(
artifact_path="store-sales-regressor",
artifact_path=configs["artifact_path"],
python_model=store_sales_regressor,
registered_model_name=registered_model_name,
input_example=x_test.sample(5),
signature=signature,
artifacts=store_sales_regressor.artifact_uris,
)

# log model with code
mlflow.pyfunc.log_model(
artifact_path="store-sales-regressor-code",
artifact_path=configs["artifact_path"] + "-code",
python_model=store_sales_regressor,
infer_code_paths=True,
registered_model_name=registered_model_name + "-code",
input_example=x_test.sample(5),
signature=signature,
artifacts=store_sales_regressor.artifact_uris,
)

print("Models fitted successfully.")
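train.py also imports set_alias_to_latest_version from examples.utils.mlflow_utils, which is not shown in this diff. A helper of that name would plausibly pin an alias such as production to the newest registered version; a hedged sketch follows (the signature and alias name are assumptions, not the committed implementation):

from mlflow import MlflowClient

def set_alias_to_latest_version(client: MlflowClient, name: str, alias: str = "production") -> None:
    # Hypothetical sketch: find the newest version of the registered model and point the alias at it.
    versions = client.search_model_versions(f"name='{name}'")
    latest = max(versions, key=lambda v: int(v.version))
    client.set_registered_model_alias(name=name, alias=alias, version=latest.version)

Once such an alias exists, inference.py can resolve models:/<registered_model_name>@production as shown above.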
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "mlflow_for_ml_dev"
version = "1.5.3"
version = "1.5.4"
description = "Code examples for the youtube playlist 'MLflow for Machine Learning Development' by Manuel Gil"
authors = ["Manuel Gil <[email protected]>"]
readme = "README.md"