Bugfixing LinuxNode and LinuxApplication
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 11 Jun 2013 00:36:50 +0000 (17:36 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 11 Jun 2013 00:36:50 +0000 (17:36 -0700)
14 files changed:
examples/linux/ccnx/vlc_2_hosts.py
src/nepi/execution/attribute.py
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/execution/trace.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/rpmfuncs.py
src/nepi/util/execfuncs.py
src/nepi/util/sshfuncs.py
test/execution/ec.py
test/resources/linux/application.py
test/resources/linux/node.py
test/util/sshfuncs.py

index 0291d03..f880fca 100755 (executable)
@@ -28,11 +28,12 @@ from optparse import OptionParser, SUPPRESS_HELP
 import os
 import time
 
 import os
 import time
 
-def add_node(ec, host, user):
+def add_node(ec, host, user, ssh_key = None):
     node = ec.register_resource("LinuxNode")
     ec.set(node, "hostname", host)
     ec.set(node, "username", user)
     node = ec.register_resource("LinuxNode")
     ec.set(node, "hostname", host)
     ec.set(node, "username", user)
-    #ec.set(node, "cleanHome", True)
+    ec.set(node, "identity", ssh_key)
+    ec.set(node, "cleanHome", True)
     ec.set(node, "cleanProcesses", True)
     return node
 
     ec.set(node, "cleanProcesses", True)
     return node
 
@@ -40,18 +41,18 @@ def add_ccnd(ec, os_type, peers):
     if os_type == "f12":
         depends = ( " autoconf openssl-devel  expat-devel libpcap-devel "
                 " ecryptfs-utils-devel libxml2-devel automake gawk " 
     if os_type == "f12":
         depends = ( " autoconf openssl-devel  expat-devel libpcap-devel "
                 " ecryptfs-utils-devel libxml2-devel automake gawk " 
-                " gcc gcc-c++ git pcre-devel ")
+                " gcc gcc-c++ git pcre-devel make ")
     elif os_type == "ubuntu":
         depends = ( " autoconf libssl-dev libexpat-dev libpcap-dev "
                 " libecryptfs0 libxml2-utils automake gawk gcc g++ "
     elif os_type == "ubuntu":
         depends = ( " autoconf libssl-dev libexpat-dev libpcap-dev "
                 " libecryptfs0 libxml2-utils automake gawk gcc g++ "
-                " git-core pkg-config libpcre3-dev ")
+                " git-core pkg-config libpcre3-dev make ")
 
     sources = "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
 
     build = (
         # Evaluate if ccnx binaries are already installed
         " ( "
 
     sources = "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
 
     build = (
         # Evaluate if ccnx binaries are already installed
         " ( "
-            "  test -d ${EXP_HOME}/ccnx/bin"
+            "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
         " ) || ( "
         # If not, untar and build
             " ( "
         " ) || ( "
         # If not, untar and build
             " ( "
@@ -66,7 +67,7 @@ def add_ccnd(ec, os_type, peers):
     install = (
         # Evaluate if ccnx binaries are already installed
         " ( "
     install = (
         # Evaluate if ccnx binaries are already installed
         " ( "
-            "  test -d ${EXP_HOME}/ccnx/bin "
+            "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
         " ) || ( "
             "  mkdir -p ${EXP_HOME}/ccnx/bin && "
             "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
         " ) || ( "
             "  mkdir -p ${EXP_HOME}/ccnx/bin && "
             "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
@@ -75,11 +76,11 @@ def add_ccnd(ec, os_type, peers):
 
     env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
 
 
     env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
 
-    # BASH command -> ' ccndstart 2>&1 ; ccndc add ccnx:/ udp  host ;  ccnr 2>&1 '
-    command = "ccndstart 2>&1 ; "
+    # BASH command -> ' ccndstart ; ccndc add ccnx:/ udp  host ;  ccnr '
+    command = "ccndstart ; "
     peers = map(lambda peer: "ccndc add ccnx:/ udp  %s" % peer, peers)
     command += " ; ".join(peers) + " ; "
     peers = map(lambda peer: "ccndc add ccnx:/ udp  %s" % peer, peers)
     command += " ; ".join(peers) + " ; "
-    command += " ccnr 2>&1 "
+    command += " ccnr "
 
     app = ec.register_resource("LinuxApplication")
     ec.set(app, "depends", depends)
 
     app = ec.register_resource("LinuxApplication")
     ec.set(app, "depends", depends)
@@ -104,7 +105,7 @@ def add_publish(ec, movie):
 
 def add_stream(ec):
     env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
 
 def add_stream(ec):
     env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
-    command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) 2>&1"
+    command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) "
 
     app = ec.register_resource("LinuxApplication")
     ec.set(app, "depends", "vlc")
 
     app = ec.register_resource("LinuxApplication")
     ec.set(app, "depends", "vlc")
@@ -117,37 +118,56 @@ def add_stream(ec):
 def get_options():
     slicename = os.environ.get("PL_SLICE")
 
 def get_options():
     slicename = os.environ.get("PL_SLICE")
 
-    usage = "usage: %prog -s <pl-slice> -u <user-2> -m <movie> -l <exp-id>"
+    # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the
+    # id_rsa_planetlab exists 
+    default_key = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'])
+    default_key = default_key if os.path.exists(default_key) else None
+    pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
+
+    usage = "usage: %prog -s <pl-slice> -u <username> -m <movie> -l <exp-id> -i <ssh_key>"
 
     parser = OptionParser(usage=usage)
     parser.add_option("-s", "--pl-slice", dest="pl_slice", 
 
     parser = OptionParser(usage=usage)
     parser.add_option("-s", "--pl-slice", dest="pl_slice", 
-            help="PlanetLab slicename", default=slicename, type="str")
-    parser.add_option("-u", "--user-2", dest="user2", 
-            help="User for non PlanetLab machine", type="str")
+            help="PlanetLab slicename", default = slicename, type="str")
+    parser.add_option("-u", "--username", dest="username", 
+            help="User for extra host (non PlanetLab)", type="str")
     parser.add_option("-m", "--movie", dest="movie", 
             help="Stream movie", type="str")
     parser.add_option("-l", "--exp-id", dest="exp_id", 
             help="Label to identify experiment", type="str")
     parser.add_option("-m", "--movie", dest="movie", 
             help="Stream movie", type="str")
     parser.add_option("-l", "--exp-id", dest="exp_id", 
             help="Label to identify experiment", type="str")
+    parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key", 
+            help="Path to private SSH key to be used for connection", 
+            default = pl_ssh_key, type="str")
 
     (options, args) = parser.parse_args()
 
     if not options.movie:
         parser.error("movie is a required argument")
 
 
     (options, args) = parser.parse_args()
 
     if not options.movie:
         parser.error("movie is a required argument")
 
-    return (options.pl_slice, options.user2, options.movie, options.exp_id)
+    return (options.pl_slice, options.username, options.movie, options.exp_id, 
+            options.pl_ssh_key)
 
 if __name__ == '__main__':
 
 if __name__ == '__main__':
-    ( pl_slice, user2, movie, exp_id ) = get_options()
+    ( pl_slice, username, movie, exp_id, pl_ssh_key ) = get_options()
 
     # Search for available RMs
     populate_factory()
     
 
     # Search for available RMs
     populate_factory()
     
+    # PlanetLab node
     host1 = 'planetlab2.u-strasbg.fr'
     host1 = 'planetlab2.u-strasbg.fr'
+    
+    # Another node 
+    # IMPORTANT NOTE: you must replace this host for another one
+    #       you have access to. You must set up your SSH keys so
+    #       the host can be accessed through SSH without prompting
+    #       for a password. The host must allow X forwarding using SSH.
     host2 = 'roseval.pl.sophia.inria.fr'
 
     host2 = 'roseval.pl.sophia.inria.fr'
 
+    # Create the ExperimentController instance
     ec = ExperimentController(exp_id = exp_id)
 
     ec = ExperimentController(exp_id = exp_id)
 
-    node1 = add_node(ec, host1, pl_slice)
+    # Register a ResourceManager (RM) for the PlanetLab node
+    node1 = add_node(ec, host1, pl_slice, pl_ssh_key)
     
     peers = [host2]
     ccnd1 = add_ccnd(ec, "f12", peers)
     
     peers = [host2]
     ccnd1 = add_ccnd(ec, "f12", peers)
@@ -161,7 +181,7 @@ if __name__ == '__main__':
     ec.register_condition(pub, ResourceAction.START, 
             ccnd1, ResourceState.STARTED)
     
     ec.register_condition(pub, ResourceAction.START, 
             ccnd1, ResourceState.STARTED)
     
-    node2 = add_node(ec, host2, user2)
+    node2 = add_node(ec, host2, username)
     peers = [host1]
     ccnd2 = add_ccnd(ec, "ubuntu", peers)
     ec.register_connection(ccnd2, node2)
     peers = [host1]
     ccnd2 = add_ccnd(ec, "ubuntu", peers)
     ec.register_connection(ccnd2, node2)
@@ -177,10 +197,13 @@ if __name__ == '__main__':
     ec.register_condition(stream, ResourceAction.START, 
             pub, ResourceState.STARTED)
  
     ec.register_condition(stream, ResourceAction.START, 
             pub, ResourceState.STARTED)
  
+    # Deploy all ResourceManagers
     ec.deploy()
 
     ec.deploy()
 
+    # Wait until the applications are finished
     apps = [ccnd1, pub, ccnd2, stream]
     ec.wait_finished(apps)
 
     apps = [ccnd1, pub, ccnd2, stream]
     ec.wait_finished(apps)
 
+    # Shutdown the experiment controller
     ec.shutdown()
 
     ec.shutdown()
 
index 3d46edc..c6b973b 100644 (file)
@@ -79,53 +79,53 @@ class Attribute(object):
 
     @property
     def name(self):
 
     @property
     def name(self):
-    """ Returns the name of the attribute """
+        """ Returns the name of the attribute """
         return self._name
 
     @property
     def default(self):
         return self._name
 
     @property
     def default(self):
-    """ Returns the default value of the attribute """
+        """ Returns the default value of the attribute """
         return self._default
 
     @property
     def type(self):
         return self._default
 
     @property
     def type(self):
-    """ Returns the type of the attribute """
+        """ Returns the type of the attribute """
         return self._type
 
     @property
     def help(self):
         return self._type
 
     @property
     def help(self):
-    """ Returns the help of the attribute """
+        """ Returns the help of the attribute """
         return self._help
 
     @property
     def flags(self):
         return self._help
 
     @property
     def flags(self):
-    """ Returns the flags of the attribute """
+        """ Returns the flags of the attribute """
         return self._flags
 
     @property
     def allowed(self):
         return self._flags
 
     @property
     def allowed(self):
-    """ Returns the allowed value for this attribute """
+        """ Returns the allowed value for this attribute """
         return self._allowed
 
     @property
     def range(self):
         return self._allowed
 
     @property
     def range(self):
-    """ Returns the range of the attribute """
+        """ Returns the range of the attribute """
         return self._range
 
     def has_flag(self, flag):
         return self._range
 
     def has_flag(self, flag):
-    """ Returns true if the attribute has the flag 'flag'
+        """ Returns true if the attribute has the flag 'flag'
 
         :param flag: Flag that need to be ckecked
         :type flag: Flags
 
         :param flag: Flag that need to be ckecked
         :type flag: Flags
-    """
+        """
         return (self._flags & flag) == flag
 
     def get_value(self):
         return (self._flags & flag) == flag
 
     def get_value(self):
-    """ Returns the value of the attribute """
+        """ Returns the value of the attribute """
         return self._value
 
     def set_value(self, value):
         return self._value
 
     def set_value(self, value):
-    """ Change the value of the attribute after checking the type """
+        """ Change the value of the attribute after checking the type """
         valid = True
 
         if self.type == Types.Enumerate:
         valid = True
 
         if self.type == Types.Enumerate:
index 34c5c50..ba573c4 100644 (file)
@@ -439,6 +439,9 @@ class ExperimentController(object):
         if not group:
             group = self.resources
 
         if not group:
             group = self.resources
 
+        if isinstance(group, int):
+            group = [group]
+
         # Before starting deployment we disorder the group list with the
         # purpose of speeding up the whole deployment process.
         # It is likely that the user inserted in the 'group' list closely
         # Before starting deployment we disorder the group list with the
         # purpose of speeding up the whole deployment process.
         # It is likely that the user inserted in the 'group' list closely
@@ -450,7 +453,7 @@ class ExperimentController(object):
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
-        random.shuffle(group)
+        #random.shuffle(group)
 
         def wait_all_and_start(group):
             reschedule = False
 
         def wait_all_and_start(group):
             reschedule = False
@@ -467,7 +470,7 @@ class ExperimentController(object):
                 # If all resources are read, we schedule the start
                 for guid in group:
                     rm = self.get_resource(guid)
                 # If all resources are read, we schedule the start
                 for guid in group:
                     rm = self.get_resource(guid)
-                    self.schedule("0.01s", rm.start_with_conditions)
+                    self.schedule("0s", rm.start_with_conditions)
 
         if wait_all_ready:
             # Schedule the function that will check all resources are
 
         if wait_all_ready:
             # Schedule the function that will check all resources are
@@ -479,7 +482,7 @@ class ExperimentController(object):
 
         for guid in group:
             rm = self.get_resource(guid)
 
         for guid in group:
             rm = self.get_resource(guid)
-            self.schedule("0.001s", rm.deploy)
+            self.schedule("0s", rm.deploy)
 
             if not wait_all_ready:
                 self.schedule("1s", rm.start_with_conditions)
 
             if not wait_all_ready:
                 self.schedule("1s", rm.start_with_conditions)
@@ -518,12 +521,16 @@ class ExperimentController(object):
         
     def shutdown(self):
         """ Shutdown the Experiment Controller. 
         
     def shutdown(self):
         """ Shutdown the Experiment Controller. 
-        It means : Release all the resources and stop the scheduler
+        Releases all the resources and stops task processing thread
 
         """
         self.release()
 
 
         """
         self.release()
 
-        self._stop_scheduler()
+        # Mark the EC state as TERMINATED
+        self._state = ECState.TERMINATED
+
+        # Notify condition to wake up the processing thread
+        self._notify()
         
         if self._thread.is_alive():
            self._thread.join()
         
         if self._thread.is_alive():
            self._thread.join()
@@ -554,18 +561,28 @@ class ExperimentController(object):
             self._tasks[task.id] = task
   
         # Notify condition to wake up the processing thread
             self._tasks[task.id] = task
   
         # Notify condition to wake up the processing thread
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
+        self._notify()
 
         return task.id
      
     def _process(self):
 
         return task.id
      
     def _process(self):
-        """ Process at executing the task that are in the scheduler.
+        """ Process scheduled tasks.
+
+        The _process method is executed in an independent thread held by the 
+        ExperimentController for as long as the experiment is running.
+        
+        Tasks are scheduled by invoking the schedule method with a target callback. 
+        The schedule method is givedn a execution time which controls the
+        order in which tasks are processed. 
+
+        Tasks are processed in parallel using multithreading. 
+        The environmental variable NEPI_NTHREADS can be used to control
+        the number of threads used to process tasks. The default value is 50.
 
         """
 
         """
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
 
-        runner = ParallelRun(maxthreads = 50)
+        runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
         try:
         runner.start()
 
         try:
@@ -602,18 +619,18 @@ class ExperimentController(object):
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-   
-        # Mark EC state as terminated
-        if self.ecstate == ECState.RUNNING:
-            # Synchronize to get errors if occurred
+        finally:   
             runner.sync()
             runner.sync()
-            self._state = ECState.TERMINATED
 
     def _execute(self, task):
 
     def _execute(self, task):
-        """ Invoke the callback of the task 'task'
+        """ Executes a single task. 
+
+            If the invokation of the task callback raises an
+            exception, the processing thread of the ExperimentController
+            will be stopped and the experiment will be aborted.
 
 
-            :param task: Id of the task
-            :type task: int
+            :param task: Object containing the callback to execute
+            :type task: Task
 
         """
         # Invoke callback
 
         """
         # Invoke callback
@@ -629,22 +646,21 @@ class ExperimentController(object):
             
             self._logger.error("Error occurred while executing task: %s" % err)
 
             
             self._logger.error("Error occurred while executing task: %s" % err)
 
-            self._stop_scheduler()
+            # Set the EC to FAILED state (this will force to exit the task
+            # processing thread)
+            self._state = ECState.FAILED
+
+            # Notify condition to wake up the processing thread
+            self._notify()
 
             # Propage error to the ParallelRunner
             raise
 
 
             # Propage error to the ParallelRunner
             raise
 
-    def _stop_scheduler(self):
-        """ Stop the scheduler and put the EC into a FAILED State.
-
+    def _notify(self):
+        """ Awakes the processing thread in case it is blocked waiting
+        for a new task to be scheduled.
         """
         """
-
-        # Mark the EC as failed
-        self._state = ECState.FAILED
-
-        # Wake up the EC in case it was sleeping
         self._cond.acquire()
         self._cond.notify()
         self._cond.release()
 
         self._cond.acquire()
         self._cond.notify()
         self._cond.release()
 
-
index 423ca92..078b07d 100644 (file)
@@ -627,17 +627,16 @@ class ResourceFactory(object):
         return rclass(ec, guid)
 
 def populate_factory():
         return rclass(ec, guid)
 
 def populate_factory():
-        """Register all the possible RM that exists in the current version of Nepi.
-
-        """
+    """Register all the possible RM that exists in the current version of Nepi.
+    """
     for rclass in find_types():
         ResourceFactory.register_type(rclass)
 
 def find_types():
     for rclass in find_types():
         ResourceFactory.register_type(rclass)
 
 def find_types():
-        """Look into the different folders to find all the 
-        availables Resources Managers
+    """Look into the different folders to find all the 
+    availables Resources Managers
 
 
-        """
+    """
     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
     search_path = set(search_path.split(" "))
    
     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
     search_path = set(search_path.split(" "))
    
index 1d16242..3b22eaf 100644 (file)
@@ -45,11 +45,11 @@ class Trace(object):
 
     @property
     def name(self):
 
     @property
     def name(self):
-    """ Returns the name of the trace """
+        """ Returns the name of the trace """
         return self._name
 
     @property
     def help(self):
         return self._name
 
     @property
     def help(self):
-    """ Returns the help of the trace """
+        """ Returns the help of the trace """
         return self._help
 
         return self._help
 
index 080c3a4..5f4c998 100644 (file)
@@ -21,16 +21,13 @@ from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux.node import LinuxNode
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux.node import LinuxNode
-from nepi.util import sshfuncs 
+from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import strfnow, strfdiff
 
 import os
 
 from nepi.util.timefuncs import strfnow, strfdiff
 
 import os
 
-reschedule_delay = "0.5s"
-state_check_delay = 1
-
 # TODO: Resolve wildcards in commands!!
 # TODO: Resolve wildcards in commands!!
-# TODO: If command is not set give a warning but do not generate an error!
+
 
 @clsinit
 class LinuxApplication(ResourceManager):
 
 @clsinit
 class LinuxApplication(ResourceManager):
@@ -215,25 +212,21 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
         # Install
         self.install()
 
+        # Upload command
         command = self.get("command")
         x11 = self.get("forwardX11")
         command = self.get("command")
         x11 = self.get("forwardX11")
-        if not x11 and command:
+        env = self.get("env")
+        
+        if command and not x11:
             self.info("Uploading command '%s'" % command)
 
             self.info("Uploading command '%s'" % command)
 
-            # Export environment
-            environ = ""
-            if self.get("env"):
-                for var in self.get("env").split(" "):
-                    environ += 'export %s\n' % var
-
-            command = environ + command
-
-            # If the command runs asynchronous, pre upload the command 
-            # to the app.sh file in the remote host
-            dst = os.path.join(self.app_home, "app.sh")
+            # replace application specific paths in the command
             command = self.replace_paths(command)
             command = self.replace_paths(command)
-            self.node.upload(command, dst, text = True)
 
 
+            self.node.upload_command(command, self.app_home, 
+                    shfile = "app.sh",
+                    env = env)
+       
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
@@ -253,25 +246,29 @@ class LinuxApplication(ResourceManager):
                     http_sources.append(source)
                     sources.remove(source)
 
                     http_sources.append(source)
                     sources.remove(source)
 
-            # Download http sources
+            # Download http sources remotely
             if http_sources:
             if http_sources:
-                cmd = " wget -c --directory-prefix=${SOURCES} "
-                verif = ""
+                command = " wget -c --directory-prefix=${SOURCES} "
+                check = ""
 
                 for source in http_sources:
 
                 for source in http_sources:
-                    cmd += " %s " % (source)
-                    verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+                    command += " %s " % (source)
+                    check += " ls ${SOURCES}/%s ;" % os.path.basename(source)
                 
                 
-                # Wget output goes to stderr :S
-                cmd += " 2> /dev/null ; "
-
-                # Add verification
-                cmd += " %s " % verif
+                # Append the command to check that the sources were downloaded
+                command += " ; %s " % check
 
 
+                # replace application specific paths in the command
+                command = self.replace_paths(command)
+                
                 # Upload the command to a file, and execute asynchronously
                 # Upload the command to a file, and execute asynchronously
-                self.upload_and_run(cmd, 
-                        "http_sources.sh", "http_sources_pid", 
-                        "http_sources_out", "http_sources_err")
+                self.node.run_and_wait(command, self.app_home,
+                        shfile = "http_sources.sh",
+                        pidfile = "http_sources_pidfile", 
+                        ecodefile = "http_sources_exitcode", 
+                        stdout = "http_sources_stdout", 
+                        stderr = "http_sources_stderr")
+
             if sources:
                 self.node.upload(sources, self.src_dir)
 
             if sources:
                 self.node.upload(sources, self.src_dir)
 
@@ -299,7 +296,7 @@ class LinuxApplication(ResourceManager):
         depends = self.get("depends")
         if depends:
             self.info(" Installing dependencies %s" % depends)
         depends = self.get("depends")
         if depends:
             self.info(" Installing dependencies %s" % depends)
-            self.node.install_packages(depends, home = self.app_home)
+            self.node.install_packages(depends, self.app_home)
 
     def build(self):
         build = self.get("build")
 
     def build(self):
         build = self.get("build")
@@ -309,26 +306,40 @@ class LinuxApplication(ResourceManager):
             # create dir for build
             self.node.mkdir(self.build_dir)
 
             # create dir for build
             self.node.mkdir(self.build_dir)
 
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+
             # Upload the command to a file, and execute asynchronously
             # Upload the command to a file, and execute asynchronously
-            self.upload_and_run(build, 
-                    "build.sh", "build_pid", 
-                    "build_out", "build_err")
+            self.node.run_and_wait(command, self.app_home,
+                    shfile = "build.sh",
+                    pidfile = "build_pidfile", 
+                    ecodefile = "build_exitcode", 
+                    stdout = "build_stdout", 
+                    stderr = "build_stderr")
  
     def install(self):
         install = self.get("install")
         if install:
             self.info(" Installing sources ")
 
  
     def install(self):
         install = self.get("install")
         if install:
             self.info(" Installing sources ")
 
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+
             # Upload the command to a file, and execute asynchronously
             # Upload the command to a file, and execute asynchronously
-            self.upload_and_run(install, 
-                    "install.sh", "install_pid", 
-                    "install_out", "install_err")
+            self.node.run_and_wait(command, self.app_home,
+                    shfile = "install.sh",
+                    pidfile = "install_pidfile", 
+                    ecodefile = "install_exitcode", 
+                    stdout = "install_stdout", 
+                    stderr = "install_stderr")
 
     def deploy(self):
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
 
     def deploy(self):
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+            
+            reschedule_delay = "0.5s"
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             try:
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             try:
@@ -352,16 +363,25 @@ class LinuxApplication(ResourceManager):
         x11 = self.get('forwardX11') or False
         failed = False
 
         x11 = self.get('forwardX11') or False
         failed = False
 
-        super(LinuxApplication, self).start()
-
         if not command:
         if not command:
-            self.info("No command to start ")
+            # If no command was given, then the application 
+            # is directly marked as FINISHED
             self._state = ResourceState.FINISHED
             self._state = ResourceState.FINISHED
-            return 
+        else:
+            super(LinuxApplication, self).start()
     
         self.info("Starting command '%s'" % command)
 
         if x11:
     
         self.info("Starting command '%s'" % command)
 
         if x11:
+            # If X11 forwarding was specified, then the application
+            # can not run detached, so instead of invoking asynchronous
+            # 'run' we invoke synchronous 'execute'.
+            if not command:
+                msg = "No command is defined but X11 forwarding has been set"
+                self.error(msg)
+                self._state = ResourceState.FAILED
+                raise RuntimeError, msg
+
             if env:
                 # Export environment
                 environ = ""
             if env:
                 # Export environment
                 environ = ""
@@ -392,28 +412,22 @@ class LinuxApplication(ResourceManager):
                 stderr = stderr,
                 sudo = sudo)
 
                 stderr = stderr,
                 sudo = sudo)
 
