Skip to content

Commit

Permalink
Merge branch 'main' into fix/log-analytics-private-DNS2
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickmoore-nc authored Jan 6, 2025
2 parents 5976655 + e6f38f5 commit cae60c3
Show file tree
Hide file tree
Showing 17 changed files with 221 additions and 469 deletions.
15 changes: 0 additions & 15 deletions database/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'message_status') THEN
CREATE TYPE message_status AS ENUM ('created', 'pending_enrichment', 'enriched', 'sending', 'delivered', 'failed');
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'channel_status') THEN
CREATE TYPE channel_status AS ENUM (
'delivered', 'read', 'notification_attempted', 'unnotified', 'rejected', 'notified', 'received',
'permanent_failure', 'temporary_failure', 'technical_failure'
);
END IF;
END$$;

CREATE TABLE IF NOT EXISTS batch_messages (
Expand All @@ -36,12 +30,3 @@ CREATE TABLE IF NOT EXISTS message_statuses (
message_reference UUID NOT NULL,
status message_status DEFAULT 'created'
);

CREATE TABLE IF NOT EXISTS channel_statuses (
created_at TIMESTAMP DEFAULT NOW(),
details JSONB,
idempotency_key TEXT PRIMARY KEY,
message_id TEXT DEFAULT 'UNKNOWN',
message_reference UUID NOT NULL,
status channel_status
);
4 changes: 2 additions & 2 deletions src/functions/message_status/function_app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import azure.functions as func
import json
import logging
import status_recorder
import message_status_recorder
import request_verifier

app = func.FunctionApp()
Expand All @@ -24,7 +24,7 @@ def main(req: func.HttpRequest) -> func.HttpResponse:
body = {"status": "error"}
elif request_verifier.verify_signature(req.headers, req_body):
body_dict = json.loads(req_body)
status_recorder.save_statuses(body_dict)
message_status_recorder.save_message_statuses(body_dict)
status_code = 200
body = {"status": "success"}
else:
Expand Down
38 changes: 38 additions & 0 deletions src/functions/message_status/message_status_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import datastore
import json
import logging
import message_status_validator


def save_message_statuses(request_body: dict) -> None:
message_statuses: list[dict] = message_status_params(request_body)

for message_status in message_statuses:
valid, message = message_status_validator.validate(message_status)
if valid:
datastore.create_message_status_record(message_status)
else:
logging.error(f"Validation failed: {message}")

return None


def message_status_params(request_body: dict) -> list[dict]:
params = []
for status_data in request_body["data"]:
try:
attributes = status_data["attributes"]
meta = status_data["meta"]
params.append({
"details": json.dumps(request_body),
"idempotency_key": meta["idempotencyKey"],
"message_id": attributes["messageId"],
"message_reference": attributes["messageReference"],
"status": attributes["messageStatus"],
})
except KeyError as e:
logging.error(f"Missing required field: {e}")
logging.error(f"Request body: {request_body}")
continue

return params
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ def validate(status_params: dict) -> tuple[bool, str]:
return True, SUCCESS_MESSAGE


def validator_for_type(type) -> callable:
def validator_for_type(type):
if type == str:
return valid_string
if type == uuid.UUID:
return valid_uuid
if type == "json":
return valid_json
return lambda: False
return None


def valid_string(value):
Expand Down
34 changes: 0 additions & 34 deletions src/functions/message_status/status_recorder.py

This file was deleted.

41 changes: 29 additions & 12 deletions src/shared/datastore.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import os
import psycopg2
import schema_initialiser
import time
from typing import Tuple

BATCH_MESSAGES_EXISTS = """
SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'batch_messages')
"""

INSERT_BATCH_MESSAGE = """
INSERT INTO batch_messages (
Expand All @@ -23,9 +24,8 @@
%(status)s
) RETURNING batch_id, message_reference"""


INSERT_STATUS = """
INSERT INTO {status_table} (
INSERT_MESSAGE_STATUS = """
INSERT INTO message_statuses (
idempotency_key,
message_id,
message_reference,
Expand All @@ -39,37 +39,40 @@
%(status)s
) RETURNING idempotency_key"""

SCHEMA_FILE_PATH = f"{os.path.dirname(__file__)}/database/schema.sql"

def create_batch_message_record(batch_message_data: dict) -> Tuple[str, str] | None | bool:
def create_batch_message_record(batch_message_data: dict) -> bool | list[str, str]:
try:
with connection() as conn:
with conn.cursor() as cur:
cur.execute(INSERT_BATCH_MESSAGE, batch_message_data)
return cur.fetchone()

conn.commit()
conn.close()
except psycopg2.Error as e:
logging.error("Error creating batch message record")
logging.error(f"{type(e).__name__} : {e}")
return False


def create_status_record(status_data: dict, is_channel_status=False) -> bool | str:
status_table = "channel_statuses" if is_channel_status else "message_statuses"
statement = INSERT_STATUS.format(status_table=status_table)
def create_message_status_record(message_status_data: dict) -> bool | str:
try:
with connection() as conn:
with conn.cursor() as cur:
cur.execute(statement, status_data)
cur.execute(INSERT_MESSAGE_STATUS, message_status_data)

return cur.fetchone()[0]

conn.commit()
conn.close()
except psycopg2.Error as e:
logging.error("Error creating message status record")
logging.error(f"{type(e).__name__} : {e}")
return False


def connection() -> psycopg2.extensions.connection:
def connection():
start = time.time()
conn = psycopg2.connect(
dbname=os.environ["DATABASE_NAME"],
Expand All @@ -81,6 +84,20 @@ def connection() -> psycopg2.extensions.connection:
end = time.time()
logging.debug(f"Connected to database in {(end - start)}s")

schema_initialiser.check_and_initialise_schema(conn)
check_and_initialise_schema(conn)

return conn


def check_and_initialise_schema(conn: psycopg2.extensions.connection):
if bool(os.getenv("SCHEMA_INITIALISED")):
return

with conn.cursor() as cur:
cur.execute(BATCH_MESSAGES_EXISTS)
if not bool(cur.fetchone()[0]):
logging.info("Initialising schema")
cur.execute(open(SCHEMA_FILE_PATH, "r").read())

conn.commit()
os.environ["SCHEMA_INITIALISED"] = "true"
25 changes: 0 additions & 25 deletions src/shared/schema_initialiser.py

This file was deleted.

98 changes: 0 additions & 98 deletions tests/end_to_end/test_notify_callback_to_channel_status_saved.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def assert_message_status_record_created():
assert len(records) == 1

details, message_id, status = records[0]
attributes = details["attributes"]
attributes = details["data"][0]["attributes"]

assert message_id == "2WL3qFTEFM0qMY8xjRbt1LIKCzM"
assert status == "sending"
Expand Down
Loading

0 comments on commit cae60c3

Please sign in to comment.