# NOTE(review): this file is a whitespace-mangled git patch; it is reconstructed
# below as readable Python. The patch touches three files.
#
# --- patch hunk for musicalgestures/__init__.py (@@ -82,6 +82,7 @@, inside
#     MgObject.__init__): registers the new multiprocess renderer next to the
#     existing single-process one:
#           from musicalgestures._motionvideo import mg_motion as motion
#         + from musicalgestures._motionvideo_mp_run import mg_motion_mp as motion_mp
#           from musicalgestures._motionvideo import mg_motiongrams as motiongrams
#           from musicalgestures._motionvideo import mg_motiondata as motiondata
#           from musicalgestures._motionvideo import mg_motionplots as motionplots
#
# --- new file: musicalgestures/_motionvideo_mp_render.py (part 1) ---

import multiprocessing
import numpy as np
import os
import argparse
import cv2
import sys
try:
    import musicalgestures
except ImportError:
    # local dev mode: fall back to the parent folder on sys.path
    sys.path.append('../')
    import musicalgestures
from musicalgestures._utils import frame2ms
from musicalgestures._centroid import centroid
from musicalgestures._filter import filter_frame
import socket


def mg_motion_mp(args):
    """Worker: render one slice of the motion video/motiongrams/motion data.

    Runs in a `multiprocessing.Pool` worker. Reads `num_frames` frames of the
    source video starting at `start_frame`, computes per-frame absolute
    difference images (filtered with `filter_frame`), and depending on the
    flags writes a partial motion video, partial motiongrams and partial
    time/qom/com arrays into `target_folder`, tagged with the zero-padded
    `process_id` so the parent can merge them in order. Progress is streamed
    back to the parent over the provided `client` socket as
    "progress:<id> <frame>" messages.

    Args:
        args: a single flat sequence (Pool.map-friendly) unpacked below.
    """
    (target_folder, of, fex, fps, width, height, length, color, filtertype,
     thresh, blur, kernel_size, inverted_motionvideo, inverted_motiongram,
     equalize_motiongram, save_data, save_motiongrams, save_video,
     start_frame, num_frames, process_id, client) = args

    # init
    target_name_video = None
    gramx = None
    gramy = None
    time = None   # timestamps in ms
    qom = None    # quantity of motion
    com = None    # centroid of motion
    out = None
    fourcc = None
    process_id_str = f'{process_id:02d}'

    # ignore runtime warnings when dividing by 0
    np.seterr(divide='ignore', invalid='ignore')

    # load video and seek to this worker's slice
    vidcap = cv2.VideoCapture(of+fex)
    vidcap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
    ret, frame = vidcap.read()

    if save_video:
        target_name_video = target_folder + of + '_motion_' + process_id_str + fex
        fourcc = cv2.VideoWriter_fourcc(*'MJPG')
        out = cv2.VideoWriter(target_name_video, fourcc, fps, (width, height))

    if save_motiongrams:
        # one row/column per processed frame is appended to these seeds
        gramx = np.zeros([1, width, 3])
        gramy = np.zeros([height, 1, 3])
    if save_data:
        time = np.array([])  # time in ms
        qom = np.array([])   # quantity of motion
        com = np.array([])   # centroid of motion

    ii = 0

    if not color:
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if save_motiongrams:
            # grayscale grams have no channel axis
            gramx = np.zeros([1, width])
            gramy = np.zeros([height, 1])

    try:
        while ii < num_frames:
            # report progress to the parent process
            client.sendall(bytes("progress:" + str(process_id) + " " + str(ii), 'utf-8'))

            if blur.lower() == 'average':
                prev_frame = cv2.blur(frame, (10, 10))
            elif blur.lower() == 'none':
                prev_frame = frame
            else:
                # FIX: unknown blur values used to leave prev_frame unbound
                # (NameError below); treat them like 'none'.
                prev_frame = frame

            ret, frame = vidcap.read()

            if not ret:
                break

            if blur.lower() == 'average':
                # The higher these numbers the more blur you get
                frame = cv2.blur(frame, (10, 10))

            if not color:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # int32 so the subtraction below cannot wrap around in uint8
            frame = np.array(frame)
            frame = frame.astype(np.int32)

            if color:
                motion_frame_rgb = np.zeros([height, width, 3])

                # filter each channel independently
                for i in range(frame.shape[2]):
                    motion_frame = (np.abs(frame[:, :, i]-prev_frame[:, :, i])).astype(np.uint8)
                    motion_frame = filter_frame(motion_frame, filtertype, thresh, kernel_size)
                    motion_frame_rgb[:, :, i] = motion_frame

                if save_motiongrams:
                    # horizontal gram: mean over columns; vertical: mean over rows
                    movement_y = np.mean(motion_frame_rgb, axis=1).reshape(height, 1, 3)
                    movement_x = np.mean(motion_frame_rgb, axis=0).reshape(1, width, 3)
                    gramy = np.append(gramy, movement_y, axis=1)
                    gramx = np.append(gramx, movement_x, axis=0)

            else:
                motion_frame = (np.abs(frame-prev_frame)).astype(np.uint8)
                motion_frame = filter_frame(motion_frame, filtertype, thresh, kernel_size)

                if save_motiongrams:
                    movement_y = np.mean(motion_frame, axis=1).reshape(height, 1)
                    movement_x = np.mean(motion_frame, axis=0).reshape(1, width)
                    gramy = np.append(gramy, movement_y, axis=1)
                    gramx = np.append(gramx, movement_x, axis=0)

            if not color:
                motion_frame = cv2.cvtColor(motion_frame, cv2.COLOR_GRAY2BGR)
                motion_frame_rgb = motion_frame

            if save_video:
                # Workers other than the first skip their first output frame:
                # frame groups overlap by one frame and the parent concatenates
                # the partial videos back to back, so writing it would duplicate
                # the previous worker's last frame (it'd also always be black).
                if process_id == 0 or ii > 0:
                    if inverted_motionvideo:
                        out.write(cv2.bitwise_not(motion_frame_rgb.astype(np.uint8)))
                    else:
                        out.write(motion_frame_rgb.astype(np.uint8))

            if save_data:
                combite, qombite = centroid(motion_frame_rgb.astype(np.uint8), width, height)
                if ii == 0:
                    # timestamps are slice-local; the parent offsets them on merge
                    time = frame2ms(ii, fps)
                    com = combite.reshape(1, 2)
                    qom = qombite
                else:
                    time = np.append(time, frame2ms(ii, fps))
                    com = np.append(com, combite.reshape(1, 2), axis=0)
                    qom = np.append(qom, qombite)

            ii += 1

    except Exception:
        # Best-effort: on any per-frame failure, release resources and fall
        # through so partial results are still written for the parent to merge.
        # (Was a bare `except:` which also swallowed KeyboardInterrupt.)
        vidcap.release()
        if save_video:
            out.release()

    if save_motiongrams:
        # save grams to npy files
        with open(target_folder + f'gramx_{process_id_str}.npy', 'wb') as f:
            np.save(f, gramx)
        with open(target_folder + f'gramy_{process_id_str}.npy', 'wb') as f:
            np.save(f, gramy)

    if save_data:
        # save data to npy files
        with open(target_folder + f'time_{process_id_str}.npy', 'wb') as f:
            np.save(f, time)
        with open(target_folder + f'com_{process_id_str}.npy', 'wb') as f:
            np.save(f, com)
        with open(target_folder + f'qom_{process_id_str}.npy', 'wb') as f:
            np.save(f, qom)

    # resetting numpy warnings for dividing by 0
    np.seterr(divide='warn', invalid='warn')

    # release is idempotent in cv2, so a second call after the except path is safe
    vidcap.release()
    if save_video:
        out.release()


