diff --git a/appliance/ui/appliance.html b/appliance/ui/appliance.html
index 1982564..06a4718 100644
--- a/appliance/ui/appliance.html
+++ b/appliance/ui/appliance.html
@@ -107,7 +107,7 @@
row.append($('
', {text: container.type}));
row.append($(' | ', {text: container.state,
style: 'color: ' + getStateColor(container.state) + ';'}));
- row.append($(' | ', {text: container.rack == null?'':container.rack}))
+ row.append($(' | ', {text: container.deployment == null ? '' : container.deployment.cloud}))
var endpoints = [];
for (j in container.endpoints) {
var endpoint = container.endpoints[j];
diff --git a/container/base.py b/container/base.py
index b79f6c5..ddd84b9 100644
--- a/container/base.py
+++ b/container/base.py
@@ -431,8 +431,8 @@ def parse(cls, data):
def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env={},
volumes=[], network_mode=NetworkMode.HOST, endpoints=[], ports=[],
state=ContainerState.SUBMITTED, is_privileged=False, force_pull_image=True,
- dependencies=[], data=None, rack=None, host=None, last_update=None,
- constraints={}, deployment=None, *aargs, **kwargs):
+ dependencies=[], data=None, cloud=None, host=None, last_update=None,
+ schedule=None, deployment=None, *aargs, **kwargs):
self.__id = id
self.__appliance = appliance
self.__type = type if isinstance(type, ContainerType) else ContainerType(type)
@@ -449,14 +449,14 @@ def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env
self.__endpoints = [Endpoint(**e) for e in endpoints]
self.__ports = [Port(**p) for p in ports]
self.__state = state if isinstance(state, ContainerState) else ContainerState(state)
- self.__rack = rack
+ self.__cloud = cloud
self.__host = host
self.__is_privileged = is_privileged
self.__force_pull_image = force_pull_image
self.__dependencies = list(dependencies)
self.__data = data and Data(**data)
self.__last_update = parse_datetime(last_update)
- self.__constraints = {k: v and str(v) for k, v in constraints.items()}
+ self.__schedule = Schedule(**(schedule if schedule else {}))
self.__deployment = deployment and Deployment(**deployment)
@property
@@ -636,31 +636,32 @@ def state(self):
@property
@swagger.property
- def rack(self):
+ def cloud(self):
"""
- Placement constraint: the specific rack the container must be placed on
+ Placement constraint: Cloud platform where the container must be placed
---
type: str
nullable: true
example: 'aws'
"""
- return self.__rack
+ return self.__cloud
@property
@swagger.property
def host(self):
"""
- Placement constraint: the specific host (identified by the host's private IP address)
- the container must be placed on
+ Placement constraint: physical host identified by the public IP address where the
+ container must be placed on
---
type: str
nullable: true
- example: 10.52.0.3
+ example: '35.23.5.16'
"""
return self.__host
+
@property
@swagger.property
def is_privileged(self):
@@ -721,11 +722,19 @@ def last_update(self):
return self.__last_update
@property
- def constraints(self):
- return dict(self.__constraints)
+ def schedule(self):
+ return self.__schedule
@property
+ @swagger.property
def deployment(self):
+ """
+ Container deployment info
+ ---
+ type: Deployment
+ read_only: true
+
+ """
return self.__deployment
@image.setter
@@ -750,14 +759,6 @@ def endpoints(self, endpoints):
def state(self, state):
self.__state = state if isinstance(state, ContainerState) else ContainerState(state)
- @rack.setter
- def rack(self, rack):
- self.__rack = rack
-
- @host.setter
- def host(self, host):
- self.__host = host
-
@last_update.setter
def last_update(self, last_update):
self.__last_update = parse_datetime(last_update)
@@ -772,9 +773,6 @@ def add_env(self, **env):
def add_dependency(self, dep):
self.__dependencies.append(dep)
- def add_constraint(self, key, val):
- self.__constraints += (key, val),
-
def to_render(self):
return dict(id=self.id, appliance=self.appliance, type=self.type.value,
image=self.image, resources=self.resources.to_render(),
@@ -785,7 +783,9 @@ def to_render(self):
ports=[p.to_render() for p in self.ports],
state=self.state.value, is_privileged=self.is_privileged,
force_pull_image=self.force_pull_image, dependencies=self.dependencies,
- data=self.data and self.data.to_render(), rack=self.rack, host=self.host)
+ data=self.data and self.data.to_render(),
+ cloud=self.cloud, host=self.host,
+ deployment=self.deployment and self.deployment.to_render())
def to_save(self):
return dict(id=self.id, appliance=self.appliance, type=self.type.value,
@@ -797,9 +797,10 @@ def to_save(self):
ports=[p.to_save() for p in self.ports],
state=self.state.value, is_privileged=self.is_privileged,
force_pull_image=self.force_pull_image, dependencies=self.dependencies,
- data=self.data and self.data.to_save(), rack=self.rack, host=self.host,
+ data=self.data and self.data.to_save(),
+ cloud=self.cloud, host=self.host,
last_update=self.last_update and self.last_update.isoformat(),
- constraints=self.constraints,
+ schedule=self.schedule and self.schedule.to_save(),
deployment=self.deployment and self.deployment.to_save())
def __hash__(self):
@@ -815,20 +816,74 @@ def __eq__(self, other):
### Internal data structures ###
################################
+class Schedule:
+
+ def __init__(self, constraints={}):
+ self.__constraints = dict(constraints)
+
+ @property
+ def constraints(self):
+ return dict(self.__constraints)
+
+ def add_constraint(self, key, value):
+ self.__constraints[key] = value
+
+ def to_save(self):
+ return dict(constraints=self.constraints)
+
+
+@swagger.model
class Deployment:
- def __init__(self, ip_addresses=[]):
+ def __init__(self, ip_addresses=[], cloud=None, host=None):
self.__ip_addresses = list(ip_addresses)
+ self.__cloud = cloud
+ self.__host = host
+
+ @property
+ @swagger.property
+ def cloud(self):
+ """
+ Cloud platform where the container is deployed
+ ---
+ type: str
+ read_only: true
+
+ """
+ return self.__cloud
+
+ @property
+ @swagger.property
+ def host(self):
+ """
+ Physical host where the container is deployed
+ ---
+ type: str
+ read_only: true
+
+ """
+ return self.__host
@property
def ip_addresses(self):
return list(self.__ip_addresses)
+ @cloud.setter
+ def cloud(self, cloud):
+ self.__cloud = cloud
+
+ @host.setter
+ def host(self, host):
+ self.__host = host
+
def add_ip_address(self, ip_addr):
self.__ip_addresses.append(ip_addr)
+ def to_render(self):
+ return dict(cloud=self.cloud, host=self.host)
+
def to_save(self):
- return dict(ip_addresses=self.ip_addresses)
+ return dict(ip_addresses=self.ip_addresses, cloud=self.cloud, host=self.host)
short_id_pattern = r'@(%s)'%Container.ID_PATTERN
diff --git a/container/job.py b/container/job.py
index 4cc8c0e..560ca60 100644
--- a/container/job.py
+++ b/container/job.py
@@ -122,11 +122,7 @@ def to_request(self):
value='%d:%d/%s'%(p.host_port, p.container_port, p.protocol))
for p in self.ports]
r['container']['parameters'] = parameters
- if self.rack:
- r.setdefault('constraints', []).append(['cloud', 'EQUALS', self.rack])
- if self.host:
- r.setdefault('constraints', []).append(['hostname', 'EQUALS', self.host])
- for k, v in self.constraints.items():
+ for k, v in self.schedule.constraints.items():
r.setdefault('constraints', []).append([str(k), 'EQUALS', str(v)])
return r
diff --git a/container/manager.py b/container/manager.py
index 472565d..5ad8046 100644
--- a/container/manager.py
+++ b/container/manager.py
@@ -3,11 +3,11 @@
from datetime import timedelta
+from config import config
from commons import MongoClient
from commons import APIManager, Manager
-from container.base import Container, ContainerType, ContainerState, Endpoint, Deployment
from cluster.manager import AgentDBManager
-from config import config
+from container.base import Container, ContainerType, ContainerState, Endpoint, Deployment
class ContainerManager(Manager):
@@ -129,7 +129,6 @@ async def _get_updated_container(self, contr):
if not err:
parsed_srv = await self._parse_service_state(raw_service)
contr.state, contr.endpoints = parsed_srv['state'], parsed_srv['endpoints']
- contr.rack, contr.host = parsed_srv['rack'], parsed_srv['host']
contr.deployment = parsed_srv['deployment']
elif contr.type == ContainerType.JOB:
status, raw_job, err = await self.__job_api.get_job_update(contr)
@@ -158,13 +157,13 @@ async def _parse_service_state(self, body):
else:
state = ContainerState.FAILED
# parse endpoints
- endpoints, deployment, rack, host = [], Deployment(), None, None
+ endpoints, deployment, cloud, host = [], Deployment(), None, None
if state == ContainerState.RUNNING:
for t in tasks:
# parse endpoints
hosts = await self.__cluster_db.find_agents(hostname=t['host'])
if not hosts: continue
- host, rack = hosts[0], hosts[0].attributes.get('cloud')
+ host, cloud = hosts[0], hosts[0].attributes.get('cloud')
public_ip = host.attributes.get('public_ip')
if not public_ip: continue
if 'portDefinitions' in body:
@@ -174,15 +173,14 @@ async def _parse_service_state(self, body):
for i, p in enumerate(body['container']['portMappings']):
endpoints += [Endpoint(public_ip, p['containerPort'], t['ports'][i],
p['protocol'])]
-
# parse virtual IP addresses
for ip in t['ipAddresses']:
if ip['protocol'] != 'IPv4':
continue
deployment.add_ip_address(ip['ipAddress'])
+ deployment.host, deployment.cloud = host.hostname, cloud
_, appliance, id = body['id'].split('/')
return dict(id=id, appliance=appliance, state=state,
- rack=rack, host=host and host.hostname,
endpoints=endpoints, deployment=deployment)
async def _parse_job_state(self, body):
diff --git a/container/service.py b/container/service.py
index fdafb93..490cb39 100644
--- a/container/service.py
+++ b/container/service.py
@@ -282,11 +282,7 @@ def to_request(self):
containerPort=p.container_port)
for i, p in enumerate(self.ports)]
r['container']['docker']['portMappings'] = port_mappings
- if self.rack:
- r.setdefault('constraints', []).append(['cloud', 'CLUSTER', self.rack])
- if self.host:
- r.setdefault('constraints', []).append(['hostname', 'CLUSTER', self.host])
- for k, v in self.constraints.items():
+ for k, v in self.schedule.constraints.items():
r.setdefault('constraints', []).append([str(k), 'CLUSTER', str(v)])
return r
diff --git a/scheduler/appliance/__init__.py b/scheduler/appliance/__init__.py
index 16be6b2..ed126b3 100644
--- a/scheduler/appliance/__init__.py
+++ b/scheduler/appliance/__init__.py
@@ -81,7 +81,14 @@ async def schedule(self, plans):
self.logger.error(err)
contrs = self.dag.get_free_containers()
self.logger.info('Free containers: %s'%[c.id for c in contrs])
- new_plans = [SchedulePlan(c.id, [c]) for c in contrs if c.id not in plans]
+ new_plans = []
+ for c in contrs:
+ if c.id in plans: continue
+ if c.cloud:
+ c.schedule.add_constraint('cloud', c.cloud)
+ if c.host:
+ c.schedule.add_constraint('public_ip', c.host)
+ new_plans += [SchedulePlan(c.id, [c])]
if new_plans:
self.logger.info('New plans: %s'%[p.id for p in new_plans])
return new_plans
diff --git a/scheduler/appliance/plugin/location/__init__.py b/scheduler/appliance/plugin/location/__init__.py
index 79427a2..b59c869 100644
--- a/scheduler/appliance/plugin/location/__init__.py
+++ b/scheduler/appliance/plugin/location/__init__.py
@@ -66,9 +66,9 @@ async def schedule(self, plans):
self.logger.info('\t%s, %s, data size: %d'%(region, cloud, data_size))
agent = max(agents, key=lambda a: regions.get(a.attributes.get('region'), 0))
cloud, region = agent.attributes.get('cloud'), agent.attributes.get('region')
- self.logger.info("Container '%s' will land on %s (%s, %s)"%(c.id, agent.hostname,
- region, cloud))
- c.host = agent.hostname
+ self.logger.info("Container '%s' will land on "
+ "%s (%s, %s)"%(c.id, agent.hostname, region, cloud))
+ c.schedule.add_constraint('hostname', agent.hostname)
else:
self.logger.info("No matched agents have sufficient resources for '%s'"%c)
await self.__contr_db.save_container(c, False)
|