LinuxApplication: Changed directory structure to store experiment files in the Linux...
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 4 Jul 2013 19:28:55 +0000 (12:28 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 4 Jul 2013 19:28:55 +0000 (12:28 -0700)
15 files changed:
examples/linux/ccn/ccncat_2_nodes.py
examples/linux/ccn/ccncat_extended_ring_topo.py
examples/linux/scalability.py
src/nepi/execution/ec.py
src/nepi/resources/all/collector.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccnapplication.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/resources/linux/node.py
src/nepi/util/sshfuncs.py
test/resources/linux/application.py
test/resources/linux/node.py

index 55133da..60d4362 100755 (executable)
@@ -71,14 +71,14 @@ def add_ccnd(ec, os_type, peers):
     build = (
         # Evaluate if ccnx binaries are already installed
         " ( "
-            "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
+            "  test -f ${BIN}/ccnx-0.7.1/bin/ccnd"
         " ) || ( "
         # If not, untar and build
             " ( "
-                " mkdir -p ${SOURCES}/ccnx && "
-                " tar xf ${SOURCES}/ccnx-0.7.1.tar.gz --strip-components=1 -C ${SOURCES}/ccnx "
+                " mkdir -p ${SRC}/ccnx-0.7.1 && "
+                " tar xf ${SRC}/ccnx-0.7.1.tar.gz --strip-components=1 -C ${SRC}/ccnx-0.7.1 "
              " ) && "
-                "cd ${SOURCES}/ccnx && "
+                "cd ${SRC}/ccnx-0.7.1 && "
                 # Just execute and silence warnings...
                 "( ./configure && make ) "
          " )") 
@@ -86,14 +86,14 @@ def add_ccnd(ec, os_type, peers):
     install = (
         # Evaluate if ccnx binaries are already installed
         " ( "
-            "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
+            "  test -f ${BIN}/ccnx-0.7.1/bin/ccnd"
         " ) || ( "
-            "  mkdir -p ${EXP_HOME}/ccnx/bin && "
-            "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
+            "  mkdir -p ${BIN}/ccnx-0.7.1/bin && "
+            "  cp -r ${SRC}/ccnx-0.7.1/bin ${BIN}/ccnx-0.7.1"
         " )"
     )
 
-    env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+    env = "PATH=$PATH:${BIN}/ccnx-0.7.1/bin"
 
     # BASH command -> ' ccndstart ; ccndc add ccnx:/ udp  host ;  ccnr '
     command = "ccndstart && "
@@ -112,7 +112,7 @@ def add_ccnd(ec, os_type, peers):
     return app
 
 def add_publish(ec, movie):
-    env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+    env = "PATH=$PATH:${BIN}/ccnx-0.7.1/bin"
     command = "ccnseqwriter -r ccnx:/VIDEO"
 
     app = ec.register_resource("LinuxApplication")
@@ -123,7 +123,7 @@ def add_publish(ec, movie):
     return app
 
 def add_stream(ec):
-    env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+    env = "PATH=$PATH:${BIN}/ccnx-0.7.1/bin"
     command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) "
 
     app = ec.register_resource("LinuxApplication")
index 08625ea..e91351a 100755 (executable)
@@ -109,7 +109,7 @@ def get_options():
     default_key = default_key if os.path.exists(default_key) else None
     pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
 
-    usage = "usage: %prog -s <pl-user> -m <movie> -e <exp-id> -i <ssh_key> -r <results"
+    usage = "usage: %prog -s <pl-user> -m <movie> -e <exp-id> -i <ssh_key> -r <results>"
 
     parser = OptionParser(usage=usage)
     parser.add_option("-s", "--pl-user", dest="pl_user", 
index ffc8535..0cd06ec 100755 (executable)
@@ -79,7 +79,7 @@ if __name__ == '__main__':
              #"planetlab1.uc3m.es",
              #"planetlab2.um.es",
              "planet1.servers.ua.pt",
-             "planetlab2.fct.ualg.pt",
+             #"planetlab2.fct.ualg.pt",
              "planetlab-1.tagus.ist.utl.pt",
              "planetlab-2.tagus.ist.utl.pt",
              "planetlab-um00.di.uminho.pt",
@@ -90,9 +90,9 @@ if __name__ == '__main__':
              "planetlab1.pjwstk.edu.pl",
              "ple2.tu.koszalin.pl",
              "planetlab2.ci.pwr.wroc.pl",
-             "planetlab2.cyfronet.pl",
+             #"planetlab2.cyfronet.pl",
              "plab2.ple.silweb.pl",
-             "planetlab1.cyfronet.pl",
+             #"planetlab1.cyfronet.pl",
              "plab4.ple.silweb.pl",
              "ple2.dmcs.p.lodz.pl",
              "planetlab2.pjwstk.edu.pl",
@@ -102,14 +102,14 @@ if __name__ == '__main__':
              "planetlab-1.ing.unimo.it",
              "gschembra4.diit.unict.it",
              "iraplab1.iralab.uni-karlsruhe.de",
-             "planetlab-1.fokus.fraunhofer.de",
+             #"planetlab-1.fokus.fraunhofer.de",
              "iraplab2.iralab.uni-karlsruhe.de",
              "planet2.zib.de",
              #"pl2.uni-rostock.de",
              "onelab-1.fhi-fokus.de",
              "planet2.l3s.uni-hannover.de",
              "planetlab1.exp-math.uni-essen.de",
-             "planetlab-2.fokus.fraunhofer.de",
+             #"planetlab-2.fokus.fraunhofer.de",
              "planetlab02.tkn.tu-berlin.de",
              "planetlab1.informatik.uni-goettingen.de",
              "planetlab1.informatik.uni-erlangen.de",
@@ -148,7 +148,7 @@ if __name__ == '__main__':
              "orval.infonet.fundp.ac.be",
              "rochefort.infonet.fundp.ac.be",
             ]
+
     ec = ExperimentController(exp_id = exp_id)
 
     for host in hostnames:
index c323e36..5ec3411 100644 (file)
@@ -27,14 +27,15 @@ import threading
 
 from nepi.util import guid
 from nepi.util.parallel import ParallelRun
-from nepi.util.timefuncs import tnow, tdiffsec, stabsformat 
+from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
 from nepi.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState, ResourceState2str
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
-# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
+# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
 class ECState(object):
     """ State of the Experiment Controller
@@ -48,37 +49,72 @@ class ExperimentController(object):
     """
     .. class:: Class Args :
       
-        :param exp_id: Human readable identifier for the experiment. 
-                        It will be used in the name of the directory 
+        :param exp_id: Human readable identifier for the experiment scenario
+                       It will be used in the name of the directory 
                         where experiment related information is stored
-        :type exp_id: int
-
-        :param root_dir: Root directory where experiment specific folder
-                         will be created to store experiment information
-        :type root_dir: str
+        :type exp_id: str
 
     .. note::
+
+        An experiment, or scenario, is defined by a concrete use, behavior,
+        configuration and interconnection of resources that describe a single
+        experiment case (We call this the experiment description). 
+        A same experiment (scenario) can be run many times.
+
         The ExperimentController (EC), is the entity responsible for 
-        managing a single experiment. 
+        managing an experiment instance (run). The same scenario can be 
+        recreated (and re-run) by instantiating an EC and recreating 
+        the same experiment description. 
+
+        In NEPI, an experiment is represented as a graph of interconnected
+        resources. A resource is a generic concept in the sense that any
+        component taking part of an experiment, whether physical of
+        virtual, is considered a resource. A resources could be a host, 
+        a virtual machine, an application, a simulator, a IP address.
+
+        A ResourceManager (RM), is the entity responsible for managing a 
+        single resource. ResourceManagers are specific to a resource
+        type (i.e. An RM to control a Linux application will not be
+        the same as the RM used to control a ns-3 simulation).
+        In order for a new type of resource to be supported in NEPI
+        a new RM must be implemented. NEPI already provides different
+        RMs to control basic resources, and new can be extended from
+        the existing ones.
+
         Through the EC interface the user can create ResourceManagers (RMs),
-        configure them and interconnect them, in order to describe the experiment.
-        
-        Only when the 'deploy()' method is invoked, the EC will take actions
-        to transform the 'described' experiment into a 'running' experiment.
+        configure them and interconnect them, in order to describe an experiment.
+        Describing an experiment through the EC does not run the experiment.
+        Only when the 'deploy()' method is invoked on the EC, will the EC take 
+        actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
         create/configure/connect RMs, and to deploy them to involve new
