Skip to content

Commit

Permalink
Merge pull request #304 from CybercentreCanada/persistent-service-update
Browse files Browse the repository at this point in the history
Persistent service update
  • Loading branch information
cccs-rs committed Sep 27, 2021
2 parents bc110aa + 1cc8eef commit 10ff71b
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 125 deletions.
5 changes: 3 additions & 2 deletions assemblyline_core/ingester/ingester.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
_retry_delay = 60 * 4 # Wait 4 minutes to retry
_max_time = 2 * 24 * 60 * 60 # Wait 2 days for responses.
HOUR_IN_SECONDS = 60 * 60
COMPLETE_THREADS = int(environ.get('INGESTER_COMPLETE_THREADS', 4))
INGEST_THREADS = int(environ.get('INGESTER_INGEST_THREADS', 1))
SUBMIT_THREADS = int(environ.get('INGESTER_SUBMIT_THREADS', 4))

Expand Down Expand Up @@ -147,7 +148,7 @@ def __init__(self, datastore=None, logger=None, classification=None, redis=None,
datastore=datastore, config=config)

# Cache the user groups
self.cache_lock = threading.RLock() # TODO are middle man instances single threaded now?
self.cache_lock = threading.RLock()
self._user_groups = {}
self._user_groups_reset = time.time()//HOUR_IN_SECONDS
self.cache = {}
Expand Down Expand Up @@ -223,10 +224,10 @@ def __init__(self, datastore=None, logger=None, classification=None, redis=None,

def try_run(self):
threads_to_maintain = {
'Complete': self.handle_complete,
'Retries': self.handle_retries,
'Timeouts': self.handle_timeouts
}
threads_to_maintain.update({f'Complete_{n}': self.handle_complete for n in range(COMPLETE_THREADS)})
threads_to_maintain.update({f'Ingest_{n}': self.handle_ingest for n in range(INGEST_THREADS)})
threads_to_maintain.update({f'Submit_{n}': self.handle_submit for n in range(SUBMIT_THREADS)})
self.maintain_threads(threads_to_maintain)
Expand Down
3 changes: 3 additions & 0 deletions assemblyline_core/metrics/heartbeat_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from assemblyline.remote.datatypes.queues.comms import CommsQueue
from assemblyline.remote.datatypes.queues.named import NamedQueue
from assemblyline.remote.datatypes.queues.priority import PriorityQueue
from assemblyline_core.ingester.constants import COMPLETE_QUEUE_NAME

STATUS_QUEUE = "status"

Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(self, sender, log, config=None, redis=None):
self.ingest_scanning = Hash('m-scanning-table', self.redis_persist)
self.ingest_unique_queue = PriorityQueue('m-unique', self.redis_persist)
self.ingest_queue = NamedQueue(INGEST_QUEUE_NAME, self.redis_persist)
self.ingest_complete_queue = NamedQueue(COMPLETE_QUEUE_NAME, self.redis)
self.alert_queue = NamedQueue(ALERT_QUEUE_NAME, self.redis_persist)

constants = forge.get_constants(self.config)
Expand Down Expand Up @@ -162,6 +164,7 @@ def send_heartbeat(self, m_type, m_name, m_data, instances):
"critical": c_q_len,
"high": h_q_len,
"ingest": self.ingest_queue.length(),
"complete": self.ingest_complete_queue.length(),
"low": l_q_len,
"medium": m_q_len
}
Expand Down
85 changes: 64 additions & 21 deletions assemblyline_core/scaler/controllers/docker_ctl.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
from __future__ import annotations
import os
import threading
import time
from collections import defaultdict
from typing import List, Tuple, Dict
import uuid

from assemblyline.odm.models.service import DockerConfig
from assemblyline.odm.models.service import DependencyConfig, DockerConfig
from .interface import ControllerInterface, ServiceControlError

# How to identify the update volume as a whole, in a way that the underlying container system recognizes.
FILE_UPDATE_VOLUME = os.environ.get('FILE_UPDATE_VOLUME', None)

# Where to find the update directory inside this container.
FILE_UPDATE_DIRECTORY = os.environ.get('FILE_UPDATE_DIRECTORY', None)
INHERITED_VARIABLES = ['HTTP_PROXY', 'HTTPS_PROXY', 'NO_PROXY', 'http_proxy', 'https_proxy', 'no_proxy']

# Every this many seconds, check that the services can actually reach the service server.
NETWORK_REFRESH_INTERVAL = 60 * 3
CHANGE_KEY_NAME = 'al_change_key'


