Skip to content

Commit

Permalink
WIP on #37
Browse files Browse the repository at this point in the history
  • Loading branch information
dcvan24 committed Jun 12, 2018
1 parent d72139b commit f069930
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
35 changes: 33 additions & 2 deletions container/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env
volumes=[], network_mode=NetworkMode.HOST, endpoints=[], ports=[],
state=ContainerState.SUBMITTED, is_privileged=False, force_pull_image=True,
dependencies=[], data=None, rack=None, host=None, last_update=None,
constraints={}, *aargs, **kwargs):
constraints={}, deployment=None, *aargs, **kwargs):
self.__id = id
self.__appliance = appliance
self.__type = type if isinstance(type, ContainerType) else ContainerType(type)
Expand All @@ -457,6 +457,7 @@ def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env
self.__data = data and Data(**data)
self.__last_update = parse_datetime(last_update)
self.__constraints = {k: v and str(v) for k, v in constraints.items()}
self.__deployment = deployment and Deployment(**deployment)

@property
@swagger.property
Expand Down Expand Up @@ -723,6 +724,10 @@ def last_update(self):
def constraints(self):
return dict(self.__constraints)

@property
def deployment(self):
return self.__deployment

@image.setter
def image(self, image):
assert isinstance(image, str)
Expand Down Expand Up @@ -757,6 +762,10 @@ def host(self, host):
def last_update(self, last_update):
self.__last_update = parse_datetime(last_update)

@deployment.setter
def deployment(self, deployment):
self.__deployment = deployment

def add_env(self, **env):
self.__env.update(env)

Expand Down Expand Up @@ -790,7 +799,8 @@ def to_save(self):
force_pull_image=self.force_pull_image, dependencies=self.dependencies,
data=self.data and self.data.to_save(), rack=self.rack, host=self.host,
last_update=self.last_update and self.last_update.isoformat(),
constraints=self.constraints)
constraints=self.constraints,
deployment=self.deployment and self.deployment.to_save())

def __hash__(self):
return hash((self.id, self.appliance))
Expand All @@ -801,6 +811,26 @@ def __eq__(self, other):
and self.appliance == other.appliance


################################
### Internal data structures ###
################################

class Deployment:

def __init__(self, ip_addresses=[]):
self.__ip_addresses = list(ip_addresses)

@property
def ip_addresses(self):
return list(self.__ip_addresses)

def add_ip_address(self, ip_addr):
self.__ip_addresses.append(ip_addr)

def to_save(self):
return dict(ip_addresses=self.ip_addresses)


short_id_pattern = r'@(%s)'%Container.ID_PATTERN


Expand All @@ -812,3 +842,4 @@ def parse_container_short_id(p, appliance):
return re.sub(r'([^@]*)%s([^@]*)'%short_id_pattern,
r'\1\2-%s.marathon.containerip.dcos.thisdcos.directory\3'%appliance,
str(p))

28 changes: 21 additions & 7 deletions container/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from commons import MotorClient
from commons import APIManager, Manager
from container.base import Container, ContainerType, ContainerState, Endpoint
from container.base import Container, ContainerType, ContainerState, Endpoint, Deployment
from cluster.manager import AgentDBManager
from config import config

Expand Down Expand Up @@ -130,6 +130,7 @@ async def _get_updated_container(self, contr):
parsed_srv = await self._parse_service_state(raw_service)
contr.state, contr.endpoints = parsed_srv['state'], parsed_srv['endpoints']
contr.rack, contr.host = parsed_srv['rack'], parsed_srv['host']
contr.deployment = parsed_srv['deployment']
elif contr.type == ContainerType.JOB:
status, raw_job, err = await self.__job_api.get_job_update(contr)
if not err:
Expand Down Expand Up @@ -157,9 +158,10 @@ async def _parse_service_state(self, body):
else:
state = ContainerState.FAILED
# parse endpoints
endpoints, rack, host = [], None, None
endpoints, deployment, rack, host = [], Deployment(), None, None
if state == ContainerState.RUNNING:
for t in tasks:
# parse endpoints
hosts = await self.__cluster_db.find_agents(hostname=t['host'])
if not hosts: continue
host, rack = hosts[0], hosts[0].attributes.get('cloud')
Expand All @@ -172,9 +174,16 @@ async def _parse_service_state(self, body):
for i, p in enumerate(body['container']['portMappings']):
endpoints += [Endpoint(public_ip, p['containerPort'], t['ports'][i],
p['protocol'])]

# parse virtual IP addresses
for ip in t['ipAddresses']:
if ip['protocol'] != 'IPv4':
continue
deployment.add_ip_address(ip['ipAddress'])
_, appliance, id = body['id'].split('/')
return dict(id=id, appliance=appliance, state=state,
rack=rack, host=host and host.hostname, endpoints=endpoints)
rack=rack, host=host and host.hostname,
endpoints=endpoints, deployment=deployment)

async def _parse_job_state(self, body):
### TO BE IMPORVED: currently the body is the output of the job summary due to
Expand Down Expand Up @@ -264,11 +273,11 @@ class ContainerDBManager(Manager):
def __init__(self):
self.__contr_col = MotorClient().requester.container

async def get_container_by_virtual_ip_address(self, ip_addr):
return await self._get_container(**{'deployment.ip_addresses': ip_addr})

async def get_container(self, app_id, contr_id):
contr = await self.__contr_col.find_one(dict(id=contr_id, appliance=app_id))
if not contr:
return 404, None, "Container '%s' is not found"%contr_id
return Container.parse(contr)
return await self._get_container(id=contr_id, appliance=app_id)

async def get_containers(self, **filters):
return [Container.parse(c)[1] async for c in self.__contr_col.find(filters)]
Expand All @@ -285,3 +294,8 @@ async def delete_containers(self, **filters):
await self.__contr_col.delete_many(filters)
return 200, "Containers matching '%s' have been deleted"%filters, None

async def _get_container(self, **filters):
contr = await self.__contr_col.find_one(filters)
if not contr:
return 404, None, 'Container matches %s is not found'%filters
return Container.parse(contr)

0 comments on commit f069930

Please sign in to comment.