Fixing issues with serialization
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 10 Aug 2014 02:56:51 +0000 (04:56 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 10 Aug 2014 02:56:51 +0000 (04:56 +0200)
21 files changed:
examples/ccn_flooding/flooding.py [deleted file]
examples/ccn_flooding/planetlab.py [new file with mode: 0644]
src/nepi/data/__init__.py [new file with mode: 0644]
src/nepi/data/processing/__init__.py [new file with mode: 0644]
src/nepi/data/processing/ccn/__init__.py [new file with mode: 0644]
src/nepi/data/processing/ccn/parser.py [new file with mode: 0644]
src/nepi/data/processing/ping/__init__.py [new file with mode: 0644]
src/nepi/data/processing/ping/parser.py [new file with mode: 0644]
src/nepi/execution/attribute.py
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/execution/runner.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/util/guid.py
src/nepi/util/netgraph.py
src/nepi/util/parsers/xml_parser.py
src/nepi/util/plotter.py
src/nepi/util/statfuncs.py [new file with mode: 0644]
src/nepi/util/timefuncs.py
test/util/serializer.py

diff --git a/examples/ccn_flooding/flooding.py b/examples/ccn_flooding/flooding.py
deleted file mode 100644 (file)
index c35d7ef..0000000
+++ /dev/null
@@ -1,216 +0,0 @@
-#!/usr/bin/env python
-
-###############################################################################
-#
-#    NEPI, a framework to manage network experiments
-#
-#    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
-#
-#    This program is distributed in the hope that it will be useful,
-#    but WITHOUT ANY WARRANTY; without even the implied warranty of
-#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-#    GNU General Public License for more details.
-#
-#    You should have received a copy of the GNU General Public License
-#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-#
-# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-#
-###############################################################################
-
-from nepi.execution.ec import ExperimentController 
-from nepi.execution.resource import ResourceState, ResourceAction
-from nepi.util.netgraph import NetGraph, TopologyType
-
-import os
-import tempfile
-
-from dminer import ccn
-
-PL_NODES = dict({
-    "0": "iraplab1.iralab.uni-karlsruhe.de",
-    "1": "planetlab1.informatik.uni-goettingen.de",
-    "2": "dfn-ple1.x-win.dfn.de",
-    "3": "mars.planetlab.haw-hamburg.de",
-    "4": "planetlab2.unineuchatel.ch", 
-    "5": "planetlab-node3.it-sudparis.eu",
-    "6": "planetlab2.extern.kuleuven.be",
-    "7": "node2pl.planet-lab.telecom-lille1.eu",
-    "8": "planetvs2.informatik.uni-stuttgart.de",
-    "9": "planetlab1.informatik.uni-wuerzburg.de",
-    "10": "planet1.l3s.uni-hannover.de",
-    "11": "planetlab1.wiwi.hu-berlin.de",
-    "12": "pl2.uni-rostock.de", 
-    "13": "planetlab1.u-strasbg.fr",
-    "14": "peeramidion.irisa.fr"
-    })
-
-pl_slice = os.environ.get("PL_SLICE")
-pl_user = os.environ.get("PL_USER")
-pl_password = os.environ.get("PL_PASS")
-pl_ssh_key = os.environ.get("PL_SSHKEY")
-
-content_name = "ccnx:/test/bunny.ts"
-
-pipeline = 4 # Default value for ccncat
-
-operating_system = "f14"
-
-country = "germany"
-
-repofile = os.path.join(
-        os.path.dirname(os.path.realpath(__file__)),
-            "..", "repoFile1.0.8.2")
-
-def add_collector(ec, trace_name, store_dir, sub_dir, rename = None):
-    collector = ec.register_resource("Collector")
-    ec.set(collector, "traceName", trace_name)
-    ec.set(collector, "storeDir", store_dir)
-    ec.set(collector, "subDir", sub_dir)
-    if rename:
-        ec.set(collector, "rename", rename)
-
-    return collector
-
-def add_node(ec, n):
-    hostname = PL_NODES[n]
-
-    node = ec.register_resource("PlanetlabNode")
-    ec.set(node, "hostname", hostname)
-    ec.set(node, "username", username)
-    ec.set(node, "identity", identity)
-    ec.set(node, "pluser", pl_user)
-    ec.set(node, "plpassword", pl_password)
-    #ec.set(node, "country", country)
-    #ec.set(node, "operatingSystem", operating_system)
-    ec.set(node, "cleanExperiment", True)
-    ec.set(node, "cleanProcesses", True)
-
-    return node
-
-def add_ccnd(ec, node, n):
-    global PL_NODES
-
-    ccnd = ec.register_resource("LinuxCCND")
-    ec.set(ccnd, "debug", 7)
-    ec.register_connection(ccnd, node)
-
-    # collector for ccnd trace
-    hostname = PL_NODES[n]
-    collector = add_collector(ec, "stderr", hostname, "log")
-    ec.register_connection(collector, ccnd)
-
-    PL_NODES[n] = (hostname, node, ccnd)
-    return ccnd
-
-def add_ccnr(ec, ccnd):
-    ccnr = ec.register_resource("LinuxCCNR")
-
-    ec.set(ccnr, "repoFile1", repofile)
-    ec.register_connection(ccnr, ccnd)
-
-    return ccnr
-
-def add_fib_entry(ec, n1, n2):
-    (hostname1, node1, ccnd1) = PL_NODES[n1] 
-    (hostname2, node2, ccnd2) = PL_NODES[n2]
-
-    entry = ec.register_resource("LinuxFIBEntry")
-    ec.set(entry, "host", peer_host)
-
-    ec.register_connection(entry, ccnd1)
-
-    ec.enable_trace(entry, "ping")
-    collector = add_collector(ec, "ping", hostname2)
-    ec.register_connection(collector, entry)
-
-    return entry
-
-def add_ccncat(ec, ccnd):
-    ccncat = ec.register_resource("LinuxCCNCat")
-    ec.set(ccncat, "pipeline", pipeline)
-    ec.set(ccncat, "contentName", content_name)
-    ec.register_connection(ccncat, ccnd)
-
-    return ccncat
-
-def compute_metric_callback(ec, run):
-    ## Process logs and analyse data
-    try:
-        graph = ccn.parse_ccndlogs(graph = graph, 
-                parse_ping_logs = True)
-    except:
-        print "Skipping: Error parsing ccnd logs", run_dir
-        raise
-
-    source = ccn.consumers(graph)[0]
-    target = ccn.producers(graph)[0]
-
-    # Process the data from the ccnd logs, but do not re compute
-    # the link delay. 
-    try:
-        (content_names,
-            interest_expiry_count,
-            interest_dupnonce_count,
-            interest_count,
-            content_count) = ccn.process_ccn_data(graph, source)
-    except:
-        print "Skipping: Error processing ccn data", run_dir
-        raise
-
-    # Compute the shortest path
-    shortest_path = ccn.shortest_path(graph, source, target)
-
-    # Compute the load coefficient
-    lcoeff = ccn.load_coefficient(graph, shortest_path, content_names)
-
-    return lcoeff
-             
-if __name__ == '__main__':
-    
-    #### Generate a LADDER network topology 
-    net_graph = NetGraph(topo_type = TopologyType.LADDER, 
-            node_count = 6, 
-            assign_st = True, 
-            assign_ips = True)
-   
-    target = net_graph.targets()[0]
-    source = net_graph.sources()[0]
-    
-    wait_guids = []
-
-    #### Create NEPI Experiment Description (EC)
-    ec = ExperimentController(exp_id)
-
-    ### Add CCN nodes to the (EC)
-    for n in graph.nodes():
-        node = add_node(ec, n)
-        ccnd = add_ccnd(ec, node, n)
-        
-        if n == target:
-            ccnr = add_ccnr(ec, ccnd)
-
-        ## Add content retrival application
-        if n == source:
-            ccncat = add_ccncat(ec, ccnd)
-            wait_guids.append(ccncat)
-
-    #### Add connections between CCN nodes
-    for n1, n2 in graph.edges():
-        add_fib_entry(ec, n1, n2)
-        add_fib_entry(ec, n2, n1)
-
-    #### Define the callback to compute experiment metric
-    metric_callback = functools.partial(compute_metric_callback, ping)
-
-    #### Run experiment until metric convergence
-    rnr = ExperimentRunner()
-
-    runs = rnr.run(ec, min_runs = 10, max_runs = 300 
-            compute_metric_callback = metric_callback,
-            wait_guids = wait_guids,
-            wait_time = 0)
-
diff --git a/examples/ccn_flooding/planetlab.py b/examples/ccn_flooding/planetlab.py
new file mode 100644 (file)
index 0000000..5834a85
--- /dev/null
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    NEPI, a framework to manage network experiments
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+from nepi.execution.ec import ExperimentController 
+from nepi.execution.runner import ExperimentRunner
+from nepi.util.netgraph import NetGraph, TopologyType
+import nepi.data.processing.ccn.parser as ccn_parser
+
+import socket
+import os
+
+PL_NODES = dict({
+    "0": "iraplab1.iralab.uni-karlsruhe.de",
+    "1": "planetlab1.informatik.uni-goettingen.de",
+    "2": "dfn-ple1.x-win.dfn.de",
+    "3": "mars.planetlab.haw-hamburg.de",
+    "4": "planetlab2.unineuchatel.ch", 
+    "5": "planetlab-node3.it-sudparis.eu",
+    "6": "planetlab2.extern.kuleuven.be",
+    "7": "node2pl.planet-lab.telecom-lille1.eu",
+    "8": "planetvs2.informatik.uni-stuttgart.de",
+    "9": "planetlab1.informatik.uni-wuerzburg.de",
+    "10": "planet1.l3s.uni-hannover.de",
+    "11": "planetlab1.wiwi.hu-berlin.de",
+    "12": "pl2.uni-rostock.de", 
+    "13": "planetlab1.u-strasbg.fr",
+    "14": "peeramidion.irisa.fr"
+    })
+
+pl_slice = os.environ.get("PL_SLICE")
+pl_user = os.environ.get("PL_USER")
+pl_password = os.environ.get("PL_PASS")
+pl_ssh_key = os.environ.get("PL_SSHKEY")
+
+content_name = "ccnx:/test/bunny.ts"
+
+pipeline = 4 # Default value for ccncat
+
+operating_system = "f14"
+
+country = "germany"
+
+repofile = os.path.join(
+        os.path.dirname(os.path.realpath(__file__)), "repoFile1.0.8.2")
+
+def add_collector(ec, trace_name, subdir, newname = None):
+    collector = ec.register_resource("Collector")
+    ec.set(collector, "traceName", trace_name)
+    ec.set(collector, "subDir", subdir)
+    if newname:
+        ec.set(collector, "rename", newname)
+
+    return collector
+
+def add_pl_host(ec, nid):
+    hostname = PL_NODES[nid]
+
+    # Add a planetlab host to the experiment description
+    host = ec.register_resource("PlanetlabNode")
+    ec.set(host, "hostname", hostname)
+    ec.set(host, "username", pl_slice)
+    ec.set(host, "identity", pl_ssh_key)
+    #ec.set(host, "pluser", pl_user)
+    #ec.set(host, "plpassword", pl_password)
+    #ec.set(host, "country", country)
+    #ec.set(host, "operatingSystem", operating_system)
+    ec.set(host, "cleanExperiment", True)
+    ec.set(host, "cleanProcesses", True)
+
+    # Annotate the graph
+    ec.netgraph.annotate_node(nid, "hostname", hostname)
+    ec.netgraph.annotate_node(nid, "host", host)
+    
+    # Annotate the graph node with an ip address
+    ip = socket.gethostbyname(hostname)
+    ec.netgraph.annotate_node_ip(nid, ip)
+
+def add_pl_ccnd(ec, nid):
+    # Retrieve annotation from netgraph
+    host = ec.netgraph.node_annotation(nid, "host")
+    
+    # Add a CCN daemon to the planetlab node
+    ccnd = ec.register_resource("LinuxCCND")
+    ec.set(ccnd, "debug", 7)
+    ec.register_connection(ccnd, host)
+    
+    # Collector to retrieve ccnd log
+    collector = add_collector(ec, "stderr", nid, "log")
+    ec.register_connection(collector, ccnd)
+
+    # Annotate the graph
+    ec.netgraph.annotate_node(nid, "ccnd", ccnd)
+
+def add_pl_ccnr(ec, nid):
+    # Retrieve annotation from netgraph
+    ccnd = ec.netgraph.node_annotation(nid, "ccnd")
+    
+    # Add a CCN content repository to the planetlab node
+    ccnr = ec.register_resource("LinuxCCNR")
+
+    ec.set(ccnr, "repoFile1", repofile)
+    ec.register_connection(ccnr, ccnd)
+
+def add_pl_ccncat(ec, nid):
+    # Retrieve annotation from netgraph
+    ccnd = ec.netgraph.node_annotation(nid, "ccnd")
+    
+    # Add a CCN cat application to the planetlab node
+    ccncat = ec.register_resource("LinuxCCNCat")
+    ec.set(ccncat, "pipeline", pipeline)
+    ec.set(ccncat, "contentName", content_name)
+    ec.register_connection(ccncat, ccnd)
+
+def add_pl_fib_entry(ec, nid1, nid2):
+    # Retrieve annotations from netgraph
+    ccnd1 = ec.netgraph.node_annotation(nid1, "ccnd")
+    hostname2 = ec.netgraph.node_annotation(nid2, "hostname")
+    
+    # Add a FIB entry between one planetlab node and its peer
+    entry = ec.register_resource("LinuxFIBEntry")
+    ec.set(entry, "host", hostname2)
+    ec.register_connection(entry, ccnd1)
+
+    # Collector to retrieve peering ping output (to measure neighbors delay)
+    ec.enable_trace(entry, "ping")
+    collector = add_collector(ec, "ping", nid1)
+    ec.register_connection(collector, entry)
+
+    return entry
+
+def avg_interests(ec, run):
+    ## Process logs
+    logs_dir = ec.run_dir
+
+    print ec.netgraph
+
+    (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count) = ccn_parser.process_content_history_logs(
+                logs_dir, 
+                ec.netgraph)
+
+    shortest_path = networkx.shortest_path(graph, 
+            source = ec.netgraph.sources()[0], 
+            target = ec.netgraph.targets()[0])
+
+    ### Compute metric: Avg number of Interests seen per content name
+    ###                 normalized by the number of nodes in the shortest path
+    content_name_count = len(content_names.values())
+    nodes_in_shortest_path = len(content_names.values())
+    metric = interest_count / float(content_name_count) / float(nodes_in_shortest_path)
+
+    # TODO: DUMP RESULTS TO FILE
+    # TODO: DUMP GRAPH DELAYS!
+
+    return metric
+
+def add_pl_edge(ec, nid1, nid2):
+    #### Add connections between CCN nodes
+    add_pl_fib_entry(ec, nid1, nid2)
+    add_pl_fib_entry(ec, nid2, nid1)
+
+def add_pl_node(ec, nid):
+    ### Add CCN nodes (ec.netgraph holds the topology graph)
+    add_pl_host(ec, nid)
+    add_pl_ccnd(ec, nid)
+        
+    if nid == ec.netgraph.targets()[0]:
+        add_pl_ccnr(ec, nid)
+
+    if nid == ec.netgraph.sources()[0]:
+        add_pl_ccncat(ec, nid)
+
+if __name__ == '__main__':
+
+    #### Create NEPI Experiment Description with LINEAR topology 
+    ec = ExperimentController("pl_ccn", 
+            topo_type = TopologyType.LINEAR, 
+            node_count = 4, 
+            #assign_ips = True,
+            assign_st = True,
+            add_node_callback = add_pl_node, 
+            add_edge_callback = add_pl_edge)
+    
+    print "Results stored at", ec.exp_dir
+
+    #### Retrieve the content producing resource to wait for ot to finish
+    ccncat = ec.filter_resources("LinuxCCNCat")
+   
+    #### Run experiment until metric convergences
+    rnr = ExperimentRunner()
+    runs = rnr.run(ec, min_runs = 10, max_runs = 300, 
+            compute_metric_callback = avg_interests,
+            wait_guids = ccncat,
+            wait_time = 0)
+
diff --git a/src/nepi/data/__init__.py b/src/nepi/data/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/__init__.py b/src/nepi/data/processing/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/ccn/__init__.py b/src/nepi/data/processing/ccn/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/ccn/parser.py b/src/nepi/data/processing/ccn/parser.py
new file mode 100644 (file)
index 0000000..1a65830
--- /dev/null
@@ -0,0 +1,401 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    CCNX benchmark
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+#
+# This library contains functions to parse (CCNx) ccnd logs.
+#
+# Results from experiments must be stored in a directory
+# named with the experiment run id.
+# ccnd logs are stored in .log files in a subdirectory per node.
+# The following diagram exemplifies the experiment result directory
+# structure (nidi is the unique identifier assigned to node i):
+#
+#    run_id
+#               \   nid1
+#                        \ nid2.log
+#               \   nid2
+#                        \ nid1.log
+#               \   nid3
+#                        \ nid3.log
+#
+
+import collections
+import functools
+import networkx
+import os
+import pickle
+import tempfile
+
+from nepi.util.timefuncs import compute_delay_ms
+from nepi.util.statfuncs import compute_mean
+import nepi.data.processing.ping.parser as ping_parser
+
+def is_control(content_name):
+    return content_name.startswith("ccnx:/%C1") or \
+            content_name.startswith("ccnx:/ccnx") or \
+            content_name.startswith("ccnx:/...")
+
+
+def parse_file(filename):
+    """ Parses message information from ccnd log files
+
+        filename: path to ccndlog file
+
+    """
+
+    faces = dict()
+    sep = " "
+
+    f = open(filename, "r")
+
+    data = []
+
+    for line in f:
+        cols =  line.strip().split(sep)
+
+        # CCN_PEEK
+        # MESSAGE interest_from
+        # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)
+        #
+        # MESSAGE interest_to
+        # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7)
+        #
+        # MESSAGE CONTENT FROM
+        # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+        #
+        # MESSAGE CONTENT_TO
+        # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+        #
+        # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048)
+
+        # External face creation
+        # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695
+
+        if line.find("accepted datagram client") > -1:
+            face_id = (cols[5]).replace("id=",'')
+            ip = cols[7] 
+            port = cols[9]
+            faces[face_id] = (ip, port)
+            continue
+
+        # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4)
+        if line.find("releasing face id") > -1:
+            face_id = cols[5]
+            if face_id in faces:
+                del faces[face_id]
+            continue
+
+        if len(cols) < 6:
+            continue
+
+        timestamp = cols[0]
+        message_type = cols[3]
+
+        if message_type not in ["interest_from", "interest_to", "content_from", 
+                "content_to", "interest_dupnonce", "interest_expiry"]:
+            continue
+
+        face_id = cols[4] 
+        content_name = cols[5]
+
+        # Interest Nonce ? -> 412A74-0844-0008-50AA-F6EAD4
+        nonce = ""
+        if message_type in ["interest_from", "interest_to", "interest_dupnonce"]:
+            last = cols[-1]
+            if len(last.split("-")) == 5:
+                nonce = last
+
+        try:
+            size = int((cols[6]).replace('(',''))
+        except:
+            print "interest_expiry without face id!", line
+            continue
+
+        # If no external IP address was identified for this face
+        # asume it is a local face
+        hostip = "localhost"
+
+        if face_id in faces:
+            hostip, port = faces[face_id]
+
+        data.append((content_name, timestamp, message_type, hostip, face_id, 
+            size, nonce, line))
+
+    f.close()
+
+    return data
+
+def dump_content_history(content_history):
+    f = tempfile.NamedTemporaryFile(delete=False)
+    pickle.dump(content_history, f)
+    f.close()
+    return f.name
+
+def load_content_history(fname):
+    f = open(fname, "r")
+    content_history = pickle.load(f)
+    f.close()
+
+    os.remove(fname)
+    return content_history
+
+def annotate_cn_node(graph, nid, ips2nid, data, content_history):
+    for (content_name, timestamp, message_type, hostip, face_id, 
+            size, nonce, line) in data:
+
+        # Ignore local messages for the time being. 
+        # They could later be used to calculate the processing times
+        # of messages.
+        if peer == "localhost":
+            return
+
+        # Ignore control messages for the time being
+        if is_control(content_name):
+            return
+
+        if message_type == "interest_from" and \
+                peer == "localhost":
+            graph.node[nid]["ccn_consumer"] = True
+        elif message_type == "content_from" and \
+                peer == "localhost":
+            graph.node[nid]["ccn_producer"] = True
+
+        # remove digest
+        if message_type in ["content_from", "content_to"]:
+            content_name = "/".join(content_name.split("/")[:-1])
+           
+        if content_name not in content_history:
+            content_history[content_name] = list()
+      
+        peernid = ips2nid[peer]
+        add_edge(graph, nid, peernid)
+
+        content_history[content_name].append((timestamp, message_type, nid, 
+            peernid, nonce, size, line))
+
+def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False):
+    """ Adds CCN content history for each node in the topology graph.
+
+    """
+    # Make a copy of the graph to ensure integrity
+    graph = graph.copy()
+
+    ips2nids = dict()
+
+    for nid in graph.nodes():
+        ips = graph.node[nid]["ips"]
+        for ip in ips:
+            ips2nids[ip] = nid
+
+    # Now walk through the ccnd logs...
+    for dirpath, dnames, fnames in os.walk(logs_dir):
+        # continue if we are not at the leaf level (if there are subdirectories)
+        if dnames: 
+            continue
+        
+        # Each dirpath correspond to a different node
+        nid = os.path.basename(dirpath)
+    
+        content_history = dict()
+
+        for fname in fnames:
+            if fname.endswith(".log"):
+                filename = os.path.join(dirpath, fname)
+                data = parse_file(filename)
+                annotate_cn_node(graph, nid, ips2nids, data, content_history)
+
+        # Avoid storing everything in memory, instead dump to a file
+        # and reference the file
+        fname = dump_content_history(content_history)
+        graph.node[nid]["history"] = fname
+
+    if parse_ping_logs:
+        ping_parser.annotate_cn_graph(logs_dir, graph)
+
+    return graph
+
+def ccn_producers(graph):
+    """ Returns the nodes that are content providers """
+    return [nid for nid in graph.nodes() \
+            if graph.node[nid].get("ccn_producer")]
+
+def ccn_consumers(graph):
+    """ Returns the nodes that are content consumers """
+    return [nid for nid in graph.nodes() \
+            if graph.node[nid].get("ccn_consumer")]
+
+def process_content_history(graph):
+    """ Compute CCN message counts and aggregates content historical 
+    information in the content_names dictionary 
+    
+    """
+
+    ## Assume single source
+    source = ccn_consumers(graph)[0]
+
+    interest_expiry_count = 0
+    interest_dupnonce_count = 0
+    interest_count = 0
+    content_count = 0
+    content_names = dict()
+
+    # Collect information about exchanged messages by content name and
+    # link delay info.
+    for nid in graph.nodes():
+        # Load the data collected from the node's ccnd log
+        fname = graph.node[nid]["history"]
+        history = load_content_history(fname)
+
+        for content_name in history.keys():
+            hist = history[content_name]
+
+            for (timestamp, message_type, nid1, nid2, nonce, size, line) in hist:
+                if message_type in ["content_from", "content_to"]:
+                    # The first Interest sent will not have a version or chunk number.
+                    # The first Content sent back in reply, will end in /=00 or /%00.
+                    # Make sure to map the first Content to the first Interest.
+                    if content_name.endswith("/=00"):
+                        content_name = "/".join(content_name.split("/")[0:-2])
+
+                # Add content name to dictionary
+                if content_name not in content_names:
+                    content_names[content_name] = dict()
+                    content_names[content_name]["interest"] = dict()
+                    content_names[content_name]["content"] = list()
+
+                # Classify interests by replica
+                if message_type in ["interest_from"] and \
+                        nonce not in content_names[content_name]["interest"]:
+                    content_names[content_name]["interest"][nonce] = list()
+     
+                # Add consumer history
+                if nid == source:
+                    if message_type in ["interest_to", "content_from"]:
+                        # content name history as seen by the source
+                        if "consumer_history" not in content_names[content_name]:
+                            content_names[content_name]["consumer_history"] = list()
+
+                        content_names[content_name]["consumer_history"].append(
+                                (timestamp, message_type)) 
+
+                # Add messages per content name and cumulate totals by message type
+                if message_type == "interest_dupnonce":
+                    interest_dupnonce_count += 1
+                elif message_type == "interest_expiry":
+                    interest_expiry_count += 1
+                elif message_type == "interest_from":
+                    interest_count += 1
+                    # Append to interest history of the content name
+                    content_names[content_name]["interest"][nonce].append(
+                            (timestamp, nid2, nid1))
+                elif message_type == "content_from":
+                    content_count += 1
+                    # Append to content history of the content name
+                    content_names[content_name]["content"].append((timestamp, nid2, nid1))
+                else:
+                    continue
+            del hist
+        del history
+
+    # Compute the time elapsed between the time an interest is sent
+    # in the consumer node and when the content is received back
+    for content_name in content_names.keys():
+        # order content and interest messages by timestamp
+        content_names[content_name]["content"] = sorted(
+              content_names[content_name]["content"])
+        
+        for nonce, timestamps in content_names[content_name][
+                    "interest"].iteritems():
+              content_names[content_name]["interest"][nonce] = sorted(
+                        timestamps)
+      
+        history = sorted(content_names[content_name]["consumer_history"])
+        content_names[content_name]["consumer_history"] = history
+
+        # compute the rtt time of the message
+        rtt = None
+        waiting_content = False 
+        interest_timestamp = None
+        content_timestamp = None
+        
+        for (timestamp, message_type) in history:
+            if not waiting_content and message_type == "interest_to":
+                waiting_content = True
+                interest_timestamp = timestamp
+                continue
+
+            if waiting_content and message_type == "content_from":
+                content_timestamp = timestamp
+                break
+    
+        # If we can't determine who sent the interest, discard it
+        rtt = -1
+        if interest_timestamp and content_timestamp:
+            rtt = compute_delay_ms(content_timestamp, interest_timestamp)
+
+        content_names[content_name]["rtt"] = rtt
+        content_names[content_name]["lapse"] = (interest_timestamp, content_timestamp)
+
+    return (content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count)
+
+def process_content_history_logs(logs_dir, graph):
+    """ Parse CCN logs and aggregate content history information in graph.
+    Returns annotated graph and message countn and content names history.
+
+    """
+    ## Process logs and analyse data
+    try:
+        graph = annotate_cn_graph(logs_dir, graph, 
+                parse_ping_logs = True)
+    except:
+        print "Skipping: Error parsing ccnd logs", logs_dir
+        raise
+
+    source = consumers(graph)[0]
+    target = producers(graph)[0]
+
+    # Process the data from the ccnd logs, but do not re compute
+    # the link delay. 
+    try:
+        (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count) = process_content_history(graph)
+    except:
+        print "Skipping: Error processing ccn data", logs_dir
+        raise
+
+    return (graph,
+            content_names,
+            interest_expiry_count,
+            interest_dupnonce_count,
+            interest_count,
+            content_count) 
diff --git a/src/nepi/data/processing/ping/__init__.py b/src/nepi/data/processing/ping/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/ping/parser.py b/src/nepi/data/processing/ping/parser.py
new file mode 100644 (file)
index 0000000..3396207
--- /dev/null
@@ -0,0 +1,114 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    CCNX benchmark
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+#
+# This library contains functions to parse log files generated using ping. 
+#
+
+import re
+
+# RE to match line starting "traceroute to"
+_rre = re.compile("\d+ bytes from ((?P<hostname>[^\s]+) )?\(?(?P<ip>[^\s]+)\)??: icmp_.eq=\d+ ttl=\d+ time=(?P<time>[^\s]+) ms")
+
+def parse_file(filename):
+    """
+        filename: path to traceroute file
+
+    """
+
+    f = open(filename, "r")
+
+    # Traceroute info
+    target_ip = None
+    target_hostname = None
+   
+    data = []
+
+    for line in f:
+        # match traceroute to ...
+        m = re.match(_rre, line)
+        if not m:
+            continue
+
+        target_ip = m.groupdict()["ip"]
+        # FIX THIS: Make sure the regular expression does not inlcude 
+        # the ')' in the ip group 
+        target_ip = target_ip.replace(")","")
+        target_hostname = m.groupdict()["hostname"]
+        time = m.groupdict()["time"]
+        data.append((target_ip, target_hostname, time))
+
+    f.close()
+
+    return data
+
+def annotate_cn_node(graph, nid1, ips2nids, data):
+    for (target_ip, target_hostname, time) in data:
+        nid2 = ips2nid[target_ip]
+
+        if "delays" not in graph.edge[nid1][nid2]:
+            graph.edge[nid1][nid2]["delays"] = []
+
+        time = float(time.replace("ms", "").replace(" ",""))
+
+        graph.edge[nid1][nid2]["delays"].append(time)
+
+def annotate_cn_graph(logs_dir, graph): 
+    """ Add delay inormation to graph using data collected using
+    ping.
+
+    """
+    ips2nids = dict()
+
+    for nid in graph.nodes():
+        ips = graph.node[nid]["ips"]
+        for ip in ips:
+            ips2nids[ip] = nid
+
+    # Walk through the ping logs...
+    for dirpath, dnames, fnames in os.walk(logs_dir):
+        # continue if we are not at the leaf level (if there are subdirectories)
+        if dnames: 
+            continue
+        
+        # Each dirpath correspond to a different host
+        nid = os.path.basename(dirpath)
+    
+        for fname in fnames:
+            if fname.endswith(".ping"):
+                filename = os.path.join(dirpath, fname)
+                data = parse_file(filename)
+                annotate_cn_node(graph, nid, ips2nids, data)
+
+    # Take as weight the most frequent value
+    for nid1, nid2 in graph.edges():
+        delays = collections.Counter(graph.edge[nid1][nid2]["delays"])
+        weight = delays.most_common(1)[0][0]
+        del graph.edge[nid1][nid2]["delays"]
+        graph.edge[nid1][nid2]["weight"] = weight
+
+    return graph
+
+
index 19fbfc2..bf0a853 100644 (file)
@@ -55,6 +55,7 @@ class Flags:
     # Attribute global is set to all resources of rtype
     Global  = 1 << 7 # 128
 
