3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
6 # its instantaneous rate will be capped at the bytes remaining in the limit
7 # over the time remaining in the recording period.
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
36 # Set DEBUG to True if you don't want to send emails
38 # Set ENABLE to False to setup buckets, but not limit.
41 datafile = "/var/lib/misc/bwmon.dat"
44 sys.path.append("/etc/planetlab")
45 from plc_config import *
48 logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
49 logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1)
52 seconds_per_day = 24 * 60 * 60
55 # Burst to line rate (or node cap). Set by NM. in KBit/s
56 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
57 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
58 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
59 # 5.4 Gbyte per day max allowed transferred per recording period
60 default_MaxKByte = 5662310
61 # 16.4 Gbyte per day max allowed transferred per recording period to I2
62 default_Maxi2KByte = 17196646
63 # Default share quanta
67 period = 1 * seconds_per_day
72 The slice %(slice)s has transmitted more than %(bytes)s from
73 %(hostname)s to %(class)s destinations
76 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
79 Please reduce the average %(class)s transmission rate
80 of the slice to %(limit)s per %(period)s.
86 %(date)s %(hostname)s bwcap %(slice)s
def format_bytes(bytes, si = True):
    """
    Format a byte count as a human-readable string.

    bytes -- number of bytes
    si -- if True, use SI (decimal) units where 1 KB = 1000 bytes;
          if False, use binary units where 1 KB = 1024 bytes.
    """
    if si:
        kilo = 1000.
    else:
        # Officially, a kibibyte
        kilo = 1024.

    if bytes >= (kilo * kilo * kilo):
        return "%.1f GB" % (bytes / (kilo * kilo * kilo))
    # BUG FIX: the MB threshold was a hard-coded 1000000, which disagreed
    # with the chosen unit base whenever si=False (values between 10**6
    # and 2**20 were labeled MB but divided by 1024**2).
    elif bytes >= (kilo * kilo):
        return "%.1f MB" % (bytes / (kilo * kilo))
    elif bytes >= kilo:
        return "%.1f KB" % (bytes / kilo)
    else:
        return "%.0f bytes" % bytes
def format_period(seconds):
    """
    Format a period in seconds as a human-readable string.

    Exact matches for one day / one hour get the singular form; longer
    periods are shown with one decimal in the largest sensible unit.
    """
    if seconds == (24 * 60 * 60):
        # FIX: branch bodies for the exact-match cases were missing,
        # leaving empty if/elif arms.
        return "1 day"
    elif seconds == (60 * 60):
        return "1 hour"
    elif seconds > (24 * 60 * 60):
        return "%.1f days" % (seconds / 24. / 60. / 60.)
    elif seconds > (60 * 60):
        return "%.1f hours" % (seconds / 60. / 60.)
    elif seconds > 60:
        return "%.1f minutes" % (seconds / 60.)
    else:
        return "%.0f seconds" % seconds
