ClassQueueFilter working in PlanetLab. Results need to be verified.
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 23 Aug 2011 00:00:38 +0000 (02:00 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 23 Aug 2011 00:00:38 +0000 (02:00 +0200)
src/nepi/core/execute.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata.py
src/nepi/testbeds/planetlab/scripts/classqueue.py
src/nepi/testbeds/planetlab/tunproto.py

index 2af29f1..552b4a9 100644 (file)
@@ -1048,34 +1048,57 @@ class ExperimentController(object):
                     elem_cross_data[attr_name] = _undefer(attr_value)
         
         return cross_data
-
     """
 class ExperimentSuite(object):
-    def __init__(self, experiment_xml, access_config, nexp,
+    def __init__(self, experiment_xml, access_config, repetitions,
             duration, wait_guids):
         self._experiment_xml = experiment_xml
         self._access_config = access_config
         self._experiments = dict()
-        self._nexp = nexp
+        self._repetitions = repetitions
         self._duration = duration
         self._wait_guids = wait_guids
-        self._curr = None
+        self._current = None
+        self._status = TS.STATUS_ZERO
+        self._thread = None
+
+    def start(self):
+        self._status  = TS.STATUS_STARTED
+        self._thread = threading.Thread(target = self._run_experiment_suite)
+        self._thread.start()
 
+    def shutdown(self):
+        if self._thread:
+            self._thread.join()
+            self._thread = None
 
-    def _run_one_exp(self):
+    def _run_experiment_suite(self):
+        for i in xrange[0, self.repetitions]:
+            self._current = i
+            self._run_one_experiment()
+
+    def _run_one_experiment(self):
         access_config = proxy.AccessConfiguration()
+        for attr in self._access_config.attributes:
+            access_config.set_attribute_value(attr.name, attr.value)
+        access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        root_dir = "%s_%d" % (
+                access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
+                self._current)
+        access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
         controller = proxy.create_experiment_controller(self._experiment_xml,
                 access_config)
-        self._experiments[self._curr] = controller
+        self._experiments[self._current] = controller
         controller.start()
         started_at = time.time()
+        # wait until all specified guids have finished execution
         if self._wait_guids:
-            while not controller.is_finished(92) and \
-                    not controller.is_finished(108) and \
-                    not controller.is_finished(126) and \
-                    not controller.is_finished(84):
+            while all(itertools.imap(controller.is_finished, self._wait_guids):
+                time.sleep(0.5)
+        # wait until the minimum experiment duration time has elapsed 
+        if self._duration:
+            while (time.time() - started_at) < self._duration:
                 time.sleep(0.5)
-        # add results to instructions
         controller.stop()
         #download results!!
         controller.shutdown()
@@ -1087,4 +1110,3 @@ class ExperimentSuite(object):
 
 
 
-
index 292b92e..f86026a 100644 (file)
@@ -544,10 +544,10 @@ class TestbedController(testbed_impl.TestbedController):
         self._traces.clear()
 
     def trace(self, guid, trace_id, attribute='value'):
-        app = self._elements[guid]
+        elem = self._elements[guid]
 
         if attribute == 'value':
-            path = app.sync_trace(self.home_directory, trace_id)
+            path = elem.sync_trace(self.home_directory, trace_id)
             if path:
                 fd = open(path, "r")
                 content = fd.read()
@@ -555,9 +555,9 @@ class TestbedController(testbed_impl.TestbedController):
             else:
                 content = None
         elif attribute == 'path':
-            content = app.remote_trace_path(trace_id)
+            content = elem.remote_trace_path(trace_id)
         elif attribute == 'name':
-            content = app.remote_trace_name(trace_id)
+            content = elem.remote_trace_name(trace_id)
         else:
             content = None
         return content
@@ -708,3 +708,10 @@ class TestbedController(testbed_impl.TestbedController):
     def _make_tun_filter(self, parameters):
         return self._make_generic(parameters, self._interfaces.TunFilter)
 
+    def _make_class_queue_filter(self, parameters):
+        return self._make_generic(parameters, self._interfaces.ClassQueueFilter)
+
+    def _make_tos_queue_filter(self, parameters):
+        return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
+
+
index 8d0b5fa..80f834a 100644 (file)
@@ -284,18 +284,22 @@ class TunIface(object):
         if self.peer_proto_impl:
             self.peer_proto_impl.async_launch_wait()
 
-    def sync_trace(self, local_dir, whichtrace):
+    def sync_trace(self, local_dir, whichtrace, tracemap = None):
         if self.peer_proto_impl:
-            return self.peer_proto_impl.sync_trace(local_dir, whichtrace)
+            return self.peer_proto_impl.sync_trace(local_dir, whichtrace,
+                    tracemap)
         else:
             return None
 
-    def remote_trace_path(self, whichtrace):
+    def remote_trace_path(self, whichtrace, tracemap = None):
         if self.peer_proto_impl:
-            return self.peer_proto_impl.remote_trace_path(whichtrace)
+            return self.peer_proto_impl.remote_trace_path(whichtrace, tracemap)
         else:
             return None
 
+    def remote_trace_name(self, whichtrace):
+        return whichtrace
+
 class TapIface(TunIface):
     _PROTO_MAP = tunproto.TAP_PROTO_MAP
     _KIND = 'TAP'
@@ -470,6 +474,10 @@ class NetPipe(object):
         return local_path
     
 class TunFilter(object):
+    _TRACEMAP = {
+        # tracename : (remotename, localname)
+    }
+    
     def __init__(self, api=None):
         if not api:
             api = plcapi.PLCAPI()
@@ -477,6 +485,7 @@ class TunFilter(object):
         
         # Attributes
         self.module = None
+        self.args = None
 
         # These get initialised when the filter is connected
         self.peer_guid = None
@@ -520,3 +529,38 @@ class TunFilter(object):
     del _get
     del _set
 
+    def remote_trace_path(self, whichtrace):
+        iface = self.iface()
+        if iface is not None:
+            return iface.remote_trace_path(whichtrace, self._TRACEMAP)
+        return None
+
+    def remote_trace_name(self, whichtrace):
+        iface = self.iface()
+        if iface is not None:
+            return iface.remote_trace_name(whichtrace, self._TRACEMAP)
+        return None
+
+    def sync_trace(self, local_dir, whichtrace):
+        iface = self.iface()
+        if iface is not None:
+            return iface.sync_trace(local_dir, whichtrace, self._TRACEMAP)
+        return None
+
+class ClassQueueFilter(TunFilter):
+    _TRACEMAP = {
+        # tracename : (remotename, localname)
+        'dropped_stats' : ('dropped_stats', 'dropped_stats')
+    }
+    
+    def __init__(self, api=None):
+        super(ClassQueueFilter, self).__init__(api)
+        # Attributes
+        self.module = "classqueue.py"
+
+class ToSQueueFilter(TunFilter):
+    def __init__(self, api=None):
+        super(ToSQueueFilter, self).__init__(api)
+        # Attributes
+        self.module = "tosqueue.py"
+
index 0123ce6..f008c5c 100644 (file)
@@ -121,9 +121,14 @@ def connect_tun_iface_peer(proto, testbed_instance, iface_guid, peer_iface_guid)
 def connect_tun_iface_filter(testbed_instance, iface_guid, filter_guid):
     iface = testbed_instance._elements[iface_guid]
     filt = testbed_instance._elements[filter_guid]
+    traces = testbed_instance._get_traces(filter_guid)
+    if 'dropped_stats' in traces: 
+        args = filt.args if filt.args else ""
+        filt.args = ','.join(filt.args.split(',') + ["logdropped=true",])
     iface.filter_module = filt
     filt.iface_guid = iface_guid
     filt.iface = weakref.ref(iface)
+
     if filt.peer_guid:
         connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid)
 
@@ -187,7 +192,6 @@ def crossconnect_filter_peer_both(proto, testbed_instance, filter_guid, peer_dat
     crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data)
     crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data)
 
-
 def connect_dep(testbed_instance, node_guid, app_guid):
     node = testbed_instance._elements[node_guid]
     app = testbed_instance._elements[app_guid]
@@ -281,13 +285,14 @@ def create_tunfilter(testbed_instance, guid):
     testbed_instance.elements[guid] = element
 
 def create_classqueuefilter(testbed_instance, guid):
-    element = create_tunfilter(testbed_instance, guid)
-    element.module = "classqueue.py"
+    parameters = testbed_instance._get_parameters(guid)
+    element = testbed_instance._make_class_queue_filter(parameters)
+    testbed_instance.elements[guid] = element
 
 def create_tosqueuefilter(testbed_instance, guid):
-    element = create_tunfilter(testbed_instance, guid)
-    element.module = "tosqueue.py"
-
+    parameters = testbed_instance._get_parameters(guid)
+    element = testbed_instance._make_tos_queue_filter(parameters)
+    testbed_instance.elements[guid] = element
 
 def create_application(testbed_instance, guid):
     parameters = testbed_instance._get_parameters(guid)
@@ -1195,9 +1200,13 @@ traces = dict({
                 "name": "output",
                 "help": "Extra output trace for applications. When activated this trace can be referenced with wildcard a reference from an Application command line. Ex: command: 'tcpdump -w {#[elemet-label].trace[trace-id].[name|path]#}' ",
               }),
+    "dropped_stats": dict({
+                "name": "dropped_stats",
+                "help": "Information on dropped packets on a filer or queue associated to a network interface",
+            }),
     })
 
-create_order = [ INTERNET, NODE, NODEIFACE, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+create_order = [ INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
 
 configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
 
@@ -1361,6 +1370,7 @@ factories_info = dict({
                 "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
             ],
             "connector_types": ["->fd","udp","tcp"],
+            "traces": ["dropped_stats"],
         }),
     TOSQUEUEFILTER : dict({
             "help": "TUN classfull queue that classifies according to the TOS (RFC 791) IP field.\n\n"
index f36fbd3..fc79962 100644 (file)
@@ -2,6 +2,8 @@ import collections
 import itertools
 import random
 import re
+import sys
+import iovec
 
 _size = 1000
 _classes = (
@@ -71,7 +73,9 @@ class ClassQueue(object):
     def __init__(self):
         self.size = _size
         self.len = 0
-        
+        self.stats = collections.defaultdict(int)
+        self.dump_count = 0
+
         # Prepare classes
         self.classspec = _parse_classes(_classes)
 
@@ -124,7 +128,7 @@ class ClassQueue(object):
     
     def __len__(self):
         return self.len
-    
+
     def clear(self):
         self.classes.clear()
         self.cycle = None
@@ -140,11 +144,12 @@ class ClassQueue(object):
             if rv is None:
                 rv = self.classmap.get(None)
         else:
+            proto = 0
             rv = self.classmap.get(None)
-        return rv, self.sizemap[rv]
+        return proto, rv, self.sizemap[rv]
     
     def append(self, packet, len=len):
-        qi,size = self.queuefor(packet)
+        proto,qi,size = self.queuefor(packet)
         q = self.queues[qi]
         if len(q) < size:
             classes = self.classes
@@ -153,6 +158,10 @@ class ClassQueue(object):
                 self.cycle_update = True
             q.append(packet)
             self.len += 1
+        # packet dropped
+        elif _logdropped:
+            self.stats[proto] += 1
+            self.dump_stats()
 
     def appendleft(self, packet):
         self.queues[-1].append(packet)
@@ -197,14 +206,23 @@ class ClassQueue(object):
         else:
             raise IndexError, "pop from an empty queue"
 
+    def dump_stats(self):
+        if self.dump_count >= 10000:
+            stats = "".join(['%s:%s\n' % (key, value) for key, value in self.stats.items()])
+            fd = open('dropped_stats', 'w')
+            iovec.writev(fd.fileno(), stats)
+            fd.close()
+            self.dump_count = 0
+        else:
+            self.dump_count += 1
+
 queueclass = ClassQueue
 
-def init(size, classes):
-    global _size
-    _size = size
+def init(size = 1000, classes = _classes, logdropped = 'False'):
+    global _size, _classes, _logdropped
+    _size = int(size)
     _classes = classes
-    
-
+    _logdropped = logdropped.lower() in ('true','1','on')
 
 _protomap = {
     '3pc'      :       34,
index 0cc7265..15bfa94 100644 (file)
@@ -594,16 +594,17 @@ class TunProtoBase(object):
         'pcap' : ('pcap','capture.pcap'),
     }
     
-    def remote_trace_path(self, whichtrace):
-        tracemap = self._TRACEMAP
+    def remote_trace_path(self, whichtrace, tracemap = None):
+        tracemap = self._TRACEMAP if not tracemap else tracemap
+        
         
         if whichtrace not in tracemap:
             return None
         
         return os.path.join(self.home_path, tracemap[whichtrace][1])
         
-    def sync_trace(self, local_dir, whichtrace):
-        tracemap = self._TRACEMAP
+    def sync_trace(self, local_dir, whichtrace, tracemap = None):
+        tracemap = self._TRACEMAP if not tracemap else tracemap
         
         if whichtrace not in tracemap:
             return None