3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
6 # its instantaneous rate will be capped at the bytes remaining in the limit
7 # over the time remaining in the recording period.
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
36 # Set DEBUG to True if you don't want to send emails
38 # Set ENABLE to False to setup buckets, but not limit.
41 datafile = "/var/lib/misc/bwmon.dat"
44 sys.path.append("/etc/planetlab")
45 from plc_config import *
48 logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
49 logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1)
52 seconds_per_day = 24 * 60 * 60
55 # Burst to line rate (or node cap). Set by NM. in KBit/s
56 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
57 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
58 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
59 # 5.4 Gbyte per day max allowed transferred per recording period
60 default_MaxKByte = 5662310
61 # 16.4 Gbyte per day max allowed transferred per recording period to I2
62 default_Maxi2KByte = 17196646
63 # Default share quanta
67 period = 1 * seconds_per_day
72 The slice %(slice)s has transmitted more than %(bytes)s from
73 %(hostname)s to %(class)s destinations
76 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
79 Please reduce the average %(class)s transmission rate
80 of the slice to %(limit)s per %(period)s.
86 %(date)s %(hostname)s bwcap %(slice)s
def format_bytes(bytes, si = True):
    """
    Format a byte count as a human-readable string.

    bytes -- number of bytes to format
    si -- if True, use SI units (kilo = 1000); otherwise use binary
          units (kilo = 1024 -- officially, a kibibyte)
    """
    if si:
        kilo = 1000.
    else:
        # Officially, a kibibyte
        kilo = 1024.

    if bytes >= (kilo * kilo * kilo):
        return "%.1f GB" % (bytes / (kilo * kilo * kilo))
    elif bytes >= (kilo * kilo):
        # Bug fix: the MB threshold was the literal 1000000, which is
        # inconsistent with the (kilo * kilo) divisor and with the
        # GB/KB thresholds when si == False.
        return "%.1f MB" % (bytes / (kilo * kilo))
    elif bytes >= kilo:
        return "%.1f KB" % (bytes / kilo)
    else:
        return "%.0f bytes" % bytes
def format_period(seconds):
    """
    Render a time span, given in seconds, as a human-readable string.

    Exact multiples of a day or an hour collapse to "day"/"hour";
    everything else is scaled to the largest sensible unit.
    """
    day = 24 * 60 * 60
    hour = 60 * 60

    if seconds == day:
        return "day"
    elif seconds == hour:
        return "hour"
    elif seconds > day:
        return "%.1f days" % (seconds / 24. / 60. / 60.)
    elif seconds > hour:
        return "%.1f hours" % (seconds / 60. / 60.)
    elif seconds > 60:
        return "%.1f minutes" % (seconds / 60.)
    else:
        return "%.0f seconds" % seconds
def slicemail(slice, subject, body):
    """
    Front end to sendmail.  Sends email to the slice alias (and the
    support list) with the given subject and body.
    """
    # -N never suppresses delivery-status notification mail.
    sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")

    # Recipients, parsed from the MyPLC config.
    to = [PLC_MAIL_MOM_LIST_ADDRESS]

    # root is not a real slice; don't try to build a slice alias for it.
    if slice is not None and slice != "root":
        to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))

    header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
              'to': ", ".join(to),
              'version': sys.version.split(" ")[0],
              'subject': subject}

    # NOTE(review): exact header-line set reconstructed from the visible
    # fragments (Content-type / X-Mailer) -- confirm against upstream.
    sendmail.write(
"""
Content-type: text/plain
From: %(from)s
Reply-To: %(from)s
To: %(to)s
X-Mailer: Python/%(version)s
Subject: %(subject)s

""".lstrip() % header)

    # Body, then close the pipe to hand the message to sendmail.
    sendmail.write(body)
    sendmail.close()
