Doc feedback #1094

Merged
6 commits merged, Jun 6, 2024
42 changes: 32 additions & 10 deletions docs/source/Reference/sr3_options.7.rst
@@ -1218,18 +1218,15 @@ a time will result in 300 seconds (or 5 minutes) being the expiry interval.
The default value in a poll is 8 hours; it should be longer than nodupe_fileAgeMax, to prevent
re-ingesting files that have aged out of the duplicate suppression cache.

**Use of the cache is incompatible with the default *parts 0* strategy**; one must specify an
alternate strategy: either a fixed blocksize, or never partitioning files.
Avoid the dynamic algorithm, which changes the partition size as a file grows.
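
As a sketch only: the *parts* forms shown here follow the v2 option syntax
(*parts 0* dynamic, *parts i,<blocksize>* fixed, *parts 1* never partition)
and may differ in sr3, so check the *parts* entry in this manual. A
cache-friendly configuration avoids the dynamic form::

  # duplicate suppression cache active for 5 minutes
  nodupe_ttl 300
  # fixed 10 megabyte blocksize instead of the default dynamic *parts 0*
  parts i,10M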

**Note that the duplicate suppression store is local to each instance**. When N
instances share a queue, the first time a posting is received it could be
picked up by one instance, while a duplicate could be
picked up by another instance. **For effective duplicate suppression with instances**,
one must **deploy two layers of subscribers**. Use
a **first layer of subscribers (shovels)** with duplicate suppression turned
off and output with *post_exchangeSplit*, which routes notifications with the same checksum to
the same member of a **second layer of subscribers (winnow) whose duplicate suppression caches
are active.**
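
The two-layer deployment can be sketched as a pair of configurations
(broker, exchange names, and the split count are illustrative placeholders,
not values from this PR)::

  # layer 1 (shovel): duplicate suppression off; post_exchangeSplit
  # routes notifications by checksum to one of 4 output exchanges
  nodupe_ttl 0
  post_exchangeSplit 4
  post_exchange xwinnow

  # layer 2 (winnow): one subscriber per split exchange, with the
  # duplicate suppression cache active
  exchange xwinnow00
  nodupe_ttl 300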


outlet post|json|url (default: post)
@@ -1525,10 +1522,6 @@ given. This option also enforces traversing of symbolic links.

This option is being used to investigate some use cases, and may disappear in future.

sendTo <url>
---------------

Specification of a remote resource to deliver to in a sender.

rename <path>
-------------
@@ -1702,6 +1695,35 @@ flowcb.scheduled.Scheduled class will look for the other two time specifiers::
which will have the poll run each day at: 01:14, 01:17, 01:29, then the same minutes
after each of 4h, 5h and 23h.

sendTo <url>
---------------

Specification of a remote resource to deliver to in a sender.

set (DEVELOPER)
---------------

The *set* option is used, usually by developers, to define settings
for particular classes in the source code. The most prominent usage
is to raise the logLevel for a particular class of interest.

Use of this option is more easily done with the source code handy.
An example::

  set sarracenia.moth.amqp.AMQP.logLevel debug
  set sarracenia.moth.mqtt.MQTT.logLevel debug

Here *sarracenia.moth.amqp.AMQP* refers to the class to which the setting
is applied: there is a *class AMQP* in the python file
sarracenia/moth/amqp.py (relative to the root of the source tree.)

The *logLevel* setting is then applied only within
that class. The *set* option requires supporting code in each
class. All *flowcb* classes have the needed
support, and the ``moth`` and transfer classes have a specific implementation
for logLevel.

Other classes may be hit or miss in terms of implementing the *set* semantics.
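
The dotted-name mechanism can be pictured with a small sketch. This is a
hypothetical illustration of the *set* semantic, not the sarracenia
implementation; ``apply_set`` is an invented helper:

```python
import importlib

def apply_set(dotted: str, value: str) -> None:
    """Apply 'pkg.module.Class.attr value' by setting Class.attr.

    Hypothetical sketch: walk the dotted name to a class and set the
    attribute on it, so the setting is visible only within that class.
    """
    path, attr = dotted.rsplit('.', 1)            # 'pkg.module.Class', 'attr'
    module_name, class_name = path.rsplit('.', 1)
    cls = getattr(importlib.import_module(module_name), class_name)
    setattr(cls, attr, value)

# mirrors the config line: set sarracenia.moth.amqp.AMQP.logLevel debug
# apply_set('sarracenia.moth.amqp.AMQP.logLevel', 'debug')
```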

shim_defer_posting_to_exit (EXPERIMENTAL)
-----------------------------------------
25 changes: 25 additions & 0 deletions docs/source/fr/Reference/sr3_options.7.rst
@@ -1686,6 +1686,31 @@ sendTo <url>
Specification of the server to which data is to be delivered (in a *sender*).


set (DEVELOPER)
---------------

The *set* option is used, usually by developers, to define settings
for particular classes in the source code. The most prominent usage
is to raise the logLevel for a particular class of interest.

Use of this option is more easily done with the source code at hand.
An example::

  set sarracenia.moth.amqp.AMQP.logLevel debug
  set sarracenia.moth.mqtt.MQTT.logLevel debug

Here, *sarracenia.moth.amqp.AMQP* refers to the class to which the setting
is applied: there is a *class AMQP* in the python file
sarracenia/moth/amqp.py (relative to the root of the source tree.)

The *logLevel* setting is applied only within
that class. The *set* option requires supporting code in each
class. All *flowcb* classes have the needed
support, and the ``moth`` and transfer classes have a specific implementation
for logLevel.

Other classes may be hit or miss in terms of implementing the *set* semantics.


shim_defer_posting_to_exit (EXPERIMENTAL)
-----------------------------------------

3 changes: 2 additions & 1 deletion sarracenia/config.py
@@ -97,6 +97,7 @@ def __repr__(self) -> str:
'inline': False,
'inlineOnly': False,
'identity_method': 'sha512',
'logFormat': '%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s',
'logMetrics': False,
'logStdout': False,
'metrics_writeInterval': 5,
@@ -176,7 +177,7 @@ def __repr__(self) -> str:
str_options = [
'action', 'admin', 'baseDir', 'broker', 'cluster', 'directory', 'exchange',
'exchange_suffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header',
'hostname', 'identity', 'inlineEncoding', 'logLevel',
'hostname', 'identity', 'inlineEncoding', 'logFormat', 'logLevel',
'pollUrl', 'post_baseUrl', 'post_baseDir', 'post_broker', 'post_exchange',
'post_exchangeSuffix', 'post_format', 'post_topic', 'queueName', 'sendTo', 'rename',
'report_exchange', 'source', 'strip', 'timezone', 'nodupe_ttl', 'nodupe_driver',
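
The *logFormat* default added above is a standard Python %-style logging
format string. As an illustration independent of sarracenia, it can be
exercised with the stdlib ``logging`` module:

```python
import logging

# the default logFormat value added in sarracenia/config.py
fmt = '%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s'

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(fmt))

log = logging.getLogger('demo')
log.addHandler(handler)
log.setLevel(logging.DEBUG)

def work():
    # %(funcName)s renders as 'work' for records emitted here
    log.info('hello')

work()  # e.g. "2024-06-06 01:14:00,000 [INFO] demo work hello"
```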
32 changes: 13 additions & 19 deletions sarracenia/flow/__init__.py
@@ -50,8 +50,6 @@
'fileEvents': allFileEvents,
'housekeeping': 300,
'logReject': False,
'logFormat':
'%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s',
'logLevel': 'info',
'mirror': True,
'permCopy': True,
@@ -245,7 +243,7 @@ def loadCallbacks(self, plugins_to_load=None):
logger.debug( "details:", exc_info=True )
return False

logger.info( f'flowCallback plugins to load: {plugins_to_load}' )
logger.debug( f'flowCallback plugins to load: {plugins_to_load}' )
for c in plugins_to_load:
try:
plugin = sarracenia.flowcb.load_library(c, self.o)
@@ -303,7 +301,6 @@ def runCallbacksTime(self, entry_point):
eval( f"self.{entry_point}()")
else:
try:
logger.info( f'normal run of self.{entry_point}' )
eval( f"self.{entry_point}()")
except Exception as ex:
logger.error( f'flow {entry_point} crashed: {ex}' )
@@ -448,9 +445,7 @@ def reject(self, m, code, reason) -> None:
m.setReport(code, reason)

def please_stop(self) -> None:
logger.info(
f'ok, telling {len(self.plugins["please_stop"])} callbacks about it.'
)
logger.info( f'ok, telling {len(self.plugins["please_stop"])} callbacks about it.')
self._stop_requested = True
self.metrics["flow"]['stop_requested'] = True

@@ -530,8 +525,8 @@ def run(self):
logger.debug("options:")
self.o.dump()

logger.info("callbacks loaded: %s" % self.plugins['load'])
logger.info(
logger.debug("callbacks loaded: %s" % self.plugins['load'])
logger.debug(
f'pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}'
)

@@ -543,11 +538,11 @@

if self._stop_requested:
if stopping:
logger.info('clean stop from run loop')
logger.debug('clean stop from run loop')
self.close()
break
else:
logger.info( 'starting last pass (without gather) through loop for cleanup.')
logger.debug( 'starting last pass (without gather) through loop for cleanup.')
stopping = True

self._run_vip_update()
@@ -562,7 +557,7 @@
if (self.o.component == 'poll') or self.have_vip:

if ( self.o.messageRateMax > 0 ) and (current_rate > 0.8*self.o.messageRateMax ):
logger.info("current_rate (%.2f) vs. messageRateMax(%.2f)) " % (current_rate, self.o.messageRateMax))
logger.debug("current_rate (%.2f) vs. messageRateMax(%.2f)) " % (current_rate, self.o.messageRateMax))

if not stopping:
self.gather()
@@ -600,7 +595,7 @@ def run(self):
if (last_gather_len == 0) and (self.o.sleep < 0):
if (self.o.retryEmptyBeforeExit and "retry" in self.metrics
and self.metrics['retry']['msgs_in_post_retry'] > 0):
logger.info("Not exiting because there are still messages in the post retry queue.")
logger.debug("Not exiting because there are still messages in the post retry queue.")
# Sleep for a while. Messages can't be retried before housekeeping has run...
current_sleep = 60
else:
@@ -1112,7 +1107,7 @@ def gather(self) -> None:
return

if len(self.worklist.incoming) > 0:
logger.info('ingesting %d postings into duplicate suppression cache' % len(self.worklist.incoming) )
logger.debug('ingesting %d postings into duplicate suppression cache' % len(self.worklist.incoming) )
self.worklist.poll_catching_up = True
return
else:
@@ -1241,7 +1236,7 @@ def post(self,now) -> None:
mfn.write( f'\"{timestamp}\" : {metrics},\n')

# removing old metrics files
logger.info( f"looking for old metrics for {self.o.metricsFilename}" )
logger.debug( f"looking for old metrics for {self.o.metricsFilename}" )
old_metrics=sorted(glob.glob(self.o.metricsFilename+'.*'))[0:-self.o.logRotateCount]
for o in old_metrics:
logger.info( f"removing old metrics file: {o} " )
@@ -1494,7 +1489,7 @@ def removeOneFile(self, path) -> bool:
if os.path.isfile(path): os.unlink(path)
if os.path.islink(path): os.unlink(path)
if os.path.isdir(path): os.rmdir(path)
logger.info("removed %s" % path)
logger.debug("removed %s" % path)
except:
logger.error("could not remove %s." % path)
logger.debug('Exception details: ', exc_info=True)
@@ -1507,7 +1502,6 @@ def renameOneItem(self, old, path) -> bool:
for messages with an rename file operation, it is to rename a file.
"""
ok = True
logger.info( f" pwd is {os.getcwd()} " )
if not os.path.exists(old):
logger.info(
"old file %s not found, if destination (%s) missing, then fall back to copy"
@@ -1644,7 +1638,7 @@ def do_download(self) -> None:

if not os.path.isdir(msg['new_dir']):
try:
logger.info( f"missing destination directories, makedirs: {msg['new_dir']} " )
logger.debug( f"missing destination directories, makedirs: {msg['new_dir']} " )
self.worklist.directories_ok.append(msg['new_dir'])
os.makedirs(msg['new_dir'], 0o775, True)
except Exception as ex:
@@ -2027,7 +2021,7 @@ def download(self, msg, options) -> bool:
remote_offset += msg['blocks']['manifest'][blkno]['size']

block_length=msg['blocks']['manifest'][msg['blocks']['number']]['size']
logger.info( f"offset calculation: start={remote_offset} count={block_length}" )
logger.debug( f"offset calculation: start={remote_offset} count={block_length}" )

elif 'size' in msg:
block_length = msg['size']
2 changes: 1 addition & 1 deletion sarracenia/flowcb/log.py
@@ -29,7 +29,7 @@ def __init__(self, options):
self.o.add_option('logEvents', 'set',
['after_accept', 'on_housekeeping'])
self.o.add_option('logMessageDump', 'flag', False)
logger.info(f'{self.o.component} initialized with: logEvents: {self.o.logEvents}, logMessageDump: {self.o.logMessageDump}')
logger.debug(f'{self.o.component} initialized with: logEvents: {self.o.logEvents}, logMessageDump: {self.o.logMessageDump}')
if self.o.component in ['sender']:
self.action_verb = 'sent'
elif self.o.component in ['subscribe', 'sarra' ]:
10 changes: 4 additions & 6 deletions sarracenia/flowcb/nodupe/disk.py
@@ -56,8 +56,6 @@ def __init__(self, options):

self.o.add_option( 'nodupe_ttl', 'duration', 0 )

logger.info('time_to_live=%d, ' % (self.o.nodupe_ttl))

self.cache_dict = {}
self.cache_file = None
self.cache_hit = None
@@ -71,17 +69,17 @@

def on_housekeeping(self):

logger.info("start (%d)" % len(self.cache_dict))
logger.debug("start with %d entries" % len(self.cache_dict))

count = self.count
self.save()

self.now = nowflt()
new_count = self.count

logger.info(
"was %d, but since %5.2f sec, increased up to %d, now saved %d entries"
% (self.last_count, self.now - self.last_time, count, new_count))
if new_count > 0:
logger.info( "was %d, but since %5.2f sec, increased up to %d, now saved %d entries"
% (self.last_count, self.now - self.last_time, count, new_count))

self.last_time = self.now
self.last_count = new_count
4 changes: 2 additions & 2 deletions sarracenia/flowcb/post/message.py
@@ -63,9 +63,9 @@ def on_housekeeping(self):
def on_start(self):
if hasattr(self,'poster') and self.poster:
self.poster.putSetup()
logger.info('starting')
logger.debug('starting')

def on_stop(self):
if hasattr(self,'poster') and self.poster:
self.poster.close()
logger.info('closing')
logger.debug('closing')