3 # Average bandwidth monitoring script. Run periodically via cron(8) to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have exceeded its daily bandwidth usage when the
6 # script is run, its instantaneous rate will be capped at the desired
7 # average rate. Thus, in the worst case, a slice will only be able to
8 # send a little more than twice its average daily limit.
10 # Two separate limits are enforced, one for destinations exempt from
11 # the node bandwidth cap, and the other for all other destinations.
13 # Mark Huang <mlhuang@cs.princeton.edu>
14 # Andy Bavier <acb@cs.princeton.edu>
15 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
16 # Copyright (C) 2004-2006 The Trustees of Princeton University
18 # $Id: bwmon.py,v 1.21 2007/06/16 14:30:17 faiyaza Exp $
37 sys.path.append("/etc/planetlab")
38 from plc_config import *
40 logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found")
41 PLC_NAME = "PlanetLab"
42 PLC_SLICE_PREFIX = "pl"
43 PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org"
44 PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org"
47 seconds_per_day = 24 * 60 * 60
53 datafile = "/var/lib/misc/bwmon.dat"
56 # Burst to line rate (or node cap). Set by NM. in KBit/s
57 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
58 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
62 # 5.4 GByte per day, expressed in KByte: 5.4 * 1024 * 1024 = 5662310
63 # 5.4 GByte per day max allowed to be transferred per recording period
64 default_MaxKByte = 5662310
65 default_ThreshKByte = int(.8 * default_MaxKByte)
66 # 16.4 GByte per day max allowed to be transferred per recording period to I2
67 default_Maxi2KByte = 17196646
68 default_Threshi2KByte = int(.8 * default_Maxi2KByte)
69 # Default share quanta
73 period = 1 * seconds_per_day
78 The slice %(slice)s has transmitted more than %(bytes)s from
79 %(hostname)s to %(class)s destinations
82 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
85 Please reduce the average %(class)s transmission rate
86 of the slice to %(limit)s per %(period)s.
92 %(date)s %(hostname)s bwcap %(slice)s
def format_bytes(bytes, si = True):
    """
    Formats bytes into a string

    bytes - byte count to format
    si    - if True (the default) use decimal units (1 KB = 1000 bytes),
            otherwise use binary units (1 KB = 1024 bytes)

    Returns a string such as "1.5 GB", "2.5 MB", "3.2 KB" or "512 bytes".
    """
    if si:
        kilo = 1000.
    else:
        # Officially, a kibibyte
        kilo = 1024.

    # Every threshold is derived from the selected base so that the unit
    # boundaries agree with the divisor used to scale the value (with
    # hard-coded decimal thresholds, si=False printed 1000 bytes as
    # "1.0 KB").
    mega = kilo * kilo
    giga = mega * kilo

    if bytes >= giga:
        return "%.1f GB" % (bytes / giga)
    elif bytes >= mega:
        return "%.1f MB" % (bytes / mega)
    elif bytes >= kilo:
        return "%.1f KB" % (bytes / kilo)
    else:
        return "%.0f bytes" % bytes