+
 class Attribute(object):
     """
     .. class:: Class Args :
index 1357e4f..5664d2f 100644 (file)
@@ -26,6 +26,7 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 from nepi.util.serializer import ECSerializer, SFormats
 from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType 
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -101,7 +102,7 @@ class ExperimentController(object):
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
-    behavior, configuration and interconnection of those resources. 
+    and the behavior, configuration and interconnection of those resources. 
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
@@ -116,7 +117,7 @@ class ExperimentController(object):
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
-    In NEPI, an experiment is represented as a graph of interconnected
+    An experiment is represented as a graph of interconnected
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
@@ -126,10 +127,9 @@ class ExperimentController(object):
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
-    To support a new type of resource in NEPI, a new RM must be 
-    implemented. NEPI already provides a variety of
-    RMs to control basic resources, and new can be extended from
-    the existing ones.
+    To support a new type of resource, a new RM must be implemented. 
+    NEPI already provides a variety of RMs to control basic resources, 
+    and new can be extended from the existing ones.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
@@ -160,22 +160,31 @@ class ExperimentController(object):
         ec = serializer.load(filepath)
         return ec
 
-    def __init__(self, exp_id = None, local_dir = None, persist = False):
-        """ ExperimentController entity to model an execute a network experiment.
+    def __init__(self, exp_id = None, local_dir = None, persist = False,
+            add_node_callback = None, add_edge_callback = None, **kwargs):
+        """ ExperimentController entity to model an execute a network 
+        experiment.
         
         :param exp_id: Human readable name to identify the experiment
-        :type name: str
+        :type exp_id: str
 
         :param local_dir: Path to local directory where to store experiment
             related files
-        :type name: str
+        :type local_dir: str
 
         :param persist: Save an XML description of the experiment after 
         completion at local_dir
-        :type name: bool
+        :type persist: bool
 
-        """
+        :param add_node_callback: Callback to invoke for node instantiation
+        when automatic topology creation mode is used 
+        :type add_node_callback: function
+
+        :param add_edge_callback: Callback to invoke for edge instantiation 
+        when automatic topology creation mode is used 
+        :type add_edge_callback: function
 
