Skip to content

Commit

Permalink
Add storage and image management support
Browse files Browse the repository at this point in the history
Added pool_selectors and image_pool_name

Signed-off-by: Zhenchao Liu <[email protected]>
  • Loading branch information
zhencliu authored and nickzhq committed Oct 9, 2024
1 parent aa1850a commit 169fbf1
Show file tree
Hide file tree
Showing 57 changed files with 3,877 additions and 39 deletions.
15 changes: 7 additions & 8 deletions avocado_vt/plugins/vt_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from avocado.core.plugin_interfaces import JobPostTests as Post
from avocado.core.plugin_interfaces import JobPreTests as Pre
from avocado.utils.stacktrace import log_exc_info

from virttest.vt_cluster import cluster, node_metadata
from virttest.vt_imgr import vt_imgr
from virttest.vt_resmgr import resmgr


class ClusterSetupError(Exception):
Expand Down Expand Up @@ -50,21 +53,17 @@ def _pre_node_setup():
def _pre_mgr_setup():
try:
# Pre-setup the cluster manager
# e.g:
# startup_resmgr()
# vt_imgr.startup()
pass
resmgr.startup()
vt_imgr.startup()
except Exception as err:
raise ClusterManagerSetupError(err)

@staticmethod
def _post_mgr_cleanup():
try:
# Post-cleanup the cluster manager
# e.g:
# teardown_resmgr()
# vt_imgr.teardown()
pass
vt_imgr.teardown()
resmgr.teardown()
except Exception as err:
raise ClusterManagerCleanupError(err)

Expand Down
15 changes: 15 additions & 0 deletions avocado_vt/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
)
from virttest._wrappers import load_source
from virttest.vt_cluster import cluster, logger, selector
from virttest.vt_resmgr import resmgr

# avocado-vt no longer needs autotest for the majority of its functionality,
# except by:
Expand Down Expand Up @@ -378,7 +379,21 @@ def _setup_partition(self):
_node.tag = node
self._cluster_partition.add_node(_node)

# Select the pools when the user specifies the pools param
for pool_tag in self.params.objects("pools"):
pool_params = self.params.object_params(pool_tag)
pool_selectors = pool_params.get("pool_selectors")

pools = set(resmgr.get_all_pools()) - set(cluster.partition.pools.values())
pool_id = selector.select_resource_pool(list(pools), pool_selectors)
if not pool_id:
raise selector.SelectorError(
f"No pool selected for {pool_tag} with {pool_selectors}"
)
self._cluster_partition.pools[pool_tag] = pool_id

def _clear_partition(self):
self._cluster_partition.pools.clear()
cluster_dir = os.path.join(self.resultsdir, "cluster")
if self._cluster_partition.nodes:
for node in self._cluster_partition.nodes:
Expand Down
6 changes: 6 additions & 0 deletions virttest/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from avocado.utils import process

from virttest.vt_cluster import cluster, node
from virttest.vt_resmgr import resmgr

from . import arch, asset, cartesian_config, data_dir, defaults, utils_selinux
from .compat import get_opt
Expand Down Expand Up @@ -895,6 +896,10 @@ def _register_hosts(hosts_configs):
LOG.debug("Host %s registered", host)


def _initialize_managers(pools_params):
resmgr.setup(pools_params)


def _config_master_server(master_config):
"""Configure the master server."""
if master_config:
Expand Down Expand Up @@ -1084,6 +1089,7 @@ def bootstrap(options, interactive=False):
cluster_config = _load_cluster_config(vt_cluster_config)
_register_hosts(cluster_config.get("hosts"))
_config_master_server(cluster_config.get("master"))
_initialize_managers(cluster_config.get("pools"))

LOG.info("")
LOG.info("VT-BOOTSTRAP FINISHED")
Expand Down
136 changes: 108 additions & 28 deletions virttest/env_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from virttest.test_setup.verify import VerifyHostDMesg
from virttest.test_setup.vms import UnrequestedVMHandler
from virttest.utils_version import VersionInterval
from virttest.vt_imgr import vt_imgr

utils_libvirtd = lazy_import("virttest.utils_libvirtd")
virsh = lazy_import("virttest.virsh")
Expand Down Expand Up @@ -125,42 +126,70 @@ def _get_qemu_version(qemu_cmd):
return "Unknown"


