Skip to content

Commit

Permalink
Merge pull request #288 from CybercentreCanada/persistent-service-update
Browse files (browse the repository at this point in the history)
Decouple RWX for update-directory
  • Loading branch information
cccs-rs authored Sep 2, 2021
2 parents 6aaa30d + b768446 commit 92fab0b
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 112 deletions.
38 changes: 9 additions & 29 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@


# How to identify the update volume as a whole, in a way that the underlying container system recognizes.
FILE_UPDATE_VOLUME = os.environ.get('FILE_UPDATE_VOLUME', None)
CONTAINER_UPDATE_DIRECTORY = '/mount/updates/'

# Where to find the update directory inside this container.
FILE_UPDATE_DIRECTORY = os.environ.get('FILE_UPDATE_DIRECTORY', None)
# RESERVE_MEMORY_PER_NODE = os.environ.get('RESERVE_MEMORY_PER_NODE')

API_TIMEOUT = 90
Expand All @@ -50,6 +47,7 @@

class TypelessWatch(kubernetes.watch.Watch):
"""A kubernetes watch object that doesn't marshal the response."""

def get_return_type(self, func):
return None

Expand Down Expand Up @@ -210,7 +208,7 @@ def _deployment_name(self, service_name: str):
def _dependency_name(self, service_name: str, container_name: str):
return f"{self._deployment_name(service_name)}-{container_name}".lower()

def config_mount(self, name:str, config_map:str, key:str, target_path:str):
def config_mount(self, name: str, config_map: str, key: str, target_path: str):
if name not in self.config_volumes:
self.config_volumes[name] = V1Volume(
name=name,
Expand Down Expand Up @@ -455,24 +453,10 @@ def _create_volumes(self, service_name, mount_updates=False, core_mounts=False):
volumes.extend(self.core_config_volumes.values())
mounts.extend(self.core_config_mounts.values())

if mount_updates:
# Attach the mount that provides the update
volumes.append(V1Volume(
name='update-directory',
persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
claim_name=FILE_UPDATE_VOLUME
),
))

mounts.append(V1VolumeMount(
name='update-directory',
mount_path=CONTAINER_UPDATE_DIRECTORY,
sub_path=service_name
))

return volumes, mounts

def _create_containers(self, service_name: str, deployment_name:str, container_config, mounts, core_container=False):
def _create_containers(self, service_name: str, deployment_name: str, container_config, mounts,
core_container=False):
cores = container_config.cpu_cores
memory = container_config.ram_mb
min_memory = min(container_config.ram_mb_min, container_config.ram_mb)
Expand All @@ -488,7 +472,7 @@ def _create_containers(self, service_name: str, deployment_name:str, container_c
V1EnvVar(name='FILE_UPDATE_DIRECTORY', value=CONTAINER_UPDATE_DIRECTORY),
V1EnvVar(name='AL_SERVICE_NAME', value=service_name),
V1EnvVar(name='LOG_LEVEL', value=self.log_level)
]
]
# Overwrite ones defined dynamically by dependency container launches
for name, value in self._service_limited_env[service_name].items():
environment_variables.append(V1EnvVar(name=name, value=value))
Expand All @@ -510,10 +494,6 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con
mount_updates=True, core_mounts=False):

replace = False

if not os.path.exists(os.path.join(FILE_UPDATE_DIRECTORY, service_name)):
os.makedirs(os.path.join(FILE_UPDATE_DIRECTORY, service_name), 0x777)

resources = self.apps_api.list_namespaced_deployment(namespace=self.namespace, _request_timeout=API_TIMEOUT)
for dep in resources.items:
if dep.metadata.name == deployment_name:
Expand Down Expand Up @@ -571,7 +551,7 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con

pod = V1PodSpec(
volumes=all_volumes,
containers=self._create_containers(service_name, deployment_name, docker_config,
containers=self._create_containers(service_name, deployment_name, docker_config,
all_mounts, core_container=core_mounts),
priority_class_name=self.priority,
termination_grace_period_seconds=shutdown_seconds,
Expand Down Expand Up @@ -683,8 +663,8 @@ def new_events(self):

return new

def start_stateful_container(self, service_name:str, container_name:str,
spec, labels:dict[str, str], mount_updates:bool=False):
def start_stateful_container(self, service_name: str, container_name: str,
spec, labels: dict[str, str], mount_updates: bool = False):
# Setup PVC
deployment_name = self._dependency_name(service_name, container_name)
mounts, volumes = [], []
Expand Down Expand Up @@ -726,7 +706,7 @@ def start_stateful_container(self, service_name:str, container_name:str,
selector=labels,
ports=[V1ServicePort(port=int(_p)) for _p in spec.container.ports]
)
)
)
self.api.create_namespaced_service(self.namespace, service)

