1067001bf6eaff51da4875ff009f705d40d95200
[nepi.git] / examples / streamming / planetlab_multiple_vlc.py
1 #!/usr/bin/env python
2
3 from nepi.core.design import ExperimentDescription, FactoriesProvider
4 from nepi.core.execute import ExperimentController
5 from nepi.util.constants import ApplicationStatus as AS
6 from optparse import OptionParser, SUPPRESS_HELP
7 import os
8 import tempfile
9 import time
10 import uuid
11
12 """
13 This experiment evaluates the consumption of computer resources when using
14 VLC for Internet broasdcasting using PlanetLab nodes as both server and clients. 
15 A root node (server) streams a broadcast in a loop, while the clients retrieve
16 the same video over and over until experiment run time is elapsed.
17
18 While the experiment is running cpu and memory usage, and the amount of bytes 
19 transmitted per stream are traced to files.
20
21 """
22
23
24 # Trak SIGTERM, and set global termination flag instead of dying
25 import signal
26 TERMINATE = []
27 def _finalize(sig,frame):
28     global TERMINATE
29     TERMINATE.append(None)
30 signal.signal(signal.SIGTERM, _finalize)
31 signal.signal(signal.SIGINT, _finalize)
32
33 class MonitorInfo(object):
34     TYPE_ROOT = "root"
35     TYPE_LEAF = "leaf"
36
37     def __init__(self, hostname, type):
38         self.hostname = hostname
39         self.type = type
40         self.cpumem_monitor = None
41         self.net_monitor = None
42
43 def create_slice(exp_desc, slicename, plc_host, pl_user, pl_pwd, 
44         pl_ssh_key, root_dir):
45     pl_provider = FactoriesProvider("planetlab")
46     slice_desc = exp_desc.add_testbed_description(pl_provider)
47     slice_desc.set_attribute_value("homeDirectory", root_dir)
48     slice_desc.set_attribute_value("slice", slicename)
49     slice_desc.set_attribute_value("sliceSSHKey", pl_ssh_key)
50     slice_desc.set_attribute_value("authUser", pl_user)
51     slice_desc.set_attribute_value("authPass", pl_pwd)
52     slice_desc.set_attribute_value("plcHost", plc_host)
53     # Kills all running processes before starting the experiment
54     slice_desc.set_attribute_value("cleanProc", True)
55     # NOTICE: Setting 'cleanHome' to 'True' will erase all previous
56     # folders in the sliver Home directory, including result files!
57     slice_desc.set_attribute_value("cleanHome", True)
58     slice_desc.set_attribute_value("plLogLevel", "DEBUG")
59     return slice_desc
60  
61 def create_node(hostname, pl_inet, slice_desc):
62     pl_node = slice_desc.create("Node")
63     pl_node.set_attribute_value("hostname", hostname)
64     pl_node.set_attribute_value("label", "%d" % pl_node.guid)
65     pl_node.set_attribute_value("operatingSystem", "f12")
66     pl_iface = slice_desc.create("NodeInterface")
67     pl_iface.set_attribute_value("label", "iface_%d" % pl_node.guid)
68     pl_iface.connector("inet").connect(pl_inet.connector("devs"))
69     pl_node.connector("devs").connect(pl_iface.connector("node"))
70     return pl_node, pl_iface
71
72 def fix_keys_app(slice_desc, pl_node):
73     pl_app = slice_desc.create("Application")
74     pl_app.set_attribute_value("command", "yum reinstall -y --nogpgcheck fedora-release")
75     pl_app.set_attribute_value("sudo", True)
76     pl_app.connector("node").connect(pl_node.connector("apps"))
77     return pl_app
78
79 def create_vlc_server(movie, pl_node, slice_desc):
80     mv = os.path.basename(movie)
81     pl_app = slice_desc.create("Application")
82     pl_app.set_attribute_value("rpmFusion", True)
83     pl_app.set_attribute_value("depends", "vlc")
84     pl_app.set_attribute_value("build", 
85     #    "echo -e 'new TEST vod enabled\\nsetup TEST input %s' > ${SOURCES}/VOD.vlm" % mv)
86        "echo -e 'new TEST broadcast enabled loop\\n"\
87        "setup TEST input %s\\n"\
88        "setup TEST output #rtp{mux=ts,sdp=rtsp://0.0.0.0:8554/TEST}\\n\\n"\
89        "new test_sched schedule enabled\\n"\
90        "setup test_sched append control TEST play' > ${SOURCES}/VOD.vlm" % mv)
91
92     pl_app.set_attribute_value("sources", "%s" % movie)
93     pl_app.set_attribute_value("command",
94     #        "sudo -S dbus-uuidgen --ensure ; vlc -vvv -I dummy --vlm-conf VOD.vlm --rtsp-host=0.0.0.0:8554")
95         "sudo -S dbus-uuidgen --ensure ; vlc -vvv -I dummy --vlm-conf VOD.vlm")
96     pl_app.enable_trace("stdout")
97     pl_app.enable_trace("stderr")
98     pl_node.connector("apps").connect(pl_app.connector("node"))
99     return pl_app
100
101 def create_vlc_client(root_node, pl_node, slice_desc):
102     label = "%d_app" % pl_node.guid
103     hostname = root_node.get_attribute_value("hostname")
104     pl_app = slice_desc.create("Application")
105     pl_app.set_attribute_value("label", label)
106     pl_app.set_attribute_value("rpmFusion", True)
107     pl_app.set_attribute_value("depends", "vlc")
108     pl_app.set_attribute_value("command",
109        "sudo -S dbus-uuidgen --ensure ; sleep 5;" \
110        "vlc -I dummy --repeat rtsp://%s:8554/TEST --sout '#std{access=file,mux=ts,dst=/dev/null}'" % (hostname))
111     pl_app.enable_trace("stdout")
112     pl_app.enable_trace("stderr")
113     pl_node.connector("apps").connect(pl_app.connector("node"))
114     return pl_app
115
116 def create_cpumem_monitor(pl_node, slice_desc):
117     """ This function creates a monitoring application for the
118     utilization of node resources by the vlc application.
119
120     The format of the stdout trace file is the following:
121     'timestamp cpu(%) mem(%) time'
122     """
123     label = "%d_cpumem" % pl_node.guid
124     pl_app = slice_desc.create("Application")
125     pl_app.set_attribute_value("label", label)
126     pl_app.set_attribute_value("command", 
127             "while true ; do echo $(date +%Y%m%d%H%M%S%z) " \
128             " $(top -b -n 1 | grep 'vlc' | head -1 | sed 's/\s\s*/ /g' | cut -d' ' -f10,11,12)" \
129             "; sleep 1 ; done")
130     pl_app.enable_trace("stdout")
131     pl_app.enable_trace("stderr")
132     pl_node.connector("apps").connect(pl_app.connector("node"))
133     return pl_app
134
135 def create_net_monitor(pl_node, slice_desc, ifaces):
136     """ This function creates a monitoring application for the
137     amount of bytes transmitted/received by the vlc application.
138
139     The format of the stdout trace file is the following:
140     'total-Mbytes total-time'
141     """
142     label = "%d_net" % pl_node.guid
143     hosts = " or ".join(map(lambda iface: " ( host {#[%s].addr[0].[Address]#} ) " % 
144         iface.get_attribute_value("label"), ifaces))
145     pl_app = slice_desc.create("Application")
146     pl_app.set_attribute_value("label", label)
147     pl_app.set_attribute_value("rpmFusion", True)
148     pl_app.set_attribute_value("sudo", True)
149     pl_app.set_attribute_value("depends", "tcpdump pv")
150     pl_app.set_attribute_value("command", 
151             "tcpdump -l -i eth0 -nNqttf '(%s)' -w - | pv -fbt >/dev/null 2>>{#[%s].trace[stdout].[name]#}" %
152             (hosts, label))
153     pl_app.enable_trace("stdout")
154     pl_app.enable_trace("stderr")
155     pl_node.connector("apps").connect(pl_app.connector("node"))
156     return pl_app
157
158 def store_results(controller, monitors, results_dir, exp_label):
159     # create results directory for experiment
160     root_path = os.path.join(results_dir, exp_label)
161
162     print "STORING RESULTS in ", root_path
163
164     try:
165         os.makedirs(root_path)
166     except OSError:
167         pass
168
169     # collect information on nodes
170     hosts_info = ""
171
172     for mon in monitors:
173         hosts_info += "%s %s\n" % (mon.hostname, mon.type)
174
175         # create a subdir per hostname
176         node_path = os.path.join(root_path, mon.hostname)
177         try:
178             os.makedirs(node_path)
179         except OSError:
180             pass
181
182         # store monitoring results
183         cpumem_stdout = controller.trace(mon.cpumem_monitor.guid, "stdout")
184         net_stdout = controller.trace(mon.net_monitor.guid, "stdout")
185         results = dict({"cpumem": cpumem_stdout, "net": net_stdout})
186         for name, stdout in results.iteritems():
187             fpath = os.path.join(node_path, name)
188             f = open(fpath, "w")
189             f.write(stdout)
190             f.close()
191
192     # store node info file
193     fpath = os.path.join(root_path, "hosts")
194     f = open(fpath, "w")
195     f.write(hosts_info)
196     f.close()
197
198
199 def get_options():
200     slicename = os.environ.get("PL_SLICE")
201     pl_host = os.environ.get("PL_HOST", "www.planet-lab.eu")
202     pl_ssh_key = os.environ.get(
203         "PL_SSH_KEY",
204         "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
205     pl_user = os.environ.get('PL_USER')
206     pl_pwd = os.environ.get('PL_PASS')
207     exp_label = "%s" % uuid.uuid4()
208
209     usage = "usage: %prog -s <pl_slice> -H <pl_host> -k <ssh_key> -u <pl_user> \
210             -p <pl_password> -m <movie> -r <results-dir> -l <experiment-label>"
211
212     parser = OptionParser(usage=usage)
213     parser.add_option("-s", "--slicename", dest="slicename", 
214             help="PlanetLab slicename", default=slicename, type="str")
215     parser.add_option("-H", "--pl-host", dest="pl_host", 
216             help="PlanetLab site (e.g. www.planet-lab.eu)", 
217             default=pl_host, type="str")
218     parser.add_option("-k", "--ssh-key", dest="pl_ssh_key", 
219             help="Path to private ssh key used for PlanetLab authentication", 
220             default=pl_ssh_key, type="str")
221     parser.add_option("-u", "--pl-user", dest="pl_user", 
222             help="PlanetLab account user (i.e. Registration email address)", 
223             default=pl_user, type="str")
224     parser.add_option("-p", "--pl-pwd", dest="pl_pwd", 
225             help="PlanetLab account password", default=pl_pwd, type="str")
226     parser.add_option("-m", "--movie", dest="movie", 
227             help="Stream movie", type="str")
228     parser.add_option("-r", "--results", dest="results_dir", default = "/tmp", 
229             help="Path to directory to store results", type="str")
230     parser.add_option("-l", "--label", dest="exp_label", default = exp_label, 
231             help="Label to identify experiment results", type="str")
232     parser.add_option("-t", "--time", dest="time_to_run", default = 2, 
233             help="Time to run the experiment in hours", type="float")
234
235     (options, args) = parser.parse_args()
236
237     if not options.movie:
238         parser.error("movie is a required argument")
239
240     return (options.slicename, options.pl_host, options.pl_user, 
241             options.pl_pwd, options.pl_ssh_key, options.movie,
242             options.results_dir, options.exp_label, options.time_to_run)
243
244 if __name__ == '__main__':
245     root_dir = tempfile.mkdtemp()
246     (pl_slice, 
247             pl_host, 
248             pl_user, 
249             pl_pwd, 
250             pl_ssh_key, 
251             movie, 
252             results_dir,
253             exp_label,
254             time_to_run) = get_options()
255
256     # list to store information on monitoring apps per node
257     monitors = []
258     
259     # Create the experiment description object
260     exp_desc = ExperimentDescription()
261
262     # Create slice
263     slice_desc = create_slice(exp_desc, pl_slice, pl_host, pl_user, pl_pwd,
264         pl_ssh_key, root_dir)
265    
266     # Create the Internet box object
267     pl_inet = slice_desc.create("Internet")
268
269     # Create root node
270     hostname = "ple6.ipv6.lip6.fr"
271     (root_node, root_iface) = create_node(hostname, pl_inet, slice_desc)
272
273     # Create monitor info object for root node
274     root_mon = MonitorInfo(hostname, MonitorInfo.TYPE_ROOT)
275     monitors.append(root_mon)
276
277     # Fix GPG key problem in node
278     #fix_keys_app(slice_desc, root_node)
279
280     # Add VLC service
281     create_vlc_server(movie, root_node, slice_desc)
282     
283     # Add memory and cpu monitoring for root node
284     root_mon.cpumem_monitor = create_cpumem_monitor(root_node, slice_desc)
285
286     # Create leaf nodes
287     cli_apps = []
288     cli_ifaces = []
289
290     hostnames = ["planetlab1.rd.tut.fi", 
291             "planetlab1.s3.kth.se", 
292             "planetlab1.tlm.unavarra.es", 
293             "planet1.servers.ua.pt", 
294             "onelab3.warsaw.rd.tp.pl", 
295             "gschembra3.diit.unict.it", 
296             "iraplab1.iralab.uni-karlsruhe.de", 
297             "host3-plb.loria.fr", 
298             "kostis.di.uoa.gr", 
299             "planetlab04.cnds.unibe.ch"]
300     
301     for hostname in hostnames:
302         pl_node, pl_iface = create_node(hostname, pl_inet, slice_desc)
303         cli_ifaces.append(pl_iface)
304
305         # Fix GPG key problem in node
306         #fix_keys_app(slice_desc, root_node)
307  
308         # Create monitor info object for root node
309         node_mon = MonitorInfo(hostname, MonitorInfo.TYPE_LEAF)
310         monitors.append(node_mon)
311       
312         # Add memory and cpu monitoring for all nodes
313         node_mon.cpumem_monitor = create_cpumem_monitor(pl_node, slice_desc)
314
315         # Add network monitoring for all nodes
316         node_mon.net_monitor = create_net_monitor(pl_node, slice_desc, [root_iface])
317
318         # Add VLC clients
319         app = create_vlc_client(root_node, pl_node, slice_desc)
320         cli_apps.append(app)
321
322     # Add network monitoring for root node
323     root_mon.net_monitor = create_net_monitor(root_node, slice_desc, cli_ifaces)
324
325     xml = exp_desc.to_xml()
326    
327     controller = ExperimentController(xml, root_dir)
328     controller.start()
329
330     start_time = time.time()
331     duration = time_to_run * 3600 # in seconds
332     while not TERMINATE:
333         time.sleep(1)
334         if (time.time() - start_time) > duration: # elapsed time
335             TERMINATE.append(None)
336
337     controller.stop()
338  
339     # store results in results dir
340     store_results(controller, monitors, results_dir, exp_label)
341    
342     controller.shutdown()
343