diff --git a/.changeset/drop-duplicate-relations.md b/.changeset/drop-duplicate-relations.md new file mode 100644 index 0000000000..a5db7106f4 --- /dev/null +++ b/.changeset/drop-duplicate-relations.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Drop duplicate relation messages before shape routing when Postgres resends unchanged table metadata. diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector.ex b/packages/sync-service/lib/electric/replication/shape_log_collector.ex index 829c5b371d..463a7ed8cc 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector.ex @@ -633,9 +633,7 @@ defmodule Electric.Replication.ShapeLogCollector do end defp handle_relation(state, rel) do - OpenTelemetry.add_span_attributes("rel.is_dropped": false) - - {updated_rel, tracker_state} = + {relation_status, updated_rel, tracker_state} = AffectedColumns.transform_relation(rel, state.tracked_relations) # PG doesn't send all the details in the relation message (in particular, nullability), but @@ -646,35 +644,51 @@ defmodule Electric.Replication.ShapeLogCollector do Inspector.clean(updated_rel.id, state.inspector) end - :ok = - PersistentReplicationState.set_tracked_relations( - tracker_state, - state.persistent_replication_data_opts - ) + case Partitions.handle_relation(state.partitions, updated_rel) do + {:ok, partitions} -> + state = %{state | partitions: partitions} + + with {:ok, state} <- publish_relation(state, updated_rel, relation_status) do + :ok = + PersistentReplicationState.set_tracked_relations( + tracker_state, + state.persistent_replication_data_opts + ) + + {:ok, %{state | tracked_relations: tracker_state}} + end + + {:error, :connection_not_available} -> + {{:error, :connection_not_available}, state} + end + end + defp publish_relation(state, rel, :unchanged) do + OpenTelemetry.add_span_attributes("rel.is_dropped": true) + + Logger.debug(fn -> + "Dropping unchanged relation message for #{inspect(rel.schema)}.#{inspect(rel.table)}" + end) + + {:ok, state} + end + + defp publish_relation(state, rel, _relation_status) do case state do %{subscriptions: 0} -> + OpenTelemetry.add_span_attributes("rel.is_dropped": true) + Logger.debug(fn -> "Dropping relation message for #{inspect(rel.schema)}.#{inspect(rel.table)}: no active consumers" end) - {:ok, %{state | tracked_relations: tracker_state}} + {:ok, state} _ -> - case Partitions.handle_relation(state.partitions, updated_rel) do - {:ok, partitions} -> - # relation changes will also start consumers if they're not running - state = - publish( - %{state | tracked_relations: tracker_state, partitions: partitions}, - updated_rel - ) - - {:ok, state} - - {:error, :connection_not_available} -> - {{:error, :connection_not_available}, state} - end + OpenTelemetry.add_span_attributes("rel.is_dropped": false) + + # relation changes will also start consumers if they're not running + {:ok, publish(state, rel)} end end diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector/affected_columns.ex b/packages/sync-service/lib/electric/replication/shape_log_collector/affected_columns.ex index e0fb5c2251..9e7dc9fd14 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector/affected_columns.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector/affected_columns.ex @@ -24,18 +24,18 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumns do case {existing_id, existing_rel} do # New relation, register it {nil, nil} -> - {rel, add_relation(state, id, rel)} + {:new, rel, add_relation(state, id, rel)} # Relation identity matches known, let's compare columns {^id, %Relation{schema: ^schema, table: ^table}} -> case find_differing_columns(existing_rel, rel) do - # No (noticable) changes to the relation, continue as-is + # No noticeable changes to the relation, continue as-is [] -> - {rel, state} + {:unchanged, rel, state} affected_cols -> updated_rel = %{rel | affected_columns: affected_cols} - {updated_rel, add_relation(state, id, rel)} + {:changed, updated_rel, add_relation(state, id, rel)} end # Some part of identity changed, update the state and pass it through @@ -44,7 +44,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumns do "Relation identity changed: #{existing_id}/#{inspect(existing_rel)} -> #{inspect(rel)}" end) - {rel, + {:identity_changed, rel, state |> delete_tracked_relation(schema_table_key(existing_rel), existing_id) |> add_relation(id, rel)} diff --git a/packages/sync-service/test/electric/replication/shape_log_collector/affected_columns_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector/affected_columns_test.exs index 7e520fb6f2..700aa9bbdd 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector/affected_columns_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector/affected_columns_test.exs @@ -23,7 +23,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {returned_relation, new_state} = AffectedColumns.transform_relation(relation, state) + {:new, returned_relation, new_state} = + AffectedColumns.transform_relation(relation, state) # Verify relation is returned unchanged assert returned_relation == relation @@ -44,7 +45,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {_, state_with_first} = AffectedColumns.transform_relation(relation1, state) + {:new, _, state_with_first} = AffectedColumns.transform_relation(relation1, state) # Second relation relation2 = %Relation{ @@ -58,7 +59,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {returned_relation, new_state} = + {:new, returned_relation, new_state} = AffectedColumns.transform_relation(relation2, state_with_first) # Verify relation is returned unchanged @@ -70,6 +71,27 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do assert new_state.id_to_table_info[2] == relation2 end + test "relation with same id/schema/table and columns is unchanged", %{state: state} do + relation = %Relation{ + id: 1, + schema: "public", + table: "users", + columns: [ + %Column{name: "id", type_oid: 23}, + %Column{name: "name", type_oid: 25} + ] + } + + {:new, _, state_with_original} = + AffectedColumns.transform_relation(relation, state) + + {:unchanged, returned_relation, new_state} = + AffectedColumns.transform_relation(relation, state_with_original) + + assert returned_relation == relation + assert new_state == state_with_original + end + test "relation with same id/schema/table but column was added", %{state: state} do # Original relation original_relation = %Relation{ @@ -82,7 +104,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {_, state_with_original} = AffectedColumns.transform_relation(original_relation, state) + {:new, _, state_with_original} = + AffectedColumns.transform_relation(original_relation, state) # Updated relation with new column updated_relation = %Relation{ @@ -96,7 +119,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {returned_relation, new_state} = + {:changed, returned_relation, new_state} = AffectedColumns.transform_relation(updated_relation, state_with_original) # Verify "email" is detected as an affected column @@ -118,7 +141,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {_, state_with_original} = AffectedColumns.transform_relation(original_relation, state) + {:new, _, state_with_original} = + AffectedColumns.transform_relation(original_relation, state) # Updated relation with changed column type updated_relation = %Relation{ @@ -132,7 +156,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {returned_relation, new_state} = + {:changed, returned_relation, new_state} = AffectedColumns.transform_relation(updated_relation, state_with_original) # Verify "name" is detected as an affected column @@ -155,7 +179,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {_, state_with_original} = AffectedColumns.transform_relation(original_relation, state) + {:new, _, state_with_original} = + AffectedColumns.transform_relation(original_relation, state) # Updated relation with both name and type changes updated_relation = %Relation{ @@ -171,7 +196,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {returned_relation, new_state} = + {:changed, returned_relation, new_state} = AffectedColumns.transform_relation(updated_relation, state_with_original) # Verify both "name"/"username" and "description" are affected columns @@ -195,7 +220,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {_, state_with_original} = AffectedColumns.transform_relation(original_relation, state) + {:new, _, state_with_original} = + AffectedColumns.transform_relation(original_relation, state) # Relation with changed ID updated_relation = %Relation{ @@ -208,7 +234,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {returned_relation, new_state} = + {:identity_changed, returned_relation, new_state} = AffectedColumns.transform_relation(updated_relation, state_with_original) # Verify relation is returned unchanged @@ -233,7 +259,8 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {_, state_with_original} = AffectedColumns.transform_relation(original_relation, state) + {:new, _, state_with_original} = + AffectedColumns.transform_relation(original_relation, state) # Relation with changed schema/table updated_relation = %Relation{ @@ -248,7 +275,7 @@ defmodule Electric.Replication.ShapeLogCollector.AffectedColumnsTest do ] } - {returned_relation, new_state} = + {:identity_changed, returned_relation, new_state} = AffectedColumns.transform_relation(updated_relation, state_with_original) # Verify relation is returned unchanged diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 1744f24201..fb14346b77 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -4,6 +4,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do alias Electric.LsnTracker alias Electric.Postgres.Lsn + alias Electric.Replication.PersistentReplicationState alias Electric.Replication.ShapeLogCollector alias Electric.Replication.Changes.Relation alias Electric.Replication.Changes.Commit @@ -889,6 +890,88 @@ defmodule Electric.Replication.ShapeLogCollectorTest do Support.TransactionConsumer.assert_consume(ctx.consumers, [relation1, relation2]) end + + test "retries changed relation after partition inspection connection error", ctx do + id = @shape.root_table_id + {:ok, partition_relation_info_calls} = Agent.start_link(fn -> 0 end) + + stub_inspector([force: true], + clean: fn ^id, _ -> :ok end, + load_relation_info: fn ^id, _ -> + call = + Agent.get_and_update(partition_relation_info_calls, fn calls -> + {calls, calls + 1} + end) + + case call do + 1 -> + {:error, :connection_not_available} + + _ -> + {:ok, %{id: id, schema: "public", name: "test_table", parent: nil, children: nil}} + end + end + ) + + relation = %Relation{ + id: id, + table: "test_table", + schema: "public", + columns: [%{name: "id", type_oid: {1, 1}}] + } + + changed_relation = %{ + relation + | columns: [%{name: "id", type_oid: {2, 1}}] + } + + assert :ok = ShapeLogCollector.handle_event(relation, ctx.stack_id) + + assert {:error, :connection_not_available} = + ShapeLogCollector.handle_event(changed_relation, ctx.stack_id) + + assert :ok = ShapeLogCollector.handle_event(changed_relation, ctx.stack_id) + Support.TransactionConsumer.assert_consume(ctx.consumers, [changed_relation]) + end + + test "does not persist changed relation before routing completes", ctx do + id = @shape.root_table_id + + stub_inspector([force: true], clean: fn ^id, _ -> :ok end) + + relation = %Relation{ + id: id, + table: "test_table", + schema: "public", + columns: [%{name: "id", type_oid: {1, 1}}] + } + + changed_relation = %{ + relation + | columns: [%{name: "id", type_oid: {2, 1}}] + } + + assert :ok = ShapeLogCollector.handle_event(relation, ctx.stack_id) + + persistence_opts = [stack_id: ctx.stack_id, persistent_kv: ctx.persistent_kv] + + assert %{ + id_to_table_info: %{^id => ^relation}, + table_to_id: %{{"public", "test_table"} => ^id} + } = PersistentReplicationState.get_tracked_relations(persistence_opts) + + Repatch.patch(Electric.Shapes.Filter, :affected_shapes, [mode: :shared], fn + _, _ -> raise "routing failed" + end) + + assert {{%RuntimeError{message: "routing failed"}, _}, _} = + catch_exit(ShapeLogCollector.handle_event(changed_relation, ctx.stack_id)) + + assert %{ + id_to_table_info: %{^id => ^relation}, + table_to_id: %{{"public", "test_table"} => ^id} + } = PersistentReplicationState.get_tracked_relations(persistence_opts) + end end describe "collector not ready" do diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 8d6bd46707..af5a82a43b 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -431,13 +431,13 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} end - test "does not clean shapes if relation didn't change", ctx do + test "does not route relation to shapes if relation didn't change", ctx do rel = %Relation{ - id: :erlang.phash2({"random", "definitely_different"}), - schema: "random", - table: "definitely_different", - columns: [] + id: @shape1.root_table_id, + schema: elem(@shape1.root_table, 0), + table: elem(@shape1.root_table, 1), + columns: [%{name: "id", type_oid: {1, 1}}, %{name: "value", type_oid: {2, 1}}] } ref1 = Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle1)) @@ -452,6 +452,13 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(rel, ctx.stack_id) + Repatch.patch(Electric.Shapes.Filter, :affected_shapes, [mode: :shared], fn + _, _ -> + raise "Unexpected call to Filter.affected_shapes/2 for unchanged duplicate relation" + end) + + assert :ok = ShapeLogCollector.handle_event(rel, ctx.stack_id) + refute_receive {:DOWN, ^ref1, :process, _, _} refute_receive {:DOWN, ^ref2, :process, _, _} end