From 5b6d097e93c7ec8a802112f1286fe48ce8e1ae86 Mon Sep 17 00:00:00 2001 From: dcvan Date: Thu, 29 Nov 2018 17:47:41 -0500 Subject: [PATCH] [#46] finished the basic functions of global persistent volumes --- appliance/manager.py | 32 ++++++++++++++++++++++++-------- container/__init__.py | 16 ++++++++++++++-- container/service.py | 24 +++++------------------- volume/__init__.py | 36 ++++++++++++++++++++++++++++++++---- volume/handler.py | 2 +- volume/manager.py | 16 +++++++++++----- 6 files changed, 87 insertions(+), 39 deletions(-) diff --git a/appliance/manager.py b/appliance/manager.py index d023b3e..498ba04 100644 --- a/appliance/manager.py +++ b/appliance/manager.py @@ -1,5 +1,6 @@ import importlib +import volume import schedule from tornado.gen import multi @@ -37,11 +38,20 @@ async def get_appliance(self, app_id): async def create_appliance(self, data): + vol_mgr = self.__vol_mgr + def validate_volume_mounts(app, vols_existed, vols_declared): all_vols = set(vols_existed) | set(vols_declared) vols_to_mount = set([pv.src for c in app.containers for pv in c.persistent_volumes]) return list(vols_to_mount - all_vols) + def set_container_volume_scope(contrs, vols): + vols = {v.id: v for v in vols} + for c in contrs: + for pv in c.persistent_volumes: + if pv.src in vols: + pv.scope = vols[pv.src].scope + status, app, _ = await self.get_appliance(data['id']) if status == 200 and len(app.containers) > 0: return 409, None, "Appliance '%s' already exists"%data['id'] @@ -53,7 +63,9 @@ def validate_volume_mounts(app, vols_existed, vols_declared): # create persistent volumes if any dp = app.data_persistence if dp: - resps = await multi([self.__vol_mgr.get_volume(app.id, v.id) for v in dp.volumes]) + resps = await multi([vol_mgr.get_volume(app.id if v.scope == volume.VolumeScope.LOCAL + else None, v.id) + for v in dp.volumes]) vols_existed = set([v.id for status, v, _ in resps if status == 200]) vols_declared = set([v.id for v in dp.volumes]) invalid_vols = 
validate_volume_mounts(app, vols_existed, vols_declared) @@ -61,13 +73,15 @@ def validate_volume_mounts(app, vols_existed, vols_declared): await self._clean_up_incomplete_appliance(app.id) return 400, None, 'Invalid persistent volume(s): %s'%invalid_vols if len(vols_existed) < len(dp.volumes): - resps = await multi([self.__vol_mgr.create_volume(v.to_save()) for v in dp.volumes + resps = await multi([vol_mgr.create_volume(v.to_save()) + for v in dp.volumes if v.id not in vols_existed]) - for status, _, err in resps: + for status, v, err in resps: if status != 201: self.logger.error(err) await self._clean_up_incomplete_appliance(app.id) return status, None, err + set_container_volume_scope(app.containers, dp.volumes) # create containers resps = await multi([self.__contr_mgr.create_container(c.to_save()) for c in app.containers]) @@ -88,6 +102,7 @@ def validate_volume_mounts(app, vols_existed, vols_declared): return 201, app, None async def delete_appliance(self, app_id, erase_data=False): + vol_mgr = self.__vol_mgr self.logger.info('Erase data?: %s'%erase_data) status, app, err = await self.get_appliance(app_id) if status != 200: @@ -102,12 +117,13 @@ async def delete_appliance(self, app_id, erase_data=False): return 207, None, "Failed to deprovision jobs of appliance '%s'"%app_id self.logger.info(msg) - # deprovision/delete persistent volumes if any + # deprovision/delete local persistent volumes if any if app.data_persistence: - _, vols, _ = await self.__vol_mgr.get_volumes(appliance=app_id) - resps = await multi([(self.__vol_mgr.delete_volume(app_id, v.id) - if erase_data else self.__vol_mgr.deprovision_volume(v)) - for v in vols]) + _, vols, _ = await vol_mgr.get_volumes(appliance=app_id) + resps = await multi([(vol_mgr.erase_volume(app_id, v.id) + if erase_data else vol_mgr.deprovision_volume(v)) + for v in vols + if v.scope == volume.VolumeScope.LOCAL]) for i, (status, _, err) in enumerate(resps): if status != 200: self.logger.error(err) diff --git 
a/container/__init__.py b/container/__init__.py index a74ebbe..429f126 100644 --- a/container/__init__.py +++ b/container/__init__.py @@ -3,6 +3,7 @@ import swagger import appliance +import volume import schedule from enum import Enum @@ -92,10 +93,12 @@ class ContainerVolume: """ - def __init__(self, src, dest, type=ContainerVolumeType.PERSISTENT, *args, **kwargs): + def __init__(self, src, dest, type=ContainerVolumeType.PERSISTENT, scope=volume.VolumeScope.LOCAL, + *args, **kwargs): self.__src = src self.__dest = dest self.__type = type if isinstance(type, ContainerVolumeType) else ContainerVolumeType(type.upper()) + self.__scope = scope if isinstance(scope, volume.VolumeScope) else volume.VolumeScope(scope.upper()) @property @swagger.property @@ -135,11 +138,20 @@ def type(self): """ return self.__type + @property + def scope(self): + return self.__scope + + @scope.setter + def scope(self, scope): + assert isinstance(scope, volume.VolumeScope) + self.__scope = scope + def to_render(self): return dict(src=self.src, dest=self.dest, type=self.type.value) def to_save(self): - return self.to_render() + return dict(**self.to_render(), scope=self.__scope.value) @swagger.model diff --git a/container/service.py b/container/service.py index 3939062..5f5e75e 100644 --- a/container/service.py +++ b/container/service.py @@ -1,6 +1,7 @@ import swagger from container import Container, NetworkMode, ContainerVolumeType, parse_container_short_id +from volume import VolumeScope @swagger.model @@ -159,11 +160,10 @@ class Service(Container): """ - def __init__(self, instances=1, labels={}, health_check=None, default_health_check=False, + def __init__(self, instances=1, health_check=None, default_health_check=False, minimum_capacity=0, *args, **kwargs): super(Service, self).__init__(*args, **kwargs) self.__instances = instances - self.__labels = dict(labels) self.__health_check = health_check and HealthCheck(**health_check) self.__default_health_check = default_health_check 
self.__minimum_capacity = minimum_capacity @@ -181,19 +181,6 @@ def instances(self): """ return self.__instances - @property - @swagger.property - def labels(self): - """ - Key-value label(s) assigned to the service for facilitating service discovery - --- - type: dict - default: {} - example: - region: us-east1 - """ - return dict(self.__labels) - @property @swagger.property def health_check(self): @@ -242,7 +229,6 @@ def health_check(self, hc): def to_render(self): return dict(**super(Service, self).to_render(), instances=self.instances, - labels=self.labels, health_check=self.health_check.to_render() if self.health_check else None, default_health_check=self.default_health_check, minimum_capacity=self.minimum_capacity) @@ -251,7 +237,6 @@ def to_save(self): self._add_default_health_check() return dict(**super(Service, self).to_save(), instances=self.instances, - labels=self.labels, health_check=self.health_check.to_save() if self.health_check else None, default_health_check=self.default_health_check, minimum_capacity=self.minimum_capacity) @@ -274,7 +259,9 @@ def get_persistent_volumes(): return params params += [dict(key='volume-driver', value=self.appliance.data_persistence.volume_type.driver)] - params += [dict(key='volume', value='%s-%s:%s'%(self.appliance.id, v.src, v.dest)) + params += [dict(key='volume', + value=('%s-%s:%s'%(self.appliance.id, v.src, v.dest) + if v.scope == VolumeScope.LOCAL else '%s:%s'%(v.src, v.dest))) for v in self.persistent_volumes] return params @@ -295,7 +282,6 @@ def merge_env(): r = dict(id=str(self), instances=self.instances, **self.resources.to_request(), env=merge_env(), - labels=self.labels, requirePorts=len(self.ports) > 0, acceptedResourceRoles=["slave_public", "*"], container=dict(type='DOCKER', diff --git a/volume/__init__.py b/volume/__init__.py index a608fc3..c3a3761 100644 --- a/volume/__init__.py +++ b/volume/__init__.py @@ -9,7 +9,7 @@ @swagger.enum class PersistentVolumeType(Enum): """ - Volume type + Persistent 
volume type """ CEPHFS = 'cephfs', 'heliumdatacommons/cephfs' @@ -105,6 +105,17 @@ def __init__(self, *args, **kwargs): super(VolumeScheduleHints, self).__init__(*args, **kwargs) +@swagger.enum +class VolumeScope(Enum): + """ + Persistent volume scope + + """ + + GLOBAL = 'GLOBAL' + LOCAL = 'LOCAL' + + @swagger.model class PersistentVolume: """ @@ -123,6 +134,8 @@ def parse(cls, data, from_user=True): if missing: return 400, None, "Missing required field(s) of persistence volume: %s"%missing if from_user: + for f in ('deployment', ): + data.pop(f, None) sched_hints = data.pop('schedule_hints', None) if sched_hints: status, sched_hints, err = VolumeScheduleHints.parse(sched_hints, from_user) @@ -139,10 +152,11 @@ def parse(cls, data, from_user=True): return 200, PersistentVolume(**data), None def __init__(self, id, appliance, type, is_instantiated=False, - user_schedule_hints=None, sys_schedule_hints=None, deployment=None, - *args, **kwargs): + scope=VolumeScope.LOCAL, user_schedule_hints=None, sys_schedule_hints=None, + deployment=None, *args, **kwargs): self.__id = id self.__appliance = appliance + self.__scope = scope if isinstance(scope, VolumeScope) else VolumeScope(scope.upper()) self.__type = type if isinstance(type, PersistentVolumeType) else PersistentVolumeType(type) self.__is_instantiated = is_instantiated @@ -206,6 +220,17 @@ def is_instantiated(self): """ return self.__is_instantiated + @property + @swagger.property + def scope(self): + """ + Persistent volume scope + --- + type: VolumeScope + + """ + return self.__scope + @property @swagger.property def type(self): """ @@ -281,6 +306,7 @@ def to_render(self): return dict(id=self.id, appliance=self.appliance if isinstance(self.appliance, str) else self.appliance.id, type=self.type.value, is_instantiated=self.is_instantiated, + scope=self.scope.value, user_schedule_hints=self.user_schedule_hints.to_render(), sys_schedule_hints=self.sys_schedule_hints.to_render(), 
deployment=self.deployment.to_render()) @@ -289,12 +315,14 @@ def to_save(self): return dict(id=self.id, appliance=self.appliance if isinstance(self.appliance, str) else self.appliance.id, type=self.type.value, is_instantiated=self.is_instantiated, + scope=self.scope.value, user_schedule_hints=self.user_schedule_hints.to_save(), sys_schedule_hints=self.sys_schedule_hints.to_save(), deployment=self.deployment.to_render()) def to_request(self): - req = dict(name='%s-%s'%(self.appliance, self.id)) + req = dict(name=('%s-%s'%(self.appliance, self.id) + if self.scope == VolumeScope.LOCAL else str(self.id))) sched_hints = self.sys_schedule_hints.placement if sched_hints.host: req['placement'] = dict(type='host', value=sched_hints.host) diff --git a/volume/handler.py b/volume/handler.py index d76cfe9..c647719 100644 --- a/volume/handler.py +++ b/volume/handler.py @@ -110,7 +110,7 @@ async def delete(self, app_id, vol_id): schema: Error """ - status, msg, err = await self.__vol_mgr.delete_volume(app_id, vol_id) + status, msg, err = await self.__vol_mgr.erase_volume(app_id, vol_id) self.set_status(status) self.write(json_encode(message(msg) if status == 200 else error(err))) diff --git a/volume/manager.py b/volume/manager.py index 00da9ac..bc0beda 100644 --- a/volume/manager.py +++ b/volume/manager.py @@ -5,7 +5,7 @@ from config import config from commons import MongoClient from commons import APIManager, Manager -from volume import PersistentVolume, VolumeDeployment +from volume import PersistentVolume, VolumeDeployment, VolumeScope from locality import Placement @@ -24,7 +24,8 @@ async def create_volume(self, data): status, vol, err = PersistentVolume.parse(data) if status != 200: return status, vol, err - status, _, _ = await self.__vol_db.get_volume(vol.appliance, vol.id) + app_id = vol.appliance if vol.scope == VolumeScope.LOCAL else None + status, _, _ = await self.__vol_db.get_volume(app_id, vol.id) if status == 200: return 409, None, "Volume '%s' already 
exists"%vol.id await self.__vol_db.save_volume(vol) @@ -61,7 +62,7 @@ async def deprovision_volume(self, vol): await self.__vol_db.save_volume(vol) return status, "Persistent volume '%s' has been deprovisioned"%vol.id, None - async def delete_volume(self, app_id, vol_id): + async def erase_volume(self, app_id, vol_id): status, vol, err = await self.get_volume(app_id, vol_id, full_blown=True) if status == 404: return status, vol, err @@ -93,7 +94,9 @@ async def get_volume(self, app_id, vol_id, full_blown=False): async def get_volumes(self, full_blown=False, **filters): vols = await self.__vol_db.get_volumes(**filters) - resps = await multi([self.__vol_api.get_volume(v.app, v.id) for v in vols]) + resps = await multi([self.__vol_api.get_volume(v.appliance + if v.scope == VolumeScope.LOCAL else None, v.id) + for v in vols]) for i, (status, output, err) in enumerate(resps): if status != 200: self.logger.error(err) @@ -114,7 +117,8 @@ def __init__(self): async def get_volume(self, app_id, vol_id): api = config.ceph - status, vol, err = await self.http_cli.get(api.host, api.port, '/fs/%s-%s'%(app_id, vol_id)) + endpoint = '/fs/%s-%s'%(app_id, vol_id) if app_id else '/fs/%s'%vol_id + status, vol, err = await self.http_cli.get(api.host, api.port, endpoint) if status != 200: return status, None, err return status, vol, None @@ -142,6 +146,8 @@ async def get_volumes(self, **filters): return [PersistentVolume.parse(v)[1] async for v in self.__vol_col.find(filters)] async def get_volume(self, app_id, vol_id): + if not app_id: + return await self._get_volume(id=vol_id) return await self._get_volume(id=vol_id, appliance=app_id) async def save_volume(self, vol, upsert=True):