Scrape geoportal for shn, add SHN flag on stops table (#3529)
* scrape geoportal for shn, add flag on stops table

* successful initial scrape of geoportal endpoint

* functioning json format and external table creation, need to modify coordinates

* shn external tables

* add initial source, staging, and docs for state geoportal scrape

* add flag for shn to dim_stops_latest

* re-add templating for buckets and remove hardcoded schema

* refactor operator to be more agnostic for data inputs

* add handling for state_highway_network table

* fix comment

* remove local variables and comments to allow for a production merge when ready

* use stop_id for calculations

* Apply suggestions from code review

Co-authored-by: Mjumbe Poe <[email protected]>

* restore source table

* remove hardcoded source, remove old comments, add comment describing buffer distance

* remove hardcoded bucket

* refactor how we keep final columns, dynamically rename columns

* restore _gtfs_key over stop_id in shn geo calculation

* checking before refactor

* revisions based on Mjumbe's review

---------

Co-authored-by: Mjumbe Poe <[email protected]>
charlie-costanzo and mjumbewu authored Nov 26, 2024
1 parent 82c74e8 commit 0dfe3b6
Showing 9 changed files with 344 additions and 1 deletion.
@@ -0,0 +1,29 @@
operator: operators.ExternalTable
bucket: gs://calitp-state-geoportal-scrape
source_objects:
- "state_highway_network_geodata/*.jsonl.gz"
source_format: NEWLINE_DELIMITED_JSON
use_bq_client: true
hive_options:
mode: CUSTOM
require_partition_filter: false
source_uri_prefix: "state_highway_network_geodata/{dt:DATE}/{execution_ts:TIMESTAMP}/"
destination_project_dataset_table: "external_state_geoportal.state_highway_network"
prefix_bucket: false
post_hook: |
SELECT *
FROM `{{ get_project_id() }}`.external_state_geoportal.state_highway_network
LIMIT 1;
schema_fields:
- name: Route
type: INTEGER
- name: County
type: STRING
- name: District
type: INTEGER
- name: RouteType
type: STRING
- name: Direction
type: STRING
- name: wkt_coordinates
type: GEOGRAPHY
19 changes: 19 additions & 0 deletions airflow/dags/scrape_state_geoportal/METADATA.yml
@@ -0,0 +1,19 @@
description: "Scrape State Highway Network from State Geoportal"
schedule_interval: "0 4 1 * *" # 4am UTC first day of every month
tags:
- all_gusty_features
default_args:
owner: airflow
depends_on_past: False
catchup: False
start_date: "2024-09-15"
email:
- "[email protected]"
email_on_failure: True
email_on_retry: False
retries: 1
retry_delay: !timedelta 'minutes: 2'
concurrency: 50
#sla: !timedelta 'hours: 2'
wait_for_defaults:
timeout: 3600
7 changes: 7 additions & 0 deletions airflow/dags/scrape_state_geoportal/state_highway_network.yml
@@ -0,0 +1,7 @@
operator: operators.StateGeoportalAPIOperator

root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/'
service: "CHhighway/SHN_Lines"
layer: "0"
product: 'state_highway_network'
resultRecordCount: 2000
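For reference, these values translate into paged GeoJSON queries of roughly the following form (illustrative first page; the actual request is assembled by StateGeoportalAPIOperator in scrape_state_geoportal.py below):

https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHhighway/SHN_Lines/FeatureServer/0/query?where=1%3D1&outFields=*&f=geojson&resultRecordCount=2000&resultOffset=0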
1 change: 1 addition & 0 deletions airflow/plugins/operators/__init__.py
@@ -9,3 +9,4 @@
from operators.pod_operator import PodOperator
from operators.scrape_ntd_api import NtdDataProductAPIOperator
from operators.scrape_ntd_xlsx import NtdDataProductXLSXOperator
from operators.scrape_state_geoportal import StateGeoportalAPIOperator
217 changes: 217 additions & 0 deletions airflow/plugins/operators/scrape_state_geoportal.py
@@ -0,0 +1,217 @@
import gzip
import logging
import os
from typing import ClassVar, List

import pandas as pd # type: ignore
import pendulum
import requests
from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs # type: ignore
from pydantic import HttpUrl, parse_obj_as

from airflow.models import BaseOperator # type: ignore

API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"]


class StateGeoportalAPIExtract(PartitionedGCSArtifact):
bucket: ClassVar[str]
execution_ts: pendulum.DateTime = pendulum.now()
dt: pendulum.Date = execution_ts.date()
partition_names: ClassVar[List[str]] = ["dt", "execution_ts"]

# The name to be used in the data warehouse to refer to the data
# product.
product: str

# The root of the ArcGIS services. As of Nov 2024, this should
# be "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/".
root_url: str

# The name of the service being requested. In the feature service's
# URL, this will be everything between the root and "/FeatureServer".
# Don't include a leading or trailing slash.
service: str

# The layer to query. This will usually be "0", so that is the
# default.
layer: str = "0"

# The query filter. By default, all rows will be returned from the
# service. Refer to the ArcGIS documentation for syntax:
# https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters
where: str = "1=1"

# A comma-separated list of fields to include in the results. Use
# "*" (the default) to include all fields.
outFields: str = "*"

# The number of records to request for each API call (the operator
# will request all data from the layer in batches of this size).
resultRecordCount: int

@property
def table(self) -> str:
return self.product

@property
def filename(self) -> str:
return self.table

class Config:
arbitrary_types_allowed = True

def fetch_from_state_geoportal(self):
"""Fetch all features for the configured layer from the State Geoportal
ArcGIS REST service, paging with resultOffset until no features remain."""

logging.info(f"Downloading state geoportal data for {self.product}.")

try:
# Set up the parameters for the request
url = f"{self.root_url}/{self.service}/FeatureServer/{self.layer}/query"
validated_url = parse_obj_as(HttpUrl, url)

params = {
"where": self.where,
"outFields": self.outFields,
"f": "geojson",
"resultRecordCount": self.resultRecordCount,
}

all_features = [] # To store all retrieved rows
offset = 0

while True:
# Update the resultOffset for each request
params["resultOffset"] = offset

# Make the request
response = requests.get(validated_url, params=params)
response.raise_for_status()
data = response.json()

# Break the loop if there are no more features
if "features" not in data or not data["features"]:
break

# Append the retrieved features
all_features.extend(data["features"])

# Increment the offset
offset += params["resultRecordCount"]

if all_features is None or len(all_features) == 0:
logging.info(
f"There is no data to download for {self.product}. Ending pipeline."
)

pass
else:
logging.info(
f"Downloaded {self.product} data with {len(all_features)} rows!"
)

return all_features

except requests.exceptions.RequestException as e:
logging.info(f"An error occurred: {e}")

raise


# Function to convert coordinates to WKT format
def to_wkt(geometry_type, coordinates):
if geometry_type == "LineString":
# Format as a LineString
coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates])
return f"LINESTRING({coords_str})"
elif geometry_type == "MultiLineString":
# Format as a MultiLineString
multiline_coords_str = ", ".join(
f"({', '.join([f'{lng} {lat}' for lng, lat in line])})"
for line in coordinates
)
return f"MULTILINESTRING({multiline_coords_str})"
else:
return None
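
# Illustrative examples (hypothetical coordinates, not part of the scraped data):
#   to_wkt("LineString", [[-121.5, 38.58], [-121.49, 38.59]])
#   returns "LINESTRING(-121.5 38.58, -121.49 38.59)"
#   to_wkt("MultiLineString", [[[-121.5, 38.58], [-121.49, 38.59]]])
#   returns "MULTILINESTRING((-121.5 38.58, -121.49 38.59))"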


class JSONExtract(StateGeoportalAPIExtract):
bucket = API_BUCKET


class StateGeoportalAPIOperator(BaseOperator):
template_fields = (
"product",
"root_url",
"service",
"layer",
"resultRecordCount",
)

def __init__(
self,
product,
root_url,
service,
layer,
resultRecordCount,
**kwargs,
):
self.product = product
self.root_url = root_url
self.service = service
self.layer = layer
self.resultRecordCount = resultRecordCount

"""An operator that extracts JSON data from the State Geoportal and saves it
as a single JSONL file, hive-partitioned by date, in Google Cloud Storage.
"""

# Configure the JSONL extract; the file is saved to the bucket in execute()
self.extract = JSONExtract(
root_url=self.root_url,
service=self.service,
product=f"{self.product}_geodata",
layer=self.layer,
resultRecordCount=self.resultRecordCount,
filename=f"{self.product}_geodata.jsonl.gz",
)

super().__init__(**kwargs)

def execute(self, **kwargs):
api_content = self.extract.fetch_from_state_geoportal()

df = pd.json_normalize(api_content)

if self.product == "state_highway_network":
# Select the columns to keep. This must be explicit before renaming, because
# normalizing produces duplicate column suffixes (e.g. "type" and "geometry.type")
df = df[
[
"properties.Route",
"properties.County",
"properties.District",
"properties.RouteType",
"properties.Direction",
"geometry.type",
"geometry.coordinates",
]
]

# Dynamically create a mapping by removing known prefixes
columns = {col: col.split(".")[-1] for col in df.columns}

# Rename columns using the dynamically created mapping
df = df.rename(columns=columns)

# Create new column with WKT format
df["wkt_coordinates"] = df.apply(
lambda row: to_wkt(row["type"], row["coordinates"]), axis=1
)

# Compress the DataFrame content and save it
self.gzipped_content = gzip.compress(
df.to_json(orient="records", lines=True).encode()
)
self.extract.save_content(fs=get_fs(), content=self.gzipped_content)
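
For context, a minimal sketch of why execute() selects the dotted "properties.*"/"geometry.*" columns explicitly before renaming (hypothetical feature; field values are made up): pd.json_normalize flattens each GeoJSON feature into dot-separated column names, and stripping the prefixes naively would collide the feature-level "type" with "geometry.type".

import pandas as pd

feature = {
    "type": "Feature",
    "properties": {"Route": 80, "County": "YOL", "District": 3,
                   "RouteType": "State", "Direction": "EB"},
    "geometry": {"type": "LineString",
                 "coordinates": [[-121.5, 38.58], [-121.49, 38.59]]},
}

df = pd.json_normalize([feature])
# df.columns: ['type', 'properties.Route', 'properties.County', 'properties.District',
#              'properties.RouteType', 'properties.Direction', 'geometry.type',
#              'geometry.coordinates']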
45 changes: 44 additions & 1 deletion warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql
@@ -4,6 +4,49 @@ dim_stops_latest AS (
table_name = ref('dim_stops'),
clean_table_name = 'dim_stops'
) }}
),

stg_state_geoportal__state_highway_network_stops AS (
SELECT *
FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }}
),


buffer_geometry_table AS (
SELECT
-- ST_BUFFER distance is in meters; 30.48 m is equal to 100 ft, as requested by Uriel
ST_BUFFER(wkt_coordinates,
30.48) AS buffer_geometry
FROM stg_state_geoportal__state_highway_network_stops
),

current_stops AS (
SELECT
pt_geom,
_gtfs_key
FROM dim_stops_latest
),


stops_on_shn AS (
SELECT
current_stops.*
FROM buffer_geometry_table, current_stops
WHERE ST_DWITHIN(
buffer_geometry_table.buffer_geometry,current_stops.pt_geom, 0)
),
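-- In stops_on_shn above, ST_DWITHIN with a distance of 0 keeps only the stops
-- whose point geometry intersects the buffered SHN lines, i.e. stops on or
-- within 100 ft of the state highway network.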

dim_stops_latest_with_shn_boolean AS (

SELECT
dim_stops_latest.*,
IF(stops_on_shn._gtfs_key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest
FROM
dim_stops_latest
LEFT JOIN
stops_on_shn
ON
dim_stops_latest._gtfs_key = stops_on_shn._gtfs_key
)

-SELECT * FROM dim_stops_latest
+SELECT * FROM dim_stops_latest_with_shn_boolean
9 changes: 9 additions & 0 deletions warehouse/models/staging/state_geoportal/_src.yml
@@ -0,0 +1,9 @@
version: 2

sources:
- name: external_state_geoportal
description: Data tables scraped from state geoportal.
database: "{{ env_var('DBT_SOURCE_DATABASE', var('SOURCE_DATABASE')) }}"
schema: external_state_geoportal
tables:
- name: state_highway_network
@@ -0,0 +1,4 @@
version: 2

models:
- name: stg_state_geoportal__state_highway_network_stops
@@ -0,0 +1,14 @@
WITH external_state_geoportal__state_highway_network AS (
SELECT *
FROM {{ source('external_state_geoportal', 'state_highway_network') }}
),

stg_state_geoportal__state_highway_network_stops AS (

SELECT *
FROM external_state_geoportal__state_highway_network
-- we pull the whole table every month in the pipeline, so this gets only the latest extract
QUALIFY DENSE_RANK() OVER (ORDER BY execution_ts DESC) = 1
)

SELECT * FROM stg_state_geoportal__state_highway_network_stops
