From d32dba78910bd348b9bbeb0e8242d31bfd39c0a7 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Fri, 10 May 2013 19:02:59 +0200 Subject: [PATCH 1/1] Added example for Linux Application using CCNx --- examples/linux/ccnx/simple_topo.py | 162 ++++++++++++++++++++++++ src/neco/resources/linux/application.py | 76 +++++------ src/neco/util/sshfuncs.py | 3 +- 3 files changed, 203 insertions(+), 38 deletions(-) create mode 100644 examples/linux/ccnx/simple_topo.py diff --git a/examples/linux/ccnx/simple_topo.py b/examples/linux/ccnx/simple_topo.py new file mode 100644 index 00000000..f57ce201 --- /dev/null +++ b/examples/linux/ccnx/simple_topo.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python +from neco.execution.ec import ExperimentController, ECState +from neco.execution.resource import ResourceState, ResourceAction, \ + populate_factory + +from optparse import OptionParser, SUPPRESS_HELP + +import os +import time + +def add_node(ec, host, user): + node = ec.register_resource("LinuxNode") + ec.set(node, "hostname", host) + ec.set(node, "username", user) + #ec.set(node, "cleanHome", True) + ec.set(node, "cleanProcesses", True) + return node + +def add_ccnd(ec, os_type, peers): + if os_type == "f12": + depends = ( " autoconf openssl-devel expat-devel libpcap-devel " + " ecryptfs-utils-devel libxml2-devel automake gawk " + " gcc gcc-c++ git pcre-devel ") + elif os_type == "ubuntu": + depends = ( " autoconf libssl-dev libexpat-dev libpcap-dev " + " libecryptfs0 libxml2-utils automake gawk gcc g++ " + " git-core pkg-config libpcre3-dev ") + + sources = "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz" + + build = ( + # Evaluate if ccnx binaries are already installed + " ( " + " test -d ${EXP_HOME}/ccnx/bin" + " ) || ( " + # If not, untar and build + " ( " + " mkdir -p ${SOURCES}/ccnx && " + " tar xf ${SOURCES}/ccnx-0.7.1.tar.gz --strip-components=1 -C ${SOURCES}/ccnx " + " ) && " + "cd ${SOURCES}/ccnx && " + # Just execute and silence warnings... + "( ( ./configure && make ) 2>&1 )" + " )") + + install = ( + # Evaluate if ccnx binaries are already installed + " ( " + " test -d ${EXP_HOME}/ccnx/bin " + " ) || ( " + " mkdir -p ${EXP_HOME}/ccnx/bin && " + " cp -r ${SOURCES}/ccnx ${EXP_HOME}" + " )" + ) + + env = "PATH=$PATH:${EXP_HOME}/ccnx/bin" + + command = "ccndstart 2>&1 ; " + peers = map(lambda peer: "ccndc add ccnx:/ udp %s" % peer, peers) + command += " ; ".join(peers) + " ; " + command += " ccnr 2>&1 " + + app = ec.register_resource("LinuxApplication") + ec.set(app, "depends", depends) + ec.set(app, "sources", sources) + ec.set(app, "install", install) + ec.set(app, "build", build) + ec.set(app, "env", env) + ec.set(app, "command", command) + + return app + +def add_publish(ec, movie): + env = "PATH=$PATH:${EXP_HOME}/ccnx/bin" + command = "ccnseqwriter -r ccnx:/VIDEO" + + app = ec.register_resource("LinuxApplication") + ec.set(app, "stdin", movie) + ec.set(app, "env", env) + ec.set(app, "command", command) + + return app + +def add_stream(ec): + env = "PATH=$PATH:${EXP_HOME}/ccnx/bin" + command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) 2>&1" + + app = ec.register_resource("LinuxApplication") + ec.set(app, "depends", "vlc") + ec.set(app, "forwardX11", True) + ec.set(app, "env", env) + ec.set(app, "command", command) + + return app + +def get_options(): + slicename = os.environ.get("PL_SLICE") + + usage = "usage: %prog -s -u -m -l " + + parser = OptionParser(usage=usage) + parser.add_option("-s", "--pl-slice", dest="pl_slice", + help="PlanetLab slicename", default=slicename, type="str") + parser.add_option("-u", "--user-2", dest="user2", + help="User for non PlanetLab machine", type="str") + parser.add_option("-m", "--movie", dest="movie", + help="Stream movie", type="str") + parser.add_option("-l", "--exp-id", dest="exp_id", + help="Label to identify experiment", type="str") + + (options, args) = parser.parse_args() + + if not options.movie: + parser.error("movie is a required argument") + + return (options.pl_slice, options.user2, options.movie, options.exp_id) + +if __name__ == '__main__': + ( pl_slice, user2, movie, exp_id ) = get_options() + + # Search for available RMs + populate_factory() + + host1 = 'nepi2.pl.sophia.inria.fr' + host2 = 'roseval.pl.sophia.inria.fr' + + ec = ExperimentController(exp_id = exp_id) + + node1 = add_node(ec, host1, pl_slice) + + peers = [host2] + ccnd1 = add_ccnd(ec, "f12", peers) + + ec.register_connection(ccnd1, node1) + + pub = add_publish(ec, movie) + ec.register_connection(pub, node1) + # The movie can only be published after ccnd is running + ec.register_condition(pub, ResourceAction.START, + ccnd1, ResourceState.STARTED) + + node2 = add_node(ec, host2, user2) + peers = [host1] + ccnd2 = add_ccnd(ec, "ubuntu", peers) + ec.register_connection(ccnd2, node2) + + stream = add_stream(ec) + ec.register_connection(stream, node2) + # The stream can only be retrieved after ccnd is running + ec.register_condition(stream, ResourceAction.START, + ccnd2, ResourceState.STARTED) + # And also, the stream can only be retrieved after it was published + ec.register_condition(stream, ResourceAction.START, + pub, ResourceState.STARTED) + + ec.deploy() + + apps = [ccnd1, pub, ccnd2, stream] + ec.wait_finished(apps) + + ec.shutdown() + diff --git a/src/neco/resources/linux/application.py b/src/neco/resources/linux/application.py index 4f2b989f..953c651e 100644 --- a/src/neco/resources/linux/application.py +++ b/src/neco/resources/linux/application.py @@ -181,6 +181,9 @@ class LinuxApplication(ResourceManager): # upload code self.upload_code() + # upload stdin + self.upload_stdin() + # install dependencies self.install_dependencies() @@ -194,8 +197,14 @@ class LinuxApplication(ResourceManager): x11 = self.get("forwardX11") if not x11 and command: self.info("Uploading command '%s'" % command) - - # TODO: missing set PATH and PYTHONPATH!! + + # Export environment + environ = "" + env = self.get("env") or "" + for var in env.split(" "): + environ += 'export %s\n' % var + + command = environ + command # If the command runs asynchronous, pre upload the command # to the app.sh file in the remote host @@ -255,6 +264,15 @@ class LinuxApplication(ResourceManager): dst = os.path.join(self.src_dir, "code") self.node.upload(sources, dst, text = True) + def upload_stdin(self): + stdin = self.get("stdin") + if stdin: + # create dir for sources + self.info(" Uploading stdin ") + + dst = os.path.join(self.app_home, "stdin") + self.node.upload(stdin, dst, text = True) + def install_dependencies(self): depends = self.get("depends") if depends: @@ -303,13 +321,13 @@ class LinuxApplication(ResourceManager): super(LinuxApplication, self).deploy() def start(self): - command = self.get("command") - env = self.get("env") - stdin = 'stdin' if self.get("stdin") else None - stdout = 'stdout' if self.get("stdout") else 'stdout' - stderr = 'stderr' if self.get("stderr") else 'stderr' + command = self.get('command') + env = self.get('env') + stdin = 'stdin' if self.get('stdin') else None + stdout = 'stdout' if self.get('stdout') else 'stdout' + stderr = 'stderr' if self.get('stderr') else 'stderr' sudo = self.get('sudo') or False - x11 = self.get("forwardX11") or False + x11 = self.get('forwardX11') or False failed = False super(LinuxApplication, self).start() @@ -322,24 +340,29 @@ class LinuxApplication(ResourceManager): self.info("Starting command '%s'" % command) if x11: + if env: + # Export environment + environ = "" + for var in env.split(" "): + environ += ' %s ' % var + + command = "(" + environ + " ; " + command + ")" + command = self.replace_paths(command) + # If the command requires X11 forwarding, we # can't run it asynchronously (out, err), proc = self.node.execute(command, sudo = sudo, stdin = stdin, - stdout = stdout, - stderr = stderr, - env = env, forward_x11 = x11) + self._state = ResourceState.FINISHED + if proc.poll() and err: failed = True else: # Command was previously uploaded, now run the remote # bash file asynchronously - if env: - env = self.replace_paths(env) - cmd = "bash ./app.sh" (out, err), proc = self.node.run(cmd, self.app_home, stdin = stdin, @@ -452,7 +475,8 @@ class LinuxApplication(ResourceManager): .replace("${BUILD}", absolute_dir(self.build_dir)) .replace("${APP_HOME}", absolute_dir(self.app_home)) .replace("${NODE_HOME}", absolute_dir(self.node.node_home)) - .replace("${EXP_HOME}", self.node.exp_home) ) + .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) ) + ) def valid_connection(self, guid): # TODO: Validate! @@ -462,25 +486,3 @@ class LinuxApplication(ResourceManager): self._node = resources[0] if len(resources) == 1 else None return self._node - def hash_app(self): - """ Generates a hash representing univokely the application. - Is used to determine whether the home directory should be cleaned - or not. - - """ - command = self.get("command") - forwards_x11 = self.get("forwardX11") - env = self.get("env") - sudo = self.get("sudo") - depends = self.get("depends") - sources = self.get("sources") - cls._register_attribute(sources) - cls._register_attribute(build) - cls._register_attribute(install) - cls._register_attribute(stdin) - cls._register_attribute(stdout) - cls._register_attribute(stderr) - cls._register_attribute(tear_down) - skey = "".join(map(str, args)) - return hashlib.md5(skey).hexdigest() - diff --git a/src/neco/util/sshfuncs.py b/src/neco/util/sshfuncs.py index 043bc64d..982e23af 100644 --- a/src/neco/util/sshfuncs.py +++ b/src/neco/util/sshfuncs.py @@ -239,7 +239,8 @@ def rexec(command, host, user, for x in xrange(retry): # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, + proc = subprocess.Popen(args, + env = env, stdout = subprocess.PIPE, stdin = subprocess.PIPE, stderr = subprocess.PIPE) -- 2.43.0