adding first version of ns3wrapper
[nepi.git] / src / neco / resources / linux / node.py
index 3aa063c..e00562f 100644 (file)
@@ -1,28 +1,46 @@
 from neco.execution.resource import Resource
 from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
-        rspawn, rcheck_pid, rstatus, rkill, RUNNING 
+        rspawn, rcheck_pid, rstatus, rkill, make_control_path, RUNNING 
 
 import cStringIO
 import logging
 import os.path
+import subprocess
 
 class LinuxNode(Resource):
-    def __init__(self, box, ec):
-        super(LinuxNode, self).__init__(box, ec)
+    def __init__(self, ec, guif):
+        super(LinuxNode, self).__init__(ec, guid)
         self.ip = None
         self.host = None
         self.user = None
         self.port = None
         self.identity_file = None
+        self.enable_x11 = False
+        self.forward_agent = True
+
         # packet management system - either yum or apt for now...
         self._pm = None
        
         # Logging
         loglevel = "debug"
         self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\
-                self.box.guid)
+                self.guid)
         self._logger.setLevel(getattr(logging, loglevel.upper()))
 
+        # For ssh connections we use the ControlMaster option which 
+        # allows us to decrease the number of open ssh network connections.
+        # Subsequent ssh connections will reuse a same master connection.
+        # This might pose a problem when using X11 and ssh-agent, since
+        # display and agent forwarded will be those of the first connection,
+        # which created the master. 
+        # To avoid reusing a master created by a previous LinuxNode instance,
+        # we explicitly erase the ControlPath socket.
+        control_path = make_control_path(self.user, self.host, self.port)
+        try:
+            os.remove(control_path)
+        except:
+            pass
+
     @property
     def pm(self):
         if self._pm:
@@ -46,6 +64,10 @@ class LinuxNode(Resource):
 
         return self._pm
 
+    @property
+    def is_localhost(self):
+        return ( self.host or self.ip ) in ['localhost', '127.0.0.7', '::1']
+
     def install(self, packages):
         if not isinstance(packages, list):
             packages = [packages]
@@ -66,35 +88,32 @@ class LinuxNode(Resource):
         if not os.path.isfile(src):
             src = cStringIO.StringIO(src)
 
-        (out, err), proc = eintr_retry(rcopy)(
-            src, dst, 
-            self.host or self.ip, 
-            self.user,
-            port = self.port,
-            identity_file = self.identity_file)
-
-        if proc.wait():
-            msg = "Error uploading to %s got:\n%s%s" %\
-                    (self.host or self.ip, out, err)
-            self._logger.error(msg)
-            raise RuntimeError(msg)
+        if not self.is_localhost:
+            # Build destination as <user>@<server>:<path>
+            dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst)
+        return self.copy(src, dst)
 
+    def download(self, src, dst):
+        if not self.is_localhost:
+            # Build destination as <user>@<server>:<path>
+            src = "%s@%s:%s" % (self.user, self.host or self.ip, src)
+        return self.copy(src, dst)
+        
     def is_alive(self, verbose = False):
