Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 2887] Add new columns to EtlDb and bump schema version #2931

Merged
merged 37 commits into main from issue-2887-new-etldb-columns
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1d16441
implemented basic versioning for schema management
DavidDudas-Intuitial Nov 14, 2024
2cc4825
insert row into version table; rename methods for better readability
DavidDudas-Intuitial Nov 14, 2024
5fc3297
missing from last commit
DavidDudas-Intuitial Nov 14, 2024
447fc61
enable multiple sql files to execute during init db operation
DavidDudas-Intuitial Nov 15, 2024
5226bc0
formatting
DavidDudas-Intuitial Nov 15, 2024
219b130
formatting
DavidDudas-Intuitial Nov 15, 2024
53cf9a9
add ability to check schema version number
DavidDudas-Intuitial Nov 15, 2024
3a7f039
added minimum schema version logic
DavidDudas-Intuitial Nov 15, 2024
629287b
added new columns; bumped schema version number
DavidDudas-Intuitial Nov 15, 2024
71ac599
made the schema versioning process more resilient to developer error
DavidDudas-Intuitial Nov 15, 2024
37d9b77
fixed linter issues
DavidDudas-Intuitial Nov 15, 2024
eb68eb5
rename sql file
DavidDudas-Intuitial Nov 15, 2024
3140069
Merge branch 'main' into issue-2857-analytics-db-schema-versioning
DavidDudas-Intuitial Nov 15, 2024
5112ba2
Merge branch 'main' into issue-2857-analytics-db-schema-versioning
DavidDudas-Intuitial Nov 15, 2024
bdbf88c
added logging of each migration; removed drop_table sql to prevent ac…
DavidDudas-Intuitial Nov 15, 2024
f1e276c
added comment to explain regex
DavidDudas-Intuitial Nov 16, 2024
ae6de3e
improved handling of non-conformant sql filenames
DavidDudas-Intuitial Nov 18, 2024
28c2c2e
added verbose docstring
DavidDudas-Intuitial Nov 18, 2024
2bb8360
added tests
DavidDudas-Intuitial Nov 18, 2024
2f1f52f
fixed linter issue
DavidDudas-Intuitial Nov 18, 2024
117e6f4
fixed linter issue
DavidDudas-Intuitial Nov 18, 2024
8ff23b6
Merge branch 'main' into issue-2857-analytics-db-schema-versioning
DavidDudas-Intuitial Nov 18, 2024
2620e64
added support for deliverable status
DavidDudas-Intuitial Nov 19, 2024
220fc80
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 19, 2024
1dfdbcf
run docker compose down before make init-db; changed sequence of jobs…
DavidDudas-Intuitial Nov 19, 2024
762b180
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 19, 2024
0630b97
update tests
DavidDudas-Intuitial Nov 19, 2024
cdbf4e1
added support for gh_project table
DavidDudas-Intuitial Nov 19, 2024
051c7fa
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 19, 2024
9abb20e
combined sql files
DavidDudas-Intuitial Nov 19, 2024
0a16643
update tests
DavidDudas-Intuitial Nov 19, 2024
bb9a743
fix type check bug
DavidDudas-Intuitial Nov 19, 2024
a34f106
format
DavidDudas-Intuitial Nov 19, 2024
170db64
add mapping between sprint and project; moved exception handling from…
DavidDudas-Intuitial Nov 20, 2024
d59ac04
remove version validation, per request
DavidDudas-Intuitial Nov 20, 2024
8e318f3
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 20, 2024
7599d0f
Merge branch 'main' into issue-2887-new-etldb-columns
DavidDudas-Intuitial Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion analytics/src/analytics/datasets/etl_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ class EtlEntityType(Enum):
DELIVERABLE = "deliverable"
EPIC = "epic"
ISSUE = "issue"
SPRINT = "sprint"
PROJECT = "project"
QUAD = "quad"
SPRINT = "sprint"


class EtlDataset(BaseDataset):
Expand All @@ -32,6 +33,7 @@ class EtlDataset(BaseDataset):
"deliverable_url": "deliverable_ghid",
"deliverable_title": "deliverable_title",
"deliverable_pillar": "deliverable_pillar",
"deliverable_status": "deliverable_status",
"epic_url": "epic_ghid",
"epic_title": "epic_title",
"issue_url": "issue_ghid",
Expand All @@ -43,6 +45,8 @@ class EtlDataset(BaseDataset):
"issue_closed_at": "issue_closed_at",
"issue_points": "issue_points",
"issue_status": "issue_status",
"project_owner": "project_name",
"project_number": "project_ghid",
"sprint_id": "sprint_ghid",
"sprint_name": "sprint_name",
"sprint_start": "sprint_start",
Expand Down Expand Up @@ -144,3 +148,15 @@ def get_issue_ghids(self) -> NDArray[Any]:
"""Fetch an array of unique non-null issue ghids."""
df = self.df[self.df.issue_ghid.notna()]
return df.issue_ghid.unique()

# PROJECT getters

def get_project(self, project_ghid: int) -> pd.Series:
    """Fetch data about a given project.

    Parameters
    ----------
    project_ghid: int
        GitHub identifier of the project to look up.

    Returns
    -------
    pd.Series
        The first row of the dataset whose ``project_ghid`` matches.

    Raises
    ------
    IndexError
        If no row matches the given ``project_ghid``.
    """
    # Reference the local variable with pandas' `@` syntax instead of
    # interpolating it into the query string: this is robust to value
    # types (strings would break the f-string form) and avoids
    # query-string injection.
    return self.df.query("project_ghid == @project_ghid").iloc[0]

