
Commit abf1c4c
Analyst sees fewer ZLIB errors
* This commit resolves a race condition where all parse_and_validate calls shared the same temporary directory
* That contention meant that processes would overwrite the existing GTFS schedule with the same name
* This also resulted in an elevated number of skipped protobuf validations
* The gtfs-realtime-validator skips protobufs with the same MD5
* The race condition caused elevated MD5 collisions for protobufs

Signed-off-by: Doc Ritezel <[email protected]>
1 parent: 9ade2c7
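
The bullet points describe a classic shared-tmpdir race. As a miniature illustration (the fetch_schedule helper and the schedule.zip name below are hypothetical, not code from this repository): when every worker writes into one shared temporary directory, same-named downloads clobber each other, while giving each call its own tempfile.TemporaryDirectory(), as this commit does inside parse_and_validate, removes the shared path entirely.

    import tempfile
    from concurrent.futures import ThreadPoolExecutor
    from pathlib import Path


    def fetch_schedule(dst_dir: str, payload: bytes) -> Path:
        # Hypothetical stand-in for download_gtfs_schedule_zip: every call
        # writes to <dst_dir>/schedule.zip, so sharing dst_dir means the last
        # writer wins and a concurrent reader can see a half-written zip.
        out = Path(dst_dir) / "schedule.zip"
        out.write_bytes(payload)
        return out


    def racy(payloads: list) -> None:
        # Before this commit: one tmp_dir shared by every parse_and_validate
        # call, so all workers race on the same schedule.zip path.
        with tempfile.TemporaryDirectory() as tmp_dir:
            with ThreadPoolExecutor(max_workers=4) as pool:
                list(pool.map(lambda p: fetch_schedule(tmp_dir, p), payloads))


    def isolated(payloads: list) -> None:
        # After this commit: each call opens its own temporary directory, so
        # identically named files can no longer collide across workers.
        def work(payload: bytes) -> None:
            with tempfile.TemporaryDirectory() as tmp_dir:
                fetch_schedule(tmp_dir, payload)

        with ThreadPoolExecutor(max_workers=4) as pool:
            list(pool.map(work, payloads))

In racy(), two payloads can interleave writes to the same schedule.zip, which is how the identical MD5s and truncated files described above arise; in isolated(), each worker's files live under a distinct directory that is cleaned up when the call returns.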

File tree: 2 files changed, +143 −146 lines

jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py

Lines changed: 142 additions & 144 deletions
@@ -596,142 +596,142 @@ def parse_and_upload(
 def parse_and_validate(
     hour: RTHourlyAggregation,
     jar_path: Path,
-    tmp_dir: str,
     verbose: bool = False,
     pbar=None,
 ) -> List[RTFileProcessingOutcome]:
-    with sentry_sdk.push_scope() as scope:
-        scope.set_tag("config_feed_type", hour.first_extract.config.feed_type)
-        scope.set_tag("config_name", hour.first_extract.config.name)
-        scope.set_tag("config_url", hour.first_extract.config.url)
-        scope.set_context("RT Hourly Aggregation", json.loads(hour.json()))
-
-        fs = get_fs()
-        dst_path_rt = f"{tmp_dir}/rt_{hour.name_hash}/"
-        get_with_retry(
-            fs,
-            rpath=[
-                extract.path
-                for extract in hour.local_paths_to_extract(dst_path_rt).values()
-            ],
-            lpath=list(hour.local_paths_to_extract(dst_path_rt).keys()),
-        )
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        with sentry_sdk.push_scope() as scope:
+            scope.set_tag("config_feed_type", hour.first_extract.config.feed_type)
+            scope.set_tag("config_name", hour.first_extract.config.name)
+            scope.set_tag("config_url", hour.first_extract.config.url)
+            scope.set_context("RT Hourly Aggregation", json.loads(hour.json()))
+
+            fs = get_fs()
+            dst_path_rt = f"{tmp_dir}/rt_{hour.name_hash}/"
+            get_with_retry(
+                fs,
+                rpath=[
+                    extract.path
+                    for extract in hour.local_paths_to_extract(dst_path_rt).values()
+                ],
+                lpath=list(hour.local_paths_to_extract(dst_path_rt).keys()),
+            )
 
-        if hour.step == RTProcessingStep.validate:
-            if not hour.extracts[0].config.schedule_url_for_validation:
-                return [
-                    RTFileProcessingOutcome(
-                        step=hour.step,
-                        success=False,
-                        extract=extract,
-                        exception=NoScheduleDataSpecified(),
+            if hour.step == RTProcessingStep.validate:
+                if not hour.extracts[0].config.schedule_url_for_validation:
+                    return [
+                        RTFileProcessingOutcome(
+                            step=hour.step,
+                            success=False,
+                            extract=extract,
+                            exception=NoScheduleDataSpecified(),
+                        )
+                        for extract in hour.extracts
+                    ]
+
+                try:
+                    first_extract = hour.extracts[0]
+                    extract_day = first_extract.dt
+                    for target_date in reversed(
+                        list(extract_day - extract_day.subtract(days=7))
+                    ):  # Fall back to most recent available schedule within 7 days
+                        try:
+                            schedule_extract = get_schedule_extracts_for_day(
+                                target_date
+                            )[first_extract.config.base64_validation_url]
+
+                            scope.set_context(
+                                "Schedule Extract", json.loads(schedule_extract.json())
+                            )
+
+                            gtfs_zip = download_gtfs_schedule_zip(
+                                fs,
+                                schedule_extract=schedule_extract,
+                                dst_dir=tmp_dir,
+                                pbar=pbar,
+                            )
+
+                            break
+                        except (KeyError, FileNotFoundError):
+                            print(
+                                f"no schedule data found for {first_extract.path} and day {target_date}"
+                            )
+                    else:
+                        raise ScheduleDataNotFound(
+                            f"no recent schedule data found for {first_extract.path}"
+                        )
+
+                    return validate_and_upload(
+                        fs=fs,
+                        jar_path=jar_path,
+                        dst_path_rt=dst_path_rt,
+                        tmp_dir=tmp_dir,
+                        hour=hour,
+                        gtfs_zip=gtfs_zip,
+                        verbose=verbose,
+                        pbar=pbar,
                     )
-                    for extract in hour.extracts
-                ]
 
-            try:
-                first_extract = hour.extracts[0]
-                extract_day = first_extract.dt
-                for target_date in reversed(
-                    list(extract_day - extract_day.subtract(days=7))
-                ):  # Fall back to most recent available schedule within 7 days
-                    try:
-                        schedule_extract = get_schedule_extracts_for_day(target_date)[
-                            first_extract.config.base64_validation_url
-                        ]
+                # these are the only two types of errors we expect; let any others bubble up
+                except (ScheduleDataNotFound, subprocess.CalledProcessError) as e:
+                    stderr = None
+
+                    fingerprint: List[Any] = [
+                        type(e),
+                        # convert back to url manually, I don't want to mess around with the hourly class
+                        base64.urlsafe_b64decode(hour.base64_url.encode()).decode(),
+                    ]
+                    if isinstance(e, subprocess.CalledProcessError):
+                        fingerprint.append(e.returncode)
+                        stderr = e.stderr.decode("utf-8")
 
+                        # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above
                         scope.set_context(
-                            "Schedule Extract", json.loads(schedule_extract.json())
+                            "Process", {"stderr": e.stderr.decode("utf-8")[-2000:]}
                         )
 
-                        gtfs_zip = download_gtfs_schedule_zip(
-                            fs,
-                            schedule_extract=schedule_extract,
-                            dst_dir=tmp_dir,
+                        # we could also use a custom exception for this
+                        if "Unexpected end of ZLIB input stream" in stderr:
+                            fingerprint.append("Unexpected end of ZLIB input stream")
+
+                    scope.fingerprint = fingerprint
+                    sentry_sdk.capture_exception(e, scope=scope)
+
+                    if verbose:
+                        log(
+                            f"{str(e)} thrown for {hour.path}",
+                            fg=typer.colors.RED,
                             pbar=pbar,
                         )
-
-                        break
-                    except (KeyError, FileNotFoundError):
-                        print(
-                            f"no schedule data found for {first_extract.path} and day {target_date}"
+                        if isinstance(e, subprocess.CalledProcessError):
+                            log(
+                                e.stderr.decode("utf-8"),
+                                fg=typer.colors.YELLOW,
+                                pbar=pbar,
+                            )
+
+                    return [
+                        RTFileProcessingOutcome(
+                            step=hour.step,
+                            success=False,
+                            extract=extract,
+                            exception=e,
+                            process_stderr=stderr,
                         )
-                else:
-                    raise ScheduleDataNotFound(
-                        f"no recent schedule data found for {first_extract.path}"
-                    )
+                        for extract in hour.extracts
+                    ]
 
-                return validate_and_upload(
+            if hour.step == RTProcessingStep.parse:
+                return parse_and_upload(
                     fs=fs,
-                    jar_path=jar_path,
                     dst_path_rt=dst_path_rt,
                     tmp_dir=tmp_dir,
                     hour=hour,
-                    gtfs_zip=gtfs_zip,
                     verbose=verbose,
                     pbar=pbar,
                 )
 
-            # these are the only two types of errors we expect; let any others bubble up
-            except (ScheduleDataNotFound, subprocess.CalledProcessError) as e:
-                stderr = None
-
-                fingerprint: List[Any] = [
-                    type(e),
-                    # convert back to url manually, I don't want to mess around with the hourly class
-                    base64.urlsafe_b64decode(hour.base64_url.encode()).decode(),
-                ]
-                if isinstance(e, subprocess.CalledProcessError):
-                    fingerprint.append(e.returncode)
-                    stderr = e.stderr.decode("utf-8")
-
-                    # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above
-                    scope.set_context(
-                        "Process", {"stderr": e.stderr.decode("utf-8")[-2000:]}
-                    )
-
-                    # we could also use a custom exception for this
-                    if "Unexpected end of ZLIB input stream" in stderr:
-                        fingerprint.append("Unexpected end of ZLIB input stream")
-
-                scope.fingerprint = fingerprint
-                sentry_sdk.capture_exception(e, scope=scope)
-
-                if verbose:
-                    log(
-                        f"{str(e)} thrown for {hour.path}",
-                        fg=typer.colors.RED,
-                        pbar=pbar,
-                    )
-                    if isinstance(e, subprocess.CalledProcessError):
-                        log(
-                            e.stderr.decode("utf-8"),
-                            fg=typer.colors.YELLOW,
-                            pbar=pbar,
-                        )
-
-                return [
-                    RTFileProcessingOutcome(
-                        step=hour.step,
-                        success=False,
-                        extract=extract,
-                        exception=e,
-                        process_stderr=stderr,
-                    )
-                    for extract in hour.extracts
-                ]
-
-        if hour.step == RTProcessingStep.parse:
-            return parse_and_upload(
-                fs=fs,
-                dst_path_rt=dst_path_rt,
-                tmp_dir=tmp_dir,
-                hour=hour,
-                verbose=verbose,
-                pbar=pbar,
-            )
-
-        raise RuntimeError("we should not be here")
+            raise RuntimeError("we should not be here")
 
 
 @app.command()
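
The except block retained in the hunk above is re-indented but otherwise unchanged, and it is what the commit title refers to: Sentry groups events by fingerprint, and appending the "Unexpected end of ZLIB input stream" marker puts truncated-schedule failures into their own bucket. A minimal sketch of that grouping idea, using the same sentry_sdk calls as the hunk but with a simplified fingerprint (the capture_grouped helper is hypothetical):

    import subprocess

    import sentry_sdk


    def capture_grouped(e: Exception, feed_url: str) -> None:
        # Group by exception type and feed URL instead of by stack trace, so
        # one noisy feed produces one Sentry issue rather than hundreds.
        fingerprint = [type(e).__name__, feed_url]
        if isinstance(e, subprocess.CalledProcessError):
            fingerprint.append(str(e.returncode))
            # Truncated zips surface as a ZLIB error in the validator's
            # stderr; give them their own group so they are easy to track.
            if "Unexpected end of ZLIB input stream" in e.stderr.decode("utf-8"):
                fingerprint.append("Unexpected end of ZLIB input stream")
        with sentry_sdk.push_scope() as scope:
            scope.fingerprint = fingerprint
            sentry_sdk.capture_exception(e)

With each parse_and_validate call now downloading into its own directory, fewer truncated schedules should reach the validator, so this ZLIB bucket should shrink, which is the analyst-visible improvement named in the commit title.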
@@ -835,37 +835,35 @@ def main(
     # gcfs does not seem to play nicely with multiprocessing right now, so use threads :(
     # https://github.com/fsspec/gcsfs/issues/379
 
-    with tempfile.TemporaryDirectory() as tmp_dir:
-        with ThreadPoolExecutor(max_workers=threads) as pool:
-            futures: Dict[Future, RTHourlyAggregation] = {
-                pool.submit(
-                    parse_and_validate,
-                    hour=hour,
-                    jar_path=jar_path,
-                    tmp_dir=tmp_dir,
-                    verbose=verbose,
+    with ThreadPoolExecutor(max_workers=threads) as pool:
+        futures: Dict[Future, RTHourlyAggregation] = {
+            pool.submit(
+                parse_and_validate,
+                hour=hour,
+                jar_path=jar_path,
+                verbose=verbose,
+                pbar=pbar,
+            ): hour
+            for hour in aggregations_to_process
+        }
+
+        for future in concurrent.futures.as_completed(futures):
+            hour = futures[future]
+            if pbar:
+                pbar.update(1)
+            try:
+                outcomes.extend(future.result())
+            except KeyboardInterrupt:
+                raise
+            except Exception as e:
+                log(
+                    f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {hour.path}\n{traceback.format_exc()}",
+                    err=True,
+                    fg=typer.colors.RED,
                     pbar=pbar,
-                ): hour
-                for hour in aggregations_to_process
-            }
-
-            for future in concurrent.futures.as_completed(futures):
-                hour = futures[future]
-                if pbar:
-                    pbar.update(1)
-                try:
-                    outcomes.extend(future.result())
-                except KeyboardInterrupt:
-                    raise
-                except Exception as e:
-                    log(
-                        f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {hour.path}\n{traceback.format_exc()}",
-                        err=True,
-                        fg=typer.colors.RED,
-                        pbar=pbar,
-                    )
-                    sentry_sdk.capture_exception(e)
-                    exceptions.append((e, hour.path, traceback.format_exc()))
+                )
+                sentry_sdk.capture_exception(e)
+                exceptions.append((e, hour.path, traceback.format_exc()))
 
     if pbar:
         del pbar
jobs/gtfs-rt-parser-v2/test_gtfs_rt_parser.py

Lines changed: 1 addition & 2 deletions
@@ -88,10 +88,9 @@ def test_no_vehicle_positions_for_date():
 
 
 def test_no_vehicle_positions_for_url():
-    base64url = "nope"
     result = runner.invoke(
         app,
-        ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", base64url],
+        ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", "nope"],
         catch_exceptions=False,
     )
     assert result.exit_code == 0