Skip to content

Commit

Permalink
Merge pull request #284 from CybercentreCanada/persistent-service-update
Browse files Browse the repository at this point in the history
Persistent service update (dev)
  • Loading branch information
cccs-rs committed Aug 30, 2021
2 parents ee3e638 + 74cd20a commit 6aaa30d
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 43 deletions.
3 changes: 2 additions & 1 deletion assemblyline_core/alerter/run_alerter.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ def __init__(self):
else:
self.apm_client = None

def close(self):
def stop(self):
# Graceful shutdown hook (renamed from close() so the base server class
# invokes it): stop the counter (presumably a MetricsFactory-style
# metrics exporter — confirm against __init__) and, when APM tracing was
# enabled, remove elasticapm's monkey-patched instrumentation.
if self.counter:
self.counter.stop()

if self.apm_client:
elasticapm.uninstrument()
# Delegate to the base class so its own shutdown logic still runs.
super().stop()

def run_once(self):
alert = self.alert_queue.pop(timeout=1)
Expand Down
3 changes: 2 additions & 1 deletion assemblyline_core/expiry/run_expiry.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ def __init__(self, force_ilm=False):
else:
self.apm_client = None

def close(self):
def stop(self):
# Graceful shutdown hook for the expiry service (renamed from close()
# so the base server class invokes it): stop the counter (presumably a
# metrics exporter — confirm against __init__) and, when APM tracing was
# enabled, remove elasticapm's monkey-patched instrumentation.
if self.counter:
self.counter.stop()

if self.apm_client:
elasticapm.uninstrument()
# Delegate to the base class so its own shutdown logic still runs.
super().stop()

def run_expiry_once(self):
now = now_as_iso()
Expand Down
2 changes: 1 addition & 1 deletion assemblyline_core/ingester/ingester.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import elasticapm

from assemblyline_core.server_base import ThreadedCoreBase
from assemblyline.common.metrics import MetricsFactory
from assemblyline.common.str_utils import dotdump, safe_str
from assemblyline.common.exceptions import get_stacktrace_info
Expand All @@ -39,7 +40,6 @@
from assemblyline_core.alerter.run_alerter import ALERT_QUEUE_NAME
from assemblyline_core.submission_client import SubmissionClient
from .constants import INGEST_QUEUE_NAME, drop_chance, COMPLETE_QUEUE_NAME
from ..server_base import ThreadedCoreBase

_dup_prefix = 'w-m-'
_notification_queue_prefix = 'nq-'
Expand Down
103 changes: 89 additions & 14 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations
import base64
from collections import defaultdict
import functools
import json
import uuid
import os
import threading
import weakref
Expand All @@ -13,7 +16,7 @@
V1PodSpec, V1ObjectMeta, V1Volume, V1Container, V1VolumeMount, V1EnvVar, V1ConfigMapVolumeSource, \
V1PersistentVolumeClaimVolumeSource, V1LabelSelector, V1ResourceRequirements, V1PersistentVolumeClaim, \
V1PersistentVolumeClaimSpec, V1NetworkPolicy, V1NetworkPolicySpec, V1NetworkPolicyEgressRule, V1NetworkPolicyPeer, \
V1NetworkPolicyIngressRule, V1Secret, V1LocalObjectReference
V1NetworkPolicyIngressRule, V1Secret, V1LocalObjectReference, V1Service, V1ServiceSpec, V1ServicePort
from kubernetes.client.rest import ApiException

