_TRACEMAP = ClassQueueFilter._TRACEMAP.copy()
_TRACEMAP.update({
# tracename : (remotename, localname)
- 'queue_stats' : ('queue_stats', 'queue_stats')
+ 'queue_stats_f' : ('queue_stats_f', 'queue_stats_f'),
+ 'queue_stats_b' : ('queue_stats_b', 'queue_stats_b'),
})
def __init__(self, api=None):
# Attributes
self.module = "loggingclassqueue.py classqueue.py"
+ def _args_get(self):
# Inject outpath
- args = dict(filter(lambda x:len(x)>1, map(lambda x:x.split('=',1),(self.args or "").split(','))))
+ args = dict(filter(lambda x:len(x)>1, map(lambda x:x.split('=',1),(self._args or "").split(','))))
args["outpath"] = "queue_stats"
- self.args = ",".join(map("=".join, args.iteritems()))
+ return ",".join(map("=".join, args.iteritems()))
+ def _args_set(self, value):
+ self._args = value
+ args = property(_args_get, _args_set)
class ToSQueueFilter(TunFilter):
def __init__(self, api=None):
"name": "dropped_stats",
"help": "Information on dropped packets on a filer or queue associated to a network interface",
}),
- "queue_stats": dict({
- "name": "queue_stats",
- "help": "Detailled, fine-grained information on queue state, csv format.",
+ "queue_stats_f": dict({
+ "name": "queue_stats_f",
+ "help": "Detailled, fine-grained information on egress queue state, csv format.",
+ }),
+ "queue_stats_b": dict({
+ "name": "queue_stats_b",
+ "help": "Detailled, fine-grained information on ingress queue state, csv format.",
}),
})
"tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
],
"connector_types": ["->fd","udp","tcp"],
- "traces": ["dropped_stats","queue_stats"],
+ "traces": ["dropped_stats","queue_stats_f","queue_stats_b"],
}),
TOSQUEUEFILTER : dict({
"help": "TUN classfull queue that classifies according to the TOS (RFC 791) IP field.\n\n"
import classqueue
_outpath = "output"
+_interval = 0
class QueueLogger(threading.Thread):
def __init__(self, queues, drops, accepts, outpath):
self._terminate = False
def run(self):
+ if _interval > 0:
+ interval = _interval
+ else:
+ interval = 1
+
t0 = time.time()
- with open(self.outpath, "w+") as outfile:
- outfile.write(",".join(
+ with open(self.outpath, "w") as outfile:
+ outfile.writelines((",".join(
["time"]
+ map("q%02dlen".__mod__, xrange(len(self.queues)))
+ map("q%02ddrops".__mod__, xrange(len(self.queues)))
+ map("q%02daccepts".__mod__, xrange(len(self.queues)))
- ))
+ ), "\n"))
while not self._terminate:
- self._event.wait(1)
+ self._event.wait(interval)
if self._terminate:
break
queueaccepts = list(self.accepts)
self.accepts[:] = [0] * len(self.accepts)
- outfile.write(",".join(
- [str(time.time)]
+ outfile.writelines((",".join(
+ [str(time.time()-t0)]
+ map(str, queuelens)
+ map(str, queuedrops)
+ map(str, queueaccepts)
- ))
+ ), "\n"))
outfile.flush()
def terminate(self):
self._event.set()
class LoggingClassQueue(classqueue.ClassQueue):
+ outpath_suffix = itertools.cycle(('_f','_b'))
+
def __init__(self):
self.accepts = []
self.drops = []
super(LoggingClassQueue, self).__init__()
# Prepare logger thread
- self.logger = QueueLogger(self.queues, self.drops, self.accepts, _outpath)
+ self.logger = QueueLogger(self.queues, self.drops, self.accepts, _outpath+self.outpath_suffix.next())
self.logger.start()
def __del__(self):
else:
self.accepts[qi] += 1
+ if _interval == 0:
+ self.logger.wakeup()
+
return dropped
queueclass = LoggingClassQueue
-def init(outpath="output", **kw):
- global _outpath
+def init(outpath="output", interval=0, **kw):
+ global _outpath, _interval
_outpath = outpath
+ _interval = interval
classqueue.init(**kw)