def preprocess_image(test, params, image_name, vm_process_status=None):
def preprocess_image(test, params, image_name, vm_process_status=None, vm_name=None):
"""
Preprocess a single QEMU image according to the instructions in params.
:param test: Autotest test object.
:param params: A dict containing image preprocessing parameters.
:param vm_process_status: This is needed in postprocess_image. Add it here
only for keep it work with process_images()
:param vm_name: vm tag defined in 'vms'
:note: Currently this function just creates an image if requested.
"""
# FIXME:
image_id = None
if params.get_boolean("multihost"):
params = params.copy()
params[f"image_owner_{image_name}"] = vm_name
image_config = vt_imgr.define_image_config(image_name, params)
image_id = vt_imgr.create_image_object(image_config)

params = params.object_params(image_name)
base_dir = params.get("images_base_dir", data_dir.get_data_dir())

if not storage.preprocess_image_backend(base_dir, params, image_name):
LOG.error("Backend can't be prepared correctly.")

image_filename = storage.get_image_filename(params, base_dir)
image_filename = None
if not params.get_boolean("multihost"):
image_filename = storage.get_image_filename(params, base_dir)

create_image = False
if params.get("force_create_image") == "yes":
create_image = True
elif params.get("create_image") == "yes" and not storage.file_exists(
params, image_filename
):
create_image = True
elif params.get("create_image") == "yes":
# FIXME: check all volumes allocated
if params.get_boolean("multihost"):
volume = vt_imgr.get_image_info(
image_id, request=f"spec.virt-images.{image_name}.spec.volume.meta"
)
create_image = True if not volume["meta"]["allocated"] else False
else:
create_image = (
True if not storage.file_exists(params, image_filename) else False
)
else:
# FIXME: sync all volumes configurations
if params.get_boolean("multihost"):
vt_imgr.get_image_info(image_id)
# TODO: check if file allocated

if params.get("backup_image_before_testing", "no") == "yes":
# FIXME: add backup_image
image = qemu_storage.QemuImg(params, base_dir, image_name)
image.backup_image(params, base_dir, "backup", True, True)
if create_image:
if storage.file_exists(params, image_filename):
# As rbd image can not be covered, so need remove it if we need
# force create a new image.
storage.file_remove(params, image_filename)
image = qemu_storage.QemuImg(params, base_dir, image_name)
LOG.info("Create image on %s." % image.storage_type)
image.create(params)
if params.get_boolean("multihost"):
vt_imgr.handle_image(image_id, {"create": {}})
else:
if storage.file_exists(params, image_filename):
# As rbd image can not be covered, so need remove it if we need
# force create a new image.
storage.file_remove(params, image_filename)
image = qemu_storage.QemuImg(params, base_dir, image_name)
LOG.info("Create image on %s." % image.storage_type)
image.create(params)


def preprocess_fs_source(test, params, fs_name, vm_process_status=None):
Expand Down Expand Up @@ -444,7 +473,7 @@ def preprocess_vm(test, params, env, name):
)


def check_image(test, params, image_name, vm_process_status=None):
def check_image(test, params, image_name, vm_process_status=None, vm_name=None):
"""
Check a single QEMU image according to the instructions in params.
Expand All @@ -453,6 +482,7 @@ def check_image(test, params, image_name, vm_process_status=None):
:param vm_process_status: (optional) vm process status like running, dead
or None for no vm exist.
"""
params = params.object_params(image_name)
clone_master = params.get("clone_master", None)
base_dir = data_dir.get_data_dir()
check_image_flag = params.get("check_image") == "yes"
Expand Down Expand Up @@ -523,7 +553,7 @@ def check_image(test, params, image_name, vm_process_status=None):
raise e


