Bugfixing LinuxNode and LinuxApplication
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 11 Jun 2013 00:36:50 +0000 (17:36 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 11 Jun 2013 00:36:50 +0000 (17:36 -0700)
14 files changed:
examples/linux/ccnx/vlc_2_hosts.py
src/nepi/execution/attribute.py
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/execution/trace.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/rpmfuncs.py
src/nepi/util/execfuncs.py
src/nepi/util/sshfuncs.py
test/execution/ec.py
test/resources/linux/application.py
test/resources/linux/node.py
test/util/sshfuncs.py

index 0291d03..f880fca 100755 (executable)
@@ -28,11 +28,12 @@ from optparse import OptionParser, SUPPRESS_HELP
 import os
 import time
 
-def add_node(ec, host, user):
+def add_node(ec, host, user, ssh_key = None):
     node = ec.register_resource("LinuxNode")
     ec.set(node, "hostname", host)
     ec.set(node, "username", user)
-    #ec.set(node, "cleanHome", True)
+    ec.set(node, "identity", ssh_key)
+    ec.set(node, "cleanHome", True)
     ec.set(node, "cleanProcesses", True)
     return node
 
@@ -40,18 +41,18 @@ def add_ccnd(ec, os_type, peers):
     if os_type == "f12":
         depends = ( " autoconf openssl-devel  expat-devel libpcap-devel "
                 " ecryptfs-utils-devel libxml2-devel automake gawk " 
-                " gcc gcc-c++ git pcre-devel ")
+                " gcc gcc-c++ git pcre-devel make ")
     elif os_type == "ubuntu":
         depends = ( " autoconf libssl-dev libexpat-dev libpcap-dev "
                 " libecryptfs0 libxml2-utils automake gawk gcc g++ "
-                " git-core pkg-config libpcre3-dev ")
+                " git-core pkg-config libpcre3-dev make ")
 
     sources = "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
 
     build = (
         # Evaluate if ccnx binaries are already installed
         " ( "
-            "  test -d ${EXP_HOME}/ccnx/bin"
+            "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
         " ) || ( "
         # If not, untar and build
             " ( "
@@ -66,7 +67,7 @@ def add_ccnd(ec, os_type, peers):
     install = (
         # Evaluate if ccnx binaries are already installed
         " ( "
-            "  test -d ${EXP_HOME}/ccnx/bin "
+            "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
         " ) || ( "
             "  mkdir -p ${EXP_HOME}/ccnx/bin && "
             "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
@@ -75,11 +76,11 @@ def add_ccnd(ec, os_type, peers):
 
     env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
 
-    # BASH command -> ' ccndstart 2>&1 ; ccndc add ccnx:/ udp  host ;  ccnr 2>&1 '
-    command = "ccndstart 2>&1 ; "
+    # BASH command -> ' ccndstart ; ccndc add ccnx:/ udp  host ;  ccnr '
+    command = "ccndstart ; "
     peers = map(lambda peer: "ccndc add ccnx:/ udp  %s" % peer, peers)
     command += " ; ".join(peers) + " ; "
-    command += " ccnr 2>&1 "
+    command += " ccnr "
 
     app = ec.register_resource("LinuxApplication")
     ec.set(app, "depends", depends)
@@ -104,7 +105,7 @@ def add_publish(ec, movie):
 
 def add_stream(ec):
     env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
-    command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) 2>&1"
+    command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) "
 
     app = ec.register_resource("LinuxApplication")
     ec.set(app, "depends", "vlc")
@@ -117,37 +118,56 @@ def add_stream(ec):
 def get_options():
     slicename = os.environ.get("PL_SLICE")
 
-    usage = "usage: %prog -s <pl-slice> -u <user-2> -m <movie> -l <exp-id>"
+    # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the
+    # id_rsa_planetlab exists 
+    default_key = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'])
+    default_key = default_key if os.path.exists(default_key) else None
+    pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
+
+    usage = "usage: %prog -s <pl-slice> -u <username> -m <movie> -l <exp-id> -i <ssh_key>"
 
     parser = OptionParser(usage=usage)
     parser.add_option("-s", "--pl-slice", dest="pl_slice", 
-            help="PlanetLab slicename", default=slicename, type="str")
-    parser.add_option("-u", "--user-2", dest="user2", 
-            help="User for non PlanetLab machine", type="str")
+            help="PlanetLab slicename", default = slicename, type="str")
+    parser.add_option("-u", "--username", dest="username", 
+            help="User for extra host (non PlanetLab)", type="str")
     parser.add_option("-m", "--movie", dest="movie", 
             help="Stream movie", type="str")
     parser.add_option("-l", "--exp-id", dest="exp_id", 
             help="Label to identify experiment", type="str")
+    parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key", 
+            help="Path to private SSH key to be used for connection", 
+            default = pl_ssh_key, type="str")
 
     (options, args) = parser.parse_args()
 
     if not options.movie:
         parser.error("movie is a required argument")
 
-    return (options.pl_slice, options.user2, options.movie, options.exp_id)
+    return (options.pl_slice, options.username, options.movie, options.exp_id, 
+            options.pl_ssh_key)
 
 if __name__ == '__main__':
-    ( pl_slice, user2, movie, exp_id ) = get_options()
+    ( pl_slice, username, movie, exp_id, pl_ssh_key ) = get_options()
 
     # Search for available RMs
     populate_factory()
     
+    # PlanetLab node
     host1 = 'planetlab2.u-strasbg.fr'
+    
+    # Another node 
+    # IMPORTANT NOTE: you must replace this host for another one
+    #       you have access to. You must set up your SSH keys so
+    #       the host can be accessed through SSH without prompting
+    #       for a password. The host must allow X forwarding using SSH.
     host2 = 'roseval.pl.sophia.inria.fr'
 
+    # Create the ExperimentController instance
     ec = ExperimentController(exp_id = exp_id)
 
-    node1 = add_node(ec, host1, pl_slice)
+    # Register a ResourceManager (RM) for the PlanetLab node
+    node1 = add_node(ec, host1, pl_slice, pl_ssh_key)
     
     peers = [host2]
     ccnd1 = add_ccnd(ec, "f12", peers)
@@ -161,7 +181,7 @@ if __name__ == '__main__':
     ec.register_condition(pub, ResourceAction.START, 
             ccnd1, ResourceState.STARTED)
     
-    node2 = add_node(ec, host2, user2)
+    node2 = add_node(ec, host2, username)
     peers = [host1]
     ccnd2 = add_ccnd(ec, "ubuntu", peers)
     ec.register_connection(ccnd2, node2)
@@ -177,10 +197,13 @@ if __name__ == '__main__':
     ec.register_condition(stream, ResourceAction.START, 
             pub, ResourceState.STARTED)
  
