Skip to content

Commit

Permalink
Merge pull request #295 from CybercentreCanada/persistent-service-update
Browse files Browse the repository at this point in the history
Persistent service update (dev)
  • Loading branch information
cccs-douglass committed Sep 13, 2021
2 parents fd87423 + e7ae18e commit 0d01b05
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 119 deletions.
2 changes: 1 addition & 1 deletion assemblyline_core/scaler/controllers/docker_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def memory_info(self):
self.log.debug(f'Total Memory available {mem}/{self._info["MemTotal"]/mega}')
return mem, total_mem

def get_target(self, service_name):
def get_target(self, service_name: str) -> int:
"""Get how many instances of a service we expect to be running.
Since we start our containers with 'restart always' we just need to count how many
Expand Down
4 changes: 2 additions & 2 deletions assemblyline_core/scaler/controllers/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ def cpu_info(self):
"""Return free and total memory in the system."""
raise NotImplementedError()

def free_cpu(self):
def free_cpu(self) -> float:
"""Number of cores available for reservation."""
return self.cpu_info()[0]

def free_memory(self):
def free_memory(self) -> float:
"""Megabytes of RAM that has not been reserved."""
return self.memory_info()[0]

Expand Down
34 changes: 23 additions & 11 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import os
import threading
import weakref
from typing import Dict, List, Optional, Tuple
from typing import Optional, Tuple

import urllib3
import kubernetes
Expand Down Expand Up @@ -48,7 +48,7 @@ def get_return_type(self, func):
return None


def median(values: List[float]) -> float:
def median(values: list[float]) -> float:
if len(values) == 0:
return 0
return values[len(values)//2]
Expand Down Expand Up @@ -149,15 +149,15 @@ def __init__(self, logger, namespace, prefix, priority, cpu_reservation, labels=
self.cpu_reservation: float = max(0.0, min(cpu_reservation, 1.0))
self.logger = logger
self.log_level: str = log_level
self._labels: Dict[str, str] = labels or {}
self._labels: dict[str, str] = labels or {}
self.apps_api = client.AppsV1Api()
self.api = client.CoreV1Api()
self.net_api = client.NetworkingV1Api()
self.namespace: str = namespace
self.config_volumes: Dict[str, V1Volume] = {}
self.config_mounts: Dict[str, V1VolumeMount] = {}
self.core_config_volumes: Dict[str, V1Volume] = {}
self.core_config_mounts: Dict[str, V1VolumeMount] = {}
self.config_volumes: dict[str, V1Volume] = {}
self.config_mounts: dict[str, V1VolumeMount] = {}
self.core_config_volumes: dict[str, V1Volume] = {}
self.core_config_mounts: dict[str, V1VolumeMount] = {}
self._external_profiles = weakref.WeakValueDictionary()
self._service_limited_env: dict[str, dict[str, str]] = defaultdict(dict)

Expand Down Expand Up @@ -191,7 +191,7 @@ def __init__(self, logger, namespace, prefix, priority, cpu_reservation, labels=
pod_background = threading.Thread(target=self._loop_forever(self._monitor_pods), daemon=True)
pod_background.start()

self._deployment_targets: Dict[str, int] = {}
self._deployment_targets: dict[str, int] = {}
deployment_background = threading.Thread(target=self._loop_forever(self._monitor_deployments), daemon=True)
deployment_background.start()

Expand Down Expand Up @@ -434,7 +434,7 @@ def memory_info(self):
return self._node_pool_max_ram - self._pod_used_ram, self._node_pool_max_ram

@staticmethod
def _create_metadata(deployment_name: str, labels: Dict[str, str]):
def _create_metadata(deployment_name: str, labels: dict[str, str]):
return V1ObjectMeta(name=deployment_name, labels=labels)

def _create_volumes(self, core_mounts=False):
Expand Down Expand Up @@ -585,7 +585,7 @@ def get_target(self, service_name: str) -> int:
"""Get the target for running instances of a service."""
return self._deployment_targets.get(service_name, 0)

def get_targets(self) -> Dict[str, int]:
def get_targets(self) -> dict[str, int]:
"""Get the target for running instances of all services."""
return self._deployment_targets

Expand Down Expand Up @@ -674,8 +674,20 @@ def start_stateful_container(self, service_name: str, container_name: str,
))
mounts.append(V1VolumeMount(mount_path=volume_spec.mount_path, name=mount_name))

# Read the key being used for the deployment instance or generate a new one
try:
instance_key = uuid.uuid4().hex
old_deployment = self.apps_api.read_namespaced_deployment(deployment_name, self.namespace)
for container in old_deployment.spec.template.spec.containers:
for env in container.env:
if env.name == 'AL_INSTANCE_KEY':
instance_key = env.value
break
except ApiException as error:
if error.status != 404:
raise

# Setup the deployment itself
instance_key = uuid.uuid4().hex
labels['container'] = container_name
spec.container.environment.append({'name': 'AL_INSTANCE_KEY', 'value': instance_key})
self._create_deployment(service_name, deployment_name, spec.container,
Expand Down
Loading

0 comments on commit 0d01b05

Please sign in to comment.