+            # check if execution errors occurred
+            msg = " Failed to start command '%s' " % command
+            
             if proc.poll() and err:
             if proc.poll() and err:
-                failed = True
+                self.error(msg, out, err)
+                raise RuntimeError, msg
         
         
-            if not failed:
-                pid, ppid = self.node.wait_pid(home = self.app_home)
-                if pid: self._pid = int(pid)
-                if ppid: self._ppid = int(ppid)
+            # Check status of process running in background
+            pid, ppid = self.node.wait_pid(self.app_home)
+            if pid: self._pid = int(pid)
+            if ppid: self._ppid = int(ppid)
 
 
+            # If the process is not running, check for error information
+            # on the remote machine
             if not self.pid or not self.ppid:
             if not self.pid or not self.ppid:
-                failed = True
-            (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
-
-            if failed or out or chkerr:
-                # check if execution errors occurred
-                msg = " Failed to start command '%s' " % command
-                out = out
-                if err:
-                    err = err
-                elif chkerr:
-                    err = chkerr
-
+                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
                 self.error(msg, out, err)
 
                 msg2 = " Setting state to Failed"
                 self.error(msg, out, err)
 
                 msg2 = " Setting state to Failed"
@@ -456,10 +470,11 @@ class LinuxApplication(ResourceManager):
         if self._state == ResourceState.STARTED:
             # To avoid overwhelming the remote hosts and the local processor
             # with too many ssh queries, the state is only requested
         if self._state == ResourceState.STARTED:
             # To avoid overwhelming the remote hosts and the local processor
             # with too many ssh queries, the state is only requested
-            # every 'state_check_delay' .
+            # every 'state_check_delay' seconds.
+            state_check_delay = 0.5
             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                 # check if execution errors occurred
             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                 # check if execution errors occurred
-                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+                (out, err), proc = self.node.check_errors(self.app_home)
 
                 if out or err:
                     if err.find("No such file or directory") >= 0 :
 
                 if out or err:
                     if err.find("No such file or directory") >= 0 :
@@ -474,7 +489,7 @@ class LinuxApplication(ResourceManager):
                 elif self.pid and self.ppid:
                     status = self.node.status(self.pid, self.ppid)
 
                 elif self.pid and self.ppid:
                     status = self.node.status(self.pid, self.ppid)
 
-                    if status == sshfuncs.FINISHED:
+                    if status == ProcStatus.FINISHED:
                         self._state = ResourceState.FINISHED
 
 
                         self._state = ResourceState.FINISHED
 
 
@@ -482,18 +497,6 @@ class LinuxApplication(ResourceManager):
 
         return self._state
 
 
         return self._state
 
-    def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
-        dst = os.path.join(self.app_home, fname)
-        cmd = self.replace_paths(cmd)
-        self.node.upload(cmd, dst, text = True)
-
-        cmd = "bash ./%s" % fname
-        (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
-            pidfile = pidfile,
-            stdout = outfile, 
-            stderr = errfile, 
-            raise_on_error = True)
-
     def replace_paths(self, command):
         """
         Replace all special path tags with shell-escaped actual paths.
     def replace_paths(self, command):
         """
         Replace all special path tags with shell-escaped actual paths.
index 8387def..0f3a01c 100644 (file)
@@ -20,7 +20,8 @@
 from nepi.execution.attribute import Attribute, Flags
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.execution.attribute import Attribute, Flags
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux import rpmfuncs, debfuncs 
-from nepi.util import sshfuncs, execfuncs 
+from nepi.util import sshfuncs, execfuncs
+from nepi.util.sshfuncs import ProcStatus
 
 import collections
 import os
 
 import collections
 import os
@@ -37,6 +38,15 @@ import threading
 
 reschedule_delay = "0.5s"
 
 
 reschedule_delay = "0.5s"
 
+class ExitCode:
+    """
+    Error codes that the rexitcode function can return if unable to
+    check the exit code of a spawned process
+    """
+    FILENOTFOUND = -1
+    CORRUPTFILE = -2
+    ERROR = -3
+    OK = 0
 
 @clsinit
 class LinuxNode(ResourceManager):
 
 @clsinit
 class LinuxNode(ResourceManager):
@@ -264,46 +274,46 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
+    def install_packages(self, packages, home):
+        command = ""
         if self.os in ["f12", "f14"]:
         if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.install_packages_command(self.os, packages)
+            command = rpmfuncs.install_packages_command(self.os, packages)
         elif self.os in ["debian", "ubuntu"]:
         elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.install_packages_command(self.os, packages)
+            command = debfuncs.install_packages_command(self.os, packages)
         else:
             msg = "Error installing packages ( OS not known ) "
             self.error(msg, self.os)
             raise RuntimeError, msg
 
         out = err = ""
         else:
             msg = "Error installing packages ( OS not known ) "
             self.error(msg, self.os)
             raise RuntimeError, msg
 
         out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "instpkg_pid",
-            stdout = "instpkg_out", 
-            stderr = "instpkg_err",
+        (out, err), proc = self.run_and_wait(command, home, 
+            shfile = "instpkg.sh",
+            pidfile = "instpkg_pidfile",
+            ecodefile = "instpkg_exitcode",
+            stdout = "instpkg_stdout", 
+            stderr = "instpkg_stderr",
             raise_on_error = True)
 
         return (out, err), proc 
 
             raise_on_error = True)
 
         return (out, err), proc 
 