+    # Deploy all ResourceManagers
     ec.deploy()
 
+    # Wait until the applications are finished
     apps = [ccnd1, pub, ccnd2, stream]
     ec.wait_finished(apps)
 
+    # Shutdown the experiment controller
     ec.shutdown()
 
index 3d46edc..c6b973b 100644 (file)
@@ -79,53 +79,53 @@ class Attribute(object):
 
     @property
     def name(self):
-    """ Returns the name of the attribute """
+        """ Returns the name of the attribute """
         return self._name
 
     @property
     def default(self):
-    """ Returns the default value of the attribute """
+        """ Returns the default value of the attribute """
         return self._default
 
     @property
     def type(self):
-    """ Returns the type of the attribute """
+        """ Returns the type of the attribute """
         return self._type
 
     @property
     def help(self):
-    """ Returns the help of the attribute """
+        """ Returns the help of the attribute """
         return self._help
 
     @property
     def flags(self):
-    """ Returns the flags of the attribute """
+        """ Returns the flags of the attribute """
         return self._flags
 
     @property
     def allowed(self):
-    """ Returns the allowed value for this attribute """
+        """ Returns the allowed value for this attribute """
         return self._allowed
 
     @property
     def range(self):
-    """ Returns the range of the attribute """
+        """ Returns the range of the attribute """
         return self._range
 
     def has_flag(self, flag):
-    """ Returns true if the attribute has the flag 'flag'
+        """ Returns true if the attribute has the flag 'flag'
 
         :param flag: Flag that need to be ckecked
         :type flag: Flags
-    """
+        """
         return (self._flags & flag) == flag
 
     def get_value(self):
-    """ Returns the value of the attribute """
+        """ Returns the value of the attribute """
         return self._value
 
     def set_value(self, value):
-    """ Change the value of the attribute after checking the type """
+        """ Change the value of the attribute after checking the type """
         valid = True
 
         if self.type == Types.Enumerate:
index 34c5c50..ba573c4 100644 (file)
@@ -439,6 +439,9 @@ class ExperimentController(object):
         if not group:
             group = self.resources
 
+        if isinstance(group, int):
+            group = [group]
+
         # Before starting deployment we disorder the group list with the
         # purpose of speeding up the whole deployment process.
         # It is likely that the user inserted in the 'group' list closely
@@ -450,7 +453,7 @@ class ExperimentController(object):
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
-        random.shuffle(group)
+        #random.shuffle(group)
 
         def wait_all_and_start(group):
             reschedule = False
@@ -467,7 +470,7 @@ class ExperimentController(object):
                 # If all resources are read, we schedule the start
                 for guid in group:
                     rm = self.get_resource(guid)
-                    self.schedule("0.01s", rm.start_with_conditions)
+                    self.schedule("0s", rm.start_with_conditions)
 
         if wait_all_ready:
             # Schedule the function that will check all resources are
@@ -479,7 +482,7 @@ class ExperimentController(object):
 
         for guid in group:
             rm = self.get_resource(guid)
-            self.schedule("0.001s", rm.deploy)
+            self.schedule("0s", rm.deploy)
 
             if not wait_all_ready:
                 self.schedule("1s", rm.start_with_conditions)
@@ -518,12 +521,16 @@ class ExperimentController(object):
         
     def shutdown(self):
         """ Shutdown the Experiment Controller. 
-        It means : Release all the resources and stop the scheduler
+        Releases all the resources and stops task processing thread
 
         """
         self.release()
 
-        self._stop_scheduler()
+        # Mark the EC state as TERMINATED
+        self._state = ECState.TERMINATED
+
+        # Notify condition to wake up the processing thread
+        self._notify()
         
         if self._thread.is_alive():
            self._thread.join()
@@ -554,18 +561,28 @@ class ExperimentController(object):
             self._tasks[task.id] = task
   
         # Notify condition to wake up the processing thread
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
+        self._notify()
 
         return task.id
      
     def _process(self):
-        """ Process at executing the task that are in the scheduler.
+        """ Process scheduled tasks.
+
+        The _process method is executed in an independent thread held by the 
+        ExperimentController for as long as the experiment is running.
+        
+        Tasks are scheduled by invoking the schedule method with a target callback. 
+        The schedule method is givedn a execution time which controls the
+        order in which tasks are processed. 
+
+        Tasks are processed in parallel using multithreading. 
+        The environmental variable NEPI_NTHREADS can be used to control
+        the number of threads used to process tasks. The default value is 50.
 
         """
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
-        runner = ParallelRun(maxthreads = 50)
+        runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
         try:
@@ -602,18 +619,18 @@ class ExperimentController(object):
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-   
-        # Mark EC state as terminated
-        if self.ecstate == ECState.RUNNING:
-            # Synchronize to get errors if occurred
+        finally:   
             runner.sync()
-            self._state = ECState.TERMINATED
 
     def _execute(self, task):
-        """ Invoke the callback of the task 'task'
+        """ Executes a single task. 
+
+            If the invokation of the task callback raises an
+            exception, the processing thread of the ExperimentController
+            will be stopped and the experiment will be aborted.
 
-            :param task: Id of the task
-            :type task: int
+            :param task: Object containing the callback to execute
+            :type task: Task
 
         """
         # Invoke callback
@@ -629,22 +646,21 @@ class ExperimentController(object):
             
             self._logger.error("Error occurred while executing task: %s" % err)
 
-            self._stop_scheduler()
+            # Set the EC to FAILED state (this will force to exit the task
+            # processing thread)
+            self._state = ECState.FAILED
+
+            # Notify condition to wake up the processing thread
+            self._notify()
 
             # Propage error to the ParallelRunner
             raise
 
