Skip to content

Commit

Permalink
added stop on field validation exception flag
Browse files Browse the repository at this point in the history
  • Loading branch information
BuzzCutNorman committed Dec 5, 2023
1 parent ebf3bfe commit 65d0578
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
8 changes: 8 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class Sink(metaclass=abc.ABCMeta):
validate_field_string_format = False
"""Enable JSON schema format validation, for example `date-time` string fields."""

stop_on_field_validation_exception: bool = True
"""Enable Target to stop when a JSONSchema field validation exception is raised."""

def __init__(
self,
target: Target,
Expand Down Expand Up @@ -422,13 +425,18 @@ def _validate_and_parse(self, record: dict) -> dict:
Returns:
TODO
Raises:
InvalidRecord: If the record is invalid.
"""
if self._validator is not None:
# TODO: Check the performance impact of this try/except block. It runs
# on every record, so it's probably bad and should be moved up the stack.
try:
self._validator.validate(record)
except InvalidRecord as e:
if self.stop_on_field_validation_exception:
raise InvalidRecord(e) from e
self.logger.exception("Record validation failed %s", e)

self._parse_timestamps_in_record(
Expand Down
57 changes: 54 additions & 3 deletions tests/core/sinks/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest

from singer_sdk.exceptions import InvalidRecord
from tests.conftest import BatchSinkMock, TargetMock


Expand Down Expand Up @@ -59,7 +60,7 @@ def test_validate_record():


@pytest.fixture
def draft7_sink():
def draft7_sink_stop():
"""Return a sink object with Draft7 checks enabled."""

class CustomSink(BatchSinkMock):
Expand All @@ -84,8 +85,57 @@ class CustomSink(BatchSinkMock):
)


def test_validate_record_jsonschema_format_checking_enabled(capsys, draft7_sink):
sink: BatchSinkMock = draft7_sink
@pytest.fixture
def draft7_sink_continue():
    """Return a sink that checks string formats but does not stop on errors.

    The sink subclass enables ``validate_field_string_format`` and disables
    ``stop_on_field_validation_exception``.
    """
    # Schema for the "users" stream: an integer key plus several
    # format-constrained string fields.
    users_schema = {
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "created_at": {"type": "string", "format": "date-time"},
            "created_at_date": {"type": "string", "format": "date"},
            "created_at_time": {"type": "string", "format": "time"},
            "invalid_datetime": {"type": "string", "format": "date-time"},
        },
    }

    class CustomSink(BatchSinkMock):
        """Sink with format validation on and stop-on-error off."""

        validate_field_string_format = True
        stop_on_field_validation_exception = False

    return CustomSink(TargetMock(), "users", users_schema, ["id"])


def test_validate_record_jsonschema_format_checking_enabled_stop_on_error(
    draft7_sink_stop,
):
    """Expect ``InvalidRecord`` when a date-time field holds a bad value."""
    sink: BatchSinkMock = draft7_sink_stop

    # Every field is well-formed except "invalid_datetime".
    bad_record = {
        "id": 1,
        "created_at": "2021-01-01T00:00:00+00:00",
        "created_at_date": "2021-01-01",
        "created_at_time": "00:01:00+00:00",
        "missing_datetime": "2021-01-01T00:00:00+00:00",
        "invalid_datetime": "not a datetime",
    }
    expected_message = r"data.invalid_datetime must be date-time"

    with pytest.raises(InvalidRecord, match=expected_message):
        sink._validate_and_parse(bad_record)


def test_validate_record_jsonschema_format_checking_enabled_continue_on_error(
capsys, draft7_sink_continue
):
sink: BatchSinkMock = draft7_sink_continue

record = {
"id": 1,
Expand All @@ -95,6 +145,7 @@ def test_validate_record_jsonschema_format_checking_enabled(capsys, draft7_sink)
"missing_datetime": "2021-01-01T00:00:00+00:00",
"invalid_datetime": "not a datetime",
}

updated_record = sink._validate_and_parse(record)
captured = capsys.readouterr()

Expand Down

0 comments on commit 65d0578

Please sign in to comment.