🔨 add support for origins in fast-track
Marigold committed Aug 14, 2023
1 parent 3390e5e commit 94653ce
Showing 9 changed files with 316 additions and 256 deletions.
3 changes: 3 additions & 0 deletions etl/metadata_export.py
@@ -49,6 +49,9 @@ def metadata_export(
    for source in ds_meta.get("sources", []):
        _prune_empty(source)

    for origin in ds_meta.get("origins", []):
        _prune_empty(origin)

    for license in ds_meta.get("licenses", []):
        _prune_empty(license)
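
For context, `_prune_empty` is defined or imported elsewhere in etl/metadata_export.py (not shown in this hunk); a minimal sketch of the behavior the hunk relies on, assuming it simply drops empty values in place:

# Hypothetical sketch of _prune_empty (not part of this diff); assumed to
# strip None/empty values so the exported metadata YAML stays compact.
from typing import Any, Dict

def _prune_empty(d: Dict[str, Any]) -> Dict[str, Any]:
    for key in list(d.keys()):
        if d[key] is None or d[key] in ("", [], {}):
            del d[key]
    return d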

248 changes: 150 additions & 98 deletions fasttrack/cli.py

Large diffs are not rendered by default.

30 changes: 18 additions & 12 deletions fasttrack/csv.py
@@ -1,10 +1,10 @@
from typing import List, Tuple
from typing import Any, Dict, List, Tuple

import pandas as pd
from owid.catalog import DatasetMeta, VariableMeta
from owid.catalog.utils import underscore

from .sheets import PartialSnapshotMeta, parse_data_from_sheets
from .yaml_meta import YAMLMeta
from .sheets import parse_data_from_sheets


def parse_data_from_csv(csv_df: pd.DataFrame) -> pd.DataFrame:
@@ -17,10 +17,10 @@ def parse_data_from_csv(csv_df: pd.DataFrame) -> pd.DataFrame:
    return data


def parse_metadata_from_csv(filename: str, columns: List[str]) -> Tuple[YAMLMeta, PartialSnapshotMeta]:
def parse_metadata_from_csv(filename: str, columns: List[str]) -> Tuple[DatasetMeta, Dict[str, VariableMeta]]:
    filename = filename.replace(".csv", "")
    title = f"DRAFT {filename}"
    dataset_dict = {
    dataset_dict: dict[str, Any] = {
        "title": title,
        "short_name": underscore(filename),
        "version": "latest",
@@ -37,12 +37,18 @@ def parse_metadata_from_csv(filename: str, columns: List[str]) -> Tuple[YAMLMeta, PartialSnapshotMeta]:
        if col.lower() not in ("country", "year", "entity")
    }

    return (
        YAMLMeta(**{"dataset": dataset_dict, "tables": {dataset_dict["short_name"]: {"variables": variables_dict}}}),
        PartialSnapshotMeta(
    dataset_dict["sources"] = [
        dict(
            name="Unknown",
            url="",
            publication_year=None,
            license_url=None,
            license_name=None,
        ),
    )
        )
    ]
    dataset_dict["licenses"] = [
        {
            "url": None,
            "name": None,
        }
    ]

    return DatasetMeta(**dataset_dict), {k: VariableMeta(**v) for k, v in variables_dict.items()}
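
A hedged usage sketch of the new return shape (file name and columns are invented; callers previously received a (YAMLMeta, PartialSnapshotMeta) pair):

# Hypothetical caller of the new API.
import pandas as pd

from fasttrack.csv import parse_data_from_csv, parse_metadata_from_csv

csv_df = pd.read_csv("gdp.csv")  # invented columns: country, year, gdp
data = parse_data_from_csv(csv_df)

# Catalog-native objects: DatasetMeta plus a short_name -> VariableMeta mapping.
dataset_meta, variables_meta = parse_metadata_from_csv("gdp.csv", list(csv_df.columns))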
@@ -18,5 +18,5 @@ def run(dest_dir: str) -> None:
    tb = catalog.Table(data, short_name=P.short_name)

    # add table, update metadata from *.meta.yml and save
    ds = create_dataset(dest_dir, tables=[tb], default_metadata=snap.metadata)
    ds = create_dataset(dest_dir, tables=[tb.set_index(["country", "year"])], default_metadata=snap.metadata)
    ds.save()
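
This step now indexes the table by (country, year) before saving; a toy sketch of the effect (data invented), assuming the usual long-format fast-track table:

import pandas as pd
from owid import catalog

data = pd.DataFrame({"country": ["France"], "year": [2021], "gdp": [2.9]})
tb = catalog.Table(data, short_name="example")

# Setting the MultiIndex marks country/year as the table's dimensions, i.e.
# its primary key, which is what create_dataset now receives.
tb = tb.set_index(["country", "year"])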
174 changes: 108 additions & 66 deletions fasttrack/sheets.py
@@ -1,26 +1,26 @@
import concurrent.futures
import datetime as dt
from typing import Any, Dict, List, Optional, Tuple, cast
import json
import urllib.error
from typing import Any, Dict, Optional, Tuple

import pandas as pd
from pydantic import BaseModel
from owid.catalog import (
    DatasetMeta,
    License,
    Origin,
    Source,
    VariableMeta,
    VariablePresentationMeta,
)

from etl.grapher_helpers import INT_TYPES

from .yaml_meta import YAMLMeta


class ValidationError(Exception):
    pass


class PartialSnapshotMeta(BaseModel):
    url: str
    publication_year: Optional[int]
    license_url: Optional[str]
    license_name: Optional[str]


# These IDs are taken from the template sheet; they will be different if someone
# creates a new sheet from scratch and uses those names.
SHEET_TO_GID = {
@@ -29,9 +29,17 @@ class PartialSnapshotMeta(BaseModel):
"variables_meta": 777328216,
"dataset_meta": 1719161864,
"sources_meta": 1399503534,
"origins_meta": 279169148,
}


def _fetch_url_or_empty_dataframe(url, **kwargs):
    try:
        return pd.read_csv(url, **kwargs)
    except urllib.error.HTTPError:
        return pd.DataFrame()


def import_google_sheets(url: str) -> Dict[str, Any]:
    # read dataset first to check if we're using data_url instead of data sheet
    dataset_meta = pd.read_csv(f"{url}&gid={SHEET_TO_GID['dataset_meta']}", header=None)
@@ -40,13 +48,19 @@ def import_google_sheets(url: str) -> Dict[str, Any]:
    with concurrent.futures.ThreadPoolExecutor() as executor:
        data_future = executor.submit(lambda x: pd.read_csv(x), data_url)
        variables_meta_future = executor.submit(lambda x: pd.read_csv(x), f"{url}&gid={SHEET_TO_GID['variables_meta']}")
        sources_meta_future = executor.submit(lambda x: pd.read_csv(x), f"{url}&gid={SHEET_TO_GID['sources_meta']}")
        sources_meta_future = executor.submit(
            lambda url: _fetch_url_or_empty_dataframe(url, header=None), f"{url}&gid={SHEET_TO_GID['sources_meta']}"
        )
        origins_meta_future = executor.submit(
            lambda url: _fetch_url_or_empty_dataframe(url, header=None), f"{url}&gid={SHEET_TO_GID['origins_meta']}"
        )

    return {
        "data": data_future.result(),
        "variables_meta": variables_meta_future.result(),
        "dataset_meta": dataset_meta,
        "sources_meta": sources_meta_future.result(),
        "origins_meta": origins_meta_future.result(),
    }
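
A hedged sketch of a call site (the URL is a placeholder): tabs missing from older template sheets now come back as empty DataFrames instead of raising HTTPError.

# Hypothetical call; the URL stands in for a real Google Sheets CSV-export link.
sheets = import_google_sheets("https://docs.google.com/spreadsheets/d/<id>/export?format=csv")

# A sheet created before origins_meta existed yields an empty DataFrame here
# (via _fetch_url_or_empty_dataframe) rather than failing the whole import.
if sheets["origins_meta"].empty:
    print("no origins tab; relying on sources_meta")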


@@ -72,16 +86,45 @@ def parse_data_from_sheets(data_df: pd.DataFrame) -> pd.DataFrame:
    return data_df.set_index(["country", "year"])


def parse_metadata_from_sheets(
    dataset_meta_df: pd.DataFrame, variables_meta_df: pd.DataFrame, sources_meta_df: pd.DataFrame
) -> Tuple[YAMLMeta, PartialSnapshotMeta]:
    sources_dict = cast(Dict[str, Any], sources_meta_df.set_index("short_name").to_dict())
    sources_dict = {k: _prune_empty(v) for k, v in sources_dict.items()}
def _parse_sources(sources_meta_df: pd.DataFrame) -> Optional[Source]:
    sources = sources_meta_df.set_index(0).T.to_dict(orient="records")

    if not sources:
        return None

    assert len(sources) == 1, "Only one source is supported for now"
    source = sources[0]

    if pd.isnull(source.get("date_accessed")):
        source.pop("date_accessed")

    # publisher_source is not used anymore
    for source in sources_dict.values():
        source.pop("publisher_source", None)
    source.pop("publisher_source", None)
    # short_name is not used anymore
    source.pop("short_name", None)

    return Source(**source)


def _parse_origins(origins_meta_df: pd.DataFrame) -> Optional[Origin]:
    origins = origins_meta_df.set_index(0).T.to_dict(orient="records")

    if not origins:
        return None

    assert len(origins) == 1, "Only one origin is supported for now"
    origin = origins[0]

    origin = _prune_empty(origin)

    # parse license fields
    if origin.get("license.name") or origin.get("license.url"):
        origin["license"] = License(name=origin.pop("license.name", None), url=origin.pop("license.url", None))

    return Origin(**origin)


def _parse_dataset(dataset_meta_df: pd.DataFrame) -> DatasetMeta:
    dataset_dict = _prune_empty(dataset_meta_df.set_index(0)[1].to_dict())  # type: ignore
    dataset_dict["namespace"] = "fasttrack"  # or should it be owid? or institution specific?
    dataset_dict.pop("updated")
@@ -99,6 +142,18 @@
        if key not in dataset_dict:
            raise ValidationError(f"Missing mandatory field '{key}' from sheet 'dataset_meta'")

    # deprecated field
    dataset_dict.pop("sources", None)

    dataset_meta = DatasetMeta(**dataset_dict)
    dataset_meta.licenses = [
        License(name=dataset_dict.pop("license_name", None), url=dataset_dict.pop("license_url", None))
    ]

    return dataset_meta


def _parse_variables(variables_meta_df: pd.DataFrame) -> Dict[str, VariableMeta]:
    variables_list = [_prune_empty(v) for v in variables_meta_df.to_dict(orient="records")]  # type: ignore

    # default variable values
@@ -111,56 +166,43 @@ def parse_metadata_from_sheets(
        if k.startswith("display."):
            variable.setdefault("display", {})[k[8:]] = variable.pop(k)

    # expand sources
    if "sources" in dataset_dict:
        dataset_dict["sources"] = _expand_sources(dataset_dict["sources"], sources_dict)

    out = {}
    for variable in variables_list:
        if "sources" in variable:
            variable["sources"] = _expand_sources(variable["sources"], sources_dict)
        # sources field is deprecated
        variable.pop("sources", None)
        short_name = variable.pop("short_name")

    variables_dict = {v.pop("short_name"): v for v in variables_list}
        if variable.get("presentation"):
            variable["presentation"] = VariablePresentationMeta(**json.loads(variable["presentation"]))

    # extract fields for snapshot
    # NOTE: we used to have special fields in dataset_meta for `url` and `publication_year`, but these
    # are the same fields as in source so we use these instead
    if len(dataset_dict.get("sources", [])) > 0:
        dataset_source = dataset_dict["sources"][0]
    else:
        dataset_source = {}

    partial_snapshot_meta = _prune_empty(
        {
            # "publication_year": dataset_dict.pop("publication_year", None),
            "publication_year": dataset_source.get("publication_year", None),
            "license_url": dataset_dict.pop("license_url", None),
            "license_name": dataset_dict.pop("license_name", None),
        }
    )
    partial_snapshot_meta["url"] = dataset_source.get("url", "")

    _move_keys_to_the_end(dataset_dict, ["description", "sources"])

    return (
        YAMLMeta(**{"dataset": dataset_dict, "tables": {dataset_dict["short_name"]: {"variables": variables_dict}}}),
        PartialSnapshotMeta(**partial_snapshot_meta),
    )


def _expand_sources(sources_name: str, sources_dict: Dict[str, Any]) -> List[Dict[str, Any]]:
    sources = []
    for source_short_name in map(lambda s: s.strip(), sources_name.split(",")):
        try:
            sources.append(sources_dict[source_short_name])
        except KeyError:
            raise ValidationError(f"Source with short_name `{source_short_name}` not found in `sources_meta` sheet")
    return sources


def _move_keys_to_the_end(d: Dict[str, Any], keys: List[str]) -> None:
    for key in keys:
        if key in d:
            d[key] = d.pop(key)
        var_meta = VariableMeta(**variable)

        out[short_name] = var_meta

    return out


def parse_metadata_from_sheets(
    dataset_meta_df: pd.DataFrame,
    variables_meta_df: pd.DataFrame,
    sources_meta_df: pd.DataFrame,
    origins_meta_df: pd.DataFrame,
) -> Tuple[DatasetMeta, Dict[str, VariableMeta]]:
    source = _parse_sources(sources_meta_df)
    origin = _parse_origins(origins_meta_df)
    dataset_meta = _parse_dataset(dataset_meta_df)
    variables_meta_dict = _parse_variables(variables_meta_df)

    if origin and source:
        raise ValidationError("Using origins and sources together is not yet supported")

    # put all sources and origins at the dataset level
    if source:
        dataset_meta.sources = [source]
    if origin:
        dataset_meta.origins = [origin]

    return dataset_meta, variables_meta_dict


def _prune_empty(d: Dict[str, Any]) -> Dict[str, Any]:
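
The sources_meta and origins_meta tabs are now read with header=None and parsed as key/value sheets: column 0 holds the field names and later columns hold one record each. A standalone illustration of the set_index(0).T.to_dict idiom used by _parse_sources and _parse_origins (field values invented):

import pandas as pd

# Invented origins_meta contents, read with header=None so columns are 0 and 1.
origins_meta_df = pd.DataFrame(
    [
        ["producer", "Example Institute"],
        ["title", "Example dataset"],
        ["license.name", "CC BY 4.0"],
    ]
)

records = origins_meta_df.set_index(0).T.to_dict(orient="records")
# -> [{'producer': 'Example Institute', 'title': 'Example dataset', 'license.name': 'CC BY 4.0'}]
# _parse_origins then folds license.name/license.url into a License object
# before building Origin(**origin).
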
76 changes: 0 additions & 76 deletions fasttrack/yaml_meta.py

This file was deleted.

