diff --git a/pysus/ftp/databases/ibge_datasus.py b/pysus/ftp/databases/ibge_datasus.py index 5e131f1..e21edfb 100644 --- a/pysus/ftp/databases/ibge_datasus.py +++ b/pysus/ftp/databases/ibge_datasus.py @@ -56,38 +56,29 @@ def get_files( year: Optional[Union[str, int, list]] = None, *args, **kwargs ) -> List[File]: + sources = ["POP", "censo", "POPTCU", "projpop"] source_dir = None for dir in self.paths: - if ( - source in ["POP", "censo", "POPTCU", "projpop"] - and source in dir.path - ): + if source in sources and source in dir.path: source_dir = dir if not source_dir: - raise ValueError(f"Unkown source {source}") + raise ValueError(f"Unkown source {source}. Options: {sources}") files = source_dir.content - if source in ["POPTCU", "censo", "POP"]: - if year: - if isinstance(year, (str, int)): - files = [ - f for f in files if - self.describe(f)["year"] == zfill_year(year) - ] - elif isinstance(year, list): - files = [ - f for f in files - if str(self.describe(f)["year"]) - in [str(zfill_year(y)) for y in year] - ] - else: - if year: - logger.warning( - f"{source} files are not arranged in years, " - "returning all files for source" - ) + if year: + if isinstance(year, (str, int)): + files = [ + f for f in files if + self.describe(f)["year"] == zfill_year(year) + ] + elif isinstance(year, list): + files = [ + f for f in files + if str(self.describe(f)["year"]) + in [str(zfill_year(y)) for y in year] + ] return files diff --git a/pysus/online_data/IBGE.py b/pysus/online_data/IBGE.py index e14bb04..014abcc 100644 --- a/pysus/online_data/IBGE.py +++ b/pysus/online_data/IBGE.py @@ -1,13 +1,17 @@ """ Helper functions to download official statistics from IBGE SIDRA """ -from typing import Literal +from typing import Literal, Optional +from pathlib import Path +from zipfile import ZipFile +from tempfile import TemporaryDirectory import ssl # Builtin import urllib3 import requests import pandas as pd +from pysus.data.local import ParquetSet from pysus.ftp.databases.ibge_datasus import IBGEDATASUS # requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=1' @@ -16,6 +20,8 @@ APIBASE = 'https://servicodados.ibge.gov.br/api/v3/' +ibge = IBGEDATASUS().load() + def get_sidra_table( table_id, @@ -268,7 +274,7 @@ def to_dataframe(self): """ -class CustomHttpAdapter(requests.adapters.HTTPAdapter): +class CustomHttpAdapter(requests.sessions.HTTPAdapter): # "Transport adapter" that allows us to use custom ssl_context. def __init__(self, ssl_context=None, **kwargs): @@ -293,15 +299,67 @@ def get_legacy_session(): def get_population( - year, + year: int, source: Literal["POP", "censo", "POPTCU", "projpop"] = "POPTCU", -): + censo_data: Literal["ALF", "ESCA", "ESCB", "IDOSO", "RENDA"] = "ALF" +) -> pd.DataFrame: """ Get population data from IBGE as shared by DATASUS :param year: year of the data - :param source: 'POPTCU'|'POP'|'censo'|'projpop' + :param source: + "POP" - 1992-presente: Estimativas populacionais estratificadas por + idade e sexo. + "censo" - 1991, 2000 e 2010: Censos Demográficos + "POPTCU" - 1992-presente: Estimativas populacionais enviadas para o TCU, + estratificadas por idade e sexo pelo MS/SGEP/Datasus. + "projpop": Estimativas preliminares para os anos intercensitários dos + totais populacionais, estratificadas por idade e sexo pelo + MS/SGEP/Datasus. + :param censo_data: + "ALF": Censo Demográfico + "ESCA": Censo Escolar da Educação Básica + "ESCB": Censo Escolar da Educação Superior + "IDOSO": População de pessoas com 65 anos ou mais + "RENDA": População de pessoas de acordo com a renda familiar :return: DataFrame with population data """ - ibgedatasus = IBGEDATASUS().load() - files = ibgedatasus.get_files(year=year, source=source) - raise NotImplemented("TODO") + + files = ibge.get_files(year=int(year), source=source) + + if files == []: + return pd.DataFrame() + + if source == "censo": + opts = ["ALF", "ESCA", "ESCB", "IDOSO", "RENDA"] + if not censo_data or censo_data not in opts: + raise ValueError( + f"Incorrect `censo_data` parameter. Options: {opts}" + ) + file = [f for f in files if censo_data in f.name][0].download() + else: + file = files[0].download() + + if isinstance(file, ParquetSet): + return file.to_dataframe() + + file = Path(str(file)) + + if file.suffix.lower() == ".zip": + return _unzip_to_dataframe(str(file)) + else: + raise NotImplementedError(f"Unkown file type '{file.suffix}'") + + +def _unzip_to_dataframe(file: str) -> pd.DataFrame: + zip_file = ZipFile(file) # pyright: ignore + with TemporaryDirectory() as tempdir: + for file in zip_file.namelist(): + if file.lower().endswith(".csv"): + return pd.read_csv(zip_file.extract(file, tempdir)) + + if file.lower().endswith((".dbf", ".dbc")): + return ParquetSet( + zip_file.extract(file, tempdir) + ).to_dataframe() + + raise ValueError(f"No data found in {zip_file}") diff --git a/pysus/tests/test_ibge.py b/pysus/tests/test_ibge.py index 778fe4d..1f16aa9 100644 --- a/pysus/tests/test_ibge.py +++ b/pysus/tests/test_ibge.py @@ -46,12 +46,12 @@ def test_FetchData(self): @pytest.mark.timeout(120) def test_get_population(self): - l = IBGE.get_population(2021) - self.assertEqual(l[0].name, 'POPTBR21') - self.assertGreater(len(l), 0) - l = IBGE.get_population(2012, source='projpop') - self.assertEqual(l[0].name, 'projbr12') - self.assertGreater(len(l), 0) + l = IBGE.get_population(year=2021, source="POP") + self.assertEqual(type(l), pd.DataFrame) + self.assertEqual(len(l), 5570) + l = IBGE.get_population(year=2012, source='projpop') + self.assertEqual(type(l), pd.DataFrame) + self.assertEqual(len(l), 182) if __name__ == '__main__':