+        """
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -233,6 +242,12 @@ class ExperimentController(object):
         # EC state
         self._state = ECState.RUNNING
 
+        # Automatically construct experiment description 
+        self._netgraph = None
+        if add_node_callback and add_edge_callback:
+            self._build_from_netgraph(add_node_callback, add_edge_callback, 
+                    **kwargs)
+
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         self._nthreads = 20
@@ -313,11 +328,20 @@ class ExperimentController(object):
 
     @property
     def persist(self):
-        """ If Trie persist the ExperimentController to XML format upon completion
+        """ If True, persists the ExperimentController to XML format upon 
+        experiment completion
 
         """
         return self._persist
 
+    @property
+    def netgraph(self):
+        """ Return NetGraph instance if experiment description was automatically 
+        generated
+
+        """
+        return self._netgraph
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -478,7 +502,7 @@ class ExperimentController(object):
         return rm
 
     def get_resources_by_type(self, rtype):
-        """ Returns a registered ResourceManager by its guid
+        """ Returns the ResourceManager objects of type rtype
 
             :param rtype: Resource type
             :type rtype: string
@@ -488,7 +512,7 @@ class ExperimentController(object):
         """
         rms = []
         for guid, rm in self._resources.iteritems():
-            if rm.get_rtype() == type: 
+            if rm.get_rtype() == rtype: 
                 rms.append(rm)
         return rms
 
@@ -497,16 +521,31 @@ class ExperimentController(object):
 
     @property
     def resources(self):
-        """ Returns the set() of guids of all the ResourceManager
+        """ Returns the guids of all ResourceManagers 
 
             :return: Set of all RM guids
-            :rtype: set
+            :rtype: list
 
         """
         keys = self._resources.keys()
 
         return keys
 