class DockerController(ControllerInterface):
"""A controller for *non* swarm mode docker."""
def __init__(self, logger, prefix='', labels=None, cpu_overallocation=1, memory_overallocation=1, log_level="INFO"):
def __init__(self, logger, prefix='', labels: dict[str, str] = None, cpu_overallocation=1, memory_overallocation=1, log_level="INFO"):
"""
:param logger: A logger to report status and debug information.
:param prefix: A prefix used to distinguish containers launched by this controller.
Expand All @@ -32,9 +32,11 @@ def __init__(self, logger, prefix='', labels=None, cpu_overallocation=1, memory_
self.log = logger
self.log_level = log_level
self.global_mounts: List[Tuple[str, str]] = []
self.core_mounts: List[Tuple[str, str]] = []
self._prefix: str = prefix
self._labels = labels
self._labels: dict[str, str] = labels or {}
self.prune_lock = threading.Lock()
self._service_limited_env: dict[str, dict[str, str]] = defaultdict(dict)

for network in self.client.networks.list(names=['external']):
self.external_network = network
Expand Down Expand Up @@ -109,8 +111,8 @@ def _flush_containers(self):

def add_profile(self, profile, scale=0):
"""Tell the controller about a service profile it needs to manage."""
self._pull_image(profile)
self._profiles[profile.name] = profile
self._pull_image(profile)

def _start(self, service_name):
"""Launch a docker container in a manner suitable for Assemblyline."""
Expand All @@ -124,15 +126,13 @@ def _start(self, service_name):

# Prepare the volumes and folders
volumes = {row[0]: {'bind': row[1], 'mode': 'ro'} for row in self.global_mounts}
volumes[os.path.join(FILE_UPDATE_VOLUME, service_name)] = {'bind': '/mount/updates/', 'mode': 'ro'}
if not os.path.exists(os.path.join(FILE_UPDATE_DIRECTORY, service_name)):
os.makedirs(os.path.join(FILE_UPDATE_DIRECTORY, service_name), 0x777)

# Define environment variables
env = [f'{_e.name}={_e.value}' for _e in cfg.environment]
env += ['UPDATE_PATH=/mount/updates/']
env += [f'{name}={os.environ[name]}' for name in INHERITED_VARIABLES if name in os.environ]
env += [f'LOG_LEVEL={self.log_level}']
env += [f'{_n}={_v}' for _n, _v in self._service_limited_env[service_name].items()]

container = self.client.containers.run(
image=cfg.image,
Expand All @@ -152,7 +152,7 @@ def _start(self, service_name):
if cfg.allow_internet_access:
self.external_network.connect(container)

def _start_container(self, name, labels, volumes, cfg: DockerConfig, network, hostname):
def _start_container(self, service_name, name, labels, volumes, cfg: DockerConfig, network, hostname, core_container=False):
"""Launch a docker container."""
# Take the port strings and convert them to a dictionary
ports = {}
Expand All @@ -174,9 +174,13 @@ def _start_container(self, name, labels, volumes, cfg: DockerConfig, network, ho
self.log.warning(f"Not sure how to parse port string {port_string} for container {name} not using it...")

# Put together the environment variables
env = [f'{_e.name}={_e.value}' for _e in cfg.environment]
env = []
if core_container:
env += [f'{_n}={_v}' for _n, _v in os.environ.items()
if any(term in _n for term in ['ELASTIC', 'FILESTORE', 'UI_SERVER'])]
env += [f'{_e.name}={_e.value}' for _e in cfg.environment]
env += [f'{name}={os.environ[name]}' for name in INHERITED_VARIABLES if name in os.environ]
env += [f'LOG_LEVEL={self.log_level}']
env += [f'LOG_LEVEL={self.log_level}', f'AL_SERVICE_NAME={service_name}']

container = self.client.containers.run(
image=cfg.image,
Expand All @@ -192,8 +196,9 @@ def _start_container(self, name, labels, volumes, cfg: DockerConfig, network, ho
network=network,
environment=env,
detach=True,
ports=ports,
# ports=ports,
)

if cfg.allow_internet_access:
self.external_network.connect(container, aliases=[hostname])

Expand Down Expand Up @@ -324,16 +329,44 @@ def get_running_container_names(self):
out.append(container.name)
return out

def start_stateful_container(self, service_name, container_name, spec, labels, mount_updates=False, change_key=''):
volumes = {_n: {'bind': _v.mount_path, 'mode': 'rw'} for _n, _v in spec.volumes.items()}
def start_stateful_container(self, service_name: str, container_name: str, spec: DependencyConfig,
labels: dict[str, str], change_key: str):
import docker.errors
deployment_name = f'{service_name}-dep-{container_name}'

change_check = change_key + service_name + container_name + str(spec)

try:
old_container = self.client.containers.get(deployment_name)
instance_key = old_container.attrs["Config"]["Env"]['AL_INSTANCE_KEY']
if old_container.labels.get(CHANGE_KEY_NAME) == change_check and old_container.status == 'running':
self._service_limited_env[service_name][f'{container_name}_host'] = deployment_name
self._service_limited_env[service_name][f'{container_name}_key'] = instance_key
if spec.container.ports:
self._service_limited_env[service_name][f'{container_name}_port'] = spec.container.ports[0]
return
else:
old_container.kill()
except docker.errors.NotFound:
instance_key = uuid.uuid4().hex

volumes = {_n: {'bind': _v.mount_path, 'mode': 'rw'} for _n, _v in spec.volumes.items()}
if spec.run_as_core:
volumes.update({row[0]: {'bind': row[1], 'mode': 'ro'} for row in self.core_mounts})

all_labels = dict(self._labels)
all_labels.update({'component': service_name})
all_labels.update({'component': service_name, CHANGE_KEY_NAME: change_check})
all_labels.update(labels)

self._start_container(name=deployment_name, labels=all_labels, volumes=volumes, hostname=container_name,
cfg=spec.container, network=self._get_network(service_name).name)
spec.container.environment.append({'name': 'AL_INSTANCE_KEY', 'value': instance_key})

self._service_limited_env[service_name][f'{container_name}_host'] = deployment_name
self._service_limited_env[service_name][f'{container_name}_key'] = instance_key
if spec.container.ports:
self._service_limited_env[service_name][f'{container_name}_port'] = spec.container.ports[0]

self._start_container(service_name=service_name, name=deployment_name, labels=all_labels, volumes=volumes, hostname=container_name,
cfg=spec.container, core_container=spec.run_as_core, network=self._get_network(service_name).name)

def stop_containers(self, labels):
label_strings = [f'{name}={value}' for name, value in labels.items()]
Expand Down Expand Up @@ -368,6 +401,7 @@ def _pull_image(self, service):
This lets us override the auth_config on a per image basis.
"""
from docker.errors import ImageNotFound
# Split the image string into "[registry/]image_name" and "tag"
repository, _, tag = service.container_config.image.rpartition(':')
if '/' in tag:
Expand All @@ -385,4 +419,13 @@ def _pull_image(self, service):
'password': service.container_config.registry_password
}

