Adding ICN PlanetLab large experiment scenarios
[nepi.git] / examples / streaming / ccn_broadcast.py
1 #!/usr/bin/env python
2
3 from nepi.core.design import ExperimentDescription, FactoriesProvider
4 from nepi.core.execute import ExperimentController
5 from nepi.util.constants import ApplicationStatus as AS
6 from optparse import OptionParser, SUPPRESS_HELP
7 import os
8 import tempfile
9 import time
10 import uuid
11
# Track SIGTERM and SIGINT: instead of dying, raise a global termination flag
import signal

# Empty list means "keep running"; any element means termination was requested.
TERMINATE = []

def _finalize(signum, frame):
    """Signal handler: record a termination request in TERMINATE."""
    # The list is mutated in place, so no `global` declaration is required.
    TERMINATE.append(None)

signal.signal(signal.SIGTERM, _finalize)
signal.signal(signal.SIGINT, _finalize)
20
class MonitorInfo(object):
    """Per-host record of the application boxes whose traces are collected.

    Stores the host name, the role of the host (one of the TYPE_* constants)
    and references to the monitoring and CCNx applications attached to it.
    Every application reference starts out as None.
    """

    TYPE_ROOT = "root"
    TYPE_MID  = "middle"
    TYPE_LEAF = "leaf"

    def __init__(self, hostname, type):
        # NOTE: `type` shadows the builtin, but the name belongs to the
        # constructor signature and is kept for compatibility.
        self.hostname, self.type = hostname, type
        # Resource monitoring applications (CPU/memory and network traffic)
        self.cpumem_monitor = self.net_out_monitor = self.net_in_monitor = None
        # CCNx applications (daemon, consumer, publisher)
        self.ccnd = self.ccncat = self.ccnseqwriter = None
35
def create_slice(exp_desc, slicename, plc_host, pl_user, pl_pwd, 
        pl_ssh_key, root_dir):
    """Add a PlanetLab testbed description to exp_desc and configure it.

    The slice name, PLC host, credentials, SSH key and home directory are
    stored as attributes of the new testbed description, which is returned.
    """
    provider = FactoriesProvider("planetlab")
    testbed = exp_desc.add_testbed_description(provider)

    # Attributes are applied in the order listed here.
    settings = (
        ("homeDirectory", root_dir),
        ("slice", slicename),
        ("sliceSSHKey", pl_ssh_key),
        ("authUser", pl_user),
        ("authPass", pl_pwd),
        ("plcHost", plc_host),
        # Kills all running processes before starting the experiment
        ("cleanProc", True),
        # NOTICE: Setting 'cleanHome' to 'True' will erase all previous
        # folders in the sliver Home directory, including result files!
        ("cleanHome", True),
        ("plLogLevel", "DEBUG"),
    )
    for attr_name, attr_value in settings:
        testbed.set_attribute_value(attr_name, attr_value)

    return testbed
53  
def create_node(hostname, pl_inet, slice_desc):
    """Create a node box for hostname with one interface attached to pl_inet.

    Both boxes are labelled after the node guid ("<guid>" for the node and
    "iface_<guid>" for the interface). Returns the (node, interface) pair.
    """
    node = slice_desc.create("Node")
    node.set_attribute_value("hostname", hostname)
    node.set_attribute_value("label", "%d" % node.guid)
    node.set_attribute_value("operatingSystem", "f12")

    iface = slice_desc.create("NodeInterface")
    iface.set_attribute_value("label", "iface_%d" % node.guid)

    # Wire interface <-> Internet box, then node <-> interface
    inet_side = iface.connector("inet")
    inet_side.connect(pl_inet.connector("devs"))
    node_side = node.connector("devs")
    node_side.connect(iface.connector("node"))

    return node, iface
