Skip to content

Commit

Permalink
Merge pull request #78 from LaboratoireMecaniqueLille/feature/various…
Browse files Browse the repository at this point in the history
…_fixes_in_camera_v4l2

Various style refactorings and fixes on `CameraGstreamer` and `CameraOpencv` objects
  • Loading branch information
WeisLeDocto authored Nov 28, 2023
2 parents 430b0ed + 0bd03ff commit 565fe7d
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 179 deletions.
3 changes: 3 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@

# PIERROOOTT owns the Opencv cameras
/src/crappy/camera/opencv_* @WeisLeDocto @PIERROOOTT

# PIERROOOTT owns the base v4l2 class
/src/crappy/camera/_v4l2_base.py @WeisLeDocto @PIERROOOTT
66 changes: 35 additions & 31 deletions src/crappy/camera/_v4l2_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,22 @@ def _get_param(self, device: Optional[Union[str, int]]) -> None:
v4l2-ctl with regex."""

# Trying to run v4l2-ctl to get the available settings
command = ['v4l2-ctl', '-L'] if device is None \
else ['v4l2-ctl', '-d', str(device), '-L']
self.log(logging.DEBUG, f"Running the command {' '.join(command)}")
check = run(command, capture_output=True, text=True)
check = check.stdout if check is not None else ''
self.log(logging.DEBUG, f"Got {check} from the previous command")
if device is None:
command = ['v4l2-ctl', '-L']
else:
command = ['v4l2-ctl', '-d', str(device), '-L']
self.log(logging.DEBUG, f"Getting the available image settings with "
f"command {' '.join(command)}")
ret = run(command, capture_output=True, text=True).stdout
self.log(logging.DEBUG, f"Got the following image settings: {ret}")

# Extract the different parameters and their information
matches = finditer(V4L2Parameter.param_pattern, check)
matches = finditer(V4L2Parameter.param_pattern, ret)
for match in matches:
self._parameters.append(V4L2Parameter.parse_info(match))

# Regex to extract the different options in a menu
menu_options = finditer(V4L2Parameter.option_pattern, check)
menu_options = finditer(V4L2Parameter.option_pattern, ret)

# Extract the different options
for menu_option in menu_options:
Expand All @@ -120,29 +122,31 @@ def _get_available_formats(self, device: Optional[Union[str, int]]) -> None:
regex."""

# Trying to run v4l2-ctl to get the available formats
command = ['v4l2-ctl', '--list-formats-ext'] if device is None \
else ['v4l2-ctl', '-d', str(device), '--list-formats-ext']
self.log(logging.DEBUG, f"Running the command {' '.join(command)}")
check = run(command, capture_output=True, text=True)
check = check.stdout if check is not None else ''
self.log(logging.DEBUG, f"Got {check} from the previous command")
if device is None:
command = ['v4l2-ctl', '--list-formats-ext']
else:
command = ['v4l2-ctl', '-d', str(device), '--list-formats-ext']
self.log(logging.DEBUG, f"Getting the available image formats with "
f"command {' '.join(command)}")
ret = run(command, capture_output=True, text=True).stdout
self.log(logging.DEBUG, f"Got the following image formats: {ret}")

# Splitting the returned string to isolate each encoding
if findall(r'\[\d+]', check):
check = split(r'\[\d+]', check)[1:]
elif findall(r'Pixel\sFormat', check):
check = split(r'Pixel\sFormat', check)[1:]
if findall(r'\[\d+]', ret):
formats = split(r'\[\d+]', ret)[1:]
elif findall(r'Pixel\sFormat', ret):
formats = split(r'Pixel\sFormat', ret)[1:]
else:
check = []
formats = list()

if check:
for img_format in check:
# For each encoding, finding its name
# For each encoding, finding its name, available sizes and framerates
if formats:
for img_format in formats:
name, *_ = search(r"'(\w+)'", img_format).groups()
sizes = findall(r'\d+x\d+', img_format)
fps_sections = split(r'\d+x\d+', img_format)[1:]