-        resources in the experiment.
-
+        resources in the experiment (this is known as 'interactive' deployment).
+        
+        An experiments in NEPI is identified by a string id, 
+        which is either given by the user, or automatically generated by NEPI.  
+        The purpose of this identifier is to separate files and results that 
+        belong to different experiment scenarios. 
+        However, since a same 'experiment' can be run many times, the experiment
+        id is not enough to identify an experiment instance (run).
+        For this reason, the ExperimentController has two identifier, the 
+        exp_id, which can be re-used by different ExperimentController instances,
+        and the run_id, which unique to a ExperimentController instance, and
+        is automatically generated by NEPI.
+        
     """
 
-    def __init__(self, exp_id = None, root_dir = "/tmp"): 
+    def __init__(self, exp_id = None): 
         super(ExperimentController, self).__init__()
         # root directory to store files
-        self._root_dir = root_dir
 
-        # experiment identifier given by the user
-        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+        # Run identifier. It identifies a concrete instance (run) of an experiment.
+        # Since a same experiment (same configuration) can be run many times,
+        # this id permits to identify concrete exoeriment run
+        self._run_id = tsformat()
+
+        # Experiment identifier. Usually assigned by the user
+        self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
@@ -120,13 +156,17 @@ class ExperimentController(object):
 
     @property
     def exp_id(self):
-        """ Return the experiment ID
+        """ Return the experiment id assigned by the user
+
+        """
+        return self._exp_id
+
+    @property
+    def run_id(self):
+        """ Return the experiment instance (run) identifier  
 
         """
-        exp_id = self._exp_id
-        if not exp_id.startswith("nepi-"):
-            exp_id = "nepi-" + exp_id
-        return exp_id
+        return self._run_id
 
     @property
     def finished(self):
index a9f7667..407d0bb 100644 (file)
@@ -22,7 +22,6 @@ from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
         ResourceAction
 from nepi.util.sshfuncs import ProcStatus
-from nepi.util.timefuncs import tsformat
 
 import os
 import tempfile
@@ -73,8 +72,7 @@ class Collector(ResourceManager):
             raise RuntimeError, msg
 
         store_dir = self.get("storeDir")
-        timestamp = tsformat()
-        self._store_path = os.path.join(store_dir, self.ec.exp_id, timestamp)
+        self._store_path = os.path.join(store_dir, self.ec.exp_id, self.ec.run_id)
         
         msg = "Creating local directory at %s to store %s traces " % (
             store_dir, trace_name)
index 6c7f82c..d0a8c32 100644 (file)
@@ -29,16 +29,68 @@ import os
 import subprocess
 
 # TODO: Resolve wildcards in commands!!
-# TODO: compare_hash for all files that are uploaded!
+# TODO: During provisioning, everything that is not scp could be
+#       uploaded to a same script, http_sources download, etc...
+#       and like that require performing less ssh connections!!!
 
 
 @clsinit
 class LinuxApplication(ResourceManager):
+    """
+    .. class:: Class Args :
+      
+        :param ec: The Experiment controller
+        :type ec: ExperimentController
+        :param guid: guid of the RM
+        :type guid: int
+
+    .. note::
+
+    A LinuxApplication RM represents a process that can be executed in
+    a remote Linux host using SSH.
+
+    The LinuxApplication RM takes care of uploadin sources and any files
+    needed to run the experiment, to the remote host. 
+    It also allows to provide source compilation (build) and installation 
+    instructions, and takes care of automating the sources build and 
+    installation tasks for the user.
+
+    It is important to note that files uploaded to the remote host have
+    two possible scopes: single-experiment or multi-experiment.
+    Single experiment files are those that will not be re-used by other 
+    experiments. Multi-experiment files are those that will.
+    Sources and shared files are always made available to all experiments.
+
+    Directory structure:
+
+    The directory structure used by LinuxApplication RM at the Linux
+    host is the following:
+
+        ${HOME}/nepi-usr --> Base directory for multi-experiment files
+                      |
+        ${LIB}        |- /lib --> Base directory for libraries
+        ${BIN}        |- /bin --> Base directory for binary files
+        ${SRC}        |- /src --> Base directory for sources
+        ${SHARE}      |- /share --> Base directory for other files
+
+        ${HOME}/nepi-exp --> Base directory for single-experiment files
+                      |
+        ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
+                          |
+        ${APP_HOME}       |- /<app-guid> --> Base directory for application 
+                               |     specific files (e.g. command.sh, input)
+                               | 
+        ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
+
+    """
+
     _rtype = "LinuxApplication"
 
     @classmethod
     def _register_attributes(cls):
-        command = Attribute("command", "Command to execute", 
+        command = Attribute("command", "Command to execute at application start. "
+                "Note that commands will be executed in the ${RUN_HOME} directory, "
+                "make sure to take this into account when using relative paths. ", 
                 flags = Flags.ExecReadOnly)
         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
                 flags = Flags.ExecReadOnly)
@@ -50,40 +102,48 @@ class LinuxApplication(ResourceManager):
                 "Space-separated list of packages required to run the application",
                 flags = Flags.ExecReadOnly)
         sources = Attribute("sources", 
-                "Space-separated list of regular files to be deployed in the working "
-                "path prior to building. Archives won't be expanded automatically.",
+                "Space-separated list of regular files to be uploaded to ${SRC} "
+                "directory prior to building. Archives won't be expanded automatically. "
+                "Sources are globally available for all experiments unless "
+                "cleanHome is set to True (This will delete all sources). ",
+                flags = Flags.ExecReadOnly)
+        files = Attribute("files", 
+                "Space-separated list of regular miscellaneous files to be uploaded "
+                "to ${SHARE} directory. "
+                "Files are globally available for all experiments unless "
+                "cleanHome is set to True (This will delete all files). ",
+                flags = Flags.ExecReadOnly)
+        libs = Attribute("libs", 
+                "Space-separated list of libraries (e.g. .so files) to be uploaded "
+                "to ${LIB} directory. "
+                "Libraries are globally available for all experiments unless "
+                "cleanHome is set to True (This will delete all files). ",
+                flags = Flags.ExecReadOnly)
+        bins = Attribute("bins", 
+                "Space-separated list of binary files to be uploaded "
+                "to ${BIN} directory. "
+                "Binaries are globally available for all experiments unless "
+                "cleanHome is set to True (This will delete all files). ",
                 flags = Flags.ExecReadOnly)
         code = Attribute("code", 
-                "Plain text source code to be uploaded to the server. It will be stored "
-                "under ${SOURCES}/code",
+                "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
                 flags = Flags.ExecReadOnly)
         build = Attribute("build", 
                 "Build commands to execute after deploying the sources. "
-                "Sources will be in the ${SOURCES} folder. "
-                "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
-                "Try to make the commands return with a nonzero exit code on error.\n"
-                "Also, do not install any programs here, use the 'install' attribute. This will "
-                "help keep the built files constrained to the build folder (which may "
-                "not be the home folder), and will result in faster deployment. Also, "
-                "make sure to clean up temporary files, to reduce bandwidth usage between "
-                "nodes when transferring built packages.",
+                "Sources are uploaded to the ${SRC} directory and code "
+                "is uploaded to the ${APP_HOME} directory. \n"
+                "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
+                "./configure && make && make clean.\n"
+                "Make sure to make the build commands return with a nonzero exit "
+                "code on error.",
                 flags = Flags.ReadOnly)
         install = Attribute("install", 
                 "Commands to transfer built files to their final destinations. "
-                "Sources will be in the initial working folder, and a special "
-                "tag ${SOURCES} can be used to reference the experiment's "
-                "home folder (where the application commands will run).\n"
-                "ALL sources and targets needed for execution must be copied there, "
-                "if building has been enabled.\n"
-                "That is, 'slave' nodes will not automatically get any source files. "
-                "'slave' nodes don't get build dependencies either, so if you need "
-                "make and other tools to install, be sure to provide them as "
-                "actual dependencies instead.",
+                "Install commands are executed after build commands. ",
                 flags = Flags.ReadOnly)
