SSH daemonization test fix, along with environment setup fixes.
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 22 Jul 2011 14:49:34 +0000 (16:49 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 22 Jul 2011 14:49:34 +0000 (16:49 +0200)
 - NEPI_TESTBEDS: new environment variable that allows the specification of a mapping between testbed_id and testbed modules, for custom (eg: mock) testbeds
 - environment_setup now does work with locally daemonized testbeds (it wasn't working)

src/nepi/core/metadata.py
src/nepi/util/environ.py
src/nepi/util/proxy.py
src/nepi/util/server.py
test/core/execute.py
test/core/integration.py

index d84ccbd..523484e 100644 (file)
@@ -6,6 +6,7 @@ from nepi.core.connector import ConnectorType
 from nepi.core.factory import Factory
 import sys
 import getpass
+import nepi.util.environ
 from nepi.util import tags, validation
 from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
         DeploymentConfiguration as DC, \
@@ -503,7 +504,7 @@ class Metadata(object):
         return factories
 
     def _load_metadata_module(self):
-        mod_name = "nepi.testbeds.%s.metadata" % (self._testbed_id.lower())
+        mod_name = nepi.util.environ.find_testbed(self._testbed_id) + ".metadata"
         if not mod_name in sys.modules:
             __import__(mod_name)
         return sys.modules[mod_name]
index 3426ec6..8a2a90d 100644 (file)
@@ -69,4 +69,29 @@ def homepath(path, app='.nepi', mode = 0500):
     
     return path
 
+def find_testbed(testbed_id):
+    mod_name = None
     
+    # look for environment-specified testbeds
+    if 'NEPI_TESTBEDS' in os.environ:
+        try:
+            # parse testbed map
+            #   split space-separated items, filter empty items
+            testbed_map = filter(bool,os.environ['NEPI_TESTBEDS'].strip().split(' '))
+            #   split items, keep pairs only, build map
+            testbed_map = dict([map(str.strip,i.split(':',1)) for i in testbed_map if ':' in i])
+        except:
+            import traceback, sys
+            traceback.print_exc(file=sys.stderr)
+            
+            # ignore malformed environment
+            testbed_map = {}
+        
+        mod_name = testbed_map.get(testbed_id)
+    
+    if mod_name is None:
+        # no explicit map, load built-in testbeds
+        mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
+
+    return mod_name
+
index 6c4db32..c0c7bcc 100644 (file)
@@ -3,6 +3,7 @@
 
 import base64
 import nepi.core.execute
+import nepi.util.environ
 from nepi.core.attributes import AttributesMap, Attribute
 from nepi.util import server, validation
 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
@@ -13,6 +14,7 @@ import time
 import tempfile
 import shutil
 import functools
+import os
 
 # PROTOCOL REPLIES
 OK = 0
@@ -233,9 +235,14 @@ def create_testbed_controller(testbed_id, testbed_version, access_config):
     raise RuntimeError("Unsupported access configuration '%s'" % mode)
 
 def _build_testbed_controller(testbed_id, testbed_version):
-    mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
+    mod_name = nepi.util.environ.find_testbed(testbed_id)
+    
     if not mod_name in sys.modules:
-        __import__(mod_name)
+        try:
+            __import__(mod_name)
+        except ImportError:
+            raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
+    
     module = sys.modules[mod_name]
     tc = module.TestbedController()
     if tc.testbed_version != testbed_version:
@@ -443,8 +450,9 @@ class BaseServer(server.Server):
         return reply
 
 class TestbedControllerServer(BaseServer):
-    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
-        super(TestbedControllerServer, self).__init__(root_dir, log_level)
+    def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
+        super(TestbedControllerServer, self).__init__(root_dir, log_level, 
+            environment_setup = environment_setup )
         self._testbed_id = testbed_id
         self._testbed_version = testbed_version
         self._testbed = None
@@ -666,8 +674,9 @@ class TestbedControllerServer(BaseServer):
         return self._testbed.get_factory_id(guid)
 
 class ExperimentControllerServer(BaseServer):
-    def __init__(self, root_dir, log_level, experiment_xml):
-        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
+    def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
+        super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
+            environment_setup = environment_setup )
         self._experiment_xml = experiment_xml
         self._experiment = None
 
@@ -1001,7 +1010,7 @@ class TestbedControllerProxy(BaseProxy):
             raise RuntimeError("To launch a TesbedControllerServer a "
                     "testbed_id and testbed_version are required")
         super(TestbedControllerProxy,self).__init__(
-            ctor_args = (root_dir, log_level, testbed_id, testbed_version),
+            ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
             root_dir = root_dir,
             launch = launch, host = host, port = port, user = user,
             ident_key = ident_key, agent = agent, 
@@ -1030,7 +1039,7 @@ class ExperimentControllerProxy(BaseProxy):
             raise RuntimeError("To launch a ExperimentControllerServer a \
                     xml description of the experiment is required")
         super(ExperimentControllerProxy,self).__init__(
-            ctor_args = (root_dir, log_level, experiment_xml),
+            ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
             root_dir = root_dir,
             launch = launch, host = host, port = port, user = user,
             ident_key = ident_key, agent = agent, 
index 92212f7..5cc50da 100644 (file)
@@ -75,12 +75,13 @@ def eintr_retry(func):
     return rv
 
 class Server(object):
-    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
+    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = None):
         self._root_dir = root_dir
         self._stop = False
         self._ctrl_sock = None
         self._log_level = log_level
         self._rdbuf = ""
+        self._environment_setup = environment_setup
 
     def run(self):
         try:
@@ -161,6 +162,31 @@ class Server(object):
         # was opened with 0 buffer
         os.dup2(stdout.fileno(), sys.stdout.fileno())
         os.dup2(stderr.fileno(), sys.stderr.fileno())
+        
+        # setup environment
+        if self._environment_setup:
+            # parse environment variables and pass to child process
+            # do it by executing shell commands, in case there's some heavy setup involved
+            envproc = subprocess.Popen(
+                [ "bash", "-c", 
+                    "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
+                        ( self._environment_setup, ) ],
+                stdin = subprocess.PIPE, 
+                stdout = subprocess.PIPE,
+                stderr = subprocess.PIPE
+            )
+            out,err = envproc.communicate()
+
+            # parse new environment
+            environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
+            
+            # apply to current environment
+            for name, value in environment.iteritems():
+                os.environ[name] = value
+            
+            # apply pythonpath
+            if 'PYTHONPATH' in environment:
+                sys.path = environment['PYTHONPATH'].split(':') + sys.path
 
         # create control socket
         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
index bfb1b4b..de8de1b 100755 (executable)
@@ -13,9 +13,8 @@ import unittest
 class ExecuteTestCase(unittest.TestCase):
     def setUp(self):
         sys.modules["nepi.testbeds.mock.metadata"] = mock.metadata
-
-    def test_execute(self):
-        instance = mock.TestbedController()
+    
+    def make_mock_test(self, instance):
         instance.defer_configure("fake", True)
         instance.defer_create(2, "Node")
         instance.defer_create(3, "Node")
@@ -30,12 +29,22 @@ class ExecuteTestCase(unittest.TestCase):
         instance.defer_create(7, "Application")
         instance.defer_add_trace(7, "fake")
         instance.defer_connect(7, "node", 2, "apps")
-
+    
+    def do_presteps(self, instance):
         instance.do_setup()
         instance.do_create()
         instance.do_connect_init()
         instance.do_connect_compl()
+        instance.do_preconfigure()
         instance.do_configure()
+        instance.do_prestart()
+
+    def test_execute(self):
+        instance = mock.TestbedController()
+        
+        self.make_mock_test(instance)
+        self.do_presteps(instance)
+
         instance.start()
         attr_list = instance.get_attribute_list(5)
         self.assertEquals(attr_list, ["test", "fake", "cross", "maxAddresses", "label"])
index 4d76c4d..1c1e11a 100755 (executable)
@@ -318,7 +318,7 @@ class ExecuteTestCase(unittest.TestCase):
         controller.stop()
         controller.shutdown()
 
-    def TODO_test_ssh_daemonized_all_integration(self):
+    def test_ssh_daemonized_integration(self):
         # TODO: This test doesn't run because
         # sys.modules["nepi.testbeds.mock"] = mock
         # is not set in the ssh process
@@ -329,9 +329,11 @@ class ExecuteTestCase(unittest.TestCase):
         inst_root_dir = os.path.join(self.root_dir, "instance")
         os.mkdir(inst_root_dir)
         desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
-        desc.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
-        desc.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
-        desc.set_attribute_value(DC.USE_AGENT, True)
+        desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, 
+            "export PYTHONPATH=%r:%r:$PYTHONPATH ; "
+            "export NEPI_TESTBEDS='mock:mock mock2:mock2' ; " % (
+                os.path.dirname(os.path.dirname(mock.__file__)),
+                os.path.dirname(os.path.dirname(mock2.__file__)),))
         
         xml = exp_desc.to_xml()
         
@@ -343,18 +345,20 @@ class ExecuteTestCase(unittest.TestCase):
         access_config.set_attribute_value(DC.USE_AGENT, True)
         controller = proxy.create_experiment_controller(xml, access_config)
 
-        controller.start()
-        while not controller.is_finished(app.guid):
-            time.sleep(0.5)
-        fake_result = controller.trace(app.guid, "fake")
-        comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+        try:
+            controller.start()
+            while not controller.is_finished(app.guid):
+                time.sleep(0.5)
+            fake_result = controller.trace(app.guid, "fake")
+            comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
 
 --- 10.0.0.2 ping statistics ---
 1 packets transmitted, 1 received, 0% packet loss, time 0ms
 """
-        self.assertTrue(fake_result.startswith(comp_result))
-        controller.stop()
-        controller.shutdown()
+            self.assertTrue(fake_result.startswith(comp_result))
+        finally:
+            controller.stop()
+            controller.shutdown()
  
 if __name__ == '__main__':
     unittest.main()