Fixed relative paths in Linux Application
author Alina Quereilhac <alina.quereilhac@inria.fr>
Fri, 10 May 2013 11:24:34 +0000 (13:24 +0200)
committer Alina Quereilhac <alina.quereilhac@inria.fr>
Fri, 10 May 2013 11:24:34 +0000 (13:24 +0200)
src/neco/execution/ec.py
src/neco/resources/linux/application.py
src/neco/resources/linux/node.py
test/resources/linux/application.py
test/resources/linux/node.py

index ba064d4..b793d0e 100644 (file)
@@ -75,7 +75,9 @@ class ExperimentController(object):
     def wait_finished(self, guids):
         while not all([self.state(guid) == ResourceState.FINISHED \
                 for guid in guids]) and not self.finished:
-            time.sleep(1)
+            # We keep the sleep as large as possible to 
+            # decrease the number of RM state requests
+            time.sleep(2)
     
     def get_task(self, tid):
         return self._tasks.get(tid)
index b7b3f4e..4f2b989 100644 (file)
@@ -110,7 +110,7 @@ class LinuxApplication(ResourceManager):
 
     @property
     def app_home(self):
-        return os.path.join(self.node.exp_dir, self._home)
+        return os.path.join(self.node.exp_home, self._home)
 
     @property
     def src_dir(self):
@@ -133,8 +133,8 @@ class LinuxApplication(ResourceManager):
 
         path = os.path.join(self.app_home, name)
         
-        cmd = "(test -f %s && echo 'success') || echo 'error'" % path
-        (out, err), proc = self.node.execute(cmd)
+        command = "(test -f %s && echo 'success') || echo 'error'" % path
+        (out, err), proc = self.node.execute(command)
 
         if (err and proc.poll()) or out.find("error") != -1:
             msg = " Couldn't find trace %s " % name
@@ -190,14 +190,17 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
-        command = self.replace_paths(self.get("command"))
-        x11 = self.get("forwardX11") or False
-        if not x11:
+        command = self.get("command")
+        x11 = self.get("forwardX11")
+        if not x11 and command:
             self.info("Uploading command '%s'" % command)
-            
+        
+            # TODO: setting PATH and PYTHONPATH is still missing!!
+
             # If the command runs asynchronous, pre upload the command 
             # to the app.sh file in the remote host
             dst = os.path.join(self.app_home, "app.sh")
+            command = self.replace_paths(command)
             self.node.upload(command, dst, text = True)
 
         super(LinuxApplication, self).provision()
@@ -220,14 +223,26 @@ class LinuxApplication(ResourceManager):
                     sources.remove(source)
 
             # Download http sources
-            for source in http_sources:
-                dst = os.path.join(self.src_dir, source.split("/")[-1])
-                # TODO: Check if the tar.gz is already downloaded using a hash
-                # and don't download twice !!
-                command = "wget -o %s %s" % (dst, source)
-                self.node.execute(command)
-
-            self.node.upload(sources, self.src_dir)
+            if http_sources:
+                cmd = " wget -c --directory-prefix=${SOURCES} "
+                verif = ""
+
+                for source in http_sources:
+                    cmd += " %s " % (source)
+                    verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+                
+                # Wget output goes to stderr :S
+                cmd += " 2> /dev/null ; "
+
+                # Add verification
+                cmd += " %s " % verif
+
+                # Upload the command to a file, and execute asynchronously
+                self.upload_and_run(cmd, 
+                        "http_sources.sh", "http_sources_pid", 
+                        "http_sources_out", "http_sources_err")
+            if sources:
+                self.node.upload(sources, self.src_dir)
 
     def upload_code(self):
         code = self.get("code")
@@ -254,32 +269,22 @@ class LinuxApplication(ResourceManager):
             # create dir for build
             self.node.mkdir(self.build_dir)
 
-            cmd = self.replace_paths(build)
-
-            (out, err), proc = self.run_and_wait(cmd, self.app_home,
-                pidfile = "build_pid",
-                stdout = "build_out", 
-                stderr = "build_err", 
-                raise_on_error = True)
+            # Upload the command to a file, and execute asynchronously
+            self.upload_and_run(build, 
+                    "build.sh", "build_pid", 
+                    "build_out", "build_err")
  
     def install(self):
         install = self.get("install")
         if install:
             self.info(" Installing sources ")
 
