From 035c9e1b1fea81304ea4b520f96650dc840662a6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Wed, 10 Apr 2024 00:52:45 -0300
Subject: [PATCH] fix get_files from IBGEDATASUS

---
 pysus/data/local.py                 |  2 ++
 pysus/ftp/databases/ibge_datasus.py | 56 ++++++++++++++++++++---------
 pysus/online_data/IBGE.py           | 12 ++++---
 3 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/pysus/data/local.py b/pysus/data/local.py
index fdd0462..5ea7476 100644
--- a/pysus/data/local.py
+++ b/pysus/data/local.py
@@ -81,6 +81,8 @@ def parse_data_content(
 
         if data_path.suffix.lower() in [".dbc", ".dbf", ".parquet"]:
             content.append(ParquetSet(str(data_path), _pbar=_pbar))
+        elif data_path.suffix.lower() == ".zip":
+            content.append(str(data_path))
         else:
             continue
 
diff --git a/pysus/ftp/databases/ibge_datasus.py b/pysus/ftp/databases/ibge_datasus.py
index b6a777d..5e131f1 100644
--- a/pysus/ftp/databases/ibge_datasus.py
+++ b/pysus/ftp/databases/ibge_datasus.py
@@ -1,4 +1,5 @@
-from typing import Optional, List, Union
+from typing import Optional, List, Union, Literal
+from loguru import logger
 
 from pysus.ftp import Database, Directory, File
 from pysus.ftp.utils import zfill_year, to_list
@@ -46,24 +47,47 @@ def describe(self, file: File) -> dict:
             return description
         return {}
 
-    def format(self, file: File) -> str:
-        return file.name[-2:]
+    def format(self, file: File) -> tuple:
+        return file.name[-2:],
 
     def get_files(
-            self,
-            year: Optional[Union[str, int, list]] = None,
+        self,
+        source: Literal["POP", "censo", "POPTCU", "projpop"] = "POPTCU",
+        year: Optional[Union[str, int, list]] = None,
+        *args, **kwargs
     ) -> List[File]:
-        files = [f for f in self.files if f.extension.upper(
-        ) in [".ZIP", ".DBF"] and self.describe(f)["year"] == year]
-        # files = list(filter(
-        #     lambda f: f.extension.upper() in [".ZIP"], self.files
-        # ))
+        source_dir = None
 
-        if year or str(year) in ["0", "00"]:
-            years = (
-                [zfill_year(str(y)[-4:]) for y in to_list(year)]
-            )
-            files = list(filter(lambda f: zfill_year(
-                self.format(f)) in years, files))
+        for dir in self.paths:
+            if (
+                source in ["POP", "censo", "POPTCU", "projpop"]
+                and source in dir.path
+            ):
+                source_dir = dir
+
+        if not source_dir:
+            raise ValueError(f"Unknown source {source}")
+
+        files = source_dir.content
+
+        if source in ["POPTCU", "censo", "POP"]:
+            if year:
+                if isinstance(year, (str, int)):
+                    files = [
+                        f for f in files if
+                        self.describe(f)["year"] == zfill_year(year)
+                    ]
+                elif isinstance(year, list):
+                    files = [
+                        f for f in files
+                        if str(self.describe(f)["year"])
+                        in [str(zfill_year(y)) for y in year]
+                    ]
+        else:
+            if year:
+                logger.warning(
+                    f"{source} files are not arranged in years, "
+                    "returning all files for source"
+                )
 
         return files
diff --git a/pysus/online_data/IBGE.py b/pysus/online_data/IBGE.py
index 02b82d2..e14bb04 100644
--- a/pysus/online_data/IBGE.py
+++ b/pysus/online_data/IBGE.py
@@ -1,6 +1,7 @@
 """
 Helper functions to download official statistics from IBGE SIDRA
 """
+from typing import Literal
 import ssl  # Builtin
 import urllib3
 
@@ -291,7 +292,10 @@ def get_legacy_session():
     return session
 
 
-def get_population(year, source='POPTCU'):
+def get_population(
+    year,
+    source: Literal["POP", "censo", "POPTCU", "projpop"] = "POPTCU",
+):
     """
     Get population data from IBGE as shared by DATASUS
     :param year: year of the data
@@ -299,7 +303,5 @@ def get_population(year, source='POPTCU'):
     :return: DataFrame with population data
     """
     ibgedatasus = IBGEDATASUS().load()
-    files = [
-        f for f in ibgedatasus.get_files(year=year) if f.path.split('/')[-2] == source
-    ]
-    return files
+    files = ibgedatasus.get_files(year=year, source=source)
+    raise NotImplementedError("TODO")