Structured logs #181

Open · wants to merge 5 commits into main
11 changes: 8 additions & 3 deletions src/mlinfra/amplitude.py
@@ -16,6 +16,7 @@
import string
import sys
from typing import Optional
from logger_config import log

from getmac import get_mac_address
from git.config import GitConfigParser
@@ -61,14 +62,16 @@ def send_event(
user_properties: Optional[dict] = None,
) -> None:
if hasattr(sys, "_called_from_test") or VERSION == DEV_VERSION:
print("Not sending amplitude cause we think we're in a pytest or in dev")
log.info("Not sending Amplitude event because we appear to be running under pytest or a dev version")
return
event_properties = event_properties or {}
user_properties = user_properties or {}
insert_id = "".join(
random.SystemRandom().choices(string.ascii_letters + string.digits, k=16)
)
if event_type not in self.VALID_EVENTS:
log.exception("Invalid event type", event_type=event_type, valid_events=self.VALID_EVENTS)
raise Exception(f"Invalid event type: {event_type}")
body = {
"api_key": self.api_key,
@@ -100,14 +103,16 @@ def send_event(
timeout=10,
)
if r.status_code != codes.ok:
log.error("Analytics request failed",status_code=r.status_code,response_body=r.text)

raise Exception(
"Hey, we're trying to send some analytics over to our devs for the "
f"product usage and we got a {r.status_code} response back. Could "
"you pls email over to our dev team about this and tell them of the "
f"failure with the aforementioned code and this response body: {r.text}"
)
except Exception as err:
print(f"Unexpected error when connecting to amplitude {err=}, {type(err)=}")

log.error("Unexpected error when connecting to Amplitude", error=str(err), error_type=type(err).__name__)

amplitude_client = AmplitudeClient()
8 changes: 3 additions & 5 deletions src/mlinfra/cli/utilities.py
@@ -16,6 +16,7 @@
from mlinfra.utils.constants import TF_PATH
from mlinfra.utils.utils import run_command
from typing_extensions import Annotated
from logger_config import log

app = typer.Typer()

@@ -43,11 +44,8 @@ def generate_terraform_config(
)

run_command(["terraform", f"-chdir={TF_PATH}", "init"], capture_output=False)
print(
f"""
Terraform config has been generated in the {TF_PATH} folder.
"""
)
log.info("Terraform config has been generated", folder_path = TF_PATH)

amplitude_client.send_event(
amplitude_client.FINISH_GEN_TERRAFORM_EVENT,
event_properties=current_properties,
19 changes: 19 additions & 0 deletions src/mlinfra/logger_config.py
@@ -0,0 +1,19 @@
import logging
import structlog

structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
structlog.dev.ConsoleRenderer()
],
wrapper_class=structlog.make_filtering_bound_logger(logging.NOTSET),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=False
)

log = structlog.get_logger()
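
For context, a minimal usage sketch of the logger configured above (illustrative only, not part of the diff; the event names and field values are made up):

# Key-value pairs passed to the logger become structured fields on the rendered event.
from logger_config import log

log.info("Terraform config has been generated", folder_path="./.mlinfra")
log.warning("The config value is not user-facing", config_value="cluster_name")

try:
    raise ValueError("example failure")
except ValueError:
    # Inside an except block, log.exception also records the traceback.
    log.exception("Unexpected error while generating the stack")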
@@ -20,7 +20,7 @@
AbstractDeployment,
)
from mlinfra.utils.utils import generate_tf_json

from logger_config import log

class KindDeployment(AbstractDeployment):
def __init__(
@@ -181,11 +181,8 @@ def generate_deployment_config(self):
"config"
]["kubernetes"].get(k8s_config, None)
else:
print(
"""
WARNING: The config value {k8s_config} is not user facing.
Please check the k8s.yaml config file to see if this is a valid config value.
"""
log.warning("The config value is not user-facing. Please check eks.yaml for validity.",
config_value=k8s_config
)

generate_tf_json(module_name="kind", json_module=k8s_json_module)
@@ -20,7 +20,7 @@
AbstractDeployment,
)
from mlinfra.utils.utils import generate_tf_json

from logger_config import log

class KubernetesDeployment(AbstractDeployment):
def __init__(
@@ -176,12 +176,9 @@ def generate_deployment_config(self):
"config"
]["kubernetes"].get(k8s_config, None)
else:
print(
"""
WARNING: The config value {k8s_config} is not user facing.
Please check the eks.yaml config file to see if this is a valid config value.
"""
)
log.warning("The config value is not user-facing. Please check eks.yaml for validity.",
config_value=k8s_config
)

generate_tf_json(module_name="eks", json_module=k8s_json_module)

@@ -217,10 +214,13 @@ def generate_deployment_config(self):
generate_tf_json(module_name="nodegroups", json_module=nodegroups_json_module)

elif self.provider == CloudProvider.GCP:
log.error("Provider is not yet supported", provider = self.provider)
raise ValueError(f"Provider {self.provider} is not yet supported")
elif self.provider == CloudProvider.AZURE:
log.error("Provider is not yet supported", provider = self.provider)
raise ValueError(f"Provider {self.provider} is not yet supported")
else:
log.error("Provider is not yet supported", provider = self.provider)
raise ValueError(f"Provider {self.provider} is not supported")

def configure_deployment(self):
4 changes: 4 additions & 0 deletions src/mlinfra/stack_processor/stack_generator.py
@@ -32,6 +32,7 @@
)
from mlinfra.stack_processor.stack_processor.local_stack import LocalStack

from logger_config import log

class StackGenerator:
"""
@@ -73,6 +74,7 @@ def __init__(self, stack_config):
or "deployment" not in self.stack_config
or "stack" not in self.stack_config
):
log.exception("Stack config component is misssing")
raise Exception("Stack config component is missing")

# this has to be done now as the stack config is read
@@ -196,6 +198,7 @@ def generate(self):
).generate()

else:
log.error("Deployment type not supported", deployment_type=deployment_type)
raise ValueError(f"Deployment type {deployment_type} not supported")

def configure_provider(self) -> CloudProvider:
@@ -224,4 +227,5 @@ def configure_provider(self) -> CloudProvider:
local_provider.configure_provider()
return CloudProvider.LOCAL
else:
log.error("Cloud provider not supported")
raise NotImplementedError("Cloud provider not supported")
@@ -20,6 +20,7 @@
)
from mlinfra.utils.constants import TF_PATH

from logger_config import log

class CloudVMStack(AbstractStack):
"""Class representing the CloudVMStack class."""
@@ -192,6 +193,7 @@ def prepare_input(self):

def generate(self):
# logger.info("Processing Cloud Infrastructure")
log.info("Processing Cloud Infrastructure")
self.process_stack_config()
self.process_stack_inputs()
self.process_stack_modules()
@@ -19,6 +19,7 @@
AbstractStack,
)
from mlinfra.utils.constants import TF_PATH
from logger_config import log


class KubernetesStack(AbstractStack):
@@ -207,7 +208,7 @@ def prepare_input(self):
json.dump(json_output, tf_json, ensure_ascii=False, indent=2)

def generate(self):
# logger.info("Processing Kubernetes Infrastructure")
log.info("Processing Kubernetes Infrastructure")
self.process_stack_config()
self.process_stack_inputs()
self.process_stack_modules()
@@ -19,6 +19,7 @@
AbstractStack,
)
from mlinfra.utils.constants import TF_PATH
from logger_config import log


class LocalStack(AbstractStack):
@@ -149,7 +150,7 @@ def process_stack_inputs(self):
pass

def generate(self):
# logger.info("Processing Kubernetes Infrastructure")
log.info("Processing Kubernetes Infrastructure")
self.process_stack_config()
self.process_stack_inputs()
self.process_stack_modules()
26 changes: 17 additions & 9 deletions src/mlinfra/terraform/state_helper.py
@@ -45,13 +45,16 @@ def manage_aws_state_storage(self) -> None:
Bucket=self.bucket_name,
)
except ClientError as e:
if e.response["Error"]["Code"] == "AuthFailure":
log.error("AWS authentication failure", error_code=e.response["Error"]["Code"], error_message=e.response["Error"]["Message"])
raise Exception(
"The AWS Credentials are not configured properly.\n"
"https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration"
"for more information."
)
elif e.response["Error"]["Code"] == "AccessDenied":
log.error("Access Denied to S3 bucket", error_code=e.response['Error']['Code'], error_message=e.response['Error']['Message'], bucket_name=self.bucket_name)
raise Exception(
f"We were unable to access the S3 bucket, {self.bucket_name} on your AWS account.\n"
"Possible Issues: \n"
@@ -61,14 +64,15 @@ def manage_aws_state_storage(self) -> None:
"Please fix these issues and try again!"
)
elif e.response["Error"]["Code"] == "404":
print("S3 bucket for terraform state not found, creating a new one")
log.info("S3 bucket for Terraform state not found, creating a new one", bucket_name=self.bucket_name)
self._setup_bucket(
s3_client=s3,
region=self.region,
bucket_name=self.bucket_name,
bucket_exists=False,
)
else:
log.error("AWS S3 Error", error_code=e.response['Error']['Code'], error_message=e.response['Error']['Message'])
raise Exception(
f"{e.response['Error']['Code']} error with the message "
f"{e.response['Error']['Message']}"
@@ -88,12 +92,14 @@ def manage_aws_state_storage(self) -> None:
dynamodb.describe_table(TableName=self.dynamodb_table)
except ClientError as e:
if e.response["Error"]["Code"] != "ResourceNotFoundException":
log.error("DynamoDB error", error_code = e.response['Error']['Code'], error_message = e.response['Error']['Message'])
raise Exception(
"When trying to determine the status of the state dynamodb table, we got an "
f"{e.response['Error']['Code']} error with the message "
f"{e.response['Error']['Message']}"
)
print("Dynamodb table for terraform state not found, creating a new one")
# print("Dynamodb table for terraform state not found, creating a new one")
log.info("DynamoDB table for terraform state not found, creating a new one")
dynamodb.create_table(
TableName=self.dynamodb_table,
KeySchema=[{"AttributeName": "LockID", "KeyType": "HASH"}],
@@ -121,6 +127,7 @@ def _setup_bucket(self, s3_client, region: str, bucket_name: str, bucket_exists:
)
time.sleep(10)
except ClientError as e:
log.error("AWS S3 error", error_code = e.response['Error']['Code'], error_message = e.response['Error']['Message'])
raise Exception(
f"When trying to create a new bucket with name {bucket_name}, we got an "
f"{e.response['Error']['Code']} error with the message "
@@ -135,32 +142,33 @@ def _setup_bucket(self, s3_client, region: str, bucket_name: str, bucket_exists:
try:
response = s3_client.get_bucket_versioning(Bucket=bucket_name).get("Status")
if response == "Enabled":
print("Versioning is already enabled on state bucket")
log.info("Versioning is already enabled on state bucket")
else:
print("Versioning is not yet enabled on state bucket")
log.info("Versioning is not yet enabled on state bucket")
s3_client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={"Status": "Enabled"},
)
print("Versioning is now enabled on state bucket")
log.info("Versioning is now enabled on state bucket")
except ClientError as e:
if e.response["Error"]["Code"] != "NoSuchBucket":
log.error("Versioning Error", error_code = e.response['Error']['Code'], error_message = e.response['Error']['Message'])
raise Exception(
f"When trying to check if versioning is enabled on the state bucket, we got an "
f"{e.response['Error']['Code']} error with the message "
f"{e.response['Error']['Message']}"
)
print("State bucket does not exist")
log.info("State bucket does not exist")

# checking for bucket lifecycle configuration
# enable if it does not exist
try:
response = s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name).get("Rules")
if response is not None:
print("Bucket lifecycle configuration already exists on state bucket")
log.info("Bucket lifecycle configuration already exists on state bucket")
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchLifecycleConfiguration":
print("Bucket lifecycle configuration does not exists on state bucket")
log.info("Bucket lifecycle configuration does not exists on state bucket")
s3_client.put_bucket_lifecycle(
Bucket=bucket_name,
LifecycleConfiguration={
Expand All @@ -179,4 +187,4 @@ def _setup_bucket(self, s3_client, region: str, bucket_name: str, bucket_exists:
]
},
)
print("Bucket lifecycle configuration now exsits on state bucket")
log.info("Bucket lifecycle configuration now exsits on state bucket")
11 changes: 7 additions & 4 deletions src/mlinfra/terraform/terraform.py
@@ -14,6 +14,7 @@
import os
from importlib import resources
from typing import Tuple
from logger_config import log

# import hashlib
import boto3
@@ -60,20 +61,22 @@ def __init__(self, stack_config_path: str):
def check_config_file_exists(self):
"""This function is responsible for checking if the config file exists"""
if not os.path.isfile(self.stack_config_path):
log.error(f"The file {self.stack_config_path} does not exist.")
raise FileNotFoundError(f"The file {self.stack_config_path} does not exist.")

with open(self.stack_config_path, "r") as stream:
try:
data = yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
log.error("YAML parsing error",error=str(exc), error_type=type(exc).__name__)
raise ValueError(f"{exc}")

# check if all required keys are present in the config file
required_keys = ["name", "provider", "deployment", "stack"]

if not all(key in data for key in required_keys):
missing_keys = [key for key in required_keys if key not in data]
log.error("Missing keys!", missing_keys = missing_keys)
raise ValueError(f"The following keys are missing: {', '.join(missing_keys)}")

def clean_ml_infra_folder(self, delete_dir: bool = True):
Expand All @@ -84,8 +87,8 @@ def clean_ml_infra_folder(self, delete_dir: bool = True):
if delete_dir:
clean_tf_directory()
else:
print("The param delete_dir is set as false, skipping the directory deletion")

log.info("The param delete_dir is set as false, skipping the directory deletion")
def read_stack_config(self) -> yaml:
# clean the generated files directory
clean_tf_directory()
@@ -113,8 +116,8 @@ def read_stack_config(self) -> yaml:
# os.makedirs(f"{TF_PATH}/{stack_file_digest}", mode=0o777)

return config

except FileNotFoundError:
log.error("Stack config file not found", stack_config_path=self.stack_config_path)
raise FileNotFoundError(f"Stack config file not found: {self.stack_config_path}")

# TODO: write getters for state file name and region
Expand Down
Loading