64
def create_ccnd(pl_node, slice_desc, pl_ifaces, port):
    """Create a CCNxDaemon box on pl_node with one route per interface.

    pl_ifaces are the interfaces of the peers the daemon should route
    ccnx:/ to; port, when truthy, is set as the daemon's local port.
    Returns the daemon box.
    """
    daemon = slice_desc.create("CCNxDaemon")
    daemon.set_attribute_value("ccnxVersion", "0.7.1")
    daemon.set_attribute_value("repository", True)

    # Each route carries a wildcard that refers to the peer interface by its
    # label; it is replaced by the public IP address of that node during
    # runtime, once the IP is known.
    route_template = "ccnx:/ udp {#[%s].addr[0].[Address]#}"
    route_list = [route_template % iface.get_attribute_value("label")
            for iface in pl_ifaces]

    # Add unicast ccn routes
    daemon.set_attribute_value("ccnRoutes", "|".join(route_list))

    # Use a specific port to bind the CCNx daemon
    if port:
        daemon.set_attribute_value("ccnLocalPort", port)

    for trace_name in ("stdout", "stderr"):
        daemon.enable_trace(trace_name)
    daemon.connector("node").connect(pl_node.connector("apps"))
    return daemon
86
def create_ccnpush(movie, pl_node, slice_desc, port):
    """Create an Application on pl_node that publishes movie as ccnx:/VIDEO.

    The movie is given to ccnseqwriter as its stdin. When port is truthy the
    command is prefixed with CCN_LOCAL_PORT=<port>. Returns the application.
    """
    publisher = slice_desc.create("Application")
    publisher.set_attribute_value("stdin", movie)

    writer_cmd = "ccnseqwriter -r ccnx:/VIDEO"
    if not port:
        command = writer_cmd
    else:
        command = "CCN_LOCAL_PORT=%d %s " % (port, writer_cmd)
    publisher.set_attribute_value("command", command)

    for trace_name in ("stdout", "stderr"):
        publisher.enable_trace(trace_name)
    publisher.connector("node").connect(pl_node.connector("apps"))
    return publisher
101
def create_ccnpull(pl_node, slice_desc, port):
    """Create an Application on pl_node that retrieves ccnx:/VIDEO.

    The stream is fetched with ccncat and discarded to /dev/null. When port
    is truthy the command is prefixed with CCN_LOCAL_PORT=<port>.
    Returns the application.
    """
    consumer = slice_desc.create("Application")
    consumer.set_attribute_value("rpmFusion", True)
    # vlc is declared as a dependency although the command below does not use
    # it; an alternative variant pipes the stream into
    # `vlc -I dummy - vlc://quit` (after `sudo -S dbus-uuidgen --ensure`)
    # instead of discarding it.
    consumer.set_attribute_value("depends", "vlc")

    fetch_cmd = " ccncat ccnx:/VIDEO"
    if port:
        fetch_cmd = "CCN_LOCAL_PORT=%d %s " % (port, fetch_cmd)
    consumer.set_attribute_value("command", fetch_cmd + " > /dev/null ")

    for trace_name in ("stdout", "stderr"):
        consumer.enable_trace(trace_name)
    consumer.connector("node").connect(pl_node.connector("apps"))
    return consumer
120
def create_cpumem_monitor(pl_node, slice_desc):
    """Create an Application on pl_node that samples CPU/memory once a second.

    The application is labelled "<node guid>_cpumem". Its command loops
    forever: each iteration echoes a timestamp followed by two sums computed
    from the `top` lines matching bash or python (columns 9 and 10 of the
    top output -- presumably %CPU and %MEM, TODO confirm) and then sleeps
    one second. Returns the application.
    """
    label = "%d_cpumem" % pl_node.guid
    pl_app = slice_desc.create("Application")
    pl_app.set_attribute_value("label", label)

    # Backslashes meant for grep/sed are written as explicit `\\` so the
    # literal does not rely on Python passing unknown escapes (`\|`, `\s`)
    # through unchanged; the resulting shell command is unchanged.
    command = (
        "while true; do echo $(date +%Y%m%d%H%M%S%z) "
        " $(top -b -n 1 |  grep 'bash\\|python' | sed 's/\\s\\s*/ /g' | "
        " sed 's/^\\s//g' | cut -d' ' -f9,10,11 | awk '{ sum1 +=$1; sum2 += $2; } "
        " END {printf \"%2.1f %2.1f 0:00.00\", sum1, sum2;}'); sleep 1 ; done ")
    pl_app.set_attribute_value("command", command)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_node.connector("apps").connect(pl_app.connector("node"))
    return pl_app
