3 from nepi.core.design import ExperimentDescription, FactoriesProvider
4 from nepi.core.execute import ExperimentController
5 from nepi.util.constants import ApplicationStatus as AS
6 from optparse import OptionParser, SUPPRESS_HELP
# Track SIGTERM and SIGINT: set a global termination flag instead of dying.
# The flag is a list; a non-empty list means termination was requested.
TERMINATE = []


def _finalize(sig, frame):
    """Signal handler that records a termination request in TERMINATE.

    sig and frame are the standard signal-handler arguments; neither is used.
    """
    TERMINATE.append(None)


signal.signal(signal.SIGTERM, _finalize)
signal.signal(signal.SIGINT, _finalize)
class MonitorInfo(object):
    """Per-node record of the application boxes whose traces are collected.

    The application attributes start as None and are assigned by the main
    script once the corresponding boxes have been created.
    """

    # Role of the node in the distribution tree. The role is written next to
    # the hostname in the "hosts" results file.
    # NOTE(review): only the constant names are referenced in this file; the
    # string values below are assumed -- confirm against any consumer of the
    # "hosts" file.
    TYPE_ROOT = "root"
    TYPE_MID = "middle"
    TYPE_LEAF = "leaf"

    def __init__(self, hostname, type):
        """Create an empty record for the node `hostname` with role `type`."""
        self.hostname = hostname
        self.type = type
        self.cpumem_monitor = None
        self.net_monitor = None
        self.ccnd = None
        self.ccncat = None
        self.ccnseqwriter = None
def create_slice(exp_desc, slicename, plc_host, pl_user, pl_pwd,
        pl_ssh_key, root_dir):
    """Add a PlanetLab testbed description to `exp_desc` and configure it.

    Parameters:
        exp_desc: experiment description the testbed is added to.
        slicename: value for the "slice" attribute.
        plc_host: value for the "plcHost" attribute.
        pl_user, pl_pwd: values for "authUser" and "authPass".
        pl_ssh_key: value for the "sliceSSHKey" attribute.
        root_dir: value for the "homeDirectory" attribute.

    Returns the configured testbed description; callers use it to create
    the node, interface and application boxes.
    """
    pl_provider = FactoriesProvider("planetlab")
    slice_desc = exp_desc.add_testbed_description(pl_provider)

    for attribute, value in (
            ("homeDirectory", root_dir),
            ("slice", slicename),
            ("sliceSSHKey", pl_ssh_key),
            ("authUser", pl_user),
            ("authPass", pl_pwd),
            ("plcHost", plc_host)):
        slice_desc.set_attribute_value(attribute, value)

    # Kills all running processes before starting the experiment
    slice_desc.set_attribute_value("cleanProc", True)
    # NOTICE: Setting 'cleanHome' to 'True' will erase all previous
    # folders in the sliver Home directory, including result files!
    slice_desc.set_attribute_value("cleanHome", True)
    slice_desc.set_attribute_value("plLogLevel", "DEBUG")

    return slice_desc
def create_node(hostname, pl_inet, slice_desc):
    """Create a node box and its network interface box.

    The node is labelled with its guid and the interface with
    "iface_<node guid>". The interface is connected to the "devs" connector
    of `pl_inet` and to the node.

    Returns the tuple (node, interface).
    """
    pl_node = slice_desc.create("Node")
    pl_node.set_attribute_value("hostname", hostname)
    pl_node.set_attribute_value("label", "%d" % pl_node.guid)
    pl_node.set_attribute_value("operatingSystem", "f12")

    pl_iface = slice_desc.create("NodeInterface")
    pl_iface.set_attribute_value("label", "iface_%d" % pl_node.guid)

    pl_iface.connector("inet").connect(pl_inet.connector("devs"))
    pl_node.connector("devs").connect(pl_iface.connector("node"))

    return pl_node, pl_iface
