adding back use of sudo in sshfuncs.rexec
[nepi.git] / src / nepi / resources / linux / node.py
index 78de7fd..f5a5429 100644 (file)
@@ -1,29 +1,29 @@
-"""
-    NEPI, a framework to manage network experiments
-    Copyright (C) 2013 INRIA
-
-    This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation, either version 3 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-"""
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux import rpmfuncs, debfuncs 
 
 from nepi.execution.attribute import Attribute, Flags
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux import rpmfuncs, debfuncs 
-from nepi.util import sshfuncs, execfuncs 
+from nepi.util import sshfuncs, execfuncs
+from nepi.util.sshfuncs import ProcStatus
 
 import collections
 
 import collections
-import logging
 import os
 import random
 import re
 import os
 import random
 import re
@@ -38,6 +38,25 @@ import threading
 
 reschedule_delay = "0.5s"
 
 
 reschedule_delay = "0.5s"
 
+class ExitCode:
+    """
+    Values that LinuxNode.exitcode returns when it is unable to
+    read the exit code of a spawned process from its exit code file
+    """
+    FILENOTFOUND = -1  # 'cat' on the exit code file returned 1
+    CORRUPTFILE = -2   # file was read but its content is not an integer
+    ERROR = -3         # 'cat' failed with any other return code
+    OK = 0             # NOTE(review): presumably the success status - confirm
+
+class OSType:
+    """
+    Identifiers of the Linux flavors that LinuxNode keeps in self._os
+    """
+    FEDORA_12 = "f12"    # release string starts with "Fedora release 12"
+    FEDORA_14 = "f14"    # release string starts with "Fedora release 14"
+    FEDORA = "fedora"    # accepted by install/remove_packages (rpmfuncs)
+    UBUNTU = "ubuntu"    # release string starts with "Ubuntu"
+    DEBIAN = "debian"    # release string starts with "Debian"
 
 @clsinit
 class LinuxNode(ResourceManager):
 
 @clsinit
 class LinuxNode(ResourceManager):
@@ -91,8 +110,6 @@ class LinuxNode(ResourceManager):
         
         # lock to avoid concurrency issues on methods used by applications 
         self._lock = threading.Lock()
         
         # lock to avoid concurrency issues on methods used by applications 
         self._lock = threading.Lock()
-
-        self._logger = logging.getLogger("LinuxNode")
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
@@ -129,13 +146,13 @@ class LinuxNode(ResourceManager):
             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
 
         if out.find("Fedora release 12") == 0:
             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
 
         if out.find("Fedora release 12") == 0:
-            self._os = "f12"
+            self._os = OSType.FEDORA_12
         elif out.find("Fedora release 14") == 0:
         elif out.find("Fedora release 14") == 0:
-            self._os = "f14"
+            self._os = OSType.FEDORA_14
         elif out.find("Debian") == 0: 
         elif out.find("Debian") == 0: 
-            self._os = "debian"
+            self._os = OSType.DEBIAN
         elif out.find("Ubuntu") ==0:
         elif out.find("Ubuntu") ==0:
-            self._os = "ubuntu"
+            self._os = OSType.UBUNTU
         else:
             msg = "Unsupported OS"
             self.error(msg, out)
         else:
             msg = "Unsupported OS"
             self.error(msg, out)
@@ -167,8 +184,8 @@ class LinuxNode(ResourceManager):
     def deploy(self):
         if self.state == ResourceState.NEW:
             try:
     def deploy(self):
         if self.state == ResourceState.NEW:
             try:
-               self.discover()
-               self.provision()
+                self.discover()
+                self.provision()
             except:
                 self._state = ResourceState.FAILED
                 raise
             except:
                 self._state = ResourceState.FAILED
                 raise
@@ -267,46 +284,46 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
-        if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.install_packages_command(self.os, packages)
-        elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.install_packages_command(self.os, packages)
+    def install_packages(self, packages, home):
+        command = ""  # install command line, chosen by OS flavor below
+        if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+            command = rpmfuncs.install_packages_command(self.os, packages)
+        elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+            command = debfuncs.install_packages_command(self.os, packages)
         else:
             msg = "Error installing packages ( OS not known ) "
             self.error(msg, self.os)
             raise RuntimeError, msg
 
         out = err = ""
         else:
             msg = "Error installing packages ( OS not known ) "
             self.error(msg, self.os)
             raise RuntimeError, msg
 
         out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "instpkg_pid",
