3 from nepi.core.design import ExperimentDescription, FactoriesProvider
4 from nepi.core.execute import ExperimentController
5 from nepi.util.constants import ApplicationStatus as AS
6 from optparse import OptionParser, SUPPRESS_HELP
# Track SIGTERM and SIGINT: raise the global termination flag instead of dying
def _finalize(sig, frame):
    """Signal handler that records a termination request.

    Appends a marker to the module-level TERMINATE list; the arguments
    supplied by the signal machinery are not used.
    """
    TERMINATE.append(None)


# Route both termination signals through the same handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, _finalize)
class MonitorInfo(object):
    """Record of the monitoring applications attached to one host.

    Attributes:
        hostname: name of the monitored host.
        type: role of the host, one of the TYPE_* constants.
        cpumem_monitor: CPU/memory monitoring application (None until set).
        net_monitor: network monitoring application (None until set).
    """

    # Roles a host can play in the distribution tree.
    # NOTE(review): the literal values are an assumption; they end up in the
    # "hosts" results file, so confirm them against whatever consumes it.
    TYPE_ROOT = "root"
    TYPE_MID = "middle"
    TYPE_LEAF = "leaf"

    def __init__(self, hostname, type):
        self.hostname = hostname
        # The role must be kept: it is read back when results are stored.
        self.type = type
        self.cpumem_monitor = None
        self.net_monitor = None
def create_slice(exp_desc, slicename, plc_host, pl_user, pl_pwd,
        pl_ssh_key, root_dir):
    """Add a PlanetLab testbed description to *exp_desc* and configure it.

    Args:
        exp_desc: experiment description the testbed is added to.
        slicename: value for the "slice" attribute.
        plc_host: value for the "plcHost" attribute.
        pl_user: value for the "authUser" attribute.
        pl_pwd: value for the "authPass" attribute.
        pl_ssh_key: value for the "sliceSSHKey" attribute.
        root_dir: value for the "homeDirectory" attribute.

    Returns:
        The configured testbed description, which callers use to create
        nodes and applications.
    """
    pl_provider = FactoriesProvider("planetlab")
    slice_desc = exp_desc.add_testbed_description(pl_provider)

    settings = (
        ("homeDirectory", root_dir),
        ("slice", slicename),
        ("sliceSSHKey", pl_ssh_key),
        ("authUser", pl_user),
        ("authPass", pl_pwd),
        ("plcHost", plc_host),
        # Kills all running processes before starting the experiment
        ("cleanProc", True),
        # NOTICE: Setting 'cleanHome' to 'True' will erase all previous
        # folders in the sliver Home directory, including result files!
        ("cleanHome", True),
        ("plLogLevel", "DEBUG"),
    )
    for name, value in settings:
        slice_desc.set_attribute_value(name, value)

    return slice_desc
def create_node(hostname, pl_inet, slice_desc):
    """Create a node with one network interface attached to *pl_inet*.

    The node is labelled with its guid and the interface with
    "iface_<node guid>".

    Returns:
        A (node, interface) tuple.
    """
    node = slice_desc.create("Node")
    node.set_attribute_value("hostname", hostname)
    node_label = "%d" % node.guid
    node.set_attribute_value("label", node_label)
    node.set_attribute_value("operatingSystem", "f12")

    iface = slice_desc.create("NodeInterface")
    iface_label = "iface_%d" % node.guid
    iface.set_attribute_value("label", iface_label)

    # Wire the interface to the Internet box first, then to its node.
    inet_devs = pl_inet.connector("devs")
    iface.connector("inet").connect(inet_devs)
    iface_node = iface.connector("node")
    node.connector("devs").connect(iface_node)

    return node, iface
