0fcae273b02d4a96471ffd50f745c68cc8cc259c
[nepi.git] / src / nepi / testbeds / planetlab / scripts / classqueue.py
1 import collections
2 import itertools
3 import random
4 import re
5 import sys
6 import iovec
7
8 dstats = collections.defaultdict(int)
9 astats = collections.defaultdict(int)
10 dump_count = [0]
11
12 _red = True
13 _size = 1000
14 _classes = (
15     "igmp.ggp.cbt.egp.igp.idrp.mhrp.narp.ospf.eigrp*p1:"
16     "udp.st.nvp.rdp.ddp.pvp.mtp.srp.smp.136:"
17     "tcp.icmp*4:"
18     "ip.gre.etherip.l2tp:"
19     "hopopt.shim6.ipv6.ipv6route.ipv6frag.ipv6icmp.ipv6nonxt.ipv6opts*4:"
20     "crtp.crudp*8:"
21     "*3"
22 )
23
24 def clsmap(cls):
25     global _protomap
26     if cls in _protomap:
27         return _protomap[cls]
28     elif cls == "":
29         return None
30     else:
31         return int(cls)
32
33 def _parse_classes(classes):
34     """
35      Class list structure:
36        <CLASSLIST> ::= <CLASS> ":" CLASSLIST
37                     |  <CLASS>
38        <CLASS>     ::= <PROTOLIST> "*" <PRIORITYSPEC>
39                     |  <DFLTCLASS>
40        <DFLTCLASS> ::= "*" <PRIORITYSPEC>
41        <PROTOLIST> ::= <PROTO> "." <PROTOLIST>
42                     |  <PROTO>
43        <PROTO>     ::= <NAME> | <NUMBER>
44        <NAME>      ::= --see http://en.wikipedia.org/wiki/List_of_IP_protocol_numbers --
45                        --only in lowercase, with special characters removed--
46                        --or see below--
47        <NUMBER>    ::= [0-9]+
48        <PRIORITYSPEC> ::= <THOUGHPUT> [ "#" <SIZE> ] [ "p" <PRIORITY> ]
49        <THOUGHPUT> ::= NUMBER -- default 1
50        <PRIORITY>  ::= NUMBER -- default 0
51        <SIZE>      ::= NUMBER -- default 1
52     """
53     classes = map(lambda x:x.split('*',2),classes.split(':'))
54     priorex = re.compile(r"(?P<thoughput>\d+)?(?:#(?P<size>\d+))?(?:p(?P<priority>\d+))?")
55     for cls in classes:
56         if not cls:
57             cls.append("")
58         if len(cls) < 2:
59             cls.append("")
60         prio = priorex.match(cls[1])
61         if not prio:
62             prio = (1,0,1)
63         else:
64             prio = (
65                 int(prio.group("thoughput") or 1),
66                 int(prio.group("priority") or 0),
67                 int(prio.group("size") or 1),
68             )
69         cls[1] = prio
70         cls[0] = map(clsmap, cls[0].split('.'))
71         if not cls[0]:
72             cls[0] = [None]
73     
74     return classes
75     
76
77 class ClassQueue(object):
78     def __init__(self):
79         self.size = _size
80         self.len = 0
81
82         # Prepare classes
83         self.classspec = _parse_classes(_classes)
84
85         self.queues = [ collections.deque() for cls in xrange(len(self.classspec)) ]
86         
87         self.classmap = dict(
88             (proto, cls)
89             for cls, (protos, (thoughput, prio, size)) in enumerate(self.classspec)
90             for proto in protos
91         )
92
93         self.priomap = [
94             prio
95             for cls in xrange(len(self.classspec))
96             for protos, (thoughput, prio, size) in ( self.classspec[cls], )
97         ]
98         
99         self.sizemap = [
100             size * _size
101             for cls in xrange(len(self.classspec))
102             for protos, (thoughput, prio, size) in ( self.classspec[cls], )
103         ]
104         
105         order = [ 
106             cls
107             for cls, (protos, (thoughput, prio, size)) in enumerate(self.classspec)
108             for i in xrange(thoughput)
109         ]
110         self.order = [
111             filter(lambda x : self.priomap[x] == prio, order)
112             for prio in reversed(sorted(set(self.priomap)))
113         ]
114         for order in self.order:
115             random.shuffle(order)
116         
117         if None not in self.classmap:
118             raise RuntimeError, "No default class: a default class must be present"
119         
120         # add retries
121         self.queues.append(collections.deque())
122         self.priomap.append(-1)
123         self.sizemap.append(_size)
124         self.order.insert(0, [len(self.queues)-1])
125         
126         self.classes = set()
127         self.clear()
128     
129     def __nonzero__(self):
130         return self.len > 0
131     
132     def __len__(self):
133         return self.len
134
135     def clear(self):
136         self.classes.clear()
137         self.cycle = None
138         self.cyclelen = None
139         self.cycle_update = True
140         self.len = 0
141         self.queues = [ collections.deque() for cls in xrange(len(self.classspec)) ]
142     
143     def queuefor(self, packet, ord=ord, len=len, classmask=0xEC):
144         if len(packet) >= 10:
145             proto = ord(packet[9])
146             rv = self.classmap.get(proto)
147             if rv is None:
148                 rv = self.classmap.get(None)
149         else:
150             proto = 0
151             rv = self.classmap.get(None)
152         return proto, rv, self.sizemap[rv]
153     
154     def get_packetdrop_p(self, qlen, qsize, packet):
155         pdrop = ((qlen * 1.0 / qsize) - 0.5) * 2.0
156         pdrop *= pdrop
157         return pdrop
158     
159     def append(self, packet, len=len, dstats=dstats, astats=astats, rng=random.random):
160         proto,qi,size = self.queuefor(packet)
161         q = self.queues[qi]
162         lq = len(q)
163         if lq < size:
164             dropped = 0
165             if lq > (size/2) and _red:
166                 pdrop = self.get_packetdrop_p(lq, size, packet)
167                 if rng() < pdrop:
168                     dropped = 1
169             if not dropped:
170                 classes = self.classes
171                 if qi not in classes:
172                     classes.add(qi)
173                     self.cycle_update = True
174                 q.append(packet)
175                 self.len += 1
176         # packet dropped
177         else:
178             dropped = 1
179         if _logdropped:
180             if dropped:
181                 dstats[proto] += 1
182             else:
183                 astats[proto] += 1
184             self.dump_stats()
185
186     def appendleft(self, packet):
187         self.queues[-1].append(packet)
188         self.len += 1
189     
190     def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop):
191         return self.popleft(pop=pop)
192     
193     def popleft(self, xrange=xrange, len=len, iter=iter, enumerate=enumerate, zip=zip, pop=collections.deque.popleft):
194         queues = self.queues
195         classes = self.classes
196
197         if len(classes)==1:
198             # shortcut for non-tos traffic
199             rv = pop(queues[iter(classes).next()])
200             self.len -= 1
201             return rv
202
203         if self.cycle_update:
204             cycle = [
205                 filter(classes.__contains__, order)
206                 for order in self.order
207             ]
208             self.cycle = map(itertools.cycle, cycle)
209             self.cyclelen = map(len,cycle)
210             self.cycle_update = False
211         
212         for prio, (cycle, cyclelen) in enumerate(zip(self.cycle, self.cyclelen)):
213             cycle = cycle.next
214             for i in xrange(cyclelen):
215                 qi = cycle()
216                 if qi in classes:
217                     q = queues[qi]
218                     if q:
219                         rv = pop(q)
220                         self.len -= 1
221                         return rv
222                     else:
223                         # Needs to update the cycle
224                         classes.remove(qi)
225                         self.cycle_update = True
226         else:
227             raise IndexError, "pop from an empty queue"
228
229     def dump_stats(self, astats=astats, dstats=dstats, dump_count=dump_count):
230         if dump_count[0] >= 10000:
231             dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
232             astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
233             fd = open('dropped_stats', 'w')
234             iovec.writev(fd.fileno(), "Dropped:\n", dstatsstr, "Accepted:\n", astatsstr)
235             fd.close()
236             dump_count[0] = 0
237         else:
238             dump_count[0] += 1
239
240 queueclass = ClassQueue
241
242 def init(size = 1000, classes = _classes, logdropped = 'False', red = True):
243     global _size, _classes, _logdropped
244     _size = int(size)
245     _classes = classes
246     _red = red
247     _logdropped = logdropped.lower() in ('true','1','on')
248     
249     if _logdropped:
250         # Truncate stats
251         open('dropped_stats', 'w').close()
252
253 _protomap = {
254     '3pc'       :       34,
255     'an'        :       107,
256     'ah'        :       51,
257     'argus'     :       13,
258     'aris'      :       104,
259     'ax25'      :       93,
260     'bbn-rcc-mon'       :       10,
261     'bna'       :       49,
262     'brsatmon'  :       76,
263     'cbt'       :       7,
264     'cftp'      :       62,
265     'chaos'     :       16,
266     'compaqpeer'        :       110,
267     'cphb'      :       73,
268     'cpnx'      :       72,
269     'crtp'      :       126,
270     'crudp'     :       127,
271     'dccp'      :       33,
272     'dcn-meas'  :       19,
273     'ddp'       :       37,
274     'ddx'       :       116,
275     'dgp'       :       86,
276     'egp'       :       8,
277     'eigrp'     :       88,
278     'emcon'     :       14,
279     'encap'     :       98,
280     'esp'       :       50,
281     'etherip'   :       97,
282     'fc'        :       133,
283     'fire'      :       125,
284     'ggp'       :       3,
285     'gmtp'      :       100,
286     'gre'       :       47,
287     'hip'       :       139,
288     'hmp'       :       20,
289     'hopopt'    :       0,
290     'iatp'      :       117,
291     'icmp'      :       1,
292     'idpr'      :       35,
293     'idprcmtp'  :       38,
294     'idrp'      :       45,
295     'ifmp'      :       101,
296     'igmp'      :       2,
297     'igp'       :       9,
298     'il'        :       40,
299     'inlsp'     :       52,
300     'ip'        :       4,
301     'ipcomp'    :       108,
302     'ipcv'      :       71,
303     'ipip'      :       94,
304     'iplt'      :       129,
305     'ippc'      :       67,
306     'iptm'      :       84,
307     'ipv6'      :       41,
308     'ipv6frag'  :       44,
309     'ipv6icmp'  :       58,
310     'ipv6nonxt' :       59,
311     'ipv6opts'  :       60,
312     'ipv6route' :       43,
313     'ipxinip'   :       111,
314     'irtp'      :       28,
315     'isoip'     :       80,
316     'isotp4'    :       29,
317     'kryptolan' :       65,
318     'l2tp'      :       115,
319     'larp'      :       91,
320     'leaf1'     :       25,
321     'leaf2'     :       26,
322     'manet'     :       138,
323     'meritinp'  :       32,
324     'mfensp'    :       31,
325     'mhrp'      :       48,
326     'micp'      :       95,
327     'mobile'    :       55,
328     'mtp'       :       92,
329     'mux'       :       18,
330     'narp'      :       54,
331     'netblt'    :       30,
332     'nsfnetigp' :       85,
333     'nvp'       :       11,
334     'ospf'      :       89,
335     'pgm'       :       113,
336     'pim'       :       103,
337     'pipe'      :       131,
338     'pnni'      :       102,
339     'prm'       :       21,
340     'ptp'       :       123,
341     'pup'       :       12,
342     'pvp'       :       75,
343     'qnx'       :       106,
344     'rdp'       :       27,
345     'rsvp'      :       46,
346     'rvd'       :       66,
347     'satexpak'  :       64,
348     'satmon'    :       69,
349     'sccsp'     :       96,
350     'scps'      :       105,
351     'sctp'      :       132,
352     'sdrp'      :       42,
353     'securevmtp'        :       82,
354     'shim6'     :       140,
355     'skip'      :       57,
356     'sm'        :       122,
357     'smp'       :       121,
358     'snp'       :       109,
359     'spriterpc' :       90,
360     'sps'       :       130,
361     'srp'       :       119,
362     'sscopmce'  :       128,
363     'st'        :       5,
364     'stp'       :       118,
365     'sunnd'     :       77,
366     'swipe'     :       53,
367     'tcf'       :       87,
368     'tcp'       :       6,
369     'tlsp'      :       56,
370     'tp'        :       39,
371     'trunk1'    :       23,
372     'trunk2'    :       24,
373     'ttp'       :       84,
374     'udp'       :       17,
375     'uti'       :       120,
376     'vines'     :       83,
377     'visa'      :       70,
378     'vmtp'      :       81,
379     'vrrp'      :       112,
380     'wbexpak'   :       79,
381     'wbmon'     :       78,
382     'wsn'       :       74,
383     'xnet'      :       15,
384     'xnsidp'    :       22,
385     'xtp'       :       36
386 }
387