# operate.py
from gpiozero import MotionSensor
from gpiozero import Button
import threading
import time
import RPi.GPIO as GPIO
import sys
import pigpio
# CV
import cv2
from tflite_support.task import core
from tflite_support.task import processor
from tflite_support.task import vision
import utils

pir = MotionSensor(17, threshold=0.3)  # PIR motion sensor on BCM pin 17
button = Button(2)                     # manual override button on BCM pin 2
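# Note: pigpio talks to the pigpio daemon, so the daemon must already be
# running when this script starts (typically: sudo pigpiod). The task-library
# imports come from the tflite-support package, and utils is presumably the
# visualization helper shipped with the TensorFlow Lite object-detection
# example for Raspberry Pi.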
############## CV #######################
def look(model: str, camera_id: int, width: int, height: int, num_threads: int,
         enable_edgetpu: bool) -> None:
    """Continuously run inference on images acquired from the camera.

    Args:
        model: Name of the TFLite object detection model.
        camera_id: The camera id to be passed to OpenCV.
        width: The width of the frame captured from the camera.
        height: The height of the frame captured from the camera.
        num_threads: The number of CPU threads to run the model.
        enable_edgetpu: True/False whether the model is an EdgeTPU model.
    """
    global person_detected_at, animal_detected_at

    # Variables to calculate FPS
    counter, fps = 0, 0
    start_time = time.time()

    # Start capturing video input from the camera
    cap = cv2.VideoCapture(camera_id)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

    # Visualization parameters
    row_size = 20  # pixels
    left_margin = 24  # pixels
    text_color = (0, 0, 255)  # red
    font_size = 1
    font_thickness = 1
    fps_avg_frame_count = 10

    # Initialize the object detection model, restricted to the two categories
    # this script cares about.
    base_options = core.BaseOptions(
        file_name=model, use_coral=enable_edgetpu, num_threads=num_threads)
    detection_options = processor.DetectionOptions(
        max_results=3, score_threshold=0.3,
        category_name_allowlist=["teddy bear", "person"])
    options = vision.ObjectDetectorOptions(
        base_options=base_options, detection_options=detection_options)
    detector = vision.ObjectDetector.create_from_options(options)

    print("Starting webcam loop")
    # Continuously capture images from the camera and run inference
    while cap.isOpened():
        if camera_stop.is_set():
            print("Breaking loop")
            break
        success, image = cap.read()
        if not success:
            sys.exit(
                'ERROR: Unable to read from webcam. Please verify your webcam settings.'
            )

        counter += 1
        image = cv2.flip(image, 1)

        # Convert the image from BGR to RGB as required by the TFLite model.
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Create a TensorImage object from the RGB image.
        input_tensor = vision.TensorImage.create_from_array(rgb_image)

        # Run object detection using the model.
        detection_result = detector.detect(input_tensor)

        # Record when each category of interest was last seen.
        for detection in detection_result.detections:
            category = detection.categories[0]
            category_name = category.category_name
            print("Found:", category_name)
            if category_name == "person":
                person_detected_at = time.time()
            if category_name == "teddy bear":
                animal_detected_at = time.time()

        image = utils.visualize(image, detection_result)

        # Calculate the FPS
        if counter % fps_avg_frame_count == 0:
            end_time = time.time()
            fps = fps_avg_frame_count / (end_time - start_time)
            start_time = time.time()

        # Show the FPS
        fps_text = 'FPS = {:.1f}'.format(fps)
        text_location = (left_margin, row_size)
        cv2.putText(image, fps_text, text_location, cv2.FONT_HERSHEY_PLAIN,
                    font_size, text_color, font_thickness)

        # Stop the program if the ESC key is pressed.
        if cv2.waitKey(1) == 27:
            break
        cv2.imshow('object_detector', image)

    cap.release()
    cv2.destroyAllWindows()
    print("Stopping webcam")
###
# Shared state between the button, motion/vision, and servo threads.
button_pressed = False
person_detected_at = 0
button_at = 0
animal_detected_at = 0
cool_off = 2          # (currently unused)
servo_pin = 13        # BCM pin driving the door servo
camera_stop = threading.Event()  # signals the look thread to stop
done = threading.Event()         # (currently unused)
def set_red():
    GPIO.output(RED_PIN, GPIO.HIGH)
    GPIO.output(GREEN_PIN, GPIO.LOW)

def set_green():
    GPIO.output(RED_PIN, GPIO.LOW)
    GPIO.output(GREEN_PIN, GPIO.HIGH)
def watch_button():
    global button_pressed, button_at
    while True:
        button.wait_for_press()
        button_pressed = True
        print("Button was pressed")
        button.wait_for_release()
        print("Button was released")
        button_at = time.time()
        button_pressed = False
def watch_motion_detector():
    global camera_stop, person_detected_at
    model = 'efficientdet_lite0.tflite'
    camera_id = 0
    frame_width = 320
    frame_height = 240
    num_threads = 2
    enable_edgetpu = False
    thread_running = False
    last_motion_at = 0
    look_thread = None
    while True:
        # Is there motion and no detector thread yet?
        if pir.motion_detected:
            last_motion_at = time.time()
            if not thread_running:
                print("Starting look thread!")
                # A fresh Event each start: Event.set() is sticky, so reusing
                # the old one would stop the new thread immediately.
                camera_stop = threading.Event()
                look_thread = threading.Thread(
                    target=look,
                    args=(model, camera_id, frame_width, frame_height,
                          num_threads, enable_edgetpu))
                look_thread.start()
                thread_running = True
        else:
            # No motion for 5 seconds and nobody seen recently: stop the detector.
            if time.time() - last_motion_at > 5 and not recent_person() and thread_running:
                print("Killing look thread")
                camera_stop.set()
                look_thread.join()
                thread_running = False
        time.sleep(0.05)
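# Sketch of an alternative (an assumption, not what this project does): the
# look thread could be started as a daemon so it dies with the process instead
# of being joined explicitly, at the cost of skipping its camera cleanup:
#
#   look_thread = threading.Thread(target=look, args=(...), daemon=True)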
RED_PIN = 27
GREEN_PIN = 22

def setup_leds():
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(RED_PIN, GPIO.OUT)
    GPIO.setup(GREEN_PIN, GPIO.OUT)
def recent_person():
    return recently(person_detected_at)

def recent_button():
    return recently(button_at)

def recent_animal():
    return recently(animal_detected_at, 1)

def recently(value, delta=3):
    """Return True if the timestamp `value` is within `delta` seconds of now."""
    return value + delta > time.time()
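# Example: with the default delta of 3 seconds, a detection stamped at t=100.0
# counts as recent until t=103.0:
#
#   recently(100.0)           # True while time.time() < 103.0
#   recently(100.0, delta=1)  # the stricter window used for recent_animal()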
def run():
    GPIO.setmode(GPIO.BCM)
    setup_leds()
    pi = pigpio.pi()
    # Servo pulse widths (microseconds) for the open and closed positions.
    OPEN = 600
    CLOSED = 1150
    if not pi.connected:
        sys.exit("ERROR: could not connect to the pigpio daemon (is pigpiod running?)")
    print("Starting")
    was_open = False
    while True:
        # Open when the button is held or was pressed recently, or when a
        # person (but no animal) has been seen recently.
        if button_pressed or recent_button() or (recent_person() and not recent_animal()):
            if not was_open:
                print("Opening")
                set_green()
                pi.set_servo_pulsewidth(servo_pin, OPEN)
                was_open = True
        else:
            if was_open:
                print("Closing")
                set_red()
                pi.set_servo_pulsewidth(servo_pin, CLOSED)
                was_open = False
        time.sleep(0.1)
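# The OPEN/CLOSED pulse widths above (600/1150 us) are calibrated for this
# particular servo and linkage. A hedged calibration sketch for a different
# servo (these sweep values are assumptions, not project settings):
#
#   pi = pigpio.pi()
#   for pw in (600, 900, 1200, 1500):
#       pi.set_servo_pulsewidth(servo_pin, pw)  # move and observe
#       time.sleep(1)
#   pi.set_servo_pulsewidth(servo_pin, 0)       # 0 stops the servo pulses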
def main():
    motion_thread = threading.Thread(target=watch_motion_detector)
    button_thread = threading.Thread(target=watch_button)
    run_thread = threading.Thread(target=run)
    motion_thread.start()
    button_thread.start()
    run_thread.start()
    try:
        # Wait on the worker threads; Ctrl-C lands here in the main thread.
        motion_thread.join()
        button_thread.join()
        run_thread.join()
    except KeyboardInterrupt:
        print("keyboard interrupt in main")
        GPIO.output(RED_PIN, GPIO.LOW)
        GPIO.output(GREEN_PIN, GPIO.LOW)
if __name__ == "__main__":
    try:
        main()
    finally:
        print("cleaning up")
        GPIO.output(RED_PIN, GPIO.LOW)
        GPIO.output(GREEN_PIN, GPIO.LOW)
        GPIO.cleanup()