Skip to content

Commit

Permalink
Add task stuffs
Browse files Browse the repository at this point in the history
  • Loading branch information
rmb938 committed Oct 8, 2017
1 parent bbaa500 commit 5d3cc7a
Show file tree
Hide file tree
Showing 21 changed files with 870 additions and 2 deletions.
9 changes: 9 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[run]
omit =
*/test/*

[report]
exclude_lines =
pragma: no cover
def __repr__
raise NotImplementedError
19 changes: 19 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
language: python
python:
- '3.6'
env:
- TOXENV=py36
- TOXENV=flake8
- TOXENV=build
install:
- pip install tox
script: tox
services:
- postgresql
deploy:
provider: pypi
user: rmb938
password:
secure: qFX8R9hokU3iEEdIe28EyR/C39DxA/Rgbx3xQZMDBRMPxNWx2uRyF20qQeKG4rOA+Sccap2CqZi0yWgMUOkULJceTH3TW4DoAGc9/a/Wo5bc6KNUGPX89qfx8J03ZWLifvfs67rXnhgtDpH7GYdq0NYyx3PvGxznbK+PZhpPZNXJjYjokiuOpJxxPNAu5WXTy0qyKjfX75GfQtrH2eE0lpYmKdKZeSOKheQ0UgCznvD9iLxV6QVetpfk5g3k6yg/91KP6stNNGc9SzaaTGLQ/4jdQaXlpE2B20iVYepXPP3nZAKQsn/KAF/5BS0S19ZQ3gZCTzlbeaMypJbaBYHOuMPmeVw7Yz4ASmHyzDFNDBzDeBCxY8n2/hOR90zLyBMOOK2JsnBBJBhQSJg/Mw94iCzj9G80OAFZukuSEheH61Du5a5GXj85d4rGi4A8PJ7q2NzFkZOeVVXKLDe8fMpfleO0hYyMFJCmsqsVvRkgOvNo1NzOMveUSLGuo1ecAr3Uh/GD8xTNjYfTGLGPgAMLt2H0QYxBw/ixyGrSxxiRCA9eR5Yi3BR9H+Adjk7zozsv+3s62gk/saPBaUWJAgPENG53At5ih9h79Kn0sN4rR9+I6HMsK/8QO50WXju7RRqCKdsge6xpXVI13q6ziY62nhvcr11PBHM3uGImzzD0wHc=
on:
tags: true
2 changes: 0 additions & 2 deletions README.md

This file was deleted.

10 changes: 10 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
ingredients.tasks
-----------------

.. image:: https://travis-ci.org/sandwichcloud/ingredients.tasks.svg?branch=master
:target: https://travis-ci.org/sandwichcloud/ingredients.tasks

.. image:: https://badge.fury.io/py/ingredients.tasks.svg
:target: https://badge.fury.io/py/ingredients.tasks

Task Library for Sandwich Cloud
Empty file added ingredients_tasks/__init__.py
Empty file.
77 changes: 77 additions & 0 deletions ingredients_tasks/celary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import inspect

import celery
from kombu import Queue, Exchange
from sqlalchemy.engine.url import URL

from ingredients_db.database import Database
from ingredients_tasks.conf.loader import SETTINGS

database = Database(SETTINGS.DATABASE_HOST, SETTINGS.DATABASE_PORT, SETTINGS.DATABASE_USERNAME,
SETTINGS.DATABASE_PASSWORD, SETTINGS.DATABASE_DB, SETTINGS.DATABASE_POOL_SIZE)


class Messaging(object):
def __init__(self, host, port, username, password, vhost):
self.host = host
self.port = port
self.username = username
self.password = password
self.vhost = vhost
self.app = celery.Celery()

def connect(self):
connect_args = {
'drivername': 'amqp',
'host': self.host,
'port': self.port,
'username': self.username,
'password': self.password,
'database': self.vhost
}

from ingredients_tasks.tasks import image, instance, network

include, task_queues, task_routes = self.populate_tasks(image, instance, network)

self.app.conf.update(
broker_url=URL(**connect_args).__str__(),
broker_transport_options={
'confirm_publish': True
},
task_acks_late=True,
task_reject_on_worker_last=True,
task_ignore_result=True,
task_store_errors_even_if_ignored=False,
task_soft_time_limit=300,
task_time_limit=600,
worker_prefetch_multiplier=1,
include=include,
task_queues=task_queues,
task_routes=task_routes
)

def start(self, argv):
self.app.start(argv=argv)

def populate_tasks(self, *args):
include = []
task_queues = set()
task_routes = {}

from ingredients_tasks.tasks.tasks import ImageTask, InstanceTask, NetworkTask

for task_module in args:
include.append(task_module.__name__)
for name, method in inspect.getmembers(task_module):
if method in [ImageTask, InstanceTask, NetworkTask]:
continue
if hasattr(method, 'apply_async'):
task_queues.add(Queue(name, exchange=Exchange(task_module.__name__), routing_key=name))
task_routes[task_module.__name__ + "." + name] = {
'queue': name,
'exchange': task_module.__name__,
'routing_key': name
}

return include, task_queues, task_routes
Empty file.
35 changes: 35 additions & 0 deletions ingredients_tasks/conf/default_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Default settings. Override these with settings in the module pointed to by the SETTINGS_MODULE
environment variable.
"""

####################
# DATABASE #
####################

# Only PostgreSQL is supported
DATABASE_HOST = '127.0.0.1'
DATABASE_PORT = '5432'
DATABASE_USERNAME = 'postgres'
DATABASE_PASSWORD = 'postgres'
DATABASE_DB = 'my_db'
DATABASE_POOL_SIZE = 20

####################
# RABBITMQ #
####################

RABBITMQ_VHOST = '/'
RABBITMQ_PORT = 5672
RABBITMQ_HOST = '127.0.0.1'
RABBITMQ_USERNAME = 'guest'
RABBITMQ_PASSWORD = 'guest'

####################
# VMWare #
####################

VCENTER_HOST = ''
VCENTER_PORT = 443
VCENTER_USERNAME = ''
VCENTER_PASSWORD = ''
41 changes: 41 additions & 0 deletions ingredients_tasks/conf/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
The code is derived from DJango's :module:`django.conf`.
"""
import importlib
import os

SETTINGS_MODULE_ENV = "SETTINGS_MODULE"


class SettingsLoader(object):
def __init__(self):
self.settings_module = os.environ.get(SETTINGS_MODULE_ENV)
if not self.settings_module:
raise ValueError(
"Settings Environment Variable " + SETTINGS_MODULE_ENV + " is not set. Cannot load settings.")

def load(self):
global SETTINGS
SETTINGS = Settings(self.settings_module)


class Settings(object):
def __init__(self, settings_module):
self.settings_module = settings_module
mod = importlib.import_module(self.settings_module)

for setting in dir(mod):
if setting.isupper():
setting_value = getattr(mod, setting)
setattr(self, setting, setting_value)

def __repr__(self):
return '<%(cls)s "%(settings_module)s">' % {
'cls': self.__class__.__name__,
'settings_module': self.settings_module,
}


SETTINGS = None
loader = SettingsLoader()
loader.load()
41 changes: 41 additions & 0 deletions ingredients_tasks/omapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from contextlib import contextmanager

from pypureomapi import Omapi, OmapiErrorNotFound

from ingredients_tasks.conf.loader import SETTINGS


class OmapiClient(object):
def __init__(self, dhcp_server, port, key_name, key):
self.dhcp_server = dhcp_server
self.port = port
self.key_name = key_name
self.key = key
self.client = None

def connect(self):
self.client = Omapi(self.dhcp_server, self.port, self.key_name, self.key)

def disconnect(self):
self.client.close()

def add_host(self, ip, mac):
# TODO: lookup if ip already exists and delete it
try:
old_mac = self.client.lookup_mac(ip)
self.client.del_host(old_mac)
except OmapiErrorNotFound:
pass
self.client.add_host(ip, mac)

@classmethod
@contextmanager
def client_session(cls):
omapi_client = OmapiClient(SETTINGS.DHCP_SERVER_IP, SETTINGS.DHCP_OMAPI_PORT, SETTINGS.DHCP_KEY_NAME,
SETTINGS.DHCP_B64_KEY)
omapi_client.connect()

try:
yield omapi_client
finally:
omapi_client.disconnect()
Empty file.
34 changes: 34 additions & 0 deletions ingredients_tasks/tasks/image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import celery
from celery.utils.log import get_task_logger

from ingredients_db.models.images import ImageState
from ingredients_tasks.tasks.tasks import ImageTask

logger = get_task_logger(__name__)


@celery.shared_task(base=ImageTask, bind=True, max_retires=2, default_retry_delay=5)
def create_image(self, **kwargs):
vmware_image = self.vmware_session.get_image(self.image.file_name)

if vmware_image is None:
raise ValueError("Could not find image file")

self.image.state = ImageState.CREATED


@celery.shared_task(base=ImageTask, bind=True, max_retires=2, default_retry_delay=5)
def delete_image(self, **kwargs):
if self.image.state != ImageState.DELETING: # We might be faster than the db so retry
raise self.retry()

vmware_image = self.vmware_session.get_image(self.image.file_name)

if vmware_image is not None:
self.vmware_session.delete_image(vmware_image)
else:
logger.warning("Tried to delete image %s but couldn't find its backing file" % self.image.id)

self.image.state = ImageState.DELETED

self.db_session.delete(self.image)
82 changes: 82 additions & 0 deletions ingredients_tasks/tasks/instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import celery
from celery.utils.log import get_task_logger
from sqlalchemy.orm.exc import NoResultFound

from ingredients_db.models.images import Image
from ingredients_db.models.instance import InstanceState
from ingredients_db.models.network import Network
from ingredients_db.models.network_port import NetworkPort
from ingredients_tasks.tasks.tasks import InstanceTask

logger = get_task_logger(__name__)


@celery.shared_task(base=InstanceTask, bind=True, max_retires=2, default_retry_delay=5)
def create_instance(self, **kwargs):
if self.instance.image_id is None:
raise ValueError("Image turned NULL before the instance could be created")

try:
image = self.db_session.query(Image).filter(Image.id == self.instance.image_id).with_for_update().one()
except NoResultFound:
raise LookupError("Image got deleted before the instance could be created")

vmware_image = self.vmware_session.get_image(image.file_name)

if vmware_image is None:
raise LookupError("Could not find image file to clone")

old_vmware_vm = self.vmware_session.get_vm(str(self.instance.id))
if old_vmware_vm is not None:
# A backing vm with the same id exists (how?!) so we probably should delete it
logger.info(
'A backing vm with the id of %s already exists so it is going to be deleted.' % str(self.instance.id))
self.vmware_session.delete_vm(old_vmware_vm)

network_port = self.db_session.query(NetworkPort).filter(
NetworkPort.id == self.instance.network_port_id).with_for_update().first()

network = self.db_session.query(Network).filter(Network.id == network_port.network_id).with_for_update().first()

logger.info('Allocating IP address for instance %s' % str(self.instance.id))
ip_address = network.next_free_address(self.db_session)
if ip_address is None:
raise IndexError("Could not allocate a free ip address. Is the pool full?")
network_port.ip_address = ip_address

port_group = self.vmware_session.get_port_group(network.port_group)
if port_group is None:
raise LookupError("Cloud not find port group to connect to")

logger.info('Creating backing vm for instance %s' % str(self.instance.id))
vmware_vm = self.vmware_session.create_vm(vm_name=str(self.instance.id), image=vmware_image, port_group=port_group)

nic_mac = self.vmware_session.find_vm_mac(vmware_vm)
if nic_mac is None:
raise LookupError("Could not find mac address of nic")

logger.info('Telling DHCP about our IP for instance %s' % str(self.instance.id))
self.omapi_session.add_host(str(network_port.ip_address), nic_mac)

logger.info('Powering on backing vm for instance %s' % str(self.instance.id))
self.vmware_session.power_on_vm(vmware_vm)

self.instance.state = InstanceState.ACTIVE


@celery.shared_task(base=InstanceTask, bind=True, max_retires=2, default_retry_delay=5)
def delete_instance(self, **kwargs):
vmware_vm = self.vmware_session.get_vm(self.instance.id)

if vmware_vm is None:
logger.warning('Could not find backing vm for instance %s when trying to delete.' % str(self.instance.id))
else:
logger.info('Deleting backing vm for instance %s' % str(self.instance.id))
self.vmware_session.delete_vm(vmware_vm)

network_port = self.db_session.query(NetworkPort).filter(
NetworkPort.id == self.instance.network_port_id).with_for_update().first()

self.instance.state = InstanceState.DELETED
self.db_session.delete(self.instance)
self.db_session.delete(network_port)
14 changes: 14 additions & 0 deletions ingredients_tasks/tasks/network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import celery

from ingredients_db.models.network import NetworkState
from ingredients_tasks.tasks.tasks import NetworkTask


@celery.shared_task(base=NetworkTask, bind=True, max_retires=2, default_retry_delay=5)
def create_network(self, **kwargs):
port_group = self.vmware_session.get_port_group(self.network.port_group)

if port_group is None:
raise ValueError("Could not find port group")

self.network.state = NetworkState.CREATED
Loading

0 comments on commit 5d3cc7a

Please sign in to comment.