Skip to content

Commit

Permalink
few small changes
Browse files Browse the repository at this point in the history
Signed-off-by: Paige Patton <[email protected]>
  • Loading branch information
paigerube14 committed Nov 5, 2024
1 parent 9597662 commit 5700746
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 105 deletions.
2 changes: 2 additions & 0 deletions docs/node_scenarios.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ kind was primarily designed for testing Kubernetes itself, but may be used for l
#### GCP
Cloud setup instructions can be found [here](cloud_setup.md#gcp). Sample scenario config can be found [here](https://github.com/krkn-chaos/krkn/blob/main/scenarios/openshift/gcp_node_scenarios.yml).

NOTE: The parallel option is not available for GCP, the api doesn't perform processes at the same time


#### Openstack

Expand Down
38 changes: 22 additions & 16 deletions krkn/scenario_plugins/node_actions/common_node_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,28 @@
node_general = False


def get_node_by_name(node_name_list, kubecli: KrknKubernetes):
killable_nodes = kubecli.list_killable_nodes()
for node_name in node_name_list:
if node_name not in killable_nodes:
logging.info(
f"Node with provided ${node_name} does not exist or the node might "
"be in NotReady state."
)
return
return node_name_list


# Pick a random node with specified label selector
def get_node(node_name, label_selector, instance_kill_count, kubecli: KrknKubernetes):
if node_name in kubecli.list_killable_nodes():
return [node_name]
elif node_name:
logging.info(
"Node with provided node_name does not exist or the node might "
"be in NotReady state."
)
nodes = kubecli.list_killable_nodes(label_selector)
def get_node(label_selector, instance_kill_count, kubecli: KrknKubernetes):

label_selector_list = label_selector.split(",")
nodes = []
for label_selector in label_selector_list:
nodes.extend(kubecli.list_killable_nodes(label_selector))
if not nodes:
raise Exception("Ready nodes with the provided label selector do not exist")
logging.info("Ready nodes with the label selector %s: %s" % (label_selector, nodes))
logging.info("Ready nodes with the label selector %s: %s" % (label_selector_list, nodes))
number_of_nodes = len(nodes)
if instance_kill_count == number_of_nodes:
return nodes
Expand All @@ -35,22 +44,19 @@ def get_node(node_name, label_selector, instance_kill_count, kubecli: KrknKubern
# krkn_lib
# Wait until the node status becomes Ready
def wait_for_ready_status(node, timeout, kubecli: KrknKubernetes):
resource_version = kubecli.get_node_resource_version(node)
kubecli.watch_node_status(node, "True", timeout, resource_version)
kubecli.watch_node_status(node, "True", timeout)


# krkn_lib
# Wait until the node status becomes Not Ready
def wait_for_not_ready_status(node, timeout, kubecli: KrknKubernetes):
resource_version = kubecli.get_node_resource_version(node)
kubecli.watch_node_status(node, "False", timeout, resource_version)
kubecli.watch_node_status(node, "False", timeout)


# krkn_lib
# Wait until the node status becomes Unknown
def wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes):
resource_version = kubecli.get_node_resource_version(node)
kubecli.watch_node_status(node, "Unknown", timeout, resource_version)
kubecli.watch_node_status(node, "Unknown", timeout)


# Get the ip of the cluster node
Expand Down
190 changes: 110 additions & 80 deletions krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import time
from multiprocessing.pool import ThreadPool
from itertools import repeat

import yaml
from krkn_lib.k8s import KrknKubernetes
Expand Down Expand Up @@ -120,100 +122,128 @@ def get_node_scenario_object(self, node_scenario, kubecli: KrknKubernetes):
def inject_node_scenario(
self, action, node_scenario, node_scenario_object, kubecli: KrknKubernetes
):
generic_cloud_scenarios = ("stop_kubelet_scenario", "node_crash_scenario")
# Get the node scenario configurations
run_kill_count = get_yaml_item_value(node_scenario, "runs", 1)

# Get the node scenario configurations for setting nodes

instance_kill_count = get_yaml_item_value(node_scenario, "instance_count", 1)
node_name = get_yaml_item_value(node_scenario, "node_name", "")
label_selector = get_yaml_item_value(node_scenario, "label_selector", "")
parallel_nodes = get_yaml_item_value(node_scenario, "parallel", False)

# Get the node to apply the scenario
if node_name:
node_name_list = node_name.split(",")
nodes = common_node_functions.get_node_by_name(node_name_list, kubecli)
else:
nodes = common_node_functions.get_node(
label_selector, instance_kill_count, kubecli
)

# GCP api doesn't support multiprocessing calls, will only actually run 1
if parallel_nodes and node_scenario['cloud_type'].lower() is not "gcp":
self.multiprocess_nodes(nodes, node_scenario_object, action, node_scenario)
else:
for single_node in nodes:
self.run_node(single_node, node_scenario_object, action, node_scenario)

def multiprocess_nodes(self, nodes, node_scenario_object, action, node_scenario):
try:
logging.info("parallely call to nodes")
# pool object with number of element
pool = ThreadPool(processes=len(nodes))

pool.starmap(self.run_node,zip(nodes, repeat(node_scenario_object), repeat(action), repeat(node_scenario)))

pool.close()
except Exception as e:
logging.info("Error on pool multiprocessing: " + str(e))


def run_node(self, single_node, node_scenario_object, action, node_scenario):
logging.info("action" + str(action))
# Get the scenario specifics for running action nodes
run_kill_count = get_yaml_item_value(node_scenario, "runs", 1)
if action == "node_stop_start_scenario":
duration = get_yaml_item_value(node_scenario, "duration", 120)

timeout = get_yaml_item_value(node_scenario, "timeout", 120)
service = get_yaml_item_value(node_scenario, "service", "")
ssh_private_key = get_yaml_item_value(
node_scenario, "ssh_private_key", "~/.ssh/id_rsa"
)
# Get the node to apply the scenario
if node_name:
node_name_list = node_name.split(",")
else:
node_name_list = [node_name]
for single_node_name in node_name_list:
nodes = common_node_functions.get_node(
single_node_name, label_selector, instance_kill_count, kubecli
generic_cloud_scenarios = ("stop_kubelet_scenario", "node_crash_scenario")

if node_general and action not in generic_cloud_scenarios:
logging.info(
"Scenario: "
+ action
+ " is not set up for generic cloud type, skipping action"
)
for single_node in nodes:
if node_general and action not in generic_cloud_scenarios:
logging.info(
"Scenario: "
+ action
+ " is not set up for generic cloud type, skipping action"
else:
if action == "node_start_scenario":
node_scenario_object.node_start_scenario(
run_kill_count, single_node, timeout
)
elif action == "node_stop_scenario":
node_scenario_object.node_stop_scenario(
run_kill_count, single_node, timeout
)
elif action == "node_stop_start_scenario":
node_scenario_object.node_stop_start_scenario(
run_kill_count, single_node, timeout, duration
)
elif action == "node_termination_scenario":
node_scenario_object.node_termination_scenario(
run_kill_count, single_node, timeout
)
elif action == "node_reboot_scenario":
node_scenario_object.node_reboot_scenario(
run_kill_count, single_node, timeout
)
elif action == "stop_start_kubelet_scenario":
node_scenario_object.stop_start_kubelet_scenario(
run_kill_count, single_node, timeout
)
elif action == "restart_kubelet_scenario":
node_scenario_object.restart_kubelet_scenario(
run_kill_count, single_node, timeout
)
elif action == "stop_kubelet_scenario":
node_scenario_object.stop_kubelet_scenario(
run_kill_count, single_node, timeout
)
elif action == "node_crash_scenario":
node_scenario_object.node_crash_scenario(
run_kill_count, single_node, timeout
)
elif action == "stop_start_helper_node_scenario":
if node_scenario["cloud_type"] != "openstack":
logging.error(
"Scenario: " + action + " is not supported for "
"cloud type "
+ node_scenario["cloud_type"]
+ ", skipping action"
)
else:
if action == "node_start_scenario":
node_scenario_object.node_start_scenario(
run_kill_count, single_node, timeout
)
elif action == "node_stop_scenario":
node_scenario_object.node_stop_scenario(
run_kill_count, single_node, timeout
)
elif action == "node_stop_start_scenario":
node_scenario_object.node_stop_start_scenario(
run_kill_count, single_node, timeout, duration
)
elif action == "node_termination_scenario":
node_scenario_object.node_termination_scenario(
run_kill_count, single_node, timeout
)
elif action == "node_reboot_scenario":
node_scenario_object.node_reboot_scenario(
run_kill_count, single_node, timeout
)
elif action == "stop_start_kubelet_scenario":
node_scenario_object.stop_start_kubelet_scenario(
run_kill_count, single_node, timeout
)
elif action == "restart_kubelet_scenario":
node_scenario_object.restart_kubelet_scenario(
run_kill_count, single_node, timeout
)
elif action == "stop_kubelet_scenario":
node_scenario_object.stop_kubelet_scenario(
run_kill_count, single_node, timeout
)
elif action == "node_crash_scenario":
node_scenario_object.node_crash_scenario(
run_kill_count, single_node, timeout
)
elif action == "stop_start_helper_node_scenario":
if node_scenario["cloud_type"] != "openstack":
logging.error(
"Scenario: " + action + " is not supported for "
"cloud type "
+ node_scenario["cloud_type"]
+ ", skipping action"
)
else:
if not node_scenario["helper_node_ip"]:
logging.error("Helper node IP address is not provided")
raise Exception(
"Helper node IP address is not provided"
)
node_scenario_object.helper_node_stop_start_scenario(
run_kill_count, node_scenario["helper_node_ip"], timeout
)
node_scenario_object.helper_node_service_status(
node_scenario["helper_node_ip"],
service,
ssh_private_key,
timeout,
)
else:
logging.info(
"There is no node action that matches %s, skipping scenario"
% action
if not node_scenario["helper_node_ip"]:
logging.error("Helper node IP address is not provided")
raise Exception(
"Helper node IP address is not provided"
)
node_scenario_object.helper_node_stop_start_scenario(
run_kill_count, node_scenario["helper_node_ip"], timeout
)
node_scenario_object.helper_node_service_status(
node_scenario["helper_node_ip"],
service,
ssh_private_key,
timeout,
)
else:
logging.info(
"There is no node action that matches %s, skipping scenario"
% action
)

def get_scenario_types(self) -> list[str]:
return ["node_scenarios"]
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ google-api-python-client==2.116.0
ibm_cloud_sdk_core==3.18.0
ibm_vpc==0.20.0
jinja2==3.1.4
krkn-lib==4.0.3
krkn-lib==4.0.4
lxml==5.1.0
kubernetes==28.1.0
numpy==1.26.4
Expand Down
17 changes: 9 additions & 8 deletions scenarios/openshift/aws_node_scenarios.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
node_scenarios:
- actions: # node chaos scenarios to be injected
- actions: # node chaos scenarios to be injected
- node_stop_start_scenario
node_name: # node on which scenario has to be injected; can set multiple names separated by comma
label_selector: node-role.kubernetes.io/worker # when node_name is not specified, a node with matching label_selector is selected for node chaos scenario injection
instance_count: 1 # Number of nodes to perform action/select that match the label selector
runs: 1 # number of times to inject each scenario under actions (will perform on same node each time)
timeout: 360 # duration to wait for completion of node scenario injection
duration: 120 # duration to stop the node before running the start action
cloud_type: aws # cloud type on which Kubernetes/OpenShift runs
node_name: # node on which scenario has to be injected; can set multiple names separated by comma
label_selector: node-role.kubernetes.io/worker # when node_name is not specified, a node with matching label_selector is selected for node chaos scenario injection
instance_count: 2 # Number of nodes to perform action/select that match the label selector
runs: 1 # number of times to inject each scenario under actions (will perform on same node each time)
timeout: 360 # duration to wait for completion of node scenario injection
duration: 20 # duration to stop the node before running the start action
cloud_type: aws # cloud type on which Kubernetes/OpenShift runs
parallel: true # Run action on label or node name in parallel or sequential, defaults to sequential
- actions:
- node_reboot_scenario
node_name:
Expand Down

0 comments on commit 5700746

Please sign in to comment.