--- /dev/null
+#!/usr/bin/env python
+
+###############################################################################
+#
+# NEPI, a framework to manage network experiments
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+from nepi.execution.ec import ExperimentController
+from nepi.execution.runner import ExperimentRunner
+from nepi.util.netgraph import NetGraph, TopologyType
+import nepi.data.processing.ccn.parser as ccn_parser
+
+import networkx
+import socket
+import os
+
+content_name = "ccnx:/test/bunny.ts"
+
+STOP_TIME = "5000s"
+
+repofile = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), "repoFile1.0.8.2")
+
+def get_simulator(ec):
+ simulator = ec.filter_resources("LinuxNS3Simulation")
+
+ if not simulator:
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", "localhost")
+
+ simu = ec.register_resource("LinuxNS3Simulation")
+ ec.register_connection(simu, node)
+ return simu
+
+ return simulator[0]
+
+def add_collector(ec, trace_name, subdir, newname = None):
+ collector = ec.register_resource("Collector")
+ ec.set(collector, "traceName", trace_name)
+ ec.set(collector, "subDir", subdir)
+ if newname:
+ ec.set(collector, "rename", newname)
+
+ return collector
+
+def add_dce_host(ec, nid):
+ simu = get_simulator(ec)
+
+ host = ec.register_resource("ns3::Node")
+ ec.set(host, "enableStack", True)
+ ec.register_connection(host, simu)
+
+ # Annotate the graph
+ ec.netgraph.annotate_node(nid, "host", host)
+
+def add_dce_ccnd(ec, nid):
+ # Retrieve annotation from netgraph
+ host = ec.netgraph.node_annotation(nid, "host")
+
+ # Add dce ccnd to the dce node
+ ccnd = ec.register_resource("ns3::LinuxDceCCND")
+ ec.set (ccnd, "stackSize", 1<<20)
+ ec.set (ccnd, "debug", 7)
+ ec.set (ccnd, "capacity", 50000)
+ ec.set (ccnd, "StartTime", "1s")
+ ec.set (ccnd, "StopTime", STOP_TIME)
+ ec.register_connection(ccnd, host)
+
+ # Collector to retrieve ccnd log
+ collector = add_collector(ec, "stderr", nid, "log")
+ ec.register_connection(collector, ccnd)
+
+ # Annotate the graph
+ ec.netgraph.annotate_node(nid, "ccnd", ccnd)
+
+def add_dce_ccnr(ec, nid):
+ # Retrieve annotation from netgraph
+ host = ec.netgraph.node_annotation(nid, "host")
+
+ # Add a CCN content repository to the dce node
+ ccnr = ec.register_resource("ns3::LinuxDceCCNR")
+ ec.set (ccnr, "repoFile1", repofile)
+ ec.set (ccnr, "stackSize", 1<<20)
+ ec.set (ccnr, "StartTime", "2s")
+ ec.set (ccnr, "StopTime", STOP_TIME)
+ ec.register_connection(ccnr, host)
+
+def add_dce_ccncat(ec, nid):
+ # Retrieve annotation from netgraph
+ host = ec.netgraph.node_annotation(nid, "host")
+
+ ccnpeek = ec.register_resource("ns3::LinuxDceCCNPeek")
+ #ec.set (ccnpeek, "contentName", "ccnx:/chunk0")
+ ec.set (ccnpeek, "contentName", content_name)
+ ec.set (ccnpeek, "stackSize", 1<<20)
+ ec.set (ccnpeek, "StartTime", "5s")
+ ec.set (ccnpeek, "StopTime", STOP_TIME)
+ ec.register_connection(ccnpeek, host)
+
+ collector = add_collector(ec, "stdout", nid, "peek")
+ ec.register_connection(collector, ccnpeek)
+
+ # Add a ccncat application to the dce host
+ ccncat = ec.register_resource("ns3::LinuxDceCCNCat")
+ ec.set (ccncat, "contentName", content_name)
+ ec.set (ccncat, "stackSize", 1<<20)
+ ec.set (ccncat, "StartTime", "8s")
+ ec.set (ccncat, "StopTime", STOP_TIME)
+ ec.register_connection(ccncat, host)
+
+def add_dce_fib_entry(ec, nid1, nid2):
+ # Retrieve annotations from netgraph
+ host1 = ec.netgraph.node_annotation(nid1, "host")
+ net = ec.netgraph.edge_net_annotation(nid1, nid2)
+ ip2 = net[nid2]
+
+ # Add FIB entry between peer hosts
+ ccndc = ec.register_resource("ns3::LinuxDceFIBEntry")
+ ec.set (ccndc, "protocol", "udp")
+ ec.set (ccndc, "uri", "ccnx:/")
+ ec.set (ccndc, "host", ip2)
+ ec.set (ccndc, "stackSize", 1<<20)
+ ec.set (ccndc, "StartTime", "2s")
+ ec.set (ccndc, "StopTime", STOP_TIME)
+ ec.register_connection(ccndc, host1)
+
+def add_dce_net_iface(ec, nid1, nid2):
+ # Retrieve annotations from netgraph
+ host = ec.netgraph.node_annotation(nid1, "host")
+ net = ec.netgraph.edge_net_annotation(nid1, nid2)
+ ip1 = net[nid1]
+ prefix = net["prefix"]
+
+ dev = ec.register_resource("ns3::PointToPointNetDevice")
+ ec.set(dev,"DataRate", "5Mbps")
+ ec.set(dev, "ip", ip1)
+ ec.set(dev, "prefix", prefix)
+ ec.register_connection(host, dev)
+
+ queue = ec.register_resource("ns3::DropTailQueue")
+ ec.register_connection(dev, queue)
+
+ return dev
+
+def avg_interests(ec, run):
+ ## Process logs
+ logs_dir = ec.run_dir
+
+ (graph,
+ content_names,
+ interest_expiry_count,
+ interest_dupnonce_count,
+ interest_count,
+ content_count) = ccn_parser.process_content_history_logs(
+ logs_dir,
+ ec.netgraph.topology)
+
+ shortest_path = networkx.shortest_path(graph,
+ source = ec.netgraph.sources()[0],
+ target = ec.netgraph.targets()[0])
+
+ ### Compute metric: Avg number of Interests seen per content name
+ ### normalized by the number of nodes in the shortest path
+ content_name_count = len(content_names.values())
+ nodes_in_shortest_path = len(shortest_path) - 1
+ metric = interest_count / (float(content_name_count) * float(nodes_in_shortest_path))
+
+ # TODO: DUMP RESULTS TO FILE
+ # TODO: DUMP GRAPH DELAYS!
+ f = open("/tmp/metric", "w+")
+ f.write("%.2f\n" % metric)
+ f.close()
+ print " METRIC", metric
+
+ return metric
+
+def add_dce_edge(ec, nid1, nid2):
+ ### Add network interfaces to hosts
+ p2p1 = add_dce_net_iface(ec, nid1, nid2)
+ p2p2 = add_dce_net_iface(ec, nid2, nid1)
+
+ # Create point to point link between interfaces
+ chan = ec.register_resource("ns3::PointToPointChannel")
+ ec.set(chan, "Delay", "0ms")
+
+ ec.register_connection(chan, p2p1)
+ ec.register_connection(chan, p2p2)
+
+ #### Add routing between CCN nodes
+ add_dce_fib_entry(ec, nid1, nid2)
+ add_dce_fib_entry(ec, nid2, nid1)
+
+def add_dce_node(ec, nid):
+ ### Add CCN nodes (ec.netgraph holds the topology graph)
+ add_dce_host(ec, nid)
+ add_dce_ccnd(ec, nid)
+
+ if nid == ec.netgraph.targets()[0]:
+ add_dce_ccnr(ec, nid)
+
+ if nid == ec.netgraph.sources()[0]:
+ add_dce_ccncat(ec, nid)
+
+if __name__ == '__main__':
+
+ #### Create NEPI Experiment Description with LINEAR topology
+ ec = ExperimentController("dce_ccn",
+ topo_type = TopologyType.LINEAR,
+ node_count = 4,
+ assign_st = True,
+ assign_ips = True,
+ add_node_callback = add_dce_node,
+ add_edge_callback = add_dce_edge)
+
+ print "Results stored at", ec.exp_dir
+
+    #### Retrieve the consumer to wait for it to finish
+ ccncat = ec.filter_resources("ns3::LinuxDceCCNCat")
+
+    #### Run experiment until the metric converges
+ rnr = ExperimentRunner()
+ runs = rnr.run(ec, min_runs = 1, max_runs = 1,
+ compute_metric_callback = avg_interests,
+ wait_guids = ccncat,
+ wait_time = 0)
+
--- /dev/null
+#!/usr/bin/env python
+
+###############################################################################
+#
+# NEPI, a framework to manage network experiments
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+from nepi.execution.ec import ExperimentController
+from nepi.execution.runner import ExperimentRunner
+from nepi.util.netgraph import NetGraph, TopologyType
+import nepi.data.processing.ccn.parser as ccn_parser
+
+import networkx
+import socket
+import os
+
+PL_NODES = dict({
+ "0": "iraplab1.iralab.uni-karlsruhe.de",
+ "1": "planetlab1.informatik.uni-goettingen.de",
+ "2": "dfn-ple1.x-win.dfn.de",
+ "3": "mars.planetlab.haw-hamburg.de",
+ "4": "planetlab2.unineuchatel.ch",
+ "5": "planetlab-node3.it-sudparis.eu",
+ "6": "planetlab2.extern.kuleuven.be",
+ "7": "node2pl.planet-lab.telecom-lille1.eu",
+ "8": "planetvs2.informatik.uni-stuttgart.de",
+ "9": "planetlab1.informatik.uni-wuerzburg.de",
+ "10": "planet1.l3s.uni-hannover.de",
+ "11": "planetlab1.wiwi.hu-berlin.de",
+ "12": "pl2.uni-rostock.de",
+ "13": "planetlab1.u-strasbg.fr",
+ "14": "peeramidion.irisa.fr"
+ })
+
+pl_slice = os.environ.get("PL_SLICE")
+pl_user = os.environ.get("PL_USER")
+pl_password = os.environ.get("PL_PASS")
+pl_ssh_key = os.environ.get("PL_SSHKEY")
+
+content_name = "ccnx:/test/bunny.ts"
+
+pipeline = 4 # Default value for ccncat
+
+operating_system = "f14"
+
+country = "germany"
+
+repofile = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), "repoFile1.0.8.2")
+
+def add_collector(ec, trace_name, subdir, newname = None):
+ collector = ec.register_resource("Collector")
+ ec.set(collector, "traceName", trace_name)
+ ec.set(collector, "subDir", subdir)
+ if newname:
+ ec.set(collector, "rename", newname)
+
+ return collector
+
+def add_pl_host(ec, nid):
+ hostname = PL_NODES[nid]
+
+ # Add a planetlab host to the experiment description
+ host = ec.register_resource("PlanetlabNode")
+ ec.set(host, "hostname", hostname)
+ ec.set(host, "username", pl_slice)
+ ec.set(host, "identity", pl_ssh_key)
+ #ec.set(host, "pluser", pl_user)
+ #ec.set(host, "plpassword", pl_password)
+ #ec.set(host, "country", country)
+ #ec.set(host, "operatingSystem", operating_system)
+ ec.set(host, "cleanExperiment", True)
+ ec.set(host, "cleanProcesses", True)
+
+ # Annotate the graph
+ ec.netgraph.annotate_node(nid, "hostname", hostname)
+ ec.netgraph.annotate_node(nid, "host", host)
+
+ # Annotate the graph node with an ip address
+ ip = socket.gethostbyname(hostname)
+ ec.netgraph.annotate_node_ip(nid, ip)
+
+def add_pl_ccnd(ec, nid):
+ # Retrieve annotation from netgraph
+ host = ec.netgraph.node_annotation(nid, "host")
+
+ # Add a CCN daemon to the planetlab node
+ ccnd = ec.register_resource("LinuxCCND")
+ ec.set(ccnd, "debug", 7)
+ ec.register_connection(ccnd, host)
+
+ # Collector to retrieve ccnd log
+ collector = add_collector(ec, "stderr", nid, "log")
+ ec.register_connection(collector, ccnd)
+
+ # Annotate the graph
+ ec.netgraph.annotate_node(nid, "ccnd", ccnd)
+
+def add_pl_ccnr(ec, nid):
+ # Retrieve annotation from netgraph
+ ccnd = ec.netgraph.node_annotation(nid, "ccnd")
+
+ # Add a CCN content repository to the planetlab node
+ ccnr = ec.register_resource("LinuxCCNR")
+
+ ec.set(ccnr, "repoFile1", repofile)
+ ec.register_connection(ccnr, ccnd)
+
+def add_pl_ccncat(ec, nid):
+ # Retrieve annotation from netgraph
+ ccnd = ec.netgraph.node_annotation(nid, "ccnd")
+
+ # Add a CCN cat application to the planetlab node
+ ccncat = ec.register_resource("LinuxCCNCat")
+ ec.set(ccncat, "pipeline", pipeline)
+ ec.set(ccncat, "contentName", content_name)
+ ec.register_connection(ccncat, ccnd)
+
+def add_pl_fib_entry(ec, nid1, nid2):
+ # Retrieve annotations from netgraph
+ ccnd1 = ec.netgraph.node_annotation(nid1, "ccnd")
+ hostname2 = ec.netgraph.node_annotation(nid2, "hostname")
+
+ # Add a FIB entry between one planetlab node and its peer
+ entry = ec.register_resource("LinuxFIBEntry")
+ ec.set(entry, "host", hostname2)
+ ec.register_connection(entry, ccnd1)
+
+ # Collector to retrieve peering ping output (to measure neighbors delay)
+ ec.enable_trace(entry, "ping")
+ collector = add_collector(ec, "ping", nid1)
+ ec.register_connection(collector, entry)
+
+ return entry
+
+def avg_interests(ec, run):
+ ## Process logs
+ logs_dir = ec.run_dir
+
+ (graph,
+ content_names,
+ interest_expiry_count,
+ interest_dupnonce_count,
+ interest_count,
+ content_count) = ccn_parser.process_content_history_logs(
+ logs_dir,
+ ec.netgraph.topology)
+
+ shortest_path = networkx.shortest_path(graph,
+ source = ec.netgraph.sources()[0],
+ target = ec.netgraph.targets()[0])
+
+ ### Compute metric: Avg number of Interests seen per content name
+ ### normalized by the number of nodes in the shortest path
+ content_name_count = len(content_names.values())
+ nodes_in_shortest_path = len(shortest_path) - 1
+ metric = interest_count / (float(content_name_count) * float(nodes_in_shortest_path))
+
+ # TODO: DUMP RESULTS TO FILE
+ # TODO: DUMP GRAPH DELAYS!
+ f = open("/tmp/metric", "w+")
+ f.write("%.2f\n" % metric)
+ f.close()
+ print " METRIC", metric
+
+ return metric
+
+def add_pl_edge(ec, nid1, nid2):
+ #### Add connections between CCN nodes
+ add_pl_fib_entry(ec, nid1, nid2)
+ add_pl_fib_entry(ec, nid2, nid1)
+
+def add_pl_node(ec, nid):
+ ### Add CCN nodes (ec.netgraph holds the topology graph)
+ add_pl_host(ec, nid)
+ add_pl_ccnd(ec, nid)
+
+ if nid == ec.netgraph.targets()[0]:
+ add_pl_ccnr(ec, nid)
+
+ if nid == ec.netgraph.sources()[0]:
+ add_pl_ccncat(ec, nid)
+
+if __name__ == '__main__':
+
+ #### Create NEPI Experiment Description with LINEAR topology
+ ec = ExperimentController("pl_ccn",
+ topo_type = TopologyType.LINEAR,
+ node_count = 4,
+ #assign_ips = True,
+ assign_st = True,
+ add_node_callback = add_pl_node,
+ add_edge_callback = add_pl_edge)
+
+ print "Results stored at", ec.exp_dir
+
+    #### Retrieve the content producing resource to wait for it to finish
+ ccncat = ec.filter_resources("LinuxCCNCat")
+
+    #### Run experiment until the metric converges
+ rnr = ExperimentRunner()
+ runs = rnr.run(ec, min_runs = 10, max_runs = 300,
+ compute_metric_callback = avg_interests,
+ wait_guids = ccncat,
+ wait_time = 0)
+
return app
-def add_collector(ec, trace_name, store_dir):
+def add_collector(ec, trace_name):
collector = ec.register_resource("Collector")
ec.set(collector, "traceName", trace_name)
- ec.set(collector, "storeDir", store_dir)
return collector
( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options()
- ec = ExperimentController(exp_id = exp_id)
+ ec = ExperimentController(exp_id = exp_id, local_dir = results_dir)
# hosts in the US
#host1 = "planetlab4.wail.wisc.edu"
app, ResourceState.STARTED, time = "10s")
# Register a collector to automatically collect traces
- collector = add_collector(ec, "stderr", results_dir)
+ collector = add_collector(ec, "stderr")
for ccnd in ccnds.values():
ec.register_connection(collector, ccnd)
ssh_key = ####### <<< ASSING the absolute path to the private SSH key to login into the remote host >>>
ssh_user = ####### <<< ASSING the SSH username >>>
+results_dir = "/tmp/demo_CCN_results"
+
## Create the experiment controller
-ec = ExperimentController(exp_id = "demo_CCN")
+ec = ExperimentController(exp_id = "demo_CCN", local_dir = results_dir)
## Register node 1
node1 = ec.register_resource("LinuxNode")
# Register a collector to automatically collect the ccnd logs
# to a local directory
-results_dir = "/tmp/demo_CCN_results"
col1 = ec.register_resource("Collector")
ec.set(col1, "traceName", "stderr")
-ec.set(col1, "storeDir", results_dir)
ec.set(col1, "subDir", hostname1)
ec.register_connection(col1, ccnd1)
col2 = ec.register_resource("Collector")
ec.set(col2, "traceName", "stderr")
-ec.set(col2, "storeDir", results_dir)
ec.set(col2, "subDir", hostname2)
ec.register_connection(col2, ccnd2)
pl_ssh_key = ####### <<< ASSING the absolute path to the private SSH key used for Planetlab >>>
slicename = ####### <<< ASSING the PlanetLab slicename >>>
+results_dir = "/tmp/demo_CCN_results"
+
## Create the experiment controller
-ec = ExperimentController(exp_id = "demo_CCN")
+ec = ExperimentController(exp_id = "demo_CCN",
+ local_dir = results_dir)
## Register node 1
node1 = ec.register_resource("PlanetlabNode")
# Register a collector to automatically collect the ccnd logs
# to a local directory
-results_dir = "/tmp/demo_CCN_results"
col1 = ec.register_resource("Collector")
ec.set(col1, "traceName", "stderr")
-ec.set(col1, "storeDir", results_dir)
ec.set(col1, "subDir", hostname1)
ec.register_connection(col1, ccnd1)
col2 = ec.register_resource("Collector")
ec.set(col2, "traceName", "stderr")
-ec.set(col2, "storeDir", results_dir)
ec.set(col2, "subDir", hostname2)
ec.register_connection(col2, ccnd2)
--- /dev/null
+#!/usr/bin/env python
+
+###############################################################################
+#
+# CCNX benchmark
+# Copyright (C) 2014 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+#
+# This library contains functions to parse (CCNx) ccnd logs.
+#
+# Results from experiments must be stored in a directory
+# named with the experiment run id.
+# ccnd logs are stored in .log files in a subdirectory per node.
+# The following diagram exemplifies the experiment result directory
+# structure (nidi is the unique identifier assigned to node i):
+#
+# run_id
+# \ nid1
+# \ nid2.log
+# \ nid2
+# \ nid1.log
+# \ nid3
+# \ nid3.log
+#
+
+import collections
+import functools
+import networkx
+import os
+import pickle
+import tempfile
+
+from nepi.util.timefuncs import compute_delay_ms
+from nepi.util.statfuncs import compute_mean
+import nepi.data.processing.ping.parser as ping_parser
+
+def is_control(content_name):
+ return content_name.startswith("ccnx:/%C1") or \
+ content_name.startswith("ccnx:/ccnx") or \
+ content_name.startswith("ccnx:/...")
+
+
+def parse_file(filename):
+ """ Parses message information from ccnd log files
+
+ filename: path to ccndlog file
+
+ """
+
+ faces = dict()
+ sep = " "
+
+ f = open(filename, "r")
+
+ data = []
+
+ for line in f:
+ cols = line.strip().split(sep)
+
+ # CCN_PEEK
+ # MESSAGE interest_from
+ # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)
+ #
+ # MESSAGE interest_to
+ # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7)
+ #
+ # MESSAGE CONTENT FROM
+ # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+ #
+ # MESSAGE CONTENT_TO
+ # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+ #
+ # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048)
+
+ # External face creation
+ # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695
+
+ if line.find("accepted datagram client") > -1:
+ face_id = (cols[5]).replace("id=",'')
+ ip = cols[7]
+ port = cols[9]
+ faces[face_id] = (ip, port)
+ continue
+
+ # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4)
+ if line.find("releasing face id") > -1:
+ face_id = cols[5]
+ if face_id in faces:
+ del faces[face_id]
+ continue
+
+ if len(cols) < 6:
+ continue
+
+ timestamp = cols[0]
+ message_type = cols[3]
+
+ if message_type not in ["interest_from", "interest_to", "content_from",
+ "content_to", "interest_dupnonce", "interest_expiry"]:
+ continue
+
+ face_id = cols[4]
+ content_name = cols[5]
+
+ # Interest Nonce ? -> 412A74-0844-0008-50AA-F6EAD4
+ nonce = ""
+ if message_type in ["interest_from", "interest_to", "interest_dupnonce"]:
+ last = cols[-1]
+ if len(last.split("-")) == 5:
+ nonce = last
+
+ try:
+ size = int((cols[6]).replace('(',''))
+ except:
+ print "interest_expiry without face id!", line
+ continue
+
+        # If no external IP address was identified for this face
+        # assume it is a local face
+ peer = "localhost"
+
+ if face_id in faces:
+ peer, port = faces[face_id]
+
+ data.append((content_name, timestamp, message_type, peer, face_id,
+ size, nonce, line))
+
+ f.close()
+
+ return data
+
+def dump_content_history(content_history):
+ f = tempfile.NamedTemporaryFile(delete=False)
+ pickle.dump(content_history, f)
+ f.close()
+ return f.name
+
+def load_content_history(fname):
+ f = open(fname, "r")
+ content_history = pickle.load(f)
+ f.close()
+
+ os.remove(fname)
+ return content_history
+
+def annotate_cn_node(graph, nid, ips2nid, data, content_history):
+ for (content_name, timestamp, message_type, peer, face_id,
+ size, nonce, line) in data:
+
+ # Ignore control messages for the time being
+ if is_control(content_name):
+ continue
+
+ if message_type == "interest_from" and \
+ peer == "localhost":
+ graph.node[nid]["ccn_consumer"] = True
+ elif message_type == "content_from" and \
+ peer == "localhost":
+ graph.node[nid]["ccn_producer"] = True
+
+ # Ignore local messages for the time being.
+ # They could later be used to calculate the processing times
+ # of messages.
+ if peer == "localhost":
+ continue
+
+ # remove digest
+ if message_type in ["content_from", "content_to"]:
+ content_name = "/".join(content_name.split("/")[:-1])
+
+ if content_name not in content_history:
+ content_history[content_name] = list()
+
+ peernid = ips2nid[peer]
+ graph.add_edge(nid, peernid)
+
+ content_history[content_name].append((timestamp, message_type, nid,
+ peernid, nonce, size, line))
+
+def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False):
+ """ Adds CCN content history for each node in the topology graph.
+
+ """
+ # Make a copy of the graph to ensure integrity
+ graph = graph.copy()
+
+ ips2nid = dict()
+
+ for nid in graph.nodes():
+ ips = graph.node[nid]["ips"]
+ for ip in ips:
+ ips2nid[ip] = nid
+
+ # Now walk through the ccnd logs...
+ for dirpath, dnames, fnames in os.walk(logs_dir):
+ # continue if we are not at the leaf level (if there are subdirectories)
+ if dnames:
+ continue
+
+        # Each dirpath corresponds to a different node
+ nid = os.path.basename(dirpath)
+
+ content_history = dict()
+
+ for fname in fnames:
+ if fname.endswith(".log"):
+ filename = os.path.join(dirpath, fname)
+ data = parse_file(filename)
+ annotate_cn_node(graph, nid, ips2nid, data, content_history)
+
+ # Avoid storing everything in memory, instead dump to a file
+ # and reference the file
+ fname = dump_content_history(content_history)
+ graph.node[nid]["history"] = fname
+
+ if parse_ping_logs:
+ ping_parser.annotate_cn_graph(logs_dir, graph)
+
+ return graph
+
+def ccn_producers(graph):
+ """ Returns the nodes that are content providers """
+ return [nid for nid in graph.nodes() \
+ if graph.node[nid].get("ccn_producer")]
+
+def ccn_consumers(graph):
+ """ Returns the nodes that are content consumers """
+ return [nid for nid in graph.nodes() \
+ if graph.node[nid].get("ccn_consumer")]
+
+def process_content_history(graph):
+ """ Compute CCN message counts and aggregates content historical
+ information in the content_names dictionary
+
+ """
+
+ ## Assume single source
+ source = ccn_consumers(graph)[0]
+
+ interest_expiry_count = 0
+ interest_dupnonce_count = 0
+ interest_count = 0
+ content_count = 0
+ content_names = dict()
+
+ # Collect information about exchanged messages by content name and
+ # link delay info.
+ for nid in graph.nodes():
+ # Load the data collected from the node's ccnd log
+ fname = graph.node[nid]["history"]
+ history = load_content_history(fname)
+
+ for content_name in history.keys():
+ hist = history[content_name]
+
+ for (timestamp, message_type, nid1, nid2, nonce, size, line) in hist:
+ if message_type in ["content_from", "content_to"]:
+ # The first Interest sent will not have a version or chunk number.
+ # The first Content sent back in reply, will end in /=00 or /%00.
+ # Make sure to map the first Content to the first Interest.
+ if content_name.endswith("/=00"):
+ content_name = "/".join(content_name.split("/")[0:-2])
+
+ # Add content name to dictionary
+ if content_name not in content_names:
+ content_names[content_name] = dict()
+ content_names[content_name]["interest"] = dict()
+ content_names[content_name]["content"] = list()
+
+ # Classify interests by replica
+ if message_type in ["interest_from"] and \
+ nonce not in content_names[content_name]["interest"]:
+ content_names[content_name]["interest"][nonce] = list()
+
+ # Add consumer history
+ if nid == source:
+ if message_type in ["interest_to", "content_from"]:
+ # content name history as seen by the source
+ if "consumer_history" not in content_names[content_name]:
+ content_names[content_name]["consumer_history"] = list()
+
+ content_names[content_name]["consumer_history"].append(
+ (timestamp, message_type))
+
+ # Add messages per content name and cumulate totals by message type
+ if message_type == "interest_dupnonce":
+ interest_dupnonce_count += 1
+ elif message_type == "interest_expiry":
+ interest_expiry_count += 1
+ elif message_type == "interest_from":
+ interest_count += 1
+ # Append to interest history of the content name
+ content_names[content_name]["interest"][nonce].append(
+ (timestamp, nid2, nid1))
+ elif message_type == "content_from":
+ content_count += 1
+ # Append to content history of the content name
+ content_names[content_name]["content"].append((timestamp, nid2, nid1))
+ else:
+ continue
+ del hist
+ del history
+
+ # Compute the time elapsed between the time an interest is sent
+ # in the consumer node and when the content is received back
+ for content_name in content_names.keys():
+ # order content and interest messages by timestamp
+ content_names[content_name]["content"] = sorted(
+ content_names[content_name]["content"])
+
+ for nonce, timestamps in content_names[content_name][
+ "interest"].iteritems():
+ content_names[content_name]["interest"][nonce] = sorted(
+ timestamps)
+
+ history = sorted(content_names[content_name]["consumer_history"])
+ content_names[content_name]["consumer_history"] = history
+
+ # compute the rtt time of the message
+ rtt = None
+ waiting_content = False
+ interest_timestamp = None
+ content_timestamp = None
+
+ for (timestamp, message_type) in history:
+ if not waiting_content and message_type == "interest_to":
+ waiting_content = True
+ interest_timestamp = timestamp
+ continue
+
+ if waiting_content and message_type == "content_from":
+ content_timestamp = timestamp
+ break
+
+ # If we can't determine who sent the interest, discard it
+ rtt = -1
+ if interest_timestamp and content_timestamp:
+ rtt = compute_delay_ms(content_timestamp, interest_timestamp)
+
+ content_names[content_name]["rtt"] = rtt
+ content_names[content_name]["lapse"] = (interest_timestamp, content_timestamp)
+
+ return (graph,
+ content_names,
+ interest_expiry_count,
+ interest_dupnonce_count,
+ interest_count,
+ content_count)
+
+def process_content_history_logs(logs_dir, graph):
+ """ Parse CCN logs and aggregate content history information in graph.
+    Returns annotated graph, message counts and content name history.
+
+ """
+ ## Process logs and analyse data
+ try:
+ graph = annotate_cn_graph(logs_dir, graph,
+ parse_ping_logs = True)
+ except:
+ print "Skipping: Error parsing ccnd logs", logs_dir
+ raise
+
+ source = ccn_consumers(graph)[0]
+ target = ccn_producers(graph)[0]
+
+ # Process the data from the ccnd logs, but do not re compute
+ # the link delay.
+ try:
+ (graph,
+ content_names,
+ interest_expiry_count,
+ interest_dupnonce_count,
+ interest_count,
+ content_count) = process_content_history(graph)
+ except:
+ print "Skipping: Error processing ccn data", logs_dir
+ raise
+
+ return (graph,
+ content_names,
+ interest_expiry_count,
+ interest_dupnonce_count,
+ interest_count,
+ content_count)
--- /dev/null
+#!/usr/bin/env python
+
+###############################################################################
+#
+# CCNX benchmark
+# Copyright (C) 2014 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+#
+# This library contains functions to parse log files generated using ping.
+#
+
+import collections
+import re
+import os
+
+# RE to match ping reply lines, e.g.
+# "64 bytes from host (1.2.3.4): icmp_seq=1 ttl=64 time=0.5 ms"
+_rre = re.compile("\d+ bytes from ((?P<hostname>[^\s]+) )?\(?(?P<ip>[^\s]+)\)??: icmp_.eq=\d+ ttl=\d+ time=(?P<time>[^\s]+) ms")
+
+def parse_file(filename):
+ """
+    filename: path to ping log file
+
+ """
+
+ f = open(filename, "r")
+
+    # Ping target info
+ target_ip = None
+ target_hostname = None
+
+ data = []
+
+ for line in f:
+ # match traceroute to ...
+ m = re.match(_rre, line)
+ if not m:
+ continue
+
+ target_ip = m.groupdict()["ip"]
+        # FIX THIS: Make sure the regular expression does not include
+        # the ')' in the ip group
+ target_ip = target_ip.replace(")","")
+ target_hostname = m.groupdict()["hostname"]
+ time = m.groupdict()["time"]
+ data.append((target_ip, target_hostname, time))
+
+ f.close()
+
+ return data
+
+def annotate_cn_node(graph, nid1, ips2nid, data):
+ for (target_ip, target_hostname, time) in data:
+ nid2 = ips2nid[target_ip]
+
+ if "delays" not in graph.edge[nid1][nid2]:
+ graph.edge[nid1][nid2]["delays"] = []
+
+ time = float(time.replace("ms", "").replace(" ",""))
+
+ graph.edge[nid1][nid2]["delays"].append(time)
+
+def annotate_cn_graph(logs_dir, graph):
+    """ Add delay information to graph using data collected using
+ ping.
+
+ """
+ ips2nid = dict()
+
+ for nid in graph.nodes():
+ ips = graph.node[nid]["ips"]
+ for ip in ips:
+ ips2nid[ip] = nid
+
+ # Walk through the ping logs...
+ for dirpath, dnames, fnames in os.walk(logs_dir):
+ # continue if we are not at the leaf level (if there are subdirectories)
+ if dnames:
+ continue
+
+        # Each dirpath corresponds to a different host
+ nid = os.path.basename(dirpath)
+
+ for fname in fnames:
+ if fname.endswith(".ping"):
+ filename = os.path.join(dirpath, fname)
+ data = parse_file(filename)
+ annotate_cn_node(graph, nid, ips2nid, data)
+
+ # Take as weight the most frequent value
+ for nid1, nid2 in graph.edges():
+ delays = collections.Counter(graph.edge[nid1][nid2]["delays"])
+ weight = delays.most_common(1)[0][0]
+ del graph.edge[nid1][nid2]["delays"]
+ graph.edge[nid1][nid2]["weight"] = weight
+
+ return graph
+
+
# Attribute global is set to all resources of rtype
Global = 1 << 7 # 128
+
class Attribute(object):
"""
.. class:: Class Args :
from nepi.execution.trace import TraceAttr
from nepi.util.serializer import ECSerializer, SFormats
from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType
# TODO: use multiprocessing instead of threading
# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
import logging
import os
import sys
+import tempfile
import time
import threading
import weakref
.. note::
An experiment, or scenario, is defined by a concrete set of resources,
- behavior, configuration and interconnection of those resources.
+ and the behavior, configuration and interconnection of those resources.
The Experiment Description (ED) is a detailed representation of a
single experiment. It contains all the necessary information to
allow repeating the experiment. NEPI allows to describe
recreated (and re-run) by instantiating an EC and recreating
the same experiment description.
- In NEPI, an experiment is represented as a graph of interconnected
+ An experiment is represented as a graph of interconnected
resources. A resource is a generic concept in the sense that any
component taking part of an experiment, whether physical of
virtual, is considered a resource. A resources could be a host,
single resource. ResourceManagers are specific to a resource
type (i.e. An RM to control a Linux application will not be
the same as the RM used to control a ns-3 simulation).
- To support a new type of resource in NEPI, a new RM must be
- implemented. NEPI already provides a variety of
- RMs to control basic resources, and new can be extended from
- the existing ones.
+ To support a new type of resource, a new RM must be implemented.
+ NEPI already provides a variety of RMs to control basic resources,
+ and new can be extended from the existing ones.
Through the EC interface the user can create ResourceManagers (RMs),
configure them and interconnect them, to describe an experiment.
exp_id, which can be re-used in different ExperimentController,
and the run_id, which is unique to one ExperimentController instance, and
is automatically generated by NEPI.
-
+
"""
@classmethod
ec = serializer.load(filepath)
return ec
- def __init__(self, exp_id = None):
+ def __init__(self, exp_id = None, local_dir = None, persist = False,
+ add_node_callback = None, add_edge_callback = None, **kwargs):
+ """ ExperimentController entity to model an execute a network
+ experiment.
+
+ :param exp_id: Human readable name to identify the experiment
+ :type exp_id: str
+
+ :param local_dir: Path to local directory where to store experiment
+ related files
+ :type local_dir: str
+
+ :param persist: Save an XML description of the experiment after
+ completion at local_dir
+ :type persist: bool
+
+ :param add_node_callback: Callback to invoke for node instantiation
+ when automatic topology creation mode is used
+ :type add_node_callback: function
+
+ :param add_edge_callback: Callback to invoke for edge instantiation
+ when automatic topology creation mode is used
+ :type add_edge_callback: function
+
+ """
super(ExperimentController, self).__init__()
# Logging
# resources used, etc)
self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
+ # Local path where to store experiment related files (results, etc)
+ if not local_dir:
+ local_dir = tempfile.gettempdir() # /tmp
+
+ self._local_dir = local_dir
+ self._exp_dir = os.path.join(local_dir, self.exp_id)
+ self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+ # If True persist the experiment controller in XML format, after completion
+ self._persist = persist
+
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
# EC state
self._state = ECState.RUNNING
+ # Automatically construct experiment description
+ self._netgraph = None
+ if add_node_callback or add_edge_callback or kwargs.get("topology"):
+ self._build_from_netgraph(add_node_callback, add_edge_callback,
+ **kwargs)
+
# The runner is a pool of threads used to parallelize
# execution of tasks
self._nthreads = 20
"""
return self._nthreads
-
+ @property
+ def local_dir(self):
+ """ Root local directory for experiment files
+
+ """
+ return self._local_dir
+
+ @property
+ def exp_dir(self):
+ """ Local directory to store results and other files related to the
+ experiment.
+
+ """
+ return self._exp_dir
+
+ @property
+ def run_dir(self):
+ """ Local directory to store results and other files related to the
+ experiment run.
+
+ """
+ return self._run_dir
+
+ @property
+ def persist(self):
+ """ If True, persists the ExperimentController to XML format upon
+ experiment completion
+
+ """
+ return self._persist
+
+ @property
+ def netgraph(self):
+ """ Return NetGraph instance if experiment description was automatically
+ generated
+
+ """
+ return self._netgraph
+
@property
def abort(self):
""" Returns True if the experiment has failed and should be interrupted,
return sec
def save(self, dirpath = None, format = SFormats.XML):
+ # Serialize the experiment description to a file under dirpath.
+ # dirpath defaults to this run's results directory (self.run_dir).
+ # Returns the path of the file written by the serializer.
+ if dirpath == None:
+ dirpath = self.run_dir
+
+ # Create the target directory if missing; an OSError (e.g. it
+ # already exists) is deliberately ignored -- the serializer will
+ # fail below if the path is truly unusable.
+ try:
+ os.makedirs(dirpath)
+ except OSError:
+ pass
+
serializer = ECSerializer()
- path = serializer.save(self, dirpath = None, format = format)
+ path = serializer.save(self, dirpath, format = format)
return path
def get_task(self, tid):
return rm
def get_resources_by_type(self, rtype):
- """ Returns a registered ResourceManager by its guid
+ """ Returns the ResourceManager objects of type rtype
:param rtype: Resource type
:type rtype: string
+ :rtype: list of ResourceManager instances
"""
rms = []
+ # Returns the RM objects themselves (filter_resources returns guids)
for guid, rm in self._resources.iteritems():
- if rm.get_rtype() == type:
+ if rm.get_rtype() == rtype:
rms.append(rm)
return rms
@property
def resources(self):
- """ Returns the set() of guids of all the ResourceManager
+ """ Returns the guids of all ResourceManagers
:return: Set of all RM guids
- :rtype: set
+ :rtype: list
"""
keys = self._resources.keys()
return keys
def filter_resources(self, rtype):
    """ Returns the guids of all ResourceManagers of type rtype

    :param rtype: Resource type
    :type rtype: string

    :rtype: list of guids

    """
    # Collect the guid of every registered RM whose type matches
    return [rm.guid for rm in self._resources.values()
            if rm.get_rtype() == rtype]
