Skip to content

Commit

Permalink
[Issue 2887] Add new columns to EtlDb and bump schema version (#2931)
Browse files Browse the repository at this point in the history
## Summary
Fixes #2887 

### Time to review: __5 mins__

## Changes proposed
> What was added, updated, or removed in this PR.

Add new facts and dimensions to EtlDb to support certain dashboards in
Metabase:
- Slowly changing dimension "deliverable status" is now supported via
the new table `gh_deliverable_history`
- Sprint-to-project relationship is now supported via the new table
`gh_project`, and the table `gh_sprint` is altered to add a new column
`project_id`
- Updated `EtlDataset` to support the new columns
- Updated `analytics/integrations/etldb` main and models to use the new
schema during transform and load
- Moved exception handling out of `etldb` main and into model classes to
make main more readable
- Updated tests and mock file used for tests

## Context for reviewers
> Testing instructions, background context, more in-depth details of the
implementation, and anything else you'd like to call out or ask
reviewers. Explain how the changes were verified.

Certain dashboards in Metabase required certain facts and dimensions in
EtlDb. This PR adds the fields that are needed, modifies transform/load
to use them, and bumps the schema version to 4.

## Additional information
> Screenshots, GIF demos, code examples or output to help show the
changes working as expected.


![db-init-in-CI](https://github.com/user-attachments/assets/6257e8cb-59ec-4a21-b11f-763b917ca310)
  • Loading branch information
DavidDudas-Intuitial authored Nov 21, 2024
1 parent 5ee465d commit 0fbbae2
Show file tree
Hide file tree
Showing 12 changed files with 856 additions and 541 deletions.
18 changes: 17 additions & 1 deletion analytics/src/analytics/datasets/etl_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ class EtlEntityType(Enum):
DELIVERABLE = "deliverable"
EPIC = "epic"
ISSUE = "issue"
SPRINT = "sprint"
PROJECT = "project"
QUAD = "quad"
SPRINT = "sprint"


class EtlDataset(BaseDataset):
Expand All @@ -32,6 +33,7 @@ class EtlDataset(BaseDataset):
"deliverable_url": "deliverable_ghid",
"deliverable_title": "deliverable_title",
"deliverable_pillar": "deliverable_pillar",
"deliverable_status": "deliverable_status",
"epic_url": "epic_ghid",
"epic_title": "epic_title",
"issue_url": "issue_ghid",
Expand All @@ -43,6 +45,8 @@ class EtlDataset(BaseDataset):
"issue_closed_at": "issue_closed_at",
"issue_points": "issue_points",
"issue_status": "issue_status",
"project_owner": "project_name",
"project_number": "project_ghid",
"sprint_id": "sprint_ghid",
"sprint_name": "sprint_name",
"sprint_start": "sprint_start",
Expand Down Expand Up @@ -144,3 +148,15 @@ def get_issue_ghids(self) -> NDArray[Any]:
"""Fetch an array of unique non-null issue ghids."""
df = self.df[self.df.issue_ghid.notna()]
return df.issue_ghid.unique()

# PROJECT getters

def get_project(self, project_ghid: int) -> pd.Series:
"""Fetch data about a given project."""
query_string = f"project_ghid == {project_ghid}"
return self.df.query(query_string).iloc[0]

def get_project_ghids(self) -> NDArray[Any]:
"""Fetch an array of unique non-null project ghids."""
df = self.df[self.df.project_ghid.notna()]
return df.project_ghid.unique()
63 changes: 47 additions & 16 deletions analytics/src/analytics/integrations/etldb/deliverable_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Define EtlDeliverableModel class to encapsulate db CRUD operations."""

from pandas import Series
from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlEntityType
from analytics.integrations.etldb.etldb import EtlChangeType, EtlDb
Expand All @@ -21,20 +23,30 @@ def sync_deliverable(
) -> tuple[int | None, EtlChangeType]:
"""Write deliverable data to etl database."""
# initialize return value
deliverable_id = None
change_type = EtlChangeType.NONE

# insert dimensions
deliverable_id = self._insert_dimensions(deliverable_df)
if deliverable_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if deliverable_id is None:
deliverable_id, change_type = self._update_dimensions(deliverable_df)

# insert facts
if deliverable_id is not None:
self._insert_facts(deliverable_id, deliverable_df, ghid_map)
try:
# insert dimensions
deliverable_id = self._insert_dimensions(deliverable_df)
if deliverable_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if deliverable_id is None:
deliverable_id, change_type = self._update_dimensions(deliverable_df)

# insert facts
if deliverable_id is not None:
_ = self._insert_facts(deliverable_id, deliverable_df, ghid_map)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync deliverable data: {e}"
raise RuntimeError(message) from e

return deliverable_id, change_type

Expand Down Expand Up @@ -69,10 +81,10 @@ def _insert_facts(
deliverable_id: int,
deliverable_df: Series,
ghid_map: dict,
) -> int | None:
) -> tuple[int | None, int | None]:
"""Write deliverable fact data to etl database."""
# insert into fact table: deliverable_quad_map
new_row_id = None
map_id = None
cursor = self.dbh.connection()
result = cursor.execute(
text(
Expand All @@ -91,12 +103,31 @@ def _insert_facts(
)
row = result.fetchone()
if row:
new_row_id = row[0]
map_id = row[0]

# insert into fact table: deliverable_history
history_id = None
result = cursor.execute(
text(
"insert into gh_deliverable_history(deliverable_id, status, d_effective) "
"values (:deliverable_id, :status, :effective) "
"on conflict(deliverable_id, d_effective) do update "
"set (status, t_modified) = (:status, current_timestamp) returning id",
),
{
"deliverable_id": deliverable_id,
"status": deliverable_df["deliverable_status"],
"effective": self.dbh.effective_date,
},
)
row = result.fetchone()
if row:
history_id = row[0]

# commit
self.dbh.commit(cursor)

return new_row_id
return history_id, map_id

def _update_dimensions(
self,
Expand Down
36 changes: 24 additions & 12 deletions analytics/src/analytics/integrations/etldb/epic_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Defines EtlEpicModel class to encapsulate db CRUD operations."""

from pandas import Series
from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlEntityType
from analytics.integrations.etldb.etldb import EtlChangeType, EtlDb
Expand All @@ -21,20 +23,30 @@ def sync_epic(
) -> tuple[int | None, EtlChangeType]:
"""Write epic data to etl database."""
# initialize return value
epic_id = None
change_type = EtlChangeType.NONE

# insert dimensions
epic_id = self._insert_dimensions(epic_df)
if epic_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if epic_id is None:
epic_id, change_type = self._update_dimensions(epic_df)

# insert facts
if epic_id is not None:
self._insert_facts(epic_id, epic_df, ghid_map)
try:
# insert dimensions
epic_id = self._insert_dimensions(epic_df)
if epic_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if epic_id is None:
epic_id, change_type = self._update_dimensions(epic_df)

# insert facts
if epic_id is not None:
self._insert_facts(epic_id, epic_df, ghid_map)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync epic data: {e}"
raise RuntimeError(message) from e

return epic_id, change_type

Expand Down
36 changes: 24 additions & 12 deletions analytics/src/analytics/integrations/etldb/issue_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from datetime import datetime

from pandas import Series
from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlEntityType
from analytics.integrations.etldb.etldb import EtlChangeType, EtlDb
Expand All @@ -23,20 +25,30 @@ def sync_issue(
) -> tuple[int | None, EtlChangeType]:
"""Write issue data to etl database."""
# initialize return value
issue_id = None
change_type = EtlChangeType.NONE

# insert dimensions
issue_id = self._insert_dimensions(issue_df, ghid_map)
if issue_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if issue_id is None:
issue_id, change_type = self._update_dimensions(issue_df, ghid_map)

# insert facts
if issue_id is not None:
self._insert_facts(issue_id, issue_df, ghid_map)
try:
# insert dimensions
issue_id = self._insert_dimensions(issue_df, ghid_map)
if issue_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if issue_id is None:
issue_id, change_type = self._update_dimensions(issue_df, ghid_map)

# insert facts
if issue_id is not None:
self._insert_facts(issue_id, issue_df, ghid_map)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync issue data: {e}"
raise RuntimeError(message) from e

return issue_id, change_type

Expand Down
102 changes: 38 additions & 64 deletions analytics/src/analytics/integrations/etldb/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import re
from pathlib import Path

from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlDataset, EtlEntityType
from analytics.integrations.etldb.deliverable_model import EtlDeliverableModel
from analytics.integrations.etldb.epic_model import EtlEpicModel
from analytics.integrations.etldb.etldb import EtlDb
from analytics.integrations.etldb.issue_model import EtlIssueModel
from analytics.integrations.etldb.project_model import EtlProjectModel
from analytics.integrations.etldb.quad_model import EtlQuadModel
from analytics.integrations.etldb.sprint_model import EtlSprintModel

Expand Down Expand Up @@ -73,83 +72,44 @@ def sync_data(dataset: EtlDataset, effective: str) -> None:
ghid_map: dict[EtlEntityType, dict[str, int]] = {
EtlEntityType.DELIVERABLE: {},
EtlEntityType.EPIC: {},
EtlEntityType.SPRINT: {},
EtlEntityType.PROJECT: {},
EtlEntityType.QUAD: {},
EtlEntityType.SPRINT: {},
}

# initialize db connection
db = EtlDb(effective)

# note: the following code assumes SCHEMA VERSION >= 4
# sync project data to db resulting in row id for each project
ghid_map[EtlEntityType.PROJECT] = sync_projects(db, dataset)
print(f"project row(s) processed: {len(ghid_map[EtlEntityType.PROJECT])}")

# sync quad data to db resulting in row id for each quad
try:
ghid_map[EtlEntityType.QUAD] = sync_quads(db, dataset)
print(f"quad row(s) processed: {len(ghid_map[EtlEntityType.QUAD])}")
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync quad data: {e}"
raise RuntimeError(message) from e
ghid_map[EtlEntityType.QUAD] = sync_quads(db, dataset)
print(f"quad row(s) processed: {len(ghid_map[EtlEntityType.QUAD])}")

# sync deliverable data to db resulting in row id for each deliverable
try:
ghid_map[EtlEntityType.DELIVERABLE] = sync_deliverables(
db,
dataset,
ghid_map,
)
print(
f"deliverable row(s) processed: {len(ghid_map[EtlEntityType.DELIVERABLE])}",
)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync deliverable data: {e}"
raise RuntimeError(message) from e
ghid_map[EtlEntityType.DELIVERABLE] = sync_deliverables(
db,
dataset,
ghid_map,
)
print(
f"deliverable row(s) processed: {len(ghid_map[EtlEntityType.DELIVERABLE])}",
)

# sync sprint data to db resulting in row id for each sprint
try:
ghid_map[EtlEntityType.SPRINT] = sync_sprints(db, dataset, ghid_map)
print(f"sprint row(s) processed: {len(ghid_map[EtlEntityType.SPRINT])}")
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync sprint data: {e}"
raise RuntimeError(message) from e
ghid_map[EtlEntityType.SPRINT] = sync_sprints(db, dataset, ghid_map)
print(f"sprint row(s) processed: {len(ghid_map[EtlEntityType.SPRINT])}")

# sync epic data to db resulting in row id for each epic
try:
ghid_map[EtlEntityType.EPIC] = sync_epics(db, dataset, ghid_map)
print(f"epic row(s) processed: {len(ghid_map[EtlEntityType.EPIC])}")
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync epic data: {e}"
raise RuntimeError(message) from e
ghid_map[EtlEntityType.EPIC] = sync_epics(db, dataset, ghid_map)
print(f"epic row(s) processed: {len(ghid_map[EtlEntityType.EPIC])}")

# sync issue data to db resulting in row id for each issue
try:
issue_map = sync_issues(db, dataset, ghid_map)
print(f"issue row(s) processed: {len(issue_map)}")
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync issue data: {e}"
raise RuntimeError(message) from e
issue_map = sync_issues(db, dataset, ghid_map)
print(f"issue row(s) processed: {len(issue_map)}")


def sync_deliverables(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
Expand Down Expand Up @@ -188,6 +148,20 @@ def sync_issues(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
return result


def sync_projects(db: EtlDb, dataset: EtlDataset) -> dict:
"""Insert or update (if necessary) a row for each project and return a map of row ids."""
result = {}
model = EtlProjectModel(db)
for ghid in dataset.get_project_ghids():
project_df = dataset.get_project(ghid)
result[ghid], _ = model.sync_project(project_df)
if VERBOSE:
print(
f"PROJECT '{ghid}' title = '{project_df['project_name']}', row_id = {result[ghid]}",
)
return result


def sync_sprints(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
"""Insert or update (if necessary) a row for each sprint and return a map of row ids."""
result = {}
Expand Down
Loading

0 comments on commit 0fbbae2

Please sign in to comment.