-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpeople_counting.py
157 lines (127 loc) · 5.06 KB
/
people_counting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import argparse
import vqpy
def make_parser():
parser = argparse.ArgumentParser('VQPy Demo!')
parser.add_argument('--path', help='path to video')
parser.add_argument(
"--save_folder",
default=None,
help="the folder to save the final result",
)
return parser
class Person(vqpy.VObjBase):
@vqpy.property()
@vqpy.stateful(4)
def direction_vector(self):
tlbr_c, tlbr_p = self.getv('tlbr'), self.getv('tlbr', -5)
if tlbr_c is None or tlbr_p is None:
return None
center_c = (tlbr_c[:2] + tlbr_c[2:]) / 2
center_p = (tlbr_p[:2] + tlbr_p[2:]) / 2
diff = center_c - center_p
return int(diff[0]), int(diff[1])
@vqpy.property()
@vqpy.stateful(4)
def direction(self):
def denoise(target, reference):
THRESHOLD = 10
if target != 0 and reference / target >= THRESHOLD:
target = 0
return target
def get_name(value, pos_name, neg_name):
if value > 0:
result = pos_name
elif value < 0:
result = neg_name
else:
result = ""
return result
def get_center(tlbr):
return (tlbr[:2] + tlbr[2:]) / 2
def most_frequent(List):
from collections import Counter
occurence_count = Counter(List)
return occurence_count.most_common(1)[0][0]
hist_len = 5
tlbr_past = [
self.getv("tlbr", i) for i in range(-(1 + hist_len), -1)
]
for value in tlbr_past:
if value is None:
return None
centers = list(map(get_center, tlbr_past))
diffs = [centers[i+1] - centers[i] for i in range(hist_len - 1)]
diff_xs = [denoise(diff[0], diff[1]) for diff in diffs]
diff_ys = [denoise(diff[1], diff[0]) for diff in diffs]
horizontal = most_frequent([get_name(diff_x, "right", "left")
for diff_x in diff_xs])
vertical = most_frequent([get_name(diff_y, "bottom", "top")
for diff_y in diff_ys])
direction = vertical + horizontal
if direction == "":
direction = None
return direction
class CountPersonOnCrosswalk(vqpy.QueryBase):
@staticmethod
def set_output_configs() -> vqpy.query.OutputConfig:
return vqpy.query.OutputConfig(
# output_frame_vobj_num=True,
output_total_vobj_num=True
)
@staticmethod
def setting() -> vqpy.VObjConstraint:
CROSSWALK_REGION_1 = [(731, 554), (963, 564), (436, 1076), (14, 1076)]
CROSSWALK_REGION_2 = [(1250, 528), (1292, 473),
(1839, 492), (1893, 547)]
CROSSWALK_REGIONS = [CROSSWALK_REGION_1, CROSSWALK_REGION_2]
filter_cons = {"__class__": lambda x: x == Person,
"bottom_center": vqpy.query.utils.within_regions(
CROSSWALK_REGIONS
)}
select_cons = {"track_id": None,
}
return vqpy.VObjConstraint(filter_cons=filter_cons,
select_cons=select_cons,
filename='on_crosswalk')
class CountPersonHeadLeft(CountPersonOnCrosswalk):
@staticmethod
def set_output_configs() -> vqpy.query.output_config.OutputConfig:
return vqpy.query.output_config.OutputConfig(
# output_frame_vobj_num=True,
output_total_vobj_num=True
)
@staticmethod
def setting() -> vqpy.VObjConstraint:
filter_cons = {"direction": lambda x: "left" in x}
select_cons = {"track_id": None,
"direction": None,
"direction_vector": None
}
return vqpy.VObjConstraint(filter_cons=filter_cons,
select_cons=select_cons,
filename='head_left')
class CountPersonHeadRight(CountPersonOnCrosswalk):
@staticmethod
def set_output_configs() -> vqpy.query.output_config.OutputConfig:
return vqpy.query.output_config.OutputConfig(
# output_frame_vobj_num=True,
output_total_vobj_num=True
)
@staticmethod
def setting() -> vqpy.VObjConstraint:
filter_cons = {"direction": lambda x: "right" in x}
select_cons = {"track_id": None,
"direction": None,
"direction_vector": None
}
return vqpy.VObjConstraint(filter_cons=filter_cons,
select_cons=select_cons,
filename='head_right')
if __name__ == '__main__':
args = make_parser().parse_args()
vqpy.launch(cls_name=vqpy.COCO_CLASSES,
cls_type={"person": Person},
tasks=[CountPersonHeadLeft(), CountPersonHeadRight()],
video_path=args.path,
save_folder=args.save_folder,
)