Added example for Linux Application using CCNx
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 10 May 2013 17:02:59 +0000 (19:02 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 10 May 2013 17:02:59 +0000 (19:02 +0200)
examples/linux/ccnx/simple_topo.py [new file with mode: 0644]
src/neco/resources/linux/application.py
src/neco/util/sshfuncs.py

diff --git a/examples/linux/ccnx/simple_topo.py b/examples/linux/ccnx/simple_topo.py
new file mode 100644 (file)
index 0000000..f57ce20
--- /dev/null
@@ -0,0 +1,162 @@
+#!/usr/bin/env python
+from neco.execution.ec import ExperimentController, ECState 
+from neco.execution.resource import ResourceState, ResourceAction, \
+        populate_factory
+
+from optparse import OptionParser, SUPPRESS_HELP
+
+import os
+import time
+
+def add_node(ec, host, user):
+    node = ec.register_resource("LinuxNode")
+    ec.set(node, "hostname", host)
+    ec.set(node, "username", user)
+    #ec.set(node, "cleanHome", True)
+    ec.set(node, "cleanProcesses", True)
+    return node
+
+def add_ccnd(ec, os_type, peers):
+    if os_type == "f12":
+        depends = ( " autoconf openssl-devel  expat-devel libpcap-devel "
+                " ecryptfs-utils-devel libxml2-devel automake gawk " 
+                " gcc gcc-c++ git pcre-devel ")
+    elif os_type == "ubuntu":
+        depends = ( " autoconf libssl-dev libexpat-dev libpcap-dev "
+                " libecryptfs0 libxml2-utils automake gawk gcc g++ "
+                " git-core pkg-config libpcre3-dev ")
+
+    sources = "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
+
+    build = (
+        # Evaluate if ccnx binaries are already installed
+        " ( "
+            "  test -d ${EXP_HOME}/ccnx/bin"
+        " ) || ( "
+        # If not, untar and build
+            " ( "
+                " mkdir -p ${SOURCES}/ccnx && "
+                " tar xf ${SOURCES}/ccnx-0.7.1.tar.gz --strip-components=1 -C ${SOURCES}/ccnx "
+             " ) && "
+                "cd ${SOURCES}/ccnx && "
+                # Just execute and silence warnings...
+                "(  ( ./configure && make )  2>&1 )"
+         " )") 
+
+    install = (
+        # Evaluate if ccnx binaries are already installed
+        " ( "
+            "  test -d ${EXP_HOME}/ccnx/bin "
+        " ) || ( "
+            "  mkdir -p ${EXP_HOME}/ccnx/bin && "
+            "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
+        " )"
+    )
+
+    env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+
+    command = "ccndstart 2>&1 ; "
+    peers = map(lambda peer: "ccndc add ccnx:/ udp  %s" % peer, peers)
+    command += " ; ".join(peers) + " ; "
+    command += " ccnr 2>&1 "
+
+    app = ec.register_resource("LinuxApplication")
+    ec.set(app, "depends", depends)
+    ec.set(app, "sources", sources)
+    ec.set(app, "install", install)
+    ec.set(app, "build", build)
+    ec.set(app, "env", env)
+    ec.set(app, "command", command)
+
+    return app
+
+def add_publish(ec, movie):
+    env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+    command = "ccnseqwriter -r ccnx:/VIDEO"
+
+    app = ec.register_resource("LinuxApplication")
+    ec.set(app, "stdin", movie)
+    ec.set(app, "env", env)
+    ec.set(app, "command", command)
+
+    return app
+
+def add_stream(ec):
+    env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+    command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) 2>&1"
+
+    app = ec.register_resource("LinuxApplication")
+    ec.set(app, "depends", "vlc")
+    ec.set(app, "forwardX11", True)
+    ec.set(app, "env", env)
+    ec.set(app, "command", command)
+
+    return app
+
+def get_options():
+    slicename = os.environ.get("PL_SLICE")
+
+    usage = "usage: %prog -s <pl-slice> -u <user-2> -m <movie> -l <exp-id>"
+
+    parser = OptionParser(usage=usage)
+    parser.add_option("-s", "--pl-slice", dest="pl_slice", 
+            help="PlanetLab slicename", default=slicename, type="str")
+    parser.add_option("-u", "--user-2", dest="user2", 
+            help="User for non PlanetLab machine", type="str")
+    parser.add_option("-m", "--movie", dest="movie", 
+            help="Stream movie", type="str")
+    parser.add_option("-l", "--exp-id", dest="exp_id", 
+            help="Label to identify experiment", type="str")
+
+    (options, args) = parser.parse_args()
+
+    if not options.movie:
+        parser.error("movie is a required argument")
+
+    return (options.pl_slice, options.user2, options.movie, options.exp_id)
+
+if __name__ == '__main__':
+    ( pl_slice, user2, movie, exp_id ) = get_options()
+
+    # Search for available RMs
+    populate_factory()
+
+    host1 = 'nepi2.pl.sophia.inria.fr'
+    host2 = 'roseval.pl.sophia.inria.fr'
+
+    ec = ExperimentController(exp_id = exp_id)
+
+    node1 = add_node(ec, host1, pl_slice)
+    
+    peers = [host2]
+    ccnd1 = add_ccnd(ec, "f12", peers)
+
+    ec.register_connection(ccnd1, node1)
+
+    pub = add_publish(ec, movie)
+    ec.register_connection(pub, node1)
+    # The movie can only be published after ccnd is running
+    ec.register_condition(pub, ResourceAction.START, 
+            ccnd1, ResourceState.STARTED)
+    
+    node2 = add_node(ec, host2, user2)
+    peers = [host1]
+    ccnd2 = add_ccnd(ec, "ubuntu", peers)
+    ec.register_connection(ccnd2, node2)
+     
+    stream = add_stream(ec)
+    ec.register_connection(stream, node2)
+    # The stream can only be retrieved after ccnd is running
+    ec.register_condition(stream, ResourceAction.START, 
+            ccnd2, ResourceState.STARTED)
+    # And also, the stream can only be retrieved after it was published
+    ec.register_condition(stream, ResourceAction.START, 
+            pub, ResourceState.STARTED)
+    ec.deploy()
+
+    apps = [ccnd1, pub, ccnd2, stream]
+    ec.wait_finished(apps)
+
+    ec.shutdown()
+
index 4f2b989..953c651 100644 (file)
@@ -181,6 +181,9 @@ class LinuxApplication(ResourceManager):
         # upload code
         self.upload_code()
 
