Decentralise duplicate detection
Previously we detected duplicate samples upon job submission. This was a
very intricate process covering two stages of detection: local
duplicates and other Peekaboo instances in a cluster analysing the same
sample concurrently. Apart from being hard to understand and maintain,
it was inefficient for analyses which didn't involve any expensive
operations such as offloading a job to Cuckoo. This degraded into a
downright throughput bottleneck for analyses of large numbers (> 10000)
of non-identical samples which are eventually ignored.

This change moves duplicate handling out of the queueing module into a
new duplicate toolbox module. Duplicate detection is moved into
individual rules. Resubmission of withheld samples is done in the worker
at the end of ruleset processing, after the processing result has been
saved to the database.
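
As a rough sketch of the new flow, a rule guarding an expensive analysis
can first ask the duplicate handler whether an identical sample is
already in flight and withhold the current one if so. All names below
(ExpensiveRule, DuplicateHandler interface, is_duplicate(), analyse())
are illustrative assumptions, not the actual toolbox API:

# sketch only: the duplicate handler interface used here is an
# assumption, not the actual peekaboo.toolbox module
class ExpensiveRule:
    """ Illustrative rule that offloads samples to a long-running
    analyser such as Cuckoo. """
    def __init__(self, duplicate_handler, analyzer):
        self.duplicate_handler = duplicate_handler
        self.analyzer = analyzer

    async def evaluate(self, sample):
        # withhold this sample if an identical one is already in
        # analysis; the worker resubmits it once that analysis has
        # finished and its result has been saved to the database
        if await self.duplicate_handler.is_duplicate(sample):
            return None  # deferred; real rules return a proper verdict

        # no identical sample in flight: run the expensive analysis
        return await self.analyzer.analyse(sample)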

Handling of local and cluster duplicates is strictly separated. While
that does not make the actual code much easier to understand and
maintain, the underlying concepts at least are somewhat untangled.

The cluster duplicate handler stays mostly the same, primarily
consisting of a coroutine which periodically tries to lock samples from
its backlog and then submits them to the local queue.
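
Schematically the polling loop still looks like the previous
implementation; where the helper methods now live (here assumed to be on
the handler itself and the database connection) is an assumption of this
sketch:

import asyncio

class ClusterDuplicateHandler:
    """ Polling-loop sketch modelled on the previous implementation. """
    def __init__(self, db_con, interval=5):
        self.db_con = db_con
        self.interval = interval

    async def run(self):
        while True:
            await asyncio.sleep(self.interval)
            # drop stale in-flight markers left behind by crashed
            # instances
            await self.db_con.clear_stale_in_flight_samples()
            # try to lock samples from the backlog and submit them to
            # the local job queue (method assumed, not shown here)
            await self.submit_cluster_duplicates()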

The local duplicate handler is now a distinct module very similar to the
cluster duplicate handler but does not need any repeated polling.
Instead, withheld duplicates are resubmitted once the sample they
duplicate finishes processing.
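
A minimal sketch of such a handler, assuming an in-memory backlog keyed
by sample identity (class and method names are illustrative):

class LocalDuplicateHandler:
    """ Withholds samples identical to one currently in analysis and
    resubmits them once that analysis is done (illustrative sketch). """
    def __init__(self, job_queue):
        self.job_queue = job_queue
        self.backlog = {}  # identity -> withheld samples

    async def is_duplicate(self, sample):
        identity = await sample.identity
        if identity in self.backlog:
            # an identical sample is already in analysis: withhold
            self.backlog[identity].append(sample)
            return True

        # first sample with this identity, no polling needed
        self.backlog[identity] = []
        return False

    async def submit_duplicates(self, sample):
        # called once a sample finishes processing: resubmit its
        # siblings, which should now hit the cached analysis result
        identity = await sample.identity
        for duplicate in self.backlog.pop(identity, []):
            await self.job_queue.submit(duplicate)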

The cluster duplicate handler no longer interacts with the local
duplicate handler directly by putting samples from its backlog into the
latter's backlog. Instead, cluster duplicates are submitted to the local
queue in bulk and the local duplicate handler is expected to either
never come into play again (because the known rule serves the cached
previous analysis result) or to detect the local duplicates and put all
but one of them into its own backlog automatically.

This new design highlighted an additional point for optimisation: if a
sample can be locked by the cluster duplicate handler (i.e. it is not
currently being processed by another instance) but we find siblings of
it in our own cluster duplicate backlog, then this sample obviously was
a cluster duplicate at an earlier point in time and withheld samples are
still waiting for the next polling run to be resubmitted. In this case
we short-circuit the cluster duplicate detection and submit them to the
job queue immediately.
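
In sketch form the short-circuit could sit in the cluster duplicate
check performed for a newly arriving sample; backlog and attribute names
are assumptions of this sketch, not the actual module:

async def handle(self, sample):
    """ Cluster duplicate check for an arriving sample (sketch). """
    identity = await sample.identity

    if not await self.db_con.mark_sample_in_flight(sample):
        # another instance is analysing it: hold it until the next
        # polling run
        self.cluster_backlog.setdefault(identity, []).append(sample)
        return True

    # we got the lock; if siblings are waiting in our own backlog the
    # sample was a cluster duplicate earlier -- short-circuit and
    # submit them right away instead of waiting for the next poll
    for sibling in self.cluster_backlog.pop(identity, []):
        await self.job_queue.submit(sibling)

    return False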
michaelweiser committed Mar 16, 2023
1 parent bdaf92d commit cc30b29
Showing 7 changed files with 469 additions and 262 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@ See documentation for details.

## devel

- Duplicate detection moved from job submit to long-running rules, i.e. those
  using Cuckoo and Cortex. This should improve throughput for analyses that
  don't use these rules and avoids a massive performance degradation if a very
  high number of quick-to-analyse samples hits a PeekabooAV cluster.
- Samples now have an identity that includes sha256sum, declared name and type
as well as content disposition. This allows for more reliable and efficient
in-flight locking and cached result usage decisions. DB schema version raised
7 changes: 5 additions & 2 deletions peekaboo/daemon.py
@@ -316,6 +316,9 @@ async def async_main():
"interval to %d seconds.",
cldup_check_interval)

if not cldup_check_interval:
logger.debug("Disabling cluster duplicate handler.")

loop = asyncio.get_running_loop()
sig_handler = SignalHandler(loop)

@@ -383,8 +386,8 @@ async def async_main():
except asyncio.exceptions.CancelledError as error:
# cancellation is expected in the case of shutdown via signal handler
pass
except Exception:
logger.error("Shutting down due to unexpected exception")
except Exception as error:
logger.error("Shutting down due to unexpected exception: %s", error)

# trigger shutdowns of other components if not already ongoing triggered
# by the signal handler
263 changes: 8 additions & 255 deletions peekaboo/queuing.py
@@ -22,7 +22,7 @@
# #
###############################################################################

""" The main job queue with workers and a cluster duplicate handler. """
""" The main job queue with workers. """


import asyncio
@@ -64,20 +64,9 @@ def __init__(self, ruleset_config, db_con, analyzer_config,
self.worker_count = worker_count
self.threadpool = threadpool

# keep a backlog of samples with identities identical to samples
# currently in analysis to avoid analysing multiple identical samples
# simultaneously. Once one analysis has finished, we can submit the
# others and the ruleset will notice that we already know the result.
self.duplicates = {}
self.duplock = asyncio.Lock()

# keep a similar backlog of samples currently being processed by
# other instances so we can regularly try to resubmit them and re-use
# the other instances' cached results from the database
self.cluster_duplicates = {}

self.ruleset_engine = RulesetEngine(
ruleset_config, self, db_con, analyzer_config, threadpool)
ruleset_config, self, db_con, analyzer_config,
cluster_duplicate_check_interval, threadpool)

# we start these here because they do no lengthy init and starting can
# not fail. We need this here to avoid races in startup vs. shutdown by
@@ -90,25 +79,12 @@ def __init__(self, ruleset_config, db_con, analyzer_config,

logger.info('Created %d Workers.', self.worker_count)

self.cluster_duplicate_handler = None
if cluster_duplicate_check_interval:
logger.debug(
"Creating cluster duplicate handler with check "
"interval %d.", cluster_duplicate_check_interval)
self.cluster_duplicate_handler = ClusterDuplicateHandler(
self, cluster_duplicate_check_interval)
else:
logger.debug("Disabling cluster duplicate handler.")

async def start(self):
""" Start up the job queue including resource initialisation. """
awaitables = []
for worker in self.workers:
awaitables.append(await worker.start())

if self.cluster_duplicate_handler:
awaitables.append(await self.cluster_duplicate_handler.start())

# create a single ruleset engine for all workers, instantiates all the
# rules based on the ruleset configuration, may start up long-lived
# analyzer instances which are shared as well, is otherwise stateless
@@ -134,179 +110,9 @@ async def submit(self, sample):
exception.
@param sample: The Sample object to add to the queue.
@raises Full: if the queue is full.
"""
identity = await sample.identity
duplicate = None
cluster_duplicate = None
resubmit = None

# we have to lock this down because async routines called from here may
# allow us to be called again concurrently from the event loop
async with self.duplock:
# check if a sample with same identity is currently in flight
duplicates = self.duplicates.get(identity)
if duplicates is not None:
# we are regularly resubmitting samples, e.g. after we've
# noticed that cuckoo is finished analysing them. This
# obviously isn't a duplicate but continued processing of the
# same sample.
if duplicates['master'] == sample:
resubmit = sample.id
await self.jobs.put(sample)
else:
# record the to-be-submitted sample as duplicate and do
# nothing
duplicate = sample.id
duplicates['duplicates'].append(sample)
else:
# are we the first of potentially multiple instances working on
# this sample?
try:
locked = await self.db_con.mark_sample_in_flight(sample)
except PeekabooDatabaseError as dberr:
logger.error(dberr)
return False

if locked:
# initialise a per-duplicate backlog for this sample which
# also serves as in-flight marker and submit to queue
self.duplicates[identity] = {
'master': sample,
'duplicates': [],
}
await self.jobs.put(sample)
else:
# another instance is working on this
if self.cluster_duplicates.get(identity) is None:
self.cluster_duplicates[identity] = []

cluster_duplicate = sample.id
self.cluster_duplicates[identity].append(sample)

if duplicate is not None:
logger.debug(
"%d: Sample is duplicate and waiting for running analysis "
"to finish", duplicate)
elif cluster_duplicate is not None:
logger.debug(
"%d: Sample is concurrently processed by another instance "
"and held", cluster_duplicate)
elif resubmit is not None:
logger.debug("%d: Resubmitted sample to job queue", resubmit)
else:
logger.debug("%d: New sample submitted to job queue", sample.id)

return True

async def submit_cluster_duplicates(self):
""" Submit samples held while being processed by another cluster
instance back into the job queue if they have finished processing. """
if not self.cluster_duplicates.keys():
return True

submitted_cluster_duplicates = []

async with self.duplock:
# try to submit *all* samples which have been marked as being
# processed by another instance concurrently
# get the items view on a copy of the cluster duplicate backlog
# because we will change it by removing entries which would raise a
# RuntimeException
cluster_duplicates = self.cluster_duplicates.copy().items()
for identity, sample_duplicates in cluster_duplicates:
# try to mark as in-flight
try:
locked = await self.db_con.mark_sample_in_flight(
sample_duplicates[0])
except PeekabooDatabaseError as dberr:
logger.error(dberr)
return False

if locked:
if self.duplicates.get(identity) is not None:
logger.error(
"Possible backlog corruption for sample %d! "
"Please file a bug report. Trying to continue...",
sample.id)
continue

# submit one of the held-back samples as a new master
# analysis in case the analysis on the other instance
# failed and we have no result in the database yet. If all
# is well, this master should finish analysis very quickly
# using the stored result, causing all the duplicates to be
# submitted and finish quickly as well.
sample = sample_duplicates.pop()
self.duplicates[identity] = {
'master': sample,
'duplicates': sample_duplicates,
}
submitted_cluster_duplicates.append(sample.id)
await self.jobs.put(sample)
del self.cluster_duplicates[identity]

if len(submitted_cluster_duplicates) > 0:
logger.debug(
"Submitted cluster duplicates (and potentially their "
"duplicates) from backlog: %s", submitted_cluster_duplicates)

return True

async def clear_stale_in_flight_samples(self):
""" Clear any stale in-flight sample logs from the database. """
try:
cleared = await self.db_con.clear_stale_in_flight_samples()
except PeekabooDatabaseError as dberr:
logger.error(dberr)
cleared = False

return cleared

async def submit_duplicates(self, identity):
""" Check if any samples have been held from processing as duplicates
and submit them now. Clear the original sample whose duplicates have
been submitted from the in-flight list.
@param identity: identity of sample to check for duplicates
"""
submitted_duplicates = []

async with self.duplock:
# duplicates which have been submitted from the backlog still
# report done but do not get registered as potentially having
# duplicates because we expect the ruleset to identify them as
# already known and process them quickly now that the first
# instance has gone through full analysis. Therefore we can ignore
# them here.
if identity not in self.duplicates:
return

# submit all samples which have accumulated in the backlog
for sample in self.duplicates[identity]['duplicates']:
submitted_duplicates.append(sample.id)
await self.jobs.put(sample)

sample = self.duplicates[identity]['master']
try:
await self.db_con.clear_sample_in_flight(sample)
except PeekabooDatabaseError as dberr:
logger.error(dberr)

del self.duplicates[identity]

logger.debug("%d: Cleared sample from in-flight list", sample.id)
if len(submitted_duplicates) > 0:
logger.debug(
"Submitted duplicates from backlog: %s", submitted_duplicates)

async def done(self, sample):
""" Perform cleanup actions after sample processing is done:
1. Submit held duplicates and
2. notify request handler that sample processing is done.
@param sample: The Sample object to post-process. """
await self.submit_duplicates(await sample.identity)
await self.jobs.put(sample)
logger.debug("%d: New sample submitted to job queue", sample.id)

async def dequeue(self):
""" Remove a sample from the queue. Used by the workers to get their
@@ -320,9 +126,6 @@ def shut_down(self):
if self.ruleset_engine is not None:
self.ruleset_engine.shut_down()

if self.cluster_duplicate_handler is not None:
self.cluster_duplicate_handler.shut_down()

# tell all workers to shut down
for worker in self.workers:
worker.shut_down()
@@ -332,64 +135,12 @@ async def close_down(self):
for worker in self.workers:
await worker.close_down()

if self.cluster_duplicate_handler is not None:
await self.cluster_duplicate_handler.close_down()

if self.ruleset_engine is not None:
await self.ruleset_engine.close_down()

logger.info("Queue shut down.")


class ClusterDuplicateHandler:
""" A housekeeper handling submission and cleanup of cluster duplicates.
"""
def __init__(self, job_queue, interval=5):
self.job_queue = job_queue
self.interval = interval
self.task = None
self.task_name = "ClusterDuplicateHandler"

async def start(self):
self.task = asyncio.ensure_future(self.run())
if hasattr(self.task, "set_name"):
self.task.set_name(self.task_name)
return self.task

async def run(self):
logger.debug("Cluster duplicate handler started.")

while True:
await asyncio.sleep(self.interval)

logger.debug("Checking for samples in processing by other "
"instances to submit")

await self.job_queue.clear_stale_in_flight_samples()
await self.job_queue.submit_cluster_duplicates()

def shut_down(self):
""" Asynchronously initiate cluster duplicate handler shutdown. """
logger.debug("Cluster duplicate handler shutdown requested.")
if self.task is not None:
self.task.cancel()

async def close_down(self):
""" Wait for the cluster duplicate handler to close down and retrieve
any exceptions thrown. """
if self.task is not None:
try:
await self.task
# we cancelled the task so a CancelledError is expected
except asyncio.CancelledError:
pass
except Exception:
logger.exception(
"Unexpected exception in cluster duplicate handler")

logger.debug("Cluster duplicate handler shut down.")


class Worker:
""" A Worker to process a sample. """
def __init__(self, wid, job_queue, ruleset_engine, db_con):
@@ -444,7 +195,9 @@ async def run(self):
'database: %s', sample.id, dberr)
# no showstopper, we can limp on without caching in DB

await self.job_queue.done(sample)
# now is the time to submit any potential duplicates of this sample
# whose processing was deferred by rules
await self.ruleset_engine.submit_duplicates(sample)

def shut_down(self):
""" Asynchronously initiate worker shutdown. """