Commit 4a63342

dag to scrape and save the current ridership URL from the NTD portal (#3545)

* dag to scrape the current ridership URL from the NTD portal

* fix naming and add some descriptions

* reconfigured airflow dag setup for dependencies and special handling

* test storing variables in xcoms

* cleaned up imports

* rebase

* remove and reorganize some lingering and unnecessary code and test

* linter not working

* refactor lambda for flake8

* flake8 config change

* flake8 config change again

* create function of url finder

* add comment for flake8 suppression

* accidentally pushed copy file

* suppress whitespace after colon error

* last pass at configuration changes

* suppress whitespace after colon error

* remove testing comments, clean up changed files
charlie-costanzo authored Nov 26, 2024
1 parent 4f07ab5 commit 4a63342
Showing 5 changed files with 77 additions and 11 deletions.
4 changes: 3 additions & 1 deletion .pre-commit-config.yaml
@@ -15,9 +15,11 @@ repos:
     rev: 6.0.0
     hooks:
       - id: flake8
-        args: ["--ignore=E501,W503"] # line too long and line break before binary operator (black is ok with these)
+        args: ["--ignore=E501,W503,E231"] # line too long, line break before binary operator (black is ok with these), and whitespace after colon
         types:
           - python
+        # Suppress SyntaxWarning about invalid escape sequence from calitp-data-infra dependency without modifying source
+        entry: env PYTHONWARNINGS="ignore::SyntaxWarning" flake8
   - repo: https://github.com/psf/black
     rev: 23.1.0
     hooks:
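
For context on the new E231 suppression (an illustration, not part of the commit): under some flake8/pycodestyle and Python versions, f-string bodies are tokenized, so a colon with no trailing space inside an f-string, such as the one in a URL scheme, can be flagged as E231 even though black leaves the string untouched. A minimal, hypothetical reproduction:

# hypothetical reproduction of the E231 false positive this config suppresses;
# some flake8/pycodestyle versions tokenize f-string bodies and flag the ':' in "https:"
path = "ntd/data-product"
url = f"https://www.transit.dot.gov/{path}"  # may be reported as E231 (missing whitespace after ':')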
1 change: 1 addition & 0 deletions airflow/dags/sync_ntd_data_xlsx/METADATA.yml
@@ -15,5 +15,6 @@ default_args:
   retry_delay: !timedelta 'minutes: 2'
   concurrency: 50
   #sla: !timedelta 'hours: 2'
+  provide_context: True
 wait_for_defaults:
   timeout: 3600
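
provide_context is the older-Airflow switch that passes the runtime context, including the task instance ti, into a python_callable as keyword arguments (recent Airflow versions pass it regardless). A sketch of the pattern the scraper task below relies on; my_callable and the key/value are illustrative, not repo code:

# sketch only: the callable signature that receives Airflow's runtime context
def my_callable(**context):
    ti = context["ti"]  # task instance, used for xcom_push / xcom_pull
    ti.xcom_push(key="example_key", value="example_value")  # illustrative key/value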
6 changes: 4 additions & 2 deletions airflow/dags/sync_ntd_data_xlsx/complete_monthly_ridership_with_adjustments_and_estimates.yml
@@ -1,5 +1,7 @@
 operator: operators.NtdDataProductXLSXOperator

 product: 'complete_monthly_ridership_with_adjustments_and_estimates'
-xlsx_file_url: 'https://www.transit.dot.gov/sites/fta.dot.gov/files/2024-11/September%202024%20Complete%20Monthly%20Ridership%20%28with%20adjustments%20and%20estimates%29_241101.xlsx'
-year: 'historical'
+xlsx_file_url: 'https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release' # placeholder, overwritten by the URL scraped in the scrape_ntd_ridership_xlsx_url task
+year: 'historical' # one of: 'historical' (long history), 'multi-year' (select history), or a specific year (ex: 2022)
+dependencies:
+  - scrape_ntd_ridership_xlsx_url
42 changes: 42 additions & 0 deletions airflow/dags/sync_ntd_data_xlsx/scrape_ntd_ridership_xlsx_url.py
@@ -0,0 +1,42 @@
+# ---
+# python_callable: scrape_ntd_ridership_xlsx_url
+# provide_context: true
+# ---
+import logging
+
+import requests
+from bs4 import BeautifulSoup
+from pydantic import HttpUrl, parse_obj_as
+
+
+# pushes the scraped URL value to XCom
+def push_url_to_xcom(scraped_url, context):
+    task_instance = context["ti"]
+    task_instance.xcom_push(key="current_url", value=scraped_url)
+
+
+# Look for an anchor tag whose href starts with '/sites/fta.dot.gov/files/' and ends with '.xlsx'
+def href_matcher(href):
+    return (
+        href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx")
+    )
+
+
+def scrape_ntd_ridership_xlsx_url(**context):
+    # page to find download URL
+    url = "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release"
+    req = requests.get(url)
+    soup = BeautifulSoup(req.text, "html.parser")
+
+    link = soup.find("a", href=href_matcher)
+
+    # Extract the href if the link is found
+    file_link = link["href"] if link else None
+
+    updated_url = f"https://www.transit.dot.gov{file_link}"
+
+    validated_url = parse_obj_as(HttpUrl, updated_url)
+
+    logging.info(f"Validated URL: {validated_url}.")
+
+    push_url_to_xcom(scraped_url=validated_url, context=context)
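
The new module can be exercised without Airflow by faking the ti object; a sketch under the assumption that href_matcher and push_url_to_xcom are importable from the module (FakeTaskInstance and the sample HTML are hypothetical fixtures). Note that file_link stays None when no anchor matches, so a page layout change would surface as a pydantic validation error on the "None"-suffixed URL.

# hypothetical local test; FakeTaskInstance and sample_html are fixtures, not repo code
# from scrape_ntd_ridership_xlsx_url import href_matcher, push_url_to_xcom  # assumed importable
from bs4 import BeautifulSoup

sample_html = """
<a href="/ntd/about">About</a>
<a href="/sites/fta.dot.gov/files/2024-11/Example%20Ridership.xlsx">Download</a>
"""

class FakeTaskInstance:
    def xcom_push(self, key, value):
        print(f"xcom_push(key={key!r}, value={value!r})")

soup = BeautifulSoup(sample_html, "html.parser")
link = soup.find("a", href=href_matcher)  # only the fta.dot.gov .xlsx href matches
assert link is not None and link["href"].endswith(".xlsx")

push_url_to_xcom(
    scraped_url=f"https://www.transit.dot.gov{link['href']}",
    context={"ti": FakeTaskInstance()},
)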
35 changes: 27 additions & 8 deletions airflow/plugins/operators/scrape_ntd_xlsx.py
@@ -20,6 +20,16 @@
 CLEAN_XLSX_BUCKET = os.environ["CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__CLEAN"]


+# pulls the URL from XCom
+def pull_url_from_xcom(context):
+    task_instance = context["ti"]
+    pulled_value = task_instance.xcom_pull(
+        task_ids="scrape_ntd_ridership_xlsx_url", key="current_url"
+    )
+    print(f"Pulled value from XCom: {pulled_value}")
+    return pulled_value
+
+
 class NtdDataProductXLSXExtract(PartitionedGCSArtifact):
     bucket: ClassVar[str]
     year: str
@@ -41,6 +51,11 @@ class Config:
         arbitrary_types_allowed = True

     def fetch_from_ntd_xlsx(self, file_url):
+        # As of now, the only file we download is complete_monthly_ridership_with_adjustments_and_estimates, and its
+        # download link changes every time the date is updated, so we have special handling for it here, dependent on
+        # another DAG task, scrape_ntd_ridership_xlsx_url. If we later download other xlsx files from the DOT portal
+        # whose names also change on each publish, we will have to add the same handling and make it programmatic.
+
         validated_url = parse_obj_as(HttpUrl, file_url)

         logging.info(f"reading file from url {validated_url}")
@@ -84,6 +99,7 @@ def __init__(
         product: str,
         xlsx_file_url,
         year: int,
+        *args,
         **kwargs,
     ):
         self.year = year
@@ -98,15 +114,18 @@ def __init__(
             filename=f"{self.year}__{self.product}_raw.xlsx",
         )

-        super().__init__(**kwargs)
+        super().__init__(*args, **kwargs)

-    def execute(self, **kwargs):
-        excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(
-            self.raw_excel_extract.file_url
-        )
-        logging.info(
-            f"file url is {self.xlsx_file_url} and file type is {type(self.xlsx_file_url)}"
-        )
+    def execute(self, context, *args, **kwargs):
+        download_url = self.raw_excel_extract.file_url
+
+        if self.product == "complete_monthly_ridership_with_adjustments_and_estimates":
+            download_url = pull_url_from_xcom(context=context)
+
+        # log the URL that will actually be downloaded
+        logging.info(f"reading ridership url as {download_url}")
+
+        excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url)

         self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content)
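
End to end, the handoff is an ordinary Airflow XCom push/pull between two tasks. The repo builds its DAG from the YAML and frontmatter above, so the stock-Airflow wiring below is only an illustrative equivalent; the dag id, start date, and downstream task id are made up, while the scrape task id and XCom key match the commit:

# illustrative stock-Airflow equivalent of the YAML/frontmatter wiring above;
# dag_id, start_date, and the download_ridership_xlsx task id are hypothetical
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

def scrape(**context):
    # stand-in for scrape_ntd_ridership_xlsx_url(); pushes the scraped URL
    context["ti"].xcom_push(key="current_url", value="https://example.com/demo.xlsx")

def download(**context):
    # stand-in for NtdDataProductXLSXOperator.execute(); pulls the scraped URL
    url = context["ti"].xcom_pull(
        task_ids="scrape_ntd_ridership_xlsx_url", key="current_url"
    )
    print(f"would fetch {url}")

with DAG(
    dag_id="ntd_xcom_demo",
    start_date=pendulum.datetime(2024, 11, 1),
    schedule=None,  # `schedule_interval` on Airflow < 2.4
) as dag:
    scrape_task = PythonOperator(
        task_id="scrape_ntd_ridership_xlsx_url", python_callable=scrape
    )
    download_task = PythonOperator(
        task_id="download_ridership_xlsx", python_callable=download
    )
    scrape_task >> download_task  # mirrors the `dependencies:` entry in the product YAML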