114 def format_period(seconds):
116 Formats a period in seconds into a string
119 if seconds == (24 * 60 * 60):
121 elif seconds == (60 * 60):
123 elif seconds > (24 * 60 * 60):
124 return "%.1f days" % (seconds / 24. / 60. / 60.)
125 elif seconds > (60 * 60):
126 return "%.1f hours" % (seconds / 60. / 60.)
128 return "%.1f minutes" % (seconds / 60.)
130 return "%.0f seconds" % seconds
132 def slicemail(slice, subject, body):
133 sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
135 # PLC has a separate list for pl_mom messages
136 if PLC_MAIL_SUPPORT_ADDRESS == "support@planet-lab.org":
137 to = ["pl-mom@planet-lab.org"]
139 to = [PLC_MAIL_SUPPORT_ADDRESS]
141 if slice is not None and slice != "root":
142 to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
144 header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
146 'version': sys.version.split(" ")[0],
152 Content-type: text/plain
156 X-Mailer: Python/%(version)s
159 """.lstrip() % header)
169 Stores the last recorded bandwidth parameters of a slice.
171 xid - slice context/VServer ID
173 time - beginning of recording period in UNIX seconds
174 bytes - low bandwidth bytes transmitted at the beginning of the recording period
175 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
176 ByteMax - total volume of data allowed
177 ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period)
178 ExemptByteMax - Same as above, but for i2.
179 ExemptByteThresh - i2 ByteThresh
180 maxrate - max_rate slice attribute.
181 maxexemptrate - max_exempt_rate slice attribute.
182 self.emailed = did we email during this recording period
186 def __init__(self, xid, name, rspec):
192 self.MaxRate = default_MaxRate
193 self.MinRate = default_MinRate
194 self.Maxi2Rate = default_Maxi2Rate
195 self.Mini2Rate = default_Mini2Rate
196 self.MaxKByte = default_MaxKByte
197 self.ThreshKByte = default_ThreshKByte
198 self.Maxi2KByte = default_Maxi2KByte
199 self.Threshi2KByte = default_Threshi2KByte
200 self.Share = default_Share
201 self.Sharei2 = default_Share
204 self.updateSliceAttributes(rspec)
205 bwlimit.set(xid = self.xid,
206 minrate = self.MinRate * 1000,
207 maxrate = self.MaxRate * 1000,
208 maxexemptrate = self.Maxi2Rate * 1000,
209 minexemptrate = self.Mini2Rate * 1000,
def updateSliceAttributes(self, rspec):
    """
    Refresh this slice's rate, byte-limit and share settings from rspec.

    rspec - mapping of slice attributes; it is only accessed through
            rspec.get(key, default), so any key that is absent falls
            back to the corresponding module-level default.

    Every value is coerced with int().  When a value differs from the
    one currently stored on the slice, the attribute is updated and the
    change is logged.  Returns nothing.
    """
    # Sanity check plus policy decision for MinRate:
    # MinRate can't be greater than 25% of the default MaxRate (node cap).
    MinRate = min(int(rspec.get("net_min_rate", default_MinRate)),
                  int(.25 * default_MaxRate))

    # (attribute name on self, requested value, label used in the log line)
    settings = (
        ("MinRate", MinRate, "Min Rate"),
        ("MaxRate",
         int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000)),
         "Max Rate"),
        ("Mini2Rate",
         int(rspec.get('net_i2_min_rate', default_Mini2Rate)),
         "Min i2 Rate"),
        ("Maxi2Rate",
         int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000)),
         "Max i2 Rate"),
        ("MaxKByte",
         int(rspec.get('net_max_kbyte', default_MaxKByte)),
         "Max KByte lim"),
        ("Maxi2KByte",
         int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte)),
         "Max i2 KByte"),
        ("ThreshKByte",
         int(rspec.get('net_thresh_kbyte', default_ThreshKByte)),
         "Thresh KByte"),
        ("Threshi2KByte",
         int(rspec.get('net_i2_thresh_kbyte', default_Threshi2KByte)),
         "i2 Thresh KByte"),
        ("Share",
         int(rspec.get('net_share', default_Share)),
         "Net Share"),
        # The attribute is Sharei2; logging self.i2Share here used to raise
        # AttributeError whenever the i2 share changed.
        ("Sharei2",
         int(rspec.get('net_i2_share', default_Share)),
         "Net i2 Share"),
    )

    for attr, value, label in settings:
        if value != getattr(self, attr):
            setattr(self, attr, value)
            logger.log("bwmon: Updating %s: %s = %s" % (self.name, label, value))
273 def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
275 Begin a new recording period. Remove caps by restoring limits
276 to their default values.
279 # Query Node Manager for max rate overrides
280 self.updateSliceAttributes(rspec)
282 # Reset baseline time
283 self.time = time.time()
285 # Reset baseline byte counts
286 self.bytes = usedbytes
287 self.i2bytes = usedi2bytes
291 maxrate = self.MaxRate * 1000
292 maxi2rate = self.Maxi2Rate * 1000
294 if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
295 logger.log("bwmon: %s reset to %s/%s" % \
297 bwlimit.format_tc_rate(maxrate),
298 bwlimit.format_tc_rate(maxi2rate)))
299 bwlimit.set(xid = self.xid,
300 minrate = self.MinRate * 1000,
301 maxrate = self.MaxRate * 1000,
302 maxexemptrate = self.Maxi2Rate * 1000,
303 minexemptrate = self.Mini2Rate * 1000,
306 def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
308 Update byte counts and check if byte limits have been
312 # Query Node Manager for max rate overrides
313 self.updateSliceAttributes(rspec)
315 # Prepare message parameters from the template
317 params = {'slice': self.name, 'hostname': socket.gethostname(),
318 'since': time.asctime(time.gmtime(self.time)) + " GMT",
319 'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
320 'date': time.asctime(time.gmtime()) + " GMT",
321 'period': format_period(period)}
323 if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
325 logger.log("bwmon: %s over thresh %s" \
326 % (self.name, format_bytes(self.ThreshKByte * 1024)))
327 sum = self.bytes + (self.ThreshKByte * 1024)
328 maxbyte = self.MaxKByte * 1024
329 bytesused = usedbytes - self.bytes
330 timeused = int(time.time() - self.time)
331 new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
332 if new_maxrate < (self.MinRate * 1000):
333 new_maxrate = self.MinRate * 1000
335 new_maxrate = self.MaxRate * 1000
337 # Format template parameters for low bandwidth message
338 params['class'] = "low bandwidth"
339 params['bytes'] = format_bytes(usedbytes - self.bytes)
340 params['limit'] = format_bytes(self.MaxKByte * 1024)
341 params['thresh'] = format_bytes(self.ThreshKByte * 1024)
342 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
345 logger.log("bwmon: %(slice)s %(class)s " \
346 "%(bytes)s of %(limit)s max %(thresh)s thresh (%(new_maxrate)s/s maxrate)" % \
349 # Cap low bandwidth burst rate
350 if new_maxrate != runningmaxrate:
351 message += template % params
352 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
354 if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
355 maxi2byte = self.Maxi2KByte * 1024
356 i2bytesused = usedi2bytes - self.i2bytes
357 timeused = int(time.time() - self.time)
358 new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
359 if new_maxi2rate < (self.Mini2Rate * 1000):
360 new_maxi2rate = self.Mini2Rate * 1000
362 new_maxi2rate = self.Maxi2Rate * 1000
364 # Format template parameters for high bandwidth message
365 params['class'] = "high bandwidth"
366 params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
367 params['limit'] = format_bytes(self.Maxi2KByte * 1024)
368 params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate)
371 logger.log("bwmon: %(slice)s %(class)s " \
372 "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % params)
374 # Cap high bandwidth burst rate
375 if new_maxi2rate != runningmaxi2rate:
376 message += template % params
377 logger.log("bwmon: %(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params)
380 if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate:
381 bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxi2rate)
384 if message and self.emailed == False:
385 subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
387 logger.log("bwmon: "+ subject)
388 logger.log("bwmon: "+ message + (footer % params))
391 slicemail(self.name, subject, message + (footer % params))
393 def gethtbs(root_xid, default_xid):
395 Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
396 Turn off HTBs without names.
399 for params in bwlimit.get():
402 minexemptrate, maxexemptrate,
403 usedbytes, usedi2bytes) = params
405 name = bwlimit.get_slice(xid)
408 and (xid != root_xid) \
409 and (xid != default_xid):
410 # Orphaned (not associated with a slice) class
412 logger.log("bwmon: Found orphaned HTB %s. Removing." %name)
415 livehtbs[xid] = {'share': share,
418 'maxexemptrate': maxexemptrate,
419 'minexemptrate': minexemptrate,
420 'usedbytes': usedbytes,
422 'usedi2bytes': usedi2bytes}
428 Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.
437 default_ThreshKByte,\
439 default_Threshi2KByte,\
445 # In case the limits have changed.
446 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
447 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
449 # In case the default isn't set yet.
450 if default_MaxRate == -1:
451 default_MaxRate = 1000000
454 f = open(datafile, "r+")
455 logger.log("bwmon: Loading %s" % datafile)
456 (version, slices) = pickle.load(f)
458 # Check version of data file
459 if version != "$Id: bwmon.py,v 1.21 2007/06/16 14:30:17 faiyaza Exp $":
460 logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
463 version = "$Id: bwmon.py,v 1.21 2007/06/16 14:30:17 faiyaza Exp $"
466 # Get/set special slice IDs
467 root_xid = bwlimit.get_xid("root")
468 default_xid = bwlimit.get_xid("default")
470 # Root is required for sanity but is not in the API/plc database, so pass {}
472 if root_xid not in slices.keys():
473 slices[root_xid] = Slice(root_xid, "root", {})
474 slices[root_xid].reset(0, 0, 0, 0, {})
476 # Used by bwlimit. pass {} since there is no rspec (like above).
477 if default_xid not in slices.keys():
478 slices[default_xid] = Slice(default_xid, "default", {})
479 slices[default_xid].reset(0, 0, 0, 0, {})
482 # Get running slivers that should be on this node (from plc). {xid: name}
483 # db keys on name, bwmon keys on xid. db doesn't have xid either.
484 for plcSliver in nmdbcopy.keys():
485 live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
487 logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__())
488 logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__())
490 # Get actual running values from tc.
491 # Update slice totals and bandwidth. {xid: {values}}
492 livehtbs = gethtbs(root_xid, default_xid)
493 logger.log("bwmon: Found %s running HTBs" % livehtbs.keys().__len__())
496 # live.xids - running(slices).xids = new.xids
497 #newslicesxids = Set(live.keys()) - Set(slices.keys())
498 newslicesxids = Set(live.keys()) - Set(livehtbs.keys())
499 logger.log("bwmon: Found %s new slices" % newslicesxids.__len__())
501 # In case we rebooted and need to keep track of already running htbs
502 norecxids = Set(livehtbs.keys()) - Set(slices.keys())
503 logger.log("bwmon: Found %s slices that have htbs but not in dat." % norecxids.__len__())
505 for norecxid in norecxids:
506 slices[norecxid] = Slice(norecxid, live[norecxid]['name'], live[norecxid]['_rspec'])
507 slices[norecxid].reset(livehtbs[norecxid]['maxrate'],
508 livehtbs[norecxid]['maxexemptrate'],
509 livehtbs[norecxid]['usedbytes'],
510 livehtbs[norecxid]['usedi2bytes'],
511 live[norecxid]['_rspec'])
514 for newslice in newslicesxids:
515 # Delegated slices don't have xids (which are uids) since they haven't been
517 if newslice != None and live[newslice].has_key('_rspec') == True:
518 logger.log("bwmon: New Slice %s" % live[newslice]['name'])
519 # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
520 # and made a dict of computed values.
521 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
522 slices[newslice].reset(0, 0, 0, 0, live[newslice]['_rspec'])
524 logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslice]['name'])
526 # Delete dead slices.
527 # First delete dead slices that exist in the pickle file, but
528 # aren't instantiated by PLC.
529 dead = Set(slices.keys()) - Set(live.keys())
530 logger.log("bwmon: Found %s dead slices" % (dead.__len__() - 2))
532 if xid == root_xid or xid == default_xid:
534 logger.log("bwmon: removing dead slice %s " % xid)
535 if slices.has_key(xid): del slices[xid]
536 if livehtbs.has_key(xid): bwlimit.off(xid)
538 # Get actual running values from tc since we've added and removed buckets.
539 # Update slice totals and bandwidth. {xid: {values}}
540 livehtbs = gethtbs(root_xid, default_xid)
541 logger.log("bwmon: now %s running HTBs" % livehtbs.keys().__len__())
543 for (xid, slice) in slices.iteritems():
544 # Monitor only the specified slices
545 if xid == root_xid or xid == default_xid: continue
546 if names and name not in names:
549 if (time.time() >= (slice.time + period)) or \
550 (livehtbs[xid]['usedbytes'] < slice.bytes) or \
551 (livehtbs[xid]['usedi2bytes'] < slice.i2bytes):
552 # Reset to defaults every 24 hours or if it appears
553 # that the byte counters have overflowed (or, more
554 # likely, the node was restarted or the HTB buckets
555 # were re-initialized).
556 slice.reset(livehtbs[xid]['maxrate'], \
557 livehtbs[xid]['maxexemptrate'], \
558 livehtbs[xid]['usedbytes'], \
559 livehtbs[xid]['usedi2bytes'], \
562 if debug: logger.log("bwmon: Updating slice %s" % slice.name)
564 slice.update(livehtbs[xid]['maxrate'], \
565 livehtbs[xid]['maxexemptrate'], \
566 livehtbs[xid]['usedbytes'], \
567 livehtbs[xid]['usedi2bytes'], \
570 logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile))
571 f = open(datafile, "w")
572 pickle.dump((version, slices), f)
575 lock = threading.Event()
577 """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
578 if debug: logger.log("bwmon: Thread started")
581 if debug: logger.log("bwmon: Event received. Running.")
582 database.db_lock.acquire()
583 nmdbcopy = copy.deepcopy(database.db)
584 database.db_lock.release()
586 except: logger.log_exc()
590 tools.as_daemon_thread(run)
592 def GetSlivers(*args):