From e51be34ed17cf7ef3eb835206a78be3661fcf5fe Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 23 Apr 2024 01:08:18 +0200 Subject: [PATCH 1/3] Re-work ECSS TM time handling --- CHANGELOG.md | 16 ++++++ docs/examples.rst | 2 +- spacepackets/ccsds/spacepacket.py | 1 + spacepackets/ccsds/time/cds.py | 55 +++++++++++++----- spacepackets/ccsds/time/common.py | 2 +- spacepackets/ecss/pus_17_test.py | 22 +++----- spacepackets/ecss/pus_1_verification.py | 72 ++++++++++++------------ spacepackets/ecss/tm.py | 75 ++++++++++++------------- tests/ccsds/test_sp_parser.py | 4 +- tests/ccsds/test_time.py | 2 +- tests/ecss/test_pus_tm.py | 42 ++++++-------- tests/ecss/test_srv1.py | 43 ++++++-------- tests/ecss/test_srv17.py | 15 +++-- tests/test_pus_verificator.py | 20 +++---- 14 files changed, 196 insertions(+), 175 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f1ada5..1dca2a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,22 @@ and this project adheres to [Semantic Versioning](http://semver.org/). # [unreleased] +# [v0.24.0] 2024-04-23 + +## Changed + +- ECSS PUS telemetry time handling is now more generic and low level: Constructors expect + a simple `bytes` type while unpackers/readers expect the length of the timestamp. Determining + or knowing the size of the timestamp is now the task of the user. +- `CdsShortTimestamp.from_now` renamed to `now`. + +## Added + + +- `spacepackets.ecss.tm.PUS_TM_TIMESTAMP_OFFSET` constant which can be used as a look-ahead to + determine the timestamp length from a raw PUS TM packet. +- `spacepackets.ccsds.CCSDS_HEADER_LEN` constant. + # [v0.23.1] 2024-04-22 ## Added diff --git a/docs/examples.rst b/docs/examples.rst index 619a56f..ea53994 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -16,7 +16,7 @@ PUS ping telemetry reply without a timestamp. cmd_as_bytes = ping_cmd.pack() print(f"Ping telecommand [17,1] (hex): [{cmd_as_bytes.hex(sep=',')}]") - ping_reply = PusTm(service=17, subservice=2, apid=0x01, time_provider=None) + ping_reply = PusTm(service=17, subservice=2, apid=0x01, timestamp=bytes()) tm_as_bytes = ping_reply.pack() print(f"Ping reply [17,2] (hex): [{tm_as_bytes.hex(sep=',')}]") diff --git a/spacepackets/ccsds/spacepacket.py b/spacepackets/ccsds/spacepacket.py index c508fc9..be299b8 100644 --- a/spacepackets/ccsds/spacepacket.py +++ b/spacepackets/ccsds/spacepacket.py @@ -11,6 +11,7 @@ from spacepackets.exceptions import BytesTooShortError SPACE_PACKET_HEADER_SIZE: Final = 6 +CCSDS_HEADER_LEN: Final = SPACE_PACKET_HEADER_SIZE SEQ_FLAG_MASK = 0xC000 APID_MASK = 0x7FF PACKET_ID_MASK = 0x1FFF diff --git a/spacepackets/ccsds/time/cds.py b/spacepackets/ccsds/time/cds.py index 6a5fd7b..09ab7ca 100644 --- a/spacepackets/ccsds/time/cds.py +++ b/spacepackets/ccsds/time/cds.py @@ -84,11 +84,11 @@ def _calculate_unix_seconds(self): def _calculate_date_time(self): if self._unix_seconds < 0: - self._date_time = datetime.datetime( + self._datetime = datetime.datetime( 1970, 1, 1, tzinfo=datetime.timezone.utc ) + datetime.timedelta(seconds=self._unix_seconds) else: - self._date_time = datetime.datetime.fromtimestamp( + self._datetime = datetime.datetime.fromtimestamp( self._unix_seconds, tz=datetime.timezone.utc ) @@ -108,7 +108,7 @@ def ccsds_days(self) -> int: def ms_of_day(self) -> int: return self._ms_of_day - def pack(self) -> bytearray: + def pack(self) -> bytes: cds_packet = bytearray() cds_packet.extend(self.__p_field) cds_packet.extend(struct.pack("!H", self._ccsds_days)) @@ -169,12 +169,14 @@ def __repr__(self): ) def __str__(self): - return f"Date {self._date_time!r} with representation {self!r}" + return f"Date {self._datetime!r} with representation {self!r}" - def __eq__(self, other: CdsShortTimestamp): - return (self.ccsds_days == other.ccsds_days) and ( - self.ms_of_day == other.ms_of_day - ) + def __eq__(self, other: object): + if isinstance(other, CdsShortTimestamp): + return (self.ccsds_days == other.ccsds_days) and ( + self.ms_of_day == other.ms_of_day + ) + return False def __add__(self, timedelta: datetime.timedelta): """Allows adding timedelta to the CDS timestamp provider. @@ -200,10 +202,20 @@ def __add__(self, timedelta: datetime.timedelta): return self @classmethod - def from_now(cls) -> CdsShortTimestamp: + def now(cls) -> CdsShortTimestamp: """Returns a seven byte CDS short timestamp with the current time.""" return cls.from_date_time(datetime.datetime.now(tz=datetime.timezone.utc)) + @classmethod + @deprecation.deprecated( + deprecated_in="0.24.0", + current_version=get_version(), + details="use now instead", + ) + def from_now(cls) -> CdsShortTimestamp: + """Returns a seven byte CDS short timestamp with the current time.""" + return cls.now() + @classmethod @deprecation.deprecated( deprecated_in="0.14.0rc1", @@ -211,12 +223,12 @@ def from_now(cls) -> CdsShortTimestamp: details="use from_now instead", ) def from_current_time(cls) -> CdsShortTimestamp: - return cls.from_now() + return cls.now() @classmethod - def from_date_time(cls, dt: datetime.datetime) -> CdsShortTimestamp: + def from_datetime(cls, dt: datetime.datetime) -> CdsShortTimestamp: instance = cls.empty(False) - instance._date_time = dt + instance._datetime = dt instance._unix_seconds = dt.timestamp() full_unix_secs = int(math.floor(instance._unix_seconds)) subsec_millis = int((instance._unix_seconds - full_unix_secs) * 1000) @@ -226,6 +238,15 @@ def from_date_time(cls, dt: datetime.datetime) -> CdsShortTimestamp: instance._ccsds_days = convert_unix_days_to_ccsds_days(unix_days) return instance + @classmethod + @deprecation.deprecated( + deprecated_in="0.24.0", + current_version=get_version(), + details="use from_datetime instead", + ) + def from_date_time(cls, dt: datetime.datetime) -> CdsShortTimestamp: + return cls.from_datetime(dt) + @staticmethod def ms_of_today(seconds_since_epoch: Optional[float] = None): if seconds_since_epoch is None: @@ -238,5 +259,13 @@ def ms_of_today(seconds_since_epoch: Optional[float] = None): def as_unix_seconds(self) -> float: return self._unix_seconds + def as_datetime(self) -> datetime.datetime: + return self._datetime + + @deprecation.deprecated( + deprecated_in="0.24.0", + current_version=get_version(), + details="use as_datetime instead", + ) def as_date_time(self) -> datetime.datetime: - return self._date_time + return self.as_datetime() diff --git a/spacepackets/ccsds/time/common.py b/spacepackets/ccsds/time/common.py index 83b6c30..e1b8f24 100644 --- a/spacepackets/ccsds/time/common.py +++ b/spacepackets/ccsds/time/common.py @@ -72,7 +72,7 @@ def len(self) -> int: return self.len_packed @abstractmethod - def pack(self) -> bytearray: + def pack(self) -> bytes: pass @abstractmethod diff --git a/spacepackets/ecss/pus_17_test.py b/spacepackets/ecss/pus_17_test.py index b4ff5ac..febc762 100644 --- a/spacepackets/ecss/pus_17_test.py +++ b/spacepackets/ecss/pus_17_test.py @@ -1,10 +1,8 @@ from __future__ import annotations import enum -from typing import Optional from spacepackets import SpacePacketHeader from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl -from spacepackets.ccsds.time import CcsdsTimeProvider from spacepackets.ecss.conf import FETCH_GLOBAL_APID from spacepackets.ecss.defs import PusService from spacepackets.ecss.tm import PusTm, AbstractPusTm @@ -19,7 +17,7 @@ class Service17Tm(AbstractPusTm): def __init__( self, subservice: int, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ssc: int = 0, source_data: bytes = bytes(), apid: int = FETCH_GLOBAL_APID, @@ -30,7 +28,7 @@ def __init__( self.pus_tm = PusTm( service=PusService.S17_TEST, subservice=subservice, - time_provider=time_provider, + timestamp=timestamp, seq_count=ssc, source_data=source_data, apid=apid, @@ -60,8 +58,8 @@ def service(self) -> int: return self.pus_tm.service @property - def time_provider(self) -> Optional[CcsdsTimeProvider]: - return self.pus_tm.time_provider + def timestamp(self) -> bytes: + return self.pus_tm.timestamp @property def subservice(self) -> int: @@ -75,19 +73,17 @@ def pack(self) -> bytearray: return self.pus_tm.pack() @classmethod - def __empty(cls, time_provider: Optional[CcsdsTimeProvider]) -> Service17Tm: - return cls(subservice=0, time_provider=time_provider) + def __empty(cls) -> Service17Tm: + return cls(subservice=0, timestamp=bytes()) @classmethod - def unpack( - cls, data: bytes, time_reader: Optional[CcsdsTimeProvider] - ) -> Service17Tm: + def unpack(cls, data: bytes, timestamp_len: int) -> Service17Tm: """ :raises BytesTooShortError: Passed bytestream too short. :raises ValueError: Unsupported PUS version. :raises InvalidTmCrc16: Invalid CRC16. """ - service_17_tm = cls.__empty(time_provider=time_reader) - service_17_tm.pus_tm = PusTm.unpack(data=data, time_reader=time_reader) + service_17_tm = cls.__empty() + service_17_tm.pus_tm = PusTm.unpack(data=data, timestamp_len=timestamp_len) return service_17_tm diff --git a/spacepackets/ecss/pus_1_verification.py b/spacepackets/ecss/pus_1_verification.py index e957310..01d0759 100644 --- a/spacepackets/ecss/pus_1_verification.py +++ b/spacepackets/ecss/pus_1_verification.py @@ -8,7 +8,6 @@ from spacepackets.ccsds import SpacePacketHeader from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl -from spacepackets.ccsds.time import CcsdsTimeProvider from spacepackets.ecss import PusTc from spacepackets.ecss.conf import FETCH_GLOBAL_APID from spacepackets.ecss.defs import PusService @@ -67,7 +66,7 @@ def __repr__(self): @dataclass class UnpackParams: - time_reader: Optional[CcsdsTimeProvider] + timestamp_len: int bytes_step_id: int = 1 bytes_err_code: int = 1 @@ -124,7 +123,7 @@ class Service1Tm(AbstractPusTm): def __init__( self, subservice: Subservice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, verif_params: Optional[VerificationParams] = None, seq_count: int = 0, apid: int = FETCH_GLOBAL_APID, @@ -139,7 +138,7 @@ def __init__( self.pus_tm = PusTm( service=PusService.S1_VERIFICATION, subservice=subservice, - time_provider=time_provider, + timestamp=timestamp, seq_count=seq_count, apid=apid, packet_version=packet_version, @@ -154,12 +153,12 @@ def pack(self) -> bytearray: return self.pus_tm.pack() @classmethod - def __empty(cls, time_provider: Optional[CcsdsTimeProvider]) -> Service1Tm: - return cls(subservice=Subservice.INVALID, time_provider=time_provider) + def __empty(cls) -> Service1Tm: + return cls(subservice=Subservice.INVALID, timestamp=bytes()) @classmethod def from_tm(cls, tm: PusTm, params: UnpackParams) -> Service1Tm: - service_1_tm = cls.__empty(params.time_reader) + service_1_tm = cls.__empty() service_1_tm.pus_tm = tm cls._unpack_raw_tm(service_1_tm, params) return service_1_tm @@ -175,14 +174,16 @@ def unpack(cls, data: bytes, params: UnpackParams) -> Service1Tm: :raises TmSourceDataTooShortError: TM source data too short. :return: """ - service_1_tm = cls.__empty(params.time_reader) - service_1_tm.pus_tm = PusTm.unpack(data=data, time_reader=params.time_reader) + service_1_tm = cls.__empty() + service_1_tm.pus_tm = PusTm.unpack( + data=data, timestamp_len=params.timestamp_len + ) cls._unpack_raw_tm(service_1_tm, params) return service_1_tm @property - def time_provider(self) -> Optional[CcsdsTimeProvider]: - return self.pus_tm.time_provider + def timestamp(self) -> bytes: + return self.pus_tm.timestamp @property def ccsds_version(self) -> int: @@ -277,6 +278,7 @@ def tc_req_id(self, value): @property def error_code(self) -> Optional[ErrorCode]: if self.has_failure_notice: + assert self._verif_params.failure_notice is not None return self._verif_params.failure_notice.code else: return None @@ -293,26 +295,26 @@ def step_id(self) -> Optional[StepId]: """Retrieve the step number. Returns NONE if this packet does not have a step ID""" return self._verif_params.step_id - def __eq__(self, other: Service1Tm): - return (self.pus_tm == other.pus_tm) and ( - self._verif_params == other._verif_params - ) + def __eq__(self, other: object): + if isinstance(other, Service1Tm): + return (self.pus_tm == other.pus_tm) and ( + self._verif_params == other._verif_params + ) + return False -def create_acceptance_success_tm( - pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider] -) -> Service1Tm: +def create_acceptance_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_ACCEPTANCE_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), - time_provider=time_provider, + timestamp=timestamp, ) def create_acceptance_failure_tm( pus_tc: PusTc, failure_notice: FailureNotice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_ACCEPTANCE_FAILURE, @@ -320,24 +322,22 @@ def create_acceptance_failure_tm( req_id=RequestId.from_sp_header(pus_tc.sp_header), failure_notice=failure_notice, ), - time_provider=time_provider, + timestamp=timestamp, ) -def create_start_success_tm( - pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider] -) -> Service1Tm: +def create_start_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_START_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), - time_provider=time_provider, + timestamp=timestamp, ) def create_start_failure_tm( pus_tc: PusTc, failure_notice: FailureNotice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_START_FAILURE, @@ -345,21 +345,21 @@ def create_start_failure_tm( req_id=RequestId.from_sp_header(pus_tc.sp_header), failure_notice=failure_notice, ), - time_provider=time_provider, + timestamp=timestamp, ) def create_step_success_tm( pus_tc: PusTc, step_id: PacketFieldEnum, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_STEP_SUCCESS, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), step_id=step_id ), - time_provider=time_provider, + timestamp=timestamp, ) @@ -367,7 +367,7 @@ def create_step_failure_tm( pus_tc: PusTc, step_id: PacketFieldEnum, failure_notice: FailureNotice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_STEP_FAILURE, @@ -376,24 +376,22 @@ def create_step_failure_tm( step_id=step_id, failure_notice=failure_notice, ), - time_provider=time_provider, + timestamp=timestamp, ) -def create_completion_success_tm( - pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider] -) -> Service1Tm: +def create_completion_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_COMPLETION_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), - time_provider=time_provider, + timestamp=timestamp, ) def create_completion_failure_tm( pus_tc: PusTc, failure_notice: FailureNotice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_COMPLETION_FAILURE, @@ -401,5 +399,5 @@ def create_completion_failure_tm( req_id=RequestId.from_sp_header(pus_tc.sp_header), failure_notice=failure_notice, ), - time_provider=time_provider, + timestamp=timestamp, ) diff --git a/spacepackets/ecss/tm.py b/spacepackets/ecss/tm.py index 3efd122..a96ea8c 100644 --- a/spacepackets/ecss/tm.py +++ b/spacepackets/ecss/tm.py @@ -12,20 +12,20 @@ from .exceptions import TmSrcDataTooShortError # noqa # re-export from spacepackets.version import get_version -from spacepackets.ccsds.time.common import read_p_field from spacepackets.exceptions import BytesTooShortError from spacepackets.util import PrintFormats, get_printable_data_string from spacepackets.ccsds.spacepacket import ( PacketSeqCtrl, SpacePacketHeader, SPACE_PACKET_HEADER_SIZE, + CCSDS_HEADER_LEN, get_total_space_packet_len_from_len_field, PacketType, SpacePacket, AbstractSpacePacket, SequenceFlags, ) -from spacepackets.ccsds.time import CdsShortTimestamp, CcsdsTimeProvider +from spacepackets.ccsds.time import CdsShortTimestamp from spacepackets.ecss.conf import ( PusVersion, get_default_tm_apid, @@ -57,7 +57,7 @@ def service(self) -> int: @property @abstractmethod - def time_provider(self) -> Optional[CcsdsTimeProvider]: + def timestamp(self) -> bytes: pass @property @@ -81,7 +81,7 @@ def __init__( self, service: int, subservice: int, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, message_counter: int, dest_id: int = 0, spacecraft_time_ref: int = 0, @@ -109,14 +109,14 @@ def __init__( ) self.message_counter = message_counter self.dest_id = dest_id - self.time_provider = time_provider + self.timestamp = timestamp @classmethod def __empty(cls) -> PusTmSecondaryHeader: return PusTmSecondaryHeader( service=0, subservice=0, - time_provider=CdsShortTimestamp.from_now(), + timestamp=bytes(), message_counter=0, ) @@ -127,20 +127,16 @@ def pack(self) -> bytearray: secondary_header.append(self.subservice) secondary_header.extend(struct.pack("!H", self.message_counter)) secondary_header.extend(struct.pack("!H", self.dest_id)) - if self.time_provider: - secondary_header.extend(self.time_provider.pack()) + secondary_header.extend(self.timestamp) return secondary_header @classmethod - def unpack( - cls, data: bytes, time_reader: Optional[CcsdsTimeProvider] - ) -> PusTmSecondaryHeader: + def unpack(cls, data: bytes, timestamp_len: int) -> PusTmSecondaryHeader: """Unpack the PUS TM secondary header from the raw packet starting at the header index. :param data: Raw data. Please note that the passed buffer should start where the actual header start is. - :param time_reader: Generic time reader which knows the time stamp size and how to interpret - the raw timestamp + :param timestamp_len: Expected timestamp length. :raises ValueError: bytearray too short or PUS version missmatch. :return: """ @@ -170,22 +166,13 @@ def unpack( "!H", data[current_idx : current_idx + 2] )[0] current_idx += 2 - # If other time formats are supported in the future, this information can be used - # to unpack the correct time code - time_code_id = read_p_field(data[current_idx]) - if time_code_id: - pass - if time_reader: - time_reader.read_from_raw( - data[current_idx : current_idx + time_reader.len_packed] - ) - secondary_header.time_provider = time_reader + secondary_header.timestamp = data[current_idx : current_idx + timestamp_len] return secondary_header def __repr__(self): return ( f"{self.__class__.__name__}(service={self.service!r}," - f" subservice={self.subservice!r}, time={self.time_provider!r}," + f" subservice={self.subservice!r}, time={self.timestamp!r}," f" message_counter={self.message_counter!r}, dest_id={self.dest_id!r}," f" spacecraft_time_ref={self.spacecraft_time_ref!r}," f" pus_version={self.pus_version!r})" @@ -199,11 +186,14 @@ def __eq__(self, other: object): @property def header_size(self) -> int: base_len = 7 - if self.time_provider: - base_len += self.time_provider.len_packed + if self.timestamp: + base_len += len(self.timestamp) return base_len +PUS_TM_TIMESTAMP_OFFSET = CCSDS_HEADER_LEN + PusTmSecondaryHeader.MIN_LEN + + class InvalidTmCrc16(Exception): def __init__(self, tm: PusTm): self.tm = tm @@ -216,10 +206,15 @@ class PusTm(AbstractPusTm): or to deserialize TM packets from a raw byte stream using the :py:meth:`unpack` method. This implementation only supports PUS C. + Deserialization of PUS telemetry requires the timestamp length to be known. If the size of the + timestamp is variable but can be determined from the data, a look-ahead should be performed on + the raw data. The :py:const:`PUS_TM_TIMESTAMP_OFFSET` (13) can be used to do this, assuming + that the timestamp length can be extracted from the timestamp itself. + The following doc example cuts off the timestamp (7 byte CDS Short) and the CRC16 from the ping packet because those change regularly. - >>> ping_tm = PusTm(service=17, subservice=2, seq_count=5, apid=0x01, time_provider=CdsShortTimestamp.empty()) # noqa + >>> ping_tm = PusTm(service=17, subservice=2, seq_count=5, apid=0x01, timestamp=CdsShortTimestamp.now().pack()) # noqa >>> ping_tm.service 17 >>> ping_tm.subservice @@ -236,8 +231,8 @@ def __init__( self, service: int, subservice: int, - time_provider: Optional[CcsdsTimeProvider], - source_data: bytes = bytes([]), + timestamp: bytes, + source_data: bytes = bytes(), seq_count: int = 0, apid: int = FETCH_GLOBAL_APID, message_counter: int = 0, @@ -248,9 +243,7 @@ def __init__( if apid == FETCH_GLOBAL_APID: apid = get_default_tm_apid() self._source_data = source_data - len_stamp = 0 - if time_provider: - len_stamp += time_provider.len_packed + len_stamp = len(timestamp) data_length = self.data_len_from_src_len_timestamp_len( timestamp_len=len_stamp, source_data_len=len(self._source_data), @@ -270,13 +263,15 @@ def __init__( message_counter=message_counter, dest_id=destination_id, spacecraft_time_ref=space_time_ref, - time_provider=time_provider, + timestamp=timestamp, ) self._crc16: Optional[bytes] = None @classmethod def empty(cls) -> PusTm: - return PusTm(service=0, subservice=0, time_provider=CdsShortTimestamp.empty()) + return PusTm( + service=0, subservice=0, timestamp=CdsShortTimestamp.empty().pack() + ) def pack(self, recalc_crc: bool = True) -> bytearray: """Serializes the packet into a raw bytearray. @@ -303,7 +298,7 @@ def calc_crc(self): self._crc16 = struct.pack("!H", crc.crcValue) @classmethod - def unpack(cls, data: bytes, time_reader: Optional[CcsdsTimeProvider]) -> PusTm: + def unpack(cls, data: bytes, timestamp_len: int) -> PusTm: """Attempts to construct a generic PusTelemetry class given a raw bytearray. :param data: Raw bytes containing the PUS telemetry packet. @@ -324,7 +319,7 @@ def unpack(cls, data: bytes, time_reader: Optional[CcsdsTimeProvider]) -> PusTm: raise BytesTooShortError(expected_packet_len, len(data)) pus_tm.pus_tm_sec_header = PusTmSecondaryHeader.unpack( data=data[SPACE_PACKET_HEADER_SIZE:], - time_reader=time_reader, + timestamp_len=timestamp_len, ) if ( expected_packet_len @@ -417,8 +412,8 @@ def ccsds_version(self) -> int: return self.space_packet_header.ccsds_version @property - def time_provider(self) -> Optional[CcsdsTimeProvider]: - return self.pus_tm_sec_header.time_provider + def timestamp(self) -> bytes: + return self.pus_tm_sec_header.timestamp @property def service(self) -> int: @@ -449,8 +444,8 @@ def tm_data(self) -> bytes: def tm_data(self, data: bytes): self._source_data = data stamp_len = 0 - if self.pus_tm_sec_header.time_provider: - stamp_len += self.pus_tm_sec_header.time_provider.len_packed + if self.pus_tm_sec_header.timestamp: + stamp_len += len(self.pus_tm_sec_header.timestamp) self.space_packet_header.data_len = self.data_len_from_src_len_timestamp_len( stamp_len, len(data) ) diff --git a/tests/ccsds/test_sp_parser.py b/tests/ccsds/test_sp_parser.py index f6060b1..19831b5 100644 --- a/tests/ccsds/test_sp_parser.py +++ b/tests/ccsds/test_sp_parser.py @@ -9,7 +9,7 @@ class TestSpParser(TestCase): def setUp(self) -> None: self.tm_packet = PusTm( - service=17, subservice=2, time_provider=CdsShortTimestamp.empty() + service=17, subservice=2, timestamp=CdsShortTimestamp.empty().pack() ) self.packet_ids = (self.tm_packet.packet_id,) self.tm_packet_raw = self.tm_packet.pack() @@ -30,7 +30,7 @@ def test_sp_parser_crap_data_is_skipped(self): service=8, subservice=128, source_data=bytearray(64), - time_provider=CdsShortTimestamp.empty(), + timestamp=CdsShortTimestamp.empty().pack(), ) other_larger_packet_raw = other_larger_packet.pack() self.packet_deque.append(self.tm_packet_raw) diff --git a/tests/ccsds/test_time.py b/tests/ccsds/test_time.py index 4450984..3a0b378 100644 --- a/tests/ccsds/test_time.py +++ b/tests/ccsds/test_time.py @@ -82,7 +82,7 @@ def test_dt_is_utc(self): self.assertEqual(dt.tzinfo.utcoffset(dt), datetime.timedelta(0)) def test_compare_from_now_against_manually_created(self): - stamp = CdsShortTimestamp.from_now() + stamp = CdsShortTimestamp.now() ccsds_days = stamp.ccsds_days ms_of_day = stamp.ms_of_day new_stamp = CdsShortTimestamp(ccsds_days, ms_of_day) diff --git a/tests/ecss/test_pus_tm.py b/tests/ecss/test_pus_tm.py index 6e6ee8a..3b3f84d 100755 --- a/tests/ecss/test_pus_tm.py +++ b/tests/ecss/test_pus_tm.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import struct from unittest import TestCase -from unittest.mock import MagicMock from crcmod.predefined import mkPredefinedCrcFun @@ -25,21 +24,18 @@ from spacepackets.ecss.pus_1_verification import ( RequestId, ) -from .common import generic_time_provider_mock, TEST_STAMP +from .common import TEST_STAMP class TestTelemetry(TestCase): def setUp(self) -> None: - self.time_stamp_provider = MagicMock(spec=CdsShortTimestamp) - - self.time_stamp_provider = generic_time_provider_mock(TEST_STAMP) self.ping_reply = PusTm( service=17, subservice=2, apid=0x123, seq_count=0x234, source_data=bytearray(), - time_provider=self.time_stamp_provider, + timestamp=TEST_STAMP, ) self.ping_reply_raw = self.ping_reply.pack() @@ -72,9 +68,9 @@ def test_no_timestamp(self): apid=0x123, seq_count=0x234, source_data=bytearray(), - time_provider=None, + timestamp=bytes(), ) - self.assertEqual(self.ping_reply.pus_tm_sec_header.time_provider, None) + self.assertEqual(self.ping_reply.pus_tm_sec_header.timestamp, bytes()) tm_raw = self.ping_reply.pack() self.assertEqual(self.ping_reply.packet_len, 15) self.assertEqual(len(tm_raw), 15) @@ -115,7 +111,8 @@ def test_state_setting(self): self.ping_reply.tm_data = source_data self.assertEqual(self.ping_reply.apid, 0x22) self.assertTrue(isinstance(self.ping_reply.crc16, bytes)) - self.assertTrue(len(self.ping_reply.crc16), 2) + assert self.ping_reply.crc16 is not None + self.assertEqual(len(self.ping_reply.crc16), 2) self.assertEqual( self.ping_reply.pus_tm_sec_header.pus_version, PusVersion.PUS_C ) @@ -162,6 +159,7 @@ def test_full_printout(self): sp_header_as_str = raw_space_packet_header.hex(sep=",") raw_secondary_packet_header = self.ping_reply.pus_tm_sec_header.pack() second_header_as_str = raw_secondary_packet_header.hex(sep=",") + assert crc16 is not None expected_printout = ( f"hex [{sp_header_as_str},{second_header_as_str},{crc16.hex(sep=',')}]" ) @@ -185,7 +183,7 @@ def test_unpack(self): self.ping_reply_raw = self.ping_reply.pack() # self.time_stamp_provider.read_from_raw = MagicMock() pus_17_tm_unpacked = PusTm.unpack( - data=self.ping_reply_raw, time_reader=self.time_stamp_provider + data=self.ping_reply_raw, timestamp_len=len(TEST_STAMP) ) self.assertEqual(self.ping_reply.crc16, pus_17_tm_unpacked.crc16) self.assertEqual(pus_17_tm_unpacked, self.ping_reply) @@ -194,10 +192,6 @@ def test_unpack(self): self.assertEqual( pus_17_tm_unpacked.pus_tm_sec_header.pus_version, PusVersion.PUS_C ) - pus_17_tm_unpacked.pus_tm_sec_header.time_provider.read_from_raw.assert_called_once() - pus_17_tm_unpacked.pus_tm_sec_header.time_provider.read_from_raw.assert_called_with( - TEST_STAMP - ) self.assertEqual(pus_17_tm_unpacked.tm_data, source_data) self.assertEqual(pus_17_tm_unpacked.packet_id.raw(), 0x0822) @@ -209,7 +203,7 @@ def test_unpack(self): ValueError, PusTm.unpack, self.ping_reply_raw, - CdsShortTimestamp.empty(), + len(TEST_STAMP), ) self.ping_reply_raw[4] = 0xFF self.ping_reply_raw[5] = 0xFF @@ -229,12 +223,10 @@ def test_unpack(self): self.ping_reply_raw[4] = (incorrect_size & 0xFF00) >> 8 self.ping_reply_raw[5] = incorrect_size & 0xFF with self.assertRaises(InvalidTmCrc16): - PusTm.unpack(data=self.ping_reply_raw, time_reader=self.time_stamp_provider) + PusTm.unpack(data=self.ping_reply_raw, timestamp_len=len(TEST_STAMP)) def test_calc_crc(self): - new_ping_tm = PusTm( - service=17, subservice=2, time_provider=self.time_stamp_provider - ) + new_ping_tm = PusTm(service=17, subservice=2, timestamp=TEST_STAMP) self.assertIsNone(new_ping_tm.crc16) new_ping_tm.calc_crc() self.assertIsNotNone(new_ping_tm.crc16) @@ -242,14 +234,12 @@ def test_calc_crc(self): self.assertEqual(len(new_ping_tm.crc16), 2) def test_crc_always_calced_if_none(self): - new_ping_tm = PusTm( - service=17, subservice=2, time_provider=self.time_stamp_provider - ) + new_ping_tm = PusTm(service=17, subservice=2, timestamp=TEST_STAMP) self.assertIsNone(new_ping_tm.crc16) # Should still calculate CRC tc_raw = new_ping_tm.pack(recalc_crc=False) # Will throw invalid CRC16 error if CRC was not calculated - tc_unpacked = PusTm.unpack(tc_raw, time_reader=self.time_stamp_provider) + tc_unpacked = PusTm.unpack(tc_raw, timestamp_len=len(TEST_STAMP)) self.assertEqual(tc_unpacked, new_ping_tm) def test_faulty_unpack(self): @@ -287,7 +277,7 @@ def test_faulty_sec_header(self): PusTmSecondaryHeader( service=0, subservice=0, - time_provider=CdsShortTimestamp.from_now(), + timestamp=CdsShortTimestamp.now().pack(), message_counter=129302, ) @@ -295,7 +285,9 @@ def test_invalid_raw_size(self): # Set length field invalid self.ping_reply_raw[4] = 0x00 self.ping_reply_raw[5] = 0x00 - self.assertRaises(ValueError, PusTm.unpack, self.ping_reply_raw, None) + self.assertRaises( + ValueError, PusTm.unpack, self.ping_reply_raw, len(TEST_STAMP) + ) def test_req_id(self): tc_packet_id = PacketId(ptype=PacketType.TC, sec_header_flag=True, apid=0x42) diff --git a/tests/ecss/test_srv1.py b/tests/ecss/test_srv1.py index 32c6fdf..cd0b6f7 100644 --- a/tests/ecss/test_srv1.py +++ b/tests/ecss/test_srv1.py @@ -21,14 +21,13 @@ ErrorCode, StepId, ) -from tests.ecss.common import generic_time_provider_mock, TEST_STAMP +from tests.ecss.common import TEST_STAMP class Service1TmTest(TestCase): def setUp(self) -> None: ping_tc = PusTc(service=17, subservice=1) - self.srv1_tm = create_start_success_tm(ping_tc, None) - self.time_stamp_provider = generic_time_provider_mock(TEST_STAMP) + self.srv1_tm = create_start_success_tm(ping_tc, TEST_STAMP) def test_failure_notice_invalid_creation(self): with self.assertRaises(ValueError): @@ -38,14 +37,16 @@ def test_basic(self): self.assertEqual( self.srv1_tm.sp_header, self.srv1_tm.pus_tm.space_packet_header ) - self.assertEqual(self.srv1_tm.time_provider, None) + self.assertEqual(self.srv1_tm.timestamp, TEST_STAMP) self.assertEqual(self.srv1_tm.is_step_reply, False) self.assertEqual(self.srv1_tm.service, 1) self.assertEqual(self.srv1_tm.subservice, 3) self.assertEqual(self.srv1_tm.error_code, None) def test_other_ctor(self): - srv1_tm = Service1Tm.from_tm(self.srv1_tm.pus_tm, UnpackParams(None)) + srv1_tm = Service1Tm.from_tm( + self.srv1_tm.pus_tm, UnpackParams(timestamp_len=len(TEST_STAMP)) + ) self.assertEqual(srv1_tm, self.srv1_tm) def test_failure_notice(self): @@ -95,20 +96,14 @@ def _generic_test_srv_1_success(self, subservice: Subservice): helper_created = None step_id = None if subservice == Subservice.TM_ACCEPTANCE_SUCCESS: - helper_created = create_acceptance_success_tm( - pus_tc, self.time_stamp_provider - ) + helper_created = create_acceptance_success_tm(pus_tc, TEST_STAMP) elif subservice == Subservice.TM_START_SUCCESS: - helper_created = create_start_success_tm(pus_tc, self.time_stamp_provider) + helper_created = create_start_success_tm(pus_tc, TEST_STAMP) elif subservice == Subservice.TM_STEP_SUCCESS: step_id = PacketFieldEnum.with_byte_size(1, 4) - helper_created = create_step_success_tm( - pus_tc, step_id, self.time_stamp_provider - ) + helper_created = create_step_success_tm(pus_tc, step_id, TEST_STAMP) elif subservice == Subservice.TM_COMPLETION_SUCCESS: - helper_created = create_completion_success_tm( - pus_tc, time_provider=self.time_stamp_provider - ) + helper_created = create_completion_success_tm(pus_tc, timestamp=TEST_STAMP) self._test_srv_1_success_tm( pus_tc, Service1Tm( @@ -117,7 +112,7 @@ def _generic_test_srv_1_success(self, subservice: Subservice): req_id=RequestId(pus_tc.packet_id, pus_tc.packet_seq_control), step_id=step_id, ), - time_provider=CdsShortTimestamp.empty(), + timestamp=CdsShortTimestamp.empty().pack(), ), subservice, ) @@ -148,7 +143,7 @@ def _test_srv_1_success_tm( self.assertEqual(srv_1_tm.tc_req_id.tc_psc, pus_tc.packet_seq_control) srv_1_tm_raw = srv_1_tm.pack() srv_1_tm_unpacked = Service1Tm.unpack( - srv_1_tm_raw, UnpackParams(self.time_stamp_provider) + srv_1_tm_raw, UnpackParams(len(TEST_STAMP)) ) self.assertEqual( srv_1_tm_unpacked.tc_req_id.tc_packet_id.raw(), pus_tc.packet_id.raw() @@ -168,23 +163,21 @@ def _generic_test_srv_1_failure(self, subservice: Subservice): step_id = None if subservice == Subservice.TM_ACCEPTANCE_FAILURE: helper_created = create_acceptance_failure_tm( - pus_tc, failure_notice, self.time_stamp_provider + pus_tc, failure_notice, TEST_STAMP ) elif subservice == Subservice.TM_START_FAILURE: - helper_created = create_start_failure_tm( - pus_tc, failure_notice, self.time_stamp_provider - ) + helper_created = create_start_failure_tm(pus_tc, failure_notice, TEST_STAMP) elif subservice == Subservice.TM_STEP_FAILURE: step_id = PacketFieldEnum.with_byte_size(2, 12) helper_created = create_step_failure_tm( pus_tc, failure_notice=failure_notice, step_id=step_id, - time_provider=self.time_stamp_provider, + timestamp=TEST_STAMP, ) elif subservice == Subservice.TM_COMPLETION_FAILURE: helper_created = create_completion_failure_tm( - pus_tc, failure_notice, self.time_stamp_provider + pus_tc, failure_notice, TEST_STAMP ) self._test_srv_1_failure_comparison_helper( pus_tc, @@ -195,7 +188,7 @@ def _generic_test_srv_1_failure(self, subservice: Subservice): failure_notice=failure_notice, step_id=step_id, ), - time_provider=self.time_stamp_provider, + timestamp=TEST_STAMP, ), subservice=subservice, failure_notice=failure_notice, @@ -222,7 +215,7 @@ def _test_srv_1_failure_comparison_helper( self.assertEqual(srv_1_tm.tc_req_id.tc_packet_id, pus_tc.packet_id) self.assertEqual(srv_1_tm.tc_req_id.tc_psc, pus_tc.packet_seq_control) srv_1_tm_raw = srv_1_tm.pack() - unpack_params = UnpackParams(self.time_stamp_provider) + unpack_params = UnpackParams(len(TEST_STAMP)) if failure_notice is not None: unpack_params.bytes_err_code = failure_notice.code.len() if step_id is not None: diff --git a/tests/ecss/test_srv17.py b/tests/ecss/test_srv17.py index 1ca2102..34414bb 100644 --- a/tests/ecss/test_srv17.py +++ b/tests/ecss/test_srv17.py @@ -9,7 +9,7 @@ class TestSrv17Tm(TestCase): def setUp(self) -> None: - self.srv17_tm = Service17Tm(subservice=1, time_provider=None) + self.srv17_tm = Service17Tm(subservice=1, timestamp=bytes()) self.srv17_tm.pus_tm.apid = 0x72 self.time_stamp_provider = generic_time_provider_mock(TEST_STAMP) @@ -19,7 +19,7 @@ def test_state(self): ) self.assertEqual(self.srv17_tm.service, PusService.S17_TEST) self.assertEqual(self.srv17_tm.subservice, 1) - self.assertEqual(self.srv17_tm.time_provider, None) + self.assertEqual(self.srv17_tm.timestamp, bytes()) self.assertEqual(self.srv17_tm.apid, 0x72) self.assertEqual(self.srv17_tm.seq_count, 0) self.assertEqual(self.srv17_tm.seq_flags, SequenceFlags.UNSEGMENTED) @@ -30,17 +30,20 @@ def test_state(self): def test_other_state(self): srv17_with_data = Service17Tm( subservice=128, - time_provider=CdsShortTimestamp(0, 0), + timestamp=CdsShortTimestamp(0, 0).pack(), source_data=bytes([0, 1, 2]), ) self.assertEqual(srv17_with_data.source_data, bytes([0, 1, 2])) - self.assertEqual(srv17_with_data.time_provider, CdsShortTimestamp(0, 0)) + + self.assertEqual( + CdsShortTimestamp.unpack(srv17_with_data.timestamp), CdsShortTimestamp(0, 0) + ) def test_service_17_tm(self): - srv_17_tm = Service17Tm(subservice=2, time_provider=self.time_stamp_provider) + srv_17_tm = Service17Tm(subservice=2, timestamp=TEST_STAMP) self.assertEqual(srv_17_tm.pus_tm.subservice, 2) srv_17_tm_raw = srv_17_tm.pack() srv_17_tm_unpacked = Service17Tm.unpack( - data=srv_17_tm_raw, time_reader=self.time_stamp_provider + data=srv_17_tm_raw, timestamp_len=len(TEST_STAMP) ) self.assertEqual(srv_17_tm_unpacked.pus_tm.subservice, 2) diff --git a/tests/test_pus_verificator.py b/tests/test_pus_verificator.py index 882e05f..08a6c24 100644 --- a/tests/test_pus_verificator.py +++ b/tests/test_pus_verificator.py @@ -28,17 +28,15 @@ class SuccessSet: def __init__(self, pus_tc: PusTc): self.pus_tc = pus_tc self.req_id = RequestId.from_pus_tc(pus_tc) - self.time_reader = CdsShortTimestamp.empty() - self.acc_suc_tm = create_acceptance_success_tm(pus_tc, self.time_reader) - self.sta_suc_tm = create_start_success_tm(pus_tc, self.time_reader) + self.empty_stamp = CdsShortTimestamp.empty() + self.acc_suc_tm = create_acceptance_success_tm(pus_tc, self.empty_stamp.pack()) + self.sta_suc_tm = create_start_success_tm(pus_tc, self.empty_stamp.pack()) self.ste_suc_tm = create_step_success_tm( pus_tc=pus_tc, step_id=StepId.with_byte_size(1, 1), - time_provider=self.time_reader, - ) - self.fin_suc_tm = create_completion_success_tm( - pus_tc, CdsShortTimestamp.empty() + timestamp=self.empty_stamp.pack(), ) + self.fin_suc_tm = create_completion_success_tm(pus_tc, self.empty_stamp.pack()) class FailureSet: @@ -46,21 +44,21 @@ def __init__(self, pus_tc: PusTc, failure_notice: FailureNotice): self.suc_set = SuccessSet(pus_tc) self.failure_notice = failure_notice self.acc_fail_tm = create_acceptance_failure_tm( - pus_tc, self.failure_notice, CdsShortTimestamp.empty() + pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() ) self.sta_fail_tm = create_start_failure_tm( - pus_tc, self.failure_notice, CdsShortTimestamp.empty() + pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() ) self.ste_fail_tm = create_step_failure_tm( pus_tc, failure_notice=self.failure_notice, step_id=StepId.with_byte_size(1, 1), - time_provider=CdsShortTimestamp.empty(), + timestamp=CdsShortTimestamp.empty().pack(), ) self.fin_fail_tm = create_completion_failure_tm( failure_notice=self.failure_notice, pus_tc=self.suc_set.pus_tc, - time_provider=CdsShortTimestamp.empty(), + timestamp=CdsShortTimestamp.empty().pack(), ) @property From 0eb1fe85176ff43fec7a9e73e03b814dc0104edb Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 23 Apr 2024 10:10:09 +0200 Subject: [PATCH 2/3] some more improvements for structure --- CHANGELOG.md | 8 ++- docs/api/ecss.rst | 18 +++---- pyproject.toml | 2 +- spacepackets/ccsds/spacepacket.py | 22 ++++---- spacepackets/ecss/__init__.py | 4 +- spacepackets/ecss/conf.py | 46 ---------------- spacepackets/ecss/defs.py | 9 ++++ spacepackets/ecss/pus_17_test.py | 5 +- spacepackets/ecss/pus_1_verification.py | 28 +++++++--- spacepackets/ecss/tc.py | 22 +++----- spacepackets/ecss/tm.py | 17 ++---- tests/ccsds/test_sp_parser.py | 7 ++- tests/ecss/test_pus_tc.py | 20 +++---- tests/ecss/test_pus_tm.py | 11 ++-- tests/ecss/test_srv1.py | 36 +++++++++---- tests/ecss/test_srv17.py | 6 ++- tests/test_pus_verificator.py | 70 +++++++++++++++++++------ 17 files changed, 172 insertions(+), 159 deletions(-) delete mode 100644 spacepackets/ecss/conf.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 1dca2a8..4e1d40c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - ECSS PUS telemetry time handling is now more generic and low level: Constructors expect a simple `bytes` type while unpackers/readers expect the length of the timestamp. Determining - or knowing the size of the timestamp is now the task of the user. + or knowing the size of the timestamp is now the task of the user, but a helper constant + is expose which can help with this. - `CdsShortTimestamp.from_now` renamed to `now`. +- The ECSS TMTC APID field must not be set explicitely in the class constructors. ## Added @@ -24,6 +26,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). determine the timestamp length from a raw PUS TM packet. - `spacepackets.ccsds.CCSDS_HEADER_LEN` constant. +## Removed + +- Global configuration module for TC and TM APID was removed. + # [v0.23.1] 2024-04-22 ## Added diff --git a/docs/api/ecss.rst b/docs/api/ecss.rst index cc1662c..5b1dbf0 100644 --- a/docs/api/ecss.rst +++ b/docs/api/ecss.rst @@ -6,8 +6,13 @@ ECSS Package :undoc-members: :show-inheritance: +.. automodule:: spacepackets.ecss.defs + :members: + :undoc-members: + :show-inheritance: + ECSS PUS Telecommand Module --------------------------------- +---------------------------- :ref:`api/ecss_tc:ECSS Telecommand Module` @@ -16,17 +21,6 @@ ECSS PUS Telemetry Module :ref:`api/ecss_tm:ECSS Telemetry Module` -ECSS Configuration Submodule ----------------------------------- - -This module can be used to configure common default values so these don't have to be specified -each time anymore when creating ECSS packets - -.. automodule:: spacepackets.ecss.conf - :members: - :undoc-members: - :show-inheritance: - ECSS Fields Submodule ---------------------------------- diff --git a/pyproject.toml b/pyproject.toml index 7f8c9ce..fbbc5ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "spacepackets" description = "Various CCSDS and ECSS packet implementations" readme = "README.md" -version = "0.23.1" +version = "0.24.0" requires-python = ">=3.8" license = {text = "Apache-2.0"} authors = [ diff --git a/spacepackets/ccsds/spacepacket.py b/spacepackets/ccsds/spacepacket.py index be299b8..979c851 100644 --- a/spacepackets/ccsds/spacepacket.py +++ b/spacepackets/ccsds/spacepacket.py @@ -10,11 +10,13 @@ from spacepackets.exceptions import BytesTooShortError -SPACE_PACKET_HEADER_SIZE: Final = 6 -CCSDS_HEADER_LEN: Final = SPACE_PACKET_HEADER_SIZE -SEQ_FLAG_MASK = 0xC000 -APID_MASK = 0x7FF -PACKET_ID_MASK = 0x1FFF +CCSDS_HEADER_LEN: Final[int] = 6 +SPACE_PACKET_HEADER_SIZE: Final[int] = CCSDS_HEADER_LEN +SEQ_FLAG_MASK: Final[int] = 0xC000 +APID_MASK: Final[int] = 0x7FF +PACKET_ID_MASK: Final[int] = 0x1FFF +MAX_SEQ_COUNT: Final[int] = pow(2, 14) - 1 +MAX_APID: Final[int] = pow(2, 11) - 1 class PacketType(enum.IntEnum): @@ -35,7 +37,7 @@ class PacketSeqCtrl: """ def __init__(self, seq_flags: SequenceFlags, seq_count: int): - if seq_count > pow(2, 14) - 1 or seq_count < 0: + if seq_count > MAX_SEQ_COUNT or seq_count < 0: raise ValueError( f"Sequence count larger than allowed {pow(2, 14) - 1} or negative" ) @@ -206,7 +208,7 @@ def __init__( :param data_len: Contains a length count C that equals one fewer than the length of the packet data field. Should not be larger than 65535 bytes :param ccsds_version: - :param sec_header_flag: Secondary header flag, 1 or True by default + :param sec_header_flag: Secondary header flag, or False by default. :param seq_flags: :raises ValueError: On invalid parameters """ @@ -300,7 +302,7 @@ def seq_flags(self, value): @property def header_len(self) -> int: - return SPACE_PACKET_HEADER_SIZE + return CCSDS_HEADER_LEN @apid.setter def apid(self, apid): @@ -312,7 +314,7 @@ def packet_len(self) -> int: :return: Size of the TM packet based on the space packet header data length field. """ - return SPACE_PACKET_HEADER_SIZE + self.data_len + 1 + return CCSDS_HEADER_LEN + self.data_len + 1 @classmethod def unpack(cls, data: bytes) -> SpacePacketHeader: @@ -501,7 +503,7 @@ def parse_space_packets( # Packet ID detected while True: # Can't even parse CCSDS header. Wait for more data to arrive. - if current_idx + SPACE_PACKET_HEADER_SIZE >= len(concatenated_packets): + if current_idx + CCSDS_HEADER_LEN >= len(concatenated_packets): break current_packet_id = ( struct.unpack("!H", concatenated_packets[current_idx : current_idx + 2])[0] diff --git a/spacepackets/ecss/__init__.py b/spacepackets/ecss/__init__.py index 643cadb..aa34f70 100644 --- a/spacepackets/ecss/__init__.py +++ b/spacepackets/ecss/__init__.py @@ -1,6 +1,6 @@ from crcmod.predefined import mkPredefinedCrcFun -from .tc import PusVersion, PusTelecommand, PusTc, PusTcDataFieldHeader +from .tc import PusTc, PusTelecommand, PusTcDataFieldHeader from .tm import PusTm, PusTelemetry, PusTmSecondaryHeader from .fields import ( PacketFieldEnum, @@ -13,7 +13,7 @@ PfcSigned, PfcUnsigned, ) -from .defs import PusService +from .defs import PusService, PusVersion from .req_id import RequestId from .pus_verificator import PusVerificator diff --git a/spacepackets/ecss/conf.py b/spacepackets/ecss/conf.py deleted file mode 100644 index 0a0c15c..0000000 --- a/spacepackets/ecss/conf.py +++ /dev/null @@ -1,46 +0,0 @@ -import enum - - -#: Can be passed as an APID value to automatically fetch an APID value configured globally for -#: the library -FETCH_GLOBAL_APID = -1 - - -class PusVersion(enum.IntEnum): - # ESA PSS-07-101. Not supported by this package! - ESA_PUS = 0 - # ECSS-E-70-41A - PUS_A = 1 - # ECSS-E-ST-70-41C - PUS_C = 2 - GLOBAL_CONFIG = 98 - UNKNOWN = 99 - - -class EcssConfKeys(enum.IntEnum): - ECSS_TC_APID = 0 - ECSS_TM_APID = 1 - PUS_TM_TYPE = 2 - PUS_TC_TYPE = 3 - - -__ECSS_DICT = { - EcssConfKeys.ECSS_TM_APID: 0x00, - EcssConfKeys.ECSS_TC_APID: 0x00, -} - - -def set_default_tm_apid(tm_apid: int): - __ECSS_DICT[EcssConfKeys.ECSS_TM_APID] = tm_apid - - -def get_default_tm_apid() -> int: - return __ECSS_DICT[EcssConfKeys.ECSS_TM_APID] - - -def set_default_tc_apid(tc_apid: int): - __ECSS_DICT[EcssConfKeys.ECSS_TC_APID] = tc_apid - - -def get_default_tc_apid() -> int: - return __ECSS_DICT[EcssConfKeys.ECSS_TC_APID] diff --git a/spacepackets/ecss/defs.py b/spacepackets/ecss/defs.py index 19cc1c7..958c863 100644 --- a/spacepackets/ecss/defs.py +++ b/spacepackets/ecss/defs.py @@ -1,6 +1,15 @@ import enum +class PusVersion(enum.IntEnum): + # ESA PSS-07-101. Not supported by this package! + ESA_PUS = 0 + # ECSS-E-70-41A + PUS_A = 1 + # ECSS-E-ST-70-41C + PUS_C = 2 + + class PusService(enum.IntEnum): S1_VERIFICATION = 1 S2_RAW_CMD = 2 diff --git a/spacepackets/ecss/pus_17_test.py b/spacepackets/ecss/pus_17_test.py index febc762..252039f 100644 --- a/spacepackets/ecss/pus_17_test.py +++ b/spacepackets/ecss/pus_17_test.py @@ -3,7 +3,6 @@ from spacepackets import SpacePacketHeader from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl -from spacepackets.ecss.conf import FETCH_GLOBAL_APID from spacepackets.ecss.defs import PusService from spacepackets.ecss.tm import PusTm, AbstractPusTm @@ -16,11 +15,11 @@ class Subservice(enum.IntEnum): class Service17Tm(AbstractPusTm): def __init__( self, + apid: int, subservice: int, timestamp: bytes, ssc: int = 0, source_data: bytes = bytes(), - apid: int = FETCH_GLOBAL_APID, packet_version: int = 0b000, space_time_ref: int = 0b0000, destination_id: int = 0, @@ -74,7 +73,7 @@ def pack(self) -> bytearray: @classmethod def __empty(cls) -> Service17Tm: - return cls(subservice=0, timestamp=bytes()) + return cls(apid=0, subservice=0, timestamp=bytes()) @classmethod def unpack(cls, data: bytes, timestamp_len: int) -> Service17Tm: diff --git a/spacepackets/ecss/pus_1_verification.py b/spacepackets/ecss/pus_1_verification.py index 01d0759..73250be 100644 --- a/spacepackets/ecss/pus_1_verification.py +++ b/spacepackets/ecss/pus_1_verification.py @@ -9,7 +9,6 @@ from spacepackets.ccsds import SpacePacketHeader from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl from spacepackets.ecss import PusTc -from spacepackets.ecss.conf import FETCH_GLOBAL_APID from spacepackets.ecss.defs import PusService from spacepackets.ecss.fields import PacketFieldEnum from spacepackets.ecss.tm import PusTm, AbstractPusTm @@ -122,11 +121,11 @@ class Service1Tm(AbstractPusTm): def __init__( self, + apid: int, subservice: Subservice, timestamp: bytes, verif_params: Optional[VerificationParams] = None, seq_count: int = 0, - apid: int = FETCH_GLOBAL_APID, packet_version: int = 0b000, space_time_ref: int = 0b0000, destination_id: int = 0, @@ -154,7 +153,7 @@ def pack(self) -> bytearray: @classmethod def __empty(cls) -> Service1Tm: - return cls(subservice=Subservice.INVALID, timestamp=bytes()) + return cls(apid=0, subservice=Subservice.INVALID, timestamp=bytes()) @classmethod def from_tm(cls, tm: PusTm, params: UnpackParams) -> Service1Tm: @@ -303,8 +302,11 @@ def __eq__(self, other: object): return False -def create_acceptance_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: +def create_acceptance_success_tm( + apid: int, pus_tc: PusTc, timestamp: bytes +) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_ACCEPTANCE_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), timestamp=timestamp, @@ -312,11 +314,13 @@ def create_acceptance_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: def create_acceptance_failure_tm( + apid: int, pus_tc: PusTc, failure_notice: FailureNotice, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_ACCEPTANCE_FAILURE, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), @@ -326,8 +330,9 @@ def create_acceptance_failure_tm( ) -def create_start_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: +def create_start_success_tm(apid: int, pus_tc: PusTc, timestamp: bytes) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_START_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), timestamp=timestamp, @@ -335,11 +340,13 @@ def create_start_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: def create_start_failure_tm( + apid: int, pus_tc: PusTc, failure_notice: FailureNotice, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_START_FAILURE, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), @@ -350,11 +357,13 @@ def create_start_failure_tm( def create_step_success_tm( + apid: int, pus_tc: PusTc, step_id: PacketFieldEnum, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_STEP_SUCCESS, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), step_id=step_id @@ -364,12 +373,14 @@ def create_step_success_tm( def create_step_failure_tm( + apid: int, pus_tc: PusTc, step_id: PacketFieldEnum, failure_notice: FailureNotice, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_STEP_FAILURE, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), @@ -380,8 +391,11 @@ def create_step_failure_tm( ) -def create_completion_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: +def create_completion_success_tm( + apid: int, pus_tc: PusTc, timestamp: bytes +) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_COMPLETION_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), timestamp=timestamp, @@ -389,11 +403,13 @@ def create_completion_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: def create_completion_failure_tm( + apid: int, pus_tc: PusTc, failure_notice: FailureNotice, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_COMPLETION_FAILURE, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), diff --git a/spacepackets/ecss/tc.py b/spacepackets/ecss/tc.py index fab2d66..9c9d322 100644 --- a/spacepackets/ecss/tc.py +++ b/spacepackets/ecss/tc.py @@ -15,18 +15,14 @@ from spacepackets.ccsds.spacepacket import ( SpacePacketHeader, PacketType, - SPACE_PACKET_HEADER_SIZE, + CCSDS_HEADER_LEN, SpacePacket, AbstractSpacePacket, PacketId, PacketSeqCtrl, SequenceFlags, ) -from spacepackets.ecss.conf import ( - get_default_tc_apid, - PusVersion, - FETCH_GLOBAL_APID, -) +from spacepackets.ecss.defs import PusVersion class PusTcDataFieldHeader: @@ -123,10 +119,10 @@ class PusTc(AbstractSpacePacket): def __init__( self, + apid: int, service: int, subservice: int, - app_data: bytes = bytes([]), - apid: int = FETCH_GLOBAL_APID, + app_data: bytes = bytes(), seq_count: int = 0, source_id: int = 0, ack_flags: int = 0b1111, @@ -144,8 +140,6 @@ def __init__( different packet sources (e.g. different ground stations) :raises ValueError: Invalid input parameters """ - if apid == FETCH_GLOBAL_APID: - apid = get_default_tc_apid() self.pus_tc_sec_header = PusTcDataFieldHeader( service=service, subservice=subservice, @@ -214,7 +208,7 @@ def from_composite_fields( @classmethod def empty(cls) -> PusTc: - return PusTc(service=0, subservice=0) + return PusTc(apid=0, service=0, subservice=0) def __repr__(self): """Returns the representation of a class instance.""" @@ -286,11 +280,9 @@ def unpack(cls, data: bytes) -> PusTc: tc_unpacked = cls.empty() tc_unpacked.sp_header = SpacePacketHeader.unpack(data=data) tc_unpacked.pus_tc_sec_header = PusTcDataFieldHeader.unpack( - data=data[SPACE_PACKET_HEADER_SIZE:] - ) - header_len = ( - SPACE_PACKET_HEADER_SIZE + tc_unpacked.pus_tc_sec_header.get_header_size() + data=data[CCSDS_HEADER_LEN:] ) + header_len = CCSDS_HEADER_LEN + tc_unpacked.pus_tc_sec_header.get_header_size() expected_packet_len = tc_unpacked.packet_len if len(data) < expected_packet_len: raise BytesTooShortError(expected_packet_len, len(data)) diff --git a/spacepackets/ecss/tm.py b/spacepackets/ecss/tm.py index a96ea8c..10b5186 100644 --- a/spacepackets/ecss/tm.py +++ b/spacepackets/ecss/tm.py @@ -25,12 +25,8 @@ AbstractSpacePacket, SequenceFlags, ) +from spacepackets.ecss.defs import PusVersion from spacepackets.ccsds.time import CdsShortTimestamp -from spacepackets.ecss.conf import ( - PusVersion, - get_default_tm_apid, - FETCH_GLOBAL_APID, -) from spacepackets.crc import CRC16_CCITT_FUNC @@ -226,22 +222,19 @@ class PusTm(AbstractPusTm): CDS_SHORT_SIZE = 7 PUS_TIMESTAMP_SIZE = CDS_SHORT_SIZE - # TODO: Supply this constructor as a classmethod in reduced form def __init__( self, + apid: int, service: int, subservice: int, timestamp: bytes, source_data: bytes = bytes(), seq_count: int = 0, - apid: int = FETCH_GLOBAL_APID, message_counter: int = 0, space_time_ref: int = 0b0000, destination_id: int = 0, packet_version: int = 0b000, ): - if apid == FETCH_GLOBAL_APID: - apid = get_default_tm_apid() self._source_data = source_data len_stamp = len(timestamp) data_length = self.data_len_from_src_len_timestamp_len( @@ -270,7 +263,7 @@ def __init__( @classmethod def empty(cls) -> PusTm: return PusTm( - service=0, subservice=0, timestamp=CdsShortTimestamp.empty().pack() + apid=0, service=0, subservice=0, timestamp=CdsShortTimestamp.empty().pack() ) def pack(self, recalc_crc: bool = True) -> bytearray: @@ -443,9 +436,7 @@ def tm_data(self) -> bytes: @tm_data.setter def tm_data(self, data: bytes): self._source_data = data - stamp_len = 0 - if self.pus_tm_sec_header.timestamp: - stamp_len += len(self.pus_tm_sec_header.timestamp) + stamp_len = len(self.pus_tm_sec_header.timestamp) self.space_packet_header.data_len = self.data_len_from_src_len_timestamp_len( stamp_len, len(data) ) diff --git a/tests/ccsds/test_sp_parser.py b/tests/ccsds/test_sp_parser.py index 19831b5..2e0dfe1 100644 --- a/tests/ccsds/test_sp_parser.py +++ b/tests/ccsds/test_sp_parser.py @@ -8,8 +8,12 @@ class TestSpParser(TestCase): def setUp(self) -> None: + self.def_apid = 0x03 self.tm_packet = PusTm( - service=17, subservice=2, timestamp=CdsShortTimestamp.empty().pack() + apid=self.def_apid, + service=17, + subservice=2, + timestamp=CdsShortTimestamp.empty().pack(), ) self.packet_ids = (self.tm_packet.packet_id,) self.tm_packet_raw = self.tm_packet.pack() @@ -27,6 +31,7 @@ def test_sp_parser(self): def test_sp_parser_crap_data_is_skipped(self): other_larger_packet = PusTm( + apid=self.def_apid, service=8, subservice=128, source_data=bytearray(64), diff --git a/tests/ecss/test_pus_tc.py b/tests/ecss/test_pus_tc.py index 9068a2b..944d243 100644 --- a/tests/ecss/test_pus_tc.py +++ b/tests/ecss/test_pus_tc.py @@ -4,8 +4,7 @@ from spacepackets import SpacePacketHeader, PacketType from spacepackets.ccsds.spacepacket import SequenceFlags -from spacepackets.ecss import PusTc, PusTcDataFieldHeader, check_pus_crc -from spacepackets.ecss.conf import get_default_tc_apid, set_default_tc_apid, PusVersion +from spacepackets.ecss import PusTc, PusTcDataFieldHeader, check_pus_crc, PusVersion from spacepackets.ecss.tc import generate_crc, generate_packet_crc, InvalidTcCrc16 @@ -92,13 +91,10 @@ def test_print(self): print(repr(self.ping_tc)) print(self.ping_tc) - set_default_tc_apid(42) - self.assertTrue(get_default_tc_apid() == 42) - def test_with_app_data(self): test_app_data = bytearray([1, 2, 3]) ping_with_app_data = PusTc( - service=17, subservice=32, seq_count=52, app_data=test_app_data + apid=0, service=17, subservice=32, seq_count=52, app_data=test_app_data ) # 6 bytes CCSDS header, 5 bytes secondary header, 2 bytes CRC, 3 bytes app data self.assertEqual(ping_with_app_data.packet_len, 16) @@ -114,7 +110,7 @@ def test_with_app_data(self): def test_invalid_seq_count(self): with self.assertRaises(ValueError): - PusTc(service=493, subservice=5252, seq_count=99432942) + PusTc(apid=55, service=493, subservice=5252, seq_count=99432942) def test_unpack(self): pus_17_unpacked = PusTc.unpack(data=self.ping_tc_raw) @@ -147,15 +143,15 @@ def test_sec_header(self): self.assertEqual(tc_header_pus_c_raw[2], 2) def test_calc_crc(self): - new_ping_tc = PusTc(service=17, subservice=1) + new_ping_tc = PusTc(apid=27, service=17, subservice=1) self.assertIsNone(new_ping_tc.crc16) new_ping_tc.calc_crc() - self.assertIsNotNone(new_ping_tc.crc16) + assert new_ping_tc.crc16 is not None self.assertTrue(isinstance(new_ping_tc.crc16, bytes)) self.assertEqual(len(new_ping_tc.crc16), 2) def test_crc_always_calced_if_none(self): - new_ping_tc = PusTc(service=17, subservice=1) + new_ping_tc = PusTc(apid=28, service=17, subservice=1) self.assertIsNone(new_ping_tc.crc16) # Should still calculate CRC tc_raw = new_ping_tc.pack(recalc_crc=False) @@ -172,7 +168,7 @@ def test_from_composite_fields(self): self.assertEqual(pus_17_from_composite_fields.pack(), self.ping_tc.pack()) def test_crc_16(self): - pus_17_telecommand = PusTc(service=17, subservice=1, seq_count=25) + pus_17_telecommand = PusTc(apid=25, service=17, subservice=1, seq_count=25) crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) crc = crc_func(pus_17_telecommand.pack()) self.assertTrue(crc == 0) @@ -189,7 +185,7 @@ def test_crc_16(self): self.assertTrue(crc_func(packet_raw) == 0) def test_getter_functions(self): - pus_17_telecommand = PusTc(service=17, subservice=1, seq_count=25) + pus_17_telecommand = PusTc(apid=26, service=17, subservice=1, seq_count=25) self.assertTrue(pus_17_telecommand.seq_count == 25) self.assertTrue(pus_17_telecommand.service == 17) self.assertEqual(pus_17_telecommand.subservice, 1) diff --git a/tests/ecss/test_pus_tm.py b/tests/ecss/test_pus_tm.py index 3b3f84d..7fa6a13 100755 --- a/tests/ecss/test_pus_tm.py +++ b/tests/ecss/test_pus_tm.py @@ -11,13 +11,11 @@ SequenceFlags, SpacePacketHeader, ) -from spacepackets.ecss import check_pus_crc -from spacepackets.ecss.conf import set_default_tm_apid +from spacepackets.ecss import check_pus_crc, PusVersion from spacepackets.util import PrintFormats, get_printable_data_string from spacepackets.ecss.tm import ( PusTm, CdsShortTimestamp, - PusVersion, PusTmSecondaryHeader, InvalidTmCrc16, ) @@ -129,7 +127,6 @@ def test_service_from_raw_invalid(self): self.assertRaises(ValueError, PusTm.service_from_bytes, bytearray()) def test_source_data_string_getters(self): - set_default_tm_apid(0x22) source_data = bytearray([0x42, 0x38]) self.ping_reply.tm_data = source_data self.assertEqual( @@ -226,15 +223,15 @@ def test_unpack(self): PusTm.unpack(data=self.ping_reply_raw, timestamp_len=len(TEST_STAMP)) def test_calc_crc(self): - new_ping_tm = PusTm(service=17, subservice=2, timestamp=TEST_STAMP) + new_ping_tm = PusTm(apid=0, service=17, subservice=2, timestamp=TEST_STAMP) self.assertIsNone(new_ping_tm.crc16) new_ping_tm.calc_crc() - self.assertIsNotNone(new_ping_tm.crc16) + assert new_ping_tm.crc16 is not None self.assertTrue(isinstance(new_ping_tm.crc16, bytes)) self.assertEqual(len(new_ping_tm.crc16), 2) def test_crc_always_calced_if_none(self): - new_ping_tm = PusTm(service=17, subservice=2, timestamp=TEST_STAMP) + new_ping_tm = PusTm(apid=0, service=17, subservice=2, timestamp=TEST_STAMP) self.assertIsNone(new_ping_tm.crc16) # Should still calculate CRC tc_raw = new_ping_tm.pack(recalc_crc=False) diff --git a/tests/ecss/test_srv1.py b/tests/ecss/test_srv1.py index cd0b6f7..9e0e60f 100644 --- a/tests/ecss/test_srv1.py +++ b/tests/ecss/test_srv1.py @@ -26,8 +26,11 @@ class Service1TmTest(TestCase): def setUp(self) -> None: - ping_tc = PusTc(service=17, subservice=1) - self.srv1_tm = create_start_success_tm(ping_tc, TEST_STAMP) + self.def_apid = 0x02 + ping_tc = PusTc(apid=self.def_apid, service=17, subservice=1) + self.srv1_tm = create_start_success_tm( + apid=self.def_apid, pus_tc=ping_tc, timestamp=TEST_STAMP + ) def test_failure_notice_invalid_creation(self): with self.assertRaises(ValueError): @@ -92,21 +95,28 @@ def test_service_1_tm_completion_success(self): self._generic_test_srv_1_success(Subservice.TM_COMPLETION_SUCCESS) def _generic_test_srv_1_success(self, subservice: Subservice): - pus_tc = PusTc(service=17, subservice=1) + pus_tc = PusTc(apid=self.def_apid, service=17, subservice=1) helper_created = None step_id = None if subservice == Subservice.TM_ACCEPTANCE_SUCCESS: - helper_created = create_acceptance_success_tm(pus_tc, TEST_STAMP) + helper_created = create_acceptance_success_tm( + apid=self.def_apid, pus_tc=pus_tc, timestamp=TEST_STAMP + ) elif subservice == Subservice.TM_START_SUCCESS: - helper_created = create_start_success_tm(pus_tc, TEST_STAMP) + helper_created = create_start_success_tm(self.def_apid, pus_tc, TEST_STAMP) elif subservice == Subservice.TM_STEP_SUCCESS: step_id = PacketFieldEnum.with_byte_size(1, 4) - helper_created = create_step_success_tm(pus_tc, step_id, TEST_STAMP) + helper_created = create_step_success_tm( + self.def_apid, pus_tc, step_id, TEST_STAMP + ) elif subservice == Subservice.TM_COMPLETION_SUCCESS: - helper_created = create_completion_success_tm(pus_tc, timestamp=TEST_STAMP) + helper_created = create_completion_success_tm( + self.def_apid, pus_tc, timestamp=TEST_STAMP + ) self._test_srv_1_success_tm( pus_tc, Service1Tm( + apid=self.def_apid, subservice=subservice, verif_params=VerificationParams( req_id=RequestId(pus_tc.packet_id, pus_tc.packet_seq_control), @@ -155,7 +165,7 @@ def _test_srv_1_success_tm( self.assertEqual(srv_1_tm_unpacked.step_id, step_id) def _generic_test_srv_1_failure(self, subservice: Subservice): - pus_tc = PusTc(service=17, subservice=1) + pus_tc = PusTc(apid=self.def_apid, service=17, subservice=1) failure_notice = FailureNotice( code=PacketFieldEnum.with_byte_size(1, 8), data=bytes([2, 4]) ) @@ -163,13 +173,16 @@ def _generic_test_srv_1_failure(self, subservice: Subservice): step_id = None if subservice == Subservice.TM_ACCEPTANCE_FAILURE: helper_created = create_acceptance_failure_tm( - pus_tc, failure_notice, TEST_STAMP + self.def_apid, pus_tc, failure_notice, TEST_STAMP ) elif subservice == Subservice.TM_START_FAILURE: - helper_created = create_start_failure_tm(pus_tc, failure_notice, TEST_STAMP) + helper_created = create_start_failure_tm( + self.def_apid, pus_tc, failure_notice, TEST_STAMP + ) elif subservice == Subservice.TM_STEP_FAILURE: step_id = PacketFieldEnum.with_byte_size(2, 12) helper_created = create_step_failure_tm( + self.def_apid, pus_tc, failure_notice=failure_notice, step_id=step_id, @@ -177,11 +190,12 @@ def _generic_test_srv_1_failure(self, subservice: Subservice): ) elif subservice == Subservice.TM_COMPLETION_FAILURE: helper_created = create_completion_failure_tm( - pus_tc, failure_notice, TEST_STAMP + self.def_apid, pus_tc, failure_notice, TEST_STAMP ) self._test_srv_1_failure_comparison_helper( pus_tc, Service1Tm( + apid=self.def_apid, subservice=subservice, verif_params=VerificationParams( req_id=RequestId(pus_tc.packet_id, pus_tc.packet_seq_control), diff --git a/tests/ecss/test_srv17.py b/tests/ecss/test_srv17.py index 34414bb..a6ca240 100644 --- a/tests/ecss/test_srv17.py +++ b/tests/ecss/test_srv17.py @@ -9,7 +9,8 @@ class TestSrv17Tm(TestCase): def setUp(self) -> None: - self.srv17_tm = Service17Tm(subservice=1, timestamp=bytes()) + self.def_apid = 0x05 + self.srv17_tm = Service17Tm(apid=self.def_apid, subservice=1, timestamp=bytes()) self.srv17_tm.pus_tm.apid = 0x72 self.time_stamp_provider = generic_time_provider_mock(TEST_STAMP) @@ -29,6 +30,7 @@ def test_state(self): def test_other_state(self): srv17_with_data = Service17Tm( + apid=self.def_apid, subservice=128, timestamp=CdsShortTimestamp(0, 0).pack(), source_data=bytes([0, 1, 2]), @@ -40,7 +42,7 @@ def test_other_state(self): ) def test_service_17_tm(self): - srv_17_tm = Service17Tm(subservice=2, timestamp=TEST_STAMP) + srv_17_tm = Service17Tm(apid=self.def_apid, subservice=2, timestamp=TEST_STAMP) self.assertEqual(srv_17_tm.pus_tm.subservice, 2) srv_17_tm_raw = srv_17_tm.pack() srv_17_tm_unpacked = Service17Tm.unpack( diff --git a/tests/test_pus_verificator.py b/tests/test_pus_verificator.py index 08a6c24..a868885 100644 --- a/tests/test_pus_verificator.py +++ b/tests/test_pus_verificator.py @@ -25,37 +25,44 @@ class SuccessSet: - def __init__(self, pus_tc: PusTc): + def __init__(self, apid: int, pus_tc: PusTc): self.pus_tc = pus_tc self.req_id = RequestId.from_pus_tc(pus_tc) self.empty_stamp = CdsShortTimestamp.empty() - self.acc_suc_tm = create_acceptance_success_tm(pus_tc, self.empty_stamp.pack()) - self.sta_suc_tm = create_start_success_tm(pus_tc, self.empty_stamp.pack()) + self.acc_suc_tm = create_acceptance_success_tm( + apid, pus_tc, self.empty_stamp.pack() + ) + self.sta_suc_tm = create_start_success_tm(apid, pus_tc, self.empty_stamp.pack()) self.ste_suc_tm = create_step_success_tm( + apid=apid, pus_tc=pus_tc, step_id=StepId.with_byte_size(1, 1), timestamp=self.empty_stamp.pack(), ) - self.fin_suc_tm = create_completion_success_tm(pus_tc, self.empty_stamp.pack()) + self.fin_suc_tm = create_completion_success_tm( + apid, pus_tc, self.empty_stamp.pack() + ) class FailureSet: - def __init__(self, pus_tc: PusTc, failure_notice: FailureNotice): - self.suc_set = SuccessSet(pus_tc) + def __init__(self, apid: int, pus_tc: PusTc, failure_notice: FailureNotice): + self.suc_set = SuccessSet(apid, pus_tc) self.failure_notice = failure_notice self.acc_fail_tm = create_acceptance_failure_tm( - pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() + apid, pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() ) self.sta_fail_tm = create_start_failure_tm( - pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() + apid, pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() ) self.ste_fail_tm = create_step_failure_tm( + apid, pus_tc, failure_notice=self.failure_notice, step_id=StepId.with_byte_size(1, 1), timestamp=CdsShortTimestamp.empty().pack(), ) self.fin_fail_tm = create_completion_failure_tm( + apid, failure_notice=self.failure_notice, pus_tc=self.suc_set.pus_tc, timestamp=CdsShortTimestamp.empty().pack(), @@ -72,10 +79,13 @@ def req_id(self): class TestPusVerificator(TestCase): def setUp(self) -> None: + self.def_apid = 0x06 self.pus_verificator = PusVerificator() def test_basic(self): - suc_set = SuccessSet(PusTc(service=17, subservice=1)) + suc_set = SuccessSet( + self.def_apid, PusTc(apid=self.def_apid, service=17, subservice=1) + ) self.pus_verificator.add_tc(suc_set.pus_tc) check_res = self.pus_verificator.add_tm(suc_set.acc_suc_tm) self.assertEqual(check_res.completed, False) @@ -104,30 +114,48 @@ def test_basic(self): ) def test_complete_verification_clear_completed(self): - self._regular_success_seq(SuccessSet(PusTc(service=17, subservice=1))) + self._regular_success_seq( + SuccessSet( + self.def_apid, PusTc(apid=self.def_apid, service=17, subservice=1) + ), + ) self.pus_verificator.remove_completed_entries() self.assertEqual(len(self.pus_verificator.verif_dict), 0) def test_complete_verification_clear_completed_multi(self): self._regular_success_seq( - SuccessSet(PusTc(service=17, subservice=1, seq_count=0)) + SuccessSet( + self.def_apid, + PusTc(apid=self.def_apid, service=17, subservice=1, seq_count=0), + ) ) self._regular_success_seq( - SuccessSet(PusTc(service=5, subservice=4, seq_count=1)) + SuccessSet( + self.def_apid, + PusTc(apid=self.def_apid, service=5, subservice=4, seq_count=1), + ) ) self.pus_verificator.remove_completed_entries() self.assertEqual(len(self.pus_verificator.verif_dict), 0) def test_complete_verification_remove_manually(self): - suc_set = SuccessSet(PusTc(service=17, subservice=1)) + suc_set = SuccessSet( + self.def_apid, PusTc(apid=self.def_apid, service=17, subservice=1) + ) self._regular_success_seq(suc_set) self.assertTrue(self.pus_verificator.remove_entry(suc_set.req_id)) self.assertEqual(len(self.pus_verificator.verif_dict), 0) def test_complete_verification_multi_remove_manually(self): - set_0 = SuccessSet(PusTc(service=17, subservice=1, seq_count=0)) + set_0 = SuccessSet( + self.def_apid, + PusTc(apid=self.def_apid, service=17, subservice=1, seq_count=0), + ) self._regular_success_seq(set_0) - set_1 = SuccessSet(PusTc(service=5, subservice=4, seq_count=1)) + set_1 = SuccessSet( + self.def_apid, + PusTc(apid=self.def_apid, service=5, subservice=4, seq_count=1), + ) self._regular_success_seq(set_1) self.assertTrue(self.pus_verificator.remove_entry(set_0.req_id)) self.assertEqual(len(self.pus_verificator.verif_dict), 1) @@ -136,7 +164,11 @@ def test_complete_verification_multi_remove_manually(self): def test_acceptance_failure(self): notice = FailureNotice(ErrorCode.with_byte_size(1, 8), data=bytes([0, 1])) - fail_set = FailureSet(PusTc(service=17, subservice=1, seq_count=0), notice) + fail_set = FailureSet( + self.def_apid, + PusTc(apid=self.def_apid, service=17, subservice=1, seq_count=0), + notice, + ) self.assertTrue(self.pus_verificator.add_tc(fail_set.pus_tc)) status = self.pus_verificator.add_tm(fail_set.acc_fail_tm) self.assertIsNotNone(status) @@ -153,7 +185,11 @@ def test_acceptance_failure(self): def test_step_failure(self): notice = FailureNotice(ErrorCode.with_byte_size(1, 8), data=bytes([0, 1])) - fail_set = FailureSet(PusTc(service=17, subservice=1, seq_count=0), notice) + fail_set = FailureSet( + self.def_apid, + PusTc(apid=self.def_apid, service=17, subservice=1, seq_count=0), + notice, + ) self.assertTrue(self.pus_verificator.add_tc(fail_set.pus_tc)) status = self.pus_verificator.add_tm(fail_set.suc_set.acc_suc_tm) self.assertIsNotNone(status) From a750a929a62de7b1cd2d881c336b51cb67cfb07a Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 23 Apr 2024 10:16:13 +0200 Subject: [PATCH 3/3] changelog update --- CHANGELOG.md | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e1d40c..78bef99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,26 +10,25 @@ and this project adheres to [Semantic Versioning](http://semver.org/). # [v0.24.0] 2024-04-23 +## Removed + +- Global configuration module for TC and TM APID was removed. + ## Changed - ECSS PUS telemetry time handling is now more generic and low level: Constructors expect - a simple `bytes` type while unpackers/readers expect the length of the timestamp. Determining - or knowing the size of the timestamp is now the task of the user, but a helper constant - is expose which can help with this. + a simple `bytes` type while unpackers/readers expect the length of the timestamp. A helper + constant for the offset of the timestamp is exposed which can help with determining the + length of the timestamp. - `CdsShortTimestamp.from_now` renamed to `now`. - The ECSS TMTC APID field must not be set explicitely in the class constructors. ## Added - - `spacepackets.ecss.tm.PUS_TM_TIMESTAMP_OFFSET` constant which can be used as a look-ahead to determine the timestamp length from a raw PUS TM packet. - `spacepackets.ccsds.CCSDS_HEADER_LEN` constant. -## Removed - -- Global configuration module for TC and TM APID was removed. - # [v0.23.1] 2024-04-22 ## Added