Wait for SERVER_READY or PROXY_READ, instead of expecting it as the first line.
[nepi.git] / src / nepi / util / proxy.py
index a323688..43c22c5 100644 (file)
@@ -61,6 +61,9 @@ GET_TESTBED_ID = 39
 GET_TESTBED_VERSION = 40
 TRACES_INFO = 41
 EXEC_XML = 42
+TESTBED_STATUS  = 43
+STARTED_TIME  = 44
+STOPPED_TIME  = 45
 
 instruction_text = dict({
     OK:     "OK",
@@ -104,6 +107,9 @@ instruction_text = dict({
     TESTBED_ID: "TESTBED_ID",
     TESTBED_VERSION: "TESTBED_VERSION",
     TRACES_INFO: "TRACES_INFO",
+    STARTED_TIME: "STARTED_TIME",
+    STOPPED_TIME: "STOPPED_TIME",
+
     })
 
 def log_msg(server, params):
@@ -135,29 +141,33 @@ def log_reply(server, reply):
 
 def to_server_log_level(log_level):
     return (
-        server.DEBUG_LEVEL
+        DC.DEBUG_LEVEL
             if log_level == DC.DEBUG_LEVEL 
-        else server.ERROR_LEVEL
+        else DC.ERROR_LEVEL
     )
 
 def get_access_config_params(access_config):
+    mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
+    launch = not access_config.get_attribute_value(DC.RECOVER)
     root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
     log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
     log_level = to_server_log_level(log_level)
-    user = host = port = agent = key = None
     communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
     environment_setup = (
         access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
         if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
         else ""
     )
-    if communication == DC.ACCESS_SSH:
-        user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
-        host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
-        port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
-        agent = access_config.get_attribute_value(DC.USE_AGENT)
-        key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
-    return (root_dir, log_level, user, host, port, key, agent, environment_setup)
+    user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
+    host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
+    port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
+    agent = access_config.get_attribute_value(DC.USE_AGENT)
+    sudo = access_config.get_attribute_value(DC.USE_SUDO)
+    key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
+    communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
+    clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
+    return (mode, launch, root_dir, log_level, communication, user, host, port,
+            key, agent, sudo, environment_setup, clean_root)
 
 class AccessConfiguration(AttributesMap):
     def __init__(self, params = None):
@@ -186,10 +196,16 @@ class PermDir(object):
         self.path = path
 
 def create_experiment_controller(xml, access_config = None):
-    mode = None if not access_config \
-            else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
-    launch = True if not access_config \
-            else not access_config.get_attribute_value(DC.RECOVER)
+    mode = None
+    launch = True
+    log_level = DC.ERROR_LEVEL
+    if access_config:
+        (mode, launch, root_dir, log_level, communication, user, host, port, 
+                key, agent, sudo, environment_setup, clean_root) \
+                        = get_access_config_params(access_config)
+
+    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+
     if not mode or mode == DC.MODE_SINGLE_PROCESS:
         from nepi.core.execute import ExperimentController
         
@@ -209,20 +225,34 @@ def create_experiment_controller(xml, access_config = None):
         
         return controller
     elif mode == DC.MODE_DAEMON:
-        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
-                get_access_config_params(access_config)
         try:
             return ExperimentControllerProxy(root_dir, log_level,
-                experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
-                agent = agent, launch = launch,
-                environment_setup = environment_setup)
+                experiment_xml = xml,
+                communication = communication,
+                host = host, 
+                port = port, 
+                user = user, 
+                ident_key = key,
+                agent = agent, 
+                sudo = sudo, 
+                launch = launch,
+                environment_setup = environment_setup, 
+                clean_root = clean_root)
         except:
             if not launch:
                 # Maybe controller died, recover from persisted testbed information if possible
                 controller = ExperimentControllerProxy(root_dir, log_level,
-                    experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
-                    agent = agent, launch = True,
-                    environment_setup = environment_setup)
+                    experiment_xml = xml,
+                    communication = communication,
+                    host = host, 
+                    port = port, 
+                    user = user, 
+                    ident_key = key,
+                    agent = agent, 
+                    sudo = sudo, 
+                    launch = True,
+                    environment_setup = environment_setup,
+                    clean_root = clean_root)
                 controller.recover()
                 return controller
             else:
@@ -230,21 +260,34 @@ def create_experiment_controller(xml, access_config = None):
     raise RuntimeError("Unsupported access configuration '%s'" % mode)
 
 def create_testbed_controller(testbed_id, testbed_version, access_config):
