From 09175b79ceb4e1327f76322b98fe699ce9afe36a Mon Sep 17 00:00:00 2001 From: jensenzhang Date: Sat, 21 Jan 2017 17:17:56 +0800 Subject: [PATCH] Support parallel to speed up testing For the requirement from issue #2, specify each TestSuite as a thread to parallel execute. The known problem is the performance of docker. When run multiple mininet docker container, each container will expend too many network interfaces and make system network module busy. Currently, I cannot get 4-threads tests run successfully. Signed-off-by: jensenzhang --- bin/sdntest | 93 +++++++++++++++++++++++-- sdntest/examples/customtopo/config.json | 3 +- sdntest/examples/customtopo/main.py | 1 + sdntest/suite.py | 56 ++++++++++----- 4 files changed, 131 insertions(+), 22 deletions(-) diff --git a/bin/sdntest b/bin/sdntest index 510d60d..87ea097 100755 --- a/bin/sdntest +++ b/bin/sdntest @@ -4,7 +4,13 @@ import sys import os import json import logging +if sys.version[0] == '2': + import Queue as queue +else: + import queue from optparse import OptionParser +from math import ceil +from copy import deepcopy from sdntest.suite import TestSuite @@ -31,10 +37,14 @@ class TestRunner(object): self.options = None self.args = None self.testcase = None + self.parallel = 0 def cleanup(self): # Cleanup the environment - if self.testcase: + if type(self.testcase) == list: + for testcase in self.testcase: + testcase.kill_platform() + else: self.testcase.kill_platform() def parseArgs(self): @@ -79,6 +89,46 @@ class TestRunner(object): logger.setLevel(LEVELS[self.options.verbosity]) output.setLevel(LEVELS[self.options.verbosity]) + def partition(self, configs): + """ + Partition testcase for parallel execution. + """ + self.testcase = [] + total_repeat = configs.get('repeat', 0) + max_repeat = int(ceil(float(total_repeat) / self.parallel)) + for group in range(self.parallel): + group_configs = deepcopy(configs) + group_configs['group'] = group + 1 + repeat = min(max_repeat, total_repeat) + total_repeat -= repeat + group_configs['repeat'] = repeat + testcase = TestSuite(group_configs) + self.testcase.append(testcase) + testcase.start() + + def getException(self): + """ + Try to fetch exception from exc_pool of each testcase thread. + + return: exception + """ + if type(self.testcase) == list: + for testcase in self.testcase: + try: + exc = testcase.exc_pool.get(block=False) + except queue.Empty: + pass + else: + return exc + else: + try: + exc = self.testcase.exc_pool.get(block=False) + except queue.Empty: + pass + else: + return exc + raise queue.Empty + def begin(self): """ Start the testcase. @@ -104,10 +154,43 @@ class TestRunner(object): os.chdir(workspace) logging.basicConfig(filename='output.log') configs['workspace'] = workspace - logger.debug("Create test suite:") - logger.debug("TestSuite(%s)", configs) - self.testcase = TestSuite(configs) - self.testcase.run() + + if 'parallel' in configs.keys(): + self.parallel = configs['parallel'] + + if self.parallel > 1: + logger.debug("Parallel number: %d", self.parallel) + logger.debug("Partition for parallel") + self.partition(configs) + else: + logger.debug("Create test suite:") + logger.debug("TestSuite(%s)", configs) + self.testcase = TestSuite(configs) + self.testcase.start() + + # TODO: wait for all testcases finish + while True: + try: + exc = self.getException() + except queue.Empty: + pass + else: + raise exc + + if type(self.testcase) == list: + for testcase in self.testcase: + testcase.join(0.1) + if sum([bool(tc.isAlive()) for tc in self.testcase]): + continue + else: + break + else: + self.testcase.join(0.1) + if self.testcase.isAlive(): + continue + else: + break + if "__main__" == __name__: try: diff --git a/sdntest/examples/customtopo/config.json b/sdntest/examples/customtopo/config.json index c07c8c7..1d14898 100644 --- a/sdntest/examples/customtopo/config.json +++ b/sdntest/examples/customtopo/config.json @@ -1,5 +1,6 @@ { - "repeat": 5, + "repeat": 20, + "parallel": 4, "platform": "odl", "release": "4.4.0", "apps": "odl-nic-core-mdsal odl-nic-renderer-vtn", diff --git a/sdntest/examples/customtopo/main.py b/sdntest/examples/customtopo/main.py index 58ce85d..c3045ab 100755 --- a/sdntest/examples/customtopo/main.py +++ b/sdntest/examples/customtopo/main.py @@ -92,6 +92,7 @@ def test( controller, branch, hop, seconds): # Add host-to-host intent h2hintent( controller, host1.IP(), host2.IP() ) + sleep( 3 ) # Start ping proc = startping( host1, host2.IP(), seconds) diff --git a/sdntest/suite.py b/sdntest/suite.py index f750ff8..8d7cd36 100644 --- a/sdntest/suite.py +++ b/sdntest/suite.py @@ -5,13 +5,20 @@ import docker import logging import json +if sys.version[0] == '2': + import Queue as queue +else: + import queue from time import sleep +from threading import Thread from docker.errors import ImageNotFound from sdntest.exception import PlatformException, WorkspaceException, REASON -class TestSuite(): +class TestSuite(Thread): def __init__(self, configs): + Thread.__init__(self) + self.exc_pool = queue.Queue() self.docker = docker.from_env() self.logger = logging.getLogger("TestSuite") # container instance @@ -27,6 +34,9 @@ def __init__(self, configs): self.apps = "" self.waiting_time = 15 self.net_workflow = None + # parallel options + self.parallel = 0 + self.group = 0 # default values self.default_odl_features = ' '.join([ 'odl-openflowplugin-southbound', @@ -151,7 +161,10 @@ def bootstrap_mininet(self): **opts) self.outputcnt += 1 self.logger.info("Workflow finished: (%d/%d)", self.outputcnt, self.repeat) - outputfile = os.path.join(self.outputdir, 'output.%d.log' % self.outputcnt) + if self.parallel > 1: + outputfile = os.path.join(self.outputdir, 'output.%d.%d.log' % (self.group, self.outputcnt)) + else: + outputfile = os.path.join(self.outputdir, 'output.%d.log' % self.outputcnt) with open(outputfile, 'w') as f: f.write(workflow_output) self.logger.info("Result saved in %s", outputfile) @@ -203,21 +216,32 @@ def setup(self, configs): if 'workflow' in configs.keys(): self.net_workflow = configs['workflow'] + if 'parallel' in configs.keys(): + self.parallel = configs['parallel'] + if 'group' in configs.keys(): + self.group = configs['group'] + def run(self): """ Repeatedly execute test case. """ - self.logger.info("Starting execution...") - for i in range(self.repeat): - self.logger.info("Repeat counter: %d", i+1) - self.logger.info("Bootstrapping SDN platform...") - self.bootstrap_platform() - self.logger.info("\u2714 Bootstrapped SDN platform") - self.logger.info("Waiting for mandatory components loaded...") - sleep(self.waiting_time) - self.logger.info("Bootstrapping Mininet...") - self.bootstrap_mininet() - self.logger.info("\u2714 Mininet test finished") - self.logger.info("Cleaning up SDN platform...") - self.kill_platform() - self.logger.info("\u2714 Environment is clean") + try: + self.logger.info("Starting execution...") + for i in range(self.repeat): + self.logger.info("Repeat counter: %d", i+1) + self.logger.info("Bootstrapping SDN platform...") + self.bootstrap_platform() + self.logger.info("\u2714 Bootstrapped SDN platform") + self.logger.info("Waiting for mandatory components loaded...") + sleep(self.waiting_time) + self.logger.info("Bootstrapping Mininet...") + self.bootstrap_mininet() + self.logger.info("\u2714 Mininet test finished") + self.logger.info("Cleaning up SDN platform...") + self.kill_platform() + self.logger.info("\u2714 Environment is clean") + except Exception: + self.exc_pool.put(sys.exc_info()) + import traceback + stackTrace = traceback.format_exc() + self.logger.debug(stackTrace + "\n")