From: Alina Quereilhac Date: Tue, 23 Aug 2011 00:00:38 +0000 (+0200) Subject: ClassQueueFilter working in PlanetLab. Results need to be verified. X-Git-Tag: nepi-3.0.0~276 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=32b534c25d3eaf585b0781d66952838148859479;p=nepi.git ClassQueueFilter working in PlanetLab. Results need to be verified. --- diff --git a/src/nepi/core/execute.py b/src/nepi/core/execute.py index 2af29f11..552b4a9f 100644 --- a/src/nepi/core/execute.py +++ b/src/nepi/core/execute.py @@ -1048,34 +1048,57 @@ class ExperimentController(object): elem_cross_data[attr_name] = _undefer(attr_value) return cross_data - """ class ExperimentSuite(object): - def __init__(self, experiment_xml, access_config, nexp, + def __init__(self, experiment_xml, access_config, repetitions, duration, wait_guids): self._experiment_xml = experiment_xml self._access_config = access_config self._experiments = dict() - self._nexp = nexp + self._repetitions = repetitions self._duration = duration self._wait_guids = wait_guids - self._curr = None + self._current = None + self._status = TS.STATUS_ZERO + self._thread = None + + def start(self): + self._status = TS.STATUS_STARTED + self._thread = threading.Thread(target = self._run_experiment_suite) + self._thread.start() + def shutdown(self): + if self._thread: + self._thread.join() + self._thread = None - def _run_one_exp(self): + def _run_experiment_suite(self): + for i in xrange[0, self.repetitions]: + self._current = i + self._run_one_experiment() + + def _run_one_experiment(self): access_config = proxy.AccessConfiguration() + for attr in self._access_config.attributes: + access_config.set_attribute_value(attr.name, attr.value) + access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + root_dir = "%s_%d" % ( + access_config.get_attribute_value(DC.ROOT_DIRECTORY), + self._current) + access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir) controller = proxy.create_experiment_controller(self._experiment_xml, access_config) - self._experiments[self._curr] = controller + self._experiments[self._current] = controller controller.start() started_at = time.time() + # wait until all specified guids have finished execution if self._wait_guids: - while not controller.is_finished(92) and \ - not controller.is_finished(108) and \ - not controller.is_finished(126) and \ - not controller.is_finished(84): + while all(itertools.imap(controller.is_finished, self._wait_guids): + time.sleep(0.5) + # wait until the minimum experiment duration time has elapsed + if self._duration: + while (time.time() - started_at) < self._duration: time.sleep(0.5) - # add results to instructions controller.stop() #download results!! controller.shutdown() @@ -1087,4 +1110,3 @@ class ExperimentSuite(object): - diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index 292b92e1..f86026a9 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -544,10 +544,10 @@ class TestbedController(testbed_impl.TestbedController): self._traces.clear() def trace(self, guid, trace_id, attribute='value'): - app = self._elements[guid] + elem = self._elements[guid] if attribute == 'value': - path = app.sync_trace(self.home_directory, trace_id) + path = elem.sync_trace(self.home_directory, trace_id) if path: fd = open(path, "r") content = fd.read() @@ -555,9 +555,9 @@ class TestbedController(testbed_impl.TestbedController): else: content = None elif attribute == 'path': - content = app.remote_trace_path(trace_id) + content = elem.remote_trace_path(trace_id) elif attribute == 'name': - content = app.remote_trace_name(trace_id) + content = elem.remote_trace_name(trace_id) else: content = None return content @@ -708,3 +708,10 @@ class TestbedController(testbed_impl.TestbedController): def _make_tun_filter(self, parameters): return self._make_generic(parameters, self._interfaces.TunFilter) + def _make_class_queue_filter(self, parameters): + return self._make_generic(parameters, self._interfaces.ClassQueueFilter) + + def _make_tos_queue_filter(self, parameters): + return self._make_generic(parameters, self._interfaces.ToSQueueFilter) + + diff --git a/src/nepi/testbeds/planetlab/interfaces.py b/src/nepi/testbeds/planetlab/interfaces.py index 8d0b5fa8..80f834a3 100644 --- a/src/nepi/testbeds/planetlab/interfaces.py +++ b/src/nepi/testbeds/planetlab/interfaces.py @@ -284,18 +284,22 @@ class TunIface(object): if self.peer_proto_impl: self.peer_proto_impl.async_launch_wait() - def sync_trace(self, local_dir, whichtrace): + def sync_trace(self, local_dir, whichtrace, tracemap = None): if self.peer_proto_impl: - return self.peer_proto_impl.sync_trace(local_dir, whichtrace) + return self.peer_proto_impl.sync_trace(local_dir, whichtrace, + tracemap) else: return None - def remote_trace_path(self, whichtrace): + def remote_trace_path(self, whichtrace, tracemap = None): if self.peer_proto_impl: - return self.peer_proto_impl.remote_trace_path(whichtrace) + return self.peer_proto_impl.remote_trace_path(whichtrace, tracemap) else: return None + def remote_trace_name(self, whichtrace): + return whichtrace + class TapIface(TunIface): _PROTO_MAP = tunproto.TAP_PROTO_MAP _KIND = 'TAP' @@ -470,6 +474,10 @@ class NetPipe(object): return local_path class TunFilter(object): + _TRACEMAP = { + # tracename : (remotename, localname) + } + def __init__(self, api=None): if not api: api = plcapi.PLCAPI() @@ -477,6 +485,7 @@ class TunFilter(object): # Attributes self.module = None + self.args = None # These get initialised when the filter is connected self.peer_guid = None @@ -520,3 +529,38 @@ class TunFilter(object): del _get del _set + def remote_trace_path(self, whichtrace): + iface = self.iface() + if iface is not None: + return iface.remote_trace_path(whichtrace, self._TRACEMAP) + return None + + def remote_trace_name(self, whichtrace): + iface = self.iface() + if iface is not None: + return iface.remote_trace_name(whichtrace, self._TRACEMAP) + return None + + def sync_trace(self, local_dir, whichtrace): + iface = self.iface() + if iface is not None: + return iface.sync_trace(local_dir, whichtrace, self._TRACEMAP) + return None + +class ClassQueueFilter(TunFilter): + _TRACEMAP = { + # tracename : (remotename, localname) + 'dropped_stats' : ('dropped_stats', 'dropped_stats') + } + + def __init__(self, api=None): + super(ClassQueueFilter, self).__init__(api) + # Attributes + self.module = "classqueue.py" + +class ToSQueueFilter(TunFilter): + def __init__(self, api=None): + super(ToSQueueFilter, self).__init__(api) + # Attributes + self.module = "tosqueue.py" + diff --git a/src/nepi/testbeds/planetlab/metadata.py b/src/nepi/testbeds/planetlab/metadata.py index 0123ce6c..f008c5cc 100644 --- a/src/nepi/testbeds/planetlab/metadata.py +++ b/src/nepi/testbeds/planetlab/metadata.py @@ -121,9 +121,14 @@ def connect_tun_iface_peer(proto, testbed_instance, iface_guid, peer_iface_guid) def connect_tun_iface_filter(testbed_instance, iface_guid, filter_guid): iface = testbed_instance._elements[iface_guid] filt = testbed_instance._elements[filter_guid] + traces = testbed_instance._get_traces(filter_guid) + if 'dropped_stats' in traces: + args = filt.args if filt.args else "" + filt.args = ','.join(filt.args.split(',') + ["logdropped=true",]) iface.filter_module = filt filt.iface_guid = iface_guid filt.iface = weakref.ref(iface) + if filt.peer_guid: connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid) @@ -187,7 +192,6 @@ def crossconnect_filter_peer_both(proto, testbed_instance, filter_guid, peer_dat crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data) crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data) - def connect_dep(testbed_instance, node_guid, app_guid): node = testbed_instance._elements[node_guid] app = testbed_instance._elements[app_guid] @@ -281,13 +285,14 @@ def create_tunfilter(testbed_instance, guid): testbed_instance.elements[guid] = element def create_classqueuefilter(testbed_instance, guid): - element = create_tunfilter(testbed_instance, guid) - element.module = "classqueue.py" + parameters = testbed_instance._get_parameters(guid) + element = testbed_instance._make_class_queue_filter(parameters) + testbed_instance.elements[guid] = element def create_tosqueuefilter(testbed_instance, guid): - element = create_tunfilter(testbed_instance, guid) - element.module = "tosqueue.py" - + parameters = testbed_instance._get_parameters(guid) + element = testbed_instance._make_tos_queue_filter(parameters) + testbed_instance.elements[guid] = element def create_application(testbed_instance, guid): parameters = testbed_instance._get_parameters(guid) @@ -1195,9 +1200,13 @@ traces = dict({ "name": "output", "help": "Extra output trace for applications. When activated this trace can be referenced with wildcard a reference from an Application command line. Ex: command: 'tcpdump -w {#[elemet-label].trace[trace-id].[name|path]#}' ", }), + "dropped_stats": dict({ + "name": "dropped_stats", + "help": "Information on dropped packets on a filer or queue associated to a network interface", + }), }) -create_order = [ INTERNET, NODE, NODEIFACE, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] +create_order = [ INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ] @@ -1361,6 +1370,7 @@ factories_info = dict({ "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher", ], "connector_types": ["->fd","udp","tcp"], + "traces": ["dropped_stats"], }), TOSQUEUEFILTER : dict({ "help": "TUN classfull queue that classifies according to the TOS (RFC 791) IP field.\n\n" diff --git a/src/nepi/testbeds/planetlab/scripts/classqueue.py b/src/nepi/testbeds/planetlab/scripts/classqueue.py index f36fbd3c..fc799627 100644 --- a/src/nepi/testbeds/planetlab/scripts/classqueue.py +++ b/src/nepi/testbeds/planetlab/scripts/classqueue.py @@ -2,6 +2,8 @@ import collections import itertools import random import re +import sys +import iovec _size = 1000 _classes = ( @@ -71,7 +73,9 @@ class ClassQueue(object): def __init__(self): self.size = _size self.len = 0 - + self.stats = collections.defaultdict(int) + self.dump_count = 0 + # Prepare classes self.classspec = _parse_classes(_classes) @@ -124,7 +128,7 @@ class ClassQueue(object): def __len__(self): return self.len - + def clear(self): self.classes.clear() self.cycle = None @@ -140,11 +144,12 @@ class ClassQueue(object): if rv is None: rv = self.classmap.get(None) else: + proto = 0 rv = self.classmap.get(None) - return rv, self.sizemap[rv] + return proto, rv, self.sizemap[rv] def append(self, packet, len=len): - qi,size = self.queuefor(packet) + proto,qi,size = self.queuefor(packet) q = self.queues[qi] if len(q) < size: classes = self.classes @@ -153,6 +158,10 @@ class ClassQueue(object): self.cycle_update = True q.append(packet) self.len += 1 + # packet dropped + elif _logdropped: + self.stats[proto] += 1 + self.dump_stats() def appendleft(self, packet): self.queues[-1].append(packet) @@ -197,14 +206,23 @@ class ClassQueue(object): else: raise IndexError, "pop from an empty queue" + def dump_stats(self): + if self.dump_count >= 10000: + stats = "".join(['%s:%s\n' % (key, value) for key, value in self.stats.items()]) + fd = open('dropped_stats', 'w') + iovec.writev(fd.fileno(), stats) + fd.close() + self.dump_count = 0 + else: + self.dump_count += 1 + queueclass = ClassQueue -def init(size, classes): - global _size - _size = size +def init(size = 1000, classes = _classes, logdropped = 'False'): + global _size, _classes, _logdropped + _size = int(size) _classes = classes - - + _logdropped = logdropped.lower() in ('true','1','on') _protomap = { '3pc' : 34, diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index 0cc72650..15bfa943 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -594,16 +594,17 @@ class TunProtoBase(object): 'pcap' : ('pcap','capture.pcap'), } - def remote_trace_path(self, whichtrace): - tracemap = self._TRACEMAP + def remote_trace_path(self, whichtrace, tracemap = None): + tracemap = self._TRACEMAP if not tracemap else tracemap + if whichtrace not in tracemap: return None return os.path.join(self.home_path, tracemap[whichtrace][1]) - def sync_trace(self, local_dir, whichtrace): - tracemap = self._TRACEMAP + def sync_trace(self, local_dir, whichtrace, tracemap = None): + tracemap = self._TRACEMAP if not tracemap else tracemap if whichtrace not in tracemap: return None