Port Covid Recovery Dash generation code to ingestor #112

Open · wants to merge 5 commits into main
7 changes: 6 additions & 1 deletion ingestor/.chalice/config.json
@@ -110,8 +110,13 @@
"update_time_predictions": {
"iam_policy_file": "policy-time-predictions.json",
"lambda_timeout": 300
},
"update_service_ridership_dashboard": {
"iam_policy_file": "policy-service-ridership-dashboard.json",
"lambda_timeout": 900,
"lambda_memory_size": 1024
}
}
}
}
}
}
34 changes: 34 additions & 0 deletions ingestor/.chalice/policy-service-ridership-dashboard.json
@@ -0,0 +1,34 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "arn:*:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::tm-gtfs",
"arn:aws:s3:::tm-gtfs/*",
"arn:aws:s3:::tm-service-ridership-dashboard",
"arn:aws:s3:::tm-service-ridership-dashboard/*"
]
},
{
"Effect": "Allow",
"Action": [
"dynamodb:Query"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:473352343756:table/Ridership",
"arn:aws:dynamodb:us-east-1:473352343756:table/ScheduledServiceDaily"
]
}
]
}
7 changes: 7 additions & 0 deletions ingestor/app.py
@@ -14,6 +14,7 @@
predictions,
landing,
trip_metrics,
service_ridership_dashboard,
)

app = Chalice(app_name="ingestor")
@@ -148,3 +149,9 @@ def store_landing_data(event):
ridership_data = landing.get_ridership_data()
landing.upload_to_s3(json.dumps(trip_metrics_data), json.dumps(ridership_data))
landing.clear_cache()


# 9:00 UTC -> 4:30/5:30am ET every day (after GTFS and ridership have bene ingested)
Member

Suggested change
- # 9:00 UTC -> 4:30/5:30am ET every day (after GTFS and ridership have bene ingested)
+ # 9:00 UTC -> 4:30/5:30am ET every day (after GTFS and ridership have been ingested)

@app.schedule(Cron(30, 9, "*", "*", "?", "*"))
def update_service_ridership_dashboard():
service_ridership_dashboard.create_service_ridership_dash_json()
18 changes: 15 additions & 3 deletions ingestor/chalicelib/gtfs/ingest.py
@@ -18,7 +18,7 @@
date_range,
index_by,
is_valid_route_id,
get_services_for_date,
get_service_ids_for_date_to_has_exceptions,
get_total_service_minutes,
)
from .models import SessionModels, RouteDateTotals
@@ -50,29 +50,39 @@ def create_gl_route_date_totals(totals: List[RouteDateTotals]):
total_by_hour[i] += total.by_hour[i]
total_count = sum(t.count for t in gl_totals)
total_service_minutes = sum(t.service_minutes for t in gl_totals)
has_service_exceptions = any((t.has_service_exceptions for t in gl_totals))
return RouteDateTotals(
route_id="Green",
line_id="Green",
date=totals[0].date,
count=total_count,
service_minutes=total_service_minutes,
by_hour=total_by_hour,
has_service_exceptions=has_service_exceptions,
)


def create_route_date_totals(today: date, models: SessionModels):
all_totals = []
services_for_today = get_services_for_date(models, today)
service_ids_and_exception_status_for_today = get_service_ids_for_date_to_has_exceptions(models, today)
for route_id, route in models.routes.items():
if not is_valid_route_id(route_id):
continue
trips = [trip for trip in models.trips_by_route_id.get(route_id, []) if trip.service_id in services_for_today]
trips = [
trip
for trip in models.trips_by_route_id.get(route_id, [])
if trip.service_id in service_ids_and_exception_status_for_today.keys()
]
has_service_exceptions = any(
(service_ids_and_exception_status_for_today.get(trip.service_id, False) for trip in trips)
)
totals = RouteDateTotals(
route_id=route_id,
line_id=route.line_id,
date=today,
count=len(trips),
by_hour=bucket_trips_by_hour(trips),
has_service_exceptions=has_service_exceptions,
service_minutes=get_total_service_minutes(trips),
)
all_totals.append(totals)
@@ -99,6 +109,7 @@ def ingest_feed_to_dynamo(
"lineId": total.line_id,
"count": total.count,
"serviceMinutes": total.service_minutes,
"hasServiceExceptions": total.has_service_exceptions,
Contributor Author

The "recovery dash" code needs to know, in general, whether the service levels on a given day include any service exceptions (usually additions/removals for holidays). It was easiest just to add this as a column to the ScheduledServiceDaily table — the migration has already run.

"byHour": {"totals": total.by_hour},
}
batch.put_item(Item=item)
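
To make the new column concrete: below is a minimal, hypothetical sketch (not part of this PR) of how a downstream consumer such as the dashboard generator might read hasServiceExceptions back out of ScheduledServiceDaily. The routeId/date key names are assumptions; the table's actual key schema is not shown in this diff.

```python
# Hypothetical reader for the new column (not part of this PR).
# Assumes ScheduledServiceDaily is keyed by routeId (partition) and date (sort);
# pagination is omitted for brevity.
import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table("ScheduledServiceDaily")


def dates_with_exceptions(route_id: str, start: str, end: str) -> list[str]:
    response = table.query(
        KeyConditionExpression=Key("routeId").eq(route_id) & Key("date").between(start, end)
    )
    # hasServiceExceptions marks days whose scheduled service included GTFS calendar exceptions
    return [item["date"] for item in response["Items"] if item.get("hasServiceExceptions")]
```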
@@ -112,6 +123,7 @@ def ingest_feeds(
force_rebuild_feeds: bool = False,
):
for feed in feeds:
feed.use_compact_only()
Member

Is this okay for all the s3 uploads? This is what mbta-performance will use every half hour

try:
if force_rebuild_feeds:
print(f"[{feed.key}] Forcing rebuild locally")
1 change: 1 addition & 0 deletions ingestor/chalicelib/gtfs/models.py
@@ -27,6 +27,7 @@ class RouteDateTotals:
count: int
service_minutes: int
by_hour: List[int]
has_service_exceptions: bool

@property
def timestamp(self):
18 changes: 10 additions & 8 deletions ingestor/chalicelib/gtfs/utils.py
@@ -64,8 +64,12 @@ def date_range(start_date: date, end_date: date):
now = now + timedelta(days=1)


def get_services_for_date(models: "SessionModels", today: date):
services_for_today = set()
def get_service_ids_for_date_to_has_exceptions(models: "SessionModels", today: date) -> dict[str, bool]:
"""
Reports a dict of service IDs that are active on the given date mapped to a boolean indicating if
there are any exceptions for that service on that date.
"""
services_for_today: dict[str, bool] = {}
for service_id in models.calendar_services.keys():
service = models.calendar_services.get(service_id)
if not service:
@@ -81,15 +85,13 @@ def get_services_for_date(models: "SessionModels", today: date):
service.saturday,
service.sunday,
][today.weekday()] == ServiceDayAvailability.AVAILABLE
service_exceptions_today = [ex for ex in service_exceptions if ex.date == today]
is_removed_by_exception = any(
(
ex.date == today and ex.exception_type == CalendarServiceExceptionType.REMOVED
for ex in service_exceptions
)
(ex.exception_type == CalendarServiceExceptionType.REMOVED for ex in service_exceptions_today)
)
is_added_by_exception = any(
(ex.date == today and ex.exception_type == CalendarServiceExceptionType.ADDED for ex in service_exceptions)
(ex.exception_type == CalendarServiceExceptionType.ADDED for ex in service_exceptions_today)
)
if is_added_by_exception or (in_range and on_sevice_day and not is_removed_by_exception):
services_for_today.add(service_id)
services_for_today[service_id] = len(service_exceptions_today) > 0
return services_for_today
@@ -0,0 +1 @@
dash.json
3 changes: 3 additions & 0 deletions ingestor/chalicelib/service_ridership_dashboard/__init__.py
@@ -0,0 +1,3 @@
from .ingest import create_service_ridership_dash_json

__all__ = ["create_service_ridership_dash_json"]
23 changes: 23 additions & 0 deletions ingestor/chalicelib/service_ridership_dashboard/config.py
@@ -0,0 +1,23 @@
from datetime import date
from pytz import timezone

# Lower bound for time series and GTFS feeds
PRE_COVID_DATE = date(2020, 2, 24)

# Date to use as a baseline
START_DATE = date(2018, 1, 1)

# Boston baby
TIME_ZONE = timezone("US/Eastern")

# Ignore these
IGNORE_LINE_IDS = ["line-CapeFlyer", "line-Foxboro"]

# Date ranges with service gaps that we paper over because of major holidays or catastrophes
# rather than doing more complicated special-casing with GTFS services
FILL_DATE_RANGES = [
(date(2021, 11, 19), date(2021, 11, 26)), # Thanksgiving 2021
(date(2021, 12, 18), date(2021, 12, 26)), # Christmas 2021
(date(2022, 12, 18), date(2023, 1, 3)), # Christmas 2022
(date(2022, 3, 28), date(2022, 3, 29)), # Haymarket garage collapse
]
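
As an illustration of what "papering over" a gap could mean in practice (the real fill logic lives elsewhere in this package and is not shown in this diff), a simple carry-forward fill over FILL_DATE_RANGES might look like this:

```python
# Illustrative sketch only; not the implementation used by this PR.
from datetime import date, timedelta


def fill_gaps(series: dict[date, float], fill_ranges: list[tuple[date, date]]) -> dict[date, float]:
    """Carry the last value before each gap forward across the fill range."""
    filled = dict(series)
    for start, end in fill_ranges:
        carry = filled.get(start - timedelta(days=1))
        if carry is None:
            continue  # no pre-gap value to carry forward
        day = start
        while day <= end:
            filled.setdefault(day, carry)  # keep real data points where they exist
            day += timedelta(days=1)
    return filled
```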
35 changes: 35 additions & 0 deletions ingestor/chalicelib/service_ridership_dashboard/gtfs.py
@@ -0,0 +1,35 @@
import boto3
from typing import Optional
from tempfile import TemporaryDirectory
from mbta_gtfs_sqlite import MbtaGtfsArchive
from mbta_gtfs_sqlite.models import Route, Line

from ..gtfs.utils import bucket_by, index_by

from .config import IGNORE_LINE_IDS

RoutesByLine = dict[Line, Route]


def get_routes_by_line(include_only_line_ids: Optional[list[str]]) -> dict[Line, Route]:
s3 = boto3.resource("s3")
archive = MbtaGtfsArchive(
local_archive_path=TemporaryDirectory().name,
s3_bucket=s3.Bucket("tm-gtfs"),
)
feed = archive.get_latest_feed()
feed.use_compact_only()
feed.download_or_build()
session = feed.create_sqlite_session(compact=True)
lines_by_id = index_by(
session.query(Line).all(),
lambda line: line.line_id,
)
all_routes_with_line_ids = [
route
for route in session.query(Route).all()
if route.line_id
and route.line_id not in IGNORE_LINE_IDS
and (not include_only_line_ids or route.line_id in include_only_line_ids)
]
return bucket_by(all_routes_with_line_ids, lambda route: lines_by_id[route.line_id])
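
For reference, a brief hypothetical usage of get_routes_by_line; this assumes bucket_by groups the routes into a list per Line and that Route exposes a route_id attribute, neither of which is shown in this diff:

```python
# Hypothetical usage (assumptions noted above).
routes_by_line = get_routes_by_line(include_only_line_ids=["line-Red", "line-Orange"])
for line, routes in routes_by_line.items():
    print(line.line_id, [route.route_id for route in routes])
```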