self.client.images.pull(repository, tag, auth_config=auth_config)
try:
self.client.images.pull(repository, tag, auth_config=auth_config)
except ImageNotFound:
self.log.error(f"Couldn't pull image {repository}:{tag} check authentication settings. "
"Will try to use local copy.")

try:
self.client.images.get(repository + ':' + tag)
except ImageNotFound:
self.log.error(f"Couldn't find local image {repository}:{tag}")
2 changes: 1 addition & 1 deletion assemblyline_core/scaler/controllers/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def get_running_container_names(self):
def new_events(self):
return []

def start_stateful_container(self, service_name, container_name, spec, labels, change_key):
def start_stateful_container(self, service_name: str, container_name: str, spec, labels, change_key):
raise NotImplementedError()

def stop_containers(self, labels):
Expand Down
21 changes: 8 additions & 13 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,6 @@ def memory_info(self):
return self._quota_mem_limit - self._pod_used_namespace_ram, self._quota_mem_limit
return self._node_pool_max_ram - self._pod_used_ram, self._node_pool_max_ram

def _create_volumes(self, core_mounts=False):
volumes, mounts = [], []


return volumes, mounts

def _create_containers(self, service_name: str, deployment_name: str, container_config, mounts,
core_container=False):
cores = container_config.cpu_cores
Expand Down Expand Up @@ -476,9 +470,9 @@ def _create_containers(self, service_name: str, deployment_name: str, container_
)]

def _create_deployment(self, service_name: str, deployment_name: str, docker_config: DockerConfig,
shutdown_seconds: int, scale: int, labels:dict[str,str]=None,
volumes:list[V1Volume]=None, mounts:list[V1VolumeMount]=None,
core_mounts:bool=False, change_key:str=''):
shutdown_seconds: int, scale: int, labels: dict[str, str] = None,
volumes: list[V1Volume] = None, mounts: list[V1VolumeMount] = None,
core_mounts: bool = False, change_key: str = ''):
# Build a cache key to check for changes, just trying to only patch what changed
# will still potentially result in a lot of restarts due to different kubernetes
# systems returning differently formatted data
Expand All @@ -490,7 +484,8 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con
# Check if a deployment already exists, and if it does check if it has the same change key set
replace = None
try:
replace = self.apps_api.read_namespaced_deployment(deployment_name, namespace=self.namespace, _request_timeout=API_TIMEOUT)
replace = self.apps_api.read_namespaced_deployment(
deployment_name, namespace=self.namespace, _request_timeout=API_TIMEOUT)
if replace.metadata.annotations.get(CHANGE_KEY_NAME) == change_key:
if replace.spec.replicas != scale:
self.set_target(service_name, scale)
Expand Down Expand Up @@ -530,7 +525,7 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con
# Send it to the server
if current_pull_secret:
self.api.patch_namespaced_secret(pull_secret_name, namespace=self.namespace, body=new_pull_secret,
_request_timeout=API_TIMEOUT)
_request_timeout=API_TIMEOUT)
else:
self.api.create_namespaced_secret(namespace=self.namespace, body=new_pull_secret,
_request_timeout=API_TIMEOUT)
Expand Down Expand Up @@ -613,7 +608,7 @@ def set_target(self, service_name: str, target: int):
_request_timeout=API_TIMEOUT)
scale.spec.replicas = target
self.apps_api.patch_namespaced_deployment_scale(name=name, namespace=self.namespace, body=scale,
_request_timeout=API_TIMEOUT)
_request_timeout=API_TIMEOUT)
return
except client.ApiException as error:
# If the error is a conflict, it means multiple attempts to scale a deployment
Expand Down Expand Up @@ -673,7 +668,7 @@ def new_events(self):
return new

