Skip to content

Commit

Permalink
Make black happy
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Dec 22, 2023
1 parent 29d5982 commit a4c17e1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
12 changes: 7 additions & 5 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,13 @@ def consume(self, message) -> dict | None:
elif message_payload["action"] in delete_actions:
for column in message_payload["identity"]:
row.update({column["name"]: column["value"]})
row.update({
"_sdc_deleted_at": datetime.datetime.utcnow().strftime(
r"%Y-%m-%dT%H:%M:%SZ"
)
})
row.update(
{
"_sdc_deleted_at": datetime.datetime.utcnow().strftime(
r"%Y-%m-%dT%H:%M:%SZ"
)
}
)
row.update({"_sdc_lsn": message.data_start})
elif message_payload["action"] in truncate_actions:
self.logger.debug(
Expand Down
40 changes: 24 additions & 16 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,24 +541,32 @@ def catalog(self) -> Catalog: # noqa: C901
new_stream.schema.required = None
if "_sdc_deleted_at" not in new_stream.schema.properties: # type: ignore[operator]
stream_modified = True
new_stream.schema.properties.update({ # type: ignore[union-attr]
"_sdc_deleted_at": Schema(type=["string", "null"])
})
new_stream.metadata.update({
("properties", "_sdc_deleted_at"): Metadata(
Metadata.InclusionType.AVAILABLE, True, None
)
})
new_stream.schema.properties.update(
{ # type: ignore[union-attr]
"_sdc_deleted_at": Schema(type=["string", "null"])
}
)
new_stream.metadata.update(
{
("properties", "_sdc_deleted_at"): Metadata(
Metadata.InclusionType.AVAILABLE, True, None
)
}
)
if "_sdc_lsn" not in new_stream.schema.properties: # type: ignore[operator]
stream_modified = True
new_stream.schema.properties.update({ # type: ignore[union-attr]
"_sdc_lsn": Schema(type=["integer", "null"])
})
new_stream.metadata.update({
("properties", "_sdc_lsn"): Metadata(
Metadata.InclusionType.AVAILABLE, True, None
)
})
new_stream.schema.properties.update(
{ # type: ignore[union-attr]
"_sdc_lsn": Schema(type=["integer", "null"])
}
)
new_stream.metadata.update(
{
("properties", "_sdc_lsn"): Metadata(
Metadata.InclusionType.AVAILABLE, True, None
)
}
)
if stream_modified:
modified_streams.append(new_stream.tap_stream_id)
new_catalog.add_stream(new_stream)
Expand Down

0 comments on commit a4c17e1

Please sign in to comment.