Skip to content

Commit

Permalink
Implements the cluster events collection for the telemetry module (#59)
Browse files Browse the repository at this point in the history
* events collection

linting

* config flag fix
  • Loading branch information
tsebastiani authored Oct 31, 2023
1 parent c3f2091 commit ef7cb8a
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 249 deletions.
282 changes: 64 additions & 218 deletions src/krkn_lib/k8s/krkn_kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import ast
import json
import logging
import os
import random
import re
import subprocess
import tarfile
import tempfile
import threading
import time
from pathlib import Path
from queue import Queue
from typing import Dict, List, Optional
import arcaflow_lib_kubernetes
Expand All @@ -19,7 +17,6 @@
from kubernetes.dynamic.client import DynamicClient
from kubernetes.stream import stream
from urllib3 import HTTPResponse
from tzlocal import get_localzone

from krkn_lib.models.k8s import (
PVC,
Expand All @@ -33,7 +30,7 @@
VolumeMount,
)
from krkn_lib.models.telemetry import NodeInfo
from krkn_lib.utils import filter_log_file_worker, find_executable_in_path
from krkn_lib.utils import filter_dictionary
from krkn_lib.utils.safe_logger import SafeLogger


Expand Down Expand Up @@ -2060,224 +2057,73 @@ def is_pod_running(self, pod_name: str, namespace: str) -> bool:
except Exception:
return False

def filter_must_gather_ocp_log_folder(
def collect_cluster_events(
self,
src_dir: str,
dst_dir: str,
start_timestamp: Optional[int],
end_timestamp: Optional[int],
log_files_extension: str,
threads: int,
log_filter_patterns: list[str],
):
start_timestamp: int,
end_timestamp: int,
local_timezone: str,
cluster_timezone: str = "UTC",
limit: int = 500,
) -> Optional[str]:
"""
Filters a folder containing logs collected by the
`oc adm must-gather` command
(usually logs) with a given extension.
The time info is extracted matching each line against
the patterns passed as parameters and within the time
range between `start_timestamp`
and `end_timestamp`. if start and end timestamp
are None the bottom and top time limit
will be removed respectively. The filtering is multithreaded.
The timezone of the client and the cluster will
be applied to the filter and the records.
If a file does not contain relevant rows in the
time range won't be written in the
dst_dir
The output of the filter will be the original file name
with all the folder structure (base
folder not included) added as a prefix and
separated by a dot, eg.
src_dir: /tmp/folder
dst_dir: /tmp/filtered
log_file: /tmp/folder/namespaces/openshift-monitoring/pods/prometheus-k8s-0/logs/current.log # NOQA
output: /tmp/filtered/namespaces.openshift-monitoring.pods.prometheus-k8s-0.logs.current.log # NOQA
:param src_dir: the folder containing the files to be filtered
:param dst_dir: the folder where the filtered logs will be written
:param start_timestamp: timestamp of the first relevant entry, if None
will start from the beginning
:param end_timestamp: timestamp of the last relevant entry, if None
will be collected until the latest
:param log_files_extension: the extension of the files that will be filtered
using wildcards (* will consider all the files, log_file_name.log will consider only this file)
:param threads: the number of threads that will do the job
:param log_filter_patterns: a list of regex that will match and extract the time info that will be
parsed by dateutil.parser (it supports several formats by default but not every date format).
Each pattern *must contain* only 1 group that represent the time string that must be extracted
and parsed
"""

if "~" in src_dir:
src_dir = os.path.expanduser(src_dir)
download_folder = [f.path for f in os.scandir(src_dir) if f.is_dir()]
data_folder = [
f.path for f in os.scandir(download_folder[0]) if f.is_dir()
]
# default remote timestamp will be utc
remote_timezone = "UTC"
local_timezone = f"{get_localzone()}"

if os.path.exists(os.path.join(data_folder[0], "timestamp")):
with open(
os.path.join(data_folder[0], "timestamp"), mode="r"
) as timestamp_file:
line = timestamp_file.readline()
remote_timezone = line.split()[3]
if not os.path.exists(dst_dir):
logging.error("Log destination dir do not exist")
raise Exception("Log destination dir do not exist")
queue = Queue()
log_files = list(Path(data_folder[0]).rglob(log_files_extension))
for file in log_files:
queue.put(file)
Collects the cluster events by querying `/api/v1/events`,
keeps only those that fall within the given time interval
and writes them to a temporary file in JSON format.
try:
for _ in range(threads):
worker = threading.Thread(
target=filter_log_file_worker,
args=(
start_timestamp,
end_timestamp,
data_folder[0],
dst_dir,
remote_timezone,
local_timezone,
log_filter_patterns,
queue,
),
)
worker.daemon = True
worker.start()
queue.join()
except Exception as e:
logging.error(f"failed to filter log folder: {str(e)}")
raise e
:param start_timestamp: timestamp of the earliest date
from which an event is considered relevant
:param end_timestamp: timestamp of the latest date
up to which an event is considered relevant
:param local_timezone: timezone of the local system
:param cluster_timezone: timezone of the remote cluster
:param limit: maximum number of events to be fetched
from the cluster
:return: the path of the temporary file containing the events,
or None if no event matches the interval or an error occurs
def collect_filter_archive_ocp_logs(
self,
src_dir: str,
dst_dir: str,
kubeconfig_path: str,
start_timestamp: Optional[int],
end_timestamp: Optional[int],
log_filter_patterns: list[str],
threads: int,
safe_logger: SafeLogger,
oc_path: str = None,
) -> str:
"""
Collects, filters and finally creates a tar.gz archive containing
the filtered logs matching the criteria passed as parameters.
The logs are used leveraging the oc CLI with must-gather option
(`oc adm must-gather`)
:param src_dir: the folder containing the files to be filtered
:param dst_dir: the folder where the filtered logs will be written
:param kubeconfig_path: path of the kubeconfig file used by the `oc`
CLI to gather the log files from the cluster
:param start_timestamp: timestamp of the first relevant entry, if None
will start from the beginning
:param end_timestamp: timestamp of the last relevant entry, if None
will be collected until the latest
:param threads: the number of threads that will do the job
:param log_filter_patterns: a list of regex that will match and
extract the time info that will be parsed by dateutil.parser
(it supports several formats by default but not every date format)
:param safe_logger: thread safe logger used to log
the output on a file stream
:param oc_path: the path of the `oc` CLI, if None will
be searched in the PATH
:return: the path of the archive containing the filtered logs
"""

OC_COMMAND = "oc"

if oc_path is None and find_executable_in_path(OC_COMMAND) is None:
safe_logger.error(
f"{OC_COMMAND} command not found in $PATH,"
f" skipping log collection"
)
return
oc = find_executable_in_path(OC_COMMAND)
if oc_path is not None:
if not os.path.exists(oc_path):
safe_logger.error(
f"provided oc command path: {oc_path} is not valid"
)
raise Exception(
f"provided oc command path: {oc_path} is not valid"
)
else:
oc = oc_path

if "~" in kubeconfig_path:
kubeconfig_path = os.path.expanduser(kubeconfig_path)
if "~" in src_dir:
src_dir = os.path.expanduser(src_dir)
if "~" in dst_dir:
dst_dir = os.path.expanduser(dst_dir)

if not os.path.exists(kubeconfig_path):
safe_logger.error(
f"provided kubeconfig path: {kubeconfig_path} is not valid"
)
raise Exception(
f"provided kubeconfig path: {kubeconfig_path} is not valid"
)

if not os.path.exists(src_dir):
safe_logger.error(f"provided workdir path: {src_dir} is not valid")
raise Exception(f"provided workdir path: {src_dir} is not valid")

if not os.path.exists(dst_dir):
safe_logger.error(f"provided workdir path: {dst_dir} is not valid")
raise Exception(f"provided workdir path: {dst_dir} is not valid")

# COLLECT: run must-gather in workdir folder
cur_dir = os.getcwd()
os.chdir(src_dir)
safe_logger.info(f"collecting openshift logs in {src_dir}...")
try:
subprocess.Popen(
[oc, "adm", "must-gather", "--kubeconfig", kubeconfig_path],
stdout=subprocess.DEVNULL,
).wait()
os.chdir(cur_dir)
except Exception as e:
safe_logger.error(
f"failed to collect data from openshift "
f"with oc command: {str(e)}"
)
raise e

# FILTER: filtering logs in
try:
safe_logger.info(f"filtering openshift logs in {dst_dir}...")
self.filter_must_gather_ocp_log_folder(
src_dir,
dst_dir,
start_timestamp,
end_timestamp,
"*.log",
threads,
log_filter_patterns,
)
except Exception as e:
safe_logger.error(f"failed to filter logs: {str(e)}")
raise e

# ARCHIVE: creating tar archive of filtered files
archive_name = os.path.join(dst_dir, "logs.tar.gz")
try:
with tarfile.open(archive_name, "w:gz") as tar:
for file in os.listdir(dst_dir):
path = os.path.join(dst_dir, file)
tar.add(path, arcname=file)
path_params: Dict[str, str] = {}
query_params = {"limit": limit}
header_params: Dict[str, str] = {}
auth_settings = ["BearerToken"]
header_params["Accept"] = self.api_client.select_header_accept(
["application/json"]
)

path = "/api/v1/events"
(data) = self.api_client.call_api(
path,
"GET",
path_params,
query_params,
header_params,
response_type="str",
auth_settings=auth_settings,
)
events = []
json_obj = ast.literal_eval(data[0])
events_list = reversed(json_obj["items"])
for obj in events_list:
filtered_obj = filter_dictionary(
obj,
"firstTimestamp",
start_timestamp,
end_timestamp,
cluster_timezone,
local_timezone,
)
if filtered_obj:
events.append(filtered_obj)

if len(events) > 0:
file_content = json.dumps(events, indent=2)
with tempfile.NamedTemporaryFile(
delete=False, mode="w"
) as file:
file.write(file_content)
file.flush()
return file.name
return None
except Exception as e:
safe_logger.error(f"failed to create logs archive: {str(e)}")

return archive_name
logging.error(str(e))
return None
Loading

0 comments on commit ef7cb8a

Please sign in to comment.