-    def _stop_scheduler(self):
-        """ Stop the scheduler and put the EC into a FAILED State.
-
+    def _notify(self):
+        """ Awakes the processing thread in case it is blocked waiting
+        for a new task to be scheduled.
         """
-
-        # Mark the EC as failed
-        self._state = ECState.FAILED
-
-        # Wake up the EC in case it was sleeping
         self._cond.acquire()
         self._cond.notify()
         self._cond.release()
 
-
index 423ca92..078b07d 100644 (file)
@@ -627,17 +627,16 @@ class ResourceFactory(object):
         return rclass(ec, guid)
 
 def populate_factory():
-        """Register all the possible RM that exists in the current version of Nepi.
-
-        """
+    """Register all the possible RM that exists in the current version of Nepi.
+    """
     for rclass in find_types():
         ResourceFactory.register_type(rclass)
 
 def find_types():
-        """Look into the different folders to find all the 
-        availables Resources Managers
+    """Look into the different folders to find all the 
+    availables Resources Managers
 
-        """
+    """
     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
     search_path = set(search_path.split(" "))
    
index 1d16242..3b22eaf 100644 (file)
@@ -45,11 +45,11 @@ class Trace(object):
 
     @property
     def name(self):
-    """ Returns the name of the trace """
+        """ Returns the name of the trace """
         return self._name
 
     @property
     def help(self):
-    """ Returns the help of the trace """
+        """ Returns the help of the trace """
         return self._help
 
index 080c3a4..5f4c998 100644 (file)
@@ -21,16 +21,13 @@ from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux.node import LinuxNode
-from nepi.util import sshfuncs 
+from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import strfnow, strfdiff
 
 import os
 
-reschedule_delay = "0.5s"
-state_check_delay = 1
-
 # TODO: Resolve wildcards in commands!!
-# TODO: If command is not set give a warning but do not generate an error!
+
 
 @clsinit
 class LinuxApplication(ResourceManager):
@@ -215,25 +212,21 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
+        # Upload command
         command = self.get("command")
         x11 = self.get("forwardX11")
-        if not x11 and command:
+        env = self.get("env")
+        
+        if command and not x11:
             self.info("Uploading command '%s'" % command)
 
-            # Export environment
-            environ = ""
-            if self.get("env"):
-                for var in self.get("env").split(" "):
-                    environ += 'export %s\n' % var
-
-            command = environ + command
-
-            # If the command runs asynchronous, pre upload the command 
-            # to the app.sh file in the remote host
-            dst = os.path.join(self.app_home, "app.sh")
+            # replace application specific paths in the command
             command = self.replace_paths(command)
-            self.node.upload(command, dst, text = True)
 
+            self.node.upload_command(command, self.app_home, 
+                    shfile = "app.sh",
+                    env = env)
+       
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
@@ -253,25 +246,29 @@ class LinuxApplication(ResourceManager):
                     http_sources.append(source)
                     sources.remove(source)
 
-            # Download http sources
+            # Download http sources remotely
             if http_sources:
-                cmd = " wget -c --directory-prefix=${SOURCES} "
-                verif = ""
+                command = " wget -c --directory-prefix=${SOURCES} "
+                check = ""
 
                 for source in http_sources:
-                    cmd += " %s " % (source)
-                    verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+                    command += " %s " % (source)
+                    check += " ls ${SOURCES}/%s ;" % os.path.basename(source)
                 
-                # Wget output goes to stderr :S
-                cmd += " 2> /dev/null ; "
-
-                # Add verification
-                cmd += " %s " % verif
+                # Append the command to check that the sources were downloaded
+                command += " ; %s " % check
 
+                # replace application specific paths in the command
+                command = self.replace_paths(command)
+                
                 # Upload the command to a file, and execute asynchronously
-                self.upload_and_run(cmd, 
-                        "http_sources.sh", "http_sources_pid", 
-                        "http_sources_out", "http_sources_err")
+                self.node.run_and_wait(command, self.app_home,
+                        shfile = "http_sources.sh",
+                        pidfile = "http_sources_pidfile", 
+                        ecodefile = "http_sources_exitcode", 
+                        stdout = "http_sources_stdout", 
+                        stderr = "http_sources_stderr")
+
             if sources:
                 self.node.upload(sources, self.src_dir)
 
@@ -299,7 +296,7 @@ class LinuxApplication(ResourceManager):
         depends = self.get("depends")
         if depends:
             self.info(" Installing dependencies %s" % depends)
-            self.node.install_packages(depends, home = self.app_home)
+            self.node.install_packages(depends, self.app_home)
 
     def build(self):
         build = self.get("build")
@@ -309,26 +306,40 @@ class LinuxApplication(ResourceManager):
             # create dir for build
             self.node.mkdir(self.build_dir)
 
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+
             # Upload the command to a file, and execute asynchronously
-            self.upload_and_run(build, 
-                    "build.sh", "build_pid", 
-                    "build_out", "build_err")
+            self.node.run_and_wait(command, self.app_home,
+                    shfile = "build.sh",
+                    pidfile = "build_pidfile", 
+                    ecodefile = "build_exitcode", 
+                    stdout = "build_stdout", 
+                    stderr = "build_stderr")
  
     def install(self):
         install = self.get("install")
         if install:
             self.info(" Installing sources ")
 
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+
             # Upload the command to a file, and execute asynchronously
-            self.upload_and_run(install, 
-                    "install.sh", "install_pid", 
-                    "install_out", "install_err")
+            self.node.run_and_wait(command, self.app_home,
+                    shfile = "install.sh",
+                    pidfile = "install_pidfile", 
+                    ecodefile = "install_exitcode", 
+                    stdout = "install_stdout", 
+                    stderr = "install_stderr")
 
     def deploy(self):
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+            
+            reschedule_delay = "0.5s"
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             try:
@@ -352,16 +363,25 @@ class LinuxApplication(ResourceManager):
         x11 = self.get('forwardX11') or False
         failed = False
 
-        super(LinuxApplication, self).start()
-
         if not command:
-            self.info("No command to start ")
+            # If no command was given, then the application 
+            # is directly marked as FINISHED
             self._state = ResourceState.FINISHED
-            return 
+        else:
+            super(LinuxApplication, self).start()
     
         self.info("Starting command '%s'" % command)
 
         if x11:
+            # If X11 forwarding was specified, then the application
+            # can not run detached, so instead of invoking asynchronous
+            # 'run' we invoke synchronous 'execute'.
+            if not command:
+                msg = "No command is defined but X11 forwarding has been set"
+                self.error(msg)
+                self._state = ResourceState.FAILED
+                raise RuntimeError, msg
+
             if env:
                 # Export environment
                 environ = ""
@@ -392,28 +412,22 @@ class LinuxApplication(ResourceManager):
                 stderr = stderr,
                 sudo = sudo)
 
+            # check if execution errors occurred
+            msg = " Failed to start command '%s' " % command
+            
             if proc.poll() and err:
-                failed = True
+                self.error(msg, out, err)
+                raise RuntimeError, msg
         
-            if not failed:
-                pid, ppid = self.node.wait_pid(home = self.app_home)
-                if pid: self._pid = int(pid)
-                if ppid: self._ppid = int(ppid)
+            # Check status of process running in background
+            pid, ppid = self.node.wait_pid(self.app_home)
+            if pid: self._pid = int(pid)
+            if ppid: self._ppid = int(ppid)
 
+            # If the process is not running, check for error information
+            # on the remote machine
             if not self.pid or not self.ppid:
-                failed = True
-            (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
-
-            if failed or out or chkerr:
-                # check if execution errors occurred
-                msg = " Failed to start command '%s' " % command
-                out = out
-                if err:
-                    err = err
-                elif chkerr:
-                    err = chkerr
-
+                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
                 self.error(msg, out, err)
 
                 msg2 = " Setting state to Failed"
@@ -456,10 +470,11 @@ class LinuxApplication(ResourceManager):
         if self._state == ResourceState.STARTED:
             # To avoid overwhelming the remote hosts and the local processor
             # with too many ssh queries, the state is only requested
-            # every 'state_check_delay' .
+            # every 'state_check_delay' seconds.
+            state_check_delay = 0.5
             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                 # check if execution errors occurred
-                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+                (out, err), proc = self.node.check_errors(self.app_home)
 
                 if out or err:
                     if err.find("No such file or directory") >= 0 :
@@ -474,7 +489,7 @@ class LinuxApplication(ResourceManager):
                 elif self.pid and self.ppid:
                     status = self.node.status(self.pid, self.ppid)
 
-                    if status == sshfuncs.FINISHED:
+                    if status == ProcStatus.FINISHED:
                         self._state = ResourceState.FINISHED
 
 
@@ -482,18 +497,6 @@ class LinuxApplication(ResourceManager):
 
         return self._state
 
-    def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
-        dst = os.path.join(self.app_home, fname)
-        cmd = self.replace_paths(cmd)
-        self.node.upload(cmd, dst, text = True)
-
-        cmd = "bash ./%s" % fname
-        (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
-            pidfile = pidfile,
-            stdout = outfile, 
-            stderr = errfile, 
-            raise_on_error = True)
-
     def replace_paths(self, command):
         """
         Replace all special path tags with shell-escaped actual paths.