-            cmd = self.replace_paths(install)
-
-            (out, err), proc = self.run_and_wait(cmd, self.app_home, 
-                pidfile = "install_pid",
-                stdout = "install_out", 
-                stderr = "install_err", 
-                raise_on_error = True)
+            # Upload the command to a file, and execute asynchronously
+            self.upload_and_run(install, 
+                    "install.sh", "install_pid", 
+                    "install_out", "install_err")
 
     def deploy(self):
-        command = self.replace_paths(self.get("command"))
-        
-        self.info(" Deploying command '%s' " % command)
-
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
@@ -287,6 +292,8 @@ class LinuxApplication(ResourceManager):
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             try:
+                command = self.get("command") or ""
+                self.info(" Deploying command '%s' " % command)
                 self.discover()
                 self.provision()
             except:
@@ -296,33 +303,48 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).deploy()
 
     def start(self):
-        command = self.replace_paths(self.get("command"))
+        command = self.get("command")
         env = self.get("env")
         stdin = 'stdin' if self.get("stdin") else None
+        stdout = 'stdout' if self.get("stdout") else 'stdout'
+        stderr = 'stderr' if self.get("stderr") else 'stderr'
         sudo = self.get('sudo') or False
         x11 = self.get("forwardX11") or False
         failed = False
 
         super(LinuxApplication, self).start()
 
+        if not command:
+            self.info("No command to start ")
+            self._state = ResourceState.FINISHED
+            return 
+    
         self.info("Starting command '%s'" % command)
 
         if x11:
+            # If the command requires X11 forwarding, we
+            # can't run it asynchronously
             (out, err), proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
-                    stdout = 'stdout',
-                    stderr = 'stderr',
+                    stdout = stdout,
+                    stderr = stderr,
                     env = env,
                     forward_x11 = x11)
 
             if proc.poll() and err:
                 failed = True
         else:
-            # Run the command asynchronously
-            command = "bash ./app.sh"
-            (out, err), proc = self.node.run(command, self.app_home, 
+            # Command was previously uploaded, now run the remote
+            # bash file asynchronously
+            if env:
+                env = self.replace_paths(env)
+
+            cmd = "bash ./app.sh"
+            (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
+                stdout = stdout,
+                stderr = stderr,
                 sudo = sudo)
 
             if proc.poll() and err:
@@ -406,6 +428,32 @@ class LinuxApplication(ResourceManager):
 
         return self._state
 
+    def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
+        dst = os.path.join(self.app_home, fname)
+        cmd = self.replace_paths(cmd)
+        self.node.upload(cmd, dst, text = True)
+
+        cmd = "bash ./%s" % fname
+        (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
+            pidfile = pidfile,
+            stdout = outfile, 
+            stderr = errfile, 
+            raise_on_error = True)
+
+    def replace_paths(self, command):
+        """
+        Replace all special path tags with shell-escaped actual paths.
+        """
+        def absolute_dir(d):
+            return d if d.startswith("/") else os.path.join("${HOME}", d)
+
+        return ( command
+            .replace("${SOURCES}", absolute_dir(self.src_dir))
+            .replace("${BUILD}", absolute_dir(self.build_dir))
+            .replace("${APP_HOME}", absolute_dir(self.app_home))
+            .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
+            .replace("${EXP_HOME}", self.node.exp_home) )
+        
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
@@ -436,14 +484,3 @@ class LinuxApplication(ResourceManager):
         skey = "".join(map(str, args))
         return hashlib.md5(skey).hexdigest()
 
-    def replace_paths(self, command):
-        """
-        Replace all special path tags with shell-escaped actual paths.
-        """
-        return ( command
-            .replace("${SOURCES}", self.src_dir)
-            .replace("${BUILD}", self.build_dir) 
-            .replace("${APPHOME}", self.app_home) 
-            .replace("${NODEHOME}", self.node.home) )
-
-
index 4907c21..39107cb 100644 (file)
@@ -18,6 +18,7 @@ import threading
 
 reschedule_delay = "0.5s"
 
+
 @clsinit
 class LinuxNode(ResourceManager):
     _rtype = "LinuxNode"
@@ -79,18 +80,16 @@ class LinuxNode(ResourceManager):
 
     @property
     def home(self):
-        return self.get("home") or "/tmp"
+        return self.get("home") or ""
 
     @property
-    def exp_dir(self):
-        exp_dir = os.path.join(self.home, self.ec.exp_id)
-        return exp_dir if exp_dir.startswith('/') or \
-                exp_dir.startswith("~/") else "~/"
+    def exp_home(self):
+        return os.path.join(self.home, self.ec.exp_id)
 
     @property
     def node_home(self):
         node_home = "node-%d" % self.guid
-        return os.path.join(self.exp_dir, node_home)
+        return os.path.join(self.exp_home, node_home)
 
     @property
     def os(self):
@@ -197,10 +196,15 @@ class LinuxNode(ResourceManager):
             
     def clean_home(self):
         self.info("Cleaning up home")
-
-        cmd = ("cd %s ; " % self.home +
-            "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
-            " -execdir rm -rf {} + ")
+        
+        cmd = (
+            # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
+            "find . -maxdepth 1 -name 'nepi-*' " +
+            " -execdir rm -rf {} + "
+            )
+            
+        if self.home:
+            cmd = "cd %s ; " % self.home + cmd
 
         out = err = ""
         (out, err), proc = self.execute(cmd, with_lock = True)
@@ -511,7 +515,7 @@ class LinuxNode(ResourceManager):
 
     def run(self, command, 
             home = None,
-            create_home = True,
+            create_home = False,
             pidfile = "pid",
             stdin = None, 
             stdout = 'stdout', 
index b6066f1..7167978 100644 (file)
@@ -187,6 +187,45 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
         ec.shutdown()
 
+    @skipIfNotAlive
+    def t_http_sources(self, host, user):
+        from neco.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(LinuxNode)
+        ResourceFactory.register_type(LinuxApplication)
+
+        ec = ExperimentController()
+        
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", host)
+        ec.set(node, "username", user)
+        ec.set(node, "cleanHome", True)
+        ec.set(node, "cleanProcesses", True)
+
+        sources = "http://nepi.inria.fr/attachment/wiki/WikiStart/pybindgen-r794.tar.gz " \
+            "http://nepi.inria.fr/attachment/wiki/WikiStart/nepi_integration_framework.pdf"
+
+        app = ec.register_resource("LinuxApplication")
+        ec.set(app, "sources", sources)
+
+        ec.register_connection(app, node)
+
+        ec.deploy()
+
+        ec.wait_finished([app])
+
+        self.assertTrue(ec.state(node) == ResourceState.STARTED)
+        self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+        err = ec.trace(app, 'http_sources_err')
+        self.assertTrue(err == "")
+        
+        out = ec.trace(app, 'http_sources_out')
+        self.assertTrue(out.find("pybindgen-r794.tar.gz") > -1)
+        self.assertTrue(out.find("nepi_integration_framework.pdf") > -1)
+
+        ec.shutdown()
+
     def test_stdout_fedora(self):
         self.t_stdout(self.fedora_host, self.fedora_user)
 
@@ -211,6 +250,13 @@ class LinuxApplicationTestCase(unittest.TestCase):
     def test_condition_ubuntu(self):
         self.t_condition(self.ubuntu_host, self.ubuntu_user, "netcat")
 
+    def test_http_sources_fedora(self):
+        self.t_http_sources(self.fedora_host, self.fedora_user)
+
+    def test_http_sources_ubuntu(self):
+        self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
+
+
     # TODO: test compilation, sources, dependencies, etc!!!
 
 if __name__ == '__main__':
index cd56757..8849a4c 100644 (file)
@@ -49,7 +49,7 @@ class LinuxNodeTestCase(unittest.TestCase):
     def t_run(self, host, user):
         node, ec = create_node(host, user)
         
-        app_home = os.path.join(node.exp_dir, "my-app")
+        app_home = os.path.join(node.exp_home, "my-app")
         node.mkdir(app_home, clean = True)
         
         command = "ping %s" % self.target
@@ -87,7 +87,7 @@ class LinuxNodeTestCase(unittest.TestCase):
     def t_compile(self, host, user):
         node, ec = create_node(host, user)
 
-        app_home = os.path.join(node.exp_dir, "my-app")
+        app_home = os.path.join(node.exp_home, "my-app")
         node.mkdir(app_home, clean = True)
 
         prog = """#include <stdio.h>