-        stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
-        stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
-        stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
-        tear_down = Attribute("tearDown", "Bash script to be executed before "
+        stdin = Attribute("stdin", "Standard input for the 'command'", 
+                flags = Flags.ExecReadOnly)
+        tear_down = Attribute("tearDown", "Command to be executed just before " 
                 "releasing the resource", 
                 flags = Flags.ReadOnly)
 
@@ -94,11 +154,12 @@ class LinuxApplication(ResourceManager):
         cls._register_attribute(depends)
         cls._register_attribute(sources)
         cls._register_attribute(code)
+        cls._register_attribute(files)
+        cls._register_attribute(bins)
+        cls._register_attribute(libs)
         cls._register_attribute(build)
         cls._register_attribute(install)
         cls._register_attribute(stdin)
-        cls._register_attribute(stdout)
-        cls._register_attribute(stderr)
         cls._register_attribute(tear_down)
 
     @classmethod
@@ -122,7 +183,7 @@ class LinuxApplication(ResourceManager):
 
         # timestamp of last state check of the application
         self._last_state_check = tnow()
-    
+
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
                 self.node.get("hostname"), msg)
@@ -138,12 +199,8 @@ class LinuxApplication(ResourceManager):
         return os.path.join(self.node.exp_home, self._home)
 
     @property
-    def src_dir(self):
-        return os.path.join(self.app_home, 'src')
-
-    @property
-    def build_dir(self):
-        return os.path.join(self.app_home, 'build')
+    def run_home(self):
+        return os.path.join(self.app_home, self.ec.run_id)
 
     @property
     def pid(self):
@@ -169,7 +226,7 @@ class LinuxApplication(ResourceManager):
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         self.info("Retrieving '%s' trace %s " % (name, attr))
 
-        path = os.path.join(self.app_home, name)
+        path = os.path.join(self.run_home, name)
         
         command = "(test -f %s && echo 'success') || echo 'error'" % path
         (out, err), proc = self.node.execute(command)
@@ -183,7 +240,7 @@ class LinuxApplication(ResourceManager):
             return path
 
         if attr == TraceAttr.ALL:
-            (out, err), proc = self.node.check_output(self.app_home, name)
+            (out, err), proc = self.node.check_output(self.run_home, name)
             
             if err and proc.poll():
                 msg = " Couldn't read trace %s " % name
@@ -210,26 +267,36 @@ class LinuxApplication(ResourceManager):
         return out
             
     def provision(self):
-        # create home dir for application
-        self.node.mkdir(self.app_home)
-
-        # upload sources
-        self.upload_sources()
-
-        # upload code
-        self.upload_code()
-
-        # upload stdin
-        self.upload_stdin()
-
-        # install dependencies
-        self.install_dependencies()
-
-        # build
-        self.build()
-
-        # Install
-        self.install()
+        # create run dir for application
+        self.node.mkdir(self.run_home)
+    
+        steps = [
+            # upload sources
+            self.upload_sources,
+            # upload files
+            self.upload_files,
+            # upload binaries
+            self.upload_binaries,
+            # upload libraries
+            self.upload_libraries,
+            # upload code
+            self.upload_code,
+            # upload stdin
+            self.upload_stdin,
+            # install dependencies
+            self.install_dependencies,
+            # build
+            self.build,
+            # Install
+            self.install]
+
+        # Since provisioning takes a long time, before
+        # each step we check that the EC is still 
+        for step in steps:
+            if self.ec.finished:
+                raise RuntimeError, "EC finished"
+
+            step()
 
         # Upload command to remote bash script
         # - only if command can be executed in background and detached
@@ -245,8 +312,10 @@ class LinuxApplication(ResourceManager):
             env = self.get("env")
             env = env and self.replace_paths(env)
 
