diff --git a/.github/workflows/deploy-production.yaml b/.github/workflows/deploy-production.yaml index eb0801be..a49f0a92 100644 --- a/.github/workflows/deploy-production.yaml +++ b/.github/workflows/deploy-production.yaml @@ -55,8 +55,8 @@ jobs: NEWRELIC_APP_NAME: ${{ vars.NEWRELIC_APP_NAME }} SENTRY_ENV: "production" SENTRY_DSN: ${{ secrets.SENTRY_DSN }} - ENABLE_LISTENER: "true" - ENABLE_UPDATER: "true" - ENABLE_FETCHER: "true" + ENABLE_MAINNET_SYNC: "true" + ENABLE_TESTNET_SYNC: "true" + ENABLE_TESTNET2_SYNC: "true" run: | ansible-playbook -i ansible/inventory.yaml ansible/playbooks/deployment.yaml diff --git a/.github/workflows/deploy-testing.yaml b/.github/workflows/deploy-testing.yaml index 5401629b..29feb075 100644 --- a/.github/workflows/deploy-testing.yaml +++ b/.github/workflows/deploy-testing.yaml @@ -53,9 +53,9 @@ jobs: SSH_HOST: ${{ vars.SSH_HOST }} GIT_BRANCH: ${{ github.head_ref || github.ref_name }} ANSIBLE_STDOUT_CALLBACK: "yaml" - ENABLE_LISTENER: "true" - ENABLE_UPDATER: "true" - ENABLE_FETCHER: ${{ vars.ENABLE_FETCHER }} + ENABLE_MAINNET_SYNC: "true" + ENABLE_TESTNET_SYNC: "true" + ENABLE_TESTNET2_SYNC: "true" ENABLE_GATEWAY_DATA: "true" NEWRELIC_KEY: ${{ secrets.NEWRELIC_KEY }} NEWRELIC_APP_NAME: ${{ vars.NEWRELIC_APP_NAME }} diff --git a/ansible/playbooks/deployment.yaml b/ansible/playbooks/deployment.yaml index 6cfa7a73..511e8723 100644 --- a/ansible/playbooks/deployment.yaml +++ b/ansible/playbooks/deployment.yaml @@ -52,9 +52,9 @@ PORT: "4100" SENTRY_ENV: "{{ lookup('ansible.builtin.env', 'SENTRY_ENV') }}" SENTRY_DSN: "{{ lookup('ansible.builtin.env', 'SENTRY_DSN') }}" - ENABLE_LISTENER: "true" - ENABLE_UPDATER: "true" - ENABLE_FETCHER: "true" + ENABLE_MAINNET_SYNC: "true" + ENABLE_TESTNET_SYNC: "true" + ENABLE_TESTNET2_SYNC: "true" args: chdir: /home/starknet_explorer/tmp/madara_explorer register: build_output @@ -88,15 +88,14 @@ aws_secret_access_key: "{{ lookup('ansible.builtin.env', 'AWS_SECRET_ACCESS_KEY') }}" aws_region: "{{ lookup('ansible.builtin.env', 'AWS_REGION') }}" prover_storage: "{{ lookup('ansible.builtin.env', 'PROVER_STORAGE') }}" - enable_listener: "{{ lookup('ansible.builtin.env', 'ENABLE_LISTENER') }}" enable_gateway_data: "{{ lookup('ansible.builtin.env', 'ENABLE_GATEWAY_DATA') }}" newrelic_key: "{{ lookup('ansible.builtin.env', 'NEWRELIC_KEY') }}" newrelic_app_name: "{{ lookup('ansible.builtin.env', 'NEWRELIC_APP_NAME') }}" sentry_env: "{{ lookup('ansible.builtin.env', 'SENTRY_ENV') }}" sentry_dsn: "{{ lookup('ansible.builtin.env', 'SENTRY_DSN') }}" - enable_listener: "true" - enable_updater: "true" - enable_fetcher: "true" + enable_mainnet_sync: "true" + enable_testnet_sync: "true" + enable_testnet2_sync: "true" - name: Create user systemd directory ansible.builtin.file: diff --git a/ansible/playbooks/templates/.env.j2 b/ansible/playbooks/templates/.env.j2 index 06b727ef..3539447c 100644 --- a/ansible/playbooks/templates/.env.j2 +++ b/ansible/playbooks/templates/.env.j2 @@ -10,12 +10,11 @@ AWS_ACCESS_KEY_ID={{ aws_access_key_id }} AWS_SECRET_ACCESS_KEY={{ aws_secret_access_key }} AWS_REGION={{ aws_region }} PROVER_STORAGE={{ prover_storage }} -ENABLE_LISTENER={{ enable_listener }} ENABLE_GATEWAY_DATA={{ enable_gateway_data }} NEWRELIC_KEY={{ newrelic_key }} NEWRELIC_APP_NAME={{ newrelic_app_name }} SENTRY_DSN={{ sentry_dsn }} SENTRY_ENV={{ sentry_env }} -ENABLE_FETCHER={{ enable_fetcher }} -ENABLE_LISTENER={{ enable_listener }} -ENABLE_UPDATER={{ enable_updater }} +ENABLE_MAINNET_SYNC={{ enable_mainnet_sync }} +ENABLE_TESTNET_SYNC={{ enable_testnet_sync }} +ENABLE_TESTNET2_SYNC={{ enable_testnet2_sync }} diff --git a/lib/starknet_explorer/application.ex b/lib/starknet_explorer/application.ex index eb3245f8..78109df1 100644 --- a/lib/starknet_explorer/application.ex +++ b/lib/starknet_explorer/application.ex @@ -11,10 +11,50 @@ defmodule StarknetExplorer.Application do # @networks # |> Enum.flat_map(fn net -> cache_supervisor_spec(net) end) + mainnet_state_sync = + if System.get_env("ENABLE_MAINNET_SYNC") == "true" do + # Start the State Sync System server for mainnet. + [ + Supervisor.child_spec( + {StarknetExplorer.Blockchain.StateSyncSystem, + [network: :mainnet, name: :mainnet_state_sync]}, + id: :mainnet_state_sync + ) + ] + else + [] + end + + testnet_state_sync = + if System.get_env("ENABLE_TESTNET_SYNC") == "true" do + # Start the State Sync System server for testnet. + [ + Supervisor.child_spec( + {StarknetExplorer.Blockchain.StateSyncSystem, + [network: :testnet, name: :testnet_state_sync]}, + id: :testnet_state_sync + ) + ] + else + [] + end + + testnet2_state_sync = + if System.get_env("ENABLE_TESTNET2_SYNC") == "true" do + # Start the State Sync System server for testnet2. + [ + Supervisor.child_spec( + {StarknetExplorer.Blockchain.StateSyncSystem, + [network: :testnet2, name: :testnet2_state_sync]}, + id: :testnet2_state_sync + ) + ] + else + [] + end + children = [ - # Start the Blockchain supervisor - StarknetExplorer.Blockchain.BlockchainSupervisor, # Start the Telemetry supervisor StarknetExplorerWeb.Telemetry, # Start the Ecto repository @@ -27,8 +67,8 @@ defmodule StarknetExplorer.Application do StarknetExplorerWeb.Endpoint, # Start a worker by calling: StarknetExplorer.Worker.start_link(arg) # {StarknetExplorer.Worker, arg} - {DynamicSupervisor, strategy: :one_for_one, name: StarknetExplorer.BlockFetcher} - ] + StarknetExplorer.IndexCache + ] ++ testnet2_state_sync ++ testnet_state_sync ++ mainnet_state_sync # See https://hexdocs.pm/elixir/Supervisor.html # for other strategies and supported options diff --git a/lib/starknet_explorer/block/block.ex b/lib/starknet_explorer/block/block.ex index f6b7b7c3..79792e5a 100644 --- a/lib/starknet_explorer/block/block.ex +++ b/lib/starknet_explorer/block/block.ex @@ -53,11 +53,62 @@ defmodule StarknetExplorer.Block do def changeset(block = %__MODULE__{}, attrs) do block |> cast(attrs, @cast_fields) + end + + def changeset_with_validations(block = %__MODULE__{}, attrs) do + block + |> cast(attrs, @cast_fields) |> validate_required(@required_fields) |> unique_constraint(:number) |> unique_constraint(:hash) end + @doc """ + Given a block from the RPC response, our block from SQL, and transactions receipts + update them into the DB. + """ + def update_from_rpc_response( + block_from_sql, + _block_from_rpc = %{ + "status" => status, + "gas_fee_in_wei" => gas_fee_in_wei, + "execution_resources" => execution_resources + }, + receipts + ) do + tx_receipts = + Enum.map(receipts, fn {tx_hash, rpc_receipt} -> + sql_receipt = + Enum.find(block_from_sql.transactions, fn tx -> + tx.receipt.transaction_hash == tx_hash + end).receipt + + {sql_receipt, rpc_receipt} + end) + + StarknetExplorer.Repo.transaction(fn -> + block_changeset = + Ecto.Changeset.change(block_from_sql, + status: status, + gas_fee_in_wei: gas_fee_in_wei, + execution_resources: execution_resources + ) + + Repo.update!(block_changeset) + + Enum.each(tx_receipts, fn {tx_receipt, rpc_tx_receipt} -> + tx_receipt_changeset = + Ecto.Changeset.change(tx_receipt, + actual_fee: rpc_tx_receipt["actual_fee"], + finality_status: rpc_tx_receipt["finality_status"], + execution_status: rpc_tx_receipt["execution_status"] + ) + + Repo.update!(tx_receipt_changeset) + end) + end) + end + @doc """ Given a block from the RPC response, and transactions receipts insert them into the DB. @@ -88,7 +139,7 @@ defmodule StarknetExplorer.Block do transaction_result = StarknetExplorer.Repo.transaction(fn -> - block_changeset = Block.changeset(%Block{}, block) + block_changeset = Block.changeset_with_validations(%Block{}, block) {:ok, block} = Repo.insert(block_changeset) @@ -137,6 +188,45 @@ defmodule StarknetExplorer.Block do struct(__MODULE__, rpc_block) end + @doc """ + This function will return the lowest continuous block starting from the highest one + stored so far. Note this is not the same as the lowest block. Example: + + If we have stored the blocks [5, 6, 20, 21, 22], this returns `20`, not `5`. + We are using this for the block fetcher logic, where we want to go downwards in order. + The problem is a block with a lower number could be added by someone visiting a details + page for a block, so we need to account for that. + """ + def get_lowest_block_number(network) do + Repo.query( + "SELECT number - 1 + FROM blocks block + WHERE NOT EXISTS + ( + SELECT NULL + FROM blocks mi + WHERE mi.number = block.number - 1 AND mi.network = $1 + ) AND block.network = $1 + ORDER BY number DESC + LIMIT 1", + [network] + ) + end + + @doc """ + Returns the highest block number stored in the DB. + """ + def block_height(network) do + query = + from(b in Block, + where: b.network == ^network, + order_by: [desc: b.number], + limit: 1 + ) + + Repo.one(query) + end + @doc """ Returns the highest block number fetched from the RPC. """ @@ -217,6 +307,16 @@ defmodule StarknetExplorer.Block do |> Repo.preload(:transactions) end + def get_by_number_with_receipts_preload(num, network) do + query = + from b in Block, + where: b.number == ^num and b.network == ^network, + preload: [transactions: :receipt] + + Repo.one(query) + |> Repo.preload(:transactions) + end + def get_by_height(height, network) when is_integer(height) do query = from b in Block, @@ -225,6 +325,19 @@ defmodule StarknetExplorer.Block do Repo.one(query) end + def get_lowest_not_completed_block(network) do + query = + from b in Block, + where: + b.status != "ACCEPTED_ON_L1" or is_nil(b.gas_fee_in_wei) or b.gas_fee_in_wei == "" or + is_nil(b.execution_resources), + where: b.network == ^network, + limit: 1, + order_by: [asc: b.number] + + Repo.one(query) + end + def get_with_not_finalized_blocks(limit \\ 10, network) do query = from b in Block, diff --git a/lib/starknet_explorer/block/block_utils.ex b/lib/starknet_explorer/block/block_utils.ex index 9075ae42..f1281bce 100644 --- a/lib/starknet_explorer/block/block_utils.ex +++ b/lib/starknet_explorer/block/block_utils.ex @@ -1,10 +1,27 @@ defmodule StarknetExplorer.BlockUtils do alias StarknetExplorer.{Rpc, Block} + alias StarknetExplorer.IndexCache def fetch_and_store(block_height, network) do with false <- already_stored?(block_height, network), {:ok, block = %{"block_number" => block_number}} <- fetch_block(block_height, network), - :ok <- store_block(block, network) do + :ok <- store_block(block, network), + :ok <- IndexCache.add_block(block["block_number"], network) do + {:ok, block_number} + else + true -> + {:ok, block_height} + + error -> + {:error, error} + end + end + + def fetch_and_update(block_height, network) do + with block_from_sql <- Block.get_by_number_with_receipts_preload(block_height, network), + {:ok, block_from_rpc = %{"block_number" => block_number}} <- + fetch_block(block_height, network), + {:ok, _update} <- update_block_and_transactions(block_from_sql, block_from_rpc, network) do {:ok, block_number} else true -> @@ -19,6 +36,36 @@ defmodule StarknetExplorer.BlockUtils do not is_nil(Block.get_by_num(block_height, network)) end + def update_block_and_transactions( + block_from_sql, + block_from_rpc = %{"block_number" => block_number}, + network + ) do + with {:ok, receipts} <- receipts_for_block(block_from_rpc, network) do + block_from_rpc = + block_from_rpc + |> Map.put("network", network) + + block_from_rpc = + case Application.get_env(:starknet_explorer, :enable_gateway_data) do + true -> + {:ok, gateway_block = %{"gas_price" => gas_price}} = + StarknetExplorer.Gateway.fetch_block(block_number, network) + + block_from_rpc + |> Map.put("gas_fee_in_wei", gas_price) + |> Map.put("execution_resources", calculate_gateway_block_steps(gateway_block)) + + _ -> + block_from_rpc + |> Map.put("gas_fee_in_wei", "0") + |> Map.put("execution_resources", 0) + end + + Block.update_from_rpc_response(block_from_sql, block_from_rpc, receipts) + end + end + def store_block(block = %{"block_number" => block_number}, network) do with {:ok, receipts} <- receipts_for_block(block, network) do block = @@ -75,10 +122,52 @@ defmodule StarknetExplorer.BlockUtils do end end - def block_height(network) do - StarknetExplorer.Blockchain.ListenerWorker.get_height( - StarknetExplorer.Utils.listener_atom(network) - ) + @doc """ + Get block height from DB. + If any block is present in the DB, use RPC. + """ + def block_height(network) when is_atom(network) do + case Block.block_height(Atom.to_string(network)) do + %Block{} = block -> + {:ok, block.number} + + _else -> + Rpc.get_block_height_no_cache(network) + end + end + + @doc """ + Get the lowest block number from DB. + If any block is present in the DB, use RPC. + """ + def get_lowest_block_number(network) when is_atom(network) do + case Block.get_lowest_block_number(Atom.to_string(network)) do + {:ok, %Postgrex.Result{rows: [[block_number]]}} -> + {:ok, block_number} + + {:ok, %Exqlite.Result{rows: [[block_number]]}} -> + {:ok, block_number} + + _else -> + Rpc.get_block_height_no_cache(network) + end + end + + @doc """ + Get the lowest uncompleted block number from DB. + If any block is present in the DB, use RPC. + A block can be uncompleted if: + - gateway data is missing (execution resources & fee) + - block status != "ACCEPTED_ON_L1" + """ + def get_lowest_not_completed_block(network) when is_atom(network) do + case Block.get_lowest_not_completed_block(Atom.to_string(network)) do + %Block{} = block -> + {:ok, block.number} + + _else -> + Rpc.get_block_height_no_cache(network) + end end def fetch_block(number, network) when is_integer(number) do diff --git a/lib/starknet_explorer/blockchain/blockchain_supervisor.ex b/lib/starknet_explorer/blockchain/blockchain_supervisor.ex deleted file mode 100644 index 68e2ae4c..00000000 --- a/lib/starknet_explorer/blockchain/blockchain_supervisor.ex +++ /dev/null @@ -1,37 +0,0 @@ -defmodule StarknetExplorer.Blockchain.BlockchainSupervisor do - use Supervisor - alias StarknetExplorer.Blockchain.{ListenerSupervisor, UpdaterSupervisor, FetcherSupervisor} - - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) - end - - @impl true - def init(_init_arg) do - fetcher_supervisor = - if System.get_env("ENABLE_FETCHER") == "true" do - [FetcherSupervisor] - else - [] - end - - updater_supervisor = - if System.get_env("ENABLE_UPDATER") == "true" do - [UpdaterSupervisor] - else - [] - end - - listener_supervisor = - if System.get_env("ENABLE_LISTENER") == "true" do - [ListenerSupervisor] - else - [] - end - - children = - listener_supervisor ++ updater_supervisor ++ fetcher_supervisor - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/lib/starknet_explorer/blockchain/fetcher_supervisor.ex b/lib/starknet_explorer/blockchain/fetcher_supervisor.ex deleted file mode 100644 index 4a204222..00000000 --- a/lib/starknet_explorer/blockchain/fetcher_supervisor.ex +++ /dev/null @@ -1,51 +0,0 @@ -defmodule StarknetExplorer.Blockchain.FetcherSupervisor do - use Supervisor - alias StarknetExplorer.Blockchain.FetcherWorker - - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) - end - - def stop_child(pid) do - Supervisor.terminate_child(__MODULE__, pid) - end - - @impl true - def init(_init_arg) do - finish = 0 - {:ok, start_mainnet} = StarknetExplorer.Rpc.get_block_height_no_cache(:mainnet) - {:ok, start_testnet} = StarknetExplorer.Rpc.get_block_height_no_cache(:testnet) - {:ok, start_testnet2} = StarknetExplorer.Rpc.get_block_height_no_cache(:testnet2) - - children = - if Application.get_env(:starknet_explorer, :env) == :prod do - [ - Supervisor.child_spec( - {FetcherWorker, - [start: start_mainnet, finish: finish, network: :mainnet, name: :fetcher_mainnet]}, - id: "fetcher_mainnet" - ), - Supervisor.child_spec( - {FetcherWorker, - [start: start_testnet, finish: finish, network: :testnet, name: :fetcher_testnet]}, - id: "fetcher_testnet" - ), - Supervisor.child_spec( - {FetcherWorker, - [start: start_testnet2, finish: finish, network: :testnet2, name: :fetcher_testnet2]}, - id: "fetcher_testnet2" - ) - ] - else - [ - Supervisor.child_spec( - {FetcherWorker, - [start: start_mainnet, finish: finish, network: :mainnet, name: :fetcher_mainnet]}, - id: "fetcher_mainnet" - ) - ] - end - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/lib/starknet_explorer/blockchain/fetcher_worker.ex b/lib/starknet_explorer/blockchain/fetcher_worker.ex deleted file mode 100644 index 584df300..00000000 --- a/lib/starknet_explorer/blockchain/fetcher_worker.ex +++ /dev/null @@ -1,60 +0,0 @@ -defmodule StarknetExplorer.Blockchain.FetcherWorker do - @fetch_interval :timer.seconds(5) - alias StarknetExplorer.{Blockchain.FetcherWorker, BlockUtils} - defstruct [:finish, :next_to_fetch, :network] - require Logger - use GenServer, restart: :temporary - - @moduledoc """ - Module dedicated to fetch blocks on the range between 2 numbers - (start and finish, where start > finish) so for example if start = - 10 and finish = 1, then this process will fetch blocks 10 down to 1. - """ - - def start_link(args = [start: _start, finish: _finish, network: _network, name: name]) do - GenServer.start_link(__MODULE__, args, name: name) - end - - @impl true - def init(_args = [start: start, finish: finish, network: network, name: _name]) - when start > finish do - state = %FetcherWorker{ - finish: finish, - next_to_fetch: start, - network: network - } - - Process.send_after(self(), :fetch, @fetch_interval) - - Logger.info("Starting block fetcher from block: #{start} to #{finish}, in network #{network}") - - {:ok, state} - end - - @impl true - def handle_info(:fetch, state = %FetcherWorker{network: network}) do - state = - case BlockUtils.fetch_and_store(state.next_to_fetch, network) do - {:ok, _block} -> - %{state | next_to_fetch: state.next_to_fetch - 1} - - {:error, err} -> - Logger.error( - "Error fetching block number #{state.next_to_fetch}: #{inspect(err)}, retrying..." - ) - - state - end - - maybe_fetch_another(state) - - {:noreply, state} - end - - defp maybe_fetch_another(%FetcherWorker{next_to_fetch: next, finish: last}) - when next < last, - do: StarknetExplorer.Blockchain.FetcherSupervisor.stop_child(self()) - - defp maybe_fetch_another(_), - do: Process.send_after(self(), :fetch, @fetch_interval) -end diff --git a/lib/starknet_explorer/blockchain/listener_supervisor.ex b/lib/starknet_explorer/blockchain/listener_supervisor.ex deleted file mode 100644 index f54f9a71..00000000 --- a/lib/starknet_explorer/blockchain/listener_supervisor.ex +++ /dev/null @@ -1,40 +0,0 @@ -defmodule StarknetExplorer.Blockchain.ListenerSupervisor do - use Supervisor - alias StarknetExplorer.Blockchain.ListenerWorker - - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) - end - - @impl true - def init(_init_arg) do - children = - if Application.get_env(:starknet_explorer, :env) == :prod do - [ - Supervisor.child_spec({ListenerWorker, [network: :mainnet, name: :listener_mainnet]}, - id: "listener_mainnet" - ), - Supervisor.child_spec({ListenerWorker, [network: :testnet, name: :listener_testnet]}, - id: "listener_testnet" - ), - Supervisor.child_spec({ListenerWorker, [network: :testnet2, name: :listener_testnet2]}, - id: "listener_testnet2" - ) - ] - else - [ - Supervisor.child_spec({ListenerWorker, [network: :mainnet, name: :listener_mainnet]}, - id: "listener_mainnet" - ), - Supervisor.child_spec({ListenerWorker, [network: :testnet, name: :listener_testnet]}, - id: "listener_testnet" - ), - Supervisor.child_spec({ListenerWorker, [network: :testnet2, name: :listener_testnet2]}, - id: "listener_testnet2" - ) - ] - end - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/lib/starknet_explorer/blockchain/listener_worker.ex b/lib/starknet_explorer/blockchain/listener_worker.ex deleted file mode 100644 index 48d8686a..00000000 --- a/lib/starknet_explorer/blockchain/listener_worker.ex +++ /dev/null @@ -1,74 +0,0 @@ -defmodule StarknetExplorer.Blockchain.ListenerWorker do - @moduledoc """ - Periodically fetches the latests block from the Starknet Blockchain and inserts it into the database. - """ - alias StarknetExplorer.{BlockUtils, Rpc, Blockchain.ListenerWorker} - defstruct [:latest_block_number, :network] - use GenServer - require Logger - @fetch_timer :timer.seconds(5) - - def start_link([network: _network, name: name] = arg) do - GenServer.start_link(__MODULE__, arg, name: name) - end - - ## Callbacks - - @impl true - def init([network: network, name: _name] = _args) do - {:ok, block_height} = Rpc.get_block_height_no_cache(network) - - state = %ListenerWorker{ - latest_block_number: block_height, - network: network - } - - Process.send_after(self(), :fetch_latest, @fetch_timer) - Logger.info("Listener enabled") - {:ok, state} - end - - @impl true - def handle_info(:fetch_latest, state = %ListenerWorker{network: network}) do - {:ok, new_block_height} = Rpc.get_block_height_no_cache(network) - new_blocks? = new_block_height > state.latest_block_number - - state = - try_fetch(new_blocks?, state) - - Process.send_after(self(), :fetch_latest, @fetch_timer) - {:noreply, state} - end - - def get_height(network_listener \\ :listener_mainnet) do - case Process.whereis(network_listener) do - nil -> - {:err, "Listener not enabled for that network."} - - _ -> - {:ok, GenServer.call(network_listener, :get_height)} - end - end - - @impl true - def handle_call(:get_height, _from, state) do - {:reply, state.latest_block_number, state} - end - - defp try_fetch(true, state = %ListenerWorker{network: network}) do - next_to_fetch = state.latest_block_number + 1 - - case BlockUtils.fetch_and_store(next_to_fetch, network) do - {:ok, _} -> - Logger.info("New block stored: #{next_to_fetch}") - %{state | latest_block_number: next_to_fetch} - - {:error, err} -> - Logger.error("[Block Listener] Error fetching latest block: #{inspect(err)}, retrying...") - - state - end - end - - defp try_fetch(_new_blocks?, state), do: state -end diff --git a/lib/starknet_explorer/blockchain/state_sync_system.ex b/lib/starknet_explorer/blockchain/state_sync_system.ex new file mode 100644 index 00000000..0b342074 --- /dev/null +++ b/lib/starknet_explorer/blockchain/state_sync_system.ex @@ -0,0 +1,114 @@ +defmodule StarknetExplorer.Blockchain.StateSyncSystem do + @moduledoc """ + State Sync System. + """ + alias StarknetExplorer.{Block, BlockUtils, Rpc, Blockchain.StateSyncSystem, Counts} + defstruct [:current_block_number, :network, :next_to_fetch, :updater_block_number] + use GenServer + require Logger + @fetch_timer :timer.seconds(5) + + def start_link([network: _network, name: name] = arg) do + GenServer.start_link(__MODULE__, arg, name: name) + end + + ## Callbacks + @impl true + def init([network: network, name: _name] = _args) do + {:ok, block_height} = BlockUtils.block_height(network) + {:ok, lowest_block_number} = BlockUtils.get_lowest_block_number(network) + + {:ok, lowest_not_finished_block_number} = + BlockUtils.get_lowest_not_completed_block(network) + + state = %StateSyncSystem{ + current_block_number: block_height, + next_to_fetch: lowest_block_number - 1, + updater_block_number: lowest_not_finished_block_number, + network: network + } + + Process.send_after(self(), :listener, @fetch_timer) + Process.send_after(self(), :fetcher, @fetch_timer) + Process.send_after(self(), :updater, @fetch_timer) + Logger.info("State Sync System enabled for network #{network}.") + {:ok, state} + end + + @impl true + def handle_info( + :updater, + state = %StateSyncSystem{network: network, updater_block_number: nil} + ) do + {:ok, lowest_not_completed_block} = Block.get_lowest_not_completed_block(network) + state = %{state | updater_block_number: lowest_not_completed_block} + + Process.send_after(self(), :updater, @fetch_timer) + {:noreply, state} + end + + @impl true + def handle_info( + :updater, + state = %StateSyncSystem{network: network, updater_block_number: updater_block_number} + ) do + {:ok, _} = BlockUtils.fetch_and_update(updater_block_number, network) + {:ok, lowest_not_completed_block} = BlockUtils.get_lowest_not_completed_block(network) + + state = %{ + state + | updater_block_number: lowest_not_completed_block + } + + Process.send_after(self(), :updater, @fetch_timer) + {:noreply, state} + end + + @impl true + def handle_info( + :listener, + state = %StateSyncSystem{network: network, current_block_number: current_block_number} + ) do + {:ok, new_block_height} = Rpc.get_block_height_no_cache(network) + new_blocks? = new_block_height > current_block_number + + state = + try_fetch(new_blocks?, state) + + Process.send_after(self(), :listener, @fetch_timer) + {:noreply, state} + end + + @impl true + def handle_info( + :fetcher, + state = %StateSyncSystem{network: network, next_to_fetch: next_to_fetch} + ) do + {:ok, _} = BlockUtils.fetch_and_store(next_to_fetch, network) + state = %{state | next_to_fetch: next_to_fetch - 1} + + Counts.insert_or_update(network) + + maybe_fetch_another(state) + + {:noreply, state} + end + + defp try_fetch(true, state = %StateSyncSystem{network: network}) do + next_to_fetch = state.current_block_number + 1 + + {:ok, _} = BlockUtils.fetch_and_store(next_to_fetch, network) + + Counts.insert_or_update(network) + + %{state | current_block_number: next_to_fetch} + end + + defp try_fetch(_new_blocks?, state), do: state + + # This means that we are fully syncd. + defp maybe_fetch_another(%StateSyncSystem{next_to_fetch: -1} = _args), do: :ok + + defp maybe_fetch_another(_), + do: Process.send_after(self(), :fetcher, @fetch_timer) +end diff --git a/lib/starknet_explorer/blockchain/updater_supervisor.ex b/lib/starknet_explorer/blockchain/updater_supervisor.ex deleted file mode 100644 index 2e70b53a..00000000 --- a/lib/starknet_explorer/blockchain/updater_supervisor.ex +++ /dev/null @@ -1,34 +0,0 @@ -defmodule StarknetExplorer.Blockchain.UpdaterSupervisor do - use Supervisor - alias StarknetExplorer.Blockchain.UpdaterWorker - - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) - end - - @impl true - def init(_init_arg) do - children = - if Application.get_env(:starknet_explorer, :env) == :prod do - [ - Supervisor.child_spec({UpdaterWorker, [network: :mainnet, name: :updater_mainnet]}, - id: "updater_mainnet" - ), - Supervisor.child_spec({UpdaterWorker, [network: :testnet, name: :updater_testnet]}, - id: "updater_testnet" - ), - Supervisor.child_spec({UpdaterWorker, [network: :testnet2, name: :updater_testnet2]}, - id: "updater_testnet2" - ) - ] - else - [ - Supervisor.child_spec({UpdaterWorker, [network: :mainnet, name: :updater_mainnet]}, - id: "updater_mainnet" - ) - ] - end - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/lib/starknet_explorer/blockchain/updater_worker.ex b/lib/starknet_explorer/blockchain/updater_worker.ex deleted file mode 100644 index 1fa527b7..00000000 --- a/lib/starknet_explorer/blockchain/updater_worker.ex +++ /dev/null @@ -1,172 +0,0 @@ -defmodule StarknetExplorer.Blockchain.UpdaterWorker do - require Logger - use GenServer - alias StarknetExplorer.TransactionReceipt - alias StarknetExplorer.Rpc - alias StarknetExplorer.Block - alias StarknetExplorer.Gateway - alias StarknetExplorer.Repo - alias StarknetExplorer.Blockchain.UpdaterWorker - defstruct [:network] - - @fetch_timer :timer.seconds(String.to_integer(System.get_env("UPDATER_FETCH_TIMER", "10"), 10)) - @limit 10 - - def start_link([network: _network, name: name] = arg) do - GenServer.start_link(__MODULE__, arg, name: name) - end - - ## Callbacks - - @impl true - def init([network: network, name: _name] = _args) do - state = %UpdaterWorker{ - network: network - } - - Process.send_after(self(), :update, @fetch_timer) - Logger.info("Updater enabled") - {:ok, state} - end - - @impl true - def handle_info(:update, state = %UpdaterWorker{network: network}) do - Logger.info("Updating...") - # Get whatever we need to update. - # Blocks: - # - if gas_fee or resources empty -> try update - # - if status not "accepted on l1" -> try update - # Transaction receipt: - # - if status not "accepted on l1" OR "reverted" -> try update - # - If transaction.status != "reverted" and transaction.receipt == None -> try update - - # REVISIT: - # LAST THREE CAN BE MERGED. - # IF A BLOCK IS FINALIZED -> TX SHOULD BE FINALIZED STATUS -> RECEIPT SHOULD EXIST OR NOT - - # - if gas_fee or resources empty -> try update - StarknetExplorer.Block.get_with_missing_gas_fees_or_resources(@limit, network) - |> tasks_for_fee_fetching(network) - |> Task.await_many() - |> Enum.map(&do_db_update_fee_and_resources(&1, network)) - - # - if status not "accepted on l1" -> try update - # We are retrieving from oldest to newest. - StarknetExplorer.Block.get_with_not_finalized_blocks(@limit, network) - |> tasks_for_not_finalized_blocks(network) - # |> Task.await_many() - |> Enum.map(&do_db_update_block_status(&1, network)) - - # - if status not "accepted on l1" OR "reverted" -> try update - StarknetExplorer.TransactionReceipt.get_status_not_finalized(@limit, network) - |> tasks_for_not_finalized_transactions(network) - |> Task.await_many() - |> Enum.map(&do_db_update_transaction_status(&1, network)) - - # - If transaction.receipt == None -> try update - StarknetExplorer.Transaction.get_missing_tx_receipt(@limit, network) - |> tasks_for_unexisting_receipts(network) - |> Task.await_many() - |> Enum.map(&do_db_insert_receipts(&1, network)) - - Process.send_after(self(), :update, @fetch_timer) - {:noreply, state} - end - - defp tasks_for_unexisting_receipts(transactions, network) do - Enum.map(transactions, &transaction_receipts_fetch_task(&1, network)) - end - - defp transaction_receipts_fetch_task(transaction = %StarknetExplorer.Transaction{}, network) do - Task.async(fn -> Rpc.get_transaction_receipt(transaction.hash, network) end) - end - - defp do_db_insert_receipts( - {:ok, tx_receipt}, - network - ) do - TransactionReceipt.from_rpc_tx(tx_receipt |> Map.put("network", network)) |> Repo.insert() - end - - defp do_db_insert_receipts({{:error, reason}, _tx_hash}, _network) do - Logger.error("Transaction receipt fetch failed with error: #{reason}") - end - - defp tasks_for_not_finalized_transactions(transactions, network) do - Enum.map(transactions, &transaction_fetch_task(&1, network)) - end - - defp transaction_fetch_task(transaction = %StarknetExplorer.TransactionReceipt{}, network) do - Task.async(fn -> - {Rpc.get_transaction_receipt(transaction.transaction_hash, network), - transaction.transaction_hash} - end) - end - - defp do_db_update_transaction_status( - {{:ok, - %{"finality_status" => finality_status, "execution_status" => execution_status} = - _status}, tx_hash}, - network - ) do - TransactionReceipt.update_transaction_status( - tx_hash, - finality_status, - execution_status, - network - ) - end - - defp do_db_update_transaction_status({{:error, reason}, _tx_hash}, _network) do - Logger.error("Transaction status fetch failed with error: #{reason["message"]}") - end - - defp tasks_for_not_finalized_blocks(blocks, network) do - Enum.map(blocks, fn block -> Rpc.get_block_by_number(block.number, network) end) - end - - defp do_db_update_block_status( - {:ok, %{"block_number" => block_number, "status" => status} = _block}, - network - ) do - Block.update_block_status(block_number, status, network) - end - - defp do_db_update_block_status({:error, reason}, _network) do - Logger.error("Block fetch failed with error: #{reason}") - end - - defp tasks_for_fee_fetching(blocks, network) when is_list(blocks) do - Enum.map(blocks, &fee_fetch_task(&1, network)) - end - - defp fee_fetch_task(block = %StarknetExplorer.Block{}, network) do - Task.async(fn -> fetch_gas_fee_and_resources(block, network) end) - end - - defp fetch_gas_fee_and_resources(block = %StarknetExplorer.Block{}, network) do - case Gateway.fetch_block(block.number, network) do - {:ok, gateway_block = %{"gas_price" => gas_price}} -> - execution_resources = - StarknetExplorer.BlockUtils.calculate_gateway_block_steps(gateway_block) - - {:ok, gas_price, execution_resources, block.number} - - err -> - {err, block.number} - end - end - - defp do_db_update_fee_and_resources( - {:ok, gas_price, execution_resources, block_number}, - network - ) do - Block.update_block_gas_and_resources(block_number, gas_price, execution_resources, network) - end - - defp do_db_update_fee_and_resources({{:error, reason}, block_number}, network) do - Logger.error( - "Error fetching gas for block #{block_number}: #{inspect(reason)}, in network #{network}" - ) - end -end diff --git a/lib/starknet_explorer/cache.ex b/lib/starknet_explorer/cache.ex new file mode 100644 index 00000000..227a5541 --- /dev/null +++ b/lib/starknet_explorer/cache.ex @@ -0,0 +1,44 @@ +defmodule StarknetExplorer.IndexCache do + use Agent + + def start_link(_) do + blocks_mainnet = StarknetExplorer.Data.many_blocks("mainnet") + blocks_testnet = StarknetExplorer.Data.many_blocks("testnet") + blocks_testnet2 = StarknetExplorer.Data.many_blocks("testnet2") + + Agent.start_link( + fn -> + %{ + "mainnet" => blocks_mainnet, + "testnet" => blocks_testnet, + "testnet2" => blocks_testnet2 + } + end, + name: __MODULE__ + ) + end + + def latest_blocks(network) do + Agent.get(__MODULE__, fn state -> state[Atom.to_string(network)] end) + end + + def add_block(block_number, network) do + Agent.update(__MODULE__, fn state -> + ## We fetch from the database here just to make the map keys atoms instead of strings + ## Yes, really. + network_string = Atom.to_string(network) + block = block_number |> StarknetExplorer.Block.get_by_num(network_string) + new_blocks = state[network_string] |> List.insert_at(0, block) |> maybe_delete_last_block() + + Map.put(state, network_string, new_blocks) + end) + end + + defp maybe_delete_last_block(blocks) do + if length(blocks) > 15 do + List.delete_at(blocks, length(blocks) - 1) + else + blocks + end + end +end diff --git a/lib/starknet_explorer/counts.ex b/lib/starknet_explorer/counts.ex new file mode 100644 index 00000000..738c53bc --- /dev/null +++ b/lib/starknet_explorer/counts.ex @@ -0,0 +1,53 @@ +defmodule StarknetExplorer.Counts do + use Ecto.Schema + import Ecto.Query + alias StarknetExplorer.Events + alias StarknetExplorer.Message + alias StarknetExplorer.Transaction + alias StarknetExplorer.{Counts, BlockUtils} + alias StarknetExplorer.Repo + + @primary_key {:network, :string, autogenerate: false} + schema "counts" do + field :blocks, :integer + field :transactions, :integer + field :messages, :integer + field :events, :integer + end + + def get(network) when is_atom(network) do + from(count in __MODULE__, where: count.network == ^Atom.to_string(network)) + |> Repo.one() + end + + def get(network) do + from(count in __MODULE__, where: count.network == ^network) + |> Repo.one() + end + + def insert_or_update(network) do + {:ok, blocks} = BlockUtils.block_height(network) + transactions = Transaction.get_total_count(network) + messages = Message.get_total_count(network) + events = Events.get_total_count(network) + + case Repo.get_by(Counts, network: Atom.to_string(network)) do + # Count exists, let's use it + %Counts{} = count -> + count + + # Count not found, we build one + nil -> + %Counts{ + network: Atom.to_string(network) + } + end + |> Ecto.Changeset.change( + blocks: blocks, + transactions: transactions, + messages: messages, + events: events + ) + |> Repo.insert_or_update() + end +end diff --git a/lib/starknet_explorer/data.ex b/lib/starknet_explorer/data.ex index 1bb2882e..c3d80c7c 100644 --- a/lib/starknet_explorer/data.ex +++ b/lib/starknet_explorer/data.ex @@ -235,11 +235,19 @@ defmodule StarknetExplorer.Data do [] end - def get_entity_count(_network) do - Map.new() - # |> Map.put(:message_count, Message.get_total_count(network)) - |> Map.put(:message_count, 21021) - |> Map.put(:events_count, 6_030_904) - |> Map.put(:transaction_count, 1_209_670) + def get_entity_count(network) do + counts = StarknetExplorer.Counts.get(network) + + if counts do + Map.new() + |> Map.put(:message_count, counts.messages) + |> Map.put(:events_count, counts.events) + |> Map.put(:transaction_count, counts.transactions) + else + Map.new() + |> Map.put(:message_count, 0) + |> Map.put(:events_count, 0) + |> Map.put(:transaction_count, 0) + end end end diff --git a/lib/starknet_explorer/transaction.ex b/lib/starknet_explorer/transaction.ex index 7d617e14..c2d77625 100644 --- a/lib/starknet_explorer/transaction.ex +++ b/lib/starknet_explorer/transaction.ex @@ -77,8 +77,8 @@ defmodule StarknetExplorer.Transaction do @networks [:mainnet, :testnet, :testnet2] + @primary_key {:hash, :string, []} schema "transactions" do - field :hash, :string field :constructor_calldata, {:array, :string} field :class_hash, :string field :type, :string diff --git a/lib/starknet_explorer/transaction_receipt.ex b/lib/starknet_explorer/transaction_receipt.ex index a22c7d24..5b969960 100644 --- a/lib/starknet_explorer/transaction_receipt.ex +++ b/lib/starknet_explorer/transaction_receipt.ex @@ -89,9 +89,9 @@ defmodule StarknetExplorer.TransactionReceipt do @fields @invoke_tx_receipt_fields ++ @l1_receipt_handler ++ @declare_tx_receipt ++ @deploy_account_tx_receipt ++ [:network, :execution_resources] + @primary_key {:transaction_hash, :string, []} schema "transaction_receipts" do - belongs_to :transaction, Transaction - field :transaction_hash + belongs_to :transaction, Transaction, references: :hash field :type, :string field :actual_fee, :string field :finality_status, :string @@ -143,12 +143,12 @@ defmodule StarknetExplorer.TransactionReceipt do Repo.all(query) end - def get_status_not_finalized(limit \\ 10, network) do + def get_status_not_finalized(block_number, network) do query = from t in TransactionReceipt, where: t.finality_status != "ACCEPTED_ON_L1" and t.execution_status == "SUCCEEDED", - where: t.network == ^network, - limit: ^limit + # TODO add index in block_number. + where: t.network == ^network and t.block_number == ^block_number Repo.all(query) end diff --git a/lib/starknet_explorer_web/live/message_live.ex b/lib/starknet_explorer_web/live/message_live.ex index d9264878..20e447b2 100644 --- a/lib/starknet_explorer_web/live/message_live.ex +++ b/lib/starknet_explorer_web/live/message_live.ex @@ -23,7 +23,6 @@ defmodule StarknetExplorerWeb.MessageDetailLive do def mount(_params = %{"identifier" => hash}, _session, socket) do message = Message.get_by_hash(hash, socket.assigns.network) - IO.inspect(message) message = case Message.is_l2_to_l1(message) do diff --git a/lib/starknet_explorer_web/live/pages/home/index.ex b/lib/starknet_explorer_web/live/pages/home/index.ex index 8f163f02..ac537278 100644 --- a/lib/starknet_explorer_web/live/pages/home/index.ex +++ b/lib/starknet_explorer_web/live/pages/home/index.ex @@ -2,6 +2,7 @@ defmodule StarknetExplorerWeb.HomeLive.Index do alias StarknetExplorerWeb.Component.TransactionsPerSecond, as: TPSComponent alias StarknetExplorerWeb.CoreComponents alias StarknetExplorerWeb.Utils + alias StarknetExplorer.IndexCache use Phoenix.Component use StarknetExplorerWeb, :live_view @@ -258,7 +259,12 @@ defmodule StarknetExplorerWeb.HomeLive.Index do end def load_blocks(socket) do - blocks = StarknetExplorer.Data.many_blocks(socket.assigns.network) + blocks = + if length(IndexCache.latest_blocks(socket.assigns.network)) < 15 do + StarknetExplorer.Data.many_blocks(socket.assigns.network) + else + IndexCache.latest_blocks(socket.assigns.network) + end case List.first(blocks) do nil -> @@ -291,19 +297,7 @@ defmodule StarknetExplorerWeb.HomeLive.Index do end) |> Map.new() - max_block_height = - case StarknetExplorer.Blockchain.ListenerWorker.get_height( - StarknetExplorer.Utils.listener_atom(socket.assigns.network) - ) do - {:ok, max_block_height} -> - max_block_height - - {:err, _} -> - {:ok, max_block_height} = - StarknetExplorer.Rpc.get_block_height(socket.assigns.network) - - max_block_height - end + {:ok, max_block_height} = StarknetExplorer.BlockUtils.block_height(socket.assigns.network) assign(socket, blocks: blocks, diff --git a/lib/starknet_rpc.ex b/lib/starknet_rpc.ex index e0c24503..5b966409 100644 --- a/lib/starknet_rpc.ex +++ b/lib/starknet_rpc.ex @@ -95,20 +95,13 @@ defmodule StarknetExplorer.Rpc do end defp send_request_no_cache(method, args, network) - when network in [:testnet, :testnet2] do + when network in [:mainnet, :testnet, :testnet2] do payload = build_payload(method, args) host = fetch_rpc_host(network) {:ok, rsp} = post(host, payload) handle_response(rsp) end - defp send_request_no_cache(method, args, _network) do - payload = build_payload(method, args) - host = fetch_rpc_host(:mainnet) - {:ok, rsp} = post(host, payload) - handle_response(rsp) - end - defp fetch_rpc_host(:mainnet), do: Application.fetch_env!(:starknet_explorer, :rpc_host) defp fetch_rpc_host(:testnet), do: Application.fetch_env!(:starknet_explorer, :testnet_host) defp fetch_rpc_host(:testnet2), do: Application.fetch_env!(:starknet_explorer, :testnet_2_host) diff --git a/mix.exs b/mix.exs index 25fe987c..0beff287 100644 --- a/mix.exs +++ b/mix.exs @@ -31,13 +31,6 @@ defmodule StarknetExplorer.MixProject do # # Type `mix help deps` for examples and options. defp deps do - db_adapter = - if System.get_env("DB_TYPE") == "postgresql" do - [{:postgrex, ">= 0.0.0"}] - else - [{:ecto_sqlite3, ">= 0.0.0"}] - end - [ {:phoenix, "~> 1.7.6"}, {:phoenix_ecto, "~> 4.4"}, @@ -69,8 +62,10 @@ defmodule StarknetExplorer.MixProject do {:sentry, "~> 8.0"}, {:etop, "~> 0.7"}, {:rexbug, ">= 1.0.0"}, - {:eep, github: "virtan/eep"} - ] ++ db_adapter + {:eep, github: "virtan/eep"}, + {:postgrex, ">= 0.0.0"}, + {:ecto_sqlite3, ">= 0.0.0"} + ] end # Aliases are shortcuts or tasks specific to the current project. diff --git a/priv/repo/migrations/20230704215332_transactions.exs b/priv/repo/migrations/20230704215332_transactions.exs index 94f4ba49..4240eb6a 100644 --- a/priv/repo/migrations/20230704215332_transactions.exs +++ b/priv/repo/migrations/20230704215332_transactions.exs @@ -2,7 +2,7 @@ defmodule StarknetExplorer.Repo.Migrations.Transactions do use Ecto.Migration def change do - create table("transactions") do + create table("transactions", primary_key: false) do add :block_number, references("blocks", on_delete: :delete_all, @@ -22,7 +22,7 @@ defmodule StarknetExplorer.Repo.Migrations.Transactions do add :compiled_class_hash, :string add :sender_address, :string add :entry_point_selector, :string - add :hash, :string + add :hash, :string, primary_key: true add :max_fee, :string add :nonce, :string add :signature, {:array, :string} diff --git a/priv/repo/migrations/20230707152800_transaction_receipts.exs b/priv/repo/migrations/20230707152800_transaction_receipts.exs index d4320997..7cf3f4d9 100644 --- a/priv/repo/migrations/20230707152800_transaction_receipts.exs +++ b/priv/repo/migrations/20230707152800_transaction_receipts.exs @@ -2,10 +2,11 @@ defmodule StarknetExplorer.Repo.Migrations.TransactionReceipts do use Ecto.Migration def change do - create table("transaction_receipts") do + create table("transaction_receipts", primary_key: false) do add :transaction_id, - references(:transactions) + references(:transactions, column: :hash, type: :string) + add :transaction_hash, :string, primary_key: true add :type, :string add :actual_fee, :string add :finality_status, :string diff --git a/priv/repo/migrations/20230711175239_add_receipts_missing_fields.exs b/priv/repo/migrations/20230711175239_add_receipts_missing_fields.exs index 5ab18835..5358960d 100644 --- a/priv/repo/migrations/20230711175239_add_receipts_missing_fields.exs +++ b/priv/repo/migrations/20230711175239_add_receipts_missing_fields.exs @@ -3,7 +3,6 @@ defmodule StarknetExplorer.Repo.Migrations.AddReceiptsMissingFields do def change do alter table("transaction_receipts") do - add :transaction_hash, :string remove :messages end end diff --git a/priv/repo/migrations/20230927174125_add_entity_count_table.exs b/priv/repo/migrations/20230927174125_add_entity_count_table.exs new file mode 100644 index 00000000..9fac06cb --- /dev/null +++ b/priv/repo/migrations/20230927174125_add_entity_count_table.exs @@ -0,0 +1,13 @@ +defmodule StarknetExplorer.Repo.Migrations.AddEntityCountTable do + use Ecto.Migration + + def change do + create table("counts", primary_key: false) do + add :blocks, :integer + add :transactions, :integer + add :messages, :integer + add :events, :integer + add :network, :string, primary_key: true, null: false + end + end +end