Making check_errors method from node return the stdout as well as stderr
[nepi.git] / src / nepi / resources / linux / node.py
index 3f30633..8021258 100644 (file)
@@ -1,10 +1,29 @@
-from neco.execution.attribute import Attribute, Flags
-from neco.execution.resource import ResourceManager, clsinit, ResourceState
-from neco.resources.linux import rpmfuncs, debfuncs 
-from neco.util import sshfuncs, execfuncs 
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.resources.linux import rpmfuncs, debfuncs 
+from nepi.util import sshfuncs, execfuncs
+from nepi.util.sshfuncs import ProcStatus
 
 import collections
-import logging
 import os
 import random
 import re
@@ -19,6 +38,25 @@ import threading
 
 reschedule_delay = "0.5s"
 
+class ExitCode:
+    """
+    Error codes that the rexitcode function can return if unable to
+    check the exit code of a spawned process
+    """
+    FILENOTFOUND = -1
+    CORRUPTFILE = -2
+    ERROR = -3
+    OK = 0
+
+class OSType:
+    """
+    Supported flavors of Linux OS
+    """
+    FEDORA_12 = "f12"
+    FEDORA_14 = "f14"
+    FEDORA = "fedora"
+    UBUNTU = "ubuntu"
+    DEBIAN = "debian"
 
 @clsinit
 class LinuxNode(ResourceManager):
@@ -72,8 +110,6 @@ class LinuxNode(ResourceManager):
         
         # lock to avoid concurrency issues on methods used by applications 
         self._lock = threading.Lock()
-
-        self._logger = logging.getLogger("LinuxNode")
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
@@ -110,13 +146,13 @@ class LinuxNode(ResourceManager):
             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
 
         if out.find("Fedora release 12") == 0:
-            self._os = "f12"
+            self._os = OSType.FEDORA_12
         elif out.find("Fedora release 14") == 0:
-            self._os = "f14"
+            self._os = OSType.FEDORA_14
         elif out.find("Debian") == 0: 
-            self._os = "debian"
+            self._os = OSType.DEBIAN
         elif out.find("Ubuntu") ==0:
-            self._os = "ubuntu"
+            self._os = OSType.UBUNTU
         else:
             msg = "Unsupported OS"
             self.error(msg, out)
@@ -128,7 +164,7 @@ class LinuxNode(ResourceManager):
     def localhost(self):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
-    def provision(self, filters = None):
+    def provision(self):
         if not self.is_alive():
             self._state = ResourceState.FAILED
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
@@ -148,15 +184,15 @@ class LinuxNode(ResourceManager):
     def deploy(self):
         if self.state == ResourceState.NEW:
             try:
-               self.discover()
-               self.provision()
+                self.discover()
+                self.provision()
             except:
                 self._state = ResourceState.FAILED
                 raise
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
-        from neco.resources.linux.interface import LinuxInterface
+        from nepi.resources.linux.interface import LinuxInterface
         ifaces = self.get_connected(LinuxInterface.rtype())
         for iface in ifaces:
             if iface.state < ResourceState.READY:
@@ -233,7 +269,6 @@ class LinuxNode(ResourceManager):
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
-
         result = self.copy(src, dst)
 
         # clean up temp file
@@ -248,46 +283,46 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
-        if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.install_packages_command(self.os, packages)
-        elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.install_packages_command(self.os, packages)
+    def install_packages(self, packages, home):
+        command = ""
+        if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+            command = rpmfuncs.install_packages_command(self.os, packages)
+        elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+            command = debfuncs.install_packages_command(self.os, packages)
         else:
             msg = "Error installing packages ( OS not known ) "
             self.error(msg, self.os)
             raise RuntimeError, msg
 
         out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "instpkg_pid",
-            stdout = "instpkg_out", 
-            stderr = "instpkg_err",
+        (out, err), proc = self.run_and_wait(command, home, 
+            shfile = "instpkg.sh",
+            pidfile = "instpkg_pidfile",
+            ecodefile = "instpkg_exitcode",
+            stdout = "instpkg_stdout", 
+            stderr = "instpkg_stderr",
             raise_on_error = True)
 
         return (out, err), proc 
 
-    def remove_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
-        if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.remove_packages_command(self.os, packages)
-        elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.remove_packages_command(self.os, packages)
+    def remove_packages(self, packages, home):
+        command = ""
+        if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+            command = rpmfuncs.remove_packages_command(self.os, packages)
+        elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+            command = debfuncs.remove_packages_command(self.os, packages)
         else:
             msg = "Error removing packages ( OS not known ) "
             self.error(msg)
             raise RuntimeError, msg
 
         out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "rmpkg_pid",
-            stdout = "rmpkg_out", 
-            stderr = "rmpkg_err",
+        (out, err), proc = self.run_and_wait(command, home, 
+            shfile = "rmpkg.sh",
+            pidfile = "rmpkg_pidfile",
+            ecodefile = "rmpkg_exitcode",
+            stdout = "rmpkg_stdout", 
+            stderr = "rmpkg_stderr",
             raise_on_error = True)
          
         return (out, err), proc 