def run_pool(func, args, numprocesses):
    """Map `func` over `args` on a pool of `numprocesses` workers.

    Results are intentionally discarded: the workers communicate via files and
    the progress socket. The context manager guarantees the pool is torn down
    (the original leaked it).
    """
    with multiprocessing.Pool(numprocesses) as pool:
        pool.map(func, args)
+def calc_frame_groups(framecount, num_cores): + import math + groups = [] # [startframe, numframes] + frames_per_core = math.floor(framecount / num_cores) + remaining = framecount - (num_cores * frames_per_core) + + for i in range(num_cores): + startframe = i * frames_per_core + numframes = frames_per_core + 1 if i < num_cores-1 else frames_per_core + remaining + groups.append([startframe, numframes]) + + return groups + + +def testhogfunc(process_id): + limit = 20 + count = 0 + while count < limit: + print(process_id, count) + futyi = 0 + for i in range(10_000_000): + futyi += 1 + count += 1 + return process_id, count + + +def bool_from_str(boolstring): + return True if boolstring == "True" else False + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Render motion video and data in a multicore process') + parser.add_argument('target_folder', metavar='target_folder', type=str, help='path to temporary folder') + parser.add_argument('of', metavar='of', type=str, help='filename without path or ext') + parser.add_argument('fex', metavar='fex', type=str, help='file extension including dot') + parser.add_argument('fps', metavar='fps', type=str, help='fps') + parser.add_argument('width', metavar='width', type=str, help='width') + parser.add_argument('height', metavar='height', type=str, help='height') + parser.add_argument('length', metavar='length', type=str, help='length') + parser.add_argument('color', metavar='color', type=str, help='color') + parser.add_argument('filtertype', metavar='filtertype', type=str, help='filtertype') + parser.add_argument('thresh', metavar='thresh', type=str, help='thresh') + parser.add_argument('blur', metavar='blur', type=str, help='blur') + parser.add_argument('kernel_size', metavar='kernel_size', type=str, help='kernel_size') + parser.add_argument('inverted_motionvideo', metavar='inverted_motionvideo', type=str, help='inverted_motionvideo') + parser.add_argument('inverted_motiongram', 
metavar='inverted_motiongram', type=str, help='inverted_motiongram') + parser.add_argument('equalize_motiongram', metavar='equalize_motiongram', type=str, help='equalize_motiongram') + parser.add_argument('save_data', metavar='save_data', type=str, help='save_data') + parser.add_argument('save_motiongrams', metavar='save_motiongrams', type=str, help='save_motiongrams') + parser.add_argument('save_video', metavar='save_video', type=str, help='save_video') + + args = parser.parse_args() + + HOST = '127.0.0.1' # The server's hostname or IP address + PORT = 65432 # The port used by the server + + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client.connect((HOST, PORT)) + + target_folder = os.path.abspath(args.target_folder).replace('\\', '/') + if target_folder[-1] != "/": + target_folder += '/' + + of, fex = args.of, args.fex + fps, width, height, length = int(float(args.fps)), int(float(args.width)), int(float(args.height)), int(float(args.length)) + color, filtertype, thresh, blur, kernel_size = bool_from_str(args.color), args.filtertype, float(args.thresh), args.blur, int(float(args.kernel_size)) + inverted_motionvideo, inverted_motiongram, equalize_motiongram = bool_from_str(args.inverted_motionvideo), bool_from_str(args.inverted_motiongram), bool_from_str(args.equalize_motiongram) + save_data, save_motiongrams, save_video = bool_from_str(args.save_data), bool_from_str(args.save_motiongrams), bool_from_str(args.save_video) + + numprocessors = multiprocessing.cpu_count() + frame_groups = calc_frame_groups(length, numprocessors) + + feed_args = [] + + for i in range(numprocessors): + start_frame, num_frames = frame_groups[i] + initargs = [target_folder, of, fex, fps, width, height, length, color, filtertype, thresh, blur, kernel_size, inverted_motionvideo, inverted_motiongram, equalize_motiongram, save_data, save_motiongrams, save_video, start_frame, num_frames, i, client] + # client.sendall(bytes(str(initargs), 'utf-8')) + feed_args.append(initargs) 
if __name__ == "__main__":
    # (continuation of the __main__ block of _motionvideo_mp_render.py)
    # results = run_pool(mg_motion_mp, feed_args, numprocessors)
    run_pool(mg_motion_mp, feed_args, numprocessors)

    # Closing the shared socket signals the parent's progress server to stop.
    client.close()
    # print("Closed socket client.")


