diff --git a/assemblyline_core/scaler/scaler_server.py b/assemblyline_core/scaler/scaler_server.py index 92234423..7b40ccf4 100644 --- a/assemblyline_core/scaler/scaler_server.py +++ b/assemblyline_core/scaler/scaler_server.py @@ -109,7 +109,7 @@ class ServiceProfile: This includes how the service should be run, and conditions related to the scaling of the service. """ - def __init__(self, name: str, container_config: DockerConfig, config_hash:int=0, min_instances:int=0, max_instances:int=None, + def __init__(self, name: str, container_config: DockerConfig, config_blob:str='', min_instances:int=0, max_instances:int=None, growth: float = 600, shrink: Optional[float] = None, backlog:int=500, queue=None, shutdown_seconds:int=30): """ :param name: Name of the service to manage @@ -127,7 +127,7 @@ def __init__(self, name: str, container_config: DockerConfig, config_hash:int=0, self.high_duty_cycle = 0.7 self.low_duty_cycle = 0.5 self.shutdown_seconds = shutdown_seconds - self.config_hash = config_hash + self.config_blob = config_blob # How many instances we want, and can have self.min_instances: int = max(0, int(min_instances)) @@ -211,7 +211,7 @@ def __deepcopy__(self, memodict=None): prof = ServiceProfile( name=self.name, container_config=DockerConfig(self.container_config.as_primitives()), - config_hash=self.config_hash, + config_blob=self.config_blob, min_instances=self.min_instances, max_instances=self.max_instances, growth=self.growth_threshold, @@ -378,7 +378,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig: # noinspection PyBroadException try: if service.enabled and (stage == ServiceStage.Off or name not in self.profiles): - # Enable this service's dependencies + # Enable this service's dependencies before trying to launch the service containers self.controller.prepare_network(service.name, service.docker_config.allow_internet_access) for _n, dependency in service.dependencies.items(): dependency.container = prepare_container(dependency.container) @@ -401,14 +401,22 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig: # Check that all enabled services are enabled if service.enabled and stage == ServiceStage.Running: - # Compute a hash of service properties not include in the docker config, that + # Compute a blob of service properties not include in the docker config, that # should still result in a service being restarted when changed - config_hash = hash(str(sorted(service.config.items()))) - config_hash = hash((config_hash, str(service.submission_params))) + config_blob = str(sorted(service.config.items())) + config_blob += str(service.submission_params) # Build the docker config for the service, we are going to either create it or # update it so we need to know what the current configuration is either way docker_config = prepare_container(service.docker_config) + config_blob += str(docker_config) + + # Build the docker config for the dependencies. + dependency_config = {} + for _n, dependency in service.dependencies.items(): + dependency.container = prepare_container(dependency.container) + dependency_config[_n] = dependency + config_blob += str(sorted(dependency_config.items())) # Add the service to the list of services being scaled with self.profiles_lock: @@ -419,7 +427,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig: min_instances=default_settings.min_instances, growth=default_settings.growth, shrink=default_settings.shrink, - config_hash=config_hash, + config_blob=config_blob, backlog=default_settings.backlog, max_instances=service.licence_count, container_config=docker_config, @@ -436,13 +444,25 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig: else: profile._max_instances = service.licence_count - if profile.container_config != docker_config or profile.config_hash != config_hash: + if profile.config_blob != config_blob: self.log.info(f"Updating deployment information for {name}") + # Update the dependencies. Should do nothing if container spec is the same. + # let kubernetes decide if anything needs to change though. + for _n, dependency in dependency_config.items(): + self.controller.start_stateful_container( + service_name=service.name, + container_name=_n, + spec=dependency, + labels={'dependency_for': service.name} + ) + + # Update the service itself profile.container_config = docker_config - profile.config_hash = config_hash + profile.config_blob = config_blob self.controller.restart(profile) self.log.info(f"Deployment information for {name} replaced") + except Exception: self.log.exception(f"Error applying service settings from: {service.name}") self.handle_service_error(service.name) diff --git a/assemblyline_core/server_base.py b/assemblyline_core/server_base.py index cb9af0c6..0fe7cab3 100644 --- a/assemblyline_core/server_base.py +++ b/assemblyline_core/server_base.py @@ -12,7 +12,7 @@ import sys import io import os -from typing import cast, Callable, TYPE_CHECKING +from typing import Callable, TYPE_CHECKING from assemblyline.remote.datatypes import get_client from assemblyline.remote.datatypes.hash import Hash @@ -208,7 +208,7 @@ def __init__(self, component_name: str, logger: logging.Logger = None, ) # Create a cached service data object, and access to the service status - self.service_info = cast(dict[str, Service], forge.CachedObject(self._get_services)) + self.service_info: dict[str, Service] = forge.CachedObject(self._get_services) self._service_stage_hash = get_service_stage_hash(self.redis) def _get_services(self):