164 Stores the last recorded bandwidth parameters of a slice.
166 xid - slice context/VServer ID
168 time - beginning of recording period in UNIX seconds
169 bytes - low bandwidth bytes transmitted at the beginning of the recording period
170 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
171 MaxKByte - total volume of data allowed
172 ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
173 Maxi2KByte - same as MaxKByte, but for i2
174 Threshi2Kbyte - same as ThreshKByte, but for i2
175 MaxRate - max_rate slice attribute.
176 Maxi2Rate - max_exempt_rate slice attribute.
177 Share - Used by Sirius to loan min rates
178 Sharei2 - Used by Sirius to loan min rates for i2
179 self.emailed - did slice recv email during this recording period
def __init__(self, xid, name, rspec):
    """
    Initialize a slice's accounting state to module defaults, pull any
    per-slice overrides from the rspec, and install the initial tc
    limits for the slice's HTB.
    """
    # Identity and baseline accounting state.
    self.xid = xid
    self.name = name
    self.time = 0
    self.bytes = 0
    self.i2bytes = 0

    # Rate and volume limits start at the module-wide defaults;
    # thresholds default to 80% of the corresponding byte limits.
    self.MaxRate = default_MaxRate
    self.MinRate = bwlimit.bwmin / 1000
    self.Maxi2Rate = default_Maxi2Rate
    self.Mini2Rate = bwlimit.bwmin / 1000
    self.MaxKByte = default_MaxKByte
    self.ThreshKByte = int(.8 * self.MaxKByte)
    self.Maxi2KByte = default_Maxi2KByte
    self.Threshi2KByte = int(.8 * self.Maxi2KByte)
    self.Share = default_Share
    self.Sharei2 = default_Share
    self.emailed = False
    self.capped = False

    # Apply any overrides from the rspec, then push the resulting
    # limits into tc (rates are stored in kbit/s, tc wants bit/s).
    self.updateSliceAttributes(rspec)
    bwlimit.set(xid = self.xid,
                minrate = self.MinRate * 1000,
                maxrate = self.MaxRate * 1000,
                maxexemptrate = self.Maxi2Rate * 1000,
                minexemptrate = self.Mini2Rate * 1000,
                share = self.Share)
def updateSliceAttributes(self, rspec):
    """
    Use the rspec from GetSlivers (PLC) to populate this slice object.
    Performs sanity checks, and logs every value that actually changed.

    rspec -- dict of computed slice attributes (net_* keys)
    """
    # Sanity check plus policy decision for MinRate:
    # MinRate can't be greater than 25% of MaxRate or NodeCap.
    MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000))
    if MinRate > int(.25 * default_MaxRate):
        MinRate = int(.25 * default_MaxRate)
    if MinRate != self.MinRate:
        self.MinRate = MinRate
        logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))

    MaxRate = int(rspec.get('net_max_rate', default_MaxRate))
    if MaxRate != self.MaxRate:
        self.MaxRate = MaxRate
        logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))

    Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
    if Mini2Rate != self.Mini2Rate:
        self.Mini2Rate = Mini2Rate
        logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))

    Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate))
    if Maxi2Rate != self.Maxi2Rate:
        self.Maxi2Rate = Maxi2Rate
        logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))

    MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
    if MaxKByte != self.MaxKByte:
        self.MaxKByte = MaxKByte
        logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))

    Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
    if Maxi2KByte != self.Maxi2KByte:
        self.Maxi2KByte = Maxi2KByte
        logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))

    # Thresholds default to 80% of the (possibly just-updated) byte limits.
    ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
    if ThreshKByte != self.ThreshKByte:
        self.ThreshKByte = ThreshKByte
        logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))

    Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
    if Threshi2KByte != self.Threshi2KByte:
        self.Threshi2KByte = Threshi2KByte
        logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))

    Share = int(rspec.get('net_share', default_Share))
    if Share != self.Share:
        self.Share = Share
        logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))

    Sharei2 = int(rspec.get('net_i2_share', default_Share))
    if Sharei2 != self.Sharei2:
        self.Sharei2 = Sharei2
        # Bug fix: this log line used self.i2Share, which is not an
        # attribute of this class (the attribute is Sharei2) and raised
        # AttributeError whenever the i2 share changed.
        logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.Sharei2))
