From: Alina Quereilhac Date: Wed, 26 Jun 2013 17:42:07 +0000 (-0700) Subject: Adding comments to Linux CCN examples X-Git-Tag: nepi-3.0.0~91 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=9ff58aeb6f61e082787e8df4531ae7c5e546e88e;p=nepi.git Adding comments to Linux CCN examples --- diff --git a/examples/linux/ccn/vlc_2_hosts.py b/examples/linux/ccn/ccncat_2_nodes.py similarity index 75% rename from examples/linux/ccn/vlc_2_hosts.py rename to examples/linux/ccn/ccncat_2_nodes.py index bf2543c6..55133dab 100755 --- a/examples/linux/ccn/vlc_2_hosts.py +++ b/examples/linux/ccn/ccncat_2_nodes.py @@ -19,6 +19,24 @@ # # Author: Alina Quereilhac + +# NOTE: This experiment example uses the generic LinuxApplication +# ResourceManager to do the CCN set up in the hosts. +# Alternatively, CCN specific ResourceManagers can be used +# (i.e. LinuxCCND, LinuxCCNR, etc...), and those require less +# manual configuration. +# +# + +# CCN topology: +# +# +# +# content ccncat +# PL host Linux host +# 0 ------- Internet ------ 0 +# + from nepi.execution.ec import ExperimentController, ECState from nepi.execution.resource import ResourceState, ResourceAction, \ populate_factory @@ -117,7 +135,7 @@ def add_stream(ec): return app def get_options(): - slicename = os.environ.get("PL_SLICE") + pl_slice = os.environ.get("PL_SLICE") # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the # id_rsa_planetlab exists @@ -125,16 +143,33 @@ def get_options(): default_key = default_key if os.path.exists(default_key) else None pl_ssh_key = os.environ.get("PL_SSHKEY", default_key) - usage = "usage: %prog -s -u -m -l -i " + # Default planetlab host + pl_host = "planetlab2.u-strasbg.fr" + + # Another Linux host + # IMPORTANT NOTE: you must replace this host for another one + # you have access to. You must set up your SSH keys so + # the host can be accessed through SSH without prompting + # for a password. The host must allow X forwarding using SSH. + linux_host = 'roseval.pl.sophia.inria.fr' + + usage = "usage: %prog -p -s -l -u -m -e -i " parser = OptionParser(usage=usage) + parser.add_option("-p", "--pl-host", dest="pl_host", + help="PlanetLab hostname (already added to the on the web site)", + default = pl_host, type="str") parser.add_option("-s", "--pl-slice", dest="pl_slice", - help="PlanetLab slicename", default = slicename, type="str") - parser.add_option("-u", "--username", dest="username", - help="User for extra host (non PlanetLab)", type="str") + help="PlanetLab slicename", default = pl_slice, type="str") + parser.add_option("-l", "--linux-host", dest="linux_host", + help="Hostname of second Linux host (non PlanetLab)", + default = linux_host, type="str") + parser.add_option("-u", "--linux-user", dest="linux_user", + help="User for extra Linux host (non PlanetLab)", default = linux_host, + type="str") parser.add_option("-m", "--movie", dest="movie", help="Stream movie", type="str") - parser.add_option("-l", "--exp-id", dest="exp_id", + parser.add_option("-e", "--exp-id", dest="exp_id", help="Label to identify experiment", type="str") parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key", help="Path to private SSH key to be used for connection", @@ -145,48 +180,47 @@ def get_options(): if not options.movie: parser.error("movie is a required argument") - return (options.pl_slice, options.username, options.movie, options.exp_id, + return (options.pl_host, options.pl_slice, options.linux_host, + options.linux_user, options.movie, options.exp_id, options.pl_ssh_key) if __name__ == '__main__': - ( pl_slice, username, movie, exp_id, pl_ssh_key ) = get_options() + ( pl_host, pl_user, linux_host, linux_user, movie, exp_id, pl_ssh_key + ) = get_options() # Search for available RMs populate_factory() - # PlanetLab node - host1 = 'planetlab2.u-strasbg.fr' - - # Another node - # IMPORTANT NOTE: you must replace this host for another one - # you have access to. You must set up your SSH keys so - # the host can be accessed through SSH without prompting - # for a password. The host must allow X forwarding using SSH. - host2 = 'roseval.pl.sophia.inria.fr' - # Create the ExperimentController instance ec = ExperimentController(exp_id = exp_id) - # Register a ResourceManager (RM) for the PlanetLab node - node1 = add_node(ec, host1, pl_slice, pl_ssh_key) - - peers = [host2] - ccnd1 = add_ccnd(ec, OSType.FEDORA, peers) + # Register ResourceManager (RM) + + # Register first PlanetLab host + node1 = add_node(ec, pl_host, pl_user, pl_ssh_key) + # Register CCN setup for PL host + peers = [linux_host] + ccnd1 = add_ccnd(ec, OSType.FEDORA, peers) ec.register_connection(ccnd1, node1) + # Register content producer application (ccnseqwriter) pub = add_publish(ec, movie) ec.register_connection(pub, node1) # The movie can only be published after ccnd is running ec.register_condition(pub, ResourceAction.START, ccnd1, ResourceState.STARTED) - - node2 = add_node(ec, host2, username) - peers = [host1] + + # Register Linux host + node2 = add_node(ec, linux_host, linux_user) + + # Register CCN setup for Linux host + peers = [pl_host] ccnd2 = add_ccnd(ec, "ubuntu", peers) ec.register_connection(ccnd2, node2) + # Register consumer application (ccncat) stream = add_stream(ec) ec.register_connection(stream, node2) diff --git a/examples/linux/ccn/vlc_extended_ring_topo.py b/examples/linux/ccn/ccncat_extended_ring_topo.py similarity index 58% rename from examples/linux/ccn/vlc_extended_ring_topo.py rename to examples/linux/ccn/ccncat_extended_ring_topo.py index b246c9bf..8c32436d 100755 --- a/examples/linux/ccn/vlc_extended_ring_topo.py +++ b/examples/linux/ccn/ccncat_extended_ring_topo.py @@ -41,8 +41,9 @@ from nepi.execution.ec import ExperimentController, ECState from nepi.execution.resource import ResourceState, ResourceAction, \ populate_factory -from nepi.resources.linux.node import OSType +from nepi.execution.trace import TraceAttr +import subprocess from optparse import OptionParser, SUPPRESS_HELP import os @@ -83,45 +84,71 @@ def add_content(ec, ccnr, content_name, content): return co def add_stream(ec, ccnd, content_name): - command = "sudo -S dbus-uuidgen --ensure ; ccncat %s | vlc - " % \ - content_name + # ccnx v7.2 issue 101007 + command = "ccnpeek %(content_name)s; ccncat %(content_name)s" % { + "content_name" : content_name} app = ec.register_resource("LinuxCCNApplication") - ec.set(app, "depends", "vlc") - ec.set(app, "forwardX11", True) ec.set(app, "command", command) ec.register_connection(app, ccnd) return app +def get_options(): + pl_slice = os.environ.get("PL_SLICE") + + # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the + # id_rsa_planetlab exists + default_key = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME']) + default_key = default_key if os.path.exists(default_key) else None + pl_ssh_key = os.environ.get("PL_SSHKEY", default_key) + + usage = "usage: %prog -s -m -c -e -i " + + parser = OptionParser(usage=usage) + parser.add_option("-s", "--pl-user", dest="pl_user", + help="PlanetLab slicename", default = pl_slice, type="str") + parser.add_option("-m", "--movie", dest="movie", + help="Stream movie", type="str") + parser.add_option("-e", "--exp-id", dest="exp_id", + help="Label to identify experiment", type="str") + parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key", + help="Path to private SSH key to be used for connection", + default = pl_ssh_key, type="str") + + (options, args) = parser.parse_args() + + if not options.movie: + parser.error("movie is a required argument") + + return (options.pl_user, options.movie, options.exp_id, options.pl_ssh_key) + + if __name__ == '__main__': + content_name = "ccnx:/test/VIDEO" + + ( pl_user, movie, exp_id, pl_ssh_key ) = get_options() + # Search for available RMs populate_factory() - ec = ExperimentController(exp_id = "olahh") + ec = ExperimentController(exp_id = exp_id) # hosts host1 = "planetlab2.u-strasbg.fr" host2 = "planet1.servers.ua.pt" host3 = "planetlab1.cs.uoi.gr" host4 = "planetlab1.aston.ac.uk" - host5 = "itchy.comlab.bth.se" - host6 = "roseval.pl.sophia.inria.fr" - - # users - pluser = "inria_alina" - user = "alina" - - content_name = "ccnx:/VIDEO" - video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts" - """ + host5 = "planetlab2.willab.fi" + host6 = "planetlab-1.fokus.fraunhofer.de" + # describe nodes in the central ring ring_hosts = [host1, host2, host3, host4] ccnds = dict() for i in xrange(len(ring_hosts)): host = ring_hosts[i] - node = add_node(ec, host, pluser) + node = add_node(ec, host, pl_user) ccnd = add_ccnd(ec, node) ccnr = add_ccnr(ec, ccnd) ccnds[host] = ccnd @@ -146,35 +173,51 @@ if __name__ == '__main__': # l5 : h1 - h3 , h3 - h1 l5u = add_fib_entry(ec, ccnds[host1], host3) l5d = add_fib_entry(ec, ccnds[host3], host1) - """ + # border node 1 - bnode1 = add_node(ec, host5, pluser) + bnode1 = add_node(ec, host5, pl_user) ccndb1 = add_ccnd(ec, bnode1) ccnrb1 = add_ccnr(ec, ccndb1) - co = add_content(ec, ccnrb1, content_name, video) + ccnds[host5] = ccndb1 + co = add_content(ec, ccnrb1, content_name, movie) # border node 2 - bnode2 = add_node(ec, host6, user) + bnode2 = add_node(ec, host6, pl_user) ccndb2 = add_ccnd(ec, bnode2) ccnrb2 = add_ccnr(ec, ccndb2) + ccnds[host6] = ccndb2 app = add_stream(ec, ccndb2, content_name) # connect border nodes - #add_fib_entry(ec, ccndb1, host1) - #add_fib_entry(ec, ccnds[host1], host5) + add_fib_entry(ec, ccndb1, host1) + add_fib_entry(ec, ccnds[host1], host5) - #add_fib_entry(ec, ccndb2, host3) - #add_fib_entry(ec, ccnds[host3], host6) - - add_fib_entry(ec, ccndb2, host5) - add_fib_entry(ec, ccndb1, host6) + add_fib_entry(ec, ccndb2, host3) + add_fib_entry(ec, ccnds[host3], host6) + + # Put down l5 10s after transfer started + ec.register_condition(l5u, ResourceAction.STOP, + app, ResourceState.STARTED, time = "10s") + ec.register_condition(l5d, ResourceAction.STOP, + app, ResourceState.STARTED, time = "10s") # deploy all ResourceManagers ec.deploy() - ec.wait_finished([app]) + ec.wait_started([app]) + + rvideo_path = ec.trace(app, "stdout", attr = TraceAttr.PATH) + command = 'tail -f %s' % rvideo_path + + # pulling the content of the video received + # on b2, to stream it locally + proc1 = subprocess.Popen(['ssh', + '-o', 'StrictHostKeyChecking=no', + '-l', pl_user, host6, + command], + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) - """ proc2 = subprocess.Popen(['vlc', '--ffmpeg-threads=1', '--sub-filter', 'marq', @@ -182,10 +225,22 @@ if __name__ == '__main__': '(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org', '--marq-position=8', '--no-video-title-show', '-'], - stdin=proc1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdin=proc1.stdout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + (stdout, stderr) = proc2.communicate() dirpath = tempfile.mkdtemp() - """ + print "Storing to DIRPATH ", dirpath + + for ccnd in ccnds.values(): + stdout = ec.trace(ccnd, "stderr") + fname = "log-%d" % ccnd + path = os.path.join(dirpath, fname) + f = open(path, "w") + f.write(stdout) + f.close() # shutdown the experiment controller ec.shutdown() diff --git a/src/nepi/execution/trace.py b/src/nepi/execution/trace.py index 3b22eaf3..19065dbe 100644 --- a/src/nepi/execution/trace.py +++ b/src/nepi/execution/trace.py @@ -18,9 +18,8 @@ # Author: Alina Quereilhac class TraceAttr: - """ Class representing the different attributes - that can characterized a trace. - + """ Trace attributes represent different information + aspects that can be retrieved from a trace. """ ALL = 'all' STREAM = 'stream' diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 440e2d1c..ea4b0a4b 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -25,8 +25,10 @@ from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import strfnow, strfdiff import os +import subprocess # TODO: Resolve wildcards in commands!! +# TODO: compare_hash for all files that are uploaded! @clsinit @@ -37,7 +39,7 @@ class LinuxApplication(ResourceManager): def _register_attributes(cls): command = Attribute("command", "Command to execute", flags = Flags.ExecReadOnly) - forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", + forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", flags = Flags.ExecReadOnly) env = Attribute("env", "Environment variables string for command execution", flags = Flags.ExecReadOnly) @@ -316,10 +318,10 @@ class LinuxApplication(ResourceManager): dst = os.path.join(self.app_home, "stdin") - # TODO: - # Check wether file already exists and if it exists - # wether the file we want to upload is the same - # (using md5sum) + # If what we are uploading is a file, check whether + # the same file already exists (using md5sum) + if self.compare_hash(stdin, dst): + return self.node.upload(stdin, dst, text = True) @@ -589,7 +591,39 @@ class LinuxApplication(ResourceManager): .replace("${NODE_HOME}", absolute_dir(self.node.node_home)) .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) ) ) - + + def compare_hash(self, local, remote): + # getting md5sum from remote file + (out, err), proc = self.node.execute("md5sum %s " % remote) + + if proc.poll() == 0: #OK + if not os.path.isfile(local): + # store to a tmp file + f = tempfile.NamedTemporaryFile() + f.write(local) + f.flush() + local = f.name + + lproc = subprocess.Popen(["md5sum", local], + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + + # getting md5sum from local file + (lout, lerr) = lproc.communicate() + + # files are the same, no need to upload + lchk = lout.strip().split(" ")[0] + rchk = out.strip().split(" ")[0] + + msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % ( + local, lchk, remote, rchk) + self.debug(msg) + + if lchk == rchk: + return True + + return False + def valid_connection(self, guid): # TODO: Validate! return True diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index 45cd3a3a..bdcc19f1 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -27,7 +27,7 @@ reschedule_delay = "0.5s" @clsinit_copy class PlanetlabNode(LinuxNode): - _rtype = "PlanetLabNode" + _rtype = "PlanetlabNode" @classmethod def _register_attributes(cls): @@ -207,31 +207,6 @@ class PlanetlabNode(LinuxNode): return self._os - @property - def localhost(self): - return False - - def discover(self): - # Get the list of nodes that match the filters - - - # find one that - if not self.is_alive(): - self._state = ResourceState.FAILED - msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") - self.error(msg) - raise RuntimeError, msg - - if self.get("cleanProcesses"): - self.clean_processes() - - if self.get("cleanHome"): - self.clean_home() - - self.mkdir(self.node_home) - - super(PlanetlabNode, self).discover() - def provision(self): if not self.is_alive(): self._state = ResourceState.FAILED @@ -281,29 +256,6 @@ class PlanetlabNode(LinuxNode): out = err = "" (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) - def is_alive(self): - if self.localhost: - return True - - out = err = "" - try: - # TODO: FIX NOT ALIVE!!!! - (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, - with_lock = True) - except: - import traceback - trace = traceback.format_exc() - msg = "Unresponsive host %s " % err - self.error(msg, out, trace) - return False - - if out.strip().startswith('ALIVE'): - return True - else: - msg = "Unresponsive host " - self.error(msg, out, err) - return False - def blacklist(self): # TODO!!!! self.warn(" Blacklisting malfunctioning node ")