From 1bbe4534b26f633a349ff2171a78632c7fcc1c40 Mon Sep 17 00:00:00 2001 From: dcvan Date: Tue, 12 Jun 2018 14:24:36 -0400 Subject: [PATCH] [#19][#30] separate information for different phases of a container's lifetime to facilitate scheduling --- appliance/ui/appliance.html | 2 +- container/base.py | 111 +++++++++++++----- container/job.py | 6 +- container/manager.py | 12 +- container/service.py | 6 +- scheduler/appliance/__init__.py | 9 +- .../appliance/plugin/location/__init__.py | 6 +- 7 files changed, 102 insertions(+), 50 deletions(-) diff --git a/appliance/ui/appliance.html b/appliance/ui/appliance.html index 1982564..06a4718 100644 --- a/appliance/ui/appliance.html +++ b/appliance/ui/appliance.html @@ -107,7 +107,7 @@ row.append($('', {text: container.type})); row.append($('', {text: container.state, style: 'color: ' + getStateColor(container.state) + ';'})); - row.append($('', {text: container.rack == null?'':container.rack})) + row.append($('', {text: container.deployment == null ? '' : container.deployment.cloud})) var endpoints = []; for (j in container.endpoints) { var endpoint = container.endpoints[j]; diff --git a/container/base.py b/container/base.py index b79f6c5..ddd84b9 100644 --- a/container/base.py +++ b/container/base.py @@ -431,8 +431,8 @@ def parse(cls, data): def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env={}, volumes=[], network_mode=NetworkMode.HOST, endpoints=[], ports=[], state=ContainerState.SUBMITTED, is_privileged=False, force_pull_image=True, - dependencies=[], data=None, rack=None, host=None, last_update=None, - constraints={}, deployment=None, *aargs, **kwargs): + dependencies=[], data=None, cloud=None, host=None, last_update=None, + schedule=None, deployment=None, *aargs, **kwargs): self.__id = id self.__appliance = appliance self.__type = type if isinstance(type, ContainerType) else ContainerType(type) @@ -449,14 +449,14 @@ def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env self.__endpoints = [Endpoint(**e) for e in endpoints] self.__ports = [Port(**p) for p in ports] self.__state = state if isinstance(state, ContainerState) else ContainerState(state) - self.__rack = rack + self.__cloud = cloud self.__host = host self.__is_privileged = is_privileged self.__force_pull_image = force_pull_image self.__dependencies = list(dependencies) self.__data = data and Data(**data) self.__last_update = parse_datetime(last_update) - self.__constraints = {k: v and str(v) for k, v in constraints.items()} + self.__schedule = Schedule(**(schedule if schedule else {})) self.__deployment = deployment and Deployment(**deployment) @property @@ -636,31 +636,32 @@ def state(self): @property @swagger.property - def rack(self): + def cloud(self): """ - Placement constraint: the specific rack the container must be placed on + Placement constraint: Cloud platform where the container must be placed --- type: str nullable: true example: 'aws' """ - return self.__rack + return self.__cloud @property @swagger.property def host(self): """ - Placement constraint: the specific host (identified by the host's private IP address) - the container must be placed on + Placement constraint: physical host identified by the public IP address where the + container must be placed on --- type: str nullable: true - example: 10.52.0.3 + example: '35.23.5.16' """ return self.__host + @property @swagger.property def is_privileged(self): @@ -721,11 +722,19 @@ def last_update(self): return self.__last_update @property - def constraints(self): - return dict(self.__constraints) + def schedule(self): + return self.__schedule @property + @swagger.property def deployment(self): + """ + Container deployment info + --- + type: Deployment + read_only: true + + """ return self.__deployment @image.setter @@ -750,14 +759,6 @@ def endpoints(self, endpoints): def state(self, state): self.__state = state if isinstance(state, ContainerState) else ContainerState(state) - @rack.setter - def rack(self, rack): - self.__rack = rack - - @host.setter - def host(self, host): - self.__host = host - @last_update.setter def last_update(self, last_update): self.__last_update = parse_datetime(last_update) @@ -772,9 +773,6 @@ def add_env(self, **env): def add_dependency(self, dep): self.__dependencies.append(dep) - def add_constraint(self, key, val): - self.__constraints += (key, val), - def to_render(self): return dict(id=self.id, appliance=self.appliance, type=self.type.value, image=self.image, resources=self.resources.to_render(), @@ -785,7 +783,9 @@ def to_render(self): ports=[p.to_render() for p in self.ports], state=self.state.value, is_privileged=self.is_privileged, force_pull_image=self.force_pull_image, dependencies=self.dependencies, - data=self.data and self.data.to_render(), rack=self.rack, host=self.host) + data=self.data and self.data.to_render(), + cloud=self.cloud, host=self.host, + deployment=self.deployment and self.deployment.to_render()) def to_save(self): return dict(id=self.id, appliance=self.appliance, type=self.type.value, @@ -797,9 +797,10 @@ def to_save(self): ports=[p.to_save() for p in self.ports], state=self.state.value, is_privileged=self.is_privileged, force_pull_image=self.force_pull_image, dependencies=self.dependencies, - data=self.data and self.data.to_save(), rack=self.rack, host=self.host, + data=self.data and self.data.to_save(), + cloud=self.cloud, host=self.host, last_update=self.last_update and self.last_update.isoformat(), - constraints=self.constraints, + schedule=self.schedule and self.schedule.to_save(), deployment=self.deployment and self.deployment.to_save()) def __hash__(self): @@ -815,20 +816,74 @@ def __eq__(self, other): ### Internal data structures ### ################################ +class Schedule: + + def __init__(self, constraints={}): + self.__constraints = dict(constraints) + + @property + def constraints(self): + return dict(self.__constraints) + + def add_constraint(self, key, value): + self.__constraints[key] = value + + def to_save(self): + return dict(constraints=self.constraints) + + +@swagger.model class Deployment: - def __init__(self, ip_addresses=[]): + def __init__(self, ip_addresses=[], cloud=None, host=None): self.__ip_addresses = list(ip_addresses) + self.__cloud = cloud + self.__host = host + + @property + @swagger.property + def cloud(self): + """ + Cloud platform where the container is deployed + --- + type: str + read_only: true + + """ + return self.__cloud + + @property + @swagger.property + def host(self): + """ + Physical host where the container is deployed + --- + type: str + read_only: true + + """ + return self.__host @property def ip_addresses(self): return list(self.__ip_addresses) + @cloud.setter + def cloud(self, cloud): + self.__cloud = cloud + + @host.setter + def host(self, host): + self.__host = host + def add_ip_address(self, ip_addr): self.__ip_addresses.append(ip_addr) + def to_render(self): + return dict(cloud=self.cloud, host=self.host) + def to_save(self): - return dict(ip_addresses=self.ip_addresses) + return dict(ip_addresses=self.ip_addresses, cloud=self.cloud, host=self.host) short_id_pattern = r'@(%s)'%Container.ID_PATTERN diff --git a/container/job.py b/container/job.py index 4cc8c0e..560ca60 100644 --- a/container/job.py +++ b/container/job.py @@ -122,11 +122,7 @@ def to_request(self): value='%d:%d/%s'%(p.host_port, p.container_port, p.protocol)) for p in self.ports] r['container']['parameters'] = parameters - if self.rack: - r.setdefault('constraints', []).append(['cloud', 'EQUALS', self.rack]) - if self.host: - r.setdefault('constraints', []).append(['hostname', 'EQUALS', self.host]) - for k, v in self.constraints.items(): + for k, v in self.schedule.constraints.items(): r.setdefault('constraints', []).append([str(k), 'EQUALS', str(v)]) return r diff --git a/container/manager.py b/container/manager.py index 472565d..5ad8046 100644 --- a/container/manager.py +++ b/container/manager.py @@ -3,11 +3,11 @@ from datetime import timedelta +from config import config from commons import MongoClient from commons import APIManager, Manager -from container.base import Container, ContainerType, ContainerState, Endpoint, Deployment from cluster.manager import AgentDBManager -from config import config +from container.base import Container, ContainerType, ContainerState, Endpoint, Deployment class ContainerManager(Manager): @@ -129,7 +129,6 @@ async def _get_updated_container(self, contr): if not err: parsed_srv = await self._parse_service_state(raw_service) contr.state, contr.endpoints = parsed_srv['state'], parsed_srv['endpoints'] - contr.rack, contr.host = parsed_srv['rack'], parsed_srv['host'] contr.deployment = parsed_srv['deployment'] elif contr.type == ContainerType.JOB: status, raw_job, err = await self.__job_api.get_job_update(contr) @@ -158,13 +157,13 @@ async def _parse_service_state(self, body): else: state = ContainerState.FAILED # parse endpoints - endpoints, deployment, rack, host = [], Deployment(), None, None + endpoints, deployment, cloud, host = [], Deployment(), None, None if state == ContainerState.RUNNING: for t in tasks: # parse endpoints hosts = await self.__cluster_db.find_agents(hostname=t['host']) if not hosts: continue - host, rack = hosts[0], hosts[0].attributes.get('cloud') + host, cloud = hosts[0], hosts[0].attributes.get('cloud') public_ip = host.attributes.get('public_ip') if not public_ip: continue if 'portDefinitions' in body: @@ -174,15 +173,14 @@ async def _parse_service_state(self, body): for i, p in enumerate(body['container']['portMappings']): endpoints += [Endpoint(public_ip, p['containerPort'], t['ports'][i], p['protocol'])] - # parse virtual IP addresses for ip in t['ipAddresses']: if ip['protocol'] != 'IPv4': continue deployment.add_ip_address(ip['ipAddress']) + deployment.host, deployment.cloud = host.hostname, cloud _, appliance, id = body['id'].split('/') return dict(id=id, appliance=appliance, state=state, - rack=rack, host=host and host.hostname, endpoints=endpoints, deployment=deployment) async def _parse_job_state(self, body): diff --git a/container/service.py b/container/service.py index fdafb93..490cb39 100644 --- a/container/service.py +++ b/container/service.py @@ -282,11 +282,7 @@ def to_request(self): containerPort=p.container_port) for i, p in enumerate(self.ports)] r['container']['docker']['portMappings'] = port_mappings - if self.rack: - r.setdefault('constraints', []).append(['cloud', 'CLUSTER', self.rack]) - if self.host: - r.setdefault('constraints', []).append(['hostname', 'CLUSTER', self.host]) - for k, v in self.constraints.items(): + for k, v in self.schedule.constraints.items(): r.setdefault('constraints', []).append([str(k), 'CLUSTER', str(v)]) return r diff --git a/scheduler/appliance/__init__.py b/scheduler/appliance/__init__.py index 16be6b2..ed126b3 100644 --- a/scheduler/appliance/__init__.py +++ b/scheduler/appliance/__init__.py @@ -81,7 +81,14 @@ async def schedule(self, plans): self.logger.error(err) contrs = self.dag.get_free_containers() self.logger.info('Free containers: %s'%[c.id for c in contrs]) - new_plans = [SchedulePlan(c.id, [c]) for c in contrs if c.id not in plans] + new_plans = [] + for c in contrs: + if c.id in plans: continue + if c.cloud: + c.schedule.add_constraint('cloud', c.cloud) + if c.host: + c.schedule.add_constraint('public_ip', c.host) + new_plans += [SchedulePlan(c.id, [c])] if new_plans: self.logger.info('New plans: %s'%[p.id for p in new_plans]) return new_plans diff --git a/scheduler/appliance/plugin/location/__init__.py b/scheduler/appliance/plugin/location/__init__.py index 79427a2..b59c869 100644 --- a/scheduler/appliance/plugin/location/__init__.py +++ b/scheduler/appliance/plugin/location/__init__.py @@ -66,9 +66,9 @@ async def schedule(self, plans): self.logger.info('\t%s, %s, data size: %d'%(region, cloud, data_size)) agent = max(agents, key=lambda a: regions.get(a.attributes.get('region'), 0)) cloud, region = agent.attributes.get('cloud'), agent.attributes.get('region') - self.logger.info("Container '%s' will land on %s (%s, %s)"%(c.id, agent.hostname, - region, cloud)) - c.host = agent.hostname + self.logger.info("Container '%s' will land on " + "%s (%s, %s)"%(c.id, agent.hostname, region, cloud)) + c.schedule.add_constraint('hostname', agent.hostname) else: self.logger.info("No matched agents have sufficient resources for '%s'"%c) await self.__contr_db.save_container(c, False)