- NEPI_TESTBEDS: new environment variable that allows the specification of a mapping between testbed_id and testbed modules, for custom (eg: mock) testbeds
- environment_setup now does work with locally daemonized testbeds (it wasn't working)
from nepi.core.factory import Factory
import sys
import getpass
+import nepi.util.environ
from nepi.util import tags, validation
from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
DeploymentConfiguration as DC, \
return factories
def _load_metadata_module(self):
- mod_name = "nepi.testbeds.%s.metadata" % (self._testbed_id.lower())
+ mod_name = nepi.util.environ.find_testbed(self._testbed_id) + ".metadata"
if not mod_name in sys.modules:
__import__(mod_name)
return sys.modules[mod_name]
return path
+def find_testbed(testbed_id):
+ mod_name = None
+ # look for environment-specified testbeds
+ if 'NEPI_TESTBEDS' in os.environ:
+ try:
+ # parse testbed map
+ # split space-separated items, filter empty items
+ testbed_map = filter(bool,os.environ['NEPI_TESTBEDS'].strip().split(' '))
+ # split items, keep pairs only, build map
+ testbed_map = dict([map(str.strip,i.split(':',1)) for i in testbed_map if ':' in i])
+ except:
+ import traceback, sys
+ traceback.print_exc(file=sys.stderr)
+
+ # ignore malformed environment
+ testbed_map = {}
+
+ mod_name = testbed_map.get(testbed_id)
+
+ if mod_name is None:
+ # no explicit map, load built-in testbeds
+ mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
+
+ return mod_name
+
import base64
import nepi.core.execute
+import nepi.util.environ
from nepi.core.attributes import AttributesMap, Attribute
from nepi.util import server, validation
from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
import tempfile
import shutil
import functools
+import os
# PROTOCOL REPLIES
OK = 0
raise RuntimeError("Unsupported access configuration '%s'" % mode)
def _build_testbed_controller(testbed_id, testbed_version):
- mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
+ mod_name = nepi.util.environ.find_testbed(testbed_id)
+
if not mod_name in sys.modules:
- __import__(mod_name)
+ try:
+ __import__(mod_name)
+ except ImportError:
+ raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
+
module = sys.modules[mod_name]
tc = module.TestbedController()
if tc.testbed_version != testbed_version:
return reply
class TestbedControllerServer(BaseServer):
- def __init__(self, root_dir, log_level, testbed_id, testbed_version):
- super(TestbedControllerServer, self).__init__(root_dir, log_level)
+ def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
+ super(TestbedControllerServer, self).__init__(root_dir, log_level,
+ environment_setup = environment_setup )
self._testbed_id = testbed_id
self._testbed_version = testbed_version
self._testbed = None
return self._testbed.get_factory_id(guid)
class ExperimentControllerServer(BaseServer):
- def __init__(self, root_dir, log_level, experiment_xml):
- super(ExperimentControllerServer, self).__init__(root_dir, log_level)
+ def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
+ super(ExperimentControllerServer, self).__init__(root_dir, log_level,
+ environment_setup = environment_setup )
self._experiment_xml = experiment_xml
self._experiment = None
raise RuntimeError("To launch a TesbedControllerServer a "
"testbed_id and testbed_version are required")
super(TestbedControllerProxy,self).__init__(
- ctor_args = (root_dir, log_level, testbed_id, testbed_version),
+ ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
root_dir = root_dir,
launch = launch, host = host, port = port, user = user,
ident_key = ident_key, agent = agent,
raise RuntimeError("To launch a ExperimentControllerServer a \
xml description of the experiment is required")
super(ExperimentControllerProxy,self).__init__(
- ctor_args = (root_dir, log_level, experiment_xml),
+ ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
root_dir = root_dir,
launch = launch, host = host, port = port, user = user,
ident_key = ident_key, agent = agent,
return rv
class Server(object):
- def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
+ def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = None):
self._root_dir = root_dir
self._stop = False
self._ctrl_sock = None
self._log_level = log_level
self._rdbuf = ""
+ self._environment_setup = environment_setup
def run(self):
try:
# was opened with 0 buffer
os.dup2(stdout.fileno(), sys.stdout.fileno())
os.dup2(stderr.fileno(), sys.stderr.fileno())
+
+ # setup environment
+ if self._environment_setup:
+ # parse environment variables and pass to child process
+ # do it by executing shell commands, in case there's some heavy setup involved
+ envproc = subprocess.Popen(
+ [ "bash", "-c",
+ "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
+ ( self._environment_setup, ) ],
+ stdin = subprocess.PIPE,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE
+ )
+ out,err = envproc.communicate()
+
+ # parse new environment
+ environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
+
+ # apply to current environment
+ for name, value in environment.iteritems():
+ os.environ[name] = value
+
+ # apply pythonpath
+ if 'PYTHONPATH' in environment:
+ sys.path = environment['PYTHONPATH'].split(':') + sys.path
# create control socket
self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
class ExecuteTestCase(unittest.TestCase):
def setUp(self):
sys.modules["nepi.testbeds.mock.metadata"] = mock.metadata
-
- def test_execute(self):
- instance = mock.TestbedController()
+
+ def make_mock_test(self, instance):
instance.defer_configure("fake", True)
instance.defer_create(2, "Node")
instance.defer_create(3, "Node")
instance.defer_create(7, "Application")
instance.defer_add_trace(7, "fake")
instance.defer_connect(7, "node", 2, "apps")
-
+
+ def do_presteps(self, instance):
instance.do_setup()
instance.do_create()
instance.do_connect_init()
instance.do_connect_compl()
+ instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
+
+ def test_execute(self):
+ instance = mock.TestbedController()
+
+ self.make_mock_test(instance)
+ self.do_presteps(instance)
+
instance.start()
attr_list = instance.get_attribute_list(5)
self.assertEquals(attr_list, ["test", "fake", "cross", "maxAddresses", "label"])
controller.stop()
controller.shutdown()
- def TODO_test_ssh_daemonized_all_integration(self):
+ def test_ssh_daemonized_integration(self):
# TODO: This test doesn't run because
# sys.modules["nepi.testbeds.mock"] = mock
# is not set in the ssh process
inst_root_dir = os.path.join(self.root_dir, "instance")
os.mkdir(inst_root_dir)
desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- desc.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
- desc.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
- desc.set_attribute_value(DC.USE_AGENT, True)
+ desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
+ "export PYTHONPATH=%r:%r:$PYTHONPATH ; "
+ "export NEPI_TESTBEDS='mock:mock mock2:mock2' ; " % (
+ os.path.dirname(os.path.dirname(mock.__file__)),
+ os.path.dirname(os.path.dirname(mock2.__file__)),))
xml = exp_desc.to_xml()
access_config.set_attribute_value(DC.USE_AGENT, True)
controller = proxy.create_experiment_controller(xml, access_config)
- controller.start()
- while not controller.is_finished(app.guid):
- time.sleep(0.5)
- fake_result = controller.trace(app.guid, "fake")
- comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+ try:
+ controller.start()
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ fake_result = controller.trace(app.guid, "fake")
+ comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
--- 10.0.0.2 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
"""
- self.assertTrue(fake_result.startswith(comp_result))
- controller.stop()
- controller.shutdown()
+ self.assertTrue(fake_result.startswith(comp_result))
+ finally:
+ controller.stop()
+ controller.shutdown()
if __name__ == '__main__':
unittest.main()