type annotation updates
cccs-douglass committed Sep 13, 2021
1 parent 1c67354 commit e7ae18e
Showing 4 changed files with 23 additions and 23 deletions.
2 changes: 1 addition & 1 deletion assemblyline_core/scaler/controllers/docker_ctl.py
@@ -243,7 +243,7 @@ def memory_info(self):
         self.log.debug(f'Total Memory available {mem}/{self._info["MemTotal"]/mega}')
         return mem, total_mem
 
-    def get_target(self, service_name):
+    def get_target(self, service_name: str) -> int:
         """Get how many instances of a service we expect to be running.
 
         Since we start our containers with 'restart always' we just need to count how many
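The annotated signature gives a type checker something to verify at every call site. A minimal runnable sketch of the idea, where the Counter class is an invented stand-in for illustration, not the real DockerController:

# Illustrative stand-in; assumes Python 3.9+ so the builtin dict generic
# in the attribute annotation evaluates at runtime.
class Counter:
    def __init__(self) -> None:
        self._targets: dict[str, int] = {}

    def get_target(self, service_name: str) -> int:
        """Unknown services default to zero expected instances."""
        return self._targets.get(service_name, 0)


counter = Counter()
assert counter.get_target("extract") == 0
# A checker such as mypy would now reject counter.get_target(42).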
4 changes: 2 additions & 2 deletions assemblyline_core/scaler/controllers/interface.py
@@ -24,11 +24,11 @@ def cpu_info(self):
         """Return free and total memory in the system."""
         raise NotImplementedError()
 
-    def free_cpu(self):
+    def free_cpu(self) -> float:
         """Number of cores available for reservation."""
         return self.cpu_info()[0]
 
-    def free_memory(self):
+    def free_memory(self) -> float:
         """Megabytes of RAM that has not been reserved."""
         return self.memory_info()[0]
 
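Annotating the convenience accessors as returning float pins down the convention that cpu_info() and memory_info() return (free, total) pairs. A hedged sketch of a conforming controller; this toy subclass and its numbers are invented, and the tuple annotation assumes Python 3.9+:

# Toy implementation of the interface convention; values are made up.
class StubController:
    def cpu_info(self) -> tuple[float, float]:
        return 1.5, 8.0  # (free cores, total cores)

    def free_cpu(self) -> float:
        """Number of cores available for reservation."""
        return self.cpu_info()[0]


assert StubController().free_cpu() == 1.5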
20 changes: 10 additions & 10 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
@@ -7,7 +7,7 @@
 import os
 import threading
 import weakref
-from typing import Dict, List, Optional, Tuple
+from typing import Optional, Tuple
 
 import urllib3
 import kubernetes
@@ -48,7 +48,7 @@ def get_return_type(self, func):
         return None
 
 
-def median(values: List[float]) -> float:
+def median(values: list[float]) -> float:
     if len(values) == 0:
         return 0
     return values[len(values)//2]
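Replacing typing.List and typing.Dict with the builtin list and dict generics follows PEP 585; Optional and Tuple survive the import because Optional has no builtin spelling and Tuple is simply left alone here. One caveat worth noting: subscripted builtins only evaluate at runtime on Python 3.9+, so on 3.7/3.8 the same spellings need postponed annotation evaluation. A brief sketch, where the version constraints are standard Python behaviour rather than something this commit states:

# Works unchanged on Python 3.9+; on 3.7/3.8 uncomment the future import
# so annotations stay as strings and list[float] is never evaluated.
# from __future__ import annotations

def median(values: list[float]) -> float:
    # As written (and as in the repository), this picks the middle element
    # of the sequence as given, so callers should pass sorted values.
    if len(values) == 0:
        return 0
    return values[len(values) // 2]


assert median([1.0, 2.0, 3.0]) == 2.0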
@@ -149,15 +149,15 @@ def __init__(self, logger, namespace, prefix, priority, cpu_reservation, labels=
         self.cpu_reservation: float = max(0.0, min(cpu_reservation, 1.0))
         self.logger = logger
         self.log_level: str = log_level
-        self._labels: Dict[str, str] = labels or {}
+        self._labels: dict[str, str] = labels or {}
         self.apps_api = client.AppsV1Api()
         self.api = client.CoreV1Api()
         self.net_api = client.NetworkingV1Api()
         self.namespace: str = namespace
-        self.config_volumes: Dict[str, V1Volume] = {}
-        self.config_mounts: Dict[str, V1VolumeMount] = {}
-        self.core_config_volumes: Dict[str, V1Volume] = {}
-        self.core_config_mounts: Dict[str, V1VolumeMount] = {}
+        self.config_volumes: dict[str, V1Volume] = {}
+        self.config_mounts: dict[str, V1VolumeMount] = {}
+        self.core_config_volumes: dict[str, V1Volume] = {}
+        self.core_config_mounts: dict[str, V1VolumeMount] = {}
         self._external_profiles = weakref.WeakValueDictionary()
         self._service_limited_env: dict[str, dict[str, str]] = defaultdict(dict)
 
@@ -191,7 +191,7 @@ def __init__(self, logger, namespace, prefix, priority, cpu_reservation, labels=
         pod_background = threading.Thread(target=self._loop_forever(self._monitor_pods), daemon=True)
         pod_background.start()
 
-        self._deployment_targets: Dict[str, int] = {}
+        self._deployment_targets: dict[str, int] = {}
         deployment_background = threading.Thread(target=self._loop_forever(self._monitor_deployments), daemon=True)
         deployment_background.start()
 
@@ -434,7 +434,7 @@ def memory_info(self):
         return self._node_pool_max_ram - self._pod_used_ram, self._node_pool_max_ram
 
     @staticmethod
-    def _create_metadata(deployment_name: str, labels: Dict[str, str]):
+    def _create_metadata(deployment_name: str, labels: dict[str, str]):
         return V1ObjectMeta(name=deployment_name, labels=labels)
 
     def _create_volumes(self, core_mounts=False):
@@ -585,7 +585,7 @@ def get_target(self, service_name: str) -> int:
         """Get the target for running instances of a service."""
         return self._deployment_targets.get(service_name, 0)
 
-    def get_targets(self) -> Dict[str, int]:
+    def get_targets(self) -> dict[str, int]:
         """Get the target for running instances of all services."""
         return self._deployment_targets
 
20 changes: 10 additions & 10 deletions assemblyline_core/scaler/scaler_server.py
@@ -6,7 +6,7 @@
 import threading
 from collections import defaultdict
 from string import Template
-from typing import Dict, List, Optional, Any
+from typing import Optional, Any
 import os
 import math
 import time
@@ -164,12 +164,12 @@ def instance_limit(self):
         return self._max_instances
 
     @property
-    def max_instances(self):
+    def max_instances(self) -> int:
         # Adjust the max_instances based on the number that is already requested
         # this keeps the scaler from running way ahead with its demands when resource caps are reached
         return min(self._max_instances, self.target_instances + 2)
 
-    def update(self, delta, instances, backlog, duty_cycle):
+    def update(self, delta: float, instances: int, backlog: int, duty_cycle: float):
         self.last_update = time.time()
         self.running_instances = instances
         self.queue_length = backlog
@@ -235,7 +235,7 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
 
         self.scaler_timeout_queue = NamedQueue(SCALER_TIMEOUT_QUEUE, host=self.redis_persist)
         self.error_count_lock = threading.Lock()
-        self.error_count: Dict[str, List[float]] = {}
+        self.error_count: dict[str, list[float]] = {}
         self.status_table = ExpiringHash(SERVICE_STATE_HASH, host=self.redis, ttl=30*60)
         self.service_change_watcher = EventWatcher(self.redis, deserializer=ServiceChange.deserialize)
         self.service_change_watcher.register('changes.services.*', self._handle_service_change_event)
@@ -274,7 +274,7 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
             self.controller.global_mounts.append((CLASSIFICATION_HOST_PATH, '/etc/assemblyline/classification.yml'))
 
         # Information about services
-        self.profiles: Dict[str, ServiceProfile] = {}
+        self.profiles: dict[str, ServiceProfile] = {}
         self.profiles_lock = threading.RLock()
 
         # Prepare a single threaded scheduler
@@ -364,7 +364,7 @@ def _sync_service(self, service: Service):
         name = service.name
         stage = self.get_service_stage(service.name)
         default_settings = self.config.core.scaler.service_defaults
-        image_variables = defaultdict(str)
+        image_variables: defaultdict[str, str] = defaultdict(str)
         image_variables.update(self.config.services.image_variables)
 
         def prepare_container(docker_config: DockerConfig) -> DockerConfig:
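Annotating the defaultdict tells the checker that missing keys yield str rather than Any, and subscripting collections.defaultdict itself is part of the same PEP 585 change, again Python 3.9+ at runtime. A small runnable sketch, with invented values:

from collections import defaultdict

# Python 3.9+: collections.defaultdict is directly subscriptable.
image_variables: defaultdict[str, str] = defaultdict(str)
image_variables.update({"REGISTRY": "localhost:5000"})

# Missing keys fall back to the factory's default, the empty string,
# which is convenient for string.Template substitutions.
assert image_variables["UNSET_TAG"] == ""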
@@ -473,7 +473,7 @@ def update_scaling(self):
         # Figure out what services are expected to be running and how many
         with elasticapm.capture_span('read_profiles'):
             with self.profiles_lock:
-                all_profiles: Dict[str, ServiceProfile] = copy.deepcopy(self.profiles)
+                all_profiles: dict[str, ServiceProfile] = copy.deepcopy(self.profiles)
                 raw_targets = self.controller.get_targets()
                 targets = {_p.name: raw_targets.get(_p.name, 0) for _p in all_profiles.values()}
 
@@ -516,7 +516,7 @@ def update_scaling(self):
             free_memory = self.controller.free_memory()
 
             #
-            def trim(prof: List[ServiceProfile]):
+            def trim(prof: list[ServiceProfile]):
                 prof = [_p for _p in prof if _p.desired_instances > targets[_p.name]]
                 drop = [_p for _p in prof if _p.cpu > free_cpu or _p.ram > free_memory]
                 if drop:
@@ -525,7 +525,7 @@ def trim(prof: List[ServiceProfile]):
                 prof = [_p for _p in prof if _p.cpu <= free_cpu and _p.ram <= free_memory]
                 return prof
 
-            remaining_profiles: List[ServiceProfile] = trim(list(all_profiles.values()))
+            remaining_profiles: list[ServiceProfile] = trim(list(all_profiles.values()))
             # The target values up until now should be in sync with the container orchestrator
             # create a copy, so we can track which ones change in the following loop
             old_targets = dict(targets)
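With list[ServiceProfile] in place, trim's filtering reads clearly enough to lift out and test on its own. A self-contained sketch using a stand-in dataclass; the Profile type and the numbers below are invented for illustration, and the real function also checks desired_instances against current targets:

from dataclasses import dataclass


@dataclass
class Profile:
    name: str
    cpu: float   # cores requested per instance
    ram: float   # megabytes requested per instance


# Builtin list generic in the signature assumes Python 3.9+.
def trim(prof: list[Profile], free_cpu: float, free_memory: float) -> list[Profile]:
    # Keep only profiles whose next instance still fits in the free pool.
    return [_p for _p in prof if _p.cpu <= free_cpu and _p.ram <= free_memory]


profiles = [Profile("extract", 1.0, 512.0), Profile("yara", 4.0, 4096.0)]
assert [p.name for p in trim(profiles, free_cpu=2.0, free_memory=1024.0)] == ["extract"]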
@@ -553,7 +553,7 @@ def trim(prof: List[ServiceProfile]):
                 pool.call(self.controller.set_target, name, value)
 
     @elasticapm.capture_span(span_type=APM_SPAN_TYPE)
-    def handle_service_error(self, service_name):
+    def handle_service_error(self, service_name: str):
         """Handle an error occurring in the *analysis* service.
 
         Errors for core systems should simply be logged, and a best effort to continue made.