# Add entries to the environment variable list to point to this container
Expand Down
49 changes: 29 additions & 20 deletions assemblyline_core/updater/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,38 @@ def _get_proprietary_registry_tags(self, server, image_name, auth, verify):


def get_latest_tag_for_service(service_config, system_config, logger):
# Split a Docker image reference into a (server, image_name) pair, where
# image_name is the repository path with any trailing ":tag" removed.
# NOTE(review): indentation is not preserved in this rendering of the diff;
# this appears to be a helper nested inside get_latest_tag_for_service — confirm.
def process_image(image):
# Find which server to search in
# The first "/"-separated component is the candidate registry server.
server = image.split("/")[0]
# A leading "cccs" component is never treated as a server; the whole
# reference is kept and the default registry is used instead.
if server != "cccs":
if ":" in server:
# A colon in the first component is presumably a host:port registry address,
# so that component (plus the following "/") is stripped from the name.
# NOTE(review): a reference with a tag but no "/" (e.g. "name:tag") also
# takes this branch and ends up with an empty image_name — confirm whether
# such references can reach this helper.
image_name = image[len(server) + 1:]
else:
try:
# If the first component resolves as a hostname, treat it as the registry server.
socket.gethostbyname_ex(server)
image_name = image[len(server) + 1:]
except socket.gaierror:
# Resolution failed: fall back to the default registry and keep the full reference.
server = DEFAULT_DOCKER_REGISTRY
image_name = image
else:
server = DEFAULT_DOCKER_REGISTRY
image_name = image

# Split repo name without the tag
image_name = image_name.rsplit(":", 1)[0]

return server, image_name

# Extract info
service_name = service_config.name
image = service_config.docker_config.image
update_channel = service_config.update_channel

# Fix service image
# Fix service image for calling Docker API
image_variables = defaultdict(str)
image_variables.update(system_config.services.image_variables)
image_variables.update(system_config.services.update_image_variables)
image = string.Template(image).safe_substitute(image_variables)
searchable_image = string.Template(image).safe_substitute(image_variables)

# Get authentication
auth = None
Expand All @@ -95,24 +117,7 @@ def get_latest_tag_for_service(service_config, system_config, logger):
upass = f"{service_config.docker_config.registry_username}:{service_config.docker_config.registry_password}"
auth = f"Basic {b64encode(upass.encode()).decode()}"

# Find which server to search in
server = image.split("/")[0]
if server != "cccs":
if ":" in server:
image_name = image[len(server) + 1:]
else:
try:
socket.gethostbyname_ex(server)
image_name = image[len(server) + 1:]
except socket.gaierror:
server = DEFAULT_DOCKER_REGISTRY
image_name = image
else:
server = DEFAULT_DOCKER_REGISTRY
image_name = image

# Split repo name without the tag
image_name = image_name.rsplit(":", 1)[0]
server, image_name = process_image(searchable_image)
registry = REGISTRY_TYPE_MAPPING[service_config.docker_config.registry_type]

if server == DEFAULT_DOCKER_REGISTRY:
Expand All @@ -136,6 +141,10 @@ def get_latest_tag_for_service(service_config, system_config, logger):

logger.info(f"Latest {service_name} tag on {update_channel.upper()} channel is: {tag_name}")

# Fix service image for use in Kubernetes
image = string.Template(image).safe_substitute(system_config.services.image_variables)
server, image_name = process_image(image)

