
Commit

Merge branch 'main' into curriculum_docs_update
shweta487 authored Nov 27, 2024
2 parents 55a5a9f + 0de0734 commit cc13683
Showing 67 changed files with 1,728 additions and 722 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-docs.yml
@@ -29,7 +29,7 @@ jobs:

- name: Build jupyter book
run: jb build docs --warningiserror --keep-going # set doc to fail on any sphinx warning
-      - uses: actions/upload-artifact@v2
+      - uses: actions/upload-artifact@v3
if: always()
with:
name: docs-build
4 changes: 3 additions & 1 deletion .pre-commit-config.yaml
@@ -15,9 +15,11 @@ repos:
rev: 6.0.0
hooks:
- id: flake8
args: ["--ignore=E501,W503"] # line too long and line before binary operator (black is ok with these)
args: ["--ignore=E501,W503,E231"] # line too long and line before binary operator (black is ok with these) and explicitly ignore the whitespace after colon error
types:
- python
+      # Suppress SyntaxWarning about invalid escape sequence from calitp-data-infra dependency without modifying source
+      entry: env PYTHONWARNINGS="ignore::SyntaxWarning" flake8
- repo: https://github.com/psf/black
rev: 23.1.0
hooks:
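For context on the newly ignored E231 ("missing whitespace after colon"), here is a fabricated snippet of the kind of code flake8 flags; the actual code in this repo that triggered the warning is not shown in this diff:

```python
# Illustrative only: flake8 reports E231 for the tight colons below.
prices = {"apple":1.25, "pear":0.99}  # E231 missing whitespace after ':'

# The spaced version passes:
prices = {"apple": 1.25, "pear": 0.99}
```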
@@ -16,125 +16,88 @@ hive_options:
  source_uri_prefix: "annual-database-agency-information/{dt:DATE}/{ts:TIMESTAMP}/{year:INTEGER}/"
schema_fields:
  - name: number_of_state_counties
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: tam_tier
    type: STRING
-    mode: NULLABLE
  - name: personal_vehicles
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: density
    type: FLOAT
-    mode: NULLABLE
  - name: uza_name
    type: STRING
-    mode: NULLABLE
  - name: tribal_area_name
    type: STRING
-    mode: NULLABLE
  - name: service_area_sq_miles
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: total_voms
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: city
    type: STRING
-    mode: NULLABLE
  - name: fta_recipient_id
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: region
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: state_admin_funds_expended
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: zip_code_ext
-    type: FLOAT
-    mode: NULLABLE
+    type: STRING
  - name: zip_code
-    type: FLOAT
-    mode: NULLABLE
+    type: STRING
  - name: ueid
    type: STRING
-    mode: NULLABLE
  - name: address_line_2
    type: STRING
-    mode: NULLABLE
  - name: number_of_counties_with_service
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: reporter_acronym
    type: STRING
-    mode: NULLABLE
  - name: original_due_date
-    type: INTEGER
-    mode: NULLABLE
+    type: STRING
  - name: sq_miles
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: address_line_1
    type: STRING
-    mode: NULLABLE
  - name: p_o__box
    type: STRING
-    mode: NULLABLE
  - name: fy_end_date
-    type: INTEGER
-    mode: NULLABLE
+    type: STRING
  - name: reported_by_ntd_id
    type: STRING
-    mode: NULLABLE
  - name: population
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: reporting_module
    type: STRING
-    mode: NULLABLE
  - name: service_area_pop
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: subrecipient_type
    type: STRING
-    mode: NULLABLE
  - name: state
    type: STRING
-    mode: NULLABLE
  - name: volunteer_drivers
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: primary_uza
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: doing_business_as
    type: STRING
-    mode: NULLABLE
  - name: reporter_type
    type: STRING
-    mode: NULLABLE
  - name: legacy_ntd_id
    type: STRING
-    mode: NULLABLE
  - name: voms_do
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: url
    type: STRING
-    mode: NULLABLE
  - name: reported_by_name
    type: STRING
-    mode: NULLABLE
  - name: voms_pt
-    type: FLOAT
-    mode: NULLABLE
+    type: NUMERIC
  - name: organization_type
    type: STRING
-    mode: NULLABLE
  - name: agency_name
    type: STRING
-    mode: NULLABLE
  - name: ntd_id
    type: STRING
-    mode: NULLABLE
+  - name: division_department
+    type: STRING
+  - name: state_parent_ntd_id
+    type: STRING
@@ -0,0 +1,29 @@
+operator: operators.ExternalTable
+bucket: gs://calitp-state-geoportal-scrape
+source_objects:
+  - "state_highway_network_geodata/*.jsonl.gz"
+source_format: NEWLINE_DELIMITED_JSON
+use_bq_client: true
+hive_options:
+  mode: CUSTOM
+  require_partition_filter: false
+  source_uri_prefix: "state_highway_network_geodata/{dt:DATE}/{execution_ts:TIMESTAMP}/"
+destination_project_dataset_table: "external_state_geoportal.state_highway_network"
+prefix_bucket: false
+post_hook: |
+  SELECT *
+  FROM `{{ get_project_id() }}`.external_state_geoportal.state_highway_network
+  LIMIT 1;
+schema_fields:
+  - name: Route
+    type: INTEGER
+  - name: County
+    type: STRING
+  - name: District
+    type: INTEGER
+  - name: RouteType
+    type: STRING
+  - name: Direction
+    type: STRING
+  - name: wkt_coordinates
+    type: GEOGRAPHY
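The post_hook above already smoke-tests the new table with a LIMIT 1. For a slightly richer spot-check of the GEOGRAPHY column, a minimal sketch using the google-cloud-bigquery client; it assumes default GCP credentials and that the client's default project hosts the dataset (dataset, table, and column names come from the schema above), and `ST_LENGTH` returns meters for a GEOGRAPHY value:

```python
# Minimal spot-check of the new external table; assumes default GCP credentials.
from google.cloud import bigquery

client = bigquery.Client()
sql = """
    SELECT Route, County, District, ST_LENGTH(wkt_coordinates) AS length_meters
    FROM external_state_geoportal.state_highway_network
    LIMIT 5
"""
for row in client.query(sql).result():
    print(row.Route, row.County, row.District, row.length_meters)
```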
19 changes: 19 additions & 0 deletions airflow/dags/scrape_state_geoportal/METADATA.yml
@@ -0,0 +1,19 @@
description: "Scrape State Highway Network from State Geoportal"
schedule_interval: "0 4 1 * *" # 4am UTC first day of every month
tags:
- all_gusty_features
default_args:
owner: airflow
depends_on_past: False
catchup: False
start_date: "2024-09-15"
email:
- "[email protected]"
email_on_failure: True
email_on_retry: False
retries: 1
retry_delay: !timedelta 'minutes: 2'
concurrency: 50
#sla: !timedelta 'hours: 2'
wait_for_defaults:
timeout: 3600
7 changes: 7 additions & 0 deletions airflow/dags/scrape_state_geoportal/state_highway_network.yml
@@ -0,0 +1,7 @@
+operator: operators.StateGeoportalAPIOperator
+
+root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/'
+service: "CHhighway/SHN_Lines"
+layer: "0"
+product: 'state_highway_network'
+resultRecordCount: 2000
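The StateGeoportalAPIOperator itself is registered elsewhere in this commit (see airflow/plugins/operators/__init__.py below) but its implementation is not shown in this diff. As a rough sketch of the pattern the config implies — paging an ArcGIS REST layer query with `resultRecordCount` — the helper below uses only documented ArcGIS REST query parameters; the endpoint type (FeatureServer) and the function itself are assumptions, not the operator's actual code:

```python
# Hypothetical pager mirroring the YAML above; NOT the StateGeoportalAPIOperator's
# actual code. Assumes the layer is exposed as a FeatureServer with paging support.
import requests

ROOT_URL = "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/"
QUERY_URL = f"{ROOT_URL}CHhighway/SHN_Lines/FeatureServer/0/query"

def fetch_all_features(page_size: int = 2000) -> list:
    features, offset = [], 0
    while True:
        resp = requests.get(
            QUERY_URL,
            params={
                "where": "1=1",              # no filter: return every feature
                "outFields": "*",            # all attribute columns
                "f": "json",
                "resultOffset": offset,      # standard ArcGIS paging params
                "resultRecordCount": page_size,
            },
        )
        batch = resp.json().get("features", [])
        features.extend(batch)
        if len(batch) < page_size:           # short page means we're done
            break
        offset += page_size
    return features
```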
5 changes: 3 additions & 2 deletions airflow/dags/sync_ntd_data_xlsx/METADATA.yml
@@ -1,5 +1,5 @@
description: "Scrape tables from DOT Ridership XLSX file daily"
schedule_interval: "0 10 * * *" # 10am UTC every day
description: "Scrape tables from DOT Ridership XLSX file weekly"
schedule_interval: "0 10 * * 1" # 10am UTC every Monday
tags:
- all_gusty_features
default_args:
@@ -15,5 +15,6 @@ default_args:
retry_delay: !timedelta 'minutes: 2'
concurrency: 50
#sla: !timedelta 'hours: 2'
+provide_context: True
wait_for_defaults:
timeout: 3600
@@ -1,5 +1,7 @@
operator: operators.NtdDataProductXLSXOperator

product: 'complete_monthly_ridership_with_adjustments_and_estimates'
-xlsx_file_url: 'https://www.transit.dot.gov/sites/fta.dot.gov/files/2024-11/September%202024%20Complete%20Monthly%20Ridership%20%28with%20adjustments%20and%20estimates%29_241101.xlsx'
-year: 'historical'
+xlsx_file_url: 'https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release' # placeholder for scraped url from the scrape_ntd_ridership_xlsx_url task
+year: 'historical' # one of: 'historical' (long history), 'multi-year' (select history), or a specific year (ex: 2022)
+dependencies:
+  - scrape_ntd_ridership_xlsx_url
42 changes: 42 additions & 0 deletions airflow/dags/sync_ntd_data_xlsx/scrape_ntd_ridership_xlsx_url.py
@@ -0,0 +1,42 @@
+# ---
+# python_callable: scrape_ntd_ridership_xlsx_url
+# provide_context: true
+# ---
+import logging
+
+import requests
+from bs4 import BeautifulSoup
+from pydantic import HttpUrl, parse_obj_as
+
+
+# pushes the scraped URL value to XCom
+def push_url_to_xcom(scraped_url, context):
+    task_instance = context["ti"]
+    task_instance.xcom_push(key="current_url", value=scraped_url)
+
+
+# Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/'
+def href_matcher(href):
+    return (
+        href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx")
+    )
+
+
+def scrape_ntd_ridership_xlsx_url(**context):
+    # page to find download URL
+    url = "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release"
+    req = requests.get(url)
+    soup = BeautifulSoup(req.text, "html.parser")
+
+    link = soup.find("a", href=href_matcher)
+
+    # Fail fast if no link matched, instead of building a bogus ".../None" URL
+    if link is None:
+        raise ValueError(f"No XLSX download link found on {url}")
+
+    updated_url = f"https://www.transit.dot.gov{link['href']}"
+
+    validated_url = parse_obj_as(HttpUrl, updated_url)
+
+    logging.info(f"Validated URL: {validated_url}.")
+
+    push_url_to_xcom(scraped_url=validated_url, context=context)
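Because href_matcher is a plain callable, it can be sanity-checked outside Airflow — BeautifulSoup's `find` accepts a function as the `href` filter. A quick sketch reusing the href_matcher defined above, with fabricated sample HTML:

```python
# Offline check of href_matcher; the HTML snippet is fabricated for the test.
from bs4 import BeautifulSoup

sample = (
    '<a href="/sites/fta.dot.gov/files/2024-11/ridership.xlsx">XLSX</a>'
    '<a href="/sites/fta.dot.gov/files/notes.pdf">PDF</a>'
)
soup = BeautifulSoup(sample, "html.parser")
link = soup.find("a", href=href_matcher)  # matches only the .xlsx anchor
assert link["href"].endswith("ridership.xlsx")
```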
1 change: 1 addition & 0 deletions airflow/plugins/operators/__init__.py
@@ -9,3 +9,4 @@
from operators.pod_operator import PodOperator
from operators.scrape_ntd_api import NtdDataProductAPIOperator
from operators.scrape_ntd_xlsx import NtdDataProductXLSXOperator
+from operators.scrape_state_geoportal import StateGeoportalAPIOperator
35 changes: 27 additions & 8 deletions airflow/plugins/operators/scrape_ntd_xlsx.py
@@ -20,6 +20,16 @@
CLEAN_XLSX_BUCKET = os.environ["CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__CLEAN"]


+# pulls the URL from XCom
+def pull_url_from_xcom(context):
+    task_instance = context["ti"]
+    pulled_value = task_instance.xcom_pull(
+        task_ids="scrape_ntd_ridership_xlsx_url", key="current_url"
+    )
+    print(f"Pulled value from XCom: {pulled_value}")
+    return pulled_value


class NtdDataProductXLSXExtract(PartitionedGCSArtifact):
bucket: ClassVar[str]
year: str
@@ -41,6 +51,11 @@ class Config:
arbitrary_types_allowed = True

def fetch_from_ntd_xlsx(self, file_url):
+        # As of now, the only file we download is complete_monthly_ridership_with_adjustments_and_estimates,
+        # and its download link changes every time DOT updates the date, so we have special handling for it here,
+        # which depends on another DAG task, scrape_ntd_ridership_xlsx_url.py. If we later download other XLSX files
+        # from the DOT portal whose names also change on each publish, we will need to add the same handling and make it programmatic.

validated_url = parse_obj_as(HttpUrl, file_url)

logging.info(f"reading file from url {validated_url}")
@@ -84,6 +99,7 @@ def __init__(
product: str,
xlsx_file_url,
year: int,
+        *args,
**kwargs,
):
self.year = year
Expand All @@ -98,15 +114,18 @@ def __init__(
filename=f"{self.year}__{self.product}_raw.xlsx",
)

-        super().__init__(**kwargs)
+        super().__init__(*args, **kwargs)

-    def execute(self, **kwargs):
-        excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(
-            self.raw_excel_extract.file_url
-        )
-        logging.info(
-            f"file url is {self.xlsx_file_url} and file type is {type(self.xlsx_file_url)}"
-        )
+    def execute(self, context, *args, **kwargs):
+        download_url = self.raw_excel_extract.file_url
+
+        if self.product == "complete_monthly_ridership_with_adjustments_and_estimates":
+            download_url = pull_url_from_xcom(context=context)
+
+        # log which URL is actually fetched
+        logging.info(f"reading ridership url as {download_url}")
+
+        excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url)

self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content)