def reset(self, runningrates, rspec):
    """
    Begin a new recording period.  Remove caps by restoring limits
    to their default values.

    runningrates -- dict of current tc values for this slice's HTB
    rspec -- computed slice attributes from GetSlivers
    """
    # Cache the running share before updateSliceAttributes() may change
    # self.Share, so the comparison below detects rspec-driven changes.
    self.Share = runningrates.get('share', 1)

    # Query Node Manager for max rate overrides
    self.updateSliceAttributes(rspec)

    # Reset baseline time
    self.time = time.time()

    # Reset baseline byte counts
    self.bytes = runningrates.get('usedbytes', 0)
    self.i2bytes = runningrates.get('usedi2bytes', 0)

    # New recording period: the slice may be emailed/capped again.
    self.emailed = False
    self.capped = False

    # Desired rates in bit/s (attributes are kept in kbit/s).
    maxrate = self.MaxRate * 1000
    minrate = self.MinRate * 1000
    maxi2rate = self.Maxi2Rate * 1000
    mini2rate = self.Mini2Rate * 1000

    # Only invoke tc when something actually differs from the running
    # values.  Bug fix: the running minrate used to be read from the
    # 'maxrate' key, so min-rate-only changes were never detected and
    # unchanged slices could be reset needlessly.
    if (maxrate != runningrates.get('maxrate', 0)) or \
       (minrate != runningrates.get('minrate', 0)) or \
       (maxi2rate != runningrates.get('maxexemptrate', 0)) or \
       (mini2rate != runningrates.get('minexemptrate', 0)) or \
       (self.Share != runningrates.get('share', 0)):
        logger.log("bwmon: %s reset to %s/%s" % \
              (self.name,
               bwlimit.format_tc_rate(maxrate),
               bwlimit.format_tc_rate(maxi2rate)), 1)
        bwlimit.set(xid = self.xid,
                    minrate = self.MinRate * 1000,
                    maxrate = self.MaxRate * 1000,
                    maxexemptrate = self.Maxi2Rate * 1000,
                    minexemptrate = self.Mini2Rate * 1000,
                    share = self.Share)
def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
    """
    Notify the slice that it is being capped: build the message from the
    module templates and email it once per recording period.
    """
    # Accumulate one message covering whichever classes were capped.
    message = ""
    params = {'slice': self.name, 'hostname': socket.gethostname(),
              'since': time.asctime(time.gmtime(self.time)) + " GMT",
              'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
              'date': time.asctime(time.gmtime()) + " GMT",
              'period': format_period(period)}

    # Low-bandwidth (non-exempt) cap in effect?
    if new_maxrate != (self.MaxRate * 1000):
        params['class'] = "low bandwidth"
        params['bytes'] = format_bytes(usedbytes - self.bytes)
        params['limit'] = format_bytes(self.MaxKByte * 1024)
        params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
        message += template % params
        logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)

    # High-bandwidth (exempt, e.g. I2) cap in effect?
    if new_maxexemptrate != (self.Maxi2Rate * 1000):
        params['class'] = "high bandwidth"
        params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
        params['limit'] = format_bytes(self.Maxi2KByte * 1024)
        params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
        message += template % params
        logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)

    # Email at most once per recording period.
    # NOTE(review): the DEBUG/else structure below is reconstructed from
    # the visible fragments -- confirm against upstream before relying on it.
    if self.emailed == False:
        subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
        if DEBUG:
            logger.log("bwmon: "+ subject)
            logger.log("bwmon: "+ message + (footer % params))
        else:
            self.emailed = True
            logger.log("bwmon: Emailing %s" % self.name)
            slicemail(self.name, subject, message + (footer % params))
def update(self, runningrates, rspec):
    """
    Update byte counts and check whether the byte thresholds have been
    exceeded.  If exceeded, cap the rate to the remaining bytes in the
    limit spread over the remaining time in the period.  Recalculated
    every time this module runs.

    runningrates -- dict of current tc values for this slice's HTB
    rspec -- computed slice attributes from GetSlivers
    """
    # Cache the current share before updateSliceAttributes() may change
    # self.Share, so the final comparison detects rspec-driven changes.
    runningrates['share'] = self.Share

    # Query Node Manager for max rate overrides
    self.updateSliceAttributes(rspec)

    usedbytes = runningrates['usedbytes']
    usedi2bytes = runningrates['usedi2bytes']

    # Elapsed time in the current period, computed once so both cap
    # calculations agree (it was previously read from time.time() twice).
    # Clamp the remaining time to >= 1s so the divisions below cannot
    # raise ZeroDivisionError at the very end of a period.
    timeused = int(time.time() - self.time)
    timeleft = max(period - timeused, 1)

    # Check the low-bandwidth (non-exempt) threshold.
    # Bug fix: removed the unused local `sum` (it also shadowed the builtin).
    if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
        maxbyte = self.MaxKByte * 1024
        bytesused = usedbytes - self.bytes
        # Calculate the new rate in bit/s.
        new_maxrate = int(((maxbyte - bytesused) * 8) / timeleft)
        # Never go under MinRate
        if new_maxrate < (self.MinRate * 1000):
            new_maxrate = self.MinRate * 1000
        # State information.  I'm capped.
        self.capped = True
    else:
        # Sanity check: not over threshold, restore the full rate.
        new_maxrate = self.MaxRate * 1000

    # Check the high-bandwidth (exempt, e.g. I2) threshold.
    if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
        maxi2byte = self.Maxi2KByte * 1024
        i2bytesused = usedi2bytes - self.i2bytes
        new_maxi2rate = int(((maxi2byte - i2bytesused) * 8) / timeleft)
        # Never go under MinRate
        if new_maxi2rate < (self.Mini2Rate * 1000):
            new_maxi2rate = self.Mini2Rate * 1000
        # State information.  I'm capped.
        self.capped = True
    else:
        new_maxi2rate = self.Maxi2Rate * 1000

    # Check running values against the newly calculated values so as not
    # to run tc unnecessarily.
    if (runningrates['maxrate'] != new_maxrate) or \
       (runningrates['minrate'] != self.MinRate * 1000) or \
       (runningrates['maxexemptrate'] != new_maxi2rate) or \
       (runningrates['minexemptrate'] != self.Mini2Rate * 1000) or \
       (runningrates['share'] != self.Share):
        # Apply parameters
        bwlimit.set(xid = self.xid,
                    minrate = self.MinRate * 1000,
                    maxrate = new_maxrate,
                    minexemptrate = self.Mini2Rate * 1000,
                    maxexemptrate = new_maxi2rate,
                    share = self.Share)

    # Notify slice
    if self.capped == True:
        self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