def create_ccnd(pl_node, slice_desc, pl_ifaces, port):
    """Create a CCNx daemon application on *pl_node*.

    Args:
        pl_node: node the daemon is attached to.
        slice_desc: testbed description used to create the application.
        pl_ifaces: interfaces of the peers to add unicast routes to; only
            their "label" attribute is read.
        port: local port for the daemon, or None/0 to leave the
            "ccnLocalPort" attribute untouched.

    Returns:
        The created application.
    """
    pl_app = slice_desc.create("CCNxDaemon")
    pl_app.set_attribute_value("ccnxVersion", "ccnx-0.5.1")

    # We use a wildcard to replace the public IP address of the node during
    # runtime, once this IP is known
    routes = "|".join(
        "udp {#[%s].addr[0].[Address]#}" % pl_iface.get_attribute_value("label")
        for pl_iface in pl_ifaces)

    # Add unicast ccn routes
    pl_app.set_attribute_value("ccnRoutes", routes)

    # Use a specific port to bind the CCNx daemon. The port option has no
    # default, so it is only applied when one was actually given.
    if port:
        pl_app.set_attribute_value("ccnLocalPort", port)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_app.connector("node").connect(pl_node.connector("apps"))

    return pl_app
def create_ccnpush(movie, pl_node, slice_desc, port):
    """Create the application that publishes *movie* under ccnx:/VIDEO.

    Args:
        movie: value for the application's "stdin" attribute.
        pl_node: node the application is attached to.
        slice_desc: testbed description used to create the application.
        port: CCNx local port exported to the command as CCN_LOCAL_PORT,
            or None/0 to run the command without it.

    Returns:
        The created application.
    """
    pl_app = slice_desc.create("Application")
    pl_app.set_attribute_value("stdin", movie)

    command = "ccnseqwriter ccnx:/VIDEO"
    # The port option has no default; formatting None with %d would raise
    # TypeError, so the prefix is only added when a port was given.
    if port:
        command = "CCN_LOCAL_PORT=%d %s " % (port, command)

    pl_app.set_attribute_value("command", command)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_app.connector("node").connect(pl_node.connector("apps"))

    return pl_app
def create_ccnpull(pl_node, slice_desc, port):
    """Create the application that repeatedly fetches ccnx:/VIDEO into vlc.

    Args:
        pl_node: node the application is attached to.
        slice_desc: testbed description used to create the application.
        port: CCNx local port exported to ccncat as CCN_LOCAL_PORT, or
            None/0 to run ccncat without it.

    Returns:
        The created application.
    """
    pl_app = slice_desc.create("Application")
    pl_app.set_attribute_value("rpmFusion", True)
    pl_app.set_attribute_value("depends", "vlc")

    fetch = "ccncat ccnx:/VIDEO"
    # The port option has no default; formatting None with %d would raise
    # TypeError, so the variable is only set when a port was given.
    # It is placed directly in front of ccncat: a VAR=value prefix only
    # affects the single command it precedes, so putting it at the start of
    # the whole line would configure the sudo call instead of ccncat.
    if port:
        fetch = "CCN_LOCAL_PORT=%d %s" % (port, fetch)

    command = (" sudo -S dbus-uuidgen --ensure ; while true ; do %s"
               " | vlc -I dummy - vlc://quit > /dev/null ; done" % fetch)
    pl_app.set_attribute_value("command", command)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_app.connector("node").connect(pl_node.connector("apps"))

    return pl_app
def create_cpumem_monitor(pl_node, slice_desc):
    """Create the CPU/memory sampling application for *pl_node*.

    The application is labelled "<node guid>_cpumem" and runs a shell loop
    that, once per second, echoes a timestamp followed by two sums computed
    with awk over the `top` lines matching bash or python.

    Returns:
        The created application, so callers can later fetch its traces.
    """
    label = "%d_cpumem" % pl_node.guid
    pl_app = slice_desc.create("Application")
    pl_app.set_attribute_value("label", label)

    # Backslashes meant for grep/sed are written as explicit "\\" so the
    # string handed to the shell does not rely on unknown escape sequences.
    command = (
        "while true; do echo $(date +%Y%m%d%H%M%S%z) "
        " $(top -b -n 1 | grep 'bash\\|python' | sed 's/\\s\\s*/ /g' | "
        " sed 's/^\\s//g' | cut -d' ' -f9,10,11 | awk '{ sum1 +=$1; sum2 += $2; } "
        " END {printf \"%2.1f %2.1f 0:00.00\", sum1, sum2;}'); sleep 1 ; done ")
    pl_app.set_attribute_value("command", command)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_node.connector("apps").connect(pl_app.connector("node"))

    return pl_app
