Redis driver for Nodupe (#705)
* Added many more NoDupe tests

* Add final tests to nodupe

* Add nodupe sub-class tests

* Created nodupe_redis

Stripped a lot of the methods out, because they just aren't needed with Redis.

* First try at Redis Nodupe

* Unit test for nodupe_redis

* Merged nodupe derivekey tests into 1

Previously had a whole bunch of them, but since we're re-instantiating the message for each derivation, they can all go in one.

* Unit test for nodupe_redis

Had to make some changes to the module itself because some functionality wasn't working

* Create generic nodupe unit test

Comparing the functionality of the disk- and redis-based nodupe classes is going to be hard. Most (all?) of the methods don't actually return anything, and if we abstract away the inner workings of each one, there's nothing to validate that they're doing the same thing.

* Delete individual derivekey nodupe unit tests

* Add test docs on using VSCode

* Allow picking which driver to use for NoDupe

* Add missing config options for redis nodupe

* Fix cache size counting

I'm not sure how much any of this counting matters, or if it's just for logging purposes.
Either way, it's better now.

* Fix broken nodupe.redis tests

* Change field delimiter in nodupe redis key

* Update redis key field delimiter in nodupe test

* Fixed mis-named config option

Nodupe Redis was using the retryqueue server URL option; now it uses its own.

* Standardized some testing aspects

- Made the add_option method for options work properly
- Added PrettyPrinter code for debugging tests

* Clean-up test debugging code

* Make Redis Nodupe count better

It wasn't loading the count on_start or clearing it on_stop, so it was never accurate.
It's still only approximate, but since it's just for logging purposes, that may be good enough.

* Disabled unneeded redis Retry test

* Update retry unit test dependencies

Used to have dependencies per method, but now the comparative tests depend on both driver tests.
This ensures that nothing gets missed.

* Comparative NoDupe unit test

Compares state/output of both Disk and Redis nodupe drivers against an expected value.
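
A minimal sketch of the comparative idea (the stubs and names below are hypothetical, not the actual test code; the real classes are sarracenia.flowcb.nodupe.disk.NoDupe and sarracenia.flowcb.nodupe.redis.NoDupe):

import pytest

# Hypothetical stand-ins for the two drivers: same accept/reject
# contract, different backing store.
class DiskStub:
    def __init__(self):
        self.seen = set()

    def is_new(self, key):
        if key in self.seen:
            return False
        self.seen.add(key)
        return True

class RedisStub(DiskStub):
    pass

@pytest.mark.parametrize("driver_cls", [DiskStub, RedisStub])
def test_drivers_agree(driver_cls):
    d = driver_cls()
    # both drivers must accept/reject the same sequence identically
    assert [d.is_new(k) for k in ["a", "b", "a"]] == [True, True, False]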

* Change integrity to identity, per #703 and #706

This might cause merge conflicts, but it's probably still worth changing here.

* Resolve config.py conflict

* Resolve second config.py conflict

---------

Co-authored-by: Peter Silva <[email protected]>
gcglinton and petersilva authored Jun 21, 2023
1 parent 927c0f9 commit 5bc9748
Showing 13 changed files with 930 additions and 92 deletions.
3 changes: 2 additions & 1 deletion sarracenia/config.py
@@ -82,6 +82,7 @@ def __call__(self, parser, namespace, values, option_string=None):
     'integrity_method': 'sha512',
     'logMetrics': False,
     'logStdout': False,
+    'nodupe_driver': 'disk',
     'nodupe_ttl': 0,
     'overwrite': True,
     'path': [],
@@ -145,7 +146,7 @@ def __call__(self, parser, namespace, values, option_string=None):
     'exchange_suffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header', 'integrity', 'logLevel',
     'pollUrl', 'post_baseUrl', 'post_baseDir', 'post_broker', 'post_exchange',
     'post_exchangeSuffix', 'post_format', 'post_topic', 'queueName', 'sendTo', 'rename',
-    'report_exchange', 'source', 'strip', 'timezone', 'nodupe_ttl',
+    'report_exchange', 'source', 'strip', 'timezone', 'nodupe_ttl', 'nodupe_driver',
     'nodupe_basis', 'tlsRigour', 'topic', 'vip'
 ]
 """
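
As a sketch of what the new option enables (the parsing below is illustrative only, not sarracenia's actual config loader):

# Illustrative only: a config line overriding the new 'nodupe_driver' default.
defaults = {'nodupe_driver': 'disk', 'nodupe_ttl': 0}

line = 'nodupe_driver redis'           # hypothetical line from an sr3 config file
option, value = line.split(maxsplit=1)
defaults[option] = value
print(defaults['nodupe_driver'])       # redis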
7 changes: 5 additions & 2 deletions sarracenia/flow/__init__.py
@@ -175,8 +175,11 @@ def __init__(self, cfg=None):

         # open cache, get masks.
         if self.o.nodupe_ttl > 0:
-            # prepend...
-            self.plugins['load'].append('sarracenia.flowcb.nodupe.NoDupe')
+            if self.o.nodupe_driver.lower() == "redis":
+                self.plugins['load'].append('sarracenia.flowcb.nodupe.redis.NoDupe')
+            else:
+                self.plugins['load'].append('sarracenia.flowcb.nodupe.disk.NoDupe')


         if (( hasattr(self.o, 'delete_source') and self.o.delete_source ) or \
            ( hasattr(self.o, 'delete_destination') and self.o.delete_destination )) and \
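
A small standalone sketch of the selection above, using a stand-in options object (attribute names taken from the diff):

class Opts:                            # stand-in for self.o
    nodupe_ttl = 300
    nodupe_driver = 'Redis'            # matched case-insensitively

o, load = Opts(), []
if o.nodupe_ttl > 0:
    if o.nodupe_driver.lower() == 'redis':
        load.append('sarracenia.flowcb.nodupe.redis.NoDupe')
    else:
        load.append('sarracenia.flowcb.nodupe.disk.NoDupe')
print(load)   # ['sarracenia.flowcb.nodupe.redis.NoDupe']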
@@ -135,12 +135,12 @@ def deriveKey(self, msg) -> str:
             elif 'directory' in msg['fileOp']:
                 if 'remove' not in msg['fileOp']:
                     key = msg['relPath']
-        elif 'integrity' in msg:
-            if msg['integrity']['method'] in ['cod']:
+        elif 'identity' in msg:
+            if msg['identity']['method'] in ['cod']:
                 # if cod, revert to using the path.
                 key = msg['relPath']
             else:
-                key = msg['integrity']['method'] + ',' + msg['integrity']['value'].replace('\n', '')
+                key = msg['identity']['method'] + ',' + msg['identity']['value'].replace('\n', '')


         if not key:
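
To see the renamed lookup in action, a self-contained sketch (the message dicts are hypothetical):

# Same branch logic as deriveKey above, reduced to the 'identity' case.
def derive_key(msg):
    if msg['identity']['method'] == 'cod':   # checksum-on-download: no usable value yet
        return msg['relPath']
    return msg['identity']['method'] + ',' + msg['identity']['value'].replace('\n', '')

print(derive_key({'relPath': 'obs/a.csv', 'identity': {'method': 'cod', 'value': 'sha512'}}))
# obs/a.csv
print(derive_key({'relPath': 'obs/a.csv', 'identity': {'method': 'sha512', 'value': 'Zm9v\n'}}))
# sha512,Zm9v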
226 changes: 226 additions & 0 deletions sarracenia/flowcb/nodupe/redis.py
@@ -0,0 +1,226 @@
# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Environment Canada, 2008-2015
#

import os

import urllib.parse

import logging

from sarracenia import nowflt, timestr2flt, timeflt2str

from sarracenia.flowcb import FlowCB

import redis

logger = logging.getLogger(__name__)


class NoDupe(FlowCB):
    """
    generalised duplicate suppression for sr3 programs. It is used as a
    time based buffer that prevents, when activated, identical files (of some kinds)
    from being processed more than once, by rejecting files identified as duplicates.

    options:

    nodupe_ttl - duration in seconds (floating point.)
        The time horizon of the reception cache:
        how long to remember files, so they are rejected as duplicates.
        The expiry based on nodupe_ttl is applied every housekeeping interval.

    nodupe_fileAgeMax - the oldest file that will be considered for processing.
        files older than this threshold will be rejected.

    nodupe_fileAgeMin - the newest file that can be considered for processing.
        files newer than this threshold will be rejected.
        if not specified, the value of option *inflight*
        may be referenced if it is an integer value.
    """

    # ----------- Magic Methods ------------
    def __init__(self, options):

        super().__init__(options, logger)
        logger.debug("NoDupe_Redis init")
        logging.basicConfig(format=self.o.logFormat, level=getattr(logging, self.o.logLevel.upper()))

        self.o.add_option( 'nodupe_ttl', 'duration', 0 )
        self.o.add_option( 'nodupe_fileAgeMax', 'duration', 0 )
        self.o.add_option( 'nodupe_fileAgeMin', 'duration', 0 )

        logger.info('time_to_live=%d, ' % (self.o.nodupe_ttl))

        self.o.add_option( 'nodupe_redis_serverurl', 'str')
        self.o.add_option( 'nodupe_redis_keybase', 'str', 'sr3.nodupe.' + self.o.component + '.' + self.o.config.replace(".","_"))

        self._rkey_base = self.o.nodupe_redis_keybase
        self._rkey_count = self._rkey_base + ".count"
        self._cache_hit = None

        self._redis = redis.from_url(self.o.nodupe_redis_serverurl)

        self._last_expire = nowflt()

        self._last_time = nowflt()

        self._redis.set(self._rkey_count, 0, nx=True)
        self._last_count = self._count()


    # ----------- Private Methods -----------
    def _hash(self, text) -> str:
        from hashlib import blake2b
        h = blake2b(key=bytes(self._rkey_base, 'utf-8'), digest_size=16)
        h.update(bytes(text, 'utf-8'))
        return h.hexdigest()

    def _deriveKey(self, message) -> str:

        key = None
        if ('nodupe_override' in message) and ('key' in message['nodupe_override']):
            key = message['nodupe_override']['key']
        elif 'fileOp' in message :
            if 'link' in message['fileOp']:
                key = message['fileOp']['link']
            elif 'directory' in message['fileOp'] and 'remove' not in message['fileOp']:
                key = message['relPath']
        elif 'identity' in message:
            if message['identity']['method'] in ['cod']:
                # if cod, revert to using the path.
                key = message['relPath']
            else:
                key = message['identity']['method'] + ',' + message['identity']['value'].replace('\n', '')

        if not key:
            if 'mtime' in message:
                message_time = message['mtime']
            else:
                message_time = message['pubTime']

            if 'size' in message:
                key = f"{message['relPath']},{message_time},{message['size']}"
            else:
                key = f"{message['relPath']},{message_time}"

        return key


    def _is_new(self, message) -> bool :
        """
        Derive keys to be looked up in cache of messages already seen, then look them up in the cache,
        return False if message is a dupe.
        True if it is new.
        """

        key = self._deriveKey(message)

        if ('nodupe_override' in message) and ('path' in message['nodupe_override']):
            path = message['nodupe_override']['path']
        else:
            path = message['relPath'].lstrip('/')

        message['noDupe'] = { 'key': key, 'path': path }
        message['_deleteOnPost'] |= set(['noDupe'])

        logger.debug("checking (%s, %s)" % (key, path))

        self._cache_hit = None
        key_hashed = self._hash(key)
        path_quoted = urllib.parse.quote(path)
        path_hashed = self._hash(path)

        redis_key = self._rkey_base + ":" + key_hashed + "." + path_hashed

        got = self._redis.get(redis_key)

        #logger.debug("ttl type =%s" % (type(self.o.nodupe_ttl)) )
        self._redis.set(redis_key, str(self.now) + "|" + path_quoted, ex=int(self.o.nodupe_ttl))

        if got != None:
            logger.debug("entry already in cache: key=%s" % (redis_key) )
            logger.debug("updated time entry: time=%s" % (str(self.now)) )
            self._cache_hit = path_quoted
            return False
        else:
            logger.debug("adding entry to cache; key=%s" % (redis_key) )
            #self._redis.incr(self._rkey_count)
            return True

    def _count(self):
        count = self._redis.get(self._rkey_count)
        if count == None:
            return 0
        else:
            return int(count)

    # ----------- Public Methods -----------
    def on_housekeeping(self):

        logger.info("start")

        new_count = len(self._redis.keys(self._rkey_base + ":*"))
        self.now = nowflt()

        logger.info("cache size was %d items %5.2f sec ago, now saved %d entries" % (self._last_count, self.now - self._last_time, new_count))

        self._last_time = self.now

        self._last_count = new_count

    def after_accept(self, worklist):
        new_incoming = []
        self.now = nowflt()
        if self.o.nodupe_fileAgeMax > 0:
            min_mtime = self.now - self.o.nodupe_fileAgeMax
        else:
            min_mtime = 0

        if self.o.nodupe_fileAgeMin > 0:
            max_mtime = self.now - self.o.nodupe_fileAgeMin
        elif type(self.o.inflight) in [ int, float ] and self.o.inflight > 0:
            max_mtime = self.now - self.o.inflight
        else:
            # FIXME: should we add some time here to allow for different clocks?
            # 100 seconds in the future? hmm...
            max_mtime = self.now + 100

        for m in worklist.incoming:
            if ('mtime' in m) :
                mtime = timestr2flt(m['mtime'])
                if mtime < min_mtime:
                    m['_deleteOnPost'] |= set(['reject'])
                    m['reject'] = f"{m['mtime']} too old (nodupe check), oldest allowed {timeflt2str(min_mtime)}"
                    m.setReport(304, f"{m['mtime']} too old (nodupe check), oldest allowed {timeflt2str(min_mtime)}" )
                    worklist.rejected.append(m)
                    continue
                elif mtime > max_mtime:
                    m['_deleteOnPost'] |= set(['reject'])
                    m['reject'] = f"{m['mtime']} too new (nodupe check), newest allowed {timeflt2str(max_mtime)}"
                    m.setReport(304, f"{m['mtime']} too new (nodupe check), newest allowed {timeflt2str(max_mtime)}" )
                    worklist.rejected.append(m)
                    continue

            if self._is_new(m):
                new_incoming.append(m)
            else:
                m['_deleteOnPost'] |= set(['reject'])
                m['reject'] = "not modified 1 (nodupe check)"
                m.setReport(304, 'Not modified 1 (cache check)')
                worklist.rejected.append(m)

        logger.debug("items registered in duplicate suppression cache: %d" % (len(self._redis.keys(self._rkey_base + ":*"))) )
        worklist.incoming = new_incoming

    def on_start(self):
        self._last_count = len(self._redis.keys(self._rkey_base + ":*"))


    def on_stop(self):
        self._last_count = None

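To make the key scheme concrete, a self-contained sketch of what _deriveKey/_hash produce (keybase, key, and path values are hypothetical; no Redis server needed):

from hashlib import blake2b
import urllib.parse

rkey_base = 'sr3.nodupe.subscribe.myconfig'   # hypothetical component + config name

def keyed_hash(text):
    # same construction as _hash above: blake2b keyed with the keybase
    h = blake2b(key=rkey_base.encode('utf-8'), digest_size=16)
    h.update(text.encode('utf-8'))
    return h.hexdigest()

key = 'sha512,Zm9v'        # as produced by _deriveKey
path = 'obs/a.csv'         # relPath with the leading '/' stripped

redis_key = rkey_base + ':' + keyed_hash(key) + '.' + keyed_hash(path)
value = '1687356000.0|' + urllib.parse.quote(path)   # "<now>|<quoted path>"
print(redis_key)
print(value)

_is_new() then amounts to a GET on redis_key (a hit means duplicate) followed by a SET with EX nodupe_ttl, so Redis expires entries itself rather than the driver pruning a cache file at housekeeping.
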
9 changes: 7 additions & 2 deletions tests/sarracenia/diskqueue_test.py
@@ -1,12 +1,17 @@
 import pytest, jsonpickle
 import os
 
+#useful for debugging tests
+#import pprint
+#pretty = pprint.PrettyPrinter(indent=2, width=200).pprint
+
 from sarracenia.diskqueue import DiskQueue
 
 class Options:
     def add_option(self, option, type, default = None):
-        if default != None:
-            self.option = default
+        if not hasattr(self, option):
+            setattr(self, option, default)
 
     pass
 
 BaseOptions = Options()
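
The point of the hasattr guard, as a sketch: values a test sets explicitly must survive the module under test re-registering the same option with a default.

class Options:
    def add_option(self, option, type, default=None):
        if not hasattr(self, option):
            setattr(self, option, default)

o = Options()
o.nodupe_ttl = 300                         # set explicitly by a test
o.add_option('nodupe_ttl', 'duration', 0)  # module under test registers its default
print(o.nodupe_ttl)                        # 300 -- the default no longer clobbers it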
14 changes: 6 additions & 8 deletions tests/sarracenia/flowcb/nodupe/data_test.py
@@ -1,7 +1,9 @@
-import pytest, pprint
+import pytest
 import os, types, copy
 
-pretty = pprint.PrettyPrinter(indent=2, width=200)
+#useful for debugging tests
+#import pprint
+#pretty = pprint.PrettyPrinter(indent=2, width=200).pprint
 
 from sarracenia.flowcb.nodupe.data import Data
 from sarracenia import Message as SR3Message
@@ -17,7 +19,8 @@ def __init__(self):
         self.pid_filename = "/tmp/sarracenia/diskqueue_test/pid_filename"
         self.housekeeping = float(39)
     def add_option(self, option, type, default = None):
-        setattr(self, option, default)
+        if not hasattr(self, option):
+            setattr(self, option, default)
     pass
 
 def make_message():
@@ -65,11 +68,6 @@ def test_after_accept(tmp_path, capsys):

     nodupe.after_accept(wl_test_after_accept)
 
-    #pretty.pprint(message)
-    #pretty.pprint(after_accept_worklist.rejected[0]['reject'].count(message_old['mtime'] + " too old (nodupe check), oldest allowed"))
-    #pretty.pprint(vars(nodupe))
-    #pretty.pprint(wl_test_after_accept)
-
     assert len(wl_test_after_accept.incoming) == 2
     assert wl_test_after_accept.incoming[0]['nodupe_override']['path'] == 'data'
     assert 'nodupe_override' in wl_test_after_accept.incoming[1]['_deleteOnPost']
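
For reference, a standalone sketch of the age window that after_accept applies before the duplicate lookup (plain epoch floats instead of sarracenia's timestamp strings; thresholds hypothetical):

import time

nodupe_fileAgeMax = 3600.0    # hypothetical: reject files older than an hour
nodupe_fileAgeMin = 5.0       # hypothetical: reject files newer than 5 seconds

now = time.time()
min_mtime = now - nodupe_fileAgeMax
max_mtime = now - nodupe_fileAgeMin

for name, mtime in [('old.dat', now - 7200), ('ok.dat', now - 60), ('hot.dat', now)]:
    if mtime < min_mtime:
        print(name, 'rejected: too old')
    elif mtime > max_mtime:
        print(name, 'rejected: too new')
    else:
        print(name, 'accepted')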