index 8387def..0f3a01c 100644 (file)
@@ -20,7 +20,8 @@
 from nepi.execution.attribute import Attribute, Flags
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux import rpmfuncs, debfuncs 
-from nepi.util import sshfuncs, execfuncs 
+from nepi.util import sshfuncs, execfuncs
+from nepi.util.sshfuncs import ProcStatus
 
 import collections
 import os
@@ -37,6 +38,15 @@ import threading
 
 reschedule_delay = "0.5s"
 
+class ExitCode:
+    """
+    Error codes that the rexitcode function can return if unable to
+    check the exit code of a spawned process
+    """
+    FILENOTFOUND = -1
+    CORRUPTFILE = -2
+    ERROR = -3
+    OK = 0
 
 @clsinit
 class LinuxNode(ResourceManager):
@@ -264,46 +274,46 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
+    def install_packages(self, packages, home):
+        command = ""
         if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.install_packages_command(self.os, packages)
+            command = rpmfuncs.install_packages_command(self.os, packages)
         elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.install_packages_command(self.os, packages)
+            command = debfuncs.install_packages_command(self.os, packages)
         else:
             msg = "Error installing packages ( OS not known ) "
             self.error(msg, self.os)
             raise RuntimeError, msg
 
         out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "instpkg_pid",
-            stdout = "instpkg_out", 
-            stderr = "instpkg_err",
+        (out, err), proc = self.run_and_wait(command, home, 
+            shfile = "instpkg.sh",
+            pidfile = "instpkg_pidfile",
+            ecodefile = "instpkg_exitcode",
+            stdout = "instpkg_stdout", 
+            stderr = "instpkg_stderr",
             raise_on_error = True)
 
         return (out, err), proc 
 
-    def remove_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
+    def remove_packages(self, packages, home):
+        command = ""
         if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.remove_packages_command(self.os, packages)
+            command = rpmfuncs.remove_packages_command(self.os, packages)
         elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.remove_packages_command(self.os, packages)
+            command = debfuncs.remove_packages_command(self.os, packages)
         else:
             msg = "Error removing packages ( OS not known ) "
             self.error(msg)
             raise RuntimeError, msg
 
         out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "rmpkg_pid",
-            stdout = "rmpkg_out", 
-            stderr = "rmpkg_err",
+        (out, err), proc = self.run_and_wait(command, home, 
+            shfile = "rmpkg.sh",
+            pidfile = "rmpkg_pidfile",
+            ecodefile = "rmpkg_exitcode",
+            stdout = "rmpkg_stdout", 
+            stderr = "rmpkg_stderr",
             raise_on_error = True)
          
         return (out, err), proc 
@@ -316,22 +326,27 @@ class LinuxNode(ResourceManager):
 
     def rmdir(self, path):
         return self.execute("rm -rf %s" % path, with_lock = True)