-    def remove_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
+    def remove_packages(self, packages, home):
+        command = ""
         if self.os in ["f12", "f14"]:
         if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.remove_packages_command(self.os, packages)
+            command = rpmfuncs.remove_packages_command(self.os, packages)
         elif self.os in ["debian", "ubuntu"]:
         elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.remove_packages_command(self.os, packages)
+            command = debfuncs.remove_packages_command(self.os, packages)
         else:
             msg = "Error removing packages ( OS not known ) "
             self.error(msg)
             raise RuntimeError, msg
 
         out = err = ""
         else:
             msg = "Error removing packages ( OS not known ) "
             self.error(msg)
             raise RuntimeError, msg
 
         out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "rmpkg_pid",
-            stdout = "rmpkg_out", 
-            stderr = "rmpkg_err",
+        (out, err), proc = self.run_and_wait(command, home, 
+            shfile = "rmpkg.sh",
+            pidfile = "rmpkg_pidfile",
+            ecodefile = "rmpkg_exitcode",
+            stdout = "rmpkg_stdout", 
+            stderr = "rmpkg_stderr",
             raise_on_error = True)
          
         return (out, err), proc 
             raise_on_error = True)
          
         return (out, err), proc 
@@ -316,22 +326,27 @@ class LinuxNode(ResourceManager):
 
     def rmdir(self, path):
         return self.execute("rm -rf %s" % path, with_lock = True)
 
     def rmdir(self, path):
         return self.execute("rm -rf %s" % path, with_lock = True)
