Daemonized servers are now always launched with popen, and not directly invoked in...
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 25 Aug 2011 11:12:54 +0000 (13:12 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 25 Aug 2011 11:12:54 +0000 (13:12 +0200)
src/nepi/core/execute.py
src/nepi/core/metadata.py
src/nepi/util/constants.py
src/nepi/util/proxy.py
src/nepi/util/server.py
test/core/integration.py
test/testbeds/netns/integration.py
test/util/server.py

index 552b4a9..0a52691 100644 (file)
@@ -14,6 +14,7 @@ import collections
 import functools
 import time
 import logging
+logging.basicConfig()
 
 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
index 509d1de..b96f009 100644 (file)
@@ -341,6 +341,17 @@ class Metadata(object):
             "validation_function" : validation.is_bool,
             "category" : AC.CATEGORY_DEPLOYMENT,
             }),
+        DC.USE_SUDO : dict({
+            "name" : DC.USE_SUDO,
+            "help" : "Use sudo to run the deamon process. This option only take flace when the server runs in daemon mode.", 
+            "type" : Attribute.BOOL,
+            "value" : False,
+            "flags" : Attribute.ExecReadOnly |\
+                    Attribute.ExecImmutable |\
+                    Attribute.Metadata,
+            "validation_function" : validation.is_bool,
+            "category" : AC.CATEGORY_DEPLOYMENT,
+            }),        
         DC.LOG_LEVEL : dict({
             "name" : DC.LOG_LEVEL,
             "help" : "Log level for instance",
index be566e7..237dc47 100644 (file)
@@ -73,6 +73,7 @@ class DeploymentConfiguration:
     
     ROOT_DIRECTORY = "rootDirectory"
     USE_AGENT = "useAgent"
+    USE_SUDO = "useSudo"
     LOG_LEVEL = "logLevel"
     RECOVER = "recover"
     RECOVERY_POLICY = "recoveryPolicy"
index 6b6bf3f..47a9495 100644 (file)
@@ -150,20 +150,22 @@ def get_access_config_params(access_config):
     root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
     log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
     log_level = to_server_log_level(log_level)
-    user = host = port = agent = key = None
+    user = host = port = agent = key = sudo = None
     communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
     environment_setup = (
         access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
         if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
         else ""
     )
-    if communication == DC.ACCESS_SSH:
-        user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
-        host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
-        port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
-        agent = access_config.get_attribute_value(DC.USE_AGENT)
-        key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
-    return (root_dir, log_level, user, host, port, key, agent, environment_setup)
+    user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
+    host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
+    port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
+    agent = access_config.get_attribute_value(DC.USE_AGENT)
+    sudo = access_config.get_attribute_value(DC.USE_SUDO)
+    key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
+    communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
+    return (root_dir, log_level, communication, user, host, port, key, agent,
+            sudo, environment_setup)
 
 class AccessConfiguration(AttributesMap):
     def __init__(self, params = None):
@@ -215,19 +217,33 @@ def create_experiment_controller(xml, access_config = None):
         
         return controller
     elif mode == DC.MODE_DAEMON:
-        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
-                get_access_config_params(access_config)
+        (root_dir, log_level, communication, user, host, port, key, agent,
+                sudo, environment_setup) = get_access_config_params(access_config)
         try:
             return ExperimentControllerProxy(root_dir, log_level,
-                experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
-                agent = agent, launch = launch,
+                experiment_xml = xml,
+                communication = communication,
+                host = host, 
+                port = port, 
+                user = user, 
+                ident_key = key,
+                agent = agent, 
+                sudo = sudo, 
+                launch = launch,
                 environment_setup = environment_setup)
         except:
             if not launch:
                 # Maybe controller died, recover from persisted testbed information if possible
                 controller = ExperimentControllerProxy(root_dir, log_level,
-                    experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
-                    agent = agent, launch = True,
+                    experiment_xml = xml,
+                    communication = communication,
+                    host = host, 
+                    port = port, 
+                    user = user, 
+                    ident_key = key,
+                    agent = agent, 
+                    sudo = sudo, 
+                    launch = True,
                     environment_setup = environment_setup)
                 controller.recover()
                 return controller
@@ -245,11 +261,19 @@ def create_testbed_controller(testbed_id, testbed_version, access_config):
             raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
         return  _build_testbed_controller(testbed_id, testbed_version)
     elif mode == DC.MODE_DAEMON:
-        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
-                get_access_config_params(access_config)
-        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, 
-                testbed_version = testbed_version, host = host, port = port, ident_key = key,
-                user = user, agent = agent, launch = launch,
+        (root_dir, log_level, communication, user, host, port, key, agent,
+                sudo, environment_setup) = get_access_config_params(access_config)
+        return TestbedControllerProxy(root_dir, log_level, 
+                testbed_id = testbed_id, 
+                testbed_version = testbed_version,
+                communication = communication,
+                host = host, 
+                port = port, 
+                ident_key = key,
+                user = user, 
+                agent = agent, 
+                sudo = sudo, 
+                launch = launch,
                 environment_setup = environment_setup)
     raise RuntimeError("Unsupported access configuration '%s'" % mode)
 
@@ -829,15 +853,18 @@ class BaseProxy(object):
     _ServerClass = None
     _ServerClassModule = "nepi.util.proxy"
     
-    def __init__(self, 
-            ctor_args, root_dir, 
-            launch = True, host = None, 
-            port = None, user = None, ident_key = None, agent = None,
+    def __init__(self, ctor_args, root_dir, 
+            launch = True, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None,
+            sudo = False, 
             environment_setup = ""):
         if launch:
-            # ssh
-            if host != None:
-                python_code = (
+            python_code = (
                     "from %(classmodule)s import %(classname)s;"
                     "s = %(classname)s%(ctor_args)r;"
                     "s.run()" 
@@ -846,22 +873,29 @@ class BaseProxy(object):
                     classmodule = self._ServerClassModule,
                     ctor_args = ctor_args
                 ) )
-                proc = server.popen_ssh_subprocess(python_code, host = host,
-                    port = port, user = user, agent = agent,
-                    ident_key = ident_key,
-                    environment_setup = environment_setup,
-                    waitcommand = True)
-                if proc.poll():
-                    err = proc.stderr.read()
-                    raise RuntimeError, "Server could not be executed: %s" % (err,)
-            else:
-                # launch daemon
-                s = self._ServerClass(*ctor_args)
-                s.run()
-
+            proc = server.popen_python(python_code,
+                        communication = communication,
+                        host = host,
+                        port = port, 
+                        user = user, 
+                        agent = agent,
+                        ident_key = ident_key, 
+                        sudo = sudo,
+                        environment_setup = environment_setup) 
+            # Wait for the server to be ready, otherwise nobody
+            # will be able to connect to it
+            helo = proc.stderr.readline()
+            if helo != 'SERVER_READY.\n':
+                raise AssertionError, "Expected 'SERVER_READY.', got %r: %s" % (helo,
+                        helo + proc.stderr.read())
         # connect client to server
-        self._client = server.Client(root_dir, host = host, port = port, 
-                user = user, agent = agent, 
+        self._client = server.Client(root_dir, 
+                communication = communication,
+                host = host, 
+                port = port, 
+                user = user, 
+                agent = agent, 
+                sudo = sudo,
                 environment_setup = environment_setup)
     
     @staticmethod
@@ -1052,9 +1086,17 @@ class TestbedControllerProxy(BaseProxy):
     
     _ServerClass = TestbedControllerServer
     
-    def __init__(self, root_dir, log_level, testbed_id = None, 
-            testbed_version = None, launch = True, host = None, 
-            port = None, user = None, ident_key = None, agent = None,
+    def __init__(self, root_dir, log_level, 
+            testbed_id = None, 
+            testbed_version = None, 
+            launch = True, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None,
+            sudo = False, 
             environment_setup = ""):
         if launch and (testbed_id == None or testbed_version == None):
             raise RuntimeError("To launch a TesbedControllerServer a "
@@ -1062,8 +1104,14 @@ class TestbedControllerProxy(BaseProxy):
         super(TestbedControllerProxy,self).__init__(
             ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
             root_dir = root_dir,
-            launch = launch, host = host, port = port, user = user,
-            ident_key = ident_key, agent = agent, 
+            launch = launch,
+            communication = communication,
+            host = host, 
+            port = port, 
+            user = user,
+            ident_key = ident_key, 
+            agent = agent, 
+            sudo = sudo, 
             environment_setup = environment_setup)
 
     locals().update( BaseProxy._make_stubs(
@@ -1082,14 +1130,28 @@ class TestbedControllerProxy(BaseProxy):
 class ExperimentControllerProxy(BaseProxy):
     _ServerClass = ExperimentControllerServer
     
-    def __init__(self, root_dir, log_level, experiment_xml = None, 
-            launch = True, host = None, port = None, user = None, 
-            ident_key = None, agent = None, environment_setup = ""):
+    def __init__(self, root_dir, log_level, 
+            experiment_xml = None, 
+            launch = True, 
+            communication = DC.ACCESS_LOCAL,
+            host = None, 
+            port = None, 
+            user = None, 
+            ident_key = None, 
+            agent = None, 
+            sudo = False, 
+            environment_setup = ""):
         super(ExperimentControllerProxy,self).__init__(
             ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
             root_dir = root_dir,
-            launch = launch, host = host, port = port, user = user,
-            ident_key = ident_key, agent = agent, 
+            launch = launch, 
+            communication = communication,
+            host = host, 
+            port = port, 
+            user = user,
+            ident_key = ident_key, 
+            agent = agent, 
+            sudo = sudo, 
             environment_setup = environment_setup)
 
     locals().update( BaseProxy._make_stubs(
index 9808b05..b95f357 100644 (file)
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
+from nepi.util.constants import DeploymentConfiguration as DC
+
 import base64
 import errno
 import os
@@ -98,9 +100,11 @@ class Server(object):
                 # first process (the one that did the first fork) returned.
                 os._exit(0)
         except:
+            print >>sys.stderr, "SERVER_ERROR."
             self.log_error()
             self.cleanup()
             os._exit(0)
+        print >>sys.stderr, "SERVER_READY."
 
     def daemonize(self):
         # pipes for process synchronization
@@ -307,7 +311,7 @@ class Forwarder(object):
 
     def forward(self):
         self.connect()
-        print >>sys.stderr, "READY."
+        print >>sys.stderr, "FORWARDER_READY."
         while not self._stop:
             data = self.read_data()
             self.send_to_server(data)
@@ -373,11 +377,14 @@ class Forwarder(object):
 
 class Client(object):
     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
-            agent = None, environment_setup = ""):
+            agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
+            environment_setup = ""):
         self.root_dir = root_dir
         self.addr = (host, port)
         self.user = user
         self.agent = agent
+        self.sudo = sudo
+        self.communication = communication
         self.environment_setup = environment_setup
         self._stopped = False
         self._deferreds = collections.deque()
@@ -393,31 +400,26 @@ class Client(object):
         (host, port) = self.addr
         user = self.user
         agent = self.agent
+        sudo = self.sudo
+        communication = self.communication
         
         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
                 c.forward()" % (root_dir,)
-        if host != None:
-            self._process = popen_ssh_subprocess(python_code, host, port, 
-                    user, agent,
+
+        self._process = popen_python(python_code, 
+                    communication = communication,
+                    host = host, 
+                    port = port, 
+                    user = user, 
+                    agent = agent, 
+                    sudo = sudo, 
                     environment_setup = self.environment_setup)
-            # popen_ssh_subprocess already waits for readiness
-            if self._process.poll():
-                err = proc.stderr.read()
-                raise RuntimeError("Client could not be reached: %s" % \
-                        err)
-        else:
-            self._process = subprocess.Popen(
-                    ["python", "-c", python_code],
-                    stdin = subprocess.PIPE, 
-                    stdout = subprocess.PIPE,
-                    stderr = subprocess.PIPE
-                )
-                
+               
         # Wait for the forwarder to be ready, otherwise nobody
         # will be able to connect to it
         helo = self._process.stderr.readline()
-        if helo != 'READY.\n':
-            raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
+        if helo != 'FORWARDER_READY.\n':
+            raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
                     helo + self._process.stderr.read())
         
     def send_msg(self, msg):
@@ -520,309 +522,321 @@ def _make_server_key_args(server_key, host, port, args):
     return tmp_known_hosts
 
 def popen_ssh_command(command, host, port, user, agent, 
-            stdin="", 
-            ident_key = None,
-            server_key = None,
-            tty = False,
-            timeout = None,
-            retry = 0,
-            err_on_timeout = True):
-        """
-        Executes a remote commands, returns ((stdout,stderr),process)
-        """
-        if TRACE:
-            print "ssh", host, command
+        stdin="", 
+        ident_key = None,
+        server_key = None,
+        tty = False,
+        timeout = None,
+        retry = 0,
+        err_on_timeout = True):
+    """
+    Executes a remote commands, returns ((stdout,stderr),process)
+    """
+    if TRACE:
+        print "ssh", host, command
+    
+    tmp_known_hosts = None
+    args = ['ssh',
+            # Don't bother with localhost. Makes test easier
+            '-o', 'NoHostAuthenticationForLocalhost=yes',
+            '-l', user, host]
+    if agent:
+        args.append('-A')
+    if port:
+        args.append('-p%d' % port)
+    if ident_key:
+        args.extend(('-i', ident_key))
+    if tty:
+        args.append('-t')
+    if server_key:
+        # Create a temporary server key file
+        tmp_known_hosts = _make_server_key_args(
+            server_key, host, port, args)
+    args.append(command)
+
+    while 1:
+        # connects to the remote host and starts a remote connection
+        proc = subprocess.Popen(args, 
+                stdout = subprocess.PIPE,
+                stdin = subprocess.PIPE, 
+                stderr = subprocess.PIPE)
+        
+        # attach tempfile object to the process, to make sure the file stays
+        # alive until the process is finished with it
+        proc._known_hosts = tmp_known_hosts
+        
+        try:
+            out, err = _communicate(proc, stdin, timeout, err_on_timeout)
+            break
+        except RuntimeError,e:
+            if retry <= 0:
+                raise
+            if TRACE:
+                print " timedout -> ", e.args
+            retry -= 1
+        
+    if TRACE:
+        print " -> ", out, err
+
+    return ((out, err), proc)
+
+def popen_scp(source, dest, 
+        port = None, 
+        agent = None, 
+        recursive = False,
+        ident_key = None,
+        server_key = None):
+    """
+    Copies from/to remote sites.
+    
+    Source and destination should have the user and host encoded
+    as per scp specs.
+    
+    If source is a file object, a special mode will be used to
+    create the remote file with the same contents.
+    
+    If dest is a file object, the remote file (source) will be
+    read and written into dest.
+    
+    In these modes, recursive cannot be True.
+    
+    Source can be a list of files to copy to a single destination,
+    in which case it is advised that the destination be a folder.
+    """
+    
+    if TRACE:
+        print "scp", source, dest
+    
+    if isinstance(source, file) and source.tell() == 0:
+        source = source.name
+    elif hasattr(source, 'read'):
+        tmp = tempfile.NamedTemporaryFile()
+        while True:
+            buf = source.read(65536)
+            if buf:
+                tmp.write(buf)
+            else:
+                break
+        tmp.seek(0)
+        source = tmp.name
+    
+    if isinstance(source, file) or isinstance(dest, file) \
+            or hasattr(source, 'read')  or hasattr(dest, 'write'):
+        assert not recursive
         
+        # Parse source/destination as <user>@<server>:<path>
+        if isinstance(dest, basestring) and ':' in dest:
+            remspec, path = dest.split(':',1)
+        elif isinstance(source, basestring) and ':' in source:
+            remspec, path = source.split(':',1)
+        else:
+            raise ValueError, "Both endpoints cannot be local"
+        user,host = remspec.rsplit('@',1)
         tmp_known_hosts = None
-        args = ['ssh',
+        
+        args = ['ssh', '-l', user, '-C',
                 # Don't bother with localhost. Makes test easier
                 '-o', 'NoHostAuthenticationForLocalhost=yes',
-                '-l', user, host]
-        if agent:
-            args.append('-A')
+                host ]
         if port:
-            args.append('-p%d' % port)
+            args.append('-P%d' % port)
         if ident_key:
             args.extend(('-i', ident_key))
-        if tty:
-            args.append('-t')
         if server_key:
             # Create a temporary server key file
             tmp_known_hosts = _make_server_key_args(
                 server_key, host, port, args)
-        args.append(command)
-
-        while 1:
-            # connects to the remote host and starts a remote connection
-            proc = subprocess.Popen(args, 
-                    stdout = subprocess.PIPE,
-                    stdin = subprocess.PIPE, 
-                    stderr = subprocess.PIPE)
-            
-            # attach tempfile object to the process, to make sure the file stays
-            # alive until the process is finished with it
-            proc._known_hosts = tmp_known_hosts
-            
-            try:
-                out, err = _communicate(proc, stdin, timeout, err_on_timeout)
-                break
-            except RuntimeError,e:
-                if retry <= 0:
-                    raise
-                if TRACE:
-                    print " timedout -> ", e.args
-                retry -= 1
-            
-        if TRACE:
-            print " -> ", out, err
-
-        return ((out, err), proc)
-def popen_scp(source, dest, 
-            port = None, 
-            agent = None, 
-            recursive = False,
-            ident_key = None,
-            server_key = None):
-        """
-        Copies from/to remote sites.
-        
-        Source and destination should have the user and host encoded
-        as per scp specs.
-        
-        If source is a file object, a special mode will be used to
-        create the remote file with the same contents.
         
-        If dest is a file object, the remote file (source) will be
-        read and written into dest.
-        
-        In these modes, recursive cannot be True.
-        
-        Source can be a list of files to copy to a single destination,
-        in which case it is advised that the destination be a folder.
-        """
-        
-        if TRACE:
-            print "scp", source, dest
+        if isinstance(source, file) or hasattr(source, 'read'):
+            args.append('cat > %s' % (shell_escape(path),))
+        elif isinstance(dest, file) or hasattr(dest, 'write'):
+            args.append('cat %s' % (shell_escape(path),))
+        else:
+            raise AssertionError, "Unreachable code reached! :-Q"
         
-        if isinstance(source, file) and source.tell() == 0:
-            source = source.name
+        # connects to the remote host and starts a remote connection
+        if isinstance(source, file):
+            proc = subprocess.Popen(args, 
+                    stdout = open('/dev/null','w'),
+                    stderr = subprocess.PIPE,
+                    stdin = source)
+            err = proc.stderr.read()
+            proc._known_hosts = tmp_known_hosts
+            eintr_retry(proc.wait)()
+            return ((None,err), proc)
+        elif isinstance(dest, file):
+            proc = subprocess.Popen(args, 
+                    stdout = open('/dev/null','w'),
+                    stderr = subprocess.PIPE,
+                    stdin = source)
+            err = proc.stderr.read()
+            proc._known_hosts = tmp_known_hosts
+            eintr_retry(proc.wait)()
+            return ((None,err), proc)
         elif hasattr(source, 'read'):
-            tmp = tempfile.NamedTemporaryFile()
+            # file-like (but not file) source
+            proc = subprocess.Popen(args, 
+                    stdout = open('/dev/null','w'),
+                    stderr = subprocess.PIPE,
+                    stdin = subprocess.PIPE)
+            
+            buf = None
+            err = []
             while True:
-                buf = source.read(65536)
-                if buf:
-                    tmp.write(buf)
-                else:
+                if not buf:
+                    buf = source.read(4096)
+                if not buf:
+                    #EOF
                     break
-            tmp.seek(0)
-            source = tmp.name
-        
-        if isinstance(source, file) or isinstance(dest, file) \
-                or hasattr(source, 'read')  or hasattr(dest, 'write'):
-            assert not recursive
-            
-            # Parse source/destination as <user>@<server>:<path>
-            if isinstance(dest, basestring) and ':' in dest:
-                remspec, path = dest.split(':',1)
-            elif isinstance(source, basestring) and ':' in source:
-                remspec, path = source.split(':',1)
-            else:
-                raise ValueError, "Both endpoints cannot be local"
-            user,host = remspec.rsplit('@',1)
-            tmp_known_hosts = None
-            
-            args = ['ssh', '-l', user, '-C',
-                    # Don't bother with localhost. Makes test easier
-                    '-o', 'NoHostAuthenticationForLocalhost=yes',
-                    host ]
-            if port:
-                args.append('-P%d' % port)
-            if ident_key:
-                args.extend(('-i', ident_key))
-            if server_key:
-                # Create a temporary server key file
-                tmp_known_hosts = _make_server_key_args(
-                    server_key, host, port, args)
-            
-            if isinstance(source, file) or hasattr(source, 'read'):
-                args.append('cat > %s' % (shell_escape(path),))
-            elif isinstance(dest, file) or hasattr(dest, 'write'):
-                args.append('cat %s' % (shell_escape(path),))
-            else:
-                raise AssertionError, "Unreachable code reached! :-Q"
+                
+                rdrdy, wrdy, broken = select.select(
+                    [proc.stderr],
+                    [proc.stdin],
+                    [proc.stderr,proc.stdin])
+                
+                if proc.stderr in rdrdy:
+                    # use os.read for fully unbuffered behavior
+                    err.append(os.read(proc.stderr.fileno(), 4096))
+                
+                if proc.stdin in wrdy:
+                    proc.stdin.write(buf)
+                    buf = None
+                
+                if broken:
+                    break
+            proc.stdin.close()
+            err.append(proc.stderr.read())
+                
+            proc._known_hosts = tmp_known_hosts
+            eintr_retry(proc.wait)()
+            return ((None,''.join(err)), proc)
+        elif hasattr(dest, 'write'):
+            # file-like (but not file) dest
+            proc = subprocess.Popen(args, 
+                    stdout = subprocess.PIPE,
+                    stderr = subprocess.PIPE,
+                    stdin = open('/dev/null','w'))
             
-            # connects to the remote host and starts a remote connection
-            if isinstance(source, file):
-                proc = subprocess.Popen(args, 
-                        stdout = open('/dev/null','w'),
-                        stderr = subprocess.PIPE,
-                        stdin = source)
-                err = proc.stderr.read()
-                proc._known_hosts = tmp_known_hosts
-                eintr_retry(proc.wait)()
-                return ((None,err), proc)
-            elif isinstance(dest, file):
-                proc = subprocess.Popen(args, 
-                        stdout = open('/dev/null','w'),
-                        stderr = subprocess.PIPE,
-                        stdin = source)
-                err = proc.stderr.read()
-                proc._known_hosts = tmp_known_hosts
-                eintr_retry(proc.wait)()
-                return ((None,err), proc)
-            elif hasattr(source, 'read'):
-                # file-like (but not file) source
-                proc = subprocess.Popen(args, 
-                        stdout = open('/dev/null','w'),
-                        stderr = subprocess.PIPE,
-                        stdin = subprocess.PIPE)
+            buf = None
+            err = []
+            while True:
+                rdrdy, wrdy, broken = select.select(
+                    [proc.stderr, proc.stdout],
+                    [],
+                    [proc.stderr, proc.stdout])
                 
-                buf = None
-                err = []
-                while True:
-                    if not buf:
-                        buf = source.read(4096)
+                if proc.stderr in rdrdy:
+                    # use os.read for fully unbuffered behavior
+                    err.append(os.read(proc.stderr.fileno(), 4096))
+                
+                if proc.stdout in rdrdy:
+                    # use os.read for fully unbuffered behavior
+                    buf = os.read(proc.stdout.fileno(), 4096)
+                    dest.write(buf)
+                    
                     if not buf:
                         #EOF
                         break
-                    
-                    rdrdy, wrdy, broken = select.select(
-                        [proc.stderr],
-                        [proc.stdin],
-                        [proc.stderr,proc.stdin])
-                    
-                    if proc.stderr in rdrdy:
-                        # use os.read for fully unbuffered behavior
-                        err.append(os.read(proc.stderr.fileno(), 4096))
-                    
-                    if proc.stdin in wrdy:
-                        proc.stdin.write(buf)
-                        buf = None
-                    
-                    if broken:
-                        break
-                proc.stdin.close()
-                err.append(proc.stderr.read())
-                    
-                proc._known_hosts = tmp_known_hosts
-                eintr_retry(proc.wait)()
-                return ((None,''.join(err)), proc)
-            elif hasattr(dest, 'write'):
-                # file-like (but not file) dest
-                proc = subprocess.Popen(args, 
-                        stdout = subprocess.PIPE,
-                        stderr = subprocess.PIPE,
-                        stdin = open('/dev/null','w'))
                 
-                buf = None
-                err = []
-                while True:
-                    rdrdy, wrdy, broken = select.select(
-                        [proc.stderr, proc.stdout],
-                        [],
-                        [proc.stderr, proc.stdout])
-                    
-                    if proc.stderr in rdrdy:
-                        # use os.read for fully unbuffered behavior
-                        err.append(os.read(proc.stderr.fileno(), 4096))
-                    
-                    if proc.stdout in rdrdy:
-                        # use os.read for fully unbuffered behavior
-                        buf = os.read(proc.stdout.fileno(), 4096)
-                        dest.write(buf)
-                        
-                        if not buf:
-                            #EOF
-                            break
-                    
-                    if broken:
-                        break
-                err.append(proc.stderr.read())
-                    
-                proc._known_hosts = tmp_known_hosts
-                eintr_retry(proc.wait)()
-                return ((None,''.join(err)), proc)
-            else:
-                raise AssertionError, "Unreachable code reached! :-Q"
-        else:
-            # Parse destination as <user>@<server>:<path>
-            if isinstance(dest, basestring) and ':' in dest:
-                remspec, path = dest.split(':',1)
-            elif isinstance(source, basestring) and ':' in source:
-                remspec, path = source.split(':',1)
-            else:
-                raise ValueError, "Both endpoints cannot be local"
-            user,host = remspec.rsplit('@',1)
-            
-            # plain scp
-            tmp_known_hosts = None
-            args = ['scp', '-q', '-p', '-C',
-                    # Don't bother with localhost. Makes test easier
-                    '-o', 'NoHostAuthenticationForLocalhost=yes' ]
-            if port:
-                args.append('-P%d' % port)
-            if recursive:
-                args.append('-r')
-            if ident_key:
-                args.extend(('-i', ident_key))
-            if server_key:
-                # Create a temporary server key file
-                tmp_known_hosts = _make_server_key_args(
-                    server_key, host, port, args)
-            if isinstance(source,list):
-                args.extend(source)
-            else:
-                args.append(source)
-            args.append(dest)
-
-            # connects to the remote host and starts a remote connection
-            proc = subprocess.Popen(args, 
-                    stdout = subprocess.PIPE,
-                    stdin = subprocess.PIPE, 
-                    stderr = subprocess.PIPE)
+                if broken:
+                    break
+            err.append(proc.stderr.read())
+                
             proc._known_hosts = tmp_known_hosts
-            
-            comm = proc.communicate()
             eintr_retry(proc.wait)()
-            return (comm, proc)
-def popen_ssh_subprocess(python_code, host, port, user, agent, 
+            return ((None,''.join(err)), proc)
+        else:
+            raise AssertionError, "Unreachable code reached! :-Q"
+    else:
+        # Parse destination as <user>@<server>:<path>
+        if isinstance(dest, basestring) and ':' in dest:
+            remspec, path = dest.split(':',1)
+        elif isinstance(source, basestring) and ':' in source:
+            remspec, path = source.split(':',1)
+        else:
+            raise ValueError, "Both endpoints cannot be local"
+        user,host = remspec.rsplit('@',1)
+        
+        # plain scp
+        tmp_known_hosts = None
+        args = ['scp', '-q', '-p', '-C',
+                # Don't bother with localhost. Makes test easier
+                '-o', 'NoHostAuthenticationForLocalhost=yes' ]
+        if port:
+            args.append('-P%d' % port)
+        if recursive:
+            args.append('-r')
+        if ident_key:
+            args.extend(('-i', ident_key))
+        if server_key:
+            # Create a temporary server key file
+            tmp_known_hosts = _make_server_key_args(
+                server_key, host, port, args)
+        if isinstance(source,list):
+            args.extend(source)
+        else:
+            args.append(source)
+        args.append(dest)
+
+        # connects to the remote host and starts a remote connection
+        proc = subprocess.Popen(args, 
+                stdout = subprocess.PIPE,
+                stdin = subprocess.PIPE, 
+                stderr = subprocess.PIPE)
+        proc._known_hosts = tmp_known_hosts
+        
+        comm = proc.communicate()
+        eintr_retry(proc.wait)()
+        return (comm, proc)
+
+def decode_and_execute():
+    # The python code we want to execute might have characters that 
+    # are not compatible with the 'inline' mode we are using. To avoid
+    # problems we receive the encoded python code in base64 as a input 
+    # stream and decode it for execution.
+    import base64, os
+    cmd = ""
+    while True:
+        cmd += os.read(0, 1)# one byte from stdin
+        if cmd[-1] == "\n": 
+            break
+    cmd = base64.b64decode(cmd)
+    # Uncomment for debug
+    #os.write(2, "Executing python code: %s\n" % cmd)
+    os.write(1, "OK\n") # send a sync message
+    exec(cmd)
+
+def popen_python(python_code, 
+        communication = DC.ACCESS_LOCAL,
+        host = None, 
+        port = None, 
+        user = None, 
+        agent = False, 
         python_path = None,
         ident_key = None,
         server_key = None,
         tty = False,
-        environment_setup = "",
-        waitcommand = False):
-        cmd = ""
-        if python_path:
-            python_path.replace("'", r"'\''")
-            cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
-            cmd += " ; "
-        if environment_setup:
-            cmd += environment_setup
-            cmd += " ; "
-        # Uncomment for debug (to run everything under strace)
-        # We had to verify if strace works (cannot nest them)
-        #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
-        #cmd += "$CMD "
-        #cmd += "strace -f -tt -s 200 -o strace$$.out "
-        cmd += "python -c '"
-        cmd += "import base64, os\n"
-        cmd += "cmd = \"\"\n"
-        cmd += "while True:\n"
-        cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
-        cmd += " if cmd[-1] == \"\\n\": break\n"
-        cmd += "cmd = base64.b64decode(cmd)\n"
-        # Uncomment for debug
-        #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
-        if not waitcommand:
-            cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
-        cmd += "exec(cmd)\n"
-        if waitcommand:
-            cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
-        cmd += "'"
-        
+        sudo = False, 
+        environment_setup = ""):
+
+
+    shell = False
+    cmd = ""
+    if python_path:
+        python_path.replace("'", r"'\''")
+        cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
+        cmd += " ; "
+    if environment_setup:
+        cmd += environment_setup
+        cmd += " ; "
+    # Uncomment for debug (to run everything under strace)
+    # We had to verify if strace works (cannot nest them)
+    #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
+    #cmd += "$CMD "
+    #cmd += "strace -f -tt -s 200 -o strace$$.out "
+    cmd += "python -c 'from nepi.util import server; server.decode_and_execute()'"
+
+    if communication == DC.ACCESS_SSH:
         tmp_known_hosts = None
         args = ['ssh',
                 # Don't bother with localhost. Makes test easier
@@ -841,23 +855,29 @@ def popen_ssh_subprocess(python_code, host, port, user, agent,
             tmp_known_hosts = _make_server_key_args(
                 server_key, host, port, args)
         args.append(cmd)
+    else:
+        args = [cmd]
+        shell = True
 
-        # connects to the remote host and starts a remote rpyc connection
-        proc = subprocess.Popen(args, 
-                stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE, 
-                stderr = subprocess.PIPE)
+    # connects to the remote host and starts a remote
+    proc = subprocess.Popen(args,
+            shell = shell, 
+            stdout = subprocess.PIPE,
+            stdin = subprocess.PIPE, 
+            stderr = subprocess.PIPE)
+
+    if communication == DC.ACCESS_SSH:
         proc._known_hosts = tmp_known_hosts
-        
-        # send the command to execute
-        os.write(proc.stdin.fileno(),
-                base64.b64encode(python_code) + "\n")
-        msg = os.read(proc.stdout.fileno(), 3)
-        if msg != "OK\n":
-            raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
-                msg, proc.stdout.read(), proc.stderr.read())
-        return proc
 
+    # send the command to execute
+    os.write(proc.stdin.fileno(),
+            base64.b64encode(python_code) + "\n")
+    msg = os.read(proc.stdout.fileno(), 3)
+    if msg != "OK\n":
+        raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
+            msg, proc.stdout.read(), proc.stderr.read())
+
+    return proc
 
 # POSIX
 def _communicate(self, input, timeout=None, err_on_timeout=True):
index 3f22eb4..eecded0 100755 (executable)
@@ -117,6 +117,12 @@ class ExecuteTestCase(unittest.TestCase):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
         access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+        access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, 
+            "export PYTHONPATH=%r:%r:$PYTHONPATH "
+            "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+                os.path.dirname(os.path.dirname(mock.__file__)),
+                os.path.dirname(os.path.dirname(mock2.__file__)),))
+
         controller = proxy.create_experiment_controller(xml, access_config)
 
         controller.start()
@@ -146,6 +152,11 @@ class ExecuteTestCase(unittest.TestCase):
         
         desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
         desc.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+        desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, 
+            "export PYTHONPATH=%r:%r:$PYTHONPATH "
+            "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+                os.path.dirname(os.path.dirname(mock.__file__)),
+                os.path.dirname(os.path.dirname(mock2.__file__)),))
 
         xml = exp_desc.to_xml()
         
@@ -182,6 +193,11 @@ class ExecuteTestCase(unittest.TestCase):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
         access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+        access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, 
+            "export PYTHONPATH=%r:%r:$PYTHONPATH "
+            "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+                os.path.dirname(os.path.dirname(mock.__file__)),
+                os.path.dirname(os.path.dirname(mock2.__file__)),))
         controller = proxy.create_experiment_controller(xml, access_config)
 
         controller.start()
@@ -229,6 +245,11 @@ class ExecuteTestCase(unittest.TestCase):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
         access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+        access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, 
+            "export PYTHONPATH=%r:%r:$PYTHONPATH "
+            "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+                os.path.dirname(os.path.dirname(mock.__file__)),
+                os.path.dirname(os.path.dirname(mock2.__file__)),))
         controller = proxy.create_experiment_controller(xml, access_config)
 
         controller.start()