-
-    def run_and_wait(self, command, 
-            home = ".", 
-            pidfile = "pid", 
+        
+    def run_and_wait(self, command, home, 
+            shfile = "cmd.sh",
+            pidfile = "pidfile", 
+            ecodefile = "exitcode", 
             stdin = None, 
-            stdout = 'stdout'
-            stderr = 'stderr'
+            stdout = "stdout"
+            stderr = "stderr"
             sudo = False,
             tty = False,
             raise_on_error = False):
-        """ runs a command in background on the remote host, but waits
-            until the command finishes execution.
-            This is more robust than doing a simple synchronized 'execute',
-            since in the remote host the command can continue to run detached
-            even if network disconnections occur
+        """ 
+        runs a command in background on the remote host, busy-waiting
+        until the command finishes execution.
+        This is more robust than doing a simple synchronized 'execute',
+        since in the remote host the command can continue to run detached
+        even if network disconnections occur
         """
+        self.upload_command(command, home, shfile, ecodefile)
+
+        command = "bash ./%s" % shfile
         # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
@@ -356,11 +371,11 @@ class LinuxNode(ResourceManager):
 
         # wait until command finishes to execute
         self.wait_run(pid, ppid)
-       
-        # check if execution errors occurred
-        (out, err), proc = self.check_output(home, stderr)
+      
+        (out, err), proc = self.check_errors(home, ecodefile, stderr)
 
-        if err or out:
+        # Out is what was written in the stderr file
+        if out or err:
             msg = " Failed to run command '%s' " % command
             self.error(msg, out, err)
 
@@ -368,21 +383,93 @@ class LinuxNode(ResourceManager):
                 raise RuntimeError, msg
         
         return (out, err), proc
+
+    def exitcode(self, home, ecodefile = "exitcode"):
+        """
+        Get the exit code of an application.
+        Returns an integer value with the exit code 
+        """
+        (out, err), proc = self.check_output(home, ecodefile)
+
+        # Succeeded to open file, return exit code in the file
+        if proc.wait() == 0:
+            try:
+                return int(out.strip())
+            except:
+                # Error in the content of the file!
+                return ExitCode.CORRUPTFILE
+
+        # No such file or directory
+        if proc.returncode == 1:
+            return ExitCode.FILENOTFOUND
+        
+        # Other error from 'cat'
+        return ExitCode.ERROR
+
+    def upload_command(self, command, home, 
+            shfile = "cmd.sh",
+            ecodefile = "exitcode",
+            env = None):
+
+        command = "{ ( %(command)s ) ; } ; echo $? > %(ecodefile)s " % {
+                'command': command,
+                'ecodefile': ecodefile,
+                } 
+
+        # Export environment
+        environ = ""
+        if env:
+            for var in env.split(" "):
+                environ += 'export %s\n' % var
+
+        command = environ + command
+
+        dst = os.path.join(home, shfile)
+        return self.upload(command, dst, text = True)
+
+    def check_errors(self, home, 
+            ecodefile = "exitcode", 
+            stderr = "stderr"):
+        """
+        Checks whether errors occurred while running a command.
+        It first checks the exit code for the command, and only if the
+        exit code is an error one it returns the error output.
+        """
+        out = err = ""
+        proc = None
+
+        # get Exit code
+        ecode = self.exitcode(home, ecodefile)
+
+        if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
+            err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
+        elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
+            # The process returned an error code or didn't exist. 
+            # Check standard error.
+            (out, err), proc = self.check_output(home, stderr)
+            
+            # If the stderr file was not found, assume nothing happened.
+            # We just ignore the error.
+            if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: # cat - No such file or directory
+                err = ""
+       
+        return (out, err), proc
  
-    def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+    def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
         """ Waits until the pid file for the command is generated, 
             and returns the pid and ppid of the process """
         pid = ppid = None
         delay = 1.0
-        for i in xrange(5):
-            pidtuple = self.checkpid(home = home, pidfile = pidfile)
+
+        for i in xrange(4):
+            pidtuple = self.getpid(home = home, pidfile = pidfile)
             
             if pidtuple:
                 pid, ppid = pidtuple
                 break
             else:
                 time.sleep(delay)
-                delay = min(30,delay*1.2)
+                delay = delay * 1.5
         else:
             msg = " Failed to get pid for pidfile %s/%s " % (
                     home, pidfile )
@@ -395,30 +482,26 @@ class LinuxNode(ResourceManager):
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
-        delay = 1.0
-        first = True
-        bustspin = 0
+        start_delay = 1.0
 
         while True:
             status = self.status(pid, ppid)
             
-            if status is sshfuncs.FINISHED:
+            if status is ProcStatus.FINISHED:
                 break
-            elif status is not sshfuncs.RUNNING:
-                bustspin += 1
-                time.sleep(delay*(5.5+random.random()))
-                if bustspin > 12:
+            elif status is not ProcStatus.RUNNING:
+                delay = delay * 1.5
+                time.sleep(delay)
+                # If it takes more than 20 seconds to start, then
+                # asume something went wrong
+                if delay > 20:
                     break
             else:
-                if first:
-                    first = False
-
-                time.sleep(delay*(0.5+random.random()))
-                delay = min(30,delay*1.2)
-                bustspin = 0
+                # The app is running, just wait...
+                time.sleep(0.5)
 
     def check_output(self, home, filename):
-        """ checks file content """
+        """ Retrives content of file """
         (out, err), proc = self.execute("cat %s" % 
             os.path.join(home, filename), retry = 1, with_lock = True)
         return (out, err), proc
@@ -448,7 +531,7 @@ class LinuxNode(ResourceManager):
 
     def copy(self, src, dst):
         if self.localhost:
-            (out, err), proc =  execfuncs.lcopy(source, dest, 
+            (out, err), proc = execfuncs.lcopy(source, dest, 
                     recursive = True,
                     strict_host_checking = False)
         else:
@@ -533,16 +616,15 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
-    def run(self, command, 
-            home = None,
+    def run(self, command, home,
             create_home = False,
-            pidfile = "pid",
+            pidfile = 'pidfile',
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
             tty = False):
-
+        
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
@@ -555,10 +637,8 @@ class LinuxNode(ResourceManager):
                     sudo = sudo,
                     user = user) 
         else:
-            # Start process in a "daemonized" way, using nohup and heavy
-            # stdin/out redirection to avoid connection issues
             with self._lock:
-                (out,err), proc = sshfuncs.rspawn(
+                (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
                     home = home,
@@ -578,12 +658,12 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
-    def checkpid(self, home = ".", pidfile = "pid"):
+    def getpid(self, home, pidfile = "pidfile"):
         if self.localhost:
-            pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
+            pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
             with self._lock:
-                pidtuple = sshfuncs.rcheckpid(
+                pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
@@ -594,7 +674,7 @@ class LinuxNode(ResourceManager):
                     )
         
         return pidtuple
-    
+
     def status(self, pid, ppid):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
@@ -617,7 +697,7 @@ class LinuxNode(ResourceManager):
         proc = None
         status = self.status(pid, ppid)
 
-        if status == sshfuncs.RUNNING:
+        if status == sshfuncs.ProcStatus.RUNNING:
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
@@ -632,12 +712,6 @@ class LinuxNode(ResourceManager):
                         identity = self.get("identity"),
                         server_key = self.get("serverKey")
                         )
-        return (out, err), proc
 
-    def check_bad_host(self, out, err):
-        badre = re.compile(r'(?:'
-                           r'|Error: disk I/O error'
-                           r')', 
-                           re.I)
-        return badre.search(out) or badre.search(err)
+        return (out, err), proc
 
index 13941bd..d832fb6 100644 (file)
@@ -29,7 +29,7 @@ def install_packages_command(os, packages):
     cmd = "( %s )" % install_rpmfusion_command(os)
     for p in packages:
         cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % {
-            'package': p}
+                    'package': p}
     
     #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
     return " ( %s )" % cmd 
@@ -42,7 +42,7 @@ def remove_packages_command(os, packages):
     for p in packages:
         cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % {
                     'package': p}
-    
+
     #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
     return cmd 
 
index 773465c..65cea4e 100644 (file)
@@ -17,7 +17,7 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT 
+from nepi.util.sshfuncs import ProcStatus, STDOUT
 
 import subprocess
 
@@ -134,7 +134,7 @@ def lspawn(command, pidfile,
 
     return (out,err),proc
 
-def lcheckpid(pidfile):
+def lgetpid(pidfile):
     """
     Check the pidfile of a process spawned with remote_spawn.
     
@@ -179,14 +179,14 @@ def lstatus(pid, ppid):
         })
     
     if proc.wait():
-        return NOT_STARTED
+        return ProcStatus.NOT_STARTED
     
     status = False
     if out:
         status = (out.strip() == 'wait')
     else:
-        return NOT_STARTED
-    return RUNNING if status else FINISHED
+        return ProcStatus.NOT_STARTED
+    return ProcStatus.RUNNING if status else ProcStatus.FINISHED
  
 
 def lkill(pid, ppid, sudo = False):
index a88dc78..50f5d7c 100644 (file)
@@ -57,20 +57,18 @@ class STDOUT:
     redirect to whatever stdout was redirected to.
     """
 
-class RUNNING:
+class ProcStatus:
     """
