Fix interval support in loggingclassqueue
[nepi.git] / src / nepi / testbeds / planetlab / scripts / loggingclassqueue.py
1 import collections
2 import itertools
3 import random
4 import re
5 import sys
6 import iovec
7 import threading
8 import time
9 import classqueue
10
# Module-level defaults; init() overwrites both.
# Base path of the log files: LoggingClassQueue appends "_f" or "_b" to it.
11 _outpath = "output"
# Logging period in seconds.  Values <= 0 make the logger thread wait with a
# one second timeout; exactly 0 also makes LoggingClassQueue.append() wake
# the logger for every packet.
12 _interval = 0
13
class QueueLogger(threading.Thread):
    """Daemon thread that dumps per-queue statistics to a CSV file.

    The file starts with a header row.  Every following row holds the time
    elapsed since the thread started, then the current length of every
    queue, then the drop counter and the accept counter of every queue.
    Both counters are reset to zero after each row, so a row reports the
    events counted since the previous row.
    """

    def __init__(self, queues, drops, accepts, outpath):
        """Store the objects to monitor; the thread is not started here.

        queues  -- sequence of sized containers, sampled with len()
        drops   -- mutable list of per-queue drop counters (zeroed in place)
        accepts -- mutable list of per-queue accept counters (zeroed in place)
        outpath -- path of the CSV file, opened for writing (truncated)
        """
        super(QueueLogger, self).__init__()
        self.queues = queues
        self.drops = drops
        self.accepts = accepts
        self.outpath = outpath
        # Attribute form of the deprecated setDaemon(True).
        self.daemon = True
        self._event = threading.Event()
        self._terminate = False

    def _header_line(self):
        """Return the CSV header line, including the trailing newline."""
        indices = range(len(self.queues))
        columns = ["time"]
        for template in ("q%02dlen", "q%02ddrops", "q%02daccepts"):
            columns.extend(template % index for index in indices)
        return ",".join(columns) + "\n"

    def _take_sample(self):
        """Return lengths, drops and accepts as strings and reset the counters.

        The counter lists are zeroed in place so that whoever shares them
        with this thread keeps counting into the same list objects.
        NOTE(review): copy and reset are not protected by a lock; an
        increment done by another thread in between would be lost.
        """
        lengths = [len(queue) for queue in self.queues]

        dropped = list(self.drops)
        self.drops[:] = [0] * len(self.drops)

        accepted = list(self.accepts)
        self.accepts[:] = [0] * len(self.accepts)

        return [str(value) for value in lengths + dropped + accepted]

    def run(self):
        """Write the header, then one row per wakeup or timeout until terminated."""
        # A non-positive interval still needs a finite timeout for wait();
        # one second is used in that case.
        interval = _interval if _interval > 0 else 1

        t0 = time.time()
        with open(self.outpath, "w") as outfile:
            outfile.write(self._header_line())

            while not self._terminate:
                # Returns early when wakeup() or terminate() sets the event.
                self._event.wait(interval)
                if self._terminate:
                    break

                self._event.clear()

                fields = self._take_sample()
                outfile.write(
                    ",".join([str(time.time() - t0)] + fields) + "\n")
                outfile.flush()

    def terminate(self):
        """Ask the thread to stop and wake it so it notices immediately."""
        self._terminate = True
        self.wakeup()

    def wakeup(self):
        """Interrupt the current wait so a row is written right away."""
        self._event.set()
69
class LoggingClassQueue(classqueue.ClassQueue):
    """ClassQueue that counts accepted and dropped packets per queue.

    Each instance starts a QueueLogger thread that shares the queue list and
    the two counter lists with it and writes them to a log file.
    """

    # Shared by all instances: successive instances get "_f", "_b", "_f", ...
    outpath_suffix = itertools.cycle(('_f','_b'))

    def __init__(self):
        """Create the counters, initialise the base queue and start logging."""
        # The counter lists are created before the base initialiser runs.
        # NOTE(review): presumably because it calls clear(), which resizes
        # them -- confirm against classqueue.ClassQueue.
        self.accepts = []
        self.drops = []
        super(LoggingClassQueue, self).__init__()

        # Prepare logger thread
        self.logger = QueueLogger(
            self.queues, self.drops, self.accepts,
            _outpath + next(self.outpath_suffix))
        self.logger.start()

    def __del__(self):
        """Stop the logger thread, if one was ever created."""
        # __del__ also runs when __init__ failed before assigning the logger.
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.terminate()

    def clear(self):
        """Clear the base queue and reset both counters to one zero per queue."""
        super(LoggingClassQueue, self).clear()
        # Slice assignment keeps the list objects shared with the logger.
        self.accepts[:] = [0] * len(self.queues)
        self.drops[:] = [0] * len(self.queues)

    def append(self, packet):
        """Append packet through the base class and count the outcome.

        Returns the value returned by the base append(); a true value is
        counted as a drop, a false one as an accept, for the queue index
        reported by queuefor().
        """
        _proto, qi, _size = self.queuefor(packet)
        dropped = super(LoggingClassQueue, self).append(packet)

        if dropped:
            self.drops[qi] += 1
        else:
            self.accepts[qi] += 1

        # With a zero interval every packet triggers a log row.
        if _interval == 0:
            self.logger.wakeup()

        return dropped
103
# Module-level alias for LoggingClassQueue.
# NOTE(review): presumably the name the code loading this script looks up -- confirm.
104 queueclass = LoggingClassQueue
105
def init(outpath="output", interval=0, **kw):
    """Configure the logging globals, then initialise classqueue.

    outpath  -- stored as the base path of the log files
    interval -- converted with float() and stored as the logging interval
    kw       -- passed on unchanged to classqueue.init()
    """
    global _outpath, _interval

    # The path is stored before the conversion, as float() may raise.
    _outpath = outpath
    period = float(interval)
    _interval = period

    classqueue.init(**kw)