Skip to content

Commit

Permalink
Merge pull request #268 from CybercentreCanada/kubernetes-watches
Browse files (browse the repository at this point in the history)
Improve error handling in scaler (dev)
  • Loading branch information
cccs-douglass committed Aug 3, 2021
2 parents 10d6efe + 12e5251 commit a851021
Showing 1 changed file with 125 additions and 128 deletions.
253 changes: 125 additions & 128 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import base64
import functools
import json
import os
import threading
import weakref
from typing import Dict, List, Optional, Tuple

import urllib3
import kubernetes
from kubernetes import client, config
from kubernetes.client import ExtensionsV1beta1Deployment, ExtensionsV1beta1DeploymentSpec, V1PodTemplateSpec, \
Expand Down Expand Up @@ -172,21 +174,21 @@ def __init__(self, logger, namespace, prefix, priority, cpu_reservation, labels=
self._quota_cpu_used: Optional[float] = None
self._quota_mem_limit: Optional[float] = None
self._quota_mem_used: Optional[float] = None
quota_background = threading.Thread(target=self._monitor_quotas, daemon=True)
quota_background = threading.Thread(target=self._loop_forever(self._monitor_quotas), daemon=True)
quota_background.start()

self._node_pool_max_ram: float = 0
self._node_pool_max_cpu: float = 0
node_background = threading.Thread(target=self._monitor_node_pool, daemon=True)
node_background = threading.Thread(target=self._loop_forever(self._monitor_node_pool), daemon=True)
node_background.start()

self._pod_used_ram: float = 0
self._pod_used_cpu: float = 0
pod_background = threading.Thread(target=self._monitor_pods, daemon=True)
pod_background = threading.Thread(target=self._loop_forever(self._monitor_pods), daemon=True)
pod_background.start()

self._deployment_targets: Dict[str, int] = {}
deployment_background = threading.Thread(target=self._monitor_deployments, daemon=True)
deployment_background = threading.Thread(target=self._loop_forever(self._monitor_deployments), daemon=True)
deployment_background.start()

def stop(self):
Expand Down Expand Up @@ -218,151 +220,146 @@ def add_profile(self, profile, scale=0):
mount_updates=profile.mount_updates)
self._external_profiles[profile.name] = profile

def _loop_forever(self, function):
    """Wrap a monitor method so it is re-run until the controller stops.

    The returned callable takes no arguments and is used as a thread target.
    It calls ``function`` repeatedly while ``self.running`` is true, so a
    monitor that returns (watch stream ended) or raises is simply restarted.

    :param function: zero-argument callable (a bound monitor method).
    :return: zero-argument wrapper carrying ``function``'s name and docstring.
    """
    @functools.wraps(function)
    def _function():
        while self.running:
            # noinspection PyBroadException
            try:
                function()

            except urllib3.exceptions.ProtocolError:
                # Protocol errors are a product of api connections timing out, just retry silently.
                pass

            except Exception:
                # Anything else is logged and the monitor is restarted from scratch.
                self.logger.exception(f"Error in {function.__name__}")
    return _function

def _monitor_node_pool(self):
    """Track the total allocatable CPU and memory across all nodes.

    Watches node events and publishes the totals in
    ``self._node_pool_max_cpu`` and ``self._node_pool_max_ram``.  Both are
    reset to zero each time the watch is (re)started.
    """
    watch = TypelessWatch()
    # Allocatable (cpu, ram) per node, keyed by node uid. Keeping the per-node
    # values lets MODIFIED events replace a node's contribution and makes
    # DELETED remove exactly what was counted for that node.
    nodes = {}
    self._node_pool_max_cpu = 0
    self._node_pool_max_ram = 0

    for event in watch.stream(func=self.api.list_node):
        if not self.running:
            break

        node = event['raw_object']
        uid = node['metadata']['uid']

        if event['type'] in ['ADDED', 'MODIFIED']:
            allocatable = node['status']['allocatable']
            nodes[uid] = (parse_cpu(allocatable['cpu']), parse_memory(allocatable['memory']))
        elif event['type'] == 'DELETED':
            nodes.pop(uid, None)
        else:
            continue

        self._node_pool_max_cpu = sum(cpu for cpu, _ in nodes.values())
        self._node_pool_max_ram = sum(ram for _, ram in nodes.values())

