Port Covid Recovery Dash generation code to ingestor #112
New file: IAM policy document

@@ -0,0 +1,34 @@
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Effect": "Allow",
            "Resource": "arn:*:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::tm-gtfs",
                "arn:aws:s3:::tm-gtfs/*",
                "arn:aws:s3:::tm-service-ridership-dashboard",
                "arn:aws:s3:::tm-service-ridership-dashboard/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:473352343756:table/Ridership",
                "arn:aws:dynamodb:us-east-1:473352343756:table/ScheduledServiceDaily"
            ]
        }
    ]
}
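The dynamodb:Query grant above is what lets the dashboard code read back the Ridership and ScheduledServiceDaily tables. A minimal sketch of such a query follows; the key schema (a lineId partition key with a date sort key) is an assumption and is not shown anywhere in this diff.

import boto3
from boto3.dynamodb.conditions import Key

# Assumed key schema: partition key "lineId", sort key "date" (not shown in this PR).
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
table = dynamodb.Table("ScheduledServiceDaily")

response = table.query(
    KeyConditionExpression=Key("lineId").eq("line-Red") & Key("date").between("2020-02-24", "2020-03-24")
)
daily_totals = response["Items"]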
Changed file: GTFS ingest module

@@ -18,7 +18,7 @@
     date_range,
     index_by,
     is_valid_route_id,
-    get_services_for_date,
+    get_service_ids_for_date_to_has_exceptions,
     get_total_service_minutes,
 )
 from .models import SessionModels, RouteDateTotals
@@ -50,29 +50,39 @@ def create_gl_route_date_totals(totals: List[RouteDateTotals]):
             total_by_hour[i] += total.by_hour[i]
     total_count = sum(t.count for t in gl_totals)
     total_service_minutes = sum(t.service_minutes for t in gl_totals)
+    has_service_exceptions = any((t.has_service_exceptions for t in gl_totals))
     return RouteDateTotals(
         route_id="Green",
         line_id="Green",
         date=totals[0].date,
         count=total_count,
         service_minutes=total_service_minutes,
         by_hour=total_by_hour,
+        has_service_exceptions=has_service_exceptions,
     )


 def create_route_date_totals(today: date, models: SessionModels):
     all_totals = []
-    services_for_today = get_services_for_date(models, today)
+    service_ids_and_exception_status_for_today = get_service_ids_for_date_to_has_exceptions(models, today)
     for route_id, route in models.routes.items():
         if not is_valid_route_id(route_id):
             continue
-        trips = [trip for trip in models.trips_by_route_id.get(route_id, []) if trip.service_id in services_for_today]
+        trips = [
+            trip
+            for trip in models.trips_by_route_id.get(route_id, [])
+            if trip.service_id in service_ids_and_exception_status_for_today.keys()
+        ]
+        has_service_exceptions = any(
+            (service_ids_and_exception_status_for_today.get(trip.service_id, False) for trip in trips)
+        )
         totals = RouteDateTotals(
             route_id=route_id,
             line_id=route.line_id,
             date=today,
             count=len(trips),
             by_hour=bucket_trips_by_hour(trips),
+            has_service_exceptions=has_service_exceptions,
             service_minutes=get_total_service_minutes(trips),
         )
         all_totals.append(totals)
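To make the new helper's contract concrete, here is an illustrative sketch of the mapping that get_service_ids_for_date_to_has_exceptions appears to return: the service IDs active on the given date, each mapped to whether that service is affected by a calendar exception. The service IDs and trips below are made up for illustration.

from types import SimpleNamespace

# Illustrative only; real service IDs and trips come from the GTFS feed.
service_ids_and_exception_status_for_today = {
    "FallWeekday": False,        # regular calendar service, no exception
    "ThanksgivingExtra": True,   # service added or altered by a calendar exception
}
trips = [SimpleNamespace(service_id="FallWeekday"), SimpleNamespace(service_id="ThanksgivingExtra")]

# A route is flagged when any of its trips runs on a service with an exception.
has_service_exceptions = any(
    service_ids_and_exception_status_for_today.get(trip.service_id, False) for trip in trips
)
assert has_service_exceptions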
@@ -99,6 +109,7 @@ def ingest_feed_to_dynamo(
             "lineId": total.line_id,
             "count": total.count,
             "serviceMinutes": total.service_minutes,
+            "hasServiceExceptions": total.has_service_exceptions,
             "byHour": {"totals": total.by_hour},
         }
         batch.put_item(Item=item)

Review comment on the added "hasServiceExceptions" key: The "recovery dash" code needs to know, in general, whether the service levels on a given day include any service exceptions (usually additions/removals for holidays). It was easiest just to add this as a column to the …
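For reference, the full DynamoDB item written by batch.put_item above would look roughly like the sketch below. The values are illustrative, and only the keys visible in this hunk are taken from the PR; anything else is a guess.

# Illustrative item; only lineId, count, serviceMinutes, hasServiceExceptions,
# and byHour appear in the hunk above.
item = {
    "lineId": "line-Red",
    "count": 212,                    # trips scheduled that day
    "serviceMinutes": 10140,         # total scheduled service minutes
    "hasServiceExceptions": True,    # new in this PR: a calendar exception was active
    "byHour": {"totals": [0] * 24},  # hourly trip-count buckets (length assumed to be 24)
}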
@@ -112,6 +123,7 @@ def ingest_feeds(
     force_rebuild_feeds: bool = False,
 ):
     for feed in feeds:
+        feed.use_compact_only()
         try:
             if force_rebuild_feeds:
                 print(f"[{feed.key}] Forcing rebuild locally")

Review comment on the added feed.use_compact_only() call: Is this okay for all the s3 uploads? This is what mbta-performance will use every half hour.
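The compact-only path relies on calls that also appear later in this PR (see get_routes_by_line below); a minimal sketch of the feed lifecycle, assembled only from those calls, is shown here. Exactly what use_compact_only() skips is an assumption, and whether the compact build satisfies every consumer of the S3 uploads (e.g. mbta-performance) is the reviewer's open question.

import boto3
from tempfile import TemporaryDirectory
from mbta_gtfs_sqlite import MbtaGtfsArchive

s3 = boto3.resource("s3")
archive = MbtaGtfsArchive(
    local_archive_path=TemporaryDirectory().name,
    s3_bucket=s3.Bucket("tm-gtfs"),
)
feed = archive.get_latest_feed()
feed.use_compact_only()    # restrict to the compact build (the call added in this PR)
feed.download_or_build()   # reuse the S3 copy if present, otherwise build locally
session = feed.create_sqlite_session(compact=True)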
New file (one line; likely an ignore entry for the generated dashboard JSON)

@@ -0,0 +1 @@
dash.json
New file: package __init__ exporting the public entry point

@@ -0,0 +1,3 @@
from .ingest import create_service_ridership_dash_json

__all__ = ["create_service_ridership_dash_json"]
New file: dashboard configuration constants

@@ -0,0 +1,23 @@
from datetime import date
from pytz import timezone

# Lower bound for time series and GTFS feeds
PRE_COVID_DATE = date(2020, 2, 24)

# Date to use as a baseline
START_DATE = date(2018, 1, 1)

# Boston baby
TIME_ZONE = timezone("US/Eastern")

# Ignore these
IGNORE_LINE_IDS = ["line-CapeFlyer", "line-Foxboro"]

# Date ranges with service gaps that we paper over because of major holidays or catastrophes
# rather than doing more complicated special-casing with GTFS services
FILL_DATE_RANGES = [
    (date(2021, 11, 19), date(2021, 11, 26)),  # Thanksgiving 2021
    (date(2021, 12, 18), date(2021, 12, 26)),  # Christmas 2021
    (date(2022, 12, 18), date(2023, 1, 3)),  # Christmas 2022
    (date(2022, 3, 28), date(2022, 3, 29)),  # Haymarket garage collapse
]
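This PR does not show where FILL_DATE_RANGES is consumed, but a hypothetical helper like the one below illustrates the intent: a date falling inside one of these ranges is treated as covered rather than as a genuine service gap.

from datetime import date

# Hypothetical helper (not part of this PR); assumes the FILL_DATE_RANGES constant above.
def is_filled_date(day: date) -> bool:
    return any(start <= day <= end for start, end in FILL_DATE_RANGES)

# Thanksgiving 2021 falls inside a fill range; the following Saturday does not.
assert is_filled_date(date(2021, 11, 25))
assert not is_filled_date(date(2021, 11, 27))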
New file: route and line loading helpers

@@ -0,0 +1,35 @@
import boto3
from typing import Optional
from tempfile import TemporaryDirectory
from mbta_gtfs_sqlite import MbtaGtfsArchive
from mbta_gtfs_sqlite.models import Route, Line

from ..gtfs.utils import bucket_by, index_by

from .config import IGNORE_LINE_IDS

RoutesByLine = dict[Line, list[Route]]


def get_routes_by_line(include_only_line_ids: Optional[list[str]]) -> RoutesByLine:
    s3 = boto3.resource("s3")
    archive = MbtaGtfsArchive(
        local_archive_path=TemporaryDirectory().name,
        s3_bucket=s3.Bucket("tm-gtfs"),
    )
    feed = archive.get_latest_feed()
    feed.use_compact_only()
    feed.download_or_build()
    session = feed.create_sqlite_session(compact=True)
    lines_by_id = index_by(
        session.query(Line).all(),
        lambda line: line.line_id,
    )
    all_routes_with_line_ids = [
        route
        for route in session.query(Route).all()
        if route.line_id
        and route.line_id not in IGNORE_LINE_IDS
        and (not include_only_line_ids or route.line_id in include_only_line_ids)
    ]
    return bucket_by(all_routes_with_line_ids, lambda route: lines_by_id[route.line_id])
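A usage sketch for the helper above. The exact semantics of index_by and bucket_by (from ..gtfs.utils) are assumed here: index_by maps a unique key to a single item, while bucket_by groups items into a list per key, so the return value is a dict of Line objects to lists of their Routes. The line IDs below are examples only.

# Hypothetical usage of get_routes_by_line.
routes_by_line = get_routes_by_line(include_only_line_ids=["line-Red", "line-Green"])
for line, routes in routes_by_line.items():
    print(line.line_id, [route.route_id for route in routes])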