Skip to content

Commit

Permalink
[#19][#30] separate information for different phases of a container's…
Browse files Browse the repository at this point in the history
… lifetime to facilitate scheduling
  • Loading branch information
dcvan24 committed Jun 12, 2018
1 parent 6e53f5a commit 1bbe453
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 50 deletions.
2 changes: 1 addition & 1 deletion appliance/ui/appliance.html
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
row.append($('<td>', {text: container.type}));
row.append($('<td>', {text: container.state,
style: 'color: ' + getStateColor(container.state) + ';'}));
row.append($('<td>', {text: container.rack == null?'':container.rack}))
row.append($('<td>', {text: container.deployment == null ? '' : container.deployment.cloud}))
var endpoints = [];
for (j in container.endpoints) {
var endpoint = container.endpoints[j];
Expand Down
111 changes: 83 additions & 28 deletions container/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ def parse(cls, data):
def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env={},
volumes=[], network_mode=NetworkMode.HOST, endpoints=[], ports=[],
state=ContainerState.SUBMITTED, is_privileged=False, force_pull_image=True,
dependencies=[], data=None, rack=None, host=None, last_update=None,
constraints={}, deployment=None, *aargs, **kwargs):
dependencies=[], data=None, cloud=None, host=None, last_update=None,
schedule=None, deployment=None, *aargs, **kwargs):
self.__id = id
self.__appliance = appliance
self.__type = type if isinstance(type, ContainerType) else ContainerType(type)
Expand All @@ -449,14 +449,14 @@ def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env
self.__endpoints = [Endpoint(**e) for e in endpoints]
self.__ports = [Port(**p) for p in ports]
self.__state = state if isinstance(state, ContainerState) else ContainerState(state)
self.__rack = rack
self.__cloud = cloud
self.__host = host
self.__is_privileged = is_privileged
self.__force_pull_image = force_pull_image
self.__dependencies = list(dependencies)
self.__data = data and Data(**data)
self.__last_update = parse_datetime(last_update)
self.__constraints = {k: v and str(v) for k, v in constraints.items()}
self.__schedule = Schedule(**(schedule if schedule else {}))
self.__deployment = deployment and Deployment(**deployment)

@property
Expand Down Expand Up @@ -636,31 +636,32 @@ def state(self):

@property
@swagger.property
def rack(self):
def cloud(self):
"""
Placement constraint: the specific rack the container must be placed on
Placement constraint: Cloud platform where the container must be placed
---
type: str
nullable: true
example: 'aws'
"""
return self.__rack
return self.__cloud

@property
@swagger.property
def host(self):
"""
Placement constraint: the specific host (identified by the host's private IP address)
the container must be placed on
Placement constraint: physical host identified by the public IP address where the
container must be placed on
---
type: str
nullable: true
example: 10.52.0.3
example: '35.23.5.16'
"""
return self.__host


@property
@swagger.property
def is_privileged(self):
Expand Down Expand Up @@ -721,11 +722,19 @@ def last_update(self):
return self.__last_update

@property
def constraints(self):
return dict(self.__constraints)
def schedule(self):
return self.__schedule

@property
@swagger.property
def deployment(self):
"""
Container deployment info
---
type: Deployment
read_only: true
"""
return self.__deployment

@image.setter
Expand All @@ -750,14 +759,6 @@ def endpoints(self, endpoints):
def state(self, state):
self.__state = state if isinstance(state, ContainerState) else ContainerState(state)

@rack.setter
def rack(self, rack):
self.__rack = rack

@host.setter
def host(self, host):
self.__host = host

@last_update.setter
def last_update(self, last_update):
self.__last_update = parse_datetime(last_update)
Expand All @@ -772,9 +773,6 @@ def add_env(self, **env):
def add_dependency(self, dep):
self.__dependencies.append(dep)

def add_constraint(self, key, val):
self.__constraints += (key, val),

