Skip to content

Commit

Permalink
changed threads default
Browse files Browse the repository at this point in the history
  • Loading branch information
mjurbanski-reef committed Aug 16, 2023
1 parent 6e18aaf commit 6720a29
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
* Better help text for --corsRules
* if `--threads` is not explicitly set, number of threads is no longer guaranteed to be 10

### Infrastructure
* Remove unsupported PyPy 3.7 from tests matrix and add PyPy 3.10 instead
Expand Down
4 changes: 4 additions & 0 deletions b2/_cli/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
B2_DESTINATION_SSE_C_KEY_ID_ENV_VAR = 'B2_DESTINATION_SSE_C_KEY_ID'
B2_SOURCE_SSE_C_KEY_B64_ENV_VAR = 'B2_SOURCE_SSE_C_KEY_B64'

# Threads defaults

DEFAULT_THREADS = 10

# Constants used in the B2 API
CREATE_BUCKET_TYPES = ('allPublic', 'allPrivate')
LIST_FILE_NAMES_MAX_LIMIT = 10000 # https://www.backblaze.com/b2/docs/b2_list_file_names.html
79 changes: 47 additions & 32 deletions b2/console_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
B2_SOURCE_SSE_C_KEY_B64_ENV_VAR,
B2_USER_AGENT_APPEND_ENV_VAR,
CREATE_BUCKET_TYPES,
DEFAULT_THREADS,
)
from b2._cli.shell import detect_shell
from b2.arg_parser import (
Expand Down Expand Up @@ -514,6 +515,28 @@ def _get_upload_mode_from_args(args):
return UploadMode.FULL


class ThreadsMixin(Described):
"""
Use --threads to manually adjust number of threads used in the operation.
Otherwise, the number of threads will be automatically chosen.
"""

@classmethod
def _setup_parser(cls, parser):
parser.add_argument('--threads', type=int, default=None)

super()._setup_parser(parser) # noqa

def _get_threads_from_args(self, args) -> int:
return args.threads or DEFAULT_THREADS

def _set_threads_from_args(self, args):
threads = self._get_threads_from_args(args)
# FIXME: This is using deprecated API. It should be be replaced when moving to b2sdk apiver 3.
# There is `max_download_workers` param in B2Api constructor for this.
self.api.services.download_manager.set_thread_pool_size(threads)


class Command(Described):
# Set to True for commands that receive sensitive information in arguments
FORBID_LOGGING_ARGUMENTS = False
Expand Down Expand Up @@ -1279,8 +1302,8 @@ def _print_file_attribute(self, label, value):

@B2.register_subcommand
class DownloadFileById(
SourceSseMixin, WriteBufferSizeMixin, SkipHashVerificationMixin, MaxDownloadStreamsMixin,
DownloadCommand
ThreadsMixin, SourceSseMixin, WriteBufferSizeMixin, SkipHashVerificationMixin,
MaxDownloadStreamsMixin, DownloadCommand
):
"""
Downloads the given file, and stores it in the given local file.
Expand All @@ -1289,6 +1312,7 @@ class DownloadFileById(
on stderr. Without it, simple text progress is printed.
Use ``--noProgress`` to disable progress reporting.
{THREADSMIXIN}
{SOURCESSEMIXIN}
{WRITEBUFFERSIZEMIXIN}
{SKIPHASHVERIFICATIONMIXIN}
Expand All @@ -1302,18 +1326,14 @@ class DownloadFileById(
@classmethod
def _setup_parser(cls, parser):
parser.add_argument('--noProgress', action='store_true')
parser.add_argument('--threads', type=int, default=10)
parser.add_argument('fileId')
parser.add_argument('localFileName')
super()._setup_parser(parser)

def run(self, args):
progress_listener = make_progress_listener(args.localFileName, args.noProgress)
encryption_setting = self._get_source_sse_setting(args)
if args.threads:
# FIXME: This is using deprecated API. It should be be replaced when moving to b2sdk apiver 3.
# There is `max_download_workers` param in B2Api constructor for this.
self.api.services.download_manager.set_thread_pool_size(args.threads)
self._set_threads_from_args(args)
downloaded_file = self.api.download_file_by_id(
args.fileId, progress_listener, encryption=encryption_setting
)
Expand All @@ -1325,6 +1345,7 @@ def run(self, args):

@B2.register_subcommand
class DownloadFileByName(
ThreadsMixin,
SourceSseMixin,
WriteBufferSizeMixin,
SkipHashVerificationMixin,
Expand All @@ -1338,6 +1359,7 @@ class DownloadFileByName(
on stderr. Without it, simple text progress is printed.
Use ``--noProgress`` to disable progress reporting.
{THREADSMIXIN}
{SOURCESSEMIXIN}
{WRITEBUFFERSIZEMIXIN}
{SKIPHASHVERIFICATIONMIXIN}
Expand All @@ -1351,17 +1373,13 @@ class DownloadFileByName(
@classmethod
def _setup_parser(cls, parser):
parser.add_argument('--noProgress', action='store_true')
parser.add_argument('--threads', type=int, default=10)
parser.add_argument('bucketName').completer = bucket_name_completer
parser.add_argument('b2FileName').completer = file_name_completer
parser.add_argument('localFileName')
super()._setup_parser(parser)

def run(self, args):
if args.threads:
# FIXME: This is using deprecated API. It should be be replaced when moving to b2sdk apiver 3.
# There is `max_download_workers` param in B2Api constructor for this.
self.api.services.download_manager.set_thread_pool_size(args.threads)
self._set_threads_from_args(args)
bucket = self.api.get_bucket_by_name(args.bucketName)
progress_listener = make_progress_listener(args.localFileName, args.noProgress)
encryption_setting = self._get_source_sse_setting(args)
Expand Down Expand Up @@ -1912,7 +1930,7 @@ def format_ls_entry(self, file_version: FileVersion, replication: bool):


@B2.register_subcommand
class Rm(AbstractLsCommand):
class Rm(ThreadsMixin, AbstractLsCommand):
"""
Removes a "folder" or a set of files matching a pattern. Use with caution.
Expand All @@ -1928,8 +1946,7 @@ class Rm(AbstractLsCommand):
To list (but not remove) files to be deleted, use ``--dryRun``. You can also
list files via ``ls`` command - the listing behaviour is exactly the same.
Users with multiple files to be removed will benefit from multi-threaded
capabilities. The default number of threads is 10.
{THREADSMIXIN}
Progress is displayed on the console unless ``--noProgress`` is specified.
Expand Down Expand Up @@ -1993,7 +2010,6 @@ class Rm(AbstractLsCommand):
- **deleteFiles**
"""

DEFAULT_THREADS = 10
PROGRESS_REPORT_CLASS = ProgressReport

class SubmitThread(threading.Thread):
Expand All @@ -2007,12 +2023,14 @@ def __init__(
args: argparse.Namespace,
messages_queue: queue.Queue,
reporter: ProgressReport,
threads: int,
):
self.runner = runner
self.args = args
self.messages_queue = messages_queue
self.reporter = reporter
removal_queue_size = self.args.queueSize or (2 * self.args.threads)
self.threads = threads
removal_queue_size = self.args.queueSize or (2 * self.threads)
self.semaphore = threading.BoundedSemaphore(value=removal_queue_size)
self.fail_fast_event = threading.Event()
self.mapping_lock = threading.Lock()
Expand All @@ -2021,7 +2039,7 @@ def __init__(

def run(self) -> None:
try:
with ThreadPoolExecutor(max_workers=self.args.threads) as executor:
with ThreadPoolExecutor(max_workers=self.threads) as executor:
self._run_removal(executor)
except Exception as error:
self.messages_queue.put((self.EXCEPTION_TAG, error))
Expand Down Expand Up @@ -2075,7 +2093,6 @@ def _removal_done(self, future: Future) -> None:
@classmethod
def _setup_parser(cls, parser):
parser.add_argument('--dryRun', action='store_true')
parser.add_argument('--threads', type=int, default=cls.DEFAULT_THREADS)
parser.add_argument(
'--queueSize',
type=int,
Expand All @@ -2094,8 +2111,9 @@ def run(self, args):
failed_on_any_file = False
messages_queue = queue.Queue()

threads = self._get_threads_from_args(args)
with self.PROGRESS_REPORT_CLASS(self.stdout, args.noProgress) as reporter:
submit_thread = self.SubmitThread(self, args, messages_queue, reporter)
submit_thread = self.SubmitThread(self, args, messages_queue, reporter, threads=threads)
# This thread is started in daemon mode, no joining needed.
submit_thread.start()

Expand Down Expand Up @@ -2156,6 +2174,7 @@ def run(self, args):

@B2.register_subcommand
class Sync(
ThreadsMixin,
DestinationSseMixin,
SourceSseMixin,
WriteBufferSizeMixin,
Expand Down Expand Up @@ -2187,9 +2206,10 @@ class Sync(
The default is to fail when the specified source directory doesn't exist
or is empty. (This check only applies to version 1.0 and later.)
Users with high-performance networks, or file sets with very small
files, will benefit from multi-threaded uploads and downloads. The default
number of threads for syncing, downloading, and uploading is 10.
{THREADSMIXIN}
You can alternatively control number of threads per each operation.
The number of files processed in parallel is set by ``--syncThreads``,
the number of files/file parts downloaded in parallel is set by``--downloadThreads``,
and the number of files/file parts uploaded in parallel is set by `--uploadThreads``.
Expand Down Expand Up @@ -2336,7 +2356,6 @@ def _setup_parser(cls, parser):
parser.add_argument('--dryRun', action='store_true')
parser.add_argument('--allowEmptySource', action='store_true')
parser.add_argument('--excludeAllSymlinks', action='store_true')
parser.add_argument('--threads', type=int)
parser.add_argument('--syncThreads', type=int, default=cls.DEFAULT_SYNC_THREADS)
parser.add_argument('--downloadThreads', type=int, default=cls.DEFAULT_DOWNLOAD_THREADS)
parser.add_argument('--uploadThreads', type=int, default=cls.DEFAULT_UPLOAD_THREADS)
Expand Down Expand Up @@ -2610,7 +2629,8 @@ def run(self, args):

@B2.register_subcommand
class UploadFile(
DestinationSseMixin, LegalHoldMixin, FileRetentionSettingMixin, UploadModeMixin, Command
ThreadsMixin, DestinationSseMixin, LegalHoldMixin, FileRetentionSettingMixin, UploadModeMixin,
Command
):
"""
Uploads one file to the given bucket. Uploads the contents
Expand All @@ -2631,9 +2651,7 @@ class UploadFile(
B2 allows is 100MB. Setting ``--minPartSize`` to a larger value will
reduce the number of parts uploaded when uploading a large file.
The maximum number of upload threads to use to upload parts of a large file
is specified by ``--threads``. It has no effect on small files (under 200MB).
Default is 10.
{THREADSMIXIN}
If the ``tqdm`` library is installed, progress bar is displayed
on stderr. Without it, simple text progress is printed.
Expand Down Expand Up @@ -2664,7 +2682,6 @@ def _setup_parser(cls, parser):
parser.add_argument('--contentType')
parser.add_argument('--minPartSize', type=int)
parser.add_argument('--sha1')
parser.add_argument('--threads', type=int, default=10)
parser.add_argument('--cache-control', default=None)
parser.add_argument('--info', action='append', default=[])
parser.add_argument('--custom-upload-timestamp', type=int)
Expand All @@ -2682,9 +2699,7 @@ def run(self, args):
int(os.path.getmtime(args.localFilePath) * 1000)
)

# FIXME: This is using deprecated API. It should be be replaced when moving to b2sdk apiver 3.
# There is `max_upload_workers` param in B2Api constructor for this.
self.api.services.upload_manager.set_thread_pool_size(args.threads)
self._set_threads_from_args(args)

bucket = self.api.get_bucket_by_name(args.bucketName)
encryption_setting = self._get_destination_sse_setting(args)
Expand Down

0 comments on commit 6720a29

Please sign in to comment.