diff --git a/src/krkn_lib/k8s/krkn_kubernetes.py b/src/krkn_lib/k8s/krkn_kubernetes.py index c37b7edb..a5aedecb 100644 --- a/src/krkn_lib/k8s/krkn_kubernetes.py +++ b/src/krkn_lib/k8s/krkn_kubernetes.py @@ -20,6 +20,7 @@ from kubernetes.stream import stream from urllib3 import HTTPResponse from tzlocal import get_localzone + from krkn_lib.models.k8s import ( PVC, ApiRequestException, @@ -209,31 +210,6 @@ def get_host(self) -> str: return self.cli.api_client.configuration.get_default_copy().host - def get_clusterversion_string(self) -> str: - """ - Return clusterversion status text on OpenShift, empty string - on other distributions - - :return: clusterversion status - """ - - try: - cvs = self.custom_object_client.list_cluster_custom_object( - "config.openshift.io", - "v1", - "clusterversions", - ) - for cv in cvs["items"]: - for condition in cv["status"]["conditions"]: - if condition["type"] == "Progressing": - return condition["message"] - return "" - except client.exceptions.ApiException as e: - if e.status == 404: - return "" - else: - raise e - # def list_namespaces(self, label_selector: str = None) -> list[str]: """ @@ -369,7 +345,7 @@ def list_nodes(self, label_selector: str = None) -> list[str]: # TODO: refactoring to work both in k8s and OpenShift def list_killable_nodes(self, label_selector: str = None) -> list[str]: """ - List nodes in the cluster that can be killed (OpenShift only) + List nodes in the cluster that can be killed :param label_selector: filter by label selector (optional default `None`) @@ -1762,80 +1738,6 @@ def get_nodes_infos(self) -> list[NodeInfo]: return result - def get_cluster_infrastructure(self) -> str: - """ - Get the cluster Cloud infrastructure name when available - - :return: the cluster infrastructure name or `Unknown` when unavailable - """ - api_client = self.api_client - if api_client: - try: - path_params: Dict[str, str] = {} - query_params: List[str] = [] - header_params: Dict[str, str] = {} - auth_settings = ["BearerToken"] - header_params["Accept"] = api_client.select_header_accept( - ["application/json"] - ) - - path = "/apis/config.openshift.io/v1/infrastructures/cluster" - (data) = api_client.call_api( - path, - "GET", - path_params, - query_params, - header_params, - response_type="str", - auth_settings=auth_settings, - ) - json_obj = ast.literal_eval(data[0]) - return json_obj["status"]["platform"] - except Exception as e: - logging.warning("V1ApiException -> %s", str(e)) - return "Unknown" - - return None - - def get_cluster_network_plugins(self) -> list[str]: - """ - Get the cluster Cloud network plugins list - - :return: the cluster infrastructure name or `Unknown` when unavailable - """ - api_client = self.api_client - network_plugins = list[str]() - if api_client: - try: - path_params: Dict[str, str] = {} - query_params: List[str] = [] - header_params: Dict[str, str] = {} - auth_settings = ["BearerToken"] - header_params["Accept"] = api_client.select_header_accept( - ["application/json"] - ) - - path = "/apis/config.openshift.io/v1/networks" - (data) = api_client.call_api( - path, - "GET", - path_params, - query_params, - header_params, - response_type="str", - auth_settings=auth_settings, - ) - json_obj = ast.literal_eval(data[0]) - for plugin in json_obj["items"]: - network_plugins.append(plugin["status"]["networkType"]) - except Exception as e: - logging.warning( - "Impossible to retrieve cluster Network plugins -> %s", - str(e), - ) - network_plugins.append("Unknown") - return network_plugins - def delete_file_from_pod( self, 
pod_name: str, container_name: str, namespace: str, filename: str ): diff --git a/src/krkn_lib/models/telemetry/models.py b/src/krkn_lib/models/telemetry/models.py index a7a32290..34194b56 100644 --- a/src/krkn_lib/models/telemetry/models.py +++ b/src/krkn_lib/models/telemetry/models.py @@ -109,7 +109,7 @@ class ChaosRunTelemetry: """ node_infos: list[NodeInfo] """ - Cloud infrastructure (if available) of the target cluster + List of node Infos collected from the target cluster """ kubernetes_objects_count: dict[str, int] """ @@ -118,7 +118,7 @@ class ChaosRunTelemetry: """ network_plugins: list[str] """ - List of node Infos collected from the target cluster + Network plugins deployed in the target cluster """ node_count: int = 0 """ @@ -126,7 +126,7 @@ class ChaosRunTelemetry: """ cloud_infrastructure: str = "Unknown" """ - Network plugins deployed in the target cluster + Cloud infrastructure (if available) of the target cluster """ run_uuid: str = "" """ @@ -137,7 +137,7 @@ def __init__(self, json_object: any = None): self.scenarios = list[ScenarioTelemetry]() self.node_infos = list[NodeInfo]() self.kubernetes_objects_count = dict[str, int]() - self.network_plugins = list[str]() + self.network_plugins = ["Unknown"] if json_object is not None: scenarios = json_object.get("scenarios") if scenarios is None or isinstance(scenarios, list) is False: diff --git a/src/krkn_lib/ocp/__init__.py b/src/krkn_lib/ocp/__init__.py new file mode 100644 index 00000000..043ae5a2 --- /dev/null +++ b/src/krkn_lib/ocp/__init__.py @@ -0,0 +1 @@ +from .krkn_openshift import * # NOQA diff --git a/src/krkn_lib/ocp/krkn_openshift.py b/src/krkn_lib/ocp/krkn_openshift.py new file mode 100644 index 00000000..6b6c1b2b --- /dev/null +++ b/src/krkn_lib/ocp/krkn_openshift.py @@ -0,0 +1,355 @@ +import ast +import logging +import os +import shutil +import subprocess +import tarfile +import threading + +from pathlib import Path +from queue import Queue +from typing import Optional +from kubernetes import client +from tzlocal import get_localzone +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.utils import filter_log_file_worker, SafeLogger + + +class KrknOpenshift(KrknKubernetes): + def __init__( + self, + kubeconfig_path: str = None, + *, + kubeconfig_string: str = None, + ): + super().__init__( + kubeconfig_path=kubeconfig_path, + kubeconfig_string=kubeconfig_string, + ) + + def get_clusterversion_string(self) -> str: + """ + Return clusterversion status text on OpenShift, empty string + on other distributions + + :return: clusterversion status + """ + + try: + cvs = self.custom_object_client.list_cluster_custom_object( + "config.openshift.io", + "v1", + "clusterversions", + ) + for cv in cvs["items"]: + for condition in cv["status"]["conditions"]: + if condition["type"] == "Progressing": + return condition["message"] + return "" + except client.exceptions.ApiException as e: + if e.status == 404: + return "" + else: + raise e + + def get_cloud_infrastructure(self) -> str: + """ + Get the cluster Cloud infrastructure name when available + + :return: the cluster infrastructure name or `Unknown` when unavailable + """ + api_client = self.api_client + if api_client: + try: + path_params: dict[str, str] = {} + query_params: list[str] = [] + header_params: dict[str, str] = {} + auth_settings = ["BearerToken"] + header_params["Accept"] = api_client.select_header_accept( + ["application/json"] + ) + + path = "/apis/config.openshift.io/v1/infrastructures/cluster" + (data) = api_client.call_api( + path, + "GET", + 
path_params, + query_params, + header_params, + response_type="str", + auth_settings=auth_settings, + ) + json_obj = ast.literal_eval(data[0]) + return json_obj["status"]["platform"] + except Exception as e: + logging.warning("V1ApiException -> %s", str(e)) + return "Unknown" + + return None + + def get_cluster_network_plugins(self) -> list[str]: + """ + Get the cluster network plugins list + + :return: the list of cluster network plugins or `Unknown` when unavailable + """ + api_client = self.api_client + network_plugins = list[str]() + if api_client: + try: + path_params: dict[str, str] = {} + query_params: list[str] = [] + header_params: dict[str, str] = {} + auth_settings = ["BearerToken"] + header_params["Accept"] = api_client.select_header_accept( + ["application/json"] + ) + + path = "/apis/config.openshift.io/v1/networks" + (data) = api_client.call_api( + path, + "GET", + path_params, + query_params, + header_params, + response_type="str", + auth_settings=auth_settings, + ) + json_obj = ast.literal_eval(data[0]) + for plugin in json_obj["items"]: + network_plugins.append(plugin["status"]["networkType"]) + except Exception as e: + logging.warning( + "Failed to retrieve cluster network plugins -> %s", + str(e), + ) + network_plugins.append("Unknown") + return network_plugins + + def filter_must_gather_ocp_log_folder( + self, + src_dir: str, + dst_dir: str, + start_timestamp: Optional[int], + end_timestamp: Optional[int], + log_files_extension: str, + threads: int, + log_filter_patterns: list[str], + ): + """ + Filters the files with a given extension (usually logs) + inside a folder collected by the + `oc adm must-gather` command. + The time info is extracted matching each line against + the patterns passed as parameters, keeping only the lines + within the time range between `start_timestamp` + and `end_timestamp`. If the start or the end timestamp + is None the bottom or the top time limit + will be removed respectively. The filtering is multithreaded. + The timezones of the client and the cluster will + be applied to the filter and the records. + If a file does not contain relevant rows in the + time range it won't be written to the + dst_dir. + The output of the filter will be the original file name + with all the folder structure (base + folder not included) added as a prefix and + separated by a dot, e.g. + src_dir: /tmp/folder + dst_dir: /tmp/filtered + log_file: /tmp/folder/namespaces/openshift-monitoring/pods/prometheus-k8s-0/logs/current.log # NOQA + output: /tmp/filtered/namespaces.openshift-monitoring.pods.prometheus-k8s-0.logs.current.log # NOQA + + + :param src_dir: the folder containing the files to be filtered + :param dst_dir: the folder where the filtered logs will be written + :param start_timestamp: timestamp of the first relevant entry, if None + will start from the beginning + :param end_timestamp: timestamp of the last relevant entry, if None + will be collected until the latest + :param log_files_extension: the extension of the files that will be filtered + using wildcards (* will consider all the files, log_file_name.log will consider only this file) + :param threads: the number of threads that will do the job + :param log_filter_patterns: a list of regex that will match and extract the time info that will be + parsed by dateutil.parser (it supports several formats by default but not every date format).
+ Each pattern *must contain* exactly one group representing the time string that will be extracted + and parsed + """ + + if "~" in src_dir: + src_dir = os.path.expanduser(src_dir) + download_folder = [f.path for f in os.scandir(src_dir) if f.is_dir()] + data_folder = [ + f.path for f in os.scandir(download_folder[0]) if f.is_dir() + ] + # default remote timestamp will be utc + remote_timezone = "UTC" + local_timezone = f"{get_localzone()}" + + if os.path.exists(os.path.join(data_folder[0], "timestamp")): + with open( + os.path.join(data_folder[0], "timestamp"), mode="r" + ) as timestamp_file: + line = timestamp_file.readline() + remote_timezone = line.split()[3] + if not os.path.exists(dst_dir): + logging.error("Log destination dir does not exist") + raise Exception("Log destination dir does not exist") + queue = Queue() + log_files = list(Path(data_folder[0]).rglob(log_files_extension)) + for file in log_files: + queue.put(file) + + try: + for _ in range(threads): + worker = threading.Thread( + target=filter_log_file_worker, + args=( + start_timestamp, + end_timestamp, + data_folder[0], + dst_dir, + remote_timezone, + local_timezone, + log_filter_patterns, + queue, + ), + ) + worker.daemon = True + worker.start() + queue.join() + except Exception as e: + logging.error(f"failed to filter log folder: {str(e)}") + raise e + + def collect_filter_archive_ocp_logs( + self, + src_dir: str, + dst_dir: str, + kubeconfig_path: str, + start_timestamp: Optional[int], + end_timestamp: Optional[int], + log_filter_patterns: list[str], + threads: int, + safe_logger: SafeLogger, + oc_path: str = None, + ) -> str: + """ + Collects, filters and finally creates a tar.gz archive containing + the filtered logs matching the criteria passed as parameters. + The logs are collected leveraging the oc CLI with the must-gather + option (`oc adm must-gather`) + + :param src_dir: the folder containing the files to be filtered + :param dst_dir: the folder where the filtered logs will be written + :param kubeconfig_path: path of the kubeconfig file used by the `oc` + CLI to gather the log files from the cluster + :param start_timestamp: timestamp of the first relevant entry, if None + will start from the beginning + :param end_timestamp: timestamp of the last relevant entry, if None + will be collected until the latest + :param threads: the number of threads that will do the job + :param log_filter_patterns: a list of regex that will match and + extract the time info that will be parsed by dateutil.parser + (it supports several formats by default but not every date format) + :param safe_logger: thread safe logger used to log + the output on a file stream + :param oc_path: the path of the `oc` CLI, if None will + be searched in the PATH + + :return: the path of the archive containing the filtered logs + """ + + OC_COMMAND = "oc" + + if oc_path is None and shutil.which(OC_COMMAND) is None: + safe_logger.error( + f"{OC_COMMAND} command not found in $PATH," + f" skipping log collection" + ) + return + oc = shutil.which(OC_COMMAND) + if oc_path is not None: + if not os.path.exists(oc_path): + safe_logger.error( + f"provided oc command path: {oc_path} is not valid" + ) + raise Exception( + f"provided oc command path: {oc_path} is not valid" + ) + else: + oc = oc_path + + if "~" in kubeconfig_path: + kubeconfig_path = os.path.expanduser(kubeconfig_path) + if "~" in src_dir: + src_dir = os.path.expanduser(src_dir) + if "~" in dst_dir: + dst_dir = os.path.expanduser(dst_dir) + + if not os.path.exists(kubeconfig_path): + safe_logger.error(
f"provided kubeconfig path: {kubeconfig_path} is not valid" + ) + raise Exception( + f"provided kubeconfig path: {kubeconfig_path} is not valid" + ) + + if not os.path.exists(src_dir): + safe_logger.error(f"provided workdir path: {src_dir} is not valid") + raise Exception(f"provided workdir path: {src_dir} is not valid") + + if not os.path.exists(dst_dir): + safe_logger.error(f"provided workdir path: {dst_dir} is not valid") + raise Exception(f"provided workdir path: {dst_dir} is not valid") + + # COLLECT: run must-gather in workdir folder + cur_dir = os.getcwd() + os.chdir(src_dir) + safe_logger.info(f"collecting openshift logs in {src_dir}...") + try: + subprocess.Popen( + [ + oc, + "adm", + "must-gather", + "--kubeconfig", + kubeconfig_path, + ], + stdout=subprocess.DEVNULL, + ).wait() + os.chdir(cur_dir) + except Exception as e: + safe_logger.error( + f"failed to collect data from openshift " + f"with oc command: {str(e)}" + ) + raise e + + # FILTER: filtering logs in + try: + safe_logger.info(f"filtering openshift logs in {dst_dir}...") + self.filter_must_gather_ocp_log_folder( + src_dir, + dst_dir, + start_timestamp, + end_timestamp, + "*.log", + threads, + log_filter_patterns, + ) + except Exception as e: + safe_logger.error(f"failed to filter logs: {str(e)}") + raise e + + # ARCHIVE: creating tar archive of filtered files + archive_name = os.path.join(dst_dir, "logs.tar.gz") + try: + with tarfile.open(archive_name, "w:gz") as tar: + for file in os.listdir(dst_dir): + path = os.path.join(dst_dir, file) + tar.add(path, arcname=file) + except Exception as e: + safe_logger.error(f"failed to create logs archive: {str(e)}") + + return archive_name diff --git a/src/krkn_lib/telemetry/__init__.py b/src/krkn_lib/telemetry/__init__.py deleted file mode 100644 index 908ee867..00000000 --- a/src/krkn_lib/telemetry/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .krkn_telemetry import * # NOQA diff --git a/src/krkn_lib/telemetry/k8s/__init__.py b/src/krkn_lib/telemetry/k8s/__init__.py new file mode 100644 index 00000000..75ae6d72 --- /dev/null +++ b/src/krkn_lib/telemetry/k8s/__init__.py @@ -0,0 +1 @@ +from .krkn_telemetry_kubernetes import * # NOQA diff --git a/src/krkn_lib/telemetry/krkn_telemetry.py b/src/krkn_lib/telemetry/k8s/krkn_telemetry_kubernetes.py similarity index 76% rename from src/krkn_lib/telemetry/krkn_telemetry.py rename to src/krkn_lib/telemetry/k8s/krkn_telemetry_kubernetes.py index bd6ef7ca..37a8efcd 100644 --- a/src/krkn_lib/telemetry/krkn_telemetry.py +++ b/src/krkn_lib/telemetry/k8s/krkn_telemetry_kubernetes.py @@ -1,21 +1,19 @@ import base64 -import os -import datetime import threading import time -from queue import Queue -from typing import Optional - -import requests import yaml - +import requests +import os import krkn_lib.utils as utils + +from queue import Queue from krkn_lib.k8s import KrknKubernetes from krkn_lib.models.telemetry import ChaosRunTelemetry, ScenarioTelemetry +from typing import Optional from krkn_lib.utils.safe_logger import SafeLogger -class KrknTelemetry: +class KrknTelemetryKubernetes: kubecli: KrknKubernetes = None safe_logger: SafeLogger = None @@ -39,12 +37,7 @@ def collect_cluster_metadata(self, chaos_telemetry: ChaosRunTelemetry): the cluster metadata """ self.safe_logger.info("collecting telemetry data, please wait....") - chaos_telemetry.cloud_infrastructure = ( - self.kubecli.get_cluster_infrastructure() - ) - chaos_telemetry.network_plugins = ( - self.kubecli.get_cluster_network_plugins() - ) + chaos_telemetry.kubernetes_objects_count = ( 
self.kubecli.get_all_kubernetes_object_count( [ @@ -119,18 +112,30 @@ def send_telemetry( self.safe_logger.info("successfully sent telemetry data") return json_data - def get_ocp_prometheus_data( + def get_prometheus_pod_data( self, telemetry_config: dict, request_id: str, + prometheus_pod_name: str, + prometheus_container_name: str, + prometheus_namespace: str, + remote_archive_path: str = "/prometheus", ) -> list[(int, str)]: """ - Downloads the OCP prometheus metrics folder + Downloads the prometheus metrics folder from a prometheus pod :param telemetry_config: krkn telemetry conf section will be stored :param request_id: uuid of the session that will represent the temporary archive files + :param prometheus_pod_name: the name of the prometheus pod from + which the data will be archived + :param prometheus_container_name: the name of the container in the + prometheus pod + :param prometheus_namespace: the namespace in which the prometheus + pod lives + :param remote_archive_path: (Optional) the path where prometheus logs + are stored, if not specified will default to `/prometheus` :return: the list of the archive number and filenames downloaded """ file_list = list[(int, str)]() @@ -182,10 +187,6 @@ def get_ocp_prometheus_data( if not prometheus_backup: return file_list - prometheus_pod_name = "prometheus-k8s-0" - prometheus_container_name = "prometheus" - prometheus_namespace = "openshift-monitoring" - remote_archive_path = "/prometheus" prometheus_pod = self.kubecli.get_pod_info( prometheus_pod_name, prometheus_namespace ) @@ -227,14 +228,14 @@ def get_ocp_prometheus_data( self.safe_logger.error(exception_string) raise Exception(exception_string) - def put_ocp_prometheus_data( + def put_prometheus_data( self, telemetry_config: dict, archive_volumes: list[(int, str)], request_id: str, ): """ - Puts a list of files on telemetry S3 bucket, mulithread. + Puts a list of files on the telemetry S3 bucket, multithreaded. :param telemetry_config: telemetry section of kraken config.yaml :param archive_volumes: a list of tuples containing the @@ -320,138 +321,6 @@ def put_ocp_prometheus_data( except Exception as e: self.safe_logger.error(str(e)) - def put_ocp_logs( - self, - request_id: str, - telemetry_config: dict, - start_timestamp: int, - end_timestamp: int, - ): - """ - Collects, filters, archive and put on the telemetry S3 Buckets - Openshift logs. Built on top of the `oc adm must-gather` command - and custom filtering.
- - :param request_id: uuid of the session that will represent the - S3 folder on which the filtered log files archive will be stored - :param telemetry_config: telemetry section of kraken config.yaml - :param start_timestamp: timestamp of the first relevant entry, if None - will start filter starting from the earliest - :param end_timestamp: timestamp of the last relevant entry, if None - will end filtering until the latest - """ - - queue = Queue() - logs_backup = telemetry_config.get("logs_backup") - url = telemetry_config.get("api_url") - username = telemetry_config.get("username") - password = telemetry_config.get("password") - backup_threads = telemetry_config.get("backup_threads") - max_retries = telemetry_config.get("max_retries") - archive_path = telemetry_config.get("archive_path") - logs_filter_patterns = telemetry_config.get("logs_filter_patterns") - oc_cli_path = telemetry_config.get("oc_cli_path") - exceptions = [] - is_exception = False - if logs_backup is None: - exceptions.append("telemetry -> logs_backup flag is missing") - is_exception = True - if backup_threads is None: - exceptions.append("telemetry -> backup_threads is missing") - is_exception = True - - if not isinstance(backup_threads, int): - exceptions.append( - "telemetry -> backup_threads must be a number not a string" - ) - is_exception = True - if url is None: - exceptions.append("telemetry -> api_url is missing") - is_exception = True - if username is None: - exceptions.append("telemetry -> username is missing") - is_exception = True - if password is None: - exceptions.append("telemetry -> password is missing") - is_exception = True - if max_retries is None: - exceptions.append("telemetry -> max_retries is missing") - is_exception = True - if archive_path is None: - exceptions.append("telemetry -> archive_path is missing") - is_exception = True - if logs_filter_patterns is None: - exceptions.append("telemetry -> logs_filter_pastterns is missing") - is_exception = True - - # if oc_cli_path is set to empty will be set to - # None to let the config flexible - if oc_cli_path == "": - oc_cli_path = None - - if not isinstance(logs_filter_patterns, list): - exceptions.append( - "telemetry -> logs_filter_pastterns must be a " - "list of regex pattern" - ) - is_exception = True - - if is_exception: - raise Exception(", ".join(exceptions)) - - if not logs_backup: - return - - try: - timestamp = datetime.datetime.now().timestamp() - workdir = os.path.join(archive_path, f"gathered-logs-{timestamp}") - dst_dir = os.path.join(archive_path, f"filtered-logs-{timestamp}") - os.mkdir(workdir) - os.mkdir(dst_dir) - archive_path = self.kubecli.collect_filter_archive_ocp_logs( - workdir, - dst_dir, - self.kubecli.get_kubeconfig_path(), - start_timestamp, - end_timestamp, - logs_filter_patterns, - backup_threads, - self.safe_logger, - oc_cli_path, - ) - # volume_number : 0 only one file - # archive_path: path of the archived logs - # retries: 0 - queue.put((0, archive_path, 0)) - except Exception as e: - self.safe_logger.error(f"failed to collect ocp logs: {str(e)}") - raise e - # this parameter has doesn't have an utility in this context - # used to match the method signature and reuse it (Poor design?) 
- uploaded_files = list[str]() - queue_size = queue.qsize() - self.safe_logger.info("uploading ocp logs...") - worker = threading.Thread( - target=self.generate_url_and_put_to_s3_worker, - args=( - queue, - queue_size, - request_id, - f"{url}/presigned-url", - username, - password, - 0, - uploaded_files, - max_retries, - "logs-", - ".tar.gz", - ), - ) - worker.daemon = True - worker.start() - queue.join() - self.safe_logger.info("ocp logs successfully uploaded") - def generate_url_and_put_to_s3_worker( self, queue: Queue, diff --git a/src/krkn_lib/telemetry/ocp/__init__.py b/src/krkn_lib/telemetry/ocp/__init__.py new file mode 100644 index 00000000..85ca5731 --- /dev/null +++ b/src/krkn_lib/telemetry/ocp/__init__.py @@ -0,0 +1 @@ +from .krkn_telemetry_openshift import * # NOQA diff --git a/src/krkn_lib/telemetry/ocp/krkn_telemetry_openshift.py b/src/krkn_lib/telemetry/ocp/krkn_telemetry_openshift.py new file mode 100644 index 00000000..29b5e3af --- /dev/null +++ b/src/krkn_lib/telemetry/ocp/krkn_telemetry_openshift.py @@ -0,0 +1,184 @@ +import datetime +import os +import threading +from queue import Queue + +from krkn_lib.models.telemetry import ChaosRunTelemetry +from krkn_lib.ocp import KrknOpenshift +from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes +from krkn_lib.utils import SafeLogger + + +class KrknTelemetryOpenshift(KrknTelemetryKubernetes): + ocpcli: KrknOpenshift + + def __init__(self, safe_logger: SafeLogger, lib_openshift: KrknOpenshift): + super().__init__(safe_logger=safe_logger, lib_kubernetes=lib_openshift) + self.ocpcli = lib_openshift + + def get_ocp_prometheus_data( + self, + telemetry_config: dict, + request_id: str, + ) -> list[(int, str)]: + """ + Downloads the Openshift prometheus metrics + + :param telemetry_config: krkn telemetry conf section + :param request_id: uuid of the session that will represent the + temporary archive files + :return: the list of the archive number and filenames downloaded + """ + prometheus_ocp_pod_name = "prometheus-k8s-0" + prometheus_ocp_container_name = "prometheus" + prometheus_ocp_namespace = "openshift-monitoring" + + return self.get_prometheus_pod_data( + telemetry_config, + request_id, + prometheus_ocp_pod_name, + prometheus_ocp_container_name, + prometheus_ocp_namespace, + ) + + def collect_cluster_metadata(self, chaos_telemetry: ChaosRunTelemetry): + super().collect_cluster_metadata(chaos_telemetry) + chaos_telemetry.cloud_infrastructure = ( + self.ocpcli.get_cloud_infrastructure() + ) + chaos_telemetry.network_plugins = ( + self.ocpcli.get_cluster_network_plugins() + ) + + def put_ocp_logs( + self, + request_id: str, + telemetry_config: dict, + start_timestamp: int, + end_timestamp: int, + ): + """ + Collects, filters, archives and puts Openshift logs on the + telemetry S3 buckets. Built on top of the `oc adm must-gather` + command and custom filtering.
+ + :param request_id: uuid of the session that will represent the + S3 folder on which the filtered log files archive will be stored + :param telemetry_config: telemetry section of kraken config.yaml + :param start_timestamp: timestamp of the first relevant entry, if None + will start filtering from the earliest + :param end_timestamp: timestamp of the last relevant entry, if None + will keep filtering until the latest + """ + + queue = Queue() + logs_backup = telemetry_config.get("logs_backup") + url = telemetry_config.get("api_url") + username = telemetry_config.get("username") + password = telemetry_config.get("password") + backup_threads = telemetry_config.get("backup_threads") + max_retries = telemetry_config.get("max_retries") + archive_path = telemetry_config.get("archive_path") + logs_filter_patterns = telemetry_config.get("logs_filter_patterns") + oc_cli_path = telemetry_config.get("oc_cli_path") + exceptions = [] + is_exception = False + if logs_backup is None: + exceptions.append("telemetry -> logs_backup flag is missing") + is_exception = True + if backup_threads is None: + exceptions.append("telemetry -> backup_threads is missing") + is_exception = True + + if backup_threads is not None and not isinstance(backup_threads, int): + exceptions.append( + "telemetry -> backup_threads must be a number not a string" + ) + is_exception = True + if url is None: + exceptions.append("telemetry -> api_url is missing") + is_exception = True + if username is None: + exceptions.append("telemetry -> username is missing") + is_exception = True + if password is None: + exceptions.append("telemetry -> password is missing") + is_exception = True + if max_retries is None: + exceptions.append("telemetry -> max_retries is missing") + is_exception = True + if archive_path is None: + exceptions.append("telemetry -> archive_path is missing") + is_exception = True + if logs_filter_patterns is None: + exceptions.append("telemetry -> logs_filter_patterns is missing") + is_exception = True + + # if oc_cli_path is set to an empty string it will be set to + # None to keep the config flexible + if oc_cli_path == "": + oc_cli_path = None + + if logs_filter_patterns is not None and not isinstance(logs_filter_patterns, list): + exceptions.append( + "telemetry -> logs_filter_patterns must be a " + "list of regex patterns" + ) + is_exception = True + + if is_exception: + raise Exception(", ".join(exceptions)) + + if not logs_backup: + return + + try: + timestamp = datetime.datetime.now().timestamp() + workdir = os.path.join(archive_path, f"gathered-logs-{timestamp}") + dst_dir = os.path.join(archive_path, f"filtered-logs-{timestamp}") + os.mkdir(workdir) + os.mkdir(dst_dir) + archive_path = self.ocpcli.collect_filter_archive_ocp_logs( + workdir, + dst_dir, + self.kubecli.get_kubeconfig_path(), + start_timestamp, + end_timestamp, + logs_filter_patterns, + backup_threads, + self.safe_logger, + oc_cli_path, + ) + # volume_number : 0 only one file + # archive_path: path of the archived logs + # retries: 0 + queue.put((0, archive_path, 0)) + except Exception as e: + self.safe_logger.error(f"failed to collect ocp logs: {str(e)}") + raise e + # this parameter doesn't have a purpose in this context, it is + # used only to match the method signature and reuse it (poor design?)
+ uploaded_files = list[str]() + queue_size = queue.qsize() + self.safe_logger.info("uploading ocp logs...") + worker = threading.Thread( + target=self.generate_url_and_put_to_s3_worker, + args=( + queue, + queue_size, + request_id, + f"{url}/presigned-url", + username, + password, + 0, + uploaded_files, + max_retries, + "logs-", + ".tar.gz", + ), + ) + worker.daemon = True + worker.start() + queue.join() + self.safe_logger.info("ocp logs successfully uploaded") diff --git a/src/krkn_lib/tests/base_test.py b/src/krkn_lib/tests/base_test.py index 0e6273e4..839ccb01 100644 --- a/src/krkn_lib/tests/base_test.py +++ b/src/krkn_lib/tests/base_test.py @@ -15,20 +15,30 @@ from requests import ConnectTimeout from krkn_lib.k8s import KrknKubernetes -from krkn_lib.telemetry import KrknTelemetry +from krkn_lib.ocp import KrknOpenshift +from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift from krkn_lib.utils import SafeLogger class BaseTest(unittest.TestCase): lib_k8s: KrknKubernetes - lib_telemetry: KrknTelemetry + lib_ocp: KrknOpenshift + lib_telemetry_k8s: KrknTelemetryKubernetes + lib_telemetry_ocp: KrknTelemetryOpenshift @classmethod def setUpClass(cls): cls.lib_k8s = KrknKubernetes(config.KUBE_CONFIG_DEFAULT_LOCATION) - cls.lib_telemetry = KrknTelemetry(SafeLogger(), cls.lib_k8s) + cls.lib_ocp = KrknOpenshift(config.KUBE_CONFIG_DEFAULT_LOCATION) + cls.lib_telemetry_k8s = KrknTelemetryKubernetes( + SafeLogger(), cls.lib_k8s + ) + cls.lib_telemetry_ocp = KrknTelemetryOpenshift( + SafeLogger(), cls.lib_ocp + ) host = cls.lib_k8s.api_client.configuration.host - logging.disable(logging.CRITICAL) + # logging.disable(logging.CRITICAL) try: requests.get(host, timeout=2, verify=False) except ConnectTimeout: diff --git a/src/krkn_lib/tests/test_krkn_kubernetes.py b/src/krkn_lib/tests/test_krkn_kubernetes.py index f9c59051..8a1e9067 100644 --- a/src/krkn_lib/tests/test_krkn_kubernetes.py +++ b/src/krkn_lib/tests/test_krkn_kubernetes.py @@ -1,4 +1,3 @@ -import datetime import os import random import re @@ -13,7 +12,6 @@ from jinja2 import Environment, FileSystemLoader from krkn_lib.k8s import ApiRequestException, KrknKubernetes from krkn_lib.tests import BaseTest -from krkn_lib.utils import SafeLogger class KrknKubernetesTests(BaseTest): @@ -71,11 +69,6 @@ def test_get_kubeconfig_path(self): test_kubeconfig = test.read() self.assertEqual(test_kubeconfig, kubeconfig_str) - def test_get_cluster_version(self): - # TODO - result = self.lib_k8s.get_clusterversion_string() - self.assertIsNotNone(result) - def test_list_namespaces(self): # test all namespaces result = self.lib_k8s.list_namespaces() @@ -657,16 +650,6 @@ def test_get_nodes_infos(self): self.assertTrue(node.kernel_version) self.assertTrue(node.kubelet_version) - def test_get_cluster_infrastructure(self): - resp = self.lib_k8s.get_cluster_infrastructure() - self.assertTrue(resp) - self.assertEqual(resp, "Unknown") - - def test_get_cluster_network_plugins(self): - resp = self.lib_k8s.get_cluster_network_plugins() - self.assertTrue(len(resp) > 0) - self.assertEqual(resp[0], "Unknown") - def test_download_folder_from_pod_as_archive(self): workdir_basepath = os.getenv("TEST_WORKDIR") workdir = self.get_random_string(10) @@ -747,94 +730,6 @@ def test_is_pod_running(self): result = self.lib_k8s.is_pod_running("do_not_exist", "do_not_exist") self.assertFalse(result) - def test_filter_must_gather_ocp_log_folder(self): - # 1694473200 12 Sep 2023 01:00 AM GMT+2 - # 1694476200 12 Sep 2023 01:50 AM 
GMT+2 - filter_patterns = [ - # Sep 9 11:20:36.123425532 - r"(\w{3}\s\d{1,2}\s\d{2}:\d{2}:\d{2}\.\d+).+", - # kinit 2023/09/15 11:20:36 log - r"kinit (\d+/\d+/\d+\s\d{2}:\d{2}:\d{2})\s+", - # 2023-09-15T11:20:36.123425532Z log - r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).+", - ] - dst_dir = f"/tmp/filtered_logs.{datetime.datetime.now().timestamp()}" - os.mkdir(dst_dir) - self.lib_k8s.filter_must_gather_ocp_log_folder( - "src/testdata/must-gather", - dst_dir, - 1694473200, - 1694476200, - "*.log", - 3, - filter_patterns, - ) - - test_file_1 = os.path.join( - dst_dir, - "namespaces.openshift-monitoring.pods." - "openshift-state-metrics-" - "78df59b4d5-mjvhd.openshift-state-metrics." - "openshift-state-metrics.logs.current.log", - ) - - test_file_2 = os.path.join( - dst_dir, - "namespaces.openshift-monitoring.pods.prometheus-" - "k8s-0.prometheus.prometheus.logs.current.log", - ) - self.assertTrue(os.path.exists(test_file_1)) - self.assertTrue(os.path.exists(test_file_2)) - - test_file_1_lines = 0 - test_file_2_lines = 0 - - with open(test_file_1) as file: - for _ in file: - test_file_1_lines += 1 - - with open(test_file_2) as file: - for _ in file: - test_file_2_lines += 1 - - self.assertEqual(test_file_1_lines, 7) - self.assertEqual(test_file_2_lines, 4) - - def _test_collect_filter_archive_ocp_logs(self): - ################################################## - # This test is incomplete and inactive because # - # we don't have an OCP Integration env yet. # - ################################################## - - base_dir = os.path.join( - "/tmp", f"log-filter-test.{datetime.datetime.now().timestamp()}" - ) - work_dir = os.path.join(base_dir, "must-gather") - dst_dir = os.path.join(base_dir, "filtered_logs") - os.mkdir(base_dir) - os.mkdir(work_dir) - os.mkdir(dst_dir) - start = 1695218445 - end = 1695219345 - filter_patterns = [ - # Sep 9 11:20:36.123425532 - r"(\w{3}\s\d{1,2}\s\d{2}:\d{2}:\d{2}\.\d+).+", - # kinit 2023/09/15 11:20:36 log - r"kinit (\d+/\d+/\d+\s\d{2}:\d{2}:\d{2})\s+", - # 2023-09-15T11:20:36.123425532Z log - r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).+", - ] - self.lib_k8s.collect_filter_archive_ocp_logs( - work_dir, - dst_dir, - "/home/tsebasti/OCP/auth/kubeconfig", - start, - end, - filter_patterns, - 5, - SafeLogger(), - ) - if __name__ == "__main__": unittest.main() diff --git a/src/krkn_lib/tests/test_krkn_openshift.py b/src/krkn_lib/tests/test_krkn_openshift.py new file mode 100644 index 00000000..8bc5311f --- /dev/null +++ b/src/krkn_lib/tests/test_krkn_openshift.py @@ -0,0 +1,115 @@ +import os +from datetime import datetime + +from krkn_lib.tests import BaseTest +from krkn_lib.utils import SafeLogger + + +class KrknOpenshiftTest(BaseTest): + def test_get_cluster_version_string(self): + # TODO + result = self.lib_ocp.get_clusterversion_string() + self.assertIsNotNone(result) + + def test_get_cluster_network_plugins(self): + resp = self.lib_ocp.get_cluster_network_plugins() + self.assertTrue(len(resp) > 0) + self.assertEqual(resp[0], "Unknown") + + def test_get_cloud_infrastructure(self): + resp = self.lib_ocp.get_cloud_infrastructure() + self.assertTrue(resp) + self.assertEqual(resp, "Unknown") + + def test_filter_must_gather_ocp_log_folder(self): + # 1694473200 12 Sep 2023 01:00 AM GMT+2 + # 1694476200 12 Sep 2023 01:50 AM GMT+2 + filter_patterns = [ + # Sep 9 11:20:36.123425532 + r"(\w{3}\s\d{1,2}\s\d{2}:\d{2}:\d{2}\.\d+).+", + # kinit 2023/09/15 11:20:36 log + r"kinit (\d+/\d+/\d+\s\d{2}:\d{2}:\d{2})\s+", + # 2023-09-15T11:20:36.123425532Z log + 
r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).+", + ] + dst_dir = f"/tmp/filtered_logs.{datetime.now().timestamp()}" + os.mkdir(dst_dir) + self.lib_ocp.filter_must_gather_ocp_log_folder( + "src/testdata/must-gather", + dst_dir, + 1694473200, + 1694476200, + "*.log", + 3, + filter_patterns, + ) + + test_file_1 = os.path.join( + dst_dir, + "namespaces.openshift-monitoring.pods." + "openshift-state-metrics-" + "78df59b4d5-mjvhd.openshift-state-metrics." + "openshift-state-metrics.logs.current.log", + ) + + test_file_2 = os.path.join( + dst_dir, + "namespaces.openshift-monitoring.pods.prometheus-" + "k8s-0.prometheus.prometheus.logs.current.log", + ) + self.assertTrue(os.path.exists(test_file_1)) + self.assertTrue(os.path.exists(test_file_2)) + + test_file_1_lines = 0 + test_file_2_lines = 0 + + with open(test_file_1) as file: + for _ in file: + test_file_1_lines += 1 + + with open(test_file_2) as file: + for _ in file: + test_file_2_lines += 1 + + self.assertEqual(test_file_1_lines, 7) + self.assertEqual(test_file_2_lines, 4) + + def test_get_cluster_version(self): + # TODO + result = self.lib_ocp.get_clusterversion_string() + self.assertIsNotNone(result) + + def _test_collect_filter_archive_ocp_logs(self): + ################################################## + # This test is incomplete and inactive because # + # we don't have an OCP Integration env yet. # + ################################################## + + base_dir = os.path.join( + "/tmp", f"log-filter-test.{datetime.now().timestamp()}" + ) + work_dir = os.path.join(base_dir, "must-gather") + dst_dir = os.path.join(base_dir, "filtered_logs") + os.mkdir(base_dir) + os.mkdir(work_dir) + os.mkdir(dst_dir) + start = 1695218445 + end = 1695219345 + filter_patterns = [ + # Sep 9 11:20:36.123425532 + r"(\w{3}\s\d{1,2}\s\d{2}:\d{2}:\d{2}\.\d+).+", + # kinit 2023/09/15 11:20:36 log + r"kinit (\d+/\d+/\d+\s\d{2}:\d{2}:\d{2})\s+", + # 2023-09-15T11:20:36.123425532Z log + r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).+", + ] + self.lib_ocp.collect_filter_archive_ocp_logs( + work_dir, + dst_dir, + "/home/tsebasti/OCP/auth/kubeconfig", + start, + end, + filter_patterns, + 5, + SafeLogger(), + ) diff --git a/src/krkn_lib/tests/test_krkn_telemetry.py b/src/krkn_lib/tests/test_krkn_telemetry_kubernetes.py similarity index 80% rename from src/krkn_lib/tests/test_krkn_telemetry.py rename to src/krkn_lib/tests/test_krkn_telemetry_kubernetes.py index 8aca78b8..ba3fe9cc 100644 --- a/src/krkn_lib/tests/test_krkn_telemetry.py +++ b/src/krkn_lib/tests/test_krkn_telemetry_kubernetes.py @@ -1,5 +1,4 @@ import base64 -import datetime import os import tempfile import time @@ -9,19 +8,18 @@ import boto3 import yaml -from krkn_lib.k8s import KrknKubernetes from krkn_lib.models.telemetry import ChaosRunTelemetry, ScenarioTelemetry -from krkn_lib.telemetry import KrknTelemetry from krkn_lib.tests import BaseTest -from krkn_lib.utils import SafeLogger -class KrknTelemetryTests(BaseTest): +class KrknTelemetryKubernetesTests(BaseTest): def test_set_parameters_base_64(self): file_path = "src/testdata/input.yaml" scenario_telemetry = ScenarioTelemetry() - self.lib_telemetry.set_parameters_base64(scenario_telemetry, file_path) + self.lib_telemetry_k8s.set_parameters_base64( + scenario_telemetry, file_path + ) with open(file_path, "rb") as file_stream: input_file_data_orig = file_stream.read().decode("utf-8") self.assertIsNotNone(input_file_data_orig) @@ -180,7 +178,7 @@ def test_upload_download_prometheus(self): archive_part_size=100, download_path=test_workdir, ) - 
self.lib_telemetry.put_ocp_prometheus_data( + self.lib_telemetry_k8s.put_prometheus_data( telemetry_config, file_list, bucket_folder ) @@ -199,8 +197,9 @@ def test_collect_cluster_metadata(self): self.assertEqual( len(chaos_telemetry.kubernetes_objects_count.keys()), 0 ) - self.assertEqual(len(chaos_telemetry.network_plugins), 0) - self.lib_telemetry.collect_cluster_metadata(chaos_telemetry) + self.assertEqual(len(chaos_telemetry.network_plugins), 1) + self.assertEqual(chaos_telemetry.network_plugins[0], "Unknown") + self.lib_telemetry_k8s.collect_cluster_metadata(chaos_telemetry) self.assertNotEqual(len(chaos_telemetry.node_infos), 0) self.assertNotEqual(chaos_telemetry.node_count, 0) self.assertNotEqual( @@ -221,9 +220,9 @@ def test_send_telemetry(self): "enabled": True, } chaos_telemetry = ChaosRunTelemetry() - self.lib_telemetry.collect_cluster_metadata(chaos_telemetry) + self.lib_telemetry_k8s.collect_cluster_metadata(chaos_telemetry) try: - self.lib_telemetry.send_telemetry( + self.lib_telemetry_k8s.send_telemetry( telemetry_config, request_id, chaos_telemetry ) except Exception as e: @@ -258,14 +257,14 @@ def test_get_bucket_url_for_filename(self): file.flush() try: - url = self.lib_telemetry.get_bucket_url_for_filename( + url = self.lib_telemetry_k8s.get_bucket_url_for_filename( f'{telemetry_config["api_url"]}/presigned-url', test_workdir, os.path.basename(file.name), telemetry_config["username"], telemetry_config["password"], ) - self.lib_telemetry.put_file_to_url(url, file.name) + self.lib_telemetry_k8s.put_file_to_url(url, file.name) bucket_name = os.getenv("BUCKET_NAME") s3 = boto3.client("s3") @@ -284,49 +283,6 @@ def test_get_bucket_url_for_filename(self): except Exception as e: self.assertTrue(False, f"test failed with exception: {str(e)}") - def _test_put_ocp_logs(self): - ################################################## - # This test is incomplete and inactive because # - # we don't have an OCP Integration env yet. 
# - ################################################## - - krkn_kubernetes = KrknKubernetes( - kubeconfig_path="~/OCP/auth/kubeconfig" - ) - safe_logger = SafeLogger() - krkn_telemetry = KrknTelemetry(safe_logger, krkn_kubernetes) - - test_workdir = "/tmp/" - telemetry_config = { - "username": os.getenv("API_USER"), - "password": os.getenv("API_PASSWORD"), - "max_retries": 5, - "api_url": "https://ulnmf9xv7j.execute-api.us-west-2.amazonaws.com/production", # NOQA - "backup_threads": 6, - "archive_path": test_workdir, - "prometheus_backup": "True", - "enabled": True, - "logs_backup": True, - "logs_filter_patterns": [ - # Sep 9 11:20:36.123425532 - r"(\w{3}\s\d{1,2}\s\d{2}:\d{2}:\d{2}\.\d+).+", - # kinit 2023/09/15 11:20:36 log - r"kinit (\d+/\d+/\d+\s\d{2}:\d{2}:\d{2})\s+", - # 2023-09-15T11:20:36.123425532Z log - r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).+", - ], - } - now = datetime.datetime.now() - - ten_minutes_ago = now - datetime.timedelta(minutes=10) - ten_minutes_fwd = now + datetime.timedelta(minutes=10) - krkn_telemetry.put_ocp_logs( - f"test-must-gather-{str(uuid.uuid1())}", - telemetry_config, - int(ten_minutes_ago.timestamp()), - int(ten_minutes_fwd.timestamp()), - ) - if __name__ == "__main__": unittest.main() diff --git a/src/krkn_lib/tests/test_krkn_telemetry_openshift.py b/src/krkn_lib/tests/test_krkn_telemetry_openshift.py new file mode 100644 index 00000000..d1c4bd2b --- /dev/null +++ b/src/krkn_lib/tests/test_krkn_telemetry_openshift.py @@ -0,0 +1,51 @@ +import datetime +import os +import uuid + +from krkn_lib.ocp import KrknOpenshift +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift +from krkn_lib.tests import BaseTest +from krkn_lib.utils import SafeLogger + + +class KrknTelemetryOpenshiftTests(BaseTest): + def _test_put_ocp_logs(self): + ################################################## + # This test is incomplete and inactive because # + # we don't have an OCP Integration env yet. # + ################################################## + + krkn_ocp = KrknOpenshift(kubeconfig_path="~/OCP/auth/kubeconfig") + safe_logger = SafeLogger() + krkn_telemetry_ocp = KrknTelemetryOpenshift(safe_logger, krkn_ocp) + + test_workdir = "/tmp/" + telemetry_config = { + "username": os.getenv("API_USER"), + "password": os.getenv("API_PASSWORD"), + "max_retries": 5, + "api_url": "https://ulnmf9xv7j.execute-api.us-west-2.amazonaws.com/production", # NOQA + "backup_threads": 6, + "archive_path": test_workdir, + "prometheus_backup": "True", + "enabled": True, + "logs_backup": True, + "logs_filter_patterns": [ + # Sep 9 11:20:36.123425532 + r"(\w{3}\s\d{1,2}\s\d{2}:\d{2}:\d{2}\.\d+).+", + # kinit 2023/09/15 11:20:36 log + r"kinit (\d+/\d+/\d+\s\d{2}:\d{2}:\d{2})\s+", + # 2023-09-15T11:20:36.123425532Z log + r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z).+", + ], + } + now = datetime.datetime.now() + + ten_minutes_ago = now - datetime.timedelta(minutes=10) + ten_minutes_fwd = now + datetime.timedelta(minutes=10) + krkn_telemetry_ocp.put_ocp_logs( + f"test-must-gather-{str(uuid.uuid1())}", + telemetry_config, + int(ten_minutes_ago.timestamp()), + int(ten_minutes_fwd.timestamp()), + )
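
Usage note: below is a minimal sketch of how the refactored hierarchy introduced by this diff is meant to be wired together, assuming the new package layout (krkn_lib.ocp, krkn_lib.telemetry.ocp) and a reachable cluster. The kubeconfig location mirrors the one used in base_test.py, and only classes and methods that appear in this diff are called.

from kubernetes import config

from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.ocp import KrknOpenshift
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import SafeLogger

# KrknOpenshift extends KrknKubernetes, so all the generic Kubernetes
# primitives remain available next to the OpenShift-specific methods
ocp = KrknOpenshift(config.KUBE_CONFIG_DEFAULT_LOCATION)
telemetry = KrknTelemetryOpenshift(SafeLogger(), ocp)

# cloud_infrastructure and network_plugins are now filled by the OCP
# subclass; the base class collects only generic cluster metadata
chaos_telemetry = ChaosRunTelemetry()
telemetry.collect_cluster_metadata(chaos_telemetry)
print(chaos_telemetry.cloud_infrastructure, chaos_telemetry.network_plugins)

This split keeps the vanilla Kubernetes code path free of OpenShift API calls: get_prometheus_pod_data() now takes explicit pod/container/namespace coordinates, while get_ocp_prometheus_data() simply pins the openshift-monitoring defaults on top of it.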