-            self.node.upload_command(command, self.app_home, 
-                    shfile = "app.sh",
+            shfile = os.path.join(self.app_home, "app.sh")
+
+            self.node.upload_command(command, 
+                    shfile = shfile,
                     env = env)
        
         self.info("Provisioning finished")
@@ -255,34 +324,36 @@ class LinuxApplication(ResourceManager):
 
     def upload_sources(self):
         sources = self.get("sources")
+
         if sources:
             self.info("Uploading sources ")
 
-            # create dir for sources
-            self.node.mkdir(self.src_dir)
-
             sources = sources.split(' ')
 
-            http_sources = list()
+            # Separate sources that should be downloaded from 
+            # the web, from sources that should be uploaded from
+            # the local machine
+            command = []
             for source in list(sources):
                 if source.startswith("http") or source.startswith("https"):
-                    http_sources.append(source)
+                    # remove the hhtp source from the sources list
                     sources.remove(source)
 
-            # Download http sources remotely
-            if http_sources:
-                command = [" wget -c --directory-prefix=${SOURCES} "]
-                check = []
-
-                for source in http_sources:
-                    command.append(" %s " % (source))
-                    check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
-                
-                command = " ".join(command)
-                check = " ; ".join(check)
-
-                # Append the command to check that the sources were downloaded
-                command += " ; %s " % check
+                    command.append( " ( " 
+                            # Check if the source already exists
+                            " ls ${SRC}/%(basename)s "
+                            " || ( "
+                            # If source doesn't exist, download it and check
+                            # that it it downloaded ok
+                            "   wget -c --directory-prefix=${SRC} %(source)s && "
+                            "   ls ${SRC}/%(basename)s "
+                            " ) ) " % {
+                                "basename": os.path.basename(source),
+                                "source": source
+                                })
+
+            if command:
+                command = " && ".join(command)
 
                 # replace application specific paths in the command
                 command = self.replace_paths(command)
@@ -290,64 +361,78 @@ class LinuxApplication(ResourceManager):
                 # Upload the command to a bash script and run it
                 # in background ( but wait until the command has
                 # finished to continue )
-                self.node.run_and_wait(command, self.app_home,
-                        shfile = "http_sources.sh",
+                self.node.run_and_wait(command, self.run_home,
+                        shfile = os.path.join(self.app_home, "http_sources.sh"),
+                        overwrite = False,
                         pidfile = "http_sources_pidfile", 
                         ecodefile = "http_sources_exitcode", 
                         stdout = "http_sources_stdout", 
                         stderr = "http_sources_stderr")
 
             if sources:
-                self.node.upload(sources, self.src_dir)
+                sources = ' '.join(sources)
+                self.node.upload(sources, self.node.src_dir, overwrite = False)
+
+    def upload_files(self):
+        files = self.get("files")
+
+        if files:
+            self.info("Uploading files %s " % files)
+            self.node.upload(files, self.node.share_dir, overwrite = False)
+
+    def upload_libraries(self):
+        libs = self.get("libs")
+
+        if libs:
+            self.info("Uploading libraries %s " % libaries)
+            self.node.upload(libs, self.node.lib_dir, overwrite = False)
+
+    def upload_binaries(self):
+        bins = self.get("bins")
+
+        if bins:
+            self.info("Uploading binaries %s " % binaries)
+            self.node.upload(bins, self.node.bin_dir, overwrite = False)
 
     def upload_code(self):
         code = self.get("code")
-        if code:
-            # create dir for sources
-            self.node.mkdir(self.src_dir)
 
-            self.info("Uploading code ")
+        if code:
+            self.info("Uploading code")
 
-            dst = os.path.join(self.src_dir, "code")
-            self.node.upload(sources, dst, text = True)
+            dst = os.path.join(self.app_home, "code")
+            self.node.upload(code, dst, overwrite = False, text = True)
 
     def upload_stdin(self):
         stdin = self.get("stdin")
         if stdin:
             # create dir for sources
-            self.info(" Uploading stdin ")
+            self.info("Uploading stdin")
             
             dst = os.path.join(self.app_home, "stdin")
-
-            # If what we are uploading is a file, check whether
-            # the same file already exists (using md5sum)
-            if self.compare_hash(stdin, dst):
-                return
-
-            self.node.upload(stdin, dst, text = True)
+            self.node.upload(stdin, dst, overwrite = False, text = True)
 
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
             self.info("Installing dependencies %s" % depends)
-            self.node.install_packages(depends, self.app_home)
+            self.node.install_packages(depends, self.app_home, self.run_home)
 
     def build(self):
         build = self.get("build")
+
         if build:
             self.info("Building sources ")
             
-            # create dir for build
-            self.node.mkdir(self.build_dir)
-
             # replace application specific paths in the command
             command = self.replace_paths(build)
 
             # Upload the command to a bash script and run it
             # in background ( but wait until the command has
             # finished to continue )
-            self.node.run_and_wait(command, self.app_home,
-                    shfile = "build.sh",
+            self.node.run_and_wait(command, self.run_home,
+                    shfile = os.path.join(self.app_home, "build.sh"),
+                    overwrite = False,
                     pidfile = "build_pidfile", 
                     ecodefile = "build_exitcode", 
                     stdout = "build_stdout", 
@@ -355,6 +440,7 @@ class LinuxApplication(ResourceManager):
  
     def install(self):
         install = self.get("install")
+
         if install:
             self.info("Installing sources ")
 
@@ -364,8 +450,9 @@ class LinuxApplication(ResourceManager):
             # Upload the command to a bash script and run it
             # in background ( but wait until the command has
             # finished to continue )
-            self.node.run_and_wait(command, self.app_home,
-                    shfile = "install.sh",
+            self.node.run_and_wait(command, self.run_home,
+                    shfile = os.path.join(self.app_home, "install.sh"),
+                    overwrite = False,
                     pidfile = "install_pidfile", 
                     ecodefile = "install_exitcode", 
                     stdout = "install_stdout", 
@@ -409,10 +496,13 @@ class LinuxApplication(ResourceManager):
 
     def _start_in_foreground(self):
         command = self.get("command")
-        stdin = "stdin" if self.get("stdin") else None
         sudo = self.get("sudo") or False
         x11 = self.get("forwardX11")
 
+        # For a command being executed in foreground, if there is stdin,
+        # it is expected to be text string not a file or pipe
+        stdin = self.get("stdin") or None
+
         # Command will be launched in foreground and attached to the
         # terminal using the node 'execute' in non blocking mode.
 
@@ -440,17 +530,20 @@ class LinuxApplication(ResourceManager):
     def _start_in_background(self):
         command = self.get("command")
         env = self.get("env")
-        stdin = "stdin" if self.get("stdin") else None
-        stdout = "stdout" if self.get("stdout") else "stdout"
-        stderr = "stderr" if self.get("stderr") else "stderr"
         sudo = self.get("sudo") or False
 
-        # Command will be as a daemon in baground and detached from any terminal.
-        # The real command to run was previously uploaded to a bash script
-        # during deployment, now launch the remote script using 'run'
-        # method from the node
-        cmd = "bash ./app.sh"
-        (out, err), proc = self.node.run(cmd, self.app_home, 
+        stdout = "stdout"
+        stderr = "stderr"
+        stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
+                else None
+
+        # Command will be run as a daemon in baground and detached from any
+        # terminal.
+        # The command to run was previously uploaded to a bash script
+        # during deployment, now we launch the remote script using 'run'
+        # method from the node.
+        cmd = "bash %s" % os.path.join(self.app_home, "app.sh")
+        (out, err), proc = self.node.run(cmd, self.run_home, 
             stdin = stdin, 
             stdout = stdout,
             stderr = stderr,
@@ -465,14 +558,14 @@ class LinuxApplication(ResourceManager):
             raise RuntimeError, msg
     
         # Wait for pid file to be generated
-        pid, ppid = self.node.wait_pid(self.app_home)
+        pid, ppid = self.node.wait_pid(self.run_home)
         if pid: self._pid = int(pid)
         if ppid: self._ppid = int(ppid)
 
         # If the process is not running, check for error information
         # on the remote machine
         if not self.pid or not self.ppid:
-            (out, err), proc = self.node.check_errors(self.app_home,
+            (out, err), proc = self.node.check_errors(self.run_home,
                     stderr = stderr) 
 
             # Out is what was written in the stderr file
@@ -557,7 +650,7 @@ class LinuxApplication(ResourceManager):
                 state_check_delay = 0.5
                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
                     # check if execution errors occurred
-                    (out, err), proc = self.node.check_errors(self.app_home)
+                    (out, err), proc = self.node.check_errors(self.run_home)
 
                     if err:
                         msg = " Failed to execute command '%s'" % self.get("command")
@@ -580,49 +673,20 @@ class LinuxApplication(ResourceManager):
         """
         Replace all special path tags with shell-escaped actual paths.
         """
-        def absolute_dir(d):
-            return d if d.startswith("/") else os.path.join("${HOME}", d)
-
         return ( command
-            .replace("${SOURCES}", absolute_dir(self.src_dir))
-            .replace("${BUILD}", absolute_dir(self.build_dir))
-            .replace("${APP_HOME}", absolute_dir(self.app_home))
-            .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
-            .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
+            .replace("${USR}", self.node.usr_dir)
+            .replace("${LIB}", self.node.lib_dir)
+            .replace("${BIN}", self.node.bin_dir)
+            .replace("${SRC}", self.node.src_dir)
+            .replace("${SHARE}", self.node.share_dir)
+            .replace("${EXP}", self.node.exp_dir)
+            .replace("${EXP_HOME}", self.node.exp_home)
+            .replace("${APP_HOME}", self.app_home)
+            .replace("${RUN_HOME}", self.run_home)
+            .replace("${NODE_HOME}", self.node.node_home)
+            .replace("${HOME}", self.node.home_dir)
             )
 
-    def compare_hash(self, local, remote):
-        # getting md5sum from remote file
-        (out, err), proc = self.node.execute("md5sum %s " % remote)
-
-        if proc.poll() == 0: #OK
-            if not os.path.isfile(local):
-                # store to a tmp file
-                f = tempfile.NamedTemporaryFile()
-                f.write(local)
-                f.flush()
-                local = f.name
-
-            lproc = subprocess.Popen(["md5sum", local],
-                stdout = subprocess.PIPE,
-                stderr = subprocess.PIPE) 
-
-            # getting md5sum from local file
-            (lout, lerr) = lproc.communicate()
-
-            # files are the same, no need to upload
-            lchk = lout.strip().split(" ")[0]
-            rchk = out.strip().split(" ")[0]
-
-            msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
-                    local, lchk, remote, rchk)
-            self.debug(msg)
-
-            if lchk == rchk:
-                return True
-
-        return False
-
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
index e5ae00e..39ac2a9 100644 (file)
@@ -51,7 +51,7 @@ class LinuxCCNApplication(LinuxApplication):
 
     @property
     def _environment(self):
-        env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
+        env = "PATH=$PATH:${STORE}/ccnx/bin "
         return env            
        
     def execute_command(self, command, env):
@@ -59,12 +59,7 @@ class LinuxCCNApplication(LinuxApplication):
         command = environ + command
         command = self.replace_paths(command)
 
-        (out, err), proc = self.node.execute(command)
-
-        if proc.poll():
-            self._state = ResourceState.FAILED
-            self.error(msg, out, err)
-            raise RuntimeError, msg
+        return self.node.execute(command)
 
     def valid_connection(self, guid):
         # TODO: Validate!
index e5216d2..8b79974 100644 (file)
@@ -87,7 +87,13 @@ class LinuxCCNContent(LinuxCCNApplication):
             # Run the command as a bash script in the background, 
             # in the host ( but wait until the command has
             # finished to continue )
-            self.execute_command(command, env)
+            (out, err), proc = self.execute_command(command, env)
+
+            if proc.poll():
+                self._state = ResourceState.FAILED
+                msg = "Failed to execute command"
+                self.error(msg, out, err)
+                raise RuntimeError, msg
 
             self.debug("----- READY ---- ")
             self._ready_time = tnow()
index 44c1afb..7c150ea 100644 (file)
@@ -126,6 +126,8 @@ class LinuxCCND(LinuxApplication):
     def __init__(self, ec, guid):
         super(LinuxCCND, self).__init__(ec, guid)
         self._home = "ccnd-%s" % self.guid
+        self._version = None
+        self._environment = None
 
     def deploy(self):
         if not self.node or self.node.state < ResourceState.READY:
@@ -294,15 +296,15 @@ class LinuxCCND(LinuxApplication):
         return (
             # Evaluate if ccnx binaries are already installed
             " ( "
-                " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+                " test -f ${STORE}/ccnx/bin/ccnd && "
                 " echo 'sources found, nothing to do' "
             " ) || ( "
             # If not, untar and build
                 " ( "
-                    " mkdir -p ${SOURCES}/ccnx && "
-                    " tar xf ${SOURCES}/%(sources)s --strip-components=1 -C ${SOURCES}/ccnx "
+                    " mkdir -p ${STORE}/ccnx && "
+                    " tar xf ${STORE}/%(sources)s --strip-components=1 -C ${STORE}/ccnx "
                  " ) && "
-                    "cd ${SOURCES}/ccnx && "
+                    "cd ${STORE}/ccnx && "
                     # Just execute and silence warnings...
                     " ( ./configure && make ) "
              " )") % ({ 'sources': sources })
@@ -312,12 +314,12 @@ class LinuxCCND(LinuxApplication):
         return (
             # Evaluate if ccnx binaries are already installed
             " ( "
-                " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+                " test -f ${SOURCES}/ccnx/bin/ccnd && "
                 " echo 'sources found, nothing to do' "
             " ) || ( "
             # If not, install
-                "  mkdir -p ${EXP_HOME}/ccnx/bin && "
-                "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
+                "  mkdir -p ${SOURCES}/ccnx/bin && "
+                "  cp -r ${}/ccnx ${STORE}"
             " )"
             )
 
@@ -339,7 +341,7 @@ class LinuxCCND(LinuxApplication):
             "prefix" : "CCND_PREFIX",
             })
 
-        env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
+        env = "PATH=$PATH:${SOURCES}/ccnx/bin "
         env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), str(self.get(k))) \
             if self.get(k) else "", envs.keys()))
         
index cdf48e3..920b3f9 100644 (file)
@@ -268,7 +268,7 @@ class LinuxCCNR(LinuxCCNApplication):
             "ccnsSyncScope": "CCNS_SYNC_SCOPE",
             })
 
