Skip to content

Commit

Permalink
Merge pull request #1138 from BoostryJP/fix/#1137_22_3
Browse files Browse the repository at this point in the history
fix: [BUG] processing block is skipped when web3 service is not available during Indexer execution
  • Loading branch information
YoshihitoAso authored May 17, 2022
2 parents 5aede17 + bc3bac3 commit 98ce4bb
Show file tree
Hide file tree
Showing 17 changed files with 2,154 additions and 257 deletions.
8 changes: 4 additions & 4 deletions app/contracts/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from eth_utils import to_checksum_address
from web3 import contract
from web3.exceptions import BadFunctionCallOutput
from web3.exceptions import BadFunctionCallOutput, ABIFunctionNotFound

from app.utils.web3_utils import Web3Wrapper

Expand Down Expand Up @@ -104,14 +104,14 @@ def call_function(contract: contract,
:param default_returns: Default return when BadFunctionCallOutput is raised
:return: Return from function or default return
"""
_function = getattr(contract.functions, function_name)

try:
_function = getattr(contract.functions, function_name)
result = _function(*args).call()
except BadFunctionCallOutput:
except (BadFunctionCallOutput, ABIFunctionNotFound) as web3_exception:
if default_returns is not None:
return default_returns
else:
raise BadFunctionCallOutput
raise web3_exception

return result
24 changes: 20 additions & 4 deletions batch/indexer_Consume_Coupon.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from web3.exceptions import ABIEventFunctionNotFound

path = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(path)
Expand Down Expand Up @@ -58,9 +59,9 @@

class Processor:
"""Processor for indexing coupon consumption events"""
latest_block = 0

def __init__(self):
self.latest_block = web3.eth.blockNumber
self.token_list = []

@staticmethod
Expand All @@ -69,6 +70,8 @@ def __get_db_session():

def initial_sync(self):
local_session = self.__get_db_session()
latest_block_at_start = self.latest_block
self.latest_block = web3.eth.blockNumber
try:
self.__get_token_list(local_session)

Expand All @@ -90,21 +93,26 @@ def initial_sync(self):
block_from=_from_block,
block_to=self.latest_block
)
local_session.commit()
else:
self.__sync_all(
db_session=local_session,
block_from=_from_block,
block_to=self.latest_block
)
local_session.commit()
local_session.commit()
except Exception as e:
LOG.exception("An exception occurred during event synchronization")
local_session.rollback()
self.latest_block = latest_block_at_start
raise e
finally:
local_session.close()

LOG.info(f"<{process_name}> Initial sync has been completed")

def sync_new_logs(self):
local_session = self.__get_db_session()
latest_block_at_start = self.latest_block
try:
self.__get_token_list(local_session)

Expand All @@ -119,6 +127,11 @@ def sync_new_logs(self):
)
self.latest_block = blockTo
local_session.commit()
except Exception as e:
LOG.exception("An exception occurred during event synchronization")
local_session.rollback()
self.latest_block = latest_block_at_start
raise e
finally:
local_session.close()

Expand Down Expand Up @@ -154,6 +167,9 @@ def __sync_consume(self, db_session: Session, block_from: int, block_to: int):
fromBlock=block_from,
toBlock=block_to
)
except ABIEventFunctionNotFound:
events = []
try:
for event in events:
args = event["args"]
transaction_hash = event["transactionHash"].hex()
Expand All @@ -176,7 +192,7 @@ def __sync_consume(self, db_session: Session, block_from: int, block_to: int):
block_timestamp=block_timestamp
)
except Exception as e:
LOG.exception(e)
raise e

@staticmethod
def __sink_on_consume_coupon(db_session: Session,
Expand Down
46 changes: 39 additions & 7 deletions batch/indexer_DEX.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from web3.exceptions import ABIEventFunctionNotFound

path = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(path)
Expand Down Expand Up @@ -61,6 +62,7 @@

class Processor:
"""Processor for indexing IbetExchange events"""
latest_block = 0

def __init__(self):
self.exchange_list = []
Expand Down Expand Up @@ -92,14 +94,15 @@ def __init__(self):
IBET_SHARE_EXCHANGE_CONTRACT_ADDRESS
)
self.exchange_list.append(share_exchange_contract)
self.latest_block = web3.eth.blockNumber

@staticmethod
def __get_db_session():
return Session(autocommit=False, autoflush=True, bind=db_engine)

def initial_sync(self):
local_session = self.__get_db_session()
latest_block_at_start = self.latest_block
self.latest_block = web3.eth.blockNumber
try:
# Synchronize 1,000,000 blocks each
_to_block = 999999
Expand All @@ -125,12 +128,18 @@ def initial_sync(self):
block_to=self.latest_block
)
local_session.commit()
except Exception as e:
LOG.exception("An exception occurred during event synchronization")
local_session.rollback()
self.latest_block = latest_block_at_start
raise e
finally:
local_session.close()
LOG.info(f"<{process_name}> Initial sync has been completed")

def sync_new_logs(self):
local_session = self.__get_db_session()
latest_block_at_start = self.latest_block
try:
blockTo = web3.eth.blockNumber
if blockTo == self.latest_block:
Expand All @@ -142,6 +151,11 @@ def sync_new_logs(self):
)
self.latest_block = blockTo
local_session.commit()
except Exception as e:
LOG.exception("An exception occurred during event synchronization")
local_session.rollback()
self.latest_block = latest_block_at_start
raise e
finally:
local_session.close()

Expand All @@ -161,6 +175,9 @@ def __sync_new_order(self, db_session: Session, block_from: int, block_to: int):
fromBlock=block_from,
toBlock=block_to
)
except ABIEventFunctionNotFound:
events = []
try:
for event in events:
args = event["args"]
if args["price"] > sys.maxsize or args["amount"] > sys.maxsize:
Expand Down Expand Up @@ -194,7 +211,7 @@ def __sync_new_order(self, db_session: Session, block_from: int, block_to: int):
order_timestamp=order_timestamp
)
except Exception as e:
LOG.exception(e)
raise e

def __sync_cancel_order(self, db_session: Session, block_from, block_to):
for exchange_contract in self.exchange_list:
Expand All @@ -203,14 +220,17 @@ def __sync_cancel_order(self, db_session: Session, block_from, block_to):
fromBlock=block_from,
toBlock=block_to
)
except ABIEventFunctionNotFound:
events = []
try:
for event in events:
self.__sink_on_cancel_order(
db_session=db_session,
exchange_address=exchange_contract.address,
order_id=event["args"]["orderId"]
)
except Exception as e:
LOG.exception(e)
raise e

def __sync_force_cancel_order(self, db_session: Session, block_from, block_to):
for exchange_contract in self.exchange_list:
Expand All @@ -219,14 +239,17 @@ def __sync_force_cancel_order(self, db_session: Session, block_from, block_to):
fromBlock=block_from,
toBlock=block_to
)
except ABIEventFunctionNotFound:
events = []
try:
for event in events:
self.__sink_on_force_cancel_order(
db_session=db_session,
exchange_address=exchange_contract.address,
order_id=event["args"]["orderId"]
)
except Exception as e:
LOG.exception(e)
raise e

def __sync_agree(self, db_session: Session, block_from, block_to):
for exchange_contract in self.exchange_list:
Expand All @@ -235,6 +258,9 @@ def __sync_agree(self, db_session: Session, block_from, block_to):
fromBlock=block_from,
toBlock=block_to
)
except ABIEventFunctionNotFound:
events = []
try:
for event in events:
args = event["args"]
if args["amount"] > sys.maxsize:
Expand Down Expand Up @@ -269,7 +295,7 @@ def __sync_agree(self, db_session: Session, block_from, block_to):
agreement_timestamp=agreement_timestamp
)
except Exception as e:
LOG.exception(e)
raise e

def __sync_settlement_ok(self, db_session: Session, block_from, block_to):
for exchange_contract in self.exchange_list:
Expand All @@ -278,6 +304,9 @@ def __sync_settlement_ok(self, db_session: Session, block_from, block_to):
fromBlock=block_from,
toBlock=block_to
)
except ABIEventFunctionNotFound:
events = []
try:
for event in events:
args = event["args"]
settlement_timestamp = datetime.fromtimestamp(
Expand All @@ -292,7 +321,7 @@ def __sync_settlement_ok(self, db_session: Session, block_from, block_to):
settlement_timestamp=settlement_timestamp
)
except Exception as e:
LOG.exception(e)
raise e

def __sync_settlement_ng(self, db_session: Session, block_from, block_to):
for exchange_contract in self.exchange_list:
Expand All @@ -301,6 +330,9 @@ def __sync_settlement_ng(self, db_session: Session, block_from, block_to):
fromBlock=block_from,
toBlock=block_to
)
except ABIEventFunctionNotFound:
events = []
try:
for event in events:
args = event["args"]
self.__sink_on_settlement_ng(
Expand All @@ -310,7 +342,7 @@ def __sync_settlement_ng(self, db_session: Session, block_from, block_to):
agreement_id=args["agreementId"]
)
except Exception as e:
LOG.exception(e)
raise e

def __sink_on_new_order(self,
db_session: Session,
Expand Down
Loading

0 comments on commit 98ce4bb

Please sign in to comment.