Merge pull request #297 from CybercentreCanada/persistent-service-update
Persistent service update
cccs-douglass committed Sep 15, 2021
2 parents 6a7ca28 + 8d0d734 commit af5f2d2
Showing 11 changed files with 180 additions and 632 deletions.
6 changes: 3 additions & 3 deletions assemblyline_core/dispatching/dispatcher.py
@@ -719,7 +719,7 @@ def retry_error(self, task: SubmissionTask, sha256, service_name):
task.service_errors[(sha256, service_name)] = error_key

export_metrics_once(service_name, ServiceMetrics, dict(fail_nonrecoverable=1),
counter_type='service', host='dispatcher')
counter_type='service', host='dispatcher', redis=self.redis)

# Send the result key to any watching systems
msg = {'status': 'FAIL', 'cache_key': error_key}
@@ -1040,7 +1040,7 @@ def timeout_service(self, task: SubmissionTask, sha256, service_name, worker_id)

# Report to the metrics system that a recoverable error has occurred for that service
export_metrics_once(service_name, ServiceMetrics, dict(fail_recoverable=1),
host=worker_id, counter_type='service')
host=worker_id, counter_type='service', redis=self.redis)

def work_guard(self):
check_interval = GUARD_TIMEOUT/8
@@ -1244,7 +1244,7 @@ def timeout_backstop(self):

# Report to the metrics system that a recoverable error has occurred for that service
export_metrics_once(task.service_name, ServiceMetrics, dict(fail_recoverable=1),
host=task.metadata['worker__'], counter_type='service')
host=task.metadata['worker__'], counter_type='service', redis=self.redis)

# Look for unassigned submissions in the datastore if we don't have a
# large number of outstanding things in the queue already.
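
The three dispatcher changes above all thread the dispatcher's existing Redis connection (redis=self.redis) into export_metrics_once, instead of letting the helper open its own connection for a single counter. A minimal sketch of that pattern follows; export_counter_once and its body are illustrative only, not Assemblyline's actual export_metrics_once implementation.

from typing import Optional

from redis import Redis


def export_counter_once(name: str, counts: dict, redis: Optional[Redis] = None) -> None:
    """Push a one-shot batch of counters (illustrative stand-in for export_metrics_once)."""
    # Reuse the caller's client when one is given; only fall back to opening a
    # brand new connection when nothing else is available.
    client = redis if redis is not None else Redis(host='localhost', port=6379)
    for key, value in counts.items():
        client.hincrby(f'metrics.{name}', key, value)


# A long-lived component such as the dispatcher passes its own connection:
# export_counter_once('extract', {'fail_nonrecoverable': 1}, redis=self.redis)
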
14 changes: 9 additions & 5 deletions assemblyline_core/ingester/ingester.py
@@ -11,6 +11,7 @@

import threading
import time
from os import environ
from random import random
from typing import Iterable, List, Optional, Dict, Tuple

@@ -48,6 +49,8 @@
_retry_delay = 60 * 4 # Wait 4 minutes to retry
_max_time = 2 * 24 * 60 * 60 # Wait 2 days for responses.
HOUR_IN_SECONDS = 60 * 60
INGEST_THREADS = environ.get('INGESTER_INGEST_THREADS', 1)
SUBMIT_THREADS = environ.get('INGESTER_SUBMIT_THREADS', 4)


def must_drop(length: int, maximum: int) -> bool:
@@ -219,13 +222,14 @@ def __init__(self, datastore=None, logger=None, classification=None, redis=None,
self.apm_client = None

def try_run(self):
self.maintain_threads({
'Ingest': self.handle_ingest,
'Submit': self.handle_submit,
threads_to_maintain = {
'Complete': self.handle_complete,
'Retries': self.handle_retries,
'Timeouts': self.handle_timeouts,
})
'Timeouts': self.handle_timeouts
}
threads_to_maintain.update({f'Ingest_{n}': self.handle_ingest for n in range(INGEST_THREADS)})
threads_to_maintain.update({f'Submit_{n}': self.handle_submit for n in range(SUBMIT_THREADS)})
self.maintain_threads(threads_to_maintain)

def handle_ingest(self):
cpu_mark = time.process_time()
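
The ingester now reads its worker counts from the environment and registers one named handler per worker thread. A minimal sketch of the same pattern follows; note that os.environ.get returns a string whenever the variable is set, so the int() casts here are a safety addition and are not part of the diff above.

from os import environ

# environ.get returns str when the variable is set, so cast before using in range()
INGEST_THREADS = int(environ.get('INGESTER_INGEST_THREADS', 1))
SUBMIT_THREADS = int(environ.get('INGESTER_SUBMIT_THREADS', 4))


def build_thread_table(handle_ingest, handle_submit, handle_complete, handle_retries, handle_timeouts) -> dict:
    """Build the name -> callable table handed to maintain_threads (illustrative helper)."""
    threads = {
        'Complete': handle_complete,
        'Retries': handle_retries,
        'Timeouts': handle_timeouts,
    }
    # One named entry per worker so each ingest/submit loop gets its own thread.
    threads.update({f'Ingest_{n}': handle_ingest for n in range(INGEST_THREADS)})
    threads.update({f'Submit_{n}': handle_submit for n in range(SUBMIT_THREADS)})
    return threads
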
2 changes: 1 addition & 1 deletion assemblyline_core/scaler/controllers/docker_ctl.py
@@ -324,7 +324,7 @@ def get_running_container_names(self):
out.append(container.name)
return out

def start_stateful_container(self, service_name, container_name, spec, labels, mount_updates=False):
def start_stateful_container(self, service_name, container_name, spec, labels, mount_updates=False, change_key=''):
volumes = {_n: {'bind': _v.mount_path, 'mode': 'rw'} for _n, _v in spec.volumes.items()}
deployment_name = f'{service_name}-dep-{container_name}'

2 changes: 1 addition & 1 deletion assemblyline_core/scaler/controllers/interface.py
@@ -53,7 +53,7 @@ def get_running_container_names(self):
def new_events(self):
return []

def start_stateful_container(self, service_name, container_name, spec, labels):
def start_stateful_container(self, service_name, container_name, spec, labels, change_key):
raise NotImplementedError()

def stop_containers(self, labels):
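
Because start_stateful_container gained a change_key parameter on ControllerInterface, every controller implementation has to accept it, even ones with no use for it (the Docker controller above simply defaults it to an empty string). A minimal, hypothetical subclass showing the updated signature:

from assemblyline_core.scaler.controllers.interface import ControllerInterface


class NullController(ControllerInterface):
    """Hypothetical no-op controller kept in signature lock-step with the interface."""

    def start_stateful_container(self, service_name, container_name, spec, labels, change_key):
        # Accept change_key to satisfy the interface even though nothing is done with it.
        return None
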
78 changes: 47 additions & 31 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
@@ -18,6 +18,7 @@
V1PersistentVolumeClaimSpec, V1NetworkPolicy, V1NetworkPolicySpec, V1NetworkPolicyEgressRule, V1NetworkPolicyPeer, \
V1NetworkPolicyIngressRule, V1Secret, V1LocalObjectReference, V1Service, V1ServiceSpec, V1ServicePort
from kubernetes.client.rest import ApiException
from assemblyline.odm.models.service import DockerConfig

from assemblyline_core.scaler.controllers.interface import ControllerInterface

@@ -26,6 +27,7 @@
API_TIMEOUT = 90
WATCH_TIMEOUT = 10 * 60
WATCH_API_TIMEOUT = WATCH_TIMEOUT + 10
CHANGE_KEY_NAME = 'al_change_key'

_exponents = {
'Ki': 2**10,
@@ -239,7 +241,8 @@ def core_config_mount(self, name, config_map, key, target_path):
def add_profile(self, profile, scale=0):
"""Tell the controller about a service profile it needs to manage."""
self._create_deployment(profile.name, self._deployment_name(profile.name),
profile.container_config, profile.shutdown_seconds, scale)
profile.container_config, profile.shutdown_seconds, scale,
change_key=profile.config_blob)
self._external_profiles[profile.name] = profile

def _loop_forever(self, function):
@@ -433,20 +436,9 @@ def memory_info(self):
return self._quota_mem_limit - self._pod_used_namespace_ram, self._quota_mem_limit
return self._node_pool_max_ram - self._pod_used_ram, self._node_pool_max_ram

@staticmethod
def _create_metadata(deployment_name: str, labels: dict[str, str]):
return V1ObjectMeta(name=deployment_name, labels=labels)

def _create_volumes(self, core_mounts=False):
volumes, mounts = [], []

# Attach the mount that provides the config file
volumes.extend(self.config_volumes.values())
mounts.extend(self.config_mounts.values())

if core_mounts:
volumes.extend(self.core_config_volumes.values())
mounts.extend(self.core_config_mounts.values())

return volumes, mounts

@@ -483,17 +475,30 @@ def _create_containers(self, service_name: str, deployment_name: str, container_
)
)]

def _create_deployment(self, service_name: str, deployment_name: str, docker_config,
shutdown_seconds, scale: int, labels=None, volumes=None, mounts=None,
core_mounts=False):

replace = False
resources = self.apps_api.list_namespaced_deployment(namespace=self.namespace, _request_timeout=API_TIMEOUT)
for dep in resources.items:
if dep.metadata.name == deployment_name:
replace = True
break

def _create_deployment(self, service_name: str, deployment_name: str, docker_config: DockerConfig,
shutdown_seconds: int, scale: int, labels:dict[str,str]=None,
volumes:list[V1Volume]=None, mounts:list[V1VolumeMount]=None,
core_mounts:bool=False, change_key:str=''):
# Build a cache key to check for changes, just trying to only patch what changed
# will still potentially result in a lot of restarts due to different kubernetes
# systems returning differently formatted data
change_key = (
deployment_name + change_key + str(docker_config) + str(shutdown_seconds) +
str(sorted((labels or {}).items())) + str(volumes) + str(mounts) + str(core_mounts)
)

# Check if a deployment already exists, and if it does check if it has the same change key set
replace = None
try:
replace = self.apps_api.read_namespaced_deployment(deployment_name, namespace=self.namespace, _request_timeout=API_TIMEOUT)
if replace.metadata.annotations.get(CHANGE_KEY_NAME) == change_key:
if replace.spec.replicas != scale:
self.set_target(service_name, scale)
return
except ApiException as error:
if error.status != 404:
raise

# If we have been given a username or password for the registry, we have to
# update it, if we haven't been, make sure its been cleaned up in the system
# so we don't leave passwords lying around
@@ -524,7 +529,7 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con

# Send it to the server
if current_pull_secret:
self.api.replace_namespaced_secret(pull_secret_name, namespace=self.namespace, body=new_pull_secret,
self.api.patch_namespaced_secret(pull_secret_name, namespace=self.namespace, body=new_pull_secret,
_request_timeout=API_TIMEOUT)
else:
self.api.create_namespaced_secret(namespace=self.namespace, body=new_pull_secret,
@@ -538,10 +543,20 @@
all_labels['section'] = 'core'
all_labels.update(labels or {})

all_volumes, all_mounts = self._create_volumes(core_mounts)
# Build set of volumes, first the global mounts, then the core specific ones,
# then the ones specific to this container only
all_volumes: list[V1Volume] = []
all_mounts: list[V1VolumeMount] = []
all_volumes.extend(self.config_volumes.values())
all_mounts.extend(self.config_mounts.values())
if core_mounts:
all_volumes.extend(self.core_config_volumes.values())
all_mounts.extend(self.core_config_mounts.values())
all_volumes.extend(volumes or [])
all_mounts.extend(mounts or [])
metadata = self._create_metadata(deployment_name=deployment_name, labels=all_labels)

# Build metadata
metadata = V1ObjectMeta(name=deployment_name, labels=all_labels, annotations={CHANGE_KEY_NAME: change_key})

pod = V1PodSpec(
volumes=all_volumes,
@@ -597,7 +612,7 @@ def set_target(self, service_name: str, target: int):
scale = self.apps_api.read_namespaced_deployment_scale(name=name, namespace=self.namespace,
_request_timeout=API_TIMEOUT)
scale.spec.replicas = target
self.apps_api.replace_namespaced_deployment_scale(name=name, namespace=self.namespace, body=scale,
self.apps_api.patch_namespaced_deployment_scale(name=name, namespace=self.namespace, body=scale,
_request_timeout=API_TIMEOUT)
return
except client.ApiException as error:
@@ -630,7 +645,8 @@ def stop_container(self, service_name, container_id):

def restart(self, service):
self._create_deployment(service.name, self._deployment_name(service.name), service.container_config,
service.shutdown_seconds, self.get_target(service.name))
service.shutdown_seconds, self.get_target(service.name),
change_key=service.config_blob)

def get_running_container_names(self):
pods = self.api.list_pod_for_all_namespaces(field_selector='status.phase==Running',
@@ -657,7 +673,7 @@ def new_events(self):
return new

def start_stateful_container(self, service_name: str, container_name: str,
spec, labels: dict[str, str]):
spec, labels: dict[str, str], change_key:str):
# Setup PVC
deployment_name = self._dependency_name(service_name, container_name)
mounts, volumes = [], []
@@ -692,15 +708,15 @@ def start_stateful_container(self, service_name: str, container_name: str,
spec.container.environment.append({'name': 'AL_INSTANCE_KEY', 'value': instance_key})
self._create_deployment(service_name, deployment_name, spec.container,
30, 1, labels, volumes=volumes, mounts=mounts,
core_mounts=spec.run_as_core)
core_mounts=spec.run_as_core, change_key=change_key)

# Setup a service to direct to the deployment
try:
service = self.api.read_namespaced_service(deployment_name, self.namespace)
service.metadata.labels = labels
service.spec.selector = labels
service.spec.ports = [V1ServicePort(port=int(_p)) for _p in spec.container.ports]
self.api.replace_namespaced_service(deployment_name, self.namespace, service)
self.api.patch_namespaced_service(deployment_name, self.namespace, service)
except ApiException as error:
if error.status != 404:
raise
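
The bulk of the kubernetes_ctl.py change is the change-key mechanism: _create_deployment concatenates everything that influences the pod spec into a key, stores it in the al_change_key annotation, and skips the redeploy when an existing Deployment already carries the same key, at most adjusting the replica count. The related replace_* calls were also switched to patch_* so unrelated fields are not clobbered. The sketch below shows the same idea in isolation; hashing the key is a simplification added here (the commit stores the raw string), and needs_redeploy is an illustrative helper, not a function from this codebase.

import hashlib

from kubernetes import client
from kubernetes.client.rest import ApiException

CHANGE_KEY_NAME = 'al_change_key'


def needs_redeploy(apps_api: client.AppsV1Api, name: str, namespace: str,
                   change_key: str, scale: int) -> bool:
    """Return True when the Deployment named `name` must be (re)created."""
    digest = hashlib.sha256(change_key.encode()).hexdigest()
    try:
        existing = apps_api.read_namespaced_deployment(name, namespace=namespace)
    except ApiException as error:
        if error.status == 404:
            return True  # nothing deployed yet, so a create is needed
        raise
    annotations = existing.metadata.annotations or {}
    if annotations.get(CHANGE_KEY_NAME) == digest:
        # Same desired configuration: at most the replica count needs a patch.
        if existing.spec.replicas != scale:
            apps_api.patch_namespaced_deployment_scale(
                name, namespace=namespace, body={'spec': {'replicas': scale}})
        return False
    return True
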