-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathwindows.py
executable file
·398 lines (325 loc) · 17.8 KB
/
windows.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# -*- coding: utf-8 -*-
# !/usr/bin/python
# from typing import List, Tuple
import numpy as np
from filterpy.common import Q_discrete_white_noise
from filterpy.kalman import KalmanFilter, dot3, logpdf
from scipy.ndimage.filters import gaussian_filter
import cv2
class Window(object):
def __init__(self, level, window_shape, img_shape, x_init, max_frozen_dur):
"""
Tracks a window as used for selecting lane lines in an image.
:param level: Level of the window, as counted from the bottom of the image up.
:param window_shape: (height, width) of the window in pixels.
:param img_shape: (height, width) of the image the window resides in.
:param x_init: Initial x position of the window.
:param max_frozen_dur: The maximum amount of frames a window can continue to be used when frozen (eg when not
found or when measurements are uncertain).
"""
if window_shape[1] % 2 == 0:
raise Exception("width must be odd")
# Image info
self.img_h = img_shape[0]
self.img_w = img_shape[1]
# Window shape
self.height = window_shape[0]
self.width = window_shape[1]
self.y_begin = self.img_h - (level + 1) * self.height # top row of pixels for window
self.y_end = self.y_begin + self.height # one past the bottom row of pixels for window
# Window position
self.x_filtered = x_init
self.y = self.y_begin + self.height // 2
self.level = level
# Detection info
self.filter = WindowFilter(pos_init=x_init)
self.x_measured = None
self.frozen = False
self.detected = False
self.max_frozen_dur = max_frozen_dur
self.frozen_dur = max_frozen_dur + 1
self.undrop_buffer = 1 # Number of calls to unfreeze() needed to go from dropped back to normal.
def x_begin(self, param='x_filtered'):
"""
The leftmost position of the window, relative to the last filtered position or measurement.
:param param: Whether to use the 'x_filtered' or 'x_measured' position.
"""
self.check_x_param(param)
x = getattr(self, param)
return int(max(0, x - self.width // 2))
def x_end(self, param='x_filtered'):
"""
One past the rightmost position of the window, relative to the last filtered position or measurement.
:param param: Whether to use the 'x_filtered' or 'x_measured' position.
"""
self.check_x_param(param)
x = getattr(self, param)
return int(min(x + self.width // 2, self.img_w))
def area(self):
"""Area of the window."""
return self.height * self.width
def freeze(self):
"""Marks the window as frozen, drops it if it's been frozen for too long, and increases filter uncertainty."""
self.frozen = True
self.frozen_dur += 1
self.filter.grow_uncertainty(1)
def unfreeze(self):
"""Marks the window as not frozen and not dropped, reduces frozen counter by 1."""
# Reduce frozen duration to max (plus some buffer)
self.frozen_dur = min(self.frozen_dur, self.max_frozen_dur + 1 + self.undrop_buffer)
self.frozen_dur -= 1
self.frozen_dur = max(0, self.frozen_dur)
# Change states
self.frozen = False
@property
def dropped(self):
return self.frozen_dur > self.max_frozen_dur
def update(self, score_img, x_search_range, min_log_likelihood=-40):
"""
Given a score image and the x search bounds, updates the window position to the likely position of the lane.
If the measurement is deemed suspect for some reason, the update will be rejected and the window will be
'frozen', causing it to stay in place. If the window is frozen beyond its `max_frozen_dur` then it will be
dropped entirely until a non-suspect measurement is made.
The window only searches within its y range defined at initialization.
:param score_img: A score image, where pixel intensity represents where the lane most likely is.
:param x_search_range: The (x_begin, x_end) range the window should search between in the score image.
:param min_log_likelihood: The minimum log likelihood allowed for a measurement before it is rejected.
"""
assert score_img.shape[0] == self.img_h and \
score_img.shape[1] == self.img_w, 'Window not parametrized for this score_img size'
# Apply a column-wise gaussian filter to score the x-positions in this window's search region
# 限制水平查找范围在图里面,即(0, img.w)之间
x_search_range = (max(0, int(x_search_range[0])), min(int(x_search_range[1]), self.img_w))
x_offset = x_search_range[0]
# 这些个滑动窗口的标号是从下开始往上标的,最下面的level是0,更新的窗口从窗口0开始更新
# 每一个滑动窗口的y_bigin是窗口的下线所在的y坐标,y_end是窗口的上线
search_region = score_img[self.y_begin: self.y_end, x_offset: x_search_range[1]] # 刚开始选取了图像的24分之一大小的长方形区域
# 这里是把搜索区域方框20 × 160 的每一个列累加,变成一维的160长度的序列,然后进行高斯变换, 得到了还是一个160大小的序列
# 这里使用的是高斯模糊,会把图像的一些细节信息变得模糊起来,原本集中的几个白点,变成了向两边渐变变化的点
# 这样便于后续提取最大值的点,就是粗车道线的中心点
column_scores = gaussian_filter(np.sum(search_region, axis=0), sigma=self.width / 3.0, truncate=3.0)
# 如果得到的向量中最大数值大于0,代表搜索区域有白色的点,如果没有,则表示当前搜索区域没有线,窗口要冻结
if max(column_scores) != 0:
self.detected = True
# Update measurement
# 选取搜索区域中的最大值所在的x坐标,加上搜索区域的x起始偏置,这样就找到了该搜索区域的车道线中心点的横坐标
self.x_measured = np.argmax(column_scores) + x_offset
# 这里获取了新的滑动窗口的左右边界的x坐标,是根据上一步得到的该搜索范围内检测的车道线最优代表点位置和半个窗口宽度相加减来算的
window_magnitude = \
np.sum(column_scores[self.x_begin('x_measured') - x_offset: self.x_end('x_measured') - x_offset])
# 这里是衡量搜索范围内的噪声大不大,就是说,是不是大部分点都在窗口范围内,在窗口范围外的值比较少,就是好的
noise_magnitude = np.sum(column_scores) - window_magnitude
# 这里是类似于信噪比,越大越好
signal_noise_ratio = \
1.0 * window_magnitude / (window_magnitude + noise_magnitude) if window_magnitude is not 0 else 0
# Filter measurement and set position
if signal_noise_ratio < 0.6 or self.filter.loglikelihood(self.x_measured) < min_log_likelihood:
# Suspect / bad measurement, don't update filter/position
self.freeze()
return
self.unfreeze()
# 卡尔曼滤波
self.filter.update(self.x_measured)
self.x_filtered = self.filter.get_position()
else:
# No signal in search region
self.detected = False
self.freeze()
def get_mask(self, param='x_filtered'):
"""
Returns a masking image of shape (self.img_h, self.img_w) with the pixels occupied by this window set to 1.
:param param: Whether to use the 'x_filtered' or 'x_measured' position of the window.
:return: An image with the pixels occupied by the window set to 1 and all other pixels set to 0.
"""
self.check_x_param(param)
mask = np.zeros((self.img_h, self.img_w))
mask[self.y_begin: self.y_end, self.x_begin(param): self.x_end(param)] = 1
return mask
def pos_xy(self, param='x_filtered'):
"""Returns the (x, y) position of this window."""
self.check_x_param(param)
return getattr(self, param), self.y
def check_x_param(self, param):
assert param == 'x_filtered' or param == 'x_measured', "Invalid position parameter. `param` must be " \
"'x_filtered' or 'x_measured' "
def sliding_window_update(windows, score_img, margin, mode):
"""
Updates each window in a list, constraining their search regions to a marginal distance of the last valid window.
Generally improved upon in `joint_sliding_window_update()`, which is typically recommended instead.
Each window's search region will be centered on the last undropped window position and extend a margin to the
left and right.
:param windows: A list of Window objects.
:param score_img: A score image, where pixel intensity represents where the lane most likely is.
:param margin: The maximum x distance the next window can be placed from the last undropped window.
:param mode: 'left' or 'right' for the lane the Windows are tracking.
"""
search_center = start_sliding_search(windows, score_img, mode)
# Update each window, searching nearby the last undropped window.
for window in windows[1:]:
x_search_range = (search_center - margin, search_center + margin)
window.update(score_img, x_search_range)
if not window.dropped:
search_center = window.x_filtered
def joint_sliding_window_update(windows_left, windows_right, score_img, margin):
"""
Updates Windows from both lists, preventing window crossover and constraining their search regions to a margin.
This improves on `sliding_window_update()` by preventing windows from different lanes from crossing over each other
or detecting the same part of the image.
Each window's search region will be centered on the last undropped window position and extend a margin to the
left and right. In cases where the margins of the left and right lane may overlap, they are truncated to the
halfway point between.
:param windows_left: A list of Window objects for the left lane.
:param windows_right: A list of Window objects for the right lane.
:param score_img: A score image, where pixel intensity represents where the lane most likely is.
:param margin: The maximum x distance the next window can be placed from the last undropped window.
"""
assert len(windows_left) == len(windows_right), "Window lists should be same length. Did you filter already?"
search_centers = [start_sliding_search(windows_left, score_img, 'left'),
start_sliding_search(windows_right, score_img, 'right')]
# cv2.circle(score_img, (int(search_centers[0]), int(search_centers[1])), 5, (255, 0, 0), 2)
# cv2.imwrite('./pic_watch/test.png', score_img)
# Update each window, searching nearby the last undropped window.
for i in range(len(windows_left)):
# Find search range for the left and right
x_search_ranges = [None, None]
for j in [0, 1]:
x_search_ranges[j] = [search_centers[j] - margin, search_centers[j] + margin]
# Fix any crossover
if x_search_ranges[0][1] > x_search_ranges[1][0]:
average = (x_search_ranges[0][1] + x_search_ranges[1][0]) // 2
x_search_ranges[0][1] = average
x_search_ranges[1][0] = average
# Perform update
for j, window in enumerate([windows_left[i], windows_right[i]]):
window.update(score_img, x_search_ranges[j])
if not window.dropped:
search_centers[j] = window.x_filtered
def start_sliding_search(windows, score_img, mode):
assert mode == 'left' or mode == 'right', "Mode not valid."
assert strictly_decreasing([w.y for w in windows]), "Windows not ordered properly. Should start at image bottom"
img_h, img_w = score_img.shape[0:2]
# Update the bottom window
if mode == 'left':
# 开始的水平查找范围是半张图的长度
windows[0].update(score_img, (0, img_w // 2))
elif mode == 'right':
windows[0].update(score_img, (img_w // 2, img_w))
# Find the starting point for our search
if windows[0].dropped:
# Starting window does not exist, find an approximation.
search_region = score_img[2 * img_h // 3:, :] # search bottom 1/3rd of score_img
column_scores = gaussian_filter(np.sum(search_region, axis=0), sigma=windows[0].width / 3.0, truncate=3.0)
if mode == 'left':
search_center = argmax_between(column_scores, 0, img_w // 2)
elif mode == 'right':
search_center = argmax_between(column_scores, img_w // 2, img_w)
assert 'search_center' in locals(), 'No lane was found to start with.'
# TODO: Do something if still no lane is found
else:
# Reuse the position of the bottom window
search_center = windows[0].x_filtered
return search_center
def strictly_decreasing(L):
"""Returns True if elements of L are strictly decreasing."""
return all(x > y for x, y in zip(L, L[1:]))
def argmax_between(arr, begin, end):
"""
Returns the position of the maximal value between indexes `begin` and `end`.
In case of multiple occurrences of the maximum value, the index of the first occurrence is returned.
"""
max_ndx = np.argmax(arr[begin:end]) + begin
return int(max_ndx)
def filter_window_list(windows, remove_frozen=False, remove_dropped=True, remove_undetected=False):
"""
Given a list of Windows, returns a new list with frozen and dropped windows optionally removed.
:param windows: A list of Window objects.
:param remove_frozen: Set True to prevent returning all frozen windows.
:param remove_dropped: Set True to prevent returning all dropped windows.
:param remove_undetected: Set True to prevent returning all undetected windows.
:return: (windows_filtered, args)
windows_filtered: The new list of Windows after filters are applied.
args: The index in `windows` that each window in `windows_filtered` originated from.
"""
windows_filtered = []
args = []
for i, window in enumerate(windows):
# print (window.dropped,window.frozen,window.detected)
if window.dropped and remove_dropped:
continue
if window.frozen and remove_frozen:
continue
if not window.detected and remove_undetected:
continue
windows_filtered.append(window)
args.append(i)
return windows_filtered, args
def window_image(windows, param='x_filtered', color=(0, 255, 0), color_frozen=None, color_dropped=None):
"""
Creates an image with the given `windows` colored in. By default dims frozen windows and hides dropped windows.
:param windows: A List of Windows to image.
:param param: Whether to use the 'x_filtered' or 'x_measured' position of the window.
:param color: Color for each window that is not frozen or dropped.
:param color_frozen: Color for each frozen window.
:param color_dropped: Color for each dropped window.
:return: An image with the windows colored in and all black elsewhere.
"""
if color_frozen is None:
color_frozen = [ch * 0.6 for ch in color]
if color_dropped is None:
color_dropped = [0, 0, 0]
mask = np.zeros((windows[0].img_h, windows[0].img_w, 3))
for window in windows:
if getattr(window, param) is None:
continue
if window.dropped:
color_curr = color_dropped
elif window.frozen:
color_curr = color_frozen
else:
color_curr = color
mask[window.get_mask(param) > 0] = color_curr
return mask.astype('uint8')
class WindowFilter(object):
def __init__(self, pos_init=0.0, meas_variance=50, process_variance=1, uncertainty_init=2 ** 30):
"""
A one dimensional Kalman filter tuned to track the position of a window.
State variable: = [position,
velocity]
:param pos_init: Initial position.
:param meas_variance: Variance of each measurement. Decrease to have the filter chase each measurement faster.
:param process_variance: Variance of each prediction. Decrease to follow predictions more.
:param uncertainty_init: Uncertainty of initial position.
"""
self.kf = KalmanFilter(dim_x=2, dim_z=1)
# State transition function
self.kf.F = np.array([[1., 1],
[0., 0.5]])
# Measurement function
self.kf.H = np.array([[1., 0.]])
# Initial state estimate
self.kf.x = np.array([pos_init, 0])
# Initial Covariance matrix
self.kf.P = np.eye(self.kf.dim_x) * uncertainty_init
# Measurement noise
self.kf.R = np.array([[meas_variance]])
# Process noise
self.kf.Q = Q_discrete_white_noise(dim=2, dt=1, var=process_variance)
def update(self, pos):
"""
Given an estimate x position, uses the kalman filter to estimate the most likely true position of the
lane pixel.
:param pos: measured x position of the pixel
"""
self.kf.predict()
self.kf.update(pos)
def grow_uncertainty(self, mag):
"""Grows state uncertainty."""
for i in range(mag):
# P = FPF' + Q
self.kf.P = self.kf._alpha_sq * dot3(self.kf.F, self.kf.P, self.kf.F.T) + self.kf.Q
def loglikelihood(self, pos):
"""Calculates the likelihood of a measurement given the filter parameters and gaussian assumption."""
self.kf.S = dot3(self.kf.H, self.kf.P, self.kf.H.T) + self.kf.R
return logpdf(pos, np.dot(self.kf.H, self.kf.x), self.kf.S)
def get_position(self):
return self.kf.x[0]