From 7ae217d94135e85ccc9820f8a3e93d48d70a9241 Mon Sep 17 00:00:00 2001
From: Marigold
Date: Mon, 14 Aug 2023 15:39:11 +0200
Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A8=20add=20support=20for=20origins=20?=
 =?UTF-8?q?in=20fast-track?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 etl/metadata_export.py                |   3 +
 fasttrack/cli.py                      | 250 +++++++++++-------
 fasttrack/csv.py                      |  30 ++-
 .../{{cookiecutter.short_name}}.py    |   2 +-
 fasttrack/sheets.py                   | 174 +++++++-----
 fasttrack/yaml_meta.py                |  76 ------
 lib/catalog/owid/catalog/__init__.py  |  12 +-
 7 files changed, 293 insertions(+), 254 deletions(-)
 delete mode 100644 fasttrack/yaml_meta.py

diff --git a/etl/metadata_export.py b/etl/metadata_export.py
index 19c4c57be45..164b3cafb17 100644
--- a/etl/metadata_export.py
+++ b/etl/metadata_export.py
@@ -49,6 +49,9 @@ def metadata_export(
     for source in ds_meta.get("sources", []):
         _prune_empty(source)
 
+    for origin in ds_meta.get("origins", []):
+        _prune_empty(origin)
+
     for license in ds_meta.get("licenses", []):
         _prune_empty(license)
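For reference, the pruning behaviour this hunk extends to origins can be sketched as follows (a standalone approximation; the real _prune_empty lives in etl/metadata_export.py and its exact semantics may differ):

    from typing import Any, Dict

    def prune_empty(d: Dict[str, Any]) -> None:
        # drop keys whose values are empty, so the exported YAML stays minimal
        for key in [k for k, v in d.items() if v is None or v == "" or v == []]:
            del d[key]

    origin = {"producer": "Example Producer", "version": "", "citation_full": None}
    prune_empty(origin)
    assert origin == {"producer": "Example Producer"}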
diff --git a/fasttrack/cli.py b/fasttrack/cli.py
index 0962c317bb3..62472ee346e 100644
--- a/fasttrack/cli.py
+++ b/fasttrack/cli.py
@@ -17,9 +17,9 @@
 import structlog
 from cryptography.fernet import Fernet
 from git.repo import Repo
-from owid.catalog import Dataset, License, Source
+from owid.catalog import Dataset, DatasetMeta, Source, Table, VariableMeta
 from owid.catalog.utils import underscore, validate_underscore
-from owid.datautils import dataframes
+from owid.datautils import io
 from pydantic import BaseModel
 from pywebio import input as pi
 from pywebio import output as po
@@ -34,10 +34,12 @@
 from etl.command import main as etl_main
 from etl.compare import diff_print
 from etl.db import get_engine
-from etl.files import apply_black_formatter_to_files
+from etl.files import apply_black_formatter_to_files, yaml_dump
+from etl.metadata_export import metadata_export
 from etl.paths import (
     BASE_DIR,
     DAG_DIR,
+    DATA_DIR,
     LATEST_REGIONS_DATASET_PATH,
     SNAPSHOTS_DIR,
     STEP_DIR,
@@ -46,7 +48,6 @@
 from walkthrough import utils as walkthrough_utils
 
 from . import csv, sheets
-from .yaml_meta import YAMLDatasetMeta, YAMLMeta, YAMLSourceMeta, YAMLVariableMeta
 
 config.enable_bugsnag()
 
@@ -97,61 +98,78 @@ def cli(commit: bool, dummy_data: bool, auto_open: bool, port: int) -> None:
 class FasttrackImport:
     def __init__(
         self,
-        data: pd.DataFrame,
-        meta: YAMLMeta,
+        dataset: Dataset,
         sheets_url: str,
-        is_private: bool,
-        partial_snapshot_meta: sheets.PartialSnapshotMeta,
     ):
-        self.data = data
-        self.meta = meta
+        self.dataset = dataset
         self.sheets_url = sheets_url
-        self.is_private = is_private
-        self.partial_snapshot_meta = partial_snapshot_meta
+
+    @property
+    def meta(self) -> DatasetMeta:
+        return self.dataset.metadata
+
+    @property
+    def data(self) -> Table:
+        return self.dataset[self.meta.short_name]  # type: ignore
 
     @property
     def dataset_dir(self) -> Path:
-        return STEP_DIR / "data" / "grapher" / self.meta.dataset.namespace / str(self.meta.dataset.version)
+        return STEP_DIR / "data" / "grapher" / self.meta.namespace / str(self.meta.version)  # type: ignore
 
     @property
     def step_path(self) -> Path:
-        return self.dataset_dir / (self.meta.dataset.short_name + ".py")
+        return self.dataset_dir / (self.meta.short_name + ".py")  # type: ignore
 
     @property
     def metadata_path(self) -> Path:
-        return self.dataset_dir / (self.meta.dataset.short_name + ".meta.yml")
+        return self.dataset_dir / (self.meta.short_name + ".meta.yml")  # type: ignore
 
     @property
     def snapshot(self) -> Snapshot:
-        return Snapshot(f"{self.meta.dataset.namespace}/{self.meta.dataset.version}/{self.meta.dataset.short_name}.csv")
+        return Snapshot(f"{self.meta.namespace}/{self.meta.version}/{self.meta.short_name}.csv")
 
     @property
     def snapshot_meta(self) -> SnapshotMeta:
         # since sheets url is accessible with link, we have to encrypt it when storing in metadata
-        sheets_url = _encrypt(self.sheets_url) if self.is_private else self.sheets_url
+        sheets_url = _encrypt(self.sheets_url) if not self.meta.is_public else self.sheets_url
 
         source_name = "Google Sheet" if self.sheets_url != "local_csv" else "Local CSV"
 
-        return SnapshotMeta(
-            namespace=self.meta.dataset.namespace,
-            short_name=self.meta.dataset.short_name,
-            name=self.meta.dataset.title,
-            version=str(self.meta.dataset.version),
-            file_extension="csv",
-            description=self.meta.dataset.description,
-            source=Source(
-                url=self.partial_snapshot_meta.url,
+        if len(self.meta.sources) == 1:
+            dataset_source = self.meta.sources[0]
+            source = Source(
+                url=dataset_source.url,
                 name=source_name,
                 published_by=source_name,
                 source_data_url=sheets_url,
                 date_accessed=str(dt.date.today()),
-                publication_year=self.partial_snapshot_meta.publication_year,
-            ),
-            license=License(
-                url=self.partial_snapshot_meta.license_url,
-                name=self.partial_snapshot_meta.license_name,
-            ),
-            is_public=not self.is_private,
+                publication_year=dataset_source.publication_year,
+            )
+            origin = None
+            license = self.meta.licenses[0]
+        elif len(self.meta.origins) == 1:
+            source = None
+            origin = self.meta.origins[0]
+            origin.date_accessed = str(dt.date.today())
+
+            # misuse the version and dataset_url_download fields to store info about the spreadsheet
+            origin.version = source_name
+            origin.dataset_url_download = sheets_url
+            license = self.meta.licenses[0]
+        else:
+            raise ValueError("Dataset must have exactly one source or exactly one origin")
+
+        return SnapshotMeta(
+            namespace=self.meta.namespace,  # type: ignore
+            short_name=self.meta.short_name,  # type: ignore
+            name=self.meta.title,  # type: ignore
+            version=str(self.meta.version),
+            file_extension="csv",
+            description=self.meta.description,  # type: ignore
+            source=source,
+            origin=origin,
+            license=license,
+            is_public=self.meta.is_public,
         )
 
     def snapshot_exists(self) -> bool:
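To make the branching above concrete: a snapshot now carries either a classic source or a new-style origin, never both. A minimal sketch of the decision, using plain dicts as stand-ins for the owid.catalog objects (field names follow the hunk above):

    import datetime as dt

    def pick_provenance(sources: list, origins: list, source_name: str, sheets_url: str):
        if len(sources) == 1:
            source = dict(sources[0], published_by=source_name, source_data_url=sheets_url)
            return source, None
        elif len(origins) == 1:
            origin = dict(origins[0], date_accessed=str(dt.date.today()))
            # version and dataset_url_download are repurposed to remember the spreadsheet
            origin["version"] = source_name
            origin["dataset_url_download"] = sheets_url
            return None, origin
        raise ValueError("Dataset must have exactly one source or exactly one origin")

    source, origin = pick_provenance([], [{"producer": "Example"}], "Google Sheet", "https://docs.google.com/...")
    assert source is None and origin["version"] == "Google Sheet"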
@@ -161,9 +179,13 @@ def snapshot_exists(self) -> bool:
         except FileNotFoundError:
             return False
 
+    def dataset_yaml(self) -> str:
+        """Generate dataset YAML file."""
+        return yaml_dump(metadata_export(self.dataset))  # type: ignore
+
     def save_metadata(self) -> None:
         with open(self.metadata_path, "w") as f:
-            f.write(self.meta.to_yaml())
+            f.write(self.dataset_yaml())
 
     def upload_snapshot(self) -> Path:
         # save snapshotmeta YAML file
@@ -171,7 +193,7 @@ def upload_snapshot(self) -> Path:
 
         # upload snapshot
         snap = self.snapshot
-        dataframes.to_file(self.data, file_path=snap.path)
+        io.df_to_file(self.data, file_path=snap.path)
         snap.dvc_add(upload=True)
 
         return snap.metadata_path
@@ -194,7 +216,6 @@ def wrapper(*args, **kwargs):
 @catch_exceptions
 def app(dummy_data: bool, commit: bool) -> None:
     dummies = DUMMY_DATA if dummy_data else {}
-
     with open(CURRENT_DIR / "intro.md", "r") as f:
         po.put_markdown(f.read())
 
@@ -218,17 +239,17 @@ def app(dummy_data: bool, commit: bool) -> None:
         contents=[po.put_markdown(f.read())],
     )
 
-    data, meta, sheets_url, form, snapshot_dict = _load_data_and_meta(dummies)
+    dataset, sheets_url = _load_data_and_meta(dummies)
 
-    fast_import = FasttrackImport(data, meta, sheets_url, form.is_private, snapshot_dict)
+    fast_import = FasttrackImport(dataset, sheets_url)
 
     # diff with existing dataset
     if fast_import.snapshot_exists() and fast_import.metadata_path.exists():
         po.put_markdown("""## Data differences from existing dataset...""")
-        _data_diff(fast_import, data)
+        _data_diff(fast_import)
 
         po.put_markdown("""## Metadata differences from existing dataset...""")
-        _metadata_diff(fast_import, meta)
+        _metadata_diff(fast_import)
 
         # if data_is_different or metadata_is_different:
         do_continue = _ask_to_continue()
@@ -236,12 +257,10 @@ def app(dummy_data: bool, commit: bool) -> None:
             return
 
     # add dataset to dag
-    dag_content = _add_to_dag(meta.dataset, form.is_private)
+    dag_content = _add_to_dag(dataset.metadata)
 
     # create step and metadata file
-    walkthrough_utils.generate_step_to_channel(
-        CURRENT_DIR / "grapher_cookiecutter/", dict(**meta.dataset.dict(), channel="grapher")
-    )
+    walkthrough_utils.generate_step_to_channel(CURRENT_DIR / "grapher_cookiecutter/", fast_import.meta.to_dict())
     fast_import.save_metadata()
 
     po.put_markdown(
@@ -254,13 +273,13 @@ def app(dummy_data: bool, commit: bool) -> None:
         po.put_success("Upload successful!")
 
     po.put_markdown("""## Running ETL and upserting to GrapherDB...""")
-    step = f"grapher/{meta.dataset.path}"
+    step = f"{dataset.metadata.uri}"
     log.info("fasttrack.etl", step=step)
     etl_main(
         dag_path=DAG_FASTTRACK_PATH,
         steps=[step],
         grapher=True,
-        private=form.is_private,
+        private=not dataset.metadata.is_public,
         workers=1,
         # NOTE: force is necessary because we are caching checksums with files.CACHE_CHECKSUM_FILE
         # we could have cleared the cache, but this is cleaner
@@ -279,7 +298,7 @@ def app(dummy_data: bool, commit: bool) -> None:
     po.put_markdown("""## Links""")
     po.put_markdown(
         f"""
-    * [Dataset in admin]({os.environ.get("ADMIN_HOST", "")}/admin/datasets/{_dataset_id(meta.dataset)})
+    * [Dataset in admin]({os.environ.get("ADMIN_HOST", "")}/admin/datasets/{_dataset_id(dataset.metadata)})
     * [Commit in ETL]({github_link})
     """
     )
@@ -314,9 +333,7 @@ def __init__(self, **data: Any) -> None:
         super().__init__(**data)
 
 
-def _load_data_and_meta(
-    dummies: dict[str, str]
-) -> Tuple[pd.DataFrame, YAMLMeta, str, FasttrackForm, sheets.PartialSnapshotMeta]:
+def _load_data_and_meta(dummies: dict[str, str]) -> Tuple[Dataset, str]:
     existing_sheets = [
         {"label": "Choose previously uploaded dataset", "value": "unselected"}
     ] + _load_existing_sheets_from_snapshots()
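The new dataset_yaml() round-trips the in-memory dataset through the same exporter that produces *.meta.yml files elsewhere in the ETL. A hedged usage sketch (the path is illustrative; metadata_export and yaml_dump are the helpers imported at the top of cli.py):

    from owid.catalog import Dataset

    from etl.files import yaml_dump
    from etl.metadata_export import metadata_export

    dataset = Dataset("data/grapher/fasttrack/latest/my_dataset")  # hypothetical on-disk dataset
    with open("my_dataset.meta.yml", "w") as f:
        f.write(yaml_dump(metadata_export(dataset)))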
@@ -331,8 +348,8 @@ def _load_data_and_meta(
     def _onchange_existing_sheets_url(c: str) -> None:
         """If user selects existing sheet, update its public/private to be consistent with the sheet."""
         if c == "unselected":
-            # new sheet, use private by default
-            pi.input_update("options", value=[Options.INFER_METADATA.value, Options.IS_PRIVATE.value])
+            # new sheet, use public by default
+            pi.input_update("options", value=[Options.INFER_METADATA.value])
         else:
             # existing sheet, loads its public/private
             is_public = [e["is_public"] for e in existing_sheets if e["value"] == c][0]
@@ -365,7 +382,7 @@ def _onchange_existing_sheets_url(c: str) -> None:
                 "Additional Options",
                 options=[Options.INFER_METADATA.value, Options.IS_PRIVATE.value],  # type: ignore
                 name="options",
-                value=[Options.INFER_METADATA.value, Options.IS_PRIVATE.value],
+                value=[Options.INFER_METADATA.value],
             ),
         ],
     )
@@ -373,7 +390,7 @@ def _onchange_existing_sheets_url(c: str) -> None:
 
     form = FasttrackForm(**form_dict)
 
-    log.info("fasttrack.form", form=form_dict)
+    log.info("fasttrack.form", form={k: v for k, v in form_dict.items() if k not in ("local_csv",)})
 
     # use selected sheet if URL is not available
     if not form.local_csv:
@@ -390,7 +407,7 @@ def _onchange_existing_sheets_url(c: str) -> None:
             csv_df = pd.read_csv(StringIO(form.local_csv["content"].decode()))
 
             data = csv.parse_data_from_csv(csv_df)
-            meta, partial_snapshot_meta = csv.parse_metadata_from_csv(form.local_csv["filename"], csv_df.columns)
+            dataset_meta, variables_meta_dict = csv.parse_metadata_from_csv(form.local_csv["filename"], csv_df.columns)
 
             sheets_url = "local_csv"
 
@@ -422,8 +439,11 @@ def _onchange_existing_sheets_url(c: str) -> None:
                 po.put_success(
                     f"Data imported (sheet refreshed {_last_updated_before_minutes(google_sheets['dataset_meta'])} minutes ago)"
                 )
-                meta, partial_snapshot_meta = sheets.parse_metadata_from_sheets(
-                    google_sheets["dataset_meta"], google_sheets["variables_meta"], google_sheets["sources_meta"]
+                dataset_meta, variables_meta_dict = sheets.parse_metadata_from_sheets(
+                    google_sheets["dataset_meta"],
+                    google_sheets["variables_meta"],
+                    google_sheets["sources_meta"],
+                    google_sheets["origins_meta"],
                 )
                 data = sheets.parse_data_from_sheets(google_sheets["data"])
             except urllib.error.HTTPError:
@@ -442,22 +462,20 @@ def _onchange_existing_sheets_url(c: str) -> None:
 
         # try to infer as much missing metadata as possible
         if form.infer_metadata:
-            data, meta.tables[meta.dataset.short_name].variables = _infer_metadata(
-                data, meta.tables[meta.dataset.short_name].variables
-            )
-            # add unknown source if we don't have any
-            if not meta.dataset.sources:
-                meta.dataset.sources = [
-                    YAMLSourceMeta(
+            data, variables_meta_dict = _infer_metadata(data, variables_meta_dict)
+            # add unknown source if we have neither sources nor origins
+            if not dataset_meta.sources and not dataset_meta.origins:
+                dataset_meta.sources = [
+                    Source(
                         name="Unknown",
                         published_by="Unknown",
                         publication_year=dt.date.today().year,
-                        date_accessed=dt.date.today(),
+                        date_accessed=str(dt.date.today()),
                     )
                 ]
 
         # validation
-        success = _validate_data(data, meta.tables[meta.dataset.short_name].variables)
+        success = _validate_data(data, variables_meta_dict)
         if not success:
             continue
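The fallback above only fires when the sheet provided neither sources nor origins; it replaces the old YAMLSourceMeta default with a catalog-native Source. The equivalent object built in isolation (mirrors the hunk above, which constructs Source with exactly these keyword fields):

    import datetime as dt

    from owid.catalog import Source

    fallback = Source(
        name="Unknown",
        published_by="Unknown",
        publication_year=dt.date.today().year,
        date_accessed=str(dt.date.today()),
    )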
@@ -491,13 +509,26 @@ def _onchange_existing_sheets_url(c: str) -> None:
 
         break
 
-    return data, meta, sheets_url, form, partial_snapshot_meta
+    dataset_meta.is_public = not form.is_private
+    dataset_meta.channel = "grapher"
+
+    # create table
+    tb = Table(data, short_name=dataset_meta.short_name)
+    for short_name, var_meta in variables_meta_dict.items():
+        tb[short_name].metadata = var_meta
+
+    # create dataset and add table
+    dataset = Dataset.create_empty(DATA_DIR / dataset_meta.uri, dataset_meta)
+    dataset.add(tb)
+    dataset.save()
+
+    return dataset, sheets_url
 
 
-def _dataset_id(meta_ds: YAMLDatasetMeta) -> int:
+def _dataset_id(ds_meta: DatasetMeta) -> int:
     with Session(get_engine()) as session:
         ds = gm.Dataset.load_with_path(
-            session, namespace=meta_ds.namespace, short_name=meta_ds.short_name, version=str(meta_ds.version)
+            session, namespace=ds_meta.namespace, short_name=ds_meta.short_name, version=str(ds_meta.version)  # type: ignore
         )
         assert ds.id
         return ds.id
@@ -518,26 +549,45 @@ def _load_existing_sheets_from_snapshots() -> List[Dict[str, str]]:
     # get all fasttrack snapshots
     metas = [SnapshotMeta.load_from_yaml(path) for path in (SNAPSHOTS_DIR / "fasttrack").rglob("*.dvc")]
 
-    # sort them by date accessed
-    metas.sort(key=lambda meta: str(meta.source.date_accessed), reverse=True)  # type: ignore
+    existing_sheets = []
+    for meta in metas:
+        # exclude local CSVs
+        if (getattr(meta.source, "name", None) or getattr(meta.origin, "version", None)) == "Local CSV":
+            continue
 
-    # exclude local CSVs
-    metas = [m for m in metas if m.source.name != "Local CSV"]  # type: ignore
+        if meta.source:
+            assert meta.source.source_data_url
+            url = meta.source.source_data_url
+            date_accessed = meta.source.date_accessed
+        elif meta.origin:
+            assert meta.origin.dataset_url_download
+            url = meta.origin.dataset_url_download
+            date_accessed = meta.origin.date_accessed
+        else:
+            raise ValueError("Neither source nor origin")
 
-    # decrypt URLs if private
-    for meta in metas:
+        # decrypt URLs if private
         if not meta.is_public:
-            assert meta.source
-            assert meta.source.source_data_url
-            meta.source.source_data_url = _decrypt(meta.source.source_data_url)
+            url = _decrypt(url)
 
-    # extract their name and url
-    return [{"label": f"{meta.name} / {meta.version}", "value": meta.source.source_data_url, "is_public": meta.is_public} for meta in metas]  # type: ignore
+        existing_sheets.append(
+            {
+                "label": f"{meta.name} / {meta.version}",
+                "value": url,
+                "is_public": meta.is_public,
+                "date_accessed": str(date_accessed),
+            }
+        )
+
+    # sort them by date accessed
+    existing_sheets.sort(key=lambda m: m["date_accessed"], reverse=True)  # type: ignore
+
+    return existing_sheets
 
 
 def _infer_metadata(
-    data: pd.DataFrame, meta_variables: Dict[str, YAMLVariableMeta]
-) -> Tuple[pd.DataFrame, Dict[str, YAMLVariableMeta]]:
+    data: pd.DataFrame, meta_variables: Dict[str, VariableMeta]
+) -> Tuple[pd.DataFrame, Dict[str, VariableMeta]]:
     # underscore variable names from data sheet, this doesn't raise warnings
     for col in data.columns:
         data = data.rename(columns={col: underscore(col)})
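_encrypt and _decrypt themselves are not shown in this hunk; a minimal sketch of the round-trip they perform, assuming a Fernet key held in configuration (cryptography.fernet is already imported at the top of cli.py):

    from cryptography.fernet import Fernet

    key = Fernet.generate_key()  # in fast-track the key would come from configuration

    def _encrypt(s: str) -> str:
        return Fernet(key).encrypt(s.encode()).decode()

    def _decrypt(s: str) -> str:
        return Fernet(key).decrypt(s.encode()).decode()

    url = "https://docs.google.com/spreadsheets/d/abc123/export?format=csv"
    assert _decrypt(_encrypt(url)) == url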
@@ -558,7 +608,7 @@ def _infer_metadata(
     return data, meta_variables
 
 
-def _validate_data(df: pd.DataFrame, meta_variables: Dict[str, YAMLVariableMeta]) -> bool:
+def _validate_data(df: pd.DataFrame, variables_meta_dict: Dict[str, VariableMeta]) -> bool:
     po.put_markdown("""## Validating data and metadata...""")
     errors = []
 
@@ -570,16 +620,16 @@ def _validate_data(df: pd.DataFrame, meta_variables: Dict[str, YAMLVariableMeta]
             errors.append(sheets.ValidationError(e))
 
     # missing columns in metadata
-    for col in set(df.columns) - set(meta_variables.keys()):
+    for col in set(df.columns) - set(variables_meta_dict.keys()):
         errors.append(sheets.ValidationError(f"Variable {col} is not defined in metadata"))
 
     # extra columns in metadata
-    for col in set(meta_variables.keys()) - set(df.columns):
+    for col in set(variables_meta_dict.keys()) - set(df.columns):
         errors.append(sheets.ValidationError(f"Variable {col} in metadata is not in the data"))
 
     # missing titles
     for col in df.columns:
-        if col in meta_variables and not meta_variables[col].title:
+        if col in variables_meta_dict and not variables_meta_dict[col].title:
             errors.append(sheets.ValidationError(f"Variable {col} is missing title (you can use its short name)"))
 
     # no inf values
@@ -634,17 +684,18 @@ def _harmonize_countries(df: pd.DataFrame) -> Tuple[pd.DataFrame, List[str]]:
     return df, unknown_countries
 
 
-def _add_to_dag(ds_meta: YAMLDatasetMeta, is_private: bool) -> str:
-    public_data_step = f"data://grapher/{ds_meta.path}"
-    private_data_step = f"data-private://grapher/{ds_meta.path}"
+def _add_to_dag(ds_meta: DatasetMeta) -> str:
+    public_data_step = f"data://{ds_meta.uri}"
+    private_data_step = f"data-private://{ds_meta.uri}"
 
     # add steps to dag, replace public by private and vice versa if needed
-    if is_private:
-        to_remove = public_data_step
-        to_add = {private_data_step: [f"snapshot-private://{ds_meta.path}.csv"]}
-    else:
+    snapshot_uri = ds_meta.uri.replace("grapher/", "")
+    if ds_meta.is_public:
         to_remove = private_data_step
-        to_add = {public_data_step: [f"snapshot://{ds_meta.path}.csv"]}
+        to_add = {public_data_step: [f"snapshot://{snapshot_uri}.csv"]}
+    else:
+        to_remove = public_data_step
+        to_add = {private_data_step: [f"snapshot-private://{snapshot_uri}.csv"]}
 
     walkthrough_utils.remove_from_dag(
         to_remove,
@@ -656,7 +707,7 @@ def _add_to_dag(ds_meta: YAMLDatasetMeta, is_private: bool) -> str:
     )
 
 
-def _data_diff(fast_import: FasttrackImport, data: pd.DataFrame) -> bool:
+def _data_diff(fast_import: FasttrackImport) -> bool:
     console = Console(record=True)
 
     # load data from snapshot
@@ -666,7 +717,7 @@ def _data_diff(fast_import: FasttrackImport, data: pd.DataFrame) -> bool:
 
     exit_code = diff_print(
         df1=existing_data,
-        df2=data.reset_index(),
+        df2=fast_import.data.reset_index(),
         df1_label="existing",
         df2_label="imported",
         absolute_tolerance=0.00000001,
@@ -684,7 +735,7 @@ def _data_diff(fast_import: FasttrackImport, data: pd.DataFrame) -> bool:
     return exit_code != 0
 
 
-def _metadata_diff(fast_import: FasttrackImport, meta: YAMLMeta) -> bool:
+def _metadata_diff(fast_import: FasttrackImport) -> bool:
     # load existing metadata file
     with open(fast_import.metadata_path, "r") as f:
         existing_meta = f.read()
@@ -694,10 +745,13 @@ def _metadata_diff(fast_import: FasttrackImport, meta: YAMLMeta) -> bool:
     # create new snapshot metadata
     new_snapshot_yaml = fast_import.snapshot_meta.to_yaml()
 
+    # create metadata file
+    new_meta_yaml = fast_import.dataset_yaml()
+
     # combine snapshot YAML and grapher YAML file
     diff = difflib.HtmlDiff()
     html_diff = diff.make_table(
-        (existing_meta + old_snapshot_yaml).split("\n"), (meta.to_yaml() + new_snapshot_yaml).split("\n"), context=True
+        (existing_meta + old_snapshot_yaml).split("\n"), (new_meta_yaml + new_snapshot_yaml).split("\n"), context=True
    )
     if "No Differences Found" in html_diff:
         po.put_success("No metadata differences found.")
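The step-name convention in _add_to_dag, traced by hand (a sketch mirroring the function above; the dataset URI is illustrative — snapshot URIs drop the leading "grapher/" channel prefix):

    def dag_steps(uri: str, is_public: bool) -> dict:
        # mirrors _add_to_dag; uri looks like "grapher/fasttrack/latest/my_dataset"
        snapshot_uri = uri.replace("grapher/", "")
        if is_public:
            return {f"data://{uri}": [f"snapshot://{snapshot_uri}.csv"]}
        return {f"data-private://{uri}": [f"snapshot-private://{snapshot_uri}.csv"]}

    assert dag_steps("grapher/fasttrack/latest/my_dataset", True) == {
        "data://grapher/fasttrack/latest/my_dataset": ["snapshot://fasttrack/latest/my_dataset.csv"]
    }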
diff --git a/fasttrack/csv.py b/fasttrack/csv.py
index d2c9daae089..0e9f16a8c89 100644
--- a/fasttrack/csv.py
+++ b/fasttrack/csv.py
@@ -1,10 +1,10 @@
-from typing import List, Tuple
+from typing import Any, Dict, List, Tuple
 
 import pandas as pd
+from owid.catalog import DatasetMeta, VariableMeta
 from owid.catalog.utils import underscore
 
-from .sheets import PartialSnapshotMeta, parse_data_from_sheets
-from .yaml_meta import YAMLMeta
+from .sheets import parse_data_from_sheets
 
 
 def parse_data_from_csv(csv_df: pd.DataFrame) -> pd.DataFrame:
@@ -17,10 +17,10 @@ def parse_data_from_csv(csv_df: pd.DataFrame) -> pd.DataFrame:
     return data
 
 
-def parse_metadata_from_csv(filename: str, columns: List[str]) -> Tuple[YAMLMeta, PartialSnapshotMeta]:
+def parse_metadata_from_csv(filename: str, columns: List[str]) -> Tuple[DatasetMeta, Dict[str, VariableMeta]]:
     filename = filename.replace(".csv", "")
     title = f"DRAFT {filename}"
-    dataset_dict = {
+    dataset_dict: dict[str, Any] = {
         "title": title,
         "short_name": underscore(filename),
         "version": "latest",
@@ -37,12 +37,18 @@ def parse_metadata_from_csv(filename: str, columns: List[str]) -> Tuple[YAMLMeta
         if col.lower() not in ("country", "year", "entity")
     }
 
-    return (
-        YAMLMeta(**{"dataset": dataset_dict, "tables": {dataset_dict["short_name"]: {"variables": variables_dict}}}),
-        PartialSnapshotMeta(
+    dataset_dict["sources"] = [
+        dict(
+            name="Unknown",
             url="",
             publication_year=None,
-            license_url=None,
-            license_name=None,
-        ),
-    )
+        )
+    ]
+    dataset_dict["licenses"] = [
+        {
+            "url": None,
+            "name": None,
+        }
+    ]
+
+    return DatasetMeta(**dataset_dict), {k: VariableMeta(**v) for k, v in variables_dict.items()}
diff --git a/fasttrack/grapher_cookiecutter/{{cookiecutter.namespace}}/{{cookiecutter.version}}/{{cookiecutter.short_name}}.py b/fasttrack/grapher_cookiecutter/{{cookiecutter.namespace}}/{{cookiecutter.version}}/{{cookiecutter.short_name}}.py
index 4f96ceadef3..f3448083e76 100644
--- a/fasttrack/grapher_cookiecutter/{{cookiecutter.namespace}}/{{cookiecutter.version}}/{{cookiecutter.short_name}}.py
+++ b/fasttrack/grapher_cookiecutter/{{cookiecutter.namespace}}/{{cookiecutter.version}}/{{cookiecutter.short_name}}.py
@@ -18,5 +18,5 @@ def run(dest_dir: str) -> None:
     tb = catalog.Table(data, short_name=P.short_name)
 
     # add table, update metadata from *.meta.yml and save
-    ds = create_dataset(dest_dir, tables=[tb], default_metadata=snap.metadata)
+    ds = create_dataset(dest_dir, tables=[tb.set_index(["country", "year"])], default_metadata=snap.metadata)
     ds.save()
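A hedged usage sketch of the reworked CSV entry point — the return type is now (DatasetMeta, {short_name: VariableMeta}) instead of (YAMLMeta, PartialSnapshotMeta); the file name and columns are illustrative:

    import pandas as pd

    from fasttrack.csv import parse_metadata_from_csv

    df = pd.DataFrame({"country": ["France"], "year": [2020], "gdp": [2.6]})
    dataset_meta, variables = parse_metadata_from_csv("gdp_test.csv", list(df.columns))

    # short_name is derived from the file name; country/year/entity columns are skipped
    assert dataset_meta.short_name == "gdp_test"
    assert set(variables) == {"gdp"}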
diff --git a/fasttrack/sheets.py b/fasttrack/sheets.py
index 42cf9310deb..07c1882d1e9 100644
--- a/fasttrack/sheets.py
+++ b/fasttrack/sheets.py
@@ -1,26 +1,26 @@
 import concurrent.futures
 import datetime as dt
-from typing import Any, Dict, List, Optional, Tuple, cast
+import json
+import urllib.error
+from typing import Any, Dict, Optional, Tuple
 
 import pandas as pd
-from pydantic import BaseModel
+from owid.catalog import (
+    DatasetMeta,
+    License,
+    Origin,
+    Source,
+    VariableMeta,
+    VariablePresentationMeta,
+)
 
 from etl.grapher_helpers import INT_TYPES
 
-from .yaml_meta import YAMLMeta
-
 
 class ValidationError(Exception):
     pass
 
 
-class PartialSnapshotMeta(BaseModel):
-    url: str
-    publication_year: Optional[int]
-    license_url: Optional[str]
-    license_name: Optional[str]
-
-
 # these IDs are taken from template sheet, they will be different if someone
 # creates a new sheet from scratch and use those names
 SHEET_TO_GID = {
@@ -29,9 +29,17 @@ class PartialSnapshotMeta(BaseModel):
     "variables_meta": 777328216,
     "dataset_meta": 1719161864,
     "sources_meta": 1399503534,
+    "origins_meta": 279169148,
 }
 
 
+def _fetch_url_or_empty_dataframe(url: str, **kwargs: Any) -> pd.DataFrame:
+    try:
+        return pd.read_csv(url, **kwargs)
+    except urllib.error.HTTPError:
+        return pd.DataFrame()
+
+
 def import_google_sheets(url: str) -> Dict[str, Any]:
     # read dataset first to check if we're using data_url instead of data sheet
     dataset_meta = pd.read_csv(f"{url}&gid={SHEET_TO_GID['dataset_meta']}", header=None)
@@ -40,13 +48,19 @@ def import_google_sheets(url: str) -> Dict[str, Any]:
 
     with concurrent.futures.ThreadPoolExecutor() as executor:
         data_future = executor.submit(lambda x: pd.read_csv(x), data_url)
         variables_meta_future = executor.submit(lambda x: pd.read_csv(x), f"{url}&gid={SHEET_TO_GID['variables_meta']}")
-        sources_meta_future = executor.submit(lambda x: pd.read_csv(x), f"{url}&gid={SHEET_TO_GID['sources_meta']}")
+        sources_meta_future = executor.submit(
+            lambda url: _fetch_url_or_empty_dataframe(url, header=None), f"{url}&gid={SHEET_TO_GID['sources_meta']}"
+        )
+        origins_meta_future = executor.submit(
+            lambda url: _fetch_url_or_empty_dataframe(url, header=None), f"{url}&gid={SHEET_TO_GID['origins_meta']}"
+        )
 
     return {
         "data": data_future.result(),
         "variables_meta": variables_meta_future.result(),
         "dataset_meta": dataset_meta,
         "sources_meta": sources_meta_future.result(),
+        "origins_meta": origins_meta_future.result(),
     }
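The sources and origins tabs are fetched with header=None, which yields a two-column key/value frame; set_index(0).T.to_dict(orient="records") then produces one dict per tab. A toy demonstration:

    import pandas as pd

    raw = pd.DataFrame([["name", "World Bank"], ["url", "https://data.worldbank.org"]])
    records = raw.set_index(0).T.to_dict(orient="records")
    assert records == [{"name": "World Bank", "url": "https://data.worldbank.org"}]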
@@ -72,16 +86,45 @@ def parse_data_from_sheets(data_df: pd.DataFrame) -> pd.DataFrame:
     return data_df.set_index(["country", "year"])
 
 
-def parse_metadata_from_sheets(
-    dataset_meta_df: pd.DataFrame, variables_meta_df: pd.DataFrame, sources_meta_df: pd.DataFrame
-) -> Tuple[YAMLMeta, PartialSnapshotMeta]:
-    sources_dict = cast(Dict[str, Any], sources_meta_df.set_index("short_name").to_dict())
-    sources_dict = {k: _prune_empty(v) for k, v in sources_dict.items()}
+def _parse_sources(sources_meta_df: pd.DataFrame) -> Optional[Source]:
+    sources = sources_meta_df.set_index(0).T.to_dict(orient="records")
+
+    if not sources:
+        return None
+
+    assert len(sources) == 1, "Only one source is supported for now"
+    source = sources[0]
+
+    if pd.isnull(source.get("date_accessed")):
+        source.pop("date_accessed")
 
     # publisher_source is not used anymore
-    for source in sources_dict.values():
-        source.pop("publisher_source", None)
+    source.pop("publisher_source", None)
+    # short_name is not used anymore
+    source.pop("short_name", None)
+
+    return Source(**source)
+
+
+def _parse_origins(origins_meta_df: pd.DataFrame) -> Optional[Origin]:
+    origins = origins_meta_df.set_index(0).T.to_dict(orient="records")
+
+    if not origins:
+        return None
+
+    assert len(origins) == 1, "Only one origin is supported for now"
+    origin = origins[0]
+    origin = _prune_empty(origin)  # type: ignore
+
+    # parse license fields
+    if origin.get("license.name") or origin.get("license.url"):
+        origin["license"] = License(name=origin.pop("license.name", None), url=origin.pop("license.url", None))
+
+    return Origin(**origin)
+
+
+def _parse_dataset(dataset_meta_df: pd.DataFrame) -> DatasetMeta:
     dataset_dict = _prune_empty(dataset_meta_df.set_index(0)[1].to_dict())  # type: ignore
     dataset_dict["namespace"] = "fasttrack"  # or should it be owid? or institution specific?
     dataset_dict.pop("updated")
@@ -99,6 +142,18 @@ def parse_metadata_from_sheets(
         if key not in dataset_dict:
             raise ValidationError(f"Missing mandatory field '{key}' from sheet 'dataset_meta'")
 
+    # deprecated field
+    dataset_dict.pop("sources", None)
+
+    # pop license fields before constructing DatasetMeta, which doesn't accept them
+    license = License(name=dataset_dict.pop("license_name", None), url=dataset_dict.pop("license_url", None))
+    dataset_meta = DatasetMeta(**dataset_dict)
+    dataset_meta.licenses = [license]
+
+    return dataset_meta
+
+
+def _parse_variables(variables_meta_df: pd.DataFrame) -> Dict[str, VariableMeta]:
     variables_list = [_prune_empty(v) for v in variables_meta_df.to_dict(orient="records")]  # type: ignore
 
     # default variable values
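For context, the display.* convention handled in the loop below folds spreadsheet columns such as display.numDecimalPlaces into a nested display dict on the variable (values here are illustrative):

    variable = {"title": "GDP", "unit": "trillion USD", "display.numDecimalPlaces": "1"}
    for k in list(variable):
        if k.startswith("display."):
            variable.setdefault("display", {})[k[8:]] = variable.pop(k)

    assert variable == {"title": "GDP", "unit": "trillion USD", "display": {"numDecimalPlaces": "1"}}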
@@ -111,56 +166,43 @@ def parse_metadata_from_sheets(
         if k.startswith("display."):
             variable.setdefault("display", {})[k[8:]] = variable.pop(k)
 
-    # expand sources
-    if "sources" in dataset_dict:
-        dataset_dict["sources"] = _expand_sources(dataset_dict["sources"], sources_dict)
-
+    out = {}
     for variable in variables_list:
-        if "sources" in variable:
-            variable["sources"] = _expand_sources(variable["sources"], sources_dict)
+        # sources field is deprecated
+        variable.pop("sources", None)
+        short_name = variable.pop("short_name")
 
-    variables_dict = {v.pop("short_name"): v for v in variables_list}
+        if variable.get("presentation"):
+            variable["presentation"] = VariablePresentationMeta(**json.loads(variable["presentation"]))
 
-    # extract fields for snapshot
-    # NOTE: we used to have special fields in dataset_meta for `url` and `publication_year`, but these
-    # are the same fields as in source so we use these instead
-    if len(dataset_dict.get("sources", [])) > 0:
-        dataset_source = dataset_dict["sources"][0]
-    else:
-        dataset_source = {}
-
-    partial_snapshot_meta = _prune_empty(
-        {
-            # "publication_year": dataset_dict.pop("publication_year", None),
-            "publication_year": dataset_source.get("publication_year", None),
-            "license_url": dataset_dict.pop("license_url", None),
-            "license_name": dataset_dict.pop("license_name", None),
-        }
-    )
-    partial_snapshot_meta["url"] = dataset_source.get("url", "")
-
-    _move_keys_to_the_end(dataset_dict, ["description", "sources"])
-
-    return (
-        YAMLMeta(**{"dataset": dataset_dict, "tables": {dataset_dict["short_name"]: {"variables": variables_dict}}}),
-        PartialSnapshotMeta(**partial_snapshot_meta),
-    )
-
-
-def _expand_sources(sources_name: str, sources_dict: Dict[str, Any]) -> List[Dict[str, Any]]:
-    sources = []
-    for source_short_name in map(lambda s: s.strip(), sources_name.split(",")):
-        try:
-            sources.append(sources_dict[source_short_name])
-        except KeyError:
-            raise ValidationError(f"Source with short_name `{source_short_name}` not found in `sources_meta` sheet")
-    return sources
-
-
-def _move_keys_to_the_end(d: Dict[str, Any], keys: List[str]) -> None:
-    for key in keys:
-        if key in d:
-            d[key] = d.pop(key)
+        var_meta = VariableMeta(**variable)
+
+        out[short_name] = var_meta
+
+    return out
+
+
+def parse_metadata_from_sheets(
+    dataset_meta_df: pd.DataFrame,
+    variables_meta_df: pd.DataFrame,
+    sources_meta_df: pd.DataFrame,
+    origins_meta_df: pd.DataFrame,
+) -> Tuple[DatasetMeta, Dict[str, VariableMeta]]:
+    source = _parse_sources(sources_meta_df)
+    origin = _parse_origins(origins_meta_df)
+    dataset_meta = _parse_dataset(dataset_meta_df)
+    variables_meta_dict = _parse_variables(variables_meta_df)
+
+    if origin and source:
+        raise ValidationError("Using origins and sources together is not yet supported")
+
+    # put all sources and origins to dataset level
+    if source:
+        dataset_meta.sources = [source]
+    if origin:
+        dataset_meta.origins = [origin]
+
+    return dataset_meta, variables_meta_dict
 
 
 def _prune_empty(d: Dict[str, Any]) -> Dict[str, Any]:
diff --git a/fasttrack/yaml_meta.py b/fasttrack/yaml_meta.py
deleted file mode 100644
index 44882b63548..00000000000
--- a/fasttrack/yaml_meta.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import datetime as dt
-from typing import Any, List, Optional, Union
-
-from pydantic import BaseModel, Extra, Field
-
-from etl.files import yaml_dump
-
-
-class YAMLSourceMeta(BaseModel):
-    class Config:
-        extra = Extra.forbid
-
-    name: str
-    published_by: str
-    publication_year: Optional[int] = None
-    date_accessed: dt.date = Field(default_factory=dt.date.today)
-    url: Optional[str] = None
-
-
-class YAMLDatasetMeta(BaseModel):
-    class Config:
-        extra = Extra.forbid
-
-    namespace: str
-    version: Union[str, dt.date]
-    short_name: str
-    title: str
-    description: str
-    sources: List[YAMLSourceMeta] = Field(default_factory=list)
-
-    @property
-    def path(self):
-        return f"{self.namespace}/{self.version}/{self.short_name}"
-
-
-class YAMLVariableMeta(BaseModel):
-    class Config:
-        extra = Extra.forbid
-
-    title: str
-    short_unit: Union[str, None] = None
-    unit: str
-    description: Optional[str] = None
-    display: Optional[dict[str, Any]] = None
-    sources: List[YAMLSourceMeta] = Field(default_factory=list)
-
-
-class YAMLTableMeta(BaseModel):
-    class Config:
-        extra = Extra.forbid
-
-    title: Optional[str]
-    description: Optional[str]
-    variables: dict[str, YAMLVariableMeta]
-
-
-class YAMLMeta(BaseModel):
-    class Config:
-        extra = Extra.forbid
-
-    dataset: YAMLDatasetMeta
-    tables: dict[str, YAMLTableMeta]
-
-    def to_yaml(self) -> str:
-        d = self.dict(exclude_none=True, exclude_unset=True)
-
-        # exclude fields that are inferred from path
-        d["dataset"].pop("namespace")
-        d["dataset"].pop("version")
-        d["dataset"].pop("short_name")
-
-        # description and title is already in the snapshot
-        d["dataset"].pop("title", None)
-        d["dataset"].pop("description", None)
-
-        return yaml_dump(d)  # type: ignore
diff --git a/lib/catalog/owid/catalog/__init__.py b/lib/catalog/owid/catalog/__init__.py
index e087cddc3fc..e0166fbdff9 100644
--- a/lib/catalog/owid/catalog/__init__.py
+++ b/lib/catalog/owid/catalog/__init__.py
@@ -3,7 +3,16 @@
 from . import processing, utils
 from .catalogs import CHANNEL, LocalCatalog, RemoteCatalog, find, find_latest, find_one
 from .datasets import Dataset
-from .meta import DatasetMeta, FaqLink, License, Origin, Source, TableMeta, VariableMeta
+from .meta import (
+    DatasetMeta,
+    FaqLink,
+    License,
+    Origin,
+    Source,
+    TableMeta,
+    VariableMeta,
+    VariablePresentationMeta,
+)
 from .tables import Table
 from .variables import Variable
 
@@ -19,6 +28,7 @@
     "DatasetMeta",
     "TableMeta",
     "VariableMeta",
+    "VariablePresentationMeta",
     "FaqLink",
     "Source",
     "Origin",
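Putting the pieces together, the new flow ends with a real catalog dataset on disk rather than a YAML intermediary. A sketch under assumed names (illustrative path and metadata; mirrors the Table/Dataset assembly in _load_data_and_meta above):

    import pandas as pd

    from owid.catalog import Dataset, DatasetMeta, Table, VariableMeta

    dataset_meta = DatasetMeta(
        namespace="fasttrack", version="latest", short_name="my_dataset", title="My dataset"
    )
    variables = {"gdp": VariableMeta(title="GDP", unit="trillion USD")}

    data = pd.DataFrame({"country": ["France"], "year": [2020], "gdp": [2.6]}).set_index(["country", "year"])
    tb = Table(data, short_name=dataset_meta.short_name)
    for short_name, var_meta in variables.items():
        tb[short_name].metadata = var_meta

    ds = Dataset.create_empty("data/grapher/fasttrack/latest/my_dataset", dataset_meta)
    ds.add(tb)
    ds.save()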