+
def register_resource(self, rtype, guid = None):
""" Registers a new ResourceManager of type 'rtype' in the experiment
self.wait_released(guids)
+ if self.persist:
+ self.save()
+
for guid in guids:
if self.get(guid, "hardRelease"):
self.remove_resource(guid)
self._cond.notify()
self._cond.release()
+ def _build_from_netgraph(self, add_node_callback, add_edge_callback,
+ **kwargs):
+ """ Automates experiment description using a NetGraph instance.
+
+ kwargs are forwarded verbatim to the NetGraph constructor
+ (e.g. topology, topo_type, node_count, assign_ips, assign_st).
+
+ add_node_callback(ec, nid) is invoked once per topology node and
+ add_edge_callback(ec, nid1, nid2) once per edge, so user code can
+ register the resources that realize the topology.
+ """
+ self._netgraph = NetGraph(**kwargs)
+
+ if add_node_callback:
+ ### Add resources to the EC
+ for nid in self.netgraph.nodes():
+ add_node_callback(self, nid)
+
+ if add_edge_callback:
+ #### Add connections between resources
+ for nid1, nid2 in self.netgraph.edges():
+ add_edge_callback(self, nid1, nid2)
+
try:
return func(self, *args, **kwargs)
except:
+ self.fail()
+
import traceback
err = traceback.format_exc()
- self.error(err)
- self.debug("SETTING guid %d to state FAILED" % self.guid)
- self.fail()
+ logger = Logger(self._rtype)
+ logger.error(err)
+ logger.error("SETTING guid %d to state FAILED" % self.guid)
raise
return wrapped
try:
self.do_release()
except:
+ self.set_released()
+
import traceback
err = traceback.format_exc()
- self.error(err)
-
- self.set_released()
+ msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % (
+ self._rtype, self.guid, err)
+ logger = Logger(self._rtype)
+ logger.debug(msg)
def fail(self):
""" Sets the RM to state FAILED.
def set_released(self, time = None):
""" Mark ResourceManager as RELEASED """
self.set_state(ResourceState.RELEASED, "_release_time", time)
- self.debug("----- RELEASED ---- ")
+
+ # NOTE(review): logs through a standalone Logger rather than
+ # self.debug -- presumably so the message is emitted even while the
+ # RM is being torn down; confirm intent
+ msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
+ logger = Logger(self._rtype)
+ logger.debug(msg)
def set_failed(self, time = None):
""" Mark ResourceManager as FAILED """
self.set_state(ResourceState.FAILED, "_failed_time", time)
- self.debug("----- FAILED ---- ")
+
+ # NOTE(review): logs through a standalone Logger rather than
+ # self.debug -- mirrors set_released above; confirm intent
+ msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
+ logger = Logger(self._rtype)
+ logger.debug(msg)
def set_discovered(self, time = None):
""" Mark ResourceManager as DISCOVERED """
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.execution.ec import ExperimentController
+from nepi.execution.ec import ExperimentController, ECState
import math
import numpy
import os
-import tempfile
import time
class ExperimentRunner(object):
""" Re-runs a same experiment multiple times
:param ec: Experiment description of experiment to run
- :type name: ExperimentController
- :rtype: EperimentController
+ :type ec: ExperimentController
:param min_runs: Minimum number of repetitions for experiment
- :type name: int
- :rtype: int
+ :type min_runs: int
:param max_runs: Maximum number of repetitions for experiment
- :type name: int
- :rtype: int
+ :type max_runs: int
:param wait_time: Time to wait in seconds between invoking
ec.deploy() and ec.release()
- :type name: float
- :rtype: float
+ :type wait_time: float
:param wait_guids: List of guids to pass to ec.wait_finished
after invoking ec.deploy()
- :type name: list
- :rtype: list of int
+ :type wait_guids: list
:param compute_metric_callback: function to invoke after each
experiment run, to compute an experiment metric.
metric = compute_metric_callback(ec, run)
- :type name: function
- :rtype: function
+ :type compute_metric_callback: function
:param evaluate_convergence_callback: function to evaluate whether the
collected metric samples have converged and the experiment runner
If stop is True, then the runner will exit.
- :type name: function
- :rtype: function
+ :type evaluate_convergence_callback: function
"""
"Experiment will stop when the standard error with 95% "
"confidence interval is >= 5% of the mean of the collected samples ")
- # Set useRunId = True in Collectors to make sure results are
- # independently stored.
- collectors = ec.get_resources_by_type("Collector")
- for collector in collectors:
- collector.set("useRunId", True)
+ # Force persistence of experiment controller
+ ec._persist = True
- dirpath = tempfile.mkdtemp()
- filepath = ec.save(dirpath)
+ filepath = ec.save(dirpath = ec.exp_dir)
samples = []
run = 0
- while True:
+ stop = False
+
+ while not stop:
run += 1
ec = self.run_experiment(filepath, wait_time, wait_guids)
ec.logger.info(" RUN %d \n" % run)
- if run >= min_runs and max_runs > -1 and run >= max_runs :
- break
-
if compute_metric_callback:
metric = compute_metric_callback(ec, run)
if metric is not None:
if run >= min_runs and evaluate_convergence_callback:
if evaluate_convergence_callback(ec, run, samples):
- break
+ stop = True
+
+ if run >= min_runs and max_runs > -1 and run >= max_runs :
+ stop = True
+
del ec
return run
ec = ExperimentController.load(filepath)
ec.deploy()
-
+
ec.wait_finished(wait_guids)
time.sleep(wait_time)
ec.release()
+ if ec.state == ECState.FAILED:
+ raise RuntimeError, "Experiment failed"
+
return ec
"Name of the trace to be collected",
flags = Flags.Design)
- store_dir = Attribute("storeDir",
- "Path to local directory to store trace results",
- default = tempfile.gettempdir(),
- flags = Flags.Design)
-
- use_run_id = Attribute("useRunId",
- "If set to True stores traces into a sub directory named after "
- "the RUN ID assigned by the EC",
- type = Types.Bool,
- default = False,
- flags = Flags.Design)
-
sub_dir = Attribute("subDir",
"Sub directory to collect traces into",
flags = Flags.Design)
flags = Flags.Design)
cls._register_attribute(trace_name)
- cls._register_attribute(store_dir)
cls._register_attribute(sub_dir)
cls._register_attribute(rename)
- cls._register_attribute(use_run_id)
def __init__(self, ec, guid):
super(Collector, self).__init__(ec, guid)
self.error(msg)
raise RuntimeError, msg
- self._store_path = self.get("storeDir")
-
- if self.get("useRunId"):
- self._store_path = os.path.join(self._store_path, self.ec.run_id)
+ self._store_path = self.ec.run_dir
subdir = self.get("subDir")
if subdir:
- self._store_path = os.path.join(self._store_path, subdir)
+ self._store_path = os.path.join(self.store_path, subdir)
msg = "Creating local directory at %s to store %s traces " % (
- self._store_path, trace_name)
+ self.store_path, trace_name)
self.info(msg)
try:
rms = self.get_connected()
for rm in rms:
- result = self.ec.trace(rm.guid, trace_name)
fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
- rename))
+ rename))
+
try:
+ result = self.ec.trace(rm.guid, trace_name)
f = open(fpath, "w")
f.write(result)
f.close()
super(LinuxApplication, self).__init__(ec, guid)
self._pid = None
self._ppid = None
+ self._node = None
self._home = "app-%s" % self.guid
+
# whether the command should run in foreground attached
# to a terminal
self._in_foreground = False
@property
def node(self):
- node = self.get_connected(LinuxNode.get_rtype())
- if node: return node[0]
- raise RuntimeError, "Application must be connected to Node"
+ if not self._node:
+ node = self.get_connected(LinuxNode.get_rtype())
+ if not node:
+ msg = "Application %s guid %d NOT connected to Node" % (
+ self._rtype, self.guid)
+ raise RuntimeError, msg
+
+ self._node = node[0]
+
+ return self._node
@property
def app_home(self):
if (proc and proc.poll()) or err:
msg = " Failed to STOP command '%s' " % self.get("command")
self.error(msg, out, err)
-
+
super(LinuxApplication, self).do_stop()
def do_release(self):
super(LinuxFIBEntry, self).__init__(ec, guid)
self._home = "fib-%s" % self.guid
self._ping = None
- self._mtr = None
self._traceroute = None
+ self._ccnd = None
@property
def ccnd(self):
- ccnd = self.get_connected(LinuxCCND.get_rtype())
- if ccnd: return ccnd[0]
- return None
+ if not self._ccnd:
+ ccnd = self.get_connected(LinuxCCND.get_rtype())
+ if ccnd:
+ self._ccnd = ccnd[0]
+
+ return self._ccnd
+
+ @property
+ def ping(self):
+ if not self._ping:
+ from nepi.resources.linux.ping import LinuxPing
+ ping = self.get_connected(LinuxPing.get_rtype())
+ if ping:
+ self._ping = ping[0]
+
+ return self._ping
+
+ @property
+ def traceroute(self):
+ if not self._traceroute:
+ from nepi.resources.linux.traceroute import LinuxTraceroute
+ traceroute = self.get_connected(LinuxTraceroute.get_rtype())
+ if traceroute:
+ self._traceroute = traceroute[0]
+
+ return self._traceroute
@property
def node(self):
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
if name == "ping":
- return self.ec.trace(self._ping, "stdout", attr, block, offset)
- if name == "mtr":
- return self.ec.trace(self._mtr, "stdout", attr, block, offset)
+ if not self.ping:
+ return None
+ return self.ec.trace(self.ping.guid, "stdout", attr, block, offset)
+
if name == "traceroute":
- return self.ec.trace(self._traceroute, "stdout", attr, block, offset)
+ if not self.traceroute:
+ return None
+ return self.ec.trace(self.traceroute.guid, "stdout", attr, block, offset)
return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
raise RuntimeError, msg
def configure(self):
- if self.trace_enabled("ping"):
+ if self.trace_enabled("ping") and not self.ping:
self.info("Configuring PING trace")
- self._ping = self.ec.register_resource("LinuxPing")
- self.ec.set(self._ping, "printTimestamp", True)
- self.ec.set(self._ping, "target", self.get("host"))
- self.ec.set(self._ping, "earlyStart", True)
- self.ec.register_connection(self._ping, self.node.guid)
+ ping = self.ec.register_resource("LinuxPing")
+ self.ec.set(ping, "printTimestamp", True)
+ self.ec.set(ping, "target", self.get("host"))
+ self.ec.set(ping, "earlyStart", True)
+ self.ec.register_connection(ping, self.node.guid)
+ self.ec.register_connection(ping, self.guid)
# schedule ping deploy
- self.ec.deploy(guids=[self._ping], group = self.deployment_group)
-
- if self.trace_enabled("mtr"):
- self.info("Configuring MTR trace")
- self._mtr = self.ec.register_resource("LinuxMtr")
- self.ec.set(self._mtr, "noDns", True)
- self.ec.set(self._mtr, "printTimestamp", True)
- self.ec.set(self._mtr, "continuous", True)
- self.ec.set(self._mtr, "target", self.get("host"))
- self.ec.set(self._mtr, "earlyStart", True)
- self.ec.register_connection(self._mtr, self.node.guid)
- # schedule mtr deploy
- self.ec.deploy(guids=[self._mtr], group = self.deployment_group)
+ self.ec.deploy(guids=[ping], group = self.deployment_group)
- if self.trace_enabled("traceroute"):
+ if self.trace_enabled("traceroute") and not self.traceroute:
self.info("Configuring TRACEROUTE trace")
- self._traceroute = self.ec.register_resource("LinuxTraceroute")
- self.ec.set(self._traceroute, "printTimestamp", True)
- self.ec.set(self._traceroute, "continuous", True)
- self.ec.set(self._traceroute, "target", self.get("host"))
- self.ec.set(self._traceroute, "earlyStart", True)
- self.ec.register_connection(self._traceroute, self.node.guid)
+ traceroute = self.ec.register_resource("LinuxTraceroute")
+ self.ec.set(traceroute, "printTimestamp", True)
+ self.ec.set(traceroute, "continuous", True)
+ self.ec.set(traceroute, "target", self.get("host"))
+ self.ec.set(traceroute, "earlyStart", True)
+ self.ec.register_connection(traceroute, self.node.guid)
+ self.ec.register_connection(traceroute, self.guid)
# schedule mtr deploy
- self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
+ self.ec.deploy(guids=[traceroute], group = self.deployment_group)
def do_start(self):
if self.state == ResourceState.READY:
# Should it be made thread-safe?
class GuidGenerator(object):
    """ Generates unique, monotonically increasing guids.

    Only the highest guid handed out (or explicitly registered) is
    tracked, so generation is O(1) in time and memory.
    NOTE(review): not thread-safe (see the question in the comment
    preceding this class).
    """

    def __init__(self):
        # Highest guid generated or registered so far
        self._last_guid = 0

    def next(self, guid = None):
        """ Return guid if given (registering it), otherwise generate
        the next unused guid.

        :param guid: optional explicit guid requested by the caller
        :type guid: int

        :rtype: int

        """
        if guid is None:
            guid = self._last_guid + 1

        # Remember the highest guid seen so automatically generated
        # guids never collide with explicitly requested higher ones
        self._last_guid = max(self._last_guid, guid)

        return guid
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import ipaddr
+import math
+import networkx
+import random
+
+class TopologyType:
+ # String identifiers for the topology generation patterns supported
+ # by NetGraph.generate_topology. ADHOC means no automatic generation:
+ # the graph is built manually through add_node/add_edge.
+ LINEAR = "linear"
+ LADDER = "ladder"
+ MESH = "mesh"
+ TREE = "tree"
+ STAR = "star"
+ ADHOC = "adhoc"
+
+## TODO:
+## - AQ: Add support for hypergraphs (to be able to add hyper edges to
+## model CSMA or wireless networks)
+
class NetGraph(object):
    """ NetGraph represents a network topology.
    Network graphs are internally handled using the networkx library.
    Node ids are always strings.

    """

    def __init__(self, **kwargs):
        """ A graph can be generated using a specified pattern
        (LADDER, MESH, TREE, etc), or provided as an argument.

        :param topology: Undirected graph to use as internal representation
        :type topology: networkx.Graph

        :param topo_type: One of TopologyType.{LINEAR,LADDER,MESH,TREE,STAR}
            used to automatically generate the topology graph.
        :type topo_type: TopologyType

        :param node_count: Number of nodes in the topology to be generated.
        :type node_count: int

        :param branches: Number of branches (arms) for the STAR topology.
        :type branches: int

        :param assign_ips: Automatically assign IP addresses to each node.
        :type assign_ips: bool

        :param network: Base network segment for IP address assignment.
        :type network: str

        :param prefix: Base network prefix for IP address assignment.
        :type prefix: int

        :param version: IP version for IP address assignment.
        :type version: int

        :param assign_st: Select source and target nodes on the graph.
        :type assign_st: bool

        NOTE: Only point-to-point like network topologies are supported
        for now. (Wireless and Ethernet networks where several nodes
        share the same edge (hyperedge) can not be modeled for the
        moment).

        """
        self._topology = kwargs.get("topology")
        self._topo_type = kwargs.get("topo_type", TopologyType.ADHOC)

        if not self.topology:
            if kwargs.get("node_count"):
                node_count = kwargs["node_count"]
                branches = kwargs.get("branches")

                self._topology = self.generate_topology(self.topo_type,
                        node_count, branches = branches)
            else:
                # ADHOC mode: start empty, nodes/edges are added manually
                self._topology = networkx.Graph()

        if kwargs.get("assign_ips"):
            network = kwargs.get("network", "10.0.0.0")
            prefix = kwargs.get("prefix", 8)
            version = kwargs.get("version", 4)

            self.assign_p2p_ips(network = network, prefix = prefix,
                    version = version)

        if kwargs.get("assign_st"):
            self.select_target_zero()
            self.select_random_leaf_source()

    @property
    def topology(self):
        """ Internal networkx.Graph instance """
        return self._topology

    @property
    def topo_type(self):
        """ TopologyType used to generate the graph (ADHOC if manual) """
        return self._topo_type

    @property
    def order(self):
        """ Number of nodes in the graph """
        return self.topology.order()

    def nodes(self):
        """ List of node ids (str) """
        return self.topology.nodes()

    def edges(self):
        """ List of (nid1, nid2) edges """
        return self.topology.edges()

    def generate_topology(self, topo_type, node_count, branches = None):
        """ Build a graph of roughly node_count nodes following the
        requested pattern; node ids are converted to str.

        :param branches: number of arms, required for the STAR pattern

        """
        if topo_type == TopologyType.LADDER:
            total_nodes = node_count/2
            graph = networkx.ladder_graph(total_nodes)

        elif topo_type == TopologyType.LINEAR:
            graph = networkx.path_graph(node_count)

        elif topo_type == TopologyType.MESH:
            graph = networkx.complete_graph(node_count)

        elif topo_type == TopologyType.TREE:
            # balanced_tree expects an integer height; h is chosen so a
            # binary tree has approximately node_count nodes
            h = int(math.log(node_count + 1)/math.log(2) - 1)
            graph = networkx.balanced_tree(2, h)

        elif topo_type == TopologyType.STAR:
            # Fix: this branch previously referenced an undefined name
            # BRANCHES; it must use the branches argument
            if not branches:
                raise RuntimeError("STAR topology requires 'branches'")

            graph = networkx.Graph()
            graph.add_node(0)

            nodesinbranch = (node_count - 1)/ branches
            c = 1

            for i in xrange(branches):
                prev = 0
                for n in xrange(1, nodesinbranch + 1):
                    graph.add_node(c)
                    graph.add_edge(prev, c)
                    prev = c
                    c += 1

        # node ids are int, make them str
        g = networkx.Graph()
        g.add_nodes_from(map(lambda nid: str(nid), graph.nodes()))
        g.add_edges_from(map(lambda t: (str(t[0]), str(t[1])),
            graph.edges()))

        return g

    def add_node(self, nid):
        """ Add node nid (coerced to str) if not already present """
        nid = str(nid)

        if nid not in self.topology:
            self.topology.add_node(nid)

    def add_edge(self, nid1, nid2):
        """ Add an edge between nid1 and nid2 (coerced to str), adding
        the nodes first if needed """
        nid1 = str(nid1)
        nid2 = str(nid2)

        self.add_node(nid1)
        self.add_node(nid2)

        if nid1 not in self.topology[nid2]:
            self.topology.add_edge(nid2, nid1)

    def annotate_node_ip(self, nid, ip):
        """ Append ip to the node's "ips" annotation list """
        if "ips" not in self.topology.node[nid]:
            self.topology.node[nid]["ips"] = list()

        self.topology.node[nid]["ips"].append(ip)

    def node_ip_annotations(self, nid):
        """ List of IPs annotated on node nid (may be empty) """
        return self.topology.node[nid].get("ips", [])

    def annotate_node(self, nid, name, value):
        """ Attach a name/value annotation to node nid.

        Only str, int, float and bool values are accepted, so
        annotations stay serializable (see the XML serializer).

        """
        if not isinstance(value, (str, int, float, bool)):
            raise RuntimeError("Non-serializable annotation")

        self.topology.node[nid][name] = value

    def node_annotation(self, nid, name):
        """ Value of annotation name on node nid (None if unset) """
        return self.topology.node[nid].get(name)

    def node_annotations(self, nid):
        """ All annotation names present on node nid """
        return self.topology.node[nid].keys()

    def del_node_annotation(self, nid, name):
        """ Remove annotation name from node nid """
        del self.topology.node[nid][name]

    def annotate_edge(self, nid1, nid2, name, value):
        """ Attach a name/value annotation to the edge nid1-nid2.

        Only str, int, float and bool values are accepted, so
        annotations stay serializable.

        """
        if not isinstance(value, (str, int, float, bool)):
            raise RuntimeError("Non-serializable annotation")

        self.topology.edge[nid1][nid2][name] = value

    def annotate_edge_net(self, nid1, nid2, ip1, ip2, mask, network,
            prefixlen):
        """ Store the point-to-point subnet assigned to the edge: the IP
        of each endpoint plus mask, network address and prefix length.

        """
        self.topology.edge[nid1][nid2]["net"] = {
            nid1: ip1,
            nid2: ip2,
            "mask": mask,
            "network": network,
            "prefix": prefixlen,
        }

    def edge_net_annotation(self, nid1, nid2):
        """ The "net" annotation dict of the edge (empty if unset) """
        return self.topology.edge[nid1][nid2].get("net", dict())

    def edge_annotation(self, nid1, nid2, name):
        """ Value of annotation name on the edge (None if unset) """
        return self.topology.edge[nid1][nid2].get(name)

    def edge_annotations(self, nid1, nid2):
        """ All annotation names present on the edge """
        return self.topology.edge[nid1][nid2].keys()

    def del_edge_annotation(self, nid1, nid2, name):
        """ Remove annotation name from the edge """
        del self.topology.edge[nid1][nid2][name]

    def assign_p2p_ips(self, network = "10.0.0.0", prefix = 8, version = 4):
        """ Assign IP addresses to each end of each edge of the network
        graph, computing all the point to point subnets and addresses in
        the network representation.

        :param network: Base network address used for subnetting.
        :type network: str

        :param prefix: Prefix for the base network address used for
            subnetting.
        :type prefix: int

        :param version: IP version (either 4 or 6).
        :type version: int

        """
        if networkx.number_connected_components(self.topology) > 1:
            raise RuntimeError("Disconnected graph!!")

        netblock = "%s/%d" % (network, prefix)
        if version == 4:
            net = ipaddr.IPv4Network(netblock)
            new_prefix = 30
        elif version == 6:
            net = ipaddr.IPv6Network(netblock)
            # NOTE(review): /30 is reused for IPv6 as well -- confirm
            # this prefix length is intended for v6 links
            new_prefix = 30
        else:
            raise RuntimeError("Invalid IP version %d" % version)

        ## Clear all previously assigned IPs
        for nid in self.topology.nodes():
            self.topology.node[nid]["ips"] = list()

        ## Generate and assign new IPs
        sub_itr = net.iter_subnets(new_prefix = new_prefix)

        for nid1, nid2 in self.topology.edges():
            #### Compute subnets for each link

            # take the next /new_prefix subnet of the base network
            subnet = sub_itr.next()
            mask = subnet.netmask.exploded
            network = subnet.network.exploded
            prefixlen = subnet.prefixlen

            # get the first two host addresses in that subnet,
            # one for each endpoint of the edge
            i = subnet.iterhosts()
            ip1 = i.next().exploded
            ip2 = i.next().exploded

            self.annotate_edge_net(nid1, nid2, ip1, ip2, mask, network,
                    prefixlen)

            self.annotate_node_ip(nid1, ip1)
            self.annotate_node_ip(nid2, ip2)

    def get_p2p_info(self, nid1, nid2):
        """ Return (ip1, ip2, mask, network, prefix) for the subnet
        assigned to the edge nid1-nid2.

        """
        net = self.topology.edge[nid1][nid2]["net"]
        # Fix: the annotation stores the prefix under key "prefix";
        # reading "prefixlen" here raised KeyError
        return ( net[nid1], net[nid2], net["mask"], net["network"],
                net["prefix"] )

    def set_source(self, nid):
        """ Flag node nid as a traffic source """
        self.topology.node[nid]["source"] = True

    def is_source(self, nid):
        """ True if node nid is flagged as source """
        return self.topology.node[nid].get("source")

    def set_target(self, nid):
        """ Flag node nid as a traffic target """
        self.topology.node[nid]["target"] = True

    def is_target(self, nid):
        """ True if node nid is flagged as target """
        return self.topology.node[nid].get("target")

    def targets(self):
        """ Returns the nodes that are targets """
        return [nid for nid in self.topology.nodes() \
                if self.topology.node[nid].get("target")]

    def sources(self):
        """ Returns the nodes that are sources """
        return [nid for nid in self.topology.nodes() \
                if self.topology.node[nid].get("source")]

    def select_target_zero(self):
        """ Marks the node 0 as target """
        self.set_target("0")

    def select_random_leaf_source(self):
        """ Marks a random leaf node as source """

        # The ladder is a special case because it is not symmetric:
        # its two "free" leaves are at fixed positions.
        if self.topo_type == TopologyType.LADDER:
            total_nodes = self.order/2
            leaf1 = str(total_nodes - 1)
            # Fix: this previously referenced an undefined name 'nodes';
            # the last node id is order - 1
            leaf2 = str(self.order - 1)
            leaves = [leaf1, leaf2]
            source = leaves.pop(random.randint(0, len(leaves) - 1))
        else:
            # options must not be already sources or targets
            options = [ k for k,v in self.topology.degree().iteritems() \
                    if v == 1 and not self.topology.node[k].get("source") \
                    and not self.topology.node[k].get("target")]

            source = options.pop(random.randint(0, len(options) - 1))

        self.set_source(source)
