Skip to content

Commit

Permalink
Merge pull request #280 from CybercentreCanada/resource-quota-support
Browse files Browse the repository at this point in the history
support more resource quota setups
  • Loading branch information
cccs-douglass committed Aug 17, 2021
2 parents df2ac46 + f3e5bec commit d148260
Showing 1 changed file with 42 additions and 4 deletions.
46 changes: 42 additions & 4 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ def __init__(self, logger, namespace, prefix, priority, cpu_reservation, labels=

self._pod_used_ram: float = 0
self._pod_used_cpu: float = 0
self._pod_used_namespace_ram: float = 0
self._pod_used_namespace_cpu: float = 0
pod_background = threading.Thread(target=self._loop_forever(self._monitor_pods), daemon=True)
pod_background.start()

Expand Down Expand Up @@ -258,22 +260,29 @@ def _monitor_node_pool(self):
def _monitor_pods(self):
watch = TypelessWatch()
containers = {}
namespaced_containers = {}
self._pod_used_cpu = 0
self._pod_used_ram = 0
self._pod_used_namespace_cpu = 0
self._pod_used_namespace_ram = 0

for event in watch.stream(func=self.api.list_pod_for_all_namespaces, timeout_seconds=WATCH_TIMEOUT,
_request_timeout=WATCH_API_TIMEOUT):
if not self.running:
break

uid = event['raw_object']['metadata']['uid']
namespace = event['raw_object']['metadata']['namespace']

if event['type'] in ['ADDED', 'MODIFIED']:
for container in event['raw_object']['spec']['containers']:
containers[f"{uid}-{container['name']}"] = get_resources(container)
if namespace == self.namespace:
namespaced_containers[f"{uid}-{container['name']}"] = get_resources(container)
elif event['type'] == 'DELETED':
for container in event['raw_object']['spec']['containers']:
containers.pop(f"{uid}-{container['name']}", None)
namespaced_containers.pop(f"{uid}-{container['name']}", None)
else:
continue

Expand All @@ -286,6 +295,15 @@ def _monitor_pods(self):
self._pod_used_cpu = sum(cpu_used) + cpu_unrestricted * median(cpu_used)
self._pod_used_ram = sum(memory_used) + memory_unrestricted * median(memory_used)

memory_unrestricted = sum(1 for cpu, mem in namespaced_containers.values() if mem is None)
cpu_unrestricted = sum(1 for cpu, mem in namespaced_containers.values() if cpu is None)

memory_used = [mem for cpu, mem in namespaced_containers.values() if mem is not None]
cpu_used = [cpu for cpu, mem in namespaced_containers.values() if cpu is not None]

self._pod_used_namespace_cpu = sum(cpu_used) + cpu_unrestricted * median(cpu_used)
self._pod_used_namespace_ram = sum(memory_used) + memory_unrestricted * median(memory_used)

def _monitor_quotas(self):
watch = TypelessWatch()
cpu_limits = {}
Expand Down Expand Up @@ -313,14 +331,30 @@ def _monitor_quotas(self):
if 'hard' in status:
if 'cpu' in status['hard']:
cpu_limits[name] = parse_cpu(status['hard']['cpu'])
if 'requests.cpu' in status['hard']:
cpu_limits[name] = parse_cpu(status['hard']['requests.cpu'])
if 'limits.cpu' in status['hard']:
cpu_limits[name] = parse_cpu(status['hard']['limits.cpu'])
if 'memory' in status['hard']:
mem_limits[name] = parse_memory(status['hard']['memory'])
if 'requests.memory' in status['hard']:
mem_limits[name] = parse_memory(status['hard']['requests.memory'])
if 'limits.memory' in status['hard']:
mem_limits[name] = parse_memory(status['hard']['limits.memory'])

if 'used' in status:
if 'cpu' in status['used']:
cpu_used[name] = parse_cpu(status['used']['cpu'])
if 'requests.cpu' in status['used']:
cpu_used[name] = parse_cpu(status['used']['requests.cpu'])
if 'limits.cpu' in status['used']:
cpu_used[name] = parse_cpu(status['used']['limits.cpu'])
if 'memory' in status['used']:
mem_used[name] = parse_memory(status['used']['memory'])
if 'requests.memory' in status['used']:
mem_used[name] = parse_memory(status['used']['requests.memory'])
if 'limits.memory' in status['used']:
mem_used[name] = parse_memory(status['used']['limits.memory'])

elif event['type'] == 'DELETED':
cpu_limits.pop(name, None)
Expand Down Expand Up @@ -368,13 +402,17 @@ def _monitor_deployments(self):
self._deployment_targets.pop(name, None)

def cpu_info(self):
    """Report CPU headroom as an ``(available, total)`` pair.

    The figures come from the first source that applies:

    1. A quota CPU limit together with a quota-reported usage:
       ``limit - quota usage``.
    2. A quota CPU limit but no quota-reported usage: usage falls back to
       ``_pod_used_namespace_cpu``, the total tracked from pods.
    3. No quota CPU limit: ``_node_pool_max_cpu - _pod_used_cpu``.

    Both checks are truthiness checks, so a limit or usage of ``0`` is
    handled the same way as an unset value.

    Returns:
        tuple: ``(available, total)`` in the same unit as the stored values.
    """
    limit = self._quota_cpu_limit
    if not limit:
        # No quota limit known: fall back to the node pool figures.
        return self._node_pool_max_cpu - self._pod_used_cpu, self._node_pool_max_cpu

    # Prefer the usage reported by the quota; when the quota does not report
    # any, use the usage accumulated from pods instead.
    used = self._quota_cpu_used or self._pod_used_namespace_cpu
    return limit - used, limit

def memory_info(self):
    """Report memory headroom as an ``(available, total)`` pair.

    The figures come from the first source that applies:

    1. A quota memory limit together with a quota-reported usage:
       ``limit - quota usage``.
    2. A quota memory limit but no quota-reported usage: usage falls back to
       ``_pod_used_namespace_ram``, the total tracked from pods.
    3. No quota memory limit: ``_node_pool_max_ram - _pod_used_ram``.

    Both checks are truthiness checks, so a limit or usage of ``0`` is
    handled the same way as an unset value.

    Returns:
        tuple: ``(available, total)`` in the same unit as the stored values.
    """
    limit = self._quota_mem_limit
    if not limit:
        # No quota limit known: fall back to the node pool figures.
        return self._node_pool_max_ram - self._pod_used_ram, self._node_pool_max_ram

    # Prefer the usage reported by the quota; when the quota does not report
    # any, use the usage accumulated from pods instead.
    used = self._quota_mem_used or self._pod_used_namespace_ram
    return limit - used, limit

@staticmethod
Expand Down

0 comments on commit d148260

Please sign in to comment.