Setting tag nodemanager-2.0-38
[nodemanager.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
6 # its instantaneous rate will be capped at the bytes remaning in the limit
7 # over the time remaining in the recording period.
8 #
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
11 #
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
16 #
17
18 import os
19 import sys
20 import time
21 import pickle
22 import socket
23 import copy
24 import threading
25
26 import logger
27 import tools
28 import bwlimit
29 import database
30 from config import Config
31
32 priority = 20
33
34 # Defaults
35 # Set DEBUG to True if you don't want to send emails
36 DEBUG = False
37 # Set ENABLE to False to setup buckets, but not limit.
38 ENABLE = True
39
40 DB_FILE = "/var/lib/nodemanager/bwmon.pickle"
41
42 # Constants
43 seconds_per_day = 24 * 60 * 60
44 bits_per_byte = 8
45
46 dev_default = tools.get_default_if()
47 # Burst to line rate (or node cap).  Set by NM. in KBit/s
48 default_MaxRate = int(bwlimit.get_bwcap(dev_default) / 1000)
49 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
50 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
51 # 5.4 Gbyte per day max allowed transfered per recording period
52 # 5.4 Gbytes per day is aprox 512k/s for 24hrs (approx because original math was wrong
53 # but its better to keep a higher byte total and keep people happy than correct
54 # the problem and piss people off.
55 # default_MaxKByte = 5662310
56
57 # -- 6/1/09
58 # llp wants to double these, so we use the following
59 # 1mbit * 24hrs * 60mins * 60secs = bits/day
60 # 1000000 * 24 * 60 * 60 / (1024 * 8)
61 default_MaxKByte = 10546875
62
63 # 16.4 Gbyte per day max allowed transfered per recording period to I2
64 # default_Maxi2KByte = 17196646
65
66 # -- 6/1/09
67 # 3Mb/s for 24hrs a day (30.17 gigs)
68 default_Maxi2KByte = 31640625
69
70 # Default share quanta
71 default_Share = 1
72
73 # Average over 1 day
74 period = 1 * seconds_per_day
75
76 # Message template
77 template = \
78 """
79 The slice %(slice)s has transmitted more than %(bytes)s from
80 %(hostname)s to %(class)s destinations
81 since %(since)s.
82
83 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
84 until %(until)s.
85
86 Please reduce the average %(class)s transmission rate
87 of the slice to %(limit)s per %(period)s.
88
89 """.lstrip()
90
91 footer = \
92 """
93 %(date)s %(hostname)s bwcap %(slice)s
94 """.lstrip()
95
96 def format_bytes(bytes, si = True):
97     """
98     Formats bytes into a string
99     """
100     if si:
101         kilo = 1000.
102     else:
103         # Officially, a kibibyte
104         kilo = 1024.
105
106     if bytes >= (kilo * kilo * kilo):
107         return "%.1f GB" % (bytes / (kilo * kilo * kilo))
108     elif bytes >= 1000000:
109         return "%.1f MB" % (bytes / (kilo * kilo))
110     elif bytes >= 1000:
111         return "%.1f KB" % (bytes / kilo)
112     else:
113         return "%.0f bytes" % bytes
114
115 def format_period(seconds):
116     """
117     Formats a period in seconds into a string
118     """
119
120     if seconds == (24 * 60 * 60):
121         return "day"
122     elif seconds == (60 * 60):
123         return "hour"
124     elif seconds > (24 * 60 * 60):
125         return "%.1f days" % (seconds / 24. / 60. / 60.)
126     elif seconds > (60 * 60):
127         return "%.1f hours" % (seconds / 60. / 60.)
128     elif seconds > (60):
129         return "%.1f minutes" % (seconds / 60.)
130     else:
131         return "%.0f seconds" % seconds
132
133 def slicemail(slice, subject, body):
134     '''
135     Front end to sendmail.  Sends email to slice alias with given subject and body.
136     '''
137     config = Config()
138     sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % config.PLC_MAIL_SUPPORT_ADDRESS, "w")
139
140     # Parsed from MyPLC config
141     to = [config.PLC_MAIL_MOM_LIST_ADDRESS]
142
143     if slice is not None and slice != "root":
144         to.append(config.PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
145
146     header = {'from': "%s Support <%s>" % (config.PLC_NAME,
147                                            config.PLC_MAIL_SUPPORT_ADDRESS),
148               'to': ", ".join(to),
149               'version': sys.version.split(" ")[0],
150               'subject': subject}
151
152     # Write headers
153     sendmail.write(
154 """
155 Content-type: text/plain
156 From: %(from)s
157 Reply-To: %(from)s
158 To: %(to)s
159 X-Mailer: Python/%(version)s
160 Subject: %(subject)s
161
162 """.lstrip() % header)
163
164     # Write body
165     sendmail.write(body)
166     # Done
167     sendmail.close()
168
169
170 class Slice:
171     """
172     Stores the last recorded bandwidth parameters of a slice.
173
174     xid - slice context/VServer ID
175     name - slice name
176     time - beginning of recording period in UNIX seconds
177     bytes - low bandwidth bytes transmitted at the beginning of the recording period
178     i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
179     MaxKByte - total volume of data allowed
180     ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
181     Maxi2KByte - same as MaxKByte, but for i2
182     Threshi2Kbyte - same as Threshi2KByte, but for i2
183     MaxRate - max_rate slice attribute.
184     Maxi2Rate - max_exempt_rate slice attribute.
185     Share - Used by Sirius to loan min rates
186     Sharei2 - Used by Sirius to loan min rates for i2
187     self.emailed - did slice recv email during this recording period
188
189     """
190
191     def __init__(self, xid, name, rspec):
192         self.xid = xid
193         self.name = name
194         self.time = 0
195         self.bytes = 0
196         self.i2bytes = 0
197         self.MaxRate = default_MaxRate
198         self.MinRate = bwlimit.bwmin / 1000
199         self.Maxi2Rate = default_Maxi2Rate
200         self.Mini2Rate = bwlimit.bwmin / 1000
201         self.MaxKByte = default_MaxKByte
202         self.ThreshKByte = int(.8 * self.MaxKByte)
203         self.Maxi2KByte = default_Maxi2KByte
204         self.Threshi2KByte = int(.8 * self.Maxi2KByte)
205         self.Share = default_Share
206         self.Sharei2 = default_Share
207         self.emailed = False
208         self.capped = False
209
210         self.updateSliceTags(rspec)
211         bwlimit.set(xid = self.xid, dev = dev_default,
212                 minrate = self.MinRate * 1000,
213                 maxrate = self.MaxRate * 1000,
214                 maxexemptrate = self.Maxi2Rate * 1000,
215                 minexemptrate = self.Mini2Rate * 1000,
216                 share = self.Share)
217
218     def __repr__(self):
219         return self.name
220
221     def updateSliceTags(self, rspec):
222         '''
223         Use respects from GetSlivers to PLC to populate slice object.  Also
224         do some sanity checking.
225         '''
226
227         # Sanity check plus policy decision for MinRate:
228         # Minrate cant be greater than 25% of MaxRate or NodeCap.
229         MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000))
230         if MinRate > int(.25 * default_MaxRate):
231             MinRate = int(.25 * default_MaxRate)
232         if MinRate != self.MinRate:
233             self.MinRate = MinRate
234             logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))
235
236         MaxRate = int(rspec.get('net_max_rate', default_MaxRate))
237         if MaxRate != self.MaxRate:
238             self.MaxRate = MaxRate
239             logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
240
241         Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
242         if Mini2Rate != self.Mini2Rate:
243             self.Mini2Rate = Mini2Rate
244             logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
245
246         Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate))
247         if Maxi2Rate != self.Maxi2Rate:
248             self.Maxi2Rate = Maxi2Rate
249             logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
250
251         MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
252         if MaxKByte != self.MaxKByte:
253             self.MaxKByte = MaxKByte
254             logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))
255
256         Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
257         if Maxi2KByte != self.Maxi2KByte:
258             self.Maxi2KByte = Maxi2KByte
259             logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))
260
261         ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
262         if ThreshKByte != self.ThreshKByte:
263             self.ThreshKByte = ThreshKByte
264             logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))
265
266         Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
267         if Threshi2KByte != self.Threshi2KByte:
268             self.Threshi2KByte = Threshi2KByte
269             logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))
270
271         Share = int(rspec.get('net_share', default_Share))
272         if Share != self.Share:
273             self.Share = Share
274             logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))
275
276         Sharei2 = int(rspec.get('net_i2_share', default_Share))
277         if Sharei2 != self.Sharei2:
278             self.Sharei2 = Sharei2
279             logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
280
281
282     def reset(self, runningrates, rspec):
283         """
284         Begin a new recording period. Remove caps by restoring limits
285         to their default values.
286         """
287         # Cache share for later comparison
288         self.Share = runningrates.get('share', 1)
289
290         # Query Node Manager for max rate overrides
291         self.updateSliceTags(rspec)
292
293         # Reset baseline time
294         self.time = time.time()
295
296         # Reset baseline byte coutns
297         self.bytes = runningrates.get('usedbytes', 0)
298         self.i2bytes = runningrates.get('usedi2bytes', 0)
299
300         # Reset email
301         self.emailed = False
302         # Reset flag
303         self.capped = False
304         # Reset rates.
305         maxrate = self.MaxRate * 1000
306         minrate = self.MinRate * 1000
307         maxi2rate = self.Maxi2Rate * 1000
308         mini2rate = self.Mini2Rate * 1000
309
310         if (maxrate != runningrates.get('maxrate', 0)) or \
311          (minrate != runningrates.get('maxrate', 0)) or \
312          (maxi2rate != runningrates.get('maxexemptrate', 0)) or \
313          (mini2rate != runningrates.get('minexemptrate', 0)) or \
314          (self.Share != runningrates.get('share', 0)):
315             logger.log("bwmon: %s reset to %s/%s" % \
316                            (self.name,
317                             bwlimit.format_tc_rate(maxrate),
318                             bwlimit.format_tc_rate(maxi2rate)))
319             bwlimit.set(xid = self.xid, dev = dev_default,
320                 minrate = self.MinRate * 1000,
321                 maxrate = self.MaxRate * 1000,
322                 maxexemptrate = self.Maxi2Rate * 1000,
323                 minexemptrate = self.Mini2Rate * 1000,
324                 share = self.Share)
325
326     def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
327         """
328         Notify the slice it's being capped.
329         """
330          # Prepare message parameters from the template
331         message = ""
332         params = {'slice': self.name, 'hostname': socket.gethostname(),
333                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
334                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
335                   'date': time.asctime(time.gmtime()) + " GMT",
336                   'period': format_period(period)}
337
338         if new_maxrate != (self.MaxRate * 1000):
339             # Format template parameters for low bandwidth message
340             params['class'] = "low bandwidth"
341             params['bytes'] = format_bytes(usedbytes - self.bytes)
342             params['limit'] = format_bytes(self.MaxKByte * 1024)
343             params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
344
345             # Cap low bandwidth burst rate
346             message += template % params
347             logger.log("bwmon:  ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
348
349         if new_maxexemptrate != (self.Maxi2Rate * 1000):
350             # Format template parameters for high bandwidth message
351             params['class'] = "high bandwidth"
352             params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
353             params['limit'] = format_bytes(self.Maxi2KByte * 1024)
354             params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
355
356             message += template % params
357             logger.log("bwmon:  ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
358
359         # Notify slice
360         if self.emailed == False:
361             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
362             if DEBUG:
363                 logger.log("bwmon: "+ subject)
364                 logger.log("bwmon: "+ message + (footer % params))
365             else:
366                 self.emailed = True
367                 logger.log("bwmon: Emailing %s" % self.name)
368                 slicemail(self.name, subject, message + (footer % params))
369
370
371     def update(self, runningrates, rspec):
372         """
373         Update byte counts and check if byte thresholds have been
374         exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.
375         Recalculate every time module runs.
376         """
377         # cache share for later comparison
378         runningrates['share'] = self.Share
379
380         # Query Node Manager for max rate overrides
381         self.updateSliceTags(rspec)
382
383         usedbytes = runningrates['usedbytes']
384         usedi2bytes = runningrates['usedi2bytes']
385
386         # Check limits.
387         if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
388             sum = self.bytes + (self.ThreshKByte * 1024)
389             maxbyte = self.MaxKByte * 1024
390             bytesused = usedbytes - self.bytes
391             timeused = int(time.time() - self.time)
392             # Calcuate new rate. in bit/s
393             new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
394             # Never go under MinRate
395             if new_maxrate < (self.MinRate * 1000):
396                 new_maxrate = self.MinRate * 1000
397             # State information.  I'm capped.
398             self.capped += True
399         else:
400             # Sanity Check
401             new_maxrate = self.MaxRate * 1000
402             self.capped += False
403
404         if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
405             maxi2byte = self.Maxi2KByte * 1024
406             i2bytesused = usedi2bytes - self.i2bytes
407             timeused = int(time.time() - self.time)
408             # Calcuate New Rate.
409             new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
410             # Never go under MinRate
411             if new_maxi2rate < (self.Mini2Rate * 1000):
412                 new_maxi2rate = self.Mini2Rate * 1000
413             # State information.  I'm capped.
414             self.capped += True
415         else:
416             # Sanity
417             new_maxi2rate = self.Maxi2Rate * 1000
418             self.capped += False
419
420         # Check running values against newly calculated values so as not to run tc
421         # unnecessarily
422         if (runningrates['maxrate'] != new_maxrate) or \
423         (runningrates['minrate'] != self.MinRate * 1000) or \
424         (runningrates['maxexemptrate'] != new_maxi2rate) or \
425         (runningrates['minexemptrate'] != self.Mini2Rate * 1000) or \
426         (runningrates['share'] != self.Share):
427             # Apply parameters
428             bwlimit.set(xid = self.xid, dev = dev_default,
429                 minrate = self.MinRate * 1000,
430                 maxrate = new_maxrate,
431                 minexemptrate = self.Mini2Rate * 1000,
432                 maxexemptrate = new_maxi2rate,
433                 share = self.Share)
434
435         # Notify slice
436         if self.capped == True:
437             self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
438
439
440 def gethtbs(root_xid, default_xid):
441     """
442     Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
443     Turn off HTBs without names.
444     """
445     livehtbs = {}
446     for params in bwlimit.get(dev = dev_default):
447         (xid, share,
448          minrate, maxrate,
449          minexemptrate, maxexemptrate,
450          usedbytes, usedi2bytes) = params
451
452         name = bwlimit.get_slice(xid)
453
454         if (name is None) \
455         and (xid != root_xid) \
456         and (xid != default_xid):
457             # Orphaned (not associated with a slice) class
458             name = "%d?" % xid
459             logger.log("bwmon: Found orphaned HTB %s. Removing." %name)
460             bwlimit.off(xid, dev = dev_default)
461
462         livehtbs[xid] = {'share': share,
463             'minrate': minrate,
464             'maxrate': maxrate,
465             'maxexemptrate': maxexemptrate,
466             'minexemptrate': minexemptrate,
467             'usedbytes': usedbytes,
468             'name': name,
469             'usedi2bytes': usedi2bytes}
470
471     return livehtbs
472
473 def sync(nmdbcopy):
474     """
475     Syncs tc, db, and bwmon.pickle.
476     Then, starts new slices, kills old ones, and updates byte accounts for each running slice.
477     Sends emails and caps those that went over their limit.
478     """
479     # Defaults
480     global DB_FILE, \
481         period, \
482         default_MaxRate, \
483         default_Maxi2Rate, \
484         default_MaxKByte,\
485         default_Maxi2KByte,\
486         default_Share, \
487         dev_default
488
489     # All slices
490     names = []
491     # In case the limits have changed.
492     default_MaxRate = int(bwlimit.get_bwcap(dev_default) / 1000)
493     default_Maxi2Rate = int(bwlimit.bwmax / 1000)
494
495     # Incase default isn't set yet.
496     if default_MaxRate == -1:
497         default_MaxRate = 1000000
498
499     # xxx $Id$ 
500     # with svn we used to have a trick to detect upgrades of this file
501     # this has gone with the move to git, without any noticeable effect on operations though
502     try:
503         f = open(DB_FILE, "r+")
504         logger.verbose("bwmon: Loading %s" % DB_FILE)
505         (version, slices, deaddb) = pickle.load(f)
506         f.close()
507         # Check version of data file
508         if version != "$Id$":
509             logger.log("bwmon: Not using old version '%s' data file %s" % (version, DB_FILE))
510             raise Exception
511     except Exception:
512         version = "$Id$"
513         slices = {}
514         deaddb = {}
515
516     # Get/set special slice IDs
517     root_xid = bwlimit.get_xid("root")
518     default_xid = bwlimit.get_xid("default")
519
520     # Since root is required for sanity, its not in the API/plc database, so pass {}
521     # to use defaults.
522     if root_xid not in slices.keys():
523         slices[root_xid] = Slice(root_xid, "root", {})
524         slices[root_xid].reset({}, {})
525
526     # Used by bwlimit.  pass {} since there is no rspec (like above).
527     if default_xid not in slices.keys():
528         slices[default_xid] = Slice(default_xid, "default", {})
529         slices[default_xid].reset({}, {})
530
531     live = {}
532     # Get running slivers that should be on this node (from plc). {xid: name}
533     # db keys on name, bwmon keys on xid.  db doesnt have xid either.
534     for plcSliver in nmdbcopy.keys():
535         live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
536
537     logger.verbose("bwmon: Found %s instantiated slices" % live.keys().__len__())
538     logger.verbose("bwmon: Found %s slices in dat file" % slices.values().__len__())
539
540     # Get actual running values from tc.
541     # Update slice totals and bandwidth. {xid: {values}}
542     kernelhtbs = gethtbs(root_xid, default_xid)
543     logger.verbose("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__())
544
545     # The dat file has HTBs for slices, but the HTBs aren't running
546     nohtbslices =  set(slices.keys()) - set(kernelhtbs.keys())
547     logger.verbose( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__())
548     # Reset tc counts.
549     for nohtbslice in nohtbslices:
550         if live.has_key(nohtbslice):
551             slices[nohtbslice].reset( {}, live[nohtbslice]['_rspec'] )
552         else:
553             logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
554             del slices[nohtbslice]
555
556     # The dat file doesnt have HTB for the slice but kern has HTB
557     slicesnodat = set(kernelhtbs.keys()) - set(slices.keys())
558     logger.verbose( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__())
559     for slicenodat in slicesnodat:
560         # But slice is running
561         if live.has_key(slicenodat):
562             # init the slice.  which means start accounting over since kernel
563             # htb was already there.
564             slices[slicenodat] = Slice(slicenodat,
565                 live[slicenodat]['name'],
566                 live[slicenodat]['_rspec'])
567
568     # Get new slices.
569     # Slices in GetSlivers but not running HTBs
570     newslicesxids = set(live.keys()) - set(kernelhtbs.keys())
571     logger.verbose("bwmon: Found %s new slices" % newslicesxids.__len__())
572
573     # Setup new slices
574     for newslice in newslicesxids:
575         # Delegated slices dont have xids (which are uids) since they haven't been
576         # instantiated yet.
577         if newslice != None and live[newslice].has_key('_rspec') == True:
578             # Check to see if we recently deleted this slice.
579             if live[newslice]['name'] not in deaddb.keys():
580                 logger.log( "bwmon: new slice %s" % live[newslice]['name'] )
581                 # _rspec is the computed rspec:  NM retrieved data from PLC, computed loans
582                 # and made a dict of computed values.
583                 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
584                 slices[newslice].reset( {}, live[newslice]['_rspec'] )
585             # Double check time for dead slice in deaddb is within 24hr recording period.
586             elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
587                 deadslice = deaddb[live[newslice]['name']]
588                 logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
589                 slices[newslice] = deadslice['slice']
590                 slices[newslice].xid = newslice
591                 # Start the HTB
592                 newvals = {"maxrate": deadslice['slice'].MaxRate * 1000,
593                             "minrate": deadslice['slice'].MinRate * 1000,
594                             "maxexemptrate": deadslice['slice'].Maxi2Rate * 1000,
595                             "usedbytes": deadslice['htb']['usedbytes'] * 1000,
596                             "usedi2bytes": deadslice['htb']['usedi2bytes'],
597                             "share":deadslice['htb']['share']}
598                 slices[newslice].reset(newvals, live[newslice]['_rspec'])
599                 # Bring up to date
600                 slices[newslice].update(newvals, live[newslice]['_rspec'])
601                 # Since the slice has been reinitialed, remove from dead database.
602                 del deaddb[deadslice['slice'].name]
603                 del newvals
604         else:
605             logger.log("bwmon: Slice %s doesn't have xid.  Skipping." % live[newslice]['name'])
606
607     # Move dead slices that exist in the pickle file, but
608     # aren't instantiated by PLC into the dead dict until
609     # recording period is over.  This is to avoid the case where a slice is dynamically created
610     # and destroyed then recreated to get around byte limits.
611     deadxids = set(slices.keys()) - set(live.keys())
612     logger.verbose("bwmon: Found %s dead slices" % (deadxids.__len__() - 2))
613     for deadxid in deadxids:
614         if deadxid == root_xid or deadxid == default_xid:
615             continue
616         logger.log("bwmon: removing dead slice %s " % deadxid)
617         if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
618             # add slice (by name) to deaddb
619             logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
620             deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
621             del slices[deadxid]
622         if kernelhtbs.has_key(deadxid):
623             logger.verbose("bwmon: Removing HTB for %s." % deadxid)
624             bwlimit.off(deadxid, dev = dev_default)
625
626     # Clean up deaddb
627     for deadslice in deaddb.keys():
628         if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
629             logger.log("bwmon: Removing dead slice %s from dat." \
630                         % deaddb[deadslice]['slice'].name)
631             del deaddb[deadslice]
632
633     # Get actual running values from tc since we've added and removed buckets.
634     # Update slice totals and bandwidth. {xid: {values}}
635     kernelhtbs = gethtbs(root_xid, default_xid)
636     logger.verbose("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__())
637
638     # Update all byte limites on all slices
639     for (xid, slice) in slices.iteritems():
640         # Monitor only the specified slices
641         if xid == root_xid or xid == default_xid: continue
642         if names and name not in names:
643             continue
644
645         if (time.time() >= (slice.time + period)) or \
646             (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
647             (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
648             # Reset to defaults every 24 hours or if it appears
649             # that the byte counters have overflowed (or, more
650             # likely, the node was restarted or the HTB buckets
651             # were re-initialized).
652             slice.reset(kernelhtbs[xid], live[xid]['_rspec'])
653         elif ENABLE:
654             logger.verbose("bwmon: Updating slice %s" % slice.name)
655             # Update byte counts
656             slice.update(kernelhtbs[xid], live[xid]['_rspec'])
657
658     logger.verbose("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),DB_FILE))
659     f = open(DB_FILE, "w")
660     pickle.dump((version, slices, deaddb), f)
661     f.close()
662
663 # doesnt use generic default interface because this runs as its own thread.
664 # changing the config variable will not have an effect since GetSlivers: pass
665 def getDefaults(nmdbcopy):
666     '''
667     Get defaults from default slice's slice attributes.
668     '''
669     status = True
670     # default slice
671     dfltslice = nmdbcopy.get(Config().PLC_SLICE_PREFIX+"_default")
672     if dfltslice:
673         if dfltslice['rspec']['net_max_rate'] == -1:
674             allOff()
675             status = False
676     return status
677
678
679 def allOff():
680     """
681     Turn off all slice HTBs
682     """
683     # Get/set special slice IDs
684     root_xid = bwlimit.get_xid("root")
685     default_xid = bwlimit.get_xid("default")
686     kernelhtbs = gethtbs(root_xid, default_xid)
687     if len(kernelhtbs):
688         logger.log("bwmon: Disabling all running HTBs.")
689         for htb in kernelhtbs.keys(): bwlimit.off(htb, dev = dev_default)
690
691
692 lock = threading.Event()
693 def run():
694     """
695     When run as a thread, wait for event, lock db, deep copy it, release it,
696     run bwmon.GetSlivers(), then go back to waiting.
697     """
698     logger.verbose("bwmon: Thread started")
699     while True:
700         lock.wait()
701         logger.verbose("bwmon: Event received.  Running.")
702         database.db_lock.acquire()
703         nmdbcopy = copy.deepcopy(database.db)
704         database.db_lock.release()
705         try:
706             if getDefaults(nmdbcopy) and len(bwlimit.tc("class show dev %s" % dev_default)) > 0:
707                 # class show to check if net:InitNodeLimit:bwlimit.init has run.
708                 sync(nmdbcopy)
709             else: logger.log("bwmon: BW limits DISABLED.")
710         except: logger.log_exc("bwmon failed")
711         lock.clear()
712
713 def start(*args):
714     tools.as_daemon_thread(run)
715
716 def GetSlivers(*args):
717     logger.verbose ("bwmon: triggering dummy GetSlivers")
718     pass