Skip to content

Commit

Permalink
Watcher_Info events race cause spent output presented unspent (#1769)
Browse files Browse the repository at this point in the history
* feat: add 'ensure_output' to EthEvent.insert_exits to explicitly allow
missing outputs

* test: demonstrate missing output feature

* feat: unplug Std exit consumer of WatcherInfo

* feat: unplug IFE exit started consumer of WatcherInfo

* feat: unplug IFE output piggybacked consumer of WatcherInfo

* feat: unplug IFE output withdrawn consumer of WatcherInfo

* feat: unplug deposits handling out of the Bus

* feat: unplug block application handling out of the Bus

* chore: delete unused modules, linter fixes

* fix: improve TxOutput schema queries and show that by tests

* fix: fix wrongly marked outout existance

* chore: [after review] align to comments

* test: use syncronous test when talking to postgres

* refactor: [after review] Insert full block data synchronously instead of pending block

* chore: delete unused code regarding pending blocks machinery

* test: fix tests - change nonexisting pending block

* test: fix test

* refactor: be explicit about which event expect the output existence

* chore: align to comment

Co-authored-by: Ino Murko <[email protected]>
Co-authored-by: Pawel Nowosielski <[email protected]>
  • Loading branch information
3 people authored Jan 19, 2021
1 parent 35bf68d commit 0e96ddd
Show file tree
Hide file tree
Showing 43 changed files with 466 additions and 1,308 deletions.
26 changes: 26 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,20 @@ commands:
sudo apt-get update &&
./bin/setup
no_output_timeout: 2400
install_and_setup_gcloud:
description: Installs and sets up gcloud to fetch feefeed
steps:
- run: |
export LD_LIBRARY_PATH=/usr/local/lib
export CLOUDSDK_PYTHON=/usr/bin/python
wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-323.0.0-linux-x86_64.tar.gz -O gcloud-sdk.tar.gz
tar zxf gcloud-sdk.tar.gz google-cloud-sdk
mv google-cloud-sdk ~/.google-cloud-sdk
~/.google-cloud-sdk/install.sh --quiet
echo $GCP_KEY_FILE | gcloud auth activate-service-account $GCP_SERVICE_EMAIL --key-file=-
gcloud --quiet config set project ${GCP_PROJECT}
gcloud --quiet config set compute/zone ${GCP_ZONE}
gcloud --quiet auth configure-docker
jobs:
barebuild:
Expand Down Expand Up @@ -618,6 +632,8 @@ jobs:
image: ubuntu-2004:202010-01
environment:
SNAPSHOT: SNAPSHOT_MIX_EXIT_PERIOD_SECONDS_120
LD_LIBRARY_PATH: /usr/local/lib
CLOUDSDK_PYTHON: /usr/bin/python
parallelism: 4
steps:
- checkout
Expand All @@ -632,6 +648,7 @@ jobs:
[ -d data ] || mkdir data && chmod 777 data
- docker_login
- make_docker_images
- install_and_setup_gcloud
- run:
name: Start daemon services
command: |
Expand Down Expand Up @@ -664,6 +681,8 @@ jobs:
environment:
PERF_IMAGE_NAME: "omisego/perf:latest"
STATIX_TAG: "env:perf_circleci"
LD_LIBRARY_PATH: /usr/local/lib
CLOUDSDK_PYTHON: /usr/bin/python
steps:
- checkout
- run:
Expand All @@ -672,6 +691,7 @@ jobs:
[ -d data ] || mkdir data && chmod 777 data
- docker_login
- make_docker_images
- install_and_setup_gcloud
- run:
name: Build perf docker image
command: make docker-perf IMAGE_NAME=$PERF_IMAGE_NAME
Expand Down Expand Up @@ -731,6 +751,8 @@ jobs:
image: ubuntu-2004:202010-01
environment:
REORG: true
LD_LIBRARY_PATH: /usr/local/lib
CLOUDSDK_PYTHON: /usr/bin/python
steps:
- checkout
- run:
Expand All @@ -746,6 +768,7 @@ jobs:
[ -d data ] || mkdir data && chmod 777 data
- docker_login
- make_docker_images
- install_and_setup_gcloud
- run:
name: Start daemon services
command: |
Expand Down Expand Up @@ -794,6 +817,8 @@ jobs:
image: ubuntu-2004:202010-01
environment:
TERM: xterm-256color
LD_LIBRARY_PATH: /usr/local/lib
CLOUDSDK_PYTHON: /usr/bin/python
steps:
- checkout
- run:
Expand All @@ -802,6 +827,7 @@ jobs:
git submodule init
git submodule update --remote
- run: echo 'export PATH=~/.cargo/bin:$PATH' >> $BASH_ENV
- install_and_setup_gcloud
- docker_login
- run:
name: Start geth, postgres, feefeed and pull in blockchain snapshot
Expand Down
3 changes: 3 additions & 0 deletions apps/omg/lib/omg/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ defmodule OMG.State do
end

def handle_call({:deposit, deposits}, _from, state) do
if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent),
do: Kernel.apply(OMG.WatcherInfo.DB.EthEvent, :insert_deposits!, [deposits])

