Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer FPRule counter updates during normalization #2007

Open
wants to merge 2 commits into
base: qa/1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 75 additions & 7 deletions src/MCPClient/lib/clientScripts/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
import shutil
import traceback
import uuid
from collections import defaultdict
from types import TracebackType
from typing import Callable
from typing import DefaultDict
from typing import Dict
from typing import List
from typing import Optional
from typing import Type

import django
import transcoder
Expand All @@ -20,17 +25,21 @@
import databaseFunctions
import fileOperations
from client.job import Job
from custom_handlers import get_script_logger
from dicts import ReplacementDict
from django.conf import settings as mcpclient_settings
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import F
from fpr.models import FPRule
from lib import setup_dicts
from main.models import Derivation
from main.models import File
from main.models import FileFormatVersion
from main.models import FileID

logger = get_script_logger("archivematica.mcp.client.normalize")

# Return codes
SUCCESS = 0
RULE_FAILED = 1
Expand Down Expand Up @@ -355,7 +364,59 @@
return FPRule.active.get(purpose="default_" + purpose)


def main(job: Job, opts: NormalizeArgs) -> int:
class DeferredFPRuleCounter:
"""Deferred counter for FPRule attempts, successes, and failures.

This class postpones database writes to aggregate updates and minimize the
duration of transactions, which is beneficial when dealing with long-running
batches.
"""

def __init__(self) -> None:
self._counters: DefaultDict[uuid.UUID, Dict[str, int]] = defaultdict(
lambda: {"count_attempts": 0, "count_okay": 0, "count_not_okay": 0}
)

def __enter__(self) -> "DeferredFPRuleCounter":
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
try:
self.save()
except Exception as err:
logger.error("Failed to save counters: %s", err, exc_info=True)

Check warning on line 392 in src/MCPClient/lib/clientScripts/normalize.py

View check run for this annotation

Codecov / codecov/patch

src/MCPClient/lib/clientScripts/normalize.py#L391-L392

Added lines #L391 - L392 were not covered by tests

def record_attempt(self, fprule: FPRule) -> None:
self._counters[fprule.uuid]["count_attempts"] += 1

def record_success(self, fprule: FPRule) -> None:
self._counters[fprule.uuid]["count_okay"] += 1

def record_failure(self, fprule: FPRule) -> None:
self._counters[fprule.uuid]["count_not_okay"] += 1

def save(self) -> None:
"""Persist all aggregated FPRule counters in a single transaction.

This method updates the success and failure rates of FPRules by
incrementing their respective counters. It uses Django's F() expressions
to ensure atomic updates and prevent race conditions.
"""
with transaction.atomic():
for fprule_id, increments in self._counters.items():
FPRule.objects.filter(uuid=fprule_id).update(
count_attempts=F("count_attempts") + increments["count_attempts"],
count_okay=F("count_okay") + increments["count_okay"],
count_not_okay=F("count_not_okay") + increments["count_not_okay"],
)


def main(job: Job, opts: NormalizeArgs, counter: DeferredFPRuleCounter) -> int:
"""Find and execute normalization commands on input file."""
# TODO fix for maildir working only on attachments

Expand Down Expand Up @@ -489,7 +550,13 @@

replacement_dict = get_replacement_dict(job, opts)
cl = transcoder.CommandLinker(
job, rule, command, replacement_dict, opts, once_normalized_callback(job)
job,
rule,
command,
replacement_dict,
opts,
once_normalized_callback(job),
counter,
)
exitstatus = cl.execute()

Expand All @@ -506,8 +573,8 @@
if (
exitstatus != 0
and opts.purpose in ("access", "thumbnail")
and cl.commandObject.output_location
and (not os.path.isfile(cl.commandObject.output_location))
and cl.output_location
and (not os.path.isfile(cl.output_location))
):
# Fall back to default rule
try:
Expand Down Expand Up @@ -540,13 +607,14 @@
replacement_dict,
opts,
once_normalized_callback(job),
counter,
)
exitstatus = cl.execute()

# Store thumbnails locally for use during AIP searches
# TODO is this still needed, with the storage service?
if "thumbnail" in opts.purpose:
thumbnail_filepath = cl.commandObject.output_location
thumbnail_filepath = cl.output_location
thumbnail_storage_dir = os.path.join(
mcpclient_settings.SHARED_DIRECTORY, "www", "thumbnails", opts.sip_uuid
)
Expand Down Expand Up @@ -611,7 +679,7 @@
def call(jobs: List[Job]) -> None:
parser = get_parser()