+        # upload stdin
+        self.upload_stdin()
+
         # install dependencies
         self.install_dependencies()
 
@@ -194,8 +197,14 @@ class LinuxApplication(ResourceManager):
         x11 = self.get("forwardX11")
         if not x11 and command:
             self.info("Uploading command '%s'" % command)
-        
-            # TODO: missing set PATH and PYTHONPATH!!
+
+            # Export environment
+            environ = ""
+            env = self.get("env") or ""
+            for var in env.split(" "):
+                environ += 'export %s\n' % var
+
+            command = environ + command
 
             # If the command runs asynchronous, pre upload the command 
             # to the app.sh file in the remote host
@@ -255,6 +264,15 @@ class LinuxApplication(ResourceManager):
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
 
+    def upload_stdin(self):
+        stdin = self.get("stdin")
+        if stdin:
+            # create dir for sources
+            self.info(" Uploading stdin ")
+
+            dst = os.path.join(self.app_home, "stdin")
+            self.node.upload(stdin, dst, text = True)
+
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
@@ -303,13 +321,13 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).deploy()
 
     def start(self):
-        command = self.get("command")
-        env = self.get("env")
-        stdin = 'stdin' if self.get("stdin") else None
-        stdout = 'stdout' if self.get("stdout") else 'stdout'
-        stderr = 'stderr' if self.get("stderr") else 'stderr'
+        command = self.get('command')
+        env = self.get('env')
+        stdin = 'stdin' if self.get('stdin') else None
+        stdout = 'stdout' if self.get('stdout') else 'stdout'
+        stderr = 'stderr' if self.get('stderr') else 'stderr'
         sudo = self.get('sudo') or False
-        x11 = self.get("forwardX11") or False
+        x11 = self.get('forwardX11') or False
         failed = False
 
         super(LinuxApplication, self).start()
@@ -322,24 +340,29 @@ class LinuxApplication(ResourceManager):
         self.info("Starting command '%s'" % command)
 
         if x11:
+            if env:
+                # Export environment
+                environ = ""
+                for var in env.split(" "):
+                    environ += ' %s ' % var
+
+                command = "(" + environ + " ; " + command + ")"
+                command = self.replace_paths(command)
+
             # If the command requires X11 forwarding, we
             # can't run it asynchronously
             (out, err), proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
-                    stdout = stdout,
-                    stderr = stderr,
-                    env = env,
                     forward_x11 = x11)
 
+            self._state = ResourceState.FINISHED
+
             if proc.poll() and err:
                 failed = True
         else:
             # Command was  previously uploaded, now run the remote
             # bash file asynchronously
-            if env:
-                env = self.replace_paths(env)
-
             cmd = "bash ./app.sh"
             (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
@@ -452,7 +475,8 @@ class LinuxApplication(ResourceManager):
             .replace("${BUILD}", absolute_dir(self.build_dir))
             .replace("${APP_HOME}", absolute_dir(self.app_home))
             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
-            .replace("${EXP_HOME}", self.node.exp_home) )
+            .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
+            )
         
     def valid_connection(self, guid):
         # TODO: Validate!
@@ -462,25 +486,3 @@ class LinuxApplication(ResourceManager):
         self._node = resources[0] if len(resources) == 1 else None
         return self._node
 
-    def hash_app(self):
-        """ Generates a hash representing univokely the application.
-        Is used to determine whether the home directory should be cleaned
-        or not.
-
-        """
-        command = self.get("command")
-        forwards_x11 = self.get("forwardX11")
-        env = self.get("env")
-        sudo = self.get("sudo")
-        depends = self.get("depends")
-        sources = self.get("sources")
-        cls._register_attribute(sources)
-        cls._register_attribute(build)
-        cls._register_attribute(install)
-        cls._register_attribute(stdin)
-        cls._register_attribute(stdout)
-        cls._register_attribute(stderr)
-        cls._register_attribute(tear_down)
-        skey = "".join(map(str, args))
-        return hashlib.md5(skey).hexdigest()
-
index 043bc64..982e23a 100644 (file)
@@ -239,7 +239,8 @@ def rexec(command, host, user,
 
     for x in xrange(retry):
         # connects to the remote host and starts a remote connection
-        proc = subprocess.Popen(args, 
+        proc = subprocess.Popen(args,
+                env = env,
                 stdout = subprocess.PIPE,
                 stdin = subprocess.PIPE, 
                 stderr = subprocess.PIPE)