Logging classqueue
[nepi.git] / src / nepi / testbeds / planetlab / scripts / classqueue.py
1 import collections
2 import itertools
3 import random
4 import re
5 import sys
6 import iovec
7
8 dstats = collections.defaultdict(int)
9 astats = collections.defaultdict(int)
10 dump_count = [0]
11
12 _red = True
13 _size = 1000
14 _classes = (
15     "igmp.ggp.cbt.egp.igp.idrp.mhrp.narp.ospf.eigrp*p1:"
16     "udp.st.nvp.rdp.ddp.pvp.mtp.srp.smp.136:"
17     "tcp.icmp*4:"
18     "ip.gre.etherip.l2tp:"
19     "hopopt.shim6.ipv6.ipv6route.ipv6frag.ipv6icmp.ipv6nonxt.ipv6opts*4:"
20     "crtp.crudp*8:"
21     "*3"
22 )
23 _logdropped = False
24
25 def clsmap(cls):
26     global _protomap
27     if cls in _protomap:
28         return _protomap[cls]
29     elif cls == "":
30         return None
31     else:
32         return int(cls)
33
34 def _parse_classes(classes):
35     """
36      Class list structure:
37        <CLASSLIST> ::= <CLASS> ":" CLASSLIST
38                     |  <CLASS>
39        <CLASS>     ::= <PROTOLIST> "*" <PRIORITYSPEC>
40                     |  <DFLTCLASS>
41        <DFLTCLASS> ::= "*" <PRIORITYSPEC>
42        <PROTOLIST> ::= <PROTO> "." <PROTOLIST>
43                     |  <PROTO>
44        <PROTO>     ::= <NAME> | <NUMBER>
45        <NAME>      ::= --see http://en.wikipedia.org/wiki/List_of_IP_protocol_numbers --
46                        --only in lowercase, with special characters removed--
47                        --or see below--
48        <NUMBER>    ::= [0-9]+
49        <PRIORITYSPEC> ::= <THOUGHPUT> [ "#" <SIZE> ] [ "p" <PRIORITY> ]
50        <THOUGHPUT> ::= NUMBER -- default 1
51        <PRIORITY>  ::= NUMBER -- default 0
52        <SIZE>      ::= NUMBER -- default 1
53     """
54     classes = map(lambda x:x.split('*',2),classes.split(':'))
55     priorex = re.compile(r"(?P<thoughput>\d+)?(?:#(?P<size>\d+))?(?:p(?P<priority>\d+))?")
56     for cls in classes:
57         if not cls:
58             cls.append("")
59         if len(cls) < 2:
60             cls.append("")
61         prio = priorex.match(cls[1])
62         if not prio:
63             prio = (1,0,1)
64         else:
65             prio = (
66                 int(prio.group("thoughput") or 1),
67                 int(prio.group("priority") or 0),
68                 int(prio.group("size") or 1),
69             )
70         cls[1] = prio
71         cls[0] = map(clsmap, cls[0].split('.'))
72         if not cls[0]:
73             cls[0] = [None]
74     
75     return classes
76     
77
78 class ClassQueue(object):
79     def __init__(self):
80         self.size = _size
81         self.len = 0
82
83         # Prepare classes
84         self.classspec = _parse_classes(_classes)
85
86         self.queues = [ collections.deque() for cls in xrange(len(self.classspec)) ]
87         
88         self.classmap = dict(
89             (proto, cls)
90             for cls, (protos, (thoughput, prio, size)) in enumerate(self.classspec)
91             for proto in protos
92         )
93
94         self.priomap = [
95             prio
96             for cls in xrange(len(self.classspec))
97             for protos, (thoughput, prio, size) in ( self.classspec[cls], )
98         ]
99         
100         self.sizemap = [
101             size * _size
102             for cls in xrange(len(self.classspec))
103             for protos, (thoughput, prio, size) in ( self.classspec[cls], )
104         ]
105         
106         order = [ 
107             cls
108             for cls, (protos, (thoughput, prio, size)) in enumerate(self.classspec)
109             for i in xrange(thoughput)
110         ]
111         self.order = [
112             filter(lambda x : self.priomap[x] == prio, order)
113             for prio in reversed(sorted(set(self.priomap)))
114         ]
115         for order in self.order:
116             random.shuffle(order)
117         
118         if None not in self.classmap:
119             raise RuntimeError, "No default class: a default class must be present"
120         
121         # add retries
122         self.queues.append(collections.deque())
123         self.priomap.append(-1)
124         self.sizemap.append(_size)
125         self.order.insert(0, [len(self.queues)-1])
126         
127         self.classes = set()
128         self.clear()
129     
130     def __nonzero__(self):
131         return self.len > 0
132     
133     def __len__(self):
134         return self.len
135
136     def clear(self):
137         self.classes.clear()
138         self.cycle = None
139         self.cyclelen = None
140         self.cycle_update = True
141         self.len = 0
142         self.queues[:] = [ collections.deque() for cls in xrange(len(self.classspec)) ]
143     
144     def queuefor(self, packet, ord=ord, len=len, classmask=0xEC):
145         if len(packet) >= 10:
146             proto = ord(packet[9])
147             rv = self.classmap.get(proto)
148             if rv is None:
149                 rv = self.classmap.get(None)
150         else:
151             proto = 0
152             rv = self.classmap.get(None)
153         return proto, rv, self.sizemap[rv]
154     
155     def get_packetdrop_p(self, qlen, qsize, packet):
156         pdrop = ((qlen * 1.0 / qsize) - 0.5) * 2.0
157         pdrop *= pdrop
158         return pdrop
159     
160     def append(self, packet, len=len, dstats=dstats, astats=astats, rng=random.random):
161         proto,qi,size = self.queuefor(packet)
162         q = self.queues[qi]
163         lq = len(q)
164         if lq < size:
165             dropped = 0
166             if lq > (size/2) and _red:
167                 pdrop = self.get_packetdrop_p(lq, size, packet)
168                 if rng() < pdrop:
169                     dropped = 1
170             if not dropped:
171                 classes = self.classes
172                 if qi not in classes:
173                     classes.add(qi)
174                     self.cycle_update = True
175                 q.append(packet)
176                 self.len += 1
177         # packet dropped
178         else:
179             dropped = 1
180         if _logdropped:
181             if dropped:
182                 dstats[proto] += 1
183             else:
184                 astats[proto] += 1
185             self.dump_stats()
186         return dropped
187
188     def appendleft(self, packet):
189         self.queues[-1].append(packet)
190         self.len += 1
191     
192     def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop):
193         return self.popleft(pop=pop)
194     
195     def popleft(self, xrange=xrange, len=len, iter=iter, enumerate=enumerate, zip=zip, pop=collections.deque.popleft):
196         queues = self.queues
197         classes = self.classes
198
199         if len(classes)==1:
200             # shortcut for non-tos traffic
201             rv = pop(queues[iter(classes).next()])
202             self.len -= 1
203             return rv
204
205         if self.cycle_update:
206             cycle = [
207                 filter(classes.__contains__, order)
208                 for order in self.order
209             ]
210             self.cycle = map(itertools.cycle, cycle)
211             self.cyclelen = map(len,cycle)
212             self.cycle_update = False
213         
214         for prio, (cycle, cyclelen) in enumerate(zip(self.cycle, self.cyclelen)):
215             cycle = cycle.next
216             for i in xrange(cyclelen):
217                 qi = cycle()
218                 if qi in classes:
219                     q = queues[qi]
220                     if q:
221                         rv = pop(q)
222                         self.len -= 1
223                         return rv
224                     else:
225                         # Needs to update the cycle
226                         classes.remove(qi)
227                         self.cycle_update = True
228         else:
229             raise IndexError, "pop from an empty queue"
230
231     def dump_stats(self, astats=astats, dstats=dstats, dump_count=dump_count):
232         if dump_count[0] >= 10000:
233             try:
234                 dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
235                 astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
236                 fd = open('dropped_stats', 'w')
237                 iovec.writev(fd.fileno(), "Classes: ", _classes, "\nDropped:\n", dstatsstr, "Accepted:\n", astatsstr)
238                 fd.close()
239             except:
240                 # who cares
241                 pass
242             dump_count[0] = 0
243         else:
244             dump_count[0] += 1
245
246 queueclass = ClassQueue
247
248 def init(size = 1000, classes = _classes, logdropped = 'False', red = True):
249     global _size, _classes, _logdropped
250     _size = int(size)
251     _classes = classes
252     _red = red
253     _logdropped = logdropped.lower() in ('true','1','on')
254     
255     if _logdropped:
256         # Truncate stats
257         open('dropped_stats', 'w').close()
258
259 _protomap = {
260     '3pc'       :       34,
261     'an'        :       107,
262     'ah'        :       51,
263     'argus'     :       13,
264     'aris'      :       104,
265     'ax25'      :       93,
266     'bbn-rcc-mon'       :       10,
267     'bna'       :       49,
268     'brsatmon'  :       76,
269     'cbt'       :       7,
270     'cftp'      :       62,
271     'chaos'     :       16,
272     'compaqpeer'        :       110,
273     'cphb'      :       73,
274     'cpnx'      :       72,
275     'crtp'      :       126,
276     'crudp'     :       127,
277     'dccp'      :       33,
278     'dcn-meas'  :       19,
279     'ddp'       :       37,
280     'ddx'       :       116,
281     'dgp'       :       86,
282     'egp'       :       8,
283     'eigrp'     :       88,
284     'emcon'     :       14,
285     'encap'     :       98,
286     'esp'       :       50,
287     'etherip'   :       97,
288     'fc'        :       133,
289     'fire'      :       125,
290     'ggp'       :       3,
291     'gmtp'      :       100,
292     'gre'       :       47,
293     'hip'       :       139,
294     'hmp'       :       20,
295     'hopopt'    :       0,
296     'iatp'      :       117,
297     'icmp'      :       1,
298     'idpr'      :       35,
299     'idprcmtp'  :       38,
300     'idrp'      :       45,
301     'ifmp'      :       101,
302     'igmp'      :       2,
303     'igp'       :       9,
304     'il'        :       40,
305     'inlsp'     :       52,
306     'ip'        :       4,
307     'ipcomp'    :       108,
308     'ipcv'      :       71,
309     'ipip'      :       94,
310     'iplt'      :       129,
311     'ippc'      :       67,
312     'iptm'      :       84,
313     'ipv6'      :       41,
314     'ipv6frag'  :       44,
315     'ipv6icmp'  :       58,
316     'ipv6nonxt' :       59,
317     'ipv6opts'  :       60,
318     'ipv6route' :       43,
319     'ipxinip'   :       111,
320     'irtp'      :       28,
321     'isoip'     :       80,
322     'isotp4'    :       29,
323     'kryptolan' :       65,
324     'l2tp'      :       115,
325     'larp'      :       91,
326     'leaf1'     :       25,
327     'leaf2'     :       26,
328     'manet'     :       138,
329     'meritinp'  :       32,
330     'mfensp'    :       31,
331     'mhrp'      :       48,
332     'micp'      :       95,
333     'mobile'    :       55,
334     'mtp'       :       92,
335     'mux'       :       18,
336     'narp'      :       54,
337     'netblt'    :       30,
338     'nsfnetigp' :       85,
339     'nvp'       :       11,
340     'ospf'      :       89,
341     'pgm'       :       113,
342     'pim'       :       103,
343     'pipe'      :       131,
344     'pnni'      :       102,
345     'prm'       :       21,
346     'ptp'       :       123,
347     'pup'       :       12,
348     'pvp'       :       75,
349     'qnx'       :       106,
350     'rdp'       :       27,
351     'rsvp'      :       46,
352     'rvd'       :       66,
353     'satexpak'  :       64,
354     'satmon'    :       69,
355     'sccsp'     :       96,
356     'scps'      :       105,
357     'sctp'      :       132,
358     'sdrp'      :       42,
359     'securevmtp'        :       82,
360     'shim6'     :       140,
361     'skip'      :       57,
362     'sm'        :       122,
363     'smp'       :       121,
364     'snp'       :       109,
365     'spriterpc' :       90,
366     'sps'       :       130,
367     'srp'       :       119,
368     'sscopmce'  :       128,
369     'st'        :       5,
370     'stp'       :       118,
371     'sunnd'     :       77,
372     'swipe'     :       53,
373     'tcf'       :       87,
374     'tcp'       :       6,
375     'tlsp'      :       56,
376     'tp'        :       39,
377     'trunk1'    :       23,
378     'trunk2'    :       24,
379     'ttp'       :       84,
380     'udp'       :       17,
381     'uti'       :       120,
382     'vines'     :       83,
383     'visa'      :       70,
384     'vmtp'      :       81,
385     'vrrp'      :       112,
386     'wbexpak'   :       79,
387     'wbmon'     :       78,
388     'wsn'       :       74,
389     'xnet'      :       15,
390     'xnsidp'    :       22,
391     'xtp'       :       36
392 }
393