from assemblyline_core.scaler.controllers.interface import ControllerInterface
Expand Down Expand Up @@ -152,14 +155,17 @@ def __init__(self, logger, namespace, prefix, priority, cpu_reservation, labels=
self.cpu_reservation: float = max(0.0, min(cpu_reservation, 1.0))
self.logger = logger
self.log_level: str = log_level
self._labels: Dict[str, str] = labels
self._labels: Dict[str, str] = labels or {}
self.apps_api = client.AppsV1Api()
self.api = client.CoreV1Api()
self.net_api = client.NetworkingV1Api()
self.namespace: str = namespace
self.config_volumes: Dict[str, V1Volume] = {}
self.config_mounts: Dict[str, V1VolumeMount] = {}
self.core_config_volumes: Dict[str, V1Volume] = {}
self.core_config_mounts: Dict[str, V1VolumeMount] = {}
self._external_profiles = weakref.WeakValueDictionary()
self._service_limited_env: dict[str, dict[str, str]] = defaultdict(dict)

# A record of previously reported events so that we don't report the same message repeatedly, fill it with
# existing messages so we don't have a huge dump of duplicates on restart
Expand Down Expand Up @@ -198,10 +204,13 @@ def __init__(self, logger, namespace, prefix, priority, cpu_reservation, labels=
def stop(self):
self.running = False

def _deployment_name(self, service_name):
def _deployment_name(self, service_name: str):
# Build the Kubernetes deployment name for a service: prefix + service
# name, lowercased with underscores replaced by hyphens — Kubernetes
# resource names must be lowercase DNS-1123 labels, which forbid '_'.
return (self.prefix + service_name).lower().replace('_', '-')

def config_mount(self, name, config_map, key, target_path):
def _dependency_name(self, service_name: str, container_name: str):
# Name for a service's dependency container resources: the service's
# deployment name with the dependency container name appended. Kept
# lowercase for Kubernetes DNS-1123 name rules.
return f"{self._deployment_name(service_name)}-{container_name}".lower()

def config_mount(self, name:str, config_map:str, key:str, target_path:str):
if name not in self.config_volumes:
self.config_volumes[name] = V1Volume(
name=name,
Expand All @@ -217,6 +226,22 @@ def config_mount(self, name, config_map, key, target_path):
sub_path=key
)

def core_config_mount(self, name, config_map, key, target_path):
# Register a ConfigMap-backed volume/mount that is attached only to
# "core" containers (applied when volumes are built with core mounts
# enabled), unlike config_mount which applies to all managed pods.
#
# name: volume name; config_map: ConfigMap resource to read from;
# key: key within the ConfigMap (used as sub_path); target_path: file
# path where the key's content is mounted inside the container.
if name not in self.core_config_volumes:
self.core_config_volumes[name] = V1Volume(
name=name,
config_map=V1ConfigMapVolumeSource(
name=config_map,
# optional=False: pod creation fails if the ConfigMap is missing.
optional=False
)
)

# Keyed by target_path, so a later registration for the same path
# replaces the earlier mount.
self.core_config_mounts[target_path] = V1VolumeMount(
name=name,
mount_path=target_path,
sub_path=key
)

def add_profile(self, profile, scale=0):
"""Tell the controller about a service profile it needs to manage."""
self._create_deployment(profile.name, self._deployment_name(profile.name),
Expand Down Expand Up @@ -419,13 +444,17 @@ def memory_info(self):
def _create_metadata(deployment_name: str, labels: Dict[str, str]):
return V1ObjectMeta(name=deployment_name, labels=labels)

def _create_volumes(self, service_name, mount_updates=False):
def _create_volumes(self, service_name, mount_updates=False, core_mounts=False):
volumes, mounts = [], []

# Attach the mount that provides the config file
volumes.extend(self.config_volumes.values())
mounts.extend(self.config_mounts.values())

if core_mounts:
volumes.extend(self.core_config_volumes.values())
mounts.extend(self.core_config_mounts.values())

if mount_updates:
# Attach the mount that provides the update
volumes.append(V1Volume(
Expand All @@ -443,16 +472,26 @@ def _create_volumes(self, service_name, mount_updates=False):

return volumes, mounts

def _create_containers(self, deployment_name, container_config, mounts):
def _create_containers(self, service_name: str, deployment_name:str, container_config, mounts, core_container=False):
cores = container_config.cpu_cores
memory = container_config.ram_mb
min_memory = min(container_config.ram_mb_min, container_config.ram_mb)
environment_variables = [V1EnvVar(name=_e.name, value=_e.value) for _e in container_config.environment]
environment_variables: list[V1EnvVar] = []
# If we are launching a core container, include the scalers environment
if core_container:
environment_variables += [V1EnvVar(name=_n, value=_v) for _n, _v in os.environ.items()]
# Overwrite them with configured special environment variables
environment_variables += [V1EnvVar(name=_e.name, value=_e.value) for _e in container_config.environment]
# Overwrite those with special hard coded variables
environment_variables += [
V1EnvVar(name='UPDATE_PATH', value=CONTAINER_UPDATE_DIRECTORY),
V1EnvVar(name='FILE_UPDATE_DIRECTORY', value=CONTAINER_UPDATE_DIRECTORY),
V1EnvVar(name='AL_SERVICE_NAME', value=service_name),
V1EnvVar(name='LOG_LEVEL', value=self.log_level)
]
]
# Overwrite ones defined dynamically by dependency container launches
for name, value in self._service_limited_env[service_name].items():
environment_variables.append(V1EnvVar(name=name, value=value))
return [V1Container(
name=deployment_name,
image=container_config.image,
Expand All @@ -467,7 +506,8 @@ def _create_containers(self, deployment_name, container_config, mounts):
)]

def _create_deployment(self, service_name: str, deployment_name: str, docker_config,
shutdown_seconds, scale: int, labels=None, volumes=None, mounts=None, mount_updates=True):
shutdown_seconds, scale: int, labels=None, volumes=None, mounts=None,
mount_updates=True, core_mounts=False):

replace = False

Expand Down Expand Up @@ -520,16 +560,19 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con

all_labels = dict(self._labels)
all_labels['component'] = service_name
if core_mounts:
all_labels['section'] = 'core'
all_labels.update(labels or {})

all_volumes, all_mounts = self._create_volumes(service_name, mount_updates)
all_volumes, all_mounts = self._create_volumes(service_name, mount_updates, core_mounts)
all_volumes.extend(volumes or [])
all_mounts.extend(mounts or [])
metadata = self._create_metadata(deployment_name=deployment_name, labels=all_labels)

pod = V1PodSpec(
volumes=all_volumes,
containers=self._create_containers(deployment_name, docker_config, all_mounts),
containers=self._create_containers(service_name, deployment_name, docker_config,
all_mounts, core_container=core_mounts),
priority_class_name=self.priority,
termination_grace_period_seconds=shutdown_seconds,
)
Expand Down Expand Up @@ -640,9 +683,10 @@ def new_events(self):

return new

def start_stateful_container(self, service_name, container_name, spec, labels, mount_updates=False):
def start_stateful_container(self, service_name:str, container_name:str,
spec, labels:dict[str, str], mount_updates:bool=False):
# Setup PVC
deployment_name = service_name + '-' + container_name
deployment_name = self._dependency_name(service_name, container_name)
mounts, volumes = [], []
for volume_name, volume_spec in spec.volumes.items():
mount_name = deployment_name + volume_name
Expand All @@ -657,8 +701,39 @@ def start_stateful_container(self, service_name, container_name, spec, labels, m
))
mounts.append(V1VolumeMount(mount_path=volume_spec.mount_path, name=mount_name))

# Setup the deployment itself
instance_key = uuid.uuid4().hex
labels['container'] = container_name
spec.container.environment.append({'name': 'AL_INSTANCE_KEY', 'value': instance_key})
self._create_deployment(service_name, deployment_name, spec.container,
30, 1, labels, volumes=volumes, mounts=mounts, mount_updates=mount_updates)
30, 1, labels, volumes=volumes, mounts=mounts,
mount_updates=mount_updates, core_mounts=spec.run_as_core)

# Setup a service to direct to the deployment
try:
service = self.api.read_namespaced_service(deployment_name, self.namespace)
service.metadata.labels = labels
service.spec.selector = labels
service.spec.ports = [V1ServicePort(port=int(_p)) for _p in spec.container.ports]
self.api.replace_namespaced_service(deployment_name, self.namespace, service)
except ApiException as error:
if error.status != 404:
raise
service = V1Service(
metadata=V1ObjectMeta(name=deployment_name, labels=labels),
spec=V1ServiceSpec(
cluster_ip='None',
selector=labels,
ports=[V1ServicePort(port=int(_p)) for _p in spec.container.ports]
)
)
self.api.create_namespaced_service(self.namespace, service)

# Add entries to the environment variable list to point to this container
self._service_limited_env[service_name][f'{container_name}_host'] = deployment_name
self._service_limited_env[service_name][f'{container_name}_key'] = instance_key
if spec.container.ports:
self._service_limited_env[service_name][f'{container_name}_port'] = spec.container.ports[0]

def _ensure_pvc(self, name, storage_class, size):
request = V1ResourceRequirements(requests={'storage': size})
Expand Down
11 changes: 11 additions & 0 deletions assemblyline_core/scaler/scaler_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
CLASSIFICATION_CONFIGMAP = os.getenv('CLASSIFICATION_CONFIGMAP', None)
CLASSIFICATION_CONFIGMAP_KEY = os.getenv('CLASSIFICATION_CONFIGMAP_KEY', 'classification.yml')

CONFIGURATION_CONFIGMAP = os.getenv('CONFIGURATION_CONFIGMAP', None)
CONFIGURATION_CONFIGMAP_KEY = os.getenv('CONFIGURATION_CONFIGMAP_KEY', 'config')


@contextmanager
def apm_span(client, span_name: str):
Expand All @@ -78,6 +81,7 @@ class Pool:
jobs as a context manager, and wait for the batch to finish after
the context ends.
"""

def __init__(self, size=10):
self.pool = concurrent.futures.ThreadPoolExecutor(size)
self.futures = []
Expand All @@ -101,6 +105,7 @@ class ServiceProfile:
This includes how the service should be run, and conditions related to the scaling of the service.
"""

def __init__(self, name, container_config: DockerConfig, config_hash=0, min_instances=0, max_instances=None,
growth: float = 600, shrink: Optional[float] = None, backlog=500, queue=None, shutdown_seconds=30,
mount_updates=True):
Expand Down Expand Up @@ -251,6 +256,10 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
self.controller.config_mount('classification-config', config_map=CLASSIFICATION_CONFIGMAP,
key=CLASSIFICATION_CONFIGMAP_KEY,
target_path='/etc/assemblyline/classification.yml')
if CONFIGURATION_CONFIGMAP:
self.controller.core_config_mount('assemblyline-config', config_map=CONFIGURATION_CONFIGMAP,
key=CONFIGURATION_CONFIGMAP_KEY,
target_path='/etc/assemblyline/config.yml')
else:
self.log.info("Loading Docker cluster interface.")
self.controller = DockerController(logger=self.log, prefix=NAMESPACE,
Expand Down Expand Up @@ -342,6 +351,8 @@ def sync_services(self):
# Enable this service's dependencies
self.controller.prepare_network(service.name, service.docker_config.allow_internet_access)
for _n, dependency in service.dependencies.items():
dependency.container.image = Template(dependency.container.image) \
.safe_substitute(image_variables)
self.controller.start_stateful_container(
service_name=service.name,
container_name=_n,
Expand Down
Loading

0 comments on commit 6aaa30d

Please sign in to comment.