Add VM Scale Management Steps #861

Merged · 1 commit · Aug 14, 2024
Add VM Scale Management Steps
ebattat committed Aug 9, 2024
commit 7371a41c14a82728f8b8f7f5486863d66f38227b
40 changes: 34 additions & 6 deletions benchmark_runner/common/oc/oc.py
@@ -832,6 +832,23 @@ def _get_vm_name(self, vm_name: str, namespace: str = environment_variables.envi
except Exception as err:
raise VMNameNotExist(vm_name=vm_name)

@typechecked
def _get_all_vm_names(self, namespace: str = environment_variables.environment_variables_dict['namespace']):
"""
This method returns a list of VM names in the given namespace.

:param namespace: str, the namespace to look for VMs in. Defaults to the namespace in environment_variables_dict.
:return: list of VM names or an empty list if an error occurs
"""
namespace_option = f'-n {namespace}' if namespace else ''
command = f"{self.__cli} get {namespace_option} vm -o jsonpath='{{.items[*].metadata.name}}'"
try:
vm_names = self.run(command)
return vm_names.split() if vm_names else []
except Exception:
return []


@typechecked
def vm_exists(self, vm_name: str, namespace: str = environment_variables.environment_variables_dict['namespace']):
"""
@@ -926,8 +943,8 @@ def wait_for_vm_status(self, vm_name: str = '', status: VMStatus = VMStatus.Stop
current_wait_time += OC.SLEEP_TIME
raise VMStateTimeout(vm_name=vm_name, state=status)

def wait_for_vm_login(self, vm_name: str = '', node_ip: str = '', vm_node_port: str = '',
timeout: int = SHORT_TIMEOUT):
def wait_for_vm_ssh(self, vm_name: str = '', node_ip: str = '', vm_node_port: str = '',
timeout: int = SHORT_TIMEOUT):
"""
This method waits for VM to be accessible via ssh login
:param vm_name:
@@ -938,14 +955,14 @@ def wait_for_vm_login(self, vm_name: str = '', node_ip: str = '', vm_node_port:
"""
current_wait_time = 0
while timeout <= 0 or current_wait_time <= timeout:
check_vm_login = f"""if [ "$(ssh -o 'BatchMode=yes' -o ConnectTimeout=1 root@{node_ip} -p {vm_node_port} 2>&1|egrep 'denied|verification failed')" ]; then echo 'True'; else echo 'False'; fi"""
result = self.run(check_vm_login)
check_vm_ssh = f"""if [ "$(ssh -o 'BatchMode=yes' -o ConnectTimeout=1 root@{node_ip} -p {vm_node_port} 2>&1|egrep 'denied|verification failed')" ]; then echo 'True'; else echo 'False'; fi"""
result = self.run(check_vm_ssh)
if result == 'True':
return True
# sleep for x seconds
time.sleep(OC.SLEEP_TIME)
current_wait_time += OC.SLEEP_TIME
raise VMStateTimeout(vm_name=vm_name, state='login')
raise VMStateTimeout(vm_name=vm_name, state='ssh')

@logger_time_stamp
def get_vm_node(self, vm_name: str, namespace: str = environment_variables.environment_variables_dict['namespace']):
@@ -956,7 +973,18 @@ def get_vm_node(self, vm_name: str, namespace: str = environment_variables.envir
:return:
"""
namespace = f'-n {namespace}' if namespace else ''
return self.run(f"{self.__cli} get vmi {vm_name} {namespace} -o jsonpath={{.metadata.labels.'kubevirt\.io/nodeName'}}")
command = f"{self.__cli} get vmi {vm_name} {namespace} -o jsonpath={{.metadata.labels.'kubevirt\\.io/nodeName'}}"

try:
result = self.run(command)
if result and "NotFound" not in result:
return result.strip()
return None
except Exception as e:
# Log the exception details if necessary
print(f"Error occurred: {e}")
return None


@typechecked
@logger_time_stamp
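For orientation, a minimal usage sketch of the two helpers changed above. The OC construction and namespace are hypothetical; the empty-list fallback of _get_all_vm_names and the None return of get_vm_node are taken from the diff.

    # Hypothetical usage; how OC is constructed varies elsewhere in benchmark_runner.
    oc = OC()  # assumption: the real constructor may require arguments
    namespace = 'benchmark-runner'  # hypothetical namespace
    for vm_name in oc._get_all_vm_names(namespace=namespace):
        node = oc.get_vm_node(vm_name=vm_name, namespace=namespace)
        if node is None:
            print(f'{vm_name}: VMI not found or not scheduled yet')
        else:
            print(f'{vm_name} is running on node {node}')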
2 changes: 1 addition & 1 deletion benchmark_runner/common/oc/oc_exceptions.py
@@ -99,7 +99,7 @@ def __init__(self, workload):
class VMStateTimeout(OCError):
"""This exception indicates timeout for VM state """
def __init__(self, vm_name, state):
self.message = f'VM: {vm_name} does not reach to start: {state}'
self.message = f'VM: {vm_name} does not reach to state: {state}'
super(VMStateTimeout, self).__init__(self.message)


4 changes: 4 additions & 0 deletions benchmark_runner/main/environment_variables.py
@@ -84,6 +84,10 @@ def __init__(self):

# windows url
self._environment_variables_dict['windows_url'] = EnvironmentVariables.get_env('WINDOWS_URL', '')
# Delete all resources before and after the run, default True
self._environment_variables_dict['delete_all'] = EnvironmentVariables.get_boolean_from_environment('DELETE_ALL', True)
# Verification only, without running or deleting any resources, default False
self._environment_variables_dict['verification_only'] = EnvironmentVariables.get_boolean_from_environment('VERIFICATION_ONLY', False)

# default parameter - change only if needed
# Parameters below related to 'run_workload()'
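The get_boolean_from_environment helper itself is not shown in this diff; a minimal sketch of what such a parser typically looks like, under that assumption:

    import os

    def get_boolean_from_environment(key: str, default: bool) -> bool:
        # Sketch only: the real helper lives in environment_variables.py.
        # Unset or empty variables fall back to the default.
        value = os.environ.get(key, '').strip().lower()
        if not value:
            return default
        return value in ('true', '1', 'yes', 'y')

    # Hypothetical: drive a verification-only pass that leaves VMs in place.
    # os.environ['VERIFICATION_ONLY'] = 'True'
    # os.environ['DELETE_ALL'] = 'False'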
141 changes: 96 additions & 45 deletions benchmark_runner/workloads/bootstorm_vm.py
@@ -5,6 +5,7 @@

from benchmark_runner.common.logger.logger_time_stamp import logger_time_stamp, logger
from benchmark_runner.common.elasticsearch.elasticsearch_exceptions import ElasticSearchDataNotUploaded
from benchmark_runner.workloads.workloads_exceptions import MissingVMs
from benchmark_runner.workloads.workloads_operations import WorkloadsOperations
from benchmark_runner.common.oc.oc import VMStatus

@@ -66,23 +67,33 @@ def _set_bootstorm_vm_start_time(self, vm_name: str = ''):
self._bootstorm_start_time[vm_name] = time.time()

@logger_time_stamp
def _get_bootstorm_vm_elapsed_time(self, vm_name: str):
def _ssh_vm(self, vm_name: str):
"""
Verify ssh into VM and return vm node in success or False if failed
@return:
"""
self._virtctl.expose_vm(vm_name=vm_name)
# wait till vm ssh login
if self._oc.get_vm_node(vm_name=vm_name):
vm_node = self._oc.get_vm_node(vm_name=vm_name)
if vm_node:
node_ip = self._oc.get_nodes_addresses()[vm_node]
vm_node_port = self._oc.get_exposed_vm_port(vm_name=vm_name)
if self._oc.wait_for_vm_ssh(vm_name=vm_name, node_ip=node_ip, vm_node_port=vm_node_port):
logger.info(f"Successfully ssh into VM: '{vm_name}' in Node: '{vm_node}' ")
return vm_node
return False
Member:
Make sure you capture any stderr from the ssh command, so that when it fails you can at least have some hope of figuring out what went wrong (it might simply be a network problem or a credential problem).

Collaborator Author:
I handled it in self._oc.wait_for_vm_ssh by raising VMStateTimeout

Member:
Does that actually capture the stderr from the ssh command?

Collaborator Author:
You can see the full code here

Member:
It does not appear to me that the ssh output gets reported if the ssh fails.

Collaborator Author:
We have a custom error for it:
raise VMStateTimeout(vm_name=vm_name, state='ssh')

Member:
But does it report the actual error reported by ssh? I want that to be reported to make it easier to debug and to distinguish glitches from real problems. It doesn't look like VMStateTimeout captures the reason for the error (what ssh reports on stderr).
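As a sketch of what the reviewer is asking for: run the probe through subprocess so ssh's stderr is captured and can be attached to VMStateTimeout. This is an illustration, not the repository's implementation; probe_vm_ssh is a hypothetical name.

    import subprocess

    def probe_vm_ssh(node_ip: str, vm_node_port: str):
        # Sketch: return (reachable, stderr) so a timeout error can report what
        # ssh actually said (network glitch vs. credential rejection).
        result = subprocess.run(
            ['ssh', '-o', 'BatchMode=yes', '-o', 'ConnectTimeout=1',
             f'root@{node_ip}', '-p', str(vm_node_port)],
            capture_output=True, text=True)
        # 'denied' / 'verification failed' on stderr means sshd answered, which
        # is what the existing shell check treats as success.
        reachable = 'denied' in result.stderr or 'verification failed' in result.stderr
        return reachable, result.stderr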


@logger_time_stamp
def _get_bootstorm_vm_elapsed_time(self, vm_name: str, vm_node: str) -> dict:
"""
This method returns boot elapse time for specified VM in milliseconds
@return: Dictionary with vm_name, node and its boot elapse time
Returns the boot elapse time for the specified VM in milliseconds.
@return: Dictionary with vm_name, node, and boot elapse time.
"""
vm_bootstorm_time = {}
self._virtctl.expose_vm(vm_name=vm_name)
# wait till vm login
vm_node = self._oc.get_vm_node(vm_name=vm_name)
node_ip = self._oc.get_nodes_addresses()[vm_node]
vm_node_port = self._oc.get_exposed_vm_port(vm_name=vm_name)
if self._oc.wait_for_vm_login(vm_name=vm_name, node_ip=node_ip, vm_node_port=vm_node_port):
vm_bootstorm_time['vm_name'] = vm_name
vm_bootstorm_time['node'] = vm_node
delta = time.time() - self._bootstorm_start_time[vm_name]
vm_bootstorm_time['bootstorm_time'] = round(delta, 3) * self.MILLISECONDS
return vm_bootstorm_time
if vm_node:
delta = round((time.time() - self._bootstorm_start_time[vm_name]) * self.MILLISECONDS, 3)
return {'vm_name': vm_name, 'node': vm_node, 'bootstorm_time': delta, 'vm_ssh': int(bool(vm_node)),}
return {}

def _create_vm_scale(self, vm_num: str):
"""
@@ -103,8 +114,9 @@ def _finalize_vm(self):
metric_results = self._prometheus_metrics_operation.run_prometheus_queries()
prometheus_result = self._prometheus_metrics_operation.parse_prometheus_metrics(data=metric_results)
# update total vm run time
total_run_time = self._get_bootstorm_vm_total_run_time()
self._data_dict.update({'total_run_time': total_run_time})
if not self._verification_only:
total_run_time = self._get_bootstorm_vm_total_run_time()
self._data_dict.update({'total_run_time': total_run_time})
Member:
It wouldn't be a bad idea to stick some distinctive value in there even in verification mode, such as -1, so that the schema is the same both ways.

Collaborator Author:
total_run_time will be empty when there is no input data

Member:
Understood, but that's about testing that total_run_time is handled correctly. Not that big of a deal, but think about it.

Collaborator Author:
I prefer that it be empty in Elasticsearch rather than -1; that is simpler to handle.
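For reference, a sketch of the sentinel alternative discussed here (not what this PR does): always write the field so the document schema is identical in both modes.

    # Sketch only; the PR instead omits total_run_time in verification mode.
    total_run_time = (self._get_bootstorm_vm_total_run_time()
                      if not self._verification_only else -1)
    self._data_dict.update({'total_run_time': total_run_time})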

self._data_dict.update(prometheus_result)
if self._es_host:
# upload several run results
@@ -123,14 +135,41 @@ def _run_vm(self):
self._set_bootstorm_vm_first_run_time()
self._set_bootstorm_vm_start_time(vm_name=self._vm_name)
self._virtctl.start_vm_sync(vm_name=self._vm_name)
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=self._vm_name)
self.vm_node = self._ssh_vm(vm_name=self._vm_name)
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=self._vm_name, vm_node=self.vm_node)
self._data_dict['run_artifacts_url'] = os.path.join(self._run_artifacts_url,
f'{self._get_run_artifacts_hierarchy(workload_name=self._workload_name, is_file=True)}-{self._time_stamp_format}.tar.gz')
self._finalize_vm()
self._oc.delete_vm_sync(
yaml=os.path.join(f'{self._run_artifacts_path}', f'{self._name}.yaml'),
vm_name=self._vm_name)

def _verify_vm_ssh(self):
"""
This method verifies each VM ssh login
:return:
"""
try:
vm_names = self._oc._get_all_vm_names()
Member:
It might make sense to compare this against a list of expected VM names, if that's practical here. And you certainly want to make sure that there is at least one, or you'll simply be "verifying" a null case.

Collaborator Author:
You're right; I added a MissingVMs error for when there are no running VMs at all.

if not vm_names:
raise MissingVMs
for vm_name in vm_names:
vm_node = self._ssh_vm(vm_name)
self._data_dict = {
'vm_name': vm_name,
'node': vm_node,
'vm_ssh': int(bool(vm_node)),
'run_artifacts_url': os.path.join(
self._run_artifacts_url,
f"{self._get_run_artifacts_hierarchy(workload_name=self._workload_name, is_file=True)}-{self._time_stamp_format}.tar.gz"
)
}
self._finalize_vm()
except Exception as err:
# save run artifacts logs
self.save_error_logs()
raise err
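Following the earlier review suggestion, a hypothetical sketch of checking the discovered VMs against the expected names (derived from the {workload_name}-{trunc_uuid}-{vm_num} pattern used in this file), rather than only checking for a non-empty list; this is not part of the PR:

    # Sketch: verify the discovered set against expected names, not just non-emptiness.
    expected = {f'{self._workload_name}-{self._trunc_uuid}-{i}'
                for i in range(self._scale * len(self._scale_node_list))}
    missing = expected - set(vm_names)
    if missing:
        raise MissingVMs  # the exception could be extended to carry `missing`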

def _run_vm_scale(self, vm_num: str):
"""
This method runs VMs in parallel and wait for login to be enabled
@@ -140,7 +179,8 @@ def _run_vm_scale(self, vm_num: str):
self._set_bootstorm_vm_start_time(vm_name=f'{self._workload_name}-{self._trunc_uuid}-{vm_num}')
self._virtctl.start_vm_async(vm_name=f'{self._workload_name}-{self._trunc_uuid}-{vm_num}')
self._virtctl.wait_for_vm_status(vm_name=vm_name, status=VMStatus.Running)
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=vm_name)
vm_node = self._ssh_vm(vm_name)
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=vm_name, vm_node=vm_node)
self._data_dict['run_artifacts_url'] = os.path.join(self._run_artifacts_url, f'{self._get_run_artifacts_hierarchy(workload_name=self._workload_name, is_file=True)}-scale-{self._time_stamp_format}.tar.gz')
self._finalize_vm()
except Exception as err:
@@ -203,36 +243,47 @@ def _initialize_run(self):
self._vm_name = f'{self._workload_name}-{self._trunc_uuid}'
self._kind = 'vm'
self._environment_variables_dict['kind'] = 'vm'
# create namespace
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))
if not self._verification_only:
# create namespace
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))

def run_vm_workload(self):
if not self._scale:
self._run_vm()
# scale
# verification only w/o running or deleting any resource
if self._verification_only:
self._verify_vm_ssh()
else:
first_run_time_updated = False
# create run bulks
bulks = tuple(self.split_run_bulks(iterable=range(self._scale * len(self._scale_node_list)),
limit=self._threads_limit))
# create, run and delete vms
for target in (self._create_vm_scale, self._run_vm_scale, self._stop_vm_scale, self._wait_for_stop_vm_scale,
self._delete_vm_scale, self._wait_for_delete_vm_scale):
proc = []
for bulk in bulks:
for vm_num in bulk:
# save the first run vm time
if self._run_vm_scale == target and not first_run_time_updated:
self._set_bootstorm_vm_first_run_time()
first_run_time_updated = True
p = Process(target=target, args=(str(vm_num),))
p.start()
proc.append(p)
for p in proc:
p.join()
# sleep between bulks
time.sleep(self._bulk_sleep_time)
if not self._scale:
self._run_vm()
# scale
else:
first_run_time_updated = False
# Run VMs only
if not self._delete_all:
steps = (self._create_vm_scale, self._run_vm_scale)
else:
steps = (self._create_vm_scale, self._run_vm_scale, self._stop_vm_scale,
self._wait_for_stop_vm_scale, self._delete_vm_scale, self._wait_for_delete_vm_scale)

# create run bulks
bulks = tuple(self.split_run_bulks(iterable=range(self._scale * len(self._scale_node_list)),
limit=self._threads_limit))
# create, run and delete vms
for target in steps:
proc = []
for bulk in bulks:
for vm_num in bulk:
# save the first run vm time
if self._run_vm_scale == target and not first_run_time_updated:
self._set_bootstorm_vm_first_run_time()
first_run_time_updated = True
p = Process(target=target, args=(str(vm_num),))
p.start()
proc.append(p)
for p in proc:
p.join()
# sleep between bulks
time.sleep(self._bulk_sleep_time)
proc = []

@logger_time_stamp
def run(self):
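A self-contained sketch of the bulk pattern run_vm_workload uses above. split_run_bulks is assumed to chunk the VM index range into tuples of at most `limit` items (the real helper lives elsewhere in benchmark_runner), and run_step stands in for the per-VM scale steps.

    from itertools import islice
    from multiprocessing import Process

    def split_run_bulks(iterable, limit):
        # Assumed behavior of the real helper: yield tuples of up to `limit` items.
        it = iter(iterable)
        while True:
            bulk = tuple(islice(it, limit))
            if not bulk:
                return
            yield bulk

    def run_step(vm_num: str):
        print(f'step for vm {vm_num}')  # stand-in for _create_vm_scale / _run_vm_scale

    if __name__ == '__main__':
        for bulk in split_run_bulks(range(8), limit=4):
            procs = [Process(target=run_step, args=(str(n),)) for n in bulk]
            for p in procs:
                p.start()
            for p in procs:
                p.join()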
19 changes: 11 additions & 8 deletions benchmark_runner/workloads/windows_vm.py
@@ -1,5 +1,6 @@

import os
import sys
import time
from multiprocessing import Process

@@ -24,19 +25,21 @@ def run(self):
:return:
"""
try:
self._initialize_run()
if self._run_type == 'test_ci':
self._es_index = 'windows-test-ci-results'
else:
self._es_index = 'windows-results'
# create windows dv
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
self._oc.wait_for_dv_status(status='Succeeded')
self._initialize_run()
if not self._verification_only:
# create windows dv
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
self._oc.wait_for_dv_status(status='Succeeded')
self.run_vm_workload()
# delete windows dv
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
# delete namespace
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))
if self._delete_all:
# delete windows dv
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
# delete namespace
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))
except ElasticSearchDataNotUploaded as err:
self._oc.delete_vm_sync(
yaml=os.path.join(f'{self._run_artifacts_path}', f'{self._name}.yaml'),
6 changes: 4 additions & 2 deletions benchmark_runner/workloads/workloads.py
@@ -29,14 +29,16 @@ def run(self):
workload_module = importlib.import_module(f'benchmark_runner.workloads.{workload}')

try:
self.initialize_workload()
if not self._verification_only:
self.initialize_workload()
success = True
# extract workload module and class
for cls in inspect.getmembers(workload_module, inspect.isclass):
if workload.replace('_', '').lower() == cls[0].lower():
if cls[1]().run() == False:
success = False
self.finalize_workload()
if not self._verification_only:
self.finalize_workload()
return success

except Exception as err:
9 changes: 9 additions & 0 deletions benchmark_runner/workloads/workloads_exceptions.py
@@ -48,3 +48,12 @@ class MissingRedis(BenchmarkRunnerError):
def __init__(self):
self.message = "Missing redis"
super(MissingRedis, self).__init__(self.message)


class MissingVMs(BenchmarkRunnerError):
"""
This class raises an error for missing VMs
"""
def __init__(self):
self.message = "Missing running VMs"
super(MissingVMs, self).__init__(self.message)
10 changes: 7 additions & 3 deletions benchmark_runner/workloads/workloads_operations.py
@@ -99,6 +99,8 @@ def __init__(self):
self._prometheus_snap_interval = self._environment_variables_dict.get('prometheus_snap_interval', '')
self._prometheus_metrics_operation = PrometheusMetricsOperation()
self._windows_url = self._environment_variables_dict.get('windows_url', '')
self._delete_all = self._environment_variables_dict.get('delete_all', '')
self._verification_only = self._environment_variables_dict.get('verification_only', '')
if self._windows_url:
file_name = os.path.basename(self._windows_url)
self._windows_os = os.path.splitext(file_name)[0]
@@ -455,8 +457,9 @@ def initialize_workload(self):
# Verify that Kata operator in installed for kata workloads
if '_kata' in self._workload and not self._oc.is_kata_installed():
raise KataNotInstalled(workload=self._workload)
self.delete_all()
self.clear_nodes_cache()
if self._delete_all:
self.delete_all()
self.clear_nodes_cache()
if self._odf_pvc:
self.odf_workload_verification()
self._template.generate_yamls(scale=str(self._scale), scale_nodes=self._scale_node_list, redis=self._redis, thread_limit=self._threads_limit)
@@ -475,4 +478,5 @@ def finalize_workload(self):
self.upload_run_artifacts_to_s3()
if not self._save_artifacts_local:
self.delete_local_artifacts()
self.delete_all()
if self._delete_all:
self.delete_all()