diff --git a/.github/workflows/publish-docs.yml b/.github/workflows/publish-docs.yml index 09ae0976b6..323fc12e43 100644 --- a/.github/workflows/publish-docs.yml +++ b/.github/workflows/publish-docs.yml @@ -29,7 +29,7 @@ jobs: - name: Build jupyter book run: jb build docs --warningiserror --keep-going # set doc to fail on any sphinx warning - - uses: actions/upload-artifact@v2 + - uses: actions/upload-artifact@v3 if: always() with: name: docs-build diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 78df7f10af..0f497dc163 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,9 +15,11 @@ repos: rev: 6.0.0 hooks: - id: flake8 - args: ["--ignore=E501,W503"] # line too long and line before binary operator (black is ok with these) + args: ["--ignore=E501,W503,E231"] # line too long and line before binary operator (black is ok with these) and explicitly ignore the whitespace after colon error types: - python + # Suppress SyntaxWarning about invalid escape sequence from calitp-data-infra dependency without modifying source + entry: env PYTHONWARNINGS="ignore::SyntaxWarning" flake8 - repo: https://github.com/psf/black rev: 23.1.0 hooks: diff --git a/airflow/dags/create_external_tables/ntd_data_products/annual_database_agency_information.yml b/airflow/dags/create_external_tables/ntd_data_products/annual_database_agency_information.yml index 7aaf092756..e4054fc24c 100644 --- a/airflow/dags/create_external_tables/ntd_data_products/annual_database_agency_information.yml +++ b/airflow/dags/create_external_tables/ntd_data_products/annual_database_agency_information.yml @@ -16,125 +16,88 @@ hive_options: source_uri_prefix: "annual-database-agency-information/{dt:DATE}/{ts:TIMESTAMP}/{year:INTEGER}/" schema_fields: - name: number_of_state_counties - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: tam_tier type: STRING - mode: NULLABLE - name: personal_vehicles - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: density type: FLOAT - mode: NULLABLE - name: uza_name type: STRING - mode: NULLABLE - name: tribal_area_name type: STRING - mode: NULLABLE - name: service_area_sq_miles - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: total_voms - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: city type: STRING - mode: NULLABLE - name: fta_recipient_id - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: region - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: state_admin_funds_expended - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: zip_code_ext - type: FLOAT - mode: NULLABLE + type: STRING - name: zip_code - type: FLOAT - mode: NULLABLE + type: STRING - name: ueid type: STRING - mode: NULLABLE - name: address_line_2 type: STRING - mode: NULLABLE - name: number_of_counties_with_service - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: reporter_acronym type: STRING - mode: NULLABLE - name: original_due_date - type: INTEGER - mode: NULLABLE + type: STRING - name: sq_miles - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: address_line_1 type: STRING - mode: NULLABLE - name: p_o__box type: STRING - mode: NULLABLE - name: fy_end_date - type: INTEGER - mode: NULLABLE + type: STRING - name: reported_by_ntd_id type: STRING - mode: NULLABLE - name: population - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: reporting_module type: STRING - mode: NULLABLE - name: service_area_pop - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: subrecipient_type type: STRING - mode: NULLABLE - name: state type: STRING - mode: NULLABLE - name: 
volunteer_drivers - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: primary_uza - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: doing_business_as type: STRING - mode: NULLABLE - name: reporter_type type: STRING - mode: NULLABLE - name: legacy_ntd_id type: STRING - mode: NULLABLE - name: voms_do - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: url type: STRING - mode: NULLABLE - name: reported_by_name type: STRING - mode: NULLABLE - name: voms_pt - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: organization_type type: STRING - mode: NULLABLE - name: agency_name type: STRING - mode: NULLABLE - name: ntd_id type: STRING - mode: NULLABLE + - name: division_department + type: STRING + - name: state_parent_ntd_id + type: STRING diff --git a/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml new file mode 100644 index 0000000000..9dd396fcdf --- /dev/null +++ b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml @@ -0,0 +1,29 @@ +operator: operators.ExternalTable +bucket: gs://calitp-state-geoportal-scrape +source_objects: + - "state_highway_network_geodata/*.jsonl.gz" +source_format: NEWLINE_DELIMITED_JSON +use_bq_client: true +hive_options: + mode: CUSTOM + require_partition_filter: false + source_uri_prefix: "state_highway_network_geodata/{dt:DATE}/{execution_ts:TIMESTAMP}/" +destination_project_dataset_table: "external_state_geoportal.state_highway_network" +prefix_bucket: false +post_hook: | + SELECT * + FROM `{{ get_project_id() }}`.external_state_geoportal.state_highway_network + LIMIT 1; +schema_fields: + - name: Route + type: INTEGER + - name: County + type: STRING + - name: District + type: INTEGER + - name: RouteType + type: STRING + - name: Direction + type: STRING + - name: wkt_coordinates + type: GEOGRAPHY diff --git a/airflow/dags/scrape_state_geoportal/METADATA.yml b/airflow/dags/scrape_state_geoportal/METADATA.yml new file mode 100644 index 0000000000..95ec8d3742 --- /dev/null +++ b/airflow/dags/scrape_state_geoportal/METADATA.yml @@ -0,0 +1,19 @@ +description: "Scrape State Highway Network from State Geoportal" +schedule_interval: "0 4 1 * *" # 4am UTC first day of every month +tags: + - all_gusty_features +default_args: + owner: airflow + depends_on_past: False + catchup: False + start_date: "2024-09-15" + email: + - "hello@calitp.org" + email_on_failure: True + email_on_retry: False + retries: 1 + retry_delay: !timedelta 'minutes: 2' + concurrency: 50 + #sla: !timedelta 'hours: 2' +wait_for_defaults: + timeout: 3600 diff --git a/airflow/dags/scrape_state_geoportal/state_highway_network.yml b/airflow/dags/scrape_state_geoportal/state_highway_network.yml new file mode 100644 index 0000000000..d5be284196 --- /dev/null +++ b/airflow/dags/scrape_state_geoportal/state_highway_network.yml @@ -0,0 +1,7 @@ +operator: operators.StateGeoportalAPIOperator + +root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/' +service: "CHhighway/SHN_Lines" +layer: "0" +product: 'state_highway_network' +resultRecordCount: 2000 diff --git a/airflow/dags/sync_ntd_data_xlsx/METADATA.yml b/airflow/dags/sync_ntd_data_xlsx/METADATA.yml index b25e179385..b6de58e9b7 100644 --- a/airflow/dags/sync_ntd_data_xlsx/METADATA.yml +++ b/airflow/dags/sync_ntd_data_xlsx/METADATA.yml @@ -1,5 +1,5 @@ -description: "Scrape tables from DOT Ridership XLSX file daily" -schedule_interval: "0 10 * * *" # 10am UTC every day +description: "Scrape tables from DOT 
Ridership XLSX file weekly" +schedule_interval: "0 10 * * 1" # 10am UTC every Monday tags: - all_gusty_features default_args: @@ -15,5 +15,6 @@ default_args: retry_delay: !timedelta 'minutes: 2' concurrency: 50 #sla: !timedelta 'hours: 2' + provide_context: True wait_for_defaults: timeout: 3600 diff --git a/airflow/dags/sync_ntd_data_xlsx/ridership_historical/complete_monthly_ridership_with_adjustments_and_estimates.yml b/airflow/dags/sync_ntd_data_xlsx/ridership_historical/complete_monthly_ridership_with_adjustments_and_estimates.yml index 6099b20ca3..042891b0d8 100644 --- a/airflow/dags/sync_ntd_data_xlsx/ridership_historical/complete_monthly_ridership_with_adjustments_and_estimates.yml +++ b/airflow/dags/sync_ntd_data_xlsx/ridership_historical/complete_monthly_ridership_with_adjustments_and_estimates.yml @@ -1,5 +1,7 @@ operator: operators.NtdDataProductXLSXOperator product: 'complete_monthly_ridership_with_adjustments_and_estimates' -xlsx_file_url: 'https://www.transit.dot.gov/sites/fta.dot.gov/files/2024-11/September%202024%20Complete%20Monthly%20Ridership%20%28with%20adjustments%20and%20estimates%29_241101.xlsx' -year: 'historical' +xlsx_file_url: 'https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release' # placeholder for the scraped url from the scrape_ntd_ridership_xlsx_url task +year: 'historical' # one of: 'historical' (long history), 'multi-year' (select history), or a specific year (ex: 2022) +dependencies: + - scrape_ntd_ridership_xlsx_url diff --git a/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_ridership_xlsx_url.py b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_ridership_xlsx_url.py new file mode 100644 index 0000000000..95b4d1e486 --- /dev/null +++ b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_ridership_xlsx_url.py @@ -0,0 +1,42 @@ +# --- +# python_callable: scrape_ntd_ridership_xlsx_url +# provide_context: true +# --- +import logging + +import requests +from bs4 import BeautifulSoup +from pydantic import HttpUrl, parse_obj_as + + +# pushes the scraped URL value to XCom +def push_url_to_xcom(scraped_url, context): + task_instance = context["ti"] + task_instance.xcom_push(key="current_url", value=scraped_url) + + +# Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/' +def href_matcher(href): + return ( + href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx") + ) + + +def scrape_ntd_ridership_xlsx_url(**context): + # page to find download URL + url = "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release" + req = requests.get(url) + soup = BeautifulSoup(req.text, "html.parser") + + link = soup.find("a", href=href_matcher) + + # Extract the href if the link is found + file_link = link["href"] if link else None + + updated_url = f"https://www.transit.dot.gov{file_link}" + + validated_url = parse_obj_as(HttpUrl, updated_url) + + logging.info(f"Validated URL: {validated_url}.") + + push_url_to_xcom(scraped_url=validated_url, context=context) diff --git a/airflow/plugins/operators/__init__.py b/airflow/plugins/operators/__init__.py index 209be114fd..78a7178186 100644 --- a/airflow/plugins/operators/__init__.py +++ b/airflow/plugins/operators/__init__.py @@ -9,3 +9,4 @@ from operators.pod_operator import PodOperator from operators.scrape_ntd_api import NtdDataProductAPIOperator from operators.scrape_ntd_xlsx import NtdDataProductXLSXOperator +from operators.scrape_state_geoportal import StateGeoportalAPIOperator diff --git a/airflow/plugins/operators/scrape_ntd_xlsx.py
b/airflow/plugins/operators/scrape_ntd_xlsx.py index ead796952b..ecdd2052ba 100644 --- a/airflow/plugins/operators/scrape_ntd_xlsx.py +++ b/airflow/plugins/operators/scrape_ntd_xlsx.py @@ -20,6 +20,16 @@ CLEAN_XLSX_BUCKET = os.environ["CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__CLEAN"] +# pulls the URL from XCom +def pull_url_from_xcom(context): + task_instance = context["ti"] + pulled_value = task_instance.xcom_pull( + task_ids="scrape_ntd_ridership_xlsx_url", key="current_url" + ) + print(f"Pulled value from XCom: {pulled_value}") + return pulled_value + + class NtdDataProductXLSXExtract(PartitionedGCSArtifact): bucket: ClassVar[str] year: str @@ -41,6 +51,11 @@ class Config: arbitrary_types_allowed = True def fetch_from_ntd_xlsx(self, file_url): + # As of now, the only file we download this way is complete_monthly_ridership_with_adjustments_and_estimates, + # and its download link changes every time the publication date is updated, so we have special handling for it here that depends + # on another DAG task, scrape_ntd_ridership_xlsx_url.py. If we later download other xlsx files from the DOT portal that + # also change their file name on every publish, we will have to add the same handling for those files and make it programmatic + validated_url = parse_obj_as(HttpUrl, file_url) logging.info(f"reading file from url {validated_url}") @@ -84,6 +99,7 @@ def __init__( product: str, xlsx_file_url, year: int, + *args, **kwargs, ): self.year = year @@ -98,15 +114,18 @@ def __init__( filename=f"{self.year}__{self.product}_raw.xlsx", ) - super().__init__(**kwargs) + super().__init__(*args, **kwargs) - def execute(self, **kwargs): - excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx( - self.raw_excel_extract.file_url - ) - logging.info( - f"file url is {self.xlsx_file_url} and file type is {type(self.xlsx_file_url)}" - ) + def execute(self, context, *args, **kwargs): + download_url = self.raw_excel_extract.file_url + + if self.product == "complete_monthly_ridership_with_adjustments_and_estimates": + download_url = pull_url_from_xcom(context=context) + + # log the URL actually being used for this download + logging.info(f"reading ridership url as {download_url}") + + excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url) self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content) diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py new file mode 100644 index 0000000000..a538df5495 --- /dev/null +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -0,0 +1,217 @@ +import gzip +import logging +import os +from typing import ClassVar, List + +import pandas as pd # type: ignore +import pendulum +import requests +from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs # type: ignore +from pydantic import HttpUrl, parse_obj_as + +from airflow.models import BaseOperator # type: ignore + +API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] + + +class StateGeoportalAPIExtract(PartitionedGCSArtifact): + bucket: ClassVar[str] + execution_ts: pendulum.DateTime = pendulum.now() + dt: pendulum.Date = execution_ts.date() + partition_names: ClassVar[List[str]] = ["dt", "execution_ts"] + + # The name to be used in the data warehouse to refer to the data + # product. + product: str + + # The root of the ArcGIS services. As of Nov 2024, this should + # be "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/". + root_url: str + + # The name of the service being requested.
In the feature service's + # URL, this will be everything between the root and "/FeatureServer". + # Don't include a leading or trailing slash. + service: str + + # The layer to query. This will usually be "0", so that is the + # default. + layer: str = "0" + + # The query filter. By default, all rows will be returned from the + # service. Refer to the ArcGIS documentation for syntax: + # https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters + where: str = "1=1" + + # A comma-separated list of fields to include in the results. Use + # "*" (the default) to include all fields. + outFields: str = "*" + + # The number of records to request for each API call (the operator + # will request all data from the layer in batches of this size). + resultRecordCount: int + + @property + def table(self) -> str: + return self.product + + @property + def filename(self) -> str: + return self.table + + class Config: + arbitrary_types_allowed = True + + def fetch_from_state_geoportal(self): + """ """ + + logging.info(f"Downloading state geoportal data for {self.product}.") + + try: + # Set up the parameters for the request + url = f"{self.root_url}/{self.service}/FeatureServer/{self.layer}/query" + validated_url = parse_obj_as(HttpUrl, url) + + params = { + "where": self.where, + "outFields": self.outFields, + "f": "geojson", + "resultRecordCount": self.resultRecordCount, + } + + all_features = [] # To store all retrieved rows + offset = 0 + + while True: + # Update the resultOffset for each request + params["resultOffset"] = offset + + # Make the request + response = requests.get(validated_url, params=params) + response.raise_for_status() + data = response.json() + + # Break the loop if there are no more features + if "features" not in data or not data["features"]: + break + + # Append the retrieved features + all_features.extend(data["features"]) + + # Increment the offset + offset += params["resultRecordCount"] + + if all_features is None or len(all_features) == 0: + logging.info( + f"There is no data to download for {self.product}. Ending pipeline." + ) + + pass + else: + logging.info( + f"Downloaded {self.product} data with {len(all_features)} rows!" 
+ ) + + return all_features + + except requests.exceptions.RequestException as e: + logging.info(f"An error occurred: {e}") + + raise + + +# # Function to convert coordinates to WKT format +def to_wkt(geometry_type, coordinates): + if geometry_type == "LineString": + # Format as a LineString + coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates]) + return f"LINESTRING({coords_str})" + elif geometry_type == "MultiLineString": + # Format as a MultiLineString + multiline_coords_str = ", ".join( + f"({', '.join([f'{lng} {lat}' for lng, lat in line])})" + for line in coordinates + ) + return f"MULTILINESTRING({multiline_coords_str})" + else: + return None + + +class JSONExtract(StateGeoportalAPIExtract): + bucket = API_BUCKET + + +class StateGeoportalAPIOperator(BaseOperator): + template_fields = ( + "product", + "root_url", + "service", + "layer", + "resultRecordCount", + ) + + def __init__( + self, + product, + root_url, + service, + layer, + resultRecordCount, + **kwargs, + ): + self.product = product + self.root_url = root_url + self.service = service + self.layer = layer + self.resultRecordCount = resultRecordCount + + """An operator that extracts and saves JSON data from the State Geoportal + and saves it as one JSONL file, hive-partitioned by date in Google Cloud + """ + + # Save JSONL files to the bucket + self.extract = JSONExtract( + root_url=self.root_url, + service=self.service, + product=f"{self.product}_geodata", + layer=self.layer, + resultRecordCount=self.resultRecordCount, + filename=f"{self.product}_geodata.jsonl.gz", + ) + + super().__init__(**kwargs) + + def execute(self, **kwargs): + api_content = self.extract.fetch_from_state_geoportal() + + df = pd.json_normalize(api_content) + + if self.product == "state_highway_network": + # Select columns to keep, have to be explicit before renaming because there are duplicate values after normalizing + df = df[ + [ + "properties.Route", + "properties.County", + "properties.District", + "properties.RouteType", + "properties.Direction", + "geometry.type", + "geometry.coordinates", + ] + ] + + # Dynamically create a mapping by removing known prefixes + columns = {col: col.split(".")[-1] for col in df.columns} + + # Rename columns using the dynamically created mapping + df = df.rename(columns=columns) + + # Create new column with WKT format + df["wkt_coordinates"] = df.apply( + lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 + ) + + # Compress the DataFrame content and save it + self.gzipped_content = gzip.compress( + df.to_json(orient="records", lines=True).encode() + ) + self.extract.save_content(fs=get_fs(), content=self.gzipped_content) diff --git a/airflow/requirements.txt b/airflow/requirements.txt index 769ccfcaa0..5d99a6f713 100644 --- a/airflow/requirements.txt +++ b/airflow/requirements.txt @@ -7,3 +7,4 @@ sentry-sdk==1.17.0 platformdirs<3,>=2.5 boto3==1.26.87 openpyxl==3.1.5 +beautifulsoup4==4.12.3 diff --git a/docs/warehouse/warehouse_starter_kit.md b/docs/warehouse/warehouse_starter_kit.md index 893b089c77..6ac226eb0f 100644 --- a/docs/warehouse/warehouse_starter_kit.md +++ b/docs/warehouse/warehouse_starter_kit.md @@ -65,7 +65,7 @@ For a given day: ### Other -- [dim_annual_ntd_agency_information](https://dbt-docs.calitp.org/#!/model/model.calitp_warehouse.dim_annual_database_agency_information) +- [dim_annual_agency_information](https://dbt-docs.calitp.org/#!/model/model.calitp_warehouse.dim_annual_database_agency_information) - View some of the data produced by the [US Department of 
Transportation](https://www.transit.dot.gov/ntd) for the National Transit Database. - Information from 2018-2021 are available. diff --git a/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py b/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py index 3c272b5785..e4b792540e 100644 --- a/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py +++ b/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py @@ -10,30 +10,22 @@ import json import os import subprocess -import sys import tempfile import traceback from collections import defaultdict from concurrent.futures import Future, ThreadPoolExecutor from enum import Enum from functools import lru_cache -from pathlib import Path -from typing import Any, ClassVar, Dict, List, Optional, Sequence, Tuple, Union +from itertools import islice +from typing import Any, ClassVar, Dict, List, Optional, Tuple -import backoff # type: ignore import gcsfs # type: ignore import pendulum import sentry_sdk import typer -from aiohttp.client_exceptions import ( - ClientOSError, - ClientResponseError, - ServerDisconnectedError, -) from calitp_data_infra.storage import ( # type: ignore JSONL_GZIP_EXTENSION, GTFSDownloadConfig, - GTFSFeedExtract, GTFSFeedType, GTFSRTFeedExtract, GTFSScheduleFeedExtract, @@ -48,14 +40,8 @@ from google.protobuf.message import DecodeError from google.transit import gtfs_realtime_pb2 # type: ignore from pydantic import BaseModel, Field, validator -from tqdm import tqdm - -RT_VALIDATOR_JAR_LOCATION_ENV_KEY = "GTFS_RT_VALIDATOR_JAR" -JAR_DEFAULT = typer.Option( - os.environ.get(RT_VALIDATOR_JAR_LOCATION_ENV_KEY), - help="Path to the GTFS RT Validator JAR", -) +JAR_DEFAULT = os.environ["GTFS_RT_VALIDATOR_JAR"] RT_PARSED_BUCKET = os.environ["CALITP_BUCKET__GTFS_RT_PARSED"] RT_VALIDATION_BUCKET = os.environ["CALITP_BUCKET__GTFS_RT_VALIDATION"] GTFS_RT_VALIDATOR_VERSION = os.environ["GTFS_RT_VALIDATOR_VERSION"] @@ -67,27 +53,23 @@ sentry_sdk.init() -def make_dict_bq_safe(d: Dict[str, Any]) -> Dict[str, Any]: - return { - make_name_bq_safe(key): make_dict_bq_safe(value) - if isinstance(value, dict) - else value - for key, value in d.items() - } +class MissingMetadata(Exception): + pass -def make_pydantic_model_bq_safe(model: BaseModel) -> Dict[str, Any]: - """ - This is ugly but I think it's the best option until https://github.com/pydantic/pydantic/issues/1409 - """ - return make_dict_bq_safe(json.loads(model.json())) +class InvalidMetadata(Exception): + pass -class MissingMetadata(Exception): +class NoScheduleDataSpecified(Exception): pass -class InvalidMetadata(Exception): +class ScheduleDataNotFound(Exception): + pass + + +class NoValidatorResults(Exception): pass @@ -107,72 +89,6 @@ class RTValidationMetadata(BaseModel): gtfs_validator_version: str -def log(*args, err=False, fg=None, pbar=None, **kwargs): - # capture fg so we don't pass it to pbar - if pbar: - pbar.write(*args, **kwargs, file=sys.stderr if err else None) - else: - typer.secho(*args, err=err, fg=fg, **kwargs) - - -def upload_if_records( - fs, - tmp_dir: str, - artifact: PartitionedGCSArtifact, - records: Sequence[Union[Dict, BaseModel]], - pbar=None, -): - # BigQuery fails when trying to parse empty files, so shouldn't write them - if not records: - log( - f"WARNING: no records found for {artifact.path}, skipping upload", - fg=typer.colors.YELLOW, - pbar=pbar, - ) - return - - log( - f"writing {len(records)} lines to {artifact.path}", - pbar=pbar, - ) - with tempfile.NamedTemporaryFile(mode="wb", delete=False, dir=tmp_dir) as f: - gzipfile = gzip.GzipFile(mode="wb", fileobj=f) - encoded = ( - r.json() if isinstance(r, 
BaseModel) else json.dumps(r) for r in records - ) - gzipfile.write("\n".join(encoded).encode("utf-8")) - gzipfile.close() - - put_with_retry(fs, f.name, artifact.path) - - -class NoScheduleDataSpecified(Exception): - pass - - -class ScheduleDataNotFound(Exception): - pass - - -@lru_cache -def get_schedule_extracts_for_day( - dt: pendulum.Date, -) -> Dict[str, GTFSScheduleFeedExtract]: - extracts: List[GTFSScheduleFeedExtract] - extracts, missing, invalid = fetch_all_in_partition( - cls=GTFSScheduleFeedExtract, - partitions={ - "dt": dt, - }, - ) - - # Explicitly put extracts in timestamp order so dict construction below sets - # values to the most recent extract for a given base64_url - extracts.sort(key=lambda extract: extract.ts) - - return {extract.base64_url: extract for extract in extracts} - - class RTHourlyAggregation(PartitionedGCSArtifact): partition_names: ClassVar[List[str]] = ["dt", "hour", "base64_url"] step: RTProcessingStep @@ -299,440 +215,609 @@ def dt(self) -> pendulum.Date: return self.hour.date() -def save_job_result(fs: gcsfs.GCSFileSystem, result: GTFSRTJobResult): - typer.secho( - f"saving {len(result.outcomes)} outcomes to {result.path}", - fg=typer.colors.GREEN, - ) - # TODO: I dislike having to exclude the records here - # I need to figure out the best way to have a single type represent the "metadata" of - # the content as well as the content itself - result.save_content( - fs=fs, - content="\n".join( - (json.dumps(make_pydantic_model_bq_safe(o)) for o in result.outcomes) - ).encode(), - exclude={"outcomes"}, - ) +class RtValidator: + def __init__(self, jar_path: str = JAR_DEFAULT): + self.jar_path = jar_path + + def execute(self, gtfs_file: str, rt_path: str): + typer.secho(f"validating {rt_path} with {gtfs_file}", fg=typer.colors.MAGENTA) + args = [ + "java", + "-jar", + str(self.jar_path), + "-gtfs", + gtfs_file, + "-gtfsRealtimePath", + rt_path, + "-sort", + "name", + ] + typer.secho(f"executing rt_validator: {' '.join(args)}") + subprocess.run( + args, + capture_output=True, + check=True, + ) -def fatal_code(e): - return isinstance(e, ClientResponseError) and e.status == 404 +class DailyScheduleExtracts: + def __init__(self, extracts: Dict[str, GTFSScheduleFeedExtract]): + self.extracts = extracts -@backoff.on_exception( - backoff.expo, - exception=(ClientOSError, ClientResponseError, ServerDisconnectedError), - max_tries=3, - giveup=fatal_code, -) -def get_with_retry(fs, *args, **kwargs): - return fs.get(*args, **kwargs) + def get_url_schedule(self, base64_url: str) -> GTFSScheduleFeedExtract: + return self.extracts[base64_url] -@backoff.on_exception( - backoff.expo, - exception=(ClientOSError, ClientResponseError, ServerDisconnectedError), - max_tries=3, -) -def put_with_retry(fs, *args, **kwargs): - return fs.put(*args, **kwargs) - - -def download_gtfs_schedule_zip( - fs, - schedule_extract: GTFSFeedExtract, - dst_dir: str, - pbar=None, -) -> str: - # fetch and zip gtfs schedule - full_dst_path = "/".join([dst_dir, schedule_extract.filename]) - log( - f"Fetching gtfs schedule data from {schedule_extract.path} to {full_dst_path}", - pbar=pbar, - ) - get_with_retry(fs, schedule_extract.path, full_dst_path) +class ScheduleStorage: + @lru_cache + def get_day(self, dt: pendulum.Date) -> DailyScheduleExtracts: + extracts, _, _ = fetch_all_in_partition( + cls=GTFSScheduleFeedExtract, + partitions={"dt": dt}, + verbose=True, + ) + # Explicitly put extracts in timestamp order so dict construction below sets + # values to the most recent extract for a given 
base64_url + extracts.sort(key=lambda extract: extract.ts) - # https://github.com/MobilityData/gtfs-realtime-validator/issues/92 - # try: - # os.remove(os.path.join(dst_path, "areas.txt")) - # except FileNotFoundError: - # pass + extract_dict = {extract.base64_url: extract for extract in extracts} - return full_dst_path + return DailyScheduleExtracts(extract_dict) -def execute_rt_validator( - gtfs_file: str, rt_path: str, jar_path: Path, verbose=False, pbar=None -): - log(f"validating {rt_path} with {gtfs_file}", fg=typer.colors.MAGENTA, pbar=pbar) - - args = [ - "java", - "-jar", - str(jar_path), - "-gtfs", - gtfs_file, - "-gtfsRealtimePath", - rt_path, - "-sort", - "name", - ] +class MostRecentSchedule: + def __init__(self, fs: gcsfs.GCSFileSystem, path: str, base64_validation_url: str): + self.fs = fs + self.path = path + self.base64_validation_url = base64_validation_url - log(f"executing rt_validator: {' '.join(args)}", pbar=pbar) - subprocess.run( - args, - capture_output=True, - check=True, - ) + def download(self, date: datetime.datetime) -> Optional[str]: + for day in reversed(list(date - date.subtract(days=7))): + try: + schedule_extract = ( + ScheduleStorage() + .get_day(day) + .get_url_schedule(self.base64_validation_url) + ) + except KeyError: + typer.secho( + f"no schedule data found for {self.base64_validation_url} on day {day}" + ) + continue + try: + gtfs_zip = "/".join([self.path, schedule_extract.filename]) + self.fs.get(schedule_extract.path, gtfs_zip) + return gtfs_zip + except FileNotFoundError: + typer.secho( + f"no schedule file found for {self.base64_validation_url} on day {day}" + ) + continue + return None -def validate_and_upload( - fs, - jar_path: Path, - dst_path_rt: str, - tmp_dir: str, - hour: RTHourlyAggregation, - gtfs_zip: str, - verbose: bool = False, - pbar=None, -) -> List[RTFileProcessingOutcome]: - execute_rt_validator( - gtfs_zip, - dst_path_rt, - jar_path=jar_path, - verbose=verbose, - pbar=pbar, - ) - records_to_upload = [] - outcomes = [] - for local_path, extract in hour.local_paths_to_extract(dst_path_rt).items(): - results_path = local_path + ".results.json" - try: - with open(results_path) as f: - records = json.load(f) - except FileNotFoundError as e: - # This exception was previously generating the error "[Errno 2] No such file or directory" - msg = f"WARNING: no validation output file found in {results_path} for {extract.path}" - if verbose: - log( - msg, - fg=typer.colors.YELLOW, - pbar=pbar, - ) - outcomes.append( +class AggregationExtract: + def __init__(self, path: str, extract: GTFSRTFeedExtract): + self.path = path + self.extract = extract + + def get_local_path(self) -> str: + return os.path.join(self.path, self.extract.timestamped_filename) + + def get_results_path(self) -> str: + return os.path.join( + self.path, f"{self.extract.timestamped_filename}.results.json" + ) + + def hash(self) -> bytes: + with open( + os.path.join(self.path, self.extract.timestamped_filename), "rb" + ) as f: + file_hash = hashlib.md5() + while chunk := f.read(8192): + file_hash.update(chunk) + return file_hash.digest() + + def get_results(self) -> Dict[str, str]: + with open(self.get_results_path()) as f: + return json.load(f) + + def has_results(self) -> bool: + return os.path.exists(self.get_results_path()) + + +class AggregationExtracts: + def __init__( + self, fs: gcsfs.GCSFileSystem, path: str, aggregation: RTHourlyAggregation + ): + self.fs = fs + self.path = path + self.aggregation = aggregation + + def get_path(self): + return 
f"{self.path}/rt_{self.aggregation.name_hash}/" + + def get_extracts(self) -> List[AggregationExtract]: + return [ + AggregationExtract(self.get_path(), e) for e in self.aggregation.extracts + ] + + def get_local_paths(self) -> Dict[str, GTFSRTFeedExtract]: + return {e.get_local_path(): e.extract for e in self.get_extracts()} + + def get_results_paths(self) -> Dict[str, GTFSRTFeedExtract]: + return {e.get_results_path(): e.extract for e in self.get_extracts()} + + def get_hashed_results(self) -> Dict[str, Any]: + hashed = {} + for e in self.get_extracts(): + if e.has_results(): + hashed[e.hash().hex()] = e.get_results() + return hashed + + def get_hashes(self) -> Dict[str, List[GTFSRTFeedExtract]]: + hashed: Dict[str, List[GTFSRTFeedExtract]] = defaultdict(list) + for e in self.get_extracts(): + hashed[e.hash().hex()].append(e.extract) + return hashed + + def download(self): + self.fs.get( + rpath=[extract.path for extract in self.get_local_paths().values()], + lpath=list(self.get_local_paths().keys()), + ) + + def download_most_recent_schedule(self) -> Optional[str]: + first_extract = self.aggregation.extracts[0] + schedule = MostRecentSchedule( + self.fs, self.path, first_extract.config.base64_validation_url + ) + return schedule.download(first_extract.dt) + + +class HourlyFeedQuery: + def __init__( + self, + step: RTProcessingStep, + feed_type: GTFSFeedType, + files: List[GTFSRTFeedExtract], + limit: int = 0, + base64_url: Optional[str] = None, + ): + self.step = step + self.feed_type = feed_type + self.files = files + self.limit = limit + self.base64_url = base64_url + + def set_limit(self, limit: int): + return HourlyFeedQuery( + self.step, self.feed_type, self.files, limit, self.base64_url + ) + + def where_base64url(self, base64_url: Optional[str]): + return HourlyFeedQuery( + self.step, self.feed_type, self.files, self.limit, base64_url + ) + + def get_aggregates( + self, + ) -> List[RTHourlyAggregation]: + aggregates: Dict[ + Tuple[pendulum.DateTime, str], List[GTFSRTFeedExtract] + ] = defaultdict(list) + + for file in self.files: + if self.base64_url is None or file.base64_url == self.base64_url: + aggregates[(file.hour, file.base64_url)].append(file) + + if self.limit > 0: + aggregates = dict(islice(aggregates.items(), self.limit)) + + return [ + RTHourlyAggregation( + step=self.step, + filename=f"{self.feed_type}{JSONL_GZIP_EXTENSION}", + first_extract=entries[0], + extracts=entries, + ) + for (hour, base64_url), entries in aggregates.items() + ] + + def total(self) -> int: + return sum(len(agg.extracts) for agg in self.get_aggregates()) + + +class HourlyFeedFiles: + def __init__( + self, + files: List[GTFSRTFeedExtract], + files_missing_metadata: List[Blob], + files_invalid_metadata: List[Blob], + ): + self.files = files + self.files_missing_metadata = files_missing_metadata + self.files_invalid_metadata = files_invalid_metadata + + def total(self) -> int: + return ( + len(self.files) + + len(self.files_missing_metadata) + + len(self.files_invalid_metadata) + ) + + def valid(self) -> bool: + return not self.files or len(self.files) / self.total() > 0.99 + + def get_query( + self, step: RTProcessingStep, feed_type: GTFSFeedType + ) -> HourlyFeedQuery: + return HourlyFeedQuery(step, feed_type, self.files) + + +class FeedStorage: + def __init__(self, feed_type: GTFSFeedType): + self.feed_type = feed_type + + @lru_cache + def get_hour(self, hour: datetime.datetime) -> HourlyFeedFiles: + pendulum_hour = pendulum.instance(hour, tz="Etc/UTC") + files, files_missing_metadata, 
files_invalid_metadata = fetch_all_in_partition( + cls=GTFSRTFeedExtract, + partitions={ + "dt": pendulum_hour.date(), + "hour": pendulum_hour, + }, + table=self.feed_type, + verbose=True, + ) + return HourlyFeedFiles(files, files_missing_metadata, files_invalid_metadata) + + +class ValidationProcessor: + def __init__( + self, + aggregation: RTHourlyAggregation, + verbose: bool = False, + ): + self.aggregation = aggregation + self.verbose = verbose + + def validator(self): + return RtValidator() + + def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]: + outcomes: List[RTFileProcessingOutcome] = [] + fs = get_fs() + + if not self.aggregation.extracts[0].config.schedule_url_for_validation: + outcomes = [ RTFileProcessingOutcome( - step=hour.step, + step=self.aggregation.step, success=False, - exception=e, extract=extract, + exception=NoScheduleDataSpecified(), ) - ) - continue - - records_to_upload.extend( - [ - { - "metadata": json.loads( - RTValidationMetadata( - extract_ts=extract.ts, - extract_config=extract.config, - gtfs_validator_version=GTFS_RT_VALIDATOR_VERSION, - ).json() - ), - **record, - } - for record in records + for extract in self.aggregation.extracts ] - ) - outcomes.append( - RTFileProcessingOutcome( - step=hour.step, - success=True, - extract=extract, - aggregation=hour, + aggregation_extracts = AggregationExtracts(fs, tmp_dir, self.aggregation) + aggregation_extracts.download() + gtfs_zip = aggregation_extracts.download_most_recent_schedule() + + if not gtfs_zip: + e = ScheduleDataNotFound( + f"no recent schedule data found for {self.aggregation.extracts[0].path}" ) - ) + typer.secho(e) - upload_if_records( - fs, - tmp_dir, - artifact=hour, - records=records_to_upload, - pbar=pbar, - ) + scope.fingerprint = [ + type(e), + # convert back to url manually, I don't want to mess around with the hourly class + base64.urlsafe_b64decode(self.aggregation.base64_url.encode()).decode(), + ] + sentry_sdk.capture_exception(e, scope=scope) + + outcomes = [ + RTFileProcessingOutcome( + step=self.aggregation.step, + success=False, + extract=extract, + exception=e, + ) + for extract in self.aggregation.extracts + ] - return outcomes + if not outcomes: + try: + self.validator().execute(gtfs_zip, aggregation_extracts.get_path()) + # these are the only two types of errors we expect; let any others bubble up + except subprocess.CalledProcessError as e: + stderr = e.stderr.decode("utf-8") -def parse_and_upload( - fs, - dst_path_rt, - tmp_dir, - hour: RTHourlyAggregation, - verbose=False, - pbar=None, -) -> List[RTFileProcessingOutcome]: - written = 0 - outcomes = [] - gzip_fname = str(tmp_dir + hour.unique_filename) + fingerprint: List[Any] = [ + type(e), + # convert back to url manually, I don't want to mess around with the hourly class + base64.urlsafe_b64decode( + self.aggregation.base64_url.encode() + ).decode(), + ] + fingerprint.append(e.returncode) - # ParseFromString() seems to not release memory well, so manually handle - # writing to the gzip and cleaning up after ourselves + # we could also use a custom exception for this + if "Unexpected end of ZLIB input stream" in stderr: + fingerprint.append("Unexpected end of ZLIB input stream") - with gzip.open(gzip_fname, "w") as gzipfile: - for extract in hour.extracts: - feed = gtfs_realtime_pb2.FeedMessage() + scope.fingerprint = fingerprint - try: - with open( - os.path.join(dst_path_rt, extract.timestamped_filename), "rb" - ) as f: - feed.ParseFromString(f.read()) - parsed = json_format.MessageToDict(feed) - except 
DecodeError as e: - if verbose: - log( - f"WARNING: DecodeError for {str(extract.path)}", - fg=typer.colors.YELLOW, - pbar=pbar, - ) - outcomes.append( + # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above + scope.set_context("Process", {"stderr": stderr[-2000:]}) + + sentry_sdk.capture_exception(e, scope=scope) + + outcomes = [ RTFileProcessingOutcome( - step=RTProcessingStep.parse, + step=self.aggregation.step, success=False, - exception=e, extract=extract, + exception=e, + process_stderr=stderr, ) + for extract in self.aggregation.extracts + ] + + if not outcomes: + records_to_upload = [] + hashed_results = aggregation_extracts.get_hashed_results() + for hash, extracts in aggregation_extracts.get_hashes().items(): + try: + records = hashed_results[hash] + except KeyError: + if self.verbose: + paths = ", ".join(e.path for e in extracts) + typer.secho( + f"WARNING: validator did not produce results for {paths}", + fg=typer.colors.YELLOW, + ) + + for extract in extracts: + outcomes.append( + RTFileProcessingOutcome( + step=self.aggregation.step, + success=False, + exception=NoValidatorResults("No validator output"), + extract=extract, + ) + ) + continue + + records_to_upload.extend( + [ + { + "metadata": json.loads( + RTValidationMetadata( + extract_ts=extracts[0].ts, + extract_config=extracts[0].config, + gtfs_validator_version=GTFS_RT_VALIDATOR_VERSION, + ).json() + ), + **record, + } + for record in records + ] ) - continue - if not parsed: - msg = f"WARNING: no parsed dictionary found in {str(extract.path)}" - if verbose: - log( - msg, - fg=typer.colors.YELLOW, - pbar=pbar, + for extract in extracts: + outcomes.append( + RTFileProcessingOutcome( + step=self.aggregation.step, + success=True, + extract=extract, + aggregation=self.aggregation, + ) ) - outcomes.append( - RTFileProcessingOutcome( - step=RTProcessingStep.parse, - success=False, - exception=ValueError(msg), - extract=extract, + + # BigQuery fails when trying to parse empty files, so shouldn't write them + if records_to_upload: + typer.secho( + f"writing {len(records_to_upload)} lines to {self.aggregation.path}", + ) + with tempfile.NamedTemporaryFile( + mode="wb", delete=False, dir=tmp_dir + ) as f: + gzipfile = gzip.GzipFile(mode="wb", fileobj=f) + encoded = ( + r.json() if isinstance(r, BaseModel) else json.dumps(r) + for r in records_to_upload ) + gzipfile.write("\n".join(encoded).encode("utf-8")) + gzipfile.close() + + fs.put(f.name, self.aggregation.path) + else: + typer.secho( + f"WARNING: no records found for {self.aggregation.path}, skipping upload", + fg=typer.colors.YELLOW, ) - continue - if "entity" not in parsed: - msg = f"WARNING: no parsed entity found in {str(extract.path)}" - if verbose: - log( - msg, - fg=typer.colors.YELLOW, - pbar=pbar, + return outcomes + + +class ParseProcessor: + def __init__(self, aggregation: RTHourlyAggregation, verbose: bool = False): + self.aggregation = aggregation + self.verbose = verbose + + def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]: + outcomes: List[RTFileProcessingOutcome] = [] + fs = get_fs() + dst_path_rt = f"{tmp_dir}/rt_{self.aggregation.name_hash}/" + fs.get( + rpath=[ + extract.path + for extract in self.aggregation.local_paths_to_extract( + dst_path_rt + ).values() + ], + lpath=list(self.aggregation.local_paths_to_extract(dst_path_rt).keys()), + ) + + written = 0 + gzip_fname = str(tmp_dir + self.aggregation.unique_filename) + + # ParseFromString() seems to not release memory well, so manually handle + # writing to 
the gzip and cleaning up after ourselves + + with gzip.open(gzip_fname, "w") as gzipfile: + for extract in self.aggregation.extracts: + feed = gtfs_realtime_pb2.FeedMessage() + + try: + with open( + os.path.join(dst_path_rt, extract.timestamped_filename), "rb" + ) as f: + feed.ParseFromString(f.read()) + parsed = json_format.MessageToDict(feed) + except DecodeError as e: + if self.verbose: + typer.secho( + f'DecodeError: "{str(e)}" thrown when decoding {str(extract.path)}', + fg=typer.colors.YELLOW, + ) + outcomes.append( + RTFileProcessingOutcome( + step=RTProcessingStep.parse, + success=False, + exception=e, + extract=extract, + ) ) + continue + + if not parsed: + msg = f"WARNING: no parsed dictionary found in {str(extract.path)}" + if self.verbose: + typer.secho( + msg, + fg=typer.colors.YELLOW, + ) + outcomes.append( + RTFileProcessingOutcome( + step=RTProcessingStep.parse, + success=False, + exception=ValueError(msg), + extract=extract, + ) + ) + continue + + if "entity" not in parsed: + msg = f"WARNING: no parsed entity found in {str(extract.path)}" + if self.verbose: + typer.secho( + msg, + fg=typer.colors.YELLOW, + ) + outcomes.append( + RTFileProcessingOutcome( + step=RTProcessingStep.parse, + success=True, + extract=extract, + header=parsed["header"], + ) + ) + continue + + for record in parsed["entity"]: + gzipfile.write( + ( + json.dumps( + { + "header": parsed["header"], + # back and forth so we use pydantic serialization + "metadata": json.loads( + RTParsingMetadata( + extract_ts=extract.ts, + extract_config=extract.config, + ).json() + ), + **copy.deepcopy(record), + } + ) + + "\n" + ).encode("utf-8") + ) + written += 1 outcomes.append( RTFileProcessingOutcome( step=RTProcessingStep.parse, success=True, extract=extract, + aggregation=self.aggregation, header=parsed["header"], ) ) - continue + del parsed - for record in parsed["entity"]: - gzipfile.write( - ( - json.dumps( - { - "header": parsed["header"], - # back and forth so we use pydantic serialization - "metadata": json.loads( - RTParsingMetadata( - extract_ts=extract.ts, - extract_config=extract.config, - ).json() - ), - **copy.deepcopy(record), - } - ) - + "\n" - ).encode("utf-8") - ) - written += 1 - outcomes.append( - RTFileProcessingOutcome( - step=RTProcessingStep.parse, - success=True, - extract=extract, - aggregation=hour, - header=parsed["header"], - ) + if written: + typer.secho( + f"writing {written} lines to {self.aggregation.path}", + ) + fs.put(gzip_fname, self.aggregation.path) + else: + typer.secho( + f"WARNING: no records at all for {self.aggregation.path}", + fg=typer.colors.YELLOW, ) - del parsed - - if written: - log( - f"writing {written} lines to {hour.path}", - pbar=pbar, - ) - put_with_retry(fs, gzip_fname, hour.path) - else: - log( - f"WARNING: no records at all for {hour.path}", - fg=typer.colors.YELLOW, - pbar=pbar, - ) - return outcomes + return outcomes # Originally this whole function was retried, but tmpdir flakiness will throw # exceptions in backoff's context, which ruins things def parse_and_validate( - hour: RTHourlyAggregation, - jar_path: Path, + aggregation: RTHourlyAggregation, verbose: bool = False, - pbar=None, ) -> List[RTFileProcessingOutcome]: with tempfile.TemporaryDirectory() as tmp_dir: with sentry_sdk.push_scope() as scope: - scope.set_tag("config_feed_type", hour.first_extract.config.feed_type) - scope.set_tag("config_name", hour.first_extract.config.name) - scope.set_tag("config_url", hour.first_extract.config.url) - scope.set_context("RT Hourly Aggregation", 
json.loads(hour.json())) - - fs = get_fs() - dst_path_rt = f"{tmp_dir}/rt_{hour.name_hash}/" - get_with_retry( - fs, - rpath=[ - extract.path - for extract in hour.local_paths_to_extract(dst_path_rt).values() - ], - lpath=list(hour.local_paths_to_extract(dst_path_rt).keys()), + scope.set_tag( + "config_feed_type", aggregation.first_extract.config.feed_type ) + scope.set_tag("config_name", aggregation.first_extract.config.name) + scope.set_tag("config_url", aggregation.first_extract.config.url) + scope.set_context("RT Hourly Aggregation", json.loads(aggregation.json())) - if hour.step == RTProcessingStep.validate: - if not hour.extracts[0].config.schedule_url_for_validation: - return [ - RTFileProcessingOutcome( - step=hour.step, - success=False, - extract=extract, - exception=NoScheduleDataSpecified(), - ) - for extract in hour.extracts - ] + if ( + aggregation.step != RTProcessingStep.validate + and aggregation.step != RTProcessingStep.parse + ): + raise RuntimeError("we should not be here") - try: - first_extract = hour.extracts[0] - extract_day = first_extract.dt - for target_date in reversed( - list(extract_day - extract_day.subtract(days=7)) - ): # Fall back to most recent available schedule within 7 days - try: - schedule_extract = get_schedule_extracts_for_day( - target_date - )[first_extract.config.base64_validation_url] - - scope.set_context( - "Schedule Extract", json.loads(schedule_extract.json()) - ) + if aggregation.step == RTProcessingStep.validate: + return ValidationProcessor(aggregation, verbose).process(tmp_dir, scope) - gtfs_zip = download_gtfs_schedule_zip( - fs, - schedule_extract=schedule_extract, - dst_dir=tmp_dir, - pbar=pbar, - ) + if aggregation.step == RTProcessingStep.parse: + return ParseProcessor(aggregation, verbose).process(tmp_dir, scope) - break - except (KeyError, FileNotFoundError): - print( - f"no schedule data found for {first_extract.path} and day {target_date}" - ) - else: - raise ScheduleDataNotFound( - f"no recent schedule data found for {first_extract.path}" - ) - return validate_and_upload( - fs=fs, - jar_path=jar_path, - dst_path_rt=dst_path_rt, - tmp_dir=tmp_dir, - hour=hour, - gtfs_zip=gtfs_zip, - verbose=verbose, - pbar=pbar, - ) - - # these are the only two types of errors we expect; let any others bubble up - except (ScheduleDataNotFound, subprocess.CalledProcessError) as e: - stderr = None - - fingerprint: List[Any] = [ - type(e), - # convert back to url manually, I don't want to mess around with the hourly class - base64.urlsafe_b64decode(hour.base64_url.encode()).decode(), - ] - if isinstance(e, subprocess.CalledProcessError): - fingerprint.append(e.returncode) - stderr = e.stderr.decode("utf-8") - - # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above - scope.set_context( - "Process", {"stderr": e.stderr.decode("utf-8")[-2000:]} - ) - - # we could also use a custom exception for this - if "Unexpected end of ZLIB input stream" in stderr: - fingerprint.append("Unexpected end of ZLIB input stream") - - scope.fingerprint = fingerprint - sentry_sdk.capture_exception(e, scope=scope) - - if verbose: - log( - f"{str(e)} thrown for {hour.path}", - fg=typer.colors.RED, - pbar=pbar, - ) - if isinstance(e, subprocess.CalledProcessError): - log( - e.stderr.decode("utf-8"), - fg=typer.colors.YELLOW, - pbar=pbar, - ) - - return [ - RTFileProcessingOutcome( - step=hour.step, - success=False, - extract=extract, - exception=e, - process_stderr=stderr, - ) - for extract in hour.extracts - ] +def make_dict_bq_safe(d: Dict[str, 
Any]) -> Dict[str, Any]: + return { + make_name_bq_safe(key): make_dict_bq_safe(value) + if isinstance(value, dict) + else value + for key, value in d.items() + } - if hour.step == RTProcessingStep.parse: - return parse_and_upload( - fs=fs, - dst_path_rt=dst_path_rt, - tmp_dir=tmp_dir, - hour=hour, - verbose=verbose, - pbar=pbar, - ) - raise RuntimeError("we should not be here") +def make_pydantic_model_bq_safe(model: BaseModel) -> Dict[str, Any]: + """ + This is ugly but I think it's the best option until https://github.com/pydantic/pydantic/issues/1409 + """ + return make_dict_bq_safe(json.loads(model.json())) @app.command() @@ -741,56 +826,25 @@ def main( feed_type: GTFSFeedType, hour: datetime.datetime, limit: int = 0, - progress: bool = typer.Option( - False, - help="If true, display progress bar; useful for development but not in production.", - ), threads: int = 4, - jar_path: Path = JAR_DEFAULT, verbose: bool = False, base64url: Optional[str] = None, ): - pendulum_hour = pendulum.instance(hour, tz="Etc/UTC") - files: List[GTFSRTFeedExtract] - files_missing_metadata: List[Blob] - files_invalid_metadata: List[Blob] - files, files_missing_metadata, files_invalid_metadata = fetch_all_in_partition( - cls=GTFSRTFeedExtract, - partitions={ - "dt": pendulum_hour.date(), - "hour": pendulum_hour, - }, - table=feed_type, - verbose=True, - ) - - total = len(files) + len(files_missing_metadata) + len(files_invalid_metadata) - if files and len(files) / total < 0.99: - typer.secho(f"missing: {files_missing_metadata}") - typer.secho(f"invalid: {files_invalid_metadata}") + hourly_feed_files = FeedStorage(feed_type).get_hour(hour) + if not hourly_feed_files.valid(): + typer.secho(f"missing: {hourly_feed_files.files_missing_metadata}") + typer.secho(f"invalid: {hourly_feed_files.files_invalid_metadata}") + error_count = hourly_feed_files.total() - len(hourly_feed_files.files) raise RuntimeError( - f"too many files have missing/invalid metadata; {total - len(files)} of {total}" # noqa: E702 + f"too many files have missing/invalid metadata; {error_count} of {hourly_feed_files.total()}" # noqa: E702 ) - - rt_aggs: Dict[Tuple[pendulum.DateTime, str], List[GTFSRTFeedExtract]] = defaultdict( - list + aggregated_feed = hourly_feed_files.get_query(step, feed_type) + aggregations_to_process = ( + aggregated_feed.where_base64url(base64url).set_limit(limit).get_aggregates() ) - for file in files: - rt_aggs[(file.hour, file.base64_url)].append(file) - - aggregations_to_process = [ - RTHourlyAggregation( - step=step, - filename=f"{feed_type}{JSONL_GZIP_EXTENSION}", - first_extract=files[0], - extracts=files, - ) - for (hour, base64url), files in rt_aggs.items() - ] - typer.secho( - f"found {len(files)} {feed_type} files in {len(aggregations_to_process)} aggregations to process", + f"found {len(hourly_feed_files.files)} {feed_type} files in {len(aggregated_feed.get_aggregates())} aggregations to process", fg=typer.colors.MAGENTA, ) @@ -798,18 +852,9 @@ def main( typer.secho( f"url filter applied, only processing {base64url}", fg=typer.colors.YELLOW ) - aggregations_to_process = [ - agg for agg in aggregations_to_process if agg.base64_url == base64url - ] if limit: typer.secho(f"limit of {limit} feeds was set", fg=typer.colors.YELLOW) - aggregations_to_process = list( - sorted(aggregations_to_process, key=lambda feed: feed.path) - )[:limit] - - aggregated_total = sum(len(agg.extracts) for agg in aggregations_to_process) - pbar = tqdm(total=len(aggregations_to_process)) if progress else None outcomes: 
List[RTFileProcessingOutcome] = [ RTFileProcessingOutcome( @@ -818,7 +863,7 @@ def main( blob_path=blob.path, exception=MissingMetadata(), ) - for blob in files_missing_metadata + for blob in hourly_feed_files.files_missing_metadata ] + [ RTFileProcessingOutcome( step=step.value, @@ -826,7 +871,7 @@ def main( blob_path=blob.path, exception=InvalidMetadata(), ) - for blob in files_invalid_metadata + for blob in hourly_feed_files.files_invalid_metadata ] exceptions = [] @@ -840,34 +885,26 @@ def main( futures: Dict[Future, RTHourlyAggregation] = { pool.submit( parse_and_validate, - hour=hour, - jar_path=jar_path, + aggregation=aggregation, verbose=verbose, - pbar=pbar, - ): hour - for hour in aggregations_to_process + ): aggregation + for aggregation in aggregations_to_process } for future in concurrent.futures.as_completed(futures): - hour = futures[future] - if pbar: - pbar.update(1) + aggregation = futures[future] try: outcomes.extend(future.result()) except KeyboardInterrupt: raise except Exception as e: - log( - f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {hour.path}\n{traceback.format_exc()}", + typer.secho( + f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {aggregation.path}\n{traceback.format_exc()}", err=True, fg=typer.colors.RED, - pbar=pbar, ) sentry_sdk.capture_exception(e) - exceptions.append((e, hour.path, traceback.format_exc())) - - if pbar: - del pbar + exceptions.append((e, aggregation.path, traceback.format_exc())) if aggregations_to_process: result = GTFSRTJobResult( @@ -878,11 +915,21 @@ def main( feed_type=feed_type, outcomes=outcomes, ) - save_job_result(get_fs(), result) + typer.secho( + f"saving {len(result.outcomes)} outcomes to {result.path}", + fg=typer.colors.GREEN, + ) + # TODO: I dislike having to exclude the records here + # I need to figure out the best way to have a single type represent the "metadata" of + # the content as well as the content itself + raw = [json.dumps(make_pydantic_model_bq_safe(o)) for o in result.outcomes] + content = "\n".join(raw).encode("utf-8") + result.save_content(fs=get_fs(), content=content, exclude={"outcomes"}) assert ( - len(outcomes) == aggregated_total - ), f"we ended up with {len(outcomes)} outcomes from {aggregated_total}" + len(outcomes) + == aggregated_feed.where_base64url(base64url).set_limit(limit).total() + ), f"we ended up with {len(outcomes)} outcomes from {aggregated_feed.where_base64url(base64url).set_limit(limit).total()}" if exceptions: exc_str = "\n".join(str(tup) for tup in exceptions) diff --git a/jobs/gtfs-rt-parser-v2/poetry.lock b/jobs/gtfs-rt-parser-v2/poetry.lock index 2d2c695f4d..fb12c1b018 100644 --- a/jobs/gtfs-rt-parser-v2/poetry.lock +++ b/jobs/gtfs-rt-parser-v2/poetry.lock @@ -507,6 +507,22 @@ files = [ [package.extras] test = ["pytest (>=6)"] +[[package]] +name = "flake8" +version = "7.1.1" +description = "the modular source code checker: pep8 pyflakes and co" +optional = false +python-versions = ">=3.8.1" +files = [ + {file = "flake8-7.1.1-py2.py3-none-any.whl", hash = "sha256:597477df7860daa5aa0fdd84bf5208a043ab96b8e96ab708770ae0364dd03213"}, + {file = "flake8-7.1.1.tar.gz", hash = "sha256:049d058491e228e03e67b390f311bbf88fce2dbaa8fa673e7aea87b7198b8d38"}, +] + +[package.dependencies] +mccabe = ">=0.7.0,<0.8.0" +pycodestyle = ">=2.12.0,<2.13.0" +pyflakes = ">=3.2.0,<3.3.0" + [[package]] name = "fonttools" version = "4.54.1" @@ -1357,6 +1373,17 @@ pillow = ">=6.2.0" pyparsing = ">=2.3.1" python-dateutil = ">=2.7" +[[package]] +name = "mccabe" +version = "0.7.0" 
+description = "McCabe checker, plugin for flake8" +optional = false +python-versions = ">=3.6" +files = [ + {file = "mccabe-0.7.0-py2.py3-none-any.whl", hash = "sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e"}, + {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"}, +] + [[package]] name = "memory-profiler" version = "0.60.0" @@ -2007,6 +2034,17 @@ files = [ [package.dependencies] pyasn1 = ">=0.4.6,<0.7.0" +[[package]] +name = "pycodestyle" +version = "2.12.1" +description = "Python style guide checker" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pycodestyle-2.12.1-py2.py3-none-any.whl", hash = "sha256:46f0fb92069a7c28ab7bb558f05bfc0110dac69a0cd23c61ea0040283a9d78b3"}, + {file = "pycodestyle-2.12.1.tar.gz", hash = "sha256:6838eae08bbce4f6accd5d5572075c63626a15ee3e6f842df996bf62f6d73521"}, +] + [[package]] name = "pydantic" version = "1.9.2" @@ -2058,6 +2096,17 @@ typing-extensions = ">=3.7.4.3" dotenv = ["python-dotenv (>=0.10.4)"] email = ["email-validator (>=1.0.3)"] +[[package]] +name = "pyflakes" +version = "3.2.0" +description = "passive checker of Python programs" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pyflakes-3.2.0-py2.py3-none-any.whl", hash = "sha256:84b5be138a2dfbb40689ca07e2152deb896a65c3a3e24c251c5c62489568074a"}, + {file = "pyflakes-3.2.0.tar.gz", hash = "sha256:1c61603ff154621fb2a9172037d84dca3500def8c8b630657d1701f026f8af3f"}, +] + [[package]] name = "pyparsing" version = "3.1.4" @@ -2583,5 +2632,5 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" -python-versions = ">=3.8,<3.10" -content-hash = "2ff33638394c0014c2e5df03ad30b6e2e57ec5f048c5087b75a2546e0e0bd9fa" +python-versions = ">=3.8.1,<3.10" +content-hash = "51e6481ee50e162cc336f8581791ba5a2864a56bf00d722091837931f1a75f0f" diff --git a/jobs/gtfs-rt-parser-v2/pyproject.toml b/jobs/gtfs-rt-parser-v2/pyproject.toml index aca94cbc89..7c94bdd1f5 100644 --- a/jobs/gtfs-rt-parser-v2/pyproject.toml +++ b/jobs/gtfs-rt-parser-v2/pyproject.toml @@ -5,7 +5,7 @@ description = "" authors = ["Andrew Vaccaro "] [tool.poetry.dependencies] -python = ">=3.8,<3.10" +python = ">=3.8.1,<3.10" gtfs-realtime-bindings = "0.0.7" google-auth = "1.32.1" pathy = {extras = ["gcs"], version = "^0.6.1"} @@ -26,6 +26,7 @@ types-protobuf = "^5.28.0.20240924" types-tqdm = "^4.66.0.20240417" isort = "^5.13.2" pytest-env = "^1.1.5" +flake8 = "^7.1.1" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/jobs/gtfs-rt-parser-v2/tests/test_gtfs_rt_parser.py b/jobs/gtfs-rt-parser-v2/tests/test_gtfs_rt_parser.py index 467f2f5c34..09c3e56789 100644 --- a/jobs/gtfs-rt-parser-v2/tests/test_gtfs_rt_parser.py +++ b/jobs/gtfs-rt-parser-v2/tests/test_gtfs_rt_parser.py @@ -54,11 +54,7 @@ def test_rt_file_processing_outcome_construction() -> None: @pytest.mark.skipif("not config.getoption('--gcs')", reason="requires GCS credentials") def test_vehicle_positions(): - result = runner.invoke( - app, - ["parse", "vehicle_positions", "1999-10-22T18:00:00"], - catch_exceptions=False, - ) + result = runner.invoke(app, ["parse", "vehicle_positions", "1999-10-22T18:00:00"]) assert result.exit_code == 0 assert ( "test-calitp-gtfs-rt-raw-v2/vehicle_positions/dt=1999-10-22/hour=1999-10-22T18:00:00+00:00" @@ -79,7 +75,6 @@ def test_no_vehicle_positions_for_date(): result = runner.invoke( app, ["parse", "vehicle_positions", "2022-09-14T18:00:00", "--base64url", base64url], - catch_exceptions=False, ) assert result.exit_code == 0 
assert "0 vehicle_positions files in 0 aggregations" in result.stdout @@ -92,7 +87,6 @@ def test_no_vehicle_positions_for_url(): result = runner.invoke( app, ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", "nope"], - catch_exceptions=False, ) assert result.exit_code == 0 assert "found 5158 vehicle_positions files in 136 aggregations" in result.stdout @@ -108,7 +102,6 @@ def test_no_records_for_url_vehicle_positions_on_date(): result = runner.invoke( app, ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", base64url], - catch_exceptions=False, ) assert result.exit_code == 0 assert "found 5158 vehicle_positions files in 136 aggregations" in result.stdout @@ -121,9 +114,7 @@ def test_no_records_for_url_vehicle_positions_on_date(): def test_trip_updates(): base64url = "aHR0cHM6Ly9hcGkuNTExLm9yZy90cmFuc2l0L3RyaXB1cGRhdGVzP2FnZW5jeT1TQQ==" result = runner.invoke( - app, - ["parse", "trip_updates", "2024-10-22T18:00:00", "--base64url", base64url], - catch_exceptions=False, + app, ["parse", "trip_updates", "2024-10-22T18:00:00", "--base64url", base64url] ) assert result.exit_code == 0 assert ( @@ -144,7 +135,6 @@ def test_service_alerts(): result = runner.invoke( app, ["parse", "service_alerts", "2024-10-22T18:00:00", "--base64url", base64url], - catch_exceptions=False, ) assert result.exit_code == 0 assert ( @@ -165,7 +155,6 @@ def test_validation(): result = runner.invoke( app, ["validate", "trip_updates", "2024-08-28T19:00:00", "--base64url", base64url], - catch_exceptions=False, ) assert result.exit_code == 0 assert ( @@ -173,7 +162,6 @@ def test_validation(): in result.stdout ) assert "3269 trip_updates files in 125 aggregations" in result.stdout - assert "Fetching gtfs schedule data" in result.stdout assert "validating" in result.stdout assert "executing rt_validator" in result.stdout assert "writing 50 lines" in result.stdout @@ -194,7 +182,6 @@ def test_no_recent_schedule_for_vehicle_positions_on_validation(): "--base64url", base64url, ], - catch_exceptions=True, ) assert result.exit_code == 0 assert ( @@ -204,6 +191,7 @@ def test_no_recent_schedule_for_vehicle_positions_on_validation(): assert "5158 vehicle_positions files in 136 aggregations" in result.stdout assert f"url filter applied, only processing {base64url}" in result.stdout assert "no schedule data found" in result.stdout + assert "no recent schedule data found" in result.stdout assert "test-calitp-gtfs-rt-validation" in result.stdout assert "saving 38 outcomes" in result.stdout @@ -220,9 +208,7 @@ def test_no_output_file_for_vehicle_positions_on_validation(): 3, "--verbose", ], - catch_exceptions=True, ) - print(result.stdout) assert result.exit_code == 0 assert ( "test-calitp-gtfs-rt-raw-v2/vehicle_positions/dt=2024-10-17/hour=2024-10-17T00:00:00+00:00" @@ -230,6 +216,7 @@ def test_no_output_file_for_vehicle_positions_on_validation(): ) assert "5487 vehicle_positions files in 139 aggregations" in result.stdout assert "limit of 3 feeds was set" in result.stdout - # "WARNING: no validation output file found" was previously generating the error "[Errno 2] No such file or directory" - assert "WARNING: no validation output file found" in result.stdout - assert "saving 122 outcomes" in result.stdout + assert "validating" in result.stdout + assert "executing rt_validator" in result.stdout + assert "writing 69 lines" in result.stdout + assert "saving 114 outcomes" in result.stdout diff --git a/warehouse/dbt_project.yml b/warehouse/dbt_project.yml index be4c05713f..74bb12a0c6 100644 --- 
a/warehouse/dbt_project.yml +++ b/warehouse/dbt_project.yml @@ -69,3 +69,5 @@ models: schema: mart_benefits ntd_validation: schema: mart_ntd_validation + ntd_fct_annual: + schema: mart_ntd_fct_annual diff --git a/warehouse/models/docs/_docs_ntd.md b/warehouse/models/docs/_docs_ntd.md index 4ca3df7fc6..6ce0a129e6 100644 --- a/warehouse/models/docs/_docs_ntd.md +++ b/warehouse/models/docs/_docs_ntd.md @@ -2,6 +2,10 @@ Docs for NTD models; {% docs ntd_id %} A five-digit identifying number for each agency used in the current NTD system. +FTA assigns each reporter a unique five-digit NTD Identification Number. +The first digit of the NTD ID corresponds to the FTA Region where the reporter is located (e.g., 9#### indicates Region IX). +The code will have a four-to-five digit prefix for any entity submitting the report on behalf of the reporter. +For example, State Departments of Transportation (usually indicated as #R##) submit on behalf of their subrecipients. {% enddocs %} {% docs ntd_legacy_id %} @@ -40,6 +44,7 @@ The state in which the agency is headquartered. {% enddocs %} {% docs ntd_primary_uza_code %} +The primary urbanized area served by the transit agency. UACE Code remains consistent across census years. {% enddocs %} diff --git a/warehouse/models/docs/_docs_transit_database.md b/warehouse/models/docs/_docs_transit_database.md index 2fa85cefcb..34cb0e36d3 100644 --- a/warehouse/models/docs/_docs_transit_database.md +++ b/warehouse/models/docs/_docs_transit_database.md @@ -200,7 +200,7 @@ are implemented for future schema consistency, but historical data has not yet b {% docs ntd_agency_info_table %} -DEPRECATED: Please use mart_ntd.dim_annual_ntd_agency_information going forward. +DEPRECATED: Please use mart_ntd.dim_annual_agency_information going forward. 
2018 NTD Agency Info Table Imported 10/6/2021 from fta.gov diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index 8c964acb31..c39c5d803d 100644 --- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -4,6 +4,49 @@ dim_stops_latest AS ( table_name = ref('dim_stops'), clean_table_name = 'dim_stops' ) }} +), + +stg_state_geoportal__state_highway_network_stops AS ( + SELECT * + FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }} +), + + +buffer_geometry_table AS ( + SELECT + -- equal to 100ft, as requested by Uriel + ST_BUFFER(wkt_coordinates, + 30.48) AS buffer_geometry + FROM stg_state_geoportal__state_highway_network_stops +), + +current_stops AS ( + SELECT + pt_geom, + _gtfs_key + FROM dim_stops_latest +), + + +stops_on_shn AS ( + SELECT + current_stops.* + FROM buffer_geometry_table, current_stops + WHERE ST_DWITHIN( + buffer_geometry_table.buffer_geometry,current_stops.pt_geom, 0) +), + +dim_stops_latest_with_shn_boolean AS ( + +SELECT + dim_stops_latest.*, + IF(stops_on_shn._gtfs_key IS NOT NULL, TRUE, FALSE) AS on_state_highway_network +FROM + dim_stops_latest +LEFT JOIN + stops_on_shn +ON + dim_stops_latest._gtfs_key = stops_on_shn._gtfs_key ) -SELECT * FROM dim_stops_latest +SELECT * FROM dim_stops_latest_with_shn_boolean diff --git a/warehouse/models/mart/ntd/_mart_ntd.yml b/warehouse/models/mart/ntd/_mart_ntd.yml index 9d5040e991..9b7c360f99 100644 --- a/warehouse/models/mart/ntd/_mart_ntd.yml +++ b/warehouse/models/mart/ntd/_mart_ntd.yml @@ -102,33 +102,35 @@ x-common-fields: description: '{{ doc("ntd_xlsx_execution_ts") }}' models: - - name: dim_annual_ntd_agency_information + - name: dim_annual_agency_information description: > - Versioned extracts of the NTD Annual Database Agency Information. + Contains basic contact and agency information for each NTD reporter. - The versioning is bitemporal, so records are versioned at the year + The dataset can be found at: + https://www.transit.dot.gov/ntd/data-product/2023-annual-database-agency-information + * For other years, just replace 2023 by the desired year. - and ntd_id level. This means you must join based on - _valid_from/_valid_from + The versioning is bitemporal, so records are versioned at the year, ntd_id, and state_parent_ntd_id level. + This means you must join based on _valid_from/_valid_to to get the records for a given ntd_id and state_parent_ntd_id, + and then choose which year to look up. - to get the records for a given ntd_id, and then choose which year to - - look up. + Use _is_current to find the latest version for each set of year, ntd_id, and state_parent_ntd_id. tests: - dbt_utils.mutually_exclusive_ranges: lower_bound_column: _valid_from upper_bound_column: _valid_to - partition_by: CONCAT(year, '_', ntd_id) + partition_by: CONCAT(year, '_', ntd_id, '_', COALESCE(state_parent_ntd_id, '')) gaps: required columns: - name: key tests: - not_null - unique - - name: year + - <<: *report_year + name: year tests: - not_null - - name: ntd_id + - <<: *ntd_id tests: - not_null - name: _valid_from @@ -138,8 +140,120 @@ models: tests: - not_null - name: _is_current + description: Indicates the latest report version for each year, ntd_id, and state_parent_ntd_id.
tests: - not_null + - name: state_parent_ntd_id + description: | + Indicates the ID number of the transit agency reporting to the database on behalf of the transit agency. + - name: agency_name + description: | + The agency name is the full legal name of the agency. + If reporting is required under an FTA grant program, this must reflect the legal name of the funding recipient. + - name: doing_business_as + description: The name under which the reporting agency is doing business. + - name: address_line_1 + description: First line of the agency's mailing address. + - name: address_line_2 + description: Second line of the agency's mailing address (if applicable). + - name: p_o__box + description: The PO Box of the agency (if applicable). + - name: city + description: City of the agency's mailing address. + - name: state + description: State of the agency's mailing address. + - name: zip_code + description: Zip Code of the agency's mailing address. + - name: zip_code_ext + description: Zip Code Extension of the agency's mailing address. + - name: region + description: The FTA region in which the reporter is located. + - name: density + description: The population density of the Primary UZA of the agency, if one exists. + - name: ueid + description: | + The UEID is a number or other identifier used to identify a specific commercial, nonprofit, or Government entity. + This is now reported in place of DUNS number for each unique transit agency reporting to the NTD. + See the U.S. General Services Administration UEID web page for more information. + - name: fta_recipient_id + description: | + The four-digit number assigned to a transit agency for the Federal Transit Administration (FTA) electronic grant making system — TrAMS (Transportation Award Management System). + - name: original_due_date + description: The date on which the 2020 NTD Report was due to FTA. + - name: fy_end_date + description: Calendar selection for the last day of an agency's fiscal year. + - name: number_of_counties_with_service + description: | + States report the total number of counties in the state that are currently served, in whole or in part, by Formula Grants for Rural Areas (§5311)-funded operators. + - name: number_of_state_counties + description: The number of Counties in a given State (for State Departments of Transportation). + - *organization_type + - name: personal_vehicles + description: | + Vehicles that are used by the transit provider to transport passengers in revenue service but are owned by private individuals, typically an employee of the agency or a volunteer driver. + - name: population + description: The population of the Primary UZA of the agency, if one exists. + - *primary_uza_code + - *primary_uza_name + - name: reported_by_name + description: | + The entity, usually a State, submitting an NTD report on behalf of another entity, usually a subrecipient of the State. + - name: reported_by_ntd_id + description: The NTD ID of the entity reporting on behalf of another entity. + - name: reporter_acronym + description: The acronym used by the reporting agency. + - name: reporter_type + description: | + Reporter Type will be based on where they operate and the reporting requirements associated with their agency. + Agencies that receive Chapter 53 funds and own, operate, or manage capital assets in public transportation are also required to file an annual report, even if they do not receive §5307 or §5311 funds.
+ Agencies that do not receive or benefit from FTA funding may elect to submit their data to the NTD as a Voluntary Reporter but are still assigned a reporter type. + Current types are: + `Building Reporter`, + `Full Reporter`, + `Group Plan Sponsor`, + `Planning Reporter`, + `Reduced Asset Reporter`, + `Reduced Reporter`, + `Rural Reporter`, + `Separate Service`, + `State Reporter`. + - name: reporting_module + description: | + A general classification that will determine which, if any, FTA formula programs will use the NTD data. + For example, Tribes and Native Villages will have data included in the §5311j Tribal Transit Program. + Reporters receiving Chapter 53 funds but not receiving or benefiting from §5307 and §5311 + AND not electing to report service data are classified as Asset due to the requirement to report asset inventory data. + These agencies are not presently included in formula program datasets. + - name: service_area_pop + description: | + A measure of access to transit service in terms of population served and area coverage (square miles). + The reporting transit agency determines the service area boundaries and population for most transit services using the definitions contained in the Americans with Disabilities Act of 1990 (ADA), + i.e. a corridor surrounding the routes 3/4 of a mile on either side, or for rail, a series of circles of radius 3/4 mile centered on each station. Transit agency reporters are required to submit service area information. + - name: sq_miles + description: The square miles of the Primary UZA of the agency, if one exists. + - *service_area_sq_miles + - name: state_admin_funds_expended + description: | + States report the §5311 revenues they expended as a result of administering the program. + Since the §5311 program operates on a reimbursement basis, revenues expended during the report year will be expended during the same year. + Report the operating revenue expended during the report year from FTA §5311 Formula Grants for Rural Areas funds. + - name: subrecipient_type + description: Reflects the type of Rural Formula Grant funding received by the subrecipient. + - name: tam_tier + description: | + Defines whether the agency is a Tier I agency required to produce their own Transit Asset Management plan (and, in parentheses, on what basis) + or a Tier II operator eligible to be in a group TAM Plan. + N/A indicates that the requirement does not apply. + - name: total_voms + description: | + The Vehicles Operated in Maximum Service ("peak service level") across the entire fiscal year for the given agency. + - name: tribal_area_name + description: The tribal land, determined by US Census data, on which tribes operate. + - name: url + description: Agency's transit website. + - name: volunteer_drivers + description: | + Individuals who drive vehicles in revenue service to transport passengers for the transit provider but are not employees of the transit provider and are not compensated for their labor.
- name: dim_annual_funding_sources description: >- diff --git a/warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql b/warehouse/models/mart/ntd/dim_annual_agency_information.sql similarity index 77% rename from warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql rename to warehouse/models/mart/ntd/dim_annual_agency_information.sql index 250667c5d4..772aa18532 100644 --- a/warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql +++ b/warehouse/models/mart/ntd/dim_annual_agency_information.sql @@ -2,59 +2,61 @@ WITH stg_ntd__annual_database_agency_information AS ( SELECT *, -- TODO: this does not handle deletes - LEAD(ts) OVER (PARTITION BY year, ntd_id ORDER BY ts ASC) AS next_ts, + LEAD(ts) OVER (PARTITION BY year, ntd_id, state_parent_ntd_id ORDER BY ts ASC) AS next_ts, FROM {{ ref('stg_ntd__annual_database_agency_information') }} ), -dim_annual_ntd_agency_information AS ( +dim_annual_agency_information AS ( SELECT - {{ dbt_utils.generate_surrogate_key(['year', 'ntd_id', 'ts']) }} as key, + {{ dbt_utils.generate_surrogate_key(['year', 'ntd_id', 'state_parent_ntd_id', 'ts']) }} AS key, year, ntd_id, - number_of_state_counties, - tam_tier, - personal_vehicles, - density, - uza_name, - tribal_area_name, - service_area_sq_miles, - total_voms, - city, - fta_recipient_id, - region, - state_admin_funds_expended, - zip_code_ext, - zip_code, - ueid, - address_line_2, - number_of_counties_with_service, + state_parent_ntd_id, + agency_name, reporter_acronym, - original_due_date, - sq_miles, - address_line_1, - p_o__box, - fy_end_date, + doing_business_as, + division_department, + legacy_ntd_id, reported_by_ntd_id, - population, + reported_by_name, + reporter_type, reporting_module, - service_area_pop, + organization_type, subrecipient_type, + fy_end_date, + original_due_date, + address_line_1, + address_line_2, + p_o__box, + city, state, - volunteer_drivers, - primary_uza, - doing_business_as, - reporter_type, - legacy_ntd_id, - voms_do, + zip_code, + zip_code_ext, + region, url, - reported_by_name, + fta_recipient_id, + ueid, + service_area_sq_miles, + service_area_pop, + primary_uza_code, + primary_uza_name, + tribal_area_name, + population, + density, + sq_miles, + voms_do, voms_pt, - organization_type, - agency_name, + total_voms, + volunteer_drivers, + personal_vehicles, + tam_tier, + number_of_state_counties, + number_of_counties_with_service, + state_admin_funds_expended, ts AS _valid_from, {{ make_end_of_valid_range('COALESCE(next_ts, CAST("2099-01-01" AS TIMESTAMP))') }} AS _valid_to, next_ts IS NULL AS _is_current, FROM stg_ntd__annual_database_agency_information ) -SELECT * FROM dim_annual_ntd_agency_information +SELECT * FROM dim_annual_agency_information diff --git a/warehouse/models/mart/ntd_fct_annual/_mart_ntd_fct_annual.yml b/warehouse/models/mart/ntd_fct_annual/_mart_ntd_fct_annual.yml new file mode 100644 index 0000000000..21dec5a337 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/_mart_ntd_fct_annual.yml @@ -0,0 +1,37 @@ +version: 2 + +models: + - name: fct_ntd_annual_data__breakdowns + - name: fct_ntd_annual_data__breakdowns_by_agency + - name: fct_ntd_annual_data__capital_expenses_by_capital_use + - name: fct_ntd_annual_data__capital_expenses_by_mode + - name: fct_ntd_annual_data__capital_expenses_for_existing_service + - name: fct_ntd_annual_data__capital_expenses_for_expansion_of_service + - name: fct_ntd_annual_data__employees_by_agency + - name: fct_ntd_annual_data__employees_by_mode + - name: 
fct_ntd_annual_data__employees_by_mode_and_employee_type + - name: fct_ntd_annual_data__fuel_and_energy + - name: fct_ntd_annual_data__fuel_and_energy_by_agency + - name: fct_ntd_annual_data__funding_sources_by_expense_type + - name: fct_ntd_annual_data__funding_sources_directly_generated + - name: fct_ntd_annual_data__funding_sources_federal + - name: fct_ntd_annual_data__funding_sources_local + - name: fct_ntd_annual_data__funding_sources_state + - name: fct_ntd_annual_data__funding_sources_taxes_levied_by_agency + - name: fct_ntd_annual_data__maintenance_facilities + - name: fct_ntd_annual_data__maintenance_facilities_by_agency + - name: fct_ntd_annual_data__metrics + - name: fct_ntd_annual_data__operating_expenses_by_function + - name: fct_ntd_annual_data__operating_expenses_by_function_and_agency + - name: fct_ntd_annual_data__operating_expenses_by_type + - name: fct_ntd_annual_data__operating_expenses_by_type_and_agency + - name: fct_ntd_annual_data__service_by_agency + - name: fct_ntd_annual_data__service_by_mode + - name: fct_ntd_annual_data__service_by_mode_and_time_period + - name: fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type + - name: fct_ntd_annual_data__stations_by_mode_and_age + - name: fct_ntd_annual_data__track_and_roadway_by_agency + - name: fct_ntd_annual_data__track_and_roadway_by_mode + - name: fct_ntd_annual_data__track_and_roadway_guideway_age_distribution + - name: fct_ntd_annual_data__vehicles_age_distribution + - name: fct_ntd_annual_data__vehicles_type_count_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns.sql new file mode 100644 index 0000000000..a860034665 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns.sql @@ -0,0 +1,11 @@ +WITH staging_breakdowns AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__breakdowns') }} +), + +fct_ntd_annual_data__breakdowns AS ( + SELECT * + FROM staging_breakdowns +) + +SELECT * FROM fct_ntd_annual_data__breakdowns diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns_by_agency.sql new file mode 100644 index 0000000000..ffdf747d28 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_breakdowns_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__breakdowns_by_agency') }} +), + +fct_ntd_annual_data__breakdowns_by_agency AS ( + SELECT * + FROM staging_breakdowns_by_agency +) + +SELECT * FROM fct_ntd_annual_data__breakdowns_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_capital_use.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_capital_use.sql new file mode 100644 index 0000000000..82882ea9f1 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_capital_use.sql @@ -0,0 +1,11 @@ +WITH staging_capital_expenses_by_capital_use AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__capital_expenses_by_capital_use') }} +), + +fct_ntd_annual_data__capital_expenses_by_capital_use AS ( + SELECT * + FROM staging_capital_expenses_by_capital_use +) + +SELECT * FROM fct_ntd_annual_data__capital_expenses_by_capital_use diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_mode.sql 
b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_mode.sql new file mode 100644 index 0000000000..96457a6c8b --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_mode.sql @@ -0,0 +1,11 @@ +WITH staging_capital_expenses_by_mode AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__capital_expenses_by_mode') }} +), + +fct_ntd_annual_data__capital_expenses_by_mode AS ( + SELECT * + FROM staging_capital_expenses_by_mode +) + +SELECT * FROM fct_ntd_annual_data__capital_expenses_by_mode diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_existing_service.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_existing_service.sql new file mode 100644 index 0000000000..d219716996 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_existing_service.sql @@ -0,0 +1,11 @@ +WITH staging_capital_expenses_for_existing_service AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__capital_expenses_for_existing_service') }} +), + +fct_ntd_annual_data__capital_expenses_for_existing_service AS ( + SELECT * + FROM staging_capital_expenses_for_existing_service +) + +SELECT * FROM fct_ntd_annual_data__capital_expenses_for_existing_service diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_expansion_of_service.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_expansion_of_service.sql new file mode 100644 index 0000000000..ca2a7b72be --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_expansion_of_service.sql @@ -0,0 +1,11 @@ +WITH staging_capital_expenses_for_expansion_of_service AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__capital_expenses_for_expansion_of_service') }} +), + +fct_ntd_annual_data__capital_expenses_for_expansion_of_service AS ( + SELECT * + FROM staging_capital_expenses_for_expansion_of_service +) + +SELECT * FROM fct_ntd_annual_data__capital_expenses_for_expansion_of_service diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_agency.sql new file mode 100644 index 0000000000..1b74d3bf79 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_employees_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__employees_by_agency') }} +), + +fct_ntd_annual_data__employees_by_agency AS ( + SELECT * + FROM staging_employees_by_agency +) + +SELECT * FROM fct_ntd_annual_data__employees_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode.sql new file mode 100644 index 0000000000..fac6d6aa76 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode.sql @@ -0,0 +1,11 @@ +WITH staging_employees_by_mode AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__employees_by_mode') }} +), + +fct_ntd_annual_data__employees_by_mode AS ( + SELECT * + FROM staging_employees_by_mode +) + +SELECT * FROM fct_ntd_annual_data__employees_by_mode diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode_and_employee_type.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode_and_employee_type.sql new 
file mode 100644 index 0000000000..01efa4bcaa --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode_and_employee_type.sql @@ -0,0 +1,11 @@ +WITH staging_employees_by_mode_and_employee_type AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__employees_by_mode_and_employee_type') }} +), + +fct_ntd_annual_data__employees_by_mode_and_employee_type AS ( + SELECT * + FROM staging_employees_by_mode_and_employee_type +) + +SELECT * FROM fct_ntd_annual_data__employees_by_mode_and_employee_type diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy.sql new file mode 100644 index 0000000000..fbb795d0ae --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy.sql @@ -0,0 +1,11 @@ +WITH staging_fuel_and_energy AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__fuel_and_energy') }} +), + +fct_ntd_annual_data__fuel_and_energy AS ( + SELECT * + FROM staging_fuel_and_energy +) + +SELECT * FROM fct_ntd_annual_data__fuel_and_energy diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy_by_agency.sql new file mode 100644 index 0000000000..79f1f6e099 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_fuel_and_energy_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__fuel_and_energy_by_agency') }} +), + +fct_ntd_annual_data__fuel_and_energy_by_agency AS ( + SELECT * + FROM staging_fuel_and_energy_by_agency +) + +SELECT * FROM fct_ntd_annual_data__fuel_and_energy_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_by_expense_type.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_by_expense_type.sql new file mode 100644 index 0000000000..0943e513ef --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_by_expense_type.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_by_expense_type AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_by_expense_type') }} +), + +fct_ntd_annual_data__funding_sources_by_expense_type AS ( + SELECT * + FROM staging_funding_sources_by_expense_type +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_by_expense_type diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_directly_generated.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_directly_generated.sql new file mode 100644 index 0000000000..93c704f122 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_directly_generated.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_directly_generated AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_directly_generated') }} +), + +fct_ntd_annual_data__funding_sources_directly_generated AS ( + SELECT * + FROM staging_funding_sources_directly_generated +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_directly_generated diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_federal.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_federal.sql new file mode 100644 index 0000000000..26b07b825d --- /dev/null +++ 
b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_federal.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_federal AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_federal') }} +), + +fct_ntd_annual_data__funding_sources_federal AS ( + SELECT * + FROM staging_funding_sources_federal +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_federal diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_local.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_local.sql new file mode 100644 index 0000000000..fbab02f91e --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_local.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_local AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_local') }} +), + +fct_ntd_annual_data__funding_sources_local AS ( + SELECT * + FROM staging_funding_sources_local +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_local diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_state.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_state.sql new file mode 100644 index 0000000000..128a3e581c --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_state.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_state AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_state') }} +), + +fct_ntd_annual_data__funding_sources_state AS ( + SELECT * + FROM staging_funding_sources_state +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_state diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_taxes_levied_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_taxes_levied_by_agency.sql new file mode 100644 index 0000000000..4585ac93f8 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_taxes_levied_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_taxes_levied_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_taxes_levied_by_agency') }} +), + +fct_ntd_annual_data__funding_sources_taxes_levied_by_agency AS ( + SELECT * + FROM staging_funding_sources_taxes_levied_by_agency +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_taxes_levied_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities.sql new file mode 100644 index 0000000000..cba0055eb4 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities.sql @@ -0,0 +1,11 @@ +WITH staging_maintenance_facilities AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__maintenance_facilities') }} +), + +fct_ntd_annual_data__maintenance_facilities AS ( + SELECT * + FROM staging_maintenance_facilities +) + +SELECT * FROM fct_ntd_annual_data__maintenance_facilities diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities_by_agency.sql new file mode 100644 index 0000000000..bd119a2e74 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_maintenance_facilities_by_agency AS ( + SELECT * + FROM {{ 
ref('stg_ntd_annual_data__maintenance_facilities_by_agency') }} +), + +fct_ntd_annual_data__maintenance_facilities_by_agency AS ( + SELECT * + FROM staging_maintenance_facilities_by_agency +) + +SELECT * FROM fct_ntd_annual_data__maintenance_facilities_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__metrics.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__metrics.sql new file mode 100644 index 0000000000..580cddef53 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__metrics.sql @@ -0,0 +1,11 @@ +WITH staging_metrics AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__metrics') }} +), + +fct_ntd_annual_data__metrics AS ( + SELECT * + FROM staging_metrics +) + +SELECT * FROM fct_ntd_annual_data__metrics diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function.sql new file mode 100644 index 0000000000..7966560391 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function.sql @@ -0,0 +1,11 @@ +WITH staging_operating_expenses_by_function AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__operating_expenses_by_function') }} +), + +fct_ntd_annual_data__operating_expenses_by_function AS ( + SELECT * + FROM staging_operating_expenses_by_function +) + +SELECT * FROM fct_ntd_annual_data__operating_expenses_by_function diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function_and_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function_and_agency.sql new file mode 100644 index 0000000000..694367bcbe --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function_and_agency.sql @@ -0,0 +1,11 @@ +WITH staging_operating_expenses_by_function_and_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__operating_expenses_by_function_and_agency') }} +), + +fct_ntd_annual_data__operating_expenses_by_function_and_agency AS ( + SELECT * + FROM staging_operating_expenses_by_function_and_agency +) + +SELECT * FROM fct_ntd_annual_data__operating_expenses_by_function_and_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type.sql new file mode 100644 index 0000000000..8a8beb8831 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type.sql @@ -0,0 +1,11 @@ +WITH staging_operating_expenses_by_type AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__operating_expenses_by_type') }} +), + +fct_ntd_annual_data__operating_expenses_by_type AS ( + SELECT * + FROM staging_operating_expenses_by_type +) + +SELECT * FROM fct_ntd_annual_data__operating_expenses_by_type diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type_and_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type_and_agency.sql new file mode 100644 index 0000000000..6400320d13 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type_and_agency.sql @@ -0,0 +1,11 @@ +WITH staging_operating_expenses_by_type_and_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__operating_expenses_by_type_and_agency') }} +), + 
+fct_ntd_annual_data__operating_expenses_by_type_and_agency AS ( + SELECT * + FROM staging_operating_expenses_by_type_and_agency +) + +SELECT * FROM fct_ntd_annual_data__operating_expenses_by_type_and_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_agency.sql new file mode 100644 index 0000000000..4de6f4c445 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_service_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__service_by_agency') }} +), + +fct_ntd_annual_data__service_by_agency AS ( + SELECT * + FROM staging_service_by_agency +) + +SELECT * FROM fct_ntd_annual_data__service_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode.sql new file mode 100644 index 0000000000..a1775331a8 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode.sql @@ -0,0 +1,11 @@ +WITH staging_service_by_mode AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__service_by_mode') }} +), + +fct_ntd_annual_data__service_by_mode AS ( + SELECT * + FROM staging_service_by_mode +) + +SELECT * FROM fct_ntd_annual_data__service_by_mode diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode_and_time_period.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode_and_time_period.sql new file mode 100644 index 0000000000..bfe4a47d9f --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode_and_time_period.sql @@ -0,0 +1,11 @@ +WITH staging_service_by_mode_and_time_period AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__service_by_mode_and_time_period') }} +), + +fct_ntd_annual_data__service_by_mode_and_time_period AS ( + SELECT * + FROM staging_service_by_mode_and_time_period +) + +SELECT * FROM fct_ntd_annual_data__service_by_mode_and_time_period diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type.sql new file mode 100644 index 0000000000..ec613e72dc --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type.sql @@ -0,0 +1,11 @@ +WITH staging_stations_and_facilities_by_agency_and_facility_type AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type') }} +), + +fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type AS ( + SELECT * + FROM staging_stations_and_facilities_by_agency_and_facility_type +) + +SELECT * FROM fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_by_mode_and_age.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_by_mode_and_age.sql new file mode 100644 index 0000000000..3c84a410a9 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_by_mode_and_age.sql @@ -0,0 +1,11 @@ +WITH staging_stations_by_mode_and_age AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__stations_by_mode_and_age') }} +), + +fct_ntd_annual_data__stations_by_mode_and_age AS ( + SELECT * + FROM 
staging_stations_by_mode_and_age +) + +SELECT * FROM fct_ntd_annual_data__stations_by_mode_and_age diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_agency.sql new file mode 100644 index 0000000000..d1bb760745 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_track_and_roadway_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__track_and_roadway_by_agency') }} +), + +fct_ntd_annual_data__track_and_roadway_by_agency AS ( + SELECT * + FROM staging_track_and_roadway_by_agency +) + +SELECT * FROM fct_ntd_annual_data__track_and_roadway_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_mode.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_mode.sql new file mode 100644 index 0000000000..2f60b8bfe2 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_mode.sql @@ -0,0 +1,11 @@ +WITH staging_track_and_roadway_by_mode AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__track_and_roadway_by_mode') }} +), + +fct_ntd_annual_data__track_and_roadway_by_mode AS ( + SELECT * + FROM staging_track_and_roadway_by_mode +) + +SELECT * FROM fct_ntd_annual_data__track_and_roadway_by_mode diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_guideway_age_distribution.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_guideway_age_distribution.sql new file mode 100644 index 0000000000..5c1a4af5cf --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_guideway_age_distribution.sql @@ -0,0 +1,11 @@ +WITH staging_track_and_roadway_guideway_age_distribution AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__track_and_roadway_guideway_age_distribution') }} +), + +fct_ntd_annual_data__track_and_roadway_guideway_age_distribution AS ( + SELECT * + FROM staging_track_and_roadway_guideway_age_distribution +) + +SELECT * FROM fct_ntd_annual_data__track_and_roadway_guideway_age_distribution diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_age_distribution.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_age_distribution.sql new file mode 100644 index 0000000000..7af715712f --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_age_distribution.sql @@ -0,0 +1,11 @@ +WITH staging_vehicles_age_distribution AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__vehicles_age_distribution') }} +), + +fct_ntd_annual_data__vehicles_age_distribution AS ( + SELECT * + FROM staging_vehicles_age_distribution +) + +SELECT * FROM fct_ntd_annual_data__vehicles_age_distribution diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_type_count_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_type_count_by_agency.sql new file mode 100644 index 0000000000..dd93cce541 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_type_count_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_vehicles_type_count_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__vehicles_type_count_by_agency') }} +), + +fct_ntd_annual_data__vehicles_type_count_by_agency AS ( + SELECT * + FROM staging_vehicles_type_count_by_agency 
+) + +SELECT * FROM fct_ntd_annual_data__vehicles_type_count_by_agency diff --git a/warehouse/models/mart/transit_database/dim_mobility_mart_providers.sql b/warehouse/models/mart/transit_database/dim_mobility_mart_providers.sql index 5c5b7e8947..600331d1d1 100644 --- a/warehouse/models/mart/transit_database/dim_mobility_mart_providers.sql +++ b/warehouse/models/mart/transit_database/dim_mobility_mart_providers.sql @@ -77,7 +77,7 @@ funding_by_org AS ( -- We cannot use `_is_current` here because every year is marked as "current" -- since it's the "current" record for the respective year. annual_ntd AS ( - SELECT * FROM {{ ref('dim_annual_ntd_agency_information') }} + SELECT * FROM {{ ref('dim_annual_agency_information') }} WHERE state = "CA" -- We only want data from the latest data from NTD. In the rare edge case diff --git a/warehouse/models/staging/ntd/_src.yml b/warehouse/models/staging/ntd/_src.yml index 099e5fa62f..24737376d7 100644 --- a/warehouse/models/staging/ntd/_src.yml +++ b/warehouse/models/staging/ntd/_src.yml @@ -7,3 +7,9 @@ sources: schema: external_ntd_data_products tables: - name: annual_database_agency_information + description: | + Contains basic contact and agency information for each NTD reporter. + + The dataset can be found at: + https://www.transit.dot.gov/ntd/data-product/2023-annual-database-agency-information + * For other years, just replace 2023 by the desired year. diff --git a/warehouse/models/staging/ntd/_stg_ntd.yml b/warehouse/models/staging/ntd/_stg_ntd.yml index 87ff1d007e..61c9194dfa 100644 --- a/warehouse/models/staging/ntd/_stg_ntd.yml +++ b/warehouse/models/staging/ntd/_stg_ntd.yml @@ -2,12 +2,19 @@ version: 2 models: - name: stg_ntd__annual_database_agency_information + description: | + Contains basic contact and agency information for each NTD reporter. + + The dataset can be found at: + https://www.transit.dot.gov/ntd/data-product/2023-annual-database-agency-information + * For other years, just replace 2023 by the desired year. 
tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: - ts - year - ntd_id + - state_parent_ntd_id columns: - name: ntd_id tests: diff --git a/warehouse/models/staging/ntd/stg_ntd__annual_database_agency_information.sql b/warehouse/models/staging/ntd/stg_ntd__annual_database_agency_information.sql index 61c3eed407..0112e9bee2 100644 --- a/warehouse/models/staging/ntd/stg_ntd__annual_database_agency_information.sql +++ b/warehouse/models/staging/ntd/stg_ntd__annual_database_agency_information.sql @@ -4,50 +4,52 @@ WITH source AS ( stg_ntd__annual_database_agency_information AS ( SELECT - number_of_state_counties, - tam_tier, - personal_vehicles, - density, - uza_name, - tribal_area_name, - service_area_sq_miles, - total_voms, - city, - fta_recipient_id, - region, - state_admin_funds_expended, - zip_code_ext, - zip_code, - ueid, - address_line_2, - number_of_counties_with_service, + year, + ntd_id, + state_parent_ntd_id, + agency_name, reporter_acronym, - original_due_date, - sq_miles, - address_line_1, - p_o__box, - fy_end_date, + doing_business_as, + division_department, + legacy_ntd_id, reported_by_ntd_id, - population, + reported_by_name, + reporter_type, reporting_module, - service_area_pop, + organization_type, subrecipient_type, + fy_end_date, + original_due_date, + address_line_1, + address_line_2, + p_o__box, + city, state, - volunteer_drivers, - primary_uza, - doing_business_as, - reporter_type, - legacy_ntd_id, - voms_do, + zip_code, + zip_code_ext, + region, url, - reported_by_name, + fta_recipient_id, + ueid, + service_area_sq_miles, + service_area_pop, + primary_uza AS primary_uza_code, + uza_name AS primary_uza_name, + tribal_area_name, + population, + density, + sq_miles, + voms_do, voms_pt, - organization_type, - agency_name, - ntd_id, + total_voms, + volunteer_drivers, + personal_vehicles, + tam_tier, + number_of_state_counties, + number_of_counties_with_service, + state_admin_funds_expended, dt, - ts, - year, + ts FROM source ) diff --git a/warehouse/models/staging/ntd_annual_data_tables/_src.yml b/warehouse/models/staging/ntd_annual_data_tables/_src.yml index 383dabc05d..1bb8da7080 100644 --- a/warehouse/models/staging/ntd_annual_data_tables/_src.yml +++ b/warehouse/models/staging/ntd_annual_data_tables/_src.yml @@ -41,7 +41,7 @@ sources: description: The year for which the data was reported. tests: - accepted_values: - values: [2022] + values: [2022, 2023] - name: _5_digit_ntd_id description: A five-digit identifying number for each agency used in the current NTD system. - name: agency @@ -247,7 +247,7 @@ sources: description: The year for which the data was reported. tests: - accepted_values: - values: [2022] + values: [2022, 2023] - name: _5_digit_ntd_id description: A five-digit identifying number for each agency used in the current NTD system. - name: type_of_service @@ -275,7 +275,7 @@ sources: description: The year for which the data was reported. tests: - accepted_values: - values: [2022] + values: [2022, 2023] - name: _5_digit_ntd_id description: A five-digit identifying number for each agency used in the current NTD system. - name: agency diff --git a/warehouse/models/staging/state_geoportal/_src.yml b/warehouse/models/staging/state_geoportal/_src.yml new file mode 100644 index 0000000000..2e2d8dc2de --- /dev/null +++ b/warehouse/models/staging/state_geoportal/_src.yml @@ -0,0 +1,9 @@ +version: 2 + +sources: + - name: external_state_geoportal + description: Data tables scraped from state geoportal. 
+ database: "{{ env_var('DBT_SOURCE_DATABASE', var('SOURCE_DATABASE')) }}" + schema: external_state_geoportal + tables: + - name: state_highway_network diff --git a/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml b/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml new file mode 100644 index 0000000000..7e12ae93f5 --- /dev/null +++ b/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml @@ -0,0 +1,4 @@ +version: 2 + +models: + - name: stg_state_geoportal__state_highway_network_stops diff --git a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql new file mode 100644 index 0000000000..a89a1075ee --- /dev/null +++ b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql @@ -0,0 +1,14 @@ +WITH external_state_geoportal__state_highway_network AS ( + SELECT * + FROM {{ source('external_state_geoportal', 'state_highway_network') }} +), + +stg_state_geoportal__state_highway_network_stops AS( + + SELECT * + FROM external_state_geoportal__state_highway_network + -- we pull the whole table every month in the pipeline, so this gets only the latest extract + QUALIFY DENSE_RANK() OVER (ORDER BY execution_ts DESC) = 1 +) + +SELECT * FROM stg_state_geoportal__state_highway_network_stops
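A minimal usage sketch of the models added above, for reviewers. The dataset names mart_gtfs_schedule_latest and mart_ntd are assumptions about how the marts are published in BigQuery; the table and column names come from the models in this diff.

-- Illustrative only: adjust dataset names to the deployed project.
-- Latest stops flagged by dim_stops_latest as falling on the State Highway Network.
SELECT COUNT(*) AS stops_on_shn
FROM mart_gtfs_schedule_latest.dim_stops_latest
WHERE on_state_highway_network;

-- Current agency information rows, using the bitemporal convention described in
-- _mart_ntd.yml (_is_current marks the latest version per year, ntd_id, and
-- state_parent_ntd_id).
SELECT year, ntd_id, state_parent_ntd_id, agency_name
FROM mart_ntd.dim_annual_agency_information
WHERE _is_current;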