Skip to content

Commit

Permalink
Merge pull request #293 from CybercentreCanada/persistent-service-update
Browse files Browse the repository at this point in the history
Persistent service update
  • Loading branch information
cccs-rs authored Sep 13, 2021
2 parents ec0a520 + 59dfacc commit fd87423
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 26 deletions.
27 changes: 10 additions & 17 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@

from assemblyline_core.scaler.controllers.interface import ControllerInterface


# How to identify the update volume as a whole, in a way that the underlying container system recognizes.
CONTAINER_UPDATE_DIRECTORY = '/mount/updates/'

# RESERVE_MEMORY_PER_NODE = os.environ.get('RESERVE_MEMORY_PER_NODE')

API_TIMEOUT = 90
Expand Down Expand Up @@ -243,8 +239,7 @@ def core_config_mount(self, name, config_map, key, target_path):
def add_profile(self, profile, scale=0):
"""Tell the controller about a service profile it needs to manage."""
self._create_deployment(profile.name, self._deployment_name(profile.name),
profile.container_config, profile.shutdown_seconds, scale,
mount_updates=profile.mount_updates)
profile.container_config, profile.shutdown_seconds, scale)
self._external_profiles[profile.name] = profile

def _loop_forever(self, function):
Expand Down Expand Up @@ -442,7 +437,7 @@ def memory_info(self):
def _create_metadata(deployment_name: str, labels: Dict[str, str]):
return V1ObjectMeta(name=deployment_name, labels=labels)

def _create_volumes(self, service_name, mount_updates=False, core_mounts=False):
def _create_volumes(self, core_mounts=False):
volumes, mounts = [], []

# Attach the mount that provides the config file
Expand All @@ -461,15 +456,14 @@ def _create_containers(self, service_name: str, deployment_name: str, container_
memory = container_config.ram_mb
min_memory = min(container_config.ram_mb_min, container_config.ram_mb)
environment_variables: list[V1EnvVar] = []
# If we are launching a core container, include the scalers environment
# If we are launching a core container, include environment variables related to authentication for DBs
if core_container:
environment_variables += [V1EnvVar(name=_n, value=_v) for _n, _v in os.environ.items()]
environment_variables += [V1EnvVar(name=_n, value=_v) for _n, _v in os.environ.items()
if any(term in _n for term in ['ELASTIC', 'FILESTORE', 'UI_SERVER'])]
# Overwrite them with configured special environment variables
environment_variables += [V1EnvVar(name=_e.name, value=_e.value) for _e in container_config.environment]
# Overwrite those with special hard coded variables
environment_variables += [
V1EnvVar(name='UPDATE_PATH', value=CONTAINER_UPDATE_DIRECTORY),
V1EnvVar(name='FILE_UPDATE_DIRECTORY', value=CONTAINER_UPDATE_DIRECTORY),
V1EnvVar(name='AL_SERVICE_NAME', value=service_name),
V1EnvVar(name='LOG_LEVEL', value=self.log_level)
]
Expand All @@ -491,7 +485,7 @@ def _create_containers(self, service_name: str, deployment_name: str, container_

def _create_deployment(self, service_name: str, deployment_name: str, docker_config,
shutdown_seconds, scale: int, labels=None, volumes=None, mounts=None,
mount_updates=True, core_mounts=False):
core_mounts=False):

replace = False
resources = self.apps_api.list_namespaced_deployment(namespace=self.namespace, _request_timeout=API_TIMEOUT)
Expand Down Expand Up @@ -544,7 +538,7 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con
all_labels['section'] = 'core'
all_labels.update(labels or {})

all_volumes, all_mounts = self._create_volumes(service_name, mount_updates, core_mounts)
all_volumes, all_mounts = self._create_volumes(core_mounts)
all_volumes.extend(volumes or [])
all_mounts.extend(mounts or [])
metadata = self._create_metadata(deployment_name=deployment_name, labels=all_labels)
Expand Down Expand Up @@ -636,8 +630,7 @@ def stop_container(self, service_name, container_id):

def restart(self, service):
self._create_deployment(service.name, self._deployment_name(service.name), service.container_config,
service.shutdown_seconds, self.get_target(service.name),
mount_updates=service.mount_updates)
service.shutdown_seconds, self.get_target(service.name))

