fix tasks from sharing variables and causing weirdness
rmb938 committed Oct 22, 2017
1 parent 2d5ad0e commit c57bda6
Showing 5 changed files with 192 additions and 246 deletions.
8 changes: 4 additions & 4 deletions ingredients_tasks/celary.py
@@ -34,12 +34,12 @@ def connect(self):
                 'confirm_publish': True
             },
             task_acks_late=True,
-            task_reject_on_worker_last=True,
+            task_reject_on_worker_lost=True,
             task_ignore_result=True,
             task_store_errors_even_if_ignored=False,
             task_soft_time_limit=300,  # 5 minutes
             task_time_limit=600,  # 10 minutes
-            worker_prefetch_multiplier=1,
+            worker_prefetch_multiplier=1,  # One worker process can only do one type of task at a time
             include=include,
             task_queues=task_queues,
             task_routes=task_routes
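
Note on this hunk: task_reject_on_worker_last was a typo, and the real Celery setting is task_reject_on_worker_lost, so the old line never actually enabled the behavior. A minimal standalone sketch of how these options fit together (placeholder app name and broker URL, not this repo's connect() code):

    from celery import Celery

    app = Celery('sketch', broker='amqp://guest@localhost//')  # placeholder broker
    app.conf.update(
        task_acks_late=True,              # ack only after the task body finishes
        task_reject_on_worker_lost=True,  # requeue the message if the worker dies mid-task
        worker_prefetch_multiplier=1,     # each worker process reserves one message at a time
        task_soft_time_limit=300,         # SoftTimeLimitExceeded raised in-task after 5 minutes
        task_time_limit=600,              # the worker process is killed outright after 10 minutes
    )

With acks_late and reject-on-worker-lost both set, an unacknowledged message goes back to the queue when a worker is lost, which is what makes this at-least-once setup safe to rely on.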
@@ -53,12 +53,12 @@ def populate_tasks(self, *args):
         task_queues = set()
         task_routes = {}

-        from ingredients_tasks.tasks.tasks import ImageTask, InstanceTask, NetworkTask
+        from ingredients_tasks.tasks.tasks import NetworkTask, ImageTask

         for task_module in args:
             include.append(task_module.__name__)
             for name, method in inspect.getmembers(task_module):
-                if method in [ImageTask, InstanceTask, NetworkTask]:
+                if method in [NetworkTask, ImageTask]:
                     continue
                 if hasattr(method, 'apply_async'):
                     task_queues.add(Queue(name, exchange=Exchange(task_module.__name__), routing_key=name))
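
One thing the skip list guards against: Celery Task subclasses themselves expose apply_async, so without the explicit continue the base classes imported into each module would be registered as queues alongside the real tasks. A small illustration (hypothetical module layout, not the repo's):

    import celery

    class ImageTask(celery.Task):
        pass  # base class that tasks inherit from, never queued itself

    @celery.shared_task(base=ImageTask, bind=True)
    def create_image(self, **kwargs):
        pass

    # Both pass the hasattr check used in populate_tasks:
    print(hasattr(create_image, 'apply_async'))  # True: a real task that wants a queue
    print(hasattr(ImageTask, 'apply_async'))     # True as well, hence the skip list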
41 changes: 24 additions & 17 deletions ingredients_tasks/tasks/image.py
@@ -3,42 +3,49 @@

 from ingredients_db.models.images import ImageState
 from ingredients_tasks.tasks.tasks import ImageTask
+from ingredients_tasks.vmware import VMWareClient

 logger = get_task_logger(__name__)


 @celery.shared_task(base=ImageTask, bind=True, max_retires=2, default_retry_delay=5)
 def create_image(self, **kwargs):
-    vmware_image = self.vmware_session.get_image(self.image.file_name)
+    image = self.request.image
+    with VMWareClient.client_session() as vmware:
+        vmware_image = vmware.get_image(image.file_name)

-    if vmware_image is None:
-        raise ValueError("Could not find image file")
+        if vmware_image is None:
+            raise ValueError("Could not find image file")

-    self.image.state = ImageState.CREATED
+    image.state = ImageState.CREATED


 @celery.shared_task(base=ImageTask, bind=True, max_retires=2, default_retry_delay=5)
 def delete_image(self, **kwargs):
-    vmware_image = self.vmware_session.get_image(self.image.file_name)
+    image = self.request.image
+    with VMWareClient.client_session() as vmware:
+        vmware_image = vmware.get_image(image.file_name)

-    if vmware_image is not None:
-        self.vmware_session.delete_image(vmware_image)
-    else:
-        logger.warning("Tried to delete image %s but couldn't find its backing file" % self.image.id)
+        if vmware_image is not None:
+            vmware.delete_image(vmware_image)
+        else:
+            logger.warning("Tried to delete image %s but couldn't find its backing file" % str(image.id))

-    self.image.state = ImageState.DELETED
+    image.state = ImageState.DELETED

-    self.db_session.delete(self.image)
+    self.request.session.delete(image)


 @celery.shared_task(base=ImageTask, bind=True, max_retires=2, default_retry_delay=5)
 def convert_vm(self, **kwargs):
-    vmware_vm = self.vmware_session.get_vm(self.image.file_name)
+    image = self.request.image
+    with VMWareClient.client_session() as vmware:
+        vmware_vm = vmware.get_vm(image.file_name)

-    if vmware_vm is None:
-        raise LookupError(
-            'Could not find backing vm for image %s when trying to convert to template.' % str(self.instance.id))
+        if vmware_vm is None:
+            raise LookupError(
+                'Could not find backing vm for image %s when trying to convert to template.' % str(image.id))

-    self.vmware_session.template_vm(vmware_vm)
+        vmware.template_vm(vmware_vm)

-    self.image.state = ImageState.CREATED
+    image.state = ImageState.CREATED
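
This file shows the heart of the fix. Celery instantiates a Task class once per worker process, so attributes like self.image and self.vmware_session set during one run were still there for the next run: exactly the shared-variable weirdness the commit message describes. Per-run state now comes from self.request, which is rebuilt for every delivery, and the VMware connection lives in a context manager scoped to the run. A distilled sketch of the bug (illustrative names, not this repo's code):

    import celery

    class SharedStateTask(celery.Task):
        image = None  # one Task instance per worker process, so this is shared

    @celery.shared_task(base=SharedStateTask, bind=True)
    def leaky(self, image_id):
        self.image = image_id  # persists on the singleton after this run ends
        return self.image

    @celery.shared_task(bind=True)
    def scoped(self, image_id):
        image = image_id       # plain local variable, gone when the run ends
        return image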
155 changes: 84 additions & 71 deletions ingredients_tasks/tasks/instance.py
@@ -6,128 +6,141 @@
 from ingredients_db.models.instance import InstanceState
 from ingredients_db.models.network import Network
 from ingredients_db.models.network_port import NetworkPort
+from ingredients_tasks.omapi import OmapiClient
 from ingredients_tasks.tasks.tasks import InstanceTask
+from ingredients_tasks.vmware import VMWareClient

 logger = get_task_logger(__name__)


 @celery.shared_task(base=InstanceTask, bind=True, max_retires=2, default_retry_delay=5)
 def create_instance(self, **kwargs):
-    if self.instance.image_id is None:
+    instance = self.request.instance
+    if instance.image_id is None:
         raise ValueError("Image turned NULL before the instance could be created")

     try:
-        image = self.db_session.query(Image).filter(Image.id == self.instance.image_id).one()
+        image = self.request.session.query(Image).filter(Image.id == instance.image_id).one()
     except NoResultFound:
         raise LookupError("Image got deleted before the instance could be created")

-    vmware_image = self.vmware_session.get_image(image.file_name)
+    with VMWareClient.client_session() as vmware:
+        vmware_image = vmware.get_image(image.file_name)

-    if vmware_image is None:
-        raise LookupError("Could not find image file to clone")
+        if vmware_image is None:
+            raise LookupError("Could not find image file to clone")

-    old_vmware_vm = self.vmware_session.get_vm(str(self.instance.id))
-    if old_vmware_vm is not None:
-        # A backing vm with the same id exists (how?! the task should have failed) so we probably should delete it
-        logger.info(
-            'A backing vm with the id of %s already exists so it is going to be deleted.' % str(self.instance.id))
-        self.vmware_session.delete_vm(old_vmware_vm)
+        old_vmware_vm = vmware.get_vm(str(instance.id))
+        if old_vmware_vm is not None:
+            # A backing vm with the same id exists (how?! the task should have failed) so we probably should delete it
+            logger.info(
+                'A backing vm with the id of %s already exists so it is going to be deleted.' % str(instance.id))
+            vmware.delete_vm(old_vmware_vm)

-    # We need a nested transaction because we need to lock the network so we can calculate the next free ip
-    # Without a nested transaction the lock will last for the total time of the task which could be several minutes
-    # this will block the api from creating new network_ports. With nested we only block for the time needed to
-    # calculate the next available ip address which is at most O(n) time with n being the number of
-    # ip addresses in the cidr
-    with self.database.session() as nested_session:
-        network_port = nested_session.query(NetworkPort).filter(
-            NetworkPort.id == self.instance.network_port_id).first()
+        # We need a nested transaction because we need to lock the network so we can calculate the next free ip
+        # Without a nested transaction the lock will last for the total time of the task which could be several minutes
+        # this will block the api from creating new network_ports. With nested we only block for the time needed to
+        # calculate the next available ip address which is at most O(n) time with n being the number of
+        # ip addresses in the cidr
+        with self.database.session() as nested_session:
+            network_port = nested_session.query(NetworkPort).filter(NetworkPort.id == instance.network_port_id).first()

-        network = nested_session.query(Network).filter(Network.id == network_port.network_id).with_for_update().first()
+            network = nested_session.query(Network).filter(
+                Network.id == network_port.network_id).with_for_update().first()

-        logger.info('Allocating IP address for instance %s' % str(self.instance.id))
-        if network_port.ip_address is not None:
-            # An ip address was already allocated (how?! the task should have failed) so let's reset it
-            network_port.ip_address = None
+            logger.info('Allocating IP address for instance %s' % str(instance.id))
+            if network_port.ip_address is not None:
+                # An ip address was already allocated (how?! the task should have failed) so let's reset it
+                network_port.ip_address = None

-        ip_address = network.next_free_address(nested_session)
-        if ip_address is None:
-            raise IndexError("Could not allocate a free ip address. Is the pool full?")
-        network_port.ip_address = ip_address
-        logger.info('Allocated IP address %s for instance %s' % (str(ip_address), str(self.instance.id)))
+            ip_address = network.next_free_address(nested_session)
+            if ip_address is None:
+                raise IndexError("Could not allocate a free ip address. Is the pool full?")
+            network_port.ip_address = ip_address
+            logger.info('Allocated IP address %s for instance %s' % (str(ip_address), str(instance.id)))

-        port_group = self.vmware_session.get_port_group(network.port_group)
-        if port_group is None:
-            raise LookupError("Cloud not find port group to connect to")
-        nested_session.commit()
+            port_group = vmware.get_port_group(network.port_group)
+            if port_group is None:
+                raise LookupError("Cloud not find port group to connect to")
+            nested_session.commit()

-    logger.info('Creating backing vm for instance %s' % str(self.instance.id))
-    vmware_vm = self.vmware_session.create_vm(vm_name=str(self.instance.id), image=vmware_image, port_group=port_group)
+        logger.info('Creating backing vm for instance %s' % str(instance.id))
+        vmware_vm = vmware.create_vm(vm_name=str(instance.id), image=vmware_image, port_group=port_group)

-    nic_mac = self.vmware_session.find_vm_mac(vmware_vm)
-    if nic_mac is None:
-        raise LookupError("Could not find mac address of nic")
+        nic_mac = vmware.find_vm_mac(vmware_vm)
+        if nic_mac is None:
+            raise LookupError("Could not find mac address of nic")

-    logger.info('Telling DHCP about our IP for instance %s' % str(self.instance.id))
-    self.omapi_session.add_host(str(ip_address), nic_mac)
+        logger.info('Telling DHCP about our IP for instance %s' % str(instance.id))
+        with OmapiClient.client_session() as omapi:
+            omapi.add_host(str(ip_address), nic_mac)

-    logger.info('Powering on backing vm for instance %s' % str(self.instance.id))
-    self.vmware_session.power_on_vm(vmware_vm)
+        logger.info('Powering on backing vm for instance %s' % str(instance.id))
+        vmware.power_on_vm(vmware_vm)

-    self.instance.state = InstanceState.ACTIVE
+    instance.state = InstanceState.ACTIVE


 @celery.shared_task(base=InstanceTask, bind=True, max_retires=2, default_retry_delay=5)
 def delete_instance(self, delete_backing: bool, **kwargs):
+    instance = self.request.instance
     if delete_backing:
-        vmware_vm = self.vmware_session.get_vm(str(self.instance.id))
+        with VMWareClient.client_session() as vmware:
+            vmware_vm = vmware.get_vm(str(instance.id))

-        if vmware_vm is None:
-            logger.warning('Could not find backing vm for instance %s when trying to delete.' % str(self.instance.id))
-        else:
-            logger.info('Deleting backing vm for instance %s' % str(self.instance.id))
-            self.vmware_session.power_off_vm(vmware_vm)
-            self.vmware_session.delete_vm(vmware_vm)
+            if vmware_vm is None:
+                logger.warning(
+                    'Could not find backing vm for instance %s when trying to delete.' % str(instance.id))
+            else:
+                logger.info('Deleting backing vm for instance %s' % str(instance.id))
+                vmware.power_off_vm(vmware_vm)
+                vmware.delete_vm(vmware_vm)

-    network_port = self.db_session.query(NetworkPort).filter(
-        NetworkPort.id == self.instance.network_port_id).first()
+    network_port = self.request.session.query(NetworkPort).filter(NetworkPort.id == instance.network_port_id).first()

-    self.instance.state = InstanceState.DELETED
-    self.db_session.delete(self.instance)
-    self.db_session.delete(network_port)
+    instance.state = InstanceState.DELETED
+    self.request.session.delete(instance)
+    self.request.session.delete(network_port)


 @celery.shared_task(base=InstanceTask, bind=True, max_retires=2, default_retry_delay=5)
 def stop_instance(self, hard=False, timeout=60, **kwargs):
-    vmware_vm = self.vmware_session.get_vm(str(self.instance.id))
+    instance = self.request.instance
+    with VMWareClient.client_session() as vmware:
+        vmware_vm = vmware.get_vm(str(instance.id))

-    if vmware_vm is None:
-        raise LookupError('Could not find backing vm for instance %s when trying to stop.' % str(self.instance.id))
+        if vmware_vm is None:
+            raise LookupError('Could not find backing vm for instance %s when trying to stop.' % str(instance.id))

-    self.vmware_session.power_off_vm(vmware_vm, hard=hard, timeout=timeout)
+        vmware.power_off_vm(vmware_vm, hard=hard, timeout=timeout)

-    self.instance.state = InstanceState.STOPPED
+    instance.state = InstanceState.STOPPED


 @celery.shared_task(base=InstanceTask, bind=True, max_retires=2, default_retry_delay=5)
 def start_instance(self, **kwargs):
-    vmware_vm = self.vmware_session.get_vm(str(self.instance.id))
+    instance = self.request.instance
+    with VMWareClient.client_session() as vmware:
+        vmware_vm = vmware.get_vm(str(instance.id))

-    if vmware_vm is None:
-        raise LookupError('Could not find backing vm for instance %s when trying to start.' % str(self.instance.id))
+        if vmware_vm is None:
+            raise LookupError('Could not find backing vm for instance %s when trying to start.' % str(instance.id))

-    self.vmware_session.power_on_vm(vmware_vm)
+        vmware.power_on_vm(vmware_vm)

-    self.instance.state = InstanceState.ACTIVE
+    instance.state = InstanceState.ACTIVE


 @celery.shared_task(base=InstanceTask, bind=True, max_retires=2, default_retry_delay=5)
 def restart_instance(self, hard=False, timeout=60, **kwargs):
-    vmware_vm = self.vmware_session.get_vm(str(self.instance.id))
+    instance = self.request.instance
+    with VMWareClient.client_session() as vmware:
+        vmware_vm = vmware.get_vm(str(instance.id))

-    if vmware_vm is None:
-        raise LookupError('Could not find backing vm for instance %s when trying to restart.' % str(self.instance.id))
+        if vmware_vm is None:
+            raise LookupError('Could not find backing vm for instance %s when trying to restart.' % str(instance.id))

-    self.vmware_session.power_off_vm(vmware_vm, hard=hard, timeout=timeout)
-    self.vmware_session.power_on_vm(vmware_vm)
+        vmware.power_off_vm(vmware_vm, hard=hard, timeout=timeout)
+        vmware.power_on_vm(vmware_vm)

-    self.instance.state = InstanceState.ACTIVE
+    instance.state = InstanceState.ACTIVE
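
The nested-session comment above is the key concurrency detail in this file: with_for_update() emits SELECT ... FOR UPDATE, and the row lock on the network is held until that session commits. Running the allocation in its own short-lived session releases the lock as soon as the IP is chosen, instead of holding it for the minutes the task may take. A condensed sketch of the same locking pattern (placeholder engine URL; next_free_address as used in the diff above):

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    from ingredients_db.models.network import Network
    from ingredients_db.models.network_port import NetworkPort

    engine = create_engine('postgresql://localhost/ingredients')  # placeholder DSN
    Session = sessionmaker(bind=engine)

    def allocate_ip(network_port_id):
        session = Session()
        try:
            port = session.query(NetworkPort).filter(NetworkPort.id == network_port_id).one()
            # SELECT ... FOR UPDATE serializes concurrent allocators on this network row
            network = session.query(Network).filter(
                Network.id == port.network_id).with_for_update().one()
            port.ip_address = network.next_free_address(session)  # at most O(n) over the cidr
            session.commit()  # row lock released here, not when the whole task finishes
            return port.ip_address
        finally:
            session.close()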
11 changes: 7 additions & 4 deletions ingredients_tasks/tasks/network.py
@@ -2,13 +2,16 @@

 from ingredients_db.models.network import NetworkState
 from ingredients_tasks.tasks.tasks import NetworkTask
+from ingredients_tasks.vmware import VMWareClient


 @celery.shared_task(base=NetworkTask, bind=True, max_retires=2, default_retry_delay=5)
 def create_network(self, **kwargs):
-    port_group = self.vmware_session.get_port_group(self.network.port_group)
+    network = self.request.network
+    with VMWareClient.client_session() as vmware:
+        port_group = vmware.get_port_group(network.port_group)

-    if port_group is None:
-        raise ValueError("Could not find port group")
+        if port_group is None:
+            raise ValueError("Could not find port group")

-    self.network.state = NetworkState.CREATED
+    network.state = NetworkState.CREATED
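
All of the task files now go through VMWareClient.client_session() (and instance.py through OmapiClient.client_session()) so each run opens and closes its own connection rather than reusing one stored on the Task instance. Those implementations are not part of this diff; a rough sketch of the shape such a context manager needs (assumed connect/disconnect methods, not the repo's actual client):

    from contextlib import contextmanager

    class VMWareClient:
        def connect(self):
            pass  # the real client would open the vCenter connection here

        def disconnect(self):
            pass  # ...and tear it down here

        @classmethod
        @contextmanager
        def client_session(cls):
            client = cls()
            client.connect()
            try:
                yield client  # becomes the 'with ... as vmware:' target in the tasks above
            finally:
                client.disconnect()  # runs even if the task body raises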