diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index 0f5c42918b..b23ae0f00a 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -144,6 +144,14 @@ description="The default target database schema name to use for all streams.", ), ).to_dict() +ACTIVATE_VERSION_CONFIG = PropertiesList( + Property( + "activate_version", + BooleanType, + title="Process `ACTIVATE_VERSION` messages", + description="Whether to process `ACTIVATE_VERSION` messages.", + ), +).to_dict() ADD_RECORD_METADATA_CONFIG = PropertiesList( Property( "add_record_metadata", diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 5070655d47..7d9d24a3cd 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -399,6 +399,15 @@ def include_sdc_metadata_properties(self) -> bool: """ return self.config.get("add_record_metadata", False) + @property + def process_activate_version_messages(self) -> bool: + """Check if activate version messages should be processed. + + Returns: + True if activate version messages should be processed. + """ + return self.config.get("activate_version", True) + @property def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum: """Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL. diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 8907b6201e..b361698ce2 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -454,6 +454,19 @@ def _process_activate_version_message(self, message_dict: dict) -> None: for stream_map in self.mapper.stream_maps[stream_name]: sink = self.get_sink(stream_map.stream_alias) + if not sink.process_activate_version_messages: + self.logger.warning( + "Activate version messages are not enabled for '%s'. Ignoring.", + stream_map.stream_alias, + ) + continue + if not sink.include_sdc_metadata_properties: + self.logger.warning( + "ACTIVATE_VERSION requires _sdc_* metadata properties to be " + "included. Set `add_record_metadata` to `True` if you wanna use " + "this feature." + ) + continue sink.activate_version(message_dict["version"]) def _process_batch_message(self, message_dict: dict) -> None: