From fa2e07979d95952d9b31d27bcf59a786ca0beceb Mon Sep 17 00:00:00 2001
From: Adam Douglass
Date: Mon, 20 Sep 2021 16:21:43 +0000
Subject: [PATCH 1/8] Remove old TODO

---
 assemblyline_core/ingester/ingester.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/assemblyline_core/ingester/ingester.py b/assemblyline_core/ingester/ingester.py
index 3560da5b..fd3a30f7 100644
--- a/assemblyline_core/ingester/ingester.py
+++ b/assemblyline_core/ingester/ingester.py
@@ -147,7 +147,7 @@ def __init__(self, datastore=None, logger=None, classification=None, redis=None,
                          datastore=datastore, config=config)

         # Cache the user groups
-        self.cache_lock = threading.RLock()  # TODO are middle man instances single threaded now?
+        self.cache_lock = threading.RLock()
         self._user_groups = {}
         self._user_groups_reset = time.time()//HOUR_IN_SECONDS
         self.cache = {}

From dc3e8fc03ea5172b254873e125d28a0a88d5fc3b Mon Sep 17 00:00:00 2001
From: Adam Douglass
Date: Tue, 21 Sep 2021 15:38:04 +0000
Subject: [PATCH 2/8] PEP 8 and style changes

---
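Note: besides initializing resp ahead of the try blocks so the later resp.ok
checks cannot hit an unbound variable, this commit moves tag parsing from
packaging's permissive parse() to the strict Version class, so malformed
registry tags raise instead of silently comparing as legacy versions. A rough,
self-contained sketch of the tag-selection loop being changed (the version
constants and tag list are made up for illustration, not real registry data):

    from packaging.version import Version, parse

    FRAMEWORK_VERSION, SYSTEM_VERSION = 4, 1   # illustrative values
    update_channel = "stable"                  # illustrative channel
    tags = ["4.1.0.stable3", "4.1.2.stable1", "3.9.1.stable2"]

    # Start from the lowest possible tag for this framework/system pair
    tag_name = f"{FRAMEWORK_VERSION}.{SYSTEM_VERSION}.0.{update_channel}0"
    for t in tags:
        # Stripping the channel word turns "4.1.2.stable1" into "4.1.2.1"
        t_version = Version(t.replace(update_channel, ""))
        if (t_version.major == FRAMEWORK_VERSION and t_version.minor == SYSTEM_VERSION
                and t_version > parse(tag_name.replace(update_channel, ""))):
            tag_name = t

    print(tag_name)  # -> 4.1.2.stable1
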
 assemblyline_core/updater/helper.py      | 12 +++++++-----
 assemblyline_core/updater/run_updater.py |  2 +-
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/assemblyline_core/updater/helper.py b/assemblyline_core/updater/helper.py
index 15741cc9..c846038c 100644
--- a/assemblyline_core/updater/helper.py
+++ b/assemblyline_core/updater/helper.py
@@ -6,7 +6,7 @@ from assemblyline.common.version import FRAMEWORK_VERSION, SYSTEM_VERSION
 from collections import defaultdict
 from base64 import b64encode
-from packaging.version import parse
+from packaging.version import parse, Version

 DEFAULT_DOCKER_REGISTRY = "registry.hub.docker.com"

@@ -27,6 +27,7 @@ def _get_proprietary_registry_tags(self, server, image_name, auth, verify):
     if auth:
         headers["Authorization"] = auth

+    resp = None
     try:
         resp = requests.get(url, headers=headers, verify=verify)
     except requests.exceptions.SSLError:
@@ -36,7 +37,7 @@ def _get_proprietary_registry_tags(self, server, image_name, auth, verify):
         resp = requests.get(url, headers=headers, verify=verify)

     # Test for valid response
-    if resp.ok:
+    if resp and resp.ok:
         # Test for positive list of tags
         resp_data = resp.json()
         return resp_data['tags']
@@ -54,6 +55,7 @@ def _get_proprietary_registry_tags(self, server, image_name, auth, verify):
     if auth:
         headers["Authorization"] = auth

+    resp = None
     try:
         resp = requests.get(url, headers=headers, verify=verify)
     except requests.exceptions.SSLError:
@@ -62,7 +64,7 @@ def _get_proprietary_registry_tags(self, server, image_name, auth, verify):
         url = f"http://{server}/api/v2.0/projects/{project_id}/repositories/{repo_id}/artifacts"
         resp = requests.get(url, headers=headers, verify=verify)

-    if resp.ok:
+    if resp and resp.ok:
         return [tag['name'] for image in resp.json() if image['tags'] for tag in image['tags']]

     return []
@@ -134,8 +136,8 @@ def process_image(image):
     else:
         tag_name = f"{FRAMEWORK_VERSION}.{SYSTEM_VERSION}.0.{update_channel}0"
         for t in tags:
-            if re.match(f"({FRAMEWORK_VERSION})[.]({SYSTEM_VERSION})[.]\d+[.]({update_channel})\d+", t):
-                t_version = parse(t.replace(update_channel, ""))
+            if re.match(f"({FRAMEWORK_VERSION})[.]({SYSTEM_VERSION})[.]\\d+[.]({update_channel})\\d+", t):
+                t_version = Version(t.replace(update_channel, ""))
                 if t_version.major == FRAMEWORK_VERSION and t_version.minor == SYSTEM_VERSION and \
                         t_version > parse(tag_name.replace(update_channel, "")):
                     tag_name = t

diff --git a/assemblyline_core/updater/run_updater.py b/assemblyline_core/updater/run_updater.py
index 4504b26d..c7f4c98a 100644
--- a/assemblyline_core/updater/run_updater.py
+++ b/assemblyline_core/updater/run_updater.py
@@ -381,7 +381,7 @@ def __init__(self, redis_persist=None, redis=None, logger=None, datastore=None):
         if 'KUBERNETES_SERVICE_HOST' in os.environ and NAMESPACE:
             extra_labels = {}
             if self.config.core.scaler.additional_labels:
-                extra_labels = {k: v for k, v in (l.split("=") for l in self.config.core.scaler.additional_labels)}
+                extra_labels = {k: v for k, v in (_l.split("=") for _l in self.config.core.scaler.additional_labels)}
             self.controller = KubernetesUpdateInterface(prefix='alsvc_', namespace=NAMESPACE,
                                                         priority_class='al-core-priority',
                                                         extra_labels=extra_labels,

From a089d7d7b123d742f63b520167ca84d433c566cc Mon Sep 17 00:00:00 2001
From: Adam Douglass
Date: Tue, 21 Sep 2021 18:56:51 +0000
Subject: [PATCH 3/8] Style and linter fixes

---
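Note: the typing fixes in this commit are easiest to read against the thread
supervision pattern ServerBase uses: maintain_threads takes a dict of thread
names to entry points and restarts anything that dies. A simplified,
free-standing sketch of that pattern, not the class method itself (the poll
interval, daemon flag and join timeout are illustrative):

    import threading
    from typing import Callable, Dict

    def maintain_threads(expected: Dict[str, Callable[[], None]], stop: threading.Event):
        threads: Dict[str, threading.Thread] = {}
        while not stop.is_set():
            for name, target in expected.items():
                # Restart any expected worker that has died or never started
                if name not in threads or not threads[name].is_alive():
                    threads[name] = threading.Thread(target=target, name=name, daemon=True)
                    threads[name].start()
            stop.wait(2)
        for thread in threads.values():
            thread.join(timeout=5)

The annotation added below simply makes that threads registry explicit to the
type checker.
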
 assemblyline_core/server_base.py |  9 ++---
 test/test_updater.py             | 66 --------------------------------
 2 files changed, 4 insertions(+), 71 deletions(-)
 delete mode 100644 test/test_updater.py

diff --git a/assemblyline_core/server_base.py b/assemblyline_core/server_base.py
index dd78e8b1..3cd1980a 100644
--- a/assemblyline_core/server_base.py
+++ b/assemblyline_core/server_base.py
@@ -13,6 +13,7 @@
 import io
 import os
 from typing import Callable, TYPE_CHECKING
+import typing

 from assemblyline.remote.datatypes import get_client
 from assemblyline.remote.datatypes.hash import Hash
@@ -39,7 +40,7 @@ class ServerBase(threading.Thread):
     makes a blocking call that would normally stop this.
     """
     def __init__(self, component_name: str, logger: logging.Logger = None,
-                 shutdown_timeout: float = SHUTDOWN_SECONDS_LIMIT, config=None):
+                 shutdown_timeout: float = None, config=None):
         super().__init__(name=component_name)
         al_log.init_logging(component_name)

         self.config: Config = config or forge.get_config()
@@ -209,7 +210,7 @@ def __init__(self, component_name: str, logger: logging.Logger = None,
         )

         # Create a cached service data object, and access to the service status
-        self.service_info: dict[str, Service] = forge.CachedObject(self._get_services)
+        self.service_info = typing.cast(typing.Dict[str, Service], forge.CachedObject(self._get_services))
         self._service_stage_hash = get_service_stage_hash(self.redis)

     def _get_services(self):
@@ -234,12 +235,10 @@ def stop(self):
         super().stop()
         self.main_loop_exit.wait(30)

-
     def sleep(self, timeout: float):
         self.stopping.wait(timeout)
         return self.running

-
     def log_crashes(self, fn):
         @functools.wraps(fn)
         def with_logs(*args, **kwargs):
@@ -252,7 +251,7 @@ def with_logs(*args, **kwargs):

     def maintain_threads(self, expected_threads: dict[str, Callable[..., None]]):
         expected_threads = {name: self.log_crashes(start) for name, start in expected_threads.items()}
-        threads = {}
+        threads: dict[str, threading.Thread] = {}

         # Run as long as we need to
         while self.running:

diff --git a/test/test_updater.py b/test/test_updater.py
deleted file mode 100644
index 31ddbc84..00000000
--- a/test/test_updater.py
+++ /dev/null
@@ -1,66 +0,0 @@
-from typing import Union
-
-import redis
-import tempfile
-import pytest
-
-from assemblyline_core.server_base import ServiceStage
-from assemblyline_core.updater import run_updater
-from assemblyline.common.isotime import now_as_iso
-from assemblyline.datastore.helper import AssemblylineDatastore
-from assemblyline.odm.models.service import UpdateConfig
-from assemblyline.odm.random_data import create_services
-from assemblyline.odm.randomizer import random_model_obj
-
-from mocking import MockDatastore
-
-
-@pytest.fixture(scope='session')
-def updater_directory():
-    with tempfile.TemporaryDirectory() as tmpdir:
-        run_updater.FILE_UPDATE_DIRECTORY = tmpdir
-        yield tmpdir
-
-
-@pytest.fixture
-def ds():
-    return AssemblylineDatastore(MockDatastore())
-
-
-@pytest.fixture
-def updater(clean_redis: redis.Redis, ds, updater_directory):
-    return run_updater.ServiceUpdater(redis_persist=clean_redis, redis=clean_redis, datastore=ds)
-
-
-# def test_service_changes(updater: run_updater.ServiceUpdater):
-#     ds: MockDatastore = updater.datastore.ds
-#     # Base conditions, nothing anywhere
-#     assert updater.services.length() == 0
-#     assert len(updater.datastore.list_all_services()) == 0
-
-#     # Nothing does nothing
-#     updater.sync_services()
-#     assert updater.services.length() == 0
-#     assert len(updater.datastore.list_all_services()) == 0
-
-#     # Any non-disabled services should be picked up by the updater
-#     create_services(updater.datastore, limit=1)
-#     for data in ds._collections['service']._docs.values():
-#         data.enabled = True
-#         updater._service_stage_hash.set(data.name, ServiceStage.Update)
-#         data.update_config = random_model_obj(UpdateConfig)
-#     assert len(updater.datastore.list_all_services(full=True)) == 1
-#     updater.sync_services()
-#     assert updater.services.length() == 1
-#     assert len(updater.datastore.list_all_services(full=True)) == 1
-
-#     # It should be scheduled to update ASAP
-#     for data in updater.services.items().values():
-#         assert data['next_update'] <= now_as_iso()
-
-#     # Disable the service and it will disappear from redis
-#     for data in ds._collections['service']._docs.values():
-#         data.enabled = False
-#     updater.sync_services()
-#     assert updater.services.length() == 0
-#     assert len(updater.datastore.list_all_services(full=True)) == 1

From b0dbdb285b90c12dbdcf20ec921c6cc04505fc37 Mon Sep 17 00:00:00 2001
From: Adam Douglass
Date: Tue, 21 Sep 2021 18:57:09 +0000
Subject: [PATCH 4/8] Fixes for docker interface

---
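Note: the docker controller now stamps every dependency container with an
al_change_key label and reuses a running container whenever the key still
matches, instead of unconditionally restarting it. A minimal sketch of that
reuse decision (the label name matches the patch; the service, container and
spec string values are made up — the patch stores the raw concatenated key):

    def need_restart(existing_labels: dict, status: str, change_check: str) -> bool:
        # Reuse a running dependency only when its recorded change key matches
        return not (status == 'running' and existing_labels.get('al_change_key') == change_check)

    # The key folds in everything that should force a redeploy
    change_check = 'key1' + 'safelist' + 'nginx' + "image=nginx:1.21 ports=['8080']"
    print(need_restart({'al_change_key': change_check}, 'running', change_check))  # False
    print(need_restart({'al_change_key': 'stale'}, 'running', change_check))       # True
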
 .../scaler/controllers/docker_ctl.py      | 85 ++++++++++++++-----
 .../scaler/controllers/interface.py       |  2 +-
 .../scaler/controllers/kubernetes_ctl.py  |  2 +-
 assemblyline_core/scaler/scaler_server.py | 17 +++-
 4 files changed, 82 insertions(+), 24 deletions(-)

diff --git a/assemblyline_core/scaler/controllers/docker_ctl.py b/assemblyline_core/scaler/controllers/docker_ctl.py
index ae8aba6b..55da3e45 100644
--- a/assemblyline_core/scaler/controllers/docker_ctl.py
+++ b/assemblyline_core/scaler/controllers/docker_ctl.py
@@ -1,25 +1,25 @@
+from __future__ import annotations
 import os
 import threading
 import time
+from collections import defaultdict
 from typing import List, Tuple, Dict
+import uuid

-from assemblyline.odm.models.service import DockerConfig
+from assemblyline.odm.models.service import DependencyConfig, DockerConfig
 from .interface import ControllerInterface, ServiceControlError

-# How to identify the update volume as a whole, in a way that the underlying container system recognizes.
-FILE_UPDATE_VOLUME = os.environ.get('FILE_UPDATE_VOLUME', None)
-
 # Where to find the update directory inside this container.
-FILE_UPDATE_DIRECTORY = os.environ.get('FILE_UPDATE_DIRECTORY', None)
 INHERITED_VARIABLES = ['HTTP_PROXY', 'HTTPS_PROXY', 'NO_PROXY', 'http_proxy', 'https_proxy', 'no_proxy']

 # Every this many seconds, check that the services can actually reach the service server.
 NETWORK_REFRESH_INTERVAL = 60 * 3
+CHANGE_KEY_NAME = 'al_change_key'


 class DockerController(ControllerInterface):
     """A controller for *non* swarm mode docker."""
-    def __init__(self, logger, prefix='', labels=None, cpu_overallocation=1, memory_overallocation=1, log_level="INFO"):
+    def __init__(self, logger, prefix='', labels: dict[str, str] = None, cpu_overallocation=1, memory_overallocation=1, log_level="INFO"):
         """
         :param logger: A logger to report status and debug information.
         :param prefix: A prefix used to distinguish containers launched by this controller.
@@ -32,9 +32,11 @@ def __init__(self, logger, prefix='', labels=None, cpu_overallocation=1, memory_
         self.log = logger
         self.log_level = log_level
         self.global_mounts: List[Tuple[str, str]] = []
+        self.core_mounts: List[Tuple[str, str]] = []
         self._prefix: str = prefix
-        self._labels = labels
+        self._labels: dict[str, str] = labels or {}
         self.prune_lock = threading.Lock()
+        self._service_limited_env: dict[str, dict[str, str]] = defaultdict(dict)

         for network in self.client.networks.list(names=['external']):
             self.external_network = network
@@ -109,8 +111,8 @@ def _flush_containers(self):

     def add_profile(self, profile, scale=0):
         """Tell the controller about a service profile it needs to manage."""
-        self._pull_image(profile)
         self._profiles[profile.name] = profile
+        self._pull_image(profile)

     def _start(self, service_name):
         """Launch a docker container in a manner suitable for Assemblyline."""
@@ -124,15 +126,13 @@ def _start(self, service_name):

         # Prepare the volumes and folders
         volumes = {row[0]: {'bind': row[1], 'mode': 'ro'} for row in self.global_mounts}
-        volumes[os.path.join(FILE_UPDATE_VOLUME, service_name)] = {'bind': '/mount/updates/', 'mode': 'ro'}
-        if not os.path.exists(os.path.join(FILE_UPDATE_DIRECTORY, service_name)):
-            os.makedirs(os.path.join(FILE_UPDATE_DIRECTORY, service_name), 0x777)

         # Define environment variables
         env = [f'{_e.name}={_e.value}' for _e in cfg.environment]
         env += ['UPDATE_PATH=/mount/updates/']
         env += [f'{name}={os.environ[name]}' for name in INHERITED_VARIABLES if name in os.environ]
         env += [f'LOG_LEVEL={self.log_level}']
+        env += [f'{_n}={_v}' for _n, _v in self._service_limited_env[service_name].items()]

         container = self.client.containers.run(
             image=cfg.image,
@@ -152,7 +152,7 @@ def _start(self, service_name):
         if cfg.allow_internet_access:
             self.external_network.connect(container)

-    def _start_container(self, name, labels, volumes, cfg: DockerConfig, network, hostname):
+    def _start_container(self, service_name, name, labels, volumes, cfg: DockerConfig, network, hostname, core_container=False):
         """Launch a docker container."""
         # Take the port strings and convert them to a dictionary
         ports = {}
@@ -174,9 +174,13 @@ def _start_container(self, name, labels, volumes, cfg: DockerConfig, network, ho
                 self.log.warning(f"Not sure how to parse port string {port_string} for container {name} not using it...")

         # Put together the environment variables
-        env = [f'{_e.name}={_e.value}' for _e in cfg.environment]
+        env = []
+        if core_container:
+            env += [f'{_n}={_v}' for _n, _v in os.environ.items()
+                    if any(term in _n for term in ['ELASTIC', 'FILESTORE', 'UI_SERVER'])]
+        env += [f'{_e.name}={_e.value}' for _e in cfg.environment]
         env += [f'{name}={os.environ[name]}' for name in INHERITED_VARIABLES if name in os.environ]
-        env += [f'LOG_LEVEL={self.log_level}']
+        env += [f'LOG_LEVEL={self.log_level}', f'AL_SERVICE_NAME={service_name}']

         container = self.client.containers.run(
             image=cfg.image,
@@ -192,8 +196,9 @@ def _start_container(self, name, labels, volumes, cfg: DockerConfig, network, ho
             network=network,
             environment=env,
             detach=True,
-            ports=ports,
+            # ports=ports,
         )
+
         if cfg.allow_internet_access:
             self.external_network.connect(container, aliases=[hostname])

@@ -324,16 +329,44 @@ def get_running_container_names(self):
             out.append(container.name)
         return out

-    def start_stateful_container(self, service_name, container_name, spec, labels, mount_updates=False, change_key=''):
-        volumes = {_n: {'bind': _v.mount_path, 'mode': 'rw'} for _n, _v in spec.volumes.items()}
+    def start_stateful_container(self, service_name: str, container_name: str, spec: DependencyConfig,
+                                 labels: dict[str, str], change_key: str):
+        import docker.errors
         deployment_name = f'{service_name}-dep-{container_name}'
+        change_check = change_key + service_name + container_name + str(spec)
+
+        try:
+            old_container = self.client.containers.get(deployment_name)
+            instance_key = old_container.attrs["Config"]["Env"]['AL_INSTANCE_KEY']
+            if old_container.labels.get(CHANGE_KEY_NAME) == change_check and old_container.status == 'running':
+                self._service_limited_env[service_name][f'{container_name}_host'] = deployment_name
+                self._service_limited_env[service_name][f'{container_name}_key'] = instance_key
+                if spec.container.ports:
+                    self._service_limited_env[service_name][f'{container_name}_port'] = spec.container.ports[0]
+                return
+            else:
+                old_container.kill()
+        except docker.errors.NotFound:
+            instance_key = uuid.uuid4().hex
+
+        volumes = {_n: {'bind': _v.mount_path, 'mode': 'rw'} for _n, _v in spec.volumes.items()}
+        if spec.run_as_core:
+            volumes.update({row[0]: {'bind': row[1], 'mode': 'ro'} for row in self.core_mounts})
+
         all_labels = dict(self._labels)
-        all_labels.update({'component': service_name})
+        all_labels.update({'component': service_name, CHANGE_KEY_NAME: change_check})
         all_labels.update(labels)

-        self._start_container(name=deployment_name, labels=all_labels, volumes=volumes, hostname=container_name,
-                              cfg=spec.container, network=self._get_network(service_name).name)
+        spec.container.environment.append({'name': 'AL_INSTANCE_KEY', 'value': instance_key})
+
+        self._service_limited_env[service_name][f'{container_name}_host'] = deployment_name
+        self._service_limited_env[service_name][f'{container_name}_key'] = instance_key
+        if spec.container.ports:
+            self._service_limited_env[service_name][f'{container_name}_port'] = spec.container.ports[0]
+
+        self._start_container(service_name=service_name, name=deployment_name, labels=all_labels, volumes=volumes,
+                              hostname=container_name, cfg=spec.container, core_container=spec.run_as_core,
+                              network=self._get_network(service_name).name)

     def stop_containers(self, labels):
         label_strings = [f'{name}={value}' for name, value in labels.items()]
@@ -368,6 +401,7 @@ def _pull_image(self, service):

         This lets us override the auth_config on a per image basis.
         """
+        from docker.errors import ImageNotFound
         # Split the image string into "[registry/]image_name" and "tag"
         repository, _, tag = service.container_config.image.rpartition(':')
         if '/' in tag:
@@ -385,4 +419,13 @@ def _pull_image(self, service):
                 'password': service.container_config.registry_password
             }

-        self.client.images.pull(repository, tag, auth_config=auth_config)
+        try:
+            self.client.images.pull(repository, tag, auth_config=auth_config)
+        except ImageNotFound:
+            self.log.error(f"Couldn't pull image {repository}:{tag} check authentication settings. "
+                           "Will try to use local copy.")
+
+        try:
+            self.client.images.get(repository + ':' + tag)
+        except ImageNotFound:
+            self.log.error(f"Couldn't find local image {repository}:{tag}")

diff --git a/assemblyline_core/scaler/controllers/interface.py b/assemblyline_core/scaler/controllers/interface.py
index 5425ca29..134b769d 100644
--- a/assemblyline_core/scaler/controllers/interface.py
+++ b/assemblyline_core/scaler/controllers/interface.py
@@ -53,7 +53,7 @@ def get_running_container_names(self):
     def new_events(self):
         return []

-    def start_stateful_container(self, service_name, container_name, spec, labels, change_key):
+    def start_stateful_container(self, service_name: str, container_name: str, spec, labels, change_key):
         raise NotImplementedError()

     def stop_containers(self, labels):

diff --git a/assemblyline_core/scaler/controllers/kubernetes_ctl.py b/assemblyline_core/scaler/controllers/kubernetes_ctl.py
index 00158b00..efe295f3 100644
--- a/assemblyline_core/scaler/controllers/kubernetes_ctl.py
+++ b/assemblyline_core/scaler/controllers/kubernetes_ctl.py
@@ -673,7 +673,7 @@ def new_events(self):
         return new

     def start_stateful_container(self, service_name: str, container_name: str,
-                                 spec, labels: dict[str, str], change_key:str):
+                                 spec, labels: dict[str, str], change_key: str):
         # Setup PVC
         deployment_name = self._dependency_name(service_name, container_name)
         mounts, volumes = [], []

diff --git a/assemblyline_core/scaler/scaler_server.py b/assemblyline_core/scaler/scaler_server.py
index 42e9ac9b..5a47c5b3 100644
--- a/assemblyline_core/scaler/scaler_server.py
+++ b/assemblyline_core/scaler/scaler_server.py
@@ -16,6 +16,7 @@
 from contextlib import contextmanager

 import elasticapm
+import yaml

 from assemblyline.remote.datatypes.queues.named import NamedQueue
 from assemblyline.remote.datatypes.queues.priority import PriorityQueue, length as pq_length
@@ -26,7 +27,7 @@
 from assemblyline.odm.messages.scaler_heartbeat import Metrics
 from assemblyline.odm.messages.scaler_status_heartbeat import Status
 from assemblyline.odm.messages.changes import ServiceChange, Operation
-from assemblyline.common.forge import get_service_queue
+from assemblyline.common.forge import get_classification, get_service_queue
 from assemblyline.common.constants import SCALER_TIMEOUT_QUEUE, SERVICE_STATE_HASH, ServiceStatus
 from assemblyline_core.scaler.controllers import KubernetesController
 from assemblyline_core.scaler.controllers.interface import ServiceControlError
@@ -60,6 +61,8 @@
 CLASSIFICATION_CONFIGMAP = os.getenv('CLASSIFICATION_CONFIGMAP', None)
 CLASSIFICATION_CONFIGMAP_KEY = os.getenv('CLASSIFICATION_CONFIGMAP_KEY', 'classification.yml')

+DOCKER_CONFIGURATION_PATH = os.getenv('DOCKER_CONFIGURATION_PATH', None)
+DOCKER_CONFIGURATION_VOLUME = os.getenv('DOCKER_CONFIGURATION_VOLUME', None)
 CONFIGURATION_CONFIGMAP = os.getenv('CONFIGURATION_CONFIGMAP', None)
 CONFIGURATION_CONFIGMAP_KEY = os.getenv('CONFIGURATION_CONFIGMAP_KEY', 'config')

@@ -276,6 +279,16 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
                                                cpu_overallocation=self.config.core.scaler.cpu_overallocation,
                                                memory_overallocation=self.config.core.scaler.memory_overallocation,
                                                labels=labels, log_level=self.config.logging.log_level)
+
+            if DOCKER_CONFIGURATION_PATH and DOCKER_CONFIGURATION_VOLUME:
+                self.controller.core_mounts.append((DOCKER_CONFIGURATION_VOLUME, '/etc/assemblyline/'))
+
+                with open(os.path.join(DOCKER_CONFIGURATION_PATH, 'config.yml'), 'w') as handle:
+                    yaml.dump(self.config.as_primitives(), handle)
+
+                with open(os.path.join(DOCKER_CONFIGURATION_PATH, 'classification.yml'), 'w') as handle:
+                    yaml.dump(get_classification().original_definition, handle)
+
             # If we know where to find it, mount the classification into the service containers
             if CLASSIFICATION_HOST_PATH:
                 self.controller.global_mounts.append((CLASSIFICATION_HOST_PATH, '/etc/assemblyline/classification.yml'))
@@ -397,6 +410,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
                 dependency_blobs[_n] = str(dependency)

             if service.enabled and stage == ServiceStage.Off:
+                self.log.info(f'Preparing environment for {service.name}')
                 # Move to the next service stage (do this first because the container we are starting may care)
                 if service.update_config and service.update_config.wait_for_update:
                     self._service_stage_hash.set(name, ServiceStage.Update)
@@ -408,6 +422,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
                 # Enable this service's dependencies before trying to launch the service containers
                 self.controller.prepare_network(service.name, service.docker_config.allow_internet_access)
                 for _n, dependency in dependency_config.items():
+                    self.log.info(f'Launching {service.name} dependency {_n}')
                     self.controller.start_stateful_container(
                         service_name=service.name,
                         container_name=_n,

From 7aa3f3055397d89f092e35f67b658b619a9dd3a3 Mon Sep 17 00:00:00 2001
From: Adam Douglass
Date: Tue, 21 Sep 2021 19:27:13 +0000
Subject: [PATCH 5/8] Type annotation and style fixes

---
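Note: the heartbeat fix in this commit stops reusing the timestamp parameter
for two different types; os.utime expects either None (meaning "now") or an
(atime, mtime) pair. A small stand-alone illustration of the corrected call
(the file path is made up):

    import io
    import os
    import time

    def heartbeat(path: str, timestamp: float = None):
        # times=None touches with the current time; otherwise pass (atime, mtime)
        utime_timestamp = (timestamp, timestamp) if timestamp is not None else None
        with io.open(path, 'ab'):  # ensure the file exists
            os.utime(path, times=utime_timestamp)

    heartbeat('/tmp/al_heartbeat')                    # stamp with the current time
    heartbeat('/tmp/al_heartbeat', time.time() - 60)  # backdate by one minute
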
 assemblyline_core/server_base.py | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/assemblyline_core/server_base.py b/assemblyline_core/server_base.py
index 3cd1980a..1ebc3721 100644
--- a/assemblyline_core/server_base.py
+++ b/assemblyline_core/server_base.py
@@ -14,6 +14,7 @@
 import os
 from typing import Callable, TYPE_CHECKING
 import typing
+from assemblyline.odm.base import Optional

 from assemblyline.remote.datatypes import get_client
 from assemblyline.remote.datatypes.hash import Hash
@@ -52,25 +53,26 @@ def __init__(self, component_name: str, logger: logging.Logger = None,
         self._exception = None
         self._traceback = None
         self._shutdown_timeout = shutdown_timeout if shutdown_timeout is not None else SHUTDOWN_SECONDS_LIMIT
-        self._old_sigint = None
-        self._old_sigterm = None
+        self._old_sigint: Optional[Callable[..., None]] = None
+        self._old_sigterm: Optional[Callable[..., None]] = None
         self._stopped = False
-        self._last_heartbeat = 0
+        self._last_heartbeat = 0.0

     def __enter__(self):
-        self.log.info(f"Initialized")
+        self.log.info("Initialized")
         return self

     def __exit__(self, _exc_type, _exc_val, _exc_tb):
         if _exc_type is not None:
             self.log.exception(f'Terminated because of an {_exc_type} exception')
         else:
-            self.log.info(f'Terminated')
+            self.log.info('Terminated')

     def __stop(self):
         """Hard stop, can still be blocked in some cases, but we should try to avoid them."""
         time.sleep(self._shutdown_timeout)
-        self.log.error(f"Server {self.__class__.__name__} has shutdown hard after waiting {self._shutdown_timeout} seconds to stop")
+        self.log.error(f"Server {self.__class__.__name__} has shutdown hard after "
+                       f"waiting {self._shutdown_timeout} seconds to stop")

         if not self._stopped:
             self._stopped = True
@@ -79,7 +81,7 @@ def __stop(self):
             ctypes.string_at(0)  # SEGFAULT out of here

     def interrupt_handler(self, signum, stack_frame):
-        self.log.info(f"Instance caught signal. Coming down...")
+        self.log.info("Instance caught signal. Coming down...")
         self.stop()
         if signum == signal.SIGINT and self._old_sigint:
             self._old_sigint(signum, stack_frame)
@@ -114,7 +116,7 @@ def start(self):
         """Start the server workload."""
         self.running = True
         super().start()
-        self.log.info(f"Started")
+        self.log.info("Started")
         self._old_sigint = signal.signal(signal.SIGINT, self.interrupt_handler)
         self._old_sigterm = signal.signal(signal.SIGTERM, self.interrupt_handler)
@@ -142,8 +144,9 @@ def heartbeat(self, timestamp: int = None):
         a background thread defeats the purpose. Ideally it should be called at least a couple times
         a minute.
         """
+        utime_timestamp = None
         if timestamp is not None:
-            timestamp = (timestamp, timestamp)
+            utime_timestamp = (timestamp, timestamp)

         if self.config.logging.heartbeat_file:
             # Only do the heartbeat every few seconds at most. If a fast component is
@@ -154,7 +157,7 @@ def heartbeat(self, timestamp: int = None):
                 return
             self._last_heartbeat = now
             with io.open(self.config.logging.heartbeat_file, 'ab'):
-                os.utime(self.config.logging.heartbeat_file, times=timestamp)
+                os.utime(self.config.logging.heartbeat_file, times=utime_timestamp)

     def sleep_with_heartbeat(self, duration):
         """Sleep while calling heartbeat periodically."""

From fefd7c77b61d9c84b1c11a74620563aff0253eba Mon Sep 17 00:00:00 2001
From: cccs-rs <62077998+cccs-rs@users.noreply.github.com>
Date: Fri, 24 Sep 2021 15:01:08 +0000
Subject: [PATCH 6/8] Remove unused function

---
 .../scaler/controllers/kubernetes_ctl.py | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/assemblyline_core/scaler/controllers/kubernetes_ctl.py b/assemblyline_core/scaler/controllers/kubernetes_ctl.py
index efe295f3..acbdf50f 100644
--- a/assemblyline_core/scaler/controllers/kubernetes_ctl.py
+++ b/assemblyline_core/scaler/controllers/kubernetes_ctl.py
@@ -436,12 +436,6 @@ def memory_info(self):
             return self._quota_mem_limit - self._pod_used_namespace_ram, self._quota_mem_limit
         return self._node_pool_max_ram - self._pod_used_ram, self._node_pool_max_ram

-    def _create_volumes(self, core_mounts=False):
-        volumes, mounts = [], []
-
-
-        return volumes, mounts
-
     def _create_containers(self, service_name: str, deployment_name: str, container_config, mounts,
                            core_container=False):
         cores = container_config.cpu_cores
@@ -476,9 +470,9 @@ def _create_containers(self, service_name: str, deployment_name: str, container_
         )]

     def _create_deployment(self, service_name: str, deployment_name: str, docker_config: DockerConfig,
-                           shutdown_seconds: int, scale: int, labels:dict[str,str]=None,
-                           volumes:list[V1Volume]=None, mounts:list[V1VolumeMount]=None,
-                           core_mounts:bool=False, change_key:str=''):
+                           shutdown_seconds: int, scale: int, labels: dict[str, str] = None,
+                           volumes: list[V1Volume] = None, mounts: list[V1VolumeMount] = None,
+                           core_mounts: bool = False, change_key: str = ''):
         # Build a cache key to check for changes, just trying to only patch what changed
         # will still potentially result in a lot of restarts due to different kubernetes
         # systems returning differently formatted data
@@ -490,7 +484,8 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con
         # Check if a deployment already exists, and if it does check if it has the same change key set
         replace = None
         try:
-            replace = self.apps_api.read_namespaced_deployment(deployment_name, namespace=self.namespace, _request_timeout=API_TIMEOUT)
+            replace = self.apps_api.read_namespaced_deployment(
+                deployment_name, namespace=self.namespace, _request_timeout=API_TIMEOUT)
             if replace.metadata.annotations.get(CHANGE_KEY_NAME) == change_key:
                 if replace.spec.replicas != scale:
                     self.set_target(service_name, scale)
@@ -530,7 +525,7 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con
         # Send it to the server
         if current_pull_secret:
             self.api.patch_namespaced_secret(pull_secret_name, namespace=self.namespace, body=new_pull_secret,
-                                            _request_timeout=API_TIMEOUT)
+                                             _request_timeout=API_TIMEOUT)
         else:
             self.api.create_namespaced_secret(namespace=self.namespace, body=new_pull_secret,
                                               _request_timeout=API_TIMEOUT)
@@ -613,7 +608,7 @@ def set_target(self, service_name: str, target: int):
                                                                          _request_timeout=API_TIMEOUT)
                 scale.spec.replicas = target
                 self.apps_api.patch_namespaced_deployment_scale(name=name, namespace=self.namespace, body=scale,
-                                                               _request_timeout=API_TIMEOUT)
+                                                                _request_timeout=API_TIMEOUT)
                 return
             except client.ApiException as error:
                 # If the error is a conflict, it means multiple attempts to scale a deployment

From 7fee268e12e60d3a33c786d9ed89fc9e1f8d7313 Mon Sep 17 00:00:00 2001
From: Adam Douglass
Date: Mon, 27 Sep 2021 19:42:49 +0000
Subject: [PATCH 7/8] Add complete queue to heartbeat

---
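Note: this commit exposes the depth of the ingester's completion queue
alongside the existing queue gauges. Reading the same number outside the
heartbeat would look roughly like the sketch below, assuming a reachable redis
instance and reusing the queue helpers the diff itself imports (the connection
details are illustrative):

    from redis import Redis
    from assemblyline.remote.datatypes.queues.named import NamedQueue
    from assemblyline_core.ingester.constants import COMPLETE_QUEUE_NAME

    complete_queue = NamedQueue(COMPLETE_QUEUE_NAME, Redis(host='localhost', port=6379))
    print(f"complete backlog: {complete_queue.length()}")
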
 assemblyline_core/metrics/heartbeat_formatter.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/assemblyline_core/metrics/heartbeat_formatter.py b/assemblyline_core/metrics/heartbeat_formatter.py
index 41c51d62..8f638138 100644
--- a/assemblyline_core/metrics/heartbeat_formatter.py
+++ b/assemblyline_core/metrics/heartbeat_formatter.py
@@ -23,6 +23,7 @@
 from assemblyline.remote.datatypes.queues.comms import CommsQueue
 from assemblyline.remote.datatypes.queues.named import NamedQueue
 from assemblyline.remote.datatypes.queues.priority import PriorityQueue
+from assemblyline_core.ingester.constants import COMPLETE_QUEUE_NAME

 STATUS_QUEUE = "status"

@@ -68,6 +69,7 @@ def __init__(self, sender, log, config=None, redis=None):
         self.ingest_scanning = Hash('m-scanning-table', self.redis_persist)
         self.ingest_unique_queue = PriorityQueue('m-unique', self.redis_persist)
         self.ingest_queue = NamedQueue(INGEST_QUEUE_NAME, self.redis_persist)
+        self.ingest_complete_queue = NamedQueue(COMPLETE_QUEUE_NAME, self.redis)
         self.alert_queue = NamedQueue(ALERT_QUEUE_NAME, self.redis_persist)

         constants = forge.get_constants(self.config)
@@ -162,6 +164,7 @@ def send_heartbeat(self, m_type, m_name, m_data, instances):
                     "critical": c_q_len,
                     "high": h_q_len,
                     "ingest": self.ingest_queue.length(),
+                    "complete": self.ingest_complete_queue.length(),
                     "low": l_q_len,
                     "medium": m_q_len
                 }

From 1cc8eef02e83928dcad0772ebf0dd00133e97386 Mon Sep 17 00:00:00 2001
From: cccs-rs <62077998+cccs-rs@users.noreply.github.com>
Date: Mon, 27 Sep 2021 19:49:12 +0000
Subject: [PATCH 8/8] Multi-thread complete

---
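Note: completion handling grows from one fixed 'Complete' thread to a pool
sized by the INGESTER_COMPLETE_THREADS environment variable, mirroring how the
ingest and submit workers are already fanned out. A stand-alone sketch of the
naming scheme handed to maintain_threads (the handler is a stand-in):

    import os
    from typing import Callable, Dict

    COMPLETE_THREADS = int(os.environ.get('INGESTER_COMPLETE_THREADS', 4))

    def handle_complete() -> None:
        pass  # stand-in for Ingester.handle_complete

    threads_to_maintain: Dict[str, Callable[[], None]] = {
        f'Complete_{n}': handle_complete for n in range(COMPLETE_THREADS)
    }
    print(sorted(threads_to_maintain))  # ['Complete_0', ..., 'Complete_3'] by default
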
 assemblyline_core/ingester/ingester.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/assemblyline_core/ingester/ingester.py b/assemblyline_core/ingester/ingester.py
index fd3a30f7..eeab6b8d 100644
--- a/assemblyline_core/ingester/ingester.py
+++ b/assemblyline_core/ingester/ingester.py
@@ -49,6 +49,7 @@
 _retry_delay = 60 * 4  # Wait 4 minutes to retry
 _max_time = 2 * 24 * 60 * 60  # Wait 2 days for responses.
 HOUR_IN_SECONDS = 60 * 60
+COMPLETE_THREADS = int(environ.get('INGESTER_COMPLETE_THREADS', 4))
 INGEST_THREADS = int(environ.get('INGESTER_INGEST_THREADS', 1))
 SUBMIT_THREADS = int(environ.get('INGESTER_SUBMIT_THREADS', 4))

@@ -223,10 +224,10 @@ def __init__(self, datastore=None, logger=None, classification=None, redis=None,

     def try_run(self):
         threads_to_maintain = {
-            'Complete': self.handle_complete,
             'Retries': self.handle_retries,
             'Timeouts': self.handle_timeouts
         }
+        threads_to_maintain.update({f'Complete_{n}': self.handle_complete for n in range(COMPLETE_THREADS)})
         threads_to_maintain.update({f'Ingest_{n}': self.handle_ingest for n in range(INGEST_THREADS)})
         threads_to_maintain.update({f'Submit_{n}': self.handle_submit for n in range(SUBMIT_THREADS)})
         self.maintain_threads(threads_to_maintain)