Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate db migrations #144

Closed
wants to merge 49 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
c0621f7
Remove code not currently in use on feature branch
steventux Jan 6, 2025
b12ece2
Add Flask dependencies
steventux Jan 6, 2025
df390a2
Add basic Flask app
steventux Jan 6, 2025
51b3f02
Add function app which delegates requests to Flask
steventux Jan 6, 2025
99bc84d
Test Flask app in CI
steventux Jan 6, 2025
62883dc
Revise Dockerfile/compose.yml for one function app with db
steventux Jan 7, 2025
52f7f9e
alembic init
dnimmo Jan 9, 2025
6403908
Add Flask dependencies
steventux Jan 6, 2025
4757e84
Add basic Flask app
steventux Jan 6, 2025
b40ad35
Add function app which delegates requests to Flask
steventux Jan 6, 2025
557764d
Test Flask app in CI
steventux Jan 6, 2025
6f47c02
Revise Dockerfile/compose.yml for one function app with db
steventux Jan 7, 2025
8e618ff
added alembic for migrations
dnimmo Jan 14, 2025
7148b32
alembic init
dnimmo Jan 9, 2025
3af3ccf
added alembic for migrations
dnimmo Jan 14, 2025
b87b43e
updated lockfile
dnimmo Jan 14, 2025
c53cc53
Add pytest-sugar
steventux Jan 14, 2025
b8d65b1
Add request verifier
steventux Jan 14, 2025
f02ddb9
Access headers with lowercase keys
steventux Jan 14, 2025
5e96744
Add POST status create endpoint
steventux Jan 14, 2025
fe893c3
Add jsonschema validation library
steventux Jan 14, 2025
8364991
Validate request body data against NHS Notify schema
steventux Jan 15, 2025
803059a
Move status endpoints to route handlers
steventux Jan 15, 2025
d6209cd
Update name of healthcheck route function to match path
steventux Jan 15, 2025
ea4fbdd
Merge pull request #140 from NHSDigital/add-status-endpoint-to-flask
steventux Jan 15, 2025
2bef605
Ensure local test runs use test db
steventux Jan 16, 2025
5a8f46f
Add status recorder service
steventux Jan 16, 2025
e1126d1
Use psycopg2 sql.Identifier to safely interpolate the table name
steventux Jan 16, 2025
9722944
Add FLASK_DEBUG env vars
steventux Jan 16, 2025
acf0110
Extract hmac signature generation
steventux Jan 16, 2025
24ffac4
Read the post body as json and ensure key sorting is consistent
steventux Jan 16, 2025
372eb62
Truncate all tables after each test
steventux Jan 16, 2025
4263a47
Return useful error descriptions in the status/create response
steventux Jan 16, 2025
ec5d825
Add postgres to CI test jobs
steventux Jan 16, 2025
a427e07
PR comments
dnimmo Jan 17, 2025
8a8f8e3
Merge branch 'feature-flask-and-function-apps' of https://github.com/…
dnimmo Jan 20, 2025
385c076
file formatting fixes for auto-generated alembic init
dnimmo Jan 20, 2025
9ef879a
removed schema.hcl
dnimmo Jan 20, 2025
8d415f5
re-add schema.hcl
dnimmo Jan 20, 2025
df1c138
DTOSS-6505 update Pipfile
dnimmo Jan 20, 2025
e0ef3d4
replace print with logger
dnimmo Jan 20, 2025
97472a4
Refactor persistence tests
steventux Jan 20, 2025
afa461f
Merge pull request #142 from NHSDigital/persist-status-callback-requests
steventux Jan 21, 2025
46476d6
Merge pull request #139 from NHSDigital/DTOSS-6505
dnimmo Jan 21, 2025
a2a5763
Add sqlalchemy_utils dependency
steventux Jan 21, 2025
c2ae0cb
Add alembic_postgresql_enum dependency
steventux Jan 21, 2025
a6f87ff
Refactor schema for batch message endpoint
steventux Jan 21, 2025
4ae3227
Remove schema initialisation from database module
steventux Jan 22, 2025
86dff02
Run alembic upgrade head on test CI dbs
steventux Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add status recorder service
When the status callback endpoint receives requests the post body will be saved to the database to record the channel and message status depending on the payload type.
This commit adds the datastore and service module which handles this persistence.
steventux committed Jan 16, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 5a8f46fa72b093efa7ad76bdd1b2802c58cf9b6a
3 changes: 2 additions & 1 deletion src/notify/app/route_handlers/status.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from flask import request
import json
import app.validators.request_validator as request_validator
import app.services.status_recorder as status_recorder