def postprocess_image(test, params, image_name, vm_process_status=None):
def postprocess_image(test, params, image_name, vm_process_status=None, vm_name=None):
"""
Postprocess a single QEMU image according to the instructions in params.
Expand All @@ -540,6 +570,16 @@ def postprocess_image(test, params, image_name, vm_process_status=None):
)
return

# FIXME: multihost
image_id = None
if params.get_boolean("multihost"):
image_id = vt_imgr.query_image(image_name, vm_name)
if image_id is None:
LOG.warning(f"Cannot find the image {image_name}")
image_config = vt_imgr.define_image_config(image_name, params)
image_id = vt_imgr.create_image_object(image_config)
params = params.object_params(image_name)

restored, removed = (False, False)
clone_master = params.get("clone_master", None)
base_dir = params.get("images_base_dir", data_dir.get_data_dir())
Expand Down Expand Up @@ -597,10 +637,18 @@ def postprocess_image(test, params, image_name, vm_process_status=None):
)
LOG.info("Remove image on %s." % image.storage_type)
if clone_master is None:
image.remove()
if params.get_boolean("multihost"):
vt_imgr.handle_image(image_id, {"destroy": {}})
vt_imgr.destroy_image_object(image_id)
else:
image.remove()
elif clone_master == "yes":
if image_name in params.get("master_images_clone").split():
image.remove()
if params.get_boolean("multihost"):
vt_imgr.handle_image(image_id, {"destroy": {}})
vt_imgr.destroy_image_object(image_id)
else:
image.remove()


def postprocess_fs_source(test, params, fs_name, vm_process_status=None):
Expand Down Expand Up @@ -749,13 +797,21 @@ def process_command(test, params, env, command, command_timeout, command_noncrit


class _CreateImages(threading.Thread):

"""
Thread which creates images. In case of failure it stores the exception
in self.exc_info
"""

def __init__(self, image_func, test, images, params, exit_event, vm_process_status):
def __init__(
self,
image_func,
test,
images,
params,
exit_event,
vm_process_status,
vm_name=None,
):
threading.Thread.__init__(self)
self.image_func = image_func
self.test = test
Expand All @@ -764,6 +820,7 @@ def __init__(self, image_func, test, images, params, exit_event, vm_process_stat
self.exit_event = exit_event
self.exc_info = None
self.vm_process_status = vm_process_status
self.vm_name = vm_name

def run(self):
try:
Expand All @@ -774,13 +831,14 @@ def run(self):
self.params,
self.exit_event,
self.vm_process_status,
self.vm_name,
)
except Exception:
self.exc_info = sys.exc_info()
self.exit_event.set()


def process_images(image_func, test, params, vm_process_status=None):
def process_images(image_func, test, params, vm_process_status=None, vm_name=None):
"""
Wrapper which chooses the best way to process images.
Expand All @@ -793,11 +851,20 @@ def process_images(image_func, test, params, vm_process_status=None):
images = params.objects("images")
if len(images) > 20: # Lets do it in parallel
_process_images_parallel(
image_func, test, params, vm_process_status=vm_process_status
image_func,
test,
params,
vm_process_status=vm_process_status,
vm_name=vm_name,
)
else:
_process_images_serial(
image_func, test, images, params, vm_process_status=vm_process_status
image_func,
test,
images,
params,
vm_process_status=vm_process_status,
vm_name=vm_name,
)


Expand All @@ -817,7 +884,13 @@ def process_fs_sources(fs_source_func, test, params, vm_process_status=None):


def _process_images_serial(
image_func, test, images, params, exit_event=None, vm_process_status=None
image_func,
test,
images,
params,
exit_event=None,
vm_process_status=None,
vm_name=None,
):
"""
Original process_image function, which allows custom set of images
Expand All @@ -830,14 +903,17 @@ def _process_images_serial(
or None for no vm exist.
"""
for image_name in images:
image_params = params.object_params(image_name)
image_func(test, image_params, image_name, vm_process_status)
# image_params = params.object_params(image_name)
# image_func(test, image_params, image_name, vm_process_status)
image_func(test, params, image_name, vm_process_status, vm_name)
if exit_event and exit_event.is_set():
LOG.error("Received exit_event, stop processing of images.")
break


def _process_images_parallel(image_func, test, params, vm_process_status=None):
def _process_images_parallel(
image_func, test, params, vm_process_status=None, vm_name=None
):
"""
The same as _process_images but in parallel.
:param image_func: Process function
Expand All @@ -853,7 +929,9 @@ def _process_images_parallel(image_func, test, params, vm_process_status=None):
for i in xrange(no_threads):
imgs = images[i::no_threads]
threads.append(
_CreateImages(image_func, test, imgs, params, exit_event, vm_process_status)
_CreateImages(
image_func, test, imgs, params, exit_event, vm_process_status, vm_name
)
)
threads[-1].start()

Expand Down Expand Up @@ -908,7 +986,9 @@ def _call_image_func():
unpause_vm = True
vm_params["skip_cluster_leak_warn"] = "yes"
try:
process_images(image_func, test, vm_params, vm_process_status)
process_images(
image_func, test, vm_params, vm_process_status, vm_name
)
finally:
if unpause_vm:
vm.resume()
Expand Down
2 changes: 1 addition & 1 deletion virttest/staging/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ def __init__(
# :param runlevel: sys_v runlevel to set as default in inittab
# :type runlevel: str
# """
# raise NotImplemented
# raise NotImplementedError


def convert_sysv_runlevel(level):
Expand Down
Loading

0 comments on commit 169fbf1

Please sign in to comment.