{:ok, db_updates, new_state} = Core.deposit(deposits, state)

{:reply, {:ok, db_updates}, new_state}
Expand Down
18 changes: 17 additions & 1 deletion apps/omg/test/omg/state/persistence_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule OMG.State.PersistenceTest do

require OMG.Utxo

alias Ecto.Adapters.SQL.Sandbox
alias OMG.Block
alias OMG.Eth.Configuration
alias OMG.State.Transaction
Expand Down Expand Up @@ -57,6 +58,20 @@ defmodule OMG.State.PersistenceTest do
strategy: :one_for_one
)

Application.ensure_all_started(:postgrex)
Application.ensure_all_started(:spandex_ecto)
Application.ensure_all_started(:ecto)

{:ok, _} =
Supervisor.start_link(
[%{id: OMG.WatcherInfo.DB.Repo, start: {OMG.WatcherInfo.DB.Repo, :start_link, []}, type: :supervisor}],
strategy: :one_for_one,
name: WatcherInfo.Supervisor
)

:ok = Sandbox.checkout(OMG.WatcherInfo.DB.Repo)
Sandbox.mode(OMG.WatcherInfo.DB.Repo, {:shared, self()})

on_exit(fn ->
Application.put_env(:omg_db, :path, nil)

Expand Down Expand Up @@ -226,7 +241,8 @@ defmodule OMG.State.PersistenceTest do
owner: owner.addr,
currency: currency,
amount: amount,
blknum: blknum
blknum: blknum,
eth_height: 1
}
end)
end
Expand Down
29 changes: 28 additions & 1 deletion apps/omg/test/omg/state_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule OMG.StateTest do

use OMG.DB.Fixtures

alias Ecto.Adapters.SQL.Sandbox
alias OMG.State
alias OMG.TestHelper
alias OMG.Utxo
Expand All @@ -38,6 +39,20 @@ defmodule OMG.StateTest do
# the pubsub is required, because `OMG.State` is broadcasting to the `OMG.Bus`
{:ok, bus_apps} = Application.ensure_all_started(:omg_bus)

Application.ensure_all_started(:postgrex)
Application.ensure_all_started(:spandex_ecto)
Application.ensure_all_started(:ecto)

{:ok, _} =
Supervisor.start_link(
[%{id: OMG.WatcherInfo.DB.Repo, start: {OMG.WatcherInfo.DB.Repo, :start_link, []}, type: :supervisor}],
strategy: :one_for_one,
name: WatcherInfo.Supervisor
)

:ok = Sandbox.checkout(OMG.WatcherInfo.DB.Repo)
Sandbox.mode(OMG.WatcherInfo.DB.Repo, {:shared, self()})

on_exit(fn ->
(started_apps ++ bus_apps)
|> Enum.reverse()
Expand Down Expand Up @@ -68,7 +83,19 @@ defmodule OMG.StateTest do
fee = %{@eth => [1]}

# deposits, transactions, utxo existence
assert {:ok, _} = State.deposit([%{owner: alice.addr, currency: @eth, amount: 10, blknum: 1}])
assert {:ok, _} =
State.deposit([
%{
owner: alice.addr,
currency: @eth,
amount: 10,
blknum: 1,
root_chain_txhash: <<1::256>>,
eth_height: 1,
log_index: 0
}
])

assert true == State.utxo_exists?(Utxo.position(1, 0, 0))

assert {:ok, _} = State.exec(TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{alice, 9}]), fee)
Expand Down
4 changes: 2 additions & 2 deletions apps/omg_status/lib/omg_status/metric/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule OMG.Status.Metric.Event do
:ife_exit_finalizer,
:in_flight_exit,
:in_flight_exit_processor,
:in_flight_exit_deleted_processor,
:piggyback,
:piggyback_challenges_processor,
:piggyback_processor,
Expand All @@ -54,7 +55,6 @@ defmodule OMG.Status.Metric.Event do
:watcher_exit_processor_message_queue_len - OMG.Watcher.ExitProcessor message queue length
:eventer_message_queue_len - OMG.Watcher.Eventer message queue length
:db_message_queue_len - OMG.DB server implementation (OMG.DB.LevelDB.Server, or OMG.DB.RocksDB.Server,) message queue length
:pending_block_queue_length - OMG.WatcherInfo.DB.PendingBlock queue length
:write - OMG.DB KV layer has three types of actions: write, read, multiread
:read - OMG.DB KV layer has three types of actions: write, read, multiread
:multiread - OMG.DB KV layer has three types of actions: write, read, multiread
Expand All @@ -78,7 +78,6 @@ defmodule OMG.Status.Metric.Event do
def name(:watcher_exit_processor_message_queue_len), do: "watcher_exit_processor_message_queue_len"
def name(:eventer_message_queue_len), do: "eventer_message_queue_len"
def name(:db_message_queue_len), do: "db_message_queue_len"
def name(:pending_block_queue_length), do: "pending_block_queue_length"
def name(:write), do: "db_write"
def name(:read), do: "db_read"
def name(:multiread), do: "db_multiread"
Expand All @@ -100,6 +99,7 @@ defmodule OMG.Status.Metric.Event do
defp events_name(:exit_finalizer), do: "exit_finalizer_ethereum_events"
defp events_name(:exit_challenger), do: "exit_challenger_ethereum_events"
defp events_name(:in_flight_exit_processor), do: "in_flight_exit_processor_ethereum_events"
defp events_name(:in_flight_exit_deleted_processor), do: "in_flight_exit_deleted_processor_ethereum_events"
defp events_name(:piggyback_processor), do: "piggyback_processor_ethereum_events"
defp events_name(:competitor_processor), do: "competitor_processor_ethereum_events"
defp events_name(:challenges_responds_processor), do: "challenges_responds_processor_ethereum_events"
Expand Down
6 changes: 2 additions & 4 deletions apps/omg_watcher/lib/omg_watcher/block_getter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,8 @@ defmodule OMG.Watcher.BlockGetter do