-        (out, err), proc = eintr_retry(rexec)(
-                "echo 'ALIVE'",
-                self.host or self.ip, 
-                self.user,
-                port = self.port, 
-                identity_file = self.identity_file,
+        if self.is_localhost:
+            return True
+
+        try:
+            out = self.execute("echo 'ALIVE'",
                 timeout = 60,
                 err_on_timeout = False,
                 persistent = False)
-        
-        if proc.wait():
+        except:
             if verbose:
                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
             return False
-        elif out.strip().startswith('ALIVE'):
+
+        if out.strip().startswith('ALIVE'):
             return True
         else:
             if verbose:
@@ -105,24 +124,45 @@ class LinuxNode(Resource):
         if clean:
             self.rmdir(path)
 
-        self.execute(
+        return self.execute(
             "mkdir -p %s" % path,
             timeout = 120,
             retry = 3
             )
 
     def rmdir(self, path):
-        self.execute(
+        return self.execute(
             "rm -rf %s" % path,
             timeout = 120,
             retry = 3
             )
 
+    def copy(self, src, dst):
+        if self.is_localhost:
+            command = ["cp", "-R", src, dst]
+            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
+                    stderr=subprocess.PIPE)
+            out, err = p.communicate()
+        else:
+            (out, err), proc = eintr_retry(rcopy)(
+                src, dst, 
+                port = self.port,
+                agent = self.agent,
+                identity_file = self.identity_file)
+
+            if proc.wait():
+                msg = "Error uploading to %s got:\n%s%s" %\
+                        (self.host or self.ip, out, err)
+                self._logger.error(msg)
+                raise RuntimeError(msg)
+
+        return (out, err)
+
     def execute(self, command,
-            agent = True,
             sudo = False,
-            stdin = ""
+            stdin = None
             tty = False,
+            env = None,
             timeout = None,
             retry = 0,
             err_on_timeout = True,
@@ -131,29 +171,46 @@ class LinuxNode(Resource):
         """ Notice that this invocation will block until the
         execution finishes. If this is not the desired behavior,
         use 'run' instead."""
-        (out, err), proc = eintr_retry(rexec)(
-                command, 
-                self.host or self.ip, 
-                self.user,
-                port = self.port, 
-                agent = agent,
-                sudo = sudo,
-                stdin = stdin, 
-                identity_file = self.identity_file,
-                tty = tty,
-                timeout = timeout,
-                retry = retry,
-                err_on_timeout = err_on_timeout,
-                connect_timeout = connect_timeout,
-                persistent = persistent)
-
-        if proc.wait():
-            msg = "Failed to execute command %s at node %s: %s %s" % \
-                    (command, self.host or self.ip, out, err,)
-            self._logger.warn(msg)
-            raise RuntimeError(msg)
 
-        return out
+        if self.is_localhost:
+            if env:
+                export = ''
+                for envkey, envval in env.iteritems():
+                    export += '%s=%s ' % (envkey, envval)
+                command = export + command
+
+            if sudo:
+                command = "sudo " + command
+
+            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
+                    stderr=subprocess.PIPE)
+            out, err = p.communicate()
+        else:
+            (out, err), proc = eintr_retry(rexec)(
+                    command, 
+                    self.host or self.ip, 
+                    self.user,
+                    port = self.port, 
+                    agent = self.forward_agent,
+                    sudo = sudo,
+                    stdin = stdin, 
+                    identity_file = self.identity_file,
+                    tty = tty,
+                    x11 = self.enable_x11,
+                    env = env,
+                    timeout = timeout,
+                    retry = retry,
+                    err_on_timeout = err_on_timeout,
+                    connect_timeout = connect_timeout,
+                    persistent = persistent)
+
+            if proc.wait():
+                msg = "Failed to execute command %s at node %s: %s %s" % \
+                        (command, self.host or self.ip, out, err,)
+                self._logger.warn(msg)
+                raise RuntimeError(msg)
+
+        return (out, err)
 
     def run(self, command, home, 
             stdin = None, 
@@ -161,25 +218,58 @@ class LinuxNode(Resource):
             stderr = 'stderr', 
             sudo = False):
         self._logger.info("Running %s", command)
-
-        # Start process in a "daemonized" way, using nohup and heavy
-        # stdin/out redirection to avoid connection issues
-        (out,err), proc = rspawn(
-            command,
-            pidfile = './pid',
-            home = home,
-            stdin = stdin if stdin is not None else '/dev/null',
-            stdout = stdout if stdout else '/dev/null',
-            stderr = stderr if stderr else '/dev/null',
-            sudo = sudo,
-            host = self.host,
-            user = self.user,
-            port = self.port,
-            identity_file = self.identity_file
-            )
         
-        if proc.wait():
-            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+        pidfile = './pid',
+
+        if self.is_localhost:
+            if stderr == stdout:
+                stderr = '&1'
+            else:
+                stderr = ' ' + stderr
+            
+            daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+                'command' : command,
+                'pidfile' : pidfile,
+                
+                'stdout' : stdout,
+                'stderr' : stderr,
+                'stdin' : stdin,
+            }
+            
+            cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
+                    'command' : daemon_command,
+                    
+                    'sudo' : 'sudo -S' if sudo else '',
+                    
+                    'pidfile' : pidfile,
+                    'gohome' : 'cd %s ; ' % home if home else '',
+                    'create' : 'mkdir -p %s ; ' % home if create_home else '',
+                }
+            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
+                    stderr=subprocess.PIPE)
+            out, err = p.communicate()
+        else:
+            # Start process in a "daemonized" way, using nohup and heavy
+            # stdin/out redirection to avoid connection issues
+            (out,err), proc = rspawn(
+                command,
+                pidfile = pidfile,
+                home = home,
+                stdin = stdin if stdin is not None else '/dev/null',
+                stdout = stdout if stdout else '/dev/null',
+                stderr = stderr if stderr else '/dev/null',
+                sudo = sudo,
+                host = self.host,
+                user = self.user,
+                port = self.port,
+                agent = self.forward_agent,
+                identity_file = self.identity_file
+                )
+            
+            if proc.wait():
+                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+        return (out, err)
     
     def checkpid(self, path):            
         # Get PID/PPID
@@ -189,6 +279,7 @@ class LinuxNode(Resource):
             host = self.host,
             user = self.user,
             port = self.port,
+            agent = self.forward_agent,
             identity_file = self.identity_file
             )
         
@@ -200,6 +291,7 @@ class LinuxNode(Resource):
                 host = self.host,
                 user = self.user,
                 port = self.port,
+                agent = self.forward_agent,
                 identity_file = self.identity_file
                 )
            
@@ -214,6 +306,7 @@ class LinuxNode(Resource):
                 host = self.host,
                 user = self.user,
                 port = self.port,
+                agent = self.forward_agent,
                 sudo = sudo,
                 identity_file = self.identity_file
                 )