diff --git a/virttest/vt_utils/net/__init__.py b/virttest/vt_utils/net/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/virttest/vt_utils/net/drivers/__init__.py b/virttest/vt_utils/net/drivers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/virttest/vt_utils/net/drivers/bridge.py b/virttest/vt_utils/net/drivers/bridge.py new file mode 100644 index 0000000000..5bb4d7a739 --- /dev/null +++ b/virttest/vt_utils/net/drivers/bridge.py @@ -0,0 +1,68 @@ +# Library for the bridge address related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +import json + +from avocado.utils import process + +from virttest.vt_utils.net import interface + + +def find_bridge_name(iface): + """ + Finds bridge name based in the interface name. + + :param iface: The interface name. + :type iface: String. + + :return: The bridge name. If not found, return None. + :rtype: String or None. + """ + cmd = "bridge -json link" + cmd_obj = process.run(cmd, shell=True) + output = cmd_obj.stdout_text.strip() + bridge_info = json.loads(output) + for bridge in bridge_info: + if bridge["ifname"] in (iface,): + return bridge["master"] + return None + + +def add_to_bridge(ifname, brname): + """ + Add the interface into the bridge. + + :param ifname: The interface name. + :type ifname: String. + + :param brname: The bridge name. + :type brname: String. + """ + # To add an interface into the bridge, its state must be up. + interface.bring_up_ifname(ifname) + cmd_add_to_bridge = "ip link set %s master %s" % (ifname, brname) + process.run(cmd_add_to_bridge, shell=True) + + +def del_from_bridge(ifname): + """ + Delete the interface from the bridge. + NOTE: Bringing the interface down is excluded at this function. + You may call the function about bringing interface down after del_from_bridge(). + + :param ifname: The interface name. + :type ifname: String. + """ + cmd = "ip link set %s nomaster" % ifname + process.run(cmd, shell=True) diff --git a/virttest/vt_utils/net/drivers/macvtap.py b/virttest/vt_utils/net/drivers/macvtap.py new file mode 100644 index 0000000000..e1a2e7b16d --- /dev/null +++ b/virttest/vt_utils/net/drivers/macvtap.py @@ -0,0 +1,126 @@ +# Library for the macvtap address related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +import os + +from avocado.utils import process + +from virttest import arch, utils_misc +from virttest.vt_utils.net import interface as iface + + +def create_macvtap(iface=None, tapname=None, mode="vepa", mac_addr=None): + """ + Create a Macvtap device. + + :param iface: The physical interface. + :type iface: String. + :param tapname: The macvtap name. + :type tapname: String. + :param mode: The macvtap type mode (vepa, bridge, private) + :type mode: String. + :param mac_addr: The macvtap mac address. + :type mac_addr: String. + + :raise: A RuntimeError may be raised if running command fails. + """ + if not iface: + iface = get_macvtap_base_iface(iface) + if not tapname: + tapname = "macvtap" + utils_misc.generate_random_id() + cmd = "ip link add link %s name %s type %s" % (iface, tapname, mode) + process.run(cmd, shell=True) + if mac_addr: + cmd = "ip link set %s address %s up" % (tapname, mac_addr) + process.run(cmd, shell=True) + + +def show_macvtap(tapname): + """ + Show the macvtap details. + + :param tapname: The macvtap name. + :type tapname: String. + + :return: The macvtap details. + :rtype: String. + """ + cmd = "ip link show %s" % tapname + cmd_obj = process.run(cmd, shell=True) + return cmd_obj.stdout_text.strip() + + +def delete_macvtap(tapname): + """ + Delete the macvtap. + + :param tapname: The macvtap name. + :type tapname: String. + """ + cmd = "ip link delete %s" % tapname + process.run(cmd, shell=True) + + +def get_macvtap_base_iface(base_interface=None): + """ + Get physical interface to create macvtap, if you assigned base interface + is valid(not belong to any bridge and is up), will use it; else use the + first physical interface, which is not a brport and up. + """ + tap_base_device = None + + (dev_int, _) = iface.get_sorted_net_if() + + if base_interface and base_interface in dev_int: + if (not iface.net_if_is_brport(base_interface)) and ( + iface.is_iface_flag_same(base_interface, arch.IFF_UP) + ): + tap_base_device = base_interface + + if not tap_base_device: + for interface in dev_int: + if iface.net_if_is_brport(interface): + continue + if iface.is_iface_flag_same(interface, arch.IFF_UP): + tap_base_device = interface + break + + return tap_base_device + + +def open_macvtap(macvtap, queues=1): + """ + Open a macvtap device and returns its file descriptors which are used by + fds= parameter of qemu. + + For single queue, only returns one file descriptor, it's used by + fd= legacy parameter of qemu. + + If you not have a switch support vepa in you env, run this type case you + need at least two nic on you host [just workaround]. + + :param macvtap: The macvtap name. + :type macvtap: String. + :param queues: Queue number. + :type queues: Integer. + + :return: The file descriptors which are used + by fds= parameter of qemu. + :rtype: String. + """ + tapfds = [] + macvtap = "/dev/%s" % macvtap + for queue in range(int(queues)): + tapfds.append(str(os.open(macvtap, os.O_RDWR))) + return ":".join(tapfds) diff --git a/virttest/vt_utils/net/drivers/ovs.py b/virttest/vt_utils/net/drivers/ovs.py new file mode 100644 index 0000000000..72c61ac6bc --- /dev/null +++ b/virttest/vt_utils/net/drivers/ovs.py @@ -0,0 +1,61 @@ +# Library for the openvswitch related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +from avocado.utils import process + + +def ovs_br_exists(brname): + """ + Check if bridge exists or not. + + :param brname: Name of the bridge. + :type brname: String. + + :return: True if found, otherwise, return False. + :rtype: Boolean. + + :raise: RuntimeError if executing command fails. + """ + cmd = "ovs-vsctl br-exists %s" % brname + cmd_obj = process.run(cmd, shell=True) + status = cmd_obj.exit_status + if status != 0: + return False + return True + + +def add_ovs_bridge(brname): + """ + Add a bridge. + + :param brname: Name of the bridge. + :type brname: String. + + :raise: RuntimeError if executing command fails. + """ + cmd = "ovs-vsctl --may-exist add-br %s" % brname + process.run(cmd, shell=True) + + +def del_ovs_bridge(brname): + """ + Delete a bridge from ovs. + + :param brname: Name of the bridge. + :type brname: String. + + :raise: RuntimeError if executing command fails. + """ + cmd = "ovs-vsctl --if-exists del-br %s" % brname + process.run(cmd, shell=True) diff --git a/virttest/vt_utils/net/interface.py b/virttest/vt_utils/net/interface.py new file mode 100644 index 0000000000..6564aa2024 --- /dev/null +++ b/virttest/vt_utils/net/interface.py @@ -0,0 +1,400 @@ +# Library for the interface related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +import fcntl +import json +import os +import re +import socket +import struct + +from avocado.utils import process + +from virttest import arch + + +def set_iface_flag(ifname, flag, active=True): + """ + Set flag of the given interface. + + :param ifname: The interface name. + :type ifname: String. + :param flag: The flag. ( Such as IFF_BROADCAST or 1<<1 or 2) + :type flag: Integer. + :param active: Activate the flag given. Otherwise, deactivate the flag given. + :type active: Boolean. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + # Get existing device flags + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sockfd = sock.fileno() + ifreq = struct.pack("16sh", ifname.encode(), 0) + flags = struct.unpack("16sh", fcntl.ioctl(sockfd, arch.SIOCGIFFLAGS, ifreq))[1] + + # Set new flags + if active: + flags = flags | flag + else: + flags = flags & ~flag + + ifreq = struct.pack("16sh", ifname.encode(), flags) + fcntl.ioctl(sockfd, arch.SIOCSIFFLAGS, ifreq) + + +def is_iface_flag_same(ifname, flag): + """ + Check whether the flag given is as same as the current flags of interface. + + :param ifname: The interface name. + :type ifname: String. + :param flag: The flag. + :type flag: Integer. + + :return: Return true if it's same. Otherwise, return false. + :rtype: Boolean. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sockfd = sock.fileno() + ifreq = struct.pack("16sh", ifname.encode(), 0) + flags = struct.unpack("16sh", fcntl.ioctl(sockfd, arch.SIOCGIFFLAGS, ifreq))[1] + + return True if flags & flag else False + + +def create_ifname_index_mapping(ifname): + """ + Map an interface name into its corresponding index. + Returns 0 on error, as 0 is not a valid index. + + :param ifname: The interface name. + :type ifname: String. + + :return: The index of the given interface. + :rtype: Integer. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + ifr = struct.pack("16si", ifname.encode(), 0) + r = fcntl.ioctl(sock, arch.SIOCGIFINDEX, ifr) + index = struct.unpack("16si", r)[1] + return index + + +def vnet_support_probe(tapfd, flag): + """ + Check if a flag is support by tun. + + :param tapfd: The file descriptor of /dev/net/tun. + :type tapfd: Integer. + :param flag: The flag checked. ( Such as IFF_MULTI_QUEUE, IFF_VNET_HDR ) + :type flag: String. + + :return: Return true if support. Otherwise, return false. + :rtype: Boolean. + """ + u = struct.pack("I", 0) + try: + r = fcntl.ioctl(tapfd, arch.TUNGETFEATURES, u) + except OverflowError: + return False + flags = struct.unpack("I", r)[0] + flag = eval("arch.%s" % flag) + return True if flags & flag else False + + +def get_net_if_operstate(ifname): + """ + Get linux network device operstate. + + :param ifname: Name of the interface. + :type ifname: String + + :return: the operstate of device. ( Such as up, down ) + :rtype: String + + :raise: A RuntimeError may be raised if running command fails. + Example: The ifname is an un-existed interface. + """ + cmd = "cat /sys/class/net/%s/operstate" % ifname + cmd_obj = process.run(cmd, shell=True) + output = cmd_obj.stdout_text.strip() + return output + + +def get_net_if_ip_addrs(ifname): + """ + Get network device ip addresses. + + :param ifname: Name of interface. + :type ifname: String. + + :return: List ip addresses of network interface. + { + "ipv4": xxx, + "ipv6": xxx, + "mac": xxx, + } + :rtype: Dictionary. + + :raise: A RuntimeError may be raised if running command fails. + """ + cmd = "ip addr show %s" % ifname + cmd_obj = process.run(cmd, shell=True) + status, output = cmd_obj.exit_status, cmd_obj.stdout_text.strip() + return { + "ipv4": re.findall("inet (.+?)/..?", output, re.MULTILINE), + "ipv6": re.findall("inet6 (.+?)/...?", output, re.MULTILINE), + "mac": re.findall("link/ether (.+?) ", output, re.MULTILINE), + } + + +def set_net_if_ip_addrs(if_name, ip_addr): + """ + Set network device ip addresses. + + :param if_name: Name of interface. + :type if_name: String. + :param ip_addr: Interface ip addr in format "ip_address/mask". + :type ip_addr: String. + + :raise: process.CmdError. + """ + cmd = "ip addr add %s dev %s" % (ip_addr, if_name) + process.run(cmd, shell=True) + + +def del_net_if_ip_addrs(if_name, ip_addr): + """ + Delete network device ip addresses. + + :param if_name: Name of interface. + :type if_name: String. + :param ip_addr: Interface ip addr in format "ip_address/mask". + :type ip_addr: String. + + :raise: process.CmdError. + """ + cmd = "ip addr del %s dev %s" % (ip_addr, if_name) + process.run(cmd, shell=True) + + +def get_net_if(state=".*", qdisc=".*", optional=".*"): + """ + Filter the all interfaces based on the parameters. + + :param state: interface state get from ip link. + :type state: String. + :param qdisc: interface qdisc get from ip link. + :type qdisc: String. + :param optional: optional match for interface find. + :type optional: String. + + :return: List of network interfaces. + ['lo', 'eno12409', 'eno8403', 'switch'] + :rtype: List. + + :raise: A RuntimeError may be raised if running command fails. + """ + cmd = "ip link" + cmd_obj = process.run(cmd, shell=True) + output = cmd_obj.stdout_text.strip() + return re.findall( + r"^\d+: (\S+?)[@:].*%s.*%s.*state %s.*$" % (optional, qdisc, state), + output, + re.MULTILINE, + ) + + +def bring_up_ifname(ifname): + """ + Bring up an interface. + + :param ifname: Name of the interface. + :type ifname: String. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) as sock: + ifr = struct.pack("16sh", ifname.encode(), arch.IFF_UP) + fcntl.ioctl(sock, arch.SIOCSIFFLAGS, ifr) + + +def bring_down_ifname(ifname): + """ + Bring down an interface. + + :param ifname: Name of the interface. + :type ifname: String. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) as sock: + ifr = struct.pack("16sh", ifname.encode(), 0) + fcntl.ioctl(sock, arch.SIOCSIFFLAGS, ifr) + + +def net_if_set_macaddress(ifname, mac): + """ + Set the mac address for an interface. + + :param ifname: Name of the interface. + :type ifname: String. + :param mac: Mac address. + :type mac: String. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) as sock: + ifr = struct.pack("256s", ifname.encode()) + mac_dev = fcntl.ioctl(sock, arch.SIOCGIFHWADDR, ifr)[18:24] + mac_dev = ":".join(["%02x" % ord(m) for m in mac_dev]) + + if mac_dev.lower() == mac.lower(): + return + + ifr = struct.pack( + "16sH14s", + ifname.encode(), + 1, + b"".join([chr(int(m, 16)) for m in mac.split(":")]), + ) + fcntl.ioctl(sock, arch.SIOCSIFHWADDR, ifr) + + +def net_get_ifname(mac_address=""): + """ + Get the interface name through the mac address. + + :param mac_address: The mac address of nic. + :type mac_address: String. + + :return: The names of interface. + :rtype: List. + + :raise: IOError or RuntimeError. + """ + ifname_list = [] + cmd = "ip -json link" + cmd_obj = process.run(cmd, shell=True) + output = cmd_obj.stdout_text.strip() + ip_info = json.loads(output) + for ip in ip_info: + if mac_address == ip["address"]: + ifname_list.append(ip["ifname"]) + return ifname_list + + +def net_get_iface_info(iface="", mac=None): + """ + Get info of certain interface with given mac address or + interface name. + + :param iface: The name of given interface, defaults to "". + :type iface: String. + :param mac: Mac address of given interface, defaults to None. + :type mac: String. + + :return: Info of interface, None if not get any. + :rtype: Dictionary. + + :raise: IOError or RuntimeError. + """ + cmd = "ip -json addr show %s" % iface + cmd_obj = process.run(cmd, shell=True) + output = cmd_obj.stdout_text.strip() + ip_info = json.loads(output) + if mac: + for iface in ip_info: + if iface.get("address") == mac: + return iface + return None + if iface: + return ip_info[0] + else: + return ip_info + + +def get_gateway(default_iface=False, ip_ver="ipv4", force_dhcp=False, ifname=""): + """ + Get the default gateway or interface. + + :param default_iface: Whether default interface (True), or default gateway + (False) is returned, defaults to False. + :type default_iface: Boolean. + :param ip_ver: The ip version, defaults to "ipv4". + :type ip_ver: String. + :param force_dhcp: Check whether the protocol is DHCP. + :type force_dhcp: Boolean. + :param ifname: If given, get default gateway only for this device. + :type ifname: String. + + :return: The gateway. + :rtype: String. + + :raise: RuntimeError raised if command fails. + """ + cmd = "ip -json -6 route" if ip_ver == "ipv6" else "ip -json route" + cmd_obj = process.run(cmd, shell=True) + output = cmd_obj.stdout_text.strip() + ip_info = json.loads(output) + gateways = [] + for ip in ip_info: + if not ifname or ifname == ip.get("dev"): + if not force_dhcp or (force_dhcp and ip.get("protocol") == "dhcp"): + if default_iface: + gateways.append(ip.get("dev")) + else: + gateways.append(ip.get("gateway")) + return "\n".join(gateways) + + +def get_sorted_net_if(): + """ + Get all network interfaces, but sort them among physical and virtual if. + + :return: (physical interfaces, virtual interfaces). + :rtype: Tuple. + """ + SYSFS_NET_PATH = "/sys/class/net" + all_interfaces = get_net_if() + phy_interfaces = [] + vir_interfaces = [] + for d in all_interfaces: + path = os.path.join(SYSFS_NET_PATH, d) + if not os.path.isdir(path): + continue + if not os.path.exists(os.path.join(path, "device")): + vir_interfaces.append(d) + else: + phy_interfaces.append(d) + return (phy_interfaces, vir_interfaces) + + +def net_if_is_brport(ifname): + """ + Check Whether this Interface is a bridge port_to_br. + + :param ifname: The interface name. + :type ifname: String. + + :return: True if it's a bridge port_to_br, otherwise return False. + :rtype: Boolean. + """ + SYSFS_NET_PATH = "/sys/class/net" + path = os.path.join(SYSFS_NET_PATH, ifname) + return os.path.exists(os.path.join(path, "brport")) diff --git a/virttest/vt_utils/net/ip.py b/virttest/vt_utils/net/ip.py new file mode 100644 index 0000000000..6a287b3791 --- /dev/null +++ b/virttest/vt_utils/net/ip.py @@ -0,0 +1,79 @@ +# Library for the ip address related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +import ipaddress as pyipaddr +import re + +from avocado.utils import process + + +def gen_ipv4_addr(network_num="10.0.0.0", network_prefix="24", exclude_ips=[]): + """ + Generate ipv4 address. + + :param network_num: Network number to be used. + :type network_num: String. + :param network_prefix: Prefix used to get subnet mask to calculate ip range. + :type network_prefix: String. + :param exclude_ips: The list of ipaddress should be excluded. + :type exclude_ips: List. + + :return: The ip address. + :rtype: String + """ + ip_regex = "^\d+.\d+.\d+.\d+$" + exclude_ips = set(exclude_ips) + if not re.match(ip_regex, network_num): + network_num = "10.0.0.0" + if not exclude_ips and network_prefix == "24": + exclude_ips.add(network_num) + exclude_ips.add(".".join(network_num.split(".")[0:3]) + ".%s" % str(1)) + exclude_ips.add((".".join(network_num.split(".")[0:3]) + ".%s" % str(255))) + network = pyipaddr.ip_network("%s/%s" % (network_num, network_prefix)) + for ip_address in network: + if str(ip_address) not in exclude_ips: + yield str(ip_address) + + +def get_correspond_ip(remote_ip): + """ + Get local ip address which is used to contact remote ip. + + :param remote_ip: Remote ip. + :type remote_ip: String. + + :return: Local corespond IP. + :rtype: String. + """ + result = process.run("ip route get %s" % remote_ip).stdout_text + local_ip = re.search("src (.+)", result) + if local_ip is not None: + local_ip = local_ip.groups()[0] + return local_ip + + +def append_hosts(hostname_ip_dict): + """ + Method to map ipaddress and hostname for resolving appropriately + in /etc/hosts file. + + :param hostname_ip_dict: The mapping of hostname and ipaddress. + :type hostname_ip_dict: Dictionary. + + :raise: RuntimeError. + """ + hosts_file = "/etc/hosts" + for hostname, ip in hostname_ip_dict: + cmd = "echo '%s %s' >> %s" % (ip, hostname, hosts_file) + process.run(cmd, shell=True) diff --git a/virttest/vt_utils/net/mac.py b/virttest/vt_utils/net/mac.py new file mode 100644 index 0000000000..01c7ec9bca --- /dev/null +++ b/virttest/vt_utils/net/mac.py @@ -0,0 +1,33 @@ +# Library for the mac address related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +from virttest.vt_utils import tool + + +def generate_mac_address_simple(): + """ + Generate a random mac address. + + :return: A random mac address. + :rtype: String. + """ + ieee_eui8_assignment = tool.ieee_eui_assignment(8)(0, repeat=False) + mac = "9a:%02x:%02x:%02x:%02x:%02x" % ( + next(ieee_eui8_assignment), + next(ieee_eui8_assignment), + next(ieee_eui8_assignment), + next(ieee_eui8_assignment), + next(ieee_eui8_assignment), + ) + return mac diff --git a/virttest/vt_utils/net/tap.py b/virttest/vt_utils/net/tap.py new file mode 100644 index 0000000000..ce881cf9ee --- /dev/null +++ b/virttest/vt_utils/net/tap.py @@ -0,0 +1,60 @@ +# Library for the tap device related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +import fcntl +import os +import struct + +from virttest import arch +from virttest.vt_utils.net import interface + + +def open_tap(devname, ifname, queues=1, vnet_hdr=True): + """ + Open a tap device and returns its file descriptors which are used by + fds= parameter of qemu. + + For single queue, only returns one file descriptor, it's used by + fd= legacy parameter of qemu. + + :param devname: TUN device path. + :type devname: String. + :param ifname: TAP interface name. + :type ifname: String. + :param queues: Queue number. + :type queues: Integer. + :param vnet_hdr: Whether enable the vnet header. + :type vnet_hdr: Boolean. + + :return: The file descriptors which are used by fds= parameter + of qemu. + :rtype: String. + """ + tapfds = [] + + for i in range(int(queues)): + tapfds.append(str(os.open(devname, os.O_RDWR))) + + flags = arch.IFF_TAP | arch.IFF_NO_PI + + if interface.vnet_support_probe(int(tapfds[i]), "IFF_MULTI_QUEUE"): + flags |= arch.IFF_MULTI_QUEUE + + if vnet_hdr and interface.vnet_support_probe(int(tapfds[i]), "IFF_VNET_HDR"): + flags |= arch.IFF_VNET_HDR + + ifr = struct.pack("16sh", ifname.encode(), flags) + fcntl.ioctl(int(tapfds[i]), arch.TUNSETIFF, ifr) + + return ":".join(tapfds) diff --git a/virttest/vt_utils/tool.py b/virttest/vt_utils/tool.py new file mode 100644 index 0000000000..9330f6f101 --- /dev/null +++ b/virttest/vt_utils/tool.py @@ -0,0 +1,89 @@ +# Library for the utils tool related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +import hashlib +import uuid + + +def ieee_eui_generator(base, mask, start=0, repeat=False): + """ + IEEE extended unique identifier(EUI) generator. + + :param base: The base identifier number. + :type base: Integer. + :param mask: The mask to calculate identifiers. + :type mask: Integer. + :param start: The ordinal number of the first identifier. + :type start: Integer. + :param repeat: Whether use repeated identifiers when exhausted. + :type repeat: Boolean. + + :return generator: The target EUI generator. + :rtype: Iterator. + """ + offset = 0 + while True: + out = base + ((start + offset) & mask) + yield out + offset += 1 + if offset > mask: + if not repeat: + break + offset = 0 + + +def ieee_eui_assignment(eui_bits): + """ + IEEE EUI assignment. + + :param eui_bits: The number of EUI bits. + :type eui_bits: Integer. + + :return: Function object. + :rtype: Function object. + """ + + def assignment(oui_bits, prefix=0, repeat=False): + """ + The template of assignment. + + :param oui_bits: The number of OUI bits. + :type oui_bits: Integer. + :param prefix: The prefix of OUI, for example 0x9a. + :type prefix: Integer. + :param repeat: Whether use repeated identifiers when exhausted. + :type repeat: Boolean. + + :return: Iterator. + :rtype: Iterator. + """ + # Using UUID1 combine with `__file__` to avoid getting the same hash + data = uuid.uuid1().hex + __file__ + data = hashlib.sha256(data.encode()).digest()[: (eui_bits // 8)] + sample = 0 + for num in bytearray(data): + sample <<= 8 + sample |= num + bits = eui_bits - oui_bits + mask = (1 << bits) - 1 + start = sample & mask + base = sample ^ start + if prefix > 0: + pbits = eui_bits + (-(prefix.bit_length()) // 4) * 4 + pmask = (1 << pbits) - 1 + prefix <<= pbits + base = prefix | (base & pmask) + return ieee_eui_generator(base, mask, start, repeat=repeat) + + return assignment