diff --git a/meilisync/schemas.py b/meilisync/schemas.py index 88238f9..a538f69 100644 --- a/meilisync/schemas.py +++ b/meilisync/schemas.py @@ -25,6 +25,6 @@ def mapping_data(self, fields_mapping: Optional[dict] = None): if fields_mapping is not None and k in fields_mapping: real_k = fields_mapping[k] or k data[real_k] = v - else: + elif fields_mapping is None: data[k] = v return data or self.data diff --git a/meilisync/source/postgres.py b/meilisync/source/postgres.py index 5172d15..5b90b84 100644 --- a/meilisync/source/postgres.py +++ b/meilisync/source/postgres.py @@ -14,6 +14,23 @@ from meilisync.source import Source +class CustomDictRow(psycopg2.extras.RealDictRow): + def __getitem__(self, key): + try: + return super().__getitem__(key) + except KeyError as exc: + if isinstance(key, int): + return super().__getitem__(list(self.keys())[key]) + raise exc + + +class CustomDictCursor(psycopg2.extras.RealDictCursor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + kwargs["row_factory"] = CustomDictRow + self.row_factory = CustomDictRow + + class Postgres(Source): type = SourceType.postgres slot = "meilisync" @@ -34,7 +51,7 @@ def __init__( self.cursor.execute("SELECT pg_current_wal_lsn()") self.start_lsn = self.cursor.fetchone()[0] self.conn_dict = psycopg2.connect( - **self.kwargs, cursor_factory=psycopg2.extras.RealDictCursor + **self.kwargs, cursor_factory=CustomDictCursor ) async def get_current_progress(self): @@ -81,9 +98,9 @@ def _consumer(self, msg: ReplicationMessage): table = change.get("table") if table not in self.tables: return - columnnames = change.get("columnnames") - columnvalues = change.get("columnvalues") - columntypes = change.get("columntypes") + columnnames = change.get("columnnames", []) + columnvalues = change.get("columnvalues", []) + columntypes = change.get("columntypes", []) for i in range(len(columntypes)): if columntypes[i] == "json":