From 1be379272c93252ae54334f58df16b21d121f286 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Thu, 31 May 2012 03:19:23 -0300 Subject: [PATCH] Several small fixes to logging queue: * Separate forward and backward queue logs * Add an "interval" so that both immediate and timed measurements are possible * Wake up logging thread on append when doing immediate logging * Properly implement the outpath-injected args attribute * Add missing newlines in csv log --- src/nepi/testbeds/planetlab/interfaces.py | 11 +++++-- src/nepi/testbeds/planetlab/metadata.py | 12 ++++--- .../planetlab/scripts/loggingclassqueue.py | 32 +++++++++++++------ 3 files changed, 38 insertions(+), 17 deletions(-) diff --git a/src/nepi/testbeds/planetlab/interfaces.py b/src/nepi/testbeds/planetlab/interfaces.py index acc8c8f5..dddc0057 100644 --- a/src/nepi/testbeds/planetlab/interfaces.py +++ b/src/nepi/testbeds/planetlab/interfaces.py @@ -582,7 +582,8 @@ class LoggingClassQueueFilter(ClassQueueFilter): _TRACEMAP = ClassQueueFilter._TRACEMAP.copy() _TRACEMAP.update({ # tracename : (remotename, localname) - 'queue_stats' : ('queue_stats', 'queue_stats') + 'queue_stats_f' : ('queue_stats_f', 'queue_stats_f'), + 'queue_stats_b' : ('queue_stats_b', 'queue_stats_b'), }) def __init__(self, api=None): @@ -590,10 +591,14 @@ class LoggingClassQueueFilter(ClassQueueFilter): # Attributes self.module = "loggingclassqueue.py classqueue.py" + def _args_get(self): # Inject outpath - args = dict(filter(lambda x:len(x)>1, map(lambda x:x.split('=',1),(self.args or "").split(',')))) + args = dict(filter(lambda x:len(x)>1, map(lambda x:x.split('=',1),(self._args or "").split(',')))) args["outpath"] = "queue_stats" - self.args = ",".join(map("=".join, args.iteritems())) + return ",".join(map("=".join, args.iteritems())) + def _args_set(self, value): + self._args = value + args = property(_args_get, _args_set) class ToSQueueFilter(TunFilter): def __init__(self, api=None): diff --git a/src/nepi/testbeds/planetlab/metadata.py b/src/nepi/testbeds/planetlab/metadata.py index eae9c7f8..37a4c03d 100644 --- a/src/nepi/testbeds/planetlab/metadata.py +++ b/src/nepi/testbeds/planetlab/metadata.py @@ -1372,9 +1372,13 @@ traces = dict({ "name": "dropped_stats", "help": "Information on dropped packets on a filer or queue associated to a network interface", }), - "queue_stats": dict({ - "name": "queue_stats", - "help": "Detailled, fine-grained information on queue state, csv format.", + "queue_stats_f": dict({ + "name": "queue_stats_f", + "help": "Detailled, fine-grained information on egress queue state, csv format.", + }), + "queue_stats_b": dict({ + "name": "queue_stats_b", + "help": "Detailled, fine-grained information on ingress queue state, csv format.", }), }) @@ -1581,7 +1585,7 @@ factories_info = dict({ "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher", ], "connector_types": ["->fd","udp","tcp"], - "traces": ["dropped_stats","queue_stats"], + "traces": ["dropped_stats","queue_stats_f","queue_stats_b"], }), TOSQUEUEFILTER : dict({ "help": "TUN classfull queue that classifies according to the TOS (RFC 791) IP field.\n\n" diff --git a/src/nepi/testbeds/planetlab/scripts/loggingclassqueue.py b/src/nepi/testbeds/planetlab/scripts/loggingclassqueue.py index 9ad3ee9d..439aa121 100644 --- a/src/nepi/testbeds/planetlab/scripts/loggingclassqueue.py +++ b/src/nepi/testbeds/planetlab/scripts/loggingclassqueue.py @@ -9,6 +9,7 @@ import time import classqueue _outpath = "output" +_interval = 0 class QueueLogger(threading.Thread): def __init__(self, queues, drops, accepts, outpath): @@ -22,17 +23,22 @@ class QueueLogger(threading.Thread): self._terminate = False def run(self): + if _interval > 0: + interval = _interval + else: + interval = 1 + t0 = time.time() - with open(self.outpath, "w+") as outfile: - outfile.write(",".join( + with open(self.outpath, "w") as outfile: + outfile.writelines((",".join( ["time"] + map("q%02dlen".__mod__, xrange(len(self.queues))) + map("q%02ddrops".__mod__, xrange(len(self.queues))) + map("q%02daccepts".__mod__, xrange(len(self.queues))) - )) + ), "\n")) while not self._terminate: - self._event.wait(1) + self._event.wait(interval) if self._terminate: break @@ -46,12 +52,12 @@ class QueueLogger(threading.Thread): queueaccepts = list(self.accepts) self.accepts[:] = [0] * len(self.accepts) - outfile.write(",".join( - [str(time.time)] + outfile.writelines((",".join( + [str(time.time()-t0)] + map(str, queuelens) + map(str, queuedrops) + map(str, queueaccepts) - )) + ), "\n")) outfile.flush() def terminate(self): @@ -62,13 +68,15 @@ class QueueLogger(threading.Thread): self._event.set() class LoggingClassQueue(classqueue.ClassQueue): + outpath_suffix = itertools.cycle(('_f','_b')) + def __init__(self): self.accepts = [] self.drops = [] super(LoggingClassQueue, self).__init__() # Prepare logger thread - self.logger = QueueLogger(self.queues, self.drops, self.accepts, _outpath) + self.logger = QueueLogger(self.queues, self.drops, self.accepts, _outpath+self.outpath_suffix.next()) self.logger.start() def __del__(self): @@ -88,11 +96,15 @@ class LoggingClassQueue(classqueue.ClassQueue): else: self.accepts[qi] += 1 + if _interval == 0: + self.logger.wakeup() + return dropped queueclass = LoggingClassQueue -def init(outpath="output", **kw): - global _outpath +def init(outpath="output", interval=0, **kw): + global _outpath, _interval _outpath = outpath + _interval = interval classqueue.init(**kw) -- 2.45.2