def start_stateful_container(self, service_name: str, container_name: str,
spec, labels: dict[str, str], change_key:str):
spec, labels: dict[str, str], change_key: str):
# Setup PVC
deployment_name = self._dependency_name(service_name, container_name)
mounts, volumes = [], []
Expand Down
17 changes: 16 additions & 1 deletion assemblyline_core/scaler/scaler_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from contextlib import contextmanager

import elasticapm
import yaml

from assemblyline.remote.datatypes.queues.named import NamedQueue
from assemblyline.remote.datatypes.queues.priority import PriorityQueue, length as pq_length
Expand All @@ -26,7 +27,7 @@
from assemblyline.odm.messages.scaler_heartbeat import Metrics
from assemblyline.odm.messages.scaler_status_heartbeat import Status
from assemblyline.odm.messages.changes import ServiceChange, Operation
from assemblyline.common.forge import get_service_queue
from assemblyline.common.forge import get_classification, get_service_queue
from assemblyline.common.constants import SCALER_TIMEOUT_QUEUE, SERVICE_STATE_HASH, ServiceStatus
from assemblyline_core.scaler.controllers import KubernetesController
from assemblyline_core.scaler.controllers.interface import ServiceControlError
Expand Down Expand Up @@ -60,6 +61,8 @@
CLASSIFICATION_CONFIGMAP = os.getenv('CLASSIFICATION_CONFIGMAP', None)
CLASSIFICATION_CONFIGMAP_KEY = os.getenv('CLASSIFICATION_CONFIGMAP_KEY', 'classification.yml')

DOCKER_CONFIGURATION_PATH = os.getenv('DOCKER_CONFIGURATION_PATH', None)
DOCKER_CONFIGURATION_VOLUME = os.getenv('DOCKER_CONFIGURATION_VOLUME', None)
CONFIGURATION_CONFIGMAP = os.getenv('CONFIGURATION_CONFIGMAP', None)
CONFIGURATION_CONFIGMAP_KEY = os.getenv('CONFIGURATION_CONFIGMAP_KEY', 'config')

Expand Down Expand Up @@ -276,6 +279,16 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
cpu_overallocation=self.config.core.scaler.cpu_overallocation,
memory_overallocation=self.config.core.scaler.memory_overallocation,
labels=labels, log_level=self.config.logging.log_level)

if DOCKER_CONFIGURATION_PATH and DOCKER_CONFIGURATION_VOLUME:
self.controller.core_mounts.append((DOCKER_CONFIGURATION_VOLUME, '/etc/assemblyline/'))

with open(os.path.join(DOCKER_CONFIGURATION_PATH, 'config.yml'), 'w') as handle:
yaml.dump(self.config.as_primitives(), handle)

with open(os.path.join(DOCKER_CONFIGURATION_PATH, 'classification.yml'), 'w') as handle:
yaml.dump(get_classification().original_definition, handle)

# If we know where to find it, mount the classification into the service containers
if CLASSIFICATION_HOST_PATH:
self.controller.global_mounts.append((CLASSIFICATION_HOST_PATH, '/etc/assemblyline/classification.yml'))
Expand Down Expand Up @@ -397,6 +410,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
dependency_blobs[_n] = str(dependency)

if service.enabled and stage == ServiceStage.Off:
self.log.info(f'Preparing environment for {service.name}')
# Move to the next service stage (do this first because the container we are starting may care)
if service.update_config and service.update_config.wait_for_update:
self._service_stage_hash.set(name, ServiceStage.Update)
Expand All @@ -408,6 +422,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
# Enable this service's dependencies before trying to launch the service containers
self.controller.prepare_network(service.name, service.docker_config.allow_internet_access)
for _n, dependency in dependency_config.items():
self.log.info(f'Launching {service.name} dependency {_n}')
self.controller.start_stateful_container(
service_name=service.name,
container_name=_n,
Expand Down
Loading

0 comments on commit 10ff71b

Please sign in to comment.