def create_ccnd(pl_node, slice_desc, pl_ifaces, port):
    """Create a CCNx daemon application on `pl_node`.

    Parameters:
        pl_node: node box the daemon is attached to.
        slice_desc: testbed description used to create the application.
        pl_ifaces: interfaces of the peer nodes; one unicast UDP route is
            added per interface.
        port: local port to bind the daemon to; when it is None (or 0) the
            "ccnLocalPort" attribute is left unset.

    Returns the created application box.
    """
    pl_app = slice_desc.create("CCNxDaemon")
    pl_app.set_attribute_value("ccnxVersion", "ccnx-0.5.1")

    # We use a wildcard to replace the public IP address of the node during
    # runtime, once this IP is known
    routes = "|".join(
        "udp {#[%s].addr[0].[Address]#}" % pl_iface.get_attribute_value("label")
        for pl_iface in pl_ifaces)

    # Add unicast ccn routes
    pl_app.set_attribute_value("ccnRoutes", routes)

    # Use a specific port to bind the CCNx daemon, only when one was given
    # (the port option has no default, so it may be None).
    if port:
        pl_app.set_attribute_value("ccnLocalPort", port)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_app.connector("node").connect(pl_node.connector("apps"))

    return pl_app
def create_ccnpush(movie, pl_node, slice_desc, port):
    """Create the application that publishes `movie` under ccnx:/VIDEO.

    The movie is set as the application's "stdin" attribute and the command
    is ccnseqwriter. When `port` is given, the command is prefixed with
    CCN_LOCAL_PORT=<port>.

    Returns the created application box.
    """
    pl_app = slice_desc.create("Application")
    pl_app.set_attribute_value("stdin", movie)

    command = "ccnseqwriter ccnx:/VIDEO"
    # The port option has no default, so `port` may be None; "%d" would
    # raise a TypeError in that case.
    if port:
        command = "CCN_LOCAL_PORT=%d %s " % (port, command)

    pl_app.set_attribute_value("command", command)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_app.connector("node").connect(pl_node.connector("apps"))

    return pl_app
def create_ccnpull(pl_node, slice_desc, port):
    """Create the application that repeatedly pulls ccnx:/VIDEO into vlc.

    The application depends on "vlc" (with rpmFusion enabled). Its command
    runs dbus-uuidgen once and then loops forever piping ccncat into vlc.
    When `port` is given, CCN_LOCAL_PORT=<port> is set for ccncat.

    Returns the created application box.
    """
    pl_app = slice_desc.create("Application")
    pl_app.set_attribute_value("rpmFusion", True)
    pl_app.set_attribute_value("depends", "vlc")

    ccncat = "ccncat ccnx:/VIDEO"
    if port:
        # A VAR=value prefix only applies to the single command it precedes,
        # so it must sit directly in front of ccncat inside the loop (placed
        # at the start of the line it would only reach sudo/dbus-uuidgen).
        ccncat = "CCN_LOCAL_PORT=%d %s" % (port, ccncat)

    command = (" sudo -S dbus-uuidgen --ensure ; while true ; do %s"
               " | vlc -I dummy - vlc://quit > /dev/null ; done" % ccncat)
    pl_app.set_attribute_value("command", command)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_app.connector("node").connect(pl_node.connector("apps"))

    return pl_app
def create_cpumem_monitor(pl_node, slice_desc):
    """Create the CPU/memory monitoring application for `pl_node`.

    The application is labelled "<node guid>_cpumem". Its command loops once
    per second, printing a timestamp followed by two sums computed with awk
    from the `top` lines that match bash or python.

    Returns the created application box.
    """
    label = "%d_cpumem" % pl_node.guid
    pl_app = slice_desc.create("Application")
    pl_app.set_attribute_value("label", label)

    # Raw strings keep the backslashes of the grep/sed patterns verbatim;
    # the last fragment is not raw because it needs escaped double quotes.
    command = (
        "while true; do echo $(date +%Y%m%d%H%M%S%z) "
        r" $(top -b -n 1 | grep 'bash\|python' | sed 's/\s\s*/ /g' | "
        r" sed 's/^\s//g' | cut -d' ' -f9,10,11 | awk '{ sum1 +=$1; sum2 += $2; } "
        " END {printf \"%2.1f %2.1f 0:00.00\", sum1, sum2;}'); sleep 1 ; done ")
    pl_app.set_attribute_value("command", command)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_node.connector("apps").connect(pl_app.connector("node"))

    return pl_app
