#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+# NOTE: This experiment example uses the generic LinuxApplication
+# ResourceManager to do the CCN set up in the hosts.
+# Alternatively, CCN specific ResourceManagers can be used
+# (i.e. LinuxCCND, LinuxCCNR, etc...), and those require less
+# manual configuration.
+#
+#
+
+# CCN topology:
+#
+#
+#
+# content ccncat
+# PL host Linux host
+# 0 ------- Internet ------ 0
+#
+
from nepi.execution.ec import ExperimentController, ECState
from nepi.execution.resource import ResourceState, ResourceAction, \
populate_factory
return app
def get_options():
- slicename = os.environ.get("PL_SLICE")
+ pl_slice = os.environ.get("PL_SLICE")
# We use a specific SSH private key for PL if the PL_SSHKEY is specified or the
# id_rsa_planetlab exists
default_key = default_key if os.path.exists(default_key) else None
pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
- usage = "usage: %prog -s <pl-slice> -u <username> -m <movie> -l <exp-id> -i <ssh_key>"
+ # Default planetlab host
+ pl_host = "planetlab2.u-strasbg.fr"
+
+ # Another Linux host
+ # IMPORTANT NOTE: you must replace this host for another one
+ # you have access to. You must set up your SSH keys so
+ # the host can be accessed through SSH without prompting
+ # for a password. The host must allow X forwarding using SSH.
+ linux_host = 'roseval.pl.sophia.inria.fr'
+
+ usage = "usage: %prog -p <pl-host> -s <pl-slice> -l <linux-host> -u <linux-user> -m <movie> -e <exp-id> -i <ssh_key>"
parser = OptionParser(usage=usage)
+ parser.add_option("-p", "--pl-host", dest="pl_host",
+ help="PlanetLab hostname (already added to the <pl-slice> on the web site)",
+ default = pl_host, type="str")
parser.add_option("-s", "--pl-slice", dest="pl_slice",
- help="PlanetLab slicename", default = slicename, type="str")
- parser.add_option("-u", "--username", dest="username",
- help="User for extra host (non PlanetLab)", type="str")
+ help="PlanetLab slicename", default = pl_slice, type="str")
+ parser.add_option("-l", "--linux-host", dest="linux_host",
+ help="Hostname of second Linux host (non PlanetLab)",
+ default = linux_host, type="str")
+ parser.add_option("-u", "--linux-user", dest="linux_user",
+ help="User for extra Linux host (non PlanetLab)", default = linux_host,
+ type="str")
parser.add_option("-m", "--movie", dest="movie",
help="Stream movie", type="str")
- parser.add_option("-l", "--exp-id", dest="exp_id",
+ parser.add_option("-e", "--exp-id", dest="exp_id",
help="Label to identify experiment", type="str")
parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key",
help="Path to private SSH key to be used for connection",
if not options.movie:
parser.error("movie is a required argument")
- return (options.pl_slice, options.username, options.movie, options.exp_id,
+ return (options.pl_host, options.pl_slice, options.linux_host,
+ options.linux_user, options.movie, options.exp_id,
options.pl_ssh_key)
if __name__ == '__main__':
- ( pl_slice, username, movie, exp_id, pl_ssh_key ) = get_options()
+ ( pl_host, pl_user, linux_host, linux_user, movie, exp_id, pl_ssh_key
+ ) = get_options()
# Search for available RMs
populate_factory()
- # PlanetLab node
- host1 = 'planetlab2.u-strasbg.fr'
-
- # Another node
- # IMPORTANT NOTE: you must replace this host for another one
- # you have access to. You must set up your SSH keys so
- # the host can be accessed through SSH without prompting
- # for a password. The host must allow X forwarding using SSH.
- host2 = 'roseval.pl.sophia.inria.fr'
-
# Create the ExperimentController instance
ec = ExperimentController(exp_id = exp_id)
- # Register a ResourceManager (RM) for the PlanetLab node
- node1 = add_node(ec, host1, pl_slice, pl_ssh_key)
-
- peers = [host2]
- ccnd1 = add_ccnd(ec, OSType.FEDORA, peers)
+ # Register ResourceManager (RM)
+
+ # Register first PlanetLab host
+ node1 = add_node(ec, pl_host, pl_user, pl_ssh_key)
+ # Register CCN setup for PL host
+ peers = [linux_host]
+ ccnd1 = add_ccnd(ec, OSType.FEDORA, peers)
ec.register_connection(ccnd1, node1)
+ # Register content producer application (ccnseqwriter)
pub = add_publish(ec, movie)
ec.register_connection(pub, node1)
# The movie can only be published after ccnd is running
ec.register_condition(pub, ResourceAction.START,
ccnd1, ResourceState.STARTED)
-
- node2 = add_node(ec, host2, username)
- peers = [host1]
+
+ # Register Linux host
+ node2 = add_node(ec, linux_host, linux_user)
+
+ # Register CCN setup for Linux host
+ peers = [pl_host]
ccnd2 = add_ccnd(ec, "ubuntu", peers)
ec.register_connection(ccnd2, node2)
+ # Register consumer application (ccncat)
stream = add_stream(ec)
ec.register_connection(stream, node2)
from nepi.execution.ec import ExperimentController, ECState
from nepi.execution.resource import ResourceState, ResourceAction, \
populate_factory
-from nepi.resources.linux.node import OSType
+from nepi.execution.trace import TraceAttr
+import subprocess
from optparse import OptionParser, SUPPRESS_HELP
import os
return co
def add_stream(ec, ccnd, content_name):
- command = "sudo -S dbus-uuidgen --ensure ; ccncat %s | vlc - " % \
- content_name
+ # ccnx v7.2 issue 101007
+ command = "ccnpeek %(content_name)s; ccncat %(content_name)s" % {
+ "content_name" : content_name}
app = ec.register_resource("LinuxCCNApplication")
- ec.set(app, "depends", "vlc")
- ec.set(app, "forwardX11", True)
ec.set(app, "command", command)
ec.register_connection(app, ccnd)
return app
+def get_options():
+ pl_slice = os.environ.get("PL_SLICE")
+
+ # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the
+ # id_rsa_planetlab exists
+ default_key = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'])
+ default_key = default_key if os.path.exists(default_key) else None
+ pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
+
+ usage = "usage: %prog -s <pl-user> -m <movie> -c -e <exp-id> -i <ssh_key>"
+
+ parser = OptionParser(usage=usage)
+ parser.add_option("-s", "--pl-user", dest="pl_user",
+ help="PlanetLab slicename", default = pl_slice, type="str")
+ parser.add_option("-m", "--movie", dest="movie",
+ help="Stream movie", type="str")
+ parser.add_option("-e", "--exp-id", dest="exp_id",
+ help="Label to identify experiment", type="str")
+ parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key",
+ help="Path to private SSH key to be used for connection",
+ default = pl_ssh_key, type="str")
+
+ (options, args) = parser.parse_args()
+
+ if not options.movie:
+ parser.error("movie is a required argument")
+
+ return (options.pl_user, options.movie, options.exp_id, options.pl_ssh_key)
+
+
if __name__ == '__main__':
+ content_name = "ccnx:/test/VIDEO"
+
+ ( pl_user, movie, exp_id, pl_ssh_key ) = get_options()
+
# Search for available RMs
populate_factory()
- ec = ExperimentController(exp_id = "olahh")
+ ec = ExperimentController(exp_id = exp_id)
# hosts
host1 = "planetlab2.u-strasbg.fr"
host2 = "planet1.servers.ua.pt"
host3 = "planetlab1.cs.uoi.gr"
host4 = "planetlab1.aston.ac.uk"
- host5 = "itchy.comlab.bth.se"
- host6 = "roseval.pl.sophia.inria.fr"
-
- # users
- pluser = "inria_alina"
- user = "alina"
-
- content_name = "ccnx:/VIDEO"
- video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts"
- """
+ host5 = "planetlab2.willab.fi"
+ host6 = "planetlab-1.fokus.fraunhofer.de"
+
# describe nodes in the central ring
ring_hosts = [host1, host2, host3, host4]
ccnds = dict()
for i in xrange(len(ring_hosts)):
host = ring_hosts[i]
- node = add_node(ec, host, pluser)
+ node = add_node(ec, host, pl_user)
ccnd = add_ccnd(ec, node)
ccnr = add_ccnr(ec, ccnd)
ccnds[host] = ccnd
# l5 : h1 - h3 , h3 - h1
l5u = add_fib_entry(ec, ccnds[host1], host3)
l5d = add_fib_entry(ec, ccnds[host3], host1)
- """
+
# border node 1
- bnode1 = add_node(ec, host5, pluser)
+ bnode1 = add_node(ec, host5, pl_user)
ccndb1 = add_ccnd(ec, bnode1)
ccnrb1 = add_ccnr(ec, ccndb1)
- co = add_content(ec, ccnrb1, content_name, video)
+ ccnds[host5] = ccndb1
+ co = add_content(ec, ccnrb1, content_name, movie)
# border node 2
- bnode2 = add_node(ec, host6, user)
+ bnode2 = add_node(ec, host6, pl_user)
ccndb2 = add_ccnd(ec, bnode2)
ccnrb2 = add_ccnr(ec, ccndb2)
+ ccnds[host6] = ccndb2
app = add_stream(ec, ccndb2, content_name)
# connect border nodes
- #add_fib_entry(ec, ccndb1, host1)
- #add_fib_entry(ec, ccnds[host1], host5)
+ add_fib_entry(ec, ccndb1, host1)
+ add_fib_entry(ec, ccnds[host1], host5)
- #add_fib_entry(ec, ccndb2, host3)
- #add_fib_entry(ec, ccnds[host3], host6)
-
- add_fib_entry(ec, ccndb2, host5)
- add_fib_entry(ec, ccndb1, host6)
+ add_fib_entry(ec, ccndb2, host3)
+ add_fib_entry(ec, ccnds[host3], host6)
+
+ # Put down l5 10s after transfer started
+ ec.register_condition(l5u, ResourceAction.STOP,
+ app, ResourceState.STARTED, time = "10s")
+ ec.register_condition(l5d, ResourceAction.STOP,
+ app, ResourceState.STARTED, time = "10s")
# deploy all ResourceManagers
ec.deploy()
- ec.wait_finished([app])
+ ec.wait_started([app])
+
+ rvideo_path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
+ command = 'tail -f %s' % rvideo_path
+
+ # pulling the content of the video received
+ # on b2, to stream it locally
+ proc1 = subprocess.Popen(['ssh',
+ '-o', 'StrictHostKeyChecking=no',
+ '-l', pl_user, host6,
+ command],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE)
- """
proc2 = subprocess.Popen(['vlc',
'--ffmpeg-threads=1',
'--sub-filter', 'marq',
'(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org',
'--marq-position=8',
'--no-video-title-show', '-'],
- stdin=proc1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdin=proc1.stdout,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ (stdout, stderr) = proc2.communicate()
dirpath = tempfile.mkdtemp()
- """
+ print "Storing to DIRPATH ", dirpath
+
+ for ccnd in ccnds.values():
+ stdout = ec.trace(ccnd, "stderr")
+ fname = "log-%d" % ccnd
+ path = os.path.join(dirpath, fname)
+ f = open(path, "w")
+ f.write(stdout)
+ f.close()
# shutdown the experiment controller
ec.shutdown()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
class TraceAttr:
- """ Class representing the different attributes
- that can characterized a trace.
-
+ """ Trace attributes represent different information
+ aspects that can be retrieved from a trace.
"""
ALL = 'all'
STREAM = 'stream'
from nepi.util.timefuncs import strfnow, strfdiff
import os
+import subprocess
# TODO: Resolve wildcards in commands!!
+# TODO: compare_hash for all files that are uploaded!
@clsinit
def _register_attributes(cls):
command = Attribute("command", "Command to execute",
flags = Flags.ExecReadOnly)
- forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
+ forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
flags = Flags.ExecReadOnly)
env = Attribute("env", "Environment variables string for command execution",
flags = Flags.ExecReadOnly)
dst = os.path.join(self.app_home, "stdin")
- # TODO:
- # Check wether file already exists and if it exists
- # wether the file we want to upload is the same
- # (using md5sum)
+ # If what we are uploading is a file, check whether
+ # the same file already exists (using md5sum)
+ if self.compare_hash(stdin, dst):
+ return
self.node.upload(stdin, dst, text = True)
.replace("${NODE_HOME}", absolute_dir(self.node.node_home))
.replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
)
-
+
+ def compare_hash(self, local, remote):
+ # getting md5sum from remote file
+ (out, err), proc = self.node.execute("md5sum %s " % remote)
+
+ if proc.poll() == 0: #OK
+ if not os.path.isfile(local):
+ # store to a tmp file
+ f = tempfile.NamedTemporaryFile()
+ f.write(local)
+ f.flush()
+ local = f.name
+
+ lproc = subprocess.Popen(["md5sum", local],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
+ # getting md5sum from local file
+ (lout, lerr) = lproc.communicate()
+
+ # files are the same, no need to upload
+ lchk = lout.strip().split(" ")[0]
+ rchk = out.strip().split(" ")[0]
+
+ msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
+ local, lchk, remote, rchk)
+ self.debug(msg)
+
+ if lchk == rchk:
+ return True
+
+ return False
+
def valid_connection(self, guid):
# TODO: Validate!
return True
@clsinit_copy
class PlanetlabNode(LinuxNode):
- _rtype = "PlanetLabNode"
+ _rtype = "PlanetlabNode"
@classmethod
def _register_attributes(cls):
return self._os
- @property
- def localhost(self):
- return False
-
- def discover(self):
- # Get the list of nodes that match the filters
-
-
- # find one that
- if not self.is_alive():
- self._state = ResourceState.FAILED
- msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
- self.error(msg)
- raise RuntimeError, msg
-
- if self.get("cleanProcesses"):
- self.clean_processes()
-
- if self.get("cleanHome"):
- self.clean_home()
-
- self.mkdir(self.node_home)
-
- super(PlanetlabNode, self).discover()
-
def provision(self):
if not self.is_alive():
self._state = ResourceState.FAILED
out = err = ""
(out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
- def is_alive(self):
- if self.localhost:
- return True
-
- out = err = ""
- try:
- # TODO: FIX NOT ALIVE!!!!
- (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,
- with_lock = True)
- except:
- import traceback
- trace = traceback.format_exc()
- msg = "Unresponsive host %s " % err
- self.error(msg, out, trace)
- return False
-
- if out.strip().startswith('ALIVE'):
- return True
- else:
- msg = "Unresponsive host "
- self.error(msg, out, err)
- return False
-
def blacklist(self):
# TODO!!!!
self.warn(" Blacklisting malfunctioning node ")