Fixing issues with serialization
author Alina Quereilhac <alina.quereilhac@inria.fr>
Sun, 10 Aug 2014 02:56:51 +0000 (04:56 +0200)
committer Alina Quereilhac <alina.quereilhac@inria.fr>
Sun, 10 Aug 2014 02:56:51 +0000 (04:56 +0200)
21 files changed:
examples/ccn_flooding/flooding.py [deleted file]
examples/ccn_flooding/planetlab.py [new file with mode: 0644]
src/nepi/data/__init__.py [new file with mode: 0644]
src/nepi/data/processing/__init__.py [new file with mode: 0644]
src/nepi/data/processing/ccn/__init__.py [new file with mode: 0644]
src/nepi/data/processing/ccn/parser.py [new file with mode: 0644]
src/nepi/data/processing/ping/__init__.py [new file with mode: 0644]
src/nepi/data/processing/ping/parser.py [new file with mode: 0644]
src/nepi/execution/attribute.py
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/execution/runner.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/util/guid.py
src/nepi/util/netgraph.py
src/nepi/util/parsers/xml_parser.py
src/nepi/util/plotter.py
src/nepi/util/statfuncs.py [new file with mode: 0644]
src/nepi/util/timefuncs.py
test/util/serializer.py

diff --git a/examples/ccn_flooding/flooding.py b/examples/ccn_flooding/flooding.py
deleted file mode 100644 (file)
index c35d7ef..0000000
+++ /dev/null
@@ -1,216 +0,0 @@
-#!/usr/bin/env python
-
-###############################################################################
-#
-#    NEPI, a framework to manage network experiments
-#
-#    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
-#
-#    This program is distributed in the hope that it will be useful,
-#    but WITHOUT ANY WARRANTY; without even the implied warranty of
-#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-#    GNU General Public License for more details.
-#
-#    You should have received a copy of the GNU General Public License
-#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-#
-# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-#
-###############################################################################
-
-from nepi.execution.ec import ExperimentController 
-from nepi.execution.resource import ResourceState, ResourceAction
-from nepi.util.netgraph import NetGraph, TopologyType
-
-import os
-import tempfile
-
-from dminer import ccn
-
-PL_NODES = dict({
-    "0": "iraplab1.iralab.uni-karlsruhe.de",
-    "1": "planetlab1.informatik.uni-goettingen.de",
-    "2": "dfn-ple1.x-win.dfn.de",
-    "3": "mars.planetlab.haw-hamburg.de",
-    "4": "planetlab2.unineuchatel.ch", 
-    "5": "planetlab-node3.it-sudparis.eu",
-    "6": "planetlab2.extern.kuleuven.be",
-    "7": "node2pl.planet-lab.telecom-lille1.eu",
-    "8": "planetvs2.informatik.uni-stuttgart.de",
-    "9": "planetlab1.informatik.uni-wuerzburg.de",
-    "10": "planet1.l3s.uni-hannover.de",
-    "11": "planetlab1.wiwi.hu-berlin.de",
-    "12": "pl2.uni-rostock.de", 
-    "13": "planetlab1.u-strasbg.fr",
-    "14": "peeramidion.irisa.fr"
-    })
-
-pl_slice = os.environ.get("PL_SLICE")
-pl_user = os.environ.get("PL_USER")
-pl_password = os.environ.get("PL_PASS")
-pl_ssh_key = os.environ.get("PL_SSHKEY")
-
-content_name = "ccnx:/test/bunny.ts"
-
-pipeline = 4 # Default value for ccncat
-
-operating_system = "f14"
-
-country = "germany"
-
-repofile = os.path.join(
-        os.path.dirname(os.path.realpath(__file__)),
-            "..", "repoFile1.0.8.2")
-
-def add_collector(ec, trace_name, store_dir, sub_dir, rename = None):
-    collector = ec.register_resource("Collector")
-    ec.set(collector, "traceName", trace_name)
-    ec.set(collector, "storeDir", store_dir)
-    ec.set(collector, "subDir", sub_dir)
-    if rename:
-        ec.set(collector, "rename", rename)
-
-    return collector
-
-def add_node(ec, n):
-    hostname = PL_NODES[n]
-
-    node = ec.register_resource("PlanetlabNode")
-    ec.set(node, "hostname", hostname)
-    ec.set(node, "username", username)
-    ec.set(node, "identity", identity)
-    ec.set(node, "pluser", pl_user)
-    ec.set(node, "plpassword", pl_password)
-    #ec.set(node, "country", country)
-    #ec.set(node, "operatingSystem", operating_system)
-    ec.set(node, "cleanExperiment", True)
-    ec.set(node, "cleanProcesses", True)
-
-    return node
-
-def add_ccnd(ec, node, n):
-    global PL_NODES
-
-    ccnd = ec.register_resource("LinuxCCND")
-    ec.set(ccnd, "debug", 7)
-    ec.register_connection(ccnd, node)
-
-    # collector for ccnd trace
-    hostname = PL_NODES[n]
-    collector = add_collector(ec, "stderr", hostname, "log")
-    ec.register_connection(collector, ccnd)
-
-    PL_NODES[n] = (hostname, node, ccnd)
-    return ccnd
-
-def add_ccnr(ec, ccnd):
-    ccnr = ec.register_resource("LinuxCCNR")
-
-    ec.set(ccnr, "repoFile1", repofile)
-    ec.register_connection(ccnr, ccnd)
-
-    return ccnr
-
-def add_fib_entry(ec, n1, n2):
-    (hostname1, node1, ccnd1) = PL_NODES[n1] 
-    (hostname2, node2, ccnd2) = PL_NODES[n2]
-
-    entry = ec.register_resource("LinuxFIBEntry")
-    ec.set(entry, "host", peer_host)
-
-    ec.register_connection(entry, ccnd1)
-
-    ec.enable_trace(entry, "ping")
-    collector = add_collector(ec, "ping", hostname2)
-    ec.register_connection(collector, entry)
-
-    return entry
-
-def add_ccncat(ec, ccnd):
-    ccncat = ec.register_resource("LinuxCCNCat")
-    ec.set(ccncat, "pipeline", pipeline)
-    ec.set(ccncat, "contentName", content_name)
-    ec.register_connection(ccncat, ccnd)
-
-    return ccncat
-
-def compute_metric_callback(ec, run):
-    ## Process logs and analyse data
-    try:
-        graph = ccn.parse_ccndlogs(graph = graph, 
-                parse_ping_logs = True)
-    except:
-        print "Skipping: Error parsing ccnd logs", run_dir
-        raise
-
-    source = ccn.consumers(graph)[0]
-    target = ccn.producers(graph)[0]
-
-    # Process the data from the ccnd logs, but do not re compute
-    # the link delay. 
-    try:
-        (content_names,
-            interest_expiry_count,
-            interest_dupnonce_count,
-            interest_count,
-            content_count) = ccn.process_ccn_data(graph, source)
-    except:
-        print "Skipping: Error processing ccn data", run_dir
-        raise
-
-    # Compute the shortest path
-    shortest_path = ccn.shortest_path(graph, source, target)
-
-    # Compute the load coefficient
-    lcoeff = ccn.load_coefficient(graph, shortest_path, content_names)
-
-    return lcoeff
-             
-if __name__ == '__main__':
-    
-    #### Generate a LADDER network topology 
-    net_graph = NetGraph(topo_type = TopologyType.LADDER, 
-            node_count = 6, 
-            assign_st = True, 
-            assign_ips = True)
-   
-    target = net_graph.targets()[0]
-    source = net_graph.sources()[0]
-    
-    wait_guids = []
-
-    #### Create NEPI Experiment Description (EC)
-    ec = ExperimentController(exp_id)
-
-    ### Add CCN nodes to the (EC)
-    for n in graph.nodes():
-        node = add_node(ec, n)
-        ccnd = add_ccnd(ec, node, n)
-        
-        if n == target:
-            ccnr = add_ccnr(ec, ccnd)
-
-        ## Add content retrival application
-        if n == source:
-            ccncat = add_ccncat(ec, ccnd)
-            wait_guids.append(ccncat)
-
-    #### Add connections between CCN nodes
-    for n1, n2 in graph.edges():
-        add_fib_entry(ec, n1, n2)
-        add_fib_entry(ec, n2, n1)
-
-    #### Define the callback to compute experiment metric
-    metric_callback = functools.partial(compute_metric_callback, ping)
-
-    #### Run experiment until metric convergence
-    rnr = ExperimentRunner()
-
-    runs = rnr.run(ec, min_runs = 10, max_runs = 300 
-            compute_metric_callback = metric_callback,
-            wait_guids = wait_guids,
-            wait_time = 0)
-
diff --git a/examples/ccn_flooding/planetlab.py b/examples/ccn_flooding/planetlab.py
new file mode 100644 (file)
index 0000000..5834a85
--- /dev/null
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    NEPI, a framework to manage network experiments
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+from nepi.execution.ec import ExperimentController 
+from nepi.execution.runner import ExperimentRunner
+from nepi.util.netgraph import NetGraph, TopologyType
+import nepi.data.processing.ccn.parser as ccn_parser
+
+import networkx
+import socket
+import os
+
+PL_NODES = dict({
+    "0": "iraplab1.iralab.uni-karlsruhe.de",
+    "1": "planetlab1.informatik.uni-goettingen.de",
+    "2": "dfn-ple1.x-win.dfn.de",
+    "3": "mars.planetlab.haw-hamburg.de",
+    "4": "planetlab2.unineuchatel.ch", 
+    "5": "planetlab-node3.it-sudparis.eu",
+    "6": "planetlab2.extern.kuleuven.be",
+    "7": "node2pl.planet-lab.telecom-lille1.eu",
+    "8": "planetvs2.informatik.uni-stuttgart.de",
+    "9": "planetlab1.informatik.uni-wuerzburg.de",
+    "10": "planet1.l3s.uni-hannover.de",
+    "11": "planetlab1.wiwi.hu-berlin.de",
+    "12": "pl2.uni-rostock.de", 
+    "13": "planetlab1.u-strasbg.fr",
+    "14": "peeramidion.irisa.fr"
+    })
+
+pl_slice = os.environ.get("PL_SLICE")
+pl_user = os.environ.get("PL_USER")
+pl_password = os.environ.get("PL_PASS")
+pl_ssh_key = os.environ.get("PL_SSHKEY")
+
+content_name = "ccnx:/test/bunny.ts"
+
+pipeline = 4 # Default value for ccncat
+
+operating_system = "f14"
+
+country = "germany"
+
+repofile = os.path.join(
+        os.path.dirname(os.path.realpath(__file__)), "repoFile1.0.8.2")
+
+def add_collector(ec, trace_name, subdir, newname = None):
+    collector = ec.register_resource("Collector")
+    ec.set(collector, "traceName", trace_name)
+    ec.set(collector, "subDir", subdir)
+    if newname:
+        ec.set(collector, "rename", newname)
+
+    return collector
+
+def add_pl_host(ec, nid):
+    hostname = PL_NODES[nid]
+
+    # Add a planetlab host to the experiment description
+    host = ec.register_resource("PlanetlabNode")
+    ec.set(host, "hostname", hostname)
+    ec.set(host, "username", pl_slice)
+    ec.set(host, "identity", pl_ssh_key)
+    #ec.set(host, "pluser", pl_user)
+    #ec.set(host, "plpassword", pl_password)
+    #ec.set(host, "country", country)
+    #ec.set(host, "operatingSystem", operating_system)
+    ec.set(host, "cleanExperiment", True)
+    ec.set(host, "cleanProcesses", True)
+
+    # Annotate the graph
+    ec.netgraph.annotate_node(nid, "hostname", hostname)
+    ec.netgraph.annotate_node(nid, "host", host)
+    
+    # Annotate the graph node with an ip address
+    ip = socket.gethostbyname(hostname)
+    ec.netgraph.annotate_node_ip(nid, ip)
+
+def add_pl_ccnd(ec, nid):
+    # Retrieve annotation from netgraph
+    host = ec.netgraph.node_annotation(nid, "host")
+    
+    # Add a CCN daemon to the planetlab node
+    ccnd = ec.register_resource("LinuxCCND")
+    ec.set(ccnd, "debug", 7)
+    ec.register_connection(ccnd, host)
+    
+    # Collector to retrieve ccnd log
+    collector = add_collector(ec, "stderr", nid, "log")
+    ec.register_connection(collector, ccnd)
+
+    # Annotate the graph
+    ec.netgraph.annotate_node(nid, "ccnd", ccnd)
+
+def add_pl_ccnr(ec, nid):
+    # Retrieve annotation from netgraph
+    ccnd = ec.netgraph.node_annotation(nid, "ccnd")
+    
+    # Add a CCN content repository to the planetlab node
+    ccnr = ec.register_resource("LinuxCCNR")
+
+    ec.set(ccnr, "repoFile1", repofile)
+    ec.register_connection(ccnr, ccnd)
+
+def add_pl_ccncat(ec, nid):
+    # Retrieve annotation from netgraph
+    ccnd = ec.netgraph.node_annotation(nid, "ccnd")
+    
+    # Add a CCN cat application to the planetlab node
+    ccncat = ec.register_resource("LinuxCCNCat")
+    ec.set(ccncat, "pipeline", pipeline)
+    ec.set(ccncat, "contentName", content_name)
+    ec.register_connection(ccncat, ccnd)
+
+def add_pl_fib_entry(ec, nid1, nid2):
+    # Retrieve annotations from netgraph
+    ccnd1 = ec.netgraph.node_annotation(nid1, "ccnd")
+    hostname2 = ec.netgraph.node_annotation(nid2, "hostname")
+    
+    # Add a FIB entry between one planetlab node and its peer
+    entry = ec.register_resource("LinuxFIBEntry")
+    ec.set(entry, "host", hostname2)
+    ec.register_connection(entry, ccnd1)
+
+    # Collector to retrieve peering ping output (to measure neighbors delay)
+    ec.enable_trace(entry, "ping")
+    collector = add_collector(ec, "ping", nid1)
+    ec.register_connection(collector, entry)
+
+    return entry
+
+def avg_interests(ec, run):
+    ## Process logs
+    logs_dir = ec.run_dir
+
+    print ec.netgraph
+
+    (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count) = ccn_parser.process_content_history_logs(
+                logs_dir, 
+                ec.netgraph.graph)
+
+    shortest_path = networkx.shortest_path(graph, 
+            source = ec.netgraph.sources()[0], 
+            target = ec.netgraph.targets()[0])
+
+    ### Compute metric: Avg number of Interests seen per content name
+    ###                 normalized by the number of nodes in the shortest path
+    content_name_count = len(content_names)
+    nodes_in_shortest_path = len(shortest_path)
+    metric = interest_count / float(content_name_count) / float(nodes_in_shortest_path)
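+    # e.g. (illustrative numbers) 600 interests / 50 names / 4 hops = 3.0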
+
+    # TODO: DUMP RESULTS TO FILE
+    # TODO: DUMP GRAPH DELAYS!
+
+    return metric
+
+def add_pl_edge(ec, nid1, nid2):
+    #### Add connections between CCN nodes
+    add_pl_fib_entry(ec, nid1, nid2)
+    add_pl_fib_entry(ec, nid2, nid1)
+
+def add_pl_node(ec, nid):
+    ### Add CCN nodes (ec.netgraph holds the topology graph)
+    add_pl_host(ec, nid)
+    add_pl_ccnd(ec, nid)
+        
+    if nid == ec.netgraph.targets()[0]:
+        add_pl_ccnr(ec, nid)
+
+    if nid == ec.netgraph.sources()[0]:
+        add_pl_ccncat(ec, nid)
+
+if __name__ == '__main__':
+
+    #### Create NEPI Experiment Description with LINEAR topology 
+    ec = ExperimentController("pl_ccn", 
+            topo_type = TopologyType.LINEAR, 
+            node_count = 4, 
+            #assign_ips = True,
+            assign_st = True,
+            add_node_callback = add_pl_node, 
+            add_edge_callback = add_pl_edge)
+    
+    print "Results stored at", ec.exp_dir
+
+    #### Retrieve the content producing resource to wait for it to finish
+    ccncat = ec.filter_resources("LinuxCCNCat")
+   
+    #### Run experiment until the metric converges
+    rnr = ExperimentRunner()
+    runs = rnr.run(ec, min_runs = 10, max_runs = 300, 
+            compute_metric_callback = avg_interests,
+            wait_guids = ccncat,
+            wait_time = 0)
+
diff --git a/src/nepi/data/__init__.py b/src/nepi/data/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/__init__.py b/src/nepi/data/processing/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/ccn/__init__.py b/src/nepi/data/processing/ccn/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/ccn/parser.py b/src/nepi/data/processing/ccn/parser.py
new file mode 100644 (file)
index 0000000..1a65830
--- /dev/null
@@ -0,0 +1,401 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    CCNX benchmark
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+#
+# This library contains functions to parse (CCNx) ccnd logs.
+#
+# Results from experiments must be stored in a directory
+# named with the experiment run id.
+# ccnd logs are stored in .log files in a subdirectory per node.
+# The following diagram exemplifies the experiment result directory
+# structure (nidi is the unique identifier assigned to node i):
+#
+#    run_id
+#               \   nid1
+#                        \ nid2.log
+#               \   nid2
+#                        \ nid1.log
+#               \   nid3
+#                        \ nid3.log
+#
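+# A typical invocation, assuming a results directory laid out as above
+# and a topology graph whose nodes carry "ips" annotations (the path
+# below is illustrative):
+#
+#    graph = annotate_cn_graph("/tmp/exp/run_id", graph,
+#            parse_ping_logs = True)
+#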
+
+import collections
+import functools
+import networkx
+import os
+import pickle
+import tempfile
+
+from nepi.util.timefuncs import compute_delay_ms
+from nepi.util.statfuncs import compute_mean
+import nepi.data.processing.ping.parser as ping_parser
+
+def is_control(content_name):
+    return content_name.startswith("ccnx:/%C1") or \
+            content_name.startswith("ccnx:/ccnx") or \
+            content_name.startswith("ccnx:/...")
+
+
+def parse_file(filename):
+    """ Parses message information from ccnd log files
+
+        filename: path to ccnd log file
+
+    """
+
+    faces = dict()
+    sep = " "
+
+    f = open(filename, "r")
+
+    data = []
+
+    for line in f:
+        cols =  line.strip().split(sep)
+
+        # CCN_PEEK
+        # MESSAGE interest_from
+        # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)
+        #
+        # MESSAGE interest_to
+        # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7)
+        #
+        # MESSAGE CONTENT FROM
+        # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+        #
+        # MESSAGE CONTENT_TO
+        # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+        #
+        # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048)
+
+        # External face creation
+        # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695
+
+        if line.find("accepted datagram client") > -1:
+            face_id = (cols[5]).replace("id=",'')
+            ip = cols[7] 
+            port = cols[9]
+            faces[face_id] = (ip, port)
+            continue
+
+        # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4)
+        if line.find("releasing face id") > -1:
+            face_id = cols[5]
+            if face_id in faces:
+                del faces[face_id]
+            continue
+
+        if len(cols) < 6:
+            continue
+
+        timestamp = cols[0]
+        message_type = cols[3]
+
+        if message_type not in ["interest_from", "interest_to", "content_from", 
+                "content_to", "interest_dupnonce", "interest_expiry"]:
+            continue
+
+        face_id = cols[4] 
+        content_name = cols[5]
+
+        # Interest Nonce ? -> 412A74-0844-0008-50AA-F6EAD4
+        nonce = ""
+        if message_type in ["interest_from", "interest_to", "interest_dupnonce"]:
+            last = cols[-1]
+            if len(last.split("-")) == 5:
+                nonce = last
+
+        try:
+            size = int((cols[6]).replace('(',''))
+        except:
+            print "interest_expiry without face id!", line
+            continue
+
+        # If no external IP address was identified for this face
+        # assume it is a local face
+        hostip = "localhost"
+
+        if face_id in faces:
+            hostip, port = faces[face_id]
+
+        data.append((content_name, timestamp, message_type, hostip, face_id, 
+            size, nonce, line))
+
+    f.close()
+
+    return data
+
+def dump_content_history(content_history):
+    f = tempfile.NamedTemporaryFile(delete=False)
+    pickle.dump(content_history, f)
+    f.close()
+    return f.name
+
+def load_content_history(fname):
+    f = open(fname, "r")
+    content_history = pickle.load(f)
+    f.close()
+
+    os.remove(fname)
+    return content_history
+
+def annotate_cn_node(graph, nid, ips2nids, data, content_history):
+    for (content_name, timestamp, message_type, hostip, face_id, 
+            size, nonce, line) in data:
+
+        # Ignore control messages for the time being
+        if is_control(content_name):
+            continue
+
+        if message_type == "interest_from" and \
+                hostip == "localhost":
+            graph.node[nid]["ccn_consumer"] = True
+        elif message_type == "content_from" and \
+                hostip == "localhost":
+            graph.node[nid]["ccn_producer"] = True
+
+        # Ignore local messages for the time being. 
+        # They could later be used to calculate the processing times
+        # of messages.
+        if hostip == "localhost":
+            continue
+
+        # remove digest
+        if message_type in ["content_from", "content_to"]:
+            content_name = "/".join(content_name.split("/")[:-1])
+           
+        if content_name not in content_history:
+            content_history[content_name] = list()
+      
+        peernid = ips2nids[hostip]
+        graph.add_edge(nid, peernid)
+
+        content_history[content_name].append((timestamp, message_type, nid, 
+            peernid, nonce, size, line))
+
+def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False):
+    """ Adds CCN content history for each node in the topology graph.
+
+    """
+    # Make a copy of the graph to ensure integrity
+    graph = graph.copy()
+
+    ips2nids = dict()
+
+    for nid in graph.nodes():
+        ips = graph.node[nid]["ips"]
+        for ip in ips:
+            ips2nids[ip] = nid
+
+    # Now walk through the ccnd logs...
+    for dirpath, dnames, fnames in os.walk(logs_dir):
+        # continue if we are not at the leaf level (if there are subdirectories)
+        if dnames: 
+            continue
+        
+        # Each dirpath corresponds to a different node
+        nid = os.path.basename(dirpath)
+    
+        content_history = dict()
+
+        for fname in fnames:
+            if fname.endswith(".log"):
+                filename = os.path.join(dirpath, fname)
+                data = parse_file(filename)
+                annotate_cn_node(graph, nid, ips2nids, data, content_history)
+
+        # Avoid storing everything in memory, instead dump to a file
+        # and reference the file
+        fname = dump_content_history(content_history)
+        graph.node[nid]["history"] = fname
+
+    if parse_ping_logs:
+        ping_parser.annotate_cn_graph(logs_dir, graph)
+
+    return graph
+
+def ccn_producers(graph):
+    """ Returns the nodes that are content providers """
+    return [nid for nid in graph.nodes() \
+            if graph.node[nid].get("ccn_producer")]
+
+def ccn_consumers(graph):
+    """ Returns the nodes that are content consumers """
+    return [nid for nid in graph.nodes() \
+            if graph.node[nid].get("ccn_consumer")]
+
+def process_content_history(graph):
+    """ Compute CCN message counts and aggregates content historical 
+    information in the content_names dictionary 
+    
+    """
+
+    ## Assume single source
+    source = ccn_consumers(graph)[0]
+
+    interest_expiry_count = 0
+    interest_dupnonce_count = 0
+    interest_count = 0
+    content_count = 0
+    content_names = dict()
+
+    # Collect information about exchanged messages by content name and
+    # link delay info.
+    for nid in graph.nodes():
+        # Load the data collected from the node's ccnd log
+        fname = graph.node[nid]["history"]
+        history = load_content_history(fname)
+
+        for content_name in history.keys():
+            hist = history[content_name]
+
+            for (timestamp, message_type, nid1, nid2, nonce, size, line) in hist:
+                if message_type in ["content_from", "content_to"]:
+                    # The first Interest sent will not have a version or chunk number.
+                    # The first Content sent back in reply, will end in /=00 or /%00.
+                    # Make sure to map the first Content to the first Interest.
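+                    # e.g. "ccnx:/test/bunny.ts/%FD.../=00" maps back
+                    # to "ccnx:/test/bunny.ts" (illustrative name)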
+                    if content_name.endswith("/=00"):
+                        content_name = "/".join(content_name.split("/")[0:-2])
+
+                # Add content name to dictionary
+                if content_name not in content_names:
+                    content_names[content_name] = dict()
+                    content_names[content_name]["interest"] = dict()
+                    content_names[content_name]["content"] = list()
+
+                # Classify interests by replica
+                if message_type in ["interest_from"] and \
+                        nonce not in content_names[content_name]["interest"]:
+                    content_names[content_name]["interest"][nonce] = list()
+     
+                # Add consumer history
+                if nid == source:
+                    if message_type in ["interest_to", "content_from"]:
+                        # content name history as seen by the source
+                        if "consumer_history" not in content_names[content_name]:
+                            content_names[content_name]["consumer_history"] = list()
+
+                        content_names[content_name]["consumer_history"].append(
+                                (timestamp, message_type)) 
+
+                # Add messages per content name and cumulate totals by message type
+                if message_type == "interest_dupnonce":
+                    interest_dupnonce_count += 1
+                elif message_type == "interest_expiry":
+                    interest_expiry_count += 1
+                elif message_type == "interest_from":
+                    interest_count += 1
+                    # Append to interest history of the content name
+                    content_names[content_name]["interest"][nonce].append(
+                            (timestamp, nid2, nid1))
+                elif message_type == "content_from":
+                    content_count += 1
+                    # Append to content history of the content name
+                    content_names[content_name]["content"].append((timestamp, nid2, nid1))
+                else:
+                    continue
+            del hist
+        del history
+
+    # Compute the time elapsed between the time an interest is sent
+    # in the consumer node and when the content is received back
+    for content_name in content_names.keys():
+        # order content and interest messages by timestamp
+        content_names[content_name]["content"] = sorted(
+              content_names[content_name]["content"])
+        
+        for nonce, timestamps in content_names[content_name][
+                    "interest"].iteritems():
+              content_names[content_name]["interest"][nonce] = sorted(
+                        timestamps)
+      
+        history = sorted(content_names[content_name]["consumer_history"])
+        content_names[content_name]["consumer_history"] = history
+
+        # compute the rtt time of the message
+        rtt = None
+        waiting_content = False 
+        interest_timestamp = None
+        content_timestamp = None
+        
+        for (timestamp, message_type) in history:
+            if not waiting_content and message_type == "interest_to":
+                waiting_content = True
+                interest_timestamp = timestamp
+                continue
+
+            if waiting_content and message_type == "content_from":
+                content_timestamp = timestamp
+                break
+    
+        # If no interest/content round trip could be matched, mark the rtt as invalid
+        rtt = -1
+        if interest_timestamp and content_timestamp:
+            rtt = compute_delay_ms(content_timestamp, interest_timestamp)
+
+        content_names[content_name]["rtt"] = rtt
+        content_names[content_name]["lapse"] = (interest_timestamp, content_timestamp)
+
+    return (content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count)
+
+def process_content_history_logs(logs_dir, graph):
+    """ Parse CCN logs and aggregate content history information in graph.
+    Returns the annotated graph, the message counts and the content name history.
+
+    """
+    ## Process logs and analyse data
+    try:
+        graph = annotate_cn_graph(logs_dir, graph, 
+                parse_ping_logs = True)
+    except:
+        print "Skipping: Error parsing ccnd logs", logs_dir
+        raise
+
+    source = ccn_consumers(graph)[0]
+    target = ccn_producers(graph)[0]
+
+    # Process the data from the ccnd logs, but do not re compute
+    # the link delay. 
+    try:
+        (content_names,
+            interest_expiry_count,
+            interest_dupnonce_count,
+            interest_count,
+            content_count) = process_content_history(graph)
+    except:
+        print "Skipping: Error processing ccn data", logs_dir
+        raise
+
+    return (graph,
+            content_names,
+            interest_expiry_count,
+            interest_dupnonce_count,
+            interest_count,
+            content_count) 
diff --git a/src/nepi/data/processing/ping/__init__.py b/src/nepi/data/processing/ping/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/ping/parser.py b/src/nepi/data/processing/ping/parser.py
new file mode 100644 (file)
index 0000000..3396207
--- /dev/null
@@ -0,0 +1,114 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    CCNX benchmark
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+#
+# This library contains functions to parse log files generated using ping. 
+#
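+# A reply line this parser expects looks like (host and values are
+# illustrative):
+#
+#   64 bytes from host.example.org (10.0.0.1): icmp_seq=1 ttl=64 time=12.3 ms
+#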
+
+import collections
+import os
+import re
+
+# RE to match line starting "traceroute to"
+_rre = re.compile("\d+ bytes from ((?P<hostname>[^\s]+) )?\(?(?P<ip>[^\s]+)\)??: icmp_.eq=\d+ ttl=\d+ time=(?P<time>[^\s]+) ms")
+
+def parse_file(filename):
+    """
+        filename: path to ping log file
+
+    """
+
+    f = open(filename, "r")
+
+    # Ping target info
+    target_ip = None
+    target_hostname = None
+   
+    data = []
+
+    for line in f:
+        # match a ping reply line
+        m = re.match(_rre, line)
+        if not m:
+            continue
+
+        target_ip = m.groupdict()["ip"]
+        # FIX THIS: Make sure the regular expression does not include 
+        # the ')' in the ip group 
+        target_ip = target_ip.replace(")","")
+        target_hostname = m.groupdict()["hostname"]
+        time = m.groupdict()["time"]
+        data.append((target_ip, target_hostname, time))
+
+    f.close()
+
+    return data
+
+def annotate_cn_node(graph, nid1, ips2nids, data):
+    for (target_ip, target_hostname, time) in data:
+        nid2 = ips2nids[target_ip]
+
+        if "delays" not in graph.edge[nid1][nid2]:
+            graph.edge[nid1][nid2]["delays"] = []
+
+        time = float(time.replace("ms", "").replace(" ",""))
+
+        graph.edge[nid1][nid2]["delays"].append(time)
+
+def annotate_cn_graph(logs_dir, graph): 
+    """ Add delay inormation to graph using data collected using
+    ping.
+
+    """
+    ips2nids = dict()
+
+    for nid in graph.nodes():
+        ips = graph.node[nid]["ips"]
+        for ip in ips:
+            ips2nids[ip] = nid
+
+    # Walk through the ping logs...
+    for dirpath, dnames, fnames in os.walk(logs_dir):
+        # continue if we are not at the leaf level (if there are subdirectories)
+        if dnames: 
+            continue
+        
+        # Each dirpath corresponds to a different host
+        nid = os.path.basename(dirpath)
+    
+        for fname in fnames:
+            if fname.endswith(".ping"):
+                filename = os.path.join(dirpath, fname)
+                data = parse_file(filename)
+                annotate_cn_node(graph, nid, ips2nids, data)
+
+    # Take as weight the most frequent value
+    for nid1, nid2 in graph.edges():
+        delays = collections.Counter(graph.edge[nid1][nid2]["delays"])
+        weight = delays.most_common(1)[0][0]
+        del graph.edge[nid1][nid2]["delays"]
+        graph.edge[nid1][nid2]["weight"] = weight
+
+    return graph
+
+
diff --git a/src/nepi/execution/attribute.py b/src/nepi/execution/attribute.py
index 19fbfc2..bf0a853 100644 (file)
@@ -55,6 +55,7 @@ class Flags:
     # Attribute global is set to all resources of rtype
     Global  = 1 << 7 # 128
 
+
 class Attribute(object):
     """
     .. class:: Class Args :
diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py
index 1357e4f..5664d2f 100644 (file)
@@ -26,6 +26,7 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 from nepi.util.serializer import ECSerializer, SFormats
 from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType 
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -101,7 +102,7 @@ class ExperimentController(object):
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
-    behavior, configuration and interconnection of those resources. 
+    and the behavior, configuration and interconnection of those resources. 
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
@@ -116,7 +117,7 @@ class ExperimentController(object):
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
-    In NEPI, an experiment is represented as a graph of interconnected
+    An experiment is represented as a graph of interconnected
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
@@ -126,10 +127,9 @@ class ExperimentController(object):
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
-    To support a new type of resource in NEPI, a new RM must be 
-    implemented. NEPI already provides a variety of
-    RMs to control basic resources, and new can be extended from
-    the existing ones.
+    To support a new type of resource, a new RM must be implemented. 
+    NEPI already provides a variety of RMs to control basic resources, 
+    and new can be extended from the existing ones.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
@@ -160,22 +160,31 @@ class ExperimentController(object):
         ec = serializer.load(filepath)
         return ec
 
-    def __init__(self, exp_id = None, local_dir = None, persist = False):
-        """ ExperimentController entity to model an execute a network experiment.
+    def __init__(self, exp_id = None, local_dir = None, persist = False,
+            add_node_callback = None, add_edge_callback = None, **kwargs):
+        """ ExperimentController entity to model an execute a network 
+        experiment.
         
         :param exp_id: Human readable name to identify the experiment
-        :type name: str
+        :type exp_id: str
 
         :param local_dir: Path to local directory where to store experiment
             related files
-        :type name: str
+        :type local_dir: str
 
         :param persist: Save an XML description of the experiment after 
         completion at local_dir
-        :type name: bool
+        :type persist: bool
 
-        """
+        :param add_node_callback: Callback to invoke for node instantiation
+        when automatic topology creation mode is used 
+        :type add_node_callback: function
+
+        :param add_edge_callback: Callback to invoke for edge instantiation 
+        when automatic topology creation mode is used 
+        :type add_edge_callback: function
 
+        """
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -233,6 +242,12 @@ class ExperimentController(object):
         # EC state
         self._state = ECState.RUNNING
 
+        # Automatically construct experiment description 
+        self._netgraph = None
+        if add_node_callback and add_edge_callback:
+            self._build_from_netgraph(add_node_callback, add_edge_callback, 
+                    **kwargs)
+
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         self._nthreads = 20
@@ -313,11 +328,20 @@ class ExperimentController(object):
 
     @property
     def persist(self):
-        """ If Trie persist the ExperimentController to XML format upon completion
+        """ If True, persists the ExperimentController to XML format upon 
+        experiment completion
 
         """
         return self._persist
 
 
         """
         return self._persist
 
+    @property
+    def netgraph(self):
+        """ Return NetGraph instance if experiment description was automatically 
+        generated
+
+        """
+        return self._netgraph
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -478,7 +502,7 @@ class ExperimentController(object):
         return rm
 
     def get_resources_by_type(self, rtype):
-        """ Returns a registered ResourceManager by its guid
+        """ Returns the ResourceManager objects of type rtype
 
             :param rtype: Resource type
             :type rtype: string
@@ -488,7 +512,7 @@ class ExperimentController(object):
         """
         rms = []
         for guid, rm in self._resources.iteritems():
         """
         rms = []
         for guid, rm in self._resources.iteritems():
-            if rm.get_rtype() == type: 
+            if rm.get_rtype() == rtype: 
                 rms.append(rm)
         return rms
 
@@ -497,16 +521,31 @@ class ExperimentController(object):
 
     @property
     def resources(self):
-        """ Returns the set() of guids of all the ResourceManager
+        """ Returns the guids of all ResourceManagers 
 
             :return: Set of all RM guids
-            :rtype: set
+            :rtype: list
 
         """
         keys = self._resources.keys()
 
         return keys
 
 
         """
         keys = self._resources.keys()
 
         return keys
 
+    def filter_resources(self, rtype):
+        """ Returns the guids of all ResourceManagers of type rtype
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of guids
+            
+        """
+        rms = []
+        for guid, rm in self._resources.iteritems():
+            if rm.get_rtype() == rtype: 
+                rms.append(rm.guid)
+        return rms
+
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
         
@@ -1184,3 +1223,18 @@ class ExperimentController(object):
         self._cond.notify()
         self._cond.release()
 
+    def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
+            **kwargs):
+        """ Automates experiment description using a NetGraph instance.
+        """
+        self._netgraph = NetGraph(**kwargs)
+
+        ### Add resources to the EC
+        for nid in self.netgraph.graph.nodes():
+            add_node_callback(self, nid)
+
+        #### Add connections between resources
+        for nid1, nid2 in self.netgraph.graph.edges():
+            add_edge_callback(self, nid1, nid2)
+
+
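+# A minimal sketch of the automatic topology creation mode (the callback
+# and resource type names are illustrative, not part of the API):
+#
+#     def my_add_node(ec, nid):
+#         guid = ec.register_resource("SomeResourceType")
+#         ec.netgraph.annotate_node(nid, "guid", guid)
+#
+#     def my_add_edge(ec, nid1, nid2):
+#         pass
+#
+#     ec = ExperimentController("my-exp", topo_type = TopologyType.LINEAR,
+#             node_count = 4, add_node_callback = my_add_node,
+#             add_edge_callback = my_add_edge)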
diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py
index c389cfe..f4b19f5 100644 (file)
@@ -113,11 +113,13 @@ def failtrap(func):
         try:
             return func(self, *args, **kwargs)
         except:
+            self.fail()
+            
             import traceback
             err = traceback.format_exc()
-            self.error(err)
-            self.debug("SETTING guid %d to state FAILED" % self.guid)
-            self.fail()
+            logger = Logger(self._rtype)
+            logger.error(err)
+            logger.error("SETTING guid %d to state FAILED" % self.guid)
             raise
     
     return wrapped
@@ -560,11 +562,14 @@ class ResourceManager(Logger):
             try:
                 self.do_release()
             except:
+                self.set_released()
+
                 import traceback
                 err = traceback.format_exc()
-                self.error(err)
-
-                self.set_released()
+                msg = " %s guid %d ----- FAILED TO RELEASED ----- \n %s " % (
+                        self._rtype, self.guid, err)
+                logger = Logger(self._rtype)
+                logger.debug(msg)
 
     def fail(self):
         """ Sets the RM to state FAILED.
@@ -1050,12 +1055,18 @@ class ResourceManager(Logger):
     def set_released(self, time = None):
         """ Mark ResourceManager as REALEASED """
         self.set_state(ResourceState.RELEASED, "_release_time", time)
-        self.debug("----- RELEASED ---- ")
+
+        msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
+        logger = Logger(self._rtype)
+        logger.debug(msg)
 
     def set_failed(self, time = None):
         """ Mark ResourceManager as FAILED """
         self.set_state(ResourceState.FAILED, "_failed_time", time)
-        self.debug("----- FAILED ---- ")
+
+        msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
+        logger = Logger(self._rtype)
+        logger.debug(msg)
 
     def set_discovered(self, time = None):
         """ Mark ResourceManager as DISCOVERED """
diff --git a/src/nepi/execution/runner.py b/src/nepi/execution/runner.py
index 75e766f..52b4bde 100644 (file)
@@ -17,7 +17,7 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.ec import ExperimentController
+from nepi.execution.ec import ExperimentController, ECState
 
 import math
 import numpy
@@ -140,12 +140,15 @@ class ExperimentRunner(object):
         ec = ExperimentController.load(filepath)
 
         ec.deploy()
-
+    
         ec.wait_finished(wait_guids)
         time.sleep(wait_time)
 
         ec.release()
 
+        if ec.state == ECState.FAILED:
+            raise RuntimeError, "Experiment failed"
+
         return ec
 
 
diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py
index 5e06dab..d095a25 100644 (file)
@@ -173,7 +173,9 @@ class LinuxApplication(ResourceManager):
         super(LinuxApplication, self).__init__(ec, guid)
         self._pid = None
         self._ppid = None
+        self._node = None
         self._home = "app-%s" % self.guid
+
         # whether the command should run in foreground attached
         # to a terminal
         self._in_foreground = False
@@ -194,9 +196,16 @@ class LinuxApplication(ResourceManager):
 
     @property
     def node(self):
-        node = self.get_connected(LinuxNode.get_rtype())
-        if node: return node[0]
-        raise RuntimeError, "Application must be connected to Node"
+        if not self._node:
+            node = self.get_connected(LinuxNode.get_rtype())
+            if not node: 
+                msg = "Application %s guid %d NOT connected to Node" % (
+                        self._rtype, self.guid)
+                raise RuntimeError, msg
+
+            self._node = node[0]
+
+        return self._node
 
     @property
     def app_home(self):
diff --git a/src/nepi/resources/linux/ccn/fibentry.py b/src/nepi/resources/linux/ccn/fibentry.py
index 8bf9bd1..700ab10 100644 (file)
@@ -85,14 +85,37 @@ class LinuxFIBEntry(LinuxApplication):
         super(LinuxFIBEntry, self).__init__(ec, guid)
         self._home = "fib-%s" % self.guid
         self._ping = None
-        self._mtr = None
         self._traceroute = None
+        self._ccnd = None
 
     @property
     def ccnd(self):
-        ccnd = self.get_connected(LinuxCCND.get_rtype())
-        if ccnd: return ccnd[0]
-        return None
+        if not self._ccnd:
+            ccnd = self.get_connected(LinuxCCND.get_rtype())
+            if ccnd: 
+                self._ccnd = ccnd[0]
+            
+        return self._ccnd
+
+    @property
+    def ping(self):
+        if not self._ping:
+            from nepi.resources.linux.ping import LinuxPing
+            ping = self.get_connected(LinuxPing.get_rtype())
+            if ping: 
+                self._ping = ping[0]
+            
+        return self._ping
+
+    @property
+    def traceroute(self):
+        if not self._traceroute:
+            from nepi.resources.linux.traceroute import LinuxTraceroute
+            traceroute = self.get_connected(LinuxTraceroute.get_rtype())
+            if traceroute: 
+                self._traceroute = traceroute[0]
+            
+        return self._traceroute
 
     @property
     def node(self):
@@ -101,11 +124,14 @@ class LinuxFIBEntry(LinuxApplication):
 
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         if name == "ping":
-            return self.ec.trace(self._ping, "stdout", attr, block, offset)
-        if name == "mtr":
-            return self.ec.trace(self._mtr, "stdout", attr, block, offset)
+            if not self.ping:
+                return None
+            return self.ec.trace(self.ping, "stdout", attr, block, offset)
+
         if name == "traceroute":
-            return self.ec.trace(self._traceroute, "stdout", attr, block, offset)
+            if not self.traceroute:
+                return None
+            return self.ec.trace(self.traceroute, "stdout", attr, block, offset)
 
         return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
     
@@ -159,38 +185,27 @@ class LinuxFIBEntry(LinuxApplication):
             raise RuntimeError, msg
         
     def configure(self):
-        if self.trace_enabled("ping"):
+        if self.trace_enabled("ping") and not self.ping:
             self.info("Configuring PING trace")
             self.info("Configuring PING trace")
-            self._ping = self.ec.register_resource("LinuxPing")
-            self.ec.set(self._ping, "printTimestamp", True)
-            self.ec.set(self._ping, "target", self.get("host"))
-            self.ec.set(self._ping, "earlyStart", True)
-            self.ec.register_connection(self._ping, self.node.guid)
+            ping = self.ec.register_resource("LinuxPing")
+            self.ec.set(ping, "printTimestamp", True)
+            self.ec.set(ping, "target", self.get("host"))
+            self.ec.set(ping, "earlyStart", True)
+            self.ec.register_connection(ping, self.node.guid)
+            self.ec.register_connection(ping, self.guid)
             # schedule ping deploy
-            self.ec.deploy(guids=[self._ping], group = self.deployment_group)
-
-        if self.trace_enabled("mtr"):
-            self.info("Configuring MTR trace")
-            self._mtr = self.ec.register_resource("LinuxMtr")
-            self.ec.set(self._mtr, "noDns", True)
-            self.ec.set(self._mtr, "printTimestamp", True)
-            self.ec.set(self._mtr, "continuous", True)
-            self.ec.set(self._mtr, "target", self.get("host"))
-            self.ec.set(self._mtr, "earlyStart", True)
-            self.ec.register_connection(self._mtr, self.node.guid)
-            # schedule mtr deploy
-            self.ec.deploy(guids=[self._mtr], group = self.deployment_group)
+            self.ec.deploy(guids=[ping], group = self.deployment_group)
 
-        if self.trace_enabled("traceroute"):
+        if self.trace_enabled("traceroute") and not self.traceroute:
             self.info("Configuring TRACEROUTE trace")
             self.info("Configuring TRACEROUTE trace")
-            self._traceroute = self.ec.register_resource("LinuxTraceroute")
-            self.ec.set(self._traceroute, "printTimestamp", True)
-            self.ec.set(self._traceroute, "continuous", True)
-            self.ec.set(self._traceroute, "target", self.get("host"))
-            self.ec.set(self._traceroute, "earlyStart", True)
-            self.ec.register_connection(self._traceroute, self.node.guid)
+            traceroute = self.ec.register_resource("LinuxTraceroute")
+            self.ec.set(traceroute, "printTimestamp", True)
+            self.ec.set(traceroute, "continuous", True)
+            self.ec.set(traceroute, "target", self.get("host"))
+            self.ec.set(traceroute, "earlyStart", True)
+            self.ec.register_connection(traceroute, self.node.guid)
             # schedule traceroute deploy
-            self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
+            self.ec.deploy(guids=[traceroute], group = self.deployment_group)
 
     def do_start(self):
         if self.state == ResourceState.READY:
diff --git a/src/nepi/util/guid.py b/src/nepi/util/guid.py
index 522a174..44624a2 100644 (file)
 # Should it be made thread-safe?
 class GuidGenerator(object):
     def __init__(self):
-        self._guids = list()
+        self._last_guid = 0
 
     def next(self, guid = None):
-        if guid != None:
-            return guid
-        else:
-            last_guid = 0 if len(self._guids) == 0 else self._guids[-1]
-            guid = last_guid + 1 
-        self._guids.append(guid)
-        self._guids.sort()
+        if guid == None:
+            guid = self._last_guid + 1
+
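+        # Remember the highest guid seen so far, so that guids requested
+        # explicitly are never generated again automatically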
+        self._last_guid = self._last_guid if guid <= self._last_guid else guid
+
         return guid
 
diff --git a/src/nepi/util/netgraph.py b/src/nepi/util/netgraph.py
index c3ba814..d939ece 100644 (file)
@@ -39,7 +39,7 @@ class NetGraph(object):
 
     """
 
 
     """
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, **kwargs):
         """ A graph can be generated using a specified pattern 
         (LADDER, MESH, TREE, etc), or provided as an argument.
 
         """ A graph can be generated using a specified pattern 
         (LADDER, MESH, TREE, etc), or provided as an argument.
 
@@ -88,7 +88,7 @@ class NetGraph(object):
             branches = kwargs.get("branches")
 
             self._topo_type = topo_type
-            self._graph = self.generate_grap(topo_type, node_count, 
+            self._graph = self.generate_graph(topo_type, node_count, 
                     branches = branches)
 
         if kwargs.get("assign_ips"):
@@ -124,21 +124,21 @@ class NetGraph(object):
         return self.graph.edges()
 
     def generate_graph(self, topo_type, node_count, branches = None):
-        if topo_type == LADDER:
+        if topo_type == TopologyType.LADDER:
             total_nodes = node_count/2
             graph = networkx.ladder_graph(total_nodes)
 
-        elif topo_type == LINEAR:
+        elif topo_type == TopologyType.LINEAR:
             graph = networkx.path_graph(node_count)
 
-        elif topo_type == MESH:
+        elif topo_type == TopologyType.MESH:
             graph = networkx.complete_graph(node_count)
 
-        elif topo_type == TREE:
+        elif topo_type == TopologyType.TREE:
             h = math.log(node_count + 1)/math.log(2) - 1
             graph = networkx.balanced_tree(2, h)
 
-        elif topo_type == STAR:
+        elif topo_type == TopologyType.STAR:
             graph = networkx.Graph()
             graph.add_node(0)
 
@@ -155,9 +155,8 @@ class NetGraph(object):
 
         # node ids are int, make them str
         g = networkx.Graph()
-        g.add_nodes_from(map(lambda nid: NODES[str(nid)], 
-            graph.nodes()))
-        g.add_edges_from(map(lambda t: (NODES[str(t[0])], NODES[str(t[1])]), 
+        g.add_nodes_from(map(lambda nid: str(nid), graph.nodes()))
+        g.add_edges_from(map(lambda t: (str(t[0]), str(t[1])), 
             graph.edges()))
 
         return g
@@ -183,6 +182,30 @@ class NetGraph(object):
             # confidence interval of the mean RTT
             self.graph.edge[nid1][nid2]["weight_ci"] = None
 
+    def annotate_node_ip(self, nid, ip):
+        if "ips" not in self.graph.node[nid]:
+            self.graph.node[nid]["ips"] = list()
+
+        self.graph.node[nid]["ips"].append(ip)
+    
+    def annotate_node(self, nid, name, value):
+        self.graph.node[nid][name] = value
+    
+    def node_annotation(self, nid, name):
+        return self.graph.node[nid].get(name)
+    
+    def del_node_annotation(self, nid, name):
+        del self.graph.node[nid][name]
+
+    def annotate_edge(self, nid1, nid2, name, value):
+        self.graph.edge[nid1][nid2][name] = value
+    
+    def edge_annotation(self, nid1, nid2, name):
+        return self.graph.edge[nid1][nid2].get(name)
+    
+    def del_edge_annotation(self, nid1, nid2, name):
+        del self.graph.edge[nid1][nid2][name]
+
     def assign_p2p_ips(self, network = "10.0.0.0", prefix = 8, version = 4):
         """ Assign IP addresses to each end of each edge of the network graph,
         computing all the point to point subnets and addresses in the network
@@ -211,9 +234,14 @@ class NetGraph(object):
             new_prefix = 31
         else:
             raise RuntimeError, "Invalid IP version %d" % version
+        
+        ## Clear all previously assigned IPs
+        for nid in self.graph.nodes():
+            self.graph.node[nid]["ips"] = list()
 
+        ## Generate and assign new IPs
         sub_itr = net.iter_subnets(new_prefix = new_prefix)
-
+        
         for nid1, nid2 in self.graph.edges():
             #### Compute subnets for each link
             
@@ -237,16 +265,19 @@ class NetGraph(object):
             self.graph.edge[nid1][nid2]["net"]["network"] = mask
             self.graph.edge[nid1][nid2]["net"]["prefix"] = prefixlen
 
+            self.annotate_node_ip(nid1, ip1)
+            self.annotate_node_ip(nid2, ip2)
+
     def get_p2p_info(self, nid1, nid2):
         net = self.graph.edge[nid1][nid2]["net"]
         return ( net[nid1], net[nid2], net["mask"], net["network"], 
                 net["prefixlen"] )
 
     def set_source(self, nid):
-        graph.node[nid]["source"] = True
+        self.graph.node[nid]["source"] = True
 
     def set_target(self, nid):
-        graph.node[nid]["target"] = True
+        self.graph.node[nid]["target"] = True
 
     def targets(self):
         """ Returns the nodes that are targets """
@@ -256,7 +287,7 @@ class NetGraph(object):
     def sources(self):
         """ Returns the nodes that are sources """
         return [nid for nid in self.graph.nodes() \
-                if self.graph.node[nid].get("sources")]
+                if self.graph.node[nid].get("source")]
 
     def select_target_zero(self):
         """ Marks the node 0 as target
@@ -276,9 +307,9 @@ class NetGraph(object):
             source = leaves.pop(random.randint(0, len(leaves) - 1))
         else:
             # options must not be already sources or targets
-            options = [ k for k,v in graph.degree().iteritems() \
-                    if v == 1 and not graph.node[k].get("source") \
-                        and not graph.node[k].get("target")]
+            options = [ k for k,v in self.graph.degree().iteritems() \
+                    if v == 1 and not self.graph.node[k].get("source") \
+                        and not self.graph.node[k].get("target")]
 
             source = options.pop(random.randint(0, len(options) - 1))
         
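
With the annotation helpers and the self.graph fixes above, node and edge metadata can be managed uniformly. A short usage sketch (keyword names as they appear in the hunks above; node ids are strings after the str(nid) conversion):

    g = NetGraph(topo_type = TopologyType.STAR, node_count = 4,
            assign_ips = True)
    g.annotate_node("0", "os", "f14")
    print g.node_annotation("0", "os")   # -> "f14"
    print g.node_annotation("0", "ips")  # filled in by assign_p2p_ips
    g.set_source("0")
    print g.sources()                    # -> ["0"]
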
diff --git a/src/nepi/util/parsers/xml_parser.py b/src/nepi/util/parsers/xml_parser.py
index 125360e..077a12d 100644 (file)
@@ -93,12 +93,15 @@ class ECXMLParser(object):
         ecnode.setAttribute("nthreads", xmlencode(ec.nthreads))
         ecnode.setAttribute("local_dir", xmlencode(ec.local_dir))
         ecnode.setAttribute("exp_dir", xmlencode(ec.exp_dir))
         ecnode.setAttribute("nthreads", xmlencode(ec.nthreads))
         ecnode.setAttribute("local_dir", xmlencode(ec.local_dir))
         ecnode.setAttribute("exp_dir", xmlencode(ec.exp_dir))
+
         doc.appendChild(ecnode)
 
         for guid, rm in ec._resources.iteritems():
             self._rm_to_xml(doc, ecnode, ec, guid, rm)
 
         return doc
+    
+    def _netgraph_to_xml(self, doc, ecnode, ec):
+        pass # placeholder body: a def with no statements is a syntax error
 
     def _rm_to_xml(self, doc, ecnode, ec, guid, rm):
         rmnode = doc.createElement("rm")
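
_netgraph_to_xml is added here only as a stub. One possible body, following the minidom-style doc.createElement / setAttribute calls that _rm_to_xml uses (the element name, attribute name, and the ec.netgraph accessor are illustrative assumptions, not confirmed by this diff):

    def _netgraph_to_xml(self, doc, ecnode, ec):
        # Serialize the experiment topology as a child of the ec node.
        ngnode = doc.createElement("topology")
        ngnode.setAttribute("topo-type", xmlencode(ec.netgraph.topo_type))
        ecnode.appendChild(ngnode)
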
diff --git a/src/nepi/util/plotter.py b/src/nepi/util/plotter.py
index 48b826e..6b5622e 100644 (file)
@@ -23,7 +23,7 @@ import os
 try:
     import networkx
 except ImportError:
-    msg = ("Networkx library is not installed, you will not be able to plot.")
+    msg = "Networkx library is not installed, you will not be able to plot."
     logger = Logger("Plotter")
     logger.debug(msg)
 
     logger = Logger("Plotter")
     logger.debug(msg)
 
diff --git a/src/nepi/util/statfuncs.py b/src/nepi/util/statfuncs.py
new file mode 100644 (file)
index 0000000..5c0b2ca
--- /dev/null
+++ b/src/nepi/util/statfuncs.py
@@ -0,0 +1,46 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import math
+import numpy
+from scipy import stats
+
+def compute_mean(sample):
+    # TODO: Discard outliers !!!!
+
+    if not sample:
+        print " CANNOT COMPUTE STATS for ", sample
+        return (0, 0, 0, 0)
+
+    x = numpy.array(sample)
+
+    # sample mean and standard deviation
+    n, min_max, mean, var, skew, kurt = stats.describe(x)
+    std = math.sqrt(var)
+
+    # for the population mean and std ...
+    # mean = x.mean()
+    # std = x.std()
+    
+    # Calculate confidence interval t-distribution
+    ## BUG: should use the quantile of the normal distribution, not the Student's t quantile
+    ci = stats.t.interval(0.95, n-1, loc = mean, scale = std/math.sqrt(n))
+
+    return (mean, std, ci[0], ci[1])
+
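
compute_mean returns a 4-tuple: the sample mean, the sample standard deviation, and the lower and upper bounds of the 95% confidence interval. Minimal usage of the function above:

    sample = [10.1, 9.8, 10.4, 10.0, 9.9]
    mean, std, ci_low, ci_high = compute_mean(sample)
    print "mean %.2f, std %.2f, 95%% CI [%.2f, %.2f]" % (
            mean, std, ci_low, ci_high)
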
diff --git a/src/nepi/util/timefuncs.py b/src/nepi/util/timefuncs.py
index 3d7c366..f7fbc85 100644 (file)
@@ -113,3 +113,11 @@ def stabsformat(sdate, dbase = None):
 
     return None
 
+def compute_delay_ms(timestamp2, timestamp1):
+    d1 = datetime.datetime.fromtimestamp(float(timestamp1))
+    d2 = datetime.datetime.fromtimestamp(float(timestamp2))
+    delay = d2 - d1
+
+    # convert the delay to milliseconds
+    return delay.total_seconds() * 1000
+
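
compute_delay_ms takes the later timestamp first and returns the difference as a float number of milliseconds; both arguments are epoch timestamps (or strings convertible to float). For example:

    import time

    t1 = time.time()
    t2 = t1 + 0.25
    print compute_delay_ms(t2, t1)   # -> 250.0
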
diff --git a/test/util/serializer.py b/test/util/serializer.py
index 3896ea2..993a0f4 100755 (executable)
@@ -132,7 +132,7 @@ class SerializerTestCase(unittest.TestCase):
 
         # Load serialized experiment
         ec2 = ExperimentController.load(filepath)
-        apps = ec2.get_resources_by_type("dummy::Application")
+        apps = ec2.filter_resources("dummy::Application")
         ec2.deploy()
         ec2.wait_finished(apps)
         ec2.shutdown()
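
The test now uses filter_resources in place of the removed get_resources_by_type. A plausible sketch of the new method, inferred from the ec._resources map iterated in the xml_parser hunk above (get_rtype is assumed, not confirmed by this diff):

    def filter_resources(self, rtype):
        # Return the guids of all RMs whose resource type matches rtype
        return [guid for guid, rm in self._resources.iteritems()
                if rm.get_rtype() == rtype]
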