Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 2887] Add new columns to EtlDb and bump schema version #2931

Merged
merged 37 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1d16441
implemented basic versioning for schema management
DavidDudas-Intuitial Nov 14, 2024
2cc4825
insert row into version table; rename methods for better readability
DavidDudas-Intuitial Nov 14, 2024
5fc3297
missing from last commit
DavidDudas-Intuitial Nov 14, 2024
447fc61
enable multiple sql files to execute during init db operation
DavidDudas-Intuitial Nov 15, 2024
5226bc0
formatting
DavidDudas-Intuitial Nov 15, 2024
219b130
formatting
DavidDudas-Intuitial Nov 15, 2024
53cf9a9
add ability to check schema version number
DavidDudas-Intuitial Nov 15, 2024
3a7f039
added minimum schema version logic
DavidDudas-Intuitial Nov 15, 2024
629287b
added new columns; bumped schema version number
DavidDudas-Intuitial Nov 15, 2024
71ac599
made the schema versioning process more resilient to developer error
DavidDudas-Intuitial Nov 15, 2024
37d9b77
fixed linter issues
DavidDudas-Intuitial Nov 15, 2024
eb68eb5
rename sql file
DavidDudas-Intuitial Nov 15, 2024
3140069
Merge branch 'main' into issue-2857-analytics-db-schema-versioning
DavidDudas-Intuitial Nov 15, 2024
5112ba2
Merge branch 'main' into issue-2857-analytics-db-schema-versioning
DavidDudas-Intuitial Nov 15, 2024
bdbf88c
added logging of each migration; removed drop_table sql to prevent ac…
DavidDudas-Intuitial Nov 15, 2024
f1e276c
added comment to explain regex
DavidDudas-Intuitial Nov 16, 2024
ae6de3e
improved handling of non-conformant sql filenames
DavidDudas-Intuitial Nov 18, 2024
28c2c2e
added verbose docstring
DavidDudas-Intuitial Nov 18, 2024
2bb8360
added tests
DavidDudas-Intuitial Nov 18, 2024
2f1f52f
fixed linter issue
DavidDudas-Intuitial Nov 18, 2024
117e6f4
fixed linter issue
DavidDudas-Intuitial Nov 18, 2024
8ff23b6
Merge branch 'main' into issue-2857-analytics-db-schema-versioning
DavidDudas-Intuitial Nov 18, 2024
2620e64
added support for deliverable status
DavidDudas-Intuitial Nov 19, 2024
220fc80
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 19, 2024
1dfdbcf
run docker compose down before make init-db; changed sequence of jobs…
DavidDudas-Intuitial Nov 19, 2024
762b180
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 19, 2024
0630b97
update tests
DavidDudas-Intuitial Nov 19, 2024
cdbf4e1
added support for gh_project table
DavidDudas-Intuitial Nov 19, 2024
051c7fa
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 19, 2024
9abb20e
combined sql files
DavidDudas-Intuitial Nov 19, 2024
0a16643
update tests
DavidDudas-Intuitial Nov 19, 2024
bb9a743
fix type check bug
DavidDudas-Intuitial Nov 19, 2024
a34f106
format
DavidDudas-Intuitial Nov 19, 2024
170db64
add mapping between sprint and project; moved exception handling from…
DavidDudas-Intuitial Nov 20, 2024
d59ac04
remove version validation, per request
DavidDudas-Intuitial Nov 20, 2024
8e318f3
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 20, 2024
7599d0f
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion analytics/src/analytics/datasets/etl_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ class EtlEntityType(Enum):
DELIVERABLE = "deliverable"
EPIC = "epic"
ISSUE = "issue"
SPRINT = "sprint"
PROJECT = "project"
QUAD = "quad"
SPRINT = "sprint"


