Native S3 transfer driver #1010

Merged Apr 5, 2024 (67 commits). The diff below shows the changes from the first 28 commits.

Commits
483e25e
Initial code for an S3 transfer class
gcglinton Mar 26, 2024
fc1aa50
Add bandwidth capping to S3 transfer class
gcglinton Mar 27, 2024
c27094d
Update core sr3 code to work with S3 transfer
gcglinton Mar 27, 2024
3bd7491
many changes to s3 transfer class
gcglinton Mar 27, 2024
9e450b5
#982 statehost config option should be used to pick state directory
petersilva Mar 27, 2024
15a9022
Try to catch bad connections in S3 Transfer
gcglinton Mar 28, 2024
315470c
Got S3 connecting and LSing
gcglinton Apr 2, 2024
12f271b
Add more methods so send works
gcglinton Apr 2, 2024
9ee6447
Add metadata to uploaded S3 files
gcglinton Apr 3, 2024
f9e0f05
Catch object metadata on ls
gcglinton Apr 3, 2024
3d9764b
Start of unit Tests for S3 Transfer class
gcglinton Apr 3, 2024
7087ffc
Add dependencies for S3 transfer tests
gcglinton Apr 3, 2024
bd8924d
Expanded unit tests for S3 transfer class
gcglinton Apr 4, 2024
386a9f7
Add last two method tests
gcglinton Apr 4, 2024
2417c60
housekeeping check missing in wiski
petersilva Mar 16, 2024
ecbb5b3
allow gather to abort further gathers by changing return value to a t…
petersilva Mar 18, 2024
4fc56d4
implement #974 poll refactor to use scheduled flows
petersilva Mar 21, 2024
b9463ca
updating documentation of gather routine's new variable return formats
petersilva Mar 19, 2024
512fec5
Adding initial run on polls with scheduled intervals
petersilva Mar 20, 2024
822b47d
fix bug where using localtime instead of utc can cause infinite loop
petersilva Mar 20, 2024
6c365be
notice when appointments are in the past
petersilva Mar 20, 2024
2dd35ee
refactoring Flow/run() to simplify
petersilva Mar 20, 2024
4162df0
refactoring run further to make it shorter
petersilva Mar 20, 2024
8df3e55
more run refactoring, trying to get it shorter
petersilva Mar 20, 2024
c068255
move on_start to save some vip check logic
petersilva Mar 20, 2024
aaa908b
fixing indenting of code block moved in last commit
petersilva Mar 21, 2024
47c810f
remove do stub, same as parent now
petersilva Mar 21, 2024
0f5efe8
documenting use of scheduled_ settings over sleep for polls
Mar 21, 2024
0496b63
poll implement default interval 300s
Mar 21, 2024
95e1fc8
have convert change poll sleep to scheduled_interval, and related fut…
petersilva Mar 22, 2024
614a73f
abort gather when housekeeping or stopping
petersilva Mar 22, 2024
095de1c
providing a type hint for ls entry point
petersilva Apr 2, 2024
0659bf9
more detailed dicts
Apr 2, 2024
210fa9b
fix #1002 sftp binary accelerator fails for file names with colons in …
petersilva Mar 28, 2024
22a3b92
#997 Move retry logic to on_start
andreleblanc11 Mar 25, 2024
40481f0
#997 Correctly launch child through os.execl. Corrects instance numbers.
andreleblanc11 Mar 27, 2024
58ddcf4
#997 Correct method names
andreleblanc11 Mar 27, 2024
4bd6cd0
#997 Add a socket timeout to mimic sarracenia flow and correct poor e…
andreleblanc11 Mar 27, 2024
5565b24
#997 Add method to cleanup socket descriptor files
andreleblanc11 Mar 27, 2024
cb642b1
#997 Add timeout checks for parent as well. Avoids sr3 sanity restarts.
andreleblanc11 Mar 28, 2024
7d9cadc
had_vip must be set to true to ensure novip file is written
Mar 28, 2024
8baabdd
still run housekeeping even in poll and wVip
Mar 28, 2024
5915943
documentation touchups sleep -> scheduled interval
petersilva Mar 29, 2024
2df2230
retry cleanup was broken by work with andre for am... fixed
Apr 2, 2024
93124d0
more information about ls entry point
Apr 2, 2024
3849a8d
more information about ls entry point for transfer classes
petersilva Apr 4, 2024
002006d
fixing typo from Reid
petersilva Apr 4, 2024
25df985
fix for unit tests of retry class
petersilva Apr 4, 2024
556d4a2
fixing problem seen in tests noticed by andre
petersilva Apr 4, 2024
44d941f
Initial code for an S3 transfer class
gcglinton Mar 26, 2024
503b3cd
Add bandwidth capping to S3 transfer class
gcglinton Mar 27, 2024
e63f0e4
Update core sr3 code to work with S3 transfer
gcglinton Mar 27, 2024
9f8eae1
many changes to s3 transfer class
gcglinton Mar 27, 2024
f5cea72
Try to catch bad connections in S3 Transfer
gcglinton Mar 28, 2024
16f3c06
Got S3 connecting and LSing
gcglinton Apr 2, 2024
ecf81a1
Add more methods so send works
gcglinton Apr 2, 2024
67c57cc
Add metadata to uploaded S3 files
gcglinton Apr 3, 2024
5795ca4
Catch object metadata on ls
gcglinton Apr 3, 2024
15c575e
Start of unit Tests for S3 Transfer class
gcglinton Apr 3, 2024
cfbbab6
Add dependencies for S3 transfer tests
gcglinton Apr 3, 2024
937a957
Expanded unit tests for S3 transfer class
gcglinton Apr 4, 2024
9cc1d8a
Add last two method tests
gcglinton Apr 4, 2024
947320a
Merge branch 'Issue939_S3Transfer' of https://github.com/MetPX/sarrac…
gcglinton Apr 4, 2024
5052384
Add type hints to S3 transfer class, per #1009
gcglinton Apr 4, 2024
0f73d22
revert return type hints on transfer/ls because older python3.6 cannot…
petersilva Apr 5, 2024
1d60e78
Merge branch 'issue983_3_per_config_statehost' into Issue939_S3Transfer
petersilva Apr 5, 2024
c0961fb
removing prototype return values again (do not work on older python)
petersilva Apr 5, 2024
11 changes: 10 additions & 1 deletion sarracenia/credentials.py
@@ -98,6 +98,8 @@ def __init__(self, urlstr=None):
self.prot_p = False
self.bearer_token = None
self.login_method = None
self.s3_endpoint = None
self.s3_session_token = None

def __str__(self):
"""Returns attributes of the Credential object as a readable string.
@@ -126,6 +128,9 @@ def __str__(self):
s += " %s" % self.prot_p
s += " %s" % self.bearer_token
s += " %s" % self.login_method
s += " %s" % self.s3_endpoint
#want to show they provided a session token, but not leak it (like passwords above)
s += " %s" % ('Yes' if self.s3_session_token is not None else 'No')
return s


@@ -275,7 +280,7 @@ def isValid(self, url, details=None):

# we have no user and no pasw (http normal, https... no cert, sftp hope for .ssh/config)
if not user and not pasw:
if url.scheme in ['http', 'https', 'sftp']: return True
if url.scheme in ['http', 'https', 'sftp', 's3']: return True
logger.error( f'unknown scheme: {url.scheme}')
return False

@@ -363,6 +368,10 @@ def _parse(self, line):
details.bearer_token = parts[1].strip()
elif keyword == 'login_method':
details.login_method = parts[1].strip()
elif keyword == 's3_session_token':
details.s3_session_token = urllib.parse.unquote(parts[1].strip())
elif keyword == 's3_endpoint':
details.s3_endpoint = parts[1].strip()
else:
logger.warning("bad credential option (%s)" % keyword)

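For reference, the s3_session_token value is passed through urllib.parse.unquote() when read, so a token containing reserved characters can be stored URL-quoted in credentials.conf. A minimal sketch of that round trip (the token value is invented):

import urllib.parse

raw_token = "FQoGZXIvYXdzEBYaD+example/session+token=="   # hypothetical STS session token
stored = urllib.parse.quote(raw_token, safe='')            # quoted form that could be written to credentials.conf
assert urllib.parse.unquote(stored) == raw_token            # what the parser above recovers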
3 changes: 3 additions & 0 deletions sarracenia/featuredetection.py
@@ -82,6 +82,9 @@
'retry': { 'modules_needed': ['jsonpickle'], 'present': False,
'lament': 'unable to use local queues on the side (disk or redis) to retry messages later',
'rejoice': 'can write messages to local queues to retry failed publishes/sends/downloads'},
's3' : { 'modules_needed': [ 'boto3' ], 'present': False,
'lament' : 'cannot connect natively to S3-compatible locations (AWS S3, Minio, etc..)',
'rejoice': 'able to connect natively to S3-compatible locations (AWS S3, Minio, etc..)', },
'sftp' : { 'modules_needed': [ 'paramiko' ],
'lament': 'cannot use or access sftp/ssh based services',
'rejoice': 'can use sftp or ssh based services'
3 changes: 3 additions & 0 deletions sarracenia/transfer/__init__.py
@@ -458,3 +458,6 @@ def write_chunk_init(self, proto):
if features['sftp']['present']:
import sarracenia.transfer.sftp

if features['s3']['present']:
import sarracenia.transfer.s3
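
The guarded import matters because each Transfer driver advertises its URL scheme through registered_as(). A rough illustration of how a scheme-to-class lookup can work once the s3 module has been imported (a sketch only, not the actual factory logic in sarracenia/transfer/__init__.py):

from sarracenia.transfer import Transfer

def find_driver(scheme: str):
    # scan the Transfer subclasses that the guarded imports above registered
    for cls in Transfer.__subclasses__():
        if hasattr(cls, 'registered_as') and scheme in cls.registered_as():
            return cls
    return None

# find_driver('s3') only succeeds when boto3 is installed, because the
# import of sarracenia.transfer.s3 is guarded by features['s3']['present'].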

326 changes: 326 additions & 0 deletions sarracenia/transfer/s3.py
@@ -0,0 +1,326 @@
# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, 2008-2021
#
# Sarracenia repository: https://github.com/MetPX/sarracenia
# Documentation: https://github.com/MetPX/sarracenia
#
########################################################################
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#

import logging
import os
import sarracenia
import sys
import paramiko
import stat
import json

from sarracenia.transfer import Transfer
from sarracenia.transfer import alarm_cancel, alarm_set, alarm_raise

import boto3, botocore
from boto3.s3.transfer import TransferConfig

logger = logging.getLogger(__name__)


class S3(Transfer):
"""
Simple Storage Service (S3) ( https://en.wikipedia.org/wiki/Amazon_S3 )


built with:
boto3's S3 client (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html)
"""

# ----------------------- MAGIC METHODS ----------------------

def __init__(self, proto, options):

super().__init__(proto, options)

logger.debug("sr_s3 __init__")

self.s3_client_config = botocore.config.Config(
user_agent_extra= 'Sarracenia/' + sarracenia.__version__
)

self.s3_transfer_config = TransferConfig()
if hasattr(self.o, 'byteRateMax'):
self.s3_transfer_config.max_bandwidth = self.o.byteRateMax


self.__init()


## --------------------- PRIVATE METHODS ---------------------

def __init(self):
Transfer.init(self)

logger.debug("sr_s3 __init")
self.connected = False
self.client = None
self.details = None
self.seek = True
self.sendTo = None

self.bucket = None
self.client_args = {}

self.path = ""
self.cwd = ""

self.entries = {}

self._Metadata_Key = 'sarracenia_v3'

def __credentials(self) -> bool:
logger.debug("%s" % self.sendTo)

try:
ok, details = self.o.credentials.get(self.sendTo)
if details:
url = details.url

self.bucket = details.url.hostname

if url.username != '':
self.client_args['aws_access_key_id'] = url.username
if url.password != '':
self.client_args['aws_secret_access_key'] = url.password
if hasattr(details, 's3_session_token'):
self.client_args['aws_session_token'] = details.s3_session_token
if hasattr(details, 's3_endpoint'):
self.client_args['endpoint_url'] = details.s3_endpoint

return True

except:
logger.error("sr_s3/credentials: unable to get credentials for %s" % self.sendTo)
logger.debug('Exception details: ', exc_info=True)

return False


## ---------------------- PUBLIC METHODS ---------------------

def cd(self, path):
logger.debug("sr_s3 cd %s" % path)
self.cwd = os.path.dirname(path)
self.path = path.strip('/') + "/"

def cd_forced(self, perm, path):
logger.debug("sr_s3 cd %s" % path)
self.cwd = os.path.dirname(path)
self.path = path.strip('/') + "/"

def check_is_connected(self) -> bool:
logger.debug("sr_s3 check_is_connected")

if not self.connected:
return False

if self.sendTo != self.o.sendTo:
self.close()
return False

return True

def chmod(self, perms):
logger.debug(f"sr_s3 chmod {perms}")
return

def close(self):
logger.debug("sr_s3 close")
self.connected = False
self.client = None
self.sendTo = None
return

def connect(self) -> bool:
logger.debug("creating boto3 client")

self.sendTo = self.o.sendTo

self.__credentials()

try:
self.client = boto3.client('s3', config=self.s3_client_config, **self.client_args)
buckets = self.client.list_buckets()
if self.bucket in [b['Name'] for b in buckets['Buckets']]:
self.connected = True
logger.debug(f"Connected to bucket {self.bucket} in {self.client.get_bucket_location(Bucket=self.bucket)['LocationConstraint']}")
return True
else:
logger.error(f"Can't find bucket called {self.bucket}")

except botocore.exceptions.ClientError as e:
logger.error(f"unable to establish boto3 connection: {e}")
except botocore.exceptions.NoCredentialsError as e:
logger.error(f"unable to establish boto3 connection, no credentials: {e}")
except Exception as e:
logger.error(f"Something else happened: {e}", exc_info=True)

return False

def delete(self, path):
logger.debug("deleting %s" % path)
self.client.delete_object(Bucket=self.bucket, Key=path)

def get(self,
msg,
remote_file,
local_file,
remote_offset=0,
local_offset=0,
length=0, exactLength=False) -> int:

logger.debug("sr_s3 get; self.path %s" % self.path)

file_key = self.path + remote_file
logger.debug(f"get s3://{self.bucket}/{file_key} to {local_file}")

self.client.download_file(Bucket=self.bucket, Key=file_key, Filename=local_file, Config=self.s3_transfer_config)

rw_length = os.stat(local_file).st_size

return rw_length

def getcwd(self):
if self.client:
return self.cwd
else:
return None

def ls(self) -> dict[ str, paramiko.SFTPAttributes ]:
logger.debug(f"ls-ing items in {self.bucket}/{self.path}")

self.entries = {}

paginator = self.client.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(Bucket=self.bucket, Prefix=self.path, Delimiter='/')

for page in page_iterator:
if 'Contents' in page:
for obj in page['Contents']:
filename = obj['Key'].replace(self.path, '', 1)
if filename == "":
continue

entry = paramiko.SFTPAttributes()

obj_metadata = self.client.head_object(Bucket=self.bucket, Key=obj['Key'])['Metadata']

if self._Metadata_Key in obj_metadata:
sr_metadata = json.loads(obj_metadata[self._Metadata_Key])
entry.sr_mtime = sr_metadata['mtime']
entry.sr_identity = sr_metadata['identity']

if 'LastModified' in obj:
t = obj["LastModified"].timestamp()
entry.st_atime = t
entry.st_mtime = t
if 'Size' in obj:
entry.st_size = obj['Size']


entry.st_mode = 0o644

self.entries[filename] = entry

if 'CommonPrefixes' in page:
for prefix in page['CommonPrefixes']:
logger.debug(f"Found folder {prefix['Prefix']}")

filename = prefix['Prefix'].replace(self.path, '', 1).rstrip("/")
if filename == "":
continue

entry = paramiko.SFTPAttributes()
entry.st_mode = 0o755 | stat.S_IFDIR

self.entries[filename] = entry

logger.debug(f"{self.entries=}")
return self.entries

def mkdir(self, remote_dir):
logger.debug(f"mkdir {remote_dir=}; {self.path=}")
return

def put(self,
msg,
local_file,
remote_file,
local_offset=0,
remote_offset=0,
length=0) -> int:
logger.debug("sr_s3 put; %s %s" % (local_file, remote_file))

file_key = self.path + remote_file
logger.debug(f"put {local_file} to s3://{self.bucket}/{file_key}")
logger.debug(f"{msg=}")

extra_args = {
'Metadata': {
self._Metadata_Key: json.dumps({
'identity': msg['identity'],
'mtime': msg['mtime'],
})
}
}

# upload
try:
self.client.upload_file( Filename=local_file, Bucket=self.bucket, Key=file_key, Config=self.s3_transfer_config, ExtraArgs=extra_args)

write_size = self.client.get_object_attributes(Bucket=self.bucket, Key=file_key, ObjectAttributes=['ObjectSize'])['ObjectSize']
return write_size
except Exception as e:
logger.error(f"Something went wrong with the upload: {e}", exc_info=True)
return -1

def registered_as() -> list:
return ['s3']

def rename(self, remote_old, remote_new):
logger.debug(f"{remote_old=}; {remote_new=}")
self.client.copy_object(Bucket=self.bucket, CopySource=self.bucket + "/" + remote_old, Key=remote_new)
self.client.delete_object(Bucket=self.bucket, Key=remote_old)

def rmdir(self, path):
logger.debug("%s" % path)
paginator = self.client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=self.bucket, Prefix=path + "/")

delete_us = dict(Objects=[])
for item in pages.search('Contents'):
delete_us['Objects'].append(dict(Key=item['Key']))

# flush once aws limit reached
if len(delete_us['Objects']) >= 500:
self.client.delete_objects(Bucket=self.bucket, Delete=delete_us)
delete_us = dict(Objects=[])

# flush rest
if len(delete_us['Objects']):
self.client.delete_objects(Bucket=self.bucket, Delete=delete_us)

def umask(self):
logger.debug("umask")
return
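
To make the metadata handling above concrete: put() attaches a single sarracenia_v3 metadata entry via ExtraArgs, and ls() reads it back per object with head_object(). A standalone boto3 sketch of that round trip (the bucket, key, and the identity/mtime values are placeholders, not sarracenia output):

import json
import boto3

client = boto3.client('s3')   # endpoint and credentials assumed to come from the environment

sr_meta = json.dumps({'identity': {'method': 'sha512', 'value': '...'}, 'mtime': '20240405T120000'})
client.upload_file(Filename='/tmp/example.dat', Bucket='example-bucket', Key='incoming/example.dat',
                   ExtraArgs={'Metadata': {'sarracenia_v3': sr_meta}})

# per-object read-back, as ls() does for every listed key
meta = client.head_object(Bucket='example-bucket', Key='incoming/example.dat')['Metadata']
print(json.loads(meta['sarracenia_v3']))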
4 changes: 3 additions & 1 deletion tests/requirements.txt
@@ -7,4 +7,6 @@ pytest-mock>=3.11

python-redis-lock>=4
fakeredis>=2.11
fakeredis[lua]>=2.11
fakeredis[lua]>=2.11
boto3>=1.34
moto[s3]>=5.0
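
boto3 and moto[s3] back the new unit tests for the S3 transfer class; moto 5.x exposes a single mock_aws decorator in place of the old per-service mocks. A minimal example of the style of test these dependencies enable (bucket name and assertion are illustrative, not the actual test suite):

import boto3
from moto import mock_aws

@mock_aws
def test_bucket_listing_is_mocked():
    # all S3 calls below hit moto's in-memory backend, not AWS
    client = boto3.client('s3', region_name='us-east-1')
    client.create_bucket(Bucket='example-bucket')
    names = [b['Name'] for b in client.list_buckets()['Buckets']]
    assert 'example-bucket' in names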