diff --git a/assemblyline_core/dispatching/dispatcher.py b/assemblyline_core/dispatching/dispatcher.py
index 25168458..47877f09 100644
--- a/assemblyline_core/dispatching/dispatcher.py
+++ b/assemblyline_core/dispatching/dispatcher.py
@@ -1,3 +1,4 @@
+from __future__ import annotations
 import uuid
 import os
 import threading
@@ -57,9 +58,9 @@ class Action(enum.IntEnum):
 class DispatchAction:
     kind: Action
     sid: str = dataclasses.field(compare=False)
-    sha: str = dataclasses.field(compare=False, default=None)
-    service_name: str = dataclasses.field(compare=False, default=None)
-    worker_id: str = dataclasses.field(compare=False, default=None)
+    sha: Optional[str] = dataclasses.field(compare=False, default=None)
+    service_name: Optional[str] = dataclasses.field(compare=False, default=None)
+    worker_id: Optional[str] = dataclasses.field(compare=False, default=None)
     data: Any = dataclasses.field(compare=False, default=None)
 
 
@@ -103,7 +104,7 @@ def __init__(self, submission, completed_queue):
 
         self.dropped_files = set()
         self.service_results: Dict[Tuple[str, str], ResultSummary] = {}
-        self.service_errors: Dict[Tuple[str, str], dict] = {}
+        self.service_errors: Dict[Tuple[str, str], str] = {}
        self.service_attempts: Dict[Tuple[str, str], int] = defaultdict(int)
         self.queue_keys: Dict[Tuple[str, str], bytes] = {}
         self.running_services: Set[Tuple[str, str]] = set()
@@ -198,8 +199,8 @@ def __init__(self, datastore=None, redis=None, redis_persist=None, logger=None,
             self.apm_client = elasticapm.Client(server_url=self.config.core.metrics.apm_server.server_url,
                                                 service_name="dispatcher")
 
-        self._service_timeouts = TimeoutTable()
-        self._submission_timeouts = TimeoutTable()
+        self._service_timeouts: TimeoutTable[Tuple[str, str, str], str] = TimeoutTable()
+        self._submission_timeouts: TimeoutTable[str, None] = TimeoutTable()
 
         # Setup queues for work to be divided into
         self.process_queues: List[PriorityQueue[DispatchAction]] = [PriorityQueue() for _ in range(RESULT_THREADS)]
@@ -404,7 +405,7 @@ def dispatch_file(self, task: SubmissionTask, sha256: str) -> bool:
 
         # Go through each round of the schedule removing complete/failed services
         # Break when we find a stage that still needs processing
-        outstanding = {}
+        outstanding: dict[str, Service] = {}
         started_stages = []
         with elasticapm.capture_span('check_result_table'):
             while schedule and not outstanding:
@@ -1156,8 +1157,8 @@ def handle_commands(self):
 
             command = DispatcherCommandMessage(message)
             if command.kind == CREATE_WATCH:
-                payload: CreateWatch = command.payload()
-                self.setup_watch_queue(payload.submission, payload.queue_name)
+                watch_payload: CreateWatch = command.payload()
+                self.setup_watch_queue(watch_payload.submission, watch_payload.queue_name)
             elif command.kind == LIST_OUTSTANDING:
                 payload: ListOutstanding = command.payload()
                 self.list_outstanding(payload.submission, payload.response_queue)
@@ -1192,7 +1193,7 @@ def setup_watch_queue(self, sid, queue_name):
     @elasticapm.capture_span(span_type='dispatcher')
     def list_outstanding(self, sid: str, queue_name: str):
         response_queue = NamedQueue(queue_name, host=self.redis)
-        outstanding = defaultdict(int)
+        outstanding: defaultdict[str, int] = defaultdict(int)
         task = self.tasks.get(sid)
         if task:
             for sha, service_name in list(task.queue_keys.keys()):
@@ -1290,7 +1291,7 @@ def timeout_backstop(self):
 
     def recover_submission(self, sid: str, message: str) -> bool:
         # Make sure we can load the submission body
-        submission: Submission = self.datastore.submission.get_if_exists(sid)
+        submission: Optional[Submission] = self.datastore.submission.get_if_exists(sid)
         if not submission:
             return False
         if submission.state != 'submitted':
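Note: DispatchAction relies on dataclasses' order=True together with compare=False fields, so the dispatcher's PriorityQueue orders actions by kind alone and the newly Optional fields never need to be comparable. A minimal sketch of that pattern (the Action members below are invented for illustration, not the dispatcher's real ones):

from __future__ import annotations
import dataclasses
import enum
from queue import PriorityQueue
from typing import Optional


class Action(enum.IntEnum):      # hypothetical members, for illustration only
    start = 0
    result = 1
    check_submission = 2


@dataclasses.dataclass(order=True)
class Item:
    kind: Action                 # the only field that participates in ordering
    sha: Optional[str] = dataclasses.field(compare=False, default=None)


work: PriorityQueue[Item] = PriorityQueue()
work.put(Item(Action.check_submission, sha="a" * 64))
work.put(Item(Action.start))
assert work.get().kind is Action.start   # lowest Action value is served first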
diff --git a/assemblyline_core/dispatching/schedules.py b/assemblyline_core/dispatching/schedules.py
index d64fb82f..f1cd8a6a 100644
--- a/assemblyline_core/dispatching/schedules.py
+++ b/assemblyline_core/dispatching/schedules.py
@@ -1,4 +1,5 @@
-from typing import List, Dict, cast
+from __future__ import annotations
+from typing import Dict, cast
 import logging
 import os
 
@@ -26,7 +27,7 @@ def __init__(self, datastore: AssemblylineDatastore, config: Config, redis):
         self.services = cast(Dict[str, Service], CachedObject(self._get_services))
         self.service_stage = get_service_stage_hash(redis)
 
-    def build_schedule(self, submission: Submission, file_type: str) -> List[Dict[str, Service]]:
+    def build_schedule(self, submission: Submission, file_type: str) -> list[dict[str, Service]]:
         all_services = dict(self.services)
 
         # Load the selected and excluded services by category
@@ -38,7 +39,7 @@ def build_schedule(self, submission: Submission, file_type: str) -> List[Dict[st
         selected = self.expand_categories(submission.params.services.selected)
 
         # Add all selected, accepted, and not rejected services to the schedule
-        schedule: List[Dict[str, Service]] = [{} for _ in self.config.services.stages]
+        schedule: list[dict[str, Service]] = [{} for _ in self.config.services.stages]
         services = list(set(selected) - set(excluded) - set(runtime_excluded))
         selected = []
         skipped = []
@@ -61,7 +62,7 @@ def build_schedule(self, submission: Submission, file_type: str) -> List[Dict[st
 
         return schedule
 
-    def expand_categories(self, services: List[str]) -> List[str]:
+    def expand_categories(self, services: list[str]) -> list[str]:
         """Expands the names of service categories found in the list of services.
 
         Args:
@@ -74,7 +75,7 @@ def expand_categories(self, services: List[str]) -> List[str]:
 
         categories = self.categories()
         found_services = []
-        seen_categories = set()
+        seen_categories: set[str] = set()
 
         while services:
             name = services.pop()
@@ -94,8 +95,8 @@ def expand_categories(self, services: List[str]) -> List[str]:
         # Use set to remove duplicates, set is more efficient in batches
         return list(set(found_services))
 
-    def categories(self) -> Dict[str, List[str]]:
-        all_categories = {}
+    def categories(self) -> Dict[str, list[str]]:
+        all_categories: dict[str, list[str]] = {}
         for service in self.services.values():
             try:
                 all_categories[service.category].append(service.name)
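For reference, the loop in expand_categories behaves roughly like the standalone rendering below (category and service names are made up): category names in the selection are swapped for their member services, each category is expanded at most once, and duplicates are dropped.

def expand_categories(services: list[str], categories: dict[str, list[str]]) -> list[str]:
    found: list[str] = []
    seen: set[str] = set()
    while services:
        name = services.pop()
        if name in categories:
            if name not in seen:
                seen.add(name)
                services.extend(categories[name])
        else:
            found.append(name)
    return list(set(found))      # set() mirrors the batch de-duplication above


print(expand_categories(["Extraction", "SampleService"],
                        {"Extraction": ["ServiceA", "ServiceB"]}))
# -> ['ServiceA', 'ServiceB', 'SampleService'], in arbitrary order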
""" +from __future__ import annotations import queue import time from queue import PriorityQueue from dataclasses import dataclass, field -from typing import TypeVar, Dict +from typing import TypeVar, Generic, Hashable -KeyType = TypeVar('KeyType') +KeyType = TypeVar('KeyType', bound=Hashable) DataType = TypeVar('DataType') @dataclass(order=True) -class TimeoutItem: +class TimeoutItem(Generic[KeyType, DataType]): expiry: float key: KeyType = field(compare=False) data: DataType = field(compare=False) -class TimeoutTable: +class TimeoutTable(Generic[KeyType, DataType]): def __init__(self): self.timeout_queue: PriorityQueue[TimeoutItem] = PriorityQueue() - self.event_data: Dict[KeyType, TimeoutItem] = {} + self.event_data: dict[KeyType, TimeoutItem] = {} def set(self, key: KeyType, timeout: float, data: DataType): # If a timeout is set repeatedly with the same key, only the last one will count @@ -37,7 +38,7 @@ def clear(self, key: KeyType): def __contains__(self, item): return item in self.event_data - def timeouts(self) -> Dict[KeyType, DataType]: + def timeouts(self) -> dict[KeyType, DataType]: found = {} try: now = time.time() diff --git a/assemblyline_core/ingester/ingester.py b/assemblyline_core/ingester/ingester.py index f22804aa..3560da5b 100644 --- a/assemblyline_core/ingester/ingester.py +++ b/assemblyline_core/ingester/ingester.py @@ -49,8 +49,8 @@ _retry_delay = 60 * 4 # Wait 4 minutes to retry _max_time = 2 * 24 * 60 * 60 # Wait 2 days for responses. HOUR_IN_SECONDS = 60 * 60 -INGEST_THREADS = environ.get('INGESTER_INGEST_THREADS', 1) -SUBMIT_THREADS = environ.get('INGESTER_SUBMIT_THREADS', 4) +INGEST_THREADS = int(environ.get('INGESTER_INGEST_THREADS', 1)) +SUBMIT_THREADS = int(environ.get('INGESTER_SUBMIT_THREADS', 4)) def must_drop(length: int, maximum: int) -> bool: @@ -79,11 +79,11 @@ def must_drop(length: int, maximum: int) -> bool: def determine_resubmit_selected(selected: List[str], resubmit_to: List[str]) -> Optional[List[str]]: resubmit_selected = None - selected = set(selected) - resubmit_to = set(resubmit_to) + _selected = set(selected) + _resubmit_to = set(resubmit_to) - if not selected.issuperset(resubmit_to): - resubmit_selected = sorted(selected.union(resubmit_to)) + if not _selected.issuperset(_resubmit_to): + resubmit_selected = sorted(_selected.union(_resubmit_to)) return resubmit_selected @@ -196,7 +196,7 @@ def __init__(self, datastore=None, logger=None, classification=None, redis=None, self.retry_queue = PriorityQueue('m-retry', self.redis_persist) # Internal, timeout watch queue - self.timeout_queue = PriorityQueue('m-timeout', self.redis) + self.timeout_queue: PriorityQueue[str] = PriorityQueue('m-timeout', self.redis) # Internal, queue for processing duplicates # When a duplicate file is detected (same cache key => same file, and same diff --git a/assemblyline_core/scaler/controllers/kubernetes_ctl.py b/assemblyline_core/scaler/controllers/kubernetes_ctl.py index 1d363c83..00158b00 100644 --- a/assemblyline_core/scaler/controllers/kubernetes_ctl.py +++ b/assemblyline_core/scaler/controllers/kubernetes_ctl.py @@ -10,9 +10,9 @@ from typing import Optional, Tuple import urllib3 -import kubernetes -from kubernetes import client, config -from kubernetes.client import ExtensionsV1beta1Deployment, ExtensionsV1beta1DeploymentSpec, V1PodTemplateSpec, \ + +from kubernetes import client, config, watch +from kubernetes.client import V1Deployment, V1DeploymentSpec, V1PodTemplateSpec, \ V1PodSpec, V1ObjectMeta, V1Volume, V1Container, V1VolumeMount, V1EnvVar, 
diff --git a/assemblyline_core/ingester/ingester.py b/assemblyline_core/ingester/ingester.py
index f22804aa..3560da5b 100644
--- a/assemblyline_core/ingester/ingester.py
+++ b/assemblyline_core/ingester/ingester.py
@@ -49,8 +49,8 @@
 _retry_delay = 60 * 4  # Wait 4 minutes to retry
 _max_time = 2 * 24 * 60 * 60  # Wait 2 days for responses.
 HOUR_IN_SECONDS = 60 * 60
-INGEST_THREADS = environ.get('INGESTER_INGEST_THREADS', 1)
-SUBMIT_THREADS = environ.get('INGESTER_SUBMIT_THREADS', 4)
+INGEST_THREADS = int(environ.get('INGESTER_INGEST_THREADS', 1))
+SUBMIT_THREADS = int(environ.get('INGESTER_SUBMIT_THREADS', 4))
 
 
 def must_drop(length: int, maximum: int) -> bool:
@@ -79,11 +79,11 @@ def must_drop(length: int, maximum: int) -> bool:
 
 def determine_resubmit_selected(selected: List[str], resubmit_to: List[str]) -> Optional[List[str]]:
     resubmit_selected = None
-    selected = set(selected)
-    resubmit_to = set(resubmit_to)
+    _selected = set(selected)
+    _resubmit_to = set(resubmit_to)
 
-    if not selected.issuperset(resubmit_to):
-        resubmit_selected = sorted(selected.union(resubmit_to))
+    if not _selected.issuperset(_resubmit_to):
+        resubmit_selected = sorted(_selected.union(_resubmit_to))
 
     return resubmit_selected
 
@@ -196,7 +196,7 @@ def __init__(self, datastore=None, logger=None, classification=None, redis=None,
         self.retry_queue = PriorityQueue('m-retry', self.redis_persist)
 
         # Internal, timeout watch queue
-        self.timeout_queue = PriorityQueue('m-timeout', self.redis)
+        self.timeout_queue: PriorityQueue[str] = PriorityQueue('m-timeout', self.redis)
 
         # Internal, queue for processing duplicates
         #   When a duplicate file is detected (same cache key => same file, and same
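A worked example of determine_resubmit_selected as patched (service names are placeholders): when the original selection already covers everything in resubmit_to the function returns None, otherwise it returns the sorted union.

from assemblyline_core.ingester.ingester import determine_resubmit_selected

print(determine_resubmit_selected(["ServiceA", "ServiceB"], ["ServiceB"]))
# -> None: the selection is already a superset, nothing to resubmit

print(determine_resubmit_selected(["ServiceA"], ["ServiceB", "ServiceC"]))
# -> ['ServiceA', 'ServiceB', 'ServiceC']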
""" - def __init__(self, name: str, container_config: DockerConfig, config_blob:str='', min_instances:int=0, max_instances:int=None, - growth: float = 600, shrink: Optional[float] = None, backlog:int=500, queue=None, shutdown_seconds:int=30, - dependency_blobs:dict[str, str]=None): + def __init__(self, name: str, container_config: DockerConfig, config_blob: str = '', + min_instances: int = 0, max_instances: int = None, growth: float = 600, + shrink: Optional[float] = None, backlog: int = 500, queue=None, + shutdown_seconds: int = 30, dependency_blobs: dict[str, str] = None): """ :param name: Name of the service to manage :param container_config: Instructions on how to start this service @@ -134,7 +135,7 @@ def __init__(self, name: str, container_config: DockerConfig, config_blob:str='' # How many instances we want, and can have self.min_instances: int = max(0, int(min_instances)) self._min_instances: int = self.min_instances - self._max_instances: float = max(0, int(max_instances)) if max_instances else float('inf') + self._max_instances: int = max(0, int(max_instances or 0)) self.desired_instances: int = 0 self.target_instances: int = 0 self.running_instances: int = 0 @@ -148,8 +149,8 @@ def __init__(self, name: str, container_config: DockerConfig, config_blob:str='' # How long does a backlog need to be before we are concerned self.backlog = int(backlog) self.queue_length = 0 - self.duty_cycle = 0 - self.last_update = 0 + self.duty_cycle = 0.0 + self.last_update = 0.0 @property def cpu(self): @@ -161,21 +162,19 @@ def ram(self): @property def instance_limit(self): - if self._max_instances == float('inf'): - return 0 return self._max_instances @property def max_instances(self) -> int: # Adjust the max_instances based on the number that is already requested # this keeps the scaler from running way ahead with its demands when resource caps are reached + if self._max_instances == 0: + return self.target_instances + 2 return min(self._max_instances, self.target_instances + 2) - def set_max_instances(self, value:int): - if value == 0: - self._max_instances = float('inf') - else: - self._max_instances = value + @max_instances.setter + def max_instances(self, value: int): + self._max_instances = max(0, value) def update(self, delta: float, instances: int, backlog: int, duty_cycle: float): self.last_update = time.time() @@ -388,21 +387,23 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig: # Build the docker config for the dependencies. For now the dependency blob values # aren't set for the change key going to kubernetes because everything about # the dependency config should be captured in change key that the function generates - # internally. A change key is set for the service deployment as that includes + # internally. 
diff --git a/assemblyline_core/scaler/scaler_server.py b/assemblyline_core/scaler/scaler_server.py
index 427b9e4e..42e9ac9b 100644
--- a/assemblyline_core/scaler/scaler_server.py
+++ b/assemblyline_core/scaler/scaler_server.py
@@ -109,9 +109,10 @@ class ServiceProfile:
     This includes how the service should be run, and conditions related to the scaling of the service.
     """
 
-    def __init__(self, name: str, container_config: DockerConfig, config_blob:str='', min_instances:int=0, max_instances:int=None,
-                 growth: float = 600, shrink: Optional[float] = None, backlog:int=500, queue=None, shutdown_seconds:int=30,
-                 dependency_blobs:dict[str, str]=None):
+    def __init__(self, name: str, container_config: DockerConfig, config_blob: str = '',
+                 min_instances: int = 0, max_instances: int = None, growth: float = 600,
+                 shrink: Optional[float] = None, backlog: int = 500, queue=None,
+                 shutdown_seconds: int = 30, dependency_blobs: dict[str, str] = None):
         """
         :param name: Name of the service to manage
         :param container_config: Instructions on how to start this service
@@ -134,7 +135,7 @@ def __init__(self, name: str, container_config: DockerConfig, config_blob:str=''
         # How many instances we want, and can have
         self.min_instances: int = max(0, int(min_instances))
         self._min_instances: int = self.min_instances
-        self._max_instances: float = max(0, int(max_instances)) if max_instances else float('inf')
+        self._max_instances: int = max(0, int(max_instances or 0))
         self.desired_instances: int = 0
         self.target_instances: int = 0
         self.running_instances: int = 0
@@ -148,8 +149,8 @@ def __init__(self, name: str, container_config: DockerConfig, config_blob:str=''
         # How long does a backlog need to be before we are concerned
         self.backlog = int(backlog)
         self.queue_length = 0
-        self.duty_cycle = 0
-        self.last_update = 0
+        self.duty_cycle = 0.0
+        self.last_update = 0.0
 
     @property
     def cpu(self):
@@ -161,21 +162,19 @@ def ram(self):
 
     @property
     def instance_limit(self):
-        if self._max_instances == float('inf'):
-            return 0
         return self._max_instances
 
     @property
     def max_instances(self) -> int:
         # Adjust the max_instances based on the number that is already requested
         # this keeps the scaler from running way ahead with its demands when resource caps are reached
+        if self._max_instances == 0:
+            return self.target_instances + 2
         return min(self._max_instances, self.target_instances + 2)
 
-    def set_max_instances(self, value:int):
-        if value == 0:
-            self._max_instances = float('inf')
-        else:
-            self._max_instances = value
+    @max_instances.setter
+    def max_instances(self, value: int):
+        self._max_instances = max(0, value)
 
     def update(self, delta: float, instances: int, backlog: int, duty_cycle: float):
         self.last_update = time.time()
@@ -388,21 +387,23 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
             # Build the docker config for the dependencies. For now the dependency blob values
             # aren't set for the change key going to kubernetes because everything about
             # the dependency config should be captured in change key that the function generates
-            # internally. A change key is set for the service deployment as that includes 
+            # internally. A change key is set for the service deployment as that includes
             # things like the submission params
-            dependency_config: dict[str, Any] = {} 
-            dependency_blobs: dict[str, str] = {} 
+            dependency_config: dict[str, Any] = {}
+            dependency_blobs: dict[str, str] = {}
             for _n, dependency in service.dependencies.items():
                 dependency.container = prepare_container(dependency.container)
                 dependency_config[_n] = dependency
                 dependency_blobs[_n] = str(dependency)
 
-            if service.enabled and (stage == ServiceStage.Off or name not in self.profiles):
+            if service.enabled and stage == ServiceStage.Off:
                 # Move to the next service stage (do this first because the container we are starting may care)
                 if service.update_config and service.update_config.wait_for_update:
                     self._service_stage_hash.set(name, ServiceStage.Update)
+                    stage = ServiceStage.Update
                 else:
                     self._service_stage_hash.set(name, ServiceStage.Running)
+                    stage = ServiceStage.Running
 
                 # Enable this service's dependencies before trying to launch the service containers
                 self.controller.prepare_network(service.name, service.docker_config.allow_internet_access)
@@ -453,10 +454,10 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
             # Update RAM, CPU, licence requirements for running services
             else:
                 profile = self.profiles[name]
-                profile.set_max_instances(service.licence_count)
+                profile.max_instances = service.licence_count
 
                 for dependency_name, dependency_blob in dependency_blobs.items():
-                    if profile.dependency_blobs[dependency_name] != dependency_blob: 
+                    if profile.dependency_blobs[dependency_name] != dependency_blob:
                         self.log.info(f"Updating deployment information for {name}/{dependency_name}")
                         profile.dependency_blobs[dependency_name] = dependency_blob
                         self.controller.start_stateful_container(
@@ -474,7 +475,6 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
                     self.controller.restart(profile)
                     self.log.info(f"Deployment information for {name} replaced")
 
-
         except Exception:
             self.log.exception(f"Error applying service settings from: {service.name}")
             self.handle_service_error(service.name)
@@ -552,8 +552,8 @@ def trim(prof: list[ServiceProfile]):
             prof = [_p for _p in prof if _p.desired_instances > targets[_p.name]]
             drop = [_p for _p in prof if _p.cpu > free_cpu or _p.ram > free_memory]
             if drop:
-                drop = {_p.name: (_p.cpu, _p.ram) for _p in drop}
-                self.log.debug(f"Can't make more because not enough resources {drop}")
+                summary = {_p.name: (_p.cpu, _p.ram) for _p in drop}
+                self.log.debug(f"Can't make more because not enough resources {summary}")
             prof = [_p for _p in prof if _p.cpu <= free_cpu and _p.ram <= free_memory]
             return prof
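The property/setter pair changes the meaning of an unset cap: _max_instances == 0 no longer becomes float('inf'); instead the getter simply keeps max_instances two ahead of the current target. A distilled stand-in (not the real ServiceProfile) showing the behaviour callers see now that they assign the property instead of calling set_max_instances():

class Profile:
    def __init__(self, max_instances: int = 0):
        self._max_instances = max(0, max_instances)
        self.target_instances = 0

    @property
    def max_instances(self) -> int:
        if self._max_instances == 0:
            return self.target_instances + 2
        return min(self._max_instances, self.target_instances + 2)

    @max_instances.setter
    def max_instances(self, value: int):
        self._max_instances = max(0, value)


profile = Profile()
profile.target_instances = 5
assert profile.max_instances == 7    # uncapped: always target + 2
profile.max_instances = 3            # e.g. the service's licence_count
assert profile.max_instances == 3    # capped below target + 2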
diff --git a/assemblyline_core/server_base.py b/assemblyline_core/server_base.py
index 439bf6f1..dd78e8b1 100644
--- a/assemblyline_core/server_base.py
+++ b/assemblyline_core/server_base.py
@@ -22,6 +22,7 @@
 if TYPE_CHECKING:
     from assemblyline.datastore.helper import AssemblylineDatastore
     from assemblyline.odm.models.config import Config
+    from redis import Redis
 
 SHUTDOWN_SECONDS_LIMIT = 10
 
@@ -100,10 +101,10 @@ def run(self):
     def sleep(self, timeout: float):
         self.stopping.wait(timeout)
         return self.running
-    
+
     def serve_forever(self):
         self.start()
-        # We may not want to let the main thread block on a single join call. 
+        # We may not want to let the main thread block on a single join call.
         # It can interfere with signal handling.
         while self.sleep(1):
             pass
@@ -196,12 +197,12 @@ def __init__(self, component_name: str, logger: logging.Logger = None,
         self.datastore: AssemblylineDatastore = datastore or forge.get_datastore(self.config)
 
         # Connect to all of our persistent redis structures
-        self.redis = redis or get_client(
+        self.redis: Redis = redis or get_client(
             host=self.config.core.redis.nonpersistent.host,
             port=self.config.core.redis.nonpersistent.port,
             private=False,
         )
-        self.redis_persist = redis_persist or get_client(
+        self.redis_persist: Redis = redis_persist or get_client(
             host=self.config.core.redis.persistent.host,
             port=self.config.core.redis.persistent.port,
             private=False,
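The Redis annotations above work because the import lives under TYPE_CHECKING. A minimal sketch of that pattern, assuming postponed annotation evaluation (as the other modules in this patch enable via from __future__ import annotations) so the name is never needed at runtime:

from __future__ import annotations
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from redis import Redis      # only the type checker ever sees this import


class ExampleServer:
    def __init__(self, redis: Optional[Redis] = None):
        # With postponed evaluation the annotation below is never executed,
        # so the TYPE_CHECKING-only name is safe at runtime.
        self.redis: Optional[Redis] = redis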