-
-    def run_and_wait(self, command, 
-            home = ".", 
-            pidfile = "pid", 
+        
+    def run_and_wait(self, command, home, 
+            shfile = "cmd.sh",
+            pidfile = "pidfile", 
+            ecodefile = "exitcode", 
             stdin = None, 
             stdin = None, 
-            stdout = 'stdout'
-            stderr = 'stderr'
+            stdout = "stdout"
+            stderr = "stderr"
             sudo = False,
             tty = False,
             raise_on_error = False):
             sudo = False,
             tty = False,
             raise_on_error = False):
-        """ runs a command in background on the remote host, but waits
-            until the command finishes execution.
-            This is more robust than doing a simple synchronized 'execute',
-            since in the remote host the command can continue to run detached
-            even if network disconnections occur
+        """ 
+        runs a command in background on the remote host, busy-waiting
+        until the command finishes execution.
+        This is more robust than doing a simple synchronized 'execute',
+        since in the remote host the command can continue to run detached
+        even if network disconnections occur
         """
         """
+        self.upload_command(command, home, shfile, ecodefile)
+
+        command = "bash ./%s" % shfile
         # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
         # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
@@ -356,11 +371,11 @@ class LinuxNode(ResourceManager):
 
         # wait until command finishes to execute
         self.wait_run(pid, ppid)
 
         # wait until command finishes to execute
         self.wait_run(pid, ppid)
-       
-        # check if execution errors occurred
-        (out, err), proc = self.check_output(home, stderr)
+      
+        (out, err), proc = self.check_errors(home, ecodefile, stderr)
 
 
-        if err or out:
+        # Out is what was written in the stderr file
+        if out or err:
             msg = " Failed to run command '%s' " % command
             self.error(msg, out, err)
 
             msg = " Failed to run command '%s' " % command
             self.error(msg, out, err)
 
@@ -368,21 +383,93 @@ class LinuxNode(ResourceManager):
                 raise RuntimeError, msg
         
         return (out, err), proc
                 raise RuntimeError, msg
         
         return (out, err), proc
