Adding environment setting features for applications under OMF
[nepi.git] / src / nepi / util / proxy.py
index 576aab1..9545a5c 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
 import base64
@@ -61,6 +60,13 @@ GET_TESTBED_ID = 39
 GET_TESTBED_VERSION = 40
 TRACES_INFO = 41
 EXEC_XML = 42
+TESTBED_STATUS  = 43
+STARTED_TIME  = 44
+STOPPED_TIME  = 45
+CURRENT = 46
+ACCESS_CONFIGURATIONS = 47
+CURRENT_ACCESS_CONFIG = 48
+
 
 instruction_text = dict({
     OK:     "OK",
@@ -104,6 +110,12 @@ instruction_text = dict({
     TESTBED_ID: "TESTBED_ID",
     TESTBED_VERSION: "TESTBED_VERSION",
     TRACES_INFO: "TRACES_INFO",
+    STARTED_TIME: "STARTED_TIME",
+    STOPPED_TIME: "STOPPED_TIME",
+    CURRENT: "CURRENT",
+    ACCESS_CONFIGURATIONS: "ACCESS_CONFIGURATIONS",
+    CURRENT_ACCESS_CONFIG: "CURRENT_ACCESS_CONFIG"
+
     })
 
 def log_msg(server, params):
@@ -135,29 +147,33 @@ def log_reply(server, reply):
 
 def to_server_log_level(log_level):
     return (
-        server.DEBUG_LEVEL
+        DC.DEBUG_LEVEL
             if log_level == DC.DEBUG_LEVEL 
-        else server.ERROR_LEVEL
+        else DC.ERROR_LEVEL
     )
 
 def get_access_config_params(access_config):
+    mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
+    launch = not access_config.get_attribute_value(DC.RECOVER)
     root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
     log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
     log_level = to_server_log_level(log_level)
-    user = host = port = agent = key = None
     communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
     environment_setup = (
         access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
         if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
         else ""
     )
-    if communication == DC.ACCESS_SSH:
-        user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
-        host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
-        port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
-        agent = access_config.get_attribute_value(DC.USE_AGENT)
-        key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
-    return (root_dir, log_level, user, host, port, key, agent, environment_setup)
+    user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
+    host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
+    port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
+    agent = access_config.get_attribute_value(DC.USE_AGENT)
+    sudo = access_config.get_attribute_value(DC.USE_SUDO)
+    key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
+    communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
+    clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
+    return (mode, launch, root_dir, log_level, communication, user, host, port,
+            key, agent, sudo, environment_setup, clean_root)
 
 class AccessConfiguration(AttributesMap):
     def __init__(self, params = None):
@@ -165,9 +181,9 @@ class AccessConfiguration(AttributesMap):
         
         from nepi.core.metadata import Metadata
         
-        for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
+        for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
             self.add_attribute(**attr_info)
-        
+
         if params:
             for attr_name, attr_value in params.iteritems():
                 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
@@ -185,15 +201,57 @@ class PermDir(object):
     def __init__(self, path):
         self.path = path
 
-def create_experiment_controller(xml, access_config = None):
-    mode = None if not access_config \
-            else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
-    launch = True if not access_config \
-            else not access_config.get_attribute_value(DC.RECOVER)
+def create_experiment_suite(xml, access_config, repetitions = None,
+        duration = None, wait_guids = None):
+    mode = None
+    if access_config :
+        (mode, launch, root_dir, log_level, communication, user, host, port, 
+                key, agent, sudo, environment_setup, clean_root) \
+                        = get_access_config_params(access_config)
+
     if not mode or mode == DC.MODE_SINGLE_PROCESS:
-        if not launch:
-            raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
+        from nepi.core.execute import ExperimentSuite
+        if not root_dir:
+            root_dir = TempDir()
+        else:
+            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
+
+        exp_suite = ExperimentSuite(xml, access_config, repetitions, duration,
+                wait_guids)
         
+        # inject reference to temporary dir, so that it gets cleaned
+        # up at destruction time.
+        exp_suite._tempdir = root_dir
+        return exp_suite
+    elif mode == DC.MODE_DAEMON:
+        return ExperimentSuiteProxy(root_dir, log_level,
+                xml,
+                repetitions = repetitions, 
+                duration = duration,
+                wait_guids = wait_guids, 
+                communication = communication,
+                host = host, 
+                port = port, 
+                user = user, 
+                ident_key = key,
+                agent = agent, 
+                sudo = sudo, 
+                environment_setup = environment_setup, 
+                clean_root = clean_root)
+    raise RuntimeError("Unsupported access configuration '%s'" % mode)
+
+def create_experiment_controller(xml, access_config = None):
+    mode = None
+    launch = True
+    log_level = DC.ERROR_LEVEL
+    if access_config:
+        (mode, launch, root_dir, log_level, communication, user, host, port, 
+                key, agent, sudo, environment_setup, clean_root) \
+                        = get_access_config_params(access_config)
+
+    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+
+    if not mode or mode == DC.MODE_SINGLE_PROCESS:
         from nepi.core.execute import ExperimentController
         
         if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
@@ -206,22 +264,40 @@ def create_experiment_controller(xml, access_config = None):
         # up at destruction time.
         controller._tempdir = root_dir
         
+        if not launch:
+            # try to recover
+            controller.recover()
+        
         return controller
     elif mode == DC.MODE_DAEMON:
-        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
-                get_access_config_params(access_config)
         try:
             return ExperimentControllerProxy(root_dir, log_level,
-                experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
-                agent = agent, launch = launch,
-                environment_setup = environment_setup)
+                experiment_xml = xml,
+                communication = communication,
+                host = host, 
+                port = port, 
+                user = user, 
+                ident_key = key,
+                agent = agent, 
+                sudo = sudo, 
+                launch = launch,
+                environment_setup = environment_setup, 
+                clean_root = clean_root)
         except:
             if not launch:
                 # Maybe controller died, recover from persisted testbed information if possible
                 controller = ExperimentControllerProxy(root_dir, log_level,
-                    experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
-                    agent = agent, launch = True,
-                    environment_setup = environment_setup)
+                    experiment_xml = xml,
+                    communication = communication,
+                    host = host, 
+                    port = port, 
+                    user = user, 
+                    ident_key = key,
+                    agent = agent, 
+                    sudo = sudo, 
+                    launch = True,
+                    environment_setup = environment_setup,
+                    clean_root = clean_root)
                 controller.recover()
                 return controller
             else:
@@ -229,21 +305,34 @@ def create_experiment_controller(xml, access_config = None):
     raise RuntimeError("Unsupported access configuration '%s'" % mode)
 
 def create_testbed_controller(testbed_id, testbed_version, access_config):
-    mode = None if not access_config \
-            else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
-    launch = True if not access_config \
-            else not access_config.get_attribute_value(DC.RECOVER)
+    mode = None
+    launch = True
+    log_level = DC.ERROR_LEVEL
+    if access_config:
+        (mode, launch, root_dir, log_level, communication, user, host, port, 
+                key, agent, sudo, environment_setup, clean_root) \
+                        = get_access_config_params(access_config)
+
+    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+    
     if not mode or mode == DC.MODE_SINGLE_PROCESS:
         if not launch:
-            raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
+            raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
         return  _build_testbed_controller(testbed_id, testbed_version)
     elif mode == DC.MODE_DAEMON:
-        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
-                get_access_config_params(access_config)
-        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, 
-                testbed_version = testbed_version, host = host, port = port, ident_key = key,
-                user = user, agent = agent, launch = launch,
-                environment_setup = environment_setup)
+        return TestbedControllerProxy(root_dir, log_level, 
+                testbed_id = testbed_id, 
+                testbed_version = testbed_version,
+                communication = communication,
+                host = host, 
+                port = port, 
+                ident_key = key,
+                user = user, 
+                agent = agent, 
+                sudo = sudo, 
+                launch = launch,
+                environment_setup = environment_setup, 
+                clean_root = clean_root)
     raise RuntimeError("Unsupported access configuration '%s'" % mode)
 
 def _build_testbed_controller(testbed_id, testbed_version):
@@ -276,7 +365,7 @@ class Marshalling:
         @staticmethod
         def nullint(sdata):
             return None if sdata == "None" else int(sdata)
-        
+
         @staticmethod
         def bool(sdata):
             return sdata == 'True'
@@ -288,6 +377,8 @@ class Marshalling:
         
         @staticmethod
         def base64_data(data):