-    Process is still running
+    Codes for status of remote spawned process
     """
+    # Process is still running
+    RUNNING = 1
 
-class FINISHED:
-    """
-    Process is finished
-    """
-
-class NOT_STARTED:
-    """
-    Process hasn't started running yet (this should be very rare)
-    """
+    # Process is finished
+    FINISHED = 2
+    
+    # Process hasn't started running yet (this should be very rare)
+    NOT_STARTED = 3
 
 hostbyname_cache = dict()
 hostbyname_cache_lock = threading.Lock()
@@ -511,6 +509,9 @@ def rcopy(source, dest,
         tmp_known_hosts = None
 
         args = ['scp', '-q', '-p', '-C',
+                # Speed up transfer using blowfish cypher specification which is 
+                # faster than the default one (3des)
+                '-c', 'blowfish',
                 # Don't bother with localhost. Makes test easier
                 '-o', 'NoHostAuthenticationForLocalhost=yes',
                 '-o', 'ConnectTimeout=60',
@@ -588,7 +589,7 @@ def rcopy(source, dest,
 def rspawn(command, pidfile, 
         stdout = '/dev/null', 
         stderr = STDOUT, 
-        stdin = '/dev/null', 
+        stdin = '/dev/null',
         home = None, 
         create_home = False, 
         sudo = False,
@@ -600,28 +601,41 @@ def rspawn(command, pidfile,
         server_key = None,
         tty = False):
     """
-    Spawn a remote command such that it will continue working asynchronously.
-    
-    Parameters:
-        command: the command to run - it should be a single line.
-        
-        pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
-        
-        stdout: path of a file to redirect standard output to - must be a string.
-            Defaults to /dev/null
-        stderr: path of a file to redirect standard error to - string or the special STDOUT value
-            to redirect to the same file stdout was redirected to. Defaults to STDOUT.
-        stdin: path of a file with input to be piped into the command's standard input
+    Spawn a remote command such that it will continue working asynchronously in 
+    background. 
+
+        :param command: The command to run, it should be a single line.
+        :type command: str
+
+        :param pidfile: Path to a file where to store the pid and ppid of the 
+                        spawned process
+        :type pidfile: str
+
+        :param stdout: Path to file to redirect standard output. 
+                       The default value is /dev/null
+        :type stdout: str
+
+        :param stderr: Path to file to redirect standard error.
+                       If the special STDOUT value is used, stderr will 
+                       be redirected to the same file as stdout
+        :type stderr: str
+
+        :param stdin: Path to a file with input to be piped into the command's standard input
+        :type stdin: str
+
+        :param home: Path to working directory folder. 
+                    It is assumed to exist unless the create_home flag is set.
+        :type home: str
+
+        :param create_home: Flag to force creation of the home folder before 
+                            running the command
+        :type create_home: bool
+        :param sudo: Flag forcing execution with sudo user
+        :type sudo: bool
         
-        home: path of a folder to use as working directory - should exist, unless you specify create_home
-        
-        create_home: if True, the home folder will be created first with mkdir -p
-        
-        sudo: whether the command needs to be executed as root
-        
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :rtype: touple
+
         (stdout, stderr), process
         
         Of the spawning process, which only captures errors at spawning time.