+    def filter_resources(self, rtype):
+        """ Returns the guids of all ResourceManagers of type rtype
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of guids
+            
+        """
+        rms = []
+        for guid, rm in self._resources.iteritems():
+            if rm.get_rtype() == rtype: 
+                rms.append(rm.guid)
+        return rms
+
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
         
@@ -1184,3 +1223,18 @@ class ExperimentController(object):
         self._cond.notify()
         self._cond.release()
 
+    def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
+            **kwargs):
+        """ Automates experiment description using a NetGraph instance.
+        """
+        self._netgraph = NetGraph(**kwargs)
+
+        ### Add resources to the EC
+        for nid in self.netgraph.graph.nodes():
+            add_node_callback(self, nid)
+
+        #### Add connections between resources
+        for nid1, nid2 in self.netgraph.graph.edges():
+            add_edge_callback(self, nid1, nid2)
+
+
index c389cfe..f4b19f5 100644 (file)
@@ -113,11 +113,13 @@ def failtrap(func):
         try:
             return func(self, *args, **kwargs)
         except:
+            self.fail()
+            
             import traceback
             err = traceback.format_exc()
-            self.error(err)
-            self.debug("SETTING guid %d to state FAILED" % self.guid)
-            self.fail()
+            logger = Logger(self._rtype)
+            logger.error(err)
+            logger.error("SETTING guid %d to state FAILED" % self.guid)
             raise
     
     return wrapped