def create():
@@ -14,7 +15,7 @@ def create():
status_code = 422
body = {"status": "error"}
else:
# status_recorder.save_statuses(body_dict)
status_recorder.save_statuses(dict(request.form))
status_code = 200
body = {"status": "success"}

Empty file.
68 changes: 68 additions & 0 deletions src/notify/app/services/datastore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import app.utils.database as database
import logging
import psycopg2

INSERT_BATCH_MESSAGE = """
INSERT INTO batch_messages (
batch_id,
details,
message_reference,
nhs_number,
recipient_id,
status
) VALUES (
%(batch_id)s,
%(details)s,
%(message_reference)s,
%(nhs_number)s,
%(recipient_id)s,
%(status)s
) RETURNING batch_id, message_reference"""

INSERT_STATUS = """
INSERT INTO {table_name} (
idempotency_key,
message_id,
message_reference,
details,
status
) VALUES (
%(idempotency_key)s,
%(message_id)s,
%(message_reference)s,
%(details)s,
%(status)s
) RETURNING idempotency_key"""

STATUS_TABLE_NAMES_BY_TYPE = {
"ChannelStatus": "channel_statuses",
"MessageStatus": "message_statuses"
}


def create_batch_message_record(batch_message_data: dict) -> bool | list[str, str]:
try:
with database.connection() as conn:
with conn.cursor() as cur:
cur.execute(INSERT_BATCH_MESSAGE, batch_message_data)
return cur.fetchone()

except psycopg2.Error as e:
logging.error("Error creating batch message record")
logging.error(f"{type(e).__name__} : {e}")
return False


def create_status_record(status_type: str, status_data: dict) -> bool | str:
table_name = STATUS_TABLE_NAMES_BY_TYPE[status_type]
try:
with database.connection() as conn:
with conn.cursor() as cur:
cur.execute(INSERT_STATUS.format(table_name=table_name), status_data)

return cur.fetchone()[0]

except psycopg2.Error as e:
logging.error("Error creating status record")
logging.error(f"{type(e).__name__} : {e}")
return False
34 changes: 34 additions & 0 deletions src/notify/app/services/status_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import app.services.datastore as datastore
import json
import logging


def save_statuses(request_body: dict) -> None:
statuses: list[dict] = status_params(request_body)
status_type = request_body["data"][0]["type"]

for status in statuses:
datastore.create_status_record(status_type, status)

return None


def status_params(request_body: dict) -> list[dict]:
params = []
for status_data in request_body["data"]:
try:
attributes = status_data["attributes"]
meta = status_data["meta"]
params.append({
"details": json.dumps(request_body),
"idempotency_key": meta["idempotencyKey"],
"message_id": attributes["messageId"],
"message_reference": attributes["messageReference"],
"status": attributes.get("messageStatus", attributes.get("channelStatus")),
})
except KeyError as e:
logging.error(f"Missing required field: {e}")
logging.error(f"Request body: {request_body}")
continue

return params
Empty file.
40 changes: 40 additions & 0 deletions src/notify/app/utils/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import logging
import os
import psycopg2
import time

SCHEMA_FILE_PATH = f"{os.path.dirname(__file__)}/../../../../database/schema.sql"
SCHEMA_INITIALISED_SQL = """
SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'channel_statuses')
"""


def connection():
start = time.time()
conn = psycopg2.connect(
dbname=os.environ["DATABASE_NAME"],
user=os.environ["DATABASE_USER"],
host=os.environ["DATABASE_HOST"],
password=os.environ["DATABASE_PASSWORD"],
sslmode=os.getenv("DATABASE_SSLMODE", "require"),
)
end = time.time()
logging.debug(f"Connected to database in {(end - start)}s")

check_and_initialise_schema(conn)

return conn


def check_and_initialise_schema(conn: psycopg2.extensions.connection):
if bool(os.getenv("SCHEMA_INITIALISED")):
return

with conn.cursor() as cur:
cur.execute(SCHEMA_INITIALISED_SQL)
if not bool(cur.fetchone()[0]):
logging.info("Initialising schema")
cur.execute(open(SCHEMA_FILE_PATH, "r").read())

conn.commit()
os.environ["SCHEMA_INITIALISED"] = "true"
73 changes: 73 additions & 0 deletions tests/unit/notify/app/services/test_datastore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import app.services.datastore as datastore
import pytest


@pytest.fixture
def mock_connection(mocker):
return mocker.patch("app.utils.database.connection")


@pytest.fixture
def mock_cursor(mocker, mock_connection):
return mock_connection().__enter__().cursor().__enter__()


