Fix missing variable in classqueue
[nepi.git] / src / nepi / testbeds / planetlab / scripts / classqueue.py
1 import collections
2 import itertools
3 import random
4 import re
5 import sys
6 import iovec
7
8 dstats = collections.defaultdict(int)
9 astats = collections.defaultdict(int)
10 dump_count = [0]
11
12 _red = True
13 _size = 1000
14 _classes = (
15     "igmp.ggp.cbt.egp.igp.idrp.mhrp.narp.ospf.eigrp*p1:"
16     "udp.st.nvp.rdp.ddp.pvp.mtp.srp.smp.136:"
17     "tcp.icmp*4:"
18     "ip.gre.etherip.l2tp:"
19     "hopopt.shim6.ipv6.ipv6route.ipv6frag.ipv6icmp.ipv6nonxt.ipv6opts*4:"
20     "crtp.crudp*8:"
21     "*3"
22 )
23 _logdropped = False
24
25 def clsmap(cls):
26     global _protomap
27     if cls in _protomap:
28         return _protomap[cls]
29     elif cls == "":
30         return None
31     else:
32         return int(cls)
33
34 def _parse_classes(classes):
35     """
36      Class list structure:
37        <CLASSLIST> ::= <CLASS> ":" CLASSLIST
38                     |  <CLASS>
39        <CLASS>     ::= <PROTOLIST> "*" <PRIORITYSPEC>
40                     |  <DFLTCLASS>
41        <DFLTCLASS> ::= "*" <PRIORITYSPEC>
42        <PROTOLIST> ::= <PROTO> "." <PROTOLIST>
43                     |  <PROTO>
44        <PROTO>     ::= <NAME> | <NUMBER>
45        <NAME>      ::= --see http://en.wikipedia.org/wiki/List_of_IP_protocol_numbers --
46                        --only in lowercase, with special characters removed--
47                        --or see below--
48        <NUMBER>    ::= [0-9]+
49        <PRIORITYSPEC> ::= <THOUGHPUT> [ "#" <SIZE> ] [ "p" <PRIORITY> ]
50        <THOUGHPUT> ::= NUMBER -- default 1
51        <PRIORITY>  ::= NUMBER -- default 0
52        <SIZE>      ::= NUMBER -- default 1
53     """
54     classes = map(lambda x:x.split('*',2),classes.split(':'))
55     priorex = re.compile(r"(?P<thoughput>\d+)?(?:#(?P<size>\d+))?(?:p(?P<priority>\d+))?")
56     for cls in classes:
57         if not cls:
58             cls.append("")
59         if len(cls) < 2:
60             cls.append("")
61         prio = priorex.match(cls[1])
62         if not prio:
63             prio = (1,0,1)
64         else:
65             prio = (
66                 int(prio.group("thoughput") or 1),
67                 int(prio.group("priority") or 0),
68                 int(prio.group("size") or 1),
69             )
70         cls[1] = prio
71         cls[0] = map(clsmap, cls[0].split('.'))
72         if not cls[0]:
73             cls[0] = [None]
74     
75     return classes
76     
77
78 class ClassQueue(object):
79     def __init__(self):
80         self.size = _size
81         self.len = 0
82
83         # Prepare classes
84         self.classspec = _parse_classes(_classes)
85
86         self.queues = [ collections.deque() for cls in xrange(len(self.classspec)) ]
87         
88         self.classmap = dict(
89             (proto, cls)
90             for cls, (protos, (thoughput, prio, size)) in enumerate(self.classspec)
91             for proto in protos
92         )
93
94         self.priomap = [
95             prio
96             for cls in xrange(len(self.classspec))
97             for protos, (thoughput, prio, size) in ( self.classspec[cls], )
98         ]
99         
100         self.sizemap = [
101             size * _size
102             for cls in xrange(len(self.classspec))
103             for protos, (thoughput, prio, size) in ( self.classspec[cls], )
104         ]
105         
106         order = [ 
107             cls
108             for cls, (protos, (thoughput, prio, size)) in enumerate(self.classspec)
109             for i in xrange(thoughput)
110         ]
111         self.order = [
112             filter(lambda x : self.priomap[x] == prio, order)
113             for prio in reversed(sorted(set(self.priomap)))
114         ]
115         for order in self.order:
116             random.shuffle(order)
117         
118         if None not in self.classmap:
119             raise RuntimeError, "No default class: a default class must be present"
120         
121         # add retries
122         self.queues.append(collections.deque())
123         self.priomap.append(-1)
124         self.sizemap.append(_size)
125         self.order.insert(0, [len(self.queues)-1])
126         
127         self.classes = set()
128         self.clear()
129     
130     def __nonzero__(self):
131         return self.len > 0
132     
133     def __len__(self):
134         return self.len
135
136     def clear(self):
137         self.classes.clear()
138         self.cycle = None
139         self.cyclelen = None
140         self.cycle_update = True
141         self.len = 0
142         self.queues = [ collections.deque() for cls in xrange(len(self.classspec)) ]
143     
144     def queuefor(self, packet, ord=ord, len=len, classmask=0xEC):
145         if len(packet) >= 10:
146             proto = ord(packet[9])
147             rv = self.classmap.get(proto)
148             if rv is None:
149                 rv = self.classmap.get(None)
150         else:
151             proto = 0
152             rv = self.classmap.get(None)
153         return proto, rv, self.sizemap[rv]
154     
155     def get_packetdrop_p(self, qlen, qsize, packet):
156         pdrop = ((qlen * 1.0 / qsize) - 0.5) * 2.0
157         pdrop *= pdrop
158         return pdrop
159     
160     def append(self, packet, len=len, dstats=dstats, astats=astats, rng=random.random):
161         proto,qi,size = self.queuefor(packet)
162         q = self.queues[qi]
163         lq = len(q)
164         if lq < size:
165             dropped = 0
166             if lq > (size/2) and _red:
167                 pdrop = self.get_packetdrop_p(lq, size, packet)
168                 if rng() < pdrop:
169                     dropped = 1
170             if not dropped:
171                 classes = self.classes
172                 if qi not in classes:
173                     classes.add(qi)
174                     self.cycle_update = True
175                 q.append(packet)
176                 self.len += 1
177         # packet dropped
178         else:
179             dropped = 1
180         if _logdropped:
181             if dropped:
182                 dstats[proto] += 1
183             else:
184                 astats[proto] += 1
185             self.dump_stats()
186
187     def appendleft(self, packet):
188         self.queues[-1].append(packet)
189         self.len += 1
190     
191     def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop):
192         return self.popleft(pop=pop)
193     
194     def popleft(self, xrange=xrange, len=len, iter=iter, enumerate=enumerate, zip=zip, pop=collections.deque.popleft):
195         queues = self.queues
196         classes = self.classes
197
198         if len(classes)==1:
199             # shortcut for non-tos traffic
200             rv = pop(queues[iter(classes).next()])
201             self.len -= 1
202             return rv
203
204         if self.cycle_update:
205             cycle = [
206                 filter(classes.__contains__, order)
207                 for order in self.order
208             ]
209             self.cycle = map(itertools.cycle, cycle)
210             self.cyclelen = map(len,cycle)
211             self.cycle_update = False
212         
213         for prio, (cycle, cyclelen) in enumerate(zip(self.cycle, self.cyclelen)):
214             cycle = cycle.next
215             for i in xrange(cyclelen):
216                 qi = cycle()
217                 if qi in classes:
218                     q = queues[qi]
219                     if q:
220                         rv = pop(q)
221                         self.len -= 1
222                         return rv
223                     else:
224                         # Needs to update the cycle
225                         classes.remove(qi)
226                         self.cycle_update = True
227         else:
228             raise IndexError, "pop from an empty queue"
229
230     def dump_stats(self, astats=astats, dstats=dstats, dump_count=dump_count):
231         if dump_count[0] >= 10000:
232             dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
233             astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
234             fd = open('dropped_stats', 'w')
235             iovec.writev(fd.fileno(), "Dropped:\n", dstatsstr, "Accepted:\n", astatsstr)
236             fd.close()
237             dump_count[0] = 0
238         else:
239             dump_count[0] += 1
240
241 queueclass = ClassQueue
242
243 def init(size = 1000, classes = _classes, logdropped = 'False', red = True):
244     global _size, _classes, _logdropped
245     _size = int(size)
246     _classes = classes
247     _red = red
248     _logdropped = logdropped.lower() in ('true','1','on')
249     
250     if _logdropped:
251         # Truncate stats
252         open('dropped_stats', 'w').close()
253
254 _protomap = {
255     '3pc'       :       34,
256     'an'        :       107,
257     'ah'        :       51,
258     'argus'     :       13,
259     'aris'      :       104,
260     'ax25'      :       93,
261     'bbn-rcc-mon'       :       10,
262     'bna'       :       49,
263     'brsatmon'  :       76,
264     'cbt'       :       7,
265     'cftp'      :       62,
266     'chaos'     :       16,
267     'compaqpeer'        :       110,
268     'cphb'      :       73,
269     'cpnx'      :       72,
270     'crtp'      :       126,
271     'crudp'     :       127,
272     'dccp'      :       33,
273     'dcn-meas'  :       19,
274     'ddp'       :       37,
275     'ddx'       :       116,
276     'dgp'       :       86,
277     'egp'       :       8,
278     'eigrp'     :       88,
279     'emcon'     :       14,
280     'encap'     :       98,
281     'esp'       :       50,
282     'etherip'   :       97,
283     'fc'        :       133,
284     'fire'      :       125,
285     'ggp'       :       3,
286     'gmtp'      :       100,
287     'gre'       :       47,
288     'hip'       :       139,
289     'hmp'       :       20,
290     'hopopt'    :       0,
291     'iatp'      :       117,
292     'icmp'      :       1,
293     'idpr'      :       35,
294     'idprcmtp'  :       38,
295     'idrp'      :       45,
296     'ifmp'      :       101,
297     'igmp'      :       2,
298     'igp'       :       9,
299     'il'        :       40,
300     'inlsp'     :       52,
301     'ip'        :       4,
302     'ipcomp'    :       108,
303     'ipcv'      :       71,
304     'ipip'      :       94,
305     'iplt'      :       129,
306     'ippc'      :       67,
307     'iptm'      :       84,
308     'ipv6'      :       41,
309     'ipv6frag'  :       44,
310     'ipv6icmp'  :       58,
311     'ipv6nonxt' :       59,
312     'ipv6opts'  :       60,
313     'ipv6route' :       43,
314     'ipxinip'   :       111,
315     'irtp'      :       28,
316     'isoip'     :       80,
317     'isotp4'    :       29,
318     'kryptolan' :       65,
319     'l2tp'      :       115,
320     'larp'      :       91,
321     'leaf1'     :       25,
322     'leaf2'     :       26,
323     'manet'     :       138,
324     'meritinp'  :       32,
325     'mfensp'    :       31,
326     'mhrp'      :       48,
327     'micp'      :       95,
328     'mobile'    :       55,
329     'mtp'       :       92,
330     'mux'       :       18,
331     'narp'      :       54,
332     'netblt'    :       30,
333     'nsfnetigp' :       85,
334     'nvp'       :       11,
335     'ospf'      :       89,
336     'pgm'       :       113,
337     'pim'       :       103,
338     'pipe'      :       131,
339     'pnni'      :       102,
340     'prm'       :       21,
341     'ptp'       :       123,
342     'pup'       :       12,
343     'pvp'       :       75,
344     'qnx'       :       106,
345     'rdp'       :       27,
346     'rsvp'      :       46,
347     'rvd'       :       66,
348     'satexpak'  :       64,
349     'satmon'    :       69,
350     'sccsp'     :       96,
351     'scps'      :       105,
352     'sctp'      :       132,
353     'sdrp'      :       42,
354     'securevmtp'        :       82,
355     'shim6'     :       140,
356     'skip'      :       57,
357     'sm'        :       122,
358     'smp'       :       121,
359     'snp'       :       109,
360     'spriterpc' :       90,
361     'sps'       :       130,
362     'srp'       :       119,
363     'sscopmce'  :       128,
364     'st'        :       5,
365     'stp'       :       118,
366     'sunnd'     :       77,
367     'swipe'     :       53,
368     'tcf'       :       87,
369     'tcp'       :       6,
370     'tlsp'      :       56,
371     'tp'        :       39,
372     'trunk1'    :       23,
373     'trunk2'    :       24,
374     'ttp'       :       84,
375     'udp'       :       17,
376     'uti'       :       120,
377     'vines'     :       83,
378     'visa'      :       70,
379     'vmtp'      :       81,
380     'vrrp'      :       112,
381     'wbexpak'   :       79,
382     'wbmon'     :       78,
383     'wsn'       :       74,
384     'xnet'      :       15,
385     'xnsidp'    :       22,
386     'xtp'       :       36
387 }
388