def _monitor_pods(self):
    """Track the CPU and memory claimed by containers of pods in all namespaces.

    Watches pod events and publishes the totals in ``self._pod_used_cpu`` and
    ``self._pod_used_ram``.  Both are reset to zero each time the watch is
    (re)started.
    """
    watch = TypelessWatch()
    # Maps "<pod uid>-<container name>" to the (cpu, mem) pair returned by
    # get_resources; either element may be None.
    containers = {}
    self._pod_used_cpu = 0
    self._pod_used_ram = 0

    for event in watch.stream(func=self.api.list_pod_for_all_namespaces):
        if not self.running:
            break

        pod = event['raw_object']
        uid = pod['metadata']['uid']

        if event['type'] in ['ADDED', 'MODIFIED']:
            for container in pod['spec']['containers']:
                containers[f"{uid}-{container['name']}"] = get_resources(container)
        elif event['type'] == 'DELETED':
            for container in pod['spec']['containers']:
                containers.pop(f"{uid}-{container['name']}", None)
        else:
            continue

        # Containers with no value for a resource are counted, then charged the
        # median of the known values for that resource.
        # NOTE(review): `median` is defined or imported outside this view;
        # confirm how it behaves on an empty list.
        memory_unrestricted = sum(1 for cpu, mem in containers.values() if mem is None)
        cpu_unrestricted = sum(1 for cpu, mem in containers.values() if cpu is None)

        memory_used = [mem for cpu, mem in containers.values() if mem is not None]
        cpu_used = [cpu for cpu, mem in containers.values() if cpu is not None]

        self._pod_used_cpu = sum(cpu_used) + cpu_unrestricted * median(cpu_used)
        self._pod_used_ram = sum(memory_used) + memory_unrestricted * median(memory_used)

def _monitor_quotas(self):
    """Track the resource quota limits and usage of ``self.namespace``.

    Watches resource quota events and publishes, across all unscoped quotas:
    the smallest cpu/memory limit in ``self._quota_cpu_limit`` and
    ``self._quota_mem_limit``, and the largest cpu/memory usage in
    ``self._quota_cpu_used`` and ``self._quota_mem_used``.  Each attribute is
    ``None`` when no quota reports that value.  All four are reset to ``None``
    each time the watch is (re)started.
    """
    watch = TypelessWatch()
    # Latest known value of each figure, keyed by quota name.
    cpu_limits = {}
    cpu_used = {}
    mem_limits = {}
    mem_used = {}

    self._quota_cpu_limit = None
    self._quota_cpu_used = None
    self._quota_mem_limit = None
    self._quota_mem_used = None

    for event in watch.stream(func=self.api.list_namespaced_resource_quota, namespace=self.namespace):
        if not self.running:
            break

        quota = event['raw_object']
        name = quota['metadata']['name']
        spec = quota['spec']

        # Scoped quotas are skipped.
        # NOTE(review): the Kubernetes JSON field is `scopeSelector`; the
        # snake_case spelling is kept as well because how TypelessWatch renders
        # keys is not visible here. Confirm and drop whichever never occurs.
        if 'scopeSelector' in spec or 'scope_selector' in spec or 'scopes' in spec:
            continue

        if event['type'] not in ['ADDED', 'MODIFIED', 'DELETED']:
            continue

        # Forget everything recorded for this quota first, so a resource that
        # has disappeared from its status does not leave a stale value behind.
        cpu_limits.pop(name, None)
        cpu_used.pop(name, None)
        mem_limits.pop(name, None)
        mem_used.pop(name, None)

        if event['type'] != 'DELETED':
            status = quota['status']
            hard = status.get('hard', {})
            used = status.get('used', {})

            if 'cpu' in hard:
                cpu_limits[name] = parse_cpu(hard['cpu'])
            if 'memory' in hard:
                mem_limits[name] = parse_memory(hard['memory'])

            if 'cpu' in used:
                cpu_used[name] = parse_cpu(used['cpu'])
            if 'memory' in used:
                mem_used[name] = parse_memory(used['memory'])

        # The tightest limit and the highest usage are the ones published.
        self._quota_cpu_limit = min(cpu_limits.values()) if cpu_limits else None
        self._quota_cpu_used = max(cpu_used.values()) if cpu_used else None
        self._quota_mem_limit = min(mem_limits.values()) if mem_limits else None
        self._quota_mem_used = max(mem_used.values()) if mem_used else None

def _monitor_deployments(self):
    """Track the replica target of each labelled deployment in ``self.namespace``.

    Watches deployments matching every label in ``self._labels`` and keeps
    ``self._deployment_targets`` mapping the deployment's ``component`` label
    to its ``spec.replicas``.  The mapping is reset each time the watch is
    (re)started.
    """
    watch = TypelessWatch()

    self._deployment_targets = {}
    label_selector = ','.join(f'{_n}={_v}' for _n, _v in self._labels.items())

    for event in watch.stream(func=self.apps_api.list_namespaced_deployment,
                              namespace=self.namespace, label_selector=label_selector):
        # Stop consuming events once the controller is shut down, as the
        # other monitor loops do.
        if not self.running:
            break

        if event['type'] not in ['ADDED', 'MODIFIED', 'DELETED']:
            continue

        deployment = event['raw_object']
        name = deployment['metadata']['labels'].get('component', None)
        if name is None:
            # Deployments without a component label are not tracked.
            continue

        if event['type'] == 'DELETED':
            self._deployment_targets.pop(name, None)
        else:
            self._deployment_targets[name] = deployment['spec']['replicas']

def cpu_info(self):
if self._quota_cpu_used and self._quota_cpu_limit:
Expand Down

0 comments on commit a851021

Please sign in to comment.