From ce8b1f96936e53ddbb94c8b6615160c339c5b8c4 Mon Sep 17 00:00:00 2001
From: Marigold
Date: Fri, 11 Oct 2024 11:00:56 +0200
Subject: [PATCH 1/2] :hammer: Cache indicator data from S3

From 13194fda259e38d23a204d1e84cfc03ded7582ac Mon Sep 17 00:00:00 2001
From: Marigold
Date: Fri, 11 Oct 2024 11:08:35 +0200
Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=94=A8=20Cache=20indicator=20data=20f?=
 =?UTF-8?q?rom=20S3?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 etl/grapher_io.py | 110 ++++++++++++++++++++++++++++++++----------------
 1 file changed, 76 insertions(+), 34 deletions(-)

diff --git a/etl/grapher_io.py b/etl/grapher_io.py
index 09621eaaabe..32b5ce68b17 100644
--- a/etl/grapher_io.py
+++ b/etl/grapher_io.py
@@ -1,4 +1,5 @@
 import concurrent.futures
+import io
 import warnings
 from http.client import RemoteDisconnected
 from typing import Any, Dict, List, Optional, cast
@@ -21,6 +22,7 @@
 from etl.config import OWID_ENV, OWIDEnv
 from etl.db import get_connection, read_sql
 from etl.grapher_model import Dataset, Variable
+from etl.paths import CACHE_DIR
 
 log = structlog.get_logger()
 
@@ -263,30 +265,77 @@ def variable_data_df_from_s3(
     return res
 
 
-def _fetch_data_df_from_s3(variable_id: int):
+def _fetch_response(method: str, url: str):
+    """Perform an HTTP request with retries; return None if there is no data on S3 (404)."""
     try:
-        # Cloudflare limits us to 600 requests per minute, retry in case we hit the limit
-        # NOTE: increase wait time or attempts if we hit the limit too often
         for attempt in Retrying(
             wait=wait_fixed(2),
             stop=stop_after_attempt(3),
-            retry=retry_if_exception_type((URLError, RemoteDisconnected)),
+            retry=retry_if_exception_type((URLError, RemoteDisconnected, requests.exceptions.RequestException)),
+            reraise=True,  # reraise the original error, not tenacity's RetryError
         ):
             with attempt:
-                return (
-                    pd.read_json(config.variable_data_url(variable_id))
-                    .rename(
-                        columns={
-                            "entities": "entityId",
-                            "values": "value",
-                            "years": "year",
-                        }
-                    )
-                    .assign(variableId=variable_id)
-                )
-    # no data on S3
-    except HTTPError:
+                response = requests.request(method, url)
+                response.raise_for_status()
+                return response
+    except requests.exceptions.HTTPError as e:
+        # No data on S3
+        if e.response.status_code == 404:
+            return None
+        else:
+            raise
+    except (URLError, RemoteDisconnected, requests.exceptions.RequestException):
+        raise
+
+
+def _fetch_data_df_from_s3(variable_id: int):
+    cache_dir = CACHE_DIR / "variable_data"
+    cache_dir.mkdir(parents=True, exist_ok=True)
+    cache_filename = cache_dir / f"{variable_id}.json"
+    etag_filename = cache_dir / f"{variable_id}.etag"
+
+    url = config.variable_data_url(variable_id)
+
+    # Check if cached data exists
+    if cache_filename.exists() and etag_filename.exists():
+        # Read stored ETag
+        stored_etag = etag_filename.read_text()
+    else:
+        stored_etag = None
+
+    # Get current ETag from server
+    response = _fetch_response("HEAD", url)
+    if response is None:
         return pd.DataFrame(columns=["variableId", "entityId", "year", "value"])
+    current_etag = response.headers.get("ETag")
+
+    # Compare ETags
+    if stored_etag and current_etag and stored_etag == current_etag:
+        # ETag matches, load from cache
+        data_df = pd.read_json(cache_filename)
+    else:
+        # Fetch new data
+        response = _fetch_response("GET", url)
+        if response is None:
+            return pd.DataFrame(columns=["variableId", "entityId", "year", "value"])
+        # Save response text to cache
+        cache_filename.write_text(response.text, encoding="utf-8")
+        # Save new ETag
+        if current_etag:
+            etag_filename.write_text(current_etag)
+        elif etag_filename.exists():
+            etag_filename.unlink()
+        data_df = pd.read_json(io.StringIO(response.text))
+
+    # Process DataFrame
+    data_df = data_df.rename(
+        columns={
+            "entities": "entityId",
+            "values": "value",
+            "years": "year",
+        }
+    ).assign(variableId=variable_id)
+    return data_df
 
 
 def add_entity_code_and_name(session: Session, df: pd.DataFrame) -> pd.DataFrame:
@@ -338,24 +387,17 @@ def variable_metadata_df_from_s3(
     return results  # type: ignore
 
 
-def _fetch_metadata_from_s3(variable_id: int, env: OWIDEnv | None = None) -> Dict[str, Any] | None:
-    try:
-        # Cloudflare limits us to 600 requests per minute, retry in case we hit the limit
-        # NOTE: increase wait time or attempts if we hit the limit too often
-        for attempt in Retrying(
-            wait=wait_fixed(2),
-            stop=stop_after_attempt(3),
-            retry=retry_if_exception_type((URLError, RemoteDisconnected)),
-        ):
-            with attempt:
-                if env is not None:
-                    url = env.indicator_metadata_url(variable_id)
-                else:
-                    url = config.variable_metadata_url(variable_id)
-                return requests.get(url).json()
-    # no data on S3
-    except HTTPError:
+def _fetch_metadata_from_s3(variable_id: int, env: OWIDEnv | None = None) -> Dict[str, Any]:
+    if env is not None:
+        url = env.indicator_metadata_url(variable_id)
+    else:
+        url = config.variable_metadata_url(variable_id)
+
+    response = _fetch_response("GET", url)
+    if response is None:
         return {}
+    else:
+        return response.json()
 
 
 #######################################################################################################
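
A usage sketch of the new cache (illustrative, not part of the patch): the first
call for an indicator downloads the JSON and stores it next to its ETag under
CACHE_DIR/variable_data/; later calls issue only a HEAD request and, when the
stored ETag still matches, rebuild the DataFrame from the local copy. This
assumes the server returns an ETag header; the variable id below is made up.

    from etl.grapher_io import _fetch_data_df_from_s3

    # First call: HEAD + GET; writes 123456.json and 123456.etag into the cache dir.
    df_first = _fetch_data_df_from_s3(123456)

    # Second call: HEAD only; the stored ETag matches, so data is read from 123456.json.
    df_second = _fetch_data_df_from_s3(123456)

    # Both frames are built from identical JSON text, so they are equal.
    assert df_first.equals(df_second)

Design note: validating the cache with a separate HEAD request keeps the code
simple but costs an extra round trip whenever the cache is stale; a conditional
GET with an If-None-Match header would combine validation and download into a
single request, with the server answering 304 Not Modified on a hit.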