def to_render(self):
return dict(id=self.id, appliance=self.appliance, type=self.type.value,
image=self.image, resources=self.resources.to_render(),
Expand All @@ -785,7 +783,9 @@ def to_render(self):
ports=[p.to_render() for p in self.ports],
state=self.state.value, is_privileged=self.is_privileged,
force_pull_image=self.force_pull_image, dependencies=self.dependencies,
data=self.data and self.data.to_render(), rack=self.rack, host=self.host)
data=self.data and self.data.to_render(),
cloud=self.cloud, host=self.host,
deployment=self.deployment and self.deployment.to_render())

def to_save(self):
return dict(id=self.id, appliance=self.appliance, type=self.type.value,
Expand All @@ -797,9 +797,10 @@ def to_save(self):
ports=[p.to_save() for p in self.ports],
state=self.state.value, is_privileged=self.is_privileged,
force_pull_image=self.force_pull_image, dependencies=self.dependencies,
data=self.data and self.data.to_save(), rack=self.rack, host=self.host,
data=self.data and self.data.to_save(),
cloud=self.cloud, host=self.host,
last_update=self.last_update and self.last_update.isoformat(),
constraints=self.constraints,
schedule=self.schedule and self.schedule.to_save(),
deployment=self.deployment and self.deployment.to_save())

def __hash__(self):
Expand All @@ -815,20 +816,74 @@ def __eq__(self, other):
### Internal data structures ###
################################

class Schedule:

def __init__(self, constraints={}):
self.__constraints = dict(constraints)

@property
def constraints(self):
return dict(self.__constraints)

def add_constraint(self, key, value):
self.__constraints[key] = value

def to_save(self):
return dict(constraints=self.constraints)


@swagger.model
class Deployment:

def __init__(self, ip_addresses=[]):
def __init__(self, ip_addresses=[], cloud=None, host=None):
self.__ip_addresses = list(ip_addresses)
self.__cloud = cloud
self.__host = host

@property
@swagger.property
def cloud(self):
"""
Cloud platform where the container is deployed
---
type: str
read_only: true
"""
return self.__cloud

@property
@swagger.property
def host(self):
"""
Physical host where the container is deployed
---
type: str
read_only: true
"""
return self.__host

@property
def ip_addresses(self):
return list(self.__ip_addresses)

@cloud.setter
def cloud(self, cloud):
self.__cloud = cloud

@host.setter
def host(self, host):
self.__host = host

def add_ip_address(self, ip_addr):
self.__ip_addresses.append(ip_addr)

def to_render(self):
return dict(cloud=self.cloud, host=self.host)

def to_save(self):
return dict(ip_addresses=self.ip_addresses)
return dict(ip_addresses=self.ip_addresses, cloud=self.cloud, host=self.host)


short_id_pattern = r'@(%s)'%Container.ID_PATTERN
Expand Down
6 changes: 1 addition & 5 deletions container/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,7 @@ def to_request(self):
value='%d:%d/%s'%(p.host_port, p.container_port, p.protocol))
for p in self.ports]
r['container']['parameters'] = parameters
if self.rack:
r.setdefault('constraints', []).append(['cloud', 'EQUALS', self.rack])
if self.host:
r.setdefault('constraints', []).append(['hostname', 'EQUALS', self.host])
for k, v in self.constraints.items():
for k, v in self.schedule.constraints.items():
r.setdefault('constraints', []).append([str(k), 'EQUALS', str(v)])
return r

Expand Down
12 changes: 5 additions & 7 deletions container/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

from datetime import timedelta

from config import config
from commons import MongoClient
from commons import APIManager, Manager
from container.base import Container, ContainerType, ContainerState, Endpoint, Deployment
from cluster.manager import AgentDBManager
from config import config
from container.base import Container, ContainerType, ContainerState, Endpoint, Deployment