@@ -327,9 +348,6 @@ class ExecuteTestCase(unittest.TestCase):
         controller.shutdown()
 
     def test_ssh_daemonized_integration(self):
-        # TODO: This test doesn't run because
-        # sys.modules["nepi.testbeds.mock"] = mock
-        # is not set in the ssh process
         exp_desc, desc, app, node1, node2, iface1, iface2 = self.make_test_experiment()
         env = test_util.test_environment()
         
@@ -337,12 +355,6 @@ class ExecuteTestCase(unittest.TestCase):
         inst_root_dir = os.path.join(self.root_dir, "instance")
         os.mkdir(inst_root_dir)
         desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
-        desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, 
-            "export PYTHONPATH=%r:%r:$PYTHONPATH ; "
-            "export NEPI_TESTBEDS='mock:mock mock2:mock2' ; " % (
-                os.path.dirname(os.path.dirname(mock.__file__)),
-                os.path.dirname(os.path.dirname(mock2.__file__)),))
-        
         xml = exp_desc.to_xml()
         
         access_config = proxy.AccessConfiguration()
@@ -351,6 +363,11 @@ class ExecuteTestCase(unittest.TestCase):
         access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
         access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
         access_config.set_attribute_value(DC.USE_AGENT, True)
+        access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, 
+            "export PYTHONPATH=%r:%r:$PYTHONPATH "
+            "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+                os.path.dirname(os.path.dirname(mock.__file__)),
+                os.path.dirname(os.path.dirname(mock2.__file__)),))
         controller = proxy.create_experiment_controller(xml, access_config)
 
         try:
index 72a352f..4c33e83 100755 (executable)
@@ -17,7 +17,8 @@ class NetnsIntegrationTestCase(unittest.TestCase):
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
 
-    def _test_if(self, daemonize_testbed, controller_access_configuration):
+    def _test_switched(self, controller_access_config = None,
+            testbed_access_config = None):
         exp_desc = ExperimentDescription()
         testbed_id = "netns"
         user = getpass.getuser()
@@ -47,20 +48,15 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         app.connector("node").connect(node1.connector("apps"))
         app.enable_trace("stdout")
 
-        if daemonize_testbed:
-            netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
-            inst_root_dir = os.path.join(self.root_dir, "instance")
-            os.mkdir(inst_root_dir)
-            netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
-            netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+        if testbed_access_config:
+            for attr in testbed_access_config.attributes:
+                if attr.value:
+                    netns_desc.set_attribute_value(attr.name, attr.value)
 
         xml = exp_desc.to_xml()
 
-        if controller_access_configuration:
-            controller = proxy.create_experiment_controller(xml, 
-                controller_access_configuration)
-        else:
-            controller = ExperimentController(xml, self.root_dir)
+        controller = proxy.create_experiment_controller(xml, 
+                controller_access_config)
         
         try:
             controller.start()
@@ -78,52 +74,112 @@ class NetnsIntegrationTestCase(unittest.TestCase):
             controller.shutdown()
 
     @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
