Skip to content

Commit

Permalink
fix insert_all_chunked chunk_size calculation (#1356)
Browse files Browse the repository at this point in the history
* fix insert_all_chunked chunk_size calculation

* add max boundary tests for insert_all_chunked

* make lint happy

* fixed typo in comment, added another comment referencing a bottle neck issue

* add tests to show postgres boundaries, cleaned up pipe usage

* removed @txn_chunk_size and addressed other PR comments

* hello lint, my friend

* refactor: per PR suggestion of piping and function calls

Co-Authored-By: Ino Murko <[email protected]>

* refactor: minor pipe/function call changes per PR suggestion

Co-Authored-By: Ino Murko <[email protected]>

* fix: remove comment per PR suggestion

* cleanup: remove unused code per PR suggestion via Slack

* cleanup: remove guard per PR suggestion

Co-authored-by: Ino Murko <[email protected]>
  • Loading branch information
jrhite and Ino Murko authored Feb 27, 2020
1 parent fb4742b commit 4445aee
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 7 deletions.
17 changes: 12 additions & 5 deletions apps/omg_watcher_info/lib/omg_watcher_info/db/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,23 @@ defmodule OMG.WatcherInfo.DB.Repo do
def insert_all_chunked(_schema_or_source, [], _opts), do: :ok

def insert_all_chunked(schema_or_source, entries, opts) do
chunk_size = @max_params_count |> div(entries |> hd |> fields_count)

utc_now = DateTime.utc_now()
entries = Enum.map(entries, fn entry -> Map.merge(entry, %{inserted_at: utc_now, updated_at: utc_now}) end)

chunk_size = entries |> hd() |> chunk_size()

entries
|> Enum.map(fn entry -> Map.merge(entry, %{inserted_at: utc_now, updated_at: utc_now}) end)
|> Stream.chunk_every(chunk_size)
|> Enum.each(&insert_all(schema_or_source, &1, opts))
end

defp fields_count(map) when is_map(map), do: map |> Kernel.map_size()
defp fields_count(list) when is_list(list), do: length(list)
# Note: an entry with 0 fields will cause a divide-by-zero error.
#
# DB.Repo.chunk_size(%{}) ==> (ArithmeticError) bad argument in arithmetic expression
#
# But we could not think of a case where this code happen, so no defensive
# checks here.
def chunk_size(entry), do: div(@max_params_count, fields_count(entry))

defp fields_count(map), do: Kernel.map_size(map)
end
88 changes: 87 additions & 1 deletion apps/omg_watcher_info/test/omg_watcher_info/db/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,16 @@ defmodule OMG.WatcherInfo.DB.RepoTest do
require Utxo

describe "DB.Repo.insert_all_chunked/3" do
# a special test for insert_all_chunked/3 is here because under the hood it calls insert_all/2. using
# The current number of columns on the transaction table allow up to 8191
# transactions to be inserted using `DB.Repo.insert_all/3` before chunking must
# be done to avoid hitting postgres limits. The test `DB.Repo.insert_all for
# transactions (via postgres INSERT)...` below shows how this number is derived
# and asserts the number is correct
#
# This is the chunk_size for the transactions table.
@max_txns_before_chunking 8191

# A special test for insert_all_chunked/3 is here because under the hood it calls insert_all/2. Using
# insert_all/3 with a queryable means that certain autogenerated columns, such as inserted_at and
# updated_at, will not be inserted as they would be if you used a plain insert. More info
# is here: https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3
Expand All @@ -43,6 +52,49 @@ defmodule OMG.WatcherInfo.DB.RepoTest do
assert txoutput_with_dates.inserted_at != nil
assert DateTime.compare(txoutput_with_dates.inserted_at, txoutput_with_dates.updated_at) == :eq
end

@tag fixtures: [:phoenix_ecto_sandbox]
test "insert_all_chunked/3 does not exceed postgres' max of 65535 parameters" do
block = insert(:block)

# Create an array of transactions beyond postgres limits where chunking
# is required.
transactions = new_transactions(block.blknum, @max_txns_before_chunking + 1)

assert DB.Repo.insert_all_chunked(OMG.WatcherInfo.DB.Transaction, transactions) == :ok
end

@tag fixtures: [:phoenix_ecto_sandbox]
test "chunk_size/1 calculates the correct chunk size based on the column's in the transaction table" do
block = insert(:block)

transaction = new_transaction(block.blknum, 1, DateTime.utc_now())

assert DB.Repo.chunk_size(transaction) == @max_txns_before_chunking
end
end

@tag fixtures: [:phoenix_ecto_sandbox]
test "DB.Repo.insert_all for transactions (via postgres INSERT) is limited to #{@max_txns_before_chunking} transactions" do
utc_now = DateTime.utc_now()

# test that transaction inseration at the max limit succeeds
block = insert(:block)
transactions = new_transactions(block.blknum, @max_txns_before_chunking, utc_now)

{transactions_inserted, _} = DB.Repo.insert_all(OMG.WatcherInfo.DB.Transaction, transactions)

assert transactions_inserted == @max_txns_before_chunking

# test that transaction inseration above the max limit raises an exception
block = insert(:block)
transactions = new_transactions(block.blknum, @max_txns_before_chunking + 1, utc_now)

assert_raise(
Postgrex.QueryError,
"postgresql protocol can not handle 65536 parameters, the maximum is 65535",
fn -> DB.Repo.insert_all(OMG.WatcherInfo.DB.Transaction, transactions) end
)
end

describe "DB.Repo timestamps" do
Expand All @@ -60,4 +112,38 @@ defmodule OMG.WatcherInfo.DB.RepoTest do
end)
end
end

# Prefer using `ExMachina.build_list/3 which uses `OMG.WatcherInfo.Factory.Transaction`
# over this function. This function is built to be fast and simple.
defp new_transactions(blknum, count, utc_now \\ nil) do
Enum.reduce(1..count, [], fn index, acc ->
[new_transaction(blknum, index, utc_now) | acc]
end)
end

# Prefer using `ExMachina.build/2 which uses `OMG.WatcherInfo.Factory.Transaction`
# over this function. This function is built to be fast and simple.
#
# `ExMachina.params_for/2` could be used here to make use of `OMG.WatcherInfo.Factory.Transaction`.
# But the transaction factory does a lot of extra stuff unnecessary for this test. This stripped
# down version is about 15x faster. Also using `ExMachina.params_for/2` here also requires some
# tweaking of the map it returns because `OMG.WatcherInfo.DB.Repo.insert_all_chunked` is the code
# being tested rather than `Ecto.Repo.insert_all/3`. The 2 functions differ in the inputs they
# expect.
defp new_transaction(blknum, index, utc_now) do
transaction = %{
txhash: to_string(index),
txindex: index,
txbytes: to_string(index),
metadata: to_string(index),
txtype: 1,
blknum: blknum
}

if utc_now != nil do
Map.merge(transaction, %{inserted_at: utc_now, updated_at: utc_now})
else
transaction
end
end
end
2 changes: 1 addition & 1 deletion docker-compose-watcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:

watcher:
#last stable integration watcher
image: omisego/watcher:78f1a7f
image: omisego/watcher:latest
command: "full_local"
environment:
- ETHEREUM_RPC_URL=https://ropsten.infura.io/v3/${INFURA_API_KEY}
Expand Down

0 comments on commit 4445aee

Please sign in to comment.