# ======================================================================
# --- new file: musicalgestures/_motionvideo_mp_run.py ---
# ======================================================================

from musicalgestures._utils import ffmpeg_cmd, generate_outfilename, get_length, wrap_str, convert_to_avi, MgProgressbar, extract_wav, embed_audio_in_video, MgImage
from musicalgestures._motionvideo import save_txt, plot_motion_metrics
import subprocess
import tempfile
import shutil
import platform
import musicalgestures
import os
import numpy as np
import socket
import threading
import multiprocessing
import cv2


def mg_motion_mp(
        self,
        filtertype='Regular',
        thresh=0.05,
        blur='None',
        kernel_size=5,
        inverted_motionvideo=False,
        inverted_motiongram=False,
        unit='seconds',
        equalize_motiongram=True,
        save_plot=True,
        plot_title=None,
        save_data=True,
        data_format="csv",
        save_motiongrams=True,
        save_video=True,
        target_name_video=None,
        target_name_plot=None,
        target_name_data=None,
        target_name_mgx=None,
        target_name_mgy=None,
        overwrite=False):
    """Render motion video/motiongrams/plots/data using all CPU cores.

    Spawns _motionvideo_mp_render.py as a subprocess (which fans out over a
    multiprocessing pool), receives progress over a local socket, then merges
    the per-worker partial results (npy arrays and partial avi files) from a
    temporary folder into the final outputs registered on the parent MgObject.

    Returns:
        The rendered motion video as an MgObject when save_video is True,
        otherwise self.
    """
    of, fex = self.of, self.fex

    # Convert to avi if the input is not avi - necessary for cv2 compatibility
    # on all platforms
    if fex != '.avi':
        # reuse an already-converted version if this MgObject has one
        if "as_avi" not in self.__dict__:
            file_as_avi = convert_to_avi(of + fex, overwrite=overwrite)
            # register it as the avi version for the file
            self.as_avi = musicalgestures.MgObject(file_as_avi)
        # point of and fex to the avi version
        of, fex = self.as_avi.of, self.as_avi.fex

    module_path = os.path.abspath(os.path.dirname(musicalgestures.__file__)).replace('\\', '/')
    the_system = platform.system()
    # "python" on Windows, "python3" elsewhere
    pythonkw = "python"
    if the_system != "Windows":
        pythonkw += "3"
    pyfile = wrap_str(module_path + '/_motionvideo_mp_render.py')

    # workers drop their partial outputs here; removed at the end
    temp_folder = tempfile.mkdtemp().replace('\\', '/')
    if temp_folder[-1] != "/":
        temp_folder += '/'

    of_feed = of.replace('\\', '/')
    of_feed = os.path.basename(of_feed)

    # plots are computed from the data files, so data must be saved if plotting
    save_data_feed = save_data or save_plot

    # everything is passed as strings on the command line; the child coerces back
    command = [pythonkw, pyfile, temp_folder, of_feed, fex, self.fps, self.width, self.height, self.length, self.color, filtertype, thresh, blur, kernel_size, inverted_motionvideo, inverted_motiongram, equalize_motiongram, save_data_feed, save_motiongrams, save_video]
    command = [str(item) for item in command]

    pgbar_text = 'Rendering motion' + ", ".join(np.array(["-video", "-grams", "-plots", "-data"])[
        np.array([save_video, save_motiongrams, save_plot, save_data])]) + ":"
    pb = MgProgressbar(total=self.length, prefix=pgbar_text)

    # set up socket server thread
    HOST = '127.0.0.1'  # Standard loopback interface address (localhost)
    PORT = 65432  # Port to listen on (non-privileged ports are > 1023)

    process = subprocess.Popen(command)

    num_cores = multiprocessing.cpu_count()
    # FIX: the original passed target=run_socket_server(...), which CALLED the
    # server in this thread (blocking until the child disconnected) and gave
    # the Thread a None target. Pass the callable and its args instead so the
    # server actually runs concurrently with process.wait().
    t = threading.Thread(target=run_socket_server, args=(HOST, PORT, pb, num_cores))
    t.start()

    try:
        process.wait()

    except KeyboardInterrupt:
        try:
            process.terminate()
        except OSError:
            pass
        process.wait()
        t.join()
        # re-raise preserving the original traceback
        raise

    t.join()

    # collect the per-worker partial results; file-name prefixes encode content,
    # the zero-padded process id suffix keeps listdir order == frame order
    results = os.listdir(temp_folder)
    time_files = [temp_folder + file for file in results if file.startswith("time")]
    com_files = [temp_folder + file for file in results if file.startswith("com")]
    qom_files = [temp_folder + file for file in results if file.startswith("qom")]
    gramx_files = [temp_folder + file for file in results if file.startswith("gramx")]
    gramy_files = [temp_folder + file for file in results if file.startswith("gramy")]
    video_files = [temp_folder + file for file in results if file.endswith("avi")]

    gramx, gramy, time, com, qom = None, None, None, None, None

    if save_motiongrams:
        # load gramx: subsequent chunks drop their seed row and last row.
        # NOTE(review): gramx uses [1:-1] while gramy uses [:, 1:] below —
        # asymmetric handling of the one-frame overlap between frame groups;
        # looks suspicious but is preserved as-is. Verify against the renderer.
        for idx, item in enumerate(gramx_files):
            if idx == 0:
                gramx = np.load(item)
            else:
                gramx = np.append(gramx, np.load(item)[1:-1], axis=0)

        # load gramy: subsequent chunks drop only their seed column
        for idx, item in enumerate(gramy_files):
            if idx == 0:
                gramy = np.load(item)
            else:
                gramy = np.append(gramy, np.load(item)[:, 1:], axis=1)

        if not self.color:
            # Normalize before converting to uint8 to keep precision
            gramx = gramx/gramx.max()*255
            gramy = gramy/gramy.max()*255
            gramx = cv2.cvtColor(gramx.astype(np.uint8), cv2.COLOR_GRAY2BGR)
            gramy = cv2.cvtColor(gramy.astype(np.uint8), cv2.COLOR_GRAY2BGR)

        # stretch to the full 0..255 range
        gramx = (gramx-gramx.min())/(gramx.max()-gramx.min())*255.0
        gramy = (gramy-gramy.min())/(gramy.max()-gramy.min())*255.0

        if equalize_motiongram:
            # equalize the value channel in HSV space
            gramx = gramx.astype(np.uint8)
            gramx_hsv = cv2.cvtColor(gramx, cv2.COLOR_BGR2HSV)
            gramx_hsv[:, :, 2] = cv2.equalizeHist(gramx_hsv[:, :, 2])
            gramx = cv2.cvtColor(gramx_hsv, cv2.COLOR_HSV2BGR)

            gramy = gramy.astype(np.uint8)
            gramy_hsv = cv2.cvtColor(gramy, cv2.COLOR_BGR2HSV)
            gramy_hsv[:, :, 2] = cv2.equalizeHist(gramy_hsv[:, :, 2])
            gramy = cv2.cvtColor(gramy_hsv, cv2.COLOR_HSV2BGR)

        if target_name_mgx is None:
            target_name_mgx = of+'_mgx.png'
        if target_name_mgy is None:
            target_name_mgy = of+'_mgy.png'
        if not overwrite:
            target_name_mgx = generate_outfilename(target_name_mgx)
            target_name_mgy = generate_outfilename(target_name_mgy)

        if inverted_motiongram:
            cv2.imwrite(target_name_mgx, cv2.bitwise_not(gramx.astype(np.uint8)))
            cv2.imwrite(target_name_mgy, cv2.bitwise_not(gramy.astype(np.uint8)))
        else:
            cv2.imwrite(target_name_mgx, gramx.astype(np.uint8))
            cv2.imwrite(target_name_mgy, gramy.astype(np.uint8))

        # save rendered motiongrams as MgImages into parent MgObject
        self.motiongram_x = MgImage(target_name_mgx)
        self.motiongram_y = MgImage(target_name_mgy)

    if save_data:
        # load time: worker timestamps are slice-local, so each subsequent
        # chunk is offset by the last merged timestamp; [1:] drops the
        # overlapping first frame of the chunk
        for idx, item in enumerate(time_files):
            if idx == 0:
                time = np.load(item)
            else:
                last_time = time[-1]
                time = np.append(time, np.load(item)[1:] + last_time)

        # load qom
        for idx, item in enumerate(qom_files):
            if idx == 0:
                qom = np.load(item)
            else:
                qom = np.append(qom, np.load(item)[1:])

        # load com
        for idx, item in enumerate(com_files):
            if idx == 0:
                com = np.load(item)
            else:
                com = np.append(com, np.load(item)[1:], axis=0)

        save_txt(of, time, com, qom, self.width, self.height, data_format, target_name_data=target_name_data, overwrite=overwrite)

    if save_plot:
        if not save_data:
            # data files exist anyway (save_data_feed above); load what the plot needs
            for idx, item in enumerate(qom_files):
                if idx == 0:
                    qom = np.load(item)
                else:
                    qom = np.append(qom, np.load(item)[1:])

            for idx, item in enumerate(com_files):
                if idx == 0:
                    com = np.load(item)
                else:
                    com = np.append(com, np.load(item)[1:], axis=0)

        if plot_title is None:
            plot_title = os.path.basename(of + fex)
        # save plot as an MgImage at motion_plot for parent MgObject
        self.motion_plot = MgImage(plot_motion_metrics(of, self.fps, com, qom, self.width, self.height, unit, plot_title, target_name_plot=target_name_plot, overwrite=overwrite))

    if save_video:
        if target_name_video is None:
            target_name_video = of + '_motion' + fex
        # enforce avi
        else:
            target_name_video = os.path.splitext(target_name_video)[0] + fex
        if not overwrite:
            target_name_video = generate_outfilename(target_name_video)

        # stitch the partial per-worker videos back into one file
        concatenated = concat_videos(video_files, pb_prefix='Cleanup')
        os.replace(concatenated, target_name_video)
        destination_video = target_name_video
        if self.has_audio:
            source_audio = extract_wav(of + fex)
            embed_audio_in_video(source_audio, destination_video)
            os.remove(source_audio)
        # save rendered motion video as the motion_video of the parent MgObject
        self.motion_video = musicalgestures.MgObject(destination_video, color=self.color, returned_by_process=True)

    # remove the temp folder with all partial results
    shutil.rmtree(temp_folder)

    if save_video:
        return self.motion_video
    else:
        return self


