Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/sam-config-docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ output_template_file="packaged.yaml"

[default.deploy.parameters]
stack_name="using_config_file"

### Specifying a Boolean deployment option

```
[default.deploy.parameters]
parallel_upload=true
```

Setting `parallel_upload` to `true` is equivalent to passing the `--parallel-upload` flag to
`sam deploy`; it enables concurrent S3/ECR uploads during the packaging phase.
capabilities="CAPABILITY_IAM"
region="us-east-1"
profile="srirammv"
Expand Down Expand Up @@ -94,4 +104,3 @@ stack_name="using_config_file"
[default.build.parameters]
debug=true
```

12 changes: 12 additions & 0 deletions samcli/commands/deploy/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@
@image_repository_option
@image_repositories_option
@force_upload_option
@click.option(
"--parallel-upload",
is_flag=True,
default=False,
help="Enable parallel upload of artifacts to S3/ECR during packaging before deployment.",
)
@s3_prefix_option
@kms_key_id_option
@role_arn_option
Expand Down Expand Up @@ -177,6 +183,7 @@ def cli(
image_repository,
image_repositories,
force_upload,
parallel_upload,
no_progressbar,
s3_prefix,
kms_key_id,
Expand Down Expand Up @@ -212,6 +219,7 @@ def cli(
image_repository,
image_repositories,
force_upload,
parallel_upload,
no_progressbar,
s3_prefix,
kms_key_id,
Expand Down Expand Up @@ -246,6 +254,7 @@ def do_cli(
image_repository,
image_repositories,
force_upload,
parallel_upload,
no_progressbar,
s3_prefix,
kms_key_id,
Expand Down Expand Up @@ -300,6 +309,7 @@ def do_cli(
config_env=config_env,
config_file=config_file,
disable_rollback=disable_rollback,
parallel_upload=parallel_upload,
)
guided_context.run()
else:
Expand Down Expand Up @@ -331,6 +341,7 @@ def do_cli(
kms_key_id=kms_key_id,
use_json=use_json,
force_upload=force_upload,
parallel_upload=guided_context.guided_parallel_upload if guided else parallel_upload,
no_progressbar=no_progressbar,
metadata=metadata,
on_deploy=True,
Expand All @@ -357,6 +368,7 @@ def do_cli(
image_repository=guided_context.guided_image_repository if guided else image_repository,
image_repositories=guided_context.guided_image_repositories if guided else image_repositories,
force_upload=force_upload,
parallel_upload=guided_context.guided_parallel_upload if guided else parallel_upload,
no_progressbar=no_progressbar,
s3_prefix=guided_context.guided_s3_prefix if guided else s3_prefix,
kms_key_id=kms_key_id,
Expand Down
1 change: 1 addition & 0 deletions samcli/commands/deploy/core/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"disable_rollback",
"on_failure",
"force_upload",
"parallel_upload",
"max_wait_duration",
]

Expand Down
3 changes: 3 additions & 0 deletions samcli/commands/deploy/deploy_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ def __init__(
poll_delay,
on_failure,
max_wait_duration,
parallel_upload=False,
):
self.template_file = template_file
self.stack_name = stack_name
self.s3_bucket = s3_bucket
self.image_repository = image_repository
self.image_repositories = image_repositories
self.force_upload = force_upload
self.parallel_upload = parallel_upload
self.no_progressbar = no_progressbar
self.s3_prefix = s3_prefix
self.kms_key_id = kms_key_id
Expand Down Expand Up @@ -164,6 +166,7 @@ def run(self):
self.signing_profiles,
self.use_changeset,
self.disable_rollback,
self.parallel_upload,
)
return self.deploy(
self.stack_name,
Expand Down
11 changes: 11 additions & 0 deletions samcli/commands/deploy/guided_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(
config_env=None,
config_file=None,
disable_rollback=None,
parallel_upload=False,
):
self.template_file = template_file
self.stack_name = stack_name
Expand Down Expand Up @@ -95,6 +96,8 @@ def __init__(
self.color = Colored()
self.function_provider = None
self.disable_rollback = disable_rollback
self.parallel_upload = parallel_upload
self.guided_parallel_upload = None

@property
def guided_capabilities(self):
Expand Down Expand Up @@ -162,6 +165,12 @@ def guided_prompts(self, parameter_override_keys):
click.secho("\t#Preserves the state of previously provisioned resources when an operation fails")
disable_rollback = confirm(f"\t{self.start_bold}Disable rollback{self.end_bold}", default=self.disable_rollback)

if self.parallel_upload:
parallel_upload = True
else:
click.secho("\t#Speed up artifact uploads by running them in parallel")
parallel_upload = confirm(f"\t{self.start_bold}Enable parallel uploads{self.end_bold}", default=False)

self.prompt_authorization(stacks)
self.prompt_code_signing_settings(stacks)

Expand Down Expand Up @@ -204,6 +213,7 @@ def guided_prompts(self, parameter_override_keys):
self.guided_s3_prefix = stack_name
self.guided_region = region
self.guided_profile = self.profile
self.guided_parallel_upload = parallel_upload
self._capabilities = input_capabilities if input_capabilities else default_capabilities
self._parameter_overrides = (
input_parameter_overrides if input_parameter_overrides else self.parameter_overrides_from_cmdline
Expand Down Expand Up @@ -587,6 +597,7 @@ def run(self):
capabilities=self._capabilities,
signing_profiles=self.signing_profiles,
disable_rollback=self.disable_rollback,
parallel_upload=self.guided_parallel_upload,
)

@staticmethod
Expand Down
3 changes: 3 additions & 0 deletions samcli/commands/deploy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def print_deploy_args(
signing_profiles,
use_changeset,
disable_rollback,
parallel_upload,
):
"""
Print a table of the values that are used during a sam deploy.
Expand Down Expand Up @@ -48,6 +49,7 @@ def print_deploy_args(
:param signing_profiles: Signing profile details which will be used to sign functions/layers
:param use_changeset: Flag to use or skip the usage of changesets
:param disable_rollback: Preserve the state of previously provisioned resources when an operation fails.
:param parallel_upload: Whether artifact uploads run in parallel prior to deployment.
"""
_parameters = parameter_overrides.copy()

Expand All @@ -70,6 +72,7 @@ def print_deploy_args(
if use_changeset:
click.echo(f"\tConfirm changeset : {confirm_changeset}")
click.echo(f"\tDisable rollback : {disable_rollback}")
click.echo(f"\tParallel uploads : {parallel_upload}")
if image_repository:
msg = "Deployment image repository : "
# NOTE(sriram-mv): tab length is 8 spaces.
Expand Down
3 changes: 3 additions & 0 deletions samcli/commands/package/package_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(
parameter_overrides=None,
on_deploy=False,
signing_profiles=None,
parallel_upload=False,
):
self.template_file = template_file
self.s3_bucket = s3_bucket
Expand All @@ -81,6 +82,7 @@ def __init__(
self.output_template_file = output_template_file
self.use_json = use_json
self.force_upload = force_upload
self.parallel_upload = parallel_upload
self.no_progressbar = no_progressbar
self.metadata = metadata
self.region = region
Expand Down Expand Up @@ -161,6 +163,7 @@ def _export(self, template_path, use_json):
self.code_signer,
normalize_template=True,
normalize_parameters=True,
parallel_upload=self.parallel_upload,
)
exported_template = template.export()

Expand Down
81 changes: 76 additions & 5 deletions samcli/lib/package/artifact_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
from typing import Dict, List, Optional
import threading
from collections.abc import MutableMapping
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from typing import Callable, Dict, List, Optional

from botocore.utils import set_value_from_jmespath

Expand Down Expand Up @@ -52,6 +55,37 @@

# NOTE: sriram-mv, A cyclic dependency on `Template` needs to be broken.

DEFAULT_PARALLEL_UPLOAD_WORKERS = max(4, min(32, (os.cpu_count() or 1) * 2))


class _ThreadSafeUploadCache(MutableMapping[str, str]):
"""Simple thread-safe mapping used to deduplicate uploads across threads."""

def __init__(self, initial: Optional[MutableMapping[str, str]] = None):
# Copy into a regular dict so we can safely snapshot under a lock
self._cache: Dict[str, str] = dict(initial or {})
self._lock = threading.Lock()

def __getitem__(self, key: str) -> str: # pragma: no cover - small helper
with self._lock:
return self._cache[key]

def __setitem__(self, key: str, value: str) -> None: # pragma: no cover - small helper
with self._lock:
self._cache[key] = value

def __delitem__(self, key: str) -> None: # pragma: no cover - small helper
with self._lock:
del self._cache[key]

def __iter__(self): # pragma: no cover - small helper
with self._lock:
return iter(dict(self._cache))

def __len__(self) -> int: # pragma: no cover - small helper
with self._lock:
return len(self._cache)


class CloudFormationStackResource(ResourceZip):
"""
Expand Down Expand Up @@ -89,6 +123,7 @@ def do_export(self, resource_id, resource_dict, parent_dir):
normalize_template=True,
normalize_parameters=True,
parent_stack_id=resource_id,
parallel_upload=getattr(self, "parallel_upload", False),
).export()

exported_template_str = yaml_dump(exported_template_dict)
Expand Down Expand Up @@ -179,6 +214,7 @@ def __init__(
normalize_template: bool = False,
normalize_parameters: bool = False,
parent_stack_id: str = "",
parallel_upload: bool = False,
):
"""
Reads the template and makes it ready for export
Expand All @@ -202,6 +238,7 @@ def __init__(
self.metadata_to_export = metadata_to_export
self.uploaders = uploaders
self.parent_stack_id = parent_stack_id
self.parallel_upload = parallel_upload

def _export_global_artifacts(self, template_dict: Dict) -> Dict:
"""
Expand Down Expand Up @@ -276,10 +313,13 @@ def export(self) -> Dict:
self._apply_global_values()
self.template_dict = self._export_global_artifacts(self.template_dict)

cache: Optional[Dict] = None
cache: Optional[MutableMapping[str, str]] = None
if is_experimental_enabled(ExperimentalFlag.PackagePerformance):
cache = {}
if cache is not None and self.parallel_upload:
cache = _ThreadSafeUploadCache(cache)

export_jobs: List[Callable[[], None]] = []
for resource_logical_id, resource in self.template_dict["Resources"].items():
resource_type = resource.get("Type", None)
resource_dict = resource.get("Properties", {})
Expand All @@ -291,12 +331,43 @@ def export(self) -> Dict:
continue
if resource_dict.get("PackageType", ZIP) != exporter_class.ARTIFACT_TYPE:
continue
# Export code resources
exporter = exporter_class(self.uploaders, self.code_signer, cache)
exporter.export(full_path, resource_dict, self.template_dir)

export_jobs.append(self._build_export_job(exporter_class, full_path, resource_dict, cache))

if self.parallel_upload and export_jobs:
self._execute_jobs_in_parallel(export_jobs)
else:
for job in export_jobs:
job()

return self.template_dict

def _build_export_job(
    self,
    exporter_class,
    resource_full_path: str,
    resource_dict: Dict,
    cache: Optional[MutableMapping[str, str]],
) -> Callable[[], None]:
    """Return a zero-argument callable that exports a single resource.

    Packaging the export as a deferred callable lets the caller run the same
    job either inline (sequential path) or on a worker thread (parallel path)
    without duplicating the export logic.
    """

    def run_export() -> None:
        resource_exporter = exporter_class(self.uploaders, self.code_signer, cache)
        # Propagate the flag so nested stacks exported by this resource
        # inherit the same upload mode.
        resource_exporter.parallel_upload = self.parallel_upload
        resource_exporter.export(resource_full_path, resource_dict, self.template_dir)

    return run_export

def _execute_jobs_in_parallel(self, jobs: List[Callable[[], None]]) -> None:
    """Run export jobs concurrently, failing fast on the first exception.

    :param jobs: Zero-argument callables, each exporting one resource.
    :raises: Re-raises the first exception raised by any job.
    """
    if not jobs:
        # Guard: with an empty list the original code computed
        # max_workers=0 and then crashed on jobs[0] (IndexError);
        # ThreadPoolExecutor also rejects max_workers=0.
        return

    max_workers = min(len(jobs), DEFAULT_PARALLEL_UPLOAD_WORKERS)
    if max_workers <= 1:
        # A single job gains nothing from thread overhead.
        jobs[0]()
        return

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(job) for job in jobs]
        done, pending = wait(futures, return_when=FIRST_EXCEPTION)
        # If a job failed, wait() returned early: cancel not-yet-started
        # jobs so the error surfaces promptly instead of waiting for every
        # remaining upload. Already-running jobs finish on executor exit.
        for future in pending:
            future.cancel()
        for future in done:
            # .result() re-raises the job's exception, if any.
            future.result()

def delete(self, retain_resources: List):
"""
Deletes all the artifacts referenced by the given Cloudformation template
Expand Down
8 changes: 6 additions & 2 deletions samcli/lib/package/ecr_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import base64
import logging
import threading
from io import StringIO
from pathlib import Path
from typing import Dict
Expand Down Expand Up @@ -50,6 +51,7 @@ def __init__(
self.stream = StreamWriter(stream=stream, auto_flush=True)
self.log_streamer = LogStreamer(stream=self.stream)
self.login_session_active = False
self._login_lock = threading.Lock()

@property
def docker_client(self):
Expand Down Expand Up @@ -88,8 +90,10 @@ def upload(self, image, resource_name):
:return: remote ECR image path that has been uploaded.
"""
if not self.login_session_active:
self.login()
self.login_session_active = True
with self._login_lock:
if not self.login_session_active:
self.login()
self.login_session_active = True

# Sometimes the `resource_name` is used as the `image` parameter to `tag_translation`.
# This is because these two cases (directly from an archive or by ID) are effectively
Expand Down
3 changes: 3 additions & 0 deletions samcli/local/docker/container_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def __init__(self, base_url=None):
# Specify minimum version
self.client_params["version"] = DOCKER_MIN_API_VERSION

# Increase client timeout to tolerate longer pushes/pulls
client_params["timeout"] = int(os.environ.get("SAM_CLI_DOCKER_TIMEOUT", "600"))

# Initialize DockerClient with processed parameters
LOG.debug(f"Creating container client with parameters: {self.client_params}")
super().__init__(**self.client_params)
Expand Down
Loading