def gethtbs(root_xid, default_xid):
    """
    Return a dict {xid: {*rates}} of running HTBs, as reported by tc,
    that have names.  HTBs without names (orphans) are turned off,
    except for the root and default classes.
    """
    livehtbs = {}
    for params in bwlimit.get():
        (xid, share,
         minrate, maxrate,
         minexemptrate, maxexemptrate,
         usedbytes, usedi2bytes) = params

        name = bwlimit.get_slice(xid)

        # An HTB with no associated slice (and that isn't root/default)
        # is an orphan: log it and remove it.
        # NOTE(review): orphan handling reconstructed from the visible
        # fragments -- confirm the exact condition against upstream.
        if (name is None) \
        and (xid != root_xid) \
        and (xid != default_xid):
            # Orphaned (not associated with a slice) class
            logger.log("bwmon: Found orphaned HTB %s. Removing." %name, 1)
            bwlimit.off(xid)

        livehtbs[xid] = {'share': share,
                         'minrate': minrate,
                         'maxrate': maxrate,
                         'maxexemptrate': maxexemptrate,
                         'minexemptrate': minexemptrate,
                         'usedbytes': usedbytes,
                         'name': name,
                         'usedi2bytes': usedi2bytes}

    return livehtbs
467 Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.
480 # In case the limits have changed.
481 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
482 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
484 # In case the default isn't set yet.
485 if default_MaxRate == -1:
486 default_MaxRate = 1000000
489 f = open(datafile, "r+")
490 logger.log("bwmon: Loading %s" % datafile, 2)
491 (version, slices, deaddb) = pickle.load(f)
493 # Check version of data file
494 if version != "$Id$":
495 logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
502 # Get/set special slice IDs
503 root_xid = bwlimit.get_xid("root")
504 default_xid = bwlimit.get_xid("default")
506 # Since root is required for sanity, its not in the API/plc database, so pass {}
508 if root_xid not in slices.keys():
509 slices[root_xid] = Slice(root_xid, "root", {})
510 slices[root_xid].reset({}, {})
512 # Used by bwlimit. pass {} since there is no rspec (like above).
513 if default_xid not in slices.keys():
514 slices[default_xid] = Slice(default_xid, "default", {})
515 slices[default_xid].reset({}, {})
518 # Get running slivers that should be on this node (from plc). {xid: name}
519 # db keys on name, bwmon keys on xid. db doesnt have xid either.
520 for plcSliver in nmdbcopy.keys():
521 live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
523 logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__(), 2)
524 logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__(), 2)
526 # Get actual running values from tc.
527 # Update slice totals and bandwidth. {xid: {values}}
528 kernelhtbs = gethtbs(root_xid, default_xid)
529 logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)
531 # The dat file has HTBs for slices, but the HTBs aren't running
532 nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys())
533 logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
535 for nohtbslice in nohtbslices:
536 if live.has_key(nohtbslice):
537 slices[nohtbslice].reset( {}, live[nohtbslice]['_rspec'] )
539 logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
540 del slices[nohtbslice]
542 # The dat file doesnt have HTB for the slice but kern has HTB
543 slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
544 logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
545 for slicenodat in slicesnodat:
546 # But slice is running
547 if live.has_key(slicenodat):
548 # init the slice. which means start accounting over since kernel
549 # htb was already there.
550 slices[slicenodat] = Slice(slicenodat,
551 live[slicenodat]['name'],
552 live[slicenodat]['_rspec'])
555 # Slices in GetSlivers but not running HTBs
556 newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
557 logger.log("bwmon: Found %s new slices" % newslicesxids.__len__(), 2)
560 for newslice in newslicesxids:
561 # Delegated slices dont have xids (which are uids) since they haven't been
563 if newslice != None and live[newslice].has_key('_rspec') == True:
564 # Check to see if we recently deleted this slice.
565 if live[newslice]['name'] not in deaddb.keys():
566 logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
567 # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
568 # and made a dict of computed values.
569 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
570 slices[newslice].reset( {}, live[newslice]['_rspec'] )
571 # Double check time for dead slice in deaddb is within 24hr recording period.
572 elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
573 deadslice = deaddb[live[newslice]['name']]
574 logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
575 slices[newslice] = deadslice['slice']
576 slices[newslice].xid = newslice
578 newvals = {"maxrate": deadslice['slice'].MaxRate * 1000,
579 "minrate": deadslice['slice'].MinRate * 1000,
580 "maxexemptrate": deadslice['slice'].Maxi2Rate * 1000,
581 "usedbytes": deadslice['htb']['usedbytes'] * 1000,
582 "usedi2bytes": deadslice['htb']['usedi2bytes'],
583 "share":deadslice['htb']['share']}
584 slices[newslice].reset(newvals, live[newslice]['_rspec'])
586 slices[newslice].update(newvals, live[newslice]['_rspec'])
587 # Since the slice has been reinitialized, remove it from the dead database.
588 del deaddb[deadslice['slice'].name]
591 logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name'])
593 # Move dead slices that exist in the pickle file, but
594 # aren't instantiated by PLC into the dead dict until
595 # recording period is over. This is to avoid the case where a slice is dynamically created
596 # and destroyed then recreated to get around byte limits.
597 deadxids = Set(slices.keys()) - Set(live.keys())
598 logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2), 2)
599 for deadxid in deadxids:
600 if deadxid == root_xid or deadxid == default_xid:
602 logger.log("bwmon: removing dead slice %s " % deadxid)
603 if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
604 # add slice (by name) to deaddb
605 logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
606 deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
608 if kernelhtbs.has_key(deadxid):
609 logger.log("bwmon: Removing HTB for %s." % deadxid, 2)
613 for deadslice in deaddb.keys():
614 if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
615 logger.log("bwmon: Removing dead slice %s from dat." \
616 % deaddb[deadslice]['slice'].name)
617 del deaddb[deadslice]
619 # Get actual running values from tc since we've added and removed buckets.
620 # Update slice totals and bandwidth. {xid: {values}}
621 kernelhtbs = gethtbs(root_xid, default_xid)
622 logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
624 # Update all byte limits on all slices
625 for (xid, slice) in slices.iteritems():
626 # Monitor only the specified slices
627 if xid == root_xid or xid == default_xid: continue
628 if names and name not in names:
631 if (time.time() >= (slice.time + period)) or \
632 (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
633 (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
634 # Reset to defaults every 24 hours or if it appears
635 # that the byte counters have overflowed (or, more
636 # likely, the node was restarted or the HTB buckets
637 # were re-initialized).
638 slice.reset(kernelhtbs[xid], live[xid]['_rspec'])
640 logger.log("bwmon: Updating slice %s" % slice.name, 2)
642 slice.update(kernelhtbs[xid], live[xid]['_rspec'])
644 logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2)
645 f = open(datafile, "w")
646 pickle.dump((version, slices, deaddb), f)
649 lock = threading.Event()
651 """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
652 logger.log("bwmon: Thread started", 2)
655 logger.log("bwmon: Event received. Running.", 2)
656 database.db_lock.acquire()
657 nmdbcopy = copy.deepcopy(database.db)
658 database.db_lock.release()
660 except: logger.log_exc()
664 tools.as_daemon_thread(run)
666 def GetSlivers(*args):