Logging classqueue
[nepi.git] / src / nepi / testbeds / planetlab / scripts / loggingclassqueue.py
1 import collections
2 import itertools
3 import random
4 import re
5 import sys
6 import iovec
7 import threading
8 import time
9 import classqueue
10
# Path of the CSV statistics file handed to the QueueLogger thread.
# init() replaces this value; LoggingClassQueue reads it at construction time.
_outpath = "output"
12
class QueueLogger(threading.Thread):
    """Daemon thread that periodically dumps per-queue statistics as CSV.

    The file starts with a header line; every following line is one sample:
    a timestamp, the current length of every queue, then the drop and accept
    counters accumulated since the previous sample. The counters are reset
    to zero *in place* after each sample, so ``drops`` and ``accepts`` must
    be the very list objects the producer increments.

    NOTE(review): snapshotting and resetting the counters is not atomic with
    respect to the producer, so an increment landing between the two steps
    may be lost -- confirm this is acceptable for these statistics.
    """

    def __init__(self, queues, drops, accepts, outpath, interval=1):
        """Prepare the logger; call start() to begin sampling.

        queues   -- sequence of sized containers (len() is taken on each)
        drops    -- mutable list of per-queue drop counters
        accepts  -- mutable list of per-queue accept counters
        outpath  -- path of the CSV file, truncated when the thread starts
        interval -- maximum number of seconds between two samples
        """
        super(QueueLogger, self).__init__()
        self.queues = queues
        self.drops = drops
        self.accepts = accepts
        self.outpath = outpath
        self.interval = interval
        self.daemon = True
        self._event = threading.Event()
        self._terminate = False

    def _header(self):
        """Return the CSV header: "time" plus three columns per queue."""
        nqueues = len(self.queues)
        columns = ["time"]
        for template in ("q%02dlen", "q%02ddrops", "q%02daccepts"):
            columns.extend(template % index for index in range(nqueues))
        return ",".join(columns)

    def _sample(self):
        """Return one CSV row and reset the drop/accept counters in place."""
        queuelens = [len(queue) for queue in self.queues]

        # Slice assignment keeps the list objects shared with the producer.
        queuedrops = list(self.drops)
        self.drops[:] = [0] * len(self.drops)

        queueaccepts = list(self.accepts)
        self.accepts[:] = [0] * len(self.accepts)

        # time.time must be *called*; str(time.time) is the function's repr.
        fields = [time.time()] + queuelens + queuedrops + queueaccepts
        return ",".join([str(field) for field in fields])

    def run(self):
        """Write the header, then one row per interval/wakeup until terminated."""
        with open(self.outpath, "w+") as outfile:
            # Each record needs its own line terminator to form valid CSV.
            outfile.write(self._header() + "\n")
            outfile.flush()

            while not self._terminate:
                # Returns early when wakeup() or terminate() sets the event.
                self._event.wait(self.interval)
                if self._terminate:
                    break

                self._event.clear()

                outfile.write(self._sample() + "\n")
                outfile.flush()

    def terminate(self):
        """Ask the thread to stop and wake it so it notices immediately."""
        self._terminate = True
        self.wakeup()

    def wakeup(self):
        """Trigger a sample right away instead of waiting for the interval."""
        self._event.set()
63
class LoggingClassQueue(classqueue.ClassQueue):
    """ClassQueue variant that counts accepted and dropped packets per queue.

    The counter lists are shared with a QueueLogger thread, started by the
    constructor, which writes them to the file named by the module-level
    ``_outpath``.
    """

    def __init__(self):
        # The counter lists are created before the base constructor runs.
        # NOTE(review): presumably because the base __init__ calls clear(),
        # which sizes them -- confirm against classqueue.ClassQueue.
        self.accepts = []
        self.drops = []
        super(LoggingClassQueue, self).__init__()

        # Prepare logger thread
        self.logger = QueueLogger(self.queues, self.drops, self.accepts, _outpath)
        self.logger.start()

    def __del__(self):
        # __del__ also runs on instances whose __init__ raised before
        # self.logger was assigned; guard so finalisation does not fail.
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.terminate()

    def clear(self):
        """Clear the queues and zero one accept/drop counter per queue."""
        super(LoggingClassQueue, self).clear()
        # Slice assignment keeps the list objects shared with the logger.
        self.accepts[:] = [0] * len(self.queues)
        self.drops[:] = [0] * len(self.queues)

    def append(self, packet):
        """Append ``packet`` and bump the counter of the queue it maps to.

        Returns whatever the base append() returned; a truthy value is
        counted as a drop, anything else as an accept.
        """
        # queuefor() yields a 3-tuple; only the queue index is needed here.
        _proto, qi, _size = self.queuefor(packet)
        dropped = super(LoggingClassQueue, self).append(packet)

        if dropped:
            self.drops[qi] += 1
        else:
            self.accepts[qi] += 1

        return dropped
92
# Module-level alias for the queue implementation defined in this file.
# NOTE(review): presumably looked up by name by whatever loads this script -- confirm.
queueclass = LoggingClassQueue
94
def init(outpath="output", **kw):
    """Set the statistics output path and initialise the classqueue module.

    outpath -- stored in the module-level ``_outpath``; it is read by
               LoggingClassQueue instances constructed afterwards.
    **kw    -- forwarded unchanged to classqueue.init().
    """
    global _outpath
    _outpath = outpath
    classqueue.init(**kw)