# Append server to image if not the default server
if server != "registry.hub.docker.com":
image_name = "/".join([server, image_name])
Expand Down
126 changes: 63 additions & 63 deletions assemblyline_core/updater/run_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class DockerUpdateInterface:
it properly on new containers we launch. FILE_UPDATE_DIRECTORY gives us the path
that it is mounted at in the update manager container.
"""

def __init__(self, log_level="INFO"):
self.client = docker.from_env()
self._external_network = None
Expand Down Expand Up @@ -324,77 +325,76 @@ def launch(self, name, docker_config: DockerConfig, mounts, env, network, blocki
section = 'core'
if network == 'al_registration':
section = 'service'
labels = {
'app': 'assemblyline',
'section': section,
'component': 'update-script',
}
labels.update(self.extra_labels)

labels = {
'app': 'assemblyline',
'section': section,
'component': 'update-script',
}
labels.update(self.extra_labels)

metadata = V1ObjectMeta(
name=name,
labels=labels
)

environment_variables = [V1EnvVar(name=_e.name, value=_e.value) for _e in docker_config.environment]
environment_variables.extend([V1EnvVar(name=k, value=v) for k, v in env.items()])
environment_variables.append(V1EnvVar(name="LOG_LEVEL", value=self.log_level))

cores = docker_config.cpu_cores
memory = docker_config.ram_mb
memory_min = min(docker_config.ram_mb_min, memory)
metadata = V1ObjectMeta(
name=name,
labels=labels
)

container = V1Container(
name=name,
image=docker_config.image,
command=docker_config.command,
env=environment_variables,
image_pull_policy='Always',
volume_mounts=volume_mounts,
resources=V1ResourceRequirements(
limits={'cpu': cores, 'memory': f'{memory}Mi'},
requests={'cpu': cores / 4, 'memory': f'{memory_min}Mi'},
environment_variables = [V1EnvVar(name=_e.name, value=_e.value) for _e in docker_config.environment]
environment_variables.extend([V1EnvVar(name=k, value=v) for k, v in env.items()])
environment_variables.append(V1EnvVar(name="LOG_LEVEL", value=self.log_level))

cores = docker_config.cpu_cores
memory = docker_config.ram_mb
memory_min = min(docker_config.ram_mb_min, memory)

container = V1Container(
name=name,
image=docker_config.image,
command=docker_config.command,
env=environment_variables,
image_pull_policy='Always',
volume_mounts=volume_mounts,
resources=V1ResourceRequirements(
limits={'cpu': cores, 'memory': f'{memory}Mi'},
requests={'cpu': cores / 4, 'memory': f'{memory_min}Mi'},
)
)
)

pod = V1PodSpec(
volumes=volumes,
restart_policy='Never',
containers=[container],
priority_class_name=self.priority_class,
)
pod = V1PodSpec(
volumes=volumes,
restart_policy='Never',
containers=[container],
priority_class_name=self.priority_class,
)

if use_pull_secret:
pod.image_pull_secrets = [V1LocalObjectReference(name=pull_secret_name)]

job = V1Job(
metadata=metadata,
spec=V1JobSpec(
backoff_limit=1,
completions=1,
template=V1PodTemplateSpec(
metadata=metadata,
spec=pod
if use_pull_secret:
pod.image_pull_secrets = [V1LocalObjectReference(name=pull_secret_name)]

job = V1Job(
metadata=metadata,
spec=V1JobSpec(
backoff_limit=1,
completions=1,
template=V1PodTemplateSpec(
metadata=metadata,
spec=pod
)
)
)
)

status = self.batch_api.create_namespaced_job(namespace=self.namespace, body=job,
_request_timeout=API_TIMEOUT).status
status = self.batch_api.create_namespaced_job(namespace=self.namespace, body=job,
_request_timeout=API_TIMEOUT).status

if blocking:
try:
while not (status.failed or status.succeeded):
time.sleep(3)
status = self.batch_api.read_namespaced_job(namespace=self.namespace, name=name,
_request_timeout=API_TIMEOUT).status
if blocking:
try:
while not (status.failed or status.succeeded):
time.sleep(3)
status = self.batch_api.read_namespaced_job(namespace=self.namespace, name=name,
_request_timeout=API_TIMEOUT).status

self.batch_api.delete_namespaced_job(name=name, namespace=self.namespace,
propagation_policy='Background', _request_timeout=API_TIMEOUT)
except ApiException as error:
if error.status != 404:
raise
self.batch_api.delete_namespaced_job(name=name, namespace=self.namespace,
propagation_policy='Background', _request_timeout=API_TIMEOUT)
except ApiException as error:
if error.status != 404:
raise

def cleanup_stale(self):
# Clear up any finished jobs.
Expand All @@ -410,7 +410,7 @@ def cleanup_stale(self):
except ApiException as error:
if error.status != 404:
raise

def restart(self, service_name):
for _ in range(10):
try:
Expand Down Expand Up @@ -565,7 +565,7 @@ def container_updates(self):
env={
"SERVICE_TAG": update_data['latest_tag'],
"SERVICE_API_HOST": os.environ.get('SERVICE_API_HOST', "http://al_service_server:5003"),
"SERVICE_API_KEY": os.environ.get('SERVICE_API_KEY','ThisIsARandomAuthKey...ChangeMe!'),
"SERVICE_API_KEY": os.environ.get('SERVICE_API_KEY', 'ThisIsARandomAuthKey...ChangeMe!'),
"REGISTER_ONLY": 'true'
},
network='al_registration',
Expand Down

0 comments on commit 92fab0b

Please sign in to comment.