126 def slicemail(slice, subject, body):
128 Front end to sendmail. Sends email to slice alias with given subject and body.
# Pipe the message through sendmail: -N never suppresses delivery-status
# notifications, -t takes recipients from the headers we write.
131 sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
133 # Parsed from MyPLC config
134 to = [PLC_MAIL_MOM_LIST_ADDRESS]
# Also copy the slice's support alias, unless "slice" is really root.
136 if slice is not None and slice != "root":
137 to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
139 header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
141 'version': sys.version.split(" ")[0],
147 Content-type: text/plain
151 X-Mailer: Python/%(version)s
154 """.lstrip() % header)
# NOTE(review): this excerpt is missing lines (rest of the header dict,
# the message body write, and the pipe close) -- consult the full file.
164 Stores the last recorded bandwidth parameters of a slice.
166 xid - slice context/VServer ID
168 time - beginning of recording period in UNIX seconds
169 bytes - low bandwidth bytes transmitted at the beginning of the recording period
170 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
171 MaxKByte - total volume of data allowed
172 ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
173 Maxi2KByte - same as MaxKByte, but for i2
174 Threshi2KByte - same as ThreshKByte, but for i2
175 MaxRate - max_rate slice attribute.
176 Maxi2Rate - max_exempt_rate slice attribute.
177 Share - Used by Sirius to loan min rates
178 Sharei2 - Used by Sirius to loan min rates for i2
179 self.emailed - did slice recv email during this recording period
183 def __init__(self, xid, name, rspec):
# Seed every limit from the module-level defaults; updateSliceAttributes()
# below overrides them from the slice's rspec.  Rates are kept in kbit/s.
189 self.MaxRate = default_MaxRate
190 self.MinRate = bwlimit.bwmin / 1000
191 self.Maxi2Rate = default_Maxi2Rate
192 self.Mini2Rate = bwlimit.bwmin / 1000
193 self.MaxKByte = default_MaxKByte
# Thresholds default to 80% of the byte caps; crossing one triggers capping.
194 self.ThreshKByte = int(.8 * self.MaxKByte)
195 self.Maxi2KByte = default_Maxi2KByte
196 self.Threshi2KByte = int(.8 * self.Maxi2KByte)
197 self.Share = default_Share
198 self.Sharei2 = default_Share
# Apply per-slice rspec overrides, then push the initial limits into the
# kernel via tc (bwlimit.set takes bit/s, hence the * 1000).
202 self.updateSliceAttributes(rspec)
203 bwlimit.set(xid = self.xid,
204 minrate = self.MinRate * 1000,
205 maxrate = self.MaxRate * 1000,
206 maxexemptrate = self.Maxi2Rate * 1000,
207 minexemptrate = self.Mini2Rate * 1000,
def updateSliceAttributes(self, rspec):
    """
    Use rspecs from GetSlivers to PLC to populate slice object.  Also
    do some sanity checking.

    Each limit follows the same pattern: read the rspec value (falling
    back to the module default), and if it differs from the stored value,
    store it and log the change.  Rates are in kbit/s, volumes in KByte.
    """

    # Sanity check plus policy decision for MinRate:
    # Minrate cant be greater than 25% of MaxRate or NodeCap.
    MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000))
    if MinRate > int(.25 * default_MaxRate):
        MinRate = int(.25 * default_MaxRate)
    if MinRate != self.MinRate:
        self.MinRate = MinRate
        logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))

    MaxRate = int(rspec.get('net_max_rate', default_MaxRate))
    if MaxRate != self.MaxRate:
        self.MaxRate = MaxRate
        logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))

    Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
    if Mini2Rate != self.Mini2Rate:
        self.Mini2Rate = Mini2Rate
        logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))

    Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate))
    if Maxi2Rate != self.Maxi2Rate:
        self.Maxi2Rate = Maxi2Rate
        logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))

    MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
    if MaxKByte != self.MaxKByte:
        self.MaxKByte = MaxKByte
        logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))

    Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
    if Maxi2KByte != self.Maxi2KByte:
        self.Maxi2KByte = Maxi2KByte
        logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))

    # Thresholds default to 80% of the (possibly just-updated) caps.
    ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
    if ThreshKByte != self.ThreshKByte:
        self.ThreshKByte = ThreshKByte
        logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))

    Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
    if Threshi2KByte != self.Threshi2KByte:
        self.Threshi2KByte = Threshi2KByte
        logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))

    Share = int(rspec.get('net_share', default_Share))
    if Share != self.Share:
        # BUG FIX: the assignment was missing, so a changed share was
        # logged but never actually stored on the slice.
        self.Share = Share
        logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))

    Sharei2 = int(rspec.get('net_i2_share', default_Share))
    if Sharei2 != self.Sharei2:
        self.Sharei2 = Sharei2
        # BUG FIX: was self.i2Share, an attribute that does not exist
        # (raised AttributeError whenever the i2 share changed).
        logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.Sharei2))
274 def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
276 Begin a new recording period. Remove caps by restoring limits
277 to their default values.
279 # Query Node Manager for max rate overrides
280 self.updateSliceAttributes(rspec)
282 # Reset baseline time
283 self.time = time.time()
285 # Reset baseline byte counts
286 self.bytes = usedbytes
287 self.i2bytes = usedi2bytes
# Stored rates are kbit/s; tc wants bit/s, hence * 1000.
294 maxrate = self.MaxRate * 1000
295 maxi2rate = self.Maxi2Rate * 1000
# Only touch tc when the configured limits differ from what is running.
296 if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
297 logger.log("bwmon: %s reset to %s/%s" % \
299 bwlimit.format_tc_rate(maxrate),
300 bwlimit.format_tc_rate(maxi2rate)), 1)
301 bwlimit.set(xid = self.xid,
302 minrate = self.MinRate * 1000,
303 maxrate = self.MaxRate * 1000,
304 maxexemptrate = self.Maxi2Rate * 1000,
305 minexemptrate = self.Mini2Rate * 1000,
308 def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
310 Notify the slice it's being capped.
312 # Prepare message parameters from the template
314 params = {'slice': self.name, 'hostname': socket.gethostname(),
315 'since': time.asctime(time.gmtime(self.time)) + " GMT",
316 'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
317 'date': time.asctime(time.gmtime()) + " GMT",
318 'period': format_period(period)}
# A low-bandwidth cap is in effect when the new rate differs from the
# configured maximum.
320 if new_maxrate != (self.MaxRate * 1000):
321 # Format template parameters for low bandwidth message
322 params['class'] = "low bandwidth"
323 params['bytes'] = format_bytes(usedbytes - self.bytes)
324 params['limit'] = format_bytes(self.MaxKByte * 1024)
325 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
327 # Cap low bandwidth burst rate
328 message += template % params
329 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
# Same again for traffic to cap-exempt (e.g. Internet2) destinations.
331 if new_maxexemptrate != (self.Maxi2Rate * 1000):
332 # Format template parameters for high bandwidth message
333 params['class'] = "high bandwidth"
334 params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
335 params['limit'] = format_bytes(self.Maxi2KByte * 1024)
336 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
338 message += template % params
339 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
# Per the class docstring, self.emailed records whether the slice was
# already mailed this recording period -- send at most one email.
342 if self.emailed == False:
343 subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
345 logger.log("bwmon: "+ subject)
346 logger.log("bwmon: "+ message + (footer % params))
349 logger.log("bwmon: Emailing %s" % self.name)
350 slicemail(self.name, subject, message + (footer % params))
353 def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec):
355 Update byte counts and check if byte thresholds have been
356 exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.
357 Recalculate every time module runs.
360 # copy self.Min* and self.*share values for comparison later.
361 runningMinRate = self.MinRate
362 runningMini2Rate = self.Mini2Rate
# NOTE(review): this overwrites the runningshare *parameter* with the
# stored value, so the tc-skip comparison below is old-stored vs
# new-stored share, not against the kernel value passed in -- confirm
# this is intended.
363 runningshare = self.Share
364 runningsharei2 = self.Sharei2
366 # Query Node Manager for max rate overrides
367 self.updateSliceAttributes(rspec)
# Low-bandwidth accounting: once past the threshold, spread the remaining
# byte budget over the time left in the recording period.
370 if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
# NOTE: "sum" shadows the builtin; its value is not used in this excerpt.
371 sum = self.bytes + (self.ThreshKByte * 1024)
372 maxbyte = self.MaxKByte * 1024
373 bytesused = usedbytes - self.bytes
374 timeused = int(time.time() - self.time)
375 # Calculate new rate. in bit/s
376 new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
377 # Never go under MinRate
378 if new_maxrate < (self.MinRate * 1000):
379 new_maxrate = self.MinRate * 1000
380 # State information. I'm capped.
# Under the threshold: run at the configured maximum.
384 new_maxrate = self.MaxRate * 1000
# Same computation for cap-exempt (i2) traffic.
387 if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
388 maxi2byte = self.Maxi2KByte * 1024
389 i2bytesused = usedi2bytes - self.i2bytes
390 timeused = int(time.time() - self.time)
392 new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
393 # Never go under MinRate
394 if new_maxi2rate < (self.Mini2Rate * 1000):
395 new_maxi2rate = self.Mini2Rate * 1000
396 # State information. I'm capped.
400 new_maxi2rate = self.Maxi2Rate * 1000
403 # Check running values against newly calculated values so as not to run tc
405 if (runningmaxrate != new_maxrate) or \
406 (runningMinRate != self.MinRate) or \
407 (runningmaxi2rate != new_maxi2rate) or \
408 (runningMini2Rate != self.Mini2Rate) or \
409 (runningshare != self.Share):
411 bwlimit.set(xid = self.xid,
412 minrate = self.MinRate * 1000,
413 maxrate = new_maxrate,
414 minexemptrate = self.Mini2Rate * 1000,
415 maxexemptrate = new_maxi2rate,
# Email the slice only if a cap was applied above.
419 if self.capped == True:
420 self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
423 def gethtbs(root_xid, default_xid):
425 Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
426 Turn off HTBs without names.
429 for params in bwlimit.get():
432 minexemptrate, maxexemptrate,
433 usedbytes, usedi2bytes) = params
# Map the tc class id back to a slice name; nameless classes (other than
# root/default) belong to no slice.
435 name = bwlimit.get_slice(xid)
438 and (xid != root_xid) \
439 and (xid != default_xid):
440 # Orphaned (not associated with a slice) class
442 logger.log("bwmon: Found orphaned HTB %s. Removing." %name, 1)
# Record the live parameters keyed by xid.
445 livehtbs[xid] = {'share': share,
448 'maxexemptrate': maxexemptrate,
449 'minexemptrate': minexemptrate,
450 'usedbytes': usedbytes,
452 'usedi2bytes': usedi2bytes}
458 Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.
471 # In case the limits have changed.
472 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
473 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
475 # In case the default isn't set yet.
476 if default_MaxRate == -1:
477 default_MaxRate = 10000000
# Load persisted accounting state (version tag, per-slice state, dead-slice db).
480 f = open(datafile, "r+")
481 logger.log("bwmon: Loading %s" % datafile, 2)
482 (version, slices, deaddb) = pickle.load(f)
484 # Check version of data file
485 if version != "$Id$":
486 logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
493 # Get/set special slice IDs
494 root_xid = bwlimit.get_xid("root")
495 default_xid = bwlimit.get_xid("default")
497 # Since root is required for sanity, its not in the API/plc database, so pass {}
499 if root_xid not in slices.keys():
500 slices[root_xid] = Slice(root_xid, "root", {})
501 slices[root_xid].reset(0, 0, 0, 0, {})
503 # Used by bwlimit. pass {} since there is no rspec (like above).
504 if default_xid not in slices.keys():
505 slices[default_xid] = Slice(default_xid, "default", {})
506 slices[default_xid].reset(0, 0, 0, 0, {})
509 # Get running slivers that should be on this node (from plc). {xid: name}
510 # db keys on name, bwmon keys on xid. db doesnt have xid either.
511 for plcSliver in nmdbcopy.keys():
512 live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
514 logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__(), 2)
515 logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__(), 2)
517 # Get actual running values from tc.
518 # Update slice totals and bandwidth. {xid: {values}}
519 kernelhtbs = gethtbs(root_xid, default_xid)
520 logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)
522 # The dat file has HTBs for slices, but the HTBs aren't running
523 nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys())
524 logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
525 # Reset accounting for slices that are still live; drop the rest.
526 for nohtbslice in nohtbslices:
527 if live.has_key(nohtbslice):
528 slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
530 logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
531 del slices[nohtbslice]
533 # The dat file doesnt have HTB for the slice but kern has HTB
534 slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
535 logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
536 for slicenodat in slicesnodat:
537 # But slice is running
538 if live.has_key(slicenodat):
539 # init the slice. which means start accounting over since kernel
540 # htb was already there.
541 slices[slicenodat] = Slice(slicenodat,
542 live[slicenodat]['name'],
543 live[slicenodat]['_rspec'])
546 # Slices in GetSlivers but not running HTBs
547 newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
548 logger.log("bwmon: Found %s new slices" % newslicesxids.__len__(), 2)
551 for newslice in newslicesxids:
552 # Delegated slices dont have xids (which are uids) since they haven't been
554 if newslice != None and live[newslice].has_key('_rspec') == True:
555 # Check to see if we recently deleted this slice.
556 if live[newslice]['name'] not in deaddb.keys():
557 logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
558 # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
559 # and made a dict of computed values.
560 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
561 slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
562 # Double check time for dead slice in deaddb is within 24hr recording period.
563 elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
564 deadslice = deaddb[live[newslice]['name']]
565 logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
566 slices[newslice] = deadslice['slice']
567 slices[newslice].xid = newslice
# Restore the saved rates and byte counters so the slice cannot evade
# its limits by being deleted and recreated within one period.
569 slices[newslice].reset(deadslice['slice'].MaxRate,
570 deadslice['slice'].Maxi2Rate,
571 deadslice['htb']['usedbytes'],
572 deadslice['htb']['usedi2bytes'],
573 live[newslice]['_rspec'])
575 slices[newslice].update(deadslice['slice'].MaxRate,
576 deadslice['slice'].Maxi2Rate,
577 deadslice['htb']['usedbytes'],
578 deadslice['htb']['usedi2bytes'],
579 deadslice['htb']['share'],
580 live[newslice]['_rspec'])
581 # Since the slice has been reinitialized, remove from dead database.
582 del deaddb[deadslice['slice'].name]
584 logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name'])
586 # Move dead slices that exist in the pickle file, but
587 # aren't instantiated by PLC into the dead dict until
588 # recording period is over. This is to avoid the case where a slice is dynamically created
589 # and destroyed then recreated to get around byte limits.
590 deadxids = Set(slices.keys()) - Set(live.keys())
591 logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2), 2)
592 for deadxid in deadxids:
593 if deadxid == root_xid or deadxid == default_xid:
595 logger.log("bwmon: removing dead slice %s " % deadxid)
596 if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
597 # add slice (by name) to deaddb
598 logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
599 deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
601 if kernelhtbs.has_key(deadxid):
602 logger.log("bwmon: Removing HTB for %s." % deadxid, 2)
# Expire dead-slice records once their recording period has elapsed.
606 for deadslice in deaddb.keys():
607 if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
608 logger.log("bwmon: Removing dead slice %s from dat." \
609 % deaddb[deadslice]['slice'].name)
610 del deaddb[deadslice]
612 # Get actual running values from tc since we've added and removed buckets.
613 # Update slice totals and bandwidth. {xid: {values}}
614 kernelhtbs = gethtbs(root_xid, default_xid)
615 logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
617 # Update all byte limits on all slices
618 for (xid, slice) in slices.iteritems():
619 # Monitor only the specified slices
620 if xid == root_xid or xid == default_xid: continue
621 if names and name not in names:
624 if (time.time() >= (slice.time + period)) or \
625 (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
626 (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
627 # Reset to defaults every 24 hours or if it appears
628 # that the byte counters have overflowed (or, more
629 # likely, the node was restarted or the HTB buckets
630 # were re-initialized).
631 slice.reset(kernelhtbs[xid]['maxrate'], \
632 kernelhtbs[xid]['maxexemptrate'], \
633 kernelhtbs[xid]['usedbytes'], \
634 kernelhtbs[xid]['usedi2bytes'], \
637 logger.log("bwmon: Updating slice %s" % slice.name, 2)
639 slice.update(kernelhtbs[xid]['maxrate'], \
640 kernelhtbs[xid]['maxexemptrate'], \
641 kernelhtbs[xid]['usedbytes'], \
642 kernelhtbs[xid]['usedi2bytes'], \
643 kernelhtbs[xid]['share'],
# Persist the updated state for the next run.
646 logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2)
647 f = open(datafile, "w")
648 pickle.dump((version, slices, deaddb), f)
652 def getDefaults(nmdbcopy):
654 Get defaults from default slice's slice attributes.
# Look up the site-wide default slice in the NM database snapshot.
658 dfltslice = nmdbcopy.get(PLC_SLICE_PREFIX+"_default")
# NOTE(review): presumably net_max_rate == -1 means bwmon is disabled
# (run() treats a falsy return as "BW limits DISABLED") -- confirm; the
# rest of this function is missing from this excerpt.
660 if dfltslice['rspec']['net_max_rate'] == -1:
668 Turn off all slice HTBs
670 # Get/set special slice IDs
671 root_xid = bwlimit.get_xid("root")
672 default_xid = bwlimit.get_xid("default")
# Query the running HTBs via tc, then disable each one.
673 kernelhtbs = gethtbs(root_xid, default_xid)
675 logger.log("bwlimit: Disabling all running HTBs.")
676 for htb in kernelhtbs.keys(): bwlimit.off(htb)
# Event used to wake the bwmon thread from other NM modules.
679 lock = threading.Event()
682 When run as a thread, wait for event, lock db, deep copy it, release it,
683 run bwmon.GetSlivers(), then go back to waiting.
685 logger.log("bwmon: Thread started", 2)
688 logger.log("bwmon: Event received. Running.", 2)
# Snapshot the NM database under its lock so the sync work can proceed
# on a consistent copy without holding the lock.
689 database.db_lock.acquire()
690 nmdbcopy = copy.deepcopy(database.db)
691 database.db_lock.release()
692 # Only enforce when the default slice enables limits and tc shows classes on eth0.
693 if getDefaults(nmdbcopy) and len(bwlimit.tc("class show dev eth0")) > 0:
694 # class show to check if net:InitNodeLimit:bwlimit.init has run.
696 else: logger.log("bwmon: BW limits DISABLED.")
697 except: logger.log_exc()
701 tools.as_daemon_thread(run)
703 def GetSlivers(*args):