diff --git a/assemblyline_core/dispatching/dispatcher.py b/assemblyline_core/dispatching/dispatcher.py index 34a3d1f3..25168458 100644 --- a/assemblyline_core/dispatching/dispatcher.py +++ b/assemblyline_core/dispatching/dispatcher.py @@ -719,7 +719,7 @@ def retry_error(self, task: SubmissionTask, sha256, service_name): task.service_errors[(sha256, service_name)] = error_key export_metrics_once(service_name, ServiceMetrics, dict(fail_nonrecoverable=1), - counter_type='service', host='dispatcher') + counter_type='service', host='dispatcher', redis=self.redis) # Send the result key to any watching systems msg = {'status': 'FAIL', 'cache_key': error_key} @@ -1040,7 +1040,7 @@ def timeout_service(self, task: SubmissionTask, sha256, service_name, worker_id) # Report to the metrics system that a recoverable error has occurred for that service export_metrics_once(service_name, ServiceMetrics, dict(fail_recoverable=1), - host=worker_id, counter_type='service') + host=worker_id, counter_type='service', redis=self.redis) def work_guard(self): check_interval = GUARD_TIMEOUT/8 @@ -1244,7 +1244,7 @@ def timeout_backstop(self): # Report to the metrics system that a recoverable error has occurred for that service export_metrics_once(task.service_name, ServiceMetrics, dict(fail_recoverable=1), - host=task.metadata['worker__'], counter_type='service') + host=task.metadata['worker__'], counter_type='service', redis=self.redis) # Look for unassigned submissions in the datastore if we don't have a # large number of outstanding things in the queue already. diff --git a/assemblyline_core/ingester/ingester.py b/assemblyline_core/ingester/ingester.py index 71ea24d6..f22804aa 100644 --- a/assemblyline_core/ingester/ingester.py +++ b/assemblyline_core/ingester/ingester.py @@ -11,6 +11,7 @@ import threading import time +from os import environ from random import random from typing import Iterable, List, Optional, Dict, Tuple @@ -48,6 +49,8 @@ _retry_delay = 60 * 4 # Wait 4 minutes to retry _max_time = 2 * 24 * 60 * 60 # Wait 2 days for responses. 
HOUR_IN_SECONDS = 60 * 60 +INGEST_THREADS = int(environ.get('INGESTER_INGEST_THREADS', 1)) +SUBMIT_THREADS = int(environ.get('INGESTER_SUBMIT_THREADS', 4)) def must_drop(length: int, maximum: int) -> bool: @@ -219,13 +222,14 @@ def __init__(self, datastore=None, logger=None, classification=None, redis=None, self.apm_client = None def try_run(self): - self.maintain_threads({ - 'Ingest': self.handle_ingest, - 'Submit': self.handle_submit, + threads_to_maintain = { 'Complete': self.handle_complete, 'Retries': self.handle_retries, - 'Timeouts': self.handle_timeouts, - }) + 'Timeouts': self.handle_timeouts + } + threads_to_maintain.update({f'Ingest_{n}': self.handle_ingest for n in range(INGEST_THREADS)}) + threads_to_maintain.update({f'Submit_{n}': self.handle_submit for n in range(SUBMIT_THREADS)}) + self.maintain_threads(threads_to_maintain) def handle_ingest(self): cpu_mark = time.process_time() diff --git a/assemblyline_core/scaler/controllers/docker_ctl.py b/assemblyline_core/scaler/controllers/docker_ctl.py index b6892376..ae8aba6b 100644 --- a/assemblyline_core/scaler/controllers/docker_ctl.py +++ b/assemblyline_core/scaler/controllers/docker_ctl.py @@ -324,7 +324,7 @@ def get_running_container_names(self): out.append(container.name) return out - def start_stateful_container(self, service_name, container_name, spec, labels, mount_updates=False): + def start_stateful_container(self, service_name, container_name, spec, labels, mount_updates=False, change_key=''): volumes = {_n: {'bind': _v.mount_path, 'mode': 'rw'} for _n, _v in spec.volumes.items()} deployment_name = f'{service_name}-dep-{container_name}' diff --git a/assemblyline_core/scaler/controllers/interface.py b/assemblyline_core/scaler/controllers/interface.py index b3222dcb..5425ca29 100644 --- a/assemblyline_core/scaler/controllers/interface.py +++ b/assemblyline_core/scaler/controllers/interface.py @@ -53,7 +53,7 @@ def get_running_container_names(self): def new_events(self): return [] - def start_stateful_container(self, service_name, container_name, spec, labels): + def start_stateful_container(self, service_name, container_name, spec, labels, change_key): raise NotImplementedError() def stop_containers(self, labels): diff --git a/assemblyline_core/scaler/controllers/kubernetes_ctl.py b/assemblyline_core/scaler/controllers/kubernetes_ctl.py index 06541db1..1d363c83 100644 --- a/assemblyline_core/scaler/controllers/kubernetes_ctl.py +++ b/assemblyline_core/scaler/controllers/kubernetes_ctl.py @@ -18,6 +18,7 @@ V1PersistentVolumeClaimSpec, V1NetworkPolicy, V1NetworkPolicySpec, V1NetworkPolicyEgressRule, V1NetworkPolicyPeer, \ V1NetworkPolicyIngressRule, V1Secret, V1LocalObjectReference, V1Service, V1ServiceSpec, V1ServicePort from kubernetes.client.rest import ApiException +from assemblyline.odm.models.service import DockerConfig from assemblyline_core.scaler.controllers.interface import ControllerInterface @@ -26,6 +27,7 @@ API_TIMEOUT = 90 WATCH_TIMEOUT = 10 * 60 WATCH_API_TIMEOUT = WATCH_TIMEOUT + 10 +CHANGE_KEY_NAME = 'al_change_key' _exponents = { 'Ki': 2**10, @@ -239,7 +241,8 @@ def core_config_mount(self, name, config_map, key, target_path): def add_profile(self, profile, scale=0): """Tell the controller about a service profile it needs to manage.""" self._create_deployment(profile.name, self._deployment_name(profile.name), - profile.container_config, profile.shutdown_seconds, scale) + profile.container_config, profile.shutdown_seconds, scale, + change_key=profile.config_blob) self._external_profiles[profile.name] = profile def 
_loop_forever(self, function): @@ -433,20 +436,9 @@ def memory_info(self): return self._quota_mem_limit - self._pod_used_namespace_ram, self._quota_mem_limit return self._node_pool_max_ram - self._pod_used_ram, self._node_pool_max_ram - @staticmethod - def _create_metadata(deployment_name: str, labels: dict[str, str]): - return V1ObjectMeta(name=deployment_name, labels=labels) - def _create_volumes(self, core_mounts=False): volumes, mounts = [], [] - # Attach the mount that provides the config file - volumes.extend(self.config_volumes.values()) - mounts.extend(self.config_mounts.values()) - - if core_mounts: - volumes.extend(self.core_config_volumes.values()) - mounts.extend(self.core_config_mounts.values()) return volumes, mounts @@ -483,17 +475,30 @@ def _create_containers(self, service_name: str, deployment_name: str, container_ ) )] - def _create_deployment(self, service_name: str, deployment_name: str, docker_config, - shutdown_seconds, scale: int, labels=None, volumes=None, mounts=None, - core_mounts=False): - - replace = False - resources = self.apps_api.list_namespaced_deployment(namespace=self.namespace, _request_timeout=API_TIMEOUT) - for dep in resources.items: - if dep.metadata.name == deployment_name: - replace = True - break - + def _create_deployment(self, service_name: str, deployment_name: str, docker_config: DockerConfig, + shutdown_seconds: int, scale: int, labels:dict[str,str]=None, + volumes:list[V1Volume]=None, mounts:list[V1VolumeMount]=None, + core_mounts:bool=False, change_key:str=''): + # Build a cache key to check for changes, just trying to only patch what changed + # will still potentially result in a lot of restarts due to different kubernetes + # systems returning differently formatted data + change_key = ( + deployment_name + change_key + str(docker_config) + str(shutdown_seconds) + + str(sorted((labels or {}).items())) + str(volumes) + str(mounts) + str(core_mounts) + ) + + # Check if a deployment already exists, and if it does check if it has the same change key set + replace = None + try: + replace = self.apps_api.read_namespaced_deployment(deployment_name, namespace=self.namespace, _request_timeout=API_TIMEOUT) + if replace.metadata.annotations.get(CHANGE_KEY_NAME) == change_key: + if replace.spec.replicas != scale: + self.set_target(service_name, scale) + return + except ApiException as error: + if error.status != 404: + raise + # If we have been given a username or password for the registry, we have to # update it, if we haven't been, make sure its been cleaned up in the system # so we don't leave passwords lying around @@ -524,7 +529,7 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con # Send it to the server if current_pull_secret: - self.api.replace_namespaced_secret(pull_secret_name, namespace=self.namespace, body=new_pull_secret, + self.api.patch_namespaced_secret(pull_secret_name, namespace=self.namespace, body=new_pull_secret, _request_timeout=API_TIMEOUT) else: self.api.create_namespaced_secret(namespace=self.namespace, body=new_pull_secret, @@ -538,10 +543,20 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con all_labels['section'] = 'core' all_labels.update(labels or {}) - all_volumes, all_mounts = self._create_volumes(core_mounts) + # Build set of volumes, first the global mounts, then the core specific ones, + # then the ones specific to this container only + all_volumes: list[V1Volume] = [] + all_mounts: list[V1VolumeMount] = [] + 
all_volumes.extend(self.config_volumes.values()) + all_mounts.extend(self.config_mounts.values()) + if core_mounts: + all_volumes.extend(self.core_config_volumes.values()) + all_mounts.extend(self.core_config_mounts.values()) all_volumes.extend(volumes or []) all_mounts.extend(mounts or []) - metadata = self._create_metadata(deployment_name=deployment_name, labels=all_labels) + + # Build metadata + metadata = V1ObjectMeta(name=deployment_name, labels=all_labels, annotations={CHANGE_KEY_NAME: change_key}) pod = V1PodSpec( volumes=all_volumes, @@ -597,7 +612,7 @@ def set_target(self, service_name: str, target: int): scale = self.apps_api.read_namespaced_deployment_scale(name=name, namespace=self.namespace, _request_timeout=API_TIMEOUT) scale.spec.replicas = target - self.apps_api.replace_namespaced_deployment_scale(name=name, namespace=self.namespace, body=scale, + self.apps_api.patch_namespaced_deployment_scale(name=name, namespace=self.namespace, body=scale, _request_timeout=API_TIMEOUT) return except client.ApiException as error: @@ -630,7 +645,8 @@ def stop_container(self, service_name, container_id): def restart(self, service): self._create_deployment(service.name, self._deployment_name(service.name), service.container_config, - service.shutdown_seconds, self.get_target(service.name)) + service.shutdown_seconds, self.get_target(service.name), + change_key=service.config_blob) def get_running_container_names(self): pods = self.api.list_pod_for_all_namespaces(field_selector='status.phase==Running', @@ -657,7 +673,7 @@ def new_events(self): return new def start_stateful_container(self, service_name: str, container_name: str, - spec, labels: dict[str, str]): + spec, labels: dict[str, str], change_key:str): # Setup PVC deployment_name = self._dependency_name(service_name, container_name) mounts, volumes = [], [] @@ -692,7 +708,7 @@ def start_stateful_container(self, service_name: str, container_name: str, spec.container.environment.append({'name': 'AL_INSTANCE_KEY', 'value': instance_key}) self._create_deployment(service_name, deployment_name, spec.container, 30, 1, labels, volumes=volumes, mounts=mounts, - core_mounts=spec.run_as_core) + core_mounts=spec.run_as_core, change_key=change_key) # Setup a service to direct to the deployment try: @@ -700,7 +716,7 @@ def start_stateful_container(self, service_name: str, container_name: str, service.metadata.labels = labels service.spec.selector = labels service.spec.ports = [V1ServicePort(port=int(_p)) for _p in spec.container.ports] - self.api.replace_namespaced_service(deployment_name, self.namespace, service) + self.api.patch_namespaced_service(deployment_name, self.namespace, service) except ApiException as error: if error.status != 404: raise diff --git a/assemblyline_core/scaler/scaler_server.py b/assemblyline_core/scaler/scaler_server.py index 7b40ccf4..427b9e4e 100644 --- a/assemblyline_core/scaler/scaler_server.py +++ b/assemblyline_core/scaler/scaler_server.py @@ -110,7 +110,8 @@ class ServiceProfile: """ def __init__(self, name: str, container_config: DockerConfig, config_blob:str='', min_instances:int=0, max_instances:int=None, - growth: float = 600, shrink: Optional[float] = None, backlog:int=500, queue=None, shutdown_seconds:int=30): + growth: float = 600, shrink: Optional[float] = None, backlog:int=500, queue=None, shutdown_seconds:int=30, + dependency_blobs:dict[str, str]=None): """ :param name: Name of the service to manage :param container_config: Instructions on how to start this service @@ -128,6 +129,7 @@ def __init__(self, 
name: str, container_config: DockerConfig, config_blob:str='' self.low_duty_cycle = 0.5 self.shutdown_seconds = shutdown_seconds self.config_blob = config_blob + self.dependency_blobs = dependency_blobs or {} # How many instances we want, and can have self.min_instances: int = max(0, int(min_instances)) @@ -169,6 +171,12 @@ def max_instances(self) -> int: # this keeps the scaler from running way ahead with its demands when resource caps are reached return min(self._max_instances, self.target_instances + 2) + def set_max_instances(self, value:int): + if value == 0: + self._max_instances = float('inf') + else: + self._max_instances = value + def update(self, delta: float, instances: int, backlog: int, duty_cycle: float): self.last_update = time.time() self.running_instances = instances @@ -377,24 +385,36 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig: # noinspection PyBroadException try: + # Build the docker config for the dependencies. For now the dependency blob values + # aren't set for the change key going to kubernetes because everything about + # the dependency config should be captured in change key that the function generates + # internally. A change key is set for the service deployment as that includes + # things like the submission params + dependency_config: dict[str, Any] = {} + dependency_blobs: dict[str, str] = {} + for _n, dependency in service.dependencies.items(): + dependency.container = prepare_container(dependency.container) + dependency_config[_n] = dependency + dependency_blobs[_n] = str(dependency) + if service.enabled and (stage == ServiceStage.Off or name not in self.profiles): + # Move to the next service stage (do this first because the container we are starting may care) + if service.update_config and service.update_config.wait_for_update: + self._service_stage_hash.set(name, ServiceStage.Update) + else: + self._service_stage_hash.set(name, ServiceStage.Running) + # Enable this service's dependencies before trying to launch the service containers self.controller.prepare_network(service.name, service.docker_config.allow_internet_access) - for _n, dependency in service.dependencies.items(): - dependency.container = prepare_container(dependency.container) + for _n, dependency in dependency_config.items(): self.controller.start_stateful_container( service_name=service.name, container_name=_n, spec=dependency, - labels={'dependency_for': service.name} + labels={'dependency_for': service.name}, + change_key='' ) - # Move to the next service stage - if service.update_config and service.update_config.wait_for_update: - self._service_stage_hash.set(name, ServiceStage.Update) - else: - self._service_stage_hash.set(name, ServiceStage.Running) - if not service.enabled: self.stop_service(service.name, stage) return @@ -411,13 +431,6 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig: docker_config = prepare_container(service.docker_config) config_blob += str(docker_config) - # Build the docker config for the dependencies. 
- dependency_config = {} - for _n, dependency in service.dependencies.items(): - dependency.container = prepare_container(dependency.container) - dependency_config[_n] = dependency - config_blob += str(sorted(dependency_config.items())) - # Add the service to the list of services being scaled with self.profiles_lock: if name not in self.profiles: @@ -428,6 +441,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig: growth=default_settings.growth, shrink=default_settings.shrink, config_blob=config_blob, + dependency_blobs=dependency_blobs, backlog=default_settings.backlog, max_instances=service.licence_count, container_config=docker_config, @@ -439,24 +453,22 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig: # Update RAM, CPU, licence requirements for running services else: profile = self.profiles[name] - if service.licence_count == 0: - profile._max_instances = float('inf') - else: - profile._max_instances = service.licence_count + profile.set_max_instances(service.licence_count) - if profile.config_blob != config_blob: - self.log.info(f"Updating deployment information for {name}") - # Update the dependencies. Should do nothing if container spec is the same. - # let kubernetes decide if anything needs to change though. - for _n, dependency in dependency_config.items(): + for dependency_name, dependency_blob in dependency_blobs.items(): + if profile.dependency_blobs[dependency_name] != dependency_blob: + self.log.info(f"Updating deployment information for {name}/{dependency_name}") + profile.dependency_blobs[dependency_name] = dependency_blob self.controller.start_stateful_container( service_name=service.name, - container_name=_n, - spec=dependency, - labels={'dependency_for': service.name} + container_name=dependency_name, + spec=dependency_config[dependency_name], + labels={'dependency_for': service.name}, + change_key='' ) - # Update the service itself + if profile.config_blob != config_blob: + self.log.info(f"Updating deployment information for {name}") profile.container_config = docker_config profile.config_blob = config_blob self.controller.restart(profile) diff --git a/assemblyline_core/server_base.py b/assemblyline_core/server_base.py index 0fe7cab3..439bf6f1 100644 --- a/assemblyline_core/server_base.py +++ b/assemblyline_core/server_base.py @@ -181,7 +181,7 @@ class ServiceStage(enum.IntEnum): # If at any time a service is disabled, scaler will stop the dependent containers -def get_service_stage_hash(redis): +def get_service_stage_hash(redis) -> Hash[int]: """A hash from service name to ServiceStage enum values.""" return Hash('service-stage', redis) diff --git a/assemblyline_core/updater/run_updater.py b/assemblyline_core/updater/run_updater.py index 3b74b5c1..2aafdd65 100644 --- a/assemblyline_core/updater/run_updater.py +++ b/assemblyline_core/updater/run_updater.py @@ -1,115 +1,44 @@ """ A process that manages tracking and running update commands for the AL services. 
- -TODO: - - docker build updates - - If the service update interval changes in datastore move the next update time - """ -from collections import defaultdict +from __future__ import annotations import os -import json import uuid -import random import sched -import shutil -import string -import tempfile import time -from contextlib import contextmanager -from threading import Thread -from typing import Dict +from typing import Any import docker -import yaml from assemblyline.common import isotime -from assemblyline.remote.datatypes.lock import Lock from kubernetes.client import V1Job, V1ObjectMeta, V1JobSpec, V1PodTemplateSpec, V1PodSpec, V1Volume, \ V1PersistentVolumeClaimVolumeSource, V1VolumeMount, V1EnvVar, V1Container, V1ResourceRequirements, \ V1ConfigMapVolumeSource, V1Secret, V1LocalObjectReference from kubernetes import client, config from kubernetes.client.rest import ApiException -from passlib.hash import bcrypt - -from assemblyline.common.isotime import now_as_iso -from assemblyline.common.security import get_random_password, get_password_hash -from assemblyline.datastore.helper import AssemblylineDatastore from assemblyline.odm.models.service import DockerConfig -from assemblyline.odm.models.user import User -from assemblyline.odm.models.user_settings import UserSettings from assemblyline.remote.datatypes.hash import Hash from assemblyline_core.scaler.controllers.kubernetes_ctl import create_docker_auth_config -from assemblyline_core.server_base import CoreBase, ServiceStage +from assemblyline_core.server_base import CoreBase from assemblyline_core.updater.helper import get_latest_tag_for_service -SERVICE_SYNC_INTERVAL = 30 # How many seconds between checking for new services, or changes in service status UPDATE_CHECK_INTERVAL = 60 # How many seconds per check for outstanding updates CONTAINER_CHECK_INTERVAL = 60 * 5 # How many seconds to wait for checking for new service versions API_TIMEOUT = 90 HEARTBEAT_INTERVAL = 5 -UPDATE_STAGES = [ServiceStage.Update, ServiceStage.Running] - -# Where to find the update directory inside this container. -FILE_UPDATE_DIRECTORY = os.environ.get('FILE_UPDATE_DIRECTORY', None) -# How to identify the update volume as a whole, in a way that the underlying container system recognizes. 
-FILE_UPDATE_VOLUME = os.environ.get('FILE_UPDATE_VOLUME', FILE_UPDATE_DIRECTORY) # How many past updates to keep for file based updates -UPDATE_FOLDER_LIMIT = 5 NAMESPACE = os.getenv('NAMESPACE', None) -UI_SERVER = os.getenv('UI_SERVER', 'https://nginx') INHERITED_VARIABLES = ['HTTP_PROXY', 'HTTPS_PROXY', 'NO_PROXY', 'http_proxy', 'https_proxy', 'no_proxy'] CLASSIFICATION_HOST_PATH = os.getenv('CLASSIFICATION_HOST_PATH', None) CLASSIFICATION_CONFIGMAP = os.getenv('CLASSIFICATION_CONFIGMAP', None) CLASSIFICATION_CONFIGMAP_KEY = os.getenv('CLASSIFICATION_CONFIGMAP_KEY', 'classification.yml') -@contextmanager -def temporary_api_key(ds: AssemblylineDatastore, user_name: str, permissions=('R', 'W')): - """Creates a context where a temporary API key is available.""" - with Lock(f'user-{user_name}', timeout=10): - name = ''.join(random.choices(string.ascii_lowercase, k=20)) - random_pass = get_random_password(length=48) - user = ds.user.get(user_name) - user.apikeys[name] = { - "password": bcrypt.hash(random_pass), - "acl": permissions - } - ds.user.save(user_name, user) - - try: - yield f"{name}:{random_pass}" - finally: - with Lock(f'user-{user_name}', timeout=10): - user = ds.user.get(user_name) - user.apikeys.pop(name) - ds.user.save(user_name, user) - - -def chmod(directory, mask): - try: - os.chmod(directory, mask) - except PermissionError as error: - # If we are using an azure file share, we may not be able to change the permissions - # on files, everything might be set to a pre defined permission level by the file share - if "Operation not permitted" in str(error): - return - raise - - class DockerUpdateInterface: - """Wrap docker interface for the commands used in the update process. - - Volumes used for file updating on the docker interface are simply a host directory that gets - mounted at different depths and locations in each container. - - FILE_UPDATE_VOLUME gives us the path of the directory on the host, so that we can mount - it properly on new containers we launch. FILE_UPDATE_DIRECTORY gives us the path - that it is mounted at in the update manager container. - """ + """Wrap docker interface for the commands used in the update process.""" def __init__(self, log_level="INFO"): self.client = docker.from_env() @@ -439,28 +368,8 @@ def __init__(self, redis_persist=None, redis=None, logger=None, datastore=None): super().__init__('assemblyline.service.updater', logger=logger, datastore=datastore, redis_persist=redis_persist, redis=redis) - if not FILE_UPDATE_DIRECTORY: - raise RuntimeError("The updater process must be run within the orchestration environment, " - "the update volume must be mounted, and the path to the volume must be " - "set in the environment variable FILE_UPDATE_DIRECTORY. Setting " - "FILE_UPDATE_DIRECTORY directly may be done for testing.") - - # The directory where we want working temporary directories to be created. - # Building our temporary directories in the persistent update volume may - # have some performance down sides, but may help us run into fewer docker FS overlay - # cleanup issues. Try to flush it out every time we start. This service should - # be a singleton anyway. 
- - ################################# DELETE FOR PSU CHANGE######################### - # self.temporary_directory = os.path.join(FILE_UPDATE_DIRECTORY, '.tmp') - # shutil.rmtree(self.temporary_directory, ignore_errors=True) - # os.makedirs(self.temporary_directory) - ################################# DELETE FOR PSU CHANGE######################### - - self.container_update = Hash('container-update', self.redis_persist) - self.services = Hash('service-updates', self.redis_persist) - self.latest_service_tags = Hash('service-tags', self.redis_persist) - self.running_updates: Dict[str, Thread] = {} + self.container_update: Hash[dict[str, Any]] = Hash('container-update', self.redis_persist) + self.latest_service_tags: Hash[dict[str, str]] = Hash('service-tags', self.redis_persist) # Prepare a single threaded scheduler self.scheduler = sched.scheduler() @@ -477,67 +386,6 @@ def __init__(self, redis_persist=None, redis=None, logger=None, datastore=None): else: self.controller = DockerUpdateInterface(log_level=self.config.logging.log_level) - def sync_services(self): - """Download the service list and make sure our settings are up to date""" - self.scheduler.enter(SERVICE_SYNC_INTERVAL, 0, self.sync_services) - existing_services = (set(self.services.keys()) | - set(self.container_update.keys()) | - set(self.latest_service_tags.keys())) - discovered_services = [] - - # Get all the service data - for service in self.datastore.list_all_services(full=True): - discovered_services.append(service.name) - - # Ensure that any disabled services are not being updated - if not service.enabled and self.services.exists(service.name): - self.log.info(f"Service updates disabled for {service.name}") - self.services.pop(service.name) - - if not service.enabled: - continue - - # Ensure that any enabled services with an update config are being updated - stage = self.get_service_stage(service.name) - record = self.services.get(service.name) - - if stage in UPDATE_STAGES and service.update_config: - # Stringify and hash the the current update configuration - config_hash = hash(json.dumps(service.update_config.as_primitives())) - - # If we can update, but there is no record, create one - if not record: - self.log.info(f"Service updates enabled for {service.name}") - self.services.add( - service.name, - dict( - next_update=now_as_iso(), - previous_update=now_as_iso(-10**10), - config_hash=config_hash, - sha256=None, - ) - ) - else: - # If there is a record, check that its configuration hash is still good - # If an update is in progress, it may overwrite this, but we will just come back - # and reapply this again in the iteration after that - if record.get('config_hash', None) != config_hash: - record['next_update'] = now_as_iso() - record['config_hash'] = config_hash - self.services.set(service.name, record) - - if stage == ServiceStage.Update: - if (record and record.get('sha256', None) is not None) or not service.update_config: - self._service_stage_hash.set(service.name, ServiceStage.Running) - - # Remove services we have locally or in redis that have been deleted from the database - for stray_service in existing_services - set(discovered_services): - self.log.info(f"Service updates disabled for {stray_service}") - self.services.pop(stray_service) - self._service_stage_hash.pop(stray_service) - self.container_update.pop(stray_service) - self.latest_service_tags.pop(stray_service) - def container_updates(self): """Go through the list of services and check what are the latest tags for it""" 
self.scheduler.enter(UPDATE_CHECK_INTERVAL, 0, self.container_updates) @@ -601,8 +449,11 @@ def container_updates(self): def container_versions(self): """Go through the list of services and check what are the latest tags for it""" self.scheduler.enter(CONTAINER_CHECK_INTERVAL, 0, self.container_versions) + existing_services = set(self.container_update.keys()) | set(self.latest_service_tags.keys()) + discovered_services: list[str] = [] for service in self.datastore.list_all_services(full=True): + discovered_services.append(service.name) if not service.enabled: continue @@ -611,11 +462,16 @@ def container_versions(self): self.latest_service_tags.set(service.name, {'auth': auth, 'image': image_name, service.update_channel: tag_name}) + # Remove services we have locally or in redis that have been deleted from the database + for stray_service in existing_services - set(discovered_services): + self.log.info(f"Service updates disabled for {stray_service}") + self._service_stage_hash.pop(stray_service) + self.container_update.pop(stray_service) + self.latest_service_tags.pop(stray_service) + def try_run(self): """Run the scheduler loop until told to stop.""" # Do an initial call to the main methods, who will then be registered with the scheduler - self.sync_services() - # self.update_services() self.container_versions() self.container_updates() self.heartbeat() @@ -623,9 +479,10 @@ def try_run(self): # Run as long as we need to while self.running: delay = self.scheduler.run(False) - time.sleep(min(delay, 0.1)) + if delay: + time.sleep(min(delay, 0.1)) - def heartbeat(self): + def heartbeat(self, timestamp: int = None): """Periodically touch a file on disk. Since tasks are run serially, the delay between touches will be the maximum of @@ -633,191 +490,7 @@ def heartbeat(self): """ if self.config.logging.heartbeat_file: self.scheduler.enter(HEARTBEAT_INTERVAL, 0, self.heartbeat) - super().heartbeat() - - def update_services(self): - """Check if we need to update any services. - - Spin off a thread to actually perform any updates. Don't allow multiple threads per service. 
- """ - self.scheduler.enter(UPDATE_CHECK_INTERVAL, 0, self.update_services) - - # Check for finished update threads - self.running_updates = {name: thread for name, thread in self.running_updates.items() if thread.is_alive()} - - # Check if its time to try to update the service - for service_name, data in self.services.items().items(): - if data['next_update'] <= now_as_iso() and service_name not in self.running_updates: - self.log.info(f"Time to update {service_name}") - self.running_updates[service_name] = Thread( - target=self.run_update, - kwargs=dict(service_name=service_name) - ) - self.running_updates[service_name].start() - - def run_update(self, service_name): - """Common setup and tear down for all update types.""" - # noinspection PyBroadException - try: - # Check for new update with service specified update method - service = self.datastore.get_service_with_delta(service_name) - update_method = service.update_config.method - update_data = self.services.get(service_name) - update_hash = None - - try: - # Actually run the update method - if update_method == 'run': - update_hash = self.do_file_update( - service=service, - previous_hash=update_data['sha256'], - previous_update=update_data['previous_update'] - ) - elif update_method == 'build': - update_hash = self.do_build_update() - - # If we have performed an update, write that data - if update_hash is not None and update_hash != update_data['sha256']: - update_data['sha256'] = update_hash - update_data['previous_update'] = now_as_iso() - else: - update_hash = None - - finally: - # Update the next service update check time, don't update the config_hash, - # as we don't want to disrupt being re-run if our config has changed during this run - update_data['next_update'] = now_as_iso(service.update_config.update_interval_seconds) - self.services.set(service_name, update_data) - - if update_hash: - self.log.info(f"New update applied for {service_name}. 
Restarting service.") - self.controller.restart(service_name=service_name) - - except BaseException: - self.log.exception("An error occurred while running an update for: " + service_name) - - def do_build_update(self): - """Update a service by building a new container to run.""" - raise NotImplementedError() - - def do_file_update(self, service, previous_hash, previous_update): - """Update a service by running a container to get new files.""" - temp_directory = tempfile.mkdtemp(dir=self.temporary_directory) - chmod(temp_directory, 0o777) - input_directory = os.path.join(temp_directory, 'input_directory') - output_directory = os.path.join(temp_directory, 'output_directory') - service_dir = os.path.join(FILE_UPDATE_DIRECTORY, service.name) - image_variables = defaultdict(str) - image_variables.update(self.config.services.image_variables) - - try: - # Use chmod directly to avoid effects of umask - os.makedirs(input_directory) - chmod(input_directory, 0o755) - os.makedirs(output_directory) - chmod(output_directory, 0o777) - - username = self.ensure_service_account() - - with temporary_api_key(self.datastore, username) as api_key: - - # Write out the parameters we want to pass to the update container - with open(os.path.join(input_directory, 'config.yaml'), 'w') as fh: - yaml.safe_dump({ - 'previous_update': previous_update, - 'previous_hash': previous_hash, - 'sources': [x.as_primitives() for x in service.update_config.sources], - 'api_user': username, - 'api_key': api_key, - 'ui_server': UI_SERVER - }, fh) - - # Run the update container - run_options = service.update_config.run_options - run_options.image = string.Template(run_options.image).safe_substitute(image_variables) - self.controller.launch( - name=service.name, - docker_config=run_options, - mounts=[ - { - 'volume': FILE_UPDATE_VOLUME, - 'source_path': os.path.relpath(temp_directory, start=FILE_UPDATE_DIRECTORY), - 'dest_path': '/mount/' - }, - ], - env={ - 'UPDATE_CONFIGURATION_PATH': '/mount/input_directory/config.yaml', - 'UPDATE_OUTPUT_PATH': '/mount/output_directory/' - }, - network=f'service-net-{service.name}', - blocking=True, - ) - - # Read out the results from the output container - results_meta_file = os.path.join(output_directory, 'response.yaml') - - if not os.path.exists(results_meta_file) or not os.path.isfile(results_meta_file): - self.log.warning(f"Update produced no output for {service.name}") - return None - - with open(results_meta_file) as rf: - results_meta = yaml.safe_load(rf) - update_hash = results_meta.get('hash', None) - - # Erase the results meta file - os.unlink(results_meta_file) - - # Get a timestamp for now, and switch it to basic format representation of time - # Still valid iso 8601, and : is sometimes a restricted character - timestamp = now_as_iso().replace(":", "") - - # FILE_UPDATE_DIRECTORY/{service_name} is the directory mounted to the service, - # the service sees multiple directories in that directory, each with a timestamp - destination_dir = os.path.join(service_dir, service.name + '_' + timestamp) - shutil.move(output_directory, destination_dir) - - # Remove older update files, due to the naming scheme, older ones will sort first lexically - existing_folders = [] - for folder_name in os.listdir(service_dir): - folder_path = os.path.join(service_dir, folder_name) - if os.path.isdir(folder_path) and folder_name.startswith(service.name): - existing_folders.append(folder_name) - existing_folders.sort() - - self.log.info(f'There are {len(existing_folders)} update folders for {service.name} in 
cache.') - if len(existing_folders) > UPDATE_FOLDER_LIMIT: - extra_count = len(existing_folders) - UPDATE_FOLDER_LIMIT - self.log.info(f'We will only keep {UPDATE_FOLDER_LIMIT} updates, deleting {extra_count}.') - for extra_folder in existing_folders[:extra_count]: - # noinspection PyBroadException - try: - shutil.rmtree(os.path.join(service_dir, extra_folder)) - except Exception: - self.log.exception('Failed to delete update folder') - - return update_hash - finally: - # If the working directory is still there for any reason erase it - shutil.rmtree(temp_directory, ignore_errors=True) - - def ensure_service_account(self): - """Check that the update service account exists, if it doesn't, create it.""" - uname = 'update_service_account' - - if self.datastore.user.get_if_exists(uname): - return uname - - user_data = User({ - "agrees_with_tos": "NOW", - "classification": "RESTRICTED", - "name": "Update Account", - "password": get_password_hash(''.join(random.choices(string.ascii_letters, k=20))), - "uname": uname, - "type": ["signature_importer"] - }) - self.datastore.user.save(uname, user_data) - self.datastore.user_settings.save(uname, UserSettings()) - return uname + super().heartbeat(timestamp) if __name__ == '__main__': diff --git a/assemblyline_core/updater/url_update.py b/assemblyline_core/updater/url_update.py deleted file mode 100644 index 85f0687c..00000000 --- a/assemblyline_core/updater/url_update.py +++ /dev/null @@ -1,176 +0,0 @@ -import json -import logging -import os -import shutil -import time - -from copy import deepcopy - -import certifi -import requests -import yaml - -from assemblyline.common import log as al_log -from assemblyline.common.digests import get_sha256_for_file -from assemblyline.common.isotime import iso_to_epoch - -al_log.init_logging('service_updater') - -LOGGER = logging.getLogger('assemblyline.updater.service') - - -UPDATE_CONFIGURATION_PATH = os.environ.get('UPDATE_CONFIGURATION_PATH', None) -UPDATE_OUTPUT_PATH = os.environ.get('UPDATE_OUTPUT_PATH', "/tmp/updater_output") - - -def test_file(_): - return True - - -def url_update(test_func=test_file) -> None: - """ - Using an update configuration file as an input, which contains a list of sources, download all the file(s) which - have been modified since the last update. 
- """ - update_config = {} - # Load configuration - if UPDATE_CONFIGURATION_PATH and os.path.exists(UPDATE_CONFIGURATION_PATH): - with open(UPDATE_CONFIGURATION_PATH, 'r') as yml_fh: - update_config = yaml.safe_load(yml_fh) - else: - LOGGER.warning("Could not find update configuration file.") - exit(1) - - # Cleanup output path - if os.path.exists(UPDATE_OUTPUT_PATH): - if os.path.isdir(UPDATE_OUTPUT_PATH): - shutil.rmtree(UPDATE_OUTPUT_PATH) - else: - os.unlink(UPDATE_OUTPUT_PATH) - os.makedirs(UPDATE_OUTPUT_PATH) - - # Get sources - sources = update_config.get('sources', None) - # Exit if no update sources given - if not sources: - exit() - - # Parse updater configuration - previous_update = update_config.get('previous_update', None) - previous_hash = update_config.get('previous_hash', None) or {} - if previous_hash: - previous_hash = json.loads(previous_hash) - if isinstance(previous_update, str): - previous_update = iso_to_epoch(previous_update) - - # Create a requests session - session = requests.Session() - - files_sha256 = {} - - # Go through each source and download file - for source in sources: - uri = source['uri'] - name = source['name'] - - if not uri or not name: - LOGGER.warning(f"Invalid source: {source}") - continue - - LOGGER.info(f"Downloading file '{name}' from uri '{uri}' ...") - - username = source.get('username', None) - password = source.get('password', None) - auth = (username, password) if username and password else None - ca_cert = source.get('ca_cert', None) - ignore_ssl_errors = source.get('ssl_ignore_errors', False) - - headers = source.get('headers', None) - - if ca_cert: - # Add certificate to requests - cafile = certifi.where() - with open(cafile, 'a') as ca_editor: - ca_editor.write(f"\n{ca_cert}") - - session.verify = not ignore_ssl_errors - - try: - # Check the response header for the last modified date - response = session.head(uri, auth=auth, headers=headers) - last_modified = response.headers.get('Last-Modified', None) - if last_modified: - # Convert the last modified time to epoch - last_modified = time.mktime(time.strptime(last_modified, "%a, %d %b %Y %H:%M:%S %Z")) - - # Compare the last modified time with the last updated time - if update_config.get('previous_update', None) and last_modified <= previous_update: - # File has not been modified since last update, do nothing - LOGGER.info("File has not changed since last time, Skipping...") - continue - - if update_config.get('previous_update', None): - previous_update = time.strftime("%a, %d %b %Y %H:%M:%S %Z", time.gmtime(previous_update)) - if headers: - headers['If-Modified-Since'] = previous_update - else: - headers = { - 'If-Modified-Since': previous_update, - } - - response = session.get(uri, auth=auth, headers=headers) - - # Check the response code - if response.status_code == requests.codes['not_modified']: - # File has not been modified since last update, do nothing - LOGGER.info("File has not changed since last time, Skipping...") - continue - elif response.ok: - file_path = os.path.join(UPDATE_OUTPUT_PATH, name) - with open(file_path, 'wb') as f: - f.write(response.content) - - if not test_func(file_path): - os.unlink(file_path) - LOGGER.warning(f"The downloaded file was invalid. It will not be part of this update...") - continue - - # Append the SHA256 of the file to a list of downloaded files - sha256 = get_sha256_for_file(file_path) - if previous_hash.get(name, None) != sha256: - files_sha256[name] = sha256 - else: - LOGGER.info("File as the same hash as last time. 
Skipping...") - - LOGGER.info("File successfully downloaded!") - except requests.Timeout: - LOGGER.warning(f"Cannot find the file for source {name} with url {uri} - (Timeout)") - continue - except Exception as e: - # Catch all other types of exceptions such as ConnectionError, ProxyError, etc. - LOGGER.warning(f"Source {name} failed with error: {str(e)}") - - if files_sha256: - new_hash = deepcopy(previous_hash) - new_hash.update(files_sha256) - - # Check if the new update hash matches the previous update hash - if new_hash == previous_hash: - # Update file(s) not changed, delete the downloaded files and exit - shutil.rmtree(UPDATE_OUTPUT_PATH, ignore_errors=True) - exit() - - # Create the response yaml - with open(os.path.join(UPDATE_OUTPUT_PATH, 'response.yaml'), 'w') as yml_fh: - yaml.safe_dump(dict( - hash=json.dumps(new_hash), - ), yml_fh) - - LOGGER.info("Service update file(s) successfully downloaded") - - # Close the requests session - session.close() - - -if __name__ == '__main__': - url_update() diff --git a/test/test_simulation.py b/test/test_simulation.py index d8b9bd20..95996ee8 100644 --- a/test/test_simulation.py +++ b/test/test_simulation.py @@ -3,7 +3,7 @@ Needs the datastore and filestore to be running, otherwise these test are stand alone. """ - +from __future__ import annotations import hashlib import json import typing @@ -11,7 +11,7 @@ import threading import logging from tempfile import NamedTemporaryFile -from typing import List +from typing import TYPE_CHECKING, Any import pytest @@ -39,11 +39,14 @@ from mocking import MockCollection from test_scheduler import dummy_service +if TYPE_CHECKING: + from redis import Redis + RESPONSE_TIMEOUT = 60 @pytest.fixture(scope='module') -def redis(redis_connection): +def redis(redis_connection: Redis[Any]): redis_connection.flushdb() yield redis_connection redis_connection.flushdb() @@ -85,9 +88,13 @@ def try_run(self): if instructions.get('hold', False): queue = get_service_queue(self.service_name, self.dispatch_client.redis) queue.push(0, task.as_primitives()) + self.log.info(f"{self.service_name} Requeued task to {queue.name} holding for {instructions['hold']}") _global_semaphore.acquire(blocking=True, timeout=instructions['hold']) continue + if instructions.get('lock', False): + _global_semaphore.acquire(blocking=True, timeout=instructions['lock']) + if 'drop' in instructions: if instructions['drop'] >= hits: self.drops[task.fileinfo.sha256] = self.drops.get(task.fileinfo.sha256, 0) + 1 @@ -208,8 +215,8 @@ def core(request, redis, filestore, config): # Block logs from being initialized, it breaks under pytest if you create new stream handlers from assemblyline.common import log as al_log al_log.init_logging = lambda *args: None - dispatcher.TIMEOUT_EXTRA_TIME = 0 - dispatcher.TIMEOUT_TEST_INTERVAL = 1 + dispatcher.TIMEOUT_EXTRA_TIME = 1 + dispatcher.TIMEOUT_TEST_INTERVAL = 3 # al_log.init_logging("simulation") ds = forge.get_datastore() @@ -224,7 +231,7 @@ def core(request, redis, filestore, config): threads = [] fields.filestore = filestore - threads: List[ServerBase] = [ + threads: list[ServerBase] = [ # Start the ingester components ingester, @@ -310,9 +317,14 @@ def ready_extract(core, children): def test_deduplication(core, metrics): + global _global_semaphore # ------------------------------------------------------------------------------- # Submit two identical jobs, check that they get deduped by ingester - sha, size = ready_body(core) + sha, size = ready_body(core, { + 'pre': {'lock': 60} + }) + + _global_semaphore 
= threading.Semaphore(value=0) for _ in range(2): core.ingest_queue.push(SubmissionInput(dict( @@ -334,6 +346,9 @@ def test_deduplication(core, metrics): )] )).as_primitives()) + metrics.expect('ingester', 'duplicates', 1) + _global_semaphore.release() + notification_queue = NamedQueue('nq-output-queue-one', core.redis) first_task = notification_queue.pop(timeout=RESPONSE_TIMEOUT) @@ -373,6 +388,7 @@ def test_deduplication(core, metrics): name='abc123' )] )).as_primitives()) + _global_semaphore.release() notification_queue = NamedQueue('nq-2', core.redis) third_task = notification_queue.pop(timeout=RESPONSE_TIMEOUT) @@ -389,7 +405,6 @@ def test_deduplication(core, metrics): metrics.expect('ingester', 'submissions_ingested', 3) metrics.expect('ingester', 'submissions_completed', 2) metrics.expect('ingester', 'files_completed', 2) - metrics.expect('ingester', 'duplicates', 1) metrics.expect('dispatcher', 'submissions_completed', 2) metrics.expect('dispatcher', 'files_completed', 2) @@ -958,7 +973,11 @@ def test_plumber_clearing(core, metrics): metrics.expect('ingester', 'submissions_ingested', 1) service_queue = get_service_queue('pre', core.redis) - while service_queue.length() != 1: + + start = time.time() + while service_queue.length() < 1: + if time.time() - start > RESPONSE_TIMEOUT: + pytest.fail(f'Found { service_queue.length()}') time.sleep(0.1) service_delta = core.ds.service_delta.get('pre') diff --git a/test/test_updater.py b/test/test_updater.py index a8dfdd3d..31ddbc84 100644 --- a/test/test_updater.py +++ b/test/test_updater.py @@ -32,35 +32,35 @@ def updater(clean_redis: redis.Redis, ds, updater_directory): return run_updater.ServiceUpdater(redis_persist=clean_redis, redis=clean_redis, datastore=ds) -def test_service_changes(updater: run_updater.ServiceUpdater): - ds: MockDatastore = updater.datastore.ds - # Base conditions, nothing anywhere - assert updater.services.length() == 0 - assert len(updater.datastore.list_all_services()) == 0 +# def test_service_changes(updater: run_updater.ServiceUpdater): +# ds: MockDatastore = updater.datastore.ds +# # Base conditions, nothing anywhere +# assert updater.services.length() == 0 +# assert len(updater.datastore.list_all_services()) == 0 - # Nothing does nothing - updater.sync_services() - assert updater.services.length() == 0 - assert len(updater.datastore.list_all_services()) == 0 +# # Nothing does nothing +# updater.sync_services() +# assert updater.services.length() == 0 +# assert len(updater.datastore.list_all_services()) == 0 - # Any non-disabled services should be picked up by the updater - create_services(updater.datastore, limit=1) - for data in ds._collections['service']._docs.values(): - data.enabled = True - updater._service_stage_hash.set(data.name, ServiceStage.Update) - data.update_config = random_model_obj(UpdateConfig) - assert len(updater.datastore.list_all_services(full=True)) == 1 - updater.sync_services() - assert updater.services.length() == 1 - assert len(updater.datastore.list_all_services(full=True)) == 1 +# # Any non-disabled services should be picked up by the updater +# create_services(updater.datastore, limit=1) +# for data in ds._collections['service']._docs.values(): +# data.enabled = True +# updater._service_stage_hash.set(data.name, ServiceStage.Update) +# data.update_config = random_model_obj(UpdateConfig) +# assert len(updater.datastore.list_all_services(full=True)) == 1 +# updater.sync_services() +# assert updater.services.length() == 1 +# assert len(updater.datastore.list_all_services(full=True)) == 
1 - # It should be scheduled to update ASAP - for data in updater.services.items().values(): - assert data['next_update'] <= now_as_iso() +# # It should be scheduled to update ASAP +# for data in updater.services.items().values(): +# assert data['next_update'] <= now_as_iso() - # Disable the service and it will disappear from redis - for data in ds._collections['service']._docs.values(): - data.enabled = False - updater.sync_services() - assert updater.services.length() == 0 - assert len(updater.datastore.list_all_services(full=True)) == 1 +# # Disable the service and it will disappear from redis +# for data in ds._collections['service']._docs.values(): +# data.enabled = False +# updater.sync_services() +# assert updater.services.length() == 0 +# assert len(updater.datastore.list_all_services(full=True)) == 1
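A minimal sketch of the change-key pattern that kubernetes_ctl._create_deployment adopts in this patch, assuming a kubernetes.client.AppsV1Api handle and a namespace string; the helper names build_change_key and deployment_needs_update are illustrative only and not part of the patch:

from kubernetes import client
from kubernetes.client.rest import ApiException

CHANGE_KEY_NAME = 'al_change_key'


def build_change_key(deployment_name, extra_key, docker_config, shutdown_seconds,
                     labels, volumes, mounts, core_mounts) -> str:
    # Serialize everything that defines the desired deployment into one string.
    # Sorting the label items keeps the key stable regardless of dict ordering.
    return (deployment_name + extra_key + str(docker_config) + str(shutdown_seconds) +
            str(sorted((labels or {}).items())) + str(volumes) + str(mounts) + str(core_mounts))


def deployment_needs_update(apps_api: client.AppsV1Api, namespace: str,
                            deployment_name: str, change_key: str) -> bool:
    """Return False when an existing deployment already carries the same change key."""
    try:
        existing = apps_api.read_namespaced_deployment(deployment_name, namespace=namespace)
    except ApiException as error:
        if error.status == 404:
            return True  # nothing deployed yet, so it has to be created
        raise
    annotations = existing.metadata.annotations or {}
    # Matching keys mean the spec we would write is identical; only the replica count
    # may differ, and that can be adjusted through the scale subresource instead of a redeploy.
    return annotations.get(CHANGE_KEY_NAME) != change_key

Keying off an annotation written at deploy time, rather than diffing the live spec, sidesteps the issue noted in the patch comment: different Kubernetes servers normalize and default fields differently, so field-by-field comparisons tend to report spurious changes and trigger unnecessary pod restarts.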