Skip to content

✨ Add WriteLog replay #2783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,6 @@ repos:
tests/curators/conftest.py|
tests/permissions/conftest.py|
tests/writelog/conftest.py|
tests/writelog_sqlite/conftest.py|
tests/curators/test_curators_examples.py
)
1 change: 1 addition & 0 deletions lamindb/core/writelog/_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FOREIGN_KEYS_LIST_COLUMN_NAME = "_lamin_fks"
251 changes: 234 additions & 17 deletions lamindb/core/writelog/_db_metadata_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from django.db.models import ManyToManyField
from typing_extensions import override

from ._types import KeyConstraint, TableUID, UIDColumns
from ._types import Column, ColumnType, KeyConstraint, TableUID, UIDColumns


class DatabaseMetadataWrapper(ABC):
Expand All @@ -20,9 +20,12 @@
"""

@abstractmethod
def get_column_names(self, table: str, cursor: CursorWrapper) -> set[str]:
def get_columns(self, table: str, cursor: CursorWrapper) -> set[Column]:
raise NotImplementedError()

def get_column_names(self, table: str, cursor: CursorWrapper) -> set[str]:
return {c.name for c in self.get_columns(table, cursor)}

@abstractmethod
def get_tables_with_installed_triggers(self, cursor: CursorWrapper) -> set[str]:
raise NotImplementedError()
Expand Down Expand Up @@ -67,14 +70,45 @@

return many_to_many_tables

def _get_columns_by_name(
self, table: str, column_names: list[str], cursor: CursorWrapper
) -> list[Column]:
columns = self.get_columns(table=table, cursor=cursor)

column_list: list[Column] = []

for column_name in column_names:
column = next((c for c in columns if c.name == column_name), None)

if column is None:
raise ValueError(

Check warning on line 84 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L84

Added line #L84 was not covered by tests
f"Table '{table}' doesn't have a column named '{column_name}'"
)

column_list.append(column)

return column_list

def get_uid_columns(self, table: str, cursor: CursorWrapper) -> UIDColumns:
"""Get the UID columns for a given table."""
if table == "lamindb_featurevalue":
# TODO: update this to feature + hash instead of value + created_at
return [
TableUID(
source_table_name=table,
uid_columns=["value", "created_at"],
uid_columns=self._get_columns_by_name(
table, ["value", "created_at"], cursor
),
key_constraint=None,
)
]
elif table == "lamindb_param":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This table does no longer exist. I consciously removed it. So, no need to add it back. ☺️

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be removed in the next PR.

return [

Check warning on line 106 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L106

Added line #L106 was not covered by tests
TableUID(
source_table_name=table,
uid_columns=self._get_columns_by_name(
table, ["name", "dtype", "created_at"], cursor
),
key_constraint=None,
)
]
Expand All @@ -86,7 +120,7 @@
return [
TableUID(
source_table_name=table,
uid_columns=["uid"],
uid_columns=self._get_columns_by_name(table, ["uid"], cursor),
key_constraint=None,
)
]
Expand Down Expand Up @@ -128,6 +162,11 @@


class PostgresDatabaseMetadataWrapper(DatabaseMetadataWrapper):
def __init__(self) -> None:
super().__init__()

self._columns: dict[str, set[Column]] | None = None

@override
def get_table_key_constraints(
self, table: str, cursor: CursorWrapper
Expand All @@ -141,7 +180,11 @@
WHEN 'f' THEN 'FOREIGN KEY'
END AS constraint_type,
a.attname AS source_column,
a.attnum AS source_column_position,
pg_catalog.format_type(a.atttypid, a.atttypmod) AS source_column_type,
CASE WHEN tc.contype = 'f' THEN af.attname ELSE NULL END AS target_column,
CASE WHEN tc.contype = 'f' THEN af.attnum ELSE NULL END AS target_column_position,
CASE WHEN tc.contype = 'f' THEN pg_catalog.format_type(af.atttypid, af.atttypmod) ELSE NULL END AS target_column_type,
CASE WHEN tc.contype = 'f' THEN tf.relname ELSE NULL END AS target_table
FROM
pg_constraint tc
Expand Down Expand Up @@ -171,12 +214,27 @@
(
constraint_name,
constraint_type,
source_column,
target_column,
source_column_name,
source_column_position,
source_column_type,
target_column_name,
target_column_position,
target_column_type,
target_table,
) = k

source_column = Column(
name=source_column_name,
type=self._get_column_type(source_column_type),
ordinal_position=source_column_position,
)

if constraint_type == "PRIMARY KEY":
if target_table is not None or target_column_name is not None:
raise Exception(

Check warning on line 234 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L234

Added line #L234 was not covered by tests
"Expected foreign key's target table/column to be NULL"
)

if primary_key_constraint is None:
primary_key_constraint = KeyConstraint(
constraint_name=constraint_name,
Expand All @@ -187,8 +245,14 @@
)

primary_key_constraint.source_columns.append(source_column)
primary_key_constraint.target_columns.append(target_column)

elif constraint_type == "FOREIGN KEY":
target_column = Column(
name=target_column_name,
type=self._get_column_type(target_column_type),
ordinal_position=target_column_position,
)

if constraint_name not in foreign_key_constraints:
foreign_key_constraints[constraint_name] = KeyConstraint(
constraint_name=constraint_name,
Expand All @@ -214,14 +278,64 @@

return (primary_key_constraint, list(foreign_key_constraints.values()))

def _get_column_type(self, data_type: str) -> ColumnType:
column_type: ColumnType

if data_type in ("smallint", "integer", "bigint"):
column_type = ColumnType.INT
elif data_type in ("boolean",):
column_type = ColumnType.BOOL
elif data_type in ("character varying", "text"):
column_type = ColumnType.STR
elif data_type in ("jsonb",):
column_type = ColumnType.JSON
elif data_type in ("date",):
column_type = ColumnType.DATE
elif data_type in ("timestamp with time zone",):
column_type = ColumnType.TIMESTAMPTZ
elif data_type in ("double precision",):
column_type = ColumnType.FLOAT

Check warning on line 297 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L296-L297

Added lines #L296 - L297 were not covered by tests
else:
raise ValueError(

Check warning on line 299 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L299

Added line #L299 was not covered by tests
f"Don't know how to canonicalize column type '{data_type}'"
)

return column_type

@override
def get_column_names(self, table: str, cursor: CursorWrapper) -> set[str]:
cursor.execute(
"SELECT column_name FROM information_schema.columns WHERE TABLE_NAME = %s ORDER BY ordinal_position",
(table,),
)
def get_columns(self, table: str, cursor: CursorWrapper) -> set[Column]:
if self._columns is None:
cursor.execute("""
SELECT
table_name,
column_name,
data_type,
ordinal_position
FROM information_schema.columns
WHERE
table_schema not in ('pg_catalog', 'information_schema')
ORDER BY table_name, ordinal_position
""")
self._columns = {}

return {r[0] for r in cursor.fetchall()}
for row in cursor.fetchall():
table_name, column_name, data_type, ordinal_position = row

column_type = self._get_column_type(data_type)
ordinal_position = ordinal_position

if table_name not in self._columns:
self._columns[table_name] = set()

self._columns[table_name].add(
Column(
name=column_name,
type=column_type,
ordinal_position=ordinal_position,
)
)

return self._columns[table]

@override
def get_tables_with_installed_triggers(self, cursor: CursorWrapper) -> set[str]:
Expand All @@ -246,15 +360,118 @@
"""This is a placeholder until we implement the SQLite side of synchronization."""

@override
def get_column_names(self, table: str, cursor: CursorWrapper) -> set[str]:
raise NotImplementedError()
def get_columns(self, table: str, cursor: CursorWrapper) -> set[Column]:
cursor.execute(
"""
SELECT
name,
"type",
cid AS ordinal_position
FROM pragma_table_info(%s)
""",
[table],
)

return {
Column(
name=r[0],
type=self._data_type_to_column_type(r[1]),
ordinal_position=r[2],
)
for r in cursor.fetchall()
}

@override
def get_tables_with_installed_triggers(self, cursor: CursorWrapper) -> set[str]:
raise NotImplementedError()
cursor.execute("SELECT tbl_name FROM sqlite_master WHERE type = 'trigger';")

Check warning on line 386 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L386

Added line #L386 was not covered by tests

return {r[0] for r in cursor.fetchall()}

Check warning on line 388 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L388

Added line #L388 was not covered by tests

@override
def get_table_key_constraints(
self, table: str, cursor: CursorWrapper
) -> tuple[KeyConstraint, list[KeyConstraint]]:
raise NotImplementedError()
cursor.execute(
"""
SELECT
p.name AS column_name,
p.type AS data_type,
p.cid AS ordinal_position
FROM sqlite_schema m
JOIN pragma_table_info(m.name) p ON p.pk > 0
WHERE m.type = 'table'
AND m.name = %s
ORDER BY m.name, p.pk;
""",
[table],
)

primary_key_columns = cursor.fetchall()

primary_key_constraint = KeyConstraint(
constraint_name="primary",
constraint_type="PRIMARY KEY",
source_columns=[
Column(
name=row[0],
type=self._data_type_to_column_type(row[1]),
ordinal_position=row[2],
)
for row in primary_key_columns
],
target_columns=[],
target_table=table,
)

foreign_key_constraints: dict[int, KeyConstraint] = {}

cursor.execute(
"""
SELECT
id AS fk_id,
"from" AS from_column,
"table" AS referenced_table,
"to" AS referenced_column
FROM pragma_foreign_key_list(%s)
ORDER BY id, seq;
""",
[table],
)

rows = cursor.fetchall()

for row in rows:
fk_id, from_column, referenced_table, referenced_column = row

Check warning on line 444 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L444

Added line #L444 was not covered by tests

if fk_id not in foreign_key_constraints:
foreign_key_constraints[fk_id] = KeyConstraint(

Check warning on line 447 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L446-L447

Added lines #L446 - L447 were not covered by tests
constraint_name=f"foreign_key_{fk_id}",
constraint_type="FOREIGN KEY",
source_columns=[],
target_columns=[],
target_table=referenced_table,
)

foreign_key_constraints[fk_id].source_columns.append(from_column)
foreign_key_constraints[fk_id].target_columns.append(referenced_column)

Check warning on line 456 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L455-L456

Added lines #L455 - L456 were not covered by tests

return (
primary_key_constraint,
sorted(foreign_key_constraints.values(), key=lambda c: c.constraint_name),
)

def _data_type_to_column_type(self, data_type: str) -> ColumnType:
if data_type.lower() in ("integer", "bigint", "smallint", "smallint unsigned"):
return ColumnType.INT
elif data_type.lower() in ("bool",):
return ColumnType.BOOL
elif data_type.lower().startswith("varchar") or data_type.lower() == "text":
return ColumnType.STR
elif data_type.lower() == "datetime":
return ColumnType.TIMESTAMPTZ
elif data_type.lower() == "date":
return ColumnType.DATE
elif data_type.lower() == "real":
return ColumnType.FLOAT
else:
raise ValueError(f"Unhandled data type '{data_type}'")

Check warning on line 477 in lamindb/core/writelog/_db_metadata_wrapper.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/writelog/_db_metadata_wrapper.py#L477

Added line #L477 was not covered by tests
Loading