Skip to content

Commit

Permalink
🔨 Refactor upserts to MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
Marigold committed Sep 30, 2024
1 parent 755c28e commit e62a17d
Show file tree
Hide file tree
Showing 13 changed files with 407 additions and 291 deletions.
14 changes: 1 addition & 13 deletions apps/backport/datasync/data_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed

from etl import config, files
from etl import config
from etl.config import OWIDEnv
from etl.db import read_sql

Expand Down Expand Up @@ -394,18 +394,6 @@ def _omit_nullable_values(d: dict) -> dict:
return {k: v for k, v in d.items() if v is not None and (isinstance(v, list) and len(v) or not pd.isna(v))}


def checksum_data_str(var_data_str: str) -> str:
    """Return the checksum of the serialized variable-data string."""
    digest = files.checksum_str(var_data_str)
    return digest


def checksum_metadata(meta: Dict[str, Any]) -> str:
    """Compute a checksum for a metadata dict.

    Fields that are irrelevant for change detection are dropped first via
    filter_out_fields_in_metadata_for_checksum, then the remainder is
    JSON-serialized and hashed.
    """
    # Drop fields not needed for checksum computation
    relevant = filter_out_fields_in_metadata_for_checksum(meta)
    serialized = json.dumps(relevant, default=str)
    return files.checksum_str(serialized)


def filter_out_fields_in_metadata_for_checksum(meta: Dict[str, Any]) -> Dict[str, Any]:
"""Drop fields that are not needed to estimate the checksum."""
meta_ = deepcopy(meta)
Expand Down
18 changes: 0 additions & 18 deletions etl/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,6 @@ def clear(self) -> None:

CACHE_CHECKSUM_FILE = RuntimeCache()

# Printable ASCII bytes plus the control characters commonly found in text files.
TEXT_CHARS = bytes(range(32, 127)) + b"\n\r\t\f\b"
# Default number of bytes to sample when sniffing whether a file is text.
DEFAULT_CHUNK_SIZE = 512


def istextblock(block: bytes) -> bool:
    """Heuristically decide whether a block of bytes looks like text.

    A block is treated as text when it contains no null bytes and at most
    30% of its bytes fall outside TEXT_CHARS. An empty block counts as
    text (an empty file is a valid text file).

    :param block: raw bytes sampled from the start of a file.
    :return: True if the block looks like text, False if it looks binary.
    """
    if not block:
        # An empty file is considered a valid text file
        return True

    if b"\x00" in block:
        # Null bytes are a strong indicator of binary content.
        return False

    # translate() with a table of None deletes every byte listed in
    # TEXT_CHARS in one C-level pass, leaving only the non-text bytes.
    nontext = block.translate(None, TEXT_CHARS)
    # Python 3 `/` is true division, so the former float() cast was redundant.
    return len(nontext) / len(block) <= 0.30


def checksum_str(s: str) -> str:
"Return the md5 hex digest of the string."
Expand Down
30 changes: 14 additions & 16 deletions etl/grapher_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session

from apps.backport.datasync import data_metadata as dm
from etl.db import get_engine, read_sql
from etl.files import checksum_str

Expand Down Expand Up @@ -92,8 +93,8 @@ def _yield_wide_table(
# Validation
if "year" not in table.primary_key:
raise Exception("Table is missing `year` primary key")
if "entity_id" not in table.primary_key:
raise Exception("Table is missing `entity_id` primary key")
if "entityId" not in table.primary_key:
raise Exception("Table is missing `entityId` primary key")
if na_action == "raise":
for col in table.columns:
if table[col].isna().any():
Expand All @@ -102,7 +103,7 @@ def _yield_wide_table(
if cols_with_none_units:
raise Exception("Columns with missing units: " + ", ".join(cols_with_none_units))

dim_names = [k for k in table.primary_key if k not in ("year", "entity_id")]
dim_names = [k for k in table.primary_key if k not in ("year", "entityId", "entityCode", "entityName")]

# Keep only entity_id and year in index
table = table.reset_index(level=dim_names)
Expand Down Expand Up @@ -188,7 +189,6 @@ def _yield_wide_table(
# traverse metadata and expand Jinja
tab[short_name].metadata = _expand_jinja(tab[short_name].metadata, dim_dict)

# Keep only entity_id and year in index
yield tab


Expand Down Expand Up @@ -504,20 +504,14 @@ def _adapt_dataset_metadata_for_grapher(
return metadata


def _adapt_table_for_grapher(
table: catalog.Table, engine: Engine | None = None, country_col: str = "country", year_col: str = "year"
) -> catalog.Table:
def _adapt_table_for_grapher(table: catalog.Table, engine: Engine) -> catalog.Table:
"""Adapt table (from a garden dataset) to be used in a grapher step. This function
is not meant to be run explicitly, but by default in the grapher step.
Parameters
----------
table : catalog.Table
Table from garden dataset.
country_col : str
Name of country column in table.
year_col : str
Name of year column in table.
Returns
-------
Expand All @@ -534,7 +528,7 @@ def _adapt_table_for_grapher(
), f"Variable titles are not unique ({variable_titles_counts[variable_titles_counts > 1].index})."

# Remember original dimensions
dim_names = [n for n in table.index.names if n and n not in ("year", "date", "entity_id", country_col)]
dim_names = [n for n in table.index.names if n and n not in ("year", "date", "entity_id", "country")]

# Reset index unless we have default index
if table.index.names != [None]:
Expand All @@ -546,14 +540,18 @@ def _adapt_table_for_grapher(
assert "year" not in table.columns, "Table cannot have both `date` and `year` columns."
table = adapt_table_with_dates_to_grapher(table)

assert {"year", country_col} <= set(table.columns), f"Table must have columns {country_col} and year."
assert {"year", "country"} <= set(table.columns), "Table must have columns country and year."
assert "entity_id" not in table.columns, "Table must not have column entity_id."

# Grapher needs a column entity id, that is constructed based on the unique entity names in the database.
table["entity_id"] = country_to_entity_id(table[country_col], create_entities=True, engine=engine)
table = table.drop(columns=[country_col]).rename(columns={year_col: "year"})
table["entityId"] = country_to_entity_id(table["country"], create_entities=True, engine=engine)
table = table.drop(columns=["country"])

# Add entity code and name
with Session(engine) as session:
table = dm.add_entity_code_and_name(session, table).copy_metadata(table)

table = table.set_index(["entity_id", "year"] + dim_names)
table = table.set_index(["entityId", "entityCode", "entityName", "year"] + dim_names)

# Ensure the default source of each column includes the description of the table (since that is the description that
# will appear in grapher on the SOURCES tab).
Expand Down
Loading

0 comments on commit e62a17d

Please sign in to comment.