135
def create_net_monitor(pl_node, slice_desc, pl_ifaces, lblprefix = "any", pcap=False):
    """Create a tcpdump-based traffic monitor Application on pl_node.

    The capture filter matches traffic to or from the hosts behind
    pl_ifaces. The capture is piped through `pv -fbt`, whose stderr is
    appended to the application's stdout trace. The capture data itself goes
    to the "output" trace when pcap is true and to /dev/null otherwise.
    The application is labelled "<node guid>_<lblprefix>_net" and returned.
    """
    label = "%d_%s_net" % (pl_node.guid, lblprefix)

    # One "( host <address wildcard> )" clause per interface, or-ed together
    host_clauses = [
        " ( host {#[%s].addr[0].[Address]#} ) " % iface.get_attribute_value("label")
        for iface in pl_ifaces]
    hosts = " or ".join(host_clauses)

    monitor = slice_desc.create("Application")
    for attr_name, attr_value in (
            ("label", label),
            ("rpmFusion", True),
            ("sudo", True),
            ("depends", "tcpdump pv")):
        monitor.set_attribute_value(attr_name, attr_value)

    if pcap:
        output = "{#[%s].trace[output].[name]#}" % label
    else:
        output = "/dev/null"

    command = "tcpdump -l -i eth0 -s 0 -f '(%s)' -w - | pv -fbt >%s 2>>{#[%s].trace[stdout].[name]#}" % (
            hosts, output, label)
    monitor.set_attribute_value("command", command)

    # The "output" trace only exists when the raw capture is kept
    trace_names = ["stdout", "stderr"]
    if pcap:
        trace_names.insert(0, "output")
    for trace_name in trace_names:
        monitor.enable_trace(trace_name)

    pl_node.connector("apps").connect(monitor.connector("node"))
    return monitor
161
def _ensure_dir(path):
    """Create path (and missing parents); an existing directory is accepted."""
    try:
        os.makedirs(path)
    except OSError:
        # Tolerate only "directory already there". Any other failure
        # (permissions, a regular file in the way, ...) would make the
        # writes below fail, so report it at its source.
        if not os.path.isdir(path):
            raise


def _trace_or_none(controller, app, trace_name):
    """Return the named trace of app, or None when no app was registered."""
    if not app:
        return None
    return controller.trace(app.guid, trace_name)


def store_results(controller, monitors, results_dir, exp_label):
    """Write the collected traces of every monitored host to disk.

    Layout created under <results_dir>/<exp_label>/:
      hosts              one "<hostname> <type>" line per monitor
      <hostname>/<name>  one file per non-empty trace, where <name> is one of
                         cpumem, net_in, net_out, ccnd_err, ccnd_out,
                         ccncat_err, ccnseqwriter_err

    controller  -- object whose trace(guid, name) returns the trace content
    monitors    -- iterable of MonitorInfo-like objects; application
                   attributes left as None are skipped
    results_dir -- base directory for results
    exp_label   -- name of the per-experiment subdirectory

    Raises OSError if a results directory cannot be created.
    """
    # create results directory for experiment
    root_path = os.path.join(results_dir, exp_label)

    # Single-argument print(...) produces the same output whether it is
    # parsed as a Python 2 statement or called as a Python 3 function.
    print("STORING RESULTS in  %s" % root_path)

    _ensure_dir(root_path)

    # collect information on nodes
    host_lines = []

    for mon in monitors:
        host_lines.append("%s %s\n" % (mon.hostname, mon.type))

        # create a subdir per hostname
        node_path = os.path.join(root_path, mon.hostname)
        _ensure_dir(node_path)

        # (file name, trace content) pairs; content is None for absent apps
        results = (
            ("cpumem", _trace_or_none(controller, mon.cpumem_monitor, "stdout")),
            ("net_in", _trace_or_none(controller, mon.net_in_monitor, "stdout")),
            ("net_out", _trace_or_none(controller, mon.net_out_monitor, "stdout")),
            ("ccnd_err", _trace_or_none(controller, mon.ccnd, "stderr")),
            ("ccnd_out", _trace_or_none(controller, mon.ccnd, "stdout")),
            ("ccncat_err", _trace_or_none(controller, mon.ccncat, "stderr")),
            ("ccnseqwriter_err",
                _trace_or_none(controller, mon.ccnseqwriter, "stderr")),
        )

        for name, result in results:
            # Missing and empty traces produce no file
            if not result:
                continue

            with open(os.path.join(node_path, name), "w") as f:
                f.write(result)

    # store node info file
    with open(os.path.join(root_path, "hosts"), "w") as f:
        f.write("".join(host_lines))