+            if not data:
+                return ""
             return base64.b64encode(data)
         
         @staticmethod
@@ -461,10 +552,100 @@ class BaseServer(server.Server):
         log_reply(self, reply)
         return reply
 
+class ExperimentSuiteServer(BaseServer):
+    def __init__(self, root_dir, log_level, 
+            xml, repetitions, duration, wait_guids, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None,
+            sudo = False, 
+            environment_setup = "", 
+            clean_root = False):
+        super(ExperimentSuiteServer, self).__init__(root_dir, log_level, 
+            environment_setup = environment_setup, clean_root = clean_root)
+        access_config = AccessConfiguration()
+        access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
+        access_config.set_attribute_value(DC.LOG_LEVEL, log_level)
+        access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environment_setup)
+        if user:
+            access_config.set_attribute_value(DC.DEPLOYMENT_USER, user)
+        if host:
+            access_config.set_attribute_value(DC.DEPLOYMENT_HOST, host)
+        if port:
+            access_config.set_attribute_value(DC.DEPLOYMENT_PORT, port)
+        if agent:    
+            access_config.set_attribute_value(DC.USE_AGENT, agent)
+        if sudo:
+            acess_config.set_attribute_value(DC.USE_SUDO, sudo)
+        if ident_key:
+            access_config.set_attribute_value(DC.DEPLOYMENT_KEY, ident_key)
+        if communication:
+            access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, communication)
+        if clean_root:
+            access_config.set_attribute_value(DC.CLEAN_ROOT, clean_root)
+        self._experiment_xml = xml
+        self._duration = duration
+        self._repetitions = repetitions
+        self._wait_guids = wait_guids
+        self._access_config = access_config
+        self._experiment_suite = None
+
+    def post_daemonize(self):
+        from nepi.core.execute import ExperimentSuite
+        self._experiment_suite = ExperimentSuite(
+                self._experiment_xml, self._access_config, 
+                self._repetitions, self._duration, self._wait_guids)
+
+    @Marshalling.handles(CURRENT)
+    @Marshalling.args()
+    @Marshalling.retval(int)
+    def current(self):
+        return self._experiment_suite.current()
+   
+    @Marshalling.handles(STATUS)
+    @Marshalling.args()
+    @Marshalling.retval(int)
+    def status(self):
+        return self._experiment_suite.status()
+    
+    @Marshalling.handles(FINISHED)
+    @Marshalling.args()
+    @Marshalling.retval(Marshalling.bool)
+    def is_finished(self):
+        return self._experiment_suite.is_finished()
+
+    @Marshalling.handles(ACCESS_CONFIGURATIONS)
+    @Marshalling.args()
+    @Marshalling.retval( Marshalling.pickled_data )
+    def get_access_configurations(self):
+        return self._experiment_suite.get_access_configurations()
+
+    @Marshalling.handles(START)
+    @Marshalling.args()
+    @Marshalling.retvoid
+    def start(self):
+        self._experiment_suite.start()
+
+    @Marshalling.handles(SHUTDOWN)
+    @Marshalling.args()
+    @Marshalling.retvoid
+    def shutdown(self):
+        self._experiment_suite.shutdown()
+
+    @Marshalling.handles(CURRENT_ACCESS_CONFIG)
+    @Marshalling.args()
+    @Marshalling.retval( Marshalling.pickled_data )
+    def get_current_access_config(self):
+        return self._experiment_suite.get_current_access_config()
+
 class TestbedControllerServer(BaseServer):
-    def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
+    def __init__(self, root_dir, log_level, testbed_id, testbed_version, 
+            environment_setup, clean_root):
         super(TestbedControllerServer, self).__init__(root_dir, log_level, 
-            environment_setup = environment_setup )
+            environment_setup = environment_setup, clean_root = clean_root)
         self._testbed_id = testbed_id
         self._testbed_version = testbed_version
         self._testbed = None
@@ -673,6 +854,12 @@ class TestbedControllerServer(BaseServer):
     def status(self, guid):
         return self._testbed.status(guid)
 
+    @Marshalling.handles(TESTBED_STATUS)
+    @Marshalling.args()
+    @Marshalling.retval(int)
+    def testbed_status(self):
+        return self._testbed.testbed_status()
+
     @Marshalling.handles(GET_ATTRIBUTE_LIST)
     @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
     @Marshalling.retval( Marshalling.pickled_data )