@pytest.fixture
def batch_message_data(autouse=True):
return {
"batch_id": "0b3b3b3b-3b3b-3b3b-3b3b-3b3b3b3b3b3b",
"details": "Test details",
"message_reference": "0b3b3b3b-3b3b-3b-3b3b-3b3b3b3b3b3b",
"nhs_number": "1234567890",
"recipient_id": "e3e7b3b3-3b3b-3b-3b3b-3b3b3b3b3b3b",
"status": "test_status",
}


@pytest.fixture
def message_status_data(autouse=True):
return {
"idempotency_key": "0b3b3b3b-3b3b-3b3b-3b3b-3b3b3b3b3b3b",
"message_id": "0x0x0x0xabx0x0",
"message_reference": "0b3b3b3b-3b3b-3b3b-3b3b-3b3b3b3b3b3b",
"details": "Test details",
"status": "test_status",
}


def test_create_batch_message_record(mock_cursor):
"""Test the SQL execution of batch message record creation."""
datastore.create_batch_message_record(batch_message_data)

mock_cursor.execute.assert_called_with(datastore.INSERT_BATCH_MESSAGE, batch_message_data)
mock_cursor.fetchone.assert_called_once()


def test_create_batch_message_record_with_error(mock_cursor):
"""Test the SQL execution of batch message record creation with an error."""
mock_cursor.execute.side_effect = Exception("Test error")

with pytest.raises(Exception):
assert datastore.create_batch_message_record(batch_message_data) is False

mock_cursor.execute.assert_called_with(datastore.INSERT_BATCH_MESSAGE, batch_message_data)
mock_cursor.fetchone.assert_not_called()


def test_create_message_status_record(mock_cursor):
"""Test the SQL execution of message status record creation."""
datastore.create_status_record("MessageStatus", message_status_data)

mock_cursor.execute.assert_called_with(datastore.INSERT_STATUS.format(table_name="message_statuses"), message_status_data)
mock_cursor.fetchone.assert_called_once()


def test_create_message_status_record_with_error(mock_cursor):
"""Test the SQL execution of message status record creation with an error."""
mock_cursor.execute.side_effect = Exception("Test error")

with pytest.raises(Exception):
assert datastore.create_status_record("MessageStatus", message_status_data) is False

mock_cursor.execute.assert_called_with(datastore.INSERT_STATUS.format(table_name="message_statuses"), message_status_data)
mock_cursor.fetchone.assert_not_called()
56 changes: 56 additions & 0 deletions tests/unit/notify/app/services/test_message_status_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import app.services.status_recorder as status_recorder


def test_save_statuses_with_channel_status_data(mocker, channel_status_post_body):
"""Test saving channel status data to datastore"""
mock_datastore = mocker.patch("app.services.status_recorder.datastore")

assert status_recorder.save_statuses(channel_status_post_body) is None

mock_datastore.create_status_record.assert_called_once_with("ChannelStatus", {
"details": json.dumps(channel_status_post_body),
"idempotency_key": "2515ae6b3a08339fba3534f3b17cd57cd573c57d25b25b9aae08e42dc9f0a445", #gitleaks:allow
"message_id": "2WL3qFTEFM0qMY8xjRbt1LIKCzM",
"message_reference": "1642109b-69eb-447f-8f97-ab70a74f5db4",
"status": "delivered"
})


def test_save_statuses_with_message_status_data(mocker, message_status_post_body):
"""Test saving message status data to datastore"""
mock_datastore = mocker.patch("app.services.status_recorder.datastore")

assert status_recorder.save_statuses(message_status_post_body) is None

mock_datastore.create_status_record.assert_called_once_with("MessageStatus", {
"details": json.dumps(message_status_post_body),
"idempotency_key": "2515ae6b3a08339fba3534f3b17cd57cd573c57d25b25b9aae08e42dc9f0a445", #gitleaks:allow
"message_id": "2WL3qFTEFM0qMY8xjRbt1LIKCzM",
"message_reference": "1642109b-69eb-447f-8f97-ab70a74f5db4",
"status": "sending"
})


def test_status_params(message_status_post_body):
"""Test conversion of request body to message status parameters"""
expected = [
{
"details": json.dumps(message_status_post_body),
"idempotency_key": "2515ae6b3a08339fba3534f3b17cd57cd573c57d25b25b9aae08e42dc9f0a445", #gitleaks:allow
"message_id": "2WL3qFTEFM0qMY8xjRbt1LIKCzM",
"message_reference": "1642109b-69eb-447f-8f97-ab70a74f5db4",
"status": "sending"
},
]

assert status_recorder.status_params(message_status_post_body) == expected


def test_status_params_with_missing_field(message_status_post_body):
"""Test conversion of request body with missing field to message status parameters"""
message_status_post_body["data"][0]["attributes"].pop("messageReference")

expected = []

assert status_recorder.status_params(message_status_post_body) == expected