🔨 Cache indicator data from S3 #3398

Merged · 2 commits · Oct 11, 2024
109 changes: 75 additions & 34 deletions etl/grapher_io.py
@@ -1,4 +1,5 @@
 import concurrent.futures
+import io
 import warnings
 from http.client import RemoteDisconnected
 from typing import Any, Dict, List, Optional, cast
@@ -21,6 +22,7 @@
 from etl.config import OWID_ENV, OWIDEnv
 from etl.db import get_connection, read_sql
 from etl.grapher_model import Dataset, Variable
+from etl.paths import CACHE_DIR

 log = structlog.get_logger()
@@ -263,30 +265,76 @@ def variable_data_df_from_s3(
     return res


-def _fetch_data_df_from_s3(variable_id: int):
+def _fetch_response(method: str, url: str):
+    """Helper function to perform HTTP requests with retries and centralized exception handling."""
     try:
         # Cloudflare limits us to 600 requests per minute, retry in case we hit the limit
         # NOTE: increase wait time or attempts if we hit the limit too often
         for attempt in Retrying(
             wait=wait_fixed(2),
             stop=stop_after_attempt(3),
-            retry=retry_if_exception_type((URLError, RemoteDisconnected)),
+            retry=retry_if_exception_type((URLError, RemoteDisconnected, requests.exceptions.RequestException)),
         ):
             with attempt:
-                return (
-                    pd.read_json(config.variable_data_url(variable_id))
-                    .rename(
-                        columns={
-                            "entities": "entityId",
-                            "values": "value",
-                            "years": "year",
-                        }
-                    )
-                    .assign(variableId=variable_id)
-                )
-    # no data on S3
-    except HTTPError:
+                response = requests.request(method, url)
+                response.raise_for_status()
+                return response
+    except HTTPError as e:
+        # No data on S3
+        if e.response.status_code == 404:
+            return None
+        else:
+            raise
+    except (URLError, RemoteDisconnected, requests.exceptions.RequestException):
+        raise
+
+
+def _fetch_data_df_from_s3(variable_id: int):
+    cache_dir = CACHE_DIR / "variable_data"
+    cache_dir.mkdir(parents=True, exist_ok=True)
+    cache_filename = cache_dir / f"{variable_id}.json"
+    etag_filename = cache_dir / f"{variable_id}.etag"
+
+    url = config.variable_data_url(variable_id)
+
+    # Check if cached data exists
+    if cache_filename.exists() and etag_filename.exists():
+        # Read stored ETag
+        stored_etag = etag_filename.read_text()
+    else:
+        stored_etag = None
+
+    # Get current ETag from server
+    response = _fetch_response("HEAD", url)
+    if response is None:
+        return pd.DataFrame(columns=["variableId", "entityId", "year", "value"])
+    current_etag = response.headers.get("ETag")
+
+    # Compare ETags
+    if stored_etag and current_etag and stored_etag == current_etag:
+        # ETag matches, load from cache
+        data_df = pd.read_json(cache_filename)
+    else:
+        # Fetch new data
+        response = _fetch_response("GET", url)
+        if response is None:
+            return pd.DataFrame(columns=["variableId", "entityId", "year", "value"])
+        # Save response text to cache
+        cache_filename.write_text(response.text, encoding="utf-8")
+        # Save new ETag
+        if current_etag:
+            etag_filename.write_text(current_etag)
+        elif etag_filename.exists():
+            etag_filename.unlink()
+        data_df = pd.read_json(io.StringIO(response.text))
+
+    # Process DataFrame
+    data_df = data_df.rename(
+        columns={
+            "entities": "entityId",
+            "values": "value",
+            "years": "year",
+        }
+    ).assign(variableId=variable_id)
+    return data_df


 def add_entity_code_and_name(session: Session, df: pd.DataFrame) -> pd.DataFrame:
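The hunk above replaces the old one-shot `pd.read_json` download with a shared `_fetch_response` helper plus an on-disk cache keyed by ETag. A minimal, self-contained sketch of that conditional-fetch pattern (the URL, cache directory, and `fetch_cached` helper are illustrative, not part of the PR):

```python
# Self-contained sketch of the ETag-based caching scheme the diff implements.
# Everything here (cache path, helper name) is illustrative, not PR code.
from pathlib import Path

import requests

CACHE = Path("/tmp/etag-cache")


def fetch_cached(url: str, key: str) -> str:
    CACHE.mkdir(parents=True, exist_ok=True)
    data_file = CACHE / f"{key}.json"
    etag_file = CACHE / f"{key}.etag"

    # Ask the server for the current ETag without downloading the body.
    current_etag = requests.head(url).headers.get("ETag")
    stored_etag = etag_file.read_text() if etag_file.exists() else None

    if stored_etag and current_etag and stored_etag == current_etag and data_file.exists():
        # ETag unchanged: serve the cached payload, skipping the download.
        return data_file.read_text()

    # Cache miss or stale cache: download, then refresh payload and ETag.
    response = requests.get(url)
    response.raise_for_status()
    data_file.write_text(response.text, encoding="utf-8")
    if current_etag:
        etag_file.write_text(current_etag)
    return response.text
```

Storing the ETag beside the payload means an unchanged indicator costs one HEAD request instead of a full download; when the file is re-uploaded to S3 its ETag changes and the body is fetched again.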
@@ -338,24 +386,17 @@ def variable_metadata_df_from_s3(
     return results  # type: ignore


-def _fetch_metadata_from_s3(variable_id: int, env: OWIDEnv | None = None) -> Dict[str, Any] | None:
-    try:
-        # Cloudflare limits us to 600 requests per minute, retry in case we hit the limit
-        # NOTE: increase wait time or attempts if we hit the limit too often
-        for attempt in Retrying(
-            wait=wait_fixed(2),
-            stop=stop_after_attempt(3),
-            retry=retry_if_exception_type((URLError, RemoteDisconnected)),
-        ):
-            with attempt:
-                if env is not None:
-                    url = env.indicator_metadata_url(variable_id)
-                else:
-                    url = config.variable_metadata_url(variable_id)
-                return requests.get(url).json()
-    # no data on S3
-    except HTTPError:
+def _fetch_metadata_from_s3(variable_id: int, env: OWIDEnv | None = None) -> Dict[str, Any]:
+    if env is not None:
+        url = env.indicator_metadata_url(variable_id)
+    else:
+        url = config.variable_metadata_url(variable_id)
+
+    response = _fetch_response("GET", url)
+    if response is None:
+        return {}
+    else:
+        return response.json()


 #######################################################################################################
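For reference, the retry loop both helpers now share follows tenacity's `Retrying` iterator pattern. A standalone sketch using the same policy as the diff (fixed 2-second wait, 3 attempts); the `fetch_with_retry` name and the timeout are illustrative:

```python
# Standalone sketch of the tenacity retry loop used in the diff above.
import requests
from tenacity import Retrying, retry_if_exception_type, stop_after_attempt, wait_fixed


def fetch_with_retry(url: str) -> requests.Response:
    for attempt in Retrying(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type(requests.exceptions.RequestException),
        reraise=True,  # surface the original exception, not tenacity's RetryError
    ):
        with attempt:
            response = requests.get(url, timeout=30)
            response.raise_for_status()  # retried if this raises (e.g. HTTP 429)
            return response
    raise AssertionError("unreachable: Retrying either returns or raises")
```

Note that `reraise=True` here surfaces the original exception once attempts are exhausted; the PR's code omits this flag, so exhausted retries raise tenacity's `RetryError` instead.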