|
9 | 9 |
|
10 | 10 | logger = logging.getLogger() |
11 | 11 |
|
| 12 | +class Picamera2EoncoderMock(threading.Thread): |
| 13 | + def __init__(self, output): |
| 14 | + self.output = output |
| 15 | + self.image_jpeg = open('test/test_image.jpeg', 'rb').read() |
| 16 | + self.exit = False |
| 17 | + super().__init__() |
| 18 | + |
| 19 | + def close(self): |
| 20 | + self.output[0].close() |
| 21 | + |
| 22 | + def stop(self): |
| 23 | + self.exit = True |
| 24 | + |
| 25 | + def run(self): |
| 26 | + while(not self.exit): |
| 27 | + self.output[0].outputframe(self.image_jpeg) |
| 28 | + time.sleep(0.03) |
| 29 | + |
| 30 | +class Picamera2MJPEGEncoderMock(Picamera2EoncoderMock): |
| 31 | + def __init__(self, output=None, quality=20): |
| 32 | + super().__init__(output) |
| 33 | + self.quality = quality |
| 34 | + |
| 35 | +class Picamera2H264EncoderMock(Picamera2EoncoderMock): |
| 36 | + def __init__(self, output=None): |
| 37 | + super().__init__(output) |
| 38 | + |
12 | 39 | class Picamera2Mock(object): |
13 | 40 | """Implements PiCamera mock class |
14 | 41 | PiCamera is the library used to access the integrated Camera, this mock class emulates the capture functions in order to test the streamer loop. |
@@ -53,35 +80,13 @@ def __init__(self, buffer, video): |
53 | 80 | def start(self): |
54 | 81 | pass |
55 | 82 |
|
56 | | - def start_recording(self, buffer, format, splitter_port, quality=None, bitrate=None, resize=None): |
57 | | - """mock start_recording""" |
58 | | - print(format) |
59 | | - if format == "bgra" and resize: |
60 | | - self.images[format] = cv2.resize(self.images[format], resize) |
61 | | - if format == "h264": |
62 | | - f = open("test/test.h264", "rb") |
63 | | - video = f.read() |
64 | | - f.close() |
65 | | - self.splitter_recorders[splitter_port] = self.VideoRecorder(buffer, video) |
66 | | - else: |
67 | | - self.splitter_recorders[splitter_port] = self.ImageRecorder(buffer, self.images[format]) |
68 | | - self.splitter_recorders[splitter_port].start() |
69 | | - |
70 | | - def stop_recording(self, splitter_port): |
71 | | - if splitter_port < 2: |
72 | | - self.splitter_recorders[splitter_port].go = False |
73 | | - self.splitter_recorders[splitter_port].join() |
74 | | - else: |
75 | | - recorder = self.splitter_recorders[splitter_port] |
76 | | - f = open(recorder.buffer, "wb") |
77 | | - f.write(recorder.video) |
78 | | - f.close() |
79 | | - |
80 | | - def start_encoder(self, encoder): |
81 | | - pass |
| 83 | + def start_encoder(self, encoder, output=None): |
| 84 | + encoder.start() |
| 85 | + output |
82 | 86 |
|
83 | 87 | def stop_encoder(self, encoders): |
84 | | - pass |
| 88 | + for encoder in encoders: |
| 89 | + encoder.stop() |
85 | 90 |
|
86 | 91 | def capture_buffer(self): |
87 | 92 | return self.images["bgra"] |
|
0 commit comments