14 class QueueLogger(threading.Thread):
15 def __init__(self, queues, drops, accepts, outpath):
16 super(QueueLogger,self).__init__()
19 self.accepts = accepts
20 self.outpath = outpath
22 self._event = threading.Event()
23 self._terminate = False
32 with open(self.outpath, "w") as outfile:
33 outfile.writelines((",".join(
35 + map("q%02dlen".__mod__, xrange(len(self.queues)))
36 + map("q%02ddrops".__mod__, xrange(len(self.queues)))
37 + map("q%02daccepts".__mod__, xrange(len(self.queues)))
40 while not self._terminate:
41 self._event.wait(interval)
47 queuelens = map(len,self.queues)
49 queuedrops = list(self.drops)
50 self.drops[:] = [0] * len(self.drops)
52 queueaccepts = list(self.accepts)
53 self.accepts[:] = [0] * len(self.accepts)
55 outfile.writelines((",".join(
58 + map(str, queuedrops)
59 + map(str, queueaccepts)
64 self._terminate = True
70 class LoggingClassQueue(classqueue.ClassQueue):
71 outpath_suffix = itertools.cycle(('_f','_b'))
76 super(LoggingClassQueue, self).__init__()
78 # Prepare logger thread
79 self.logger = QueueLogger(self.queues, self.drops, self.accepts, _outpath+self.outpath_suffix.next())
83 self.logger.terminate()
86 super(LoggingClassQueue, self).clear()
87 self.accepts[:] = [0] * len(self.queues)
88 self.drops[:] = [0] * len(self.queues)
90 def append(self, packet):
91 proto,qi,size = self.queuefor(packet)
92 dropped = super(LoggingClassQueue, self).append(packet)
104 queueclass = LoggingClassQueue
106 def init(outpath="output", interval=0, **kw):
107 global _outpath, _interval
109 _interval = float(interval)
110 classqueue.init(**kw)