# For each name, finding the available sizes
# Formatting the detected sizes and framerates into strings
for size, fps_section in zip(sizes, fps_sections):
fps_list = findall(r'\((\d+\.\d+)\sfps\)', fps_section)
for fps in fps_list:
Expand All @@ -160,7 +164,7 @@ def _add_setter(self,
The setter function.
"""

def setter(value) -> None:
def setter(value: Union[str, int, bool]) -> None:
"""The method to set the value of a setting running v4l2-ctl."""

if isinstance(value, str):
Expand Down Expand Up @@ -209,9 +213,9 @@ def getter() -> int:
command = ['v4l2-ctl', '--get-ctrl', name]
self.log(logging.DEBUG, f"Running the command {' '.join(command)}")
value = run(command, capture_output=True, text=True).stdout
value = search(r': (-?\d+)', value).group(1)
self.log(logging.DEBUG, f"Got {name}: {int(value)}")
return int(value)
value = int(search(r':\s(-?\d+)', value).group(1))
self.log(logging.DEBUG, f"Got {name}: {value}")
return value
return getter

def _add_bool_getter(self,
Expand All @@ -237,9 +241,9 @@ def getter() -> bool:
command = ['v4l2-ctl', '--get-ctrl', name]
self.log(logging.DEBUG, f"Running the command {' '.join(command)}")
value = run(command, capture_output=True, text=True).stdout
value = search(r': (\d+)', value).group(1)
self.log(logging.DEBUG, f"Got {name}: {bool(int(value))}")
return bool(int(value))
value = bool(int(search(r':\s(\d+)', value).group(1)))
self.log(logging.DEBUG, f"Got {name}: {value}")
return value
return getter

def _add_menu_getter(self,
Expand All @@ -265,7 +269,7 @@ def getter() -> str:
command = ['v4l2-ctl', '--get-ctrl', name]
self.log(logging.DEBUG, f"Running the command {' '.join(command)}")
value = run(command, capture_output=True, text=True).stdout
value = search(r': (\d+)', value).group(1)
value = search(r':\s(\d+)', value).group(1)
for param in self._parameters:
if param.name == name:
for option in param.options:
Expand Down
159 changes: 91 additions & 68 deletions src/crappy/camera/gstreamer_camera_v4l2.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,77 +186,81 @@ def open(self,
# Defining the settings in case no custom pipeline was given
if user_pipeline is None:

# Getting the available formats for the selected device
self._get_available_formats(self._device)

# Finally, creating the parameter if applicable
# Checking if the formats are supported with the installed libraries
if self._formats:
if not run(['gst-inspect-1.0', 'avdec_h264'],
capture_output=True, text=True).stdout:
h264_available = bool(run(['gst-inspect-1.0', 'avdec_h264'],
capture_output=True, text=True).stdout)
h265_available = bool(run(['gst-inspect-1.0', 'avdec_h265'],
capture_output=True, text=True).stdout)
if not h264_available and any(form.split()[0] == 'H264'
for form in self._formats):
self._formats = [form for form in self._formats
if form.split()[0] != 'H264']
self.log(logging.WARNING, "The format H264 is not available"
"It could be if gstreamer1.0-libav "
"was installed !")
if not run(['gst-inspect-1.0', 'avdec_h265'],
capture_output=True, text=True).stdout:
self.log(logging.WARNING, "The video format H264 could be available "
"for the selected camera if "
"gstreamer1.0-libav was installed !")
if not h265_available and any(form.split()[0] == 'HEVC'
for form in self._formats):
self._formats = [form for form in self._formats
if form.split()[0] != 'HEVC']
self.log(logging.WARNING, "The format HEVC is not available"
"It could be if gstreamer1.0-libav "
"was installed !")

# The format integrates the size selection
if ' ' in self._formats[0]:
self.add_choice_setting(name='format',
choices=tuple(self._formats),
getter=self._get_format,
setter=self._set_format)
# The size is independent of the format
else:
self.add_choice_setting(name='format', choices=tuple(self._formats),
setter=self._set_format)
self.log(logging.WARNING, "The video format H265 could be available "
"for the selected camera if "
"gstreamer1.0-libav was installed !")

# If there are remaining image formats, adding the corresponding setting
if self._formats:
self.add_choice_setting(name='format',
choices=tuple(self._formats),
getter=self._get_format,
setter=self._set_format)

# Getting the available parameters for the camera
self._get_param(self._device)

# Create the different settings
# Creating the different settings
for param in self._parameters:
if not param.flags:
if param.type == 'int':
self.add_scale_setting(
if param.type == 'int':
self.add_scale_setting(
name=param.name,
lowest=int(param.min),
highest=int(param.max),
getter=self._add_scale_getter(param.name, self._device),
setter=self._add_setter(param.name, self._device),
default=param.default,
step=int(param.step))

elif param.type == 'bool':
self.add_bool_setting(
name=param.name,
getter=self._add_bool_getter(param.name, self._device),
setter=self._add_setter(param.name, self._device),
default=bool(int(param.default)))

elif param.type == 'menu':
if param.options:
self.add_choice_setting(
name=param.name,
lowest=int(param.min),
highest=int(param.max),
getter=self._add_scale_getter(param.name, self._device),
choices=param.options,
getter=self._add_menu_getter(param.name, self._device),
setter=self._add_setter(param.name, self._device),
default=param.default,
step=int(param.step))

elif param.type == 'bool':
self.add_bool_setting(
name=param.name,
getter=self._add_bool_getter(param.name, self._device),
setter=self._add_setter(param.name, self._device),
default=bool(int(param.default)))

elif param.type == 'menu':
if param.options:
self.add_choice_setting(
name=param.name,
choices=param.options,
getter=self._add_menu_getter(param.name, self._device),
setter=self._add_setter(param.name, self._device),
default=param.default)
default=param.default)

else:
self.log(logging.ERROR, f'The type {param.type} is not yet'
f' implemented. Only int, bool and menu '
f'type are implemented. ')
raise NotImplementedError
else:
self.log(logging.ERROR, f'The type {param.type} is not yet'
f' implemented. Only int, bool and menu '
f'type are implemented. ')
raise NotImplementedError

self.add_choice_setting(name="channels", choices=('1', '3'), default='1')
# No need to add the channels setting if there's only one channel
if self._nb_channels > 1:
self.add_choice_setting(name="channels", choices=('1', '3'),
default='1')

# Adding the software ROI selection settings
if self._formats and ' ' in self._formats[0]:
if self._formats:
width, height = search(r'(\d+)x(\d+)', self._get_format()).groups()
self.add_software_roi(int(width), int(height))

Expand Down Expand Up @@ -363,23 +367,37 @@ def _get_pipeline(self, img_format: Optional[str] = None) -> str:
else:
device = ''

# Getting the format index
img_format = img_format if img_format is not None else self.format
# Getting the format string if not provided as an argument
if img_format is None and hasattr(self, 'format'):
img_format = self.format
# In case it goes wrong, just try without specifying the format
if img_format is None:
img_format = ''

try:
format_name, img_size, fps = findall(r"(\w+)\s(\w+)\s\((\d+.\d+) fps\)",
img_format)[0]
except ValueError:
format_name, img_size, fps = img_format, None, None

# Adding a mjpeg decoder to the pipeline if needed
# Default color is BGR in most cases
color = 'BGR'
# Adding the decoder to the pipeline if needed
if format_name == 'MJPG':
img_format = '! jpegdec'
elif format_name == 'H264':
img_format = '! h264parse ! avdec_h264'
elif format_name == 'HEVC':
img_format = '! h265parse ! avdec_h265'
elif format_name == 'YUYV':
elif format_name in ('YUYV', 'YUY2'):
img_format = ''
elif format_name == 'GREY':
img_format = ''
color = 'GRAY8'
else:
self.log(logging.WARNING, f"Unsupported format name: {format_name}, "
f"trying without explicitly setting the "
f"decoder in the pipeline")
img_format = ''

# Getting the width and height from the second half of the string
Expand All @@ -398,7 +416,7 @@ def _get_pipeline(self, img_format: Optional[str] = None) -> str:

# Finally, generate a single pipeline containing all the user settings
return f"""v4l2src {device} name=source {img_format} ! videoconvert !
video/x-raw,format=BGR{img_size}{fps_str} ! appsink name=sink"""
video/x-raw,format={color}{img_size}{fps_str} ! appsink name=sink"""

def _on_new_sample(self, app_sink):
"""Callback that reads every new frame and puts it into a buffer.
Expand Down Expand Up @@ -438,7 +456,9 @@ def _on_new_sample(self, app_sink):
"the format.\n(here BGR would be for 3 channels)")

# Converting to gray level if needed
if self._user_pipeline is None and self.channels == '1':
if (self._user_pipeline is None
and hasattr(self, 'channels')
and self.channels == '1'):
numpy_frame = cv2.cvtColor(numpy_frame, cv2.COLOR_BGR2GRAY)

# Cleaning up the buffer mapping
Expand All @@ -455,7 +475,7 @@ def _set_format(self, img_format) -> None:
self._restart_pipeline(self._get_pipeline(img_format=img_format))

# Reloading the software ROI selection settings
if self._soft_roi_set and self._formats and ' ' in self._formats[0]:
if self._soft_roi_set and self._formats:
width, height = search(r'(\d+)x(\d+)', img_format).groups()
self.reload_software_roi(int(width), int(height))

Expand All @@ -468,15 +488,18 @@ def _get_format(self) -> str:
command = ['v4l2-ctl', '-d', str(self._device), '--all']
else:
command = ['v4l2-ctl', '--all']
check = run(command, capture_output=True, text=True).stdout
self.log(logging.DEBUG, f"Getting the current image formats with "
f"command {' '.join(command)}")
ret = run(command, capture_output=True, text=True).stdout
self.log(logging.DEBUG, f"Got the following image formats: {ret}")

# Parsing the answer
format_ = width = height = fps = ''
if search(r"Pixel Format\s*:\s*'(\w+)'", check) is not None:
format_, *_ = search(r"Pixel Format\s*:\s*'(\w+)'", check).groups()
if search(r"Width/Height\s*:\s*(\d+)/(\d+)", check) is not None:
width, height = search(r"Width/Height\s*:\s*(\d+)/(\d+)", check).groups()
if search(r"Frames per second\s*:\s*(\d+.\d+)", check) is not None:
fps, *_ = search(r"Frames per second\s*:\s*(\d+.\d+)", check).groups()
if search(r"Pixel Format\s*:\s*'(\w+)'", ret) is not None:
format_, *_ = search(r"Pixel Format\s*:\s*'(\w+)'", ret).groups()
if search(r"Width/Height\s*:\s*(\d+)/(\d+)", ret) is not None:
width, height = search(r"Width/Height\s*:\s*(\d+)/(\d+)", ret).groups()
if search(r"Frames per second\s*:\s*(\d+.\d+)", ret) is not None:
fps, *_ = search(r"Frames per second\s*:\s*(\d+.\d+)", ret).groups()

return f'{format_} {width}x{height} ({fps} fps)'
Loading

0 comments on commit 565fe7d

Please sign in to comment.