From 483e25e63a5b19b3339e0cb838b551f5bf8c6735 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Tue, 26 Mar 2024 15:59:20 -0400 Subject: [PATCH 01/65] Initial code for an S3 transfer class --- sarracenia/featuredetection.py | 3 + sarracenia/transfer/s3.py | 175 +++++++++++++++++++++++++++++++++ 2 files changed, 178 insertions(+) create mode 100644 sarracenia/transfer/s3.py diff --git a/sarracenia/featuredetection.py b/sarracenia/featuredetection.py index 38ef8673e..698a9a8ab 100755 --- a/sarracenia/featuredetection.py +++ b/sarracenia/featuredetection.py @@ -82,6 +82,9 @@ 'retry': { 'modules_needed': ['jsonpickle'], 'present': False, 'lament': 'unable to use local queues on the side (disk or redis) to retry messages later', 'rejoice': 'can write messages to local queues to retry failed publishes/sends/downloads'}, + 's3' : { 'modules_needed': [ 'boto3' ], 'present': False, + 'lament' : 'cannot connect natively to S3-compatible locations (AWS S3, Minio, etc..)', + 'rejoice': 'able to connect natively to S3-compatible locations (AWS S3, Minio, etc..)', }, 'sftp' : { 'modules_needed': [ 'paramiko' ], 'lament': 'cannot use or access sftp/ssh based services', 'rejoice': 'can use sftp or ssh based services' diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py new file mode 100644 index 000000000..eaa38307a --- /dev/null +++ b/sarracenia/transfer/s3.py @@ -0,0 +1,175 @@ +# This file is part of sarracenia. +# The sarracenia suite is Free and is proudly provided by the Government of Canada +# Copyright (C) Her Majesty The Queen in Right of Canada, 2008-2021 +# +# Sarracenia repository: https://github.com/MetPX/sarracenia +# Documentation: https://github.com/MetPX/sarracenia +# +######################################################################## +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# + +import logging +import os +import sarracenia +import sys + +from sarracenia.transfer import Transfer +from sarracenia.transfer import alarm_cancel, alarm_set, alarm_raise + +import boto3 + +logger = logging.getLogger(__name__) + + +class S3(Transfer): + """ + Simple Storage Service (S3) ( https://en.wikipedia.org/wiki/Amazon_S3 ) + + + built with: + boto3's S3 client (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html) + """ + + # ----------------------- MAGIC METHODS ---------------------- + def __init__(self, proto, options): + + super().__init__(proto, options) + + logger.debug("sr_s3 __init__") + + self.__init() + + ## --------------------- PRIVATE METHODS --------------------- + + # init + def __init(self): + Transfer.init(self) + + logger.debug("sr_s3 __init") + self.connected = False + self.client = boto3.client('s3') + self.details = None + self.seek = True + + self.bucket = 's3transfer' + self.cwd = '' + + self.entries = {} + + ## ---------------------- PUBLIC METHODS --------------------- + def registered_as(): + return ['s3'] + + def cd(self, path): + logger.debug("sr_s3 cd %s" % path) + self.cwd = os.path.dirname(path) + self.path = path + + def get(self, + msg, + remote_file, + local_file, + remote_offset=0, + local_offset=0, + length=0, exactLength=False): + logger.debug("get %s %s %d" % (remote_file, local_file, local_offset)) + logger.debug("sr_s3 self.path %s" % self.path) + + # open local file + dst = self.local_write_open(local_file, local_offset) + + # initialize sumalgo + if self.sumalgo: self.sumalgo.set_path(remote_file) + + # download + self.write_chunk_init(dst) + + self.client.download_file(Bucket=self.bucket, Key=remote_file, Filename=local_file, Callback=self.write_chunk) + + rw_length = self.write_chunk_end() + + # close + self.local_write_close(dst) + + return rw_length + + def put(self, + msg, + local_file, + remote_file, + local_offset=0, + remote_offset=0, + length=0): + logger.debug("sr_s3 put %s %s" % (local_file, remote_file)) + + # open + src = self.local_read_open(local_file, local_offset) + + # upload + self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=remote_file, Callback=self.write_chunk) + + rw_length = self.write_chunk_end() + + # close + self.local_read_close(src) + + return rw_length + + def ls(self): + logger.debug("sr_s3 ls") + + self.entries = {} + + #objects = self.s3_client.list_objects_v2(Bucket=self.bucket, Prefix=self.cwd) + + paginator = self.client.get_paginator('list_objects_v2') + page_iterator = paginator.paginate(Bucket=self.bucket, Prefix=self.path) + + for page in page_iterator: + for obj in page['Contents']: + self.entries += obj['Key'].replace(self.path, "") + + return self.entries + + # delete + def delete(self, path): + logger.debug("sr_s3 delete %s" % path) + # if delete does not work (file not found) run pwd to see if connection is ok + self.client.delete_object(Bucket=self.bucket, Key=path) + + + def rename(self, remote_old, remote_new): + self.client.copy_object(Bucket=self.bucket, CopySource=remote_old, Key=remote_new) + self.client.delete_object(Bucket=self.bucket, Key=remote_old) + + # rmdir + def rmdir(self, path): + logger.debug("sr_s3 rmdir %s" % path) + paginator = self.client.get_paginator('list_objects_v2') + pages = 
paginator.paginate(Bucket=self.bucket, Prefix=path + "/") + + delete_us = dict(Objects=[]) + for item in pages.search('Contents'): + delete_us['Objects'].append(dict(Key=item['Key'])) + + # flush once aws limit reached + if len(delete_us['Objects']) >= 1000: + self.client.delete_objects(Bucket=self.bucket, Delete=delete_us) + delete_us = dict(Objects=[]) + + # flush rest + if len(delete_us['Objects']): + self.client.delete_objects(Bucket=self.bucket, Delete=delete_us) \ No newline at end of file From fc1aa50cb8d1d07e5d137e8bb4708f902d298b2c Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 27 Mar 2024 11:09:25 -0400 Subject: [PATCH 02/65] Add bandwidth capping to S3 transfer class --- sarracenia/transfer/s3.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py index eaa38307a..4e26d389d 100644 --- a/sarracenia/transfer/s3.py +++ b/sarracenia/transfer/s3.py @@ -50,6 +50,10 @@ def __init__(self, proto, options): logger.debug("sr_s3 __init__") + self.s3_config = boto3.s3.transfer.S3TransferConfig() + if hasattr(self.o, 'byteRateMax'): + self.s3_config.max_bandwidth = self.o.byteRateMax + self.__init() ## --------------------- PRIVATE METHODS --------------------- @@ -97,7 +101,7 @@ def get(self, # download self.write_chunk_init(dst) - self.client.download_file(Bucket=self.bucket, Key=remote_file, Filename=local_file, Callback=self.write_chunk) + self.client.download_file(Bucket=self.bucket, Key=remote_file, Filename=local_file, Callback=self.write_chunk, Config=self.s3_config) rw_length = self.write_chunk_end() @@ -119,7 +123,7 @@ def put(self, src = self.local_read_open(local_file, local_offset) # upload - self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=remote_file, Callback=self.write_chunk) + self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=remote_file, Callback=self.write_chunk, Config=self.s3_config) rw_length = self.write_chunk_end() From c27094da58e848294aedd98ae74d79f095ef4f66 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 27 Mar 2024 15:27:16 -0400 Subject: [PATCH 03/65] Update core sr3 code to work with S3 transfer --- sarracenia/credentials.py | 11 ++++++++++- sarracenia/transfer/__init__.py | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/sarracenia/credentials.py b/sarracenia/credentials.py index 788710a0e..7570df932 100755 --- a/sarracenia/credentials.py +++ b/sarracenia/credentials.py @@ -98,6 +98,8 @@ def __init__(self, urlstr=None): self.prot_p = False self.bearer_token = None self.login_method = None + self.s3_endpoint = None + self.s3_session_token = None def __str__(self): """Returns attributes of the Credential object as a readable string. @@ -126,6 +128,9 @@ def __str__(self): s += " %s" % self.prot_p s += " %s" % self.bearer_token s += " %s" % self.login_method + s += " %s" % self.s3_endpoint + #want to show they provided a session token, but not leak it (like passwords above) + s += " %s" % 'Yes' if self.s3_session_token != None else 'No' return s @@ -275,7 +280,7 @@ def isValid(self, url, details=None): # we have no user and no pasw (http normal, https... 
no cert, sftp hope for .ssh/config) if not user and not pasw: - if url.scheme in ['http', 'https', 'sftp']: return True + if url.scheme in ['http', 'https', 'sftp', 's3']: return True logger.error( f'unknown scheme: {url.scheme}') return False @@ -363,6 +368,10 @@ def _parse(self, line): details.bearer_token = parts[1].strip() elif keyword == 'login_method': details.login_method = parts[1].strip() + elif keyword == 's3_session_token': + details.s3_session_token = urllib.parse.unquote(parts[1].strip()) + elif keyword == 's3_endpoint': + details.s3_endpoint = parts[1].strip() else: logger.warning("bad credential option (%s)" % keyword) diff --git a/sarracenia/transfer/__init__.py b/sarracenia/transfer/__init__.py index 2dd5e12e9..58aa581be 100755 --- a/sarracenia/transfer/__init__.py +++ b/sarracenia/transfer/__init__.py @@ -433,3 +433,6 @@ def write_chunk_init(self, proto): if features['sftp']['present']: import sarracenia.transfer.sftp +if features['s3']['present']: + import sarracenia.transfer.s3 + From 3bd74910118191c830af94e8a6cea98947dfe6f0 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 27 Mar 2024 15:28:19 -0400 Subject: [PATCH 04/65] many changes to s3 transfer class - Add all the methods I think it needs - Re-order them alphabetically - Work on how to configure it --- sarracenia/transfer/s3.py | 150 ++++++++++++++++++++++++++++---------- 1 file changed, 111 insertions(+), 39 deletions(-) diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py index 4e26d389d..268cb366b 100644 --- a/sarracenia/transfer/s3.py +++ b/sarracenia/transfer/s3.py @@ -29,7 +29,7 @@ from sarracenia.transfer import Transfer from sarracenia.transfer import alarm_cancel, alarm_set, alarm_raise -import boto3 +import boto3, botocore logger = logging.getLogger(__name__) @@ -44,44 +44,111 @@ class S3(Transfer): """ # ----------------------- MAGIC METHODS ---------------------- + def __init__(self, proto, options): super().__init__(proto, options) logger.debug("sr_s3 __init__") - self.s3_config = boto3.s3.transfer.S3TransferConfig() + self.s3_client_config = botocore.config.Config( + user_agent_extra= 'Sarracenia/' + sarracenia.__version__ + ) + + self.s3_transfer_config = boto3.s3.transfer.S3TransferConfig() if hasattr(self.o, 'byteRateMax'): - self.s3_config.max_bandwidth = self.o.byteRateMax + self.s3_transfer_config.max_bandwidth = self.o.byteRateMax + self.__init() - ## --------------------- PRIVATE METHODS --------------------- - # init + ## --------------------- PRIVATE METHODS --------------------- + def __init(self): Transfer.init(self) logger.debug("sr_s3 __init") self.connected = False - self.client = boto3.client('s3') + self.client = None self.details = None self.seek = True - self.bucket = 's3transfer' + self.bucket = None + self.client_args = {} + self.cwd = '' self.entries = {} - ## ---------------------- PUBLIC METHODS --------------------- - def registered_as(): - return ['s3'] + def __credentials(self): + logger.debug("sr_s3 __credentials %s" % self.sendTo) + + try: + ok, details = self.o.credentials.get(self.sendTo) + if details: url = details.url + + if url.username != '': + self.client_args['aws_access_key_id'] = url.username + if url.password != '': + self.client_args['aws_secret_access_key'] = url.password + if hasattr(details, 's3_session_token'): + self.client_args['aws_session_token'] = details.s3_session_token + if hasattr(details, 's3_endpoint'): + self.client_args['endpoint'] = details.s3_endpoint + + return True + + except: + logger.error("sr_s3/credentials: unable to 
get credentials for %s" % self.sendTo) + logger.debug('Exception details: ', exc_info=True) + + return False + + ## ---------------------- PUBLIC METHODS --------------------- + def cd(self, path): logger.debug("sr_s3 cd %s" % path) self.cwd = os.path.dirname(path) self.path = path - + + def check_is_connected(self): + logger.debug("sr_s3 check_is_connected") + + if not self.connected : return False + + if self.sendTo != self.o.sendTo: + self.close() + return False + + return True + + def chmod(self, perms): + logger.debug(f"sr_s3 chmod {perms}") + return + + def close(self): + logger.debug("sr_s3 close") + self.connected = False + self.client = None + return + + def connect(self): + logger.debug("sr_s3 connect %s" % self.o.sendTo) + + self.__credentials() + + self.client = boto3.client('s3', Config=self.s3_client_config, **self.client_args) + self.connected = True + + return True + + def delete(self, path): + logger.debug("sr_s3 delete %s" % path) + # if delete does not work (file not found) run pwd to see if connection is ok + self.client.delete_object(Bucket=self.bucket, Key=path) + def get(self, msg, remote_file, @@ -89,8 +156,9 @@ def get(self, remote_offset=0, local_offset=0, length=0, exactLength=False): + logger.debug("sr_s3 get; self.path %s" % self.path) logger.debug("get %s %s %d" % (remote_file, local_file, local_offset)) - logger.debug("sr_s3 self.path %s" % self.path) + # open local file dst = self.local_write_open(local_file, local_offset) @@ -101,7 +169,7 @@ def get(self, # download self.write_chunk_init(dst) - self.client.download_file(Bucket=self.bucket, Key=remote_file, Filename=local_file, Callback=self.write_chunk, Config=self.s3_config) + self.client.download_file(Bucket=self.bucket, Key=remote_file, Filename=local_file, Callback=self.write_chunk, Config=self.s3_transfer_config) rw_length = self.write_chunk_end() @@ -110,6 +178,24 @@ def get(self, return rw_length + def getAccelerated (self, msg, remote_file, local_file, length=0 ): + return self.get(msg, remote_file, local_file, 0, 0, length, False) + + def ls(self): + logger.debug("sr_s3 ls") + + self.entries = {} + + paginator = self.client.get_paginator('list_objects_v2') + page_iterator = paginator.paginate(Bucket=self.bucket, Prefix=self.path) + + for page in page_iterator: + if 'Contents' in page: + for obj in page['Contents']: + self.entries += obj['Key'].replace(self.path, "") + + return self.entries + def put(self, msg, local_file, @@ -117,13 +203,13 @@ def put(self, local_offset=0, remote_offset=0, length=0): - logger.debug("sr_s3 put %s %s" % (local_file, remote_file)) + logger.debug("sr_s3 put; %s %s" % (local_file, remote_file)) # open src = self.local_read_open(local_file, local_offset) # upload - self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=remote_file, Callback=self.write_chunk, Config=self.s3_config) + self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=remote_file, Callback=self.write_chunk, Config=self.s3_transfer_config) rw_length = self.write_chunk_end() @@ -132,34 +218,16 @@ def put(self, return rw_length - def ls(self): - logger.debug("sr_s3 ls") - - self.entries = {} + def putAccelerated (self, msg, remote_file, local_file, length=0 ): + return self.put(msg,local_file, remote_file, 0, 0, length) - #objects = self.s3_client.list_objects_v2(Bucket=self.bucket, Prefix=self.cwd) - - paginator = self.client.get_paginator('list_objects_v2') - page_iterator = paginator.paginate(Bucket=self.bucket, Prefix=self.path) - - for page in page_iterator: - for obj in 
page['Contents']: - self.entries += obj['Key'].replace(self.path, "") - - return self.entries + def registered_as(): + return ['s3'] - # delete - def delete(self, path): - logger.debug("sr_s3 delete %s" % path) - # if delete does not work (file not found) run pwd to see if connection is ok - self.client.delete_object(Bucket=self.bucket, Key=path) - - def rename(self, remote_old, remote_new): self.client.copy_object(Bucket=self.bucket, CopySource=remote_old, Key=remote_new) self.client.delete_object(Bucket=self.bucket, Key=remote_old) - # rmdir def rmdir(self, path): logger.debug("sr_s3 rmdir %s" % path) paginator = self.client.get_paginator('list_objects_v2') @@ -170,10 +238,14 @@ def rmdir(self, path): delete_us['Objects'].append(dict(Key=item['Key'])) # flush once aws limit reached - if len(delete_us['Objects']) >= 1000: + if len(delete_us['Objects']) >= 500: self.client.delete_objects(Bucket=self.bucket, Delete=delete_us) delete_us = dict(Objects=[]) # flush rest if len(delete_us['Objects']): - self.client.delete_objects(Bucket=self.bucket, Delete=delete_us) \ No newline at end of file + self.client.delete_objects(Bucket=self.bucket, Delete=delete_us) + + def umask(self): + logger.debug("sr_s3 umask") + return \ No newline at end of file From 9e450b52a8da003bb6bbca380204357394128cbc Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 27 Mar 2024 17:42:57 -0400 Subject: [PATCH 05/65] #982 statehost config option should be used to see pick state directory --- sarracenia/sr.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/sarracenia/sr.py b/sarracenia/sr.py index ce859997a..554fb3c19 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -447,19 +447,25 @@ def save_states(self, savename): self._save_state_dir(savename, self.user_cache_dir + os.sep + self.hostdir) - def _read_state_dir(self, dir1): + def _read_state_dir(self): # read in state files + dir1 = self.user_cache_dir if not os.path.isdir(dir1): return os.chdir(dir1) for c in self.components: - if os.path.isdir(c): - os.chdir(c) - for cfg in os.listdir(): - if os.path.isdir(cfg): - os.chdir(cfg) + for cfg in self.configs[c]: + #print( f" {self.configs[c][cfg]['statehost']=} " ) + if 'options' in self.configs[c][cfg] and self.configs[c][cfg]['options'].statehost: + print('statehost') + state_dir=self.user_cache_dir + os.sep + self.hostdir + os.sep + c + os.sep + cfg + else: + state_dir=self.user_cache_dir + os.sep + c + os.sep + cfg + + if os.path.isdir(state_dir): + os.chdir(state_dir) self.states[c][cfg] = {} self.states[c][cfg]['instance_pids'] = {} self.states[c][cfg]['queueName'] = None @@ -502,8 +508,7 @@ def _read_state_dir(self, dir1): self.states[c][cfg]['instance_metrics'][i]['status'] = { 'mtime':os.stat(p).st_mtime } except: logger.error( f"corrupt metrics file {pathname}: {t}" ) - os.chdir('..') - os.chdir('..') + def _read_metrics_dir(self,metrics_parent_dir): # read in metrics files @@ -549,8 +554,8 @@ def _read_states(self): for c in self.components: self.states[c] = {} - self._read_state_dir(self.user_cache_dir) - self._read_state_dir(self.user_cache_dir + os.sep + self.hostdir) + self._read_state_dir() + #self._read_state_dir(self.user_cache_dir + os.sep + self.hostdir) self._read_metrics_dir(self.user_cache_dir) self._read_metrics_dir(self.user_cache_dir + os.sep + self.hostdir) From 15a9022349ac70303fcc1c590b3620127855c267 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Thu, 28 Mar 2024 08:50:57 -0400 Subject: [PATCH 06/65] Try to catch bad connections in S3 
Transfer --- sarracenia/transfer/s3.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py index 268cb366b..14578b70f 100644 --- a/sarracenia/transfer/s3.py +++ b/sarracenia/transfer/s3.py @@ -30,6 +30,7 @@ from sarracenia.transfer import alarm_cancel, alarm_set, alarm_raise import boto3, botocore +from boto3.s3.transfer import TransferConfig logger = logging.getLogger(__name__) @@ -55,7 +56,7 @@ def __init__(self, proto, options): user_agent_extra= 'Sarracenia/' + sarracenia.__version__ ) - self.s3_transfer_config = boto3.s3.transfer.S3TransferConfig() + self.s3_transfer_config = TransferConfig() if hasattr(self.o, 'byteRateMax'): self.s3_transfer_config.max_bandwidth = self.o.byteRateMax @@ -97,6 +98,7 @@ def __credentials(self): if hasattr(details, 's3_endpoint'): self.client_args['endpoint'] = details.s3_endpoint + return True except: @@ -135,11 +137,17 @@ def close(self): return def connect(self): - logger.debug("sr_s3 connect %s" % self.o.sendTo) + logger.debug("creating boto3 client") - self.__credentials() + if self.__credentials(): + logger.debug(f"found credentials?") self.client = boto3.client('s3', Config=self.s3_client_config, **self.client_args) + try: + buckets = self.client.list_buckets() + except botocore.exceptions.ClientError as e: + logger.warning(f"unable to establish boto3 connection: {e}") + self.connected = True return True From 315470c961713e9c833f7fdbec0c7969cb6de985 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Tue, 2 Apr 2024 13:10:04 -0400 Subject: [PATCH 07/65] Got S3 connecting and LSing Worked with Reid and Peter to get the S3 transfer class connecting to S3, and recursively LS-ing objects in a bucket. It also inherently gets `__credentials` and `cd` working too. --- sarracenia/transfer/s3.py | 79 ++++++++++++++++++++++++++++--------- 1 file changed, 61 insertions(+), 18 deletions(-) diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py index 14578b70f..83dcb9840 100644 --- a/sarracenia/transfer/s3.py +++ b/sarracenia/transfer/s3.py @@ -25,6 +25,8 @@ import os import sarracenia import sys +import paramiko +import stat from sarracenia.transfer import Transfer from sarracenia.transfer import alarm_cancel, alarm_set, alarm_raise @@ -83,12 +85,13 @@ def __init(self): self.entries = {} def __credentials(self): - logger.debug("sr_s3 __credentials %s" % self.sendTo) + logger.debug("%s" % self.sendTo) try: ok, details = self.o.credentials.get(self.sendTo) if details: url = details.url + self.bucket = details.url.hostname if url.username != '': self.client_args['aws_access_key_id'] = url.username if url.password != '': @@ -96,7 +99,7 @@ def __credentials(self): if hasattr(details, 's3_session_token'): self.client_args['aws_session_token'] = details.s3_session_token if hasattr(details, 's3_endpoint'): - self.client_args['endpoint'] = details.s3_endpoint + self.client_args['endpoint_url'] = details.s3_endpoint return True @@ -113,7 +116,7 @@ def __credentials(self): def cd(self, path): logger.debug("sr_s3 cd %s" % path) self.cwd = os.path.dirname(path) - self.path = path + self.path = path.strip('/') + "/" def check_is_connected(self): logger.debug("sr_s3 check_is_connected") @@ -139,18 +142,27 @@ def close(self): def connect(self): logger.debug("creating boto3 client") + self.sendTo = self.o.sendTo + + if self.__credentials(): logger.debug(f"found credentials? 
{self.client_args}") - self.client = boto3.client('s3', Config=self.s3_client_config, **self.client_args) + try: + self.client = boto3.client('s3', config=self.s3_client_config, **self.client_args) buckets = self.client.list_buckets() + self.connected = True + logger.debug("Connected to S3!!") + return True except botocore.exceptions.ClientError as e: logger.warning(f"unable to establish boto3 connection: {e}") + except botocore.exceptions.NoCredentialsError as e: + logger.warning(f"unable to establish boto3 connection, no credentials: {e}") + except Exception as e: + logger.warning(f"Something else happened: {e}", exc_info=True) - self.connected = True - - return True + return False def delete(self, path): logger.debug("sr_s3 delete %s" % path) @@ -186,22 +198,56 @@ def get(self, return rw_length - def getAccelerated (self, msg, remote_file, local_file, length=0 ): - return self.get(msg, remote_file, local_file, 0, 0, length, False) - def ls(self): - logger.debug("sr_s3 ls") + logger.debug(f"ls-ing items in {self.bucket}/{self.path}") self.entries = {} paginator = self.client.get_paginator('list_objects_v2') - page_iterator = paginator.paginate(Bucket=self.bucket, Prefix=self.path) + page_iterator = paginator.paginate(Bucket=self.bucket, Prefix=self.path, Delimiter='/') for page in page_iterator: if 'Contents' in page: for obj in page['Contents']: - self.entries += obj['Key'].replace(self.path, "") - + # Only do stuff with objects that aren't "folders" + #if not obj['Key'][-1] == "/": + + filename = obj['Key'].replace(self.path, "") + if filename == "": + continue + entry = paramiko.SFTPAttributes() + if 'LastModified' in obj: + t = obj["LastModified"].timestamp() + entry.st_atime = t + entry.st_mtime = t + if 'Size' in obj: + entry.st_size = obj['Size'] + + entry.st_mode = 0o644 + + #entry.filename = filename + #entry.longname = filename + + self.entries[filename] = entry + + if 'CommonPrefixes' in page: + for prefix in page['CommonPrefixes']: + logger.debug(f"Found folder {prefix['Prefix']}") + + filename = prefix['Prefix'].replace(self.path, '').rstrip("/") + if filename == "": + continue + + entry = paramiko.SFTPAttributes() + + entry.st_mode = 0o644 | stat.S_IFDIR + + #entry.filename = filename + #entry.longname = filename + + self.entries[filename] = entry + + logger.debug(f"{self.entries=}") return self.entries def put(self, @@ -226,9 +272,6 @@ def put(self, return rw_length - def putAccelerated (self, msg, remote_file, local_file, length=0 ): - return self.put(msg,local_file, remote_file, 0, 0, length) - def registered_as(): return ['s3'] From 12f271b3f4686025b8ec367915ae38e5fb201fef Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Tue, 2 Apr 2024 16:35:55 -0400 Subject: [PATCH 08/65] Add more methods so send works It does send, the paths are weird, and I don't know what they're supposed to do. I suspect there's some changes to make. 
--- sarracenia/transfer/s3.py | 85 +++++++++++++++++++++------------------ 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py index 83dcb9840..9b52f0e8e 100644 --- a/sarracenia/transfer/s3.py +++ b/sarracenia/transfer/s3.py @@ -118,6 +118,11 @@ def cd(self, path): self.cwd = os.path.dirname(path) self.path = path.strip('/') + "/" + def cd_forced(self, perm, path): + logger.debug("sr_s3 cd %s" % path) + self.cwd = os.path.dirname(path) + self.path = path.strip('/') + "/" + def check_is_connected(self): logger.debug("sr_s3 check_is_connected") @@ -144,28 +149,30 @@ def connect(self): self.sendTo = self.o.sendTo + self.__credentials() + #logger.debug(f"found credentials {self.client_args=}") - if self.__credentials(): - logger.debug(f"found credentials? {self.client_args}") - - try: self.client = boto3.client('s3', config=self.s3_client_config, **self.client_args) buckets = self.client.list_buckets() - self.connected = True - logger.debug("Connected to S3!!") - return True + if self.bucket in [b['Name'] for b in buckets['Buckets']]: + self.connected = True + logger.debug(f"Connected to bucket {self.bucket} in {self.client.get_bucket_location(Bucket=self.bucket)['LocationConstraint']}") + return True + else: + logger.error(f"Can't find bucket called {self.bucket}") + except botocore.exceptions.ClientError as e: - logger.warning(f"unable to establish boto3 connection: {e}") + logger.error(f"unable to establish boto3 connection: {e}") except botocore.exceptions.NoCredentialsError as e: - logger.warning(f"unable to establish boto3 connection, no credentials: {e}") + logger.error(f"unable to establish boto3 connection, no credentials: {e}") except Exception as e: - logger.warning(f"Something else happened: {e}", exc_info=True) + logger.error(f"Something else happened: {e}", exc_info=True) return False - + def delete(self, path): - logger.debug("sr_s3 delete %s" % path) + logger.debug("deleting %s" % path) # if delete does not work (file not found) run pwd to see if connection is ok self.client.delete_object(Bucket=self.bucket, Key=path) @@ -177,26 +184,23 @@ def get(self, local_offset=0, length=0, exactLength=False): logger.debug("sr_s3 get; self.path %s" % self.path) - logger.debug("get %s %s %d" % (remote_file, local_file, local_offset)) - - # open local file - dst = self.local_write_open(local_file, local_offset) + file_key = self.path + remote_file + logger.debug(f"get s3://{self.bucket}/{file_key} to {local_file}") # initialize sumalgo - if self.sumalgo: self.sumalgo.set_path(remote_file) - - # download - self.write_chunk_init(dst) - - self.client.download_file(Bucket=self.bucket, Key=remote_file, Filename=local_file, Callback=self.write_chunk, Config=self.s3_transfer_config) + #if self.sumalgo: self.sumalgo.set_path(remote_file) - rw_length = self.write_chunk_end() + self.client.download_file(Bucket=self.bucket, Key=file_key, Filename=local_file, Config=self.s3_transfer_config) - # close - self.local_write_close(dst) + rw_length = os.stat(local_file).st_size return rw_length + + def getcwd(self): + cwd = self.cwd if self.client else None + alarm_cancel() + return cwd def ls(self): logger.debug(f"ls-ing items in {self.bucket}/{self.path}") @@ -212,7 +216,7 @@ def ls(self): # Only do stuff with objects that aren't "folders" #if not obj['Key'][-1] == "/": - filename = obj['Key'].replace(self.path, "") + filename = obj['Key'].replace(self.path, '', 1) if filename == "": continue entry = paramiko.SFTPAttributes() @@ -234,13 +238,13 
@@ def ls(self): for prefix in page['CommonPrefixes']: logger.debug(f"Found folder {prefix['Prefix']}") - filename = prefix['Prefix'].replace(self.path, '').rstrip("/") + filename = prefix['Prefix'].replace(self.path, '', 1).rstrip("/") if filename == "": continue entry = paramiko.SFTPAttributes() - entry.st_mode = 0o644 | stat.S_IFDIR + entry.st_mode = 0o755 | stat.S_IFDIR #entry.filename = filename #entry.longname = filename @@ -250,6 +254,10 @@ def ls(self): logger.debug(f"{self.entries=}") return self.entries + def mkdir(self, remote_dir): + logger.debug(f"mkdir {remote_dir=}; {self.path=}") + return + def put(self, msg, local_file, @@ -259,28 +267,27 @@ def put(self, length=0): logger.debug("sr_s3 put; %s %s" % (local_file, remote_file)) - # open - src = self.local_read_open(local_file, local_offset) - - # upload - self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=remote_file, Callback=self.write_chunk, Config=self.s3_transfer_config) + file_key = self.path + remote_file + logger.debug(f"put {local_file} to s3://{self.bucket}/{file_key}") + logger.debug(f"{msg=}") - rw_length = self.write_chunk_end() - # close - self.local_read_close(src) + # upload + self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=file_key, Config=self.s3_transfer_config) - return rw_length + write_size = self.client.get_object_attributes(Bucket='s3transfer', Key='foobar/amesii.csv', ObjectAttributes=['ObjectSize'])['ObjectSize'] + return write_size def registered_as(): return ['s3'] def rename(self, remote_old, remote_new): + logger.debug(f"{remote_old=}; {remote_new=}") self.client.copy_object(Bucket=self.bucket, CopySource=remote_old, Key=remote_new) self.client.delete_object(Bucket=self.bucket, Key=remote_old) def rmdir(self, path): - logger.debug("sr_s3 rmdir %s" % path) + logger.debug("%s" % path) paginator = self.client.get_paginator('list_objects_v2') pages = paginator.paginate(Bucket=self.bucket, Prefix=path + "/") @@ -298,5 +305,5 @@ def rmdir(self, path): self.client.delete_objects(Bucket=self.bucket, Delete=delete_us) def umask(self): - logger.debug("sr_s3 umask") + logger.debug("umask") return \ No newline at end of file From 9ee644791dbce46f9bd92b31f9daed105f17a6b7 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 3 Apr 2024 12:02:30 -0400 Subject: [PATCH 09/65] Add metadata to uploaded S3 files Also cleaned up some test code, and did some re-formatting. 
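The metadata rides along as a JSON string in the object's user metadata. A hedged sketch of reading it back with plain boto3 (bucket and key names are made up; the metadata key is 'sarracenia' as of this commit, renamed to 'sarracenia_v3' in the next one):

    import boto3, json

    s3 = boto3.client('s3')
    # head_object returns the user metadata without fetching the object body
    meta = s3.head_object(Bucket='my-bucket', Key='incoming/file.txt')['Metadata']
    sr = json.loads(meta['sarracenia'])   # e.g. {'identity': {...}, 'mtime': '...'}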
--- sarracenia/transfer/s3.py | 49 +++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py index 9b52f0e8e..d90811ba5 100644 --- a/sarracenia/transfer/s3.py +++ b/sarracenia/transfer/s3.py @@ -27,6 +27,7 @@ import sys import paramiko import stat +import json from sarracenia.transfer import Transfer from sarracenia.transfer import alarm_cancel, alarm_set, alarm_raise @@ -89,9 +90,11 @@ def __credentials(self): try: ok, details = self.o.credentials.get(self.sendTo) - if details: url = details.url + if details: + url = details.url self.bucket = details.url.hostname + if url.username != '': self.client_args['aws_access_key_id'] = url.username if url.password != '': @@ -101,7 +104,6 @@ def __credentials(self): if hasattr(details, 's3_endpoint'): self.client_args['endpoint_url'] = details.s3_endpoint - return True except: @@ -126,7 +128,8 @@ def cd_forced(self, perm, path): def check_is_connected(self): logger.debug("sr_s3 check_is_connected") - if not self.connected : return False + if not self.connected: + return False if self.sendTo != self.o.sendTo: self.close() @@ -150,7 +153,6 @@ def connect(self): self.sendTo = self.o.sendTo self.__credentials() - #logger.debug(f"found credentials {self.client_args=}") try: self.client = boto3.client('s3', config=self.s3_client_config, **self.client_args) @@ -173,7 +175,6 @@ def connect(self): def delete(self, path): logger.debug("deleting %s" % path) - # if delete does not work (file not found) run pwd to see if connection is ok self.client.delete_object(Bucket=self.bucket, Key=path) def get(self, @@ -183,14 +184,12 @@ def get(self, remote_offset=0, local_offset=0, length=0, exactLength=False): + logger.debug("sr_s3 get; self.path %s" % self.path) file_key = self.path + remote_file logger.debug(f"get s3://{self.bucket}/{file_key} to {local_file}") - # initialize sumalgo - #if self.sumalgo: self.sumalgo.set_path(remote_file) - self.client.download_file(Bucket=self.bucket, Key=file_key, Filename=local_file, Config=self.s3_transfer_config) rw_length = os.stat(local_file).st_size @@ -213,13 +212,12 @@ def ls(self): for page in page_iterator: if 'Contents' in page: for obj in page['Contents']: - # Only do stuff with objects that aren't "folders" - #if not obj['Key'][-1] == "/": - filename = obj['Key'].replace(self.path, '', 1) if filename == "": continue + entry = paramiko.SFTPAttributes() + if 'LastModified' in obj: t = obj["LastModified"].timestamp() entry.st_atime = t @@ -228,10 +226,7 @@ def ls(self): entry.st_size = obj['Size'] entry.st_mode = 0o644 - - #entry.filename = filename - #entry.longname = filename - + self.entries[filename] = entry if 'CommonPrefixes' in page: @@ -243,12 +238,8 @@ def ls(self): continue entry = paramiko.SFTPAttributes() - entry.st_mode = 0o755 | stat.S_IFDIR - - #entry.filename = filename - #entry.longname = filename - + self.entries[filename] = entry logger.debug(f"{self.entries=}") @@ -271,12 +262,24 @@ def put(self, logger.debug(f"put {local_file} to s3://{self.bucket}/{file_key}") logger.debug(f"{msg=}") + extra_args = { + 'Metadata': { + 'sarracenia': json.dumps({ + 'identity': msg['identity'], + 'mtime': msg['mtime'], + }) + } + } # upload - self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=file_key, Config=self.s3_transfer_config) + try: + self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=file_key, Config=self.s3_transfer_config, ExtraArgs=extra_args) - write_size = 
self.client.get_object_attributes(Bucket='s3transfer', Key='foobar/amesii.csv', ObjectAttributes=['ObjectSize'])['ObjectSize'] - return write_size + write_size = self.client.get_object_attributes(Bucket='s3transfer', Key='foobar/amesii.csv', ObjectAttributes=['ObjectSize'])['ObjectSize'] + return write_size + except Exception as e: + logger.error(f"Something went wrong with the upload: {e}", exc_info=True) + return -1 def registered_as(): return ['s3'] From f9e0f05daf84d4fdcc3db66c9b3d33bbac4d27cd Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 3 Apr 2024 12:32:28 -0400 Subject: [PATCH 10/65] Catch object metadata on ls Also fix statically assigned bucket and key names in object size fetch on puts --- sarracenia/transfer/s3.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py index d90811ba5..b05afbe30 100644 --- a/sarracenia/transfer/s3.py +++ b/sarracenia/transfer/s3.py @@ -85,6 +85,8 @@ def __init(self): self.entries = {} + self._Metadata_Key = 'sarracenia_v3' + def __credentials(self): logger.debug("%s" % self.sendTo) @@ -218,12 +220,20 @@ def ls(self): entry = paramiko.SFTPAttributes() + obj_metadata = self.client.head_object(Bucket=self.bucket, Key=obj['Key'])['Metadata'] + + if self._Metadata_Key in obj_metadata: + sr_metadata = json.loads(obj_metadata[self._Metadata_Key]) + entry.sr_mtime = sr_metadata['mtime'] + entry.sr_identity = sr_metadata['identity'] + if 'LastModified' in obj: t = obj["LastModified"].timestamp() entry.st_atime = t entry.st_mtime = t if 'Size' in obj: entry.st_size = obj['Size'] + entry.st_mode = 0o644 @@ -264,7 +274,7 @@ def put(self, extra_args = { 'Metadata': { - 'sarracenia': json.dumps({ + self._Metadata_Key: json.dumps({ 'identity': msg['identity'], 'mtime': msg['mtime'], }) @@ -275,7 +285,7 @@ def put(self, try: self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=file_key, Config=self.s3_transfer_config, ExtraArgs=extra_args) - write_size = self.client.get_object_attributes(Bucket='s3transfer', Key='foobar/amesii.csv', ObjectAttributes=['ObjectSize'])['ObjectSize'] + write_size = self.client.get_object_attributes(Bucket=self.bucket, Key=file_key, ObjectAttributes=['ObjectSize'])['ObjectSize'] return write_size except Exception as e: logger.error(f"Something went wrong with the upload: {e}", exc_info=True) From 3d9764bf81871e3c93e8d9b74e0de4d46dbeb0d4 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 3 Apr 2024 15:11:40 -0400 Subject: [PATCH 11/65] Start of unit Tests for S3 Transfer class Just the basics for now, but more to come. 
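Later commits in this series mock S3 with moto rather than hitting a real endpoint; the pattern behind the fixture added there boils down to the following (bucket name arbitrary, assuming moto >= 5):

    import boto3
    from moto import mock_aws

    with mock_aws():   # boto3 calls inside the context are intercepted by moto
        client = boto3.client('s3', region_name='us-east-1')
        client.create_bucket(Bucket='notarealbucket')
        client.put_object(Bucket='notarealbucket', Key='RootFile.txt', Body='Lorem ipsum')
        assert client.list_objects_v2(Bucket='notarealbucket')['KeyCount'] == 1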
--- sarracenia/transfer/s3.py | 10 +- tests/sarracenia/transfer/_transfer__test.py | 31 ++++ tests/sarracenia/transfer/s3_test.py | 160 +++++++++++++++++++ 3 files changed, 197 insertions(+), 4 deletions(-) create mode 100644 tests/sarracenia/transfer/_transfer__test.py create mode 100644 tests/sarracenia/transfer/s3_test.py diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py index b05afbe30..5d5125198 100644 --- a/sarracenia/transfer/s3.py +++ b/sarracenia/transfer/s3.py @@ -81,7 +81,8 @@ def __init(self): self.bucket = None self.client_args = {} - self.cwd = '' + self.path = "" + self.cwd = "" self.entries = {} @@ -199,9 +200,10 @@ def get(self, return rw_length def getcwd(self): - cwd = self.cwd if self.client else None - alarm_cancel() - return cwd + if self.client: + return self.cwd + else: + return None def ls(self): logger.debug(f"ls-ing items in {self.bucket}/{self.path}") diff --git a/tests/sarracenia/transfer/_transfer__test.py b/tests/sarracenia/transfer/_transfer__test.py new file mode 100644 index 000000000..eff7c780f --- /dev/null +++ b/tests/sarracenia/transfer/_transfer__test.py @@ -0,0 +1,31 @@ +import pytest +#from unittest.mock import Mock + +import logging + +import sarracenia +import sarracenia.config +import sarracenia.transfer + +#useful for debugging tests +import pprint +pretty = pprint.PrettyPrinter(indent=2, width=200).pprint + + +logger = logging.getLogger('sarracenia.config') +logger.setLevel('DEBUG') + +def test_factory(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.Transfer.factory('http', options) + + assert type(transfer) is sarracenia.transfer.https.Https + + transfer = sarracenia.transfer.Transfer.factory('DoesNotExist', options) + assert transfer == None + +def test___init__(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.Transfer.factory('http', options) + + assert transfer.fpos == 0 diff --git a/tests/sarracenia/transfer/s3_test.py b/tests/sarracenia/transfer/s3_test.py new file mode 100644 index 000000000..072d3ffe4 --- /dev/null +++ b/tests/sarracenia/transfer/s3_test.py @@ -0,0 +1,160 @@ +import pytest +#from unittest.mock import Mock + +import os +from base64 import b64decode +#import urllib.request +import logging +import re + +import sarracenia +import sarracenia.config +import sarracenia.transfer +import sarracenia.transfer.s3 + +import boto3, botocore + +#useful for debugging tests +import pprint +pretty = pprint.PrettyPrinter(indent=2, width=200).pprint + + +logger = logging.getLogger('sarracenia.config') +logger.setLevel('DEBUG') + +def test___init__(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert type(transfer) is sarracenia.transfer.s3.S3 + assert transfer.entries == {} + assert hasattr(transfer.s3_transfer_config, 'max_concurrency') + assert hasattr(transfer.s3_client_config, 'user_agent_extra') + +def test___credentials(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert True + +def test_cd(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert transfer.path == "" + assert transfer.cwd == "" + + transfer.cd("/this/Is/A/Path/") + + assert transfer.path == "this/Is/A/Path/" + assert transfer.cwd == "/this/Is/A/Path" + +def test_cd_forced(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert transfer.path == "" 
+ assert transfer.cwd == "" + + transfer.cd("/this/Is/A/Path/") + + assert transfer.path == "this/Is/A/Path/" + assert transfer.cwd == "/this/Is/A/Path" + +@pytest.mark.depends(on=['test_close']) +def test_check_is_connected(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert True + +def test_chmod(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + transfer.chmod('777') + assert True + +def test_close(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + transfer.connected = True + transfer.client = boto3.client('s3') + + transfer.close() + + assert transfer.connected == False + assert transfer.client == None + +@pytest.mark.depends(on=['test___credentials']) +def test_connect(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert True + +def test_delete(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert True + +def test_get(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert True + +def test_getcwd(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert transfer.getcwd() == None + + transfer.client = boto3.client('s3') + assert transfer.getcwd() == '' + +def test_ls(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert True + +def test_mkdir(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + transfer.mkdir('ThisMeansNothing') + assert True + +def test_put(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert True + +def test_registered_as(): + #options = sarracenia.config.default_config() + #transfer = sarracenia.transfer.s3.S3('s3', options) + + assert sarracenia.transfer.s3.S3.registered_as() == ['s3'] + +def test_rename(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert True + +def test_rmdir(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + assert True + +def test_umask(): + options = sarracenia.config.default_config() + transfer = sarracenia.transfer.s3.S3('s3', options) + + transfer.umask() + assert True \ No newline at end of file From 7087ffc799627db068782d082ecc86f6b955eee1 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 3 Apr 2024 15:18:29 -0400 Subject: [PATCH 12/65] Add dependencies for S3 transfer tests --- tests/requirements.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/requirements.txt b/tests/requirements.txt index ee7a8d51c..96a490a72 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -7,4 +7,6 @@ pytest-mock>=3.11 python-redis-lock>=4 fakeredis>=2.11 -fakeredis[lua]>=2.11 \ No newline at end of file +fakeredis[lua]>=2.11 +boto3>=1.34 +moto[s3]>=5.0 \ No newline at end of file From bd8924d420424dc2c3cf3a8d7b6a9a3487b72ba7 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Thu, 4 Apr 2024 11:36:51 -0400 Subject: [PATCH 13/65] Expanded unit tests for S3 transfer class --- sarracenia/transfer/s3.py | 4 +- tests/sarracenia/transfer/s3_test.py | 157 +++++++++++++++++++++++---- 2 files changed, 137 insertions(+), 24 
deletions(-) diff --git a/sarracenia/transfer/s3.py b/sarracenia/transfer/s3.py index 5d5125198..80b23172b 100644 --- a/sarracenia/transfer/s3.py +++ b/sarracenia/transfer/s3.py @@ -77,6 +77,7 @@ def __init(self): self.client = None self.details = None self.seek = True + self.sendTo = None self.bucket = None self.client_args = {} @@ -148,6 +149,7 @@ def close(self): logger.debug("sr_s3 close") self.connected = False self.client = None + self.sendTo = None return def connect(self): @@ -298,7 +300,7 @@ def registered_as(): def rename(self, remote_old, remote_new): logger.debug(f"{remote_old=}; {remote_new=}") - self.client.copy_object(Bucket=self.bucket, CopySource=remote_old, Key=remote_new) + self.client.copy_object(Bucket=self.bucket, CopySource=self.bucket + "/" + remote_old, Key=remote_new) self.client.delete_object(Bucket=self.bucket, Key=remote_old) def rmdir(self, path): diff --git a/tests/sarracenia/transfer/s3_test.py b/tests/sarracenia/transfer/s3_test.py index 072d3ffe4..b30469095 100644 --- a/tests/sarracenia/transfer/s3_test.py +++ b/tests/sarracenia/transfer/s3_test.py @@ -14,6 +14,11 @@ import boto3, botocore +from moto import mock_aws +import base64 +import json +import stat + #useful for debugging tests import pprint pretty = pprint.PrettyPrinter(indent=2, width=200).pprint @@ -22,6 +27,49 @@ logger = logging.getLogger('sarracenia.config') logger.setLevel('DEBUG') + +TEST_BUCKET_NAME = 'NotARealBucket' +TEST_BUCKET_KEYS = { + 'RootFile.txt': { + 'value': 'Lorem ipsum', + 'meta': json.dumps({'mtime': '20240402T161825', 'identity': {'method': 'cod', 'value': 'sha512'}})}, + 'Folder1/NestedFile.jpg': { + 'value': base64.b64decode('iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAAAXNSR0IArs4c6QAAAA1JREFUGFdjuG8t/x8ABW8COSh+5hMAAAAASUVORK5CYII='), + 'meta': json.dumps({'mtime': '20240401T161825', 'identity': {'method': 'cod', 'value': 'sha512'}})}, + 'Folder1/NestedFolder/DoubleNestedFile.txt': { + 'value': 'This is the contents of DoubleNestedFile.txt', + 'meta': json.dumps({ 'mtime': '20240404T181822', 'identity': {'method': 'cod', 'value': 'sha512'}})}, + 'Folder2/AlsoNestedFile.dat': { + 'value': 'o28934ua;loifgja908024hf;oiau4fhj298yao;uih43wap98w4fiuaghw3oufiywag3fhjklawgv2873RTY23ILUGHli&tyl&uiGHUU', + 'meta': json.dumps({'mtime': '20240404T181822', 'identity': {'method': 'cod', 'value': 'sha512'}})}, + 'FolderToDelete/ThisFileWillBeGone.txt': { + 'value': 'ThisIsNotTheFileYouAreLookingFor', + 'meta': json.dumps({ 'mtime': '20240404T181822', 'identity': {'method': 'cod', 'value': 'sha512'}})}, + 'FileToRename.txt': { + 'value': 'This file used to be called FileToRename.txt', + 'meta': json.dumps({ 'mtime': '20240404T181822', 'identity': {'method': 'cod', 'value': 'sha512'}})}, +} + +@pytest.fixture(scope="function") +def build_client(): + """Mocked AWS Credentials for moto.""" + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + + with mock_aws(): + client = boto3.client("s3", region_name="us-east-1") + client.create_bucket(Bucket=TEST_BUCKET_NAME) + for key, details in TEST_BUCKET_KEYS.items(): + client.put_object(Bucket=TEST_BUCKET_NAME, Key=key, Body=details['value'], Metadata={'sarracenia_v3': details['meta']}) + + yield client + +def _list_keys(client): + return [item['Key'] for item in client.list_objects_v2(Bucket=TEST_BUCKET_NAME)['Contents']] + def test___init__(): options = 
sarracenia.config.default_config() transfer = sarracenia.transfer.s3.S3('s3', options) @@ -56,7 +104,7 @@ def test_cd_forced(): assert transfer.path == "" assert transfer.cwd == "" - transfer.cd("/this/Is/A/Path/") + transfer.cd_forced('777', "/this/Is/A/Path/") assert transfer.path == "this/Is/A/Path/" assert transfer.cwd == "/this/Is/A/Path" @@ -64,9 +112,19 @@ def test_cd_forced(): @pytest.mark.depends(on=['test_close']) def test_check_is_connected(): options = sarracenia.config.default_config() + options.sendTo = 's3://foobar' transfer = sarracenia.transfer.s3.S3('s3', options) - - assert True + + assert transfer.check_is_connected() == False + assert transfer.connected == False + + # This will still return False because of the sendto checks + transfer.connected = True + assert transfer.check_is_connected() == False + + transfer.connected = True + transfer.sendTo = options.sendTo + assert transfer.check_is_connected() == True def test_chmod(): options = sarracenia.config.default_config() @@ -94,32 +152,58 @@ def test_connect(): assert True -def test_delete(): +def test_delete(build_client): options = sarracenia.config.default_config() transfer = sarracenia.transfer.s3.S3('s3', options) - - assert True -def test_get(): + transfer.client = build_client + transfer.bucket = TEST_BUCKET_NAME + + assert 'RootFile.txt' in _list_keys(transfer.client) + + transfer.delete('RootFile.txt') + assert 'RootFile.txt' not in _list_keys(transfer.client) + +def test_get(build_client, tmp_path): options = sarracenia.config.default_config() transfer = sarracenia.transfer.s3.S3('s3', options) - - assert True + transfer.client = build_client + transfer.bucket = TEST_BUCKET_NAME + transfer.path = 'Folder2/' + + filename = str(tmp_path) + os.sep + "DownloadedFile.txt" -def test_getcwd(): + # This isn't a valid message, but it serves our purposes here. 
+ msg = {'identity': {'method': 'cod', 'value': 'sha512'}, 'mtime': '20240326T182732'} + + size = transfer.get(msg, 'AlsoNestedFile.dat', filename) + + assert size == len(TEST_BUCKET_KEYS['Folder2/AlsoNestedFile.dat']['value']) # This is the size of the "content" string + assert os.path.isfile(filename) + assert open(filename, 'r').read() == TEST_BUCKET_KEYS['Folder2/AlsoNestedFile.dat']['value'] + +def test_getcwd(build_client): options = sarracenia.config.default_config() transfer = sarracenia.transfer.s3.S3('s3', options) assert transfer.getcwd() == None - transfer.client = boto3.client('s3') + transfer.client = build_client assert transfer.getcwd() == '' -def test_ls(): +def test_ls(build_client): options = sarracenia.config.default_config() transfer = sarracenia.transfer.s3.S3('s3', options) - assert True + transfer.client = build_client + transfer.bucket = TEST_BUCKET_NAME + entries = transfer.ls() + + assert len(entries) == 5 + assert 'FolderToDelete' in entries + assert entries['Folder1'].st_mode == 0o755 | stat.S_IFDIR + assert entries['FileToRename.txt'].st_mode == 0o644 + assert entries['RootFile.txt'].st_size == 11 def test_mkdir(): options = sarracenia.config.default_config() @@ -128,29 +212,56 @@ def test_mkdir(): transfer.mkdir('ThisMeansNothing') assert True -def test_put(): +def test_put(build_client, tmp_path): options = sarracenia.config.default_config() transfer = sarracenia.transfer.s3.S3('s3', options) - - assert True + transfer.client = build_client + transfer.bucket = TEST_BUCKET_NAME + transfer.path = 'NewFolder/' + + filename = str(tmp_path) + os.sep + "FileToUpload.txt" + fp = open(filename, 'a') + fp.write('ThisIsMyBody\n') + fp.flush() + fp.close() + + # This isn't a valid message, but it serves our purposes here. + msg = {'identity': {'method': 'cod', 'value': 'sha512'}, 'mtime': '20240326T182732'} -def test_registered_as(): - #options = sarracenia.config.default_config() - #transfer = sarracenia.transfer.s3.S3('s3', options) + size = transfer.put(msg, filename, "FileToUpload.txt", 0,0) + assert size == 13 + assert "NewFolder/FileToUpload.txt" in _list_keys(transfer.client) + +def test_registered_as(): assert sarracenia.transfer.s3.S3.registered_as() == ['s3'] -def test_rename(): +def test_rename(build_client): options = sarracenia.config.default_config() transfer = sarracenia.transfer.s3.S3('s3', options) - assert True + transfer.client = build_client + transfer.bucket = TEST_BUCKET_NAME + + assert 'FileToRename.txt' in _list_keys(transfer.client) + + transfer.rename('FileToRename.txt', 'FileNewName.txt') + + assert 'FileToRename.txt' not in _list_keys(transfer.client) + assert 'FileNewName.txt' in _list_keys(transfer.client) -def test_rmdir(): +def test_rmdir(build_client): options = sarracenia.config.default_config() transfer = sarracenia.transfer.s3.S3('s3', options) + + transfer.client = build_client + transfer.bucket = TEST_BUCKET_NAME + + assert 'FolderToDelete/ThisFileWillBeGone.txt' in _list_keys(transfer.client) - assert True + transfer.rmdir('FolderToDelete') + + assert 'FolderToDelete/ThisFileWillBeGone.txt' not in _list_keys(transfer.client) def test_umask(): options = sarracenia.config.default_config() From 386a9f7c3574a0ad87f935d9b3cab5cf4e7a58cd Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Thu, 4 Apr 2024 12:31:29 -0400 Subject: [PATCH 14/65] Add last two method tests They're not 100% complete, but overall coverage of the class is at 90%, which seems like it's good enough. 
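A note for readers of these tests: __credentials is a double-underscore method, so Python name-mangles it, which is why the tests reach it as transfer._S3__credentials(). The rule in miniature:

    class Example:
        def __hidden(self):              # stored on the class as _Example__hidden
            return 42

    assert Example()._Example__hidden() == 42   # how tests can call "private" methods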
--- tests/sarracenia/transfer/s3_test.py | 46 +++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/tests/sarracenia/transfer/s3_test.py b/tests/sarracenia/transfer/s3_test.py index b30469095..fbcfd5d17 100644 --- a/tests/sarracenia/transfer/s3_test.py +++ b/tests/sarracenia/transfer/s3_test.py @@ -28,7 +28,7 @@ logger.setLevel('DEBUG') -TEST_BUCKET_NAME = 'NotARealBucket' +TEST_BUCKET_NAME = 'notarealbucket' TEST_BUCKET_KEYS = { 'RootFile.txt': { 'value': 'Lorem ipsum', @@ -80,10 +80,32 @@ def test___init__(): assert hasattr(transfer.s3_client_config, 'user_agent_extra') def test___credentials(): - options = sarracenia.config.default_config() - transfer = sarracenia.transfer.s3.S3('s3', options) - - assert True + transfer = sarracenia.transfer.s3.S3('s3', sarracenia.config.default_config()) + + #simple path + transfer.o.credentials._parse('s3://testing_simple_bucket_creds') + transfer.sendTo = 's3://testing_simple_bucket_creds' + transfer._S3__credentials() + assert transfer.bucket == 'testing_simple_bucket_creds' + assert transfer.client_args == { + 'aws_access_key_id': None, + 'aws_secret_access_key': None, + 'aws_session_token': None, + 'endpoint_url': None + } + + #Complex, with all options/details + transfer = sarracenia.transfer.s3.S3('s3', sarracenia.config.default_config()) + transfer.o.credentials._parse('s3://testing__access_key_id:testing__secret_access_key@testing_full_bucket_creds s3_session_token=testing_session_token,s3_endpoint=https://testing_endpoint:5000') + transfer.sendTo = 's3://testing_full_bucket_creds' + transfer._S3__credentials() + assert transfer.bucket == 'testing_full_bucket_creds' + assert transfer.client_args == { + 'aws_access_key_id': 'testing__access_key_id', + 'aws_secret_access_key': 'testing__secret_access_key', + 'aws_session_token': 'testing_session_token', + 'endpoint_url': 'https://testing_endpoint:5000' + } def test_cd(): options = sarracenia.config.default_config() @@ -146,11 +168,19 @@ def test_close(): assert transfer.client == None @pytest.mark.depends(on=['test___credentials']) -def test_connect(): +def test_connect(build_client): options = sarracenia.config.default_config() transfer = sarracenia.transfer.s3.S3('s3', options) - - assert True + + transfer.o.credentials._parse('s3://testing_simple_bucket_creds') + transfer.o.sendTo = 's3://testing_simple_bucket_creds' + + assert transfer.connect() == False + + transfer.o.sendTo = 's3://' + TEST_BUCKET_NAME + assert transfer.connect() == True + + # Probably need to test exception handling here, but... that sounds like a lot of work. 
def test_delete(build_client): options = sarracenia.config.default_config() From 2417c60055ca907d0b0fe3580509523179c5f24b Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 15 Mar 2024 23:08:32 -0400 Subject: [PATCH 15/65] housekeeping check missing in wiski --- sarracenia/flowcb/scheduled/wiski.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sarracenia/flowcb/scheduled/wiski.py b/sarracenia/flowcb/scheduled/wiski.py index 256b036a5..44d17eb0f 100644 --- a/sarracenia/flowcb/scheduled/wiski.py +++ b/sarracenia/flowcb/scheduled/wiski.py @@ -138,7 +138,7 @@ def gather(self,messageCountMax): # placeholder self.wait_until_next() while (1): - if self.stop_requested: + if self.stop_requested or self.housekeeping_needed: return messages self.token = self.submit_tokenization_request() From ecbb5b323b6b3f0cf79c698aacfd4d0fb653d59f Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 18 Mar 2024 03:21:26 -0400 Subject: [PATCH 16/65] allow gather to abort further gathers by changing return value to a tuple --- sarracenia/flow/__init__.py | 13 +++++++++++-- sarracenia/flowcb/__init__.py | 9 ++++++--- sarracenia/flowcb/gather/am.py | 2 +- sarracenia/flowcb/gather/file.py | 8 ++++---- sarracenia/flowcb/gather/message.py | 8 +++++--- sarracenia/flowcb/log.py | 2 +- sarracenia/flowcb/poll/__init__.py | 1 - sarracenia/flowcb/retry.py | 6 +++--- sarracenia/flowcb/run.py | 2 +- sarracenia/flowcb/scheduled/__init__.py | 4 ++-- sarracenia/flowcb/scheduled/wiski.py | 6 +++--- 11 files changed, 37 insertions(+), 24 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index bed31b1d2..13c87bda0 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -1093,9 +1093,18 @@ def filter(self) -> None: def gather(self) -> None: so_far=0 + keep_going=True for p in self.plugins["gather"]: try: - new_incoming = p(self.o.batch-so_far) + retval = p(self.o.batch-so_far) + + # To avoid having to modify all existing gathers, support old API. + if type(retval) == tuple: + keep_going, new_incoming = retval + elif type(retval) == list: + new_incoming = retval + else: + logger.error( f"flowCallback plugin gather routine {p} returned unexpected type: {type(retval)}. Expected tuple of boolean and list of new messages" ) except Exception as ex: logger.error( f'flowCallback plugin {p} crashed: {ex}' ) logger.debug( "details:", exc_info=True ) @@ -1106,7 +1115,7 @@ def gather(self) -> None: so_far += len(new_incoming) # if we gathered enough with a subset of plugins then return. - if so_far >= self.o.batch: + if not keep_going or so_far >= self.o.batch: return diff --git a/sarracenia/flowcb/__init__.py b/sarracenia/flowcb/__init__.py index 5ec2c94de..27587a07e 100755 --- a/sarracenia/flowcb/__init__.py +++ b/sarracenia/flowcb/__init__.py @@ -80,14 +80,17 @@ def ack(self,messagelist) -> None:: Task: acknowledge messages from a gather source. - def gather(self, messageCountMax) -> list:: + def gather(self, messageCountMax) -> (gather_more, messages) :: - Task: gather messages from a source... return a list of messages + Task: gather messages from a source... return a tuple: + + * gather_more ... bool whether to continue gathering + * messages ... list of messages in a poll, gather is always called, regardless of vip posession. in all other components, gather is only called when in posession of the vip. 
- return [] + return (True, []) def after_accept(self,worklist) -> None:: diff --git a/sarracenia/flowcb/gather/am.py b/sarracenia/flowcb/gather/am.py index a57cf7320..2897c5a77 100644 --- a/sarracenia/flowcb/gather/am.py +++ b/sarracenia/flowcb/gather/am.py @@ -485,4 +485,4 @@ def gather(self, messageCountMax): except Exception as e: logger.error(f"Unable to generate bulletin file. Error message: {e}") - return newmsg + return (True, newmsg) diff --git a/sarracenia/flowcb/gather/file.py b/sarracenia/flowcb/gather/file.py index 8238cb4dc..321daf0c6 100755 --- a/sarracenia/flowcb/gather/file.py +++ b/sarracenia/flowcb/gather/file.py @@ -692,19 +692,19 @@ def gather(self, messageCountMax): if len(self.queued_messages) > self.o.batch: messages = self.queued_messages[0:self.o.batch] self.queued_messages = self.queued_messages[self.o.batch:] - return messages + return (True, messages) elif len(self.queued_messages) > 0: messages = self.queued_messages self.queued_messages = [] if self.o.sleep < 0: - return messages + return (True, messages) else: messages = [] if self.primed: - return self.wakeup() + return (True, self.wakeup()) cwd = os.getcwd() @@ -740,4 +740,4 @@ def gather(self, messageCountMax): messages = messages[0:self.o.batch] self.primed = True - return messages + return (True, messages) diff --git a/sarracenia/flowcb/gather/message.py b/sarracenia/flowcb/gather/message.py index d155b8795..f6fcea4bb 100755 --- a/sarracenia/flowcb/gather/message.py +++ b/sarracenia/flowcb/gather/message.py @@ -30,14 +30,16 @@ def __init__(self, options) -> None: def gather(self, messageCountMax) -> list: """ - return a current list of messages. + return: + True ... you can gather from other sources. and: + a list of messages obtained from this source. """ if hasattr(self,'consumer') and hasattr(self.consumer,'newMessages'): - return self.consumer.newMessages() + return (True, self.consumer.newMessages()) else: logger.warning( f'not connected. Trying to connect to {self.o.broker}') self.consumer = sarracenia.moth.Moth.subFactory(self.od) - return [] + return (True, []) def ack(self, mlist) -> None: diff --git a/sarracenia/flowcb/log.py b/sarracenia/flowcb/log.py index 3e5ddd6e1..9ff534ec0 100755 --- a/sarracenia/flowcb/log.py +++ b/sarracenia/flowcb/log.py @@ -64,7 +64,7 @@ def gather(self, messageCountMax): if set(['gather']) & self.o.logEvents: logger.info( f' messageCountMax: {messageCountMax} ') - return [] + return (True, []) def _messageStr(self, msg): if self.o.logMessageDump: diff --git a/sarracenia/flowcb/poll/__init__.py b/sarracenia/flowcb/poll/__init__.py index a39c55bed..d0b00deaf 100755 --- a/sarracenia/flowcb/poll/__init__.py +++ b/sarracenia/flowcb/poll/__init__.py @@ -115,7 +115,6 @@ class Poll(FlowCB): * options are passed to sarracenia.Transfer classes for their use as well. - Poll uses sarracenia.transfer (ftp, sftp, https, etc... )classes to requests lists of files using those protocols using built-in logic. 
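A minimal callback sketch using the new convention (the class name and the one-shot behaviour are illustrative; only the (bool, list) return shape comes from the gather API described above)::

    import logging

    from sarracenia.flowcb import FlowCB

    logger = logging.getLogger(__name__)

    class OneShot(FlowCB):
        """
        gather once per run, then ask the flow to skip any remaining
        gather plugins for the rest of the cycle.
        """
        def __init__(self, options):
            super().__init__(options)
            self.already_ran = False

        def gather(self, messageCountMax):
            if self.already_ran:
                # False curtails further gather routines this cycle.
                return (False, [])
            self.already_ran = True
            # True lets later gather plugins contribute more messages.
            return (True, [])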
diff --git a/sarracenia/flowcb/retry.py b/sarracenia/flowcb/retry.py index 3c51e73b7..2c12467ce 100755 --- a/sarracenia/flowcb/retry.py +++ b/sarracenia/flowcb/retry.py @@ -84,9 +84,9 @@ def gather(self, qty) -> None: """ if not features['retry']['present'] or not self.o.retry_refilter: - return [] + return (True, []) - if qty <= 0: return [] + if qty <= 0: return (True, []) message_list = self.download_retry.get(qty) @@ -99,7 +99,7 @@ def gather(self, qty) -> None: m['_deleteOnPost'] = set( [ '_isRetry' ] ) - return message_list + return (True, message_list) def after_accept(self, worklist) -> None: diff --git a/sarracenia/flowcb/run.py b/sarracenia/flowcb/run.py index c183522ab..d1a1c1a90 100755 --- a/sarracenia/flowcb/run.py +++ b/sarracenia/flowcb/run.py @@ -76,7 +76,7 @@ def gather(self, messageCountMax): if hasattr(self.o, 'run_gather') and self.o.run_gather is not None: self.run_script(self.o.run_gather) - return [] + return (True, []) def after_accept(self, worklist): """ diff --git a/sarracenia/flowcb/scheduled/__init__.py b/sarracenia/flowcb/scheduled/__init__.py index dd770e746..acc518bda 100644 --- a/sarracenia/flowcb/scheduled/__init__.py +++ b/sarracenia/flowcb/scheduled/__init__.py @@ -93,7 +93,7 @@ def gather(self,messageCountMax): self.wait_until_next() if self.stop_requested or self.housekeeping_needed: - return [] + return (True, []) logger.info('time to run') @@ -105,7 +105,7 @@ def gather(self,messageCountMax): m = sarracenia.Message.fromFileInfo(relPath, self.o, st) gathered_messages.append(m) - return gathered_messages + return (True, gathered_messages) def on_housekeeping(self): diff --git a/sarracenia/flowcb/scheduled/wiski.py b/sarracenia/flowcb/scheduled/wiski.py index 44d17eb0f..1f12b3083 100644 --- a/sarracenia/flowcb/scheduled/wiski.py +++ b/sarracenia/flowcb/scheduled/wiski.py @@ -139,7 +139,7 @@ def gather(self,messageCountMax): # placeholder while (1): if self.stop_requested or self.housekeeping_needed: - return messages + return (True, messages) self.token = self.submit_tokenization_request() authenticated_url = self.main_url @@ -172,7 +172,7 @@ def gather(self,messageCountMax): # placeholder for station_id in k.get_station_list().station_id: if self.stop_requested: - return messages + return (False, messages) timeseries = k.get_timeseries_list(station_id = station_id ).ts_id #logger.info( f"looping over the timeseries: {timeseries}" ) @@ -197,7 +197,7 @@ def gather(self,messageCountMax): # placeholder f.close() messages.append( sarracenia.Message.fromFileData( fname, self.o, os.stat(fname) ) ) - return messages + return (True, messages) if __name__ == '__main__': From 4fc56d4a676801ecfe2ae8a8978bccece44bba95 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Thu, 21 Mar 2024 08:00:14 -0400 Subject: [PATCH 17/65] implement #974 poll refactor to used scheduled flows --- sarracenia/config.py | 5 +++ sarracenia/flow/__init__.py | 24 +++++++++++-- sarracenia/flow/poll.py | 30 ++++------------ sarracenia/flowcb/__init__.py | 12 +++++-- sarracenia/flowcb/poll/airnow.py | 2 +- sarracenia/flowcb/scheduled/poll.py | 55 +++++++++++++++++++++++++++++ 6 files changed, 98 insertions(+), 30 deletions(-) create mode 100644 sarracenia/flowcb/scheduled/poll.py diff --git a/sarracenia/config.py b/sarracenia/config.py index 42f70fee0..281e7c52d 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -1979,6 +1979,11 @@ def finalize(self, component=None, config=None): if (component not in ['poll' ]): self.path = list(map( os.path.expanduser, self.path )) + else: + if 
self.sleep > 1: + self.scheduled_interval = self.sleep + self.sleep=1 + if self.vip and not features['vip']['present']: logger.critical( f"vip feature requested, but missing library: {' '.join(features['vip']['modules_needed'])} " ) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 13c87bda0..da5bbc4e6 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -481,7 +481,6 @@ def run(self): spamming = False self.filter() - self._runCallbacksWorklist('after_accept') logger.debug( @@ -1115,9 +1114,30 @@ def gather(self) -> None: so_far += len(new_incoming) # if we gathered enough with a subset of plugins then return. - if not keep_going or so_far >= self.o.batch: + if not keep_going or (so_far >= self.o.batch): + if (self.o.component == 'poll' ): + self.worklist.poll_catching_up=True + return + # gather is an extended version of poll. + if self.o.component != 'poll': + return + + if len(self.worklist.incoming) > 0: + logger.info('ingesting %d postings into duplicate suppression cache' % len(self.worklist.incoming) ) + self.worklist.poll_catching_up = True + return + else: + self.worklist.poll_catching_up = False + + if self.have_vip: + for plugin in self.plugins['poll']: + new_incoming = plugin() + if len(new_incoming) > 0: + self.worklist.incoming.extend(new_incoming) + + def do(self) -> None: diff --git a/sarracenia/flow/poll.py b/sarracenia/flow/poll.py index f6dd49d32..bbdf16b9a 100644 --- a/sarracenia/flow/poll.py +++ b/sarracenia/flow/poll.py @@ -63,16 +63,17 @@ def __init__(self, options): else: logger.info( f"Good! post_exchange: {px} and exchange: {self.o.exchange} match so multiple instances to share a poll." ) - if not 'poll' in ','.join(self.plugins['load']): + if not 'scheduled' in ','.join(self.plugins['load']): + self.plugins['load'].append('sarracenia.flowcb.scheduled.poll.Poll') + + if not 'flowcb.poll.Poll' in ','.join(self.plugins['load']): logger.info( f"adding poll plugin, because missing from: {self.plugins['load']}" ) self.plugins['load'].append('sarracenia.flowcb.poll.Poll') if options.vip: - self.plugins['load'].insert( - 0, 'sarracenia.flowcb.gather.message.Message') + self.plugins['load'].insert( 0, 'sarracenia.flowcb.gather.message.Message') - self.plugins['load'].insert(0, - 'sarracenia.flowcb.post.message.Message') + self.plugins['load'].insert( 0, 'sarracenia.flowcb.post.message.Message') if self.o.nodupe_ttl < self.o.fileAgeMax: logger.warning( f"nodupe_ttl < fileAgeMax means some files could age out of the cache and be re-ingested ( see : https://github.com/MetPX/sarracenia/issues/904") @@ -97,22 +98,3 @@ def do(self): logger.debug('processing %d messages worked! (stop requested: %s)' % (len(self.worklist.incoming), self._stop_requested)) self.worklist.incoming = [] - - - def gather(self): - - super().gather() - - if len(self.worklist.incoming) > 0: - logger.info('ingesting %d postings into duplicate suppression cache' % len(self.worklist.incoming) ) - self.worklist.poll_catching_up = True - return - else: - self.worklist.poll_catching_up = False - - if self.have_vip: - for plugin in self.plugins['poll']: - new_incoming = plugin() - if len(new_incoming) > 0: - self.worklist.incoming.extend(new_incoming) - diff --git a/sarracenia/flowcb/__init__.py b/sarracenia/flowcb/__init__.py index 27587a07e..feee0808d 100755 --- a/sarracenia/flowcb/__init__.py +++ b/sarracenia/flowcb/__init__.py @@ -87,10 +87,16 @@ def gather(self, messageCountMax) -> (gather_more, messages) :: * gather_more ... 
bool whether to continue gathering * messages ... list of messages - in a poll, gather is always called, regardless of vip posession. - in all other components, gather is only called when in posession + or just return a list of messages. + + In a poll, gather is always called, regardless of vip posession. + + In all other components, gather is only called when in posession of the vip. - return (True, []) + + return (True, list) + OR + return list def after_accept(self,worklist) -> None:: diff --git a/sarracenia/flowcb/poll/airnow.py b/sarracenia/flowcb/poll/airnow.py index c814aab26..4e13edb8e 100755 --- a/sarracenia/flowcb/poll/airnow.py +++ b/sarracenia/flowcb/poll/airnow.py @@ -27,7 +27,7 @@ class Airnow(FlowCB): def poll(self): - sleep = self.o.sleep + sleep = self.o.scheduled_interval gathered_messages = [] for Hours in range(1, 3): diff --git a/sarracenia/flowcb/scheduled/poll.py b/sarracenia/flowcb/scheduled/poll.py new file mode 100644 index 000000000..c19be929c --- /dev/null +++ b/sarracenia/flowcb/scheduled/poll.py @@ -0,0 +1,55 @@ +import logging +import requests +import base64 + +import datetime +import os +import sys +import time + +from datetime import date + +import sarracenia +from sarracenia.flowcb.scheduled import Scheduled + +logger = logging.getLogger(__name__) + + + + +class Poll(Scheduled): + """ + + """ + + def gather(self,messageCountMax): # placeholder + """ + + This gather aborts further gathers if the next interval has not yet arrived. + + """ + logger.info( f"waiting for next poll") + self.wait_until_next() + + return not (self.stop_requested or self.housekeeping_needed), [] + + +if __name__ == '__main__': + + import sarracenia.config + import types + import sarracenia.flow + + options = sarracenia.config.default_config() + flow = sarracenia.flow.Flow(options) + flow.o.scheduled_interval= 5 + flow.o.pollUrl = "https://dd.weather.gc.ca/bulletins/alphanumeric/" + if sys.platform.startswith( "win" ): + flow.o.directory = "C:\\temp\poll" + else: + flow.o.directory = "/tmp/scheduled_poll/${%Y%m%d}" + logging.basicConfig(level=logging.DEBUG) + + me = Poll(flow.o) + me.gather(flow.o.batch) + logger.info("Done") From b9463caac76f8dfdbaeb6fa2f5f441fa65e0c460 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Tue, 19 Mar 2024 02:16:20 -0400 Subject: [PATCH 18/65] updating documentation of gather routine's new variable return formats --- docs/source/Explanation/SarraPluginDev.rst | 5 +++++ docs/source/How2Guides/FlowCallbacks.rst | 6 +++++- docs/source/fr/CommentFaire/FlowCallbacks.rst | 2 ++ docs/source/fr/Explication/SarraPluginDev.rst | 4 ++++ 4 files changed, 16 insertions(+), 1 deletion(-) diff --git a/docs/source/Explanation/SarraPluginDev.rst b/docs/source/Explanation/SarraPluginDev.rst index cc7364354..8927b46c5 100644 --- a/docs/source/Explanation/SarraPluginDev.rst +++ b/docs/source/Explanation/SarraPluginDev.rst @@ -585,12 +585,17 @@ for detailed information about call signatures and return values, etc... | | permanent name. | | | | | | return the new name for the downloaded/sent file. | +| | | +---------------------+----------------------------------------------------+ | download(self,msg) | replace built-in downloader return true on success | | | takes message as argument. | +---------------------+----------------------------------------------------+ | gather(self) | gather messages from a source, returns a list of | | | messages. 
|
+|                     | can also return a tuple where the first element    |
+|                     | is a boolean flag keep_going indicating whether    |
+|                     | to continue gather processing.                     |
+|                     |                                                    |
 +---------------------+----------------------------------------------------+
 |                     | Called every housekeeping interval (minutes)       |
 |                     | used to clean cache, check for occasional issues.  |
diff --git a/docs/source/How2Guides/FlowCallbacks.rst b/docs/source/How2Guides/FlowCallbacks.rst
index 8469f2412..876a92f9e 100644
--- a/docs/source/How2Guides/FlowCallbacks.rst
+++ b/docs/source/How2Guides/FlowCallbacks.rst
@@ -215,7 +215,11 @@ Other entry_points, extracted from sarracenia/flowcb/__init__.py ::

     def gather(self):

-        Task: gather notification messages from a source... return a list of notification messages.
+        Task: gather notification messages from a source... return either:
+
+        * a list of notification messages, or
+        * a tuple (bool:keep_going, list of messages), where a False
+          keep_going curtails further gathers in this cycle.

         return []

     def metrics_report(self) -> dict:
diff --git a/docs/source/fr/CommentFaire/FlowCallbacks.rst b/docs/source/fr/CommentFaire/FlowCallbacks.rst
index ecf8fb931..70bff1dfe 100644
--- a/docs/source/fr/CommentFaire/FlowCallbacks.rst
+++ b/docs/source/fr/CommentFaire/FlowCallbacks.rst
@@ -182,6 +182,8 @@ Autres entry_points, extraits de sarracenia/flowcb/__init__.py ::

     def gather(self):
         Task: gather notification messages from a source... return a list of notification messages.
+        can also return a tuple (keep_going, new_messages) where keep_going is a flag
+        that, when False, stops processing of further gather routines.

         return []
     """
diff --git a/docs/source/fr/Explication/SarraPluginDev.rst b/docs/source/fr/Explication/SarraPluginDev.rst
index d0e1056e5..0eb1ea4c1 100644
--- a/docs/source/fr/Explication/SarraPluginDev.rst
+++ b/docs/source/fr/Explication/SarraPluginDev.rst
@@ -554,6 +554,10 @@ pour des informations détaillées sur les signatures d’appel et les valeurs d
 +---------------------+----------------------------------------------------+
 | gather(self)        | Rassembler les messages a la source, retourne une  |
 |                     | une liste de messages.                             |
+|                     | On peut également retourner un tuple dont le       |
+|                     | premier élément est une valeur booléenne keep_going|
+|                     | qui peut arrêter l'exécution des gather.           |
+|                     |                                                    |
 +---------------------+----------------------------------------------------+
 |                     | Appelé à chaque intervalle housekeeping (minutes).
| | | utilisé pour nettoyer le cache, vérifier les | From 512fec521d8cefe7ed49907aad071f13da41f6a3 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Tue, 19 Mar 2024 23:22:15 -0400 Subject: [PATCH 19/65] Adding initial run on polls with scheduled intervals --- sarracenia/flowcb/scheduled/__init__.py | 5 +++++ sarracenia/flowcb/scheduled/poll.py | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sarracenia/flowcb/scheduled/__init__.py b/sarracenia/flowcb/scheduled/__init__.py index acc518bda..e196b98d4 100644 --- a/sarracenia/flowcb/scheduled/__init__.py +++ b/sarracenia/flowcb/scheduled/__init__.py @@ -86,6 +86,7 @@ def __init__(self,options,logger=logger): now=datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc) self.update_appointments(now) + self.first_interval=True def gather(self,messageCountMax): @@ -166,6 +167,10 @@ def wait_until( self, appointment ): def wait_until_next( self ): if self.o.scheduled_interval > 0: + if self.first_interval: + self.first_interval=False + return + self.wait_seconds(datetime.timedelta(seconds=self.o.scheduled_interval)) return diff --git a/sarracenia/flowcb/scheduled/poll.py b/sarracenia/flowcb/scheduled/poll.py index c19be929c..92ef77b79 100644 --- a/sarracenia/flowcb/scheduled/poll.py +++ b/sarracenia/flowcb/scheduled/poll.py @@ -52,4 +52,6 @@ def gather(self,messageCountMax): # placeholder me = Poll(flow.o) me.gather(flow.o.batch) - logger.info("Done") + logger.info("first done") + me.gather(flow.o.batch) + logger.info("Second Done") From 822b47dcda9e3192cd126f32af1d18a221443083 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Tue, 19 Mar 2024 23:34:46 -0400 Subject: [PATCH 20/65] fix bug where using localtime instead of utc can cause infinite loop --- sarracenia/flowcb/scheduled/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sarracenia/flowcb/scheduled/__init__.py b/sarracenia/flowcb/scheduled/__init__.py index e196b98d4..2507ae884 100644 --- a/sarracenia/flowcb/scheduled/__init__.py +++ b/sarracenia/flowcb/scheduled/__init__.py @@ -184,7 +184,7 @@ def wait_until_next( self ): if next_appointment is None: # done for the day... - tomorrow = datetime.date.today()+datetime.timedelta(days=1) + tomorrow = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)+datetime.timedelta(days=1) midnight = datetime.time(0,0,tzinfo=datetime.timezone.utc) midnight = datetime.datetime.combine(tomorrow,midnight) self.update_appointments(midnight) From 6c365be178a01bb71960cef6d48266779e4826e1 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Tue, 19 Mar 2024 23:47:37 -0400 Subject: [PATCH 21/65] notice when appointments are in the past --- sarracenia/flowcb/scheduled/__init__.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sarracenia/flowcb/scheduled/__init__.py b/sarracenia/flowcb/scheduled/__init__.py index 2507ae884..d2fd6d2de 100644 --- a/sarracenia/flowcb/scheduled/__init__.py +++ b/sarracenia/flowcb/scheduled/__init__.py @@ -177,10 +177,18 @@ def wait_until_next( self ): if ( len(self.o.scheduled_hour) > 0 ) or ( len(self.o.scheduled_minute) > 0 ): now = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc) next_appointment=None + missed_appointments=[] for t in self.appointments: if now < t: next_appointment=t break + else: + logger.info( f'already too late to {t} skipping' ) + missed_appointments.append(t) + + if missed_appointments: + for ma in missed_appointments: + self.appointments.remove(ma) if next_appointment is None: # done for the day... 
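The net effect of the three changes above is that appointment selection always compares timezone-aware UTC datetimes and prunes entries that have already passed. A stand-alone sketch of that selection logic, simplified from the class state (the function name is illustrative)::

    import datetime
    import time

    def pick_next_appointment(appointments):
        # 'now' must be timezone-aware UTC: comparing naive local dates
        # against UTC appointments is what caused the infinite loop
        # fixed above.
        now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)

        # notice and drop appointments that are already in the past.
        for t in [t for t in appointments if t <= now]:
            appointments.remove(t)

        # None means the schedule is exhausted until the next (UTC) day.
        return appointments[0] if appointments else None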
From 2dd35eef74d59ff49830455829a7a90d46859b55 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 20 Mar 2024 14:34:08 -0400 Subject: [PATCH 22/65] refactoring Flow/run() to simplify --- sarracenia/flow/__init__.py | 158 ++++++++++++++++++------------------ 1 file changed, 79 insertions(+), 79 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index da5bbc4e6..c26a39585 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -470,8 +470,6 @@ def run(self): if not stopping: self.gather() - else: - self.worklist.incoming = [] last_gather_len = len(self.worklist.incoming) if (last_gather_len == 0): @@ -481,21 +479,6 @@ def run(self): spamming = False self.filter() - self._runCallbacksWorklist('after_accept') - - logger.debug( - 'B filtered incoming: %d, ok: %d (directories: %d), rejected: %d, failed: %d stop_requested: %s have_vip: %s' - % (len(self.worklist.incoming), len( - self.worklist.ok), len(self.worklist.directories_ok), - len(self.worklist.rejected), len( - self.worklist.failed), self._stop_requested, - self.have_vip)) - - self.ack(self.worklist.ok) - self.worklist.ok = [] - self.ack(self.worklist.rejected) - self.worklist.rejected = [] - self.ack(self.worklist.failed) # this for duplicate cache synchronization. if self.worklist.poll_catching_up: @@ -517,65 +500,7 @@ def run(self): os.unlink( self.o.novipFilename ) # normal processing, when you are active. - self.do() - - # need to acknowledge here, because posting will delete message-id - self.ack(self.worklist.ok) - self.ack(self.worklist.rejected) - self.ack(self.worklist.failed) - - # adjust message after action is done, but before 'after_work' so adjustment is possible. - for m in self.worklist.ok: - if ('new_baseUrl' in m) and (m['baseUrl'] != - m['new_baseUrl']): - m['old_baseUrl'] = m['baseUrl'] - m['_deleteOnPost'] |= set(['old_baseUrl']) - m['baseUrl'] = m['new_baseUrl'] - if ('new_retrievePath' in m) : - m['old_retrievePath'] = m['retrievePath'] - m['retrievePath'] = m['new_retrievePath'] - m['_deleteOnPost'] |= set(['old_retrievePath']) - - # if new_file does not match relPath, then adjust relPath so it does. - if 'relPath' in m and m['new_file'] != m['relPath'].split('/')[-1]: - if not 'new_relPath' in m: - if len(m['relPath']) > 1: - m['new_relPath'] = '/'.join( m['relPath'].split('/')[0:-1] + [ m['new_file'] ]) - else: - m['new_relPath'] = m['new_file'] - else: - if len(m['new_relPath']) > 1: - m['new_relPath'] = '/'.join( m['new_relPath'].split('/')[0:-1] + [ m['new_file'] ] ) - else: - m['new_relPath'] = m['new_file'] - - if ('new_relPath' in m) and (m['relPath'] != m['new_relPath']): - m['old_relPath'] = m['relPath'] - m['_deleteOnPost'] |= set(['old_relPath']) - m['relPath'] = m['new_relPath'] - m['old_subtopic'] = m['subtopic'] - m['_deleteOnPost'] |= set(['old_subtopic','subtopic']) - m['subtopic'] = m['new_subtopic'] - - if '_format' in m: - m['old_format'] = m['_format'] - m['_deleteOnPost'] |= set(['old_format']) - m['_format'] = m['post_format'] - - # restore adjustment to fileOp - if 'post_fileOp' in m: - m['fileOp'] = m['post_fileOp'] - - if self.o.download and 'retrievePath' in m: - # retrieve paths do not propagate after download. 
- del m['retrievePath'] - - - self._runCallbacksWorklist('after_work') - - self.ack(self.worklist.rejected) - self.worklist.rejected = [] - self.ack(self.worklist.failed) + self.work() if len(self.plugins["post"]) > 0: self.post() @@ -1086,9 +1011,21 @@ def filter(self) -> None: self.reject(m, 304, "unmatched pattern %s" % url) self.worklist.incoming = filtered_worklist - logger.debug( - 'end len(incoming)=%d, rejected=%d' % - (len(self.worklist.incoming), len(self.worklist.rejected))) + + logger.debug( 'end len(incoming)=%d, rejected=%d' % (len(self.worklist.incoming), len(self.worklist.rejected))) + + self._runCallbacksWorklist('after_accept') + + logger.debug( 'B filtered incoming: %d, ok: %d (directories: %d), rejected: %d, failed: %d stop_requested: %s have_vip: %s' + % (len(self.worklist.incoming), len(self.worklist.ok), len(self.worklist.directories_ok), + len(self.worklist.rejected), len(self.worklist.failed), self._stop_requested, self.have_vip)) + + self.ack(self.worklist.ok) + self.worklist.ok = [] + self.ack(self.worklist.rejected) + self.worklist.rejected = [] + self.ack(self.worklist.failed) + def gather(self) -> None: so_far=0 @@ -1150,6 +1087,69 @@ def do(self) -> None: logger.debug('processing %d messages worked!' % len(self.worklist.ok)) + def work(self) -> None: + + self.do() + + # need to acknowledge here, because posting will delete message-id + self.ack(self.worklist.ok) + self.ack(self.worklist.rejected) + self.ack(self.worklist.failed) + + # adjust message after action is done, but before 'after_work' so adjustment is possible. + for m in self.worklist.ok: + if ('new_baseUrl' in m) and (m['baseUrl'] != + m['new_baseUrl']): + m['old_baseUrl'] = m['baseUrl'] + m['_deleteOnPost'] |= set(['old_baseUrl']) + m['baseUrl'] = m['new_baseUrl'] + if ('new_retrievePath' in m) : + m['old_retrievePath'] = m['retrievePath'] + m['retrievePath'] = m['new_retrievePath'] + m['_deleteOnPost'] |= set(['old_retrievePath']) + + # if new_file does not match relPath, then adjust relPath so it does. + if 'relPath' in m and m['new_file'] != m['relPath'].split('/')[-1]: + if not 'new_relPath' in m: + if len(m['relPath']) > 1: + m['new_relPath'] = '/'.join( m['relPath'].split('/')[0:-1] + [ m['new_file'] ]) + else: + m['new_relPath'] = m['new_file'] + else: + if len(m['new_relPath']) > 1: + m['new_relPath'] = '/'.join( m['new_relPath'].split('/')[0:-1] + [ m['new_file'] ] ) + else: + m['new_relPath'] = m['new_file'] + + if ('new_relPath' in m) and (m['relPath'] != m['new_relPath']): + m['old_relPath'] = m['relPath'] + m['_deleteOnPost'] |= set(['old_relPath']) + m['relPath'] = m['new_relPath'] + m['old_subtopic'] = m['subtopic'] + m['_deleteOnPost'] |= set(['old_subtopic','subtopic']) + m['subtopic'] = m['new_subtopic'] + + if '_format' in m: + m['old_format'] = m['_format'] + m['_deleteOnPost'] |= set(['old_format']) + m['_format'] = m['post_format'] + + # restore adjustment to fileOp + if 'post_fileOp' in m: + m['fileOp'] = m['post_fileOp'] + + if self.o.download and 'retrievePath' in m: + # retrieve paths do not propagate after download. 
+ del m['retrievePath'] + + self._runCallbacksWorklist('after_work') + + self.ack(self.worklist.rejected) + self.worklist.rejected = [] + self.ack(self.worklist.failed) + + + def post(self) -> None: # work-around for python3.5 not being able to copy re.match issue: From 4162df06be32668889b3316de2e2a2c2553b3919 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 20 Mar 2024 14:51:18 -0400 Subject: [PATCH 23/65] refactoring run further to make it shorter --- sarracenia/flow/__init__.py | 99 ++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 51 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index c26a39585..e737af125 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -451,23 +451,20 @@ def run(self): if self._stop_requested: if stopping: logger.info('clean stop from run loop') - self.close() break else: - logger.info( - 'starting last pass (without gather) through loop for cleanup.' - ) + logger.info( 'starting last pass (without gather) through loop for cleanup.') stopping = True self.have_vip = self.has_vip() + self.worklist.incoming = [] + if (self.o.component == 'poll') or self.have_vip: if ( self.o.messageRateMax > 0 ) and (current_rate > 0.8*self.o.messageRateMax ): logger.info("current_rate (%.2f) vs. messageRateMax(%.2f)) " % (current_rate, self.o.messageRateMax)) - self.worklist.incoming = [] - if not stopping: self.gather() @@ -501,39 +498,7 @@ def run(self): # normal processing, when you are active. self.work() - - if len(self.plugins["post"]) > 0: - self.post() - self._runCallbacksWorklist('after_post') - - self._runCallbacksWorklist('report') - self._runCallbackMetrics() - - if hasattr(self.o, 'metricsFilename' ) and os.path.isdir(os.path.dirname(self.o.metricsFilename)): - metrics=json.dumps(self.metrics) - with open(self.o.metricsFilename, 'w') as mfn: - mfn.write(metrics+"\n") - if self.o.logMetrics: - if self.o.logRotateInterval >= 24*60*60: - tslen=8 - elif self.o.logRotateInterval > 60: - tslen=14 - else: - tslen=16 - timestamp=time.strftime("%Y%m%d-%H%M%S", time.gmtime()) - with open(self.o.metricsFilename + '.' 
+ timestamp[0:tslen], 'a') as mfn: - mfn.write( f'\"{timestamp}\" : {metrics},\n') - - # removing old metrics files - logger.info( f"looking for old metrics for {self.o.metricsFilename}" ) - old_metrics=sorted(glob.glob(self.o.metricsFilename+'.*'))[0:-self.o.logRotateCount] - for o in old_metrics: - logger.info( f"removing old metrics file: {o} " ) - os.unlink(o) - - self.worklist.ok = [] - self.worklist.directories_ok = [] - self.worklist.failed = [] + self.post() now = nowflt() run_time = now - start_time @@ -1152,19 +1117,51 @@ def work(self) -> None: def post(self) -> None: - # work-around for python3.5 not being able to copy re.match issue: - # https://github.com/MetPX/sarracenia/issues/857 - if sys.version_info.major == 3 and sys.version_info.minor <= 6: - for m in self.worklist.ok: - if '_matches' in m: - del m['_matches'] + if len(self.plugins["post"]) > 0: - for p in self.plugins["post"]: - try: - p(self.worklist) - except Exception as ex: - logger.error( f'flowCallback plugin {p} crashed: {ex}' ) - logger.debug( "details:", exc_info=True ) + # work-around for python3.5 not being able to copy re.match issue: + # https://github.com/MetPX/sarracenia/issues/857 + if sys.version_info.major == 3 and sys.version_info.minor <= 6: + for m in self.worklist.ok: + if '_matches' in m: + del m['_matches'] + + for p in self.plugins["post"]: + try: + p(self.worklist) + except Exception as ex: + logger.error( f'flowCallback plugin {p} crashed: {ex}' ) + logger.debug( "details:", exc_info=True ) + + self._runCallbacksWorklist('after_post') + self._runCallbacksWorklist('report') + self._runCallbackMetrics() + + if hasattr(self.o, 'metricsFilename' ) and os.path.isdir(os.path.dirname(self.o.metricsFilename)): + metrics=json.dumps(self.metrics) + with open(self.o.metricsFilename, 'w') as mfn: + mfn.write(metrics+"\n") + if self.o.logMetrics: + if self.o.logRotateInterval >= 24*60*60: + tslen=8 + elif self.o.logRotateInterval > 60: + tslen=14 + else: + tslen=16 + timestamp=time.strftime("%Y%m%d-%H%M%S", time.gmtime()) + with open(self.o.metricsFilename + '.' 
+ timestamp[0:tslen], 'a') as mfn: + mfn.write( f'\"{timestamp}\" : {metrics},\n') + + # removing old metrics files + logger.info( f"looking for old metrics for {self.o.metricsFilename}" ) + old_metrics=sorted(glob.glob(self.o.metricsFilename+'.*'))[0:-self.o.logRotateCount] + for o in old_metrics: + logger.info( f"removing old metrics file: {o} " ) + os.unlink(o) + + self.worklist.ok = [] + self.worklist.directories_ok = [] + self.worklist.failed = [] def write_inline_file(self, msg) -> bool: """ From 8df3e552e814c3b41e130e0d951e1fdbe6a1f471 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 20 Mar 2024 17:23:41 -0400 Subject: [PATCH 24/65] more run refactoring, trying to get it shorter --- sarracenia/flow/__init__.py | 47 +++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index e737af125..473656a1f 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -402,6 +402,27 @@ def ack(self, mlist) -> None: logger.error( f'flowCallback plugin {p}/ack crashed: {ex}' ) logger.debug( "details:", exc_info=True ) + def _run_vip_update(self, had_vip) -> bool: + + self.have_vip = self.has_vip() + retval=had_vip + if (self.o.component == 'poll') and not self.have_vip: + if had_vip: + logger.info("now passive on vips %s" % self.o.vip ) + with open( self.o.novipFilename, 'w' ) as f: + f.write(str(nowflt()) + '\n' ) + retval=False + else: + if not had_vip: + logger.info("now active on vip %s" % self.have_vip ) + retval=True + if os.path.exists( self.o.novipFilename ): + os.unlink( self.o.novipFilename ) + return retval + + + + def run(self): """ This is the core routine of the algorithm, with most important data driven @@ -457,7 +478,11 @@ def run(self): logger.info( 'starting last pass (without gather) through loop for cleanup.') stopping = True - self.have_vip = self.has_vip() + if now > next_housekeeping or stopping: + next_housekeeping = self._runHousekeeping(now) + + had_vip = self._run_vip_update(had_vip) + self.worklist.incoming = [] if (self.o.component == 'poll') or self.have_vip: @@ -483,22 +508,10 @@ def run(self): self.worklist.incoming = [] continue - if (self.o.component == 'poll') and not self.have_vip: - if had_vip: - logger.info("now passive on vips %s" % self.o.vip ) - with open( self.o.novipFilename, 'w' ) as f: - f.write(str(nowflt()) + '\n' ) - had_vip=False - else: - if not had_vip: - logger.info("now active on vip %s" % self.have_vip ) - had_vip=True - if os.path.exists( self.o.novipFilename ): - os.unlink( self.o.novipFilename ) - - # normal processing, when you are active. - self.work() - self.post() + + # normal processing, when you are active. 
+ self.work() + self.post() now = nowflt() run_time = now - start_time From c06825543a7b16556562223c8d310931dd177278 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 20 Mar 2024 19:07:47 -0400 Subject: [PATCH 25/65] move on_start to save some vip check logic --- sarracenia/flow/__init__.py | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 473656a1f..0cdf364a7 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -211,6 +211,7 @@ def __init__(self, cfg=None): # metrics - dictionary with names of plugins as the keys self.metricsFlowReset() + self.had_vip = False def metricsFlowReset(self) -> None: @@ -402,23 +403,21 @@ def ack(self, mlist) -> None: logger.error( f'flowCallback plugin {p}/ack crashed: {ex}' ) logger.debug( "details:", exc_info=True ) - def _run_vip_update(self, had_vip) -> bool: + def _run_vip_update(self) -> bool: self.have_vip = self.has_vip() - retval=had_vip if (self.o.component == 'poll') and not self.have_vip: - if had_vip: + if self.had_vip: logger.info("now passive on vips %s" % self.o.vip ) with open( self.o.novipFilename, 'w' ) as f: f.write(str(nowflt()) + '\n' ) - retval=False + self.had_vip=False else: - if not had_vip: + if not self.had_vip: logger.info("now active on vip %s" % self.have_vip ) - retval=True + self.had_vip=True if os.path.exists( self.o.novipFilename ): os.unlink( self.o.novipFilename ) - return retval @@ -440,7 +439,7 @@ def run(self): current_rate = 0 total_messages = 1 start_time = nowflt() - had_vip = False + now=start_time current_sleep = self.o.sleep last_time = start_time self.metrics['flow']['last_housekeeping'] = start_time @@ -453,15 +452,6 @@ def run(self): logger.info( f'pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}' ) - if not self.has_vip(): - logger.info( f'starting up passive, as do not possess any vip from: {self.o.vip}' ) - with open( self.o.novipFilename, 'w' ) as f: - f.write(str(start_time) + '\n' ) - else: - if os.path.exists( self.o.novipFilename ): - os.unlink( self.o.novipFilename ) - - self.runCallbacksTime(f'on_start') spamming = True last_gather_len = 0 @@ -478,10 +468,12 @@ def run(self): logger.info( 'starting last pass (without gather) through loop for cleanup.') stopping = True + self._run_vip_update() + if now > next_housekeeping or stopping: next_housekeeping = self._runHousekeeping(now) - - had_vip = self._run_vip_update(had_vip) + elif now == start_time: + self.runCallbacksTime(f'on_start') self.worklist.incoming = [] @@ -508,7 +500,6 @@ def run(self): self.worklist.incoming = [] continue - # normal processing, when you are active. 
self.work() self.post() From aaa908b212e2215af0e0de1030e06c76b96b7f60 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 20 Mar 2024 23:10:31 -0400 Subject: [PATCH 26/65] fixing indenting of code block moved in last commit --- sarracenia/flow/__init__.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 0cdf364a7..e20b33f27 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -405,22 +405,19 @@ def ack(self, mlist) -> None: def _run_vip_update(self) -> bool: - self.have_vip = self.has_vip() - if (self.o.component == 'poll') and not self.have_vip: - if self.had_vip: - logger.info("now passive on vips %s" % self.o.vip ) - with open( self.o.novipFilename, 'w' ) as f: - f.write(str(nowflt()) + '\n' ) - self.had_vip=False - else: - if not self.had_vip: - logger.info("now active on vip %s" % self.have_vip ) - self.had_vip=True - if os.path.exists( self.o.novipFilename ): - os.unlink( self.o.novipFilename ) - - - + self.have_vip = self.has_vip() + if (self.o.component == 'poll') and not self.have_vip: + if self.had_vip: + logger.info("now passive on vips %s" % self.o.vip ) + with open( self.o.novipFilename, 'w' ) as f: + f.write(str(nowflt()) + '\n' ) + self.had_vip=False + else: + if not self.had_vip: + logger.info("now active on vip %s" % self.have_vip ) + self.had_vip=True + if os.path.exists( self.o.novipFilename ): + os.unlink( self.o.novipFilename ) def run(self): """ From 47c810f3afe08cee693985a9d453adbd7b8fe3ba Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Thu, 21 Mar 2024 08:31:55 -0400 Subject: [PATCH 27/65] remove do stub, same as parent now --- sarracenia/flow/poll.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/sarracenia/flow/poll.py b/sarracenia/flow/poll.py index bbdf16b9a..bd5c80ba1 100644 --- a/sarracenia/flow/poll.py +++ b/sarracenia/flow/poll.py @@ -81,20 +81,3 @@ def __init__(self, options): if not features['ftppoll']['present']: if hasattr( self.o, 'pollUrl' ) and ( self.o.pollUrl.startswith('ftp') ): logger.critical( f"attempting to configure an FTP poll pollUrl={self.o.pollUrl}, but missing python modules: {' '.join(features['ftppoll']['modules_needed'])}" ) - - def do(self): - """ - stub to do the work: does nothing, marking everything done. - to be replaced in child classes that do transforms or transfers. - """ - - # mark all remaining messages as rejected. - if self.worklist.poll_catching_up: - # in catchup mode, just reading previously posted messages. - self.worklist.rejected = self.worklist.incoming - else: - self.worklist.ok = self.worklist.incoming - - logger.debug('processing %d messages worked! 
(stop requested: %s)' %
-                     (len(self.worklist.incoming), self._stop_requested))
-        self.worklist.incoming = []

From 0f5efe830a242a2db9b1e468df961ffc27a80a42 Mon Sep 17 00:00:00 2001
From: petersilva
Date: Thu, 21 Mar 2024 16:31:26 -0400
Subject: [PATCH 28/65] documenting use of scheduled_ settings over sleep for polls
---
 docs/source/Explanation/CommandLineGuide.rst | 15 ++++++++++-
 docs/source/Reference/sr3_options.7.rst      | 26 +++++++++++++++++--
 .../fr/Explication/GuideLigneDeCommande.rst  | 14 +++++++++-
 docs/source/fr/Reference/sr3_options.7.rst   | 24 +++++++++++++++++
 4 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/docs/source/Explanation/CommandLineGuide.rst b/docs/source/Explanation/CommandLineGuide.rst
index 42afb424b..6ad09967a 100644
--- a/docs/source/Explanation/CommandLineGuide.rst
+++ b/docs/source/Explanation/CommandLineGuide.rst
@@ -939,6 +939,18 @@ post per file. The file's size is taken from the directory "ls"...
 but its checksum cannot be determined, so the default identity method is
 "cod", asking clients to calculate the identity Checksum On Download.

+To set when to poll, use the *scheduled_interval*, or the *scheduled_hour* and
+*scheduled_minute* settings. For example::
+
+    scheduled_interval 30m
+
+polls the remote resources every thirty minutes. Alternatively::
+
+    scheduled_hour 1,13,19
+    scheduled_minute 27
+
+specifies that the poll run at 01:27, 13:27, and 19:27 each day.
+
 By default, sr_poll sends its post notification message to the broker with default exchange
 (the prefix *xs_* followed by the broker username). The *post_broker* is mandatory.
 It can be given incomplete if it is well defined in the credentials.conf file.
@@ -1101,7 +1113,8 @@ notify about the new product.

 The notification protocol is defined here `sr_post(7) <../Reference/sr_post.7.html>`_

-**poll** connects to a *broker*. Every *sleep* seconds, it connects to
+**poll** connects to a *broker*. Every *scheduled_interval* seconds (or following
+the schedule given by *scheduled_hour* and *scheduled_minute*), it connects to
 a *pollUrl* (sftp, ftp, ftps). For each of the *directory* defined, it
 lists the contents. Polling is only intended to be used for recently
 modified files. The *fileAgeMax* option eliminates files that are too old
diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst
index 3ef81e96e..db4111bb3 100644
--- a/docs/source/Reference/sr3_options.7.rst
+++ b/docs/source/Reference/sr3_options.7.rst
@@ -1578,6 +1578,25 @@ sanity_log_dead (default: 1.5*housekeeping)
 The **sanity_log_dead** option sets how long to consider too long before restarting
 a component.

+scheduled_interval,scheduled_hour,scheduled_minute
+--------------------------------------------------
+
+When working with scheduled flows, such as polls, one can configure a duration
+(no units means seconds; suffixes: m=minutes, h=hours) at which to run a
+given activity::
+
+    scheduled_interval 30
+
+runs the flow or poll every 30 seconds. If no duration is set, then the
+flowcb.scheduled.Scheduled class will look for the other two time specifiers::
+
+    scheduled_hour 1,4,5,23
+    scheduled_minute 14,17,29
+
+which runs the poll each day at 01:14, 01:17 and 01:29, and at the same
+minutes past 04:00, 05:00 and 23:00.
+
 shim_defer_posting_to_exit (EXPERIMENTAL)
 -----------------------------------------
@@ -1613,11 +1632,14 @@ shim_skip_parent_open_files (EXPERIMENTAL)

 sleep