diff --git a/.github/workflows/changelog-check.yml b/.github/workflows/changelog-check.yml index 071ebb4..0309065 100644 --- a/.github/workflows/changelog-check.yml +++ b/.github/workflows/changelog-check.yml @@ -13,4 +13,4 @@ on: jobs: call-changelog-check-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-changelog-check.yml@v0.14.0 + uses: ASFHyP3/actions/.github/workflows/reusable-changelog-check.yml@v0.15.0 diff --git a/.github/workflows/create-jira-issue.yml b/.github/workflows/create-jira-issue.yml index c4e970a..b7ffba8 100644 --- a/.github/workflows/create-jira-issue.yml +++ b/.github/workflows/create-jira-issue.yml @@ -6,7 +6,7 @@ on: jobs: call-create-jira-issue-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-create-jira-issue.yml@v0.14.0 + uses: ASFHyP3/actions/.github/workflows/reusable-create-jira-issue.yml@v0.15.0 secrets: JIRA_BASE_URL: ${{ secrets.JIRA_BASE_URL }} JIRA_USER_EMAIL: ${{ secrets.JIRA_USER_EMAIL }} diff --git a/.github/workflows/labeled-pr-check.yml b/.github/workflows/labeled-pr-check.yml index c3c050d..2d4eb5b 100644 --- a/.github/workflows/labeled-pr-check.yml +++ b/.github/workflows/labeled-pr-check.yml @@ -12,4 +12,4 @@ on: jobs: call-labeled-pr-check-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-labeled-pr-check.yml@v0.14.0 + uses: ASFHyP3/actions/.github/workflows/reusable-labeled-pr-check.yml@v0.15.0 diff --git a/.github/workflows/release-checklist-comment.yml b/.github/workflows/release-checklist-comment.yml index 40458ea..cd6eca1 100644 --- a/.github/workflows/release-checklist-comment.yml +++ b/.github/workflows/release-checklist-comment.yml @@ -9,7 +9,7 @@ on: jobs: call-release-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-release-checklist-comment.yml@v0.14.0 + uses: ASFHyP3/actions/.github/workflows/reusable-release-checklist-comment.yml@v0.15.0 permissions: pull-requests: write secrets: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 
4815fdb..e979f10 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -8,7 +8,7 @@ on: jobs: call-release-workflow: # Docs: https://github.com/ASFHyP3/actions - uses: ASFHyP3/actions/.github/workflows/reusable-release.yml@v0.14.0 + uses: ASFHyP3/actions/.github/workflows/reusable-release.yml@v0.15.0 with: release_prefix: burst2safe secrets: diff --git a/.github/workflows/static-analysis.yml b/.github/workflows/static-analysis.yml index 9a7a6cb..f4a0dde 100644 --- a/.github/workflows/static-analysis.yml +++ b/.github/workflows/static-analysis.yml @@ -1,16 +1,16 @@ name: Static analysis -on: [pull_request] +on: push jobs: call-secrets-analysis-workflow: # Docs: https://github.com/ASFHyP3/actions - uses: ASFHyP3/actions/.github/workflows/reusable-secrets-analysis.yml@v0.14.0 + uses: ASFHyP3/actions/.github/workflows/reusable-secrets-analysis.yml@v0.15.0 call-ruff-workflow: # Docs: https://github.com/ASFHyP3/actions - uses: ASFHyP3/actions/.github/workflows/reusable-ruff.yml@v0.14.0 + uses: ASFHyP3/actions/.github/workflows/reusable-ruff.yml@v0.15.0 call-mypy-workflow: # Docs: https://github.com/ASFHyP3/actions - uses: ASFHyP3/actions/.github/workflows/reusable-mypy.yml@v0.14.0 + uses: ASFHyP3/actions/.github/workflows/reusable-mypy.yml@v0.15.0 diff --git a/.github/workflows/tag-version.yml b/.github/workflows/tag-version.yml index 327f853..ad87c7d 100644 --- a/.github/workflows/tag-version.yml +++ b/.github/workflows/tag-version.yml @@ -9,6 +9,6 @@ jobs: call-bump-version-workflow: # For first-time setup, create a v0.0.0 tag as shown here: # https://github.com/ASFHyP3/actions#reusable-bump-versionyml - uses: ASFHyP3/actions/.github/workflows/reusable-bump-version.yml@v0.14.0 + uses: ASFHyP3/actions/.github/workflows/reusable-bump-version.yml@v0.15.0 secrets: USER_TOKEN: ${{ secrets.TOOLS_BOT_PAK }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ed83e3..20a691b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ The format is 
based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/) and uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.4.2] + +### Changed +* Upgraded the `reusable-mypy` action to [v0.15.0](https://github.com/ASFHyP3/actions/releases/tag/v0.15.0) and replaced the `--ignore-missing-imports` option with `disable_error_code = ["import-untyped"]` as recommended, then ignored or fixed the resulting `mypy` errors. + ## [1.4.1] ### Added diff --git a/pyproject.toml b/pyproject.toml index a70b8b6..e11e8be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,3 +99,7 @@ warn_unused_ignores = true warn_unreachable = true strict_equality = true check_untyped_defs = true +install_types = true +non_interactive = true +pretty = true +disable_error_code = ["import-untyped"] diff --git a/src/burst2safe/base.py b/src/burst2safe/base.py index 96091ae..5e92237 100644 --- a/src/burst2safe/base.py +++ b/src/burst2safe/base.py @@ -1,3 +1,4 @@ +# mypy: disable-error-code="union-attr" import hashlib from copy import deepcopy from datetime import datetime, timedelta @@ -14,7 +15,7 @@ class ListOfListElements: def __init__( - self, inputs: List[ET.Element], start_line: Optional[int] = None, slc_lengths: Optional[List[int]] = None + self, inputs: List[ET._Element], start_line: Optional[int] = None, slc_lengths: Optional[List[int]] = None ): """Initialize the ListOfListElements object. @@ -23,7 +24,7 @@ def __init__( start_line: The starting line number of the first element. slc_lengths: The total line lengths of the SLCs corresponding to each element. 
""" - self.inputs: list[ET.Element] = inputs + self.inputs: list[ET._Element] = inputs self.start_line: Optional[int] = start_line self.slc_lengths: Optional[list[int]] = slc_lengths @@ -51,7 +52,7 @@ def __init__( self.inputs = sorted(self.inputs, key=self.get_first_time) self.has_line = elements[0].find('line') is not None - def get_first_time(self, element: ET.Element) -> datetime: + def get_first_time(self, element: ET._Element) -> datetime: """Get first time in List element Args: @@ -60,10 +61,10 @@ def get_first_time(self, element: ET.Element) -> datetime: Returns: The first time in the element. """ - first_time = min([datetime.fromisoformat(sub.find(self.time_field).text) for sub in element]) + first_time = min([datetime.fromisoformat(sub.find(self.time_field).text) for sub in element]) # type: ignore[arg-type] return first_time - def get_unique_elements(self) -> List[ET.Element]: + def get_unique_elements(self) -> List[ET._Element]: """Get the elements without duplicates. Adjust line number if present. 
Returns: @@ -71,21 +72,21 @@ def get_unique_elements(self) -> List[ET.Element]: """ list_of_element_lists = [item.findall('*') for item in self.inputs] - last_time = datetime.fromisoformat(list_of_element_lists[0][-1].find(self.time_field).text) + last_time = datetime.fromisoformat(list_of_element_lists[0][-1].find(self.time_field).text) # type: ignore[arg-type] uniques = [deepcopy(element) for element in list_of_element_lists[0]] if self.has_line: assert self.slc_lengths is not None previous_line_count = self.slc_lengths[0] for i, element_list in enumerate(list_of_element_lists[1:]): - times = [datetime.fromisoformat(element.find(self.time_field).text) for element in element_list] + times = [datetime.fromisoformat(element.find(self.time_field).text) for element in element_list] # type: ignore[arg-type] keep_index = [index for index, time in enumerate(times) if time > last_time] to_keep = [deepcopy(element_list[index]) for index in keep_index] if self.has_line: - new_lines = [int(elem.find('line').text) + previous_line_count for elem in to_keep] + new_lines = [int(elem.find('line').text) + previous_line_count for elem in to_keep] # type: ignore[arg-type] for elem, line in zip(to_keep, new_lines): - set_text(elem.find('line'), line) + set_text(elem.find('line'), line) # type: ignore[arg-type] assert self.slc_lengths is not None previous_line_count += self.slc_lengths[i] @@ -95,7 +96,7 @@ def get_unique_elements(self) -> List[ET.Element]: return uniques @staticmethod - def filter_by_line(element_list: List[ET.Element], line_bounds: tuple[float, float]) -> List[ET.Element]: + def filter_by_line(element_list: List[ET._Element], line_bounds: tuple[float, float]) -> List[ET._Element]: """Filter elements by line number. 
Args: @@ -106,24 +107,24 @@ def filter_by_line(element_list: List[ET.Element], line_bounds: tuple[float, flo """ new_list = [] for elem in element_list: - if line_bounds[0] <= int(elem.find('line').text) <= line_bounds[1]: + if line_bounds[0] <= int(elem.find('line').text) <= line_bounds[1]: # type: ignore[arg-type,operator] new_list.append(deepcopy(elem)) return new_list - def update_line_numbers(self, elements: List[ET.Element]) -> None: + def update_line_numbers(self, elements: List[ET._Element]) -> None: """Update the line numbers of the elements. Args: elements: The list of elements to update. """ for element in elements: - standard_line = int(element.find('line').text) + standard_line = int(element.find('line').text) # type: ignore[arg-type] assert self.start_line is not None element.find('line').text = str(standard_line - self.start_line) def filter_by_time( - self, elements: List[ET.Element], anx_bounds: tuple[datetime, datetime], buffer: timedelta - ) -> List[ET.Element]: + self, elements: List[ET._Element], anx_bounds: tuple[datetime, datetime], buffer: timedelta + ) -> List[ET._Element]: """Filter elements by time. Args: @@ -138,7 +139,7 @@ def filter_by_time( max_anx_bound = anx_bounds[1] + buffer filtered_elements = [] for element in elements: - azimuth_time = datetime.fromisoformat(element.find(self.time_field).text) + azimuth_time = datetime.fromisoformat(element.find(self.time_field).text) # type: ignore[arg-type] if min_anx_bound < azimuth_time < max_anx_bound: filtered_elements.append(deepcopy(element)) @@ -149,7 +150,7 @@ def create_filtered_list( anx_bounds: tuple[datetime, datetime], buffer: timedelta = timedelta(seconds=3), line_bounds: Optional[tuple[float, float]] = None, - ) -> ET.Element: + ) -> ET._Element: """Filter elements by time/line. Adjust line number if present. 
Args: @@ -172,12 +173,13 @@ def create_filtered_list( filtered_elements = self.filter_by_line(filtered_elements, line_bounds) new_element = ET.Element(self.name) - [new_element.append(element) for element in filtered_elements] + for element in filtered_elements: + new_element.append(element) new_element.set('count', str(len(filtered_elements))) return new_element -def create_content_unit(simple_name: str, unit_type: str, rep_id: str) -> ET.Element: +def create_content_unit(simple_name: str, unit_type: str, rep_id: str) -> ET._Element: """Create a content unit element for a manifest.safe file. Args: @@ -195,7 +197,7 @@ def create_content_unit(simple_name: str, unit_type: str, rep_id: str) -> ET.Ele return content_unit -def create_metadata_object(simple_name: str) -> ET.Element: +def create_metadata_object(simple_name: str) -> ET._Element: """Create a metadata object element for a manifest.safe file. Args: @@ -214,7 +216,7 @@ def create_metadata_object(simple_name: str) -> ET.Element: def create_data_object( simple_name: str, relative_path: Union[Path, str], rep_id: str, mime_type: str, size_bytes: int, md5: str -) -> ET.Element: +) -> ET._Element: """Create a data object element for a manifest.safe file. 
Args: @@ -273,14 +275,14 @@ def __init__(self, burst_infos: list[BurstInfo], metadata_type: str, ipf_version products = [get_subxml_from_metadata(path, 'product', self.swath, self.pol) for path in self.metadata_paths] slc_lengths = [] for annotation in products: - n_bursts = int(annotation.find('.//burstList').get('count')) - burst_length = int(annotation.find('.//linesPerBurst').text) + n_bursts = int(annotation.find('.//burstList').get('count')) # type: ignore[arg-type] + burst_length = int(annotation.find('.//linesPerBurst').text) # type: ignore[arg-type] slc_lengths.append(n_bursts * burst_length) self.slc_lengths = slc_lengths # annotation components to be extended by subclasses - self.ads_header = None - self.xml: Optional[ET.Element] = None + self.ads_header: Optional[ET._Element] = None + self.xml: Union[ET._Element, ET._ElementTree, None] = None # these attributes are updated when the annotation is written to a file self.size_bytes: Optional[int] = None @@ -294,7 +296,7 @@ def create_ads_header(self): ads_header.find('imageNumber').text = f'{self.image_number:03d}' self.ads_header = ads_header - def merge_lists(self, list_name: str, line_bounds: Optional[tuple[int, int]] = None) -> ET.Element: + def merge_lists(self, list_name: str, line_bounds: Optional[tuple[int, int]] = None) -> ET._Element: """Merge lists of elements into a single list. Args: @@ -303,7 +305,7 @@ def merge_lists(self, list_name: str, line_bounds: Optional[tuple[int, int]] = N Returns: The merged list element. 
""" - list_elements = [input_xml.find(list_name) for input_xml in self.inputs] + list_elements: list = [input_xml.find(list_name) for input_xml in self.inputs] list_of_list_elements = ListOfListElements(list_elements, self.start_line, self.slc_lengths) merged_list = list_of_list_elements.create_filtered_list((self.min_anx, self.max_anx), line_bounds=line_bounds) return merged_list @@ -331,6 +333,7 @@ def __str__(self, **kwargs): Args: kwargs: Keyword arguments to pass to the lxml """ + assert self.xml is not None xml_str = ET.tostring(self.xml, pretty_print=True, **kwargs) return xml_str.decode() diff --git a/src/burst2safe/calibration.py b/src/burst2safe/calibration.py index a0ad568..a1857d3 100644 --- a/src/burst2safe/calibration.py +++ b/src/burst2safe/calibration.py @@ -1,4 +1,5 @@ from copy import deepcopy +from typing import Optional import lxml.etree as ET @@ -18,12 +19,12 @@ def __init__(self, burst_infos: list[BurstInfo], ipf_version: str, image_number: image_number: Image number. 
""" super().__init__(burst_infos, 'calibration', ipf_version, image_number) - self.calibration_information = None + self.calibration_information: Optional[ET._Element] = None self.calibrattion_vector_list = None def create_calibration_information(self): """Create the calibration information.""" - calibration_information = [calibration.find('calibrationInformation') for calibration in self.inputs][0] + calibration_information = [calibration.find('calibrationInformation') for calibration in self.inputs][0] # type: ignore[union-attr] self.calibration_information = deepcopy(calibration_information) def create_calibration_vector_list(self): @@ -37,6 +38,8 @@ def assemble(self): self.create_calibration_vector_list() calibration = ET.Element('calibration') + assert self.ads_header is not None + assert self.calibration_information is not None calibration.append(self.ads_header) calibration.append(self.calibration_information) calibration.append(self.calibration_vector_list) diff --git a/src/burst2safe/local2safe.py b/src/burst2safe/local2safe.py index 5924f24..3f456e6 100644 --- a/src/burst2safe/local2safe.py +++ b/src/burst2safe/local2safe.py @@ -1,3 +1,4 @@ +# mypy: disable-error-code="union-attr" """Generate a SAFE file from local burst extractor outputs""" import argparse @@ -29,13 +30,15 @@ def burst_info_from_local( manifest = utils.get_subxml_from_metadata(xml_path, 'manifest', swath, polarization) xml_orbit_path = './/{*}metadataObject[@ID="measurementOrbitReference"]/metadataWrap/xmlData/{*}orbitReference' meta_orbit = manifest.find(xml_orbit_path) - abs_orbit_start, abs_orbit_stop = [int(x.text) for x in meta_orbit.findall('{*}orbitNumber')] - rel_orbit_start, rel_orbit_stop = [int(x.text) for x in meta_orbit.findall('{*}relativeOrbitNumber')] + abs_orbit_start, abs_orbit_stop = [int(x.text) for x in meta_orbit.findall('{*}orbitNumber')] # type: ignore[arg-type] + rel_orbit_start, rel_orbit_stop = [int(x.text) for x in 
meta_orbit.findall('{*}relativeOrbitNumber')] # type: ignore[arg-type] direction = meta_orbit.find('{*}extension/{*}orbitProperties/{*}pass').text.upper() product = utils.get_subxml_from_metadata(xml_path, 'product', swath, polarization) sensing_time_str = product.findall('swathTiming/burstList/burst')[burst_index].find('sensingTime').text anx_time_str = meta_orbit.find('{*}extension/{*}orbitProperties/{*}ascendingNodeTime').text + assert sensing_time_str is not None + assert anx_time_str is not None burst_id, rel_orbit = calculate_burstid(sensing_time_str, anx_time_str, rel_orbit_start, rel_orbit_stop, swath) info = utils.BurstInfo( granule='', diff --git a/src/burst2safe/manifest.py b/src/burst2safe/manifest.py index d018212..766a0be 100644 --- a/src/burst2safe/manifest.py +++ b/src/burst2safe/manifest.py @@ -1,7 +1,7 @@ import hashlib from copy import deepcopy from pathlib import Path -from typing import List, Optional +from typing import List, Optional, Union import lxml.etree as ET import numpy as np @@ -49,11 +49,11 @@ class Manifest: def __init__( self, - content_units: List[ET.Element], - metadata_objects: List[ET.Element], - data_objects: List[ET.Element], + content_units: List[ET._Element], + metadata_objects: List[ET._Element], + data_objects: List[ET._Element], bbox: Polygon, - template_manifest: ET.Element, + template_manifest: ET._Element, ): """Initialize a Manifest object. 
@@ -72,10 +72,10 @@ def __init__( self.version = 'esa/safe/sentinel-1.0/sentinel-1/sar/level-1/slc/standard/iwdp' # Updated by methods - self.information_package_map = None - self.metadata_section: Optional[ET.Element] = None - self.data_object_section: Optional[ET.Element] = None - self.xml: Optional[ET.Element] = None + self.information_package_map: Optional[ET._Element] = None + self.metadata_section: Optional[ET._Element] = None + self.data_object_section: Optional[ET._Element] = None + self.xml: Union[ET._Element, ET._ElementTree, None] = None self.path: Optional[Path] = None self.crc: Optional[str] = None @@ -119,11 +119,12 @@ def create_metadata_section(self): 's1Level1QuicklookSchema', 's1MapOverlaySchema', ] - for obj in self.template.find('metadataSection'): + for obj in self.template.find('metadataSection'): # type: ignore[union-attr] if obj.get('ID') in ids_to_keep: metadata_section.append(deepcopy(obj)) coordinates = metadata_section.find('.//{*}coordinates') + assert coordinates is not None coordinates.text = get_footprint_string(self.bbox) self.metadata_section = metadata_section @@ -142,6 +143,9 @@ def assemble(self): manifest = ET.Element('{%s}XFDU' % NAMESPACES['xfdu'], nsmap=NAMESPACES) manifest.set('version', self.version) + assert self.information_package_map is not None + assert self.metadata_section is not None + assert self.data_object_section is not None manifest.append(self.information_package_map) manifest.append(self.metadata_section) manifest.append(self.data_object_section) @@ -157,7 +161,7 @@ def write(self, out_path: Path, update_info: bool = True) -> None: out_path: The path to write the manifest to update_info: Whether to update the path and CRC """ - assert self.xml is not None + assert isinstance(self.xml, ET._ElementTree) self.xml.write(out_path, pretty_print=True, xml_declaration=True, encoding='utf-8') if update_info: self.path = out_path @@ -174,7 +178,7 @@ def __init__(self, bbox: Polygon): bbox: The bounding box of the 
product """ self.bbox = bbox - self.xml: Optional[ET.Element] = None + self.xml: Union[ET._Element, ET._ElementTree, None] = None def assemble(self): """Assemble the components of the SAFE KML preview file.""" @@ -209,7 +213,7 @@ def write(self, out_path: Path, update_info: bool = True) -> None: out_path: The path to write the manifest to update_info: Whether to update the path """ - assert self.xml is not None + assert isinstance(self.xml, ET._ElementTree) self.xml.write(out_path, pretty_print=True, xml_declaration=True, encoding='utf-8') if update_info: self.path = out_path @@ -268,7 +272,7 @@ def __init__( ] if len(self.rfi) > 0: self.support.append('s1-level-1-rfi.xsd') - self.html: Optional[ET.Element] = None + self.html: Optional[ET._ElementTree] = None self.path: Optional[Path] = None def create_base(self): diff --git a/src/burst2safe/noise.py b/src/burst2safe/noise.py index 87cc950..9be81a8 100644 --- a/src/burst2safe/noise.py +++ b/src/burst2safe/noise.py @@ -1,4 +1,6 @@ +# mypy: disable-error-code="union-attr" from copy import deepcopy +from typing import Optional import lxml.etree as ET import numpy as np @@ -19,9 +21,9 @@ def __init__(self, burst_infos: list[BurstInfo], ipf_version: str, image_number: image_number: Image number. 
""" super().__init__(burst_infos, 'noise', ipf_version, image_number) - self.noise_vector_list = None # Only used in version < 2.90 - self.range_vector_list = None - self.azimuth_vector_list = None + self.noise_vector_list: Optional[ET._Element] = None # Only used in version < 2.90 + self.range_vector_list: Optional[ET._Element] = None + self.azimuth_vector_list: Optional[ET._Element] = None def create_range_vector_list(self): """Create the range vector list.""" @@ -56,7 +58,9 @@ def _get_start_stop_indexes(lines: np.ndarray, last_line: int, first_line: int = return first_index, last_index @staticmethod - def _update_azimuth_vector(az_vector: ET.Element, line_offset: int, start_line: int, stop_line: int) -> ET.Element: + def _update_azimuth_vector( + az_vector: ET._Element, line_offset: int, start_line: int, stop_line: int + ) -> ET._Element: """Update the azimuth vector to match the new line range. Subset noiseAzimuthLut to match. Args: @@ -93,7 +97,7 @@ def create_azimuth_vector_list(self): """Create the azimuth vector list. ListOfListElements class can't be used here because the noiseAzimuthVectorList has a different structure than the other lists elements. 
""" - az_vectors = [noise.find('noiseAzimuthVectorList') for noise in self.inputs] + az_vectors: list = [noise.find('noiseAzimuthVectorList') for noise in self.inputs] updated_az_vectors = [] for i, az_vector_set in enumerate(az_vectors): slc_offset = sum(self.slc_lengths[:i]) @@ -101,15 +105,16 @@ def create_azimuth_vector_list(self): updated_az_vector_set = [] for az_vector in az_vectors: line_offset = slc_offset - self.start_line + assert az_vector is not None updated_az_vector = self._update_azimuth_vector(az_vector, line_offset, self.start_line, self.stop_line) updated_az_vector_set.append(updated_az_vector) updated_az_vectors.append(updated_az_vector_set) - updated_az_vectors = flatten(updated_az_vectors) + updated_az_vectors_flattened = flatten(updated_az_vectors) new_az_vector_list = ET.Element('noiseAzimuthVectorList') - new_az_vector_list.set('count', str(len(updated_az_vectors))) - for az_vector in updated_az_vectors: + new_az_vector_list.set('count', str(len(updated_az_vectors_flattened))) + for az_vector in updated_az_vectors_flattened: new_az_vector_list.append(az_vector) self.azimuth_vector_list = new_az_vector_list @@ -118,15 +123,19 @@ def assemble(self): self.create_ads_header() noise = ET.Element('noise') + assert self.ads_header is not None noise.append(self.ads_header) if self.major_version >= 3 or self.minor_version >= 90: self.create_range_vector_list() self.create_azimuth_vector_list() + assert self.range_vector_list is not None + assert self.azimuth_vector_list is not None noise.append(self.range_vector_list) noise.append(self.azimuth_vector_list) else: self.create_noise_vector_list() + assert self.noise_vector_list is not None noise.append(self.noise_vector_list) noise_tree = ET.ElementTree(noise) diff --git a/src/burst2safe/product.py b/src/burst2safe/product.py index a180639..07383f6 100644 --- a/src/burst2safe/product.py +++ b/src/burst2safe/product.py @@ -1,3 +1,4 @@ +# mypy: disable-error-code="union-attr" from collections.abc import 
Iterable from copy import deepcopy from dataclasses import dataclass @@ -36,20 +37,20 @@ def __init__(self, burst_infos: list[BurstInfo], ipf_version: str, image_number: super().__init__(burst_infos, 'product', ipf_version, image_number) self.dummy = dummy self.qulatity_information = None - self.general_annotation = None - self.image_annotation = None - self.doppler_centroid = None - self.antenna_pattern = None - self.swath_timing: Optional[ET.Element] = None - self.geolocation_grid: Optional[ET.Element] = None - self.coordinate_conversion = None - self.swath_merging = None + self.general_annotation: Optional[ET._Element] = None + self.image_annotation: Optional[ET._Element] = None + self.doppler_centroid: Optional[ET._Element] = None + self.antenna_pattern: Optional[ET._Element] = None + self.swath_timing: Optional[ET._Element] = None + self.geolocation_grid: Optional[ET._Element] = None + self.coordinate_conversion: Optional[ET._Element] = None + self.swath_merging: Optional[ET._Element] = None self.gcps: list = [] def create_quality_information(self): """Create the qualityInformation element.""" quality_information = ET.Element('qualityInformation') - quality_information.append(deepcopy(self.inputs[0].find('qualityInformation/productQualityIndex'))) + quality_information.append(deepcopy(self.inputs[0].find('qualityInformation/productQualityIndex'))) # type: ignore[arg-type] quality_datas = flatten([cal.findall('qualityInformation/qualityDataList/qualityData') for cal in self.inputs]) quality_data_list = ET.Element('qualityDataList') @@ -75,9 +76,10 @@ def create_general_annotation(self): # TODO: productInformation/platformHeading should be calculated more accurately platform_heading_path = 'generalAnnotation/productInformation/platformHeading' - platform_heading = np.mean([float(prod.find(platform_heading_path).text) for prod in self.inputs]) + platform_heading = np.mean([float(prod.find(platform_heading_path).text) for prod in self.inputs]) # type: 
ignore[arg-type] product_information.find('platformHeading').text = f'{platform_heading:.14e}' + assert product_information is not None general_annotation.append(product_information) lists = [ @@ -91,7 +93,7 @@ def create_general_annotation(self): 'azimuthFmRateList', ] for list_name in lists: - list_elements = [prod.find(f'generalAnnotation/{list_name}') for prod in self.inputs] + list_elements: list = [prod.find(f'generalAnnotation/{list_name}') for prod in self.inputs] if len(flatten([element.findall('*') for element in list_elements])) == 0: filtered = ET.Element(list_elements[0].tag) filtered.set('count', '0') @@ -100,7 +102,8 @@ def create_general_annotation(self): unique = lol.get_unique_elements() filtered = ET.Element('replicaInformationList') filtered.set('count', str(len(unique))) - [filtered.append(element) for element in unique] + for element in unique: + filtered.append(element) else: lol = ListOfListElements(list_elements, self.start_line, self.slc_lengths) filtered = lol.create_filtered_list((self.min_anx, self.max_anx), buffer=timedelta(seconds=500)) @@ -135,7 +138,7 @@ def create_image_annotation(self): image_information.find('numberOfLines').text = str(self.total_lines) az_spacing_path = 'imageAnnotation/imageInformation/azimuthPixelSpacing' - az_spacing = np.mean([float(prod.find(az_spacing_path).text) for prod in self.inputs]) + az_spacing = np.mean([float(prod.find(az_spacing_path).text) for prod in self.inputs]) # type: ignore[arg-type] image_information.find('azimuthPixelSpacing').text = f'{az_spacing:.6e}' image_information.find('imageStatistics/outputDataMean/re').text = '' @@ -143,6 +146,7 @@ def create_image_annotation(self): image_information.find('imageStatistics/outputDataStdDev/re').text = '' image_information.find('imageStatistics/outputDataStdDev/im').text = '' + assert image_information is not None image_annotation.append(image_information) processing_information = 
deepcopy(self.inputs[0].find('imageAnnotation/processingInformation')) @@ -150,11 +154,14 @@ def create_image_annotation(self): for element in slice_list: dimensions_list.remove(element) - list_elements = [prod.find('imageAnnotation/processingInformation/inputDimensionsList') for prod in self.inputs] + list_elements: list = [ + prod.find('imageAnnotation/processingInformation/inputDimensionsList') for prod in self.inputs + ] lol = ListOfListElements(list_elements, self.start_line, self.slc_lengths) filtered = lol.create_filtered_list((self.min_anx, self.max_anx)) [dimensions_list.append(element) for element in filtered] + assert processing_information is not None image_annotation.append(processing_information) self.image_annotation = image_annotation @@ -192,14 +199,14 @@ def create_antenna_pattern(self): def create_swath_timing(self): """Create the swathTiming element.""" - burst_lists = [prod.find('swathTiming/burstList') for prod in self.inputs] + burst_lists: list = [prod.find('swathTiming/burstList') for prod in self.inputs] burst_lol = ListOfListElements(burst_lists, self.start_line, self.slc_lengths) filtered = burst_lol.create_filtered_list((self.min_anx, self.max_anx), buffer=timedelta(seconds=0.1)) # TODO: This is needed since we always buffer backward AND forward - if int(filtered.get('count')) > len(self.burst_infos): + if int(filtered.get('count')) > len(self.burst_infos): # type: ignore[arg-type] filtered.remove(filtered[-1]) - filtered.set('count', str(int(filtered.get('count')) - 1)) + filtered.set('count', str(int(filtered.get('count')) - 1)) # type: ignore[arg-type] for burst in filtered: burst.find('byteOffset').text = '' @@ -218,11 +225,11 @@ def update_gcps(self): gcp_xmls = self.geolocation_grid.find('geolocationGridPointList').findall('*') for gcp_xml in gcp_xmls: gcp = GeoPoint( - float(gcp_xml.find('longitude').text), - float(gcp_xml.find('latitude').text), - float(gcp_xml.find('height').text), - int(gcp_xml.find('line').text), - 
int(gcp_xml.find('pixel').text), + float(gcp_xml.find('longitude').text), # type: ignore[arg-type] + float(gcp_xml.find('latitude').text), # type: ignore[arg-type] + float(gcp_xml.find('height').text), # type: ignore[arg-type] + int(gcp_xml.find('line').text), # type: ignore[arg-type] + int(gcp_xml.find('pixel').text), # type: ignore[arg-type] ) self.gcps.append(gcp) @@ -238,6 +245,7 @@ def update_burst_byte_offsets(self, byte_offsets: Iterable[int]): for swath_timing in [self.swath_timing, self.xml.find('swathTiming')]: burst_list = swath_timing.find('burstList') + assert burst_list is not None for i, byte_offset in enumerate(byte_offsets): burst_list[i].find('byteOffset').text = str(byte_offset) @@ -294,6 +302,18 @@ def assemble(self): self.remove_burst_data() product = ET.Element('product') + + assert self.ads_header is not None + assert self.quality_information is not None + assert self.general_annotation is not None + assert self.image_annotation is not None + assert self.doppler_centroid is not None + assert self.antenna_pattern is not None + assert self.swath_timing is not None + assert self.geolocation_grid is not None + assert self.coordinate_conversion is not None + assert self.swath_merging is not None + product.append(self.ads_header) product.append(self.quality_information) product.append(self.general_annotation) diff --git a/src/burst2safe/rfi.py b/src/burst2safe/rfi.py index 36c127c..e6a4a3e 100644 --- a/src/burst2safe/rfi.py +++ b/src/burst2safe/rfi.py @@ -1,4 +1,5 @@ from copy import deepcopy +from typing import Optional import lxml.etree as ET @@ -21,13 +22,13 @@ def __init__(self, burst_infos: list[BurstInfo], ipf_version: str, image_number: image_number: Image number. 
""" super().__init__(burst_infos, 'rfi', ipf_version, image_number) - self.rfi_mitigation_applied = None - self.rfi_detection_from_noise_report_list = None - self.rfi_burst_report_list = None + self.rfi_mitigation_applied: Optional[ET._Element] = None + self.rfi_detection_from_noise_report_list: Optional[ET._Element] = None + self.rfi_burst_report_list: Optional[ET._Element] = None def create_rfi_mitigation_applied(self): """Create the rifMitigationApplied element.""" - self.rfi_mitigation_applied = deepcopy(self.inputs[0].find('rfiMitigationApplied')) + self.rfi_mitigation_applied = deepcopy(self.inputs[0].find('rfiMitigationApplied')) # type: ignore[union-attr] def create_rfi_detection_from_noise_report_list(self): """Create the rfiDetectionFromNoiseReportList element.""" @@ -45,6 +46,10 @@ def assemble(self): self.create_rfi_burst_report_list() rfi = ET.Element('rfi') + assert self.ads_header is not None + assert self.rfi_mitigation_applied is not None + assert self.rfi_detection_from_noise_report_list is not None + assert self.rfi_burst_report_list is not None rfi.append(self.ads_header) rfi.append(self.rfi_mitigation_applied) rfi.append(self.rfi_detection_from_noise_report_list) diff --git a/src/burst2safe/safe.py b/src/burst2safe/safe.py index 8ceace2..49375fd 100644 --- a/src/burst2safe/safe.py +++ b/src/burst2safe/safe.py @@ -6,7 +6,6 @@ from pathlib import Path from typing import List, Optional, Tuple, Union, cast -import lxml.etree as ET import numpy as np from shapely.geometry import MultiPolygon, Polygon @@ -41,7 +40,7 @@ def __init__( self.safe_path = self.work_dir / self.name self.swaths: list = [] self.blank_products: list = [] - self.manifest: Optional[ET.Element] = None + self.manifest: Optional[Manifest] = None self.kml: Optional[Kml] = None self.version = self.get_ipf_version(self.burst_infos[0].metadata_path) @@ -62,8 +61,8 @@ def get_creation_time(self) -> datetime: desired_tag = './/{http://www.esa.int/safe/sentinel-1.0}processing' 
creation_times = [] for manifest in manifests: - slc_processing = [elem for elem in manifest.findall(desired_tag) if elem.get('name') == 'SLC Processing'][0] - creation_times.append(datetime.strptime(slc_processing.get('stop'), '%Y-%m-%dT%H:%M:%S.%f')) + slc_processing = [elem for elem in manifest.findall(desired_tag) if elem.get('name') == 'SLC Processing'][0] # type: ignore[union-attr] + creation_times.append(datetime.strptime(slc_processing.get('stop'), '%Y-%m-%dT%H:%M:%S.%f')) # type: ignore[arg-type] creation_time = max(creation_times) return creation_time @@ -192,8 +191,10 @@ def get_ipf_version(metadata_path: Path) -> str: The IPF version as a string """ manifest = get_subxml_from_metadata(metadata_path, 'manifest') - version_xml = [elem for elem in manifest.findall('.//{*}software') if elem.get('name') == 'Sentinel-1 IPF'][0] - return version_xml.get('version') + version_xml = [elem for elem in manifest.findall('.//{*}software') if elem.get('name') == 'Sentinel-1 IPF'][0] # type: ignore[union-attr] + version_str = version_xml.get('version') + assert version_str is not None + return version_str def get_bbox(self) -> Polygon: """Get the bounding box for the SAFE file. 
@@ -393,6 +394,7 @@ def create_manifest(self) -> None: manifest_name = self.safe_path / 'manifest.safe' content_units, metadata_objects, data_objects = self.compile_manifest_components() template_manifest = get_subxml_from_metadata(self.burst_infos[0].metadata_path, 'manifest') + assert template_manifest is not None manifest = Manifest(content_units, metadata_objects, data_objects, self.get_bbox(), template_manifest) manifest.assemble() manifest.write(manifest_name) @@ -417,6 +419,7 @@ def create_preview(self): def update_product_identifier(self) -> None: """Update the product identifier using the CRC of the manifest file.""" assert self.manifest is not None + assert self.manifest.crc is not None new_new = self.get_name(unique_id=self.manifest.crc) new_path = self.work_dir / new_new if new_path.exists(): diff --git a/src/burst2safe/utils.py b/src/burst2safe/utils.py index c903108..8c14adf 100644 --- a/src/burst2safe/utils.py +++ b/src/burst2safe/utils.py @@ -1,3 +1,4 @@ +# mypy: disable-error-code="union-attr" import json import warnings from argparse import Namespace @@ -44,18 +45,18 @@ class BurstInfo: def add_shape_info(self): """Add shape information to the BurstInfo object.""" annotation = get_subxml_from_metadata(self.metadata_path, 'product', self.swath, self.polarization) - self.length = int(annotation.find('swathTiming/linesPerBurst').text) - self.width = int(annotation.find('swathTiming/samplesPerBurst').text) + self.length = int(annotation.find('swathTiming/linesPerBurst').text) # type: ignore[arg-type] + self.width = int(annotation.find('swathTiming/samplesPerBurst').text) # type: ignore[arg-type] def add_start_stop_utc(self): """Add start and stop UTC to burst info. There is spatial overlap between bursts, so burst start/stop times will overlap as well. 
""" annotation = get_subxml_from_metadata(self.metadata_path, 'product', self.swath, self.polarization) - start_utcs = [datetime.fromisoformat(x.find('azimuthTime').text) for x in annotation.findall('.//burst')] + start_utcs = [datetime.fromisoformat(x.find('azimuthTime').text) for x in annotation.findall('.//burst')] # type: ignore[arg-type] self.start_utc = start_utcs[self.burst_index] - azimuth_time_interval = float(annotation.find('.//azimuthTimeInterval').text) + azimuth_time_interval = float(annotation.find('.//azimuthTimeInterval').text) # type: ignore[arg-type] assert self.length is not None burst_time_interval = timedelta(seconds=(self.length - 1) * azimuth_time_interval) self.stop_utc = self.start_utc + burst_time_interval @@ -183,7 +184,7 @@ def calculate_crc16(file_path: Path) -> str: def get_subxml_from_metadata( metadata_path: Path, xml_type: str, subswath: Optional[str] = None, polarization: Optional[str] = None -) -> ET.Element: +) -> Optional[ET._Element]: """Extract child xml info from ASF combined metadata file. Args: @@ -231,7 +232,7 @@ def drop_duplicates(input_list: List) -> List: return list(dict.fromkeys(input_list)) -def set_text(element: ET.Element, text: Union[str, int]) -> None: +def set_text(element: ET._Element, text: Union[str, int]) -> None: """Set the text of an element if it is not None. 
Args: diff --git a/tests/test_all_anns.py b/tests/test_all_anns.py index c881954..c46b561 100644 --- a/tests/test_all_anns.py +++ b/tests/test_all_anns.py @@ -15,10 +15,12 @@ def test_all_anns(tmp_path): xml = ET.parse(prod).getroot() burst_list = xml.find('.//burstList') + assert burst_list is not None assert burst_list.attrib['count'] == '0' assert len(burst_list) == 0 geolocation_grid = xml.find('.//geolocationGridPointList') + assert geolocation_grid is not None assert geolocation_grid.attrib['count'] == '0' assert len(geolocation_grid) == 0 @@ -26,9 +28,11 @@ def test_all_anns(tmp_path): xml_real = ET.parse(prod_real).getroot() burst_list = xml_real.find('.//burstList') + assert burst_list is not None assert int(burst_list.attrib['count']) > 0 assert len(burst_list) > 0 geolocation_grid = xml_real.find('.//geolocationGridPointList') + assert geolocation_grid is not None assert int(geolocation_grid.attrib['count']) > 0 assert len(geolocation_grid) > 0 diff --git a/tests/test_base.py b/tests/test_base.py index dbba30d..5240cc5 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -1,3 +1,4 @@ +# mypy: disable-error-code="union-attr" from datetime import datetime, timedelta from pathlib import Path diff --git a/tests/test_manifest.py b/tests/test_manifest.py index db19124..0ac4b69 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -13,6 +13,7 @@ def manifest(test_data1_xml): data_objects = [ET.Element('data_object')] bbox = Polygon([(1, 1), (1, 2), (2, 2), (2, 1), (1, 1)]) template_manifest = get_subxml_from_metadata(test_data1_xml, 'manifest') + assert template_manifest is not None test_manifest = Manifest(content_units, metadata_objects, data_objects, bbox, template_manifest) return test_manifest diff --git a/tests/test_noise.py b/tests/test_noise.py index a9f9c37..af43f15 100644 --- a/tests/test_noise.py +++ b/tests/test_noise.py @@ -1,3 +1,4 @@ +# mypy: disable-error-code="union-attr" import xml.etree.ElementTree as ET import numpy as 
np @@ -36,7 +37,7 @@ def test_update_azimuth_vector(self): assert element.text is not None element.set('count', str(len(element.text.split(' ')))) - new_az_vector = Noise._update_azimuth_vector(az_vector, 0, 10, 20) + new_az_vector = Noise._update_azimuth_vector(az_vector, 0, 10, 20) # type: ignore[arg-type] assert new_az_vector.find('firstAzimuthLine').text == '0' assert new_az_vector.find('lastAzimuthLine').text == '10' assert new_az_vector.find('line').get('count') == '6' diff --git a/tests/test_product.py b/tests/test_product.py index bb1869e..07e6947 100644 --- a/tests/test_product.py +++ b/tests/test_product.py @@ -1,3 +1,4 @@ +# mypy: disable-error-code="union-attr" import lxml.etree as ET from burst2safe.product import GeoPoint, Product diff --git a/tests/test_utils.py b/tests/test_utils.py index 951a125..a234ce8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -113,8 +113,8 @@ def test_calculate_crc16(tmp_path, test_data_dir): def test_get_subxml_from_metadata(xml_type, swath, test_data1_xml): result = utils.get_subxml_from_metadata(test_data1_xml, xml_type, swath, 'VV') assert isinstance(result, lxml.etree._Element) - assert result.find('adsHeader/swath').text == swath - assert result.find('adsHeader/polarisation').text == 'VV' + assert result.find('adsHeader/swath').text == swath # type: ignore[union-attr] + assert result.find('adsHeader/polarisation').text == 'VV' # type: ignore[union-attr] assert result.tag == 'content'