svn kwds
[nodemanager.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
6 # its instantaneous rate will be capped at the bytes remaning in the limit
7 # over the time remaining in the recording period.
8 #
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
11 #
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
16 #
17
18 import os
19 import sys
20 import time
21 import pickle
22 import socket
23 import copy
24 import threading
25
26 import logger
27 import tools
28 import bwlimit
29 import database
30
31 priority = 20
32
33 # Defaults
34 # Set DEBUG to True if you don't want to send emails
35 DEBUG = False
36 # Set ENABLE to False to setup buckets, but not limit.
37 ENABLE = True
38
39 DB_FILE = "/var/lib/nodemanager/bwmon.pickle"
40
41 try:
42     sys.path.append("/etc/planetlab")
43     from plc_config import *
44 except:
45     DEBUG = True
46     logger.verbose("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found")
47     logger.log("bwmon: Running in DEBUG mode.  Logging to file and not emailing.")
48
49 # Constants
50 seconds_per_day = 24 * 60 * 60
51 bits_per_byte = 8
52
53 dev_default = tools.get_default_if()
54 # Burst to line rate (or node cap).  Set by NM. in KBit/s
55 default_MaxRate = int(bwlimit.get_bwcap(dev_default) / 1000)
56 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
57 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
58 # 5.4 Gbyte per day max allowed transfered per recording period
59 # 5.4 Gbytes per day is aprox 512k/s for 24hrs (approx because original math was wrong
60 # but its better to keep a higher byte total and keep people happy than correct
61 # the problem and piss people off.
62 # default_MaxKByte = 5662310
63
64 # -- 6/1/09
65 # llp wants to double these, so we use the following
66 # 1mbit * 24hrs * 60mins * 60secs = bits/day
67 # 1000000 * 24 * 60 * 60 / (1024 * 8)
68 default_MaxKByte = 10546875
69
70 # 16.4 Gbyte per day max allowed transfered per recording period to I2
71 # default_Maxi2KByte = 17196646
72
73 # -- 6/1/09
74 # 3Mb/s for 24hrs a day (30.17 gigs)
75 default_Maxi2KByte = 31640625
76
77 # Default share quanta
78 default_Share = 1
79
80 # Average over 1 day
81 period = 1 * seconds_per_day
82
83 # Message template
84 template = \
85 """
86 The slice %(slice)s has transmitted more than %(bytes)s from
87 %(hostname)s to %(class)s destinations
88 since %(since)s.
89
90 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
91 until %(until)s.
92
93 Please reduce the average %(class)s transmission rate
94 of the slice to %(limit)s per %(period)s.
95
96 """.lstrip()
97
98 footer = \
99 """
100 %(date)s %(hostname)s bwcap %(slice)s
101 """.lstrip()
102
103 def format_bytes(bytes, si = True):
104     """
105     Formats bytes into a string
106     """
107     if si:
108         kilo = 1000.
109     else:
110         # Officially, a kibibyte
111         kilo = 1024.
112
113     if bytes >= (kilo * kilo * kilo):
114         return "%.1f GB" % (bytes / (kilo * kilo * kilo))
115     elif bytes >= 1000000:
116         return "%.1f MB" % (bytes / (kilo * kilo))
117     elif bytes >= 1000:
118         return "%.1f KB" % (bytes / kilo)
119     else:
120         return "%.0f bytes" % bytes
121
122 def format_period(seconds):
123     """
124     Formats a period in seconds into a string
125     """
126
127     if seconds == (24 * 60 * 60):
128         return "day"
129     elif seconds == (60 * 60):
130         return "hour"
131     elif seconds > (24 * 60 * 60):
132         return "%.1f days" % (seconds / 24. / 60. / 60.)
133     elif seconds > (60 * 60):
134         return "%.1f hours" % (seconds / 60. / 60.)
135     elif seconds > (60):
136         return "%.1f minutes" % (seconds / 60.)
137     else:
138         return "%.0f seconds" % seconds
139
140 def slicemail(slice, subject, body):
141     '''
142     Front end to sendmail.  Sends email to slice alias with given subject and body.
143     '''
144
145     sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
146
147     # Parsed from MyPLC config
148     to = [PLC_MAIL_MOM_LIST_ADDRESS]
149
150     if slice is not None and slice != "root":
151         to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
152
153     header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
154               'to': ", ".join(to),
155               'version': sys.version.split(" ")[0],
156               'subject': subject}
157
158     # Write headers
159     sendmail.write(
160 """
161 Content-type: text/plain
162 From: %(from)s
163 Reply-To: %(from)s
164 To: %(to)s
165 X-Mailer: Python/%(version)s
166 Subject: %(subject)s
167
168 """.lstrip() % header)
169
170     # Write body
171     sendmail.write(body)
172     # Done
173     sendmail.close()
174
175
176 class Slice:
177     """
178     Stores the last recorded bandwidth parameters of a slice.
179
180     xid - slice context/VServer ID
181     name - slice name
182     time - beginning of recording period in UNIX seconds
183     bytes - low bandwidth bytes transmitted at the beginning of the recording period
184     i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
185     MaxKByte - total volume of data allowed
186     ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
187     Maxi2KByte - same as MaxKByte, but for i2
188     Threshi2Kbyte - same as Threshi2KByte, but for i2
189     MaxRate - max_rate slice attribute.
190     Maxi2Rate - max_exempt_rate slice attribute.
191     Share - Used by Sirius to loan min rates
192     Sharei2 - Used by Sirius to loan min rates for i2
193     self.emailed - did slice recv email during this recording period
194
195     """
196
197     def __init__(self, xid, name, rspec):
198         self.xid = xid
199         self.name = name
200         self.time = 0
201         self.bytes = 0
202         self.i2bytes = 0
203         self.MaxRate = default_MaxRate
204         self.MinRate = bwlimit.bwmin / 1000
205         self.Maxi2Rate = default_Maxi2Rate
206         self.Mini2Rate = bwlimit.bwmin / 1000
207         self.MaxKByte = default_MaxKByte
208         self.ThreshKByte = int(.8 * self.MaxKByte)
209         self.Maxi2KByte = default_Maxi2KByte
210         self.Threshi2KByte = int(.8 * self.Maxi2KByte)
211         self.Share = default_Share
212         self.Sharei2 = default_Share
213         self.emailed = False
214         self.capped = False
215
216         self.updateSliceTags(rspec)
217         bwlimit.set(xid = self.xid, dev = dev_default,
218                 minrate = self.MinRate * 1000,
219                 maxrate = self.MaxRate * 1000,
220                 maxexemptrate = self.Maxi2Rate * 1000,
221                 minexemptrate = self.Mini2Rate * 1000,
222                 share = self.Share)
223
224     def __repr__(self):
225         return self.name
226
227     def updateSliceTags(self, rspec):
228         '''
229         Use respects from GetSlivers to PLC to populate slice object.  Also
230         do some sanity checking.
231         '''
232
233         # Sanity check plus policy decision for MinRate:
234         # Minrate cant be greater than 25% of MaxRate or NodeCap.
235         MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000))
236         if MinRate > int(.25 * default_MaxRate):
237             MinRate = int(.25 * default_MaxRate)
238         if MinRate != self.MinRate:
239             self.MinRate = MinRate
240             logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))
241
242         MaxRate = int(rspec.get('net_max_rate', default_MaxRate))
243         if MaxRate != self.MaxRate:
244             self.MaxRate = MaxRate
245             logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
246
247         Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
248         if Mini2Rate != self.Mini2Rate:
249             self.Mini2Rate = Mini2Rate
250             logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
251
252         Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate))
253         if Maxi2Rate != self.Maxi2Rate:
254             self.Maxi2Rate = Maxi2Rate
255             logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
256
257         MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
258         if MaxKByte != self.MaxKByte:
259             self.MaxKByte = MaxKByte
260             logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))
261
262         Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
263         if Maxi2KByte != self.Maxi2KByte:
264             self.Maxi2KByte = Maxi2KByte
265             logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))
266
267         ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
268         if ThreshKByte != self.ThreshKByte:
269             self.ThreshKByte = ThreshKByte
270             logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))
271
272         Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
273         if Threshi2KByte != self.Threshi2KByte:
274             self.Threshi2KByte = Threshi2KByte
275             logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))
276
277         Share = int(rspec.get('net_share', default_Share))
278         if Share != self.Share:
279             self.Share = Share
280             logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))
281
282         Sharei2 = int(rspec.get('net_i2_share', default_Share))
283         if Sharei2 != self.Sharei2:
284             self.Sharei2 = Sharei2
285             logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
286
287
288     def reset(self, runningrates, rspec):
289         """
290         Begin a new recording period. Remove caps by restoring limits
291         to their default values.
292         """
293         # Cache share for later comparison
294         self.Share = runningrates.get('share', 1)
295
296         # Query Node Manager for max rate overrides
297         self.updateSliceTags(rspec)
298
299         # Reset baseline time
300         self.time = time.time()
301
302         # Reset baseline byte coutns
303         self.bytes = runningrates.get('usedbytes', 0)
304         self.i2bytes = runningrates.get('usedi2bytes', 0)
305
306         # Reset email
307         self.emailed = False
308         # Reset flag
309         self.capped = False
310         # Reset rates.
311         maxrate = self.MaxRate * 1000
312         minrate = self.MinRate * 1000
313         maxi2rate = self.Maxi2Rate * 1000
314         mini2rate = self.Mini2Rate * 1000
315
316         if (maxrate != runningrates.get('maxrate', 0)) or \
317          (minrate != runningrates.get('maxrate', 0)) or \
318          (maxi2rate != runningrates.get('maxexemptrate', 0)) or \
319          (mini2rate != runningrates.get('minexemptrate', 0)) or \
320          (self.Share != runningrates.get('share', 0)):
321             logger.log("bwmon: %s reset to %s/%s" % \
322                            (self.name,
323                             bwlimit.format_tc_rate(maxrate),
324                             bwlimit.format_tc_rate(maxi2rate)))
325             bwlimit.set(xid = self.xid, dev = dev_default,
326                 minrate = self.MinRate * 1000,
327                 maxrate = self.MaxRate * 1000,
328                 maxexemptrate = self.Maxi2Rate * 1000,
329                 minexemptrate = self.Mini2Rate * 1000,
330                 share = self.Share)
331
332     def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
333         """
334         Notify the slice it's being capped.
335         """
336          # Prepare message parameters from the template
337         message = ""
338         params = {'slice': self.name, 'hostname': socket.gethostname(),
339                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
340                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
341                   'date': time.asctime(time.gmtime()) + " GMT",
342                   'period': format_period(period)}
343
344         if new_maxrate != (self.MaxRate * 1000):
345             # Format template parameters for low bandwidth message
346             params['class'] = "low bandwidth"
347             params['bytes'] = format_bytes(usedbytes - self.bytes)
348             params['limit'] = format_bytes(self.MaxKByte * 1024)
349             params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
350
351             # Cap low bandwidth burst rate
352             message += template % params
353             logger.log("bwmon:  ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
354
355         if new_maxexemptrate != (self.Maxi2Rate * 1000):
356             # Format template parameters for high bandwidth message
357             params['class'] = "high bandwidth"
358             params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
359             params['limit'] = format_bytes(self.Maxi2KByte * 1024)
360             params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
361
362             message += template % params
363             logger.log("bwmon:  ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
364
365         # Notify slice
366         if self.emailed == False:
367             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
368             if DEBUG:
369                 logger.log("bwmon: "+ subject)
370                 logger.log("bwmon: "+ message + (footer % params))
371             else:
372                 self.emailed = True
373                 logger.log("bwmon: Emailing %s" % self.name)
374                 slicemail(self.name, subject, message + (footer % params))
375
376
377     def update(self, runningrates, rspec):
378         """
379         Update byte counts and check if byte thresholds have been
380         exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.
381         Recalculate every time module runs.
382         """
383         # cache share for later comparison
384         runningrates['share'] = self.Share
385
386         # Query Node Manager for max rate overrides
387         self.updateSliceTags(rspec)
388
389         usedbytes = runningrates['usedbytes']
390         usedi2bytes = runningrates['usedi2bytes']
391
392         # Check limits.
393         if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
394             sum = self.bytes + (self.ThreshKByte * 1024)
395             maxbyte = self.MaxKByte * 1024
396             bytesused = usedbytes - self.bytes
397             timeused = int(time.time() - self.time)
398             # Calcuate new rate. in bit/s
399             new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
400             # Never go under MinRate
401             if new_maxrate < (self.MinRate * 1000):
402                 new_maxrate = self.MinRate * 1000
403             # State information.  I'm capped.
404             self.capped += True
405         else:
406             # Sanity Check
407             new_maxrate = self.MaxRate * 1000
408             self.capped += False
409
410         if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
411             maxi2byte = self.Maxi2KByte * 1024
412             i2bytesused = usedi2bytes - self.i2bytes
413             timeused = int(time.time() - self.time)
414             # Calcuate New Rate.
415             new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
416             # Never go under MinRate
417             if new_maxi2rate < (self.Mini2Rate * 1000):
418                 new_maxi2rate = self.Mini2Rate * 1000
419             # State information.  I'm capped.
420             self.capped += True
421         else:
422             # Sanity
423             new_maxi2rate = self.Maxi2Rate * 1000
424             self.capped += False
425
426         # Check running values against newly calculated values so as not to run tc
427         # unnecessarily
428         if (runningrates['maxrate'] != new_maxrate) or \
429         (runningrates['minrate'] != self.MinRate * 1000) or \
430         (runningrates['maxexemptrate'] != new_maxi2rate) or \
431         (runningrates['minexemptrate'] != self.Mini2Rate * 1000) or \
432         (runningrates['share'] != self.Share):
433             # Apply parameters
434             bwlimit.set(xid = self.xid, dev = dev_default,
435                 minrate = self.MinRate * 1000,
436                 maxrate = new_maxrate,
437                 minexemptrate = self.Mini2Rate * 1000,
438                 maxexemptrate = new_maxi2rate,
439                 share = self.Share)
440
441         # Notify slice
442         if self.capped == True:
443             self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
444
445
446 def gethtbs(root_xid, default_xid):
447     """
448     Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
449     Turn off HTBs without names.
450     """
451     livehtbs = {}
452     for params in bwlimit.get(dev = dev_default):
453         (xid, share,
454          minrate, maxrate,
455          minexemptrate, maxexemptrate,
456          usedbytes, usedi2bytes) = params
457
458         name = bwlimit.get_slice(xid)
459
460         if (name is None) \
461         and (xid != root_xid) \
462         and (xid != default_xid):
463             # Orphaned (not associated with a slice) class
464             name = "%d?" % xid
465             logger.log("bwmon: Found orphaned HTB %s. Removing." %name)
466             bwlimit.off(xid, dev = dev_default)
467
468         livehtbs[xid] = {'share': share,
469             'minrate': minrate,
470             'maxrate': maxrate,
471             'maxexemptrate': maxexemptrate,
472             'minexemptrate': minexemptrate,
473             'usedbytes': usedbytes,
474             'name': name,
475             'usedi2bytes': usedi2bytes}
476
477     return livehtbs
478
479 def sync(nmdbcopy):
480     """
481     Syncs tc, db, and bwmon.pickle.
482     Then, starts new slices, kills old ones, and updates byte accounts for each running slice.
483     Sends emails and caps those that went over their limit.
484     """
485     # Defaults
486     global DB_FILE, \
487         period, \
488         default_MaxRate, \
489         default_Maxi2Rate, \
490         default_MaxKByte,\
491         default_Maxi2KByte,\
492         default_Share, \
493         dev_default
494
495     # All slices
496     names = []
497     # In case the limits have changed.
498     default_MaxRate = int(bwlimit.get_bwcap(dev_default) / 1000)
499     default_Maxi2Rate = int(bwlimit.bwmax / 1000)
500
501     # Incase default isn't set yet.
502     if default_MaxRate == -1:
503         default_MaxRate = 1000000
504
505     try:
506         f = open(DB_FILE, "r+")
507         logger.verbose("bwmon: Loading %s" % DB_FILE)
508         (version, slices, deaddb) = pickle.load(f)
509         f.close()
510         # Check version of data file
511         if version != "$Id$":
512             logger.log("bwmon: Not using old version '%s' data file %s" % (version, DB_FILE))
513             raise Exception
514     except Exception:
515         version = "$Id$"
516         slices = {}
517         deaddb = {}
518
519     # Get/set special slice IDs
520     root_xid = bwlimit.get_xid("root")
521     default_xid = bwlimit.get_xid("default")
522
523     # Since root is required for sanity, its not in the API/plc database, so pass {}
524     # to use defaults.
525     if root_xid not in slices.keys():
526         slices[root_xid] = Slice(root_xid, "root", {})
527         slices[root_xid].reset({}, {})
528
529     # Used by bwlimit.  pass {} since there is no rspec (like above).
530     if default_xid not in slices.keys():
531         slices[default_xid] = Slice(default_xid, "default", {})
532         slices[default_xid].reset({}, {})
533
534     live = {}
535     # Get running slivers that should be on this node (from plc). {xid: name}
536     # db keys on name, bwmon keys on xid.  db doesnt have xid either.
537     for plcSliver in nmdbcopy.keys():
538         live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
539
540     logger.verbose("bwmon: Found %s instantiated slices" % live.keys().__len__())
541     logger.verbose("bwmon: Found %s slices in dat file" % slices.values().__len__())
542
543     # Get actual running values from tc.
544     # Update slice totals and bandwidth. {xid: {values}}
545     kernelhtbs = gethtbs(root_xid, default_xid)
546     logger.verbose("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__())
547
548     # The dat file has HTBs for slices, but the HTBs aren't running
549     nohtbslices =  set(slices.keys()) - set(kernelhtbs.keys())
550     logger.verbose( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__())
551     # Reset tc counts.
552     for nohtbslice in nohtbslices:
553         if live.has_key(nohtbslice):
554             slices[nohtbslice].reset( {}, live[nohtbslice]['_rspec'] )
555         else:
556             logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
557             del slices[nohtbslice]
558
559     # The dat file doesnt have HTB for the slice but kern has HTB
560     slicesnodat = set(kernelhtbs.keys()) - set(slices.keys())
561     logger.verbose( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__())
562     for slicenodat in slicesnodat:
563         # But slice is running
564         if live.has_key(slicenodat):
565             # init the slice.  which means start accounting over since kernel
566             # htb was already there.
567             slices[slicenodat] = Slice(slicenodat,
568                 live[slicenodat]['name'],
569                 live[slicenodat]['_rspec'])
570
571     # Get new slices.
572     # Slices in GetSlivers but not running HTBs
573     newslicesxids = set(live.keys()) - set(kernelhtbs.keys())
574     logger.verbose("bwmon: Found %s new slices" % newslicesxids.__len__())
575
576     # Setup new slices
577     for newslice in newslicesxids:
578         # Delegated slices dont have xids (which are uids) since they haven't been
579         # instantiated yet.
580         if newslice != None and live[newslice].has_key('_rspec') == True:
581             # Check to see if we recently deleted this slice.
582             if live[newslice]['name'] not in deaddb.keys():
583                 logger.log( "bwmon: new slice %s" % live[newslice]['name'] )
584                 # _rspec is the computed rspec:  NM retrieved data from PLC, computed loans
585                 # and made a dict of computed values.
586                 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
587                 slices[newslice].reset( {}, live[newslice]['_rspec'] )
588             # Double check time for dead slice in deaddb is within 24hr recording period.
589             elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
590                 deadslice = deaddb[live[newslice]['name']]
591                 logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
592                 slices[newslice] = deadslice['slice']
593                 slices[newslice].xid = newslice
594                 # Start the HTB
595                 newvals = {"maxrate": deadslice['slice'].MaxRate * 1000,
596                             "minrate": deadslice['slice'].MinRate * 1000,
597                             "maxexemptrate": deadslice['slice'].Maxi2Rate * 1000,
598                             "usedbytes": deadslice['htb']['usedbytes'] * 1000,
599                             "usedi2bytes": deadslice['htb']['usedi2bytes'],
600                             "share":deadslice['htb']['share']}
601                 slices[newslice].reset(newvals, live[newslice]['_rspec'])
602                 # Bring up to date
603                 slices[newslice].update(newvals, live[newslice]['_rspec'])
604                 # Since the slice has been reinitialed, remove from dead database.
605                 del deaddb[deadslice['slice'].name]
606                 del newvals
607         else:
608             logger.log("bwmon: Slice %s doesn't have xid.  Skipping." % live[newslice]['name'])
609
610     # Move dead slices that exist in the pickle file, but
611     # aren't instantiated by PLC into the dead dict until
612     # recording period is over.  This is to avoid the case where a slice is dynamically created
613     # and destroyed then recreated to get around byte limits.
614     deadxids = set(slices.keys()) - set(live.keys())
615     logger.verbose("bwmon: Found %s dead slices" % (deadxids.__len__() - 2))
616     for deadxid in deadxids:
617         if deadxid == root_xid or deadxid == default_xid:
618             continue
619         logger.log("bwmon: removing dead slice %s " % deadxid)
620         if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
621             # add slice (by name) to deaddb
622             logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
623             deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
624             del slices[deadxid]
625         if kernelhtbs.has_key(deadxid):
626             logger.verbose("bwmon: Removing HTB for %s." % deadxid)
627             bwlimit.off(deadxid, dev = dev_default)
628
629     # Clean up deaddb
630     for deadslice in deaddb.keys():
631         if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
632             logger.log("bwmon: Removing dead slice %s from dat." \
633                         % deaddb[deadslice]['slice'].name)
634             del deaddb[deadslice]
635
636     # Get actual running values from tc since we've added and removed buckets.
637     # Update slice totals and bandwidth. {xid: {values}}
638     kernelhtbs = gethtbs(root_xid, default_xid)
639     logger.verbose("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__())
640
641     # Update all byte limites on all slices
642     for (xid, slice) in slices.iteritems():
643         # Monitor only the specified slices
644         if xid == root_xid or xid == default_xid: continue
645         if names and name not in names:
646             continue
647
648         if (time.time() >= (slice.time + period)) or \
649             (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
650             (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
651             # Reset to defaults every 24 hours or if it appears
652             # that the byte counters have overflowed (or, more
653             # likely, the node was restarted or the HTB buckets
654             # were re-initialized).
655             slice.reset(kernelhtbs[xid], live[xid]['_rspec'])
656         elif ENABLE:
657             logger.verbose("bwmon: Updating slice %s" % slice.name)
658             # Update byte counts
659             slice.update(kernelhtbs[xid], live[xid]['_rspec'])
660
661     logger.verbose("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),DB_FILE))
662     f = open(DB_FILE, "w")
663     pickle.dump((version, slices, deaddb), f)
664     f.close()
665
666 # doesnt use generic default interface because this runs as its own thread.
667 # changing the config variable will not have an effect since GetSlivers: pass
668 def getDefaults(nmdbcopy):
669     '''
670     Get defaults from default slice's slice attributes.
671     '''
672     status = True
673     # default slice
674     dfltslice = nmdbcopy.get(PLC_SLICE_PREFIX+"_default")
675     if dfltslice:
676         if dfltslice['rspec']['net_max_rate'] == -1:
677             allOff()
678             status = False
679     return status
680
681
682 def allOff():
683     """
684     Turn off all slice HTBs
685     """
686     # Get/set special slice IDs
687     root_xid = bwlimit.get_xid("root")
688     default_xid = bwlimit.get_xid("default")
689     kernelhtbs = gethtbs(root_xid, default_xid)
690     if len(kernelhtbs):
691         logger.log("bwmon: Disabling all running HTBs.")
692         for htb in kernelhtbs.keys(): bwlimit.off(htb, dev = dev_default)
693
694
695 lock = threading.Event()
696 def run():
697     """
698     When run as a thread, wait for event, lock db, deep copy it, release it,
699     run bwmon.GetSlivers(), then go back to waiting.
700     """
701     logger.verbose("bwmon: Thread started")
702     while True:
703         lock.wait()
704         logger.verbose("bwmon: Event received.  Running.")
705         database.db_lock.acquire()
706         nmdbcopy = copy.deepcopy(database.db)
707         database.db_lock.release()
708         try:
709             if getDefaults(nmdbcopy) and len(bwlimit.tc("class show dev %s" % dev_default)) > 0:
710                 # class show to check if net:InitNodeLimit:bwlimit.init has run.
711                 sync(nmdbcopy)
712             else: logger.log("bwmon: BW limits DISABLED.")
713         except: logger.log_exc("bwmon failed")
714         lock.clear()
715
716 def start(*args):
717     tools.as_daemon_thread(run)
718
719 def GetSlivers(*args):
720     logger.verbose ("bwmon: triggering dummy GetSlivers")
721     pass