-            stdout = "instpkg_out", 
-            stderr = "instpkg_err",
+        (out, err), proc = self.run_and_wait(command, home, 
+            shfile = "instpkg.sh",  # script file the command is uploaded to
+            pidfile = "instpkg_pidfile",
+            ecodefile = "instpkg_exitcode",  # file receiving the exit code
+            stdout = "instpkg_stdout", 
+            stderr = "instpkg_stderr",
             raise_on_error = True)
 
         return (out, err), proc 
 
             raise_on_error = True)
 
         return (out, err), proc 
 
-    def remove_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
-        if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.remove_packages_command(self.os, packages)
-        elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.remove_packages_command(self.os, packages)
+    def remove_packages(self, packages, home):
+        command = ""  # removal command line, chosen by OS flavor below
+        if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+            command = rpmfuncs.remove_packages_command(self.os, packages)
+        elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+            command = debfuncs.remove_packages_command(self.os, packages)
         else:
             msg = "Error removing packages ( OS not known ) "
             self.error(msg)
             raise RuntimeError, msg
 
         out = err = ""
         else:
             msg = "Error removing packages ( OS not known ) "
             self.error(msg)
             raise RuntimeError, msg
 
         out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "rmpkg_pid",
-            stdout = "rmpkg_out", 
-            stderr = "rmpkg_err",
+        (out, err), proc = self.run_and_wait(command, home, 
+            shfile = "rmpkg.sh",  # script file the command is uploaded to
+            pidfile = "rmpkg_pidfile",
+            ecodefile = "rmpkg_exitcode",  # file receiving the exit code
+            stdout = "rmpkg_stdout", 
+            stderr = "rmpkg_stderr",
             raise_on_error = True)
          
         return (out, err), proc 
             raise_on_error = True)
          
         return (out, err), proc 
@@ -319,22 +336,31 @@ class LinuxNode(ResourceManager):
 
     def rmdir(self, path):
         return self.execute("rm -rf %s" % path, with_lock = True)
 
     def rmdir(self, path):
         return self.execute("rm -rf %s" % path, with_lock = True)
-
-    def run_and_wait(self, command, 
-            home = ".", 
-            pidfile = "pid", 
+        
+    def run_and_wait(self, command, home, 
+            shfile = "cmd.sh",
+            env = None,
+            pidfile = "pidfile", 
+            ecodefile = "exitcode", 
             stdin = None, 
             stdin = None, 
-            stdout = 'stdout'
-            stderr = 'stderr'
+            stdout = "stdout"
+            stderr = "stderr"
             sudo = False,
             tty = False,
             raise_on_error = False):
             sudo = False,
             tty = False,
             raise_on_error = False):
-        """ runs a command in background on the remote host, but waits
-            until the command finishes execution.
-            This is more robust than doing a simple synchronized 'execute',
-            since in the remote host the command can continue to run detached
-            even if network disconnections occur
+        """ 
+        runs a command in background on the remote host, busy-waiting
+        until the command finishes execution.
+        This is more robust than doing a simple synchronized 'execute',
+        since in the remote host the command can continue to run detached
+        even if network disconnections occur
         """
         """
+        self.upload_command(command, home, 
+            shfile = shfile, 
+            ecodefile = ecodefile, 
+            env = env)
+
+        command = "bash ./%s" % shfile
         # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
         # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
@@ -359,11 +385,11 @@ class LinuxNode(ResourceManager):
 
         # wait until command finishes to execute
         self.wait_run(pid, ppid)
 
         # wait until command finishes to execute
         self.wait_run(pid, ppid)
-       
-        # check if execution errors occurred
-        (out, err), proc = self.check_output(home, stderr)
+      
+        (out, err), proc = self.check_errors(home, ecodefile, stderr)
 
 
-        if err or out:
+        # Out is what was written in the stderr file
+        if out or err:
             msg = " Failed to run command '%s' " % command
             self.error(msg, out, err)
 
             msg = " Failed to run command '%s' " % command
             self.error(msg, out, err)
 
@@ -371,21 +397,97 @@ class LinuxNode(ResourceManager):
                 raise RuntimeError, msg
         
         return (out, err), proc
                 raise RuntimeError, msg
         
         return (out, err), proc