def get_project_ghids(self) -> NDArray[Any]:
    """Fetch an array of unique non-null project ghids.

    Returns
    -------
    NDArray[Any]
        Unique ``project_ghid`` values, with null entries excluded.
    """
    # Drop null ghids first, then collapse to the unique values.
    non_null = self.df.project_ghid.dropna()
    return non_null.unique()
63 changes: 47 additions & 16 deletions analytics/src/analytics/integrations/etldb/deliverable_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Define EtlDeliverableModel class to encapsulate db CRUD operations."""

from pandas import Series
from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlEntityType
from analytics.integrations.etldb.etldb import EtlChangeType, EtlDb
Expand All @@ -21,20 +23,30 @@ def sync_deliverable(
) -> tuple[int | None, EtlChangeType]:
"""Write deliverable data to etl database."""
# initialize return value
deliverable_id = None
change_type = EtlChangeType.NONE

# insert dimensions
deliverable_id = self._insert_dimensions(deliverable_df)
if deliverable_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if deliverable_id is None:
deliverable_id, change_type = self._update_dimensions(deliverable_df)

# insert facts
if deliverable_id is not None:
self._insert_facts(deliverable_id, deliverable_df, ghid_map)
try:
# insert dimensions
deliverable_id = self._insert_dimensions(deliverable_df)
if deliverable_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if deliverable_id is None:
deliverable_id, change_type = self._update_dimensions(deliverable_df)

# insert facts
if deliverable_id is not None:
_ = self._insert_facts(deliverable_id, deliverable_df, ghid_map)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync deliverable data: {e}"
raise RuntimeError(message) from e

return deliverable_id, change_type

Expand Down Expand Up @@ -69,10 +81,10 @@ def _insert_facts(
deliverable_id: int,
deliverable_df: Series,
ghid_map: dict,
) -> int | None:
) -> tuple[int | None, int | None]:
"""Write deliverable fact data to etl database."""
# insert into fact table: deliverable_quad_map
new_row_id = None
map_id = None
cursor = self.dbh.connection()
result = cursor.execute(
text(
Expand All @@ -91,12 +103,31 @@ def _insert_facts(
)
row = result.fetchone()
if row:
new_row_id = row[0]
map_id = row[0]

# insert into fact table: deliverable_history
history_id = None
result = cursor.execute(
text(
"insert into gh_deliverable_history(deliverable_id, status, d_effective) "
"values (:deliverable_id, :status, :effective) "
"on conflict(deliverable_id, d_effective) do update "
"set (status, t_modified) = (:status, current_timestamp) returning id",
),
{
"deliverable_id": deliverable_id,
"status": deliverable_df["deliverable_status"],
"effective": self.dbh.effective_date,
},
)
row = result.fetchone()
if row:
history_id = row[0]

# commit
self.dbh.commit(cursor)

return new_row_id
return history_id, map_id

def _update_dimensions(
self,
Expand Down
36 changes: 24 additions & 12 deletions analytics/src/analytics/integrations/etldb/epic_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Defines EtlEpicModel class to encapsulate db CRUD operations."""

from pandas import Series
from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlEntityType
from analytics.integrations.etldb.etldb import EtlChangeType, EtlDb
Expand All @@ -21,20 +23,30 @@ def sync_epic(
) -> tuple[int | None, EtlChangeType]:
"""Write epic data to etl database."""
# initialize return value
epic_id = None
change_type = EtlChangeType.NONE

# insert dimensions
epic_id = self._insert_dimensions(epic_df)
if epic_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if epic_id is None:
epic_id, change_type = self._update_dimensions(epic_df)

# insert facts
if epic_id is not None:
self._insert_facts(epic_id, epic_df, ghid_map)
try:
# insert dimensions
epic_id = self._insert_dimensions(epic_df)
if epic_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if epic_id is None:
epic_id, change_type = self._update_dimensions(epic_df)

# insert facts
if epic_id is not None:
self._insert_facts(epic_id, epic_df, ghid_map)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync epic data: {e}"
raise RuntimeError(message) from e

return epic_id, change_type

Expand Down
36 changes: 24 additions & 12 deletions analytics/src/analytics/integrations/etldb/issue_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from datetime import datetime

from pandas import Series
from psycopg.errors import InsufficientPrivilege
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

from analytics.datasets.etl_dataset import EtlEntityType
from analytics.integrations.etldb.etldb import EtlChangeType, EtlDb
Expand All @@ -23,20 +25,30 @@ def sync_issue(
) -> tuple[int | None, EtlChangeType]:
"""Write issue data to etl database."""
# initialize return value
issue_id = None
change_type = EtlChangeType.NONE

# insert dimensions
issue_id = self._insert_dimensions(issue_df, ghid_map)
if issue_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if issue_id is None:
issue_id, change_type = self._update_dimensions(issue_df, ghid_map)

# insert facts
if issue_id is not None:
self._insert_facts(issue_id, issue_df, ghid_map)
try:
# insert dimensions
issue_id = self._insert_dimensions(issue_df, ghid_map)
if issue_id is not None:
change_type = EtlChangeType.INSERT

# if insert failed, select and update
if issue_id is None:
issue_id, change_type = self._update_dimensions(issue_df, ghid_map)

# insert facts
if issue_id is not None:
self._insert_facts(issue_id, issue_df, ghid_map)
except (
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync issue data: {e}"
raise RuntimeError(message) from e

return issue_id, change_type

Expand Down
Loading