+
+    def exitcode(self, home, ecodefile = "exitcode"):
+        """
+        Get the exit code of an application.
+        Returns an integer value with the exit code 
+        """
+        (out, err), proc = self.check_output(home, ecodefile)
+
+        # Succeeded to open file, return exit code in the file
+        if proc.wait() == 0:
+            try:
+                return int(out.strip())
+            except:
+                # Error in the content of the file!
+                return ExitCode.CORRUPTFILE
+
+        # No such file or directory
+        if proc.returncode == 1:
+            return ExitCode.FILENOTFOUND
+        
+        # Other error from 'cat'
+        return ExitCode.ERROR
+
+    def upload_command(self, command, home, 
+            shfile = "cmd.sh",
+            ecodefile = "exitcode",
+            env = None):
+
+        command = "{ ( %(command)s ) ; } ; echo $? > %(ecodefile)s " % {
+                'command': command,
+                'ecodefile': ecodefile,
+                } 
+
+        # Export environment
+        environ = ""
+        if env:
+            for var in env.split(" "):
+                environ += 'export %s\n' % var
+
+        command = environ + command
+
+        dst = os.path.join(home, shfile)
+        return self.upload(command, dst, text = True)
+
+    def check_errors(self, home, 
+            ecodefile = "exitcode", 
+            stderr = "stderr"):
+        """
+        Checks whether errors occurred while running a command.
+        It first checks the exit code for the command, and only if the
+        exit code is an error one it returns the error output.
+        """
+        out = err = ""
+        proc = None
+
+        # get Exit code
+        ecode = self.exitcode(home, ecodefile)
+
+        if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
+            err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
+        elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
+            # The process returned an error code or didn't exist. 
+            # Check standard error.
+            (out, err), proc = self.check_output(home, stderr)
+            
+            # If the stderr file was not found, assume nothing happened.
+            # We just ignore the error.
+            if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: # cat - No such file or directory
+                err = ""
+       
+        return (out, err), proc
  
  
-    def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+    def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
         """ Waits until the pid file for the command is generated, 
             and returns the pid and ppid of the process """
         pid = ppid = None
         delay = 1.0
         """ Waits until the pid file for the command is generated, 
             and returns the pid and ppid of the process """
         pid = ppid = None
         delay = 1.0
-        for i in xrange(5):
-            pidtuple = self.checkpid(home = home, pidfile = pidfile)
+
+        for i in xrange(4):
+            pidtuple = self.getpid(home = home, pidfile = pidfile)
             
             if pidtuple:
                 pid, ppid = pidtuple
                 break
             else:
                 time.sleep(delay)
             
             if pidtuple:
                 pid, ppid = pidtuple
                 break
             else:
                 time.sleep(delay)
-                delay = min(30,delay*1.2)
+                delay = delay * 1.5
         else:
             msg = " Failed to get pid for pidfile %s/%s " % (
                     home, pidfile )
         else:
             msg = " Failed to get pid for pidfile %s/%s " % (
                     home, pidfile )
@@ -395,30 +482,26 @@ class LinuxNode(ResourceManager):
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
-        delay = 1.0
-        first = True
-        bustspin = 0
+        start_delay = 1.0
 
         while True:
             status = self.status(pid, ppid)
             
 
         while True:
             status = self.status(pid, ppid)
             
-            if status is sshfuncs.FINISHED:
+            if status is ProcStatus.FINISHED:
                 break
                 break
-            elif status is not sshfuncs.RUNNING:
-                bustspin += 1
-                time.sleep(delay*(5.5+random.random()))
-                if bustspin > 12:
+            elif status is not ProcStatus.RUNNING:
+                delay = delay * 1.5
+                time.sleep(delay)
+                # If it takes more than 20 seconds to start, then
+                # asume something went wrong
+                if delay > 20:
                     break
             else:
                     break
             else:
-                if first:
-                    first = False
-
-                time.sleep(delay*(0.5+random.random()))
-                delay = min(30,delay*1.2)
-                bustspin = 0
+                # The app is running, just wait...
+                time.sleep(0.5)
 
     def check_output(self, home, filename):
 
     def check_output(self, home, filename):
-        """ checks file content """
+        """ Retrives content of file """
         (out, err), proc = self.execute("cat %s" % 
             os.path.join(home, filename), retry = 1, with_lock = True)
         return (out, err), proc
         (out, err), proc = self.execute("cat %s" % 
             os.path.join(home, filename), retry = 1, with_lock = True)
         return (out, err), proc
@@ -448,7 +531,7 @@ class LinuxNode(ResourceManager):
 
     def copy(self, src, dst):
         if self.localhost:
 
     def copy(self, src, dst):
         if self.localhost:
-            (out, err), proc =  execfuncs.lcopy(source, dest, 
+            (out, err), proc = execfuncs.lcopy(source, dest, 
                     recursive = True,
                     strict_host_checking = False)
         else:
                     recursive = True,
                     strict_host_checking = False)
         else:
@@ -533,16 +616,15 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
 
         return (out, err), proc
 
-    def run(self, command, 
-            home = None,
+    def run(self, command, home,
             create_home = False,
             create_home = False,
-            pidfile = "pid",
+            pidfile = 'pidfile',
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
             tty = False):
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
             tty = False):
-
+        
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
@@ -555,10 +637,8 @@ class LinuxNode(ResourceManager):
                     sudo = sudo,
                     user = user) 
         else:
                     sudo = sudo,
                     user = user) 
         else:
-            # Start process in a "daemonized" way, using nohup and heavy
-            # stdin/out redirection to avoid connection issues
             with self._lock:
             with self._lock:
-                (out,err), proc = sshfuncs.rspawn(
+                (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
                     home = home,
                     command,
                     pidfile = pidfile,
                     home = home,
@@ -578,12 +658,12 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
 
         return (out, err), proc
 
-    def checkpid(self, home = ".", pidfile = "pid"):
+    def getpid(self, home, pidfile = "pidfile"):
         if self.localhost:
         if self.localhost:
-            pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
+            pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
             with self._lock:
         else:
             with self._lock:
-                pidtuple = sshfuncs.rcheckpid(
+                pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
@@ -594,7 +674,7 @@ class LinuxNode(ResourceManager):
                     )
         
         return pidtuple
                     )
         
         return pidtuple
-    
+
     def status(self, pid, ppid):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
     def status(self, pid, ppid):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
@@ -617,7 +697,7 @@ class LinuxNode(ResourceManager):
         proc = None
         status = self.status(pid, ppid)
 
         proc = None
         status = self.status(pid, ppid)
 
-        if status == sshfuncs.RUNNING:
+        if status == sshfuncs.ProcStatus.RUNNING:
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
@@ -632,12 +712,6 @@ class LinuxNode(ResourceManager):
                         identity = self.get("identity"),
                         server_key = self.get("serverKey")
                         )
                         identity = self.get("identity"),
                         server_key = self.get("serverKey")
                         )
-        return (out, err), proc
 
 
-    def check_bad_host(self, out, err):
-        badre = re.compile(r'(?:'
-                           r'|Error: disk I/O error'
-                           r')', 
-                           re.I)
-        return badre.search(out) or badre.search(err)
+        return (out, err), proc
 
 
index 13941bd..d832fb6 100644 (file)
@@ -29,7 +29,7 @@ def install_packages_command(os, packages):
     cmd = "( %s )" % install_rpmfusion_command(os)
     for p in packages:
         cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % {
     cmd = "( %s )" % install_rpmfusion_command(os)
     for p in packages:
         cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % {
-            'package': p}
+                    'package': p}
     
     #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
     return " ( %s )" % cmd 
     
     #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
     return " ( %s )" % cmd 
@@ -42,7 +42,7 @@ def remove_packages_command(os, packages):
     for p in packages:
         cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % {
                     'package': p}
     for p in packages:
         cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % {
                     'package': p}
-    
+
     #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
     return cmd 
 
     #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
     return cmd 
 
index 773465c..65cea4e 100644 (file)
@@ -17,7 +17,7 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT 
+from nepi.util.sshfuncs import ProcStatus, STDOUT
 
 import subprocess
 
 
 import subprocess
 
@@ -134,7 +134,7 @@ def lspawn(command, pidfile,
 
     return (out,err),proc
 
 
     return (out,err),proc
 
-def lcheckpid(pidfile):
+def lgetpid(pidfile):
     """
     Check the pidfile of a process spawned with remote_spawn.
     
     """
     Check the pidfile of a process spawned with remote_spawn.
     
@@ -179,14 +179,14 @@ def lstatus(pid, ppid):
         })
     
     if proc.wait():
         })
     
     if proc.wait():
