Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/drop-duplicate-relations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Drop duplicate relation messages before shape routing when Postgres resends unchanged table metadata.
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,7 @@ defmodule Electric.Replication.ShapeLogCollector do
end

defp handle_relation(state, rel) do
OpenTelemetry.add_span_attributes("rel.is_dropped": false)

{updated_rel, tracker_state} =
{relation_status, updated_rel, tracker_state} =
AffectedColumns.transform_relation(rel, state.tracked_relations)

# PG doesn't send all the details in the relation message (in particular, nullability), but
Expand All @@ -646,35 +644,51 @@ defmodule Electric.Replication.ShapeLogCollector do
Inspector.clean(updated_rel.id, state.inspector)
end

:ok =
PersistentReplicationState.set_tracked_relations(
tracker_state,
state.persistent_replication_data_opts
)
case Partitions.handle_relation(state.partitions, updated_rel) do
{:ok, partitions} ->
state = %{state | partitions: partitions}

with {:ok, state} <- publish_relation(state, updated_rel, relation_status) do
:ok =
PersistentReplicationState.set_tracked_relations(
tracker_state,
state.persistent_replication_data_opts
)

{:ok, %{state | tracked_relations: tracker_state}}
end

{:error, :connection_not_available} ->
{{:error, :connection_not_available}, state}
end
end

defp publish_relation(state, rel, :unchanged) do
OpenTelemetry.add_span_attributes("rel.is_dropped": true)

Logger.debug(fn ->
"Dropping unchanged relation message for #{inspect(rel.schema)}.#{inspect(rel.table)}"
end)

{:ok, state}
end

defp publish_relation(state, rel, _relation_status) do
case state do
%{subscriptions: 0} ->
OpenTelemetry.add_span_attributes("rel.is_dropped": true)

Logger.debug(fn ->
"Dropping relation message for #{inspect(rel.schema)}.#{inspect(rel.table)}: no active consumers"
end)

{:ok, %{state | tracked_relations: tracker_state}}
{:ok, state}

_ ->
case Partitions.handle_relation(state.partitions, updated_rel) do
{:ok, partitions} ->
# relation changes will also start consumers if they're not running
state =
publish(
%{state | tracked_relations: tracker_state, partitions: partitions},
updated_rel
)

{:ok, state}

{:error, :connection_not_available} ->
{{:error, :connection_not_available}, state}
end
OpenTelemetry.add_span_attributes("rel.is_dropped": false)

# relation changes will also start consumers if they're not running
{:ok, publish(state, rel)}
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumns do
case {existing_id, existing_rel} do
# New relation, register it
{nil, nil} ->
{rel, add_relation(state, id, rel)}
{:new, rel, add_relation(state, id, rel)}

# Relation identity matches known, let's compare columns
{^id, %Relation{schema: ^schema, table: ^table}} ->
case find_differing_columns(existing_rel, rel) do
# No (noticable) changes to the relation, continue as-is
# No noticeable changes to the relation, continue as-is
[] ->
{rel, state}
{:unchanged, rel, state}

affected_cols ->
updated_rel = %{rel | affected_columns: affected_cols}
{updated_rel, add_relation(state, id, rel)}
{:changed, updated_rel, add_relation(state, id, rel)}
end

# Some part of identity changed, update the state and pass it through
Expand All @@ -44,7 +44,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumns do
"Relation identity changed: #{existing_id}/#{inspect(existing_rel)} -> #{inspect(rel)}"
end)

