Skip to content

Commit

Permalink
Add optical flow velocity (motion smoothness) #210
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimpoutaraud committed Nov 18, 2022
1 parent 1cf37b2 commit e80e4f0
Showing 1 changed file with 102 additions and 47 deletions.
149 changes: 102 additions & 47 deletions musicalgestures/_flow.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import cv2
import os
import cv2
import numpy as np
from musicalgestures._utils import extract_wav, embed_audio_in_video, MgProgressbar, convert_to_avi, generate_outfilename
import musicalgestures
import math
import weakref

import musicalgestures
from musicalgestures._utils import extract_wav, embed_audio_in_video, MgProgressbar, convert_to_avi, generate_outfilename


class Flow:
"""
Expand All @@ -25,7 +27,7 @@ def __init__(self, parent, filename, color, has_audio):
self.filename = filename
self.color = color
self.has_audio = has_audio

def dense(
self,
filename=None,
Expand All @@ -36,6 +38,12 @@ def dense(
poly_n=5,
poly_sigma=1.2,
flags=0,
velocity=False,
distance=None,
timestep=1,
move_step=16,
perspective_angle=0,
scaledown=1,
skip_empty=False,
target_name=None,
overwrite=False):
Expand All @@ -51,6 +59,12 @@ def dense(
poly_n (int, optional): The size of the pixel neighborhood used to find polynomial expansion in each pixel. Larger values mean that the image will be approximated with smoother surfaces, yielding more robust algorithm and more blurred motion field, typically poly_n =5 or 7. Defaults to 5.
poly_sigma (float, optional): The standard deviation of the Gaussian that is used to smooth derivatives used as a basis for the polynomial expansion. For `poly_n=5`, you can set `poly_sigma=1.1`, for `poly_n=7`, a good value would be `poly_sigma=1.5`. Defaults to 1.2.
flags (int, optional): Operation flags that can be a combination of the following: - **OPTFLOW_USE_INITIAL_FLOW** uses the input flow as an initial flow approximation. - **OPTFLOW_FARNEBACK_GAUSSIAN** uses the Gaussian \\f$\\texttt{winsize}\\times\\texttt{winsize}\\f$ filter instead of a box filter of the same size for optical flow estimation. Usually, this option gives z more accurate flow than with a box filter, at the cost of lower speed. Normally, `winsize` for a Gaussian window should be set to a larger value to achieve the same level of robustness. Defaults to 0.
velocity (bool, optional): Whether to compute optical flow velocity or not. Defaults to False.
distance (int, optional): Distance in meters to image (focal length) for returning flow in meters per second. Defualts to None.
timestep (int, optional): Time step in seconds for returning flow in meters per second. Defaults to 1.
move_step (int, optional): step size in pixels for sampling the flow image. Defaults to 16.
perspective_angle (int, optional): perspective angle of camera, for reporting flow in meters per second. Defaults to 0.
scaledown (int, optional): factor to scaledown frame size of the video. Defaults to 1.
skip_empty (bool, optional): If True, repeats previous frame in the output when encounters an empty frame. Defaults to False.
target_name (str, optional): Target output name for the video. Defaults to None (which assumes that the input filename with the suffix "_flow_dense" should be used).
overwrite (bool, optional): Whether to allow overwriting existing files or to automatically increment target filenames to avoid overwriting. Defaults to False.
Expand All @@ -76,63 +90,84 @@ def dense(
filename = self.parent().as_avi.filename

vidcap = cv2.VideoCapture(filename)
ret, frame = vidcap.read()
fourcc = cv2.VideoWriter_fourcc(*'MJPG')

fps = int(vidcap.get(cv2.CAP_PROP_FPS))
width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
length = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))

pb = MgProgressbar(
total=length, prefix='Rendering dense optical flow video:')
size = (int(width/scaledown), int(height/scaledown))

if target_name == None:
target_name = of + '_flow_dense' + fex
if not overwrite:
target_name = generate_outfilename(target_name)
if velocity:
pb = MgProgressbar(total=length, prefix='Rendering dense optical flow velocity:')

out = cv2.VideoWriter(target_name, fourcc, fps, (width, height))
else:
pb = MgProgressbar(total=length, prefix='Rendering dense optical flow video:')

if target_name == None:
target_name = of + '_flow_dense' + fex
if not overwrite:
target_name = generate_outfilename(target_name)

fourcc = cv2.VideoWriter_fourcc(*'MJPG')
out = cv2.VideoWriter(target_name, fourcc, fps, (width, height))

ret, frame1 = vidcap.read()
prev_frame = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
prev_frame = cv2.cvtColor(cv2.resize(frame1, size), cv2.COLOR_BGR2GRAY)

prev_rgb = None
hsv = np.zeros_like(frame1)
hsv[..., 1] = 255

ii = 0
# Create two lists for storing optical flow velocity values
xvel, yvel = [], []

while(vidcap.isOpened()):
ret, frame2 = vidcap.read()
xsum, ysum = 0, 0

if ret == True:
next_frame = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
next_frame = cv2.cvtColor(cv2.resize(frame2, size), cv2.COLOR_BGR2GRAY)

flow = cv2.calcOpticalFlowFarneback(prev_frame, next_frame, None, pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, flags)

if velocity:
# Cumulative sum of optical flow vectors
for y in range(0, flow.shape[0]):
for x in range(0, flow.shape[1]):
fx, fy = flow[y, x]
xsum += fx
ysum += fy

# Compute average velocity of pixels by dividing the cumulative sum of optical flow vectors by timesteps
xvel.append(self.get_velocity(flow, xsum, flow.shape[1], distance, timestep, move_step, perspective_angle))
yvel.append(self.get_velocity(flow, ysum, flow.shape[0], distance, timestep, move_step, perspective_angle))

flow = cv2.calcOpticalFlowFarneback(
prev_frame, next_frame, None, pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, flags)

mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
hsv[..., 0] = ang*180/np.pi/2
hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
rgb = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
else:
mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
hsv[..., 0] = ang*180/np.pi/2
hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
rgb = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

if skip_empty:
if np.sum(rgb) > 0:
out.write(rgb.astype(np.uint8))
else:
if ii == 0:
if skip_empty:
if np.sum(rgb) > 0:
out.write(rgb.astype(np.uint8))
else:
out.write(prev_rgb.astype(np.uint8))
else:
out.write(rgb.astype(np.uint8))

prev_frame = next_frame
if ii == 0:
out.write(rgb.astype(np.uint8))
else:
out.write(prev_rgb.astype(np.uint8))
else:
out.write(rgb.astype(np.uint8))

if skip_empty:
if np.sum(rgb) > 0 or ii == 0:
if skip_empty:
if np.sum(rgb) > 0 or ii == 0:
prev_rgb = rgb
else:
prev_rgb = rgb
else:
prev_rgb = rgb

prev_frame = next_frame

else:
pb.progress(length)
Expand All @@ -141,20 +176,40 @@ def dense(
pb.progress(ii)
ii += 1

out.release()
if velocity:
return xvel, yvel

else:
out.release()
destination_video = target_name

destination_video = target_name
if self.has_audio:
source_audio = extract_wav(of + fex)
embed_audio_in_video(source_audio, destination_video)
os.remove(source_audio)

if self.has_audio:
source_audio = extract_wav(of + fex)
embed_audio_in_video(source_audio, destination_video)
os.remove(source_audio)
# save result at flow_dense_video at parent MgVideo
self.parent().flow_dense_video = musicalgestures.MgVideo(
destination_video, color=self.color, returned_by_process=True)

# save result at flow_dense_video at parent MgVideo
self.parent().flow_dense_video = musicalgestures.MgVideo(
destination_video, color=self.color, returned_by_process=True)
return self.parent().flow_dense_video


def get_velocity(self, flow, sum_flow_pixels, flow_shape, distance_meters, timestep_seconds, move_step, perspective_angle):

pixel_count = (flow.shape[0] * flow.shape[1]) / move_step**2
average_velocity_pixels_per_second = (sum_flow_pixels / pixel_count / timestep_seconds)

return (self.velocity_meters_per_second(average_velocity_pixels_per_second, flow_shape, distance_meters, perspective_angle)
if perspective_angle and distance_meters else average_velocity_pixels_per_second)

def velocity_meters_per_second(self, velocity_pixels_per_second, flow_shape, distance_meters, perspective_angle):

distance_pixels = ((flow_shape / 2) / math.tan(perspective_angle / 2))
pixels_per_meter = distance_pixels / distance_meters

return velocity_pixels_per_second / pixels_per_meter

return self.parent().flow_dense_video

def sparse(
self,
Expand Down Expand Up @@ -301,4 +356,4 @@ def sparse(
self.parent().flow_sparse_video = musicalgestures.MgVideo(
destination_video, color=self.color, returned_by_process=True)

return self.parent().flow_sparse_video
return self.parent().flow_sparse_video

0 comments on commit e80e4f0

Please sign in to comment.