diff --git a/contrib/opentimelineio_contrib/adapters/ale.py b/contrib/opentimelineio_contrib/adapters/ale.py
deleted file mode 100644
index 5ba78ac95..000000000
--- a/contrib/opentimelineio_contrib/adapters/ale.py
+++ /dev/null
@@ -1,355 +0,0 @@
-# SPDX-License-Identifier: Apache-2.0
-# Copyright Contributors to the OpenTimelineIO project
-
-__doc__ = """OpenTimelineIO Avid Log Exchange (ALE) Adapter"""
-
-
-import re
-import opentimelineio as otio
-
-DEFAULT_VIDEO_FORMAT = '1080'
-ASC_SOP_REGEX = re.compile(r'(-*\d+\.\d+)')
-
-
-def AVID_VIDEO_FORMAT_FROM_WIDTH_HEIGHT(width, height):
-    """Utility function to map a width and height to an Avid Project Format"""
-
-    format_map = {
-        '1080': "1080",
-        '720': "720",
-        '576': "PAL",
-        '486': "NTSC",
-    }
-    mapped = format_map.get(str(height), "CUSTOM")
-    # check for the 2K DCI 1080 format
-    if mapped == '1080' and width > 1920:
-        mapped = "CUSTOM"
-    return mapped
-
-
-class ALEParseError(otio.exceptions.OTIOError):
-    pass
-
-
-def _parse_data_line(line, columns, fps, ale_name_column_key='Name'):
-    row = line.split("\t")
-
-    if len(row) < len(columns):
-        # Fill in blanks for any missing fields in this row
-        row.extend([""] * (len(columns) - len(row)))
-
-    if len(row) > len(columns):
-        raise ALEParseError("Too many values on row: " + line)
-
-    try:
-
-        # Gather all the columns into a dictionary
-        # For expected columns, like Name, Start, etc. we will pop (remove)
-        # those from metadata, leaving the rest alone.
-        metadata = dict(zip(columns, row))
-
-        clip = otio.schema.Clip()
-        clip.name = metadata.get(ale_name_column_key, '')
-
-        # When looking for Start, Duration and End, they might be missing
-        # or blank. Treat None and "" as the same via: get(k,"")!=""
-        # To have a valid source range, you need Start and either Duration
-        # or End. If all three are provided, we check to make sure they match.
-        if metadata.get("Start", "") != "":
-            value = metadata.pop("Start")
-            try:
-                start = otio.opentime.from_timecode(value, fps)
-            except (ValueError, TypeError):
-                raise ALEParseError(f"Invalid Start timecode: {value}")
-            duration = None
-            end = None
-            if metadata.get("Duration", "") != "":
-                value = metadata.pop("Duration")
-                try:
-                    duration = otio.opentime.from_timecode(value, fps)
-                except (ValueError, TypeError):
-                    raise ALEParseError("Invalid Duration timecode: {}".format(
-                        value
-                    ))
-            if metadata.get("End", "") != "":
-                value = metadata.pop("End")
-                try:
-                    end = otio.opentime.from_timecode(value, fps)
-                except (ValueError, TypeError):
-                    raise ALEParseError("Invalid End timecode: {}".format(
-                        value
-                    ))
-            if duration is None:
-                duration = end - start
-            if end is None:
-                end = start + duration
-            if end != start + duration:
-                raise ALEParseError(
-                    "Inconsistent Start, End, Duration: " + line
-                )
-            clip.source_range = otio.opentime.TimeRange(
-                start,
-                duration
-            )
-
-        if metadata.get("Source File"):
-            source = metadata.pop("Source File")
-            clip.media_reference = otio.schema.ExternalReference(
-                target_url=source
-            )
-
-        # If available, collect cdl values in the same way we do for CMX EDL
-        cdl = {}
-
-        if metadata.get('CDL'):
-            cdl = _cdl_values_from_metadata(metadata['CDL'])
-            if cdl:
-                del metadata['CDL']
-
-        # If we have more specific metadata, let's use them
-        if metadata.get('ASC_SOP'):
-            cdl = _cdl_values_from_metadata(metadata['ASC_SOP'])
-
-            if cdl:
-                del metadata['ASC_SOP']
-
-        if metadata.get('ASC_SAT'):
-            try:
-                asc_sat_value = float(metadata['ASC_SAT'])
-                cdl.update(asc_sat=asc_sat_value)
-                del metadata['ASC_SAT']
-            except ValueError:
-                pass
-
-        if cdl:
-            clip.metadata['cdl'] = cdl
-
-        # We've pulled out the key/value pairs that we treat specially.
-        # Put the remaining key/values into clip.metadata["ALE"]
-        clip.metadata["ALE"] = metadata
-
-        return clip
-    except Exception as ex:
-        raise ALEParseError("Error parsing line: {}\n{}".format(
-            line, repr(ex)
-        ))
-
-
-def _cdl_values_from_metadata(asc_sop_string):
-
-    if not isinstance(asc_sop_string, str):
-        return {}
-
-    asc_sop_values = ASC_SOP_REGEX.findall(asc_sop_string)
-
-    cdl_data = {}
-
-    if len(asc_sop_values) >= 9:
-
-        cdl_data.update(
-            asc_sop={
-                'slope': [float(v) for v in asc_sop_values[:3]],
-                'offset': [float(v) for v in asc_sop_values[3:6]],
-                'power': [float(v) for v in asc_sop_values[6:9]]
-            })
-
-        if len(asc_sop_values) == 10:
-            cdl_data.update(asc_sat=float(asc_sop_values[9]))
-
-    return cdl_data
-
-
-def _video_format_from_metadata(clips):
-    # Look for clips with Image Size metadata set
-    max_height = 0
-    max_width = 0
-    for clip in clips:
-        fields = clip.metadata.get("ALE", {})
-        res = fields.get("Image Size", "")
-        m = re.search(r'([0-9]{1,})\s*[xX]\s*([0-9]{1,})', res)
-        if m and len(m.groups()) >= 2:
-            width = int(m.group(1))
-            height = int(m.group(2))
-            if height > max_height:
-                max_height = height
-            if width > max_width:
-                max_width = width
-
-    # We don't have any image size information, use the defaut
-    if max_height == 0:
-        return DEFAULT_VIDEO_FORMAT
-    else:
-        return AVID_VIDEO_FORMAT_FROM_WIDTH_HEIGHT(max_width, max_height)
-
-
-def read_from_string(input_str, fps=24, **adapter_argument_map):
-    ale_name_column_key = adapter_argument_map.get('ale_name_column_key', 'Name')
-
-    collection = otio.schema.SerializableCollection()
-    header = {}
-    columns = []
-
-    def nextline(lines):
-        return lines.pop(0)
-
-    lines = input_str.splitlines()
-    while len(lines):
-        line = nextline(lines)
-
-        # skip blank lines
-        if line.strip() == "":
-            continue
-
-        if line.strip() == "Heading":
-            while len(lines):
-                line = nextline(lines)
-
-                if line.strip() == "":
-                    break
-
-                if "\t" not in line:
-                    raise ALEParseError("Invalid Heading line: " + line)
-
-                segments = line.split("\t")
-                while len(segments) >= 2:
-                    key, val = segments.pop(0), segments.pop(0)
-                    header[key] = val
-                if len(segments) != 0:
-                    raise ALEParseError("Invalid Heading line: " + line)
-
-            if "FPS" in header:
-                read_fps = float(header["FPS"])
-                fps = otio.opentime.RationalTime.nearest_smpte_timecode_rate(read_fps)
-
-        if line.strip() == "Column":
-            if len(lines) == 0:
-                raise ALEParseError("Unexpected end of file after: " + line)
-
-            line = nextline(lines)
-            columns = line.split("\t")
-
-        if line.strip() == "Data":
-            while len(lines):
-                line = nextline(lines)
-
-                if line.strip() == "":
-                    continue
-
-                clip = _parse_data_line(line,
-                                        columns,
-                                        fps,
-                                        ale_name_column_key=ale_name_column_key)
-
-                collection.append(clip)
-
-    collection.metadata["ALE"] = {
-        "header": header,
-        "columns": columns
-    }
-
-    return collection
-
-
-def write_to_string(input_otio, columns=None, fps=None, video_format=None):
-
-    # Get all the clips we're going to export
-    clips = list(input_otio.find_clips())
-
-    result = ""
-
-    result += "Heading\n"
-    header = dict(input_otio.metadata.get("ALE", {}).get("header", {}))
-
-    # Force this, since we've hard coded tab delimiters
-    header["FIELD_DELIM"] = "TABS"
-
-    if fps is None:
-        # If we weren't given a FPS, is there one in the header metadata?
-        if "FPS" in header:
-            fps = float(header["FPS"])
-        else:
-            # Would it be better to infer this by inspecting the input clips?
-            fps = 24
-            header["FPS"] = str(fps)
-    else:
-        # Put the value we were given into the header
-        header["FPS"] = str(fps)
-
-    # Check if we have been supplied a VIDEO_FORMAT, if not lets set one
-    if video_format is None:
-        # Do we already have it in the header? If so, lets leave that as is
-        if "VIDEO_FORMAT" not in header:
-            header["VIDEO_FORMAT"] = _video_format_from_metadata(clips)
-    else:
-        header["VIDEO_FORMAT"] = str(video_format)
-
-    headers = list(header.items())
-    headers.sort()  # make the output predictable
-    for key, val in headers:
-        result += f"{key}\t{val}\n"
-
-    # If the caller passed in a list of columns, use that, otherwise
-    # we need to discover the columns that should be output.
-    if columns is None:
-        # Is there a hint about the columns we want (and column ordering)
-        # at the top level?
-        columns = input_otio.metadata.get("ALE", {}).get("columns", [])
-
-        # Scan all the clips for any extra columns
-        for clip in clips:
-            fields = clip.metadata.get("ALE", {})
-            for key in fields.keys():
-                if key not in columns:
-                    columns.append(key)
-
-        # Always output these
-        for c in ["Duration", "End", "Start", "Name", "Source File"]:
-            if c not in columns:
-                columns.insert(0, c)
-
-    result += "\nColumn\n{}\n".format("\t".join(columns))
-
-    result += "\nData\n"
-
-    def val_for_column(column, clip):
-        if column == "Name":
-            return clip.name
-        elif column == "Source File":
-            if (
-                clip.media_reference and
-                hasattr(clip.media_reference, 'target_url') and
-                clip.media_reference.target_url
-            ):
-                return clip.media_reference.target_url
-            else:
-                return ""
-        elif column == "Start":
-            if not clip.source_range:
-                return ""
-            return otio.opentime.to_timecode(
-                clip.source_range.start_time, fps
-            )
-        elif column == "Duration":
-            if not clip.source_range:
-                return ""
-            return otio.opentime.to_timecode(
-                clip.source_range.duration, fps
-            )
-        elif column == "End":
-            if not clip.source_range:
-                return ""
-            return otio.opentime.to_timecode(
-                clip.source_range.end_time_exclusive(), fps
-            )
-        else:
-            return clip.metadata.get("ALE", {}).get(column)
-
-    for clip in clips:
-        row = []
-        for column in columns:
-            val = str(val_for_column(column, clip) or "")
-            val.replace("\t", " ")  # don't allow tabs inside a value
-            row.append(val)
-        result += "\t".join(row) + "\n"
-
-    return result
diff --git a/contrib/opentimelineio_contrib/adapters/tests/test_ale_adapter.py b/contrib/opentimelineio_contrib/adapters/tests/test_ale_adapter.py
deleted file mode 100644
index 9f11ca7ba..000000000
--- a/contrib/opentimelineio_contrib/adapters/tests/test_ale_adapter.py
+++ /dev/null
@@ -1,222 +0,0 @@
-# SPDX-License-Identifier: Apache-2.0
-# Copyright Contributors to the OpenTimelineIO project
-
-"""Test the ALE adapter."""
-
-# python
-import os
-import unittest
-
-import opentimelineio as otio
-
-SAMPLE_DATA_DIR = os.path.join(os.path.dirname(__file__), "sample_data")
-EXAMPLE_PATH = os.path.join(SAMPLE_DATA_DIR, "sample.ale")
-EXAMPLE2_PATH = os.path.join(SAMPLE_DATA_DIR, "sample2.ale")
-EXAMPLE_CDL_PATH = os.path.join(SAMPLE_DATA_DIR, "sample_cdl.ale")
-EXAMPLEUHD_PATH = os.path.join(SAMPLE_DATA_DIR, "sampleUHD.ale")
-
-
-class ALEAdapterTest(unittest.TestCase):
-
-    def test_ale_read(self):
-        ale_path = EXAMPLE_PATH
-        collection = otio.adapters.read_from_file(ale_path)
-        self.assertTrue(collection is not None)
-        self.assertEqual(type(collection), otio.schema.SerializableCollection)
-        self.assertEqual(len(collection), 4)
-        fps = float(collection.metadata.get("ALE").get("header").get("FPS"))
-        self.assertEqual(fps, 24)
-        self.assertEqual(
-            [c.name for c in collection],
-            ["test_017056", "test_017057", "test_017058", "Something"]
-        )
-        self.assertEqual(
-            [c.source_range for c in collection],
-            [
-                otio.opentime.TimeRange(
-                    otio.opentime.from_timecode("01:00:00:00", fps),
-                    otio.opentime.from_timecode("00:00:04:03", fps)
-                ),
-                otio.opentime.TimeRange(
-                    otio.opentime.from_timecode("01:00:00:00", fps),
-                    otio.opentime.from_timecode("00:00:04:04", fps)
-                ),
-                otio.opentime.TimeRange(
-                    otio.opentime.from_timecode("01:00:00:00", fps),
-                    otio.opentime.from_timecode("00:00:04:05", fps)
-                ),
-                otio.opentime.TimeRange(
-                    otio.opentime.from_timecode("01:00:00:00", fps),
-                    otio.opentime.from_timecode("00:00:04:06", fps)
-                )
-            ]
-        )
-
-    def test_ale_read2(self):
-        ale_path = EXAMPLE2_PATH
-        collection = otio.adapters.read_from_file(ale_path)
-        self.assertTrue(collection is not None)
-        self.assertEqual(type(collection), otio.schema.SerializableCollection)
-        self.assertEqual(len(collection), 2)
-        fps = float(collection.metadata.get("ALE").get("header").get("FPS"))
-        self.assertEqual(fps, 23.98)
-        real_fps = otio.opentime.RationalTime.nearest_smpte_timecode_rate(fps)
-        self.assertEqual(
-            [c.name for c in collection],
-            ["19A-1xa", "19A-2xa"]
-        )
-        self.assertEqual(
-            [c.source_range for c in collection],
-            [
-                otio.opentime.TimeRange(
-                    otio.opentime.from_timecode("04:00:00:00", real_fps),
-                    otio.opentime.from_timecode("00:00:46:16", real_fps)
-                ),
-                otio.opentime.TimeRange(
-                    otio.opentime.from_timecode("04:00:46:16", real_fps),
-                    otio.opentime.from_timecode("00:00:50:16", real_fps)
-                )
-            ]
-        )
-
-    def test_ale_read_cdl(self):
-        ale_path = EXAMPLE_CDL_PATH
-        collection = otio.adapters.read_from_file(ale_path)
-        self.assertTrue(collection is not None)
-        self.assertEqual(type(collection), otio.schema.SerializableCollection)
-        self.assertEqual(len(collection), 4)
-        fps = float(collection.metadata.get("ALE").get("header").get("FPS"))
-        self.assertEqual(fps, 23.976)
-        real_fps = otio.opentime.RationalTime.nearest_smpte_timecode_rate(fps)
-        self.assertEqual([c.name for c in collection], [
-            "A005_C010_0501J0", "A005_C010_0501J0", "A005_C009_0501A0",
-            "A005_C010_0501J0"
-        ])
-        self.assertEqual([c.source_range for c in collection], [
-
-            otio.opentime.TimeRange(
-                otio.opentime.from_timecode("17:49:33:01", real_fps),
-                otio.opentime.from_timecode("00:00:02:09", real_fps)),
-
-            otio.opentime.TimeRange(
-                otio.opentime.from_timecode("17:49:55:19", real_fps),
-                otio.opentime.from_timecode("00:00:06:09", real_fps)),
-
-            otio.opentime.TimeRange(
-                otio.opentime.from_timecode("17:40:25:06", real_fps),
-                otio.opentime.from_timecode("00:00:02:20", real_fps)),
-
-            otio.opentime.TimeRange(
-                otio.opentime.from_timecode("17:50:21:23", real_fps),
-                otio.opentime.from_timecode("00:00:03:14", real_fps))
-        ])
-
-        # Slope, offset, and power values are of type _otio.AnyVector
-        # So we have to convert them to lists otherwise
-        # the comparison between those two types would fail
-
-        # FIRST CLIP
-        self.assertEqual(
-            list(collection[0].metadata['cdl']['asc_sop']['slope']),
-            [0.8714, 0.9334, 0.9947])
-        self.assertEqual(
-            list(collection[0].metadata['cdl']['asc_sop']['offset']),
-            [-0.087, -0.0922, -0.0808])
-        self.assertEqual(
-            list(collection[0].metadata['cdl']['asc_sop']['power']),
-            [0.9988, 1.0218, 1.0101])
-        self.assertEqual(collection[0].metadata['cdl']['asc_sat'], 0.9)
-
-        # SECOND CLIP
-        self.assertEqual(
-            list(collection[1].metadata['cdl']['asc_sop']['slope']),
-            [0.8714, 0.9334, 0.9947])
-        self.assertEqual(
-            list(collection[1].metadata['cdl']['asc_sop']['offset']),
-            [-0.087, -0.0922, -0.0808])
-        self.assertEqual(
-            list(collection[1].metadata['cdl']['asc_sop']['power']),
-            [0.9988, 1.0218, 1.0101])
-        self.assertEqual(collection[1].metadata['cdl']['asc_sat'], 0.9)
-
-        # THIRD CLIP
-        self.assertEqual(
-            list(collection[2].metadata['cdl']['asc_sop']['slope']),
-            [0.8604, 0.9252, 0.9755])
-        self.assertEqual(
-            list(collection[2].metadata['cdl']['asc_sop']['offset']),
-            [-0.0735, -0.0813, -0.0737])
-        self.assertEqual(
-            list(collection[2].metadata['cdl']['asc_sop']['power']),
-            [0.9988, 1.0218, 1.0101])
-        self.assertEqual(collection[2].metadata['cdl']['asc_sat'], 0.9)
-
-        # FOURTH CLIP
-        self.assertEqual(
-            list(collection[3].metadata['cdl']['asc_sop']['slope']),
-            [0.8714, 0.9334, 0.9947])
-        self.assertEqual(
-            list(collection[3].metadata['cdl']['asc_sop']['offset']),
-            [-0.087, -0.0922, -0.0808])
-        self.assertEqual(
-            list(collection[3].metadata['cdl']['asc_sop']['power']),
-            [0.9988, 1.0218, 1.0101])
-        self.assertEqual(collection[3].metadata['cdl']['asc_sat'], 0.9)
-
-    def test_ale_uhd(self):
-        ale_path = EXAMPLEUHD_PATH
-        collection = otio.adapters.read_from_file(ale_path)
-        frmt = str(collection.metadata.get("ALE").get("header").get("VIDEO_FORMAT"))
-        self.assertEqual(frmt, "CUSTOM")
-
-    def test_ale_add_format(self):
-
-        # adds a clip to the supplied timeline, sets the clips "Image Size"
-        # metadata and then rountrips the ALE verifying the supplied format is detected
-        def add_then_check(timeline, size, expected_format):
-            cl = otio.schema.Clip(
-                metadata={'ALE': {'Image Size': size}},
-                source_range=otio.opentime.TimeRange(
-                    start_time=otio.opentime.RationalTime(0, 24000 / 1001),
-                    duration=otio.opentime.RationalTime(48, 24000 / 1001)
-                )
-            )
-            timeline.tracks[0].extend([cl])
-            collection = otio.adapters.read_from_string(
-                otio.adapters.write_to_string(
-                    timeline,
-                    adapter_name='ale'
-                ),
-                adapter_name="ale"
-            )
-            ale_meta = collection.metadata.get('ALE')
-            vid_format = str(ale_meta.get('header').get('VIDEO_FORMAT'))
-            self.assertEqual(vid_format, expected_format)
-
-        track = otio.schema.Track()
-        tl = otio.schema.Timeline("Add Format", tracks=[track])
-
-        # add multiple clips with various resolutions,
-        # we want the ALE to return a project format
-        # that is compatible with the largest resolution
-
-        add_then_check(tl, '720 x 486', 'NTSC')
-        add_then_check(tl, '720 x 576', 'PAL')
-        add_then_check(tl, '1280x 720', '720')
-        add_then_check(tl, '1920x1080', '1080')
-        add_then_check(tl, '2048x1080', 'CUSTOM')
-        add_then_check(tl, '4096x2304', 'CUSTOM')
-
-    def test_ale_roundtrip(self):
-        ale_path = EXAMPLE_PATH
-
-        with open(ale_path) as fi:
-            original = fi.read()
-        collection = otio.adapters.read_from_string(original, "ale")
-        output = otio.adapters.write_to_string(collection, "ale")
-        self.maxDiff = None
-        self.assertMultiLineEqual(original, output)
-
-
-if __name__ == '__main__':
-    unittest.main()