@@ -61,6 +61,18 @@ class InvalidMetadata(Exception):
61
61
pass
62
62
63
63
64
+ class NoScheduleDataSpecified (Exception ):
65
+ pass
66
+
67
+
68
+ class ScheduleDataNotFound (Exception ):
69
+ pass
70
+
71
+
72
+ class NoValidatorResults (Exception ):
73
+ pass
74
+
75
+
64
76
class RTProcessingStep (str , Enum ):
65
77
parse = "parse"
66
78
validate = "validate"
@@ -77,14 +89,6 @@ class RTValidationMetadata(BaseModel):
77
89
gtfs_validator_version : str
78
90
79
91
80
- class NoScheduleDataSpecified (Exception ):
81
- pass
82
-
83
-
84
- class ScheduleDataNotFound (Exception ):
85
- pass
86
-
87
-
88
92
class RTHourlyAggregation (PartitionedGCSArtifact ):
89
93
partition_names : ClassVar [List [str ]] = ["dt" , "hour" , "base64_url" ]
90
94
step : RTProcessingStep
@@ -277,7 +281,7 @@ def download(self, date: datetime.datetime) -> Optional[str]:
277
281
.get_url_schedule (self .base64_validation_url )
278
282
)
279
283
except KeyError :
280
- print (
284
+ typer . secho (
281
285
f"no schedule data found for { self .base64_validation_url } on day { day } "
282
286
)
283
287
continue
@@ -287,7 +291,7 @@ def download(self, date: datetime.datetime) -> Optional[str]:
287
291
self .fs .get (schedule_extract .path , gtfs_zip )
288
292
return gtfs_zip
289
293
except FileNotFoundError :
290
- print (
294
+ typer . secho (
291
295
f"no schedule file found for { self .base64_validation_url } on day { day } "
292
296
)
293
297
continue
@@ -346,17 +350,17 @@ def get_local_paths(self) -> Dict[str, GTFSRTFeedExtract]:
346
350
def get_results_paths (self ) -> Dict [str , GTFSRTFeedExtract ]:
347
351
return {e .get_results_path (): e .extract for e in self .get_extracts ()}
348
352
349
- def get_hashed_results (self ):
353
+ def get_hashed_results (self ) -> Dict [ str , Any ] :
350
354
hashed = {}
351
355
for e in self .get_extracts ():
352
356
if e .has_results ():
353
- hashed [e .hash ()] = e .get_results ()
357
+ hashed [e .hash (). hex () ] = e .get_results ()
354
358
return hashed
355
359
356
- def get_hashes (self ) -> Dict [bytes , List [GTFSRTFeedExtract ]]:
357
- hashed : Dict [bytes , List [GTFSRTFeedExtract ]] = defaultdict (list )
360
+ def get_hashes (self ) -> Dict [str , List [GTFSRTFeedExtract ]]:
361
+ hashed : Dict [str , List [GTFSRTFeedExtract ]] = defaultdict (list )
358
362
for e in self .get_extracts ():
359
- hashed [e .hash ()].append (e .extract )
363
+ hashed [e .hash (). hex () ].append (e .extract )
360
364
return hashed
361
365
362
366
def download (self ):
@@ -507,7 +511,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
507
511
e = ScheduleDataNotFound (
508
512
f"no recent schedule data found for { self .aggregation .extracts [0 ].path } "
509
513
)
510
- print (e )
514
+ typer . secho (e )
511
515
512
516
scope .fingerprint = [
513
517
type (e ),
@@ -571,11 +575,11 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
571
575
for hash , extracts in aggregation_extracts .get_hashes ().items ():
572
576
try :
573
577
records = hashed_results [hash ]
574
- except KeyError as e :
578
+ except KeyError :
575
579
if self .verbose :
576
580
paths = ", " .join (e .path for e in extracts )
577
581
typer .secho (
578
- f"WARNING: no results found for { paths } " ,
582
+ f"WARNING: validator did not produce results for { paths } " ,
579
583
fg = typer .colors .YELLOW ,
580
584
)
581
585
@@ -584,7 +588,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
584
588
RTFileProcessingOutcome (
585
589
step = self .aggregation .step ,
586
590
success = False ,
587
- exception = e ,
591
+ exception = NoValidatorResults ( "No validator output" ) ,
588
592
extract = extract ,
589
593
)
590
594
)
@@ -680,7 +684,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
680
684
except DecodeError as e :
681
685
if self .verbose :
682
686
typer .secho (
683
- f"WARNING: DecodeError for { str (extract .path )} " ,
687
+ f'DecodeError: " { str ( e ) } " thrown when decoding { str (extract .path )} ' ,
684
688
fg = typer .colors .YELLOW ,
685
689
)
686
690
outcomes .append (
@@ -918,13 +922,9 @@ def main(
918
922
# TODO: I dislike having to exclude the records here
919
923
# I need to figure out the best way to have a single type represent the "metadata" of
920
924
# the content as well as the content itself
921
- result .save_content (
922
- fs = get_fs (),
923
- content = "\n " .join (
924
- (json .dumps (make_pydantic_model_bq_safe (o )) for o in result .outcomes )
925
- ).encode (),
926
- exclude = {"outcomes" },
927
- )
925
+ raw = [json .dumps (make_pydantic_model_bq_safe (o )) for o in result .outcomes ]
926
+ content = "\n " .join (raw ).encode ("utf-8" )
927
+ result .save_content (fs = get_fs (), content = content , exclude = {"outcomes" })
928
928
929
929
assert (
930
930
len (outcomes )
0 commit comments