Trap errors in dropped packet trace dumps - no need to break the whole overlay if...
[nepi.git] / src / nepi / testbeds / planetlab / scripts / classqueue.py
1 import collections
2 import itertools
3 import random
4 import re
5 import sys
6 import iovec
7
8 dstats = collections.defaultdict(int)
9 astats = collections.defaultdict(int)
10 dump_count = [0]
11
12 _red = True
13 _size = 1000
14 _classes = (
15     "igmp.ggp.cbt.egp.igp.idrp.mhrp.narp.ospf.eigrp*p1:"
16     "udp.st.nvp.rdp.ddp.pvp.mtp.srp.smp.136:"
17     "tcp.icmp*4:"
18     "ip.gre.etherip.l2tp:"
19     "hopopt.shim6.ipv6.ipv6route.ipv6frag.ipv6icmp.ipv6nonxt.ipv6opts*4:"
20     "crtp.crudp*8:"
21     "*3"
22 )
23 _logdropped = False
24
25 def clsmap(cls):
26     global _protomap
27     if cls in _protomap:
28         return _protomap[cls]
29     elif cls == "":
30         return None
31     else:
32         return int(cls)
33
34 def _parse_classes(classes):
35     """
36      Class list structure:
37        <CLASSLIST> ::= <CLASS> ":" CLASSLIST
38                     |  <CLASS>
39        <CLASS>     ::= <PROTOLIST> "*" <PRIORITYSPEC>
40                     |  <DFLTCLASS>
41        <DFLTCLASS> ::= "*" <PRIORITYSPEC>
42        <PROTOLIST> ::= <PROTO> "." <PROTOLIST>
43                     |  <PROTO>
44        <PROTO>     ::= <NAME> | <NUMBER>
45        <NAME>      ::= --see http://en.wikipedia.org/wiki/List_of_IP_protocol_numbers --
46                        --only in lowercase, with special characters removed--
47                        --or see below--
48        <NUMBER>    ::= [0-9]+
49        <PRIORITYSPEC> ::= <THOUGHPUT> [ "#" <SIZE> ] [ "p" <PRIORITY> ]
50        <THOUGHPUT> ::= NUMBER -- default 1
51        <PRIORITY>  ::= NUMBER -- default 0
52        <SIZE>      ::= NUMBER -- default 1
53     """
54     classes = map(lambda x:x.split('*',2),classes.split(':'))
55     priorex = re.compile(r"(?P<thoughput>\d+)?(?:#(?P<size>\d+))?(?:p(?P<priority>\d+))?")
56     for cls in classes:
57         if not cls:
58             cls.append("")
59         if len(cls) < 2:
60             cls.append("")
61         prio = priorex.match(cls[1])
62         if not prio:
63             prio = (1,0,1)
64         else:
65             prio = (
66                 int(prio.group("thoughput") or 1),
67                 int(prio.group("priority") or 0),
68                 int(prio.group("size") or 1),
69             )
70         cls[1] = prio
71         cls[0] = map(clsmap, cls[0].split('.'))
72         if not cls[0]:
73             cls[0] = [None]
74     
75     return classes
76     
77
78 class ClassQueue(object):
79     def __init__(self):
80         self.size = _size
81         self.len = 0
82
83         # Prepare classes
84         self.classspec = _parse_classes(_classes)
85
86         self.queues = [ collections.deque() for cls in xrange(len(self.classspec)) ]
87         
88         self.classmap = dict(
89             (proto, cls)
90             for cls, (protos, (thoughput, prio, size)) in enumerate(self.classspec)
91             for proto in protos
92         )
93
94         self.priomap = [
95             prio
96             for cls in xrange(len(self.classspec))
97             for protos, (thoughput, prio, size) in ( self.classspec[cls], )
98         ]
99         
100         self.sizemap = [
101             size * _size
102             for cls in xrange(len(self.classspec))
103             for protos, (thoughput, prio, size) in ( self.classspec[cls], )
104         ]
105         
106         order = [ 
107             cls
108             for cls, (protos, (thoughput, prio, size)) in enumerate(self.classspec)
109             for i in xrange(thoughput)
110         ]
111         self.order = [
112             filter(lambda x : self.priomap[x] == prio, order)
113             for prio in reversed(sorted(set(self.priomap)))
114         ]
115         for order in self.order:
116             random.shuffle(order)
117         
118         if None not in self.classmap:
119             raise RuntimeError, "No default class: a default class must be present"
120         
121         # add retries
122         self.queues.append(collections.deque())
123         self.priomap.append(-1)
124         self.sizemap.append(_size)
125         self.order.insert(0, [len(self.queues)-1])
126         
127         self.classes = set()
128         self.clear()
129     
130     def __nonzero__(self):
131         return self.len > 0
132     
133     def __len__(self):
134         return self.len
135
136     def clear(self):
137         self.classes.clear()
138         self.cycle = None
139         self.cyclelen = None
140         self.cycle_update = True
141         self.len = 0
142         self.queues = [ collections.deque() for cls in xrange(len(self.classspec)) ]
143     
144     def queuefor(self, packet, ord=ord, len=len, classmask=0xEC):
145         if len(packet) >= 10:
146             proto = ord(packet[9])
147             rv = self.classmap.get(proto)
148             if rv is None:
149                 rv = self.classmap.get(None)
150         else:
151             proto = 0
152             rv = self.classmap.get(None)
153         return proto, rv, self.sizemap[rv]
154     
155     def get_packetdrop_p(self, qlen, qsize, packet):
156         pdrop = ((qlen * 1.0 / qsize) - 0.5) * 2.0
157         pdrop *= pdrop
158         return pdrop
159     
160     def append(self, packet, len=len, dstats=dstats, astats=astats, rng=random.random):
161         proto,qi,size = self.queuefor(packet)
162         q = self.queues[qi]
163         lq = len(q)
164         if lq < size:
165             dropped = 0
166             if lq > (size/2) and _red:
167                 pdrop = self.get_packetdrop_p(lq, size, packet)
168                 if rng() < pdrop:
169                     dropped = 1
170             if not dropped:
171                 classes = self.classes
172                 if qi not in classes:
173                     classes.add(qi)
174                     self.cycle_update = True
175                 q.append(packet)
176                 self.len += 1
177         # packet dropped
178         else:
179             dropped = 1
180         if _logdropped:
181             if dropped:
182                 dstats[proto] += 1
183             else:
184                 astats[proto] += 1
185             self.dump_stats()
186
187     def appendleft(self, packet):
188         self.queues[-1].append(packet)
189         self.len += 1
190     
191     def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop):
192         return self.popleft(pop=pop)
193     
194     def popleft(self, xrange=xrange, len=len, iter=iter, enumerate=enumerate, zip=zip, pop=collections.deque.popleft):
195         queues = self.queues
196         classes = self.classes
197
198         if len(classes)==1:
199             # shortcut for non-tos traffic
200             rv = pop(queues[iter(classes).next()])
201             self.len -= 1
202             return rv
203
204         if self.cycle_update:
205             cycle = [
206                 filter(classes.__contains__, order)
207                 for order in self.order
208             ]
209             self.cycle = map(itertools.cycle, cycle)
210             self.cyclelen = map(len,cycle)
211             self.cycle_update = False
212         
213         for prio, (cycle, cyclelen) in enumerate(zip(self.cycle, self.cyclelen)):
214             cycle = cycle.next
215             for i in xrange(cyclelen):
216                 qi = cycle()
217                 if qi in classes:
218                     q = queues[qi]
219                     if q:
220                         rv = pop(q)
221                         self.len -= 1
222                         return rv
223                     else:
224                         # Needs to update the cycle
225                         classes.remove(qi)
226                         self.cycle_update = True
227         else:
228             raise IndexError, "pop from an empty queue"
229
230     def dump_stats(self, astats=astats, dstats=dstats, dump_count=dump_count):
231         if dump_count[0] >= 10000:
232             try:
233                 dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
234                 astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
235                 fd = open('dropped_stats', 'w')
236                 iovec.writev(fd.fileno(), "Classes: ", _classes, "\nDropped:\n", dstatsstr, "Accepted:\n", astatsstr)
237                 fd.close()
238             except:
239                 # who cares
240                 pass
241             dump_count[0] = 0
242         else:
243             dump_count[0] += 1
244
245 queueclass = ClassQueue
246
247 def init(size = 1000, classes = _classes, logdropped = 'False', red = True):
248     global _size, _classes, _logdropped
249     _size = int(size)
250     _classes = classes
251     _red = red
252     _logdropped = logdropped.lower() in ('true','1','on')
253     
254     if _logdropped:
255         # Truncate stats
256         open('dropped_stats', 'w').close()
257
258 _protomap = {
259     '3pc'       :       34,
260     'an'        :       107,
261     'ah'        :       51,
262     'argus'     :       13,
263     'aris'      :       104,
264     'ax25'      :       93,
265     'bbn-rcc-mon'       :       10,
266     'bna'       :       49,
267     'brsatmon'  :       76,
268     'cbt'       :       7,
269     'cftp'      :       62,
270     'chaos'     :       16,
271     'compaqpeer'        :       110,
272     'cphb'      :       73,
273     'cpnx'      :       72,
274     'crtp'      :       126,
275     'crudp'     :       127,
276     'dccp'      :       33,
277     'dcn-meas'  :       19,
278     'ddp'       :       37,
279     'ddx'       :       116,
280     'dgp'       :       86,
281     'egp'       :       8,
282     'eigrp'     :       88,
283     'emcon'     :       14,
284     'encap'     :       98,
285     'esp'       :       50,
286     'etherip'   :       97,
287     'fc'        :       133,
288     'fire'      :       125,
289     'ggp'       :       3,
290     'gmtp'      :       100,
291     'gre'       :       47,
292     'hip'       :       139,
293     'hmp'       :       20,
294     'hopopt'    :       0,
295     'iatp'      :       117,
296     'icmp'      :       1,
297     'idpr'      :       35,
298     'idprcmtp'  :       38,
299     'idrp'      :       45,
300     'ifmp'      :       101,
301     'igmp'      :       2,
302     'igp'       :       9,
303     'il'        :       40,
304     'inlsp'     :       52,
305     'ip'        :       4,
306     'ipcomp'    :       108,
307     'ipcv'      :       71,
308     'ipip'      :       94,
309     'iplt'      :       129,
310     'ippc'      :       67,
311     'iptm'      :       84,
312     'ipv6'      :       41,
313     'ipv6frag'  :       44,
314     'ipv6icmp'  :       58,
315     'ipv6nonxt' :       59,
316     'ipv6opts'  :       60,
317     'ipv6route' :       43,
318     'ipxinip'   :       111,
319     'irtp'      :       28,
320     'isoip'     :       80,
321     'isotp4'    :       29,
322     'kryptolan' :       65,
323     'l2tp'      :       115,
324     'larp'      :       91,
325     'leaf1'     :       25,
326     'leaf2'     :       26,
327     'manet'     :       138,
328     'meritinp'  :       32,
329     'mfensp'    :       31,
330     'mhrp'      :       48,
331     'micp'      :       95,
332     'mobile'    :       55,
333     'mtp'       :       92,
334     'mux'       :       18,
335     'narp'      :       54,
336     'netblt'    :       30,
337     'nsfnetigp' :       85,
338     'nvp'       :       11,
339     'ospf'      :       89,
340     'pgm'       :       113,
341     'pim'       :       103,
342     'pipe'      :       131,
343     'pnni'      :       102,
344     'prm'       :       21,
345     'ptp'       :       123,
346     'pup'       :       12,
347     'pvp'       :       75,
348     'qnx'       :       106,
349     'rdp'       :       27,
350     'rsvp'      :       46,
351     'rvd'       :       66,
352     'satexpak'  :       64,
353     'satmon'    :       69,
354     'sccsp'     :       96,
355     'scps'      :       105,
356     'sctp'      :       132,
357     'sdrp'      :       42,
358     'securevmtp'        :       82,
359     'shim6'     :       140,
360     'skip'      :       57,
361     'sm'        :       122,
362     'smp'       :       121,
363     'snp'       :       109,
364     'spriterpc' :       90,
365     'sps'       :       130,
366     'srp'       :       119,
367     'sscopmce'  :       128,
368     'st'        :       5,
369     'stp'       :       118,
370     'sunnd'     :       77,
371     'swipe'     :       53,
372     'tcf'       :       87,
373     'tcp'       :       6,
374     'tlsp'      :       56,
375     'tp'        :       39,
376     'trunk1'    :       23,
377     'trunk2'    :       24,
378     'ttp'       :       84,
379     'udp'       :       17,
380     'uti'       :       120,
381     'vines'     :       83,
382     'visa'      :       70,
383     'vmtp'      :       81,
384     'vrrp'      :       112,
385     'wbexpak'   :       79,
386     'wbmon'     :       78,
387     'wsn'       :       74,
388     'xnet'      :       15,
389     'xnsidp'    :       22,
390     'xtp'       :       36
391 }
392