-    mode = None if not access_config \
-            else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
-    launch = True if not access_config \
-            else not access_config.get_attribute_value(DC.RECOVER)
+    mode = None
+    launch = True
+    log_level = DC.ERROR_LEVEL
+    if access_config:
+        (mode, launch, root_dir, log_level, communication, user, host, port, 
+                key, agent, sudo, environment_setup, clean_root) \
+                        = get_access_config_params(access_config)
+
+    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+    
     if not mode or mode == DC.MODE_SINGLE_PROCESS:
         if not launch:
-            raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
+            raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
         return  _build_testbed_controller(testbed_id, testbed_version)
     elif mode == DC.MODE_DAEMON:
-        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
-                get_access_config_params(access_config)
-        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, 
-                testbed_version = testbed_version, host = host, port = port, ident_key = key,
-                user = user, agent = agent, launch = launch,
-                environment_setup = environment_setup)
+        return TestbedControllerProxy(root_dir, log_level, 
+                testbed_id = testbed_id, 
+                testbed_version = testbed_version,
+                communication = communication,
+                host = host, 
+                port = port, 
+                ident_key = key,
+                user = user, 
+                agent = agent, 
+                sudo = sudo, 
+                launch = launch,
+                environment_setup = environment_setup, 
+                clean_root = clean_root)
     raise RuntimeError("Unsupported access configuration '%s'" % mode)
 
 def _build_testbed_controller(testbed_id, testbed_version):
@@ -277,7 +320,7 @@ class Marshalling:
         @staticmethod
         def nullint(sdata):
             return None if sdata == "None" else int(sdata)
-        
+
         @staticmethod
         def bool(sdata):
             return sdata == 'True'
@@ -463,9 +506,10 @@ class BaseServer(server.Server):
         return reply
 
 class TestbedControllerServer(BaseServer):
-    def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
+    def __init__(self, root_dir, log_level, testbed_id, testbed_version, 
+            environment_setup, clean_root):
         super(TestbedControllerServer, self).__init__(root_dir, log_level, 
-            environment_setup = environment_setup )
+            environment_setup = environment_setup, clean_root = clean_root)
         self._testbed_id = testbed_id
         self._testbed_version = testbed_version
         self._testbed = None
@@ -674,6 +718,12 @@ class TestbedControllerServer(BaseServer):
     def status(self, guid):
         return self._testbed.status(guid)
 
+    @Marshalling.handles(TESTBED_STATUS)
+    @Marshalling.args()
+    @Marshalling.retval(int)
+    def testbed_status(self):
+        return self._testbed.testbed_status()
+
     @Marshalling.handles(GET_ATTRIBUTE_LIST)
     @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
     @Marshalling.retval( Marshalling.pickled_data )
@@ -694,9 +744,10 @@ class TestbedControllerServer(BaseServer):
 
 
 class ExperimentControllerServer(BaseServer):
-    def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
+    def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
+            clean_root):
         super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
-            environment_setup = environment_setup )
+            environment_setup = environment_setup, clean_root = clean_root)
         self._experiment_xml = experiment_xml
         self._experiment = None
 
@@ -711,6 +762,18 @@ class ExperimentControllerServer(BaseServer):
     def guids(self):
         return self._experiment.guids
 
+    @Marshalling.handles(STARTED_TIME)
+    @Marshalling.args()
+    @Marshalling.retval( Marshalling.pickled_data )
+    def started_time(self):
+        return self._experiment.started_time
+
+    @Marshalling.handles(STOPPED_TIME)
+    @Marshalling.args()
+    @Marshalling.retval( Marshalling.pickled_data )
+    def stopped_time(self):
+        return self._experiment.stopped_time
+
     @Marshalling.handles(XML)
     @Marshalling.args()
     @Marshalling.retval()
@@ -805,15 +868,19 @@ class BaseProxy(object):
     _ServerClass = None
     _ServerClassModule = "nepi.util.proxy"
     
-    def __init__(self, 
-            ctor_args, root_dir, 
-            launch = True, host = None, 
-            port = None, user = None, ident_key = None, agent = None,
-            environment_setup = ""):
+    def __init__(self, ctor_args, root_dir, 
+            launch = True, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None,
+            sudo = False, 
+            environment_setup = "",
+            clean_root = False):
         if launch:
