Skip to content

Commit

Permalink
Restructure _handle_escrow_validations as _handle_escrow_validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Bobronium committed Sep 25, 2024
1 parent 45e44fa commit 3d3f843
Showing 1 changed file with 26 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _download_with_retries(
return None


def _handle_completed_escrow(
def _export_escrow_annotations(
logger: logging.Logger,
chain_id: int,
escrow_address: str,
Expand Down Expand Up @@ -234,35 +234,33 @@ def _download_job_annotations(
return job_annotations


def _handle_escrow_validations(
def _handle_escrow_validation(
logger: logging.Logger,
escrow_validations: Sequence[tuple[str, str, int]],
handled_validations: set[str],
):
for validation_id, escrow_address, chain_id in escrow_validations:
escrow_address: str,
chain_id: int,
) -> bool:
try:
validate_escrow(chain_id, escrow_address)
except Exception as e:
logger.exception(f"Failed to handle completed projects for escrow {escrow_address}: {e}")
return False

# Need to work in separate transactions for each escrow, as a failing DB call
# (e.g. a failed lock attempt) will abort the transaction. A nested transaction
# can also be used for handling this.
with SessionLocal.begin() as session:
try:
validate_escrow(chain_id, escrow_address)
except Exception as e:
logger.exception(
f"Failed to handle completed projects for escrow {escrow_address}: {e}"
escrow_projects = cvat_service.get_projects_by_escrow_address(
session, escrow_address, limit=None, for_update=ForUpdateParams(nowait=True)
)
continue
except sa_errors.OperationalError as ex:
if isinstance(ex.orig, db_errors.LockNotAvailable):
return False
raise

# Need to work in separate transactions for each escrow, as a failing DB call
# (e.g. a failed lock attempt) will abort the transaction. A nested transaction
# can also be used for handling this.
with SessionLocal.begin() as session:
try:
escrow_projects = cvat_service.get_projects_by_escrow_address(
session, escrow_address, limit=None, for_update=ForUpdateParams(nowait=True)
)
except sa_errors.OperationalError as ex:
if isinstance(ex.orig, db_errors.LockNotAvailable):
continue
raise
_export_escrow_annotations(logger, chain_id, escrow_address, escrow_projects, session)

_handle_completed_escrow(logger, chain_id, escrow_address, escrow_projects, session)
handled_validations.add(validation_id)
return True


def handle_escrows_validations(logger: logging.Logger) -> None:
Expand All @@ -275,7 +273,9 @@ def handle_escrows_validations(logger: logging.Logger) -> None:
unhandled_validations = set(validation_id for validation_id, _, __ in escrow_validations)
handled_validations = set()
try:
_handle_escrow_validations(logger, escrow_validations, handled_validations)
for validation_id, escrow_address, chain_id in escrow_validations:
if _handle_escrow_validation(logger, escrow_address, chain_id):
handled_validations.add(validation_id)
finally:
with SessionLocal.begin() as session:
cvat_service.update_escrow_validation_statuses_by_ids(
Expand Down

0 comments on commit 3d3f843

Please sign in to comment.