Scrape geoportal for shn, add SHN flag on stops table #3529

Merged
merged 20 commits into from Nov 26, 2024

Changes from all commits

Commits (20):
a807a10
scrape geoportal for shn, add flag on stops table
charlie-costanzo Nov 5, 2024
e8f3ad9
successful initial scrape of geoportal endpoint
charlie-costanzo Nov 6, 2024
44fa45f
functioning json format and external table creation, need to modify c…
charlie-costanzo Nov 7, 2024
ac47b74
shn external tables
charlie-costanzo Nov 12, 2024
0af1759
add initial source, staging, and docs for state geoportal scrape
charlie-costanzo Nov 12, 2024
df234a4
add flag for shn to dim_stops_latest
charlie-costanzo Nov 12, 2024
68d3fac
readd templating for buckets and remove hardcode schema
charlie-costanzo Nov 12, 2024
6a9064c
refactor operator to be more agnostic for data inputs
charlie-costanzo Nov 13, 2024
386abf8
add handling for state_highway_network table
charlie-costanzo Nov 13, 2024
f9b3615
fix comment
charlie-costanzo Nov 13, 2024
0dd22f7
remove local variables and comments to allow for a production merge w…
charlie-costanzo Nov 13, 2024
32becb7
use stop_id for calculations
charlie-costanzo Nov 14, 2024
45019d1
Apply suggestions from code review
charlie-costanzo Nov 18, 2024
008bc0e
restore source table
charlie-costanzo Nov 14, 2024
553682c
remove hardcoded source, remove old comments, add comment decribing b…
charlie-costanzo Nov 18, 2024
cf5f01e
remove hardcoded bucket
charlie-costanzo Nov 18, 2024
11d2c12
refactor how we keep final columns, dynamically rename columns
charlie-costanzo Nov 18, 2024
791ffef
restore _gtfs_key over stop_id in shn geo calculation
charlie-costanzo Nov 20, 2024
2bcea54
checking before refactor
charlie-costanzo Nov 22, 2024
0ca4a89
revisions based on Mjumbe's review
charlie-costanzo Nov 22, 2024
Files changed
@@ -0,0 +1,29 @@
operator: operators.ExternalTable
bucket: gs://calitp-state-geoportal-scrape
source_objects:
  - "state_highway_network_geodata/*.jsonl.gz"
source_format: NEWLINE_DELIMITED_JSON
use_bq_client: true
hive_options:
  mode: CUSTOM
  require_partition_filter: false
  source_uri_prefix: "state_highway_network_geodata/{dt:DATE}/{execution_ts:TIMESTAMP}/"
destination_project_dataset_table: "external_state_geoportal.state_highway_network"
prefix_bucket: false
post_hook: |
  SELECT *
  FROM `{{ get_project_id() }}`.external_state_geoportal.state_highway_network
  LIMIT 1;
schema_fields:
  - name: Route
    type: INTEGER
  - name: County
    type: STRING
  - name: District
    type: INTEGER
  - name: RouteType
    type: STRING
  - name: Direction
    type: STRING
  - name: wkt_coordinates
    type: GEOGRAPHY
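For reference, a rough Python sketch of the object path that source_uri_prefix is meant to match, assuming the key=value hive layout that PartitionedGCSArtifact conventionally writes (the exact layout here is an assumption for illustration, not taken from this PR):

    import pendulum

    # Illustrative only: the assumed key=value hive layout under the bucket;
    # segment names mirror partition_names in the operator further down.
    execution_ts = pendulum.now()
    object_path = (
        "state_highway_network_geodata/"
        f"dt={execution_ts.to_date_string()}/"
        f"execution_ts={execution_ts.isoformat()}/"
        "state_highway_network_geodata.jsonl.gz"
    )
    print(object_path)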
19 changes: 19 additions & 0 deletions airflow/dags/scrape_state_geoportal/METADATA.yml
@@ -0,0 +1,19 @@
description: "Scrape State Highway Network from State Geoportal"
schedule_interval: "0 4 1 * *" # 4am UTC first day of every month
tags:
- all_gusty_features
default_args:
owner: airflow
depends_on_past: False
catchup: False
start_date: "2024-09-15"
email:
- "[email protected]"
email_on_failure: True
email_on_retry: False
retries: 1
retry_delay: !timedelta 'minutes: 2'
concurrency: 50
#sla: !timedelta 'hours: 2'
wait_for_defaults:
timeout: 3600
@@ -0,0 +1,7 @@
operator: operators.StateGeoportalAPIOperator

root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/'
service: "CHhighway/SHN_Lines"
layer: "0"
product: 'state_highway_network'
resultRecordCount: 2000
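Taken together with the operator code below, this task config should translate into paged requests along these lines (a sketch for intuition, not code from the PR):

    import requests

    # Sketch: the first page the operator would request for this task config.
    url = (
        "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/"
        "CHhighway/SHN_Lines/FeatureServer/0/query"
    )
    params = {
        "where": "1=1",            # default filter: all rows
        "outFields": "*",          # default: all fields
        "f": "geojson",
        "resultRecordCount": 2000,
        "resultOffset": 0,         # incremented by 2000 per page
    }
    response = requests.get(url, params=params)
    response.raise_for_status()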
1 change: 1 addition & 0 deletions airflow/plugins/operators/__init__.py
@@ -9,3 +9,4 @@
from operators.pod_operator import PodOperator
from operators.scrape_ntd_api import NtdDataProductAPIOperator
from operators.scrape_ntd_xlsx import NtdDataProductXLSXOperator
from operators.scrape_state_geoportal import StateGeoportalAPIOperator
217 changes: 217 additions & 0 deletions airflow/plugins/operators/scrape_state_geoportal.py
@@ -0,0 +1,217 @@
import gzip
import logging
import os
from typing import ClassVar, List

import pandas as pd  # type: ignore
import pendulum
import requests
from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs  # type: ignore
from pydantic import HttpUrl, parse_obj_as

from airflow.models import BaseOperator  # type: ignore

API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"]


class StateGeoportalAPIExtract(PartitionedGCSArtifact):
    bucket: ClassVar[str]
    execution_ts: pendulum.DateTime = pendulum.now()
    dt: pendulum.Date = execution_ts.date()
    partition_names: ClassVar[List[str]] = ["dt", "execution_ts"]

    # The name to be used in the data warehouse to refer to the data
    # product.
    product: str

    # The root of the ArcGIS services. As of Nov 2024, this should
    # be "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/".
    root_url: str

    # The name of the service being requested. In the feature service's
    # URL, this will be everything between the root and "/FeatureServer".
    # Don't include a leading or trailing slash.
    service: str

    # The layer to query. This will usually be "0", so that is the
    # default.
    layer: str = "0"

    # The query filter. By default, all rows will be returned from the
    # service. Refer to the ArcGIS documentation for syntax:
    # https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters
    where: str = "1=1"

    # A comma-separated list of fields to include in the results. Use
    # "*" (the default) to include all fields.
    outFields: str = "*"

    # The number of records to request for each API call (the operator
    # will request all data from the layer in batches of this size).
    resultRecordCount: int

    @property
    def table(self) -> str:
        return self.product

    @property
    def filename(self) -> str:
        return self.table

    class Config:
        arbitrary_types_allowed = True

    def fetch_from_state_geoportal(self):
        """Page through the feature service's query endpoint and return
        all retrieved features as a list of GeoJSON feature dicts."""

        logging.info(f"Downloading state geoportal data for {self.product}.")

        try:
            # Set up the parameters for the request
            url = f"{self.root_url}/{self.service}/FeatureServer/{self.layer}/query"
            validated_url = parse_obj_as(HttpUrl, url)

            params = {
                "where": self.where,
                "outFields": self.outFields,
                "f": "geojson",
                "resultRecordCount": self.resultRecordCount,
            }

            all_features = []  # To store all retrieved rows
            offset = 0

            while True:
                # Update the resultOffset for each request
                params["resultOffset"] = offset

                # Make the request
                response = requests.get(validated_url, params=params)
mjumbewu (Contributor): question: What should happen if we get a non-200 response from the server? I would think we should raise an exception so that the task fails and has to retry. Does that seem right? You could add .raise_for_status() at the end of the line if so. See https://3.python-requests.org/user/quickstart/#response-status-codes

charlie-costanzo (Member, Author): added! great suggestion
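For illustration, a minimal sketch of the suggested behavior (httpbin.org is a stand-in endpoint, not part of this pipeline):

    import requests

    # A non-2xx status makes raise_for_status() throw HTTPError, which
    # fails the Airflow task so its retry policy can kick in.
    response = requests.get("https://httpbin.org/status/503")
    response.raise_for_status()  # raises requests.exceptions.HTTPError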

                response.raise_for_status()
                data = response.json()

                # Break the loop if there are no more features
                if "features" not in data or not data["features"]:
                    break

                # Append the retrieved features
                all_features.extend(data["features"])

                # Increment the offset
                offset += params["resultRecordCount"]

            if all_features is None or len(all_features) == 0:
                logging.info(
                    f"There is no data to download for {self.product}. Ending pipeline."
                )

                pass
            else:
                logging.info(
                    f"Downloaded {self.product} data with {len(all_features)} rows!"
                )

                return all_features

        except requests.exceptions.RequestException as e:
            logging.info(f"An error occurred: {e}")

            raise


# Function to convert coordinates to WKT format
def to_wkt(geometry_type, coordinates):
    if geometry_type == "LineString":
        # Format as a LineString
        coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates])
        return f"LINESTRING({coords_str})"
    elif geometry_type == "MultiLineString":
        # Format as a MultiLineString
        multiline_coords_str = ", ".join(
            f"({', '.join([f'{lng} {lat}' for lng, lat in line])})"
            for line in coordinates
        )
        return f"MULTILINESTRING({multiline_coords_str})"
    else:
        return None


class JSONExtract(StateGeoportalAPIExtract):
    bucket = API_BUCKET


class StateGeoportalAPIOperator(BaseOperator):
    template_fields = (
        "product",
        "root_url",
        "service",
        "layer",
        "resultRecordCount",
    )

    def __init__(
        self,
        product,
        root_url,
        service,
        layer,
        resultRecordCount,
        **kwargs,
    ):
        """An operator that extracts JSON data from the State Geoportal and
        saves it as one JSONL file, hive-partitioned by date, in Google
        Cloud Storage."""

        self.product = product
        self.root_url = root_url
        self.service = service
        self.layer = layer
        self.resultRecordCount = resultRecordCount

        # Save JSONL files to the bucket
        self.extract = JSONExtract(
            root_url=self.root_url,
            service=self.service,
            product=f"{self.product}_geodata",
            layer=self.layer,
            resultRecordCount=self.resultRecordCount,
            filename=f"{self.product}_geodata.jsonl.gz",
        )

        super().__init__(**kwargs)

    def execute(self, **kwargs):
        api_content = self.extract.fetch_from_state_geoportal()

        df = pd.json_normalize(api_content)

        if self.product == "state_highway_network":
            # Select columns to keep; we have to be explicit before renaming
            # because there are duplicate values after normalizing.
            df = df[
                [
                    "properties.Route",
                    "properties.County",
                    "properties.District",
                    "properties.RouteType",
                    "properties.Direction",
                    "geometry.type",
                    "geometry.coordinates",
                ]
            ]

            # Dynamically create a mapping by removing known prefixes
            columns = {col: col.split(".")[-1] for col in df.columns}

            # Rename columns using the dynamically created mapping
            df = df.rename(columns=columns)

            # Create new column with WKT format
            df["wkt_coordinates"] = df.apply(
                lambda row: to_wkt(row["type"], row["coordinates"]), axis=1
            )

        # Compress the DataFrame content and save it
        self.gzipped_content = gzip.compress(
            df.to_json(orient="records", lines=True).encode()
        )
        self.extract.save_content(fs=get_fs(), content=self.gzipped_content)
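A quick illustration of what to_wkt (defined above) produces, with invented coordinates:

    # Invented sample geometries, GeoJSON-style [lng, lat] pairs:
    print(to_wkt("LineString", [[-121.5, 38.6], [-121.4, 38.5]]))
    # -> LINESTRING(-121.5 38.6, -121.4 38.5)

    print(to_wkt("MultiLineString", [[[-121.5, 38.6], [-121.4, 38.5]],
                                     [[-120.0, 37.0], [-119.9, 37.1]]]))
    # -> MULTILINESTRING((-121.5 38.6, -121.4 38.5), (-120.0 37.0, -119.9 37.1))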
@@ -4,6 +4,49 @@ dim_stops_latest AS (
        table_name = ref('dim_stops'),
        clean_table_name = 'dim_stops'
    ) }}
),

stg_state_geoportal__state_highway_network_stops AS (
    SELECT *
    FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }}
),

buffer_geometry_table AS (
    SELECT
        -- 30.48 meters is equal to 100ft, as requested by Uriel
        ST_BUFFER(wkt_coordinates, 30.48) AS buffer_geometry
    FROM stg_state_geoportal__state_highway_network_stops
),

current_stops AS (
    SELECT
        pt_geom,
        _gtfs_key
    FROM dim_stops_latest
),

stops_on_shn AS (
    SELECT
        current_stops.*
    FROM buffer_geometry_table, current_stops
    WHERE ST_DWITHIN(
        buffer_geometry_table.buffer_geometry, current_stops.pt_geom, 0)
),

dim_stops_latest_with_shn_boolean AS (
    SELECT
        dim_stops_latest.*,
        IF(stops_on_shn._gtfs_key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest
    FROM
        dim_stops_latest
    LEFT JOIN
        stops_on_shn
    ON
        dim_stops_latest._gtfs_key = stops_on_shn._gtfs_key
)

-SELECT * FROM dim_stops_latest
+SELECT * FROM dim_stops_latest_with_shn_boolean
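The 30.48 buffer radius is just 100 ft converted to meters (BigQuery geography distances are measured in meters), and ST_DWITHIN with a distance of 0 amounts to testing intersection with the buffered highway lines. A trivial unit check in Python:

    # 100 ft expressed in meters, the buffer radius used above.
    FEET_TO_METERS = 0.3048
    print(100 * FEET_TO_METERS)  # 30.48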
9 changes: 9 additions & 0 deletions warehouse/models/staging/state_geoportal/_src.yml
@@ -0,0 +1,9 @@
version: 2

sources:
  - name: external_state_geoportal
    description: Data tables scraped from state geoportal.
    database: "{{ env_var('DBT_SOURCE_DATABASE', var('SOURCE_DATABASE')) }}"
    schema: external_state_geoportal
    tables:
      - name: state_highway_network
@@ -0,0 +1,4 @@
version: 2

models:
  - name: stg_state_geoportal__state_highway_network_stops
@@ -0,0 +1,14 @@
WITH external_state_geoportal__state_highway_network AS (
    SELECT *
    FROM {{ source('external_state_geoportal', 'state_highway_network') }}
),

stg_state_geoportal__state_highway_network_stops AS (
    SELECT *
    FROM external_state_geoportal__state_highway_network
    -- we pull the whole table every month in the pipeline, so this gets only the latest extract
    QUALIFY DENSE_RANK() OVER (ORDER BY execution_ts DESC) = 1
)

SELECT * FROM stg_state_geoportal__state_highway_network_stops
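For intuition, the same keep-only-the-latest-extract filter sketched in pandas, with invented toy data:

    import pandas as pd

    # Two monthly extracts; DENSE_RANK() ... = 1 keeps only the newest batch.
    df = pd.DataFrame({
        "Route": [5, 99, 5, 99],
        "execution_ts": pd.to_datetime(
            ["2024-10-01", "2024-10-01", "2024-11-01", "2024-11-01"]
        ),
    })
    latest = df[df["execution_ts"] == df["execution_ts"].max()]
    print(latest)  # only the 2024-11-01 rows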