class EtlDataset(BaseDataset):
Expand All @@ -32,6 +33,7 @@ class EtlDataset(BaseDataset):
"deliverable_url": "deliverable_ghid",
"deliverable_title": "deliverable_title",
"deliverable_pillar": "deliverable_pillar",
"deliverable_status": "deliverable_status",
"epic_url": "epic_ghid",
"epic_title": "epic_title",
"issue_url": "issue_ghid",
Expand All @@ -43,6 +45,8 @@ class EtlDataset(BaseDataset):
"issue_closed_at": "issue_closed_at",
"issue_points": "issue_points",
"issue_status": "issue_status",
"project_owner": "project_name",
"project_number": "project_ghid",
"sprint_id": "sprint_ghid",
"sprint_name": "sprint_name",
"sprint_start": "sprint_start",
Expand Down Expand Up @@ -144,3 +148,15 @@ def get_issue_ghids(self) -> NDArray[Any]:
"""Fetch an array of unique non-null issue ghids."""
df = self.df[self.df.issue_ghid.notna()]
return df.issue_ghid.unique()

# PROJECT getters

def get_project(self, project_ghid: int) -> pd.Series:
"""Fetch data about a given project."""
query_string = f"project_ghid == {project_ghid}"
return self.df.query(query_string).iloc[0]

def get_project_ghids(self) -> NDArray[Any]:
"""Fetch an array of unique non-null project ghids."""
df = self.df[self.df.project_ghid.notna()]
return df.project_ghid.unique()
63 changes: 47 additions & 16 deletions analytics/src/analytics/integrations/etldb/deliverable_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Define EtlDeliverableModel class to encapsulate db CRUD operations."""

from pandas import Series
from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlEntityType
from analytics.integrations.etldb.etldb import EtlChangeType, EtlDb
Expand All @@ -21,20 +23,30 @@ def sync_deliverable(
) -> tuple[int | None, EtlChangeType]:
"""Write deliverable data to etl database."""
# initialize return value
deliverable_id = None
change_type = EtlChangeType.NONE

# insert dimensions
deliverable_id = self._insert_dimensions(deliverable_df)
if deliverable_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if deliverable_id is None:
deliverable_id, change_type = self._update_dimensions(deliverable_df)

# insert facts
if deliverable_id is not None:
self._insert_facts(deliverable_id, deliverable_df, ghid_map)
try:
# insert dimensions
deliverable_id = self._insert_dimensions(deliverable_df)
if deliverable_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if deliverable_id is None:
deliverable_id, change_type = self._update_dimensions(deliverable_df)

# insert facts
if deliverable_id is not None:
_ = self._insert_facts(deliverable_id, deliverable_df, ghid_map)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync deliverable data: {e}"
raise RuntimeError(message) from e

return deliverable_id, change_type

Expand Down Expand Up @@ -69,10 +81,10 @@ def _insert_facts(
deliverable_id: int,
deliverable_df: Series,
ghid_map: dict,
) -> int | None:
) -> tuple[int | None, int | None]:
"""Write deliverable fact data to etl database."""
# insert into fact table: deliverable_quad_map
new_row_id = None
map_id = None
cursor = self.dbh.connection()
result = cursor.execute(
text(
Expand All @@ -91,12 +103,31 @@ def _insert_facts(
)
row = result.fetchone()
if row:
new_row_id = row[0]
map_id = row[0]

# insert into fact table: deliverable_history
history_id = None
result = cursor.execute(
text(
"insert into gh_deliverable_history(deliverable_id, status, d_effective) "
"values (:deliverable_id, :status, :effective) "
"on conflict(deliverable_id, d_effective) do update "
"set (status, t_modified) = (:status, current_timestamp) returning id",
),
{
"deliverable_id": deliverable_id,
"status": deliverable_df["deliverable_status"],
"effective": self.dbh.effective_date,
},
)
row = result.fetchone()
if row:
history_id = row[0]

# commit
self.dbh.commit(cursor)

return new_row_id
return history_id, map_id

def _update_dimensions(
self,
Expand Down
36 changes: 24 additions & 12 deletions analytics/src/analytics/integrations/etldb/epic_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Defines EtlEpicModel class to encapsulate db CRUD operations."""