def get_running_container_names(self):
pods = self.api.list_pod_for_all_namespaces(field_selector='status.phase==Running',
Expand All @@ -664,7 +657,7 @@ def new_events(self):
return new

def start_stateful_container(self, service_name: str, container_name: str,
spec, labels: dict[str, str], mount_updates: bool = False):
spec, labels: dict[str, str]):
# Setup PVC
deployment_name = self._dependency_name(service_name, container_name)
mounts, volumes = [], []
Expand All @@ -687,7 +680,7 @@ def start_stateful_container(self, service_name: str, container_name: str,
spec.container.environment.append({'name': 'AL_INSTANCE_KEY', 'value': instance_key})
self._create_deployment(service_name, deployment_name, spec.container,
30, 1, labels, volumes=volumes, mounts=mounts,
mount_updates=mount_updates, core_mounts=spec.run_as_core)
core_mounts=spec.run_as_core)

# Setup a service to direct to the deployment
try:
Expand Down
8 changes: 1 addition & 7 deletions assemblyline_core/scaler/scaler_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ class ServiceProfile:
"""

def __init__(self, name, container_config: DockerConfig, config_hash=0, min_instances=0, max_instances=None,
growth: float = 600, shrink: Optional[float] = None, backlog=500, queue=None, shutdown_seconds=30,
mount_updates=True):
growth: float = 600, shrink: Optional[float] = None, backlog=500, queue=None, shutdown_seconds=30):
"""
:param name: Name of the service to manage
:param container_config: Instructions on how to start this service
Expand All @@ -126,7 +125,6 @@ def __init__(self, name, container_config: DockerConfig, config_hash=0, min_inst
self.low_duty_cycle = 0.5
self.shutdown_seconds = shutdown_seconds
self.config_hash = config_hash
self.mount_updates = mount_updates

# How many instances we want, and can have
self.min_instances = self._min_instances = max(0, int(min_instances))
Expand Down Expand Up @@ -216,7 +214,6 @@ def __deepcopy__(self, memodict=None):
shrink=self.shrink_threshold,
backlog=self.backlog,
shutdown_seconds=self.shutdown_seconds,
mount_updates=self.mount_updates
)
prof.desired_instances = self.desired_instances
prof.running_instances = self.running_instances
Expand Down Expand Up @@ -343,7 +340,6 @@ def sync_services(self):
name = service.name
stage = self.get_service_stage(service.name)
discovered_services.append(name)
mount_updates = bool(service.update_config)

# noinspection PyBroadException
try:
Expand All @@ -358,7 +354,6 @@ def sync_services(self):
container_name=_n,
spec=dependency,
labels={'dependency_for': service.name},
mount_updates=mount_updates
)

# Move to the next service stage
Expand Down Expand Up @@ -403,7 +398,6 @@ def sync_services(self):
queue=get_service_queue(name, self.redis),
# Give service an extra 30 seconds to upload results
shutdown_seconds=service.timeout + 30,
mount_updates=mount_updates
))

# Update RAM, CPU, licence requirements for running services
Expand Down
4 changes: 2 additions & 2 deletions assemblyline_core/updater/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _get_proprietary_registry_tags(self, server, image_name, auth, verify):
# Determine project/repo IDs from image name
project_id, repo_id = image_name.split('/', 1)
repo_id = repo_id.replace('/', "%2F")
url = f"https://{server}/api/v2.0/projects/{project_id}/repositories/{repo_id}/artifacts"
url = f"https://{server}/api/v2.0/projects/{project_id}/repositories/{repo_id}/artifacts?page_size=0"

headers = {}
if auth:
Expand Down Expand Up @@ -129,7 +129,7 @@ def process_image(image):

tag_name = None
if not tags:
logger.warning(f"Cannot fetch latest tag for service {service_name} - {image}"
logger.warning(f"Cannot fetch latest tag for service {service_name} - {image_name}"
f" => [server: {server}, repo_name: {image_name}, channel: {update_channel}]")
else:
tag_name = f"{FRAMEWORK_VERSION}.{SYSTEM_VERSION}.0.{update_channel}0"
Expand Down

0 comments on commit fd87423

Please sign in to comment.