def create_net_monitor(pl_node, slice_desc, pl_ifaces):
    """Create the network monitoring application for *pl_node*.

    The application is labelled "<node guid>_net". It runs tcpdump on eth0
    filtered to the hosts behind *pl_ifaces* and pipes the capture through
    pv, whose stderr is appended to the application's own stdout trace.

    Args:
        pl_node: node the monitor is attached to; its guid builds the label.
        slice_desc: testbed description used to create the application.
        pl_ifaces: interfaces of the peers to capture; only their "label"
            attribute is read.

    Returns:
        The created application, so callers can later fetch its traces.
    """
    label = "%d_net" % pl_node.guid
    hosts = " or ".join(
        " ( host {#[%s].addr[0].[Address]#} ) " % pl_iface.get_attribute_value("label")
        for pl_iface in pl_ifaces)

    pl_app = slice_desc.create("Application")
    pl_app.set_attribute_value("label", label)
    pl_app.set_attribute_value("rpmFusion", True)
    pl_app.set_attribute_value("sudo", True)
    pl_app.set_attribute_value("depends", "tcpdump pv")

    # First placeholder: the capture filter; second: this application's own
    # label, so the trace reference points at its stdout trace.
    command = ("tcpdump -l -i eth0 -nNqttf '(%s)' -w - | pv -fbt >/dev/null "
               "2>>{#[%s].trace[stdout].[name]#}" % (hosts, label))
    pl_app.set_attribute_value("command", command)

    pl_app.enable_trace("stdout")
    pl_app.enable_trace("stderr")
    pl_node.connector("apps").connect(pl_app.connector("node"))

    return pl_app
def _ensure_dir(path):
    """Create *path* and its parents, tolerating an existing directory."""
    try:
        os.makedirs(path)
    except OSError:
        # Only swallow the error when the directory is really there.
        if not os.path.isdir(path):
            raise


def _write_text(path, text):
    """Write *text* to *path*, replacing any previous content."""
    with open(path, "w") as out:
        out.write(text)


def store_results(controller, monitors, results_dir, exp_label):
    """Save the monitoring traces of every host under results_dir/exp_label.

    Layout produced:
        <results_dir>/<exp_label>/hosts              one "<hostname> <type>" line per monitor
        <results_dir>/<exp_label>/<hostname>/cpumem  stdout trace of the cpu/mem monitor
        <results_dir>/<exp_label>/<hostname>/net     stdout trace of the network monitor

    Args:
        controller: object whose trace(guid, "stdout") returns the trace text.
        monitors: iterable of records exposing hostname, type,
            cpumem_monitor and net_monitor (the latter two with a guid).
        results_dir: base directory for results.
        exp_label: name of the sub-directory for this experiment.
    """
    # create results directory for experiment
    root_path = os.path.join(results_dir, exp_label)

    # Single pre-formatted argument: prints the same text whether print is
    # a statement or a function.
    print("STORING RESULTS in  %s" % root_path)

    _ensure_dir(root_path)

    # collect information on nodes
    hosts_lines = []

    for mon in monitors:
        hosts_lines.append("%s %s\n" % (mon.hostname, mon.type))

        # create a subdir per hostname
        node_path = os.path.join(root_path, mon.hostname)
        _ensure_dir(node_path)

        # store monitoring results
        traced_apps = (("cpumem", mon.cpumem_monitor), ("net", mon.net_monitor))
        for name, app in traced_apps:
            stdout = controller.trace(app.guid, "stdout")
            _write_text(os.path.join(node_path, name), stdout)

    # store node info file
    _write_text(os.path.join(root_path, "hosts"), "".join(hosts_lines))