-        return NOT_STARTED
+        return ProcStatus.NOT_STARTED
     
     status = False
     if out:
         status = (out.strip() == 'wait')
     else:
     
     status = False
     if out:
         status = (out.strip() == 'wait')
     else:
-        return NOT_STARTED
-    return RUNNING if status else FINISHED
+        return ProcStatus.NOT_STARTED
+    return ProcStatus.RUNNING if status else ProcStatus.FINISHED
  
 
 def lkill(pid, ppid, sudo = False):
  
 
 def lkill(pid, ppid, sudo = False):
index a88dc78..50f5d7c 100644 (file)
@@ -57,20 +57,18 @@ class STDOUT:
     redirect to whatever stdout was redirected to.
     """
 
     redirect to whatever stdout was redirected to.
     """
 
-class RUNNING:
+class ProcStatus:
     """
     """
-    Process is still running
+    Codes for status of remote spawned process
     """
     """
+    # Process is still running
+    RUNNING = 1
 
 
-class FINISHED:
-    """
-    Process is finished
-    """
-
-class NOT_STARTED:
-    """
-    Process hasn't started running yet (this should be very rare)
-    """
+    # Process is finished
+    FINISHED = 2
+    
+    # Process hasn't started running yet (this should be very rare)
+    NOT_STARTED = 3
 
 hostbyname_cache = dict()
 hostbyname_cache_lock = threading.Lock()
 
 hostbyname_cache = dict()
 hostbyname_cache_lock = threading.Lock()
