fix coresched locating cgroup and reduce verbosity
[nodemanager.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
6 # its instantaneous rate will be capped at the bytes remaning in the limit
7 # over the time remaining in the recording period.
8 #
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
11 #
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
16 #
17
18 import os
19 import sys
20 import time
21 import pickle
22 import socket
23 import copy
24 import threading
25
26 import plnode.bwlimit as bwlimit
27
28 import logger
29 import tools
30 import database
31 from config import Config
32
33 priority = 20
34
35 # Defaults
36 # Set DEBUG to True if you don't want to send emails
37 DEBUG = False
38 # Set ENABLE to False to setup buckets, but not limit.
39 ENABLE = True
40
41 DB_FILE = "/var/lib/nodemanager/bwmon.pickle"
42
43 # Constants
44 seconds_per_day = 24 * 60 * 60
45 bits_per_byte = 8
46
47 dev_default = tools.get_default_if()
48 # Burst to line rate (or node cap).  Set by NM. in KBit/s
49 default_MaxRate = int(bwlimit.get_bwcap(dev_default) / 1000)
50 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
51 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
52 # 5.4 Gbyte per day max allowed transfered per recording period
53 # 5.4 Gbytes per day is aprox 512k/s for 24hrs (approx because original math was wrong
54 # but its better to keep a higher byte total and keep people happy than correct
55 # the problem and piss people off.
56 # default_MaxKByte = 5662310
57
58 # -- 6/1/09
59 # llp wants to double these, so we use the following
60 # 1mbit * 24hrs * 60mins * 60secs = bits/day
61 # 1000000 * 24 * 60 * 60 / (1024 * 8)
62 default_MaxKByte = 10546875
63
64 # 16.4 Gbyte per day max allowed transfered per recording period to I2
65 # default_Maxi2KByte = 17196646
66
67 # -- 6/1/09
68 # 3Mb/s for 24hrs a day (30.17 gigs)
69 default_Maxi2KByte = 31640625
70
71 # Default share quanta
72 default_Share = 1
73
74 # Average over 1 day
75 period = 1 * seconds_per_day
76
77 # Message template
78 template = \
79 """
80 The slice %(slice)s has transmitted more than %(bytes)s from
81 %(hostname)s to %(class)s destinations
82 since %(since)s.
83
84 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
85 until %(until)s.
86
87 Please reduce the average %(class)s transmission rate
88 of the slice to %(limit)s per %(period)s.
89
90 """.lstrip()
91
92 footer = \
93 """
94 %(date)s %(hostname)s bwcap %(slice)s
95 """.lstrip()
96
97 def format_bytes(bytes, si = True):
98     """
99     Formats bytes into a string
100     """
101     if si:
102         kilo = 1000.
103     else:
104         # Officially, a kibibyte
105         kilo = 1024.
106
107     if bytes >= (kilo * kilo * kilo):
108         return "%.1f GB" % (bytes / (kilo * kilo * kilo))
109     elif bytes >= 1000000:
110         return "%.1f MB" % (bytes / (kilo * kilo))
111     elif bytes >= 1000:
112         return "%.1f KB" % (bytes / kilo)
113     else:
114         return "%.0f bytes" % bytes
115
116 def format_period(seconds):
117     """
118     Formats a period in seconds into a string
119     """
120
121     if seconds == (24 * 60 * 60):
122         return "day"
123     elif seconds == (60 * 60):
124         return "hour"
125     elif seconds > (24 * 60 * 60):
126         return "%.1f days" % (seconds / 24. / 60. / 60.)
127     elif seconds > (60 * 60):
128         return "%.1f hours" % (seconds / 60. / 60.)
129     elif seconds > (60):
130         return "%.1f minutes" % (seconds / 60.)
131     else:
132         return "%.0f seconds" % seconds
133
134 def slicemail(slice, subject, body):
135     '''
136     Front end to sendmail.  Sends email to slice alias with given subject and body.
137     '''
138     config = Config()
139     sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % config.PLC_MAIL_SUPPORT_ADDRESS, "w")
140
141     # Parsed from MyPLC config
142     to = [config.PLC_MAIL_MOM_LIST_ADDRESS]
143
144     if slice is not None and slice != "root":
145         to.append(config.PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
146
147     header = {'from': "%s Support <%s>" % (config.PLC_NAME,
148                                            config.PLC_MAIL_SUPPORT_ADDRESS),
149               'to': ", ".join(to),
150               'version': sys.version.split(" ")[0],
151               'subject': subject}
152
153     # Write headers
154     sendmail.write(
155 """
156 Content-type: text/plain
157 From: %(from)s
158 Reply-To: %(from)s
159 To: %(to)s
160 X-Mailer: Python/%(version)s
161 Subject: %(subject)s
162
163 """.lstrip() % header)
164
165     # Write body
166     sendmail.write(body)
167     # Done
168     sendmail.close()
169
170
171 class Slice:
172     """
173     Stores the last recorded bandwidth parameters of a slice.
174
175     xid - slice context/VServer ID
176     name - slice name
177     time - beginning of recording period in UNIX seconds
178     bytes - low bandwidth bytes transmitted at the beginning of the recording period
179     i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
180     MaxKByte - total volume of data allowed
181     ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
182     Maxi2KByte - same as MaxKByte, but for i2
183     Threshi2Kbyte - same as Threshi2KByte, but for i2
184     MaxRate - max_rate slice attribute.
185     Maxi2Rate - max_exempt_rate slice attribute.
186     Share - Used by Sirius to loan min rates
187     Sharei2 - Used by Sirius to loan min rates for i2
188     self.emailed - did slice recv email during this recording period
189
190     """
191
192     def __init__(self, xid, name, rspec):
193         self.xid = xid
194         self.name = name
195         self.time = 0
196         self.bytes = 0
197         self.i2bytes = 0
198         self.MaxRate = default_MaxRate
199         self.MinRate = bwlimit.bwmin / 1000
200         self.Maxi2Rate = default_Maxi2Rate
201         self.Mini2Rate = bwlimit.bwmin / 1000
202         self.MaxKByte = default_MaxKByte
203         self.ThreshKByte = int(.8 * self.MaxKByte)
204         self.Maxi2KByte = default_Maxi2KByte
205         self.Threshi2KByte = int(.8 * self.Maxi2KByte)
206         self.Share = default_Share
207         self.Sharei2 = default_Share
208         self.emailed = False
209         self.capped = False
210
211         self.updateSliceTags(rspec)
212         bwlimit.set(xid = self.xid, dev = dev_default,
213                 minrate = self.MinRate * 1000,
214                 maxrate = self.MaxRate * 1000,
215                 maxexemptrate = self.Maxi2Rate * 1000,
216                 minexemptrate = self.Mini2Rate * 1000,
217                 share = self.Share)
218
219     def __repr__(self):
220         return self.name
221
222     def updateSliceTags(self, rspec):
223         '''
224         Use respects from GetSlivers to PLC to populate slice object.  Also
225         do some sanity checking.
226         '''
227
228         # Sanity check plus policy decision for MinRate:
229         # Minrate cant be greater than 25% of MaxRate or NodeCap.
230         MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000))
231         if MinRate > int(.25 * default_MaxRate):
232             MinRate = int(.25 * default_MaxRate)
233         if MinRate != self.MinRate:
234             self.MinRate = MinRate
235             logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))
236
237         MaxRate = int(rspec.get('net_max_rate', default_MaxRate))
238         if MaxRate != self.MaxRate:
239             self.MaxRate = MaxRate
240             logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
241
242         Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
243         if Mini2Rate != self.Mini2Rate:
244             self.Mini2Rate = Mini2Rate
245             logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
246
247         Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate))
248         if Maxi2Rate != self.Maxi2Rate:
249             self.Maxi2Rate = Maxi2Rate
250             logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
251
252         MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
253         if MaxKByte != self.MaxKByte:
254             self.MaxKByte = MaxKByte
255             logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))
256
257         Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
258         if Maxi2KByte != self.Maxi2KByte:
259             self.Maxi2KByte = Maxi2KByte
260             logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))
261
262         ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
263         if ThreshKByte != self.ThreshKByte:
264             self.ThreshKByte = ThreshKByte
265             logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))
266
267         Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
268         if Threshi2KByte != self.Threshi2KByte:
269             self.Threshi2KByte = Threshi2KByte
270             logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))
271
272         Share = int(rspec.get('net_share', default_Share))
273         if Share != self.Share:
274             self.Share = Share
275             logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))
276
277         Sharei2 = int(rspec.get('net_i2_share', default_Share))
278         if Sharei2 != self.Sharei2:
279             self.Sharei2 = Sharei2
280             logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
281
282
283     def reset(self, runningrates, rspec):
284         """
285         Begin a new recording period. Remove caps by restoring limits
286         to their default values.
287         """
288         # Cache share for later comparison
289         self.Share = runningrates.get('share', 1)
290
291         # Query Node Manager for max rate overrides
292         self.updateSliceTags(rspec)
293
294         # Reset baseline time
295         self.time = time.time()
296
297         # Reset baseline byte coutns
298         self.bytes = runningrates.get('usedbytes', 0)
299         self.i2bytes = runningrates.get('usedi2bytes', 0)
300
301         # Reset email
302         self.emailed = False
303         # Reset flag
304         self.capped = False
305         # Reset rates.
306         maxrate = self.MaxRate * 1000
307         minrate = self.MinRate * 1000
308         maxi2rate = self.Maxi2Rate * 1000
309         mini2rate = self.Mini2Rate * 1000
310
311         if (maxrate != runningrates.get('maxrate', 0)) or \
312          (minrate != runningrates.get('maxrate', 0)) or \
313          (maxi2rate != runningrates.get('maxexemptrate', 0)) or \
314          (mini2rate != runningrates.get('minexemptrate', 0)) or \
315          (self.Share != runningrates.get('share', 0)):
316             logger.log("bwmon: %s reset to %s/%s" % \
317                            (self.name,
318                             bwlimit.format_tc_rate(maxrate),
319                             bwlimit.format_tc_rate(maxi2rate)))
320             bwlimit.set(xid = self.xid, dev = dev_default,
321                 minrate = self.MinRate * 1000,
322                 maxrate = self.MaxRate * 1000,
323                 maxexemptrate = self.Maxi2Rate * 1000,
324                 minexemptrate = self.Mini2Rate * 1000,
325                 share = self.Share)
326
327     def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
328         """
329         Notify the slice it's being capped.
330         """
331          # Prepare message parameters from the template
332         message = ""
333         params = {'slice': self.name, 'hostname': socket.gethostname(),
334                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
335                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
336                   'date': time.asctime(time.gmtime()) + " GMT",
337                   'period': format_period(period)}
338
339         if new_maxrate != (self.MaxRate * 1000):
340             # Format template parameters for low bandwidth message
341             params['class'] = "low bandwidth"
342             params['bytes'] = format_bytes(usedbytes - self.bytes)
343             params['limit'] = format_bytes(self.MaxKByte * 1024)
344             params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
345
346             # Cap low bandwidth burst rate
347             message += template % params
348             logger.log("bwmon:  ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
349
350         if new_maxexemptrate != (self.Maxi2Rate * 1000):
351             # Format template parameters for high bandwidth message
352             params['class'] = "high bandwidth"
353             params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
354             params['limit'] = format_bytes(self.Maxi2KByte * 1024)
355             params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
356
357             message += template % params
358             logger.log("bwmon:  ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
359
360         # Notify slice
361         if self.emailed == False:
362             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
363             if DEBUG:
364                 logger.log("bwmon: "+ subject)
365                 logger.log("bwmon: "+ message + (footer % params))
366             else:
367                 self.emailed = True
368                 logger.log("bwmon: Emailing %s" % self.name)
369                 slicemail(self.name, subject, message + (footer % params))
370
371
372     def update(self, runningrates, rspec):
373         """
374         Update byte counts and check if byte thresholds have been
375         exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.
376         Recalculate every time module runs.
377         """
378         # cache share for later comparison
379         runningrates['share'] = self.Share
380
381         # Query Node Manager for max rate overrides
382         self.updateSliceTags(rspec)
383
384         usedbytes = runningrates['usedbytes']
385         usedi2bytes = runningrates['usedi2bytes']
386
387         # Check limits.
388         if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
389             sum = self.bytes + (self.ThreshKByte * 1024)
390             maxbyte = self.MaxKByte * 1024
391             bytesused = usedbytes - self.bytes
392             timeused = int(time.time() - self.time)
393             # Calcuate new rate. in bit/s
394             new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
395             # Never go under MinRate
396             if new_maxrate < (self.MinRate * 1000):
397                 new_maxrate = self.MinRate * 1000
398             # State information.  I'm capped.
399             self.capped += True
400         else:
401             # Sanity Check
402             new_maxrate = self.MaxRate * 1000
403             self.capped += False
404
405         if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
406             maxi2byte = self.Maxi2KByte * 1024
407             i2bytesused = usedi2bytes - self.i2bytes
408             timeused = int(time.time() - self.time)
409             # Calcuate New Rate.
410             new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
411             # Never go under MinRate
412             if new_maxi2rate < (self.Mini2Rate * 1000):
413                 new_maxi2rate = self.Mini2Rate * 1000
414             # State information.  I'm capped.
415             self.capped += True
416         else:
417             # Sanity
418             new_maxi2rate = self.Maxi2Rate * 1000
419             self.capped += False
420
421         # Check running values against newly calculated values so as not to run tc
422         # unnecessarily
423         if (runningrates['maxrate'] != new_maxrate) or \
424         (runningrates['minrate'] != self.MinRate * 1000) or \
425         (runningrates['maxexemptrate'] != new_maxi2rate) or \
426         ('minexemptrate' in runningrates and runningrates['minexemptrate'] != self.Mini2Rate * 1000) or \
427         (runningrates['share'] != self.Share):
428             # Apply parameters
429             bwlimit.set(xid = self.xid, dev = dev_default,
430                 minrate = self.MinRate * 1000,
431                 maxrate = new_maxrate,
432                 minexemptrate = self.Mini2Rate * 1000,
433                 maxexemptrate = new_maxi2rate,
434                 share = self.Share)
435
436         # Notify slice
437         if self.capped == True:
438             self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
439
440
441 def gethtbs(root_xid, default_xid):
442     """
443     Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
444     Turn off HTBs without names.
445     """
446     livehtbs = {}
447     for params in bwlimit.get(dev = dev_default):
448         (xid, share,
449          minrate, maxrate,
450          minexemptrate, maxexemptrate,
451          usedbytes, usedi2bytes) = params
452
453         name = bwlimit.get_slice(xid)
454
455         if (name is None) \
456         and (xid != root_xid) \
457         and (xid != default_xid):
458             # Orphaned (not associated with a slice) class
459             name = "%d?" % xid
460             logger.log("bwmon: Found orphaned HTB %s. Removing." %name)
461             bwlimit.off(xid, dev = dev_default)
462
463         livehtbs[xid] = {'share': share,
464             'minrate': minrate,
465             'maxrate': maxrate,
466             'maxexemptrate': maxexemptrate,
467             'minexemptrate': minexemptrate,
468             'usedbytes': usedbytes,
469             'name': name,
470             'usedi2bytes': usedi2bytes}
471
472     return livehtbs
473
474 def sync(nmdbcopy):
475     """
476     Syncs tc, db, and bwmon.pickle.
477     Then, starts new slices, kills old ones, and updates byte accounts for each running slice.
478     Sends emails and caps those that went over their limit.
479     """
480     # Defaults
481     global DB_FILE, \
482         period, \
483         default_MaxRate, \
484         default_Maxi2Rate, \
485         default_MaxKByte,\
486         default_Maxi2KByte,\
487         default_Share, \
488         dev_default
489
490     # All slices
491     names = []
492     # In case the limits have changed.
493     default_MaxRate = int(bwlimit.get_bwcap(dev_default) / 1000)
494     default_Maxi2Rate = int(bwlimit.bwmax / 1000)
495
496     # Incase default isn't set yet.
497     if default_MaxRate == -1:
498         default_MaxRate = 1000000
499
500     # xxx $Id$ 
501     # with svn we used to have a trick to detect upgrades of this file
502     # this has gone with the move to git, without any noticeable effect on operations though
503     try:
504         f = open(DB_FILE, "r+")
505         logger.verbose("bwmon: Loading %s" % DB_FILE)
506         (version, slices, deaddb) = pickle.load(f)
507         f.close()
508         # Check version of data file
509         if version != "$Id$":
510             logger.log("bwmon: Not using old version '%s' data file %s" % (version, DB_FILE))
511             raise Exception
512     except Exception:
513         version = "$Id$"
514         slices = {}
515         deaddb = {}
516
517     # Get/set special slice IDs
518     root_xid = bwlimit.get_xid("root")
519     default_xid = bwlimit.get_xid("default")
520
521     # Since root is required for sanity, its not in the API/plc database, so pass {}
522     # to use defaults.
523     if root_xid not in slices.keys():
524         slices[root_xid] = Slice(root_xid, "root", {})
525         slices[root_xid].reset({}, {})
526
527     # Used by bwlimit.  pass {} since there is no rspec (like above).
528     if default_xid not in slices.keys():
529         slices[default_xid] = Slice(default_xid, "default", {})
530         slices[default_xid].reset({}, {})
531
532     live = {}
533     # Get running slivers that should be on this node (from plc). {xid: name}
534     # db keys on name, bwmon keys on xid.  db doesnt have xid either.
535     for plcSliver in nmdbcopy.keys():
536         live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
537
538     logger.verbose("bwmon: Found %s instantiated slices" % live.keys().__len__())
539     logger.verbose("bwmon: Found %s slices in dat file" % slices.values().__len__())
540
541     # Get actual running values from tc.
542     # Update slice totals and bandwidth. {xid: {values}}
543     kernelhtbs = gethtbs(root_xid, default_xid)
544     logger.verbose("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__())
545
546     # The dat file has HTBs for slices, but the HTBs aren't running
547     nohtbslices =  set(slices.keys()) - set(kernelhtbs.keys())
548     logger.verbose( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__())
549     # Reset tc counts.
550     for nohtbslice in nohtbslices:
551         if live.has_key(nohtbslice):
552             slices[nohtbslice].reset( {}, live[nohtbslice]['_rspec'] )
553         else:
554             logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
555             del slices[nohtbslice]
556
557     # The dat file doesnt have HTB for the slice but kern has HTB
558     slicesnodat = set(kernelhtbs.keys()) - set(slices.keys())
559     logger.verbose( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__())
560     for slicenodat in slicesnodat:
561         # But slice is running
562         if live.has_key(slicenodat):
563             # init the slice.  which means start accounting over since kernel
564             # htb was already there.
565             slices[slicenodat] = Slice(slicenodat,
566                 live[slicenodat]['name'],
567                 live[slicenodat]['_rspec'])
568
569     # Get new slices.
570     # Slices in GetSlivers but not running HTBs
571     newslicesxids = set(live.keys()) - set(kernelhtbs.keys())
572     logger.verbose("bwmon: Found %s new slices" % newslicesxids.__len__())
573
574     # Setup new slices
575     for newslice in newslicesxids:
576         # Delegated slices dont have xids (which are uids) since they haven't been
577         # instantiated yet.
578         if newslice != None and live[newslice].has_key('_rspec') == True:
579             # Check to see if we recently deleted this slice.
580             if live[newslice]['name'] not in deaddb.keys():
581                 logger.log( "bwmon: new slice %s" % live[newslice]['name'] )
582                 # _rspec is the computed rspec:  NM retrieved data from PLC, computed loans
583                 # and made a dict of computed values.
584                 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
585                 slices[newslice].reset( {}, live[newslice]['_rspec'] )
586             # Double check time for dead slice in deaddb is within 24hr recording period.
587             elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
588                 deadslice = deaddb[live[newslice]['name']]
589                 logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
590                 slices[newslice] = deadslice['slice']
591                 slices[newslice].xid = newslice
592                 # Start the HTB
593                 newvals = {"maxrate": deadslice['slice'].MaxRate * 1000,
594                             "minrate": deadslice['slice'].MinRate * 1000,
595                             "maxexemptrate": deadslice['slice'].Maxi2Rate * 1000,
596                             "usedbytes": deadslice['htb']['usedbytes'] * 1000,
597                             "usedi2bytes": deadslice['htb']['usedi2bytes'],
598                             "share":deadslice['htb']['share']}
599                 slices[newslice].reset(newvals, live[newslice]['_rspec'])
600                 # Bring up to date
601                 slices[newslice].update(newvals, live[newslice]['_rspec'])
602                 # Since the slice has been reinitialed, remove from dead database.
603                 del deaddb[deadslice['slice'].name]
604                 del newvals
605         else:
606             logger.log("bwmon: Slice %s doesn't have xid.  Skipping." % live[newslice]['name'])
607
608     # Move dead slices that exist in the pickle file, but
609     # aren't instantiated by PLC into the dead dict until
610     # recording period is over.  This is to avoid the case where a slice is dynamically created
611     # and destroyed then recreated to get around byte limits.
612     deadxids = set(slices.keys()) - set(live.keys())
613     logger.verbose("bwmon: Found %s dead slices" % (deadxids.__len__() - 2))
614     for deadxid in deadxids:
615         if deadxid == root_xid or deadxid == default_xid:
616             continue
617         logger.log("bwmon: removing dead slice %s " % deadxid)
618         if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
619             # add slice (by name) to deaddb
620             logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
621             deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
622             del slices[deadxid]
623         if kernelhtbs.has_key(deadxid):
624             logger.verbose("bwmon: Removing HTB for %s." % deadxid)
625             bwlimit.off(deadxid, dev = dev_default)
626
627     # Clean up deaddb
628     for deadslice in deaddb.keys():
629         if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
630             logger.log("bwmon: Removing dead slice %s from dat." \
631                         % deaddb[deadslice]['slice'].name)
632             del deaddb[deadslice]
633
634     # Get actual running values from tc since we've added and removed buckets.
635     # Update slice totals and bandwidth. {xid: {values}}
636     kernelhtbs = gethtbs(root_xid, default_xid)
637     logger.verbose("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__())
638
639     # Update all byte limites on all slices
640     for (xid, slice) in slices.iteritems():
641         # Monitor only the specified slices
642         if xid == root_xid or xid == default_xid: continue
643         if names and name not in names:
644             continue
645
646         if (time.time() >= (slice.time + period)) or \
647             (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
648             (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
649             # Reset to defaults every 24 hours or if it appears
650             # that the byte counters have overflowed (or, more
651             # likely, the node was restarted or the HTB buckets
652             # were re-initialized).
653             slice.reset(kernelhtbs[xid], live[xid]['_rspec'])
654         elif ENABLE:
655             logger.verbose("bwmon: Updating slice %s" % slice.name)
656             # Update byte counts
657             slice.update(kernelhtbs[xid], live[xid]['_rspec'])
658
659     logger.verbose("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),DB_FILE))
660     f = open(DB_FILE, "w")
661     pickle.dump((version, slices, deaddb), f)
662     f.close()
663
664 # doesnt use generic default interface because this runs as its own thread.
665 # changing the config variable will not have an effect since GetSlivers: pass
666 def getDefaults(nmdbcopy):
667     '''
668     Get defaults from default slice's slice attributes.
669     '''
670     status = True
671     # default slice
672     dfltslice = nmdbcopy.get(Config().PLC_SLICE_PREFIX+"_default")
673     if dfltslice:
674         if dfltslice['rspec']['net_max_rate'] == -1:
675             allOff()
676             status = False
677     return status
678
679
680 def allOff():
681     """
682     Turn off all slice HTBs
683     """
684     # Get/set special slice IDs
685     root_xid = bwlimit.get_xid("root")
686     default_xid = bwlimit.get_xid("default")
687     kernelhtbs = gethtbs(root_xid, default_xid)
688     if len(kernelhtbs):
689         logger.log("bwmon: Disabling all running HTBs.")
690         for htb in kernelhtbs.keys(): bwlimit.off(htb, dev = dev_default)
691
692
693 lock = threading.Event()
694 def run():
695     """
696     When run as a thread, wait for event, lock db, deep copy it, release it,
697     run bwmon.GetSlivers(), then go back to waiting.
698     """
699     logger.verbose("bwmon: Thread started")
700     while True:
701         lock.wait()
702         logger.verbose("bwmon: Event received.  Running.")
703         database.db_lock.acquire()
704         nmdbcopy = copy.deepcopy(database.db)
705         database.db_lock.release()
706         try:
707             if getDefaults(nmdbcopy) and len(bwlimit.tc("class show dev %s" % dev_default)) > 0:
708                 # class show to check if net:InitNodeLimit:bwlimit.init has run.
709                 sync(nmdbcopy)
710             else: logger.log("bwmon: BW limits DISABLED.")
711         except: logger.log_exc("bwmon failed")
712         lock.clear()
713
714 def start(*args):
715     tools.as_daemon_thread(run)
716
717 def GetSlivers(*args):
718     logger.verbose ("bwmon: triggering dummy GetSlivers")
719     pass