+
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+from nepi.util.netgraph import NetGraph, TopologyType
from nepi.util.timefuncs import stformat, tsformat
from xml.dom import minidom
ecnode.setAttribute("exp_id", xmlencode(ec.exp_id))
ecnode.setAttribute("run_id", xmlencode(ec.run_id))
ecnode.setAttribute("nthreads", xmlencode(ec.nthreads))
+ ecnode.setAttribute("local_dir", xmlencode(ec.local_dir))
doc.appendChild(ecnode)
+ if ec.netgraph != None:
+ self._netgraph_to_xml(doc, ecnode, ec)
+
+ rmsnode = doc.createElement("rms")
+ ecnode.appendChild(rmsnode)
+
for guid, rm in ec._resources.iteritems():
- self._rm_to_xml(doc, ecnode, ec, guid, rm)
+ self._rm_to_xml(doc, rmsnode, ec, guid, rm)
return doc
-
- def _rm_to_xml(self, doc, ecnode, ec, guid, rm):
+
+ def _netgraph_to_xml(self, doc, ecnode, ec):
+ """ Serialize the EC's NetGraph as a <topology> element (carrying
+ the topo-type attribute plus <nodes> and <edges> children) under
+ the given ecnode.
+ """
+ ngnode = doc.createElement("topology")
+ ngnode.setAttribute("topo-type", xmlencode(ec.netgraph.topo_type))
+ ecnode.appendChild(ngnode)
+
+ self. _netgraph_nodes_to_xml(doc, ngnode, ec)
+ self. _netgraph_edges_to_xml(doc, ngnode, ec)
+
+ def _netgraph_nodes_to_xml(self, doc, ngnode, ec):
+ """ Serialize netgraph nodes as <node> elements under <nodes>:
+ node id, optional source/target flags, and one <node-annotation>
+ child per annotation.
+ """
+ ngnsnode = doc.createElement("nodes")
+ ngnode.appendChild(ngnsnode)
+
+ for nid in ec.netgraph.nodes():
+ ngnnode = doc.createElement("node")
+ ngnnode.setAttribute("nid", xmlencode(nid))
+ ngnsnode.appendChild(ngnnode)
+
+ # Mark sources and targets
+ if ec.netgraph.is_source(nid):
+ ngnnode.setAttribute("source", xmlencode(True))
+
+ if ec.netgraph.is_target(nid):
+ ngnnode.setAttribute("target", xmlencode(True))
+
+ # Node annotations
+ annosnode = doc.createElement("node-annotations")
+ add_annotations = False
+ for name in ec.netgraph.node_annotations(nid):
+ add_annotations = True
+ value = ec.netgraph.node_annotation(nid, name)
+ annonode = doc.createElement("node-annotation")
+ annonode.setAttribute("name", xmlencode(name))
+ annonode.setAttribute("value", xmlencode(value))
+ annonode.setAttribute("type", from_type(value))
+ annosnode.appendChild(annonode)
+
+ # Only attach <node-annotations> when at least one exists
+ if add_annotations:
+ ngnnode.appendChild(annosnode)
+
+ def _netgraph_edges_to_xml(self, doc, ngnode, ec):
+ """ Serialize netgraph edges as <edge> elements under <edges>:
+ both endpoint ids and one <edge-annotation> child per annotation.
+ """
+ ngesnode = doc.createElement("edges")
+ ngnode.appendChild(ngesnode)
+
+ for nid1, nid2 in ec.netgraph.edges():
+ ngenode = doc.createElement("edge")
+ ngenode.setAttribute("nid1", xmlencode(nid1))
+ ngenode.setAttribute("nid2", xmlencode(nid2))
+ ngesnode.appendChild(ngenode)
+
+ # Edge annotations
+ annosnode = doc.createElement("edge-annotations")
+ add_annotations = False
+ for name in ec.netgraph.edge_annotations(nid1, nid2):
+ add_annotations = True
+ value = ec.netgraph.edge_annotation(nid1, nid2, name)
+ annonode = doc.createElement("edge-annotation")
+ annonode.setAttribute("name", xmlencode(name))
+ annonode.setAttribute("value", xmlencode(value))
+ annonode.setAttribute("type", from_type(value))
+ annosnode.appendChild(annonode)
+
+ # Only attach <edge-annotations> when at least one exists
+ if add_annotations:
+ ngenode.appendChild(annosnode)
+
+ def _rm_to_xml(self, doc, rmsnode, ec, guid, rm):
rmnode = doc.createElement("rm")
rmnode.setAttribute("guid", xmlencode(guid))
rmnode.setAttribute("rtype", xmlencode(rm._rtype))
rmnode.setAttribute("release_time", xmlencode(rm._release_time))
if rm._failed_time:
rmnode.setAttribute("failed_time", xmlencode(rm._failed_time))
- ecnode.appendChild(rmnode)
+ rmsnode.appendChild(rmnode)
anode = doc.createElement("attributes")
attributes = False
if ecnode.nodeType == doc.ELEMENT_NODE:
exp_id = xmldecode(ecnode.getAttribute("exp_id"))
run_id = xmldecode(ecnode.getAttribute("run_id"))
+ local_dir = xmldecode(ecnode.getAttribute("local_dir"))
+
+ # Configure number of preocessing threads
nthreads = xmldecode(ecnode.getAttribute("nthreads"))
-
os.environ["NEPI_NTHREADS"] = nthreads
- ec = ExperimentController(exp_id = exp_id)
+
+ # Deserialize netgraph
+ netgraph = self._netgraph_from_xml(doc, ecnode)
+ topo_type = netgraph.topo_type if netgraph else None
+
+ # Instantiate EC
+ ec = ExperimentController(exp_id = exp_id, local_dir = local_dir,
+ topology = netgraph.topology, topo_type = topo_type)
connections = set()
- rmnode_list = ecnode.getElementsByTagName("rm")
- for rmnode in rmnode_list:
- if rmnode.nodeType == doc.ELEMENT_NODE:
- self._rm_from_xml(doc, rmnode, ec, connections)
+ rmsnode_list = ecnode.getElementsByTagName("rms")
+ if rmsnode_list:
+ rmnode_list = rmsnode_list[0].getElementsByTagName("rm")
+ for rmnode in rmnode_list:
+ if rmnode.nodeType == doc.ELEMENT_NODE:
+ self._rm_from_xml(doc, rmnode, ec, connections)
for (guid1, guid2) in connections:
ec.register_connection(guid1, guid2)
return ec
+ def _netgraph_from_xml(self, doc, ecnode):
+ """ Rebuild a NetGraph from the <topology> element under ecnode:
+ nodes with their source/target flags and annotations, then edges
+ with their annotations. Returns None when no <topology> element
+ is present.
+ """
+ netgraph = None
+
+ topology = ecnode.getElementsByTagName("topology")
+ if topology:
+ topology = topology[0]
+ topo_type = xmldecode(topology.getAttribute("topo-type"))
+
+ netgraph = NetGraph(topo_type = topo_type)
+
+ ngnsnode_list = topology.getElementsByTagName("nodes")
+ if ngnsnode_list:
+ ngnsnode = ngnsnode_list[0].getElementsByTagName("node")
+ for ngnnode in ngnsnode:
+ nid = xmldecode(ngnnode.getAttribute("nid"))
+ netgraph.add_node(nid)
+
+ if ngnnode.hasAttribute("source"):
+ netgraph.set_source(nid)
+ if ngnnode.hasAttribute("target"):
+ netgraph.set_target(nid)
+
+ annosnode_list = ngnnode.getElementsByTagName("node-annotations")
+
+ if annosnode_list:
+ annosnode = annosnode_list[0].getElementsByTagName("node-annotation")
+ for annonode in annosnode:
+ name = xmldecode(annonode.getAttribute("name"))
+
+ # NOTE(review): xmldecode(..., eval) evaluates the serialized
+ # list/dict values -- only safe for trusted experiment files
+ if name == "ips":
+ ips = xmldecode(annonode.getAttribute("value"), eval) # list
+ for ip in ips:
+ netgraph.annotate_node_ip(nid, ip)
+ else:
+ value = xmldecode(annonode.getAttribute("value"))
+ tipe = xmldecode(annonode.getAttribute("type"))
+ value = to_type(tipe, value)
+ netgraph.annotate_node(nid, name, value)
+
+ ngesnode_list = topology.getElementsByTagName("edges")
+ if ngesnode_list:
+ ngesnode = ngesnode_list[0].getElementsByTagName("edge")
+ for ngenode in ngesnode:
+ nid1 = xmldecode(ngenode.getAttribute("nid1"))
+ nid2 = xmldecode(ngenode.getAttribute("nid2"))
+ netgraph.add_edge(nid1, nid2)
+
+ annosnode_list = ngenode.getElementsByTagName("edge-annotations")
+ if annosnode_list:
+ annosnode = annosnode_list[0].getElementsByTagName("edge-annotation")
+ for annonode in annosnode:
+ name = xmldecode(annonode.getAttribute("name"))
+
+ # "net" annotations round-trip through annotate_edge_net,
+ # which stores the prefix under the "prefix" key
+ if name == "net":
+ net = xmldecode(annonode.getAttribute("value"), eval) # dict
+ netgraph.annotate_edge_net(nid1, nid2, net[nid1], net[nid2],
+ net["mask"], net["network"], net["prefix"])
+ else:
+ value = xmldecode(annonode.getAttribute("value"))
+ tipe = xmldecode(annonode.getAttribute("type"))
+ value = to_type(tipe, value)
+ netgraph.annotate_edge(nid1, nid2, name, value)
+ return netgraph
+
def _rm_from_xml(self, doc, rmnode, ec, connections):
start_time = None
stop_time = None
ccnnode_list = cnnode_list[0].getElementsByTagName("condition")
for ccnnode in ccnnode_list:
action = xmldecode(ccnnode.getAttribute("action"), int)
- group = xmldecode(ccnnode.getAttribute("group"), eval)
+ group = xmldecode(ccnnode.getAttribute("group"), eval) # list
state = xmldecode(ccnnode.getAttribute("state"), int)
time = xmldecode(ccnnode.getAttribute("time"))
time = to_type('STRING', time)
try:
import networkx
except ImportError:
- msg = ("Networkx library is not installed, you will not be able to plot.")
+ msg = "Networkx library is not installed, you will not be able to plot."
logger = Logger("Plotter")
logger.debug(msg)
return sec
- def save(self, ec, dirpath = None, format = SFormats.XML):
- if not dirpath:
- import tempfile
- dirpath = tempfile.mkdtemp()
-
+ def save(self, ec, dirpath, format = SFormats.XML):
date = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
filename = "%s_%s" % (ec.exp_id, date)
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import math
+import numpy
+from scipy import stats
+
+def compute_mean(sample):
+ """Return (mean, std, ci_lower, ci_upper) for *sample*.
+
+ The interval is the 95% Student's t confidence interval around the
+ sample mean. An empty (or otherwise falsy) sample yields (0, 0, 0, 0)
+ instead of raising.
+ """
+ # TODO: Discard outliers !!!!
+
+ if not sample:
+ # Nothing to describe; report zeroed statistics rather than failing.
+ print " CANNOT COMPUTE STATS for ", sample
+ return (0, 0, 0, 0)
+
+ x = numpy.array(sample)
+
+ # sample mean and standard deviation (stats.describe returns the
+ # unbiased sample variance; std is its square root)
+ n, min_max, mean, var, skew, kurt = stats.describe(x)
+ std = math.sqrt(var)
+
+ # for the population mean and std ...
+ # mean = x.mean()
+ # std = x.std()
+
+ # 95% confidence interval for the mean: Student's t distribution with
+ # n-1 degrees of freedom, standard error std/sqrt(n).
+ ## BUG(original note): should use the quantile of the NORMAL
+ ## distribution, not the Student's t quantile -- confirm before changing.
+ ci = stats.t.interval(0.95, n-1, loc = mean, scale = std/math.sqrt(n))
+
+ return (mean, std, ci[0], ci[1])
+
return None
+def compute_delay_ms(timestamp2, timestamp1):
+ """Return the elapsed time from *timestamp1* to *timestamp2* in
+ milliseconds (a float; no rounding is performed).
+
+ Both arguments are seconds-since-epoch timestamps, as numbers or
+ numeric strings. NOTE(review): this relies on the ``datetime`` module,
+ which the visible import block (math/numpy/scipy) does not provide --
+ confirm it is imported at the top of this module.
+ """
+ d1 = datetime.datetime.fromtimestamp(float(timestamp1))
+ d2 = datetime.datetime.fromtimestamp(float(timestamp2))
+ delay = d2 - d1
+
+ # Convert the timedelta from seconds to milliseconds.
+ return delay.total_seconds() * 1000
+
dirpath = tempfile.mkdtemp()
- ec = ExperimentController(exp_id="test-condition-multirun")
+ ec = ExperimentController(exp_id = "test-condition-multirun",
+ local_dir = dirpath)
node = ec.register_resource("LinuxNode")
ec.set(node, "hostname", host)
collector = ec.register_resource("Collector")
ec.set(collector, "traceName", "stdout")
- ec.set(collector, "storeDir", dirpath)
- ec.set(collector, "useRunId", True)
ec.register_connection(ping, collector)
def compute_metric_callback(ping, ec, run):
self.assertTrue(runs >= 5)
dircount = 0
- for d in os.listdir(dirpath):
- path = os.path.join(dirpath, d)
+
+ for d in os.listdir(ec.exp_dir):
+ path = os.path.join(ec.exp_dir, d)
if os.path.isdir(path):
dircount += 1
logs = glob.glob(os.path.join(path, "*.stdout"))
# Load serialized experiment
ec2 = ExperimentController.load(filepath)
- apps = ec2.get_resources_by_type("dummy::Application")
+ apps = ec2.filter_resources("dummy::Application")
ec2.deploy()
ec2.wait_finished(apps)
ec2.shutdown()