@@ -511,6 +509,9 @@ def rcopy(source, dest,
         tmp_known_hosts = None
 
         args = ['scp', '-q', '-p', '-C',
         tmp_known_hosts = None
 
         args = ['scp', '-q', '-p', '-C',
+                # Speed up transfer using blowfish cypher specification which is 
+                # faster than the default one (3des)
+                '-c', 'blowfish',
                 # Don't bother with localhost. Makes test easier
                 '-o', 'NoHostAuthenticationForLocalhost=yes',
                 '-o', 'ConnectTimeout=60',
                 # Don't bother with localhost. Makes test easier
                 '-o', 'NoHostAuthenticationForLocalhost=yes',
                 '-o', 'ConnectTimeout=60',
@@ -588,7 +589,7 @@ def rcopy(source, dest,
 def rspawn(command, pidfile, 
         stdout = '/dev/null', 
         stderr = STDOUT, 
 def rspawn(command, pidfile, 
         stdout = '/dev/null', 
         stderr = STDOUT, 
-        stdin = '/dev/null', 
+        stdin = '/dev/null',
         home = None, 
         create_home = False, 
         sudo = False,
         home = None, 
         create_home = False, 
         sudo = False,
@@ -600,28 +601,41 @@ def rspawn(command, pidfile,
         server_key = None,
         tty = False):
     """
         server_key = None,
         tty = False):
     """
-    Spawn a remote command such that it will continue working asynchronously.
-    
-    Parameters:
-        command: the command to run - it should be a single line.
-        
-        pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
-        
-        stdout: path of a file to redirect standard output to - must be a string.
-            Defaults to /dev/null
-        stderr: path of a file to redirect standard error to - string or the special STDOUT value
-            to redirect to the same file stdout was redirected to. Defaults to STDOUT.
-        stdin: path of a file with input to be piped into the command's standard input
+    Spawn a remote command such that it will continue working asynchronously in 
+    background. 
+
+        :param command: The command to run, it should be a single line.
+        :type command: str
+
+        :param pidfile: Path to a file where to store the pid and ppid of the 
+                        spawned process
+        :type pidfile: str
+
+        :param stdout: Path to file to redirect standard output. 
+                       The default value is /dev/null
+        :type stdout: str
+
+        :param stderr: Path to file to redirect standard error.
+                       If the special STDOUT value is used, stderr will 
+                       be redirected to the same file as stdout
+        :type stderr: str
+
+        :param stdin: Path to a file with input to be piped into the command's standard input
+        :type stdin: str
+
+        :param home: Path to working directory folder. 
+                    It is assumed to exist unless the create_home flag is set.
+        :type home: str
+
+        :param create_home: Flag to force creation of the home folder before 
+                            running the command
+        :type create_home: bool
+        :param sudo: Flag forcing execution with sudo user
+        :type sudo: bool
         
         
-        home: path of a folder to use as working directory - should exist, unless you specify create_home
-        
-        create_home: if True, the home folder will be created first with mkdir -p
-        
-        sudo: whether the command needs to be executed as root
-        
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :rtype: touple
+
         (stdout, stderr), process
         
         Of the spawning process, which only captures errors at spawning time.
         (stdout, stderr), process
         
         Of the spawning process, which only captures errors at spawning time.
@@ -667,7 +681,7 @@ def rspawn(command, pidfile,
     return ((out, err), proc)
 
 @eintr_retry
     return ((out, err), proc)
 
 @eintr_retry
-def rcheckpid(pidfile,
+def rgetpid(pidfile,
         host = None, 
         port = None, 
         user = None, 
         host = None, 
         port = None, 
         user = None, 
@@ -675,19 +689,21 @@ def rcheckpid(pidfile,
         identity = None,
         server_key = None):
     """
         identity = None,
         server_key = None):
     """
-    Check the pidfile of a process spawned with remote_spawn.
-    
-    Parameters:
-        pidfile: the pidfile passed to remote_span
+    Returns the pid and ppid of a process from a remote file where the 
+    information was stored.
+
+        :param home: Path to directory where the pidfile is located
+        :type home: str
+
+        :param pidfile: Name of file containing the pid information
+        :type pidfile: str
         
         
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :rtype: int
         
         
-        A (pid, ppid) tuple useful for calling remote_status and remote_kill,
-        or None if the pidfile isn't valid yet (maybe the process is still starting).
-    """
+        A (pid, ppid) tuple useful for calling rstatus and rkill,
+        or None if the pidfile isn't valid yet (can happen when process is staring up)
 
 
+    """
     (out,err),proc = rexec(
         "cat %(pidfile)s" % {
             'pidfile' : pidfile,
     (out,err),proc = rexec(
         "cat %(pidfile)s" % {
             'pidfile' : pidfile,
@@ -719,18 +735,17 @@ def rstatus(pid, ppid,
         identity = None,
         server_key = None):
     """
         identity = None,
         server_key = None):
     """
-    Check the status of a process spawned with remote_spawn.
+    Returns a code representing the the status of a remote process
+
+        :param pid: Process id of the process
+        :type pid: int
+
+        :param ppid: Parent process id of process
+        :type ppid: int
     
     
-    Parameters:
-        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-        
-        host/port/user/agent/identity: see rexec
+        :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
     
     
-    Returns:
-        
-        One of NOT_STARTED, RUNNING, FINISHED
     """
     """
-
     (out,err),proc = rexec(
         # Check only by pid. pid+ppid does not always work (especially with sudo) 
         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
     (out,err),proc = rexec(
         # Check only by pid. pid+ppid does not always work (especially with sudo) 
         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
@@ -746,7 +761,7 @@ def rstatus(pid, ppid,
         )
     
     if proc.wait():
         )
     
     if proc.wait():
-        return NOT_STARTED
+        return ProcStatus.NOT_STARTED
     
     status = False
     if err:
     
     status = False
     if err:
@@ -755,8 +770,8 @@ def rstatus(pid, ppid,
     elif out:
         status = (out.strip() == 'wait')
     else:
     elif out:
         status = (out.strip() == 'wait')
     else:
-        return NOT_STARTED
-    return RUNNING if status else FINISHED
+        return ProcStatus.NOT_STARTED
+    return ProcStatus.RUNNING if status else ProcStatus.FINISHED
 
 @eintr_retry
 def rkill(pid, ppid,
 
 @eintr_retry
 def rkill(pid, ppid,
@@ -769,23 +784,21 @@ def rkill(pid, ppid,
         server_key = None, 
         nowait = False):
     """
         server_key = None, 
         nowait = False):
     """
-    Kill a process spawned with remote_spawn.
-    
+    Sends a kill signal to a remote process.
+
     First tries a SIGTERM, and if the process does not end in 10 seconds,
     it sends a SIGKILL.
     First tries a SIGTERM, and if the process does not end in 10 seconds,
     it sends a SIGKILL.
-    
-    Parameters:
-        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-        
-        sudo: whether the command was run with sudo - careful killing like this.
-        
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :param pid: Process id of process to be killed
+        :type pid: int
+
+        :param ppid: Parent process id of process to be killed
+        :type ppid: int
+
+        :param sudo: Flag indicating if sudo should be used to kill the process
+        :type sudo: bool
         
         
-        Nothing, should have killed the process
     """
     """
-    
     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
     cmd = """
 SUBKILL="%(subkill)s" ;
     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
     cmd = """
 SUBKILL="%(subkill)s" ;
index 2a24547..4602148 100755 (executable)
@@ -72,7 +72,7 @@ class ExecuteControllersTestCase(unittest.TestCase):
 
     def test_schedule_exception(self):
         def raise_error():
 
     def test_schedule_exception(self):
         def raise_error():
-            raise RuntimeError, "the error"
+            raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!"
 
         ec = ExperimentController()
         ec.schedule("2s", raise_error)
 
         ec = ExperimentController()
         ec.schedule("2s", raise_error)
index 75a43d1..1393f6a 100755 (executable)
@@ -34,13 +34,13 @@ import unittest
 
 class LinuxApplicationTestCase(unittest.TestCase):
     def setUp(self):
 
 class LinuxApplicationTestCase(unittest.TestCase):
     def setUp(self):
-        self.fedora_host = 'nepi2.pl.sophia.inria.fr'
-        self.fedora_user = 'inria_nepi'
+        self.fedora_host = "nepi2.pl.sophia.inria.fr"
+        self.fedora_user = "inria_nepi"
 
 
-        self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
-        self.ubuntu_user = 'alina'
+        self.ubuntu_host = "roseval.pl.sophia.inria.fr"
+        self.ubuntu_user = "alina"
         
         
-        self.target = 'nepi5.pl.sophia.inria.fr'
+        self.target = "nepi5.pl.sophia.inria.fr"
 
     @skipIfNotAlive
     def t_stdout(self, host, user):
 
     @skipIfNotAlive
     def t_stdout(self, host, user):
@@ -69,7 +69,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
 
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
 
-        stdout = ec.trace(app, 'stdout')
+        stdout = ec.trace(app, "stdout")
         self.assertTrue(stdout.strip() == "HOLA")
 
         ec.shutdown()
         self.assertTrue(stdout.strip() == "HOLA")
 
         ec.shutdown()
@@ -102,16 +102,16 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
 
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
 
-        stdout = ec.trace(app, 'stdout')
-        size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
+        stdout = ec.trace(app, "stdout")
+        size = ec.trace(app, "stdout", attr = TraceAttr.SIZE)
         self.assertEquals(len(stdout), size)
         
         self.assertEquals(len(stdout), size)
         
-        block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
+        block = ec.trace(app, "stdout", attr = TraceAttr.STREAM, block = 5, offset = 1)
         self.assertEquals(block, stdout[5:10])
 
         self.assertEquals(block, stdout[5:10])
 
-        path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
+        path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
         rm = ec.get_resource(app)
         rm = ec.get_resource(app)
-        p = os.path.join(rm.app_home, 'stdout')
+        p = os.path.join(rm.app_home, "stdout")
         self.assertEquals(path, p)
 
         ec.shutdown()
         self.assertEquals(path, p)
 
         ec.shutdown()
@@ -202,7 +202,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.assertTrue(ec.state(server) == ResourceState.FINISHED)
         self.assertTrue(ec.state(client) == ResourceState.FINISHED)
 
         self.assertTrue(ec.state(server) == ResourceState.FINISHED)
         self.assertTrue(ec.state(client) == ResourceState.FINISHED)
 
-        stdout = ec.trace(client, 'stdout')
+        stdout = ec.trace(client, "stdout")
         self.assertTrue(stdout.strip() == "HOLA")
 
         ec.shutdown()
         self.assertTrue(stdout.strip() == "HOLA")
 
         ec.shutdown()
@@ -222,8 +222,8 @@ class LinuxApplicationTestCase(unittest.TestCase):
         ec.set(node, "cleanHome", True)
         ec.set(node, "cleanProcesses", True)
 
         ec.set(node, "cleanHome", True)
         ec.set(node, "cleanProcesses", True)
 
-        sources = "http://nepi.inria.fr/attachment/wiki/WikiStart/pybindgen-r794.tar.gz " \
-            "http://nepi.inria.fr/attachment/wiki/WikiStart/nepi_integration_framework.pdf"
+        sources = "http://nepi.inria.fr/code/nef/archive/tip.tar.gz " \
+                " http://nepi.inria.fr/code/nef/raw-file/8ace577d4079/src/nef/images/menu/connect.png"
 
         app = ec.register_resource("LinuxApplication")
         ec.set(app, "sources", sources)
 
         app = ec.register_resource("LinuxApplication")
         ec.set(app, "sources", sources)
@@ -237,12 +237,12 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
 
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
 
-        err = ec.trace(app, 'http_sources_err')
-        self.assertTrue(err == "")
+        exitcode = ec.trace(app, "http_sources_exitcode")
+        self.assertTrue(exitcode.strip() == "0")
         
         
-        out = ec.trace(app, 'http_sources_out')
-        self.assertTrue(out.find("pybindgen-r794.tar.gz") > -1)
-        self.assertTrue(out.find("nepi_integration_framework.pdf") > -1)
+        out = ec.trace(app, "http_sources_stdout")
+        self.assertTrue(out.find("tip.tar.gz") > -1)
+        self.assertTrue(out.find("connect.png") > -1)
 
         ec.shutdown()
 
 
         ec.shutdown()
 
@@ -277,8 +277,6 @@ class LinuxApplicationTestCase(unittest.TestCase):
         self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
 
 
         self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
 
 
-    # TODO: test compilation, sources, dependencies, etc!!!
-
 if __name__ == '__main__':
     unittest.main()
 
 if __name__ == '__main__':
     unittest.main()
 
index 1a7fa09..259f578 100755 (executable)
@@ -19,8 +19,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 
-from nepi.resources.linux.node import LinuxNode
-from nepi.util.sshfuncs import RUNNING, FINISHED
+from nepi.resources.linux.node import LinuxNode, ExitCode
+from nepi.util.sshfuncs import ProcStatus
 
 from test_utils import skipIfNotAlive, skipInteractive, create_node
 
 
 from test_utils import skipIfNotAlive, skipInteractive, create_node
 
@@ -31,27 +31,13 @@ import unittest
 
 class LinuxNodeTestCase(unittest.TestCase):
     def setUp(self):
 
 class LinuxNodeTestCase(unittest.TestCase):
     def setUp(self):
-        self.fedora_host = 'nepi2.pl.sophia.inria.fr'
-        self.fedora_user = 'inria_nepi'
+        self.fedora_host = "nepi2.pl.sophia.inria.fr"
+        self.fedora_user = "inria_nepi"
 
 
-        self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
-        self.ubuntu_user = 'alina'
+        self.ubuntu_host = "roseval.pl.sophia.inria.fr"
+        self.ubuntu_user = "alina"
         
         
-        self.target = 'nepi5.pl.sophia.inria.fr'
-
-    @skipIfNotAlive
-    def t_xterm(self, host, user):
-        node, ec = create_node(host, user)
-
-        node.install_packages('xterm')
-
-        (out, err), proc = node.execute('xterm', forward_x11 = True)
-        
-        self.assertEquals(out, "")
-
-        (out, err), proc = node.remove_packages('xterm')
-        
-        self.assertEquals(out, "")
+        self.target = "nepi5.pl.sophia.inria.fr"
 
     @skipIfNotAlive
     def t_execute(self, host, user):
 
     @skipIfNotAlive
     def t_execute(self, host, user):
@@ -74,14 +60,14 @@ class LinuxNodeTestCase(unittest.TestCase):
         
         command = "ping %s" % self.target
         node.run(command, app_home)
         
         command = "ping %s" % self.target
         node.run(command, app_home)
-        pid, ppid = node.checkpid(app_home)
+        pid, ppid = node.getpid(app_home)
 
         status = node.status(pid, ppid)
 
         status = node.status(pid, ppid)
-        self.assertTrue(status, RUNNING)
+        self.assertTrue(status, ProcStatus.RUNNING)
 
         node.kill(pid, ppid)
         status = node.status(pid, ppid)
 
         node.kill(pid, ppid)
         status = node.status(pid, ppid)
-        self.assertTrue(status, FINISHED)
+        self.assertTrue(status, ProcStatus.FINISHED)
         
         (out, err), proc = node.check_output(app_home, "stdout")
 
         
         (out, err), proc = node.check_output(app_home, "stdout")
 
@@ -91,22 +77,120 @@ class LinuxNodeTestCase(unittest.TestCase):
 
         node.rmdir(app_home)
 
 
         node.rmdir(app_home)
 
+    @skipIfNotAlive
+    def t_exitcode_ok(self, host, user):
+        command = "echo 'OK!'"
+        
+        node, ec = create_node(host, user)
+         
+        app_home = os.path.join(node.exp_home, "my-app")
+        node.mkdir(app_home, clean = True)
+         
+        (out, err), proc = node.run_and_wait(command, app_home,
+            shfile = "cmd.sh",
+            pidfile = "pid",
+            ecodefile = "exitcode",
+            stdout = "stdout", 
+            stderr = "stderr",
+            raise_on_error = True)
+        # get the pid of the process
+        ecode = node.exitcode(app_home)
+        self.assertEquals(ecode, ExitCode.OK)
+
+    @skipIfNotAlive
+    def t_exitcode_kill(self, host, user):
+        node, ec = create_node(host, user)
+         
+        app_home = os.path.join(node.exp_home, "my-app")
+        node.mkdir(app_home, clean = True)
+       
+        # Upload command that will not finish
+        command = "ping localhost"
+        (out, err), proc = node.upload_command(command, app_home, 
+            shfile = "cmd.sh",
+            ecodefile = "exitcode")
+
+        (out, err), proc = node.run(command, app_home,
+            pidfile = "pidfile",
+            stdout = "stdout", 
+            stderr = "stderr")
+        # Just wait to make sure the ping started
+        time.sleep(5)
+
+        # The process is still running, so no retfile has been created yet
+        ecode = node.exitcode(app_home)
+        self.assertEquals(ecode, ExitCode.FILENOTFOUND)
+        
+        (out, err), proc = node.check_errors(app_home)
+        self.assertEquals(err, "")
+        
+        # Now kill the app
+        pid, ppid = node.getpid(app_home)
+        node.kill(pid, ppid)
+         
+        (out, err), proc = node.check_errors(app_home)
+        self.assertEquals(err, "")
+
+    @skipIfNotAlive
+    def t_exitcode_error(self, host, user):
+        # Try to execute a command that doesn't exist
+        command = "unexistent-command"
+        
+        node, ec = create_node(host, user)
+         
+        app_home = os.path.join(node.exp_home, "my-app")
+        node.mkdir(app_home, clean = True)
+         
+        (out, err), proc = node.run_and_wait(command, app_home,
+            shfile = "cmd.sh",
+            pidfile = "pid",
+            ecodefile = "exitcode",
+            stdout = "stdout", 
+            stderr = "stderr",
+            raise_on_error = False)
+        # get the pid of the process
+        ecode = node.exitcode(app_home)
+        # bash erro 127 - command not found
+        self.assertEquals(ecode, 127)
+        (out, err), proc = node.check_errors(app_home)
+        self.assertNotEquals(out, "")
+
     @skipIfNotAlive
     def t_install(self, host, user):
         node, ec = create_node(host, user)
 
     @skipIfNotAlive
     def t_install(self, host, user):
         node, ec = create_node(host, user)
 
-        (out, err), proc = node.mkdir(node.node_home, clean=True)
+        (out, err), proc = node.mkdir(node.node_home, clean = True)
         self.assertEquals(out, "")
 
         self.assertEquals(out, "")
 
-        (out, err), proc = node.install_packages('gcc')
+        (out, err), proc = node.install_packages("gcc", node.node_home)
         self.assertEquals(out, "")
 
         self.assertEquals(out, "")
 
-        (out, err), proc = node.remove_packages('gcc')
+        (out, err), proc = node.remove_packages("gcc", node.node_home)
         self.assertEquals(out, "")
 
         (out, err), proc = node.rmdir(node.exp_home)
         self.assertEquals(out, "")
 
         self.assertEquals(out, "")
 
         (out, err), proc = node.rmdir(node.exp_home)
         self.assertEquals(out, "")
 
+    @skipIfNotAlive
+    def t_xterm(self, host, user):
+        node, ec = create_node(host, user)
+
+        (out, err), proc = node.mkdir(node.node_home, clean = True)
+        self.assertEquals(out, "")
+        
+        node.install_packages("xterm", node.node_home)
+        self.assertEquals(out, "")
+
+        (out, err), proc = node.execute("xterm", forward_x11 = True)
+        self.assertEquals(out, "")
+
+        (out, err), proc = node.remove_packages("xterm", node.node_home)
+        self.assertEquals(out, "")
+
     @skipIfNotAlive
     def t_compile(self, host, user):
         node, ec = create_node(host, user)
     @skipIfNotAlive
     def t_compile(self, host, user):
         node, ec = create_node(host, user)
@@ -128,7 +212,7 @@ main (void)
         node.upload(prog, dst, text = True)
 
         # install gcc
         node.upload(prog, dst, text = True)
 
         # install gcc
-        node.install_packages('gcc')
+        node.install_packages('gcc', app_home)
 
         # compile the program using gcc
         command = "cd %s; gcc -Wall hello.c -o hello" % app_home
 
         # compile the program using gcc
         command = "cd %s; gcc -Wall hello.c -o hello" % app_home
@@ -147,12 +231,12 @@ main (void)
 
         # retrieve the output file 
         src = os.path.join(app_home, "hello.out")
 
         # retrieve the output file 
         src = os.path.join(app_home, "hello.out")
-        f = tempfile.NamedTemporaryFile(delete=False)
+        f = tempfile.NamedTemporaryFile(delete = False)
         dst = f.name
         node.download(src, dst)
         f.close()
 
         dst = f.name
         node.download(src, dst)
         f.close()
 
-        node.remove_packages('gcc')
+        node.remove_packages("gcc", app_home)
         node.rmdir(app_home)
 
         f = open(dst, "r")
         node.rmdir(app_home)
 
         f = open(dst, "r")
@@ -184,6 +268,24 @@ main (void)
 
     def test_compile_ubuntu(self):
         self.t_compile(self.ubuntu_host, self.ubuntu_user)
 
     def test_compile_ubuntu(self):
         self.t_compile(self.ubuntu_host, self.ubuntu_user)
+
+    def test_exitcode_ok_fedora(self):
+        self.t_exitcode_ok(self.fedora_host, self.fedora_user)
+
+    def test_exitcode_ok_ubuntu(self):
+        self.t_exitcode_ok(self.ubuntu_host, self.ubuntu_user)
+
+    def test_exitcode_kill_fedora(self):
+        self.t_exitcode_kill(self.fedora_host, self.fedora_user)
+
+    def test_exitcode_kill_ubuntu(self):
+        self.t_exitcode_kill(self.ubuntu_host, self.ubuntu_user)
+
+    def test_exitcode_error_fedora(self):
+        self.t_exitcode_error(self.fedora_host, self.fedora_user)
+
+    def test_exitcode_error_ubuntu(self):
+        self.t_exitcode_error(self.ubuntu_host, self.ubuntu_user)
     
     @skipInteractive
     def test_xterm_ubuntu(self):
     
     @skipInteractive
     def test_xterm_ubuntu(self):
index 3342157..1a55b6b 100755 (executable)
@@ -19,8 +19,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 
-from nepi.util.sshfuncs import rexec, rcopy, rspawn, rcheckpid, rstatus, rkill,\
-        RUNNING, FINISHED 
+from nepi.util.sshfuncs import rexec, rcopy, rspawn, rgetpid, rstatus, rkill,\
+        ProcStatus
 
 import getpass
 import unittest
 
 import getpass
 import unittest
@@ -231,7 +231,7 @@ class SSHfuncsTestCase(unittest.TestCase):
 
         time.sleep(2)
 
 
         time.sleep(2)
 
-        (pid, ppid) = rcheckpid(pidfile,
+        (pid, ppid) = rgetpid(pidfile,
                 host = host,
                 user = user,
                 port = env.port,
                 host = host,
                 user = user,
                 port = env.port,
@@ -243,7 +243,7 @@ class SSHfuncsTestCase(unittest.TestCase):
                 port = env.port, 
                 agent = True)
 
                 port = env.port, 
                 agent = True)
 
-        self.assertEquals(status, RUNNING)
+        self.assertEquals(status, ProcStatus.RUNNING)
 
         rkill(pid, ppid,
                 host = host,
 
         rkill(pid, ppid,
                 host = host,
@@ -257,7 +257,7 @@ class SSHfuncsTestCase(unittest.TestCase):
                 port = env.port, 
                 agent = True)
         
                 port = env.port, 
                 agent = True)
         
-        self.assertEquals(status, FINISHED)
+        self.assertEquals(status, ProcStatus.FINISHED)
 
 
 if __name__ == '__main__':
 
 
 if __name__ == '__main__':