-            # ssh
-            if host != None:
-                python_code = (
+            python_code = (
                     "from %(classmodule)s import %(classname)s;"
                     "s = %(classname)s%(ctor_args)r;"
                     "s.run()" 
@@ -822,22 +889,34 @@ class BaseProxy(object):
                     classmodule = self._ServerClassModule,
                     ctor_args = ctor_args
                 ) )
-                proc = server.popen_ssh_subprocess(python_code, host = host,
-                    port = port, user = user, agent = agent,
-                    ident_key = ident_key,
-                    environment_setup = environment_setup,
-                    waitcommand = True)
-                if proc.poll():
-                    err = proc.stderr.read()
-                    raise RuntimeError, "Server could not be executed: %s" % (err,)
+            proc = server.popen_python(python_code,
+                        communication = communication,
+                        host = host,
+                        port = port, 
+                        user = user, 
+                        agent = agent,
+                        ident_key = ident_key, 
+                        sudo = sudo,
+                        environment_setup = environment_setup) 
+            # Wait for the server to be ready, otherwise nobody
+            # will be able to connect to it
+            err = []
+            helo = "nope"
+            while helo:
+                helo = proc.stderr.readline()
+                if helo == 'SERVER_READY.\n':
+                    break
+                err.append(helo)
             else:
-                # launch daemon
-                s = self._ServerClass(*ctor_args)
-                s.run()
-
+                raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
         # connect client to server
-        self._client = server.Client(root_dir, host = host, port = port, 
-                user = user, agent = agent, 
+        self._client = server.Client(root_dir, 
+                communication = communication,
+                host = host, 
+                port = port, 
+                user = user, 
+                agent = agent, 
+                sudo = sudo,
                 environment_setup = environment_setup)
     
     @staticmethod
@@ -1028,18 +1107,34 @@ class TestbedControllerProxy(BaseProxy):
     
     _ServerClass = TestbedControllerServer
     
-    def __init__(self, root_dir, log_level, testbed_id = None, 
-            testbed_version = None, launch = True, host = None, 
-            port = None, user = None, ident_key = None, agent = None,
-            environment_setup = ""):
+    def __init__(self, root_dir, log_level, 
+            testbed_id = None, 
+            testbed_version = None, 
+            launch = True, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None,
+            sudo = False, 
+            environment_setup = "", 
+            clean_root = False):
         if launch and (testbed_id == None or testbed_version == None):
             raise RuntimeError("To launch a TesbedControllerServer a "
                     "testbed_id and testbed_version are required")
         super(TestbedControllerProxy,self).__init__(
-            ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
+            ctor_args = (root_dir, log_level, testbed_id, testbed_version,
+                environment_setup, clean_root),
             root_dir = root_dir,
-            launch = launch, host = host, port = port, user = user,
-            ident_key = ident_key, agent = agent, 
+            launch = launch,
+            communication = communication,
+            host = host, 
+            port = port, 
+            user = user,
+            ident_key = ident_key, 
+            agent = agent, 
+            sudo = sudo, 
             environment_setup = environment_setup)
 
     locals().update( BaseProxy._make_stubs(
@@ -1058,15 +1153,32 @@ class TestbedControllerProxy(BaseProxy):
 class ExperimentControllerProxy(BaseProxy):
     _ServerClass = ExperimentControllerServer
     
-    def __init__(self, root_dir, log_level, experiment_xml = None, 
-            launch = True, host = None, port = None, user = None, 
-            ident_key = None, agent = None, environment_setup = ""):
+    def __init__(self, root_dir, log_level, 
+            experiment_xml = None, 
+            launch = True, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None, 
+            sudo = False, 
+            environment_setup = "",
+            clean_root = False):
         super(ExperimentControllerProxy,self).__init__(
-            ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
+            ctor_args = (root_dir, log_level, experiment_xml, environment_setup, 
+                clean_root),
             root_dir = root_dir,
-            launch = launch, host = host, port = port, user = user,
-            ident_key = ident_key, agent = agent, 
-            environment_setup = environment_setup)
+            launch = launch, 
+            communication = communication,
+            host = host, 
+            port = port, 
+            user = user,
+            ident_key = ident_key, 
+            agent = agent, 
+            sudo = sudo, 
+            environment_setup = environment_setup,
+            clean_root = clean_root)
 
     locals().update( BaseProxy._make_stubs(
         server_class = ExperimentControllerServer,