From d9c8a628eaef9058d8fb6d661dbad319ad8d92cd Mon Sep 17 00:00:00 2001 From: "Houqi (Nick) Zuo" Date: Wed, 24 Apr 2024 14:39:28 +0800 Subject: [PATCH] net/if.py: refactor module Refactor the utils_net.py: 1. Split net/if module from utils_net module. Signed-off-by: Houqi (Nick) Zuo --- virttest/vt_utils/net/__init__.py | 0 virttest/vt_utils/net/if.py | 423 ++++++++++++++++++++++++++++++ virttest/vt_utils/tool.py | 89 +++++++ 3 files changed, 512 insertions(+) create mode 100644 virttest/vt_utils/net/__init__.py create mode 100644 virttest/vt_utils/net/if.py create mode 100644 virttest/vt_utils/tool.py diff --git a/virttest/vt_utils/net/__init__.py b/virttest/vt_utils/net/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/virttest/vt_utils/net/if.py b/virttest/vt_utils/net/if.py new file mode 100644 index 0000000000..ad6c7e3db2 --- /dev/null +++ b/virttest/vt_utils/net/if.py @@ -0,0 +1,423 @@ +# Library for the interface related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +import fcntl +import ipaddress as pyipaddr +import json +import re +import socket +import struct + +from avocado.utils import process + +from virttest import arch +from virttest.vt_utils import tool + + +def set_iface_flag(ifname, flag, active=True): + """ + Set flag of the given interface. + + :param ifname: The interface name. + :type ifname: String. + :param flag: The flag. ( Such as IFF_BROADCAST or 1<<1 or 2) + :type flag: Integer. + :param active: Activate the flag given. Otherwise, deactivate the flag given. + :type active: Boolean. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + # Get existing device flags + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sockfd = sock.fileno() + ifreq = struct.pack("16sh", ifname.encode(), 0) + flags = struct.unpack("16sh", fcntl.ioctl(sockfd, arch.SIOCGIFFLAGS, ifreq))[1] + + # Set new flags + if active: + flags = flags | flag + else: + flags = flags & ~flag + + ifreq = struct.pack("16sh", ifname.encode(), flags) + fcntl.ioctl(sockfd, arch.SIOCSIFFLAGS, ifreq) + + +def is_iface_flag_same(ifname, flag): + """ + Check whether the flag given is as same as the current flags of interface. + + :param ifname: The interface name. + :type ifname: String. + :param flag: The flag. + :type flag: Integer. + + :return: Return true if it's same. Otherwise, return false. + :rtype: Boolean. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sockfd = sock.fileno() + ifreq = struct.pack("16sh", ifname.encode(), 0) + flags = struct.unpack("16sh", fcntl.ioctl(sockfd, arch.SIOCGIFFLAGS, ifreq))[1] + + return True if flags & flag else False + + +def create_ifname_index_mapping(ifname): + """ + Map an interface name into its corresponding index. + Returns 0 on error, as 0 is not a valid index. + + :param ifname: The interface name. + :type ifname: String. + + :return: The index of the given interface. + :rtype: Integer. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + ifr = struct.pack("16si", ifname.encode(), 0) + r = fcntl.ioctl(sock, arch.SIOCGIFINDEX, ifr) + index = struct.unpack("16si", r)[1] + return index + + +def vnet_support_probe(tapfd, flag): + """ + Check if a flag is support by tun. + + :param tapfd: The file descriptor of /dev/net/tun. + :type tapfd: Integer. + :param flag: The flag checked. ( Such as IFF_MULTI_QUEUE, IFF_VNET_HDR ) + :type flag: String. + + :return: Return true if support. Otherwise, return false. + :rtype: Boolean. + """ + u = struct.pack("I", 0) + try: + r = fcntl.ioctl(tapfd, arch.TUNGETFEATURES, u) + except OverflowError: + return False + flags = struct.unpack("I", r)[0] + flag = eval("arch.%s" % flag) + return True if flags & flag else False + + +def get_net_if_operstate(ifname): + """ + Get linux network device operstate. + + :param ifname: Name of the interface. + :type ifname: String + + :return: the operstate of device. ( Such as up, down ) + :rtype: String + + :raise: A RuntimeError may be raised if running command fails. + Example: The ifname is an un-existed interface. + """ + cmd = "cat /sys/class/net/%s/operstate" % ifname + cmd_obj = process.run(cmd, shell=True) + status, output = cmd_obj.exit_status, cmd_obj.stdout_text.strip() + if status != 0: + raise RuntimeError("Failed to run command!") + return output + + +def get_net_if_ip_addrs(ifname): + """ + Get network device ip addresses. + + :param ifname: Name of interface. + :type ifname: String. + + :return: List ip addresses of network interface. + { + "ipv4": xxx, + "ipv6": xxx, + "mac": xxx, + } + :rtype: Dictionary. + """ + cmd = "ip addr show %s" % (ifname) + cmd_obj = process.run(cmd, shell=True) + status, output = cmd_obj.exit_status, cmd_obj.stdout_text.strip() + if status != 0: + raise RuntimeError("Failed to run command!") + return { + "ipv4": re.findall("inet (.+?)/..?", output, re.MULTILINE), + "ipv6": re.findall("inet6 (.+?)/...?", output, re.MULTILINE), + "mac": re.findall("link/ether (.+?) ", output, re.MULTILINE), + } + + +def set_net_if_ip_addrs(if_name, ip_addr): + """ + Set network device ip addresses. + + :param if_name: Name of interface. + :type if_name: String. + :param ip_addr: Interface ip addr in format "ip_address/mask". + :type ip_addr: String. + + :raise: RuntimeError. + """ + cmd = "ip addr add %s dev %s" % (ip_addr, if_name) + cmd_obj = process.run(cmd, shell=True) + if cmd_obj.exit_status != 0: + raise RuntimeError("Failed to run command!") + + +def del_net_if_ip_addrs(if_name, ip_addr): + """ + Delete network device ip addresses. + + :param if_name: Name of interface. + :type if_name: String. + :param ip_addr: Interface ip addr in format "ip_address/mask". + :type ip_addr: String. + + :raise: RuntimeError. + """ + cmd = "ip addr del %s dev %s" % (ip_addr, if_name) + cmd_obj = process.run(cmd, shell=True) + if cmd_obj.exit_status != 0: + raise RuntimeError("Failed to run command!") + + +def get_net_if(state=".*", qdisc=".*", optional=".*"): + """ + Filter the all interfaces based on the parameters. + + :param state: interface state get from ip link. + :type state: String. + :param qdisc: interface qdisc get from ip link. + :type qdisc: String. + :param optional: optional match for interface find. + :type optional: String. + + :return: List of network interfaces. + ['lo', 'eno12409', 'eno8403', 'switch'] + :rtype: List. + """ + cmd = "ip link" + cmd_obj = process.run(cmd, shell=True) + status, output = cmd_obj.exit_status, cmd_obj.stdout_text.strip() + if status != 0: + raise RuntimeError("Failed to run command!") + return re.findall( + r"^\d+: (\S+?)[@:].*%s.*%s.*state %s.*$" % (optional, qdisc, state), + output, + re.MULTILINE, + ) + + +def bring_up_ifname(ifname): + """ + Bring up an interface. + + :param ifname: Name of the interface. + :type ifname: String. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) as sock: + ifr = struct.pack("16sh", ifname.encode(), arch.IFF_UP) + fcntl.ioctl(sock, arch.SIOCSIFFLAGS, ifr) + + +def bring_down_ifname(ifname): + """ + Bring down an interface. + + :param ifname: Name of the interface. + :type ifname: String. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) as sock: + ifr = struct.pack("16sh", ifname.encode(), 0) + fcntl.ioctl(sock, arch.SIOCSIFFLAGS, ifr) + + +def net_if_set_macaddress(ifname, mac): + """ + Set the mac address for an interface. + + :param ifname: Name of the interface. + :type ifname: String. + :param mac: Mac address. + :type mac: String. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) as sock: + ifr = struct.pack("256s", ifname.encode()) + mac_dev = fcntl.ioctl(sock, arch.SIOCGIFHWADDR, ifr)[18:24] + mac_dev = ":".join(["%02x" % ord(m) for m in mac_dev]) + + if mac_dev.lower() == mac.lower(): + return + + ifr = struct.pack( + "16sH14s", + ifname.encode(), + 1, + b"".join([chr(int(m, 16)) for m in mac.split(":")]), + ) + fcntl.ioctl(sock, arch.SIOCSIFHWADDR, ifr) + + +def generate_mac_address_simple(): + """ + Generate a random mac address. + + :return: A random mac address. + :rtype: String. + """ + ieee_eui8_assignment = tool.ieee_eui_assignment(8)(0, repeat=False) + mac = "9a:%02x:%02x:%02x:%02x:%02x" % ( + next(ieee_eui8_assignment), + next(ieee_eui8_assignment), + next(ieee_eui8_assignment), + next(ieee_eui8_assignment), + next(ieee_eui8_assignment), + ) + return mac + + +def gen_ipv4_addr(network_num="10.0.0.0", network_prefix="24", exclude_ips=[]): + """ + Generate ipv4 address. + + :param network_num: Network number to be used. + :type network_num: String. + :param network_prefix: Prefix used to get subnet mask to calculate ip range. + :type network_prefix: String. + :param exclude_ips: The list of ipaddress should be excluded. + :type exclude_ips: List. + + :return: The ip address. + :rtype: String + """ + ip_regex = "^\d+.\d+.\d+.\d+$" + exclude_ips = set(exclude_ips) + if not re.match(ip_regex, network_num): + network_num = "10.0.0.0" + if not exclude_ips and network_prefix == "24": + exclude_ips.add(network_num) + exclude_ips.add(".".join(network_num.split(".")[0:3]) + ".%s" % str(1)) + exclude_ips.add((".".join(network_num.split(".")[0:3]) + ".%s" % str(255))) + network = pyipaddr.ip_network("%s/%s" % (network_num, network_prefix)) + for ip_address in network: + if str(ip_address) not in exclude_ips: + yield str(ip_address) + + +def net_get_ifname(mac_address=""): + """ + Get the interface name through the mac address. + + :param mac_address: The mac address of nic. + :type mac_address: String. + + :return: The names of interface. + :rtype: List. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + ifname_list = [] + cmd = "ip -json link" + cmd_obj = process.run(cmd, shell=True) + status, output = cmd_obj.exit_status, cmd_obj.stdout_text.strip() + if status != 0: + raise RuntimeError("Failed to run command!") + ip_info = json.loads(output) + for ip in ip_info: + if mac_address == ip["address"]: + ifname_list.append(ip["ifname"]) + return ifname_list + + +def net_get_iface_info(iface="", mac=None): + """ + Get info of certain interface with given mac address or + interface name. + + :param iface: The name of given interface, defaults to "". + :type iface: String. + :param mac: Mac address of given interface, defaults to None. + :type mac: String. + + :return: Info of interface, None if not get any. + :rtype: Dictionary. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + cmd = "ip -json addr show %s" % iface + cmd_obj = process.run(cmd, shell=True) + status, output = cmd_obj.exit_status, cmd_obj.stdout_text.strip() + if status != 0: + raise RuntimeError("Failed to run command!") + ip_info = json.loads(output) + if mac: + for iface in ip_info: + if iface.get("address") == mac: + return iface + return None + if iface: + return ip_info[0] + else: + return ip_info + + +def get_gateway(default_iface=False, ip_ver="ipv4", force_dhcp=False, ifname=""): + """ + Get the default gateway or interface. + + :param default_iface: Whether default interface (True), or default gateway + (False) is returned, defaults to False. + :type default_iface: Boolean. + :param ip_ver: The ip version, defaults to "ipv4". + :type ip_ver: String. + :param force_dhcp: Check whether the protocol is DHCP. + :type force_dhcp: Boolean. + :param ifname: If given, get default gateway only for this device. + :type ifname: String. + + :return: The gateway. + :rtype: String. + + :raise: IOError raised if fcntl.ioctl() command fails. + """ + cmd = "ip -json -6 route" if ip_ver == "ipv6" else "ip -json route" + cmd_obj = process.run(cmd, shell=True) + status, output = cmd_obj.exit_status, cmd_obj.stdout_text.strip() + if status != 0: + raise RuntimeError("Failed to run command!") + ip_info = json.loads(output) + gateways = [] + for ip in ip_info: + if not ifname or ifname == ip.get("dev"): + if not force_dhcp or (force_dhcp and ip.get("protocol") == "dhcp"): + if default_iface: + gateways.append(ip.get("dev")) + else: + gateways.append(ip.get("gateway")) + return "\n".join(gateways) + diff --git a/virttest/vt_utils/tool.py b/virttest/vt_utils/tool.py new file mode 100644 index 0000000000..9330f6f101 --- /dev/null +++ b/virttest/vt_utils/tool.py @@ -0,0 +1,89 @@ +# Library for the utils tool related functions. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; specifically version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat (c) 2024 and Avocado contributors +# Author: Houqi Zuo +import hashlib +import uuid + + +def ieee_eui_generator(base, mask, start=0, repeat=False): + """ + IEEE extended unique identifier(EUI) generator. + + :param base: The base identifier number. + :type base: Integer. + :param mask: The mask to calculate identifiers. + :type mask: Integer. + :param start: The ordinal number of the first identifier. + :type start: Integer. + :param repeat: Whether use repeated identifiers when exhausted. + :type repeat: Boolean. + + :return generator: The target EUI generator. + :rtype: Iterator. + """ + offset = 0 + while True: + out = base + ((start + offset) & mask) + yield out + offset += 1 + if offset > mask: + if not repeat: + break + offset = 0 + + +def ieee_eui_assignment(eui_bits): + """ + IEEE EUI assignment. + + :param eui_bits: The number of EUI bits. + :type eui_bits: Integer. + + :return: Function object. + :rtype: Function object. + """ + + def assignment(oui_bits, prefix=0, repeat=False): + """ + The template of assignment. + + :param oui_bits: The number of OUI bits. + :type oui_bits: Integer. + :param prefix: The prefix of OUI, for example 0x9a. + :type prefix: Integer. + :param repeat: Whether use repeated identifiers when exhausted. + :type repeat: Boolean. + + :return: Iterator. + :rtype: Iterator. + """ + # Using UUID1 combine with `__file__` to avoid getting the same hash + data = uuid.uuid1().hex + __file__ + data = hashlib.sha256(data.encode()).digest()[: (eui_bits // 8)] + sample = 0 + for num in bytearray(data): + sample <<= 8 + sample |= num + bits = eui_bits - oui_bits + mask = (1 << bits) - 1 + start = sample & mask + base = sample ^ start + if prefix > 0: + pbits = eui_bits + (-(prefix.bit_length()) // 4) * 4 + pmask = (1 << pbits) - 1 + prefix <<= pbits + base = prefix | (base & pmask) + return ieee_eui_generator(base, mask, start, repeat=repeat) + + return assignment