@@ -300,22 +335,31 @@ class LinuxNode(ResourceManager):
 
     def rmdir(self, path):
         return self.execute("rm -rf %s" % path, with_lock = True)
-
-    def run_and_wait(self, command, 
-            home = ".", 
-            pidfile = "pid", 
+        
+    def run_and_wait(self, command, home, 
+            shfile = "cmd.sh",
+            env = None,
+            pidfile = "pidfile", 
+            ecodefile = "exitcode", 
             stdin = None, 
-            stdout = 'stdout'
-            stderr = 'stderr'
+            stdout = "stdout"
+            stderr = "stderr"
             sudo = False,
             tty = False,
             raise_on_error = False):
-        """ runs a command in background on the remote host, but waits
-            until the command finishes execution.
-            This is more robust than doing a simple synchronized 'execute',
-            since in the remote host the command can continue to run detached
-            even if network disconnections occur
+        """ 
+        runs a command in background on the remote host, busy-waiting
+        until the command finishes execution.
+        This is more robust than doing a simple synchronized 'execute',
+        since in the remote host the command can continue to run detached
+        even if network disconnections occur
         """
+        self.upload_command(command, home, 
+            shfile = shfile, 
+            ecodefile = ecodefile, 
+            env = env)
+
+        command = "bash ./%s" % shfile
         # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
@@ -331,7 +375,6 @@ class LinuxNode(ResourceManager):
             self.error(msg, out, err)
             if raise_on_error:
                 raise RuntimeError, msg
-
         # Wait for pid file to be generated
         pid, ppid = self.wait_pid(
                 home = home, 
@@ -340,11 +383,11 @@ class LinuxNode(ResourceManager):
 
         # wait until command finishes to execute
         self.wait_run(pid, ppid)
-       
-        # check if execution errors occurred
-        (out, err), proc = self.check_output(home, stderr)
+      
+        (out, err), proc = self.check_errors(home, ecodefile, stderr)
 
-        if err or out:
+        # Out is what was written in the stderr file
+        if err:
             msg = " Failed to run command '%s' " % command
             self.error(msg, out, err)
 
@@ -352,21 +395,111 @@ class LinuxNode(ResourceManager):
                 raise RuntimeError, msg
         
         return (out, err), proc
+
+    def exitcode(self, home, ecodefile = "exitcode"):
+        """
+        Get the exit code of an application.
+        Returns an integer value with the exit code 
+        """
+        (out, err), proc = self.check_output(home, ecodefile)
+
+        # Succeeded to open file, return exit code in the file
+        if proc.wait() == 0:
+            try:
+                return int(out.strip())
+            except:
+                # Error in the content of the file!
+                return ExitCode.CORRUPTFILE
+
+        # No such file or directory
+        if proc.returncode == 1:
+            return ExitCode.FILENOTFOUND
+        
+        # Other error from 'cat'
+        return ExitCode.ERROR
+
+    def upload_command(self, command, home, 
+            shfile = "cmd.sh",
+            ecodefile = "exitcode",
+            env = None):
+        """ Saves the command as a bash script file in the remote host, and
+        forces to save the exit code of the command execution to the ecodefile
+        """
+      
+        # The exit code of the command will be stored in ecodefile
+        command = " %(command)s ; echo $? > %(ecodefile)s ;" % {
+                'command': command,
+                'ecodefile': ecodefile,
+                } 
+
+        # Export environment
+        environ = self.format_environment(env)
+
+        # Add environ to command
+        command = environ + command
+
+        dst = os.path.join(home, shfile)
+        return self.upload(command, dst, text = True)
+
+    def format_environment(self, env, inline = False):
+        """Format environmental variables for command to be executed either
+        as an inline command (i.e. PYTHONPATH=src/.. python script.py) or
+        as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
+        """
+        sep = " " if inline else "\n"
+        export = " " if inline else "export"
+        return sep.join(map(lambda e: "%s %s" % (export, e), env.split(" "))) \
+                + sep if env else ""
+
+    def check_errors(self, home, 
+            ecodefile = "exitcode", 
+            stderr = "stderr",
+            stdout = "stdout"):
+        """
+        Checks whether errors occurred while running a command.
+        It first checks the exit code for the command, and only if the
+        exit code is an error one it returns the error output.
+
+        """
+        out = err = ""
+        proc = None
+
+        # get exit code saved in the 'exitcode' file
+        ecode = self.exitcode(home, ecodefile)
+
+        if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
+            err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
+        elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
+            # The process returned an error code or didn't exist. 
+            # Check standard error.
+            (err, eerr), proc = self.check_output(home, stderr)
+
+            # Alsow retrive standard output for information
+            (out, oerr), oproc = self.check_output(home, stdout)
+
+            # If the stderr file was not found, assume nothing bad happened,
+            # and just ignore the error.
+            # (cat returns 1 for error "No such file or directory")
+            if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
+                err = "" 
+       
+        return (out, err), proc
  
-    def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+    def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
         """ Waits until the pid file for the command is generated, 
             and returns the pid and ppid of the process """
         pid = ppid = None
         delay = 1.0
-        for i in xrange(5):
-            pidtuple = self.checkpid(home = home, pidfile = pidfile)
+
+        for i in xrange(4):
+            pidtuple = self.getpid(home = home, pidfile = pidfile)
             
             if pidtuple:
                 pid, ppid = pidtuple
                 break
             else:
                 time.sleep(delay)
-                delay = min(30,delay*1.2)
+                delay = delay * 1.5
         else:
             msg = " Failed to get pid for pidfile %s/%s " % (
                     home, pidfile )
@@ -379,30 +512,26 @@ class LinuxNode(ResourceManager):
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
-        delay = 1.0
-        first = True
-        bustspin = 0
+        start_delay = 1.0
 
         while True:
             status = self.status(pid, ppid)
             
-            if status is sshfuncs.FINISHED:
+            if status is ProcStatus.FINISHED:
                 break
-            elif status is not sshfuncs.RUNNING:
-                bustspin += 1
-                time.sleep(delay*(5.5+random.random()))
-                if bustspin > 12:
+            elif status is not ProcStatus.RUNNING:
+                delay = delay * 1.5
+                time.sleep(delay)
+                # If it takes more than 20 seconds to start, then
+                # asume something went wrong
+                if delay > 20:
                     break
             else:
-                if first:
-                    first = False
-
-                time.sleep(delay*(0.5+random.random()))
-                delay = min(30,delay*1.2)
-                bustspin = 0
+                # The app is running, just wait...
+                time.sleep(0.5)
 
     def check_output(self, home, filename):
-        """ checks file content """
+        """ Retrives content of file """
         (out, err), proc = self.execute("cat %s" % 
             os.path.join(home, filename), retry = 1, with_lock = True)
         return (out, err), proc
@@ -432,7 +561,7 @@ class LinuxNode(ResourceManager):
 
     def copy(self, src, dst):
         if self.localhost:
-            (out, err), proc =  execfuncs.lcopy(source, dest, 
+            (out, err), proc = execfuncs.lcopy(source, dest, 
                     recursive = True,
                     strict_host_checking = False)
         else:
@@ -459,6 +588,7 @@ class LinuxNode(ResourceManager):
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
+            blocking = True,
             with_lock = False
             ):
         """ Notice that this invocation will block until the
@@ -492,6 +622,7 @@ class LinuxNode(ResourceManager):
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         persistent = persistent,
+                        blocking = blocking, 
                         strict_host_checking = strict_host_checking
                         )
             else:
@@ -512,21 +643,22 @@ class LinuxNode(ResourceManager):
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
-                    persistent = persistent
+                    persistent = persistent,
+                    blocking = blocking, 
+                    strict_host_checking = strict_host_checking
                     )
 
         return (out, err), proc
 
-    def run(self, command, 
-            home = None,
+    def run(self, command, home,
             create_home = False,
-            pidfile = "pid",
+            pidfile = 'pidfile',
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
             tty = False):
-
+        
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
@@ -539,10 +671,8 @@ class LinuxNode(ResourceManager):
                     sudo = sudo,
                     user = user) 
         else:
-            # Start process in a "daemonized" way, using nohup and heavy
-            # stdin/out redirection to avoid connection issues
             with self._lock:
-                (out,err), proc = sshfuncs.rspawn(
+                (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
                     home = home,
@@ -562,12 +692,12 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
-    def checkpid(self, home = ".", pidfile = "pid"):
+    def getpid(self, home, pidfile = "pidfile"):
         if self.localhost:
-            pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
+            pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
             with self._lock:
-                pidtuple = sshfuncs.rcheckpid(
+                pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
@@ -578,7 +708,7 @@ class LinuxNode(ResourceManager):
                     )
         
         return pidtuple
-    
+
     def status(self, pid, ppid):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
@@ -601,7 +731,7 @@ class LinuxNode(ResourceManager):
         proc = None
         status = self.status(pid, ppid)
 
-        if status == sshfuncs.RUNNING:
+        if status == sshfuncs.ProcStatus.RUNNING:
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
@@ -616,18 +746,6 @@ class LinuxNode(ResourceManager):
                         identity = self.get("identity"),
                         server_key = self.get("serverKey")
                         )
-        return (out, err), proc
 
-    def check_bad_host(self, out, err):
-        badre = re.compile(r'(?:'
-                           r'|Error: disk I/O error'
-                           r')', 
-                           re.I)
-        return badre.search(out) or badre.search(err)
-
-    def blacklist(self):
-        # TODO!!!!
-        self.warn(" Blacklisting malfunctioning node ")
-        #import util
-        #util.appendBlacklist(self.hostname)
+        return (out, err), proc