From 491f59d152b336c99e087d139ba5a3c0f75bd712 Mon Sep 17 00:00:00 2001
From: Paige Patton
Date: Mon, 4 Nov 2024 16:10:53 -0500
Subject: [PATCH] few small changes

Signed-off-by: Paige Patton
---
 docs/node_scenarios.md                        |   2 +
 .../node_actions/common_node_functions.py     |  38 ++--
 .../node_actions_scenario_plugin.py           | 190 ++++++++++--------
 requirements.txt                              |   2 +-
 scenarios/openshift/aws_node_scenarios.yml    |  17 +-
 5 files changed, 144 insertions(+), 105 deletions(-)

diff --git a/docs/node_scenarios.md b/docs/node_scenarios.md
index 3913c0f1..6d81b1dd 100644
--- a/docs/node_scenarios.md
+++ b/docs/node_scenarios.md
@@ -57,6 +57,8 @@ kind was primarily designed for testing Kubernetes itself, but may be used for l
 #### GCP
 Cloud setup instructions can be found [here](cloud_setup.md#gcp). Sample scenario config can be found [here](https://github.com/krkn-chaos/krkn/blob/main/scenarios/openshift/gcp_node_scenarios.yml).
 
+NOTE: The parallel option is not available for GCP; its API does not support running operations concurrently.
+
 
 #### Openstack
 
diff --git a/krkn/scenario_plugins/node_actions/common_node_functions.py b/krkn/scenario_plugins/node_actions/common_node_functions.py
index f4e47ae1..ddd78807 100644
--- a/krkn/scenario_plugins/node_actions/common_node_functions.py
+++ b/krkn/scenario_plugins/node_actions/common_node_functions.py
@@ -8,19 +8,28 @@
 node_general = False
 
 
+# Verify that every node name supplied in the config exists and is killable (Ready)
+def get_node_by_name(node_name_list, kubecli: KrknKubernetes):
+    killable_nodes = kubecli.list_killable_nodes()
+    for node_name in node_name_list:
+        if node_name not in killable_nodes:
+            logging.info(
+                f"Node with provided name {node_name} does not exist or the node might "
+                "be in NotReady state."
+            )
+            return
+    return node_name_list
+
+
 # Pick a random node with specified label selector
-def get_node(node_name, label_selector, instance_kill_count, kubecli: KrknKubernetes):
-    if node_name in kubecli.list_killable_nodes():
-        return [node_name]
-    elif node_name:
-        logging.info(
-            "Node with provided node_name does not exist or the node might "
-            "be in NotReady state."
-        )
-    nodes = kubecli.list_killable_nodes(label_selector)
+def get_node(label_selector, instance_kill_count, kubecli: KrknKubernetes):
+
+    # Support a comma-separated list of label selectors
+    label_selector_list = label_selector.split(",")
+    nodes = []
+    for selector in label_selector_list:
+        nodes.extend(kubecli.list_killable_nodes(selector))
     if not nodes:
         raise Exception("Ready nodes with the provided label selector do not exist")
-    logging.info("Ready nodes with the label selector %s: %s" % (label_selector, nodes))
+    logging.info("Ready nodes with the label selector %s: %s" % (label_selector_list, nodes))
     number_of_nodes = len(nodes)
     if instance_kill_count == number_of_nodes:
         return nodes
@@ -35,22 +44,19 @@ def get_node(node_name, label_selector, instance_kill_count, kubecli: KrknKubern
 # krkn_lib
 # Wait until the node status becomes Ready
 def wait_for_ready_status(node, timeout, kubecli: KrknKubernetes):
-    resource_version = kubecli.get_node_resource_version(node)
-    kubecli.watch_node_status(node, "True", timeout, resource_version)
+    kubecli.watch_node_status(node, "True", timeout)
 
 
 # krkn_lib
 # Wait until the node status becomes Not Ready
 def wait_for_not_ready_status(node, timeout, kubecli: KrknKubernetes):
-    resource_version = kubecli.get_node_resource_version(node)
-    kubecli.watch_node_status(node, "False", timeout, resource_version)
+    kubecli.watch_node_status(node, "False", timeout)
 
 
 # krkn_lib
 # Wait until the node status becomes Unknown
 def wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes):
-    resource_version = kubecli.get_node_resource_version(node)
-    kubecli.watch_node_status(node, "Unknown", timeout, resource_version)
+    kubecli.watch_node_status(node, "Unknown", timeout)
 
 
 # Get the ip of the cluster node
diff --git a/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py b/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py
index 421f7472..486e8a21 100644
--- a/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py
+++ b/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py
@@ -1,5 +1,7 @@
 import logging
 import time
+from multiprocessing.pool import ThreadPool
+from itertools import repeat
 
 import yaml
 from krkn_lib.k8s import KrknKubernetes
@@ -120,100 +122,128 @@ def get_node_scenario_object(self, node_scenario, kubecli: KrknKubernetes):
     def inject_node_scenario(
         self, action, node_scenario, node_scenario_object, kubecli: KrknKubernetes
     ):
-        generic_cloud_scenarios = ("stop_kubelet_scenario", "node_crash_scenario")
-        # Get the node scenario configurations
-        run_kill_count = get_yaml_item_value(node_scenario, "runs", 1)
+
+        # Get the node scenario configurations for selecting nodes
         instance_kill_count = get_yaml_item_value(node_scenario, "instance_count", 1)
         node_name = get_yaml_item_value(node_scenario, "node_name", "")
         label_selector = get_yaml_item_value(node_scenario, "label_selector", "")
+        parallel_nodes = get_yaml_item_value(node_scenario, "parallel", False)
+
+        # Get the nodes to apply the scenario to
+        if node_name:
+            node_name_list = node_name.split(",")
+            nodes = common_node_functions.get_node_by_name(node_name_list, kubecli)
+        else:
+            nodes = common_node_functions.get_node(
+                label_selector, instance_kill_count, kubecli
+            )
+
+        # The GCP API does not support concurrent calls, so fall back to
+        # sequential execution on GCP even when parallel is requested
+        if parallel_nodes and node_scenario["cloud_type"].lower() != "gcp":
+            self.multiprocess_nodes(nodes, node_scenario_object, action, node_scenario)
+        else:
+            for single_node in nodes:
+                self.run_node(
+                    single_node, node_scenario_object, action, node_scenario
+                )
+
+    def multiprocess_nodes(self, nodes, node_scenario_object, action, node_scenario):
+        try:
+            logging.info("Running the node action on the nodes in parallel")
+            # Thread pool sized to run one worker per target node
+            pool = ThreadPool(processes=len(nodes))
+            # zip + repeat fans the same action and config out to every node
+            pool.starmap(
+                self.run_node,
+                zip(
+                    nodes,
+                    repeat(node_scenario_object),
+                    repeat(action),
+                    repeat(node_scenario),
+                ),
+            )
+            pool.close()
+        except Exception as e:
+            logging.error("Error on pool multiprocessing: " + str(e))
+
+    def run_node(self, single_node, node_scenario_object, action, node_scenario):
+        logging.info("action: " + str(action))
+        # Get the scenario specifics for running the action on nodes
+        run_kill_count = get_yaml_item_value(node_scenario, "runs", 1)
+        timeout = get_yaml_item_value(node_scenario, "timeout", 120)
         if action == "node_stop_start_scenario":
             duration = get_yaml_item_value(node_scenario, "duration", 120)
             service = get_yaml_item_value(node_scenario, "service", "")
             ssh_private_key = get_yaml_item_value(
                 node_scenario, "ssh_private_key", "~/.ssh/id_rsa"
             )
-        # Get the node to apply the scenario
-        if node_name:
-            node_name_list = node_name.split(",")
-        else:
-            node_name_list = [node_name]
-        for single_node_name in node_name_list:
-            nodes = common_node_functions.get_node(
-                single_node_name, label_selector, instance_kill_count, kubecli
+        generic_cloud_scenarios = ("stop_kubelet_scenario", "node_crash_scenario")
+
+        if node_general and action not in generic_cloud_scenarios:
+            logging.info(
+                "Scenario: "
+                + action
+                + " is not set up for generic cloud type, skipping action"
             )
-            for single_node in nodes:
-                if node_general and action not in generic_cloud_scenarios:
-                    logging.info(
-                        "Scenario: "
-                        + action
-                        + " is not set up for generic cloud type, skipping action"
+        else:
+            if action == "node_start_scenario":
+                node_scenario_object.node_start_scenario(
+                    run_kill_count, single_node, timeout
+                )
+            elif action == "node_stop_scenario":
+                node_scenario_object.node_stop_scenario(
+                    run_kill_count, single_node, timeout
+                )
+            elif action == "node_stop_start_scenario":
+                node_scenario_object.node_stop_start_scenario(
+                    run_kill_count, single_node, timeout, duration
+                )
+            elif action == "node_termination_scenario":
+                node_scenario_object.node_termination_scenario(
+                    run_kill_count, single_node, timeout
+                )
+            elif action == "node_reboot_scenario":
+                node_scenario_object.node_reboot_scenario(
+                    run_kill_count, single_node, timeout
+                )
+            elif action == "stop_start_kubelet_scenario":
+                node_scenario_object.stop_start_kubelet_scenario(
+                    run_kill_count, single_node, timeout
+                )
+            elif action == "restart_kubelet_scenario":
+                node_scenario_object.restart_kubelet_scenario(
+                    run_kill_count, single_node, timeout
+                )
+            elif action == "stop_kubelet_scenario":
+                node_scenario_object.stop_kubelet_scenario(
+                    run_kill_count, single_node, timeout
+                )
+            elif action == "node_crash_scenario":
+                node_scenario_object.node_crash_scenario(
+                    run_kill_count, single_node, timeout
+                )
+            elif action == "stop_start_helper_node_scenario":
+                if node_scenario["cloud_type"] != "openstack":
+                    logging.error(
+                        "Scenario: " + action + " is not supported for "
+                        "cloud type "
+                        + node_scenario["cloud_type"]
+                        + ", skipping action"
                     )
                 else:
-                    if action == "node_start_scenario":
-                        node_scenario_object.node_start_scenario(
-                            run_kill_count, single_node, timeout
-                        )
-                    elif action == "node_stop_scenario":
-                        node_scenario_object.node_stop_scenario(
-                            run_kill_count, single_node, timeout
-                        )
-                    elif action == "node_stop_start_scenario":
-                        node_scenario_object.node_stop_start_scenario(
-                            run_kill_count, single_node, timeout, duration
-                        )
-                    elif action == "node_termination_scenario":
-                        node_scenario_object.node_termination_scenario(
-                            run_kill_count, single_node, timeout
-                        )
-                    elif action == "node_reboot_scenario":
-                        node_scenario_object.node_reboot_scenario(
-                            run_kill_count, single_node, timeout
-                        )
-                    elif action == "stop_start_kubelet_scenario":
-                        node_scenario_object.stop_start_kubelet_scenario(
-                            run_kill_count, single_node, timeout
-                        )
-                    elif action == "restart_kubelet_scenario":
-                        node_scenario_object.restart_kubelet_scenario(
-                            run_kill_count, single_node, timeout
-                        )
-                    elif action == "stop_kubelet_scenario":
-                        node_scenario_object.stop_kubelet_scenario(
-                            run_kill_count, single_node, timeout
-                        )
-                    elif action == "node_crash_scenario":
-                        node_scenario_object.node_crash_scenario(
-                            run_kill_count, single_node, timeout
-                        )
-                    elif action == "stop_start_helper_node_scenario":
-                        if node_scenario["cloud_type"] != "openstack":
-                            logging.error(
-                                "Scenario: " + action + " is not supported for "
-                                "cloud type "
-                                + node_scenario["cloud_type"]
-                                + ", skipping action"
-                            )
-                        else:
-                            if not node_scenario["helper_node_ip"]:
-                                logging.error("Helper node IP address is not provided")
-                                raise Exception(
-                                    "Helper node IP address is not provided"
-                                )
-                            node_scenario_object.helper_node_stop_start_scenario(
-                                run_kill_count, node_scenario["helper_node_ip"], timeout
-                            )
-                            node_scenario_object.helper_node_service_status(
-                                node_scenario["helper_node_ip"],
-                                service,
-                                ssh_private_key,
-                                timeout,
-                            )
-                else:
-                    logging.info(
-                        "There is no node action that matches %s, skipping scenario"
-                        % action
+                    if not node_scenario["helper_node_ip"]:
+                        logging.error("Helper node IP address is not provided")
+                        raise Exception(
+                            "Helper node IP address is not provided"
                         )
+                    node_scenario_object.helper_node_stop_start_scenario(
+                        run_kill_count, node_scenario["helper_node_ip"], timeout
+                    )
+                    node_scenario_object.helper_node_service_status(
+                        node_scenario["helper_node_ip"],
+                        service,
+                        ssh_private_key,
+                        timeout,
+                    )
+            else:
+                logging.info(
+                    "There is no node action that matches %s, skipping scenario"
+                    % action
+                )
 
     def get_scenario_types(self) -> list[str]:
         return ["node_scenarios"]
diff --git a/requirements.txt b/requirements.txt
index 1a9f5dc6..3f69aff1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,7 +15,7 @@ google-api-python-client==2.116.0
 ibm_cloud_sdk_core==3.18.0
 ibm_vpc==0.20.0
 jinja2==3.1.4
-krkn-lib==4.0.3
+krkn-lib==4.0.4
 lxml==5.1.0
 kubernetes==28.1.0
 numpy==1.26.4
diff --git a/scenarios/openshift/aws_node_scenarios.yml b/scenarios/openshift/aws_node_scenarios.yml
index 57d00c49..9ce36812 100644
--- a/scenarios/openshift/aws_node_scenarios.yml
+++ b/scenarios/openshift/aws_node_scenarios.yml
@@ -1,13 +1,14 @@
 node_scenarios:
-  - actions:                                     # node chaos scenarios to be injected
+  - actions:  # node chaos scenarios to be injected
     - node_stop_start_scenario
-    node_name:                                   # node on which scenario has to be injected; can set multiple names separated by comma
-    label_selector: node-role.kubernetes.io/worker  # when node_name is not specified, a node with matching label_selector is selected for node chaos scenario injection
-    instance_count: 1                            # Number of nodes to perform action/select that match the label selector
-    runs: 1                                      # number of times to inject each scenario under actions (will perform on same node each time)
-    timeout: 360                                 # duration to wait for completion of node scenario injection
-    duration: 120                                # duration to stop the node before running the start action
-    cloud_type: aws                              # cloud type on which Kubernetes/OpenShift runs
+    node_name:  # node on which scenario has to be injected; can set multiple names separated by comma
+    label_selector: node-role.kubernetes.io/worker  # when node_name is not specified, a node with matching label_selector is selected for node chaos scenario injection
+    instance_count: 2  # number of nodes to perform the action on/select that match the label selector
+    runs: 1  # number of times to inject each scenario under actions (will perform on same node each time)
+    timeout: 360  # duration to wait for completion of node scenario injection
+    duration: 20  # duration to stop the node before running the start action
+    cloud_type: aws  # cloud type on which Kubernetes/OpenShift runs
+    parallel: true  # run the action on the selected nodes (by label or name) in parallel or sequentially; defaults to sequential
   - actions:
     - node_reboot_scenario
     node_name:
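
For reference, the fan-out in multiprocess_nodes above is the standard ThreadPool.starmap pattern: zip the list of nodes with itertools.repeat of the fixed arguments so each worker thread handles one node. Below is a minimal, self-contained sketch of that pattern; the node names, action string, and run_node stand-in are placeholders for illustration, not part of the patch:

    from itertools import repeat
    from multiprocessing.pool import ThreadPool

    def run_node(node, action, config):
        # Stand-in worker: the plugin's run_node drives one scenario on one node
        print("%s on %s (timeout=%s)" % (action, node, config["timeout"]))

    nodes = ["worker-0", "worker-1"]  # placeholder node names
    config = {"timeout": 360}         # placeholder scenario config

    # One worker thread per target node, as in multiprocess_nodes
    pool = ThreadPool(processes=len(nodes))
    # zip + repeat builds one (node, action, config) tuple per node
    pool.starmap(run_node, zip(nodes, repeat("node_reboot_scenario"), repeat(config)))
    pool.close()

Note that starmap is synchronous: it returns only after every call has finished, which is why the plugin can close the pool immediately afterwards without an explicit join.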