case Core.validate_executions(tx_exec_results, block_application, state) do
{:ok, state} ->
:ok =
{:child_chain, "block.get"}
|> OMG.Bus.Event.new(:block_received, block_application)
|> OMG.Bus.direct_local_broadcast()
if Code.ensure_loaded?(OMG.WatcherInfo.DB.Block),
do: Kernel.apply(OMG.WatcherInfo.BlockApplicator, :insert_block!, [block_application])

{:noreply, state, {:continue, {:apply_block_step, :run_block_download_task, block_application}}}

Expand Down
41 changes: 30 additions & 11 deletions apps/omg_watcher/lib/omg_watcher/exit_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ defmodule OMG.Watcher.ExitProcessor do
)
|> Enum.map(fn {:ok, result} -> result end)

if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent),
do: Kernel.apply(OMG.WatcherInfo.DB.EthEvent, :insert_exits!, [exits, :standard_exit, nil])

{new_state, db_updates} = Core.new_exits(state, exit_maps, exit_contract_statuses)
{:reply, {:ok, db_updates}, new_state}
end
Expand All @@ -404,13 +407,17 @@ defmodule OMG.Watcher.ExitProcessor do
end)

# Prepare events data for internal bus
:ok =
events =
exits
|> Enum.map(fn %{call_data: %{input_utxos_pos: inputs}} = event ->
{event, inputs}
end)
|> Tools.to_bus_events_data()
|> publish_internal_bus_events("InFlightExitStarted")

:ok = publish_internal_bus_events(events, :InFlightExitStarted)

if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent),
do: Kernel.apply(OMG.WatcherInfo.DB.EthEvent, :insert_exits!, [events, :in_flight_exit, :InFlightExitStarted])

{:ok, statuses} = Eth.RootChain.get_in_flight_exit_structs(contract_ife_ids)
ife_contract_statuses = Enum.zip(statuses, contract_ife_ids)
Expand Down Expand Up @@ -444,10 +451,16 @@ defmodule OMG.Watcher.ExitProcessor do
_ = if not Enum.empty?(exits), do: Logger.info("Recognized #{Enum.count(exits)} piggybacks: #{inspect(exits)}")
{new_state, db_updates} = Core.new_piggybacks(state, exits)

:ok =
exits
|> Tools.to_bus_events_data()
|> publish_internal_bus_events("InFlightTxOutputPiggybacked")
events = Tools.to_bus_events_data(exits)
:ok = publish_internal_bus_events(events, :InFlightTxOutputPiggybacked)

if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent),
do:
Kernel.apply(
OMG.WatcherInfo.DB.EthEvent,
:insert_exits!,
[events, :in_flight_exit, :InFlightTxOutputPiggybacked]
)

{:reply, {:ok, db_updates}, new_state}
end
Expand Down Expand Up @@ -501,10 +514,16 @@ defmodule OMG.Watcher.ExitProcessor do

{:ok, state3, db_updates} = Core.finalize_in_flight_exits(state2, finalizations, invalidities)

:ok =
events_with_utxos
|> Tools.to_bus_events_data()
|> publish_internal_bus_events("InFlightExitOutputWithdrawn")
events = Tools.to_bus_events_data(events_with_utxos)
:ok = publish_internal_bus_events(events, :InFlightExitOutputWithdrawn)

if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent),
do:
Kernel.apply(
OMG.WatcherInfo.DB.EthEvent,
:insert_exits!,
[events, :in_flight_exit, :InFlightExitOutputWithdrawn]
)

{:reply, {:ok, state_db_updates ++ db_updates}, state3}
end
Expand Down Expand Up @@ -707,7 +726,7 @@ defmodule OMG.Watcher.ExitProcessor do

defp publish_internal_bus_events([], _), do: :ok

defp publish_internal_bus_events(events_data, topic) when is_list(events_data) and is_binary(topic) do
defp publish_internal_bus_events(events_data, topic) when is_list(events_data) and is_atom(topic) do
{:watcher, topic}
|> OMG.Bus.Event.new(:data, events_data)
|> OMG.Bus.direct_local_broadcast()
Expand Down

This file was deleted.

Loading

0 comments on commit 0e96ddd

Please sign in to comment.