From faafd2bc8264a71da449c735c73922dfe772eeed Mon Sep 17 00:00:00 2001
From: Krishna Harsha Voora
Date: Wed, 26 Apr 2023 09:36:24 +0530
Subject: [PATCH 1/4] Add new parameter to deploy on targeted worker nodes

This PR allows users to target worker nodes, identified via unique labels,
and deploy the workload only on the targeted nodes.

Signed-off-by: Krishna Harsha Voora
---
 rosa-hypershift/rosa-hosted-wrapper.py | 98 +++++++++++++++++++-------
 1 file changed, 71 insertions(+), 27 deletions(-)

diff --git a/rosa-hypershift/rosa-hosted-wrapper.py b/rosa-hypershift/rosa-hosted-wrapper.py
index 0bbebc4..b7c0f37 100644
--- a/rosa-hypershift/rosa-hosted-wrapper.py
+++ b/rosa-hypershift/rosa-hosted-wrapper.py
@@ -570,7 +570,7 @@ def _namespace_wait(kubeconfig, cluster_id, cluster_name, type):
     return 0
 
 
-def _build_cluster(ocm_cmnd, rosa_cmnd, cluster_name_seed, must_gather_all, mgmt_cluster_name, provision_shard, create_vpc, vpc_info, wait_time, cluster_load, load_duration, job_iterations, worker_nodes, my_path, my_uuid, my_inc, es, es_url, index, index_retry, mgmt_kubeconfig, sc_kubeconfig, all_clusters_installed, svc_cluster_name, oidc_config_id):
+def _build_cluster(ocm_cmnd, rosa_cmnd, cluster_name_seed, must_gather_all, mgmt_cluster_name, provision_shard, create_vpc, vpc_info, wait_time, cluster_load, load_duration, job_iterations, worker_nodes, my_path, my_uuid, my_inc, es, es_url, index, index_retry, mgmt_kubeconfig, sc_kubeconfig, all_clusters_installed, svc_cluster_name, oidc_config_id, workload_type, kube_burner_version, e2e_git_details, git_branch, worker_label):
     # pass that dir as the cwd to subproccess
     cluster_path = my_path + "/" + cluster_name_seed + "-" + str(my_inc).zfill(4)
     os.mkdir(cluster_path)
@@ -666,7 +666,7 @@ def _build_cluster(ocm_cmnd, rosa_cmnd, cluster_name_seed, must_gather_all, mgmt
             logging.info('Waiting for all clusters to be installed to start e2e-benchmarking execution on %s' % cluster_name)
             all_clusters_installed.wait()
             logging.info('Executing e2e-benchmarking to add load on the cluster %s with %s nodes during %s with %d iterations' % (cluster_name, str(worker_nodes), load_duration, job_iterations))
-            _cluster_load(kubeconfig, cluster_path, cluster_name, mgmt_cluster_name, svc_cluster_name, load_duration, job_iterations, es_url)
+            _cluster_load(kubeconfig, cluster_path, cluster_name, mgmt_cluster_name, svc_cluster_name, load_duration, job_iterations, es_url, mgmt_kubeconfig, workload_type, kube_burner_version, e2e_git_details, git_branch, worker_label)
             logging.info('Finished execution of e2e-benchmarking workload on %s' % cluster_name)
     if must_gather_all or process.returncode != 0:
         random_sleep = random.randint(60, 300)
@@ -742,45 +742,38 @@ def _wait_for_workers(kubeconfig, worker_nodes, wait_time, cluster_name):
     return ready_nodes
 
 
-def _cluster_load(kubeconfig, my_path, hosted_cluster_name, mgmt_cluster_name, svc_cluster_name, load_duration, jobs, es_url):
+def _cluster_load(kubeconfig, my_path, hosted_cluster_name, mgmt_cluster_name, svc_cluster_name, load_duration, jobs, es_url, mgmt_kubeconfig, workload_type, kube_burner_version, e2e_git_details, git_branch, worker_label):
     load_env = os.environ.copy()
     load_env["KUBECONFIG"] = kubeconfig
-    logging.info('Cloning e2e-benchmarking repo https://github.com/cloud-bulldozer/e2e-benchmarking.git')
-    Repo.clone_from("https://github.com/cloud-bulldozer/e2e-benchmarking.git", my_path + '/e2e-benchmarking')
-    url = 'https://github.com/cloud-bulldozer/kube-burner/releases/download/v1.3/kube-burner-1.3-Linux-x86_64.tar.gz'
-    dest = my_path + "/kube-burner-1.3-Linux-x86_64.tar.gz"
+    load_env["MC_KUBECONFIG"] = mgmt_kubeconfig
+    logging.info('Cloning e2e-benchmarking repo %s', e2e_git_details)
+    Repo.clone_from(e2e_git_details, my_path + '/e2e-benchmarking', branch=git_branch)
+    url = "https://github.com/cloud-bulldozer/kube-burner/releases/download/v" + kube_burner_version + "/kube-burner-" + kube_burner_version + "-Linux-x86_64.tar.gz"
+    dest = my_path + "/kube-burner-" + kube_burner_version + "-Linux-x86_64.tar.gz"
     response = requests.get(url, stream=True)
     with open(dest, 'wb') as f:
         f.write(response.raw.read())
-    untar_kb = ["tar", "xzf", my_path + "/kube-burner-1.3-Linux-x86_64.tar.gz", "-C", my_path + "/"]
+    untar_kb = ["tar", "xzf", my_path + "/kube-burner-" + kube_burner_version + "-Linux-x86_64.tar.gz", "-C", my_path + "/"]
     logging.debug(untar_kb)
     untar_kb_process = subprocess.Popen(untar_kb, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, env=load_env)
     stdout, stderr = untar_kb_process.communicate()
     if untar_kb_process.returncode != 0:
-        logging.error("Failed to untar Kube-burner from %s to %s" % (my_path + "/kube-burner-1.3-Linux-x86_64.tar.gz", my_path + "/kube-burner"))
+        logging.error("Failed to untar Kube-burner from %s to %s" % (my_path + "/kube-burner-" + kube_burner_version + "-Linux-x86_64.tar.gz", my_path + "/kube-burner"))
         return 1
     os.chmod(my_path + '/kube-burner', 0o777)
-    os.chdir(my_path + '/e2e-benchmarking/workloads/kube-burner')
-    load_env["JOB_ITERATIONS"] = str(jobs)
-    load_env["CHURN"] = "true"
-    load_env["CHURN_DURATION"] = load_duration
-    load_env["CHURN_PERCENT"] = "10"
-    load_env["CHURN_DELAY"] = "30s"
-    load_env["JOB_TIMEOUT"] = "6h"
-    load_env["CLEANUP_WHEN_FINISH"] = "true"
-    load_env["INDEXING"] = "true"
-    load_env["HYPERSHIFT"] = "true"
-    load_env["MGMT_CLUSTER_NAME"] = mgmt_cluster_name + "-.*"
-    load_env["SVC_CLUSTER_NAME"] = svc_cluster_name + "-.*"
-    load_env["HOSTED_CLUSTER_NS"] = ".*-" + hosted_cluster_name
+    os.chdir(my_path + '/e2e-benchmarking/workloads/kube-burner-ocp-wrapper')
+    load_env["ITERATIONS"] = str(jobs)
+    load_env["EXTRA_FLAGS"] = "--churn-duration=" + load_duration + " --churn-percent=10 --churn-delay=30s"
     if es_url is not None:
         load_env["ES_SERVER"] = es_url
     load_env["LOG_LEVEL"] = "debug"
-    load_env["WORKLOAD"] = "cluster-density-ms"
-    load_env["JOB_PAUSE"] = str(randrange(100, 1000)) + "s"
+    load_env["WORKLOAD"] = str(workload_type)
     load_env["KUBE_DIR"] = my_path
-    load_command = ["./run.sh"]
+    load_command = ["./run.sh"]
+    if worker_label is not None:
+        logging.info('Updating the metrics yaml files with label %s information' % worker_label)
+        _run_on_labels(my_path, cluster_name, worker_label, workload_type)
     logging.debug(load_command)
     load_log = open(my_path + '/cluster_load.log', 'w')
     if not force_terminate:
@@ -789,6 +782,25 @@ def _cluster_load(kubeconfig, my_path, hosted_cluster_name, mgmt_cluster_name, s
     else:
         logging.warning("Not starting e2e on cluster %s after capturing Ctrl-C" % hosted_cluster_name)
 
+def _run_on_labels(my_path, worker_label, workload_type):
+    logging.info('Extracting the %s workload metric files from kube-burner' % workload_type)
+    os.chdir(my_path + '/e2e-benchmarking/workloads/kube-burner-ocp-wrapper')
+    extract_command = ["kube-burner ", "ocp ", worker_label, " --extract"]
+    logging.debug(extract_command)
+    extract_process = subprocess.Popen(extract_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
+    extract_stdout, extract_stderr = extract_process.communicate()
+    if process.returncode != 0:
+        logging.error('unable to execute `kube-burner ocp %s`' % extract_command)
+        logging.error(extract_stderr)
+        exit(1)
+    else:
+        logging.info('`%s` execution OK' % extract_command)
+        logging.debug(extract_stdout)
+
+    logging.info('Search and replace the label %s' % worker_label)
+    replace_command = ["sed -i .bak ", "'s/worker/", worker_label, "/g'", " $(find . -type f -name '*.yml"]
+    # sed -i '.bak' 's/worker/kube-burner/g' $(find . -type f -name '*.yml')
+    replace_command = []
 
 def _get_must_gather(cluster_path, cluster_name):
     myenv = os.environ.copy()
@@ -1002,6 +1014,7 @@ def _cleanup_cluster(rosa_cmnd, cluster_name, mgmt_cluster_name, my_path, my_uui
     metadata["timestamp"] = datetime.datetime.utcnow().isoformat()
     es_ignored_metadata = ""
     common._index_result(es, index, metadata, es_ignored_metadata, index_retry)
+
 
 
 def main():
@@ -1129,6 +1142,37 @@ def main():
         '--oidc-config-id',
         type=str,
         help='Pass a custom oidc config id to use for the oidc provider. NOTE: this is not deleted on cleanup')
+    parser.add_argument(
+        '--workload-type',
+        type=str,
+        help="Pass the workload type: cluster-density, cluster-density-v2, cluster-density-ms",
+        default="cluster-density-ms"
+    )
+    parser.add_argument(
+        '--kube-burner-version',
+        type=str,
+        help='Kube-burner version; if none provided, defaults to 1.5',
+        default='1.5'
+    )
+    parser.add_argument(
+        '--e2e-git-details',
+        type=str,
+        help='Supply the e2e-benchmarking Git URL',
+        default="https://github.com/cloud-bulldozer/e2e-benchmarking.git"
+    )
+    parser.add_argument(
+        '--git-branch',
+        type=str,
+        help='Specify the desired branch of the corresponding Git repository',
+        default='master'
+    )
+
+    parser.add_argument(
+        '--worker-label',
+        type=str,
+        help='Run the kube-burner workload only on nodes with this label'
+    )
+
     # Delete following parameter and code when default security group wont be used
     parser.add_argument(
         '--manually-cleanup-secgroups',
@@ -1376,7 +1420,7 @@ def main():
             vpc_info = vpcs[(loop_counter - 1)]
             logging.debug("Creating cluster on VPC %s, with subnets: %s" % (vpc_info[0], vpc_info[1]))
         try:
-            thread = threading.Thread(target=_build_cluster, args=(ocm_cmnd, rosa_cmnd, cluster_name_seed, args.must_gather_all, args.mgmt_cluster, mgmt_metadata['provision_shard'], args.create_vpc, vpc_info, args.workers_wait_time, args.add_cluster_load, args.cluster_load_duration, jobs, workers, my_path, my_uuid, loop_counter, es, args.es_url, args.es_index, args.es_index_retry, mgmt_kubeconfig_path, sc_kubeconfig_path, all_clusters_installed, args.service_cluster, oidc_config_id))
+            thread = threading.Thread(target=_build_cluster, args=(ocm_cmnd, rosa_cmnd, cluster_name_seed, args.must_gather_all, args.mgmt_cluster, mgmt_metadata['provision_shard'], args.create_vpc, vpc_info, args.workers_wait_time, args.add_cluster_load, args.cluster_load_duration, jobs, workers, my_path, my_uuid, loop_counter, es, args.es_url, args.es_index, args.es_index_retry, mgmt_kubeconfig_path, sc_kubeconfig_path, all_clusters_installed, args.service_cluster, oidc_config_id, args.workload_type, args.kube_burner_version, args.e2e_git_details, args.git_branch, worker_label))
         except Exception as err:
             logging.error(err)
         cluster_thread_list.append(thread)
@@ -1491,4 +1535,4 @@ def main():
 
 
 if __name__ == '__main__':
-    sys.exit(main())
+    sys.exit(main())
\ No newline at end of file

From 5757b5373db20123fb636be65f744cca922ddabc Mon Sep 17 00:00:00 2001
From: Krishna Harsha Voora
Date: Wed, 26 Apr 2023 19:01:32 +0530
Subject: [PATCH 2/4] Update the logic to identify and replace labels

Signed-off-by: Krishna Harsha Voora
---
 rosa-hypershift/rosa-hosted-wrapper.py | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/rosa-hypershift/rosa-hosted-wrapper.py b/rosa-hypershift/rosa-hosted-wrapper.py
index b7c0f37..633fed9 100644
--- a/rosa-hypershift/rosa-hosted-wrapper.py
+++ b/rosa-hypershift/rosa-hosted-wrapper.py
@@ -773,7 +773,7 @@ def _cluster_load(kubeconfig, my_path, hosted_cluster_name, mgmt_cluster_name, s
     load_command = ["./run.sh"]
     if worker_label is not None:
         logging.info('Updating the metrics yaml files with label %s information' % worker_label)
-        _run_on_labels(my_path, cluster_name, worker_label, workload_type)
+        _run_on_labels(my_path, worker_label, workload_type)
     logging.debug(load_command)
     load_log = open(my_path + '/cluster_load.log', 'w')
     if not force_terminate:
@@ -783,13 +783,15 @@ def _cluster_load(kubeconfig, my_path, hosted_cluster_name, mgmt_cluster_name, s
         logging.warning("Not starting e2e on cluster %s after capturing Ctrl-C" % hosted_cluster_name)
 
 def _run_on_labels(my_path, worker_label, workload_type):
-    logging.info('Extracting the %s workload metric files from kube-burner' % workload_type)
+    default_label = "worker"
+    logging.info('Extracting the %s workload files from kube-burner' % workload_type)
     os.chdir(my_path + '/e2e-benchmarking/workloads/kube-burner-ocp-wrapper')
     extract_command = ["kube-burner ", "ocp ", worker_label, " --extract"]
     logging.debug(extract_command)
     extract_process = subprocess.Popen(extract_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
     extract_stdout, extract_stderr = extract_process.communicate()
-    if process.returncode != 0:
+
+    if extract_process.returncode != 0:
         logging.error('unable to execute `kube-burner ocp %s`' % extract_command)
         logging.error(extract_stderr)
         exit(1)
@@ -797,10 +799,16 @@ def _run_on_labels(my_path, worker_label, workload_type):
         logging.info('`%s` execution OK' % extract_command)
         logging.debug(extract_stdout)
 
-    logging.info('Search and replace the label %s' % worker_label)
-    replace_command = ["sed -i .bak ", "'s/worker/", worker_label, "/g'", " $(find . -type f -name '*.yml"]
-    # sed -i '.bak' 's/worker/kube-burner/g' $(find . -type f -name '*.yml')
-    replace_command = []
+    logging.info('Search and replace the label %s' % worker_label)
+    for root, dirs, filename in os.walk('.'):
+        for files in filename:
+            if files.endswith(".yml") or files.endswith(".yaml"):
+                with open(os.path.join(root, files), 'r+') as f:
+                    contents = f.read()
+                    f.seek(0)
+                    updated_contents = contents.replace(default_label, workload_type)
+                    f.write(updated_contents)
+                    f.close()
 
 def _get_must_gather(cluster_path, cluster_name):
     myenv = os.environ.copy()

From 561ad80d960ddff7195c1111463b1256a5a14be4 Mon Sep 17 00:00:00 2001
From: Krishna Harsha Voora
Date: Wed, 26 Apr 2023 21:32:19 +0530
Subject: [PATCH 3/4] Fix minor nits

Signed-off-by: Krishna Harsha Voora
---
 rosa-hypershift/rosa-hosted-wrapper.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/rosa-hypershift/rosa-hosted-wrapper.py b/rosa-hypershift/rosa-hosted-wrapper.py
index 79cf4d6..8118f29 100644
--- a/rosa-hypershift/rosa-hosted-wrapper.py
+++ b/rosa-hypershift/rosa-hosted-wrapper.py
@@ -786,13 +786,13 @@ def _run_on_labels(my_path, worker_label, workload_type):
     default_label = "worker"
     logging.info('Extracting the %s workload files from kube-burner' % workload_type)
     os.chdir(my_path + '/e2e-benchmarking/workloads/kube-burner-ocp-wrapper')
-    extract_command = ["kube-burner ", "ocp ", worker_label, " --extract"]
+    extract_command = ["kube-burner ", "ocp ", workload_type, " --extract"]
     logging.debug(extract_command)
     extract_process = subprocess.Popen(extract_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
     extract_stdout, extract_stderr = extract_process.communicate()
 
     if extract_process.returncode != 0:
-        logging.error('unable to execute `kube-burner ocp %s`' % extract_command)
+        logging.error('unable to execute `%s`' % extract_command)
         logging.error(extract_stderr)
         exit(1)
     else:
@@ -806,7 +806,7 @@ def _run_on_labels(my_path, worker_label, workload_type):
                 with open(os.path.join(root, files), 'r+') as f:
                     contents = f.read()
                     f.seek(0)
-                    updated_contents = contents.replace(default_label, workload_type)
+                    updated_contents = contents.replace(default_label, worker_label)
                     f.write(updated_contents)
                     f.close()
 
@@ -1427,7 +1427,7 @@ def main():
             vpc_info = vpcs[(loop_counter - 1)]
             logging.debug("Creating cluster on VPC %s, with subnets: %s" % (vpc_info[0], vpc_info[1]))
         try:
-            thread = threading.Thread(target=_build_cluster, args=(ocm_cmnd, rosa_cmnd, cluster_name_seed, args.must_gather_all, args.mgmt_cluster, mgmt_metadata['provision_shard'], args.create_vpc, vpc_info, args.workers_wait_time, args.add_cluster_load, args.cluster_load_duration, jobs, workers, my_path, my_uuid, loop_counter, es, args.es_url, args.es_index, args.es_index_retry, mgmt_kubeconfig_path, sc_kubeconfig_path, all_clusters_installed, args.service_cluster, oidc_config_id, args.workload_type, args.kube_burner_version, args.e2e_git_details, args.git_branch, worker_label))
+            thread = threading.Thread(target=_build_cluster, args=(ocm_cmnd, rosa_cmnd, cluster_name_seed, args.must_gather_all, args.mgmt_cluster, mgmt_metadata['provision_shard'], args.create_vpc, vpc_info, args.workers_wait_time, args.add_cluster_load, args.cluster_load_duration, jobs, workers, my_path, my_uuid, loop_counter, es, args.es_url, args.es_index, args.es_index_retry, mgmt_kubeconfig_path, sc_kubeconfig_path, all_clusters_installed, args.service_cluster, oidc_config_id, args.workload_type, args.kube_burner_version, args.e2e_git_details, args.git_branch, args.worker_label))
         except Exception as err:
             logging.error(err)
         cluster_thread_list.append(thread)

From a3e30d6fb3ddcdb1c3499bd6ff671b3ce5c3c695 Mon Sep 17 00:00:00 2001
From: Krishna Harsha Voora
Date: Mon, 15 May 2023 23:54:07 +0530
Subject: [PATCH 4/4] Fix spacing

Signed-off-by: Krishna Harsha Voora
---
 rosa-hypershift/rosa-hosted-wrapper.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/rosa-hypershift/rosa-hosted-wrapper.py b/rosa-hypershift/rosa-hosted-wrapper.py
index 8124440..8c1b261 100644
--- a/rosa-hypershift/rosa-hosted-wrapper.py
+++ b/rosa-hypershift/rosa-hosted-wrapper.py
@@ -851,18 +851,18 @@ def _run_on_labels(my_path, worker_label, workload_type):
     default_label = "worker"
     logging.info('Extracting the %s workload files from kube-burner' % workload_type)
     os.chdir(my_path + '/e2e-benchmarking/workloads/kube-burner-ocp-wrapper')
-    extract_command = ["kube-burner ", "ocp ", workload_type, " --extract"]
+    extract_command = ["kube-burner", "ocp", workload_type, "--extract"]
     logging.debug(extract_command)
     extract_process = subprocess.Popen(extract_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
-    extract_stdout, extract_stderr = extract_process.communicate()
+    stdout, stderr = extract_process.communicate()
 
     if extract_process.returncode != 0:
         logging.error('unable to execute `%s`' % extract_command)
-        logging.error(extract_stderr)
+        logging.error(stderr)
         exit(1)
     else:
         logging.info('`%s` execution OK' % extract_command)
-        logging.debug(extract_stdout)
+        logging.debug(stdout)
 
     logging.info('Search and replace the label %s' % worker_label)
     for root, dirs, filename in os.walk('.'):
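
For reference, the node-targeting flow that this series converges on can be exercised on its own: extract the kube-burner workload files with `kube-burner ocp <workload> --extract` (the same command the patch runs) and rewrite the default `worker` node label to the user-supplied label in every extracted YAML file. The sketch below is a minimal standalone version of that logic under stated assumptions; the function name `relabel_workload_files`, the use of `subprocess.run`, and the example label `kube-burner` are illustrative, not part of the patch.

#!/usr/bin/env python3
# Minimal standalone sketch of the label-targeting step added by this series.
# Assumptions (not from the patch): kube-burner is on PATH and the current
# working directory is e2e-benchmarking/workloads/kube-burner-ocp-wrapper.
import logging
import os
import subprocess
import sys


def relabel_workload_files(workload_type, worker_label, default_label="worker"):
    """Extract the kube-burner workload files and retarget them from the
    default 'worker' node label to the user-supplied label."""
    extract_command = ["kube-burner", "ocp", workload_type, "--extract"]
    logging.debug(extract_command)
    result = subprocess.run(extract_command, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, universal_newlines=True)
    if result.returncode != 0:
        logging.error("unable to execute `%s`", " ".join(extract_command))
        logging.error(result.stderr)
        sys.exit(1)
    logging.info("`%s` execution OK", " ".join(extract_command))
    # Rewrite the node label in every extracted YAML file.
    for root, _dirs, filenames in os.walk("."):
        for name in filenames:
            if name.endswith((".yml", ".yaml")):
                path = os.path.join(root, name)
                with open(path) as f:
                    contents = f.read()
                with open(path, "w") as f:
                    f.write(contents.replace(default_label, worker_label))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Example: target nodes labelled "kube-burner" for the cluster-density-ms workload.
    relabel_workload_files("cluster-density-ms", "kube-burner")

Rewriting each file in write mode, rather than seeking back over an 'r+' handle as the patch does, avoids leaving stale trailing bytes behind when the replacement label is shorter than "worker".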