Skip to content

Commit

Permalink
[#46] finished the basic functions of global persistent volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
dcvan24 committed Nov 29, 2018
1 parent 744d2f1 commit 5b6d097
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 39 deletions.
32 changes: 24 additions & 8 deletions appliance/manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import importlib

import volume
import schedule

from tornado.gen import multi
Expand Down Expand Up @@ -37,11 +38,20 @@ async def get_appliance(self, app_id):

async def create_appliance(self, data):

vol_mgr = self.__vol_mgr

def validate_volume_mounts(app, vols_existed, vols_declared):
  """Return the volume ids mounted by containers that are neither already
  provisioned nor declared in the appliance's data-persistence section."""
  known = set(vols_existed).union(vols_declared)
  requested = {pv.src for c in app.containers for pv in c.persistent_volumes}
  return list(requested - known)

def set_container_volume_scope(contrs, vols):
  """Propagate each declared volume's scope onto the container mounts
  that reference it (matched by mount source == volume id)."""
  by_id = {v.id: v for v in vols}
  for contr in contrs:
    for mount in contr.persistent_volumes:
      match = by_id.get(mount.src)
      if match is not None:
        mount.scope = match.scope

status, app, _ = await self.get_appliance(data['id'])
if status == 200 and len(app.containers) > 0:
return 409, None, "Appliance '%s' already exists"%data['id']
Expand All @@ -53,21 +63,25 @@ def validate_volume_mounts(app, vols_existed, vols_declared):
# create persistent volumes if any
dp = app.data_persistence
if dp:
resps = await multi([self.__vol_mgr.get_volume(app.id, v.id) for v in dp.volumes])
resps = await multi([vol_mgr.get_volume(app.id if v.scope == volume.VolumeScope.LOCAL
else None, v.id)
for v in dp.volumes])
vols_existed = set([v.id for status, v, _ in resps if status == 200])
vols_declared = set([v.id for v in dp.volumes])
invalid_vols = validate_volume_mounts(app, vols_existed, vols_declared)
if len(invalid_vols) > 0:
await self._clean_up_incomplete_appliance(app.id)
return 400, None, 'Invalid persistent volume(s): %s'%invalid_vols
if len(vols_existed) < len(dp.volumes):
resps = await multi([self.__vol_mgr.create_volume(v.to_save()) for v in dp.volumes
resps = await multi([vol_mgr.create_volume(v.to_save())
for v in dp.volumes
if v.id not in vols_existed])
for status, _, err in resps:
for status, v, err in resps:
if status != 201:
self.logger.error(err)
await self._clean_up_incomplete_appliance(app.id)
return status, None, err
set_container_volume_scope(app.containers, dp.volumes)

# create containers
resps = await multi([self.__contr_mgr.create_container(c.to_save()) for c in app.containers])
Expand All @@ -88,6 +102,7 @@ def validate_volume_mounts(app, vols_existed, vols_declared):
return 201, app, None

async def delete_appliance(self, app_id, erase_data=False):
vol_mgr = self.__vol_mgr
self.logger.info('Erase data?: %s'%erase_data)
status, app, err = await self.get_appliance(app_id)
if status != 200:
Expand All @@ -102,12 +117,13 @@ async def delete_appliance(self, app_id, erase_data=False):
return 207, None, "Failed to deprovision jobs of appliance '%s'"%app_id
self.logger.info(msg)

# deprovision/delete persistent volumes if any
# deprovision/delete local persistent volumes if any
if app.data_persistence:
_, vols, _ = await self.__vol_mgr.get_volumes(appliance=app_id)
resps = await multi([(self.__vol_mgr.delete_volume(app_id, v.id)
if erase_data else self.__vol_mgr.deprovision_volume(v))
for v in vols])
_, vols, _ = await vol_mgr.get_volumes(appliance=app_id)
resps = await multi([(vol_mgr.erase_volume(app_id, v.id)
if erase_data else vol_mgr.deprovision_volume(v))
for v in vols
if v.scope == volume.VolumeScope.LOCAL])
for i, (status, _, err) in enumerate(resps):
if status != 200:
self.logger.error(err)
Expand Down
16 changes: 14 additions & 2 deletions container/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import swagger

import appliance
import volume
import schedule

from enum import Enum
Expand Down Expand Up @@ -92,10 +93,12 @@ class ContainerVolume:
"""

def __init__(self, src, dest, type=ContainerVolumeType.PERSISTENT, *args, **kwargs):
def __init__(self, src, dest, type=ContainerVolumeType.PERSISTENT, scope=volume.VolumeScope.LOCAL,
*args, **kwargs):
self.__src = src
self.__dest = dest
self.__type = type if isinstance(type, ContainerVolumeType) else ContainerVolumeType(type.upper())
self.__scope = scope if isinstance(scope, volume.VolumeScope) else volume.VolumeScope(scope.upper())

@property
@swagger.property
Expand Down Expand Up @@ -135,11 +138,20 @@ def type(self):
"""
return self.__type

@property
def scope(self):
  """Scope of the volume backing this mount; a volume.VolumeScope member."""
  return self.__scope

@scope.setter
def scope(self, scope):
  # Validate explicitly: `assert` statements are stripped under `python -O`,
  # which would silently let a non-VolumeScope value through.
  if not isinstance(scope, volume.VolumeScope):
    raise TypeError('scope must be a volume.VolumeScope, got %r' % (scope,))
  self.__scope = scope

def to_render(self):
  """Public JSON view of this volume mount."""
  return {'src': self.src, 'dest': self.dest, 'type': self.type.value}

def to_save(self):
  # Persisted form adds the scope, which is not exposed in the public render.
  return dict(**self.to_render(), scope=self.__scope.value)


@swagger.model
Expand Down
24 changes: 5 additions & 19 deletions container/service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import swagger

from container import Container, NetworkMode, ContainerVolumeType, parse_container_short_id
from volume import VolumeScope


@swagger.model
Expand Down Expand Up @@ -159,11 +160,10 @@ class Service(Container):
"""

def __init__(self, instances=1, labels={}, health_check=None, default_health_check=False,
def __init__(self, instances=1, health_check=None, default_health_check=False,
minimum_capacity=0, *args, **kwargs):
super(Service, self).__init__(*args, **kwargs)
self.__instances = instances
self.__labels = dict(labels)
self.__health_check = health_check and HealthCheck(**health_check)
self.__default_health_check = default_health_check
self.__minimum_capacity = minimum_capacity
Expand All @@ -181,19 +181,6 @@ def instances(self):
"""
return self.__instances

@property
@swagger.property
def labels(self):
"""
Key-value label(s) assigned to the service for facilitating service discovery
---
type: dict
default: {}
example:
region: us-east1
"""
return dict(self.__labels)

@property
@swagger.property
def health_check(self):
Expand Down Expand Up @@ -242,7 +229,6 @@ def health_check(self, hc):
def to_render(self):
  """Public JSON view of the service: container fields plus service extras.

  NOTE: `labels` is intentionally absent — this commit removes the labels
  property from Service, so rendering it would raise AttributeError.
  """
  return dict(**super(Service, self).to_render(),
              instances=self.instances,
              health_check=self.health_check.to_render() if self.health_check else None,
              default_health_check=self.default_health_check,
              minimum_capacity=self.minimum_capacity)
Expand All @@ -251,7 +237,6 @@ def to_save(self):
self._add_default_health_check()
return dict(**super(Service, self).to_save(),
instances=self.instances,
labels=self.labels,
health_check=self.health_check.to_save() if self.health_check else None,
default_health_check=self.default_health_check,
minimum_capacity=self.minimum_capacity)
Expand All @@ -274,7 +259,9 @@ def get_persistent_volumes():
return params
params += [dict(key='volume-driver',
value=self.appliance.data_persistence.volume_type.driver)]
params += [dict(key='volume', value='%s-%s:%s'%(self.appliance.id, v.src, v.dest))
params += [dict(key='volume',
value=('%s-%s:%s'%(self.appliance.id, v.src, v.dest)
if v.scope == VolumeScope.LOCAL else '%s:%s'%(v.src, v.dest)))
for v in self.persistent_volumes]
return params

Expand All @@ -295,7 +282,6 @@ def merge_env():
r = dict(id=str(self), instances=self.instances,
**self.resources.to_request(),
env=merge_env(),
labels=self.labels,
requirePorts=len(self.ports) > 0,
acceptedResourceRoles=["slave_public", "*"],
container=dict(type='DOCKER',
Expand Down
36 changes: 32 additions & 4 deletions volume/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
@swagger.enum
class PersistentVolumeType(Enum):
"""
Volume type
Persistent volume type
"""
CEPHFS = 'cephfs', 'heliumdatacommons/cephfs'
Expand Down Expand Up @@ -105,6 +105,17 @@ def __init__(self, *args, **kwargs):
super(VolumeScheduleHints, self).__init__(*args, **kwargs)


@swagger.enum
class VolumeScope(Enum):
  """
  Persistent volume scope: LOCAL volumes are namespaced under (and removed
  with) a single appliance, while GLOBAL volumes are addressed by bare id,
  independent of any appliance.
  """

  GLOBAL = 'GLOBAL'
  LOCAL = 'LOCAL'


@swagger.model
class PersistentVolume:
"""
Expand All @@ -123,6 +134,8 @@ def parse(cls, data, from_user=True):
if missing:
return 400, None, "Missing required field(s) of persistence volume: %s"%missing
if from_user:
for f in ('deployment', ):
data.pop('deployment', None)
sched_hints = data.pop('schedule_hints', None)
if sched_hints:
status, sched_hints, err = VolumeScheduleHints.parse(sched_hints, from_user)
Expand All @@ -139,10 +152,11 @@ def parse(cls, data, from_user=True):
return 200, PersistentVolume(**data), None

def __init__(self, id, appliance, type, is_instantiated=False,
             scope=VolumeScope.LOCAL, user_schedule_hints=None, sys_schedule_hints=None,
             deployment=None, *args, **kwargs):
  self.__id = id
  self.__appliance = appliance
  # accept either a VolumeScope member or its (case-insensitive) string value
  self.__scope = scope if isinstance(scope, VolumeScope) else VolumeScope(scope.upper())
  self.__type = type if isinstance(type, PersistentVolumeType) else PersistentVolumeType(type)
  self.__is_instantiated = is_instantiated

Expand Down Expand Up @@ -206,6 +220,17 @@ def is_instantiated(self):
"""
return self.__is_instantiated

@property
@swagger.property
def scope(self):
  """
  Persistent volume scope
  ---
  type: VolumeScope
  """
  return self.__scope

@property
@swagger.property
def type(self):
Expand Down Expand Up @@ -281,6 +306,7 @@ def to_render(self):
return dict(id=self.id,
appliance=self.appliance if isinstance(self.appliance, str) else self.appliance.id,
type=self.type.value, is_instantiated=self.is_instantiated,
scope=self.scope.value,
user_schedule_hints=self.user_schedule_hints.to_render(),
sys_schedule_hints=self.sys_schedule_hints.to_render(),
deployment=self.deployment.to_render())
Expand All @@ -289,12 +315,14 @@ def to_save(self):
return dict(id=self.id,
appliance=self.appliance if isinstance(self.appliance, str) else self.appliance.id,
type=self.type.value, is_instantiated=self.is_instantiated,
scope=self.scope.value,
user_schedule_hints=self.user_schedule_hints.to_save(),
sys_schedule_hints=self.sys_schedule_hints.to_save(),
deployment=self.deployment.to_render())

def to_request(self):
  """Build the provisioning request payload for this volume.

  LOCAL volumes are named per appliance ('<appliance>-<id>'); GLOBAL
  volumes use the bare volume id.
  """
  req = dict(name=('%s-%s'%(self.appliance, self.id)
                   if self.scope == VolumeScope.LOCAL else str(self.id)))
  sched_hints = self.sys_schedule_hints.placement
  if sched_hints.host:
    req['placement'] = dict(type='host', value=sched_hints.host)
Expand Down
2 changes: 1 addition & 1 deletion volume/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def delete(self, app_id, vol_id):
schema: Error
"""
status, msg, err = await self.__vol_mgr.delete_volume(app_id, vol_id)
status, msg, err = await self.__vol_mgr.erase_volume(app_id, vol_id)
self.set_status(status)
self.write(json_encode(message(msg) if status == 200 else error(err)))

Expand Down
16 changes: 11 additions & 5 deletions volume/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from config import config
from commons import MongoClient
from commons import APIManager, Manager
from volume import PersistentVolume, VolumeDeployment
from volume import PersistentVolume, VolumeDeployment, VolumeScope
from locality import Placement


Expand All @@ -24,7 +24,8 @@ async def create_volume(self, data):
status, vol, err = PersistentVolume.parse(data)
if status != 200:
return status, vol, err
status, _, _ = await self.__vol_db.get_volume(vol.appliance, vol.id)
app_id = vol.appliance if vol.scope == VolumeScope.LOCAL else None,
status, _, _ = await self.__vol_db.get_volume(app_id, vol.id)
if status == 200:
return 409, None, "Volume '%s' already exists"%vol.id
await self.__vol_db.save_volume(vol)
Expand Down Expand Up @@ -61,7 +62,7 @@ async def deprovision_volume(self, vol):
await self.__vol_db.save_volume(vol)
return status, "Persistent volume '%s' has been deprovisioned"%vol.id, None

async def erase_volume(self, app_id, vol_id):
  """Delete a persistent volume and erase its data (renamed from delete_volume)."""
  status, vol, err = await self.get_volume(app_id, vol_id, full_blown=True)
  if status == 404:
    return status, vol, err
Expand Down Expand Up @@ -93,7 +94,9 @@ async def get_volume(self, app_id, vol_id, full_blown=False):

async def get_volumes(self, full_blown=False, **filters):
  """List volumes matching the DB filters, refreshed against the volume API."""
  vols = await self.__vol_db.get_volumes(**filters)
  # LOCAL volumes are fetched within their appliance; GLOBAL ones by bare id
  resps = await multi([self.__vol_api.get_volume(v.appliance
                                                 if v.scope == VolumeScope.LOCAL else None, v.id)
                       for v in vols])
  for i, (status, output, err) in enumerate(resps):
    if status != 200:
      self.logger.error(err)
Expand All @@ -114,7 +117,8 @@ def __init__(self):

async def get_volume(self, app_id, vol_id):
  """Fetch a volume record from the Ceph API.

  LOCAL volumes live at '/fs/<app_id>-<vol_id>'; GLOBAL volumes (app_id is
  None) at '/fs/<vol_id>'.
  """
  api = config.ceph
  endpoint = '/fs/%s-%s'%(app_id, vol_id) if app_id else '/fs/%s'%vol_id
  status, vol, err = await self.http_cli.get(api.host, api.port, endpoint)
  if status != 200:
    return status, None, err
  return status, vol, None
Expand Down Expand Up @@ -142,6 +146,8 @@ async def get_volumes(self, **filters):
return [PersistentVolume.parse(v)[1] async for v in self.__vol_col.find(filters)]

async def get_volume(self, app_id, vol_id):
  """Look up one volume by id, additionally filtered by appliance when given."""
  filters = dict(id=vol_id)
  if app_id:
    filters.update(appliance=app_id)
  return await self._get_volume(**filters)

async def save_volume(self, vol, upsert=True):
Expand Down

0 comments on commit 5b6d097

Please sign in to comment.