Adding comments to Linux CCN examples
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 26 Jun 2013 17:42:07 +0000 (10:42 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 26 Jun 2013 17:42:07 +0000 (10:42 -0700)
examples/linux/ccn/ccncat_2_nodes.py [moved from examples/linux/ccn/vlc_2_hosts.py with 75% similarity]
examples/linux/ccn/ccncat_extended_ring_topo.py [moved from examples/linux/ccn/vlc_extended_ring_topo.py with 58% similarity]
src/nepi/execution/trace.py
src/nepi/resources/linux/application.py
src/nepi/resources/planetlab/node.py

similarity index 75%
rename from examples/linux/ccn/vlc_2_hosts.py
rename to examples/linux/ccn/ccncat_2_nodes.py
index bf2543c..55133da 100755 (executable)
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
+
+# NOTE: This experiment example uses the generic LinuxApplication
+#       ResourceManager to do the CCN set up in the hosts.
+#       Alternatively, CCN specific ResourceManagers can be used
+#       (i.e. LinuxCCND, LinuxCCNR, etc...), and those require less 
+#       manual configuration.
+#
+#
+
+# CCN topology:
+#
+#                
+#                 
+#  content                ccncat
+#  PL host               Linux host
+#  0 ------- Internet ------ 0
+#           
+
 from nepi.execution.ec import ExperimentController, ECState 
 from nepi.execution.resource import ResourceState, ResourceAction, \
         populate_factory
@@ -117,7 +135,7 @@ def add_stream(ec):
     return app
 
 def get_options():
-    slicename = os.environ.get("PL_SLICE")
+    pl_slice = os.environ.get("PL_SLICE")
 
     # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the
     # id_rsa_planetlab exists 
@@ -125,16 +143,33 @@ def get_options():
     default_key = default_key if os.path.exists(default_key) else None
     pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
 
-    usage = "usage: %prog -s <pl-slice> -u <username> -m <movie> -l <exp-id> -i <ssh_key>"
+    # Default planetlab host
+    pl_host = "planetlab2.u-strasbg.fr"
+
+    # Another Linux host 
+    # IMPORTANT NOTE: you must replace this host for another one
+    #       you have access to. You must set up your SSH keys so
+    #       the host can be accessed through SSH without prompting
+    #       for a password. The host must allow X forwarding using SSH.
+    linux_host = 'roseval.pl.sophia.inria.fr'
+
+    usage = "usage: %prog -p <pl-host> -s <pl-slice> -l <linux-host> -u <linux-user> -m <movie> -e <exp-id> -i <ssh_key>"
 
     parser = OptionParser(usage=usage)
+    parser.add_option("-p", "--pl-host", dest="pl_host", 
+            help="PlanetLab hostname (already added to the <pl-slice> on the web site)", 
+            default = pl_host, type="str")
     parser.add_option("-s", "--pl-slice", dest="pl_slice", 
-            help="PlanetLab slicename", default = slicename, type="str")
-    parser.add_option("-u", "--username", dest="username", 
-            help="User for extra host (non PlanetLab)", type="str")
+            help="PlanetLab slicename", default = pl_slice, type="str")
+    parser.add_option("-l", "--linux-host", dest="linux_host", 
+            help="Hostname of second Linux host (non PlanetLab)",
+            default = linux_host, type="str")
+    parser.add_option("-u", "--linux-user", dest="linux_user", 
+            help="User for extra Linux host (non PlanetLab)", default = linux_host,
+            type="str")
     parser.add_option("-m", "--movie", dest="movie", 
             help="Stream movie", type="str")