+
+    def exitcode(self, home, ecodefile = "exitcode"):
+        """
+        Read the exit code stored in file ecodefile under directory home.
+        Returns it as an integer, or a negative ExitCode value on failure
+        """
+        (out, err), proc = self.check_output(home, ecodefile)
+
+        # 'cat' succeeded: the file content is expected to be the exit code
+        if proc.wait() == 0:
+            try:
+                return int(out.strip())
+            except:
+                # Content could not be converted to an integer
+                return ExitCode.CORRUPTFILE
+
+        # 'cat' returned 1: treated as "No such file or directory"
+        if proc.returncode == 1:
+            return ExitCode.FILENOTFOUND
+        
+        # Any other return code from 'cat'
+        return ExitCode.ERROR
+
+    def upload_command(self, command, home, 
+            shfile = "cmd.sh",
+            ecodefile = "exitcode",
+            env = None):
+        """ Uploads the command as script file home/shfile, followed by a
+        step that writes the exit status of the command ($?) to ecodefile
+        """
+      
+        # ecodefile is written as given here; it is not joined with home
+        command = " %(command)s ; echo $? > %(ecodefile)s ;" % {
+                'command': command,
+                'ecodefile': ecodefile,
+                } 
+
+        # env is split on single spaces; each item becomes an "export" line
+        environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) + "\n" \
+            if env else ""
+
+        # Export lines are placed before the command in the script
+        command = environ + command
+
+        dst = os.path.join(home, shfile)  # destination path of the script
+        return self.upload(command, dst, text = True)
+
+    def check_errors(self, home, 
+            ecodefile = "exitcode", 
+            stderr = "stderr"):
+        """
+        Checks whether errors occurred while running a command.
+        Returns ((out, err), proc). The stderr file is only read (into out)
+        when the exit code is > 0 or the exit code file was not found.
+        """
+        out = err = ""
+        proc = None  # stays None unless the stderr file is read below
+
+        # Exit code, or a negative ExitCode value if it could not be read
+        ecode = self.exitcode(home, ecodefile)
+
+        if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
+            err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
+        elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
+            # The command returned an error code, or no exit code file exists.
+            # Check standard error.
+            (out, err), proc = self.check_output(home, stderr)
+            
+            # If the stderr file was not found, assume nothing happened.
+            # We just ignore the error.
+            # (cat returns 1 for error "No such file or directory")
+            if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
+                out = err = ""
+       
+        return (out, err), proc
  
  
-    def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+    def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
         """ Waits until the pid file for the command is generated, 
             and returns the pid and ppid of the process """
         pid = ppid = None
         delay = 1.0
         """ Waits until the pid file for the command is generated, 
             and returns the pid and ppid of the process """
         pid = ppid = None
         delay = 1.0
-        for i in xrange(5):
-            pidtuple = self.checkpid(home = home, pidfile = pidfile)
+
+        for i in xrange(4):
+            pidtuple = self.getpid(home = home, pidfile = pidfile)
             
             if pidtuple:
                 pid, ppid = pidtuple
                 break
             else:
                 time.sleep(delay)
             
             if pidtuple:
                 pid, ppid = pidtuple
                 break
             else:
                 time.sleep(delay)
-                delay = min(30,delay*1.2)
+                delay = delay * 1.5
         else:
             msg = " Failed to get pid for pidfile %s/%s " % (
                     home, pidfile )
         else:
             msg = " Failed to get pid for pidfile %s/%s " % (
                     home, pidfile )
@@ -398,30 +500,26 @@ class LinuxNode(ResourceManager):
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
-        delay = 1.0
-        first = True
-        bustspin = 0
+        start_delay = 1.0
 
         while True:
             status = self.status(pid, ppid)
             
 
         while True:
             status = self.status(pid, ppid)
             
-            if status is sshfuncs.FINISHED:
+            if status is ProcStatus.FINISHED:
                 break
                 break
-            elif status is not sshfuncs.RUNNING:
-                bustspin += 1
-                time.sleep(delay*(5.5+random.random()))
-                if bustspin > 12:
+            elif status is not ProcStatus.RUNNING:
+                delay = delay * 1.5
+                time.sleep(delay)
+                # If it takes more than 20 seconds to start, then
+                # asume something went wrong
+                if delay > 20:
                     break
             else:
                     break
             else:
-                if first:
-                    first = False
-
-                time.sleep(delay*(0.5+random.random()))
-                delay = min(30,delay*1.2)
-                bustspin = 0
+                # The app is running, just wait...
+                time.sleep(0.5)
 
     def check_output(self, home, filename):
 
     def check_output(self, home, filename):
-        """ checks file content """
+        """ Retrieves the content of file home/filename using 'cat' """
         (out, err), proc = self.execute("cat %s" % 
             os.path.join(home, filename), retry = 1, with_lock = True)
         return (out, err), proc
         (out, err), proc = self.execute("cat %s" % 
             os.path.join(home, filename), retry = 1, with_lock = True)
         return (out, err), proc
@@ -451,7 +549,7 @@ class LinuxNode(ResourceManager):
 
     def copy(self, src, dst):
         if self.localhost:
 
     def copy(self, src, dst):
         if self.localhost:
-            (out, err), proc =  execfuncs.lcopy(source, dest, 
+            (out, err), proc = execfuncs.lcopy(source, dest, 
                     recursive = True,
                     strict_host_checking = False)
         else:
                     recursive = True,
                     strict_host_checking = False)
         else:
@@ -478,6 +576,7 @@ class LinuxNode(ResourceManager):
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
+            blocking = True,
             with_lock = False
             ):
         """ Notice that this invocation will block until the
             with_lock = False
             ):
         """ Notice that this invocation will block until the
@@ -511,6 +610,7 @@ class LinuxNode(ResourceManager):
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         persistent = persistent,
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         persistent = persistent,
+                        blocking = blocking, 
                         strict_host_checking = strict_host_checking
                         )
             else:
                         strict_host_checking = strict_host_checking
                         )
             else:
@@ -531,21 +631,22 @@ class LinuxNode(ResourceManager):
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
-                    persistent = persistent
+                    persistent = persistent,
+                    blocking = blocking, 
+                    strict_host_checking = strict_host_checking
                     )
 
         return (out, err), proc
 
                     )
 
         return (out, err), proc
 
-    def run(self, command, 
-            home = None,
+    def run(self, command, home,
             create_home = False,
             create_home = False,
-            pidfile = "pid",
+            pidfile = 'pidfile',
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
             tty = False):
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
             tty = False):
-
+        
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
@@ -558,10 +659,8 @@ class LinuxNode(ResourceManager):
                     sudo = sudo,
                     user = user) 
         else:
                     sudo = sudo,
                     user = user) 
         else:
-            # Start process in a "daemonized" way, using nohup and heavy
-            # stdin/out redirection to avoid connection issues
             with self._lock:
             with self._lock:
-                (out,err), proc = sshfuncs.rspawn(
+                (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
                     home = home,
                     command,
                     pidfile = pidfile,
                     home = home,
@@ -581,12 +680,12 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
 
         return (out, err), proc
 
-    def checkpid(self, home = ".", pidfile = "pid"):
+    def getpid(self, home, pidfile = "pidfile"):
         if self.localhost:
         if self.localhost:
-            pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
+            pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
             with self._lock:
         else:
             with self._lock:
-                pidtuple = sshfuncs.rcheckpid(
+                pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
@@ -597,7 +696,7 @@ class LinuxNode(ResourceManager):
                     )
         
         return pidtuple
                     )
         
         return pidtuple
-    
+
     def status(self, pid, ppid):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
     def status(self, pid, ppid):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
@@ -620,7 +719,7 @@ class LinuxNode(ResourceManager):
         proc = None
         status = self.status(pid, ppid)
 
         proc = None
         status = self.status(pid, ppid)
 
-        if status == sshfuncs.RUNNING:
+        if status == sshfuncs.ProcStatus.RUNNING:
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
@@ -635,12 +734,6 @@ class LinuxNode(ResourceManager):
                         identity = self.get("identity"),
                         server_key = self.get("serverKey")
                         )
                         identity = self.get("identity"),
                         server_key = self.get("serverKey")
                         )
-        return (out, err), proc
 
 
-    def check_bad_host(self, out, err):
-        badre = re.compile(r'(?:'
-                           r'|Error: disk I/O error'
-                           r')', 
-                           re.I)
-        return badre.search(out) or badre.search(err)
+        return (out, err), proc