class ContainerManager(Manager):
Expand Down Expand Up @@ -129,7 +129,6 @@ async def _get_updated_container(self, contr):
if not err:
parsed_srv = await self._parse_service_state(raw_service)
contr.state, contr.endpoints = parsed_srv['state'], parsed_srv['endpoints']
contr.rack, contr.host = parsed_srv['rack'], parsed_srv['host']
contr.deployment = parsed_srv['deployment']
elif contr.type == ContainerType.JOB:
status, raw_job, err = await self.__job_api.get_job_update(contr)
Expand Down Expand Up @@ -158,13 +157,13 @@ async def _parse_service_state(self, body):
else:
state = ContainerState.FAILED
# parse endpoints
endpoints, deployment, rack, host = [], Deployment(), None, None
endpoints, deployment, cloud, host = [], Deployment(), None, None
if state == ContainerState.RUNNING:
for t in tasks:
# parse endpoints
hosts = await self.__cluster_db.find_agents(hostname=t['host'])
if not hosts: continue
host, rack = hosts[0], hosts[0].attributes.get('cloud')
host, cloud = hosts[0], hosts[0].attributes.get('cloud')
public_ip = host.attributes.get('public_ip')
if not public_ip: continue
if 'portDefinitions' in body:
Expand All @@ -174,15 +173,14 @@ async def _parse_service_state(self, body):
for i, p in enumerate(body['container']['portMappings']):
endpoints += [Endpoint(public_ip, p['containerPort'], t['ports'][i],
p['protocol'])]

# parse virtual IP addresses
for ip in t['ipAddresses']:
if ip['protocol'] != 'IPv4':
continue
deployment.add_ip_address(ip['ipAddress'])
deployment.host, deployment.cloud = host.hostname, cloud
_, appliance, id = body['id'].split('/')
return dict(id=id, appliance=appliance, state=state,
rack=rack, host=host and host.hostname,
endpoints=endpoints, deployment=deployment)

async def _parse_job_state(self, body):
Expand Down
6 changes: 1 addition & 5 deletions container/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,7 @@ def to_request(self):
containerPort=p.container_port)
for i, p in enumerate(self.ports)]
r['container']['docker']['portMappings'] = port_mappings
if self.rack:
r.setdefault('constraints', []).append(['cloud', 'CLUSTER', self.rack])
if self.host:
r.setdefault('constraints', []).append(['hostname', 'CLUSTER', self.host])
for k, v in self.constraints.items():
for k, v in self.schedule.constraints.items():
r.setdefault('constraints', []).append([str(k), 'CLUSTER', str(v)])
return r

Expand Down
9 changes: 8 additions & 1 deletion scheduler/appliance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,14 @@ async def schedule(self, plans):
self.logger.error(err)
contrs = self.dag.get_free_containers()
self.logger.info('Free containers: %s'%[c.id for c in contrs])
new_plans = [SchedulePlan(c.id, [c]) for c in contrs if c.id not in plans]
new_plans = []
for c in contrs:
if c.id in plans: continue
if c.cloud:
c.schedule.add_constraint('cloud', c.cloud)
if c.host:
c.schedule.add_constraint('public_ip', c.host)
new_plans += [SchedulePlan(c.id, [c])]
if new_plans:
self.logger.info('New plans: %s'%[p.id for p in new_plans])
return new_plans
Expand Down
6 changes: 3 additions & 3 deletions scheduler/appliance/plugin/location/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ async def schedule(self, plans):
self.logger.info('\t%s, %s, data size: %d'%(region, cloud, data_size))
agent = max(agents, key=lambda a: regions.get(a.attributes.get('region'), 0))
cloud, region = agent.attributes.get('cloud'), agent.attributes.get('region')
self.logger.info("Container '%s' will land on %s (%s, %s)"%(c.id, agent.hostname,
region, cloud))
c.host = agent.hostname
self.logger.info("Container '%s' will land on "
"%s (%s, %s)"%(c.id, agent.hostname, region, cloud))
c.schedule.add_constraint('hostname', agent.hostname)
else:
self.logger.info("No matched agents have sufficient resources for '%s'"%c)
await self.__contr_db.save_container(c, False)
Expand Down

0 comments on commit 1bbe453

Please sign in to comment.