self._root_dir = root_dir
self._netreffed_testbeds = set()
self._guids_in_testbed_cache = dict()
-
- self.persist_experiment_xml()
+
+ if experiment_xml is None and root_dir is not None:
+ # Recover
+ self.load_experiment_xml()
+ self.load_execute_xml()
+ else:
+ self.persist_experiment_xml()
@property
def experiment_design_xml(self):
f.write(self._experiment_execute_xml)
f.close()
+ def load_experiment_xml(self):
+ xml_path = os.path.join(self._root_dir, "experiment-design.xml")
+ f = open(xml_path, "r")
+ self._experiment_design_xml = f.read()
+ f.close()
+
+ def load_execute_xml(self):
+ xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
+ f = open(xml_path, "r")
+ self._experiment_execute_xml = f.read()
+ f.close()
+
def trace(self, guid, trace_id, attribute='value'):
testbed = self._testbed_for_guid(guid)
if testbed != None:
def _load_testbed_proxies(self):
TYPEMAP = {
- STRING : 'get',
- INTEGER : 'getint',
- FLOAT : 'getfloat',
- BOOLEAN : 'getboolean',
+ Attribute.STRING : 'get',
+ Attribute.BOOL : 'getboolean',
+ Attribute.ENUM : 'get',
+ Attribute.DOUBLE : 'getfloat',
+ Attribute.INTEGER : 'getint',
}
+ TRANSIENT = ('Recover',)
+
# deferred import because proxy needs
# our class definitions to define proxies
import nepi.util.proxy as proxy
conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
for testbed_guid in conf.sections():
testbed_config = proxy.AccessConfiguration()
- for attr in conf.options(testbed_guid):
- testbed_config.set_attribute_value(attr,
- conf.get(testbed_guid, attr) )
-
testbed_guid = str(testbed_guid)
- conf.add_section(testbed_guid)
for attr in testbed_config.get_attribute_list():
if attr not in TRANSIENT:
getter = getattr(conf, TYPEMAP.get(
testbed_config.get_attribute_type(attr),
'get') )
testbed_config.set_attribute_value(
- testbed_guid, attr, getter(attr))
+ attr, getter(testbed_guid, attr))
def _unpersist_testbed_proxies(self):
try:
# reload perviously persisted testbed access configurations
self._load_testbed_proxies()
+ # Parse experiment xml
+ parser = XmlExperimentParser()
+ data = parser.from_xml_to_data(self._experiment_design_xml)
+
# recreate testbed proxies by reconnecting only
- self._init_testbed_controllers(recover = True)
+ self._init_testbed_controllers(data, recover = True)
# another time, for netrefs
- self._init_testbed_controllers(recover = True)
+ self._init_testbed_controllers(data, recover = True)
+
+ print >>sys.stderr, "RECOVERED"
def is_finished(self, guid):
testbed = self._testbed_for_guid(guid)
from nepi.core.factory import Factory
import sys
import getpass
+import nepi.util.environ
from nepi.util import tags, validation
from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
DeploymentConfiguration as DC, \
return factories
def _load_metadata_module(self):
- mod_name = "nepi.testbeds.%s.metadata" % (self._testbed_id.lower())
+ mod_name = nepi.util.environ.find_testbed(self._testbed_id) + ".metadata"
if not mod_name in sys.modules:
__import__(mod_name)
return sys.modules[mod_name]
else:
logger.debug("Performing %s on %s", action, guid)
perform_action(guid)
+
+ # sync
+ if runner:
+ runner.sync()
# post hook
if poststep:
# Install application
try:
self._popen_ssh_command(
- "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/installlog >&2 && false )" % \
+ "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/{install,build}log >&2 && false )" % \
{
'command' : self._replace_paths(self.install),
'home' : server.shell_escape(self.home_path),
" test -f lib/libns3.so "
" ) || ( "
# Not working, rebuild
- "wget -q -c -O pybindgen-src.zip %(pybindgen_source_url)s && " # continue, to exploit the case when it has already been dl'ed
- "wget -q -c -O pygccxml-1.0.0.zip %(pygccxml_source_url)s && "
- "wget -q -c -O passfd-src.tar.gz %(passfd_source_url)s && "
- "wget -q -c -O ns3-src.tar.gz %(ns3_source_url)s && "
+ # Archive SHA1 sums to check
+ "echo '7158877faff2254e6c094bf18e6b4283cac19137 pygccxml-1.0.0.zip' > archive_sums.txt && "
+ "echo 'ddc7c5d288e1bacb1307114878956762c5146fac pybindgen-src.zip' >> archive_sums.txt && "
+ " ( " # check existing files
+ " sha1sum -c archive_sums.txt && "
+ " test -f passfd-src.tar.gz && "
+ " test -f ns3-src.tar.gz "
+ " ) || ( " # nope? re-download
+ " rm -f pybindgen-src.zip pygccxml-1.0.0.zip passfd-src.tar.gz ns3-src.tar.gz && "
+ " wget -q -c -O pybindgen-src.zip %(pybindgen_source_url)s && " # continue, to exploit the case when it has already been dl'ed
+ " wget -q -c -O pygccxml-1.0.0.zip %(pygccxml_source_url)s && "
+ " wget -q -c -O passfd-src.tar.gz %(passfd_source_url)s && "
+ " wget -q -c -O ns3-src.tar.gz %(ns3_source_url)s && "
+ " sha1sum -c archive_sums.txt " # Check SHA1 sums when applicable
+ " ) && "
"unzip -n pybindgen-src.zip && " # Do not overwrite files, to exploit the case when it has already been built
"unzip -n pygccxml-1.0.0.zip && "
"mkdir -p ns3-src && "
raise RuntimeError("Error executing `%s': %s" % (" ".join(cmd), err))
return out
-def homepath(path, app='.nepi', mode = 0500):
+def homepath(path, app='.nepi', mode = 0500, directory = False):
home = os.environ.get('HOME')
if home is None:
home = os.path.join(os.sep, 'home', os.getlogin())
path = os.path.join(home, app, path)
- dirname = os.path.dirname(path)
+ if directory:
+ dirname = path
+ else:
+ dirname = os.path.dirname(path)
if not os.path.exists(dirname):
os.makedirs(dirname)
return path
+def find_testbed(testbed_id):
+ mod_name = None
+ # look for environment-specified testbeds
+ if 'NEPI_TESTBEDS' in os.environ:
+ try:
+ # parse testbed map
+ # split space-separated items, filter empty items
+ testbed_map = filter(bool,os.environ['NEPI_TESTBEDS'].strip().split(' '))
+ # split items, keep pairs only, build map
+ testbed_map = dict([map(str.strip,i.split(':',1)) for i in testbed_map if ':' in i])
+ except:
+ import traceback, sys
+ traceback.print_exc(file=sys.stderr)
+
+ # ignore malformed environment
+ testbed_map = {}
+
+ mod_name = testbed_map.get(testbed_id)
+
+ if mod_name is None:
+ # no explicit map, load built-in testbeds
+ mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
+
+ return mod_name
+
if self.delayed_exceptions:
typ,val,loc = self.delayed_exceptions[0]
raise typ,val,loc
+
+ def sync(self):
+ self.queue.join()
def worker(self):
while True:
import base64
import nepi.core.execute
+import nepi.util.environ
from nepi.core.attributes import AttributesMap, Attribute
from nepi.util import server, validation
from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
import tempfile
import shutil
import functools
+import os
# PROTOCOL REPLIES
OK = 0
environment_setup = (
access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
- else None
+ else ""
)
if communication == DC.ACCESS_SSH:
user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
elif mode == DC.MODE_DAEMON:
(root_dir, log_level, user, host, port, key, agent, environment_setup) = \
get_access_config_params(access_config)
- return ExperimentControllerProxy(root_dir, log_level,
+ try:
+ return ExperimentControllerProxy(root_dir, log_level,
experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
agent = agent, launch = launch,
environment_setup = environment_setup)
+ except:
+ if not launch:
+ # Maybe controller died, recover from persisted testbed information if possible
+ controller = ExperimentControllerProxy(root_dir, log_level,
+ experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
+ agent = agent, launch = True,
+ environment_setup = environment_setup)
+ controller.recover()
+ return controller
+ else:
+ raise
raise RuntimeError("Unsupported access configuration '%s'" % mode)
def create_testbed_controller(testbed_id, testbed_version, access_config):
raise RuntimeError("Unsupported access configuration '%s'" % mode)
def _build_testbed_controller(testbed_id, testbed_version):
- mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
+ mod_name = nepi.util.environ.find_testbed(testbed_id)
+
if not mod_name in sys.modules:
- __import__(mod_name)
+ try:
+ __import__(mod_name)
+ except ImportError:
+ raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
+
module = sys.modules[mod_name]
tc = module.TestbedController()
if tc.testbed_version != testbed_version:
return reply
class TestbedControllerServer(BaseServer):
- def __init__(self, root_dir, log_level, testbed_id, testbed_version):
- super(TestbedControllerServer, self).__init__(root_dir, log_level)
+ def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
+ super(TestbedControllerServer, self).__init__(root_dir, log_level,
+ environment_setup = environment_setup )
self._testbed_id = testbed_id
self._testbed_version = testbed_version
self._testbed = None
return self._testbed.get_factory_id(guid)
class ExperimentControllerServer(BaseServer):
- def __init__(self, root_dir, log_level, experiment_xml):
- super(ExperimentControllerServer, self).__init__(root_dir, log_level)
+ def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
+ super(ExperimentControllerServer, self).__init__(root_dir, log_level,
+ environment_setup = environment_setup )
self._experiment_xml = experiment_xml
self._experiment = None
raise RuntimeError("To launch a TesbedControllerServer a "
"testbed_id and testbed_version are required")
super(TestbedControllerProxy,self).__init__(
- ctor_args = (root_dir, log_level, testbed_id, testbed_version),
+ ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
root_dir = root_dir,
launch = launch, host = host, port = port, user = user,
ident_key = ident_key, agent = agent,
def __init__(self, root_dir, log_level, experiment_xml = None,
launch = True, host = None, port = None, user = None,
ident_key = None, agent = None, environment_setup = ""):
- if launch and experiment_xml is None:
- raise RuntimeError("To launch a ExperimentControllerServer a \
- xml description of the experiment is required")
super(ExperimentControllerProxy,self).__init__(
- ctor_args = (root_dir, log_level, experiment_xml),
+ ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
root_dir = root_dir,
launch = launch, host = host, port = port, user = user,
ident_key = ident_key, agent = agent,
return rv
class Server(object):
- def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
+ def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
self._root_dir = root_dir
self._stop = False
self._ctrl_sock = None
self._log_level = log_level
self._rdbuf = ""
+ self._environment_setup = environment_setup
def run(self):
try:
# was opened with 0 buffer
os.dup2(stdout.fileno(), sys.stdout.fileno())
os.dup2(stderr.fileno(), sys.stderr.fileno())
+
+ # setup environment
+ if self._environment_setup:
+ # parse environment variables and pass to child process
+ # do it by executing shell commands, in case there's some heavy setup involved
+ envproc = subprocess.Popen(
+ [ "bash", "-c",
+ "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
+ ( self._environment_setup, ) ],
+ stdin = subprocess.PIPE,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE
+ )
+ out,err = envproc.communicate()
+
+ # parse new environment
+ if out:
+ environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
+
+ # apply to current environment
+ for name, value in environment.iteritems():
+ os.environ[name] = value
+
+ # apply pythonpath
+ if 'PYTHONPATH' in environment:
+ sys.path = environment['PYTHONPATH'].split(':') + sys.path
# create control socket
self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
class ExecuteTestCase(unittest.TestCase):
def setUp(self):
sys.modules["nepi.testbeds.mock.metadata"] = mock.metadata
-
- def test_execute(self):
- instance = mock.TestbedController()
+
+ def make_mock_test(self, instance):
instance.defer_configure("fake", True)
instance.defer_create(2, "Node")
instance.defer_create(3, "Node")
instance.defer_create(7, "Application")
instance.defer_add_trace(7, "fake")
instance.defer_connect(7, "node", 2, "apps")
-
+
+ def do_presteps(self, instance):
instance.do_setup()
instance.do_create()
instance.do_connect_init()
instance.do_connect_compl()
+ instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
+
+ def test_execute(self):
+ instance = mock.TestbedController()
+
+ self.make_mock_test(instance)
+ self.do_presteps(instance)
+
instance.start()
attr_list = instance.get_attribute_list(5)
self.assertEquals(attr_list, ["test", "fake", "cross", "maxAddresses", "label"])
controller.stop()
controller.shutdown()
- def TODO_test_ssh_daemonized_all_integration(self):
+ def test_ssh_daemonized_integration(self):
# TODO: This test doesn't run because
# sys.modules["nepi.testbeds.mock"] = mock
# is not set in the ssh process
inst_root_dir = os.path.join(self.root_dir, "instance")
os.mkdir(inst_root_dir)
desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- desc.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
- desc.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
- desc.set_attribute_value(DC.USE_AGENT, True)
+ desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
+ "export PYTHONPATH=%r:%r:$PYTHONPATH ; "
+ "export NEPI_TESTBEDS='mock:mock mock2:mock2' ; " % (
+ os.path.dirname(os.path.dirname(mock.__file__)),
+ os.path.dirname(os.path.dirname(mock2.__file__)),))
xml = exp_desc.to_xml()
access_config.set_attribute_value(DC.USE_AGENT, True)
controller = proxy.create_experiment_controller(xml, access_config)
- controller.start()
- while not controller.is_finished(app.guid):
- time.sleep(0.5)
- fake_result = controller.trace(app.guid, "fake")
- comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+ try:
+ controller.start()
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ fake_result = controller.trace(app.guid, "fake")
+ comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
--- 10.0.0.2 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
"""
- self.assertTrue(fake_result.startswith(comp_result))
- controller.stop()
- controller.shutdown()
+ self.assertTrue(fake_result.startswith(comp_result))
+ finally:
+ controller.stop()
+ controller.shutdown()
if __name__ == '__main__':
unittest.main()
def setUp(self):
self.root_dir = tempfile.mkdtemp()
- self.port_base = self.port_base + 100
+ self.__class__.port_base = self.port_base + 100
def tearDown(self):
try:
instance.defer_configure("authUser", pl_user)
instance.defer_configure("authPass", pl_pwd)
instance.defer_configure("plcHost", plchost)
+ instance.defer_configure("tapPortBase", self.port_base)
return instance
def setUp(self):
self.root_dir = tempfile.mkdtemp()
- self.port_base = self.port_base + 100
+ self.__class__.port_base = self.port_base + 100
def tearDown(self):
try:
pl_desc.set_attribute_value("authUser", pl_user)
pl_desc.set_attribute_value("authPass", pl_pwd)
pl_desc.set_attribute_value("plcHost", plchost)
+ pl_desc.set_attribute_value("tapPortBase", self.port_base)
return pl_desc, exp_desc
def setUp(self):
self.root_dir = tempfile.mkdtemp()
- self.port_base = self.port_base + 100
+ self.__class__.port_base = self.port_base + 100
def tearDown(self):
try:
pl_desc.set_attribute_value("authUser", pl_user)
pl_desc.set_attribute_value("authPass", pl_pwd)
pl_desc.set_attribute_value("plcHost", plchost1)
+ pl_desc.set_attribute_value("tapPortBase", self.port_base)
pl_desc2 = exp_desc.add_testbed_description(pl_provider)
pl_desc2.set_attribute_value("homeDirectory", self.root_dir+"v2")
pl_desc2.set_attribute_value("authUser", pl_user)
pl_desc2.set_attribute_value("authPass", pl_pwd)
pl_desc2.set_attribute_value("plcHost", plchost2)
+ pl_desc2.set_attribute_value("tapPortBase", self.port_base+100)
return pl_desc, pl_desc2, exp_desc
def setUp(self):
self.root_dir = tempfile.mkdtemp()
- self.port_base = self.port_base + 100
+ self.__class__.port_base = self.port_base + 100
def tearDown(self):
try:
pl_desc.set_attribute_value("authUser", pl_user)
pl_desc.set_attribute_value("authPass", pl_pwd)
pl_desc.set_attribute_value("plcHost", plchost1)
+ pl_desc.set_attribute_value("tapPortBase", self.port_base)
pl_desc2 = exp_desc.add_testbed_description(pl_provider)
pl_desc2.set_attribute_value("homeDirectory", self.root_dir+"v2")
pl_desc2.set_attribute_value("authUser", pl_user)
pl_desc2.set_attribute_value("authPass", pl_pwd)
pl_desc2.set_attribute_value("plcHost", plchost2)
+ pl_desc.set_attribute_value("tapPortBase", self.port_base+100)
return pl_desc, pl_desc2, exp_desc
def setUp(self):
self.root_dir = tempfile.mkdtemp()
- self.port_base = self.port_base + 100
+ self.__class__.port_base = self.port_base + 100
def tearDown(self):
try:
pl_desc.set_attribute_value("authUser", pl_user)
pl_desc.set_attribute_value("authPass", pl_pwd)
pl_desc.set_attribute_value("plcHost", plchost)
+ pl_desc.set_attribute_value("tapPortBase", self.port_base)
return pl_desc, exp_desc