from pandas import Series
from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlEntityType
from analytics.integrations.etldb.etldb import EtlChangeType, EtlDb
Expand All @@ -21,20 +23,30 @@ def sync_epic(
) -> tuple[int | None, EtlChangeType]:
"""Write epic data to etl database."""
# initialize return value
epic_id = None
change_type = EtlChangeType.NONE

# insert dimensions
epic_id = self._insert_dimensions(epic_df)
if epic_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if epic_id is None:
epic_id, change_type = self._update_dimensions(epic_df)

# insert facts
if epic_id is not None:
self._insert_facts(epic_id, epic_df, ghid_map)
try:
# insert dimensions
epic_id = self._insert_dimensions(epic_df)
if epic_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if epic_id is None:
epic_id, change_type = self._update_dimensions(epic_df)

# insert facts
if epic_id is not None:
self._insert_facts(epic_id, epic_df, ghid_map)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync epic data: {e}"
raise RuntimeError(message) from e

return epic_id, change_type

Expand Down
36 changes: 24 additions & 12 deletions analytics/src/analytics/integrations/etldb/issue_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from datetime import datetime

from pandas import Series
from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlEntityType
from analytics.integrations.etldb.etldb import EtlChangeType, EtlDb
Expand All @@ -23,20 +25,30 @@ def sync_issue(
) -> tuple[int | None, EtlChangeType]:
"""Write issue data to etl database."""
# initialize return value
issue_id = None
change_type = EtlChangeType.NONE

# insert dimensions
issue_id = self._insert_dimensions(issue_df, ghid_map)
if issue_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if issue_id is None:
issue_id, change_type = self._update_dimensions(issue_df, ghid_map)

# insert facts
if issue_id is not None:
self._insert_facts(issue_id, issue_df, ghid_map)
try:
# insert dimensions
issue_id = self._insert_dimensions(issue_df, ghid_map)
if issue_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if issue_id is None:
issue_id, change_type = self._update_dimensions(issue_df, ghid_map)

# insert facts
if issue_id is not None:
self._insert_facts(issue_id, issue_df, ghid_map)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync issue data: {e}"
raise RuntimeError(message) from e

return issue_id, change_type

Expand Down
102 changes: 38 additions & 64 deletions analytics/src/analytics/integrations/etldb/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import re
from pathlib import Path

from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlDataset, EtlEntityType
from analytics.integrations.etldb.deliverable_model import EtlDeliverableModel
from analytics.integrations.etldb.epic_model import EtlEpicModel
from analytics.integrations.etldb.etldb import EtlDb
from analytics.integrations.etldb.issue_model import EtlIssueModel
from analytics.integrations.etldb.project_model import EtlProjectModel
from analytics.integrations.etldb.quad_model import EtlQuadModel
from analytics.integrations.etldb.sprint_model import EtlSprintModel

Expand Down Expand Up @@ -73,83 +72,44 @@ def sync_data(dataset: EtlDataset, effective: str) -> None:
ghid_map: dict[EtlEntityType, dict[str, int]] = {
EtlEntityType.DELIVERABLE: {},
EtlEntityType.EPIC: {},
EtlEntityType.SPRINT: {},
EtlEntityType.PROJECT: {},
EtlEntityType.QUAD: {},
EtlEntityType.SPRINT: {},
}

# initialize db connection
db = EtlDb(effective)

# note: the following code assumes SCHEMA VERSION >= 4
# sync project data to db resulting in row id for each project
ghid_map[EtlEntityType.PROJECT] = sync_projects(db, dataset)
print(f"project row(s) processed: {len(ghid_map[EtlEntityType.PROJECT])}")