def create_net_monitor(pl_node, slice_desc, pl_ifaces):
    """Create the network monitoring application for `pl_node`.

    The application is labelled "<node guid>_net", runs with sudo and depends
    on "tcpdump pv". Its command captures, with tcpdump on eth0, the traffic
    exchanged with the hosts behind `pl_ifaces` and pipes it through pv,
    whose stderr is appended to this application's own stdout trace file.

    Returns the created application box.
    """
    label = "%d_net" % pl_node.guid

    # One "host <address>" clause per peer interface; the address wildcard
    # is expressed through the interface label.
    hosts = " or ".join(
        " ( host {#[%s].addr[0].[Address]#} ) " % pl_iface.get_attribute_value("label")
        for pl_iface in pl_ifaces)

    pl_app = slice_desc.create("Application")
    pl_app.set_attribute_value("label", label)
    pl_app.set_attribute_value("rpmFusion", True)
    pl_app.set_attribute_value("sudo", True)
    pl_app.set_attribute_value("depends", "tcpdump pv")
    # The two placeholders are the host filter and this application's label.
    pl_app.set_attribute_value("command",
        "tcpdump -l -i eth0 -nNqttf '(%s)' -w - | pv -fbt >/dev/null 2>>{#[%s].trace[stdout].[name]#}" %
        (hosts, label))

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_node.connector("apps").connect(pl_app.connector("node"))

    return pl_app
def _ensure_dir(path):
    """Create `path` (and parents), tolerating an already existing directory."""
    try:
        os.makedirs(path)
    except OSError:
        if not os.path.isdir(path):
            raise


def store_results(controller, monitors, results_dir, exp_label):
    """Write the collected traces of every monitored node to disk.

    Layout under <results_dir>/<exp_label>:
        hosts              one "<hostname> <type>" line per monitor
        <hostname>/<name>  one file per non-empty trace, where <name> is one
                           of cpumem, net, ccnd_stderr, ccncat_stderr,
                           ccnseqwriter_stderr

    Parameters:
        controller: object whose trace(guid, trace_name) returns the trace
            content.
        monitors: iterable of monitor records (hostname, type and the
            application boxes to read traces from).
        results_dir: base directory for results.
        exp_label: name of the sub-directory for this experiment.
    """
    # create results directory for experiment
    root_path = os.path.join(results_dir, exp_label)

    # Single-argument form so the statement is valid on Python 2 and 3; the
    # two spaces reproduce the output of the former print statement.
    print("STORING RESULTS in  %s" % root_path)

    _ensure_dir(root_path)

    # collect information on nodes
    hosts_lines = []

    for mon in monitors:
        hosts_lines.append("%s %s\n" % (mon.hostname, mon.type))

        # create a subdir per hostname
        node_path = os.path.join(root_path, mon.hostname)
        _ensure_dir(node_path)

        # store monitoring results
        results = {
            "cpumem": controller.trace(mon.cpumem_monitor.guid, "stdout"),
            "net": controller.trace(mon.net_monitor.guid, "stdout"),
            "ccnd_stderr": controller.trace(mon.ccnd.guid, "stderr"),
            "ccncat_stderr": None,
            "ccnseqwriter_stderr": None,
        }

        # Not every node runs ccncat / ccnseqwriter: only the root publishes
        # and only the leaves pull, so those records may hold None.
        ccncat = getattr(mon, "ccncat", None)
        if ccncat:
            results["ccncat_stderr"] = controller.trace(ccncat.guid, "stderr")

        ccnseqwriter = getattr(mon, "ccnseqwriter", None)
        if ccnseqwriter:
            results["ccnseqwriter_stderr"] = controller.trace(
                ccnseqwriter.guid, "stderr")

        for name, result in results.items():
            # Skip traces that are missing or empty.
            if not result:
                continue

            with open(os.path.join(node_path, name), "w") as f:
                f.write(result)

    # store node info file
    with open(os.path.join(root_path, "hosts"), "w") as f:
        f.write("".join(hosts_lines))
