Skip to content

Commit

Permalink
Support parallel to speed up testing
Browse files Browse the repository at this point in the history
For the requirement from issue #2, specify each TestSuite
as a thread to parallel execute.

The known problem is the performance of docker. When run
multiple mininet docker container, each container will
expend too many network interfaces and make system network
module busy. Currently, I cannot get 4-threads tests run
successfully.

Signed-off-by: jensenzhang <[email protected]>
  • Loading branch information
fno2010 committed Jan 21, 2017
1 parent 6617a1f commit 09175b7
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 22 deletions.
93 changes: 88 additions & 5 deletions bin/sdntest
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import sys
import os
import json
import logging
if sys.version[0] == '2':
import Queue as queue
else:
import queue
from optparse import OptionParser
from math import ceil
from copy import deepcopy

from sdntest.suite import TestSuite

Expand All @@ -31,10 +37,14 @@ class TestRunner(object):
self.options = None
self.args = None
self.testcase = None
self.parallel = 0

def cleanup(self):
# Cleanup the environment
if self.testcase:
if type(self.testcase) == list:
for testcase in self.testcase:
testcase.kill_platform()
else:
self.testcase.kill_platform()

def parseArgs(self):
Expand Down Expand Up @@ -79,6 +89,46 @@ class TestRunner(object):
logger.setLevel(LEVELS[self.options.verbosity])
output.setLevel(LEVELS[self.options.verbosity])

def partition(self, configs):
"""
Partition testcase for parallel execution.
"""
self.testcase = []
total_repeat = configs.get('repeat', 0)
max_repeat = int(ceil(float(total_repeat) / self.parallel))
for group in range(self.parallel):
group_configs = deepcopy(configs)
group_configs['group'] = group + 1
repeat = min(max_repeat, total_repeat)
total_repeat -= repeat
group_configs['repeat'] = repeat
testcase = TestSuite(group_configs)
self.testcase.append(testcase)
testcase.start()

def getException(self):
"""
Try to fetch exception from exc_pool of each testcase thread.
return: exception
"""
if type(self.testcase) == list:
for testcase in self.testcase:
try:
exc = testcase.exc_pool.get(block=False)
except queue.Empty:
pass
else:
return exc
else:
try:
exc = self.testcase.exc_pool.get(block=False)
except queue.Empty:
pass
else:
return exc
raise queue.Empty

def begin(self):
"""
Start the testcase.
Expand All @@ -104,10 +154,43 @@ class TestRunner(object):
os.chdir(workspace)
logging.basicConfig(filename='output.log')
configs['workspace'] = workspace
logger.debug("Create test suite:")
logger.debug("TestSuite(%s)", configs)
self.testcase = TestSuite(configs)
self.testcase.run()

if 'parallel' in configs.keys():
self.parallel = configs['parallel']

if self.parallel > 1:
logger.debug("Parallel number: %d", self.parallel)
logger.debug("Partition for parallel")
self.partition(configs)
else:
logger.debug("Create test suite:")
logger.debug("TestSuite(%s)", configs)
self.testcase = TestSuite(configs)
self.testcase.start()

# TODO: wait for all testcases finish
while True:
try:
exc = self.getException()
except queue.Empty:
pass
else:
raise exc

if type(self.testcase) == list:
for testcase in self.testcase:
testcase.join(0.1)
if sum([bool(tc.isAlive()) for tc in self.testcase]):
continue
else:
break
else:
self.testcase.join(0.1)
if self.testcase.isAlive():
continue
else:
break


if "__main__" == __name__:
try:
Expand Down
3 changes: 2 additions & 1 deletion sdntest/examples/customtopo/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"repeat": 5,
"repeat": 20,
"parallel": 4,
"platform": "odl",
"release": "4.4.0",
"apps": "odl-nic-core-mdsal odl-nic-renderer-vtn",
Expand Down
1 change: 1 addition & 0 deletions sdntest/examples/customtopo/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def test( controller, branch, hop, seconds):
# Add host-to-host intent
h2hintent( controller, host1.IP(), host2.IP() )

sleep( 3 )
# Start ping
proc = startping( host1, host2.IP(), seconds)

Expand Down
56 changes: 40 additions & 16 deletions sdntest/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
import docker
import logging
import json
if sys.version[0] == '2':
import Queue as queue
else:
import queue
from time import sleep
from threading import Thread
from docker.errors import ImageNotFound
from sdntest.exception import PlatformException, WorkspaceException, REASON

class TestSuite():
class TestSuite(Thread):

def __init__(self, configs):
Thread.__init__(self)
self.exc_pool = queue.Queue()
self.docker = docker.from_env()
self.logger = logging.getLogger("TestSuite")
# container instance
Expand All @@ -27,6 +34,9 @@ def __init__(self, configs):
self.apps = ""
self.waiting_time = 15
self.net_workflow = None
# parallel options
self.parallel = 0
self.group = 0
# default values
self.default_odl_features = ' '.join([
'odl-openflowplugin-southbound',
Expand Down Expand Up @@ -151,7 +161,10 @@ def bootstrap_mininet(self):
**opts)
self.outputcnt += 1
self.logger.info("Workflow finished: (%d/%d)", self.outputcnt, self.repeat)
outputfile = os.path.join(self.outputdir, 'output.%d.log' % self.outputcnt)
if self.parallel > 1:
outputfile = os.path.join(self.outputdir, 'output.%d.%d.log' % (self.group, self.outputcnt))
else:
outputfile = os.path.join(self.outputdir, 'output.%d.log' % self.outputcnt)
with open(outputfile, 'w') as f:
f.write(workflow_output)
self.logger.info("Result saved in %s", outputfile)
Expand Down Expand Up @@ -203,21 +216,32 @@ def setup(self, configs):
if 'workflow' in configs.keys():
self.net_workflow = configs['workflow']

if 'parallel' in configs.keys():
self.parallel = configs['parallel']
if 'group' in configs.keys():
self.group = configs['group']

def run(self):
"""
Repeatedly execute test case.
"""
self.logger.info("Starting execution...")
for i in range(self.repeat):
self.logger.info("Repeat counter: %d", i+1)
self.logger.info("Bootstrapping SDN platform...")
self.bootstrap_platform()
self.logger.info("\u2714 Bootstrapped SDN platform")
self.logger.info("Waiting for mandatory components loaded...")
sleep(self.waiting_time)
self.logger.info("Bootstrapping Mininet...")
self.bootstrap_mininet()
self.logger.info("\u2714 Mininet test finished")
self.logger.info("Cleaning up SDN platform...")
self.kill_platform()
self.logger.info("\u2714 Environment is clean")
try:
self.logger.info("Starting execution...")
for i in range(self.repeat):
self.logger.info("Repeat counter: %d", i+1)
self.logger.info("Bootstrapping SDN platform...")
self.bootstrap_platform()
self.logger.info("\u2714 Bootstrapped SDN platform")
self.logger.info("Waiting for mandatory components loaded...")
sleep(self.waiting_time)
self.logger.info("Bootstrapping Mininet...")
self.bootstrap_mininet()
self.logger.info("\u2714 Mininet test finished")
self.logger.info("Cleaning up SDN platform...")
self.kill_platform()
self.logger.info("\u2714 Environment is clean")
except Exception:
self.exc_pool.put(sys.exc_info())
import traceback
stackTrace = traceback.format_exc()
self.logger.debug(stackTrace + "\n")

0 comments on commit 09175b7

Please sign in to comment.