PlanetLab support toon-up: home_cleanup only nepi folders + make server support longe...
[nepi.git] / examples / streamming / planetlab_ccn_vlc.py
1 #!/usr/bin/env python
2
3 from nepi.core.design import ExperimentDescription, FactoriesProvider
4 from nepi.core.execute import ExperimentController
5 from nepi.util.constants import ApplicationStatus as AS
6 from optparse import OptionParser, SUPPRESS_HELP
7 import os
8 import tempfile
9 import time
10 import uuid
11
12 # Trak SIGTERM, and set global termination flag instead of dying
13 import signal
14 TERMINATE = []
15 def _finalize(sig,frame):
16     global TERMINATE
17     TERMINATE.append(None)
18 signal.signal(signal.SIGTERM, _finalize)
19 signal.signal(signal.SIGINT, _finalize)
20
21 class MonitorInfo(object):
22     TYPE_ROOT = "root"
23     TYPE_MID  = "middle"
24     TYPE_LEAF = "leaf"
25
26     def __init__(self, hostname, type):
27         self.hostname = hostname
28         self.type = type
29         self.cpumem_monitor = None
30         self.net_monitor = None
31         self.ccnd = None
32         self.ccncat = None
33         self.ccnseqwriter = None
34
35 def create_slice(exp_desc, slicename, plc_host, pl_user, pl_pwd, 
36         pl_ssh_key, root_dir):
37     pl_provider = FactoriesProvider("planetlab")
38     slice_desc = exp_desc.add_testbed_description(pl_provider)
39     slice_desc.set_attribute_value("homeDirectory", root_dir)
40     slice_desc.set_attribute_value("slice", slicename)
41     slice_desc.set_attribute_value("sliceSSHKey", pl_ssh_key)
42     slice_desc.set_attribute_value("authUser", pl_user)
43     slice_desc.set_attribute_value("authPass", pl_pwd)
44     slice_desc.set_attribute_value("plcHost", plc_host)
45     # Kills all running processes before starting the experiment
46     slice_desc.set_attribute_value("cleanProc", True)
47     # NOTICE: Setting 'cleanHome' to 'True' will erase all previous
48     # folders in the sliver Home directory, including result files!
49     slice_desc.set_attribute_value("cleanHome", True)
50     slice_desc.set_attribute_value("plLogLevel", "DEBUG")
51     return slice_desc
52  
53 def create_node(hostname, pl_inet, slice_desc):
54     pl_node = slice_desc.create("Node")
55     pl_node.set_attribute_value("hostname", hostname)
56     pl_node.set_attribute_value("label", "%d" % pl_node.guid)
57     pl_node.set_attribute_value("operatingSystem", "f12")
58     pl_iface = slice_desc.create("NodeInterface")
59     pl_iface.set_attribute_value("label", "iface_%d" % pl_node.guid)
60     pl_iface.connector("inet").connect(pl_inet.connector("devs"))
61     pl_node.connector("devs").connect(pl_iface.connector("node"))
62     return pl_node, pl_iface
63
64 def create_ccnd(pl_node, slice_desc, pl_ifaces, port):
65     pl_app = slice_desc.create("CCNxDaemon")
66     pl_app.set_attribute_value("ccnxVersion", "ccnx-0.5.1")
67     
68     # We use a wildcard to replace the public IP address of the node during runtime,
69     # once this IP is known
70     routes = "|".join(map(lambda pl_iface: "udp {#[%s].addr[0].[Address]#}" % 
71         pl_iface.get_attribute_value("label"), pl_ifaces))
72     
73     # Add unicast ccn routes 
74     pl_app.set_attribute_value("ccnRoutes", routes)
75
76     # Use a specific port to bind the CCNx daemon
77     if port:
78         pl_app.set_attribute_value("ccnLocalPort", port)
79
80     pl_app.enable_trace("stdout")
81     pl_app.enable_trace("stderr")
82     pl_app.connector("node").connect(pl_node.connector("apps"))
83     return pl_app
84
85 def create_ccnpush(movie, pl_node, slice_desc, port):
86     pl_app = slice_desc.create("Application")
87     pl_app.set_attribute_value("stdin", movie)
88
89     command = "ccnseqwriter ccnx:/VIDEO"
90     if port:
91         command = "CCN_LOCAL_PORT=%d %s " % (port, command)
92
93     pl_app.set_attribute_value("command", command)
94
95     pl_app.enable_trace("stdout")
96     pl_app.enable_trace("stderr")
97     pl_app.connector("node").connect(pl_node.connector("apps"))
98     return pl_app
99
100 def create_ccnpull(pl_node, slice_desc, port):
101     pl_app = slice_desc.create("Application")
102     pl_app.set_attribute_value("rpmFusion", True)
103     pl_app.set_attribute_value("depends", "vlc")
104
105     command = " sudo -S dbus-uuidgen --ensure ; while true ; do ccncat ccnx:/VIDEO"
106     if port:
107         command = "CCN_LOCAL_PORT=%d %s " % (port, command)
108
109     command += " | vlc -I dummy - vlc://quit > /dev/null ; done"
110     pl_app.set_attribute_value("command", command)
111     
112     pl_app.enable_trace("stdout")
113     pl_app.enable_trace("stderr")
114     pl_app.connector("node").connect(pl_node.connector("apps"))
115     return pl_app
116
117 def create_cpumem_monitor(pl_node, slice_desc):
118     label = "%d_cpumem" % pl_node.guid
119     pl_app = slice_desc.create("Application")
120     pl_app.set_attribute_value("label", label)
121     pl_app.set_attribute_value("command", 
122             "while true; do echo $(date +%Y%m%d%H%M%S%z) "\
123             " $(top -b -n 1 |  grep 'bash\|python' | sed 's/\s\s*/ /g' | "\
124             " sed 's/^\s//g' | cut -d' ' -f9,10,11 | awk '{ sum1 +=$1; sum2 += $2; } "\
125             " END {printf \"%2.1f %2.1f 0:00.00\", sum1, sum2;}'); sleep 1 ; done ")
126
127     pl_app.enable_trace("stdout")
128     pl_app.enable_trace("stderr")
129     pl_node.connector("apps").connect(pl_app.connector("node"))
130     return pl_app
131
132 def create_net_monitor(pl_node, slice_desc, pl_ifaces):
133     label = "%d_net" % pl_node.guid
134     hosts = " or ".join(map(lambda pl_iface: " ( host {#[%s].addr[0].[Address]#} ) " % 
135         pl_iface.get_attribute_value("label"), pl_ifaces))
136     pl_app = slice_desc.create("Application")
137     pl_app.set_attribute_value("label", label)
138     pl_app.set_attribute_value("rpmFusion", True)
139     pl_app.set_attribute_value("sudo", True)
140     pl_app.set_attribute_value("depends", "tcpdump pv")
141     pl_app.set_attribute_value("command", 
142             "tcpdump -l -i eth0 -nNqttf '(%s)' -w - | pv -fbt >/dev/null 2>>{#[%s].trace[stdout].[name]#}" %
143             (hosts, label))
144     pl_app.enable_trace("stdout")
145     pl_app.enable_trace("stderr")
146     pl_node.connector("apps").connect(pl_app.connector("node"))
147     return pl_app
148
149 def store_results(controller, monitors, results_dir, exp_label):
150     # create results directory for experiment
151     root_path = os.path.join(results_dir, exp_label)
152
153     print "STORING RESULTS in ", root_path
154
155     try:
156         os.makedirs(root_path)
157     except OSError:
158         pass
159
160     # collect information on nodes
161     hosts_info = ""
162
163     for mon in monitors:
164         hosts_info += "%s %s\n" % (mon.hostname, mon.type)
165
166         # create a subdir per hostname
167         node_path = os.path.join(root_path, mon.hostname)
168         try:
169             os.makedirs(node_path)
170         except OSError:
171             pass
172
173         # store monitoring results
174         cpumem_stdout = controller.trace(mon.cpumem_monitor.guid, "stdout")
175         net_stdout = controller.trace(mon.net_monitor.guid, "stdout")
176         ccnd_error = controller.trace(mon.ccnd.guid, "stderr")
177
178         ccncat_error = None
179         if mon.ccncat:
180             ccncat_error = controller.trace(mon.ccncat.guid, "stderr")
181
182         ccnseqwriter_error = None
183         if mon.ccnseqwriter:
184             ccnseqwriter_error = controller.trace(mon.ccnseqwriter.guid, "stderr")
185         
186         results = dict({"cpumem": cpumem_stdout, "net": net_stdout, 
187             "ccnd_stderr": ccnd_error, "ccncat_stderr": ccncat_error,
188             "ccnseqwriter_stderr": ccnseqwriter_error })
189
190         for name, result in results.iteritems():
191             if not result:
192                 continue
193
194             fpath = os.path.join(node_path, name)
195             f = open(fpath, "w")
196             f.write(result)
197             f.close()
198
199     # store node info file
200     fpath = os.path.join(root_path, "hosts")
201     f = open(fpath, "w")
202     f.write(hosts_info)
203     f.close()
204
205 def get_options():
206     slicename = os.environ.get("PL_SLICE")
207     pl_host = os.environ.get("PL_HOST", "www.planet-lab.eu")
208     pl_ssh_key = os.environ.get(
209         "PL_SSH_KEY",
210         "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
211     pl_user = os.environ.get('PL_USER')
212     pl_pwd = os.environ.get('PL_PASS')
213     exp_label = "%s" % uuid.uuid4()
214
215     usage = "usage: %prog -s <pl_slice> -H <pl_host> -k <ssh_key> -u <pl_user> \
216 -m <movie> -p <pl_password> -r <results-dir> -l <experiment-label> \
217 -P <ccnd-port>"
218
219     parser = OptionParser(usage=usage)
220     parser.add_option("-s", "--slicename", dest="slicename", 
221             help="PlanetLab slicename", default=slicename, type="str")
222     parser.add_option("-H", "--pl-host", dest="pl_host", 
223             help="PlanetLab site (e.g. www.planet-lab.eu)", 
224             default=pl_host, type="str")
225     parser.add_option("-k", "--ssh-key", dest="pl_ssh_key", 
226             help="Path to private ssh key used for PlanetLab authentication", 
227             default=pl_ssh_key, type="str")
228     parser.add_option("-u", "--pl-user", dest="pl_user", 
229             help="PlanetLab account user (i.e. Registration email address)", 
230             default=pl_user, type="str")
231     parser.add_option("-p", "--pl-pwd", dest="pl_pwd", 
232             help="PlanetLab account password", default=pl_pwd, type="str")
233     parser.add_option("-m", "--movie", dest="movie", 
234             help="Stream movie", type="str")
235     parser.add_option("-r", "--results", dest="results_dir", default = "/tmp", 
236             help="Path to directory to store results", type="str")
237     parser.add_option("-l", "--label", dest="exp_label", default = exp_label, 
238             help="Label to identify experiment results", type="str")
239     parser.add_option("-t", "--time", dest="time_to_run", default = 1, 
240             help="Time to run the experiment in hours", type="float")
241     parser.add_option("-P", "--port", dest="port", 
242             help="Port to bind the CCNx daemon", type="int")
243
244     (options, args) = parser.parse_args()
245
246     if not options.movie:
247         parser.error("movie is a required argument")
248
249     return (options.slicename, options.pl_host, options.pl_user, 
250             options.pl_pwd, options.pl_ssh_key, options.movie,
251             options.results_dir, options.exp_label, options.time_to_run,
252             options.port)
253
254 if __name__ == '__main__':
255     root_dir = tempfile.mkdtemp()
256     (pl_slice, 
257             pl_host, 
258             pl_user, 
259             pl_pwd, 
260             pl_ssh_key, 
261             movie, 
262             results_dir,
263             exp_label,
264             time_to_run,
265             port) = get_options()
266
267     # list to store information on monitoring apps per node
268     monitors = []
269     
270     # Create the experiment description object
271     exp_desc = ExperimentDescription()
272
273     # Create slice
274     slice_desc = create_slice(exp_desc, pl_slice, pl_host, pl_user, pl_pwd,
275         pl_ssh_key, root_dir)
276    
277     # Create the Internet box object
278     pl_inet = slice_desc.create("Internet")
279
280     ### Level 0 - Root node
281     root_hostname = "ple6.ipv6.lip6.fr"
282     (root_node, root_iface) = create_node(root_hostname, pl_inet, slice_desc)
283
284     ### Level 1 - Intermediate nodes
285     l1_hostnames = dict()
286     l1_hostnames["fi"] = "planetlab-1.research.netlab.hut.fi"
287     l1_hostnames["se"] = "planetlab2.sics.se"
288     l1_hostnames["es"] = "planetlab1.um.es"
289     l1_hostnames["pt"] = "planetlab-um10.di.uminho.pt"
290     l1_hostnames["pl"] = "pandora.we.po.opole.pl"
291     l1_hostnames["it"] = "gschembra4.diit.unict.it"
292     l1_hostnames["de"] = "planetlab2.wiwi.hu-berlin.de"
293     l1_hostnames["fr"] = "planetlab1.u-strasbg.fr"
294     l1_hostnames["gr"] = "planetlab1.ics.forth.gr"
295     l1_hostnames["ch"] = "planetlab2.unineuchatel.ch"
296     l1_ifaces = dict()
297     l1_nodes = dict()
298     
299     for country, hostname in l1_hostnames.iteritems():
300         pl_node, pl_iface = create_node(hostname, pl_inet, slice_desc)
301         l1_ifaces[country] = pl_iface
302         l1_nodes[country] = pl_node
303
304     ### Level 0 - CCN & Monitoring
305     
306     # Add CCN Daemon to root node
307     ifaces = l1_ifaces.values()
308     root_ccnd = create_ccnd(root_node, slice_desc, ifaces, port)
309
310     # Publish video in root node
311     root_ccnseqwriter = create_ccnpush(movie, root_node, slice_desc, port)
312
313     # Create monitor info object for root node
314     root_mon = MonitorInfo(root_hostname, MonitorInfo.TYPE_ROOT)
315     monitors.append(root_mon)
316    
317     # Add memory and cpu monitoring for root node
318     root_mon.cpumem_monitor = create_cpumem_monitor(root_node, slice_desc)
319     root_mon.net_monitor = create_net_monitor(root_node, slice_desc, ifaces)
320     root_mon.ccnd = root_ccnd
321     root_mon.ccnseqwriter = root_ccnseqwriter
322
323     ### Level 2 - Leaf nodes
324     l2_hostnames = dict()
325     l2_hostnames["fi"] = ["planetlab1.rd.tut.fi",]
326     l2_hostnames["se"] = ["planetlab1.s3.kth.se",]
327     l2_hostnames["es"] = ["planetlab1.tlm.unavarra.es",]
328     l2_hostnames["pt"] = ["planet1.servers.ua.pt",]
329     l2_hostnames["pl"] = ["onelab3.warsaw.rd.tp.pl",]
330     l2_hostnames["it"] = ["gschembra3.diit.unict.it",]
331     l2_hostnames["de"] = ["iraplab1.iralab.uni-karlsruhe.de",]
332     l2_hostnames["fr"] = ["host3-plb.loria.fr",]
333     l2_hostnames["gr"] = ["kostis.di.uoa.gr",]
334     l2_hostnames["ch"] = ["planetlab04.cnds.unibe.ch",]
335
336     for country, hostnames in l2_hostnames.iteritems():
337         l2_ifaces = []
338         l1_hostname = l1_hostnames[country]
339         l1_iface = l1_ifaces[country]
340         l1_node = l1_nodes[country]
341         
342         for hostname in hostnames:
343             pl_node, pl_iface = create_node(hostname, pl_inet, slice_desc)
344             l2_ifaces.append(pl_iface)
345
346             ### Level 2 - CCN & Monitoring
347         
348             # Add CCN Daemon to intermediate nodes
349             ccnd = create_ccnd(pl_node, slice_desc, [l1_iface], port)
350
351             # Retrieve video in leaf node
352             ccncat = create_ccnpull(pl_node, slice_desc, port)
353
354             # Create monitor info object for intermediate nodes
355             mon = MonitorInfo(hostname, MonitorInfo.TYPE_LEAF)
356             monitors.append(mon)
357        
358             # Add memory and cpu monitoring for intermediate nodes
359             mon.cpumem_monitor = create_cpumem_monitor(pl_node, slice_desc)
360             mon.net_monitor = create_net_monitor(pl_node, slice_desc, [l1_iface])
361             mon.ccnd = ccnd
362             mon.ccncat = ccncat
363
364         ### Level 1 - CCN & Monitoring
365
366         ifaces = [root_iface]
367         ifaces.extend(l2_ifaces)
368
369         # Add CCN Daemon to intermediate nodes
370         ccnd = create_ccnd(l1_node, slice_desc, ifaces, port)
371
372         # Create monitor info object for intermediate nodes
373         mon = MonitorInfo(l1_hostname, MonitorInfo.TYPE_MID)
374         monitors.append(mon)
375        
376         # Add memory and cpu monitoring for intermediate nodes
377         mon.cpumem_monitor = create_cpumem_monitor(l1_node, slice_desc)
378         mon.net_monitor = create_net_monitor(l1_node, slice_desc, ifaces)
379         mon.ccnd = ccnd
380
381     xml = exp_desc.to_xml()
382    
383     controller = ExperimentController(xml, root_dir)
384     controller.start()
385
386     start_time = time.time()
387     duration = time_to_run * 3600 # in seconds
388     while not TERMINATE:
389         time.sleep(1)
390         if (time.time() - start_time) > duration: # elapsed time
391             TERMINATE.append(None)
392
393     controller.stop()
394  
395     # store results in results dir
396     store_results(controller, monitors, results_dir, exp_label)
397    
398     controller.shutdown()
399