# sync quad data to db resulting in row id for each quad
try:
ghid_map[EtlEntityType.QUAD] = sync_quads(db, dataset)
print(f"quad row(s) processed: {len(ghid_map[EtlEntityType.QUAD])}")
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync quad data: {e}"
raise RuntimeError(message) from e
ghid_map[EtlEntityType.QUAD] = sync_quads(db, dataset)
print(f"quad row(s) processed: {len(ghid_map[EtlEntityType.QUAD])}")

# sync deliverable data to db resulting in row id for each deliverable
try:
ghid_map[EtlEntityType.DELIVERABLE] = sync_deliverables(
db,
dataset,
ghid_map,
)
print(
f"deliverable row(s) processed: {len(ghid_map[EtlEntityType.DELIVERABLE])}",
)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync deliverable data: {e}"
raise RuntimeError(message) from e
ghid_map[EtlEntityType.DELIVERABLE] = sync_deliverables(
db,
dataset,
ghid_map,
)
print(
f"deliverable row(s) processed: {len(ghid_map[EtlEntityType.DELIVERABLE])}",
)

# sync sprint data to db resulting in row id for each sprint
try:
ghid_map[EtlEntityType.SPRINT] = sync_sprints(db, dataset, ghid_map)
print(f"sprint row(s) processed: {len(ghid_map[EtlEntityType.SPRINT])}")
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync sprint data: {e}"
raise RuntimeError(message) from e
ghid_map[EtlEntityType.SPRINT] = sync_sprints(db, dataset, ghid_map)
print(f"sprint row(s) processed: {len(ghid_map[EtlEntityType.SPRINT])}")

# sync epic data to db resulting in row id for each epic
try:
ghid_map[EtlEntityType.EPIC] = sync_epics(db, dataset, ghid_map)
print(f"epic row(s) processed: {len(ghid_map[EtlEntityType.EPIC])}")
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync epic data: {e}"
raise RuntimeError(message) from e
ghid_map[EtlEntityType.EPIC] = sync_epics(db, dataset, ghid_map)
print(f"epic row(s) processed: {len(ghid_map[EtlEntityType.EPIC])}")

# sync issue data to db resulting in row id for each issue
try:
issue_map = sync_issues(db, dataset, ghid_map)
print(f"issue row(s) processed: {len(issue_map)}")
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync issue data: {e}"
raise RuntimeError(message) from e
issue_map = sync_issues(db, dataset, ghid_map)
print(f"issue row(s) processed: {len(issue_map)}")


def sync_deliverables(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
Expand Down Expand Up @@ -188,6 +148,20 @@ def sync_issues(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
return result


def sync_projects(db: EtlDb, dataset: EtlDataset) -> dict:
"""Insert or update (if necessary) a row for each project and return a map of row ids."""
result = {}
model = EtlProjectModel(db)
for ghid in dataset.get_project_ghids():
project_df = dataset.get_project(ghid)
result[ghid], _ = model.sync_project(project_df)
Comment on lines +155 to +157
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File this away for future improvements, but just wanted to highlight that instead of:

  1. Filtering for the unique set of project github IDs
  2. Iterating through that list of IDs
  3. Filtering the dataset again to get project data for a given ID

A more common pattern would be to select the distinct set of values we want to insert using pandas drop_duplicates function and then iterate through the list of data, inserting each row or (preferably) doing a bulk upsert.

The difference between these patterns is trivial when we're inserting 3 project values, but it seems we're using the same pattern in most places:

Plus this current approach limits the options we have for bulk operations, which would make it easier to wrap DML into transaction blocks and rollback all changes if the statement fails, avoiding partial updates of tables during a batch process.

I think it's good the current insert pattern follows the others, but I created this ticket for us to revisit that insert pattern and added it to our improvements/tech debt epic

if VERBOSE:
print(
f"PROJECT '{ghid}' title = '{project_df['project_name']}', row_id = {result[ghid]}",
)
return result


def sync_sprints(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
"""Insert or update (if necessary) a row for each sprint and return a map of row ids."""
result = {}
Expand Down
Loading