
📊 rework regions dataset to have unique indexes #1282

Merged 12 commits on Aug 2, 2023.
55 changes: 26 additions & 29 deletions etl/data_helpers/geo.py
@@ -705,25 +705,24 @@ def list_members_of_region(
     if excluded_members is None:
         excluded_members = []
 
-    # Get main tables from the regions dataset.
-    df_region_definitions = pd.DataFrame(ds_regions["definitions"]).reset_index()
-    df_region_members = pd.DataFrame(ds_regions["members"]).reset_index()
+    # Get the main table from the regions dataset.
+    tb_regions = ds_regions["regions"][["name", "members"]]
 
-    # Get a mapping from region code to name.
-    region_names = df_region_definitions.set_index("code").to_dict()["name"]
+    # Get a mapping from code to region name.
+    mapping = tb_regions["name"].to_dict()
 
-    # Map each region code to its name, and each member code to its name.
-    df_countries_in_region = df_region_members.copy()
-    df_countries_in_region["region"] = map_series(
-        df_countries_in_region["code"], mapping=region_names, warn_on_missing_mappings=True
-    )
-    df_countries_in_region["member"] = map_series(
-        df_countries_in_region["member"], mapping=region_names, warn_on_missing_mappings=True
-    )
+    # Convert JSON strings of member lists into lists of members.
+    tb_regions["members"] = [json.loads(member) if pd.notnull(member) else [] for member in tb_regions["members"]]
+
+    # Explode list of members to have one row per member.
+    tb_regions = tb_regions.explode("members").dropna()
+
+    # Map member codes to names.
+    tb_regions["members"] = map_series(series=tb_regions["members"], mapping=mapping, warn_on_missing_mappings=True)
 
     # Create a column with the list of members in each region.
-    df_countries_in_region = (
-        df_countries_in_region.rename(columns={"member": "members"})
+    tb_countries_in_region = (
+        tb_regions.rename(columns={"name": "region"})
         .groupby("region", as_index=True, observed=True)
         .agg({"members": list})
     )
@@ -732,14 +731,12 @@ def list_members_of_region(
     if "wb_income_group" in ds_income_groups.table_names:
         # TODO: Remove this block once the old income groups dataset has been archived.
         # Get the main table from the income groups dataset.
-        df_income = (
-            pd.DataFrame(ds_income_groups["wb_income_group"])
-            .reset_index()
-            .rename(columns={"income_group": "classification"})
+        tb_income = (
+            ds_income_groups["wb_income_group"].reset_index().rename(columns={"income_group": "classification"})
         )
     elif "income_groups_latest" in ds_income_groups.table_names:
         # Get the table with the current definitions of income groups.
-        df_income = ds_income_groups["income_groups_latest"].reset_index()
+        tb_income = ds_income_groups["income_groups_latest"].reset_index()
     else:
         raise KeyError(
             "Table 'income_groups_latest' not found. "
@@ -753,7 +750,7 @@ def list_members_of_region(
         # Keep only countries that are not in "income_groups_latest".
        # NOTE: This not only includes historical regions, but also countries that don't appear in
        # "income_groups_latest", like Venezuela.
-        historical_regions = historical_regions[~historical_regions["country"].isin(df_income["country"])]
+        historical_regions = historical_regions[~historical_regions["country"].isin(tb_income["country"])]
        # Keep only the latest income group classification of each historical region.
        historical_regions = (
            historical_regions.sort_values(["country", "year"], ascending=True)
@@ -762,33 +759,33 @@ def list_members_of_region(
             .reset_index(drop=True)
         )
         # Append historical regions to latest income group classifications.
-        df_income = pd.concat([df_income, historical_regions], ignore_index=True)
+        tb_income = pd.concat([tb_income, historical_regions], ignore_index=True)
 
     # Create a dataframe of countries in each income group.
-    df_countries_in_income_group = (
-        df_income.rename(columns={"classification": "region", "country": "members"})
+    tb_countries_in_income_group = (
+        tb_income.rename(columns={"classification": "region", "country": "members"})  # type: ignore
         .groupby("region", as_index=True, observed=True)
         .agg({"members": list})
     )
 
     # Create a dataframe of members in regions, including income groups.
-    df_countries_in_region = pd.concat([df_countries_in_region, df_countries_in_income_group], ignore_index=False)
+    tb_countries_in_region = pd.concat([tb_countries_in_region, tb_countries_in_income_group], ignore_index=False)
 
     # Get list of default members for the given region, if it's known.
-    if region in df_countries_in_region.index.tolist():
-        countries_set = set(df_countries_in_region.loc[region]["members"])
+    if region in tb_countries_in_region.index.tolist():
+        countries_set = set(tb_countries_in_region.loc[region]["members"])
     else:
         # Initialise an empty set of members.
         countries_set = set()
 
     # List countries from the list of regions included.
     countries_set |= set(
-        sum([df_countries_in_region.loc[region_included]["members"] for region_included in additional_regions], [])
+        sum([tb_countries_in_region.loc[region_included]["members"] for region_included in additional_regions], [])
     )
 
     # Remove all countries from the list of regions excluded.
     countries_set -= set(
-        sum([df_countries_in_region.loc[region_excluded]["members"] for region_excluded in excluded_regions], [])
+        sum([tb_countries_in_region.loc[region_excluded]["members"] for region_excluded in excluded_regions], [])
     )
 
     # Add the list of individual countries to be included.
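For readers following the change: the old code joined two long tables (`definitions` and `members`), while the new code decodes a JSON-encoded `members` column on the single `regions` table. Here is a minimal sketch of the new access pattern, using plain pandas as a stand-in for the `Dataset`/`Table` machinery and made-up region codes:

```python
import json

import pandas as pd

# Stand-in for ds_regions["regions"]: one row per region code, with the
# list-valued "members" column stored as a JSON string (the reworked layout).
tb_regions = pd.DataFrame(
    {
        "name": ["Europe", "France", "Spain"],
        "members": ['["FRA", "ESP"]', None, None],
    },
    index=pd.Index(["EUR", "FRA", "ESP"], name="code"),
)

# Mapping from code to region name.
mapping = tb_regions["name"].to_dict()

# Decode JSON strings into lists, explode to one row per member, map codes to names.
tb_regions["members"] = [json.loads(m) if pd.notnull(m) else [] for m in tb_regions["members"]]
tb_regions = tb_regions.explode("members").dropna()
tb_regions["members"] = tb_regions["members"].map(mapping)

# Group back into one list of member names per region.
members_of = tb_regions.rename(columns={"name": "region"}).groupby("region").agg({"members": list})
print(members_of.loc["Europe", "members"])  # ['France', 'Spain']
```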
9 changes: 6 additions & 3 deletions etl/harmonize.py
@@ -133,9 +133,12 @@ class CountryRegionMapper:
     valid_names: Set[str]
 
     def __init__(self) -> None:
-        ds_regions = Dataset(LATEST_REGIONS_DATASET_PATH)
-        rc_df = ds_regions["definitions"]
-        aliases_s = ds_regions["aliases"]["alias"]
+        tb_regions = Dataset(LATEST_REGIONS_DATASET_PATH)["regions"]
+        rc_df = tb_regions[["name", "short_name", "region_type", "is_historical", "defined_by"]]
+        # Convert JSON strings of alias lists into lists of aliases.
+        tb_regions["aliases"] = [json.loads(alias) if pd.notnull(alias) else [] for alias in tb_regions["aliases"]]
+        # Explode list of aliases to have one row per alias.
+        aliases_s = tb_regions["aliases"].explode().dropna()
         aliases = {}
         valid_names = set()
         for row in rc_df.itertuples():
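The same decode-and-explode idiom drives the alias lookup here. A small sketch of how `CountryRegionMapper` can build its alias map from the reworked table (made-up rows, plain pandas instead of the real `Dataset`):

```python
import json

import pandas as pd

# Made-up stand-in for the "regions" table with JSON-encoded alias lists.
tb_regions = pd.DataFrame(
    {
        "name": ["United States", "United Kingdom"],
        "aliases": ['["US", "USA"]', '["UK", "Great Britain"]'],
    },
    index=pd.Index(["USA", "GBR"], name="code"),
)

# Decode and explode: one row per alias, still indexed by region code.
aliases_s = (
    tb_regions["aliases"]
    .map(lambda a: json.loads(a) if pd.notnull(a) else [])
    .explode()
    .dropna()
)

# Alias -> canonical name, analogous to the mapping built in __init__.
aliases = {alias: tb_regions.loc[code, "name"] for code, alias in aliases_s.items()}
print(aliases["Great Britain"])  # United Kingdom
```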
9 changes: 2 additions & 7 deletions etl/steps/data/garden/emissions/2023-05-03/owid_co2.py
@@ -358,8 +358,7 @@ def run(dest_dir: str) -> None:
     tb_energy = ds_energy["primary_energy_consumption"]
     tb_gdp = ds_gdp["maddison_gdp"]
     tb_population = ds_population["population"]
-    tb_region_names = ds_regions["definitions"]
-    tb_region_codes = ds_regions["legacy_codes"]
+    tb_regions = ds_regions["regions"]
 
     #
     # Process data.
@@ -373,11 +372,7 @@ def run(dest_dir: str) -> None:
     tb_energy = tb_energy.reset_index()[list(PRIMARY_ENERGY_COLUMNS)].rename(columns=PRIMARY_ENERGY_COLUMNS)
     tb_gdp = tb_gdp.reset_index()[list(GDP_COLUMNS)].rename(columns=GDP_COLUMNS)
     tb_population = tb_population.reset_index()[list(POPULATION_COLUMNS)].rename(columns=POPULATION_COLUMNS)
-    tb_regions = (
-        pd.merge(tb_region_names, tb_region_codes, left_index=True, right_index=True)
-        .reset_index()[list(REGIONS_COLUMNS)]
-        .rename(columns=REGIONS_COLUMNS)
-    )
+    tb_regions = tb_regions.reset_index()[list(REGIONS_COLUMNS)].rename(columns=REGIONS_COLUMNS)
 
     # Combine tables.
     combined = combine_tables(
9 changes: 2 additions & 7 deletions etl/steps/data/garden/emissions/2023-07-10/owid_co2.py
@@ -354,8 +354,7 @@ def run(dest_dir: str) -> None:
     tb_energy = ds_energy["primary_energy_consumption"]
     tb_gdp = ds_gdp["maddison_gdp"]
     tb_population = ds_population["population"]
-    tb_region_names = ds_regions["definitions"]
-    tb_region_codes = ds_regions["legacy_codes"]
+    tb_regions = ds_regions["regions"]
 
     #
     # Process data.
@@ -369,11 +368,7 @@ def run(dest_dir: str) -> None:
     tb_energy = tb_energy.reset_index()[list(PRIMARY_ENERGY_COLUMNS)].rename(columns=PRIMARY_ENERGY_COLUMNS)
     tb_gdp = tb_gdp.reset_index()[list(GDP_COLUMNS)].rename(columns=GDP_COLUMNS)
     tb_population = tb_population.reset_index()[list(POPULATION_COLUMNS)].rename(columns=POPULATION_COLUMNS)
-    tb_regions = (
-        pd.merge(tb_region_names, tb_region_codes, left_index=True, right_index=True)
-        .reset_index()[list(REGIONS_COLUMNS)]
-        .rename(columns=REGIONS_COLUMNS)
-    )
+    tb_regions = tb_regions.reset_index()[list(REGIONS_COLUMNS)].rename(columns=REGIONS_COLUMNS)
 
     # Combine tables.
     combined = combine_tables(
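Both `owid_co2` steps get the identical simplification: names and legacy codes now arrive pre-merged on one table, so the `pd.merge` of `definitions` with `legacy_codes` disappears. A toy illustration (this `REGIONS_COLUMNS` value is hypothetical; the real constant lives at the top of each step and may list different columns):

```python
import pandas as pd

# Hypothetical REGIONS_COLUMNS: {old column name: new column name}.
REGIONS_COLUMNS = {"code": "code", "name": "country"}

# Single regions table already carrying both names and legacy codes.
tb_regions = pd.DataFrame(
    {"name": ["France"], "legacy_country_id": [999]},
    index=pd.Index(["FRA"], name="code"),
)

# One line replaces the former merge + select + rename pipeline.
tb_regions = tb_regions.reset_index()[list(REGIONS_COLUMNS)].rename(columns=REGIONS_COLUMNS)
print(tb_regions)  # columns: code, country
```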
59 changes: 17 additions & 42 deletions etl/steps/data/garden/regions/2023-01-01/regions.meta.yml
@@ -4,72 +4,47 @@ dataset:
     Region definitions and other useful data like alternative region names, historical region successors, and sub-region members.
 
 tables:
-  definitions:
+  regions:
     variables:
-      name: &ref_name
+      name:
         title: "Region name"
         description: |
           Region name.
-      short_name: &ref_short_name
+      short_name:
         title: "Region short name"
         description: |
           Region short name, to be used in certain visualizations that are short of space.
-      region_type: &ref_region_type
+      region_type:
         title: "Region type"
         description: |
           Type of region (e.g. country or continent). These types are not accurate, given that many regions have a complicated political status, and it is out of the scope of this dataset to adopt precise definitions.
-      is_historical: &ref_is_historical
+      is_historical:
         title: "Is historical"
         description: |
           True if the region does not currently exist, but existed in the past.
-      defined_by: &ref_defined_by
+      defined_by:
         title: "Defined by"
         description: |
           Institution that introduced a certain definition. For example, if a certain region was found in a dataset of a certain institution, this variable will contain the name of the institution (in a short, snake_case form, e.g. "owid").
-
-  aliases:
-    variables:
-      alias: &ref_alias
-        title: "Alias"
-        description: |
-          Alternative name for a region. For example, "U.S" is a common alias of region "United States".
-
-  members:
-    variables:
-      member: &ref_member
+      members:
         title: "Member"
         description: |
           Member of a region.
-
-  related:
-    variables:
-      member: &ref_related
+      aliases:
+        title: "Alias"
+        description: |
+          Alternative name for a region. For example, "U.S" is a common alias of region "United States".
+      related:
         title: "Possible member"
         description: |
           Possible member of a region. Here, membership is defined rather vaguely: A member can be a sub-region of a country, an overseas territory, a country inside a continent, etc.
 
           The status of many regions is unclear or contested, and by defining these dependencies we are not making any political statement. We simply define possible overlaps between geographical regions that can be found in datasets, to ensure we never double-count the contribution from those regions when creating aggregate data.
-
-  transitions:
-    variables:
       end_year:
-        title: "End year"
+        title: "Last year that a historical region existed"
         description: |
-          Final year a region existed.
-      successor:
-        title: "Successor"
+          Last year when a historical region existed.
+      successors:
+        title: "Successors of a historical region"
         description: |
-          Successor of a historical region.
+          List of successors of a historical region.
 
           A successor is defined as a region that starts existing after a certain historical region stops existing, occupying (partially or totally) the same geographic area.
-
-  regions:
-    variables:
-      name: *ref_name
-      short_name: *ref_short_name
-      region_type: *ref_region_type
-      is_historical: *ref_is_historical
-      defined_by: *ref_defined_by
-      members: *ref_member
-      aliases: *ref_alias
-      related: *ref_related
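With `end_year` and `successors` folded into the single `regions` table, the old `transitions` table can be reconstructed on demand. A sketch with made-up rows (the real table is indexed by `code`):

```python
import json

import pandas as pd

# Made-up rows illustrating the merged layout for a historical region.
tb_regions = pd.DataFrame(
    {
        "name": ["USSR", "Russia"],
        "is_historical": [True, False],
        "end_year": [1991, None],
        "successors": ['["RUS"]', None],
    },
    index=pd.Index(["SUN", "RUS"], name="code"),
)

# Keep historical regions, decode successor lists, one row per successor.
transitions = tb_regions[tb_regions["is_historical"]].copy()
transitions["successors"] = transitions["successors"].map(json.loads)
transitions = transitions.explode("successors")
print(transitions[["name", "end_year", "successors"]])
# SUN: USSR ended in 1991 with successor code "RUS".
```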
93 changes: 14 additions & 79 deletions etl/steps/data/garden/regions/2023-01-01/regions.py
@@ -90,49 +90,17 @@ def run_sanity_checks(df: pd.DataFrame) -> None:
     assert len(aliases_duplicated) == 0, error
 
 
-def _merge_tables(
-    tb_definitions: Table,
-    tb_aliases: Table,
-    tb_members: Table,
-    tb_related: Table,
-    tb_legacy_codes: Table,
-) -> Table:
-    """Merge all regions tables into a single one. 1:n relationships are merged as JSON lists."""
-    tb_regions = tb_definitions.copy()
-
-    # add members as JSON
-    tb_regions["members"] = tb_members["member"].groupby("code").agg(lambda x: json.dumps(list(x)))
-
-    # add aliases as JSON
-    tb_regions["aliases"] = tb_aliases["alias"].groupby("code").agg(lambda x: json.dumps(list(x)))
-
-    # add related as JSON
-    tb_regions["related"] = tb_related["member"].groupby("code").agg(lambda x: json.dumps(list(x)))
-
-    # add legacy codes
-    tb_regions = tb_regions.join(tb_legacy_codes, how="left")
-
-    return tb_regions
-
-
 def run(dest_dir: str) -> None:
     #
     # Load inputs.
     #
     # Load main regions data from yaml file.
     with open(REGION_DEFINITIONS_FILE) as _file:
         df = pd.DataFrame.from_dict(yaml.safe_load(_file))
 
     # Load file of region codes.
     # NOTE: Namibia has iso_code "NA" which would be interpreted as NaN without extra arguments.
-    df_codes = pd.read_csv(
-        REGION_CODES_FILE,
-        keep_default_na=False,
-        na_values=[
-            "",
-        ],
-    )
+    df_codes = pd.read_csv(REGION_CODES_FILE, keep_default_na=False, na_values=[""])
 
     #
     # Process data.
@@ -143,63 +111,30 @@ def run(dest_dir: str) -> None:
     # Run sanity checks on input data.
     run_sanity_checks(df=df)
 
-    # Create an appropriate index for main dataframe and sort conveniently.
-    df = df.set_index(["code"], verify_integrity=True).sort_index()
-
-    # Create an appropriate index for codes dataframe and sort conveniently.
-    df_codes = df_codes.set_index(["code"], verify_integrity=True).sort_index()
-
-    # Create table for region definitions.
-    tb_definitions = Table(
-        df[["name", "short_name", "region_type", "is_historical", "defined_by"]], short_name="definitions"
-    )
-
-    # Create table for aliases.
-    tb_aliases = Table(
-        df.rename(columns={"aliases": "alias"})[["alias"]].explode("alias").dropna(how="all"), short_name="aliases"
-    )
-
-    # Create table for members.
-    tb_members = Table(
-        df.rename(columns={"members": "member"}).explode("member")[["member"]].dropna(how="all"), short_name="members"
-    )
-
-    # Create table for other possible related members.
-    tb_related = Table(
-        df.rename(columns={"related": "member"}).explode("member")[["member"]].dropna(how="all"), short_name="related"
-    )
-
-    # Create table of historical transitions.
-    tb_transitions = Table(
-        df[["end_year", "successors"]]
-        .rename(columns={"successors": "successor"})
-        .explode("successor")
-        .dropna(how="all")
-        .astype({"end_year": int}),
-        short_name="transitions",
-    )
-
-    # Create a table of legacy codes (ensuring all numeric codes are integer).
-    tb_legacy_codes = Table(
-        df_codes.astype(
-            {code: pd.Int64Dtype() for code in ["cow_code", "imf_code", "legacy_country_id", "legacy_entity_id"]}
-        ),
-        short_name="legacy_codes",
-    )
-
-    # Create merged flat table with useful columns.
-    tb_regions = Table(
-        _merge_tables(tb_definitions, tb_aliases, tb_members, tb_related, tb_legacy_codes),
-        short_name="regions",
-    )
+    # Create a table of legacy codes (ensuring all numeric codes are integer).
+    df_codes = df_codes.astype(
+        {code: pd.Int64Dtype() for code in ["cow_code", "imf_code", "legacy_country_id", "legacy_entity_id"]}
+    )
+
+    # Combine data with legacy codes.
+    tb_regions = Table(pd.merge(df, df_codes, on="code", how="left"), short_name="regions")
+
+    # Convert columns that are lists of strings into JSON strings.
+    for column in ["aliases", "members", "related", "successors"]:
+        tb_regions[column] = tb_regions.groupby("code")[column].transform(
+            lambda x: json.dumps(sum(list(x), [])) if pd.notna(x.values) else x
+        )
+
+    # Set an appropriate index and sort conveniently.
+    tb_regions = tb_regions.set_index("code", verify_integrity=True).sort_index()
 
     #
     # Save outputs.
     #
     # Create a new garden dataset.
     ds_garden = create_dataset(
         dest_dir=dest_dir,
-        tables=[tb_regions, tb_definitions, tb_aliases, tb_members, tb_related, tb_transitions, tb_legacy_codes],
+        tables=[tb_regions],
     )
 
     # Save changes in the new garden dataset.
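To make the new layout concrete, here is a toy version of the flattening performed above. The real step flattens via a `groupby` over `code`; a plain column-wise `json.dumps` is enough to show the round trip:

```python
import json

import pandas as pd

# List-valued columns become JSON strings so the table keeps exactly
# one row per region code (the unique index this PR is after).
df = pd.DataFrame(
    {
        "code": ["EUR", "FRA"],
        "name": ["Europe", "France"],
        "members": [["FRA", "ESP"], []],
        "aliases": [[], ["French Republic"]],
    }
)

for column in ["members", "aliases"]:
    df[column] = df[column].map(json.dumps)

df = df.set_index("code", verify_integrity=True).sort_index()
print(df.loc["EUR", "members"])              # '["FRA", "ESP"]'
print(json.loads(df.loc["FRA", "aliases"]))  # ['French Republic'] (round-trips cleanly)
```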