-        env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
+        env = "PATH=$PATH:${STORE}/ccnx/bin "
         env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \
             if self.get(k) else "", envs.keys()))
        
index c6398fe..9f4d1d3 100644 (file)
@@ -82,7 +82,14 @@ class LinuxFIBEntry(LinuxCCNApplication):
             self.info("Deploying command '%s' " % command)
 
             self.node.mkdir(self.app_home)
-            self.execute_command(command, env)
+            (out, err), proc = self.execute_command(command, env)
+
+            if proc.poll():
+                self._state = ResourceState.FAILED
+                msg = "Failed to execute command"
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+
 
             self.debug("----- READY ---- ")
             self._ready_time = tnow()
@@ -109,7 +116,10 @@ class LinuxFIBEntry(LinuxCCNApplication):
             self.info("Stopping command '%s'" % command)
 
             command = self._stop_command
-            self.execute_command(command, env)
+            (out, err), proc = self.execute_command(command, env)
+
+            if proc.poll():
+                pass
 
             self._stop_time = tnow()
             self._state = ResourceState.STOPPED
index 0d2597f..5080a6c 100644 (file)
@@ -31,13 +31,11 @@ import re
 import tempfile
 import time
 import threading
+import traceback
 
-# TODO: Verify files and dirs exists already
-# TODO: Blacklist nodes!
 # TODO: Unify delays!!
 # TODO: Validate outcome of uploads!! 
 
-
 class ExitCode:
     """
     Error codes that the rexitcode function can return if unable to
@@ -165,8 +163,12 @@ class LinuxNode(ResourceManager):
         server_key = Attribute("serverKey", "Server public key", 
                 flags = Flags.ExecReadOnly)
         
-        clean_home = Attribute("cleanHome", "Remove all files and directories " + \
-                " from home folder before starting experiment", 
+        clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
+                " from node home folder before starting experiment", 
+                flags = Flags.ExecReadOnly)
+
+        clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
+                " from a previous same experiment, before the new experiment starts", 
                 flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
@@ -184,12 +186,15 @@ class LinuxNode(ResourceManager):
         cls._register_attribute(identity)
         cls._register_attribute(server_key)
         cls._register_attribute(clean_home)
+        cls._register_attribute(clean_experiment)
         cls._register_attribute(clean_processes)
         cls._register_attribute(tear_down)
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
+        # home directory at Linux host
+        self._home_dir = ""
         
         # lock to avoid concurrency issues on methods used by applications 
         self._lock = threading.Lock()
@@ -199,17 +204,47 @@ class LinuxNode(ResourceManager):
                 self.get("hostname"), msg)
 
     @property
-    def home(self):
-        return self.get("home") or ""
+    def home_dir(self):
+        home = self.get("home") or ""
+        if not home.startswith("/"):
+           home = os.path.join(self._home_dir, home) 
+        return home
+
+    @property
+    def usr_dir(self):
+        return os.path.join(self.home_dir, "nepi-usr")
+
+    @property
+    def lib_dir(self):
+        return os.path.join(self.usr_dir, "lib")
+
+    @property
+    def bin_dir(self):
+        return os.path.join(self.usr_dir, "bin")
+
+    @property
+    def src_dir(self):
+        return os.path.join(self.usr_dir, "src")
+
+    @property
+    def share_dir(self):
+        return os.path.join(self.usr_dir, "share")
+
+    @property
+    def exp_dir(self):
+        return os.path.join(self.home_dir, "nepi-exp")
 
     @property
     def exp_home(self):
-        return os.path.join(self.home, self.ec.exp_id)
+        return os.path.join(self.exp_dir, self.ec.exp_id)
 
     @property
     def node_home(self):
-        node_home = "node-%d" % self.guid
-        return os.path.join(self.exp_home, node_home)
+        return os.path.join(self.exp_home, "node-%d" % self.guid)
+
+    @property
+    def run_home(self):
+        return os.path.join(self.node_home, self.ec.run_id)
 
     @property
     def os(self):
@@ -259,18 +294,32 @@ class LinuxNode(ResourceManager):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
     def provision(self):
+        # check if host is alive
         if not self.is_alive():
-            self._state = ResourceState.FAILED
+            self.fail()
+            
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
 
+        self.find_home()
+
         if self.get("cleanProcesses"):
             self.clean_processes()
 
         if self.get("cleanHome"):
             self.clean_home()
-       
+        if self.get("cleanExperiment"):
+            self.clean_experiment()
+    
+        # Create shared directory structure
+        self.mkdir(self.lib_dir)
+        self.mkdir(self.bin_dir)
+        self.mkdir(self.src_dir)
+        self.mkdir(self.share_dir)
+
+        # Create experiment node home directory
         self.mkdir(self.node_home)
 
         super(LinuxNode, self).provision()
@@ -300,6 +349,8 @@ class LinuxNode(ResourceManager):
         if tear_down:
             self.execute(tear_down)
 
+        self.clean_processes()
+
         super(LinuxNode, self).release()
 
     def valid_connection(self, guid):
@@ -327,21 +378,220 @@ class LinuxNode(ResourceManager):
         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
             
     def clean_home(self):
+        """ Cleans all NEPI related folders in the Linux host
+        """
         self.info("Cleaning up home")
         
-        cmd = (
-            # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
-            "find . -maxdepth 1 -name 'nepi-*' " +
-            " -execdir rm -rf {} + "
-            )
+        cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
+                self.home_dir )
+
+        return self.execute(cmd, with_lock = True)
+
+    def clean_experiment(self):
+        """ Cleans all experiment related files in the Linux host.
+        It preserves NEPI files and folders that have a multi experiment
+        scope.
+        """
+        self.info("Cleaning up experiment files")
+        
+        cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
+                self.exp_dir,
+                self.ec.exp_id )
             
-        if self.home:
-            cmd = "cd %s ; " % self.home + cmd
+        return self.execute(cmd, with_lock = True)
+
+    def execute(self, command,
+            sudo = False,
+            stdin = None, 
+            env = None,
+            tty = False,
+            forward_x11 = False,
+            timeout = None,
+            retry = 3,
+            err_on_timeout = True,
+            connect_timeout = 30,
+            strict_host_checking = False,
+            persistent = True,
+            blocking = True,
+            with_lock = False
+            ):
+        """ Notice that this invocation will block until the
+        execution finishes. If this is not the desired behavior,
+        use 'run' instead."""
 
+        if self.localhost:
+            (out, err), proc = execfuncs.lexec(command, 
+                    user = user,
+                    sudo = sudo,
+                    stdin = stdin,
+                    env = env)
+        else:
+            if with_lock:
+                with self._lock:
+                    (out, err), proc = sshfuncs.rexec(
+                        command, 
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        stdin = stdin,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey"),
+                        env = env,
+                        tty = tty,
+                        forward_x11 = forward_x11,
+                        timeout = timeout,
+                        retry = retry,
+                        err_on_timeout = err_on_timeout,
+                        connect_timeout = connect_timeout,
+                        persistent = persistent,
+                        blocking = blocking, 
+                        strict_host_checking = strict_host_checking
+                        )
+            else:
+                (out, err), proc = sshfuncs.rexec(
+                    command, 
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    sudo = sudo,
+                    stdin = stdin,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    env = env,
+                    tty = tty,
+                    forward_x11 = forward_x11,
+                    timeout = timeout,
+                    retry = retry,
+                    err_on_timeout = err_on_timeout,
+                    connect_timeout = connect_timeout,
+                    persistent = persistent,
+                    blocking = blocking, 
+                    strict_host_checking = strict_host_checking
+                    )
+
+        return (out, err), proc
+
+    def run(self, command, home,
+            create_home = False,
+            pidfile = 'pidfile',
+            stdin = None, 
+            stdout = 'stdout', 
+            stderr = 'stderr', 
+            sudo = False,
+            tty = False):
+        
+        self.debug("Running command '%s'" % command)
+        
+        if self.localhost:
+            (out, err), proc = execfuncs.lspawn(command, pidfile, 
+                    stdout = stdout, 
+                    stderr = stderr, 
+                    stdin = stdin, 
+                    home = home, 
+                    create_home = create_home, 
+                    sudo = sudo,
+                    user = user) 
+        else:
+            with self._lock:
+                (out, err), proc = sshfuncs.rspawn(
+                    command,
+                    pidfile = pidfile,
+                    home = home,
+                    create_home = create_home,
+                    stdin = stdin if stdin is not None else '/dev/null',
+                    stdout = stdout if stdout else '/dev/null',
+                    stderr = stderr if stderr else '/dev/null',
+                    sudo = sudo,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    tty = tty
+                    )
+
+        return (out, err), proc
+
+    def getpid(self, home, pidfile = "pidfile"):
+        if self.localhost:
+            pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
+        else:
+            with self._lock:
+                pidtuple = sshfuncs.rgetpid(
+                    os.path.join(home, pidfile),
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey")
+                    )
+        
+        return pidtuple
+
+    def status(self, pid, ppid):
+        if self.localhost:
+            status = execfuncs.lstatus(pid, ppid)
+        else:
+            with self._lock:
+                status = sshfuncs.rstatus(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
+           
+        return status
+    
+    def kill(self, pid, ppid, sudo = False):
         out = err = ""
-        (out, err), proc = self.execute(cmd, with_lock = True)
+        proc = None
+        status = self.status(pid, ppid)
+
+        if status == sshfuncs.ProcStatus.RUNNING:
+            if self.localhost:
+                (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
+            else:
+                with self._lock:
+                    (out, err), proc = sshfuncs.rkill(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
+
+        return (out, err), proc
 
-    def upload(self, src, dst, text = False):
+    def copy(self, src, dst):
+        if self.localhost:
+            (out, err), proc = execfuncs.lcopy(source, dest, 
+                    recursive = True,
+                    strict_host_checking = False)
+        else:
+            with self._lock:
+                (out, err), proc = sshfuncs.rcopy(
+                    src, dst, 
+                    port = self.get("port"),
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    recursive = True,
+                    strict_host_checking = False)
+
+        return (out, err), proc
+
+
+    def upload(self, src, dst, text = False, overwrite = True):
         """ Copy content to destination
 
            src  content to copy. Can be a local file, directory or a list of files