@@ -560,11 +562,14 @@ class ResourceManager(Logger):
             try:
                 self.do_release()
             except:
+                self.set_released()
+
                 import traceback
                 err = traceback.format_exc()
-                self.error(err)
-
-                self.set_released()
+                msg = " %s guid %d ----- FAILED TO RELEASED ----- \n %s " % (
+                        self._rtype, self.guid, err)
+                logger = Logger(self._rtype)
+                logger.debug(msg)
 
     def fail(self):
         """ Sets the RM to state FAILED.
@@ -1050,12 +1055,18 @@ class ResourceManager(Logger):
     def set_released(self, time = None):
         """ Mark ResourceManager as REALEASED """
         self.set_state(ResourceState.RELEASED, "_release_time", time)
-        self.debug("----- RELEASED ---- ")
+
+        msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
+        logger = Logger(self._rtype)
+        logger.debug(msg)
 
     def set_failed(self, time = None):
         """ Mark ResourceManager as FAILED """
         self.set_state(ResourceState.FAILED, "_failed_time", time)
-        self.debug("----- FAILED ---- ")
+
+        msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
+        logger = Logger(self._rtype)
+        logger.debug(msg)
 
     def set_discovered(self, time = None):
         """ Mark ResourceManager as DISCOVERED """
index 75e766f..52b4bde 100644 (file)
@@ -17,7 +17,7 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.ec import ExperimentController
+from nepi.execution.ec import ExperimentController, ECState
 
 import math
 import numpy
@@ -140,12 +140,15 @@ class ExperimentRunner(object):
         ec = ExperimentController.load(filepath)
 
         ec.deploy()
-
+    
         ec.wait_finished(wait_guids)
         time.sleep(wait_time)
 
         ec.release()
 
+        if ec.state == ECState.FAILED:
+            raise RuntimeError, "Experiment failed"
+
         return ec
 
 
index 5e06dab..d095a25 100644 (file)
@@ -173,7 +173,9 @@ class LinuxApplication(ResourceManager):
         super(LinuxApplication, self).__init__(ec, guid)
         self._pid = None
         self._ppid = None
+        self._node = None
         self._home = "app-%s" % self.guid
+
         # whether the command should run in foreground attached
         # to a terminal
         self._in_foreground = False
@@ -194,9 +196,16 @@ class LinuxApplication(ResourceManager):
 
     @property
     def node(self):
-        node = self.get_connected(LinuxNode.get_rtype())
-        if node: return node[0]
-        raise RuntimeError, "Application must be connected to Node"
+        if not self._node:
+            node = self.get_connected(LinuxNode.get_rtype())
+            if not node: 
+                msg = "Application %s guid %d NOT connected to Node" % (
+                        self._rtype, self.guid)
+                raise RuntimeError, msg
+
+            self._node = node[0]
+
+        return self._node
 
     @property
     def app_home(self):
index 8bf9bd1..700ab10 100644 (file)
@@ -85,14 +85,37 @@ class LinuxFIBEntry(LinuxApplication):
         super(LinuxFIBEntry, self).__init__(ec, guid)
         self._home = "fib-%s" % self.guid
         self._ping = None
-        self._mtr = None
         self._traceroute = None
+        self._ccnd = None
 
     @property
     def ccnd(self):
-        ccnd = self.get_connected(LinuxCCND.get_rtype())
-        if ccnd: return ccnd[0]
-        return None
+        if not self._ccnd:
+            ccnd = self.get_connected(LinuxCCND.get_rtype())
+            if ccnd: 
+                self._ccnd = ccnd[0]
+            
+        return self._ccnd
+
+    @property
+    def ping(self):
+        if not self._ping:
+            from nepi.resources.linux.ping import LinuxPing
+            ping = self.get_connected(LinuxPing.get_rtype())
+            if ping: 
+                self._ping = ping[0]
+            
+        return self._ping
+
+    @property
+    def traceroute(self):
+        if not self._traceroute:
+            from nepi.resources.linux.traceroute import LinuxTraceroute
+            traceroute = self.get_connected(LinuxTraceroute.get_rtype())
+            if traceroute: 
+                self._traceroute = traceroute[0]
+            
+        return self._traceroute
 
     @property
     def node(self):
@@ -101,11 +124,14 @@ class LinuxFIBEntry(LinuxApplication):
 
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         if name == "ping":
-            return self.ec.trace(self._ping, "stdout", attr, block, offset)
-        if name == "mtr":
-            return self.ec.trace(self._mtr, "stdout", attr, block, offset)
+            if not self.ping:
+                return None
+            return self.ec.trace(self.ping, "stdout", attr, block, offset)
+
         if name == "traceroute":
-            return self.ec.trace(self._traceroute, "stdout", attr, block, offset)
+            if not self.traceroute:
+                return None
+            return self.ec.trace(self.traceroute, "stdout", attr, block, offset)
 
         return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
     
@@ -159,38 +185,27 @@ class LinuxFIBEntry(LinuxApplication):
             raise RuntimeError, msg
         
     def configure(self):
-        if self.trace_enabled("ping"):
+        if self.trace_enabled("ping") and not self.ping:
             self.info("Configuring PING trace")
-            self._ping = self.ec.register_resource("LinuxPing")
-            self.ec.set(self._ping, "printTimestamp", True)
-            self.ec.set(self._ping, "target", self.get("host"))
-            self.ec.set(self._ping, "earlyStart", True)
-            self.ec.register_connection(self._ping, self.node.guid)
+            ping = self.ec.register_resource("LinuxPing")
+            self.ec.set(ping, "printTimestamp", True)
+            self.ec.set(ping, "target", self.get("host"))
+            self.ec.set(ping, "earlyStart", True)
+            self.ec.register_connection(ping, self.node.guid)
+            self.ec.register_connection(ping, self.guid)
             # schedule ping deploy
-            self.ec.deploy(guids=[self._ping], group = self.deployment_group)
-
-        if self.trace_enabled("mtr"):
-            self.info("Configuring MTR trace")
-            self._mtr = self.ec.register_resource("LinuxMtr")
-            self.ec.set(self._mtr, "noDns", True)
-            self.ec.set(self._mtr, "printTimestamp", True)
-            self.ec.set(self._mtr, "continuous", True)
-            self.ec.set(self._mtr, "target", self.get("host"))
-            self.ec.set(self._mtr, "earlyStart", True)
-            self.ec.register_connection(self._mtr, self.node.guid)
-            # schedule mtr deploy
-            self.ec.deploy(guids=[self._mtr], group = self.deployment_group)
+            self.ec.deploy(guids=[ping], group = self.deployment_group)
 
-        if self.trace_enabled("traceroute"):
+        if self.trace_enabled("traceroute") and not self.traceroute:
             self.info("Configuring TRACEROUTE trace")
-            self._traceroute = self.ec.register_resource("LinuxTraceroute")
-            self.ec.set(self._traceroute, "printTimestamp", True)
-            self.ec.set(self._traceroute, "continuous", True)
-            self.ec.set(self._traceroute, "target", self.get("host"))
-            self.ec.set(self._traceroute, "earlyStart", True)
-            self.ec.register_connection(self._traceroute, self.node.guid)
+            traceroute = self.ec.register_resource("LinuxTraceroute")
+            self.ec.set(traceroute, "printTimestamp", True)
+            self.ec.set(traceroute, "continuous", True)
+            self.ec.set(traceroute, "target", self.get("host"))
+            self.ec.set(traceroute, "earlyStart", True)
+            self.ec.register_connection(traceroute, self.node.guid)
             # schedule mtr deploy
-            self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
+            self.ec.deploy(guids=[traceroute], group = self.deployment_group)
 
     def do_start(self):
         if self.state == ResourceState.READY:
index 522a174..44624a2 100644 (file)
 # Should it be made thread-safe?
 class GuidGenerator(object):
     def __init__(self):
-        self._guids = list()
+        self._last_guid = 0
 
     def next(self, guid = None):
-        if guid != None:
-            return guid
-        else:
-            last_guid = 0 if len(self._guids) == 0 else self._guids[-1]
-            guid = last_guid + 1 
-        self._guids.append(guid)
-        self._guids.sort()
+        if guid == None:
+            guid = self._last_guid + 1
+
+        self._last_guid = self._last_guid if guid <= self._last_guid else guid
+
         return guid
 
index c3ba814..d939ece 100644 (file)
@@ -39,7 +39,7 @@ class NetGraph(object):
 
     """
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, **kwargs):
         """ A graph can be generated using a specified pattern 
         (LADDER, MESH, TREE, etc), or provided as an argument.
 
@@ -88,7 +88,7 @@ class NetGraph(object):
             branches = kwargs.get("branches")
 
             self._topo_type = topo_type
-            self._graph = self.generate_grap(topo_type, node_count, 
+            self._graph = self.generate_graph(topo_type, node_count, 
                     branches = branches)
 
         if kwargs.get("assign_ips"):
@@ -124,21 +124,21 @@ class NetGraph(object):
         return self.graph.edges()
 
     def generate_graph(self, topo_type, node_count, branches = None):
-        if topo_type == LADDER:
+        if topo_type == TopologyType.LADDER:
             total_nodes = node_count/2
             graph = networkx.ladder_graph(total_nodes)
 
-        elif topo_type == LINEAR:
+        elif topo_type == TopologyType.LINEAR:
             graph = networkx.path_graph(node_count)
 
-        elif topo_type == MESH:
+        elif topo_type == TopologyType.MESH:
             graph = networkx.complete_graph(node_count)
 
-        elif topo_type == TREE:
+        elif topo_type == TopologyType.TREE:
             h = math.log(node_count + 1)/math.log(2) - 1
             graph = networkx.balanced_tree(2, h)
 
-        elif topo_type == STAR:
+        elif topo_type == TopologyType.STAR:
             graph = networkx.Graph()
             graph.add_node(0)
 
@@ -155,9 +155,8 @@ class NetGraph(object):
 
         # node ids are int, make them str
         g = networkx.Graph()
-        g.add_nodes_from(map(lambda nid: NODES[str(nid)], 
-            graph.nodes()))
-        g.add_edges_from(map(lambda t: (NODES[str(t[0])], NODES[str(t[1])]), 
+        g.add_nodes_from(map(lambda nid: str(nid), graph.nodes()))
+        g.add_edges_from(map(lambda t: (str(t[0]), str(t[1])), 
             graph.edges()))
 
         return g
@@ -183,6 +182,30 @@ class NetGraph(object):
             # confidence interval of the mean RTT
             self.graph.edge[nid1][nid2]["weight_ci"] = None
 
+    def annotate_node_ip(self, nid, ip):
+        if "ips" not in self.graph.node[nid]:
+            self.graph.node[nid]["ips"] = list()
+
+        self.graph.node[nid]["ips"].append(ip)
+    
+    def annotate_node(self, nid, name, value):
+        self.graph.node[nid][name] = value
+    
+    def node_annotation(self, nid, name):
+        return self.graph.node[nid].get(name)
+    
+    def del_node_annotation(self, nid, name):
+        del self.graph.node[nid][name]
+
+    def annotate_edge(self, nid1, nid2, name, value):
+        self.graph.edge[nid1][nid2][name] = value
+    
+    def edge_annotation(self, nid1, nid2, name):
+        return self.graph.edge[nid1][nid2].get(name)
+    
+    def del_edge_annotation(self, nid1, nid2, name):
+        del self.graph.edge[nid1][nid2][name]
+
     def assign_p2p_ips(self, network = "10.0.0.0", prefix = 8, version = 4):
         """ Assign IP addresses to each end of each edge of the network graph,
         computing all the point to point subnets and addresses in the network
