"""
Minimalistic video player that allows visualization and easier interpretation of FAR-VVD results.
"""
import argparse
import difflib
import itertools
import logging
import math
import os
import re
import sys
import time
import uuid
import tkinter as tk
from typing import Any, Dict, List, Optional

import cv2 as cv
import PIL.Image
import PIL.ImageTk

from stream import VideoCaptureThread
from utils import (
    ToolTip,
    draw_bbox,
    parse_timestamp,
    read_metafile,
    seconds2timestamp,
    split_sentences,
    timestamp2seconds,
    timestamp2srt,
    write_metafile
)
__NAME__ = os.path.splitext(os.path.split(__file__)[-1])[0]
LOGGER = logging.getLogger(__NAME__)
LOGGER.setLevel(logging.INFO)
_handler = logging.StreamHandler()
_formatter = logging.Formatter(fmt="[%(asctime)s] %(levelname)-10.10s [%(threadName)s][%(name)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S")
_handler.setFormatter(_formatter)
LOGGER.addHandler(_handler)
class VideoResultPlayerApp(object):
"""
Builds and runs the player by looping video frames and handling events.
"""
    version = "1.5.2"  # updated automatically by version bump
# flags and control
error = False
# video information
video = None
video_scale = 1.0
video_width = None
video_height = None
video_frame = None
video_duration = None
frame = None
frame_fps = 0
frame_time = 0
frame_queue = 10
frame_delta = None
frame_index = None
frame_count = None
frame_output = None
frame_drop_factor = 4
frame_skip_factor = 1
last_time = 0
next_time = 0
call_cumul_count = 0
call_cumul_value = 0
# metadata references
NO_DATA_TEXT = "<no-metadata>"
NO_MORE_TEXT = "(metadata exhausted)"
NO_DATA_INDEX = None
NO_MORE_INDEX = -1
video_desc_meta = None
video_desc_index = None # type: Optional[int]
video_infer_meta = None
video_infer_indices = None # type: Optional[List[int]]
video_infer_multi = None # type: Optional[List[bool]]
text_annot_meta = None
text_annot_index = None # type: Optional[int]
text_infer_meta = None
text_infer_index = None # type: Optional[int]
mapping_label = None # type: Optional[Dict[str, str]]
mapping_regex = None # type: Optional[Dict[re.Pattern, str]]
# handles to UI elements
window = None
video_viewer = None
video_slider = None
video_desc_label = None
video_desc_scrollY = None
video_desc_textbox = None
video_infer_label = None
video_infer_scrollX = None
video_infer_scrollY = None
video_infer_textbox = None
text_annot_label = None
text_annot_scrollX = None
text_annot_scrollY = None
text_annot_textbox = None
snapshot_button = None
checkbox_regions = None
checkbox_regions_central = None
display_regions = None
display_regions_central = None
display_colors = None
play_button = None
play_state = True
play_label = None
play_text = None
font_header = ("Helvetica", 16, "bold")
font_code = ("Courier", 12, "normal")
font_normal = ("Times", 12, "normal")
font_code_tag = "code"
font_normal_tag = "normal"
# shared metadata keys
ref_section = "references"
ref_key = "id" # chosen specifically to match existing field in VD actors/scenes
actors_key = "actors"
scenes_key = "scenes"
vd_key = "video_description"
ta_key = "text_annotation"
ti_key = "text_inference"
vi_key = "video_inference"
ts_key = "start_ms"
te_key = "end_ms"
precision = 2
def __init__(self, video_file, video_description, video_inferences, text_annotations, text_inferences,
text_auto=None, merged_metadata_input=None, merged_metadata_output=None,
mapping_file=None, vd_subtitles=None, use_references=False, output=None,
scale=1.0, queue_size=10, frame_drop_factor=4, frame_skip_factor=1):
if video_file is not None:
self.video_source = os.path.abspath(video_file)
if not os.path.isfile(video_file):
raise ValueError("Cannot find video file: [{}]".format(video_file))
LOGGER.info("Using video file: [%s]", video_file)
            # use the video file name as a minimal default title; adjust it later from metadata if available
self.video_title = os.path.splitext(os.path.split(video_file)[-1])[0]
self.frame_output = output
self.video_scale = scale
if scale <= 0:
            raise ValueError("Invalid scaling: value must be greater than 0.")
if queue_size > 1:
LOGGER.debug("Setting queue size: %s", queue_size)
self.frame_queue = queue_size
if frame_drop_factor > 1:
LOGGER.debug("Setting frame drop factor: %s", frame_drop_factor)
self.frame_drop_factor = frame_drop_factor
if frame_skip_factor > 1:
LOGGER.debug("Setting frame skip factor: %s", frame_skip_factor)
self.frame_skip_factor = frame_skip_factor
self.setup_player()
self.setup_window()
self.setup_colors()
valid_meta = self.setup_metadata(video_description, video_inferences, text_annotations, text_inferences,
text_auto, merged_metadata_input, merged_metadata_output,
mapping_file, use_references)
if vd_subtitles and not self.video_desc_meta:
LOGGER.error("Requested SRT generation without VD metadata.")
raise ValueError("Cannot generate SRT output without any VD metadata.")
if vd_subtitles:
self.generate_srt(vd_subtitles)
if not valid_meta:
return
if video_file is None:
LOGGER.info("No video to display")
return
self.update_metadata(seek=True)
self.run()
def run(self):
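        """
        Starts the frame update loop, then blocks on the Tk main loop until the window is closed.
        """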
self.update_video() # after called once, update method will call itself with delay to loop frames
self.window.mainloop() # blocking
        LOGGER.log(logging.ERROR if self.error else logging.INFO, "Exit")
def setup_player(self):
LOGGER.info("Creating player...")
self.video = VideoCaptureThread(self.video_source, queue_size=self.frame_queue).start()
self.frame_index = 0
self.frame_time = 0.0
real_fps = self.video.get(cv.CAP_PROP_FPS)
self.frame_fps = round(real_fps)
self.frame_delta = 1. / float(real_fps) * 1000.
self.frame_count = int(self.video.get(cv.CAP_PROP_FRAME_COUNT))
self.video_duration = self.frame_delta * self.frame_count
self.video_width = round(self.video.get(cv.CAP_PROP_FRAME_WIDTH))
self.video_height = round(self.video.get(cv.CAP_PROP_FRAME_HEIGHT))
expected_width = round(self.video_width * self.video_scale)
if expected_width < 480:
new_scale = 480. / float(self.video_width)
LOGGER.warning("Readjusting video scale [%.3f] to [%.3f] to ensure minimal width [480px].",
self.video_scale, new_scale)
self.video_scale = new_scale
def setup_window(self):
LOGGER.info("Creating window...")
display_width = round(self.video_width * self.video_scale)
display_height = round(self.video_height * self.video_scale)
self.window = tk.Tk()
self.window.title("Video Result Viewer: {}".format(self.video_title))
self.window.attributes("-fullscreen", False)
self.window.bind("<F11>",
lambda _: self.window.attributes("-fullscreen", not self.window.attributes("-fullscreen")))
self.window.bind("<Escape>", lambda _: self.window.attributes("-fullscreen", False))
self.window.bind("<space>", lambda _: self.toggle_playing())
padding = 5
split_main_left_right = (4, 2) # number of columns for left/right distribution
split_left_top_bottom = (4, 1) # number of rows for left-size distribution of top/bottom
split_right_top_bottom = (1, 4) # number of rows for right-size distribution of top/bottom
panel_video_viewer = tk.Frame(self.window, padx=padding, pady=padding)
panel_video_viewer.grid(row=0, column=0,
rowspan=split_main_left_right[0], columnspan=split_left_top_bottom[0], sticky=tk.NSEW)
panel_video_infer = tk.Frame(self.window, padx=padding, pady=padding)
panel_video_infer.grid(row=0, column=split_main_left_right[0] + 1,
rowspan=split_right_top_bottom[0], columnspan=split_main_left_right[1], sticky=tk.NSEW)
panel_video_desc = tk.Frame(self.window, padx=padding, pady=padding)
panel_video_desc.grid(row=split_left_top_bottom[0] + 1, column=0,
rowspan=split_left_top_bottom[1], columnspan=split_main_left_right[0], sticky=tk.NSEW)
panel_text_annot = tk.Frame(self.window, padx=padding, pady=padding)
panel_text_annot.grid(row=split_right_top_bottom[0] + 1, column=split_main_left_right[0] + 1,
rowspan=split_right_top_bottom[1], columnspan=split_main_left_right[1], sticky=tk.NSEW)
self.window.grid_columnconfigure(0, weight=0)
self.window.grid_columnconfigure(split_main_left_right[0] + 1, weight=1)
self.window.grid_rowconfigure(0, weight=0)
self.window.grid_rowconfigure(split_left_top_bottom[0] + 1, weight=1)
self.window.grid_rowconfigure(split_right_top_bottom[0] + 1, weight=1)
# Create a canvas that can fit the above video source size
self.video_viewer = tk.Canvas(panel_video_viewer, width=display_width, height=display_height)
self.video_viewer.pack(anchor=tk.NW, fill=tk.BOTH, expand=True)
        # adjust the number of tick labels displayed on the slider dynamically based on the video display scaling
slider_interval = self.frame_count // round(10 * self.video_scale)
slider_elements = self.frame_count // slider_interval
slider_interval = self.frame_count // (slider_elements if slider_elements % 2 else slider_elements + 1)
self.video_slider = tk.Scale(panel_video_viewer, from_=0, to=self.frame_count - 1, length=display_width,
tickinterval=slider_interval, orient=tk.HORIZONTAL,
repeatinterval=1, repeatdelay=1, command=self.seek_frame)
self.video_slider.bind("<Button-1>", self.trigger_seek)
self.video_slider.pack(side=tk.TOP, anchor=tk.NW, expand=True)
self.play_state = True
self.play_text = tk.StringVar()
self.play_text.set("PAUSE")
self.play_button = tk.Button(panel_video_viewer, width=20, padx=padding, pady=padding,
textvariable=self.play_text, command=self.toggle_playing)
self.play_button.pack(side=tk.LEFT, anchor=tk.NW)
self.snapshot_button = tk.Button(panel_video_viewer, text="Snapshot",
width=20, padx=padding, pady=padding, command=self.snapshot)
self.snapshot_button.pack(side=tk.LEFT, anchor=tk.NW)
txt_display_regions = "Display video inference regions"
txt_display_central = "Display only central regions"
checkbox_label_width = max(len(txt_display_central), len(txt_display_regions))
self.checkbox_regions = tk.Frame(panel_video_viewer)
self.display_regions = tk.IntVar(value=1)
checkbox_regions_label = tk.Label(self.checkbox_regions, justify=tk.LEFT, anchor=tk.W,
width=checkbox_label_width, text=txt_display_regions)
checkbox_regions_label.grid(row=0, column=1)
checkbox_regions_check = tk.Checkbutton(self.checkbox_regions, variable=self.display_regions)
checkbox_regions_check.grid(row=0, column=0)
ToolTip(checkbox_regions_check, justify=tk.LEFT,
                text="When displayed, dashed regions represent upcoming/passed central key frames.\n"
                     "A filled region box indicates that the current time is close to the central key frame\n"
                     "of the video segment metadata.\n"
                     "This is because regions are defined only for the central key frame of each segment, rather than\n"
                     "following objects over each frame.")
self.display_regions_central = tk.IntVar(value=1)
checkbox_central_label = tk.Label(self.checkbox_regions, justify=tk.LEFT, anchor=tk.W,
width=checkbox_label_width, text=txt_display_central)
checkbox_central_label.grid(row=1, column=1)
checkbox_central_check = tk.Checkbutton(self.checkbox_regions, variable=self.display_regions_central)
checkbox_central_check.grid(row=1, column=0)
ToolTip(checkbox_central_check, justify=tk.LEFT,
                text="When enabled, only filled bounding boxes around the video segment central time are displayed.\n"
                     "Otherwise, both the upcoming/passed key-frame boxes (dashed) and the central bounding\n"
                     "boxes (filled) are displayed.")
self.checkbox_regions.pack(side=tk.RIGHT, anchor=tk.SE)
self.video_desc_label = tk.Label(panel_video_desc, text="Video Description Metadata",
font=self.font_header, justify=tk.LEFT, anchor=tk.W)
self.video_desc_label.pack(side=tk.TOP, fill=tk.X)
video_desc_xy_scroll_box = tk.Frame(panel_video_desc, padx=0, pady=0)
video_desc_xy_scroll_box.pack(fill=tk.BOTH, expand=True)
self.video_desc_textbox = tk.Text(video_desc_xy_scroll_box, height=10, wrap=tk.WORD)
self.video_desc_scrollY = tk.Scrollbar(video_desc_xy_scroll_box, command=self.video_desc_textbox.yview)
self.video_desc_textbox.configure(yscrollcommand=self.video_desc_scrollY.set)
self.video_desc_textbox.tag_configure(self.font_code_tag, font=self.font_code)
self.video_desc_textbox.tag_configure(self.font_normal_tag, font=self.font_normal)
self.video_desc_textbox.grid(row=0, column=0, sticky=tk.NSEW)
self.video_desc_scrollY.grid(row=0, column=1, sticky=tk.NS)
video_desc_xy_scroll_box.grid_rowconfigure(0, weight=1)
video_desc_xy_scroll_box.grid_columnconfigure(0, weight=1)
self.update_video_desc()
self.video_infer_label = tk.Label(panel_video_infer, text="Video Inference Metadata",
font=self.font_header, justify=tk.LEFT, anchor=tk.W)
self.video_infer_label.pack(side=tk.TOP, fill=tk.X)
video_infer_xy_scroll_box = tk.Frame(panel_video_infer, padx=0, pady=0)
video_infer_xy_scroll_box.pack(fill=tk.BOTH, expand=True)
self.video_infer_textbox = tk.Text(video_infer_xy_scroll_box, wrap=tk.NONE)
self.video_infer_scrollX = tk.Scrollbar(video_infer_xy_scroll_box, orient=tk.HORIZONTAL,
command=self.video_infer_textbox.xview)
self.video_infer_scrollY = tk.Scrollbar(video_infer_xy_scroll_box, orient=tk.VERTICAL,
command=self.video_infer_textbox.yview)
self.video_infer_textbox.configure(xscrollcommand=self.video_infer_scrollX.set,
yscrollcommand=self.video_infer_scrollY.set)
self.video_infer_textbox.tag_configure(self.font_code_tag, font=self.font_code)
self.video_infer_textbox.tag_configure(self.font_normal_tag, font=self.font_normal)
self.video_infer_textbox.grid(row=0, column=0, sticky=tk.NSEW)
self.video_infer_scrollY.grid(row=0, column=1, sticky=tk.NS)
self.video_infer_scrollX.grid(row=1, column=0, sticky=tk.EW)
video_infer_xy_scroll_box.grid_rowconfigure(0, weight=1)
video_infer_xy_scroll_box.grid_columnconfigure(0, weight=1)
self.update_video_infer()
self.text_annot_label = tk.Label(panel_text_annot, text="Text Annotation Metadata",
font=self.font_header, justify=tk.LEFT, anchor=tk.W)
self.text_annot_label.pack(side=tk.TOP, fill=tk.X)
text_annot_xy_scroll_box = tk.Frame(panel_text_annot, padx=0, pady=0)
text_annot_xy_scroll_box.pack(fill=tk.BOTH, expand=True)
self.text_annot_textbox = tk.Text(text_annot_xy_scroll_box, wrap=tk.NONE)
self.text_annot_scrollX = tk.Scrollbar(text_annot_xy_scroll_box, orient=tk.HORIZONTAL,
command=self.text_annot_textbox.xview)
self.text_annot_scrollY = tk.Scrollbar(text_annot_xy_scroll_box, orient=tk.VERTICAL,
command=self.text_annot_textbox.yview)
self.text_annot_textbox.configure(xscrollcommand=self.text_annot_scrollX.set,
yscrollcommand=self.text_annot_scrollY.set)
self.text_annot_textbox.tag_configure(self.font_code_tag, font=self.font_code)
self.text_annot_textbox.tag_configure(self.font_normal_tag, font=self.font_normal)
self.text_annot_textbox.grid(row=0, column=0, sticky=tk.NSEW)
self.text_annot_scrollY.grid(row=0, column=1, sticky=tk.NS)
self.text_annot_scrollX.grid(row=1, column=0, sticky=tk.EW)
text_annot_xy_scroll_box.grid_rowconfigure(0, weight=1)
text_annot_xy_scroll_box.grid_columnconfigure(0, weight=1)
self.update_text_annot()
def trigger_seek(self, event):
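        """
        Handles a mouse click on the slider by converting the click position into a frame index and seeking to it.
        """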
coord_min = self.video_slider.coords(0)
coord_max = self.video_slider.coords(self.frame_count)
if self.video_slider.identify(event.x, event.y) == "slider":
return # ignore event when clicking directly on the slider
# wait for seek to complete to resume frame display
# (avoids random flickers of the displayed metadata)
self.play_state = False
# find and apply seek location from mouse click
if event.x <= coord_min[0]:
index = 0
elif event.x >= coord_max[0]:
index = self.frame_count - 1
else:
ratio = float(self.frame_count) / float(coord_max[0] - coord_min[0])
index = round((event.x - coord_min[0]) * ratio)
while index % self.frame_skip_factor:
index += 1
LOGGER.debug("Seek frame %8s from click event (%s, %s) between [%s, %s]",
index, event.x, event.y, coord_min, coord_max)
self.seek_frame(index)
self.play_state = True # resume
def toggle_playing(self):
if self.play_state:
self.play_text.set("PLAY")
LOGGER.debug("Video paused.")
else:
self.play_text.set("PAUSE")
            LOGGER.debug("Video resumed.")
self.play_state = not self.play_state
def update_video_desc(self, metadata=None, indices=None):
self.video_desc_textbox.delete("1.0", tk.END)
if not metadata or not indices:
text = self.NO_DATA_TEXT
elif indices[0] == self.NO_DATA_INDEX:
text = self.NO_DATA_TEXT
elif indices[0] == self.NO_MORE_INDEX:
text = self.NO_MORE_TEXT
else:
# only one dimension for this kind of annotation
index = indices[0]
metadata = metadata[0][index]
# display plain video description text
entry = "(index: {}, start: {:.2f}, end: {:.2f})".format(index, metadata["start"], metadata["end"])
text = "{}\n\n{}".format(entry, metadata["vd"])
self.video_desc_textbox.insert(tk.END, text, self.font_normal_tag)
self.video_desc_textbox.insert(tk.END, "", self.font_code_tag)
def format_video_infer(self, number, index, metadata, multi):
"""
Format a single video inference metadata file into lines to be displayed.
:param number: index of the metadata list (in case of multiple provided).
:param index: index of the current entry within the corresponding metadata list.
:param metadata: metadata list corresponding to number where index entry can be retrieved.
        :param multi: index of the multi-prediction region within the entry if applicable
            (-1 if the prediction applies to the whole sequence).
"""
template = "(file: {}, index: {})"
if index == self.NO_MORE_INDEX:
return [template.format(number, len(metadata)), self.NO_MORE_TEXT]
if index == self.NO_DATA_INDEX:
return [template.format(number, "n/a"), self.NO_DATA_TEXT]
meta = metadata[index]
info = ""
entry = template.format(number, index)
times = "(start: {:.2f}, end: {:.2f})".format(meta["start"], meta["end"])
header = "[Score] [Classes]"
if multi >= 0:
meta = meta["regions"][multi]
info = str(tuple(meta["bbox"]))
values = ["[{:.2f}] {}".format(s, c)
for c, s in zip(meta["classes"], meta["scores"])]
return [entry, times, info, "", header] + values
@staticmethod
def flatten_video_meta(indices, metadata, regions):
"""
Flattens 2D lists of predictions to 1D for rendering.
        The first dimension is along the number of provided video-inference files (length of :paramref:`indices`).
        The second dimension is along the number of regions with predictions within each of those files.
        If a file was defined with ``multi_predictions``, the (variable) number of prediction sets retrieved
        per bounding box at the given index is flattened as if each set was provided by an individual file:
        [file-1 predictions, file-2 multi-predictions-1, file-2 multi-predictions-2, file-3 predictions, ...]
        Each of the above prediction sets represents [Top-K] classification scores/classes.
:param indices: list of indices of the current predictions set for each of the provided files
:param metadata: parsed metadata list of predictions for each of the provided files.
:param regions: boolean indicator for each file of whether it is formatted as single- or multi-predictions.
:return: tuple of flattened (file indices, index of predictions set, corresponding metadata, region index)
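        Illustrative example (hypothetical values): with two files where only the second one uses
        ``multi_predictions`` and its current entry holds 2 regions, ``indices=[3, 7]`` and
        ``regions=[False, True]`` yield ``numbers=[0, 1, 1]``, ``indices=[3, 7, 7]`` and
        ``regions=[-1, 0, 1]``, such that the second file is repeated once per region and each
        prediction set renders as its own column.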
"""
multi_indices = [len(metadata[n][indices[n]]["regions"]) if regions[n] else -1 for n in range(len(metadata))]
flatten_metadata = []
flatten_indices = []
flatten_number = []
flatten_regions = []
for i, count in enumerate(multi_indices):
if count < 0:
flatten_metadata.append(metadata[i])
flatten_indices.append(indices[i])
flatten_number.append(i)
flatten_regions.append(-1)
else:
flatten_metadata.extend([metadata[i]] * count)
flatten_indices.extend([indices[i]] * count)
flatten_number.extend([i] * count)
flatten_regions.extend(list(range(count)))
return flatten_number, flatten_indices, flatten_metadata, flatten_regions
def update_video_infer(self, metadata=None, indices=None):
"""
Format video inference metadata entries side-by-side from N sources.
"""
self.video_infer_textbox.delete("1.0", tk.END)
if not metadata or not indices:
text = self.NO_DATA_TEXT
else:
text = ""
meta_lines = [
self.format_video_infer(number, index, meta, multi)
for (number, index, meta, multi)
in zip(*self.flatten_video_meta(indices, metadata, self.video_infer_multi))
]
            # display lines ordered from top-1 to lowest top-k, with possibly variable amounts for each
max_lines = max([len(lines) for lines in meta_lines])
for line_index in range(max_lines):
for meta in meta_lines:
line = meta[line_index] if line_index < len(meta) else ""
# reasonable padding to align columns, adjust if class names are too long to display
text += "{:<32s}".format(line)
text += "\n"
self.video_infer_textbox.insert(tk.END, "", self.font_normal_tag)
self.video_infer_textbox.insert(tk.END, text, self.font_code_tag)
def update_text_annot(self, metadata=None, indices=None):
self.text_annot_textbox.delete("1.0", tk.END)
if not metadata or not indices:
text = self.NO_DATA_TEXT
elif indices[0] == self.NO_DATA_INDEX:
text = self.NO_DATA_TEXT
elif indices[0] == self.NO_MORE_INDEX:
text = self.NO_MORE_TEXT
else:
# only one dimension for this kind of annotation
index = indices[0]
metadata = metadata[0][index]
# update displayed metadata as text table
annotations = metadata["annotations"]
fmt = " {:<16s} | {:<24s} | {:<16s}"
fields = ["POS", "type", "lemme"]
header = fmt.format(*fields)
entry = "(index: {}, start: {:.2f}, end: {:.2f})".format(index, metadata["start"], metadata["end"])
text = "{}\n\n{}\n{}\n".format(entry, header, "_" * len(header))
for i, annot in enumerate(annotations):
text += "\n[{}]: {}\n".format(i, annot["sentence"])
tokens = annot.get("words", annot.get("tokens", [])) # pre/post app version 1.x
for item in tokens:
if "POS" in fields and "pos" in item:
fields[0] = "pos" # v3/v4 is lowercase
if "type" in fields and "type" not in item:
fields[1] = "iob" # v3/v4 removed type
if "iob" in fields:
item = dict(item) # copy to edit and leave original intact
item["iob"] = ", ".join(item["iob"]) # can have multiple annotations
text += "\n" + fmt.format(*[item[f] for f in fields])
self.text_annot_textbox.insert(tk.END, "", self.font_normal_tag)
self.text_annot_textbox.insert(tk.END, text, self.font_code_tag)
def update_metadata(self, seek=False):
def update_meta(meta_container, meta_index, meta_updater):
"""
Updates the view element with the next metadata if the time for it to change was reached.
If seek was requested, searches from the start to find the applicable metadata.
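            Illustrative example (hypothetical entries): with segments covering [0-2s], [2-4s] and [4-6s]
            and the current index on the [0-2s] entry, a frame time of 2.5s bumps the index to the
            [2-4s] entry; a seek to 5s instead searches from the start and, finding no later entry
            starting at or after 5s, falls back on the last [4-6s] entry that still covers that time.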
:param meta_container: all possible metadata entries, assumed ascending pre-ordered by 'ts' key.
:param meta_index: active metadata index
:param meta_updater: method that updates the view element for the found metadata entry
:return: index of updated metadata or already active one if time is still applicable for current metadata
"""
# update only if metadata container entries are available
if meta_container or meta_index == self.NO_DATA_INDEX:
# convert containers to 2D list regardless of original inputs
if not isinstance(meta_index, list):
meta_index = [meta_index]
meta_container = [meta_container]
if not isinstance(meta_container[0], list):
meta_container = [meta_container]
must_update = False
computed_indices = []
for i, index in enumerate(meta_index):
current_index = 0 if seek else index
updated_index = current_index # if nothing needs to change (current is still valid for timestamp)
index_total = len(meta_container[i])
if seek:
# search the earliest index that provides metadata within the new time
must_update = True
updated_index = self.NO_MORE_INDEX # default if not found
for idx in range(index_total):
meta = meta_container[i][idx]
if meta[self.ts_key] >= self.frame_time:
updated_index = idx
break
else:
# validate meta is within time range of last entry, or out of scope
if meta_container[i][updated_index][self.te_key] >= self.frame_time:
updated_index = len(meta_container[i]) - 1
else:
# if next index exceeds the list, entries are exhausted
if current_index == self.NO_MORE_INDEX or current_index >= index_total:
computed_indices.append(self.NO_MORE_INDEX) # set for following iterations
must_update = current_index == self.NO_MORE_INDEX # updated last iteration
continue
# otherwise bump to next one if timestamp of the current is passed
current_meta = meta_container[i][current_index] # type: dict
if self.frame_time > current_meta[self.te_key]:
updated_index = current_index + 1
# apply change of metadata, update all stack of metadata type if any must be changed
if current_index < index_total - 1 and current_index != updated_index:
must_update = True
computed_indices.append(updated_index)
if must_update:
meta_updater(meta_container, computed_indices)
return computed_indices
return self.NO_DATA_INDEX
self.video_desc_index = update_meta(self.video_desc_meta, self.video_desc_index, self.update_video_desc)
self.video_infer_indices = update_meta(self.video_infer_meta, self.video_infer_indices, self.update_video_infer)
self.text_annot_index = update_meta(self.text_annot_meta, self.text_annot_index, self.update_text_annot)
def display_frame_info(self, frame, current_fps, average_fps):
"""
Displays basic information on the frame.
"""
text_offset = (10, 25)
text_delta = 40
font_scale = 0.5
font_color = (209, 80, 0, 255)
font_stroke = 1
text0 = "Title: {}".format(self.video_title)
text1 = "Original FPS: {}, Process FPS: {:0.2f} ({:0.2f})".format(self.frame_fps, current_fps, average_fps)
cur_sec = self.frame_time / 1000.
tot_sec = self.video_duration / 1000.
cur_hms = time.strftime("%H:%M:%S", time.gmtime(cur_sec))
tot_hms = time.strftime("%H:%M:%S", time.gmtime(tot_sec))
text2 = "Time: {:0>.2f}/{:0.2f} ({}/{}) Frame: {}".format(cur_sec, tot_sec, cur_hms, tot_hms, self.frame_index)
for text_row, text in [(0, text0), (-2, text1), (-1, text2)]:
y_offset = round(text_delta * font_scale) * text_row
if text_row < 0:
y_offset = self.video_height + (y_offset - text_offset[1])
text_pos = (text_offset[0], text_offset[1] + y_offset)
cv.putText(frame, text, text_pos, cv.FONT_HERSHEY_SIMPLEX, font_scale, font_color, font_stroke)
def display_frame_regions(self, frame):
"""
Displays bounding boxes whenever available from video inference metadata.
        Because regions defined by bounding boxes only refer to the central key frame ``tc`` of the video segment
        for which the associated action predictions are provided, the detection regions might not overlap the
        actual person/object over the whole video segment. To indicate when the bounding box position matches
        the central key frame region, a solid rectangle is drawn within a close interval ``dt`` around the
        central ``tc``. For times outside that approximate interval, a dashed rectangle marks a potential, but
        possibly misplaced, person/object within the region. For a video segment spanning from ``ts`` to ``te``,
        the boxes are therefore rendered as dashed (``--``) or filled (``==``) as follows:
ts tc-dt tc tc+dt te
|-----------|======|======|-----------|
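        For instance (hypothetical segment), ``ts=2000ms`` and ``te=6000ms`` give ``tc=4000ms``; with
        ``dt=1000ms``, boxes are drawn filled between 3000ms and 5000ms, and dashed elsewhere within [ts, te].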
"""
only_center = self.display_regions_central.get()
for i, video_meta_index in enumerate(self.video_infer_indices):
if self.video_infer_multi[i]:
meta = self.video_infer_meta[i][video_meta_index]
ts = meta["start_ms"]
te = meta["end_ms"]
# skip if region time is not yet reached or is passed
if self.frame_time < ts or te < self.frame_time:
continue
dt = 1000 # ms
tc = ts + (te - ts) / 2
ts_dt = tc - dt
te_dt = tc + dt
dash = 5 # dash spacing if not within ±dt, otherwise filled
if ts_dt <= self.frame_time <= te_dt:
dash = None
if only_center and dash:
                    continue  # skip drawing the dashed bounding box when only central regions are requested
for r, region in enumerate(meta["regions"]):
tl = (region["bbox"][0], region["bbox"][1])
br = (region["bbox"][2], region["bbox"][3])
color = self.display_colors[r % len(self.display_colors)]
label = "file: {}, bbox: {}".format(i, r)
draw_bbox(frame, tl, br, label, color,
box_thickness=1, box_dash_gap=dash, box_contour=False,
font_thickness=1, font_scale=0.5, font_contour=False)
def update_video(self):
"""
Periodic update of video frame. Self-calling.
"""
self.next_time = time.perf_counter()
        # when paused or when the normal end of the video is reached, just keep looping until the next event resumes reading
if not self.play_state or self.frame_index >= self.frame_count:
self.window.after(30, self.update_video)
return
grabbed, frame, self.frame_index, self.frame_time = self.video.read()
if not grabbed:
LOGGER.error("Playback error occurred when reading next video frame.")
self.error = True
return
self.next_time = time.perf_counter()
call_time_delta = self.next_time - self.last_time
self.last_time = self.next_time
        call_msec_delta = call_time_delta * 1000.
        call_fps = 1. / call_time_delta
        # default wait if it cannot be inferred from the previous call time (compare milliseconds to milliseconds)
        wait_time_delta = 1 if call_msec_delta > self.frame_delta else max(self.frame_delta - call_msec_delta, 1)
        # if delays become too big, drop frames to catch up; ignore the first frame, whose delta is always
        # large since there is no previous call to compare against
if self.frame_index not in [0, self.frame_count] and self.frame_index % self.frame_skip_factor:
LOGGER.debug("Skip Frame: %8s", self.frame_index)
self.window.after(1, self.update_video)
return
if call_msec_delta > self.frame_delta * self.frame_drop_factor and self.frame_index > 1:
LOGGER.warning("Drop Frame: %8s, Last: %8.2f, Time: %8.2f, Real Delta: %6.2fms, "
"Target Delta: %6.2fms, Call Delta: %6.2fms, Real FPS: %6.2f",
self.frame_index, self.last_time, self.frame_time, wait_time_delta,
self.frame_delta, call_msec_delta, call_fps)
self.window.after(1, self.update_video)
return
self.call_cumul_value += call_time_delta
self.call_cumul_count += 1
call_avg_fps = self.call_cumul_count / self.call_cumul_value
if self.display_regions.get():
self.display_frame_regions(frame) # must call before any resize to employ with original bbox dimensions
frame_dims = (self.video_width, self.video_height)
if self.video_scale != 1:
frame_dims = (round(self.video_width * self.video_scale), round(self.video_height * self.video_scale))
frame = cv.resize(frame, frame_dims, interpolation=cv.INTER_NEAREST)
LOGGER.debug("Show Frame: %8s, Last: %8.2f, Time: %8.2f, Real Delta: %6.2fms, "
"Target Delta: %6.2fms, Call Delta: %6.2fms, Real FPS: %6.2f (%.2f) WxH: %s",
self.frame_index, self.last_time, self.frame_time, wait_time_delta,
self.frame_delta, call_msec_delta, call_fps, call_avg_fps, frame_dims)
self.display_frame_info(frame, call_fps, call_avg_fps)
# note: 'self.frame' is important as without instance reference, it gets garbage collected and is not displayed
self.video_frame = frame # in case of snapshot
        self.frame = PIL.ImageTk.PhotoImage(image=PIL.Image.fromarray(cv.cvtColor(frame, cv.COLOR_BGR2RGB)))
self.video_viewer.create_image(0, 0, image=self.frame, anchor=tk.NW)
self.video_slider.set(self.frame_index)
self.update_metadata()
wait_time_delta = 1 # WARNING: just go as fast as possible... tkinter image convert is the limiting factor
self.window.after(math.floor(wait_time_delta), self.update_video)
self.video_viewer.update_idletasks()
def seek_frame(self, frame_index):
"""
Moves the video to the given frame index (if not the next one).
Finally, updates the visual feedback of video progress with the slider.
"""
frame_index = min(int(frame_index), self.frame_count - 1)
# only execute an actual video frame seek() when it doesn't correspond to the next index, since it is already
# fetched by the main loop using read()
# without this, we would otherwise flush the frame queue and reset everything on each frame
if frame_index not in [self.frame_index, self.frame_index - 1]:
LOGGER.debug("Seek frame: %8s (fetching)", frame_index)
self.frame_time = self.video.seek(frame_index)
self.update_metadata(seek=True) # enforce fresh update since everything changed drastically
# update slider position
self.video_slider.set(frame_index)
self.frame_index = frame_index
def snapshot(self):
"""
Save current frame of the video with corresponding metadata.
"""
if self.video_frame is None:
LOGGER.warning("No available frame snapshot to save.")
return
name_clean = self.video_title.replace(" ", "-")
frame_name = "{}_{}_{:.2f}.jpg".format(name_clean, self.frame_index, self.frame_time)
os.makedirs(self.frame_output, exist_ok=True)
frame_path = os.path.join(self.frame_output, frame_name)
cv.imwrite(frame_path, self.video_frame)
LOGGER.info("Saved frame snapshot: [%s]", frame_path)
def generate_srt(self, output):
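        """
        Generates an SRT subtitle file from the video description metadata, interpreting ``output`` as either
        a target file path or a directory (in which case ``vd.srt`` is created within it).
        """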
dir_path, srt_ext = os.path.splitext(output)
if not srt_ext:
srt_file = os.path.join(dir_path, "vd.srt")
else:
dir_path = os.path.dirname(output)
srt_file = output
os.makedirs(dir_path, exist_ok=True)
srt_meta = []
        for i, vd in enumerate(self.video_desc_meta, start=1):  # SRT sequence numbers start at 1
# For each SRT 'subtitle'
            # <sequence number>
# <timestamp start> --> <timestamp end>
# <VD>
# <empty line>
# Timestamps formatted as: HH:MM:SS,fff
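            # e.g. a single (hypothetical) entry would be written as:
            #   1
            #   00:00:05,000 --> 00:00:09,500
            #   A person walks into the room.
            #   (followed by an empty line)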
srt_times = "{} --> {}".format(timestamp2srt(vd["start_ts"]), timestamp2srt(vd["end_ts"]))
srt_meta.extend([str(i), srt_times, vd["vd"], ""])
write_metafile(srt_meta, srt_file)
def setup_colors(self):
"""
Generate list of colors for bounding boxes display.
"""
# start with most distinct color variations using 0/255 RGB values
self.display_colors = set(itertools.product([0, 255], repeat=3))
# then add the intermediate colors to have more options
half_colors = set(itertools.product([0, 128, 255], repeat=3)) - self.display_colors
# remove black/white which are often hard to see against image
self.display_colors.remove((0, 0, 0))
self.display_colors.remove((255, 255, 255))
# total 25 colors, more than enough for most use cases
self.display_colors = list(self.display_colors) + list(half_colors)
def setup_metadata(self, video_description, video_inferences, text_annotations, text_inferences,
text_auto, merged_metadata_input, merged_metadata_output, mapping_file, use_references):
"""
Parse available metadata files and prepare the first entry according to provided file references.
"""
try:
video_desc_full_meta = video_infer_full_meta = text_annot_full_meta = text_infer_full_meta = None
if merged_metadata_input:
LOGGER.info("Detected input merged metadata. Generation and other metadata sources will be ignored.")
LOGGER.info("Parsing merged metadata file [%s]...", merged_metadata_input)
metadata = read_metafile(merged_metadata_input)
merged, detail = metadata["merged"], metadata["details"]
vi_indices = detail.get("total_{}".format(self.vi_key), [])
vi_default = [None] * len(vi_indices)
self.video_desc_meta = [meta.get(self.vd_key) for meta in merged if meta.get(self.vd_key) is not None]
self.text_annot_meta = [meta.get(self.ta_key) for meta in merged if meta.get(self.ta_key) is not None]
self.text_infer_meta = [meta.get(self.ti_key) for meta in merged if meta.get(self.ti_key) is not None]
self.video_infer_meta = []
self.video_infer_multi = [meta.get("multi_predictions", False) for meta in detail[self.vi_key]]
for i in range(len(vi_indices)):
vi_meta = [meta.get(self.vi_key, vi_default)[i] for meta in merged]
self.video_infer_meta.append([meta for meta in vi_meta if meta is not None])
self.video_desc_index = 0 if detail.get("total_{}".format(self.vd_key), 0) > 0 else None
self.text_annot_index = 0 if detail.get("total_{}".format(self.ta_key), 0) > 0 else None
self.text_infer_index = 0 if detail.get("total_{}".format(self.ti_key), 0) > 0 else None
self.video_infer_indices = [0 if vi_index > 0 else None for vi_index in vi_indices]
return True
if mapping_file:
LOGGER.info("Parsing class mapping file [%s]...", mapping_file)
self.setup_mapper(mapping_file)
if video_description and os.path.isfile(video_description):
LOGGER.info("Parsing video description metadata [%s]...", video_description)
self.video_desc_meta, video_desc_full_meta = self.parse_video_description_metadata(video_description)
self.video_desc_index = 0
elif video_description:
LOGGER.warning("Skipping video description metadata file not found: [%s]", video_description)
if video_inferences and isinstance(video_inferences, list):
video_infer_full_meta = []
for result in video_inferences:
if not os.path.isfile(result):
LOGGER.warning("Skipping video inference metadata file not found: [%s]", result)
continue
LOGGER.info("Parsing video inference metadata [%s]...", result)
meta, full_meta, multi = self.parse_video_inference_metadata(result)
video_infer_full_meta.append(full_meta)
if not self.video_infer_meta:
self.video_infer_meta = []
self.video_infer_indices = []
self.video_infer_multi = []
self.video_infer_meta.append(meta)
self.video_infer_indices.append(0)
self.video_infer_multi.append(multi)
if text_annotations and os.path.isfile(text_annotations):
LOGGER.info("Parsing text annotations metadata [%s]...", text_annotations)
meta, full_meta = self.parse_text_annotations_metadata(text_annotations)
text_method = ("auto" if text_auto else "manual") if isinstance(text_auto, bool) else "undefined"
full_meta["method"] = text_method
self.text_annot_meta, text_annot_full_meta = meta, full_meta
self.text_annot_index = 0
elif text_annotations:
LOGGER.warning("Skipping text annotations metadata file not found: [%s]", text_annotations)
if text_inferences and os.path.isfile(text_inferences):
LOGGER.info("Parsing text inference metadata [%s]...", text_inferences)
self.text_infer_meta, text_infer_full_meta = self.parse_text_inferences_metadata(text_inferences)
elif text_inferences:
LOGGER.warning("Skipping text inferences metadata file not found: [%s]", text_inferences)
if merged_metadata_output:
self.merge_metadata(video_desc_full_meta, video_infer_full_meta,
text_annot_full_meta, text_infer_full_meta,
self.video_desc_meta, self.video_infer_meta,
self.text_annot_meta, self.text_infer_meta,
merged_metadata_output, self.mapping_label, use_references)
except Exception as exc:
self.error = True
            LOGGER.error("Invalid formats. One or more metadata files could not be parsed.", exc_info=exc)
return False
return True
def setup_mapper(self, path):
"""
Setup label mapping with regex support.
.. code-block:: YAML
# replaces all labels with literal key match with the corresponding value
"carry/hold (an object)": "carry/hold"
# replaces all labels that have '<words> (an object)' by '<words> <something>'
"(.+) \\(an object\\)": "\\1 <something>"
.. seealso::
- ``label_mapping.yml`` for more details and examples.
"""
self.mapping_label = read_metafile(path)
if self.mapping_label:
LOGGER.info("\n ".join(["Will use label mapping:"] +
["{}: {}".format(k, v) for k, v in self.mapping_label.items()]))
self.mapping_regex = {}
regexes = [(key, val) for key, val in self.mapping_label.items()
if "\\" in key or "\\" in val or any(p in key for p in [".*", ".+", ".?"])]
for key, val in regexes:
key = ("^" if not key.startswith("^") else "") + key + ("$" if not key.endswith("$") else "")
self.mapping_regex[re.compile(key)] = val # replace must be string for literal replace (no group)
def map_label(self, label):
"""
Replace label using provided mapping, with literal string match followed by regex substitution if applicable.
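        For example, with the (hypothetical) mapping shown in the ``setup_mapper`` docstring, the literal
        entry maps "carry/hold (an object)" to "carry/hold", while the regex entry maps "ride (an object)"
        to "ride <something>" through group substitution.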
"""
if not self.mapping_label:
return label
mapped = self.mapping_label.get(label)
if mapped:
return mapped
for search, replace in self.mapping_regex.items():
mapped = re.sub(search, replace, label)
if mapped != label:
return mapped
return label
def merge_metadata(self,
video_description_full_metadata, video_inference_full_metadata,
text_annotation_full_metadata, text_inferences_full_metadata,
video_description_time_metadata, video_inference_time_metadata,
text_annotation_time_metadata, text_inference_time_metadata,
merged_path, mapping, use_references):
"""
Merges all provided metadata variations into a common file with timestamp alignment.
.. seealso::
:ref:`Metadata Merging <docs/usage.md#metadata-merging>` for full details and example.
"""
if (
not video_description_full_metadata and not video_description_time_metadata
and not video_inference_full_metadata and not video_inference_time_metadata
and not text_annotation_full_metadata and not text_annotation_time_metadata
and not text_inferences_full_metadata and not text_inference_time_metadata
):
LOGGER.error("No metadata provided, nothing to merge!")
raise ValueError("Missing metadata")
# define generic metadata details without the merged timestamped metadata
metadata = {
"version": self.version,
"details": {self.vd_key: None, self.ta_key: None, self.ti_key: None, self.vi_key: None},
"mapping": mapping,
"merged": [],
}
if use_references:
metadata[self.ref_section] = {
self.vd_key: {}, self.ta_key: {}, self.vi_key: {}, self.ti_key: {},
self.actors_key: {}, self.scenes_key: {}
}
if video_description_full_metadata:
video_description_full_metadata.pop("standard_vd_metadata", None)
video_description_full_metadata.pop("augmented_vd_metadata", None)
metadata["details"][self.vd_key] = video_description_full_metadata
if text_annotation_full_metadata:
text_annotation_full_metadata.pop("data", None)
metadata["details"][self.ta_key] = text_annotation_full_metadata
if text_inferences_full_metadata:
text_inferences_full_metadata.pop("data", None)
metadata["details"][self.ti_key] = text_inferences_full_metadata
if video_inference_full_metadata:
for meta in video_inference_full_metadata:
meta.pop("predictions", None)
metadata["details"][self.vi_key] = video_inference_full_metadata
def ref_link(meta_key, ref_id):
return {"$ref": "#/{}/{}/{}".format(self.ref_section, meta_key, ref_id)}
def make_ref(meta_entry, meta_key):
"""
Generates the JSON link pointing to the metadata entry, and adds it to references if not already done.
"""
meta_entry.setdefault(self.ref_key, str(uuid.uuid4()))
ref_id = meta_entry[self.ref_key]
refs = metadata[self.ref_section] # type: Dict[str, Dict[str, Any]]
if ref_id not in refs[meta_key]:
refs[meta_key][ref_id] = meta_entry
return ref_link(meta_key, ref_id)
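        # e.g. (hypothetical IDs): a reference produced for an actor entry takes the form
        #   {"$ref": "#/references/actors/<uuid4>"}
        # while the entry itself is stored once under the 'references' section at that UUID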
if use_references and metadata["details"][self.vd_key] is not None:
# patch all existing VD references to use real JSON '$ref' instead of literal UUID placed as-is
for meta_section in [self.actors_key, self.scenes_key]:
section_items = metadata["details"][self.vd_key]["metadata"].pop(meta_section, []) # noqa
for meta_item in section_items:
make_ref(meta_item, meta_section)
for vd in video_description_time_metadata:
for i in range(len(vd[meta_section])):
vd[meta_section][i] = ref_link(meta_section, vd[meta_section][i])
# lookup timestamped metadata entries and combine them appropriately
vd_index = None
ta_index = None
ti_index = None
vi_indices = []
if video_description_time_metadata:
vd_index = 0
else:
video_description_time_metadata = []
if text_annotation_time_metadata:
ta_index = 0
else:
text_annotation_time_metadata = []
if text_inference_time_metadata:
ti_index = 0
else:
text_inference_time_metadata = []
if video_inference_time_metadata:
vi_indices = [0] * len(video_inference_time_metadata)
else:
video_inference_time_metadata = []