Commit

Provide a better message when GTFS-RT validator jar skips a file
Signed-off-by: Erika Pacheco <[email protected]>
ohrite committed Nov 26, 2024
1 parent adb083f commit a069890
Showing 3 changed files with 80 additions and 30 deletions.
54 changes: 27 additions & 27 deletions jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py
@@ -61,6 +61,18 @@ class InvalidMetadata(Exception):
     pass
 
 
+class NoScheduleDataSpecified(Exception):
+    pass
+
+
+class ScheduleDataNotFound(Exception):
+    pass
+
+
+class NoValidatorResults(Exception):
+    pass
+
+
 class RTProcessingStep(str, Enum):
     parse = "parse"
     validate = "validate"
@@ -77,14 +89,6 @@ class RTValidationMetadata(BaseModel):
     gtfs_validator_version: str
 
 
-class NoScheduleDataSpecified(Exception):
-    pass
-
-
-class ScheduleDataNotFound(Exception):
-    pass
-
-
 class RTHourlyAggregation(PartitionedGCSArtifact):
     partition_names: ClassVar[List[str]] = ["dt", "hour", "base64_url"]
     step: RTProcessingStep
@@ -277,7 +281,7 @@ def download(self, date: datetime.datetime) -> Optional[str]:
                     .get_url_schedule(self.base64_validation_url)
                 )
             except KeyError:
-                print(
+                typer.secho(
                     f"no schedule data found for {self.base64_validation_url} on day {day}"
                 )
                 continue
@@ -287,7 +291,7 @@ def download(self, date: datetime.datetime) -> Optional[str]:
                 self.fs.get(schedule_extract.path, gtfs_zip)
                 return gtfs_zip
             except FileNotFoundError:
-                print(
+                typer.secho(
                     f"no schedule file found for {self.base64_validation_url} on day {day}"
                 )
                 continue
@@ -346,17 +350,17 @@ def get_local_paths(self) -> Dict[str, GTFSRTFeedExtract]:
     def get_results_paths(self) -> Dict[str, GTFSRTFeedExtract]:
         return {e.get_results_path(): e.extract for e in self.get_extracts()}
 
-    def get_hashed_results(self):
+    def get_hashed_results(self) -> Dict[str, Any]:
         hashed = {}
         for e in self.get_extracts():
             if e.has_results():
-                hashed[e.hash()] = e.get_results()
+                hashed[e.hash().hex()] = e.get_results()
         return hashed
 
-    def get_hashes(self) -> Dict[bytes, List[GTFSRTFeedExtract]]:
-        hashed: Dict[bytes, List[GTFSRTFeedExtract]] = defaultdict(list)
+    def get_hashes(self) -> Dict[str, List[GTFSRTFeedExtract]]:
+        hashed: Dict[str, List[GTFSRTFeedExtract]] = defaultdict(list)
         for e in self.get_extracts():
-            hashed[e.hash()].append(e.extract)
+            hashed[e.hash().hex()].append(e.extract)
         return hashed
 
     def download(self):
@@ -507,7 +511,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
             e = ScheduleDataNotFound(
                 f"no recent schedule data found for {self.aggregation.extracts[0].path}"
             )
-            print(e)
+            typer.secho(e)
 
             scope.fingerprint = [
                 type(e),
@@ -571,11 +575,11 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
         for hash, extracts in aggregation_extracts.get_hashes().items():
             try:
                 records = hashed_results[hash]
-            except KeyError as e:
+            except KeyError:
                 if self.verbose:
                     paths = ", ".join(e.path for e in extracts)
                     typer.secho(
-                        f"WARNING: no results found for {paths}",
+                        f"WARNING: validator did not produce results for {paths}",
                         fg=typer.colors.YELLOW,
                     )
 
@@ -584,7 +588,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
                         RTFileProcessingOutcome(
                             step=self.aggregation.step,
                             success=False,
-                            exception=e,
+                            exception=NoValidatorResults("No validator output"),
                             extract=extract,
                         )
                     )
@@ -680,7 +684,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
             except DecodeError as e:
                 if self.verbose:
                     typer.secho(
-                        f"WARNING: DecodeError for {str(extract.path)}",
+                        f'DecodeError: "{str(e)}" thrown when decoding {str(extract.path)}',
                         fg=typer.colors.YELLOW,
                     )
                 outcomes.append(
@@ -918,13 +922,9 @@ def main(
     # TODO: I dislike having to exclude the records here
     # I need to figure out the best way to have a single type represent the "metadata" of
     # the content as well as the content itself
-    result.save_content(
-        fs=get_fs(),
-        content="\n".join(
-            (json.dumps(make_pydantic_model_bq_safe(o)) for o in result.outcomes)
-        ).encode(),
-        exclude={"outcomes"},
-    )
+    raw = [json.dumps(make_pydantic_model_bq_safe(o)) for o in result.outcomes]
+    content = "\n".join(raw).encode("utf-8")
+    result.save_content(fs=get_fs(), content=content, exclude={"outcomes"})
 
     assert (
         len(outcomes)
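Taken together, the parser changes swap raw-bytes hash keys for hex strings and replace a silently reused KeyError with a yellow warning plus an explicit NoValidatorResults outcome whenever the validator jar skips a file. Below is a minimal, self-contained sketch of that pattern; Extract and the plain-dict outcomes are hypothetical stand-ins for the repository's GTFSRTFeedExtract and RTFileProcessingOutcome, and only the warning text and typer calls come from the diff itself.

# Sketch of the commit's handling of extracts the validator skipped.
# Extract and the outcome dicts are illustrative stand-ins, not the
# repository's actual classes.
import hashlib
from collections import defaultdict
from typing import Any, Dict, List

import typer


class NoValidatorResults(Exception):
    pass


class Extract:
    def __init__(self, path: str, content: bytes) -> None:
        self.path = path
        self.content = content

    def hash(self) -> bytes:
        return hashlib.md5(self.content).digest()


def get_hashes(extracts: List[Extract]) -> Dict[str, List[Extract]]:
    # Key by hex digest (str) rather than raw bytes, as the diff does.
    hashed: Dict[str, List[Extract]] = defaultdict(list)
    for e in extracts:
        hashed[e.hash().hex()].append(e)
    return hashed


def collect_outcomes(
    extracts: List[Extract],
    hashed_results: Dict[str, Any],
    verbose: bool = False,
) -> List[dict]:
    outcomes = []
    for digest, group in get_hashes(extracts).items():
        if digest not in hashed_results:
            # The validator produced no output for these files: warn,
            # then record an explicit NoValidatorResults outcome each.
            if verbose:
                paths = ", ".join(e.path for e in group)
                typer.secho(
                    f"WARNING: validator did not produce results for {paths}",
                    fg=typer.colors.YELLOW,
                )
            for e in group:
                outcomes.append(
                    {
                        "success": False,
                        "exception": NoValidatorResults("No validator output"),
                        "extract": e,
                    }
                )
            continue
        for e in group:
            outcomes.append(
                {"success": True, "records": hashed_results[digest], "extract": e}
            )
    return outcomes

Using str keys instead of bytes also keeps the hash keys JSON-friendly, which is presumably why the change lands alongside the json.dumps save path in main().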
53 changes: 51 additions & 2 deletions jobs/gtfs-rt-parser-v2/poetry.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion jobs/gtfs-rt-parser-v2/pyproject.toml
@@ -5,7 +5,7 @@ description = ""
 authors = ["Andrew Vaccaro <[email protected]>"]
 
 [tool.poetry.dependencies]
-python = ">=3.8,<3.10"
+python = ">=3.8.1,<3.10"
 gtfs-realtime-bindings = "0.0.7"
 google-auth = "1.32.1"
 pathy = {extras = ["gcs"], version = "^0.6.1"}
@@ -26,6 +26,7 @@ types-protobuf = "^5.28.0.20240924"
 types-tqdm = "^4.66.0.20240417"
 isort = "^5.13.2"
 pytest-env = "^1.1.5"
+flake8 = "^7.1.1"
 
 [build-system]
 requires = ["poetry-core>=1.0.0"]
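A likely reason the python floor moves from 3.8 to 3.8.1: flake8 7.x declares python_requires >= 3.8.1, so Poetry could not resolve the new dev dependency under the old range. A quick illustration using the packaging library (an assumed, illustration-only dependency, not part of this project):

# Sketch: why ">=3.8,<3.10" cannot satisfy flake8's ">=3.8.1" floor.
from packaging.specifiers import SpecifierSet
from packaging.version import Version

old_range = SpecifierSet(">=3.8,<3.10")
new_range = SpecifierSet(">=3.8.1,<3.10")
flake8_floor = SpecifierSet(">=3.8.1")  # flake8 7.x python_requires

py = Version("3.8.0")
print(py in old_range)     # True:  old constraint admits 3.8.0 ...
print(py in flake8_floor)  # False: ... where flake8 7.x cannot install
print(py in new_range)     # False: tightened constraint excludes 3.8.0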