@@ -211,9 +234,14 @@ class NetGraph(object):
             new_prefix = 31
         else:
             raise RuntimeError, "Invalid IP version %d" % version
+        
+        ## Clear all previusly assigned IPs
+        for nid in self.graph.node():
+            self.graph.node[nid]["ips"] = list()
 
+        ## Generate and assign new IPs
         sub_itr = net.iter_subnets(new_prefix = new_prefix)
-
+        
         for nid1, nid2 in self.graph.edges():
             #### Compute subnets for each link
             
@@ -237,16 +265,19 @@ class NetGraph(object):
             self.graph.edge[nid1][nid2]["net"]["network"] = mask
             self.graph.edge[nid1][nid2]["net"]["prefix"] = prefixlen
 
+            self.annotate_node_ip(nid1, ip1)
+            self.annotate_node_ip(nid2, ip2)
+
     def get_p2p_info(self, nid1, nid2):
         net = self.graph.edge[nid1][nid2]["net"]
         return ( net[nid1], net[nid2], net["mask"], net["network"], 
                 net["prefixlen"] )
 
     def set_source(self, nid):
-        graph.node[nid]["source"] = True
+        self.graph.node[nid]["source"] = True
 
     def set_target(self, nid):
-        graph.node[nid]["target"] = True
+        self.graph.node[nid]["target"] = True
 
     def targets(self):
         """ Returns the nodes that are targets """
