Skip to content

Commit

Permalink
Use the non-beta version of the apps api.
Browse files Browse the repository at this point in the history
  • Loading branch information
cccs-douglass committed Apr 7, 2020
1 parent daabe34 commit 7a30632
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ def __init__(self, logger, namespace, prefix, priority, labels=None):
self.priority = priority
self.logger = logger
self._labels = labels
self.b1api = client.AppsV1beta1Api()
self.apps_api = client.AppsV1Api()
self.api = client.CoreV1Api()
self.net_api = client.NetworkingV1Api()
self.auto_cloud = False # TODO draw from config
self.auto_cloud = False # TODO draw from config
self.namespace = namespace
self.config_mounts: List[Tuple[V1Volume, V1VolumeMount]] = []

Expand Down Expand Up @@ -202,7 +202,8 @@ def memory_info(self):
memory -= parse_memory(requests.get('memory', limits.get('memory', '16Mi')))
return memory, max_memory

def _create_metadata(self, deployment_name: str, labels: Dict[str, str]):
@staticmethod
def _create_metadata(deployment_name: str, labels: Dict[str, str]):
return V1ObjectMeta(name=deployment_name, labels=labels)

def _create_volumes(self, service_name):
Expand Down Expand Up @@ -231,7 +232,8 @@ def _create_volumes(self, service_name):

return volumes, mounts

def _create_containers(self, deployment_name, container_config, mounts):
@staticmethod
def _create_containers(deployment_name, container_config, mounts):
cores = container_config.cpu_cores
memory = container_config.ram_mb
min_memory = max(int(memory/4), min(64, memory))
Expand Down Expand Up @@ -261,7 +263,7 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con
if not os.path.exists(os.path.join(FILE_UPDATE_DIRECTORY, service_name)):
os.makedirs(os.path.join(FILE_UPDATE_DIRECTORY, service_name), 0x777)

for dep in self.b1api.list_namespaced_deployment(namespace=self.namespace).items:
for dep in self.apps_api.list_namespaced_deployment(namespace=self.namespace).items:
if dep.metadata.name == deployment_name:
replace = True

Expand Down Expand Up @@ -300,15 +302,16 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con

if replace:
self.logger.info("Requesting kubernetes replace deployment info for: " + metadata.name)
self.b1api.replace_namespaced_deployment(namespace=self.namespace, body=deployment, name=metadata.name)
self.apps_api.replace_namespaced_deployment(namespace=self.namespace, body=deployment, name=metadata.name)
else:
self.logger.info("Requesting kubernetes create deployment info for: " + metadata.name)
self.b1api.create_namespaced_deployment(namespace=self.namespace, body=deployment)
self.apps_api.create_namespaced_deployment(namespace=self.namespace, body=deployment)

def get_target(self, service_name: str) -> int:
"""Get the target for running instances of a service."""
try:
scale = self.b1api.read_namespaced_deployment_scale(self._deployment_name(service_name), namespace=self.namespace)
scale = self.apps_api.read_namespaced_deployment_scale(self._deployment_name(service_name),
namespace=self.namespace)
return int(scale.spec.replicas or 0)
except ApiException as error:
# If we get a 404 it means the resource doesn't exist, which we treat the same as
Expand All @@ -320,9 +323,9 @@ def get_target(self, service_name: str) -> int:
def set_target(self, service_name: str, target: int):
"""Set the target for running instances of a service."""
name = self._deployment_name(service_name)
scale = self.b1api.read_namespaced_deployment_scale(name=name, namespace=self.namespace)
scale = self.apps_api.read_namespaced_deployment_scale(name=name, namespace=self.namespace)
scale.spec.replicas = target
self.b1api.replace_namespaced_deployment_scale(name=name, namespace=self.namespace, body=scale)
self.apps_api.replace_namespaced_deployment_scale(name=name, namespace=self.namespace, body=scale)

def stop_container(self, service_name, container_id):
pods = self.api.list_namespaced_pod(namespace=self.namespace, label_selector=f'component={service_name}')
Expand All @@ -333,7 +336,7 @@ def stop_container(self, service_name, container_id):

def restart(self, service):
self._create_deployment(service.name, self._deployment_name(service.name), service.container_config,
service.timeout, self.get_target(service.name))
service.shutdown_seconds, self.get_target(service.name))

def get_running_container_names(self):
pods = self.api.list_pod_for_all_namespaces(field_selector='status.phase==Running')
Expand Down Expand Up @@ -368,7 +371,10 @@ def start_stateful_container(self, service_name, container_name, spec, labels):
self._ensure_pvc(mount_name, volume_spec.storage_class, volume_spec.capacity)

# Create the volume info
volumes.append(V1Volume(name=mount_name, persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(mount_name)))
volumes.append(V1Volume(
name=mount_name,
persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(mount_name)
))
mounts.append(V1VolumeMount(mount_path=volume_spec.mount_path, name=mount_name))

self._create_deployment(service_name, deployment_name, spec.container,
Expand All @@ -383,8 +389,9 @@ def _ensure_pvc(self, name, storage_class, size):

def stop_containers(self, labels):
label_selector = ','.join(f'{_n}={_v}' for _n, _v in labels.items())
for dep in self.b1api.list_namespaced_deployment(namespace=self.namespace, label_selector=label_selector).items:
self.b1api.delete_namespaced_deployment(name=dep.metadata.name, namespace=self.namespace)
deployments = self.apps_api.list_namespaced_deployment(namespace=self.namespace, label_selector=label_selector)
for dep in deployments.items:
self.apps_api.delete_namespaced_deployment(name=dep.metadata.name, namespace=self.namespace)

def prepare_network(self, service_name, internet):
safe_name = service_name.lower().replace('_', '-')
Expand Down Expand Up @@ -455,4 +462,4 @@ def prepare_network(self, service_name, internet):
}),
egress=[V1NetworkPolicyEgressRule(to=[])],
)
))
))

0 comments on commit 7a30632

Please sign in to comment.