From af9b5d56b5edeef2962040e20134e6f1a95c8715 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 19:00:50 -0600 Subject: [PATCH] Move Singer IO to _singerlib --- .flake8 | 2 +- singer_sdk/_singerlib/__init__.py | 4 + singer_sdk/_singerlib/encoding/__init__.py | 12 ++ singer_sdk/_singerlib/encoding/base.py | 144 +++++++++++++++ singer_sdk/_singerlib/encoding/simple.py | 52 ++++++ singer_sdk/_singerlib/exceptions.py | 9 + singer_sdk/_singerlib/{serde.py => json.py} | 5 + singer_sdk/_singerlib/messages.py | 2 +- singer_sdk/contrib/batch_encoder_jsonl.py | 2 +- singer_sdk/exceptions.py | 8 +- singer_sdk/helpers/_flattening.py | 2 +- singer_sdk/io_base.py | 183 +------------------- singer_sdk/sinks/core.py | 2 +- tests/_singerlib/test_messages.py | 7 +- tests/core/test_io.py | 17 +- 15 files changed, 253 insertions(+), 198 deletions(-) create mode 100644 singer_sdk/_singerlib/encoding/__init__.py create mode 100644 singer_sdk/_singerlib/encoding/base.py create mode 100644 singer_sdk/_singerlib/encoding/simple.py create mode 100644 singer_sdk/_singerlib/exceptions.py rename singer_sdk/_singerlib/{serde.py => json.py} (96%) diff --git a/.flake8 b/.flake8 index 9311ae8d0..07b7824bd 100644 --- a/.flake8 +++ b/.flake8 @@ -1,7 +1,7 @@ [flake8] max-line-length = 88 exclude = cookiecutter -ignore = E, W +ignore = E, F, W per-file-ignores = # Don't require docstrings conventions in private modules singer_sdk/helpers/_*.py:DAR diff --git a/singer_sdk/_singerlib/__init__.py b/singer_sdk/_singerlib/__init__.py index bc0b4523b..8386399d6 100644 --- a/singer_sdk/_singerlib/__init__.py +++ b/singer_sdk/_singerlib/__init__.py @@ -1,5 +1,6 @@ from __future__ import annotations +from singer_sdk._singerlib import exceptions from singer_sdk._singerlib.catalog import ( Catalog, CatalogEntry, @@ -16,6 +17,7 @@ SingerMessageType, StateMessage, exclude_null_dict, + format_message, write_message, ) from singer_sdk._singerlib.schema import Schema, resolve_schema_references @@ -35,7 +37,9 @@ "SingerMessageType", "StateMessage", "StreamMetadata", + "exceptions", "exclude_null_dict", + "format_message", "resolve_schema_references", "strftime", "strptime_to_utc", diff --git a/singer_sdk/_singerlib/encoding/__init__.py b/singer_sdk/_singerlib/encoding/__init__.py new file mode 100644 index 000000000..7819a1452 --- /dev/null +++ b/singer_sdk/_singerlib/encoding/__init__.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from .base import GenericSingerReader, GenericSingerWriter, SingerMessageType +from .simple import SingerReader, SingerWriter + +__all__ = [ + "GenericSingerReader", + "GenericSingerWriter", + "SingerMessageType", + "SingerReader", + "SingerWriter", +] diff --git a/singer_sdk/_singerlib/encoding/base.py b/singer_sdk/_singerlib/encoding/base.py new file mode 100644 index 000000000..91d8a1c08 --- /dev/null +++ b/singer_sdk/_singerlib/encoding/base.py @@ -0,0 +1,144 @@ +"""Abstract base classes for all Singer messages IO operations.""" + +from __future__ import annotations + +import abc +import logging +import typing as t +from collections import Counter, defaultdict + +from singer_sdk._singerlib import exceptions +from singer_sdk._singerlib.messages import Message, SingerMessageType + +logger = logging.getLogger(__name__) + + +# TODO: Use to default to 'str' here +# https://peps.python.org/pep-0696/ +T = t.TypeVar("T", str, bytes) + + +class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins reading Singer messages as strings or bytes.""" + + @t.final + def listen(self, file_input: t.IO[T] | None = None) -> None: + """Read from input until all messages are processed. + + Args: + file_input: Readable stream of messages. Defaults to standard in. + """ + self._process_lines(file_input or self.default_input) + self._process_endofpipe() + + def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: + """Internal method to process jsonl lines from a Singer tap. + + Args: + file_input: Readable stream of messages, each on a separate line. + + Returns: + A counter object for the processed lines. + """ + stats: dict[str, int] = defaultdict(int) + for line in file_input: + line_dict = self.deserialize_json(line) + self._assert_line_requires(line_dict, requires={"type"}) + + record_type: SingerMessageType = line_dict["type"] + if record_type == SingerMessageType.SCHEMA: + self._process_schema_message(line_dict) + + elif record_type == SingerMessageType.RECORD: + self._process_record_message(line_dict) + + elif record_type == SingerMessageType.ACTIVATE_VERSION: + self._process_activate_version_message(line_dict) + + elif record_type == SingerMessageType.STATE: + self._process_state_message(line_dict) + + elif record_type == SingerMessageType.BATCH: + self._process_batch_message(line_dict) + + else: + self._process_unknown_message(line_dict) + + stats[record_type] += 1 + + return Counter(**stats) + + @property + @abc.abstractmethod + def default_input(self) -> t.IO[T]: ... + + @staticmethod + def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: + """Check if dictionary . + + Args: + line_dict: TODO + requires: TODO + + Raises: + InvalidInputLine: raised if any required keys are missing + """ + if not requires.issubset(line_dict): + missing = requires - set(line_dict) + msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" + raise exceptions.InvalidInputLine(msg) + + @abc.abstractmethod + def deserialize_json(self, line: T) -> dict: ... + + @abc.abstractmethod + def _process_schema_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_record_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_state_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_activate_version_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_batch_message(self, message_dict: dict) -> None: ... + + def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 + """Internal method to process unknown message types from a Singer tap. + + Args: + message_dict: Dictionary representation of the Singer message. + + Raises: + ValueError: raised if a message type is not recognized + """ + record_type = message_dict["type"] + msg = f"Unknown message type '{record_type}' in message." + raise ValueError(msg) + + def _process_endofpipe(self) -> None: # noqa: PLR6301 + logger.debug("End of pipe reached") + + +class GenericSingerWriter(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins writing Singer messages as strings or bytes.""" + + def format_message(self, message: Message) -> T: + """Format a message as a JSON string. + + Args: + message: The message to format. + + Returns: + The formatted message. + """ + return self.serialize_json(message.to_dict()) + + @abc.abstractmethod + def serialize_json(self, obj: object) -> T: ... + + @abc.abstractmethod + def write_message(self, message: Message) -> None: ... diff --git a/singer_sdk/_singerlib/encoding/simple.py b/singer_sdk/_singerlib/encoding/simple.py new file mode 100644 index 000000000..51d92db47 --- /dev/null +++ b/singer_sdk/_singerlib/encoding/simple.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import sys +import typing as t + +from singer_sdk._singerlib.json import deserialize_json, serialize_json + +from .base import GenericSingerReader, GenericSingerWriter + +if t.TYPE_CHECKING: + from singer_sdk._singerlib.messages import Message + + +class SingerReader(GenericSingerReader[str]): + """Base class for all plugins reading Singer messages as strings from stdin.""" + + default_input = sys.stdin + + def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 + """Deserialize a line of json. + + Args: + line: A single line of json. + + Returns: + A dictionary of the deserialized json. + """ + return deserialize_json(line) + + +class SingerWriter(GenericSingerWriter[str]): + """Interface for all plugins writing Singer messages to stdout.""" + + def serialize_json(self, obj: object) -> str: # noqa: PLR6301 + """Serialize a dictionary into a line of json. + + Args: + obj: A Python object usually a dict. + + Returns: + A string of serialized json. + """ + return serialize_json(obj) + + def write_message(self, message: Message) -> None: + """Write a message to stdout. + + Args: + message: The message to write. + """ + sys.stdout.write(self.format_message(message) + "\n") + sys.stdout.flush() diff --git a/singer_sdk/_singerlib/exceptions.py b/singer_sdk/_singerlib/exceptions.py new file mode 100644 index 000000000..e3726bb6a --- /dev/null +++ b/singer_sdk/_singerlib/exceptions.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +__all__ = [ + "InvalidInputLine", +] + + +class InvalidInputLine(Exception): + """Raised when an input line is not a valid Singer message.""" diff --git a/singer_sdk/_singerlib/serde.py b/singer_sdk/_singerlib/json.py similarity index 96% rename from singer_sdk/_singerlib/serde.py rename to singer_sdk/_singerlib/json.py index 9971bc301..367238a4a 100644 --- a/singer_sdk/_singerlib/serde.py +++ b/singer_sdk/_singerlib/json.py @@ -10,6 +10,11 @@ logger = logging.getLogger(__name__) +__all__ = [ + "deserialize_json", + "serialize_json", +] + def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 """Default JSON encoder. diff --git a/singer_sdk/_singerlib/messages.py b/singer_sdk/_singerlib/messages.py index 2131290b6..ae8977c9e 100644 --- a/singer_sdk/_singerlib/messages.py +++ b/singer_sdk/_singerlib/messages.py @@ -8,7 +8,7 @@ from dataclasses import asdict, dataclass, field from datetime import datetime, timezone -from singer_sdk._singerlib.serde import serialize_json +from singer_sdk._singerlib.json import serialize_json if sys.version_info < (3, 11): from backports.datetime_fromisoformat import MonkeyPatch diff --git a/singer_sdk/contrib/batch_encoder_jsonl.py b/singer_sdk/contrib/batch_encoder_jsonl.py index 7c9505de4..6f121f8d4 100644 --- a/singer_sdk/contrib/batch_encoder_jsonl.py +++ b/singer_sdk/contrib/batch_encoder_jsonl.py @@ -6,7 +6,7 @@ import typing as t from uuid import uuid4 -from singer_sdk._singerlib.serde import serialize_json +from singer_sdk._singerlib.json import serialize_json from singer_sdk.batch import BaseBatcher, lazy_chunked_generator __all__ = ["JSONLinesBatcher"] diff --git a/singer_sdk/exceptions.py b/singer_sdk/exceptions.py index 20ec7ae65..a766952f9 100644 --- a/singer_sdk/exceptions.py +++ b/singer_sdk/exceptions.py @@ -5,6 +5,8 @@ import abc import typing as t +from singer_sdk._singerlib.exceptions import InvalidInputLine # noqa: F401 + if t.TYPE_CHECKING: import requests @@ -137,11 +139,7 @@ class ConformedNameClashException(Exception): class MissingKeyPropertiesError(Exception): - """Raised when a recieved (and/or transformed) record is missing key properties.""" - - -class InvalidInputLine(Exception): - """Raised when an input line is not a valid Singer message.""" + """Raised when a received (and/or transformed) record is missing key properties.""" class InvalidJSONSchema(Exception): diff --git a/singer_sdk/helpers/_flattening.py b/singer_sdk/helpers/_flattening.py index c1999fcc6..79ca50fdc 100644 --- a/singer_sdk/helpers/_flattening.py +++ b/singer_sdk/helpers/_flattening.py @@ -10,7 +10,7 @@ import inflection -from singer_sdk._singerlib.serde import serialize_json +from singer_sdk._singerlib.json import serialize_json DEFAULT_FLATTENING_SEPARATOR = "__" diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index ce7d09ee1..f9041beea 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -2,185 +2,4 @@ from __future__ import annotations -import abc -import logging -import sys -import typing as t -from collections import Counter, defaultdict - -from singer_sdk._singerlib.messages import Message, SingerMessageType -from singer_sdk._singerlib.serde import deserialize_json, serialize_json -from singer_sdk.exceptions import InvalidInputLine - -logger = logging.getLogger(__name__) - -# TODO: Use to default to 'str' here -# https://peps.python.org/pep-0696/ -T = t.TypeVar("T", str, bytes) - - -class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): - """Interface for all plugins reading Singer messages as strings or bytes.""" - - @t.final - def listen(self, file_input: t.IO[T] | None = None) -> None: - """Read from input until all messages are processed. - - Args: - file_input: Readable stream of messages. Defaults to standard in. - """ - self._process_lines(file_input or self.default_input) - self._process_endofpipe() - - def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: - """Internal method to process jsonl lines from a Singer tap. - - Args: - file_input: Readable stream of messages, each on a separate line. - - Returns: - A counter object for the processed lines. - """ - stats: dict[str, int] = defaultdict(int) - for line in file_input: - line_dict = self.deserialize_json(line) - self._assert_line_requires(line_dict, requires={"type"}) - - record_type: SingerMessageType = line_dict["type"] - if record_type == SingerMessageType.SCHEMA: - self._process_schema_message(line_dict) - - elif record_type == SingerMessageType.RECORD: - self._process_record_message(line_dict) - - elif record_type == SingerMessageType.ACTIVATE_VERSION: - self._process_activate_version_message(line_dict) - - elif record_type == SingerMessageType.STATE: - self._process_state_message(line_dict) - - elif record_type == SingerMessageType.BATCH: - self._process_batch_message(line_dict) - - else: - self._process_unknown_message(line_dict) - - stats[record_type] += 1 - - return Counter(**stats) - - @property - @abc.abstractmethod - def default_input(self) -> t.IO[T]: ... # noqa: D102 - - @staticmethod - def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: - """Check if dictionary . - - Args: - line_dict: TODO - requires: TODO - - Raises: - InvalidInputLine: raised if any required keys are missing - """ - if not requires.issubset(line_dict): - missing = requires - set(line_dict) - msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" - raise InvalidInputLine(msg) - - @abc.abstractmethod - def deserialize_json(self, line: T) -> dict: ... # noqa: D102 - - @abc.abstractmethod - def _process_schema_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_record_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_state_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_activate_version_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_batch_message(self, message_dict: dict) -> None: ... - - def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 - """Internal method to process unknown message types from a Singer tap. - - Args: - message_dict: Dictionary representation of the Singer message. - - Raises: - ValueError: raised if a message type is not recognized - """ - record_type = message_dict["type"] - msg = f"Unknown message type '{record_type}' in message." - raise ValueError(msg) - - def _process_endofpipe(self) -> None: # noqa: PLR6301 - logger.debug("End of pipe reached") - - -class SingerReader(GenericSingerReader[str]): - """Base class for all plugins reading Singer messages as strings from stdin.""" - - default_input = sys.stdin - - def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 - """Deserialize a line of json. - - Args: - line: A single line of json. - - Returns: - A dictionary of the deserialized json. - """ - return deserialize_json(line) - - -class GenericSingerWriter(t.Generic[T], metaclass=abc.ABCMeta): - """Interface for all plugins writing Singer messages as strings or bytes.""" - - def format_message(self, message: Message) -> T: - """Format a message as a JSON string. - - Args: - message: The message to format. - - Returns: - The formatted message. - """ - return self.serialize_json(message.to_dict()) - - @abc.abstractmethod - def serialize_json(self, obj: object) -> T: ... # noqa: D102 - - @abc.abstractmethod - def write_message(self, message: Message) -> None: ... # noqa: D102 - - -class SingerWriter(GenericSingerWriter[str]): - """Interface for all plugins writing Singer messages to stdout.""" - - def serialize_json(self, obj: object) -> str: # noqa: PLR6301 - """Serialize a dictionary into a line of json. - - Args: - obj: A Python object usually a dict. - - Returns: - A string of serialized json. - """ - return serialize_json(obj) - - def write_message(self, message: Message) -> None: - """Write a message to stdout. - - Args: - message: The message to write. - """ - sys.stdout.write(self.format_message(message) + "\n") - sys.stdout.flush() +from singer_sdk._singerlib.encoding import * # noqa: F403 diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 4fb475f8b..53533d58b 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -16,7 +16,7 @@ import jsonschema from typing_extensions import override -from singer_sdk._singerlib.serde import deserialize_json +from singer_sdk._singerlib.json import deserialize_json from singer_sdk.exceptions import ( InvalidJSONSchema, InvalidRecord, diff --git a/tests/_singerlib/test_messages.py b/tests/_singerlib/test_messages.py index 491573545..b433e9610 100644 --- a/tests/_singerlib/test_messages.py +++ b/tests/_singerlib/test_messages.py @@ -8,7 +8,6 @@ from pytz import timezone import singer_sdk._singerlib as singer -from singer_sdk.io_base import SingerWriter UTC = datetime.timezone.utc @@ -19,24 +18,22 @@ def test_exclude_null_dict(): def test_format_message(): - singerwriter = SingerWriter() message = singer.RecordMessage( stream="test", record={"id": 1, "name": "test"}, ) - assert singerwriter.format_message(message) == ( + assert singer.format_message(message) == ( '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}' ) def test_write_message(): - singerwriter = SingerWriter() message = singer.RecordMessage( stream="test", record={"id": 1, "name": "test"}, ) with redirect_stdout(io.StringIO()) as out: - singerwriter.write_message(message) + singer.write_message(message) assert out.getvalue() == ( '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' diff --git a/tests/core/test_io.py b/tests/core/test_io.py index 0fcce614b..39644cce7 100644 --- a/tests/core/test_io.py +++ b/tests/core/test_io.py @@ -3,9 +3,10 @@ from __future__ import annotations import decimal +import io import itertools import json -from contextlib import nullcontext +from contextlib import nullcontext, redirect_stdout import pytest @@ -57,6 +58,20 @@ def test_deserialize(line, expected, exception): assert reader.deserialize_json(line) == expected +def test_write_message(): + writer = SingerWriter() + message = RecordMessage( + stream="test", + record={"id": 1, "name": "test"}, + ) + with redirect_stdout(io.StringIO()) as out: + writer.write_message(message) + + assert out.getvalue() == ( + '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' + ) + + # Benchmark Tests