{rel,
{:identity_changed, rel,
state
|> delete_tracked_relation(schema_table_key(existing_rel), existing_id)
|> add_relation(id, rel)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{returned_relation, new_state} = AffectedColumns.transform_relation(relation, state)
{:new, returned_relation, new_state} =
AffectedColumns.transform_relation(relation, state)

# Verify relation is returned unchanged
assert returned_relation == relation
Expand All @@ -44,7 +45,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{_, state_with_first} = AffectedColumns.transform_relation(relation1, state)
{:new, _, state_with_first} = AffectedColumns.transform_relation(relation1, state)

# Second relation
relation2 = %Relation{
Expand All @@ -58,7 +59,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{returned_relation, new_state} =
{:new, returned_relation, new_state} =
AffectedColumns.transform_relation(relation2, state_with_first)

# Verify relation is returned unchanged
Expand All @@ -70,6 +71,27 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
assert new_state.id_to_table_info[2] == relation2
end

test "relation with same id/schema/table and columns is unchanged", %{state: state} do
relation = %Relation{
id: 1,
schema: "public",
table: "users",
columns: [
%Column{name: "id", type_oid: 23},
%Column{name: "name", type_oid: 25}
]
}

{:new, _, state_with_original} =
AffectedColumns.transform_relation(relation, state)

{:unchanged, returned_relation, new_state} =
AffectedColumns.transform_relation(relation, state_with_original)

assert returned_relation == relation
assert new_state == state_with_original
end

test "relation with same id/schema/table but column was added", %{state: state} do
# Original relation
original_relation = %Relation{
Expand All @@ -82,7 +104,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{_, state_with_original} = AffectedColumns.transform_relation(original_relation, state)
{:new, _, state_with_original} =
AffectedColumns.transform_relation(original_relation, state)

# Updated relation with new column
updated_relation = %Relation{
Expand All @@ -96,7 +119,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{returned_relation, new_state} =
{:changed, returned_relation, new_state} =
AffectedColumns.transform_relation(updated_relation, state_with_original)

# Verify "email" is detected as an affected column
Expand All @@ -118,7 +141,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{_, state_with_original} = AffectedColumns.transform_relation(original_relation, state)
{:new, _, state_with_original} =
AffectedColumns.transform_relation(original_relation, state)

# Updated relation with changed column type
updated_relation = %Relation{
Expand All @@ -132,7 +156,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{returned_relation, new_state} =
{:changed, returned_relation, new_state} =
AffectedColumns.transform_relation(updated_relation, state_with_original)

# Verify "name" is detected as an affected column
Expand All @@ -155,7 +179,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{_, state_with_original} = AffectedColumns.transform_relation(original_relation, state)
{:new, _, state_with_original} =
AffectedColumns.transform_relation(original_relation, state)

# Updated relation with both name and type changes
updated_relation = %Relation{
Expand All @@ -171,7 +196,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{returned_relation, new_state} =
{:changed, returned_relation, new_state} =
AffectedColumns.transform_relation(updated_relation, state_with_original)

# Verify both "name"/"username" and "description" are affected columns
Expand All @@ -195,7 +220,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{_, state_with_original} = AffectedColumns.transform_relation(original_relation, state)
{:new, _, state_with_original} =
AffectedColumns.transform_relation(original_relation, state)

# Relation with changed ID
updated_relation = %Relation{
Expand All @@ -208,7 +234,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{returned_relation, new_state} =
{:identity_changed, returned_relation, new_state} =
AffectedColumns.transform_relation(updated_relation, state_with_original)

# Verify relation is returned unchanged
Expand All @@ -233,7 +259,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{_, state_with_original} = AffectedColumns.transform_relation(original_relation, state)
{:new, _, state_with_original} =
AffectedColumns.transform_relation(original_relation, state)

# Relation with changed schema/table
updated_relation = %Relation{
Expand All @@ -248,7 +275,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do
]
}

{returned_relation, new_state} =
{:identity_changed, returned_relation, new_state} =
AffectedColumns.transform_relation(updated_relation, state_with_original)

# Verify relation is returned unchanged
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do

alias Electric.LsnTracker
alias Electric.Postgres.Lsn
alias Electric.Replication.PersistentReplicationState
alias Electric.Replication.ShapeLogCollector
alias Electric.Replication.Changes.Relation
alias Electric.Replication.Changes.Commit
Expand Down Expand Up @@ -889,6 +890,88 @@ defmodule Electric.Replication.ShapeLogCollectorTest do

Support.TransactionConsumer.assert_consume(ctx.consumers, [relation1, relation2])
end

test "retries changed relation after partition inspection connection error", ctx do
id = @shape.root_table_id
{:ok, partition_relation_info_calls} = Agent.start_link(fn -> 0 end)

stub_inspector([force: true],
clean: fn ^id, _ -> :ok end,
load_relation_info: fn ^id, _ ->
call =
Agent.get_and_update(partition_relation_info_calls, fn calls ->
{calls, calls + 1}
end)

case call do
1 ->
{:error, :connection_not_available}

_ ->
{:ok, %{id: id, schema: "public", name: "test_table", parent: nil, children: nil}}
end
end
)

relation = %Relation{
id: id,
table: "test_table",
schema: "public",
columns: [%{name: "id", type_oid: {1, 1}}]
}

changed_relation = %{
relation
| columns: [%{name: "id", type_oid: {2, 1}}]
}

assert :ok = ShapeLogCollector.handle_event(relation, ctx.stack_id)

assert {:error, :connection_not_available} =
ShapeLogCollector.handle_event(changed_relation, ctx.stack_id)

assert :ok = ShapeLogCollector.handle_event(changed_relation, ctx.stack_id)
Support.TransactionConsumer.assert_consume(ctx.consumers, [changed_relation])
end

test "does not persist changed relation before routing completes", ctx do
id = @shape.root_table_id

stub_inspector([force: true], clean: fn ^id, _ -> :ok end)

relation = %Relation{
id: id,
table: "test_table",
schema: "public",
columns: [%{name: "id", type_oid: {1, 1}}]
}

changed_relation = %{
relation
| columns: [%{name: "id", type_oid: {2, 1}}]
}

assert :ok = ShapeLogCollector.handle_event(relation, ctx.stack_id)

persistence_opts = [stack_id: ctx.stack_id, persistent_kv: ctx.persistent_kv]

assert %{
id_to_table_info: %{^id => ^relation},
table_to_id: %{{"public", "test_table"} => ^id}
} = PersistentReplicationState.get_tracked_relations(persistence_opts)

Repatch.patch(Electric.Shapes.Filter, :affected_shapes, [mode: :shared], fn
_, _ -> raise "routing failed"
end)

assert {{%RuntimeError{message: "routing failed"}, _}, _} =
catch_exit(ShapeLogCollector.handle_event(changed_relation, ctx.stack_id))

assert %{
id_to_table_info: %{^id => ^relation},
table_to_id: %{{"public", "test_table"} => ^id}
} = PersistentReplicationState.get_tracked_relations(persistence_opts)
end
end

describe "collector not ready" do
Expand Down
Loading
Loading