-    parser.add_option("-l", "--exp-id", dest="exp_id", 
+    parser.add_option("-e", "--exp-id", dest="exp_id", 
             help="Label to identify experiment", type="str")
     parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key", 
             help="Path to private SSH key to be used for connection", 
@@ -145,48 +180,47 @@ def get_options():
     if not options.movie:
         parser.error("movie is a required argument")
 
-    return (options.pl_slice, options.username, options.movie, options.exp_id, 
+    return (options.pl_host, options.pl_slice, options.linux_host, 
+            options.linux_user, options.movie, options.exp_id, 
             options.pl_ssh_key)
 
 if __name__ == '__main__':
-    ( pl_slice, username, movie, exp_id, pl_ssh_key ) = get_options()
+    ( pl_host, pl_user, linux_host, linux_user, movie, exp_id, pl_ssh_key 
+            ) = get_options()
 
     # Search for available RMs
     populate_factory()
     
-    # PlanetLab node
-    host1 = 'planetlab2.u-strasbg.fr'
-    
-    # Another node 
-    # IMPORTANT NOTE: you must replace this host for another one
-    #       you have access to. You must set up your SSH keys so
-    #       the host can be accessed through SSH without prompting
-    #       for a password. The host must allow X forwarding using SSH.
-    host2 = 'roseval.pl.sophia.inria.fr'
-
     # Create the ExperimentController instance
     ec = ExperimentController(exp_id = exp_id)
 
-    # Register a ResourceManager (RM) for the PlanetLab node
-    node1 = add_node(ec, host1, pl_slice, pl_ssh_key)
-    
-    peers = [host2]
-    ccnd1 = add_ccnd(ec, OSType.FEDORA, peers)
+    # Register ResourceManager (RM) 
+
+    # Register first PlanetLab host
+    node1 = add_node(ec, pl_host, pl_user, pl_ssh_key)
 
+    # Register CCN setup for PL host
+    peers = [linux_host]
+    ccnd1 = add_ccnd(ec, OSType.FEDORA, peers)
     ec.register_connection(ccnd1, node1)
 
+    # Register content producer application (ccnseqwriter)
     pub = add_publish(ec, movie)
     ec.register_connection(pub, node1)
 
     # The movie can only be published after ccnd is running
     ec.register_condition(pub, ResourceAction.START, 
             ccnd1, ResourceState.STARTED)
-    
-    node2 = add_node(ec, host2, username)
-    peers = [host1]
+   
+    # Register Linux host
+    node2 = add_node(ec, linux_host, linux_user)
+
+    # Register CCN setup for Linux host
+    peers = [pl_host]
     ccnd2 = add_ccnd(ec, "ubuntu", peers)
     ec.register_connection(ccnd2, node2)
      
+    # Register consumer application (ccncat)
     stream = add_stream(ec)
     ec.register_connection(stream, node2)
 
similarity index 58%
rename from examples/linux/ccn/vlc_extended_ring_topo.py
rename to examples/linux/ccn/ccncat_extended_ring_topo.py
index b246c9b..8c32436 100755 (executable)
@@ -41,8 +41,9 @@
 from nepi.execution.ec import ExperimentController, ECState 
 from nepi.execution.resource import ResourceState, ResourceAction, \
         populate_factory
-from nepi.resources.linux.node import OSType
+from nepi.execution.trace import TraceAttr
 
+import subprocess
 from optparse import OptionParser, SUPPRESS_HELP
 
 import os
@@ -83,45 +84,71 @@ def add_content(ec, ccnr, content_name, content):
     return co
 
 def add_stream(ec, ccnd, content_name):
-    command = "sudo -S dbus-uuidgen --ensure ; ccncat %s | vlc - " % \
-            content_name
+    # ccnx v7.2 issue 101007 
+    command = "ccnpeek %(content_name)s; ccncat %(content_name)s" % {
+            "content_name" : content_name}
 
     app = ec.register_resource("LinuxCCNApplication")
-    ec.set(app, "depends", "vlc")
-    ec.set(app, "forwardX11", True)
     ec.set(app, "command", command)
     ec.register_connection(app, ccnd)
 
     return app
 
+def get_options():
+    pl_slice = os.environ.get("PL_SLICE")
+
+    # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the
+    # id_rsa_planetlab exists 
+    default_key = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'])
+    default_key = default_key if os.path.exists(default_key) else None
+    pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
+
+    usage = "usage: %prog -s <pl-user> -m <movie> -c -e <exp-id> -i <ssh_key>"
+
+    parser = OptionParser(usage=usage)
+    parser.add_option("-s", "--pl-user", dest="pl_user", 
+            help="PlanetLab slicename", default = pl_slice, type="str")
+    parser.add_option("-m", "--movie", dest="movie", 
+            help="Stream movie", type="str")
+    parser.add_option("-e", "--exp-id", dest="exp_id", 
+            help="Label to identify experiment", type="str")
+    parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key", 
+            help="Path to private SSH key to be used for connection", 
+            default = pl_ssh_key, type="str")
+
+    (options, args) = parser.parse_args()
+
+    if not options.movie:
+        parser.error("movie is a required argument")
+
+    return (options.pl_user, options.movie, options.exp_id, options.pl_ssh_key)
+
+
 if __name__ == '__main__':
+    content_name = "ccnx:/test/VIDEO"
+    
+    ( pl_user, movie, exp_id, pl_ssh_key ) = get_options()
+
     # Search for available RMs
     populate_factory()
     
-    ec = ExperimentController(exp_id = "olahh")
+    ec = ExperimentController(exp_id = exp_id)
     
     # hosts
     host1 = "planetlab2.u-strasbg.fr"
     host2 = "planet1.servers.ua.pt"
     host3 = "planetlab1.cs.uoi.gr"
     host4 = "planetlab1.aston.ac.uk"
-    host5 = "itchy.comlab.bth.se"
-    host6 = "roseval.pl.sophia.inria.fr"
-
-    # users
-    pluser = "inria_alina"
-    user = "alina"
-
-    content_name = "ccnx:/VIDEO"
-    video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts"
-    """
+    host5 = "planetlab2.willab.fi"
+    host6 = "planetlab-1.fokus.fraunhofer.de"
+    
     # describe nodes in the central ring
     ring_hosts = [host1, host2, host3, host4]
     ccnds = dict()
 
     for i in xrange(len(ring_hosts)):
         host = ring_hosts[i]
-        node = add_node(ec, host, pluser)
+        node = add_node(ec, host, pl_user)
         ccnd = add_ccnd(ec, node)
         ccnr = add_ccnr(ec, ccnd)
         ccnds[host] = ccnd
@@ -146,35 +173,51 @@ if __name__ == '__main__':
     # l5 : h1 - h3 , h3 - h1
     l5u = add_fib_entry(ec, ccnds[host1], host3)
     l5d = add_fib_entry(ec, ccnds[host3], host1)
-    """  
+    
     # border node 1
-    bnode1 = add_node(ec, host5, pluser)
+    bnode1 = add_node(ec, host5, pl_user)
     ccndb1 = add_ccnd(ec, bnode1)
     ccnrb1 = add_ccnr(ec, ccndb1)
-    co = add_content(ec, ccnrb1, content_name, video)
+    ccnds[host5] = ccndb1
+    co = add_content(ec, ccnrb1, content_name, movie)
 
     # border node 2
-    bnode2 = add_node(ec, host6, user)
+    bnode2 = add_node(ec, host6, pl_user)
     ccndb2 = add_ccnd(ec, bnode2)
     ccnrb2 = add_ccnr(ec, ccndb2)
+    ccnds[host6] = ccndb2
     app = add_stream(ec, ccndb2, content_name)
  
     # connect border nodes
-    #add_fib_entry(ec, ccndb1, host1)
-    #add_fib_entry(ec, ccnds[host1], host5)
+    add_fib_entry(ec, ccndb1, host1)
+    add_fib_entry(ec, ccnds[host1], host5)
 
-    #add_fib_entry(ec, ccndb2, host3)
-    #add_fib_entry(ec, ccnds[host3], host6)
-    add_fib_entry(ec, ccndb2, host5)
-    add_fib_entry(ec, ccndb1, host6)
+    add_fib_entry(ec, ccndb2, host3)
+    add_fib_entry(ec, ccnds[host3], host6)
+
+    # Put down l5 10s after transfer started
+    ec.register_condition(l5u, ResourceAction.STOP, 
+            app, ResourceState.STARTED, time = "10s")
+    ec.register_condition(l5d, ResourceAction.STOP, 
+            app, ResourceState.STARTED, time = "10s")
  
     # deploy all ResourceManagers
     ec.deploy()
 
-    ec.wait_finished([app])
+    ec.wait_started([app])
+
+    rvideo_path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
+    command = 'tail -f %s' % rvideo_path
+
+    # pulling the content of the video received
+    # on b2, to stream it locally
+    proc1 = subprocess.Popen(['ssh',
+        '-o', 'StrictHostKeyChecking=no',
+        '-l', pl_user, host6,
+        command],
+        stdout = subprocess.PIPE, 
+        stderr = subprocess.PIPE)
     
-    """
     proc2 = subprocess.Popen(['vlc', 
         '--ffmpeg-threads=1',
         '--sub-filter', 'marq', 
@@ -182,10 +225,22 @@ if __name__ == '__main__':
         '(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org', 
         '--marq-position=8', 
         '--no-video-title-show', '-'], 
-        stdin=proc1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdin=proc1.stdout, 
+        stdout=subprocess.PIPE, 
+        stderr=subprocess.PIPE)
+
+    (stdout, stderr) = proc2.communicate() 
 
     dirpath = tempfile.mkdtemp()
-    """
+    print "Storing to DIRPATH ", dirpath
+
+    for ccnd in ccnds.values():
+        stdout = ec.trace(ccnd, "stderr")
+        fname = "log-%d" % ccnd
+        path = os.path.join(dirpath, fname)
+        f = open(path, "w")
+        f.write(stdout)
+        f.close()
 
     # shutdown the experiment controller
     ec.shutdown()
index 3b22eaf..19065db 100644 (file)
@@ -18,9 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 class TraceAttr:
-    """ Class representing the different attributes 
-    that can characterized a trace.
-
+    """ Trace attributes represent different information
+    aspects that can be retrieved from a trace.
     """
     ALL = 'all'
     STREAM = 'stream'
index 440e2d1..ea4b0a4 100644 (file)
@@ -25,8 +25,10 @@ from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import strfnow, strfdiff
 
 import os
+import subprocess
 
 # TODO: Resolve wildcards in commands!!
+# TODO: compare_hash for all files that are uploaded!
 
 
 @clsinit
@@ -37,7 +39,7 @@ class LinuxApplication(ResourceManager):
     def _register_attributes(cls):
         command = Attribute("command", "Command to execute", 
                 flags = Flags.ExecReadOnly)
-        forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
+        forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
                 flags = Flags.ExecReadOnly)
         env = Attribute("env", "Environment variables string for command execution",
                 flags = Flags.ExecReadOnly)
@@ -316,10 +318,10 @@ class LinuxApplication(ResourceManager):
             
             dst = os.path.join(self.app_home, "stdin")
 
-            # TODO:
-            # Check wether file already exists and if it exists 
-            # wether the file we want to upload is the same
-            # (using md5sum)
+            # If what we are uploading is a file, check whether
+            # the same file already exists (using md5sum)
+            if self.compare_hash(stdin, dst):
+                return
 
             self.node.upload(stdin, dst, text = True)
 
@@ -589,7 +591,39 @@ class LinuxApplication(ResourceManager):
             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
             )
-        
+
+    def compare_hash(self, local, remote):
+        # getting md5sum from remote file
+        (out, err), proc = self.node.execute("md5sum %s " % remote)
+
+        if proc.poll() == 0: #OK
+            if not os.path.isfile(local):
+                # store to a tmp file
+                f = tempfile.NamedTemporaryFile()
+                f.write(local)
+                f.flush()
+                local = f.name
+
+            lproc = subprocess.Popen(["md5sum", local],
+                stdout = subprocess.PIPE,
+                stderr = subprocess.PIPE) 
+
+            # getting md5sum from local file
+            (lout, lerr) = lproc.communicate()
+
+            # files are the same, no need to upload
+            lchk = lout.strip().split(" ")[0]
+            rchk = out.strip().split(" ")[0]
+
+            msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
+                    local, lchk, remote, rchk)
+            self.debug(msg)
+
+            if lchk == rchk:
+                return True
+
+        return False
+
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
index 45cd3a3..bdcc19f 100644 (file)
@@ -27,7 +27,7 @@ reschedule_delay = "0.5s"
 
 @clsinit_copy
 class PlanetlabNode(LinuxNode):
-    _rtype = "PlanetLabNode"
+    _rtype = "PlanetlabNode"
 
     @classmethod
     def _register_attributes(cls):
@@ -207,31 +207,6 @@ class PlanetlabNode(LinuxNode):
 
         return self._os
 
-    @property
-    def localhost(self):
-        return False
-
-    def discover(self):
-        # Get the list of nodes that match the filters
-
-
-        # find one that 
-        if not self.is_alive():
-            self._state = ResourceState.FAILED
-            msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
-            self.error(msg)
-            raise RuntimeError, msg
-
-        if self.get("cleanProcesses"):
-            self.clean_processes()
-
-        if self.get("cleanHome"):
-            self.clean_home()
-       
-        self.mkdir(self.node_home)
-
-        super(PlanetlabNode, self).discover()
-
     def provision(self):
         if not self.is_alive():
             self._state = ResourceState.FAILED
@@ -281,29 +256,6 @@ class PlanetlabNode(LinuxNode):
         out = err = ""
         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
             
-    def is_alive(self):
-        if self.localhost:
-            return True
-
-        out = err = ""
-        try:
-            # TODO: FIX NOT ALIVE!!!!
-            (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
-                    with_lock = True)
-        except:
-            import traceback
-            trace = traceback.format_exc()
-            msg = "Unresponsive host  %s " % err
-            self.error(msg, out, trace)
-            return False
-
-        if out.strip().startswith('ALIVE'):
-            return True
-        else:
-            msg = "Unresponsive host "
-            self.error(msg, out, err)
-            return False
-
     def blacklist(self):
         # TODO!!!!
         self.warn(" Blacklisting malfunctioning node ")