with transaction.atomic():
with DeferredFPRuleCounter() as counter, transaction.atomic():
for job in jobs:
with job.JobContext():
opts = parse_args(parser, job)
Expand All @@ -625,7 +693,7 @@
continue

try:
job.set_status(main(job, opts))
job.set_status(main(job, opts, counter))
except Exception as e:
job.print_error(str(e))
job.set_status(1)
33 changes: 14 additions & 19 deletions src/MCPClient/lib/clientScripts/transcoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#
# You should have received a copy of the GNU General Public License
# along with Archivematica. If not, see <http://www.gnu.org/licenses/>.
from django.db.models import F
from executeOrRunSubProcess import executeOrRun


Expand Down Expand Up @@ -109,32 +108,28 @@


class CommandLinker:
def __init__(self, job, fprule, command, replacement_dict, opts, on_success):
def __init__(
self, job, fprule, command, replacement_dict, opts, on_success, counter
):
self.fprule = fprule
self.command = command
self.replacement_dict = replacement_dict
self.opts = opts
self.on_success = on_success
self.commandObject = Command(
job, self.command, replacement_dict, self.on_success, opts
)
self.cmd = Command(job, command, replacement_dict, on_success, opts)
self.counter = counter

def __str__(self):
return (
f"[Command Linker] FPRule: {self.fprule.uuid} Command: {self.commandObject}"
)
return f"[Command Linker] FPRule: {self.fprule.uuid} Command: {self.cmd}"

Check warning on line 119 in src/MCPClient/lib/clientScripts/transcoder.py

View check run for this annotation

Codecov / codecov/patch

src/MCPClient/lib/clientScripts/transcoder.py#L119

Added line #L119 was not covered by tests

@property
def output_location(self):
return self.cmd.output_location

def execute(self):
"""Execute the command, and track the success statistics.

Returns 0 on success, non-0 on failure."""
# Track success/failure rates of FP Rules
# Use Django's F() to prevent race condition updating the counts
self.fprule.count_attempts = F("count_attempts") + 1
ret = self.commandObject.execute()
self.counter.record_attempt(self.fprule)
ret = self.cmd.execute()
if ret:
self.fprule.count_not_okay = F("count_not_okay") + 1
self.counter.record_failure(self.fprule)
else:
self.fprule.count_okay = F("count_okay") + 1
self.fprule.save()
self.counter.record_success(self.fprule)
return ret
27 changes: 18 additions & 9 deletions tests/MCPClient/test_normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def test_normalization_fails_if_original_file_does_not_exist() -> None:
job = mock.Mock(spec=Job)
opts = mock.Mock(file_uuid=file_uuid)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.NO_RULE_FOUND
job.print_error.assert_called_once_with(
Expand All @@ -67,7 +68,8 @@ def test_normalization_skips_submission_documentation_file_if_group_use_does_not
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_output.mock_calls == [
Expand All @@ -93,7 +95,8 @@ def test_normalization_skips_file_if_group_use_does_not_match(
normalize_file_grp_use="access",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_output.mock_calls == [
Expand Down Expand Up @@ -178,7 +181,8 @@ def test_manual_normalization_creates_event_and_derivation(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_output.mock_calls == [
Expand Down Expand Up @@ -250,7 +254,8 @@ def test_manual_normalization_fails_with_invalid_normalization_csv(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.NO_RULE_FOUND
assert job.print_error.mock_calls == [
Expand Down Expand Up @@ -297,7 +302,8 @@ def test_manual_normalization_matches_by_filename_instead_of_normalization_csv(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_error.mock_calls == []
Expand Down Expand Up @@ -350,7 +356,8 @@ def test_manual_normalization_matches_from_multiple_filenames(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_error.mock_calls == []
Expand Down Expand Up @@ -413,7 +420,8 @@ def test_normalization_falls_back_to_default_rule(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
command_linker.assert_called_once()
Expand Down Expand Up @@ -474,7 +482,8 @@ def test_normalization_finds_rule_by_file_format_version(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
command_linker.assert_called_once()
Expand Down