@@ -256,7 +287,7 @@ class NetGraph(object):
     def sources(self):
         """ Returns the nodes that are sources """
         return [nid for nid in self.graph.nodes() \
-                if self.graph.node[nid].get("sources")]
+                if self.graph.node[nid].get("source")]
 
     def select_target_zero(self):
         """ Marks the node 0 as target
@@ -276,9 +307,9 @@ class NetGraph(object):
             source = leaves.pop(random.randint(0, len(leaves) - 1))
         else:
             # options must not be already sources or targets
-            options = [ k for k,v in graph.degree().iteritems() \
-                    if v == 1 and not graph.node[k].get("source") \
-                        and not graph.node[k].get("target")]
+            options = [ k for k,v in self.graph.degree().iteritems() \
+                    if v == 1 and not self.graph.node[k].get("source") \
+                        and not self.graph.node[k].get("target")]
 
             source = options.pop(random.randint(0, len(options) - 1))
         
index 125360e..077a12d 100644 (file)
@@ -93,12 +93,15 @@ class ECXMLParser(object):
         ecnode.setAttribute("nthreads", xmlencode(ec.nthreads))
         ecnode.setAttribute("local_dir", xmlencode(ec.local_dir))
         ecnode.setAttribute("exp_dir", xmlencode(ec.exp_dir))
+
         doc.appendChild(ecnode)
 
         for guid, rm in ec._resources.iteritems():
             self._rm_to_xml(doc, ecnode, ec, guid, rm)
 
         return doc
+    
+    def _netgraph_to_xml(self, doc, ecnode, ec):
 
     def _rm_to_xml(self, doc, ecnode, ec, guid, rm):
         rmnode = doc.createElement("rm")
index 48b826e..6b5622e 100644 (file)
@@ -23,7 +23,7 @@ import os
 try:
     import networkx
 except ImportError:
-    msg = ("Networkx library is not installed, you will not be able to plot.")
+    msg = "Networkx library is not installed, you will not be able to plot."
     logger = Logger("Plotter")
     logger.debug(msg)
 
diff --git a/src/nepi/util/statfuncs.py b/src/nepi/util/statfuncs.py
new file mode 100644 (file)
index 0000000..5c0b2ca
--- /dev/null
@@ -0,0 +1,46 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import math
+import numpy
+from scipy import stats
+
+def compute_mean(sample):
+    # TODO: Discard outliers !!!!
+
+    if not sample:
+        print " CANNOT COMPUTE STATS for ", sample
+        return (0, 0, 0, 0)
+
+    x = numpy.array(sample)
+
+    # sample mean and standard deviation
+    n, min_max, mean, var, skew, kurt = stats.describe(x)
+    std = math.sqrt(var)
+
+    # for the population mean and std ...
+    # mean = x.mean()
+    # std = x.std()
+    
+    # Calculate confidence interval t-distribution
+    ## BUG: Use quantil of NORMAL distribution, not t-student quantil distribution
+    ci = stats.t.interval(0.95, n-1, loc = mean, scale = std/math.sqrt(n))
+
+    return (mean, std, ci[0], ci[1])
+
index 3d7c366..f7fbc85 100644 (file)
@@ -113,3 +113,11 @@ def stabsformat(sdate, dbase = None):
 
     return None
 
+def compute_delay_ms(timestamp2, timestamp1):
+    d1 = datetime.datetime.fromtimestamp(float(timestamp1))
+    d2 = datetime.datetime.fromtimestamp(float(timestamp2))
+    delay = d2 - d1
+
+    # round up resolution - round up to miliseconds
+    return delay.total_seconds() * 1000
+
index 3896ea2..993a0f4 100755 (executable)
@@ -132,7 +132,7 @@ class SerializerTestCase(unittest.TestCase):
 
         # Load serialized experiment
         ec2 = ExperimentController.load(filepath)
-        apps = ec2.get_resources_by_type("dummy::Application")
+        apps = ec2.filter_resources("dummy::Application")
         ec2.deploy()
         ec2.wait_finished(apps)
         ec2.shutdown()