From 7a9e71714aa04620778742135637c9f1ed8c499b Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Fri, 10 May 2013 13:24:34 +0200 Subject: [PATCH] Fixed relative paths in Linux Application --- src/neco/execution/ec.py | 4 +- src/neco/resources/linux/application.py | 137 +++++++++++++++--------- src/neco/resources/linux/node.py | 26 +++-- test/resources/linux/application.py | 46 ++++++++ test/resources/linux/node.py | 4 +- 5 files changed, 153 insertions(+), 64 deletions(-) diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index ba064d49..b793d0e4 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -75,7 +75,9 @@ class ExperimentController(object): def wait_finished(self, guids): while not all([self.state(guid) == ResourceState.FINISHED \ for guid in guids]) and not self.finished: - time.sleep(1) + # We keep the sleep as large as possible to + # decrese the number of RM state requests + time.sleep(2) def get_task(self, tid): return self._tasks.get(tid) diff --git a/src/neco/resources/linux/application.py b/src/neco/resources/linux/application.py index b7b3f4eb..4f2b989f 100644 --- a/src/neco/resources/linux/application.py +++ b/src/neco/resources/linux/application.py @@ -110,7 +110,7 @@ class LinuxApplication(ResourceManager): @property def app_home(self): - return os.path.join(self.node.exp_dir, self._home) + return os.path.join(self.node.exp_home, self._home) @property def src_dir(self): @@ -133,8 +133,8 @@ class LinuxApplication(ResourceManager): path = os.path.join(self.app_home, name) - cmd = "(test -f %s && echo 'success') || echo 'error'" % path - (out, err), proc = self.node.execute(cmd) + command = "(test -f %s && echo 'success') || echo 'error'" % path + (out, err), proc = self.node.execute(command) if (err and proc.poll()) or out.find("error") != -1: msg = " Couldn't find trace %s " % name @@ -190,14 +190,17 @@ class LinuxApplication(ResourceManager): # Install self.install() - command = self.replace_paths(self.get("command")) - x11 = self.get("forwardX11") or False - if not x11: + command = self.get("command") + x11 = self.get("forwardX11") + if not x11 and command: self.info("Uploading command '%s'" % command) - + + # TODO: missing set PATH and PYTHONPATH!! + # If the command runs asynchronous, pre upload the command # to the app.sh file in the remote host dst = os.path.join(self.app_home, "app.sh") + command = self.replace_paths(command) self.node.upload(command, dst, text = True) super(LinuxApplication, self).provision() @@ -220,14 +223,26 @@ class LinuxApplication(ResourceManager): sources.remove(source) # Download http sources - for source in http_sources: - dst = os.path.join(self.src_dir, source.split("/")[-1]) - # TODO: Check if the tar.gz is already downloaded using a hash - # and don't download twice !! - command = "wget -o %s %s" % (dst, source) - self.node.execute(command) - - self.node.upload(sources, self.src_dir) + if http_sources: + cmd = " wget -c --directory-prefix=${SOURCES} " + verif = "" + + for source in http_sources: + cmd += " %s " % (source) + verif += " ls ${SOURCES}/%s ;" % os.path.basename(source) + + # Wget output goes to stderr :S + cmd += " 2> /dev/null ; " + + # Add verification + cmd += " %s " % verif + + # Upload the command to a file, and execute asynchronously + self.upload_and_run(cmd, + "http_sources.sh", "http_sources_pid", + "http_sources_out", "http_sources_err") + if sources: + self.node.upload(sources, self.src_dir) def upload_code(self): code = self.get("code") @@ -254,32 +269,22 @@ class LinuxApplication(ResourceManager): # create dir for build self.node.mkdir(self.build_dir) - cmd = self.replace_paths(build) - - (out, err), proc = self.run_and_wait(cmd, self.app_home, - pidfile = "build_pid", - stdout = "build_out", - stderr = "build_err", - raise_on_error = True) + # Upload the command to a file, and execute asynchronously + self.upload_and_run(build, + "build.sh", "build_pid", + "build_out", "build_err") def install(self): install = self.get("install") if install: self.info(" Installing sources ") - cmd = self.replace_paths(install) - - (out, err), proc = self.run_and_wait(cmd, self.app_home, - pidfile = "install_pid", - stdout = "install_out", - stderr = "install_err", - raise_on_error = True) + # Upload the command to a file, and execute asynchronously + self.upload_and_run(install, + "install.sh", "install_pid", + "install_out", "install_err") def deploy(self): - command = self.replace_paths(self.get("command")) - - self.info(" Deploying command '%s' " % command) - # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: @@ -287,6 +292,8 @@ class LinuxApplication(ResourceManager): self.ec.schedule(reschedule_delay, self.deploy) else: try: + command = self.get("command") or "" + self.info(" Deploying command '%s' " % command) self.discover() self.provision() except: @@ -296,33 +303,48 @@ class LinuxApplication(ResourceManager): super(LinuxApplication, self).deploy() def start(self): - command = self.replace_paths(self.get("command")) + command = self.get("command") env = self.get("env") stdin = 'stdin' if self.get("stdin") else None + stdout = 'stdout' if self.get("stdout") else 'stdout' + stderr = 'stderr' if self.get("stderr") else 'stderr' sudo = self.get('sudo') or False x11 = self.get("forwardX11") or False failed = False super(LinuxApplication, self).start() + if not command: + self.info("No command to start ") + self._state = ResourceState.FINISHED + return + self.info("Starting command '%s'" % command) if x11: + # If the command requires X11 forwarding, we + # can't run it asynchronously (out, err), proc = self.node.execute(command, sudo = sudo, stdin = stdin, - stdout = 'stdout', - stderr = 'stderr', + stdout = stdout, + stderr = stderr, env = env, forward_x11 = x11) if proc.poll() and err: failed = True else: - # Run the command asynchronously - command = "bash ./app.sh" - (out, err), proc = self.node.run(command, self.app_home, + # Command was previously uploaded, now run the remote + # bash file asynchronously + if env: + env = self.replace_paths(env) + + cmd = "bash ./app.sh" + (out, err), proc = self.node.run(cmd, self.app_home, stdin = stdin, + stdout = stdout, + stderr = stderr, sudo = sudo) if proc.poll() and err: @@ -406,6 +428,32 @@ class LinuxApplication(ResourceManager): return self._state + def upload_and_run(self, cmd, fname, pidfile, outfile, errfile): + dst = os.path.join(self.app_home, fname) + cmd = self.replace_paths(cmd) + self.node.upload(cmd, dst, text = True) + + cmd = "bash ./%s" % fname + (out, err), proc = self.node.run_and_wait(cmd, self.app_home, + pidfile = pidfile, + stdout = outfile, + stderr = errfile, + raise_on_error = True) + + def replace_paths(self, command): + """ + Replace all special path tags with shell-escaped actual paths. + """ + def absolute_dir(d): + return d if d.startswith("/") else os.path.join("${HOME}", d) + + return ( command + .replace("${SOURCES}", absolute_dir(self.src_dir)) + .replace("${BUILD}", absolute_dir(self.build_dir)) + .replace("${APP_HOME}", absolute_dir(self.app_home)) + .replace("${NODE_HOME}", absolute_dir(self.node.node_home)) + .replace("${EXP_HOME}", self.node.exp_home) ) + def valid_connection(self, guid): # TODO: Validate! return True @@ -436,14 +484,3 @@ class LinuxApplication(ResourceManager): skey = "".join(map(str, args)) return hashlib.md5(skey).hexdigest() - def replace_paths(self, command): - """ - Replace all special path tags with shell-escaped actual paths. - """ - return ( command - .replace("${SOURCES}", self.src_dir) - .replace("${BUILD}", self.build_dir) - .replace("${APPHOME}", self.app_home) - .replace("${NODEHOME}", self.node.home) ) - - diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index 4907c21f..39107cb5 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -18,6 +18,7 @@ import threading reschedule_delay = "0.5s" + @clsinit class LinuxNode(ResourceManager): _rtype = "LinuxNode" @@ -79,18 +80,16 @@ class LinuxNode(ResourceManager): @property def home(self): - return self.get("home") or "/tmp" + return self.get("home") or "" @property - def exp_dir(self): - exp_dir = os.path.join(self.home, self.ec.exp_id) - return exp_dir if exp_dir.startswith('/') or \ - exp_dir.startswith("~/") else "~/" + def exp_home(self): + return os.path.join(self.home, self.ec.exp_id) @property def node_home(self): node_home = "node-%d" % self.guid - return os.path.join(self.exp_dir, node_home) + return os.path.join(self.exp_home, node_home) @property def os(self): @@ -197,10 +196,15 @@ class LinuxNode(ResourceManager): def clean_home(self): self.info("Cleaning up home") - - cmd = ("cd %s ; " % self.home + - "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+ - " -execdir rm -rf {} + ") + + cmd = ( + # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" + + "find . -maxdepth 1 -name 'nepi-*' " + + " -execdir rm -rf {} + " + ) + + if self.home: + cmd = "cd %s ; " % self.home + cmd out = err = "" (out, err), proc = self.execute(cmd, with_lock = True) @@ -511,7 +515,7 @@ class LinuxNode(ResourceManager): def run(self, command, home = None, - create_home = True, + create_home = False, pidfile = "pid", stdin = None, stdout = 'stdout', diff --git a/test/resources/linux/application.py b/test/resources/linux/application.py index b6066f1c..7167978e 100644 --- a/test/resources/linux/application.py +++ b/test/resources/linux/application.py @@ -187,6 +187,45 @@ class LinuxApplicationTestCase(unittest.TestCase): ec.shutdown() + @skipIfNotAlive + def t_http_sources(self, host, user): + from neco.execution.resource import ResourceFactory + + ResourceFactory.register_type(LinuxNode) + ResourceFactory.register_type(LinuxApplication) + + ec = ExperimentController() + + node = ec.register_resource("LinuxNode") + ec.set(node, "hostname", host) + ec.set(node, "username", user) + ec.set(node, "cleanHome", True) + ec.set(node, "cleanProcesses", True) + + sources = "http://nepi.inria.fr/attachment/wiki/WikiStart/pybindgen-r794.tar.gz " \ + "http://nepi.inria.fr/attachment/wiki/WikiStart/nepi_integration_framework.pdf" + + app = ec.register_resource("LinuxApplication") + ec.set(app, "sources", sources) + + ec.register_connection(app, node) + + ec.deploy() + + ec.wait_finished([app]) + + self.assertTrue(ec.state(node) == ResourceState.STARTED) + self.assertTrue(ec.state(app) == ResourceState.FINISHED) + + err = ec.trace(app, 'http_sources_err') + self.assertTrue(err == "") + + out = ec.trace(app, 'http_sources_out') + self.assertTrue(out.find("pybindgen-r794.tar.gz") > -1) + self.assertTrue(out.find("nepi_integration_framework.pdf") > -1) + + ec.shutdown() + def test_stdout_fedora(self): self.t_stdout(self.fedora_host, self.fedora_user) @@ -211,6 +250,13 @@ class LinuxApplicationTestCase(unittest.TestCase): def test_condition_ubuntu(self): self.t_condition(self.ubuntu_host, self.ubuntu_user, "netcat") + def test_http_sources_fedora(self): + self.t_http_sources(self.fedora_host, self.fedora_user) + + def test_http_sources_ubuntu(self): + self.t_http_sources(self.ubuntu_host, self.ubuntu_user) + + # TODO: test compilation, sources, dependencies, etc!!! if __name__ == '__main__': diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index cd56757d..8849a4cb 100644 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -49,7 +49,7 @@ class LinuxNodeTestCase(unittest.TestCase): def t_run(self, host, user): node, ec = create_node(host, user) - app_home = os.path.join(node.exp_dir, "my-app") + app_home = os.path.join(node.exp_home, "my-app") node.mkdir(app_home, clean = True) command = "ping %s" % self.target @@ -87,7 +87,7 @@ class LinuxNodeTestCase(unittest.TestCase): def t_compile(self, host, user): node, ec = create_node(host, user) - app_home = os.path.join(node.exp_dir, "my-app") + app_home = os.path.join(node.exp_home, "my-app") node.mkdir(app_home, clean = True) prog = """#include -- 2.43.0