diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/db/repo.ex b/apps/omg_watcher_info/lib/omg_watcher_info/db/repo.ex index b75b06b74d..6a1935b2ff 100644 --- a/apps/omg_watcher_info/lib/omg_watcher_info/db/repo.ex +++ b/apps/omg_watcher_info/lib/omg_watcher_info/db/repo.ex @@ -33,16 +33,23 @@ defmodule OMG.WatcherInfo.DB.Repo do def insert_all_chunked(_schema_or_source, [], _opts), do: :ok def insert_all_chunked(schema_or_source, entries, opts) do - chunk_size = @max_params_count |> div(entries |> hd |> fields_count) - utc_now = DateTime.utc_now() + entries = Enum.map(entries, fn entry -> Map.merge(entry, %{inserted_at: utc_now, updated_at: utc_now}) end) + + chunk_size = entries |> hd() |> chunk_size() entries - |> Enum.map(fn entry -> Map.merge(entry, %{inserted_at: utc_now, updated_at: utc_now}) end) |> Stream.chunk_every(chunk_size) |> Enum.each(&insert_all(schema_or_source, &1, opts)) end - defp fields_count(map) when is_map(map), do: map |> Kernel.map_size() - defp fields_count(list) when is_list(list), do: length(list) + # Note: an entry with 0 fields will cause a divide-by-zero error. + # + # DB.Repo.chunk_size(%{}) ==> (ArithmeticError) bad argument in arithmetic expression + # + # But we could not think of a case where this code happen, so no defensive + # checks here. + def chunk_size(entry), do: div(@max_params_count, fields_count(entry)) + + defp fields_count(map), do: Kernel.map_size(map) end diff --git a/apps/omg_watcher_info/test/omg_watcher_info/db/repo_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/db/repo_test.exs index 64b3b252cb..88520e63d3 100644 --- a/apps/omg_watcher_info/test/omg_watcher_info/db/repo_test.exs +++ b/apps/omg_watcher_info/test/omg_watcher_info/db/repo_test.exs @@ -25,7 +25,16 @@ defmodule OMG.WatcherInfo.DB.RepoTest do require Utxo describe "DB.Repo.insert_all_chunked/3" do - # a special test for insert_all_chunked/3 is here because under the hood it calls insert_all/2. using + # The current number of columns on the transaction table allow up to 8191 + # transactions to be inserted using `DB.Repo.insert_all/3` before chunking must + # be done to avoid hitting postgres limits. The test `DB.Repo.insert_all for + # transactions (via postgres INSERT)...` below shows how this number is derived + # and asserts the number is correct + # + # This is the chunk_size for the transactions table. + @max_txns_before_chunking 8191 + + # A special test for insert_all_chunked/3 is here because under the hood it calls insert_all/2. Using # insert_all/3 with a queryable means that certain autogenerated columns, such as inserted_at and # updated_at, will not be inserted as they would be if you used a plain insert. More info # is here: https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3 @@ -43,6 +52,49 @@ defmodule OMG.WatcherInfo.DB.RepoTest do assert txoutput_with_dates.inserted_at != nil assert DateTime.compare(txoutput_with_dates.inserted_at, txoutput_with_dates.updated_at) == :eq end + + @tag fixtures: [:phoenix_ecto_sandbox] + test "insert_all_chunked/3 does not exceed postgres' max of 65535 parameters" do + block = insert(:block) + + # Create an array of transactions beyond postgres limits where chunking + # is required. + transactions = new_transactions(block.blknum, @max_txns_before_chunking + 1) + + assert DB.Repo.insert_all_chunked(OMG.WatcherInfo.DB.Transaction, transactions) == :ok + end + + @tag fixtures: [:phoenix_ecto_sandbox] + test "chunk_size/1 calculates the correct chunk size based on the column's in the transaction table" do + block = insert(:block) + + transaction = new_transaction(block.blknum, 1, DateTime.utc_now()) + + assert DB.Repo.chunk_size(transaction) == @max_txns_before_chunking + end + end + + @tag fixtures: [:phoenix_ecto_sandbox] + test "DB.Repo.insert_all for transactions (via postgres INSERT) is limited to #{@max_txns_before_chunking} transactions" do + utc_now = DateTime.utc_now() + + # test that transaction inseration at the max limit succeeds + block = insert(:block) + transactions = new_transactions(block.blknum, @max_txns_before_chunking, utc_now) + + {transactions_inserted, _} = DB.Repo.insert_all(OMG.WatcherInfo.DB.Transaction, transactions) + + assert transactions_inserted == @max_txns_before_chunking + + # test that transaction inseration above the max limit raises an exception + block = insert(:block) + transactions = new_transactions(block.blknum, @max_txns_before_chunking + 1, utc_now) + + assert_raise( + Postgrex.QueryError, + "postgresql protocol can not handle 65536 parameters, the maximum is 65535", + fn -> DB.Repo.insert_all(OMG.WatcherInfo.DB.Transaction, transactions) end + ) end describe "DB.Repo timestamps" do @@ -60,4 +112,38 @@ defmodule OMG.WatcherInfo.DB.RepoTest do end) end end + + # Prefer using `ExMachina.build_list/3 which uses `OMG.WatcherInfo.Factory.Transaction` + # over this function. This function is built to be fast and simple. + defp new_transactions(blknum, count, utc_now \\ nil) do + Enum.reduce(1..count, [], fn index, acc -> + [new_transaction(blknum, index, utc_now) | acc] + end) + end + + # Prefer using `ExMachina.build/2 which uses `OMG.WatcherInfo.Factory.Transaction` + # over this function. This function is built to be fast and simple. + # + # `ExMachina.params_for/2` could be used here to make use of `OMG.WatcherInfo.Factory.Transaction`. + # But the transaction factory does a lot of extra stuff unnecessary for this test. This stripped + # down version is about 15x faster. Also using `ExMachina.params_for/2` here also requires some + # tweaking of the map it returns because `OMG.WatcherInfo.DB.Repo.insert_all_chunked` is the code + # being tested rather than `Ecto.Repo.insert_all/3`. The 2 functions differ in the inputs they + # expect. + defp new_transaction(blknum, index, utc_now) do + transaction = %{ + txhash: to_string(index), + txindex: index, + txbytes: to_string(index), + metadata: to_string(index), + txtype: 1, + blknum: blknum + } + + if utc_now != nil do + Map.merge(transaction, %{inserted_at: utc_now, updated_at: utc_now}) + else + transaction + end + end end diff --git a/docker-compose-watcher.yml b/docker-compose-watcher.yml index 916118c973..6a0cbe6084 100644 --- a/docker-compose-watcher.yml +++ b/docker-compose-watcher.yml @@ -17,7 +17,7 @@ services: watcher: #last stable integration watcher - image: omisego/watcher:78f1a7f + image: omisego/watcher:latest command: "full_local" environment: - ETHEREUM_RPC_URL=https://ropsten.infura.io/v3/${INFURA_API_KEY}