Skip to content

Commit

Permalink
[#46] Refactored persistent volume code to accommodate volume scope
Browse files Browse the repository at this point in the history
  • Loading branch information
dcvan24 committed Nov 30, 2018
1 parent 5b6d097 commit bd7f902
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 87 deletions.
40 changes: 29 additions & 11 deletions appliance/manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib

import volume
import schedule
import volume

from tornado.gen import multi

Expand Down Expand Up @@ -29,10 +29,12 @@ async def get_appliance(self, app_id):
app = Appliance(**app)
status, app.containers, err = await self.__contr_mgr.get_containers(appliance=app_id)
if app.data_persistence:
status, app.data_persistence.volumes, err = await self.__vol_mgr.get_volumes(appliance=app_id)
_, local_vols, _ = await self.__vol_mgr.get_local_volumes(appliance=app_id)
_, global_vols, _ = await self.__vol_mgr.get_global_volumes_by_appliance(app_id)
app.data_persistence.volumes = local_vols + global_vols
if len(app.containers) == 0 \
and (not app.data_persistence or len(app.data_persistence.volumes) == 0):
self.__app_db.delete_appliance(app_id)
await self.__app_db.delete_appliance(app_id)
return 404, None, "Appliance '%s' is not found"%app_id
return 200, app, None

Expand All @@ -45,6 +47,14 @@ def validate_volume_mounts(app, vols_existed, vols_declared):
vols_to_mount = set([pv.src for c in app.containers for pv in c.persistent_volumes])
return list(vols_to_mount - all_vols)

async def update_global_volumes(global_vols, app_id):
    """Subscribe the appliance to each global volume and persist the change.

    A failed volume update is logged and otherwise ignored, so the
    enclosing appliance-creation flow proceeds best-effort.
    """
    for vol in global_vols:
        vol.subscribe(app_id)
    results = await multi([vol_mgr.update_volume(vol) for vol in global_vols])
    for code, _, err in results:
        if code != 200:
            self.logger.error(err)

def set_container_volume_scope(contrs, vols):
vols = {v.id: v for v in vols}
for c in contrs:
Expand All @@ -63,15 +73,15 @@ def set_container_volume_scope(contrs, vols):
# create persistent volumes if any
dp = app.data_persistence
if dp:
resps = await multi([vol_mgr.get_volume(app.id if v.scope == volume.VolumeScope.LOCAL
else None, v.id)
for v in dp.volumes])
resps = await multi([vol_mgr.get_local_volume(app.id, v.id) for v in dp.local_volumes]
+ [vol_mgr.get_global_volume(v.id) for v in dp.global_volumes])
vols_existed = set([v.id for status, v, _ in resps if status == 200])
vols_declared = set([v.id for v in dp.volumes])
invalid_vols = validate_volume_mounts(app, vols_existed, vols_declared)
if len(invalid_vols) > 0:
await self._clean_up_incomplete_appliance(app.id)
return 400, None, 'Invalid persistent volume(s): %s'%invalid_vols
global_vols = [v for _, v, _ in resps if v and v.scope == volume.VolumeScope.GLOBAL]
if len(vols_existed) < len(dp.volumes):
resps = await multi([vol_mgr.create_volume(v.to_save())
for v in dp.volumes
Expand All @@ -81,6 +91,9 @@ def set_container_volume_scope(contrs, vols):
self.logger.error(err)
await self._clean_up_incomplete_appliance(app.id)
return status, None, err
if v.scope == volume.VolumeScope.GLOBAL:
global_vols += v,
await update_global_volumes(global_vols, app.id)
set_container_volume_scope(app.containers, dp.volumes)

# create containers
Expand Down Expand Up @@ -119,15 +132,20 @@ async def delete_appliance(self, app_id, erase_data=False):

# deprovision/delete local persistent volumes if any
if app.data_persistence:
_, vols, _ = await vol_mgr.get_volumes(appliance=app_id)
resps = await multi([(vol_mgr.erase_volume(app_id, v.id)
_, local_vols, _ = await vol_mgr.get_local_volumes(appliance=app_id)
resps = await multi([(vol_mgr.erase_local_volume(app_id, v.id)
if erase_data else vol_mgr.deprovision_volume(v))
for v in vols
if v.scope == volume.VolumeScope.LOCAL])
for v in local_vols])
for i, (status, _, err) in enumerate(resps):
if status != 200:
self.logger.error(err)
return 207, None, "Failed to deprovision persistent volume '%s'"%vols[i].id
return 207, None, "Failed to deprovision persistent volume '%s'"%local_vols[i].id
_, global_vols, _ = await vol_mgr.get_global_volumes_by_appliance(app_id)
for gpv in global_vols:
gpv.unsubscribe(app_id)
for status, _, err in (await multi([vol_mgr.update_volume(gpv) for gpv in global_vols])):
if status != 200:
self.logger.error(err)

# deprovision appliance
status, msg, err = await self.__app_api.deprovision_appliance(app_id)
Expand Down
5 changes: 4 additions & 1 deletion container/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import swagger

from container import Container, NetworkMode, parse_container_short_id
from volume import VolumeScope

# Limitations:
# 1. Inefficient job state monitoring: Chronos does not have an API for per-job state
Expand Down Expand Up @@ -116,7 +117,9 @@ def get_persistent_volumes():
return params
params += [dict(key='volume-driver',
value=self.appliance.data_persistence.volume_type.driver)]
params += [dict(key='volume', value='%s-%s:%s'%(self.appliance.id, v.src, v.dest))
params += [dict(key='volume',
value=('%s-%s:%s'%(self.appliance.id, v.src, v.dest)
if v.scope == VolumeScope.LOCAL else '%s:%s'%(v.src, v.dest)))
for v in self.persistent_volumes]
return params

Expand Down
136 changes: 107 additions & 29 deletions volume/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,40 @@ def volume_type(self):
return self.__volume_type

@property
@swagger.property
def volumes(self):
"""
Volumes for data persistence in the appliance
Persistent volumes used by the appliance
---
type: list
items: Volume
items: PersistentVolume
required: true
"""
return list(self.__volumes)

@property
def global_volumes(self):
    """
    Global persistent volumes used by the appliance
    ---
    type: list
    items: GlobalPersistentVolume
    """
    # Filter preserves the declaration order of the underlying volume list
    return [v for v in self.__volumes if isinstance(v, GlobalPersistentVolume)]

@property
def local_volumes(self):
    """
    Local persistent volumes bound to the appliance
    ---
    type: list
    items: LocalPersistentVolume
    """
    # Filter preserves the declaration order of the underlying volume list
    return [v for v in self.__volumes if isinstance(v, LocalPersistentVolume)]

@volumes.setter
def volumes(self, volumes):
self.__volumes = list(volumes)
Expand Down Expand Up @@ -133,6 +156,11 @@ def parse(cls, data, from_user=True):
missing = PersistentVolume.REQUIRED - data.keys()
if missing:
return 400, None, "Missing required field(s) of persistence volume: %s"%missing
scope = data.get('scope', 'local')
try:
scope = VolumeScope(scope and scope.upper())
except ValueError:
return 400, None, "Invalid volume scope: %s"%data.get('scope')
if from_user:
for f in ('deployment', ):
data.pop('deployment', None)
Expand All @@ -149,13 +177,19 @@ def parse(cls, data, from_user=True):
_, data['user_schedule_hints'], _ = VolumeScheduleHints.parse(user_sched_hints, from_user)
if sys_sched_hints:
_, data['sys_schedule_hints'], _ = VolumeScheduleHints.parse(sys_sched_hints, from_user)
return 200, PersistentVolume(**data), None
vol = None
if scope == VolumeScope.LOCAL:
vol = LocalPersistentVolume(**data)
elif scope == VolumeScope.GLOBAL:
vol = GlobalPersistentVolume(**data)
else:
return 400, None, "Unrecognized volume scope: %s"%vol.value
return 200, vol, None

def __init__(self, id, appliance, type, is_instantiated=False,
def __init__(self, id, type, is_instantiated=False,
scope=VolumeScope.LOCAL, user_schedule_hints=None, sys_schedule_hints=None,
deployment=None, *args, **kwargs):
self.__id = id
self.__appliance = appliance
self.__scope = scope if isinstance(scope, VolumeScope) else VolumeScope(scope.upper())
self.__type = type if isinstance(type, PersistentVolumeType) else PersistentVolumeType(type)
self.__is_instantiated = is_instantiated
Expand All @@ -181,7 +215,6 @@ def __init__(self, id, appliance, type, is_instantiated=False,
else:
self.__deployment = VolumeDeployment()


@property
@swagger.property
def id(self):
Expand All @@ -195,19 +228,6 @@ def id(self):
"""
return self.__id

@property
@swagger.property
def appliance(self):
"""
The appliance in which the volume is shared
---
type: str
example: test-app
read_only: true
"""
return self.__appliance

@property
@swagger.property
def is_instantiated(self):
Expand Down Expand Up @@ -278,11 +298,6 @@ def deployment(self):
"""
return self.__deployment

@appliance.setter
def appliance(self, app):
assert isinstance(app, str) or isinstance(app, appliance.Appliance)
self.__appliance = app

@type.setter
def type(self, type):
self.__type = type
Expand All @@ -304,17 +319,17 @@ def unset_instantiated(self):

def to_render(self):
return dict(id=self.id,
appliance=self.appliance if isinstance(self.appliance, str) else self.appliance.id,
type=self.type.value, is_instantiated=self.is_instantiated,
type=self.type.value,
is_instantiated=self.is_instantiated,
scope=self.scope.value,
user_schedule_hints=self.user_schedule_hints.to_render(),
sys_schedule_hints=self.sys_schedule_hints.to_render(),
deployment=self.deployment.to_render())

def to_save(self):
return dict(id=self.id,
appliance=self.appliance if isinstance(self.appliance, str) else self.appliance.id,
type=self.type.value, is_instantiated=self.is_instantiated,
type=self.type.value,
is_instantiated=self.is_instantiated,
scope=self.scope.value,
user_schedule_hints=self.user_schedule_hints.to_save(),
sys_schedule_hints=self.sys_schedule_hints.to_save(),
Expand Down Expand Up @@ -371,4 +386,67 @@ def to_render(self):
return dict(placement=self.placement.to_render())

def to_save(self):
return dict(placement=self.placement.to_render())
return dict(placement=self.placement.to_render())


@swagger.model
class GlobalPersistentVolume(PersistentVolume):
  """A persistent volume shared across appliances.

  The volume tracks the IDs of the appliances currently using it
  (``used_by``) so that it can outlive any single appliance.
  """

  def __init__(self, used_by=(), *args, **kwargs):
    # Immutable default avoids the shared-mutable-default pitfall of `[]`.
    # Scope is forced to GLOBAL regardless of what the caller passes in.
    kwargs.update(scope=VolumeScope.GLOBAL)
    super(GlobalPersistentVolume, self).__init__(*args, **kwargs)
    self.__used_by = set(used_by)

  @property
  @swagger.property
  def used_by(self):
    """
    Appliances that use the volume
    ---
    type: list
    items: str
    """
    return list(self.__used_by)

  def subscribe(self, app_id):
    """Record that appliance `app_id` uses this volume (idempotent)."""
    self.__used_by.add(app_id)

  def unsubscribe(self, app_id):
    """Drop appliance `app_id` from the subscriber set.

    Uses `discard` so that unsubscribing an appliance that was never
    subscribed is a no-op instead of raising KeyError.
    """
    self.__used_by.discard(app_id)

  def to_save(self):
    # Persist the subscriber list alongside the base volume fields
    return dict(**super(GlobalPersistentVolume, self).to_save(), used_by=self.used_by)


@swagger.model
class LocalPersistentVolume(PersistentVolume):
  """A persistent volume bound to (and owned by) a single appliance."""

  def __init__(self, appliance, *args, **kwargs):
    # Scope is forced to LOCAL regardless of what the caller passes in
    kwargs.update(scope=VolumeScope.LOCAL)
    super(LocalPersistentVolume, self).__init__(*args, **kwargs)
    self.__appliance = appliance

  @property
  @swagger.property
  def appliance(self):
    """
    The appliance which the persistent volume is bound to
    ---
    type: str
    """
    return self.__appliance

  @appliance.setter
  def appliance(self, app):
    # Accept either an appliance ID (str) or a full Appliance object
    assert isinstance(app, (str, appliance.Appliance))
    self.__appliance = app

  def __appliance_id(self):
    # Normalize the stored appliance (ID string or Appliance object) to its ID
    return self.appliance if isinstance(self.appliance, str) else self.appliance.id

  def to_render(self):
    return dict(**super(LocalPersistentVolume, self).to_render(),
                appliance=self.__appliance_id())

  def to_save(self):
    return dict(**super(LocalPersistentVolume, self).to_save(),
                appliance=self.__appliance_id())
6 changes: 3 additions & 3 deletions volume/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def get(self, app_id):
schema: Error
"""
status, vols, err = await self.__vol_mgr.get_volumes(appliance=app_id)
status, vols, err = await self.__vol_mgr.get_local_volumes(appliance=app_id)
self.set_status(status)
self.write(json_encode([v.to_render() for v in vols] if status == 200 else error(err)))

Expand Down Expand Up @@ -82,7 +82,7 @@ async def get(self, app_id, vol_id):
schema: Error
"""
status, vol, err = await self.__vol_mgr.get_volume(app_id, vol_id)
status, vol, err = await self.__vol_mgr.get_local_volume(app_id, vol_id)
self.set_status(status)
self.write(json_encode(vol.to_render() if status == 200 else error(err)))

Expand Down Expand Up @@ -110,7 +110,7 @@ async def delete(self, app_id, vol_id):
schema: Error
"""
status, msg, err = await self.__vol_mgr.erase_volume(app_id, vol_id)
status, msg, err = await self.__vol_mgr.erase_local_volume(app_id, vol_id)
self.set_status(status)
self.write(json_encode(message(msg) if status == 200 else error(err)))

Expand Down
Loading

0 comments on commit bd7f902

Please sign in to comment.