@@ -685,10 +872,18 @@ class TestbedControllerServer(BaseServer):
     def get_factory_id(self, guid):
         return self._testbed.get_factory_id(guid)
 
+    @Marshalling.handles(RECOVER)
+    @Marshalling.args()
+    @Marshalling.retvoid
+    def recover(self):
+        self._testbed.recover()
+
+
 class ExperimentControllerServer(BaseServer):
-    def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
+    def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
+            clean_root):
         super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
-            environment_setup = environment_setup )
+            environment_setup = environment_setup, clean_root = clean_root)
         self._experiment_xml = experiment_xml
         self._experiment = None
 
@@ -703,6 +898,18 @@ class ExperimentControllerServer(BaseServer):
     def guids(self):
         return self._experiment.guids
 
+    @Marshalling.handles(STARTED_TIME)
+    @Marshalling.args()
+    @Marshalling.retval( Marshalling.pickled_data )
+    def started_time(self):
+        return self._experiment.started_time
+
+    @Marshalling.handles(STOPPED_TIME)
+    @Marshalling.args()
+    @Marshalling.retval( Marshalling.pickled_data )
+    def stopped_time(self):
+        return self._experiment.stopped_time
+
     @Marshalling.handles(XML)
     @Marshalling.args()
     @Marshalling.retval()
@@ -733,6 +940,12 @@ class ExperimentControllerServer(BaseServer):
     def is_finished(self, guid):
         return self._experiment.is_finished(guid)
 
+    @Marshalling.handles(STATUS)
+    @Marshalling.args(int)
+    @Marshalling.retval(int)
+    def status(self, guid):
+        return self._experiment.status(guid)
+
     @Marshalling.handles(GET)
     @Marshalling.args(int, Marshalling.base64_data, str)
     @Marshalling.retval( Marshalling.pickled_data )
@@ -791,15 +1004,19 @@ class BaseProxy(object):
     _ServerClass = None
     _ServerClassModule = "nepi.util.proxy"
     
-    def __init__(self, 
-            ctor_args, root_dir, 
-            launch = True, host = None, 
-            port = None, user = None, ident_key = None, agent = None,
-            environment_setup = ""):
+    def __init__(self, ctor_args, root_dir, 
+            launch = True, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None,
+            sudo = False, 
+            environment_setup = "",
+            clean_root = False):
         if launch:
-            # ssh
-            if host != None:
-                python_code = (
+            python_code = (
                     "from %(classmodule)s import %(classname)s;"
                     "s = %(classname)s%(ctor_args)r;"
                     "s.run()" 
@@ -808,22 +1025,34 @@ class BaseProxy(object):
                     classmodule = self._ServerClassModule,
                     ctor_args = ctor_args
                 ) )
-                proc = server.popen_ssh_subprocess(python_code, host = host,
-                    port = port, user = user, agent = agent,
-                    ident_key = ident_key,
-                    environment_setup = environment_setup,
-                    waitcommand = True)
-                if proc.poll():
-                    err = proc.stderr.read()
-                    raise RuntimeError, "Server could not be executed: %s" % (err,)
+            proc = server.popen_python(python_code,
+                        communication = communication,
+                        host = host,
+                        port = port, 
+                        user = user, 
+                        agent = agent,
+                        ident_key = ident_key, 
+                        sudo = sudo,
+                        environment_setup = environment_setup) 
+            # Wait for the server to be ready, otherwise nobody
+            # will be able to connect to it
+            err = []
+            helo = "nope"
+            while helo:
+                helo = proc.stderr.readline()
+                if helo == 'SERVER_READY.\n':
+                    break
+                err.append(helo)
             else:
-                # launch daemon
-                s = self._ServerClass(*ctor_args)
-                s.run()
-
+                raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
         # connect client to server