def get_options():
    """Parse the command line, using environment variables as defaults.

    Defaults read from the environment: PL_SLICE, PL_HOST
    (www.planet-lab.eu when unset), PL_USER, PL_PASS and the ssh key path
    (~/.ssh/id_rsa_planetlab when unset). The experiment label defaults to
    a random UUID, the results directory to /tmp and the run time to 1 hour.
    The movie (-m) is mandatory; the parser exits with an error without it.

    Returns the tuple
        (slicename, pl_host, pl_user, pl_pwd, pl_ssh_key, movie,
         results_dir, exp_label, time_to_run, port)
    where port is None when -P was not given.
    """
    slicename = os.environ.get("PL_SLICE")
    pl_host = os.environ.get("PL_HOST", "www.planet-lab.eu")
    # NOTE(review): the environment variable name "PL_SSH_KEY" is assumed --
    # confirm. expanduser() is used instead of os.environ['HOME'] so a
    # missing HOME variable does not raise KeyError.
    pl_ssh_key = os.environ.get(
        "PL_SSH_KEY",
        "%s/.ssh/id_rsa_planetlab" % (os.path.expanduser("~"),))
    pl_user = os.environ.get('PL_USER')
    pl_pwd = os.environ.get('PL_PASS')
    exp_label = str(uuid.uuid4())

    # NOTE(review): the wording of the -t / -P part of the usage text is
    # assumed -- adjust if a specific text is expected.
    usage = ("usage: %prog -s <pl_slice> -H <pl_host> -k <ssh_key> "
             "-u <pl_user> -m <movie> -p <pl_password> -r <results-dir> "
             "-l <experiment-label> -t <time-to-run> -P <port>")

    parser = OptionParser(usage=usage)
    parser.add_option("-s", "--slicename", dest="slicename",
            help="PlanetLab slicename", default=slicename, type="string")
    parser.add_option("-H", "--pl-host", dest="pl_host",
            help="PlanetLab site (e.g. www.planet-lab.eu)",
            default=pl_host, type="string")
    parser.add_option("-k", "--ssh-key", dest="pl_ssh_key",
            help="Path to private ssh key used for PlanetLab authentication",
            default=pl_ssh_key, type="string")
    parser.add_option("-u", "--pl-user", dest="pl_user",
            help="PlanetLab account user (i.e. Registration email address)",
            default=pl_user, type="string")
    parser.add_option("-p", "--pl-pwd", dest="pl_pwd",
            help="PlanetLab account password", default=pl_pwd, type="string")
    parser.add_option("-m", "--movie", dest="movie",
            help="Stream movie", type="string")
    parser.add_option("-r", "--results", dest="results_dir", default="/tmp",
            help="Path to directory to store results", type="string")
    parser.add_option("-l", "--label", dest="exp_label", default=exp_label,
            help="Label to identify experiment results", type="string")
    parser.add_option("-t", "--time", dest="time_to_run", default=1,
            help="Time to run the experiment in hours", type="float")
    parser.add_option("-P", "--port", dest="port",
            help="Port to bind the CCNx daemon", type="int")

    (options, args) = parser.parse_args()

    if not options.movie:
        parser.error("movie is a required argument")

    return (options.slicename, options.pl_host, options.pl_user,
            options.pl_pwd, options.pl_ssh_key, options.movie,
            options.results_dir, options.exp_label, options.time_to_run,
            options.port)