@@ -360,9 +610,17 @@ class LinuxNode(ResourceManager):
             f.close()
             src = f.name
 
+        # If dst files should not be overwritten, check that the files do not
+        # exits already 
+        if overwrite == False:
+            src = self.filter_existing_files(src, dst)
+            if not src:
+                return ("", ""), None 
+
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
+
         result = self.copy(src, dst)
 
         # clean up temp file
@@ -377,7 +635,12 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages, home):
+    def install_packages(self, packages, home, run_home = None):
+        """ Install packages in the Linux host.
+
+        'home' is the directory to upload the package installation script.
+        'run_home' is the directory from where to execute the script.
+        """
         command = ""
         if self.use_rpm:
             command = rpmfuncs.install_packages_command(self.os, packages)
@@ -388,19 +651,25 @@ class LinuxNode(ResourceManager):
             self.error(msg, self.os)
             raise RuntimeError, msg
 
-        out = err = ""
-        (out, err), proc = self.run_and_wait(command, home, 
-            shfile = "instpkg.sh",
+        run_home = run_home or home
+
+        (out, err), proc = self.run_and_wait(command, run_home, 
+            shfile = os.path.join(home, "instpkg.sh"),
             pidfile = "instpkg_pidfile",
             ecodefile = "instpkg_exitcode",
             stdout = "instpkg_stdout", 
             stderr = "instpkg_stderr",
+            overwrite = False,
             raise_on_error = True)
 
         return (out, err), proc 
 
-    def remove_packages(self, packages, home):
-        command = ""
+    def remove_packages(self, packages, home, run_home = None):
+        """ Uninstall packages from the Linux host.
+
+        'home' is the directory to upload the package un-installation script.
+        'run_home' is the directory from where to execute the script.
+        """
         if self.use_rpm:
             command = rpmfuncs.remove_packages_command(self.os, packages)
         elif self.use_deb:
@@ -410,13 +679,15 @@ class LinuxNode(ResourceManager):
             self.error(msg)
             raise RuntimeError, msg
 
-        out = err = ""
-        (out, err), proc = self.run_and_wait(command, home, 
-            shfile = "rmpkg.sh",
+        run_home = run_home or home
+
+        (out, err), proc = self.run_and_wait(command, run_home, 
+            shfile = os.path.join(home, "rmpkg.sh"),
             pidfile = "rmpkg_pidfile",
             ecodefile = "rmpkg_exitcode",
             stdout = "rmpkg_stdout", 
             stderr = "rmpkg_stderr",
+            overwrite = False,
             raise_on_error = True)
          
         return (out, err), proc 
@@ -433,6 +704,7 @@ class LinuxNode(ResourceManager):
     def run_and_wait(self, command, home, 
             shfile = "cmd.sh",
             env = None,
+            overwrite = True,
             pidfile = "pidfile", 
             ecodefile = "exitcode", 
             stdin = None, 
@@ -446,12 +718,17 @@ class LinuxNode(ResourceManager):
         Then runs the script detached in background in the host, and
         busy-waites until the script finishes executing.
         """
-        self.upload_command(command, home, 
+
+        if not shfile.startswith("/"):
+            shfile = os.path.join(home, shfile)
+
+        self.upload_command(command, 
             shfile = shfile, 
             ecodefile = ecodefile, 
-            env = env)
+            env = env,
+            overwrite = overwrite)
 
-        command = "bash ./%s" % shfile
+        command = "bash %s" % shfile
         # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
@@ -514,9 +791,10 @@ class LinuxNode(ResourceManager):
         # Other error from 'cat'
         return ExitCode.ERROR
 
-    def upload_command(self, command, home, 
+    def upload_command(self, command, 
             shfile = "cmd.sh",
             ecodefile = "exitcode",
+            overwrite = True,
             env = None):
         """ Saves the command as a bash script file in the remote host, and
         forces to save the exit code of the command execution to the ecodefile
@@ -537,8 +815,7 @@ class LinuxNode(ResourceManager):
         # Add environ to command
         command = environ + command
 
-        dst = os.path.join(home, shfile)
-        return self.upload(command, dst, text = True)
+        return self.upload(command, shfile, text = True, overwrite = overwrite)
 
     def format_environment(self, env, inline = False):
         """Format environmental variables for command to be executed either
@@ -639,215 +916,63 @@ class LinuxNode(ResourceManager):
         return (out, err), proc
 
     def is_alive(self):
+        """ Checks if host is responsive
+        """
         if self.localhost:
             return True
 
         out = err = ""
         try:
-            # TODO: FIX NOT ALIVE!!!!
-            (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
+            (out, err), proc = self.execute("echo 'ALIVE'",
+                    retry = 5, 
                     with_lock = True)
         except:
-            import traceback
             trace = traceback.format_exc()
             msg = "Unresponsive host  %s " % err
             self.error(msg, out, trace)
             return False
 
-        if out.strip().startswith('ALIVE'):
+        if out.strip() == "ALIVE":
             return True
         else:
             msg = "Unresponsive host "
             self.error(msg, out, err)
             return False
 
-    def copy(self, src, dst):
-        if self.localhost:
-            (out, err), proc = execfuncs.lcopy(source, dest, 
-                    recursive = True,
-                    strict_host_checking = False)
-        else:
-            with self._lock:
-                (out, err), proc = sshfuncs.rcopy(
-                    src, dst, 
-                    port = self.get("port"),
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey"),
-                    recursive = True,
-                    strict_host_checking = False)
-
-        return (out, err), proc
-
-    def execute(self, command,
-            sudo = False,
-            stdin = None, 
-            env = None,
-            tty = False,
-            forward_x11 = False,
-            timeout = None,
-            retry = 3,
-            err_on_timeout = True,
-            connect_timeout = 30,
-            strict_host_checking = False,
-            persistent = True,
-            blocking = True,
-            with_lock = False
-            ):
-        """ Notice that this invocation will block until the
-        execution finishes. If this is not the desired behavior,
-        use 'run' instead."""
+    def find_home(self):
+        """ Retrieves host home directory
+        """
+        (out, err), proc = self.execute("echo ${HOME}", retry = 5, 
+                    with_lock = True)
 
-        if self.localhost:
-            (out, err), proc = execfuncs.lexec(command, 
-                    user = user,
-                    sudo = sudo,
-                    stdin = stdin,
-                    env = env)
-        else:
-            if with_lock:
-                with self._lock:
-                    (out, err), proc = sshfuncs.rexec(
-                        command, 
-                        host = self.get("hostname"),
-                        user = self.get("username"),
-                        port = self.get("port"),
-                        agent = True,
-                        sudo = sudo,
-                        stdin = stdin,
-                        identity = self.get("identity"),
-                        server_key = self.get("serverKey"),
-                        env = env,
-                        tty = tty,
-                        forward_x11 = forward_x11,
-                        timeout = timeout,
-                        retry = retry,
-                        err_on_timeout = err_on_timeout,
-                        connect_timeout = connect_timeout,
-                        persistent = persistent,
-                        blocking = blocking, 
-                        strict_host_checking = strict_host_checking
-                        )
-            else:
-                (out, err), proc = sshfuncs.rexec(
-                    command, 
-                    host = self.get("hostname"),
-                    user = self.get("username"),
-                    port = self.get("port"),
-                    agent = True,
-                    sudo = sudo,
-                    stdin = stdin,
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey"),
-                    env = env,
-                    tty = tty,
-                    forward_x11 = forward_x11,
-                    timeout = timeout,
-                    retry = retry,
-                    err_on_timeout = err_on_timeout,
-                    connect_timeout = connect_timeout,
-                    persistent = persistent,
-                    blocking = blocking, 
-                    strict_host_checking = strict_host_checking
-                    )
+        if proc.poll():
+            msg = "Imposible to retrieve HOME directory"
+            self.error(msg, out, err)
+            raise RuntimeError, msg
 
-        return (out, err), proc
+        self._home_dir =  out.strip()
 
-    def run(self, command, home,
-            create_home = False,
-            pidfile = 'pidfile',
-            stdin = None, 
-            stdout = 'stdout', 
-            stderr = 'stderr', 
-            sudo = False,
-            tty = False):
-        
-        self.debug("Running command '%s'" % command)
-        
-        if self.localhost:
-            (out, err), proc = execfuncs.lspawn(command, pidfile, 
-                    stdout = stdout, 
-                    stderr = stderr, 
-                    stdin = stdin, 
-                    home = home, 
-                    create_home = create_home, 
-                    sudo = sudo,
-                    user = user) 
-        else:
-            with self._lock:
-                (out, err), proc = sshfuncs.rspawn(
-                    command,
-                    pidfile = pidfile,
-                    home = home,
-                    create_home = create_home,
-                    stdin = stdin if stdin is not None else '/dev/null',
-                    stdout = stdout if stdout else '/dev/null',
-                    stderr = stderr if stderr else '/dev/null',
-                    sudo = sudo,
-                    host = self.get("hostname"),
-                    user = self.get("username"),
-                    port = self.get("port"),
-                    agent = True,
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey"),
-                    tty = tty
-                    )
+    def filter_existing_files(self, src, dst):
+        """ Removes files that already exist in the Linux host from src list
+        """
+        # construct a dictionary with { dst: src }
+        dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
+            src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
 
-        return (out, err), proc
+        command = []
+        for d in dests.keys():
+            command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
 
-    def getpid(self, home, pidfile = "pidfile"):
-        if self.localhost:
-            pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
-        else:
-            with self._lock:
-                pidtuple = sshfuncs.rgetpid(
-                    os.path.join(home, pidfile),
-                    host = self.get("hostname"),
-                    user = self.get("username"),
-                    port = self.get("port"),
-                    agent = True,
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey")
-                    )
-        
-        return pidtuple
+        command = ";".join(command)
 
-    def status(self, pid, ppid):
-        if self.localhost:
-            status = execfuncs.lstatus(pid, ppid)
-        else:
-            with self._lock:
-                status = sshfuncs.rstatus(
-                        pid, ppid,
-                        host = self.get("hostname"),
-                        user = self.get("username"),
-                        port = self.get("port"),
-                        agent = True,
-                        identity = self.get("identity"),
-                        server_key = self.get("serverKey")
-                        )
-           
-        return status
+        (out, err), proc = self.execute(command, retry = 1, with_lock = True)
     
-    def kill(self, pid, ppid, sudo = False):
-        out = err = ""
-        proc = None
-        status = self.status(pid, ppid)
+        for d in dests.keys():
+            if out.find(d) > -1:
+                del dests[d]
 
-        if status == sshfuncs.ProcStatus.RUNNING:
-            if self.localhost:
-                (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
-            else:
-                with self._lock:
-                    (out, err), proc = sshfuncs.rkill(
-                        pid, ppid,
-                        host = self.get("hostname"),
-                        user = self.get("username"),
-                        port = self.get("port"),
-                        agent = True,
-                        sudo = sudo,
-                        identity = self.get("identity"),
-                        server_key = self.get("serverKey")
-                        )
+        if not dests:
+            return ""
 
-        return (out, err), proc
+        return " ".join(dests.values())
 
index f5c6400..cc30348 100644 (file)
@@ -294,7 +294,7 @@ def rexec(command, host, user,
             else:
                 out = err = ""
                 if proc.poll():
-                    err = self._proc.stderr.read()
+                    err = proc.stderr.read()
 
             msg = " rexec - host %s - command %s " % (host, " ".join(args))
             log(msg, logging.DEBUG, out, err)
@@ -857,7 +857,7 @@ fi
     return (out, err), proc
 
 # POSIX
-def _communicate(self, input, timeout=None, err_on_timeout=True):
+def _communicate(proc, input, timeout=None, err_on_timeout=True):
     read_set = []
     write_set = []
     stdout = None # Return
@@ -870,19 +870,21 @@ def _communicate(self, input, timeout=None, err_on_timeout=True):
         killtime = timelimit + 4
         bailtime = timelimit + 4
 
-    if self.stdin:
+    if proc.stdin:
         # Flush stdio buffer.  This might block, if the user has
         # been writing to .stdin in an uncontrolled fashion.
-        self.stdin.flush()
+        proc.stdin.flush()
         if input:
-            write_set.append(self.stdin)
+            write_set.append(proc.stdin)
         else:
-            self.stdin.close()
-    if self.stdout:
-        read_set.append(self.stdout)
+            proc.stdin.close()
+
+    if proc.stdout:
+        read_set.append(proc.stdout)
         stdout = []
-    if self.stderr:
-        read_set.append(self.stderr)
+
+    if proc.stderr:
+        read_set.append(proc.stderr)
         stderr = []
 
     input_offset = 0
@@ -897,7 +899,7 @@ def _communicate(self, input, timeout=None, err_on_timeout=True):
                 else:
                     signum = signal.SIGTERM
                 # Lets kill it
-                os.kill(self.pid, signum)
+                os.kill(proc.pid, signum)
                 select_timeout = 0.5
             else:
                 select_timeout = timelimit - curtime + 0.1
@@ -915,32 +917,34 @@ def _communicate(self, input, timeout=None, err_on_timeout=True):
             else:
                 continue
         
-        if not rlist and not wlist and not xlist and self.poll() is not None:
+        if not rlist and not wlist and not xlist and proc.poll() is not None:
             # timeout and process exited, say bye
             break
 
-        if self.stdin in wlist:
+        if proc.stdin in wlist:
             # When select has indicated that the file is writable,
             # we can write up to PIPE_BUF bytes without risk
             # blocking.  POSIX defines PIPE_BUF >= 512
-            bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
+            bytes_written = os.write(proc.stdin.fileno(),
+                    buffer(input, input_offset, 512))
             input_offset += bytes_written
+
             if input_offset >= len(input):
-                self.stdin.close()
-                write_set.remove(self.stdin)
+                proc.stdin.close()
+                write_set.remove(proc.stdin)
 
-        if self.stdout in rlist:
-            data = os.read(self.stdout.fileno(), 1024)
+        if proc.stdout in rlist:
+            data = os.read(proc.stdout.fileno(), 1024)
             if data == "":
-                self.stdout.close()
-                read_set.remove(self.stdout)
+                proc.stdout.close()
+                read_set.remove(proc.stdout)
             stdout.append(data)
 
-        if self.stderr in rlist:
-            data = os.read(self.stderr.fileno(), 1024)
+        if proc.stderr in rlist:
+            data = os.read(proc.stderr.fileno(), 1024)
             if data == "":
-                self.stderr.close()
-                read_set.remove(self.stderr)
+                proc.stderr.close()
+                read_set.remove(proc.stderr)
             stderr.append(data)
     
     # All data exchanged.  Translate lists into strings.
@@ -953,19 +957,19 @@ def _communicate(self, input, timeout=None, err_on_timeout=True):
     # object do the translation: It is based on stdio, which is
     # impossible to combine with select (unless forcing no
     # buffering).
-    if self.universal_newlines and hasattr(file, 'newlines'):
+    if proc.universal_newlines and hasattr(file, 'newlines'):
         if stdout:
-            stdout = self._translate_newlines(stdout)
+            stdout = proc._translate_newlines(stdout)
         if stderr:
-            stderr = self._translate_newlines(stderr)
+            stderr = proc._translate_newlines(stderr)
 
     if killed and err_on_timeout:
-        errcode = self.poll()
+        errcode = proc.poll()
         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
     else:
         if killed:
-            self.poll()
+            proc.poll()
         else:
-            self.wait()
+            proc.wait()
         return (stdout, stderr)
 
index b1e489c..eb278ff 100755 (executable)
@@ -64,7 +64,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
         ec.deploy()
 
-        ec.wait_finished([app])
+        ec.wait_finished(app)
 
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
@@ -97,7 +97,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
         ec.deploy()
 
-        ec.wait_finished([app])
+        ec.wait_finished(app)
 
         self.assertTrue(ec.state(node) == ResourceState.STARTED)
         self.assertTrue(ec.state(app) == ResourceState.FINISHED)
@@ -111,11 +111,54 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
         path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
         rm = ec.get_resource(app)
-        p = os.path.join(rm.app_home, "stdout")
+        p = os.path.join(rm.run_home, "stdout")
         self.assertEquals(path, p)
 
         ec.shutdown()
 
+    @skipIfNotAlive
+    def t_code(self, host, user):
+        from nepi.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(LinuxNode)
+        ResourceFactory.register_type(LinuxApplication)
+
+        ec = ExperimentController()
+        
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", host)
+        ec.set(node, "username", user)
+        ec.set(node, "cleanHome", True)
+        ec.set(node, "cleanProcesses", True)
+        
+        prog = """#include <stdio.h>
+
+int
+main (void)
+{
+    printf ("Hello, world!\\n");
+    return 0;
+}
+"""
+        cmd = "${RUN_HOME}/hello" 
+        build = "gcc -Wall -x c ${APP_HOME}/code -o hello" 
+
+        app = ec.register_resource("LinuxApplication")
+        ec.set(app, "command", cmd)
+        ec.set(app, "code", prog)
+        ec.set(app, "depends", "gcc")
+        ec.set(app, "build", build)
+        ec.register_connection(app, node)
+
+        ec.deploy()
+
+        ec.wait_finished(app)
+
+        out = ec.trace(app, 'stdout')
+        self.assertEquals(out, "Hello, world!\n")
+
+        ec.shutdown()
+
     @skipIfNotAlive
     def t_concurrency(self, host, user):
         from nepi.execution.resource import ResourceFactory
@@ -159,7 +202,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
             path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
             rm = ec.get_resource(app)
-            p = os.path.join(rm.app_home, 'stdout')
+            p = os.path.join(rm.run_home, 'stdout')
             self.assertEquals(path, p)
 
         ec.shutdown()
@@ -306,6 +349,12 @@ class LinuxApplicationTestCase(unittest.TestCase):
     def test_http_sources_ubuntu(self):
         self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
 
+    def test_code_fedora(self):
+        self.t_code(self.fedora_host, self.fedora_user)
+
+    def test_code_ubuntu(self):
+        self.t_code(self.ubuntu_host, self.ubuntu_user)
+
     @skipInteractive
     def test_xterm_ubuntu(self):
         """ Interactive test. Should not run automatically """
index ef3d8df..e7d44e1 100755 (executable)
@@ -55,6 +55,7 @@ class LinuxNodeTestCase(unittest.TestCase):
     def t_run(self, host, user):
         node, ec = create_node(host, user)
         
+        node.find_home()
         app_home = os.path.join(node.exp_home, "my-app")
         node.mkdir(app_home, clean = True)
         
@@ -83,6 +84,7 @@ class LinuxNodeTestCase(unittest.TestCase):
         
         node, ec = create_node(host, user)
          
+        node.find_home()
         app_home = os.path.join(node.exp_home, "my-app")
         node.mkdir(app_home, clean = True)
          
@@ -102,13 +104,15 @@ class LinuxNodeTestCase(unittest.TestCase):
     def t_exitcode_kill(self, host, user):
         node, ec = create_node(host, user)
          
+        node.find_home()
         app_home = os.path.join(node.exp_home, "my-app")
         node.mkdir(app_home, clean = True)
        
         # Upload command that will not finish
         command = "ping localhost"
-        (out, err), proc = node.upload_command(command, app_home, 
-            shfile = "cmd.sh",
+        shfile = os.path.join(app_home, "cmd.sh")
+        (out, err), proc = node.upload_command(command, 
+            shfile = shfile,
             ecodefile = "exitcode")
 
         (out, err), proc = node.run(command, app_home,
@@ -140,6 +144,7 @@ class LinuxNodeTestCase(unittest.TestCase):
         
         node, ec = create_node(host, user)
          
+        node.find_home()
         app_home = os.path.join(node.exp_home, "my-app")
         node.mkdir(app_home, clean = True)
          
@@ -159,12 +164,13 @@ class LinuxNodeTestCase(unittest.TestCase):
  
         (out, err), proc = node.check_errors(app_home)
 
-        self.assertEquals(err.strip(), "./cmd.sh: line 1: unexistent-command: command not found")
+        self.assertTrue(err.find("cmd.sh: line 1: unexistent-command: command not found") > -1)
 
     @skipIfNotAlive
     def t_install(self, host, user):
         node, ec = create_node(host, user)
 
+        node.find_home()
         (out, err), proc = node.mkdir(node.node_home, clean = True)
         self.assertEquals(err, "")
 
@@ -177,10 +183,42 @@ class LinuxNodeTestCase(unittest.TestCase):
         (out, err), proc = node.rmdir(node.exp_home)
         self.assertEquals(err, "")
 
+    @skipIfNotAlive
+    def t_clean(self, host, user):
+        node, ec = create_node(host, user)
+
+        node.find_home()
+        node.mkdir(node.lib_dir)
+        node.mkdir(node.node_home)
+
+        command1 = " [ -d %s ] && echo 'Found'" % node.lib_dir
+        (out, err), proc = node.execute(command1)
+    
+        self.assertEquals(out.strip(), "Found")
+
+        command2 = " [ -d %s ] && echo 'Found'" % node.node_home
+        (out, err), proc = node.execute(command2)
+    
+        self.assertEquals(out.strip(), "Found")
+
+        node.clean_experiment()
+        
+        (out, err), proc = node.execute(command2)
+
+        self.assertEquals(out.strip(), "")
+
+        node.clean_home()
+        
+        (out, err), proc = node.execute(command1)
+
+        self.assertEquals(out.strip(), "")
+
+
     @skipIfNotAlive
     def t_xterm(self, host, user):
         node, ec = create_node(host, user)
 
+        node.find_home()
         (out, err), proc = node.mkdir(node.node_home, clean = True)
         self.assertEquals(err, "")
         
@@ -197,6 +235,7 @@ class LinuxNodeTestCase(unittest.TestCase):
     def t_compile(self, host, user):
         node, ec = create_node(host, user)
 
+        node.find_home()
         app_home = os.path.join(node.exp_home, "my-app")
         node.mkdir(app_home, clean = True)
 
@@ -288,7 +327,13 @@ main (void)
 
     def test_exitcode_error_ubuntu(self):
         self.t_exitcode_error(self.ubuntu_host, self.ubuntu_user)
-    
+
+    def test_clean_fedora(self):
+        self.t_clean(self.fedora_host, self.fedora_user)
+
+    def test_clean_ubuntu(self):
+        self.t_clean(self.ubuntu_host, self.ubuntu_user)
+     
     @skipInteractive
     def test_xterm_ubuntu(self):
         """ Interactive test. Should not run automatically """