@@ -667,7 +681,7 @@ def rspawn(command, pidfile,
     return ((out, err), proc)
 
 @eintr_retry
-def rcheckpid(pidfile,
+def rgetpid(pidfile,
         host = None, 
         port = None, 
         user = None, 
@@ -675,19 +689,21 @@ def rcheckpid(pidfile,
         identity = None,
         server_key = None):
     """
-    Check the pidfile of a process spawned with remote_spawn.
-    
-    Parameters:
-        pidfile: the pidfile passed to remote_span
+    Returns the pid and ppid of a process from a remote file where the 
+    information was stored.
+
+        :param home: Path to directory where the pidfile is located
+        :type home: str
+
+        :param pidfile: Name of file containing the pid information
+        :type pidfile: str
         
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :rtype: int
         
-        A (pid, ppid) tuple useful for calling remote_status and remote_kill,
-        or None if the pidfile isn't valid yet (maybe the process is still starting).
-    """
+        A (pid, ppid) tuple useful for calling rstatus and rkill,
+        or None if the pidfile isn't valid yet (can happen when process is staring up)
 
+    """
     (out,err),proc = rexec(
         "cat %(pidfile)s" % {
             'pidfile' : pidfile,
@@ -719,18 +735,17 @@ def rstatus(pid, ppid,
         identity = None,
         server_key = None):
     """
-    Check the status of a process spawned with remote_spawn.
+    Returns a code representing the the status of a remote process
+
+        :param pid: Process id of the process
+        :type pid: int
+
+        :param ppid: Parent process id of process
+        :type ppid: int
     
-    Parameters:
-        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-        
-        host/port/user/agent/identity: see rexec
+        :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
     
-    Returns:
-        
-        One of NOT_STARTED, RUNNING, FINISHED
     """
-
     (out,err),proc = rexec(
         # Check only by pid. pid+ppid does not always work (especially with sudo) 
         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
@@ -746,7 +761,7 @@ def rstatus(pid, ppid,
         )
     
     if proc.wait():
-        return NOT_STARTED
+        return ProcStatus.NOT_STARTED
     
     status = False
     if err:
@@ -755,8 +770,8 @@ def rstatus(pid, ppid,
     elif out:
         status = (out.strip() == 'wait')
     else:
-        return NOT_STARTED
-    return RUNNING if status else FINISHED
+        return ProcStatus.NOT_STARTED
+    return ProcStatus.RUNNING if status else ProcStatus.FINISHED
 
 @eintr_retry
 def rkill(pid, ppid,
@@ -769,23 +784,21 @@ def rkill(pid, ppid,
         server_key = None, 
         nowait = False):
     """
-    Kill a process spawned with remote_spawn.
-    
+    Sends a kill signal to a remote process.
+
     First tries a SIGTERM, and if the process does not end in 10 seconds,
     it sends a SIGKILL.
-    
-    Parameters:
-        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-        
-        sudo: whether the command was run with sudo - careful killing like this.
-        
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :param pid: Process id of process to be killed
+        :type pid: int
+
+        :param ppid: Parent process id of process to be killed
+        :type ppid: int
+
+        :param sudo: Flag indicating if sudo should be used to kill the process
+        :type sudo: bool
         
-        Nothing, should have killed the process
     """
-    
     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
     cmd = """
 SUBKILL="%(subkill)s" ;
index 2a24547..4602148 100755 (executable)
@@ -72,7 +72,7 @@ class ExecuteControllersTestCase(unittest.TestCase):
 
     def test_schedule_exception(self):
         def raise_error():
-            raise RuntimeError, "the error"
+            raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!"
 
         ec = ExperimentController()
         ec.schedule("2s", raise_error)
index 75a43d1..1393f6a 100755 (executable)
@@ -34,13 +34,13 @@ import unittest
 
 class LinuxApplicationTestCase(unittest.TestCase):
     def setUp(self):
-        self.fedora_host = 'nepi2.pl.sophia.inria.fr'
-        self.fedora_user = 'inria_nepi'
+        self.fedora_host = "nepi2.pl.sophia.inria.fr"
+        self.fedora_user = "inria_nepi"
 
-        self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
-        self.ubuntu_user = 'alina'
+        self.ubuntu_host = "roseval.pl.sophia.inria.fr"
+        self.ubuntu_user = "alina"
         
-        self.target = 'nepi5.pl.sophia.inria.fr'
+        self.target = "nepi5.pl.sophia.inria.fr"
 
     @skipIfNotAlive
     def t_stdout(self, host, user):
@@ -69,7 +69,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
 
-        stdout = ec.trace(app, 'stdout')
+        stdout = ec.trace(app, "stdout")
         self.assertTrue(stdout.strip() == "HOLA")
 
         ec.shutdown()
@@ -102,16 +102,16 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
 
-        stdout = ec.trace(app, 'stdout')
-        size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
+        stdout = ec.trace(app, "stdout")
+        size = ec.trace(app, "stdout", attr = TraceAttr.SIZE)
         self.assertEquals(len(stdout), size)
         
-        block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
+        block = ec.trace(app, "stdout", attr = TraceAttr.STREAM, block = 5, offset = 1)
         self.assertEquals(block, stdout[5:10])
 
-        path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
+        path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
         rm = ec.get_resource(app)
-        p = os.path.join(rm.app_home, 'stdout')
+        p = os.path.join(rm.app_home, "stdout")
         self.assertEquals(path, p)
 
         ec.shutdown()
@@ -202,7 +202,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.assertTrue(ec.state(server) == ResourceState.FINISHED)
         self.assertTrue(ec.state(client) == ResourceState.FINISHED)
 
-        stdout = ec.trace(client, 'stdout')
+        stdout = ec.trace(client, "stdout")
         self.assertTrue(stdout.strip() == "HOLA")
 
         ec.shutdown()
@@ -222,8 +222,8 @@ class LinuxApplicationTestCase(unittest.TestCase):
         ec.set(node, "cleanHome", True)
         ec.set(node, "cleanProcesses", True)
 
-        sources = "http://nepi.inria.fr/attachment/wiki/WikiStart/pybindgen-r794.tar.gz " \
-            "http://nepi.inria.fr/attachment/wiki/WikiStart/nepi_integration_framework.pdf"
+        sources = "http://nepi.inria.fr/code/nef/archive/tip.tar.gz " \
+                " http://nepi.inria.fr/code/nef/raw-file/8ace577d4079/src/nef/images/menu/connect.png"
 
         app = ec.register_resource("LinuxApplication")
         ec.set(app, "sources", sources)
@@ -237,12 +237,12 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
 
-        err = ec.trace(app, 'http_sources_err')
-        self.assertTrue(err == "")
+        exitcode = ec.trace(app, "http_sources_exitcode")
+        self.assertTrue(exitcode.strip() == "0")
         
-        out = ec.trace(app, 'http_sources_out')
-        self.assertTrue(out.find("pybindgen-r794.tar.gz") > -1)
-        self.assertTrue(out.find("nepi_integration_framework.pdf") > -1)
+        out = ec.trace(app, "http_sources_stdout")
+        self.assertTrue(out.find("tip.tar.gz") > -1)
+        self.assertTrue(out.find("connect.png") > -1)
 
         ec.shutdown()
 
@@ -277,8 +277,6 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
 
 
-    # TODO: test compilation, sources, dependencies, etc!!!
-
 if __name__ == '__main__':
     unittest.main()
 
index 1a7fa09..259f578 100755 (executable)
@@ -19,8 +19,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 
-from nepi.resources.linux.node import LinuxNode
-from nepi.util.sshfuncs import RUNNING, FINISHED
+from nepi.resources.linux.node import LinuxNode, ExitCode
+from nepi.util.sshfuncs import ProcStatus
 
 from test_utils import skipIfNotAlive, skipInteractive, create_node
 
@@ -31,27 +31,13 @@ import unittest
 
 class LinuxNodeTestCase(unittest.TestCase):
     def setUp(self):
-        self.fedora_host = 'nepi2.pl.sophia.inria.fr'
-        self.fedora_user = 'inria_nepi'
+        self.fedora_host = "nepi2.pl.sophia.inria.fr"
+        self.fedora_user = "inria_nepi"
 
-        self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
-        self.ubuntu_user = 'alina'
+        self.ubuntu_host = "roseval.pl.sophia.inria.fr"
+        self.ubuntu_user = "alina"
         
-        self.target = 'nepi5.pl.sophia.inria.fr'
-
-    @skipIfNotAlive
-    def t_xterm(self, host, user):
-        node, ec = create_node(host, user)
-
-        node.install_packages('xterm')
-
-        (out, err), proc = node.execute('xterm', forward_x11 = True)
-        
-        self.assertEquals(out, "")
-
-        (out, err), proc = node.remove_packages('xterm')
-        
-        self.assertEquals(out, "")
+        self.target = "nepi5.pl.sophia.inria.fr"
 
     @skipIfNotAlive
     def t_execute(self, host, user):
@@ -74,14 +60,14 @@ class LinuxNodeTestCase(unittest.TestCase):
         
         command = "ping %s" % self.target
         node.run(command, app_home)
-        pid, ppid = node.checkpid(app_home)
+        pid, ppid = node.getpid(app_home)
 
         status = node.status(pid, ppid)
-        self.assertTrue(status, RUNNING)
+        self.assertTrue(status, ProcStatus.RUNNING)
 
         node.kill(pid, ppid)
         status = node.status(pid, ppid)
-        self.assertTrue(status, FINISHED)
+        self.assertTrue(status, ProcStatus.FINISHED)
         
         (out, err), proc = node.check_output(app_home, "stdout")
 
@@ -91,22 +77,120 @@ class LinuxNodeTestCase(unittest.TestCase):
 
         node.rmdir(app_home)
 
+    @skipIfNotAlive
+    def t_exitcode_ok(self, host, user):
+        command = "echo 'OK!'"
+        
+        node, ec = create_node(host, user)
+         
+        app_home = os.path.join(node.exp_home, "my-app")
+        node.mkdir(app_home, clean = True)
+         
+        (out, err), proc = node.run_and_wait(command, app_home,
+            shfile = "cmd.sh",
+            pidfile = "pid",
+            ecodefile = "exitcode",
+            stdout = "stdout", 
+            stderr = "stderr",
+            raise_on_error = True)
+        # get the pid of the process
+        ecode = node.exitcode(app_home)
+        self.assertEquals(ecode, ExitCode.OK)
+
+    @skipIfNotAlive
+    def t_exitcode_kill(self, host, user):
+        node, ec = create_node(host, user)
+         
+        app_home = os.path.join(node.exp_home, "my-app")
+        node.mkdir(app_home, clean = True)
+       
+        # Upload command that will not finish
+        command = "ping localhost"
+        (out, err), proc = node.upload_command(command, app_home, 
+            shfile = "cmd.sh",
+            ecodefile = "exitcode")
+
+        (out, err), proc = node.run(command, app_home,
+            pidfile = "pidfile",
+            stdout = "stdout", 
+            stderr = "stderr")
+        # Just wait to make sure the ping started
+        time.sleep(5)
+
+        # The process is still running, so no retfile has been created yet
+        ecode = node.exitcode(app_home)
+        self.assertEquals(ecode, ExitCode.FILENOTFOUND)
+        
+        (out, err), proc = node.check_errors(app_home)
+        self.assertEquals(err, "")
+        
+        # Now kill the app
+        pid, ppid = node.getpid(app_home)
+        node.kill(pid, ppid)
+         
+        (out, err), proc = node.check_errors(app_home)
+        self.assertEquals(err, "")
+
+    @skipIfNotAlive
+    def t_exitcode_error(self, host, user):
+        # Try to execute a command that doesn't exist
+        command = "unexistent-command"
+        
+        node, ec = create_node(host, user)
+         
+        app_home = os.path.join(node.exp_home, "my-app")
+        node.mkdir(app_home, clean = True)
+         
+        (out, err), proc = node.run_and_wait(command, app_home,
+            shfile = "cmd.sh",
+            pidfile = "pid",
+            ecodefile = "exitcode",
+            stdout = "stdout", 
+            stderr = "stderr",
+            raise_on_error = False)
+        # get the pid of the process
+        ecode = node.exitcode(app_home)
+        # bash erro 127 - command not found
+        self.assertEquals(ecode, 127)
+        (out, err), proc = node.check_errors(app_home)
+        self.assertNotEquals(out, "")
+
     @skipIfNotAlive
     def t_install(self, host, user):
         node, ec = create_node(host, user)
 
-        (out, err), proc = node.mkdir(node.node_home, clean=True)
+        (out, err), proc = node.mkdir(node.node_home, clean = True)
         self.assertEquals(out, "")
 
-        (out, err), proc = node.install_packages('gcc')
+        (out, err), proc = node.install_packages("gcc", node.node_home)
         self.assertEquals(out, "")
 
-        (out, err), proc = node.remove_packages('gcc')
+        (out, err), proc = node.remove_packages("gcc", node.node_home)
         self.assertEquals(out, "")
 
         (out, err), proc = node.rmdir(node.exp_home)
         self.assertEquals(out, "")
 
+    @skipIfNotAlive
+    def t_xterm(self, host, user):
+        node, ec = create_node(host, user)
+
+        (out, err), proc = node.mkdir(node.node_home, clean = True)
+        self.assertEquals(out, "")
+        
+        node.install_packages("xterm", node.node_home)
+        self.assertEquals(out, "")
+
+        (out, err), proc = node.execute("xterm", forward_x11 = True)
+        self.assertEquals(out, "")
+
+        (out, err), proc = node.remove_packages("xterm", node.node_home)
+        self.assertEquals(out, "")
+
     @skipIfNotAlive
     def t_compile(self, host, user):
         node, ec = create_node(host, user)
@@ -128,7 +212,7 @@ main (void)
         node.upload(prog, dst, text = True)
 
         # install gcc
-        node.install_packages('gcc')
+        node.install_packages('gcc', app_home)
 
         # compile the program using gcc
         command = "cd %s; gcc -Wall hello.c -o hello" % app_home
@@ -147,12 +231,12 @@ main (void)
 
         # retrieve the output file 
         src = os.path.join(app_home, "hello.out")
-        f = tempfile.NamedTemporaryFile(delete=False)
+        f = tempfile.NamedTemporaryFile(delete = False)
         dst = f.name
         node.download(src, dst)
         f.close()
 
-        node.remove_packages('gcc')
+        node.remove_packages("gcc", app_home)
         node.rmdir(app_home)
 
         f = open(dst, "r")
@@ -184,6 +268,24 @@ main (void)
 
     def test_compile_ubuntu(self):
         self.t_compile(self.ubuntu_host, self.ubuntu_user)
+
+    def test_exitcode_ok_fedora(self):
+        self.t_exitcode_ok(self.fedora_host, self.fedora_user)
+
+    def test_exitcode_ok_ubuntu(self):
+        self.t_exitcode_ok(self.ubuntu_host, self.ubuntu_user)
+
+    def test_exitcode_kill_fedora(self):
+        self.t_exitcode_kill(self.fedora_host, self.fedora_user)
+
+    def test_exitcode_kill_ubuntu(self):
+        self.t_exitcode_kill(self.ubuntu_host, self.ubuntu_user)
+
+    def test_exitcode_error_fedora(self):
+        self.t_exitcode_error(self.fedora_host, self.fedora_user)
+
+    def test_exitcode_error_ubuntu(self):
+        self.t_exitcode_error(self.ubuntu_host, self.ubuntu_user)
     
     @skipInteractive
     def test_xterm_ubuntu(self):
index 3342157..1a55b6b 100755 (executable)
@@ -19,8 +19,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 
-from nepi.util.sshfuncs import rexec, rcopy, rspawn, rcheckpid, rstatus, rkill,\
-        RUNNING, FINISHED 
+from nepi.util.sshfuncs import rexec, rcopy, rspawn, rgetpid, rstatus, rkill,\
+        ProcStatus
 
 import getpass
 import unittest
@@ -231,7 +231,7 @@ class SSHfuncsTestCase(unittest.TestCase):
 
         time.sleep(2)
 
-        (pid, ppid) = rcheckpid(pidfile,
+        (pid, ppid) = rgetpid(pidfile,
                 host = host,
                 user = user,
                 port = env.port,
@@ -243,7 +243,7 @@ class SSHfuncsTestCase(unittest.TestCase):
                 port = env.port, 
                 agent = True)
 
-        self.assertEquals(status, RUNNING)
+        self.assertEquals(status, ProcStatus.RUNNING)
 
         rkill(pid, ppid,
                 host = host,
@@ -257,7 +257,7 @@ class SSHfuncsTestCase(unittest.TestCase):
                 port = env.port, 
                 agent = True)
         
-        self.assertEquals(status, FINISHED)
+        self.assertEquals(status, ProcStatus.FINISHED)
 
 
 if __name__ == '__main__':