231
def get_options():
    """Parse the command line, using PL_* environment variables as defaults.

    Defaults: PL_SLICE, PL_HOST (else "www.planet-lab.eu"), PL_SSH_KEY (else
    ~/.ssh/id_rsa_planetlab), PL_USER and PL_PASS; the experiment label
    defaults to a fresh UUID, the results directory to /tmp and the run time
    to 20. The movie (-m) is mandatory: parsing aborts through
    parser.error() when it is missing.

    Returns the tuple (slicename, pl_host, pl_user, pl_pwd, pl_ssh_key,
    movie, results_dir, exp_label, time_to_run, port).
    """
    slicename = os.environ.get("PL_SLICE")
    pl_host = os.environ.get("PL_HOST", "www.planet-lab.eu")
    pl_user = os.environ.get('PL_USER')
    pl_pwd = os.environ.get('PL_PASS')
    exp_label = "%s" % uuid.uuid4()

    # Build the fallback key path only when PL_SSH_KEY is absent, and via
    # expanduser so a missing HOME variable does not raise KeyError.
    pl_ssh_key = os.environ.get("PL_SSH_KEY")
    if pl_ssh_key is None:
        pl_ssh_key = "%s/.ssh/id_rsa_planetlab" % os.path.expanduser("~")

    usage = ("usage: %prog -s <pl_slice> -H <pl_host> -k <ssh_key> -u <pl_user> "
             "-m <movie> -p <pl_password> -r <results-dir> -l <experiment-label> "
             "-t <minutes> -P <ccnd-port>")

    parser = OptionParser(usage=usage)
    parser.add_option("-s", "--slicename", dest="slicename", 
            help="PlanetLab slicename", default=slicename, type="str")
    parser.add_option("-H", "--pl-host", dest="pl_host", 
            help="PlanetLab site (e.g. www.planet-lab.eu)", 
            default=pl_host, type="str")
    parser.add_option("-k", "--ssh-key", dest="pl_ssh_key", 
            help="Path to private ssh key used for PlanetLab authentication", 
            default=pl_ssh_key, type="str")
    parser.add_option("-u", "--pl-user", dest="pl_user", 
            help="PlanetLab account user (i.e. Registration email address)", 
            default=pl_user, type="str")
    parser.add_option("-p", "--pl-pwd", dest="pl_pwd", 
            help="PlanetLab account password", default=pl_pwd, type="str")
    parser.add_option("-m", "--movie", dest="movie", 
            help="Stream movie", type="str")
    parser.add_option("-r", "--results", dest="results_dir", default = "/tmp", 
            help="Path to directory to store results", type="str")
    parser.add_option("-l", "--label", dest="exp_label", default = exp_label, 
            help="Label to identify experiment results", type="str")
    # The script multiplies this value by 60 to obtain seconds, so the unit
    # is minutes.
    parser.add_option("-t", "--time", dest="time_to_run", default = 20, 
            help="Time to run the experiment in minutes", type="float")
    parser.add_option("-P", "--port", dest="port", 
            help="Port to bind the CCNx daemon", type="int")

    (options, args) = parser.parse_args()

    if not options.movie:
        parser.error("movie is a required argument")

    return (options.slicename, options.pl_host, options.pl_user, 
            options.pl_pwd, options.pl_ssh_key, options.movie,
            options.results_dir, options.exp_label, options.time_to_run,
            options.port)
280
if __name__ == '__main__':
    # Builds a three-level CCNx distribution tree on PlanetLab:
    #   level 0: one root node that publishes the movie,
    #   level 1: one intermediate node per country,
    #   level 2: the leaf nodes of each country, which retrieve the movie.
    # Every node runs a CCNx daemon plus CPU/memory and traffic monitors.
    # The experiment runs until the time limit expires or SIGTERM/SIGINT is
    # received; the traces are then stored under the results directory.

    # Parse options first so a usage error does not leave a temp dir behind
    (pl_slice, 
            pl_host, 
            pl_user, 
            pl_pwd, 
            pl_ssh_key, 
            movie, 
            results_dir,
            exp_label,
            time_to_run,
            port) = get_options()
    root_dir = tempfile.mkdtemp()

    # list to store information on monitoring apps per node
    monitors = []
    
    # Create the experiment description object
    exp_desc = ExperimentDescription()

    # Create slice
    slice_desc = create_slice(exp_desc, pl_slice, pl_host, pl_user, pl_pwd,
        pl_ssh_key, root_dir)
   
    # Create the Internet box object
    pl_inet = slice_desc.create("Internet")

    ### Level 0 - Root node
    root_hostname = "ple6.ipv6.lip6.fr"
    (root_node, root_iface) = create_node(root_hostname, pl_inet, slice_desc)

    ### Level 1 - Intermediate nodes (one per country code)
    l1_hostnames = dict()
    l1_hostnames["fi"] = "planetlab-1.research.netlab.hut.fi"
    l1_hostnames["se"] = "planetlab2.sics.se"
    l1_hostnames["es"] = "planetlab1.um.es"
    l1_hostnames["pt"] = "planetlab-um10.di.uminho.pt"
    l1_hostnames["pl"] = "pandora.we.po.opole.pl"
    l1_hostnames["it"] = "planetlab02.dis.unina.it"
    l1_hostnames["de"] = "planetlab2.wiwi.hu-berlin.de"
    l1_hostnames["fr"] = "planetlab2.u-strasbg.fr"
    l1_hostnames["gr"] = "planetlab1.ics.forth.gr"
    l1_hostnames["ch"] = "lsirextpc02.epfl.ch"
    l1_hostnames["uk"] = "planetlab2.aston.ac.uk"
    l1_hostnames["be"] = "planetlab1.extern.kuleuven.be"

    l1_ifaces = dict()
    l1_nodes = dict()
    
    for country, hostname in l1_hostnames.items():
        l1_node, l1_iface = create_node(hostname, pl_inet, slice_desc)
        l1_ifaces[country] = l1_iface
        l1_nodes[country] = l1_node

    ### Level 0 - CCN & Monitoring
    
    # Add CCN Daemon to root node, with routes to every intermediate node
    out_ifaces = list(l1_ifaces.values())
    root_ccnd = create_ccnd(root_node, slice_desc, out_ifaces, port)

    # Publish video in root node
    root_ccnseqwriter = create_ccnpush(movie, root_node, slice_desc, port)

    # Create monitor info object for root node
    root_mon = MonitorInfo(root_hostname, MonitorInfo.TYPE_ROOT)
    monitors.append(root_mon)
   
    # Add memory and cpu monitoring for root node
    root_mon.cpumem_monitor = create_cpumem_monitor(root_node, slice_desc)
    root_mon.net_out_monitor = create_net_monitor(root_node, slice_desc, 
            out_ifaces, lblprefix = "out")
    root_mon.ccnd = root_ccnd
    root_mon.ccnseqwriter = root_ccnseqwriter

    ### Level 2 - Leaf nodes (keyed by the country of their level 1 parent)
    l2_hostnames = dict()
    l2_hostnames["fi"] = ["planetlab1.rd.tut.fi",
             "planetlab-2.research.netlab.hut.fi",
             "planetlab2.willab.fi",
             "planetlab3.hiit.fi",
             "planetlab4.hiit.fi",
             "planetlab1.willab.fi",
    ]

    l2_hostnames["se"] = ["planetlab1.s3.kth.se",
             "itchy.comlab.bth.se",
             "planetlab-1.ida.liu.se",
             "scratchy.comlab.bth.se",
             "planetlab2.s3.kth.se",
             "planetlab1.sics.se",
    ]

    l2_hostnames["es"] = ["planetlab1.tlm.unavarra.es",
             "planetlab2.uc3m.es",
             "planetlab2.upc.es",
             "ait21.us.es",
             "planetlab3.upc.es",
             "planetlab1.uc3m.es",
             "planetlab2.dit.upm.es",
             "planetlab1.upc.es",
             "planetlab2.um.es",
    ]

    l2_hostnames["pt"] = ["planet1.servers.ua.pt",
             "planetlab2.fct.ualg.pt",
             "planetlab-1.tagus.ist.utl.pt",
             "planetlab-2.tagus.ist.utl.pt",
             "planetlab-um00.di.uminho.pt",
             "planet2.servers.ua.pt",
    ]

    l2_hostnames["pl"] = ["planetlab1.mini.pw.edu.pl",
             "roti.mimuw.edu.pl",
             "planetlab1.ci.pwr.wroc.pl",
             "planetlab1.pjwstk.edu.pl",
             "ple2.tu.koszalin.pl",
             "planetlab2.ci.pwr.wroc.pl",
             "planetlab2.cyfronet.pl",
             "plab2.ple.silweb.pl",
             "planetlab1.cyfronet.pl",
             "plab4.ple.silweb.pl",
             "ple2.dmcs.p.lodz.pl",
             "planetlab2.pjwstk.edu.pl",
             "ple1.dmcs.p.lodz.pl",
    ]

    l2_hostnames["it"] = ["gschembra3.diit.unict.it",
             "onelab6.iet.unipi.it",
             "planetlab1.science.unitn.it",
             "planetlab-1.ing.unimo.it",
             "gschembra4.diit.unict.it",
    ]

    l2_hostnames["de"] = ["iraplab1.iralab.uni-karlsruhe.de",
             "planetlab-1.fokus.fraunhofer.de",
             "iraplab2.iralab.uni-karlsruhe.de",
             "planet2.zib.de",
             "pl2.uni-rostock.de",
             "onelab-1.fhi-fokus.de",
             "planet2.l3s.uni-hannover.de",
             "planetlab1.exp-math.uni-essen.de",
             "planetlab-2.fokus.fraunhofer.de",
             "planetlab02.tkn.tu-berlin.de",
             "planetlab1.informatik.uni-goettingen.de",
             "planetlab1.informatik.uni-erlangen.de",
             "planetlab2.exp-math.uni-essen.de",
             "planetlab2.lkn.ei.tum.de",
             "planetlab1.wiwi.hu-berlin.de",
             "planet1.l3s.uni-hannover.de",
             "planetlab1.informatik.uni-wuerzburg.de",
             "planet1.zib.de",
    ]

    l2_hostnames["fr"] = ["host3-plb.loria.fr",
             "inriarennes1.irisa.fr",
             "inriarennes2.irisa.fr",
             "peeramide.irisa.fr",
             "pl1.bell-labs.fr",
             "pl2.bell-labs.fr",
             "host4-plb.loria.fr",
             "planetlab-1.imag.fr",
             "planetlab-2.imag.fr",
             "ple2.ipv6.lip6.fr",
             "planetlab1.u-strasbg.fr",
    ]

    l2_hostnames["gr"] = ["kostis.di.uoa.gr",
             "planetlab1.ionio.gr",
             "planetlab2.ionio.gr",
             "planetlab2.cs.uoi.gr",
             "stella.planetlab.ntua.gr",
             "vicky.planetlab.ntua.gr",
             "planetlab1.cs.uoi.gr",
             "pl002.ece.upatras.gr",
    ]

    l2_hostnames["ch"] = ["planetlab04.cnds.unibe.ch",
             "lsirextpc01.epfl.ch",
             "planetlab2.csg.uzh.ch",
             "planetlab1.csg.uzh.ch",
             "planetlab-2.cs.unibas.ch",
             "planetlab-1.cs.unibas.ch",
    ]

    l2_hostnames["uk"] = ["planetlab4.cs.st-andrews.ac.uk",
             "planetlab-1.imperial.ac.uk",
             "planetlab3.xeno.cl.cam.ac.uk",
             "planetlab1.xeno.cl.cam.ac.uk",
             "planetlab2.xeno.cl.cam.ac.uk",
             "planetlab3.cs.st-andrews.ac.uk",
             "planetlab1.aston.ac.uk",
             "planetlab1.nrl.eecs.qmul.ac.uk",
    ]

    l2_hostnames["be"] = ["chimay.infonet.fundp.ac.be",
             "orval.infonet.fundp.ac.be",
             "rochefort.infonet.fundp.ac.be",
             "planck227ple.test.ibbt.be",
    ]
    
    for country, hostnames in l2_hostnames.items():
        l2_ifaces = []
        l1_hostname = l1_hostnames[country]
        l1_iface = l1_ifaces[country]
        l1_node = l1_nodes[country]
        
        for hostname in hostnames:
            l2_node, l2_iface = create_node(hostname, pl_inet, slice_desc)
            l2_ifaces.append(l2_iface)

            # A leaf only talks to the intermediate node of its country
            in_ifaces = [l1_iface]

            ### Level 2 - CCN & Monitoring
        
            # Add CCN Daemon to leaf node
            ccnd = create_ccnd(l2_node, slice_desc, in_ifaces, port)

            # Retrieve video in leaf node
            ccncat = create_ccnpull(l2_node, slice_desc, port)

            # Create monitor info object for leaf node
            mon = MonitorInfo(hostname, MonitorInfo.TYPE_LEAF)
            monitors.append(mon)
       
            # Add memory and cpu monitoring for leaf node
            mon.cpumem_monitor = create_cpumem_monitor(l2_node, slice_desc)
            mon.net_in_monitor = create_net_monitor(l2_node, slice_desc, 
                    in_ifaces, lblprefix = "in")
            mon.ccnd = ccnd
            mon.ccncat = ccncat

        ### Level 1 - CCN & Monitoring

        # The intermediate node routes to its leaves and to the root
        in_ifaces = [root_iface]
        out_ifaces = l2_ifaces
        all_ifaces = list(out_ifaces)
        all_ifaces.extend(in_ifaces)

        # Add CCN Daemon to intermediate node
        ccnd = create_ccnd(l1_node, slice_desc, all_ifaces, port)

        # Create monitor info object for intermediate node
        mon = MonitorInfo(l1_hostname, MonitorInfo.TYPE_MID)
        monitors.append(mon)
       
        # Add memory and cpu monitoring for intermediate node
        mon.cpumem_monitor = create_cpumem_monitor(l1_node, slice_desc)
        mon.net_in_monitor = create_net_monitor(l1_node, slice_desc, 
                in_ifaces, lblprefix="in")
        mon.net_out_monitor = create_net_monitor(l1_node, slice_desc, 
                out_ifaces, lblprefix="out")
        mon.ccnd = ccnd

    xml = exp_desc.to_xml()
   
    controller = ExperimentController(xml, root_dir)
    controller.start()

    # Once the controller is started, always shut it down, even when
    # waiting, stopping or storing the results fails.
    try:
        start_time = time.time()
        # time_to_run is multiplied by 60 to get seconds, i.e. it is
        # interpreted as a number of minutes.
        duration = time_to_run * 60 # in seconds
        # TERMINATE becomes non-empty on SIGTERM/SIGINT or on timeout
        while not TERMINATE:
            time.sleep(1)
            if (time.time() - start_time) > duration: # elapsed time
                TERMINATE.append(None)

        controller.stop()
 
        # store results in results dir
        store_results(controller, monitors, results_dir, exp_label)
    finally:
        controller.shutdown()
551