-        self._client = server.Client(root_dir, host = host, port = port, 
-                user = user, agent = agent, 
+        self._client = server.Client(root_dir, 
+                communication = communication,
+                host = host, 
+                port = port, 
+                user = user, 
+                agent = agent, 
+                sudo = sudo,
                 environment_setup = environment_setup)
     
     @staticmethod
@@ -1009,23 +1238,92 @@ class BaseProxy(object):
                         setattr(template_class, dmethname, dmeth)
         
         return rv
-                        
+
+class ExperimentSuiteProxy(BaseProxy):
+    
+    _ServerClass = ExperimentSuiteServer
+    
+    def __init__(self, root_dir, log_level,
+            xml, repetitions, duration, wait_guids, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None,
+            sudo = False, 
+            environment_setup = "", 
+            clean_root = False):
+        super(ExperimentSuiteProxy,self).__init__(
+            ctor_args = (root_dir, log_level,
+                xml, 
+                repetitions, 
+                duration,
+                wait_guids, 
+                communication,
+                host, 
+                port, 
+                user, 
+                ident_key,
+                agent, 
+                sudo, 
+                environment_setup, 
+                clean_root),
+            root_dir = root_dir,
+            launch = True, #launch
+            communication = communication,
+            host = host, 
+            port = port, 
+            user = user,
+            ident_key = ident_key, 
+            agent = agent, 
+            sudo = sudo, 
+            environment_setup = environment_setup)
+
+    locals().update( BaseProxy._make_stubs(
+        server_class = ExperimentSuiteServer,
+        template_class = nepi.core.execute.ExperimentSuite,
+    ) )
+    
+    # Shutdown stops the serverside...
+    def shutdown(self, _stub = shutdown):
+        rv = _stub(self)
+        self._client.send_stop()
+        self._client.read_reply() # wait for it
+        return rv
+
 class TestbedControllerProxy(BaseProxy):
     
     _ServerClass = TestbedControllerServer
     
-    def __init__(self, root_dir, log_level, testbed_id = None, 
-            testbed_version = None, launch = True, host = None, 
-            port = None, user = None, ident_key = None, agent = None,
-            environment_setup = ""):
+    def __init__(self, root_dir, log_level, 
+            testbed_id = None, 
+            testbed_version = None, 
+            launch = True, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None,
+            sudo = False, 
+            environment_setup = "", 
+            clean_root = False):
         if launch and (testbed_id == None or testbed_version == None):
             raise RuntimeError("To launch a TesbedControllerServer a "
                     "testbed_id and testbed_version are required")
         super(TestbedControllerProxy,self).__init__(
-            ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
+            ctor_args = (root_dir, log_level, testbed_id, testbed_version,
+                environment_setup, clean_root),
             root_dir = root_dir,
-            launch = launch, host = host, port = port, user = user,
-            ident_key = ident_key, agent = agent, 
+            launch = launch,
+            communication = communication,
+            host = host, 
+            port = port, 
+            user = user,
+            ident_key = ident_key, 
+            agent = agent, 
+            sudo = sudo, 
             environment_setup = environment_setup)
 
     locals().update( BaseProxy._make_stubs(
@@ -1044,22 +1342,38 @@ class TestbedControllerProxy(BaseProxy):
 class ExperimentControllerProxy(BaseProxy):
     _ServerClass = ExperimentControllerServer
     
-    def __init__(self, root_dir, log_level, experiment_xml = None, 
-            launch = True, host = None, port = None, user = None, 
-            ident_key = None, agent = None, environment_setup = ""):
+    def __init__(self, root_dir, log_level, 
+            experiment_xml = None, 
+            launch = True, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None, 
+            sudo = False, 
+            environment_setup = "",
+            clean_root = False):
         super(ExperimentControllerProxy,self).__init__(
-            ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
+            ctor_args = (root_dir, log_level, experiment_xml, environment_setup, 
+                clean_root),
             root_dir = root_dir,
-            launch = launch, host = host, port = port, user = user,
-            ident_key = ident_key, agent = agent, 
-            environment_setup = environment_setup)
+            launch = launch, 
+            communication = communication,
+            host = host, 
+            port = port, 
+            user = user,
+            ident_key = ident_key, 
+            agent = agent, 
+            sudo = sudo, 
+            environment_setup = environment_setup,
+            clean_root = clean_root)
 
     locals().update( BaseProxy._make_stubs(
         server_class = ExperimentControllerServer,
         template_class = nepi.core.execute.ExperimentController,
     ) )
 
-    
     # Shutdown stops the serverside...
     def shutdown(self, _stub = shutdown):
         rv = _stub(self)