def run_socket_server(host, port, pb, numprocesses):
    """Accept one client and feed its "progress:<node> <iter>" messages to `pb`.

    Several messages may arrive concatenated in a single recv, hence the extra
    split on the "progress:" delimiter. Returns when the client disconnects
    (recv yields b'').
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, port))
        s.listen()
        conn, addr = s.accept()
        with conn:
            tracker = TrackMultiProgress(numprocesses)
            while True:
                data = conn.recv(1024)
                data_str = data.decode()
                if data_str.startswith("progress:"):
                    progress_message = data_str[len("progress:"):]
                    # a packet can contain several reports glued together
                    progress_message_list = progress_message.split('progress:')
                    progress = None
                    for progress_bit in progress_message_list:
                        node, iteration = progress_bit.split(' ')
                        node, iteration = int(node), int(iteration)
                        progress = tracker.progress(node, iteration)
                    pb.progress(progress)
                else:
                    # ignore anything that is not a progress report
                    pass

                if not data:
                    # empty read: client closed the connection
                    break


class TrackMultiProgress():
    """Aggregate per-worker iteration counts into one overall progress value."""

    def __init__(self, numprocesses):
        # processmap[i] holds the latest reported iteration of worker i
        self.numprocesses = numprocesses
        self.processmap = np.zeros(self.numprocesses)

    def progress(self, node, iteration):
        """Record `iteration` for worker `node`; return the summed progress."""
        self.processmap[node] = iteration
        return np.sum(self.processmap)

    def reset(self):
        """Zero all workers' recorded progress."""
        self.processmap = np.zeros(self.numprocesses)


def concat_videos(list_of_videos, target_name=None, overwrite=False, pb_prefix='Concatenating videos:', stream=True):
    """Concatenate `list_of_videos` into one mjpeg file with ffmpeg.

    The output name defaults to "<first video>_concat<ext>" (uniquified unless
    `overwrite`). Returns the output path.
    """
    of, fex = os.path.splitext(list_of_videos[0])
    if not target_name:
        target_name = of + '_concat' + fex
    if not overwrite:
        target_name = generate_outfilename(target_name)

    cmds = ['ffmpeg', '-y']

    # one -i per input, in order
    for video in list_of_videos:
        cmds.append('-i')
        cmds.append(video)

    # video-only concat of all inputs
    cmds.extend(["-filter_complex", f'concat=n={len(list_of_videos)}:v=1:a=0'])

    cmds.extend(["-c:v", "mjpeg", "-q:v", "3", target_name])

    # total length estimate for the progress bar: assumes equally long inputs
    ffmpeg_cmd(cmds, get_length(list_of_videos[0]) * len(list_of_videos), pb_prefix=pb_prefix, stream=stream)

    return target_name