From fc2519242a091a0bf5b008f880e835cdc4cf7b3a Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Tue, 29 May 2012 21:21:41 -0300 Subject: [PATCH] Logging classqueue --- .../testbeds/planetlab/scripts/classqueue.py | 3 +- .../planetlab/scripts/loggingclassqueue.py | 98 +++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 src/nepi/testbeds/planetlab/scripts/loggingclassqueue.py diff --git a/src/nepi/testbeds/planetlab/scripts/classqueue.py b/src/nepi/testbeds/planetlab/scripts/classqueue.py index 412a0a49..01a5dc60 100644 --- a/src/nepi/testbeds/planetlab/scripts/classqueue.py +++ b/src/nepi/testbeds/planetlab/scripts/classqueue.py @@ -139,7 +139,7 @@ class ClassQueue(object): self.cyclelen = None self.cycle_update = True self.len = 0 - self.queues = [ collections.deque() for cls in xrange(len(self.classspec)) ] + self.queues[:] = [ collections.deque() for cls in xrange(len(self.classspec)) ] def queuefor(self, packet, ord=ord, len=len, classmask=0xEC): if len(packet) >= 10: @@ -183,6 +183,7 @@ class ClassQueue(object): else: astats[proto] += 1 self.dump_stats() + return dropped def appendleft(self, packet): self.queues[-1].append(packet) diff --git a/src/nepi/testbeds/planetlab/scripts/loggingclassqueue.py b/src/nepi/testbeds/planetlab/scripts/loggingclassqueue.py new file mode 100644 index 00000000..9ad3ee9d --- /dev/null +++ b/src/nepi/testbeds/planetlab/scripts/loggingclassqueue.py @@ -0,0 +1,98 @@ +import collections +import itertools +import random +import re +import sys +import iovec +import threading +import time +import classqueue + +_outpath = "output" + +class QueueLogger(threading.Thread): + def __init__(self, queues, drops, accepts, outpath): + super(QueueLogger,self).__init__() + self.queues = queues + self.drops = drops + self.accepts = accepts + self.outpath = outpath + self.setDaemon(True) + self._event = threading.Event() + self._terminate = False + + def run(self): + t0 = time.time() + with open(self.outpath, "w+") as outfile: + outfile.write(",".join( + ["time"] + + map("q%02dlen".__mod__, xrange(len(self.queues))) + + map("q%02ddrops".__mod__, xrange(len(self.queues))) + + map("q%02daccepts".__mod__, xrange(len(self.queues))) + )) + + while not self._terminate: + self._event.wait(1) + if self._terminate: + break + + self._event.clear() + + queuelens = map(len,self.queues) + + queuedrops = list(self.drops) + self.drops[:] = [0] * len(self.drops) + + queueaccepts = list(self.accepts) + self.accepts[:] = [0] * len(self.accepts) + + outfile.write(",".join( + [str(time.time)] + + map(str, queuelens) + + map(str, queuedrops) + + map(str, queueaccepts) + )) + outfile.flush() + + def terminate(self): + self._terminate = True + self.wakeup() + + def wakeup(self): + self._event.set() + +class LoggingClassQueue(classqueue.ClassQueue): + def __init__(self): + self.accepts = [] + self.drops = [] + super(LoggingClassQueue, self).__init__() + + # Prepare logger thread + self.logger = QueueLogger(self.queues, self.drops, self.accepts, _outpath) + self.logger.start() + + def __del__(self): + self.logger.terminate() + + def clear(self): + super(LoggingClassQueue, self).clear() + self.accepts[:] = [0] * len(self.queues) + self.drops[:] = [0] * len(self.queues) + + def append(self, packet): + proto,qi,size = self.queuefor(packet) + dropped = super(LoggingClassQueue, self).append(packet) + + if dropped: + self.drops[qi] += 1 + else: + self.accepts[qi] += 1 + + return dropped + +queueclass = LoggingClassQueue + +def init(outpath="output", **kw): + global _outpath + _outpath = outpath + classqueue.init(**kw) -- 2.45.2