adding ccnx tree topology streamming over Internet example
[nepi.git] / examples / streamming / planetlab_ccn_vlc.py
1 #!/usr/bin/env python
2
3 from nepi.core.design import ExperimentDescription, FactoriesProvider
4 from nepi.core.execute import ExperimentController
5 from nepi.util.constants import ApplicationStatus as AS
6 from optparse import OptionParser, SUPPRESS_HELP
7 import os
8 import tempfile
9 import time
10 import uuid
11
12 # Trak SIGTERM, and set global termination flag instead of dying
13 import signal
14 TERMINATE = []
15 def _finalize(sig,frame):
16     global TERMINATE
17     TERMINATE.append(None)
18 signal.signal(signal.SIGTERM, _finalize)
19 signal.signal(signal.SIGINT, _finalize)
20
21 class MonitorInfo(object):
22     TYPE_ROOT = "root"
23     TYPE_MID  = "middle"
24     TYPE_LEAF = "leaf"
25
26     def __init__(self, hostname, type):
27         self.hostname = hostname
28         self.type = type
29         self.cpumem_monitor = None
30         self.net_monitor = None
31
32 def create_slice(exp_desc, slicename, plc_host, pl_user, pl_pwd, 
33         pl_ssh_key, root_dir):
34     pl_provider = FactoriesProvider("planetlab")
35     slice_desc = exp_desc.add_testbed_description(pl_provider)
36     slice_desc.set_attribute_value("homeDirectory", root_dir)
37     slice_desc.set_attribute_value("slice", slicename)
38     slice_desc.set_attribute_value("sliceSSHKey", pl_ssh_key)
39     slice_desc.set_attribute_value("authUser", pl_user)
40     slice_desc.set_attribute_value("authPass", pl_pwd)
41     slice_desc.set_attribute_value("plcHost", plc_host)
42     # Kills all running processes before starting the experiment
43     slice_desc.set_attribute_value("cleanProc", True)
44     # NOTICE: Setting 'cleanHome' to 'True' will erase all previous
45     # folders in the sliver Home directory, including result files!
46     slice_desc.set_attribute_value("cleanHome", True)
47     slice_desc.set_attribute_value("plLogLevel", "DEBUG")
48     return slice_desc
49  
50 def create_node(hostname, pl_inet, slice_desc):
51     pl_node = slice_desc.create("Node")
52     pl_node.set_attribute_value("hostname", hostname)
53     pl_node.set_attribute_value("label", "%d" % pl_node.guid)
54     pl_node.set_attribute_value("operatingSystem", "f12")
55     pl_iface = slice_desc.create("NodeInterface")
56     pl_iface.set_attribute_value("label", "iface_%d" % pl_node.guid)
57     pl_iface.connector("inet").connect(pl_inet.connector("devs"))
58     pl_node.connector("devs").connect(pl_iface.connector("node"))
59     return pl_node, pl_iface
60
61 def create_ccnd(pl_node, slice_desc, pl_ifaces, port):
62     pl_app = slice_desc.create("CCNxDaemon")
63     pl_app.set_attribute_value("ccnxVersion", "ccnx-0.5.1")
64     
65     # We use a wildcard to replace the public IP address of the node during runtime,
66     # once this IP is known
67     routes = "|".join(map(lambda pl_iface: "udp {#[%s].addr[0].[Address]#}" % 
68         pl_iface.get_attribute_value("label"), pl_ifaces))
69     
70     # Add unicast ccn routes 
71     pl_app.set_attribute_value("ccnRoutes", routes)
72
73     # Use a specific port to bind the CCNx daemon
74     if port:
75         pl_app.set_attribute_value("ccnLocalPort", port)
76
77     pl_app.enable_trace("stdout")
78     pl_app.enable_trace("stderr")
79     pl_app.connector("node").connect(pl_node.connector("apps"))
80     return pl_app
81
82 def create_ccnpush(movie, pl_node, slice_desc, port):
83     pl_app = slice_desc.create("Application")
84     pl_app.set_attribute_value("stdin", movie)
85
86     command = "ccnseqwriter ccnx:/VIDEO"
87     if port:
88         command = "CCN_LOCAL_PORT=%d %s " % (port, command)
89
90     pl_app.set_attribute_value("command", command)
91
92     pl_app.enable_trace("stdout")
93     pl_app.enable_trace("stderr")
94     pl_app.connector("node").connect(pl_node.connector("apps"))
95     return pl_app
96
97 def create_ccnpull(pl_node, slice_desc, port):
98     pl_app = slice_desc.create("Application")
99     pl_app.set_attribute_value("rpmFusion", True)
100     pl_app.set_attribute_value("depends", "vlc")
101
102     command = " sudo -S dbus-uuidgen --ensure ; while true ; do ccncat ccnx:/VIDEO"
103     if port:
104         command = "CCN_LOCAL_PORT=%d %s " % (port, command)
105
106     command += " | vlc -I dummy - vlc://quit > /dev/null ; done"
107     pl_app.set_attribute_value("command", command)
108     
109     pl_app.enable_trace("stdout")
110     pl_app.enable_trace("stderr")
111     pl_app.connector("node").connect(pl_node.connector("apps"))
112     return pl_app
113
114 def create_cpumem_monitor(pl_node, slice_desc):
115     label = "%d_cpumem" % pl_node.guid
116     pl_app = slice_desc.create("Application")
117     pl_app.set_attribute_value("label", label)
118     pl_app.set_attribute_value("command", 
119             "while true; do echo $(date +%Y%m%d%H%M%S%z) "\
120             " $(top -b -n 1 |  grep 'bash\|python' | sed 's/\s\s*/ /g' | "\
121             " sed 's/^\s//g' | cut -d' ' -f9,10,11 | awk '{ sum1 +=$1; sum2 += $2; } "\
122             " END {printf \"%2.1f %2.1f 0:00.00\", sum1, sum2;}'); sleep 1 ; done ")
123
124     pl_app.enable_trace("stdout")
125     pl_app.enable_trace("stderr")
126     pl_node.connector("apps").connect(pl_app.connector("node"))
127     return pl_app
128
129 def create_net_monitor(pl_node, slice_desc, pl_ifaces):
130     label = "%d_net" % pl_node.guid
131     hosts = " or ".join(map(lambda pl_iface: " ( host {#[%s].addr[0].[Address]#} ) " % 
132         pl_iface.get_attribute_value("label"), pl_ifaces))
133     pl_app = slice_desc.create("Application")
134     pl_app.set_attribute_value("label", label)
135     pl_app.set_attribute_value("rpmFusion", True)
136     pl_app.set_attribute_value("sudo", True)
137     pl_app.set_attribute_value("depends", "tcpdump pv")
138     pl_app.set_attribute_value("command", 
139             "tcpdump -l -i eth0 -nNqttf '(%s)' -w - | pv -fbt >/dev/null 2>>{#[%s].trace[stdout].[name]#}" %
140             (hosts, label))
141     pl_app.enable_trace("stdout")
142     pl_app.enable_trace("stderr")
143     pl_node.connector("apps").connect(pl_app.connector("node"))
144     return pl_app
145
146 def store_results(controller, monitors, results_dir, exp_label):
147     # create results directory for experiment
148     root_path = os.path.join(results_dir, exp_label)
149
150     print "STORING RESULTS in ", root_path
151
152     try:
153         os.makedirs(root_path)
154     except OSError:
155         pass
156
157     # collect information on nodes
158     hosts_info = ""
159
160     for mon in monitors:
161         hosts_info += "%s %s\n" % (mon.hostname, mon.type)
162
163         # create a subdir per hostname
164         node_path = os.path.join(root_path, mon.hostname)
165         try:
166             os.makedirs(node_path)
167         except OSError:
168             pass
169
170         # store monitoring results
171         cpumem_stdout = controller.trace(mon.cpumem_monitor.guid, "stdout")
172         net_stdout = controller.trace(mon.net_monitor.guid, "stdout")
173         results = dict({"cpumem": cpumem_stdout, "net": net_stdout})
174         for name, stdout in results.iteritems():
175             fpath = os.path.join(node_path, name)
176             f = open(fpath, "w")
177             f.write(stdout)
178             f.close()
179
180     # store node info file
181     fpath = os.path.join(root_path, "hosts")
182     f = open(fpath, "w")
183     f.write(hosts_info)
184     f.close()
185
186 def get_options():
187     slicename = os.environ.get("PL_SLICE")
188     pl_host = os.environ.get("PL_HOST", "www.planet-lab.eu")
189     pl_ssh_key = os.environ.get(
190         "PL_SSH_KEY",
191         "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
192     pl_user = os.environ.get('PL_USER')
193     pl_pwd = os.environ.get('PL_PASS')
194     exp_label = "%s" % uuid.uuid4()
195
196     usage = "usage: %prog -s <pl_slice> -H <pl_host> -k <ssh_key> -u <pl_user> \
197 -m <movie> -p <pl_password> -r <results-dir> -l <experiment-label> \
198 -P <ccnd-port>"
199
200     parser = OptionParser(usage=usage)
201     parser.add_option("-s", "--slicename", dest="slicename", 
202             help="PlanetLab slicename", default=slicename, type="str")
203     parser.add_option("-H", "--pl-host", dest="pl_host", 
204             help="PlanetLab site (e.g. www.planet-lab.eu)", 
205             default=pl_host, type="str")
206     parser.add_option("-k", "--ssh-key", dest="pl_ssh_key", 
207             help="Path to private ssh key used for PlanetLab authentication", 
208             default=pl_ssh_key, type="str")
209     parser.add_option("-u", "--pl-user", dest="pl_user", 
210             help="PlanetLab account user (i.e. Registration email address)", 
211             default=pl_user, type="str")
212     parser.add_option("-p", "--pl-pwd", dest="pl_pwd", 
213             help="PlanetLab account password", default=pl_pwd, type="str")
214     parser.add_option("-m", "--movie", dest="movie", 
215             help="Stream movie", type="str")
216     parser.add_option("-r", "--results", dest="results_dir", default = "/tmp", 
217             help="Path to directory to store results", type="str")
218     parser.add_option("-l", "--label", dest="exp_label", default = exp_label, 
219             help="Label to identify experiment results", type="str")
220     parser.add_option("-t", "--time", dest="time_to_run", default = 2, 
221             help="Time to run the experiment in hours", type="float")
222     parser.add_option("-P", "--port", dest="port", 
223             help="Port to bind the CCNx daemon", type="int")
224
225     (options, args) = parser.parse_args()
226
227     if not options.movie:
228         parser.error("movie is a required argument")
229
230     return (options.slicename, options.pl_host, options.pl_user, 
231             options.pl_pwd, options.pl_ssh_key, options.movie,
232             options.results_dir, options.exp_label, options.time_to_run,
233             options.port)
234
235 if __name__ == '__main__':
236     root_dir = tempfile.mkdtemp()
237     (pl_slice, 
238             pl_host, 
239             pl_user, 
240             pl_pwd, 
241             pl_ssh_key, 
242             movie, 
243             results_dir,
244             exp_label,
245             time_to_run,
246             port) = get_options()
247
248     # list to store information on monitoring apps per node
249     monitors = []
250     
251     # Create the experiment description object
252     exp_desc = ExperimentDescription()
253
254     # Create slice
255     slice_desc = create_slice(exp_desc, pl_slice, pl_host, pl_user, pl_pwd,
256         pl_ssh_key, root_dir)
257    
258     # Create the Internet box object
259     pl_inet = slice_desc.create("Internet")
260
261     ### Level 0 - Root node
262     root_hostname = "chimay.infonet.fundp.ac.be"
263     (root_node, root_iface) = create_node(root_hostname, pl_inet, slice_desc)
264
265     ### Level 1 - Intermediate nodes
266     l1_hostnames = dict()
267     l1_hostnames["uk"] = "planetlab4.cs.st-andrews.ac.uk"
268     l1_ifaces = dict()
269     l1_nodes = dict()
270     
271     for country, hostname in l1_hostnames.iteritems():
272         pl_node, pl_iface = create_node(hostname, pl_inet, slice_desc)
273         l1_ifaces[country] = pl_iface
274         l1_nodes[country] = pl_node
275
276     ### Level 0 - CCN & Monitoring
277     
278     # Add CCN Daemon to root node
279     ifaces = l1_ifaces.values()
280     create_ccnd(root_node, slice_desc, ifaces, port)
281
282     # Publish video in root node
283     create_ccnpush(movie, root_node, slice_desc, port)
284
285     # Create monitor info object for root node
286     root_mon = MonitorInfo(root_hostname, MonitorInfo.TYPE_ROOT)
287     monitors.append(root_mon)
288    
289     # Add memory and cpu monitoring for root node
290     root_mon.cpumem_monitor = create_cpumem_monitor(root_node, slice_desc)
291     root_mon.net_monitor = create_net_monitor(root_node, slice_desc, ifaces)
292
293     ### Level 2 - Leaf nodes
294     l2_hostnames = dict()
295     l2_hostnames["uk"] = ["planetlab-1.imperial.ac.uk",
296         "planetlab3.xeno.cl.cam.ac.uk",
297         "planetlab1.xeno.cl.cam.ac.uk"
298     ]
299     
300     for country, hostnames in l2_hostnames.iteritems():
301         l2_ifaces = []
302         l1_hostname = l1_hostnames[country]
303         l1_iface = l1_ifaces[country]
304         l1_node = l1_nodes[country]
305         
306         for hostname in hostnames:
307             pl_node, pl_iface = create_node(hostname, pl_inet, slice_desc)
308             l2_ifaces.append(pl_iface)
309
310             ### Level 2 - CCN & Monitoring
311         
312             # Add CCN Daemon to intermediate nodes
313             create_ccnd(pl_node, slice_desc, [l1_iface], port)
314
315             # Retrieve video in leaf node
316             create_ccnpull(pl_node, slice_desc, port)
317
318             # Create monitor info object for intermediate nodes
319             mon = MonitorInfo(hostname, MonitorInfo.TYPE_LEAF)
320             monitors.append(mon)
321        
322             # Add memory and cpu monitoring for intermediate nodes
323             mon.cpumem_monitor = create_cpumem_monitor(pl_node, slice_desc)
324             mon.net_monitor = create_net_monitor(pl_node, slice_desc, [l1_iface])
325
326         ### Level 1 - CCN & Monitoring
327
328         ifaces = [root_iface]
329         ifaces.extend(l2_ifaces)
330
331         # Add CCN Daemon to intermediate nodes
332         create_ccnd(l1_node, slice_desc, ifaces, port)
333
334         # Create monitor info object for intermediate nodes
335         mon = MonitorInfo(l1_hostname, MonitorInfo.TYPE_MID)
336         monitors.append(mon)
337        
338         # Add memory and cpu monitoring for intermediate nodes
339         mon.cpumem_monitor = create_cpumem_monitor(l1_node, slice_desc)
340         mon.net_monitor = create_net_monitor(l1_node, slice_desc, ifaces)
341
342     xml = exp_desc.to_xml()
343    
344     controller = ExperimentController(xml, root_dir)
345     controller.start()
346
347     start_time = time.time()
348     duration = time_to_run * 3600 # in seconds
349     while not TERMINATE:
350         time.sleep(1)
351         #if (time.time() - start_time) > duration: # elapsed time
352         #    TERMINATE.append(None)
353
354     controller.stop()
355  
356     # store results in results dir
357     store_results(controller, monitors, results_dir, exp_label)
358    
359     controller.shutdown()
360