# Command-line parsing routine (the script calls it as get_options()).
# NOTE(review): the "def" line and a few continuation lines are not visible
# in this excerpt; the comments below describe only what is shown.
#
# Defaults for the PlanetLab settings come from the environment (PL_SLICE,
# PL_HOST, PL_USER, PL_PASS); PL_HOST falls back to www.planet-lab.eu and
# the SSH key falls back to $HOME/.ssh/id_rsa_planetlab.
187 slicename = os.environ.get("PL_SLICE")
188 pl_host = os.environ.get("PL_HOST", "www.planet-lab.eu")
189 pl_ssh_key = os.environ.get(
191 "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
192 pl_user = os.environ.get('PL_USER')
193 pl_pwd = os.environ.get('PL_PASS')
# A random UUID is the default experiment label (overridable with -l).
194 exp_label = "%s" % uuid.uuid4()
196 usage = "usage: %prog -s <pl_slice> -H <pl_host> -k <ssh_key> -u <pl_user> \
197 -m <movie> -p <pl_password> -r <results-dir> -l <experiment-label> \
200 parser = OptionParser(usage=usage)
201 parser.add_option("-s", "--slicename", dest="slicename",
202 help="PlanetLab slicename", default=slicename, type="str")
203 parser.add_option("-H", "--pl-host", dest="pl_host",
204 help="PlanetLab site (e.g. www.planet-lab.eu)",
205 default=pl_host, type="str")
206 parser.add_option("-k", "--ssh-key", dest="pl_ssh_key",
207 help="Path to private ssh key used for PlanetLab authentication",
208 default=pl_ssh_key, type="str")
209 parser.add_option("-u", "--pl-user", dest="pl_user",
210 help="PlanetLab account user (i.e. Registration email address)",
211 default=pl_user, type="str")
212 parser.add_option("-p", "--pl-pwd", dest="pl_pwd",
213 help="PlanetLab account password", default=pl_pwd, type="str")
# -m has no default; its presence is checked after parsing.
214 parser.add_option("-m", "--movie", dest="movie",
215 help="Stream movie", type="str")
216 parser.add_option("-r", "--results", dest="results_dir", default = "/tmp",
217 help="Path to directory to store results", type="str")
218 parser.add_option("-l", "--label", dest="exp_label", default = exp_label,
219 help="Label to identify experiment results", type="str")
# Run time is given in hours (float), 2 by default.
220 parser.add_option("-t", "--time", dest="time_to_run", default = 2,
221 help="Time to run the experiment in hours", type="float")
# -P is declared without a default, so options.port is None when omitted.
222 parser.add_option("-P", "--port", dest="port",
223 help="Port to bind the CCNx daemon", type="int")
225 (options, args) = parser.parse_args()
# The movie is the only option enforced as mandatory.
227 if not options.movie:
228 parser.error("movie is a required argument")
# Values are returned as a tuple in this fixed order; the caller unpacks
# them positionally.
230 return (options.slicename, options.pl_host, options.pl_user,
231 options.pl_pwd, options.pl_ssh_key, options.movie,
232 options.results_dir, options.exp_label, options.time_to_run,
# Script entry point: builds a three-level CCNx distribution tree on
# PlanetLab (root publisher -> intermediate nodes -> leaf consumers),
# attaches cpu/mem and network monitors to every node, and stores the
# monitor traces at the end.
# NOTE(review): several statements of this section are not visible in this
# excerpt (e.g. where monitors, l1_ifaces, l1_nodes and l2_ifaces are
# initialised and how the controller is started/stopped).
235 if __name__ == '__main__':
# Temporary directory used both as the slice home directory and as the
# controller's root directory.
236 root_dir = tempfile.mkdtemp()
246 port) = get_options()
248 # list to store information on monitoring apps per node
251 # Create the experiment description object
252 exp_desc = ExperimentDescription()
255 slice_desc = create_slice(exp_desc, pl_slice, pl_host, pl_user, pl_pwd,
256 pl_ssh_key, root_dir)
258 # Create the Internet box object
259 pl_inet = slice_desc.create("Internet")
261 ### Level 0 - Root node
262 root_hostname = "chimay.infonet.fundp.ac.be"
263 (root_node, root_iface) = create_node(root_hostname, pl_inet, slice_desc)
265 ### Level 1 - Intermediate nodes
# Intermediate hosts are keyed by country code; the same keys are used
# below to look up each country's leaf hosts.
266 l1_hostnames = dict()
267 l1_hostnames["uk"] = "planetlab4.cs.st-andrews.ac.uk"
271 for country, hostname in l1_hostnames.iteritems():
272 pl_node, pl_iface = create_node(hostname, pl_inet, slice_desc)
273 l1_ifaces[country] = pl_iface
274 l1_nodes[country] = pl_node
276 ### Level 0 - CCN & Monitoring
278 # Add CCN Daemon to root node
# The root daemon gets routes to every intermediate interface.
279 ifaces = l1_ifaces.values()
280 create_ccnd(root_node, slice_desc, ifaces, port)
282 # Publish video in root node
283 create_ccnpush(movie, root_node, slice_desc, port)
285 # Create monitor info object for root node
286 root_mon = MonitorInfo(root_hostname, MonitorInfo.TYPE_ROOT)
287 monitors.append(root_mon)
289 # Add memory and cpu monitoring for root node
290 root_mon.cpumem_monitor = create_cpumem_monitor(root_node, slice_desc)
291 root_mon.net_monitor = create_net_monitor(root_node, slice_desc, ifaces)
293 ### Level 2 - Leaf nodes
294 l2_hostnames = dict()
295 l2_hostnames["uk"] = ["planetlab-1.imperial.ac.uk",
296 "planetlab3.xeno.cl.cam.ac.uk",
297 "planetlab1.xeno.cl.cam.ac.uk"
300 for country, hostnames in l2_hostnames.iteritems():
302 l1_hostname = l1_hostnames[country]
303 l1_iface = l1_ifaces[country]
304 l1_node = l1_nodes[country]
306 for hostname in hostnames:
307 pl_node, pl_iface = create_node(hostname, pl_inet, slice_desc)
308 l2_ifaces.append(pl_iface)
310 ### Level 2 - CCN & Monitoring
312 # Add CCN Daemon to leaf node, routed to its intermediate node only
313 create_ccnd(pl_node, slice_desc, [l1_iface], port)
315 # Retrieve video in leaf node
316 create_ccnpull(pl_node, slice_desc, port)
318 # Create monitor info object for leaf node
319 mon = MonitorInfo(hostname, MonitorInfo.TYPE_LEAF)
322 # Add memory and cpu monitoring for leaf node
323 mon.cpumem_monitor = create_cpumem_monitor(pl_node, slice_desc)
324 mon.net_monitor = create_net_monitor(pl_node, slice_desc, [l1_iface])
326 ### Level 1 - CCN & Monitoring
# An intermediate node talks to the root and to the collected leaf interfaces.
328 ifaces = [root_iface]
329 ifaces.extend(l2_ifaces)
331 # Add CCN Daemon to intermediate nodes
332 create_ccnd(l1_node, slice_desc, ifaces, port)
334 # Create monitor info object for intermediate nodes
335 mon = MonitorInfo(l1_hostname, MonitorInfo.TYPE_MID)
338 # Add memory and cpu monitoring for intermediate nodes
339 mon.cpumem_monitor = create_cpumem_monitor(l1_node, slice_desc)
340 mon.net_monitor = create_net_monitor(l1_node, slice_desc, ifaces)
# The controller is built from the XML form of the description.
342 xml = exp_desc.to_xml()
344 controller = ExperimentController(xml, root_dir)
347 start_time = time.time()
348 duration = time_to_run * 3600 # in seconds
# NOTE(review): the elapsed-time check below is commented out, so in the
# visible code start_time and duration are computed but never used.
351 #if (time.time() - start_time) > duration: # elapsed time
352 # TERMINATE.append(None)
356 # store results in results dir
357 store_results(controller, monitors, results_dir, exp_label)
359 controller.shutdown()