if __name__ == '__main__':
    # Builds a three-level tree (root -> intermediate -> leaf nodes), runs it
    # for the requested time or until SIGTERM/SIGINT, then stores the traces.
    root_dir = tempfile.mkdtemp()
    (pl_slice, pl_host, pl_user, pl_pwd, pl_ssh_key, movie, results_dir,
            exp_label, time_to_run, port) = get_options()

    # list to store information on monitoring apps per node
    monitors = []

    # Create the experiment description object
    exp_desc = ExperimentDescription()

    # Create the slice
    slice_desc = create_slice(exp_desc, pl_slice, pl_host, pl_user, pl_pwd,
            pl_ssh_key, root_dir)

    # Create the Internet box object
    pl_inet = slice_desc.create("Internet")

    ### Level 0 - Root node
    root_hostname = "ple6.ipv6.lip6.fr"
    (root_node, root_iface) = create_node(root_hostname, pl_inet, slice_desc)

    ### Level 1 - Intermediate nodes
    l1_hostnames = {
        "fi": "planetlab-1.research.netlab.hut.fi",
        "se": "planetlab2.sics.se",
        "es": "planetlab1.um.es",
        "pt": "planetlab-um10.di.uminho.pt",
        "pl": "pandora.we.po.opole.pl",
        "it": "gschembra4.diit.unict.it",
        "de": "planetlab2.wiwi.hu-berlin.de",
        "fr": "planetlab1.u-strasbg.fr",
        "gr": "planetlab1.ics.forth.gr",
        "ch": "planetlab2.unineuchatel.ch",
    }

    l1_ifaces = dict()
    l1_nodes = dict()

    for country, hostname in l1_hostnames.items():
        pl_node, pl_iface = create_node(hostname, pl_inet, slice_desc)
        l1_ifaces[country] = pl_iface
        l1_nodes[country] = pl_node

    ### Level 0 - CCN & Monitoring

    # Add CCN Daemon to root node, routed to every intermediate node
    ifaces = list(l1_ifaces.values())
    root_ccnd = create_ccnd(root_node, slice_desc, ifaces, port)

    # Publish video in root node
    root_ccnseqwriter = create_ccnpush(movie, root_node, slice_desc, port)

    # Create monitor info object for root node
    root_mon = MonitorInfo(root_hostname, MonitorInfo.TYPE_ROOT)
    monitors.append(root_mon)

    # Add memory and cpu monitoring for root node
    root_mon.cpumem_monitor = create_cpumem_monitor(root_node, slice_desc)
    root_mon.net_monitor = create_net_monitor(root_node, slice_desc, ifaces)
    root_mon.ccnd = root_ccnd
    root_mon.ccnseqwriter = root_ccnseqwriter

    ### Level 2 - Leaf nodes
    l2_hostnames = {
        "fi": ["planetlab1.rd.tut.fi"],
        "se": ["planetlab1.s3.kth.se"],
        "es": ["planetlab1.tlm.unavarra.es"],
        "pt": ["planet1.servers.ua.pt"],
        "pl": ["onelab3.warsaw.rd.tp.pl"],
        "it": ["gschembra3.diit.unict.it"],
        "de": ["iraplab1.iralab.uni-karlsruhe.de"],
        "fr": ["host3-plb.loria.fr"],
        "gr": ["kostis.di.uoa.gr"],
        "ch": ["planetlab04.cnds.unibe.ch"],
    }

    for country, hostnames in l2_hostnames.items():
        # Leaf interfaces of this country only: they become the routes of
        # the country's intermediate node below.
        l2_ifaces = []
        l1_hostname = l1_hostnames[country]
        l1_iface = l1_ifaces[country]
        l1_node = l1_nodes[country]

        for hostname in hostnames:
            pl_node, pl_iface = create_node(hostname, pl_inet, slice_desc)
            l2_ifaces.append(pl_iface)

            ### Level 2 - CCN & Monitoring

            # Add CCN Daemon to leaf node, routed to its intermediate node
            ccnd = create_ccnd(pl_node, slice_desc, [l1_iface], port)

            # Retrieve video in leaf node
            ccncat = create_ccnpull(pl_node, slice_desc, port)

            # Create monitor info object for leaf node
            mon = MonitorInfo(hostname, MonitorInfo.TYPE_LEAF)
            monitors.append(mon)

            # Add memory and cpu monitoring for leaf node
            mon.cpumem_monitor = create_cpumem_monitor(pl_node, slice_desc)
            mon.net_monitor = create_net_monitor(pl_node, slice_desc, [l1_iface])
            mon.ccnd = ccnd
            mon.ccncat = ccncat

        ### Level 1 - CCN & Monitoring

        ifaces = [root_iface]
        ifaces.extend(l2_ifaces)

        # Add CCN Daemon to intermediate node, routed to root and its leaves
        ccnd = create_ccnd(l1_node, slice_desc, ifaces, port)

        # Create monitor info object for intermediate node
        mon = MonitorInfo(l1_hostname, MonitorInfo.TYPE_MID)
        monitors.append(mon)

        # Add memory and cpu monitoring for intermediate node
        mon.cpumem_monitor = create_cpumem_monitor(l1_node, slice_desc)
        mon.net_monitor = create_net_monitor(l1_node, slice_desc, ifaces)
        mon.ccnd = ccnd

    xml = exp_desc.to_xml()

    controller = ExperimentController(xml, root_dir)
    # NOTE(review): the start()/stop() calls and the polling loop are
    # reconstructed around the visible elapsed-time check -- confirm against
    # the ExperimentController API.
    controller.start()

    try:
        start_time = time.time()
        duration = time_to_run * 3600 # in seconds
        # Wait until a signal handler or the elapsed-time check below sets
        # the termination flag.
        while not TERMINATE:
            time.sleep(1)
            if (time.time() - start_time) > duration: # elapsed time
                TERMINATE.append(None)

        controller.stop()

        # store results in results dir
        store_results(controller, monitors, results_dir, exp_label)
    finally:
        # Always release the testbed, even if collecting results failed.
        controller.shutdown()