-    def test_local_if(self):
-        self._test_if(
-            daemonize_testbed = False,
-            controller_access_configuration = None)
+    def test_switched(self):
+        self._test_switched()
 
     @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
-    def test_all_daemonized_controller(self):
+    def test_daemonized_controller(self):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
-        access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
         access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+        access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
         
-        self._test_if(
-            daemonize_testbed = False,
-            controller_access_configuration = access_config)
+        self._test_switched(controller_access_config = access_config)
 
     @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
-    def test_all_daemonized_if(self):
+    def test_daemonized_tbd(self):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
-        access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
         access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+        inst_root_dir = os.path.join(self.root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
         
-        self._test_if(
-            daemonize_testbed = True,
-            controller_access_configuration = access_config)
+        self._test_switched(testbed_access_config = access_config)
 
     @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
-    def test_all_ssh_daemonized_if(self):
+    def test_daemonized_all(self):
+        controller_access_config = proxy.AccessConfiguration()
+        controller_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        controller_access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+        controller_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
+        testbed_access_config = proxy.AccessConfiguration()
+        testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+        inst_root_dir = os.path.join(self.root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+        
+        self._test_switched(
+                controller_access_config = controller_access_config,
+                testbed_access_config = testbed_access_config)
+
+    @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+    def test_ssh_daemonized_tbd(self):
         env = test_util.test_environment()
+
+        testbed_access_config = proxy.AccessConfiguration()
+        testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+        testbed_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+        testbed_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+        testbed_access_config.set_attribute_value(DC.USE_AGENT, True)
+        inst_root_dir = os.path.join(self.root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
         
-        access_config = proxy.AccessConfiguration()
-        access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
-        access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
-        access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
-        access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
-        access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
-        access_config.set_attribute_value(DC.USE_AGENT, True)
+        self._test_switched(
+                testbed_access_config = testbed_access_config)
+
+    def test_sudo_daemonized_tbd(self):
+        env = test_util.test_environment()
+
+        testbed_access_config = proxy.AccessConfiguration()
+        testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+        testbed_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+        testbed_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+        testbed_access_config.set_attribute_value(DC.USE_AGENT, True)
+        testbed_access_config.set_attribute_value(DC.USE_SUDO, True)
+        inst_root_dir = os.path.join(self.root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
         
-        self._test_if(
-            daemonize_testbed = True,
-            controller_access_configuration = access_config)
+        self._test_switched(
+                testbed_access_config = testbed_access_config)
+
+    @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+    def test_ssh_daemonized_all(self):
+        env = test_util.test_environment()
+
+        controller_access_config = proxy.AccessConfiguration()
+        controller_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        controller_access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+        controller_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+        controller_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+        controller_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+        controller_access_config.set_attribute_value(DC.USE_AGENT, True)
+
+        testbed_access_config = proxy.AccessConfiguration()
+        testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+        # BUG! IT DOESN'T WORK WITH 2 LEVELS OF SSH!
+        #testbed_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+        #testbed_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+        #testbed_access_config.set_attribute_value(DC.USE_AGENT, True)
+        inst_root_dir = os.path.join(self.root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+        
+        self._test_switched(
+                controller_access_config = controller_access_config,
+                testbed_access_config = testbed_access_config)
 
     def tearDown(self):
         try:
-            shutil.rmtree(self.root_dir)
+            #shutil.rmtree(self.root_dir)
+            pass
         except:
             # retry
             time.sleep(0.1)
index 7e4c6f1..0bb3a39 100755 (executable)
@@ -1,8 +1,10 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
-import getpass
 from nepi.util import server
+from nepi.util.constants import DeploymentConfiguration as DC
+
+import getpass
 import os
 import shutil
 import sys
@@ -83,16 +85,43 @@ class ServerTestCase(unittest.TestCase):
         reply = c.read_reply()
         self.assertEqual(reply, "Stopping server")
 
+    @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+    def test_sudo_server(self):
+        env = test_util.test_environment()
+        user = getpass.getuser()
+        # launch server
+        python_code = "from nepi.util import server;s=server.Server('%s');\
+                s.run()" % self.root_dir
+        server.popen_python(python_code, 
+                sudo = True)
+        c = server.Client(self.root_dir, 
+                sudo = True)
+        c.send_msg("Hola")
+        reply = c.read_reply()
+        self.assertEqual(reply, "Reply to: Hola")
+        c.send_stop()
+        reply = c.read_reply()
+        self.assertEqual(reply, "Stopping server")
+
+
     def test_ssh_server(self):
         env = test_util.test_environment()
         user = getpass.getuser()
         # launch server
         python_code = "from nepi.util import server;s=server.Server('%s');\
                 s.run()" % self.root_dir
-        server.popen_ssh_subprocess(python_code, host = "localhost", 
-                port = env.port, user = user, agent = True)
-        c = server.Client(self.root_dir, host = "localhost", port = env.port,
-                user = user, agent = True)
+        server.popen_python(python_code, 
+                communication = DC.ACCESS_SSH,
+                host = "localhost", 
+                port = env.port, 
+                user = user, 
+                agent = True)
+        c = server.Client(self.root_dir, 
+                communication = DC.ACCESS_SSH,
+                host = "localhost", 
+                port = env.port,
+                user = user, 
+                agent = True)
         c.send_msg("Hola")
         reply = c.read_reply()
         self.assertEqual(reply, "Reply to: Hola")
@@ -106,11 +135,19 @@ class ServerTestCase(unittest.TestCase):
         # launch server
         python_code = "from nepi.util import server;s=server.Server('%s');\
                 s.run()" % self.root_dir
-        server.popen_ssh_subprocess(python_code, host = "localhost", 
-                port = env.port, user = user, agent = True)
+        server.popen_python(python_code, 
+                communication = DC.ACCESS_SSH,
+                host = "localhost", 
+                port = env.port, 
+                user = user, 
+                agent = True)
         
-        c = server.Client(self.root_dir, host = "localhost", port = env.port,
-                user = user, agent = True)
+        c = server.Client(self.root_dir, 
+                communication = DC.ACCESS_SSH,
+                host = "localhost", 
+                port = env.port,
+                user = user, 
+                agent = True)
                 
         c.send_msg("Hola")
         reply = c.read_reply()
@@ -120,8 +157,12 @@ class ServerTestCase(unittest.TestCase):
         del c
         
         # reconnect
-        c = server.Client(self.root_dir, host = "localhost", port = env.port,
-                user = user, agent = True)
+        c = server.Client(self.root_dir,
+                communication = DC.ACCESS_SSH,
+                host = "localhost", 
+                port = env.port,
+                user = user, 
+                agent = True)
                 
         c.send_msg("Hola")
         reply = c.read_reply()
@@ -137,11 +178,19 @@ class ServerTestCase(unittest.TestCase):
         # launch server
         python_code = "from nepi.util import server;s=server.Server('%s');\
                 s.run()" % self.root_dir
-        server.popen_ssh_subprocess(python_code, host = "localhost", 
-                port = env.port, user = user, agent = True)
+        server.popen_python(python_code, 
+                communication = DC.ACCESS_SSH,
+                host = "localhost", 
+                port = env.port, 
+                user = user, 
+                agent = True)
         
-        c = server.Client(self.root_dir, host = "localhost", port = env.port,
-                user = user, agent = True)
+        c = server.Client(self.root_dir, 
+                communication = DC.ACCESS_SSH,
+                host = "localhost", 
+                port = env.port,
+                user = user, 
+                agent = True)
                 
         c.send_msg("Hola")
         reply = c.read_reply()