3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
6 # its instantaneous rate will be capped at the bytes remaining in the limit
7 # over the time remaining in the recording period.
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
# Site configuration: everything exported by /etc/planetlab/plc_config.py is
# pulled into this namespace. The PLC_* assignments that follow the warning
# are presumably the fallbacks used when that import fails.
# NOTE(review): this listing omits lines (the embedded numbers skip), so the
# control flow guarding the fallbacks is not visible here -- confirm.
36 sys.path.append("/etc/planetlab")
37 from plc_config import *
39 logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found")
40 PLC_NAME = "PlanetLab"
41 PLC_SLICE_PREFIX = "pl"
42 PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org"
43 PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org"
# Number of seconds in one day; `period` below is derived from it.
46 seconds_per_day = 24 * 60 * 60
# File the pickled (version, slices, deaddb) state is loaded from and saved to.
52 datafile = "/var/lib/misc/bwmon.dat"
55 # Burst to line rate (or node cap). Set by NM. in KBit/s
# Both defaults are the bwlimit values divided by 1000 (presumably bit/s
# converted to kbit/s -- confirm against bwlimit).
56 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
57 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
61 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
62 # 5.4 Gbyte per day max allowed transferred per recording period
63 default_MaxKByte = 5662310
# Soft threshold: 80% of the byte limit.
64 default_ThreshKByte = int(.8 * default_MaxKByte)
65 # 16.4 Gbyte per day max allowed transferred per recording period to I2
66 default_Maxi2KByte = 17196646
# Soft threshold for I2 traffic: 80% of the I2 byte limit.
67 default_Threshi2KByte = int(.8 * default_Maxi2KByte)
68 # Default share quanta
# Length of one recording period: one day, in seconds.
72 period = 1 * seconds_per_day
77 The slice %(slice)s has transmitted more than %(bytes)s from
78 %(hostname)s to %(class)s destinations
81 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
84 Please reduce the average %(class)s transmission rate
85 of the slice to %(limit)s per %(period)s.
91 %(date)s %(hostname)s bwcap %(slice)s
def format_bytes(bytes, si = True):
    """
    Formats a byte count into a human-readable string.

    bytes - number of bytes (int or float)
    si - if True use decimal multiples (1 KB = 1000 bytes); otherwise
         use binary multiples (1 KB = 1024 bytes)

    Returns the count scaled to GB, MB or KB with one decimal place, or
    as whole "bytes" when it is smaller than one kilo unit.
    """

    if si:
        kilo = 1000.
    else:
        # Officially, a kibibyte
        kilo = 1024.

    # Float units keep the divisions below fractional even for int input.
    mega = kilo * kilo
    giga = mega * kilo

    # Every threshold follows the selected base, so the unit that is chosen
    # always matches the divisor applied to it.
    if bytes >= giga:
        return "%.1f GB" % (bytes / giga)
    elif bytes >= mega:
        return "%.1f MB" % (bytes / mega)
    elif bytes >= kilo:
        return "%.1f KB" % (bytes / kilo)
    else:
        return "%.0f bytes" % bytes
def format_period(seconds):
    """
    Renders a duration given in seconds as a short string.

    Exactly one day or one hour is spelled as the bare unit name so the
    result reads naturally after "per"; any other duration is scaled to
    the largest unit it exceeds and printed with one decimal place
    (whole seconds for anything up to a minute).
    """

    minute = 60
    hour = 60 * minute
    day = 24 * hour

    # Exact single units carry no count.
    if seconds == day:
        return "day"
    if seconds == hour:
        return "hour"

    # Otherwise pick the largest unit the duration exceeds.
    if seconds > day:
        return "%.1f days" % (seconds / 24. / 60. / 60.)
    if seconds > hour:
        return "%.1f hours" % (seconds / 60. / 60.)
    if seconds > minute:
        return "%.1f minutes" % (seconds / 60.)
    return "%.0f seconds" % seconds
# Sends a notification email by piping a message into the local sendmail
# binary.
#   slice   - slice name; when it is neither None nor "root", the slice alias
#             (PLC_MAIL_SLICE_ADDRESS with "SLICE" replaced by the name) is
#             appended to the recipient list
#   subject, body - per the docstring, the subject and text of the message
# NOTE(review): several lines of this function are absent from this listing
# (the embedded numbers skip), including parts of the header dict and of the
# header text, so only the visible statements are described.
131 def slicemail(slice, subject, body):
133 Front end to sendmail. Sends email to slice alias with given subject and body.
# Opens a write pipe to sendmail; the command line is built by string
# formatting from PLC_MAIL_SUPPORT_ADDRESS.
# NOTE(review): os.popen runs the command through a shell -- confirm the
# configured address can never contain shell metacharacters.
136 sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
138 # PLC has a separate list for pl_mom messages
139 if PLC_MAIL_SUPPORT_ADDRESS == "support@planet-lab.org":
140 to = ["pl-mom@planet-lab.org"]
142 to = [PLC_MAIL_SUPPORT_ADDRESS]
144 if slice is not None and slice != "root":
145 to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
# Values substituted into the %(...)s fields of the header text below.
147 header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
149 'version': sys.version.split(" ")[0],
155 Content-type: text/plain
159 X-Mailer: Python/%(version)s
162 """.lstrip() % header)
172 Stores the last recorded bandwidth parameters of a slice.
174 xid - slice context/VServer ID
176 time - beginning of recording period in UNIX seconds
177 bytes - low bandwidth bytes transmitted at the beginning of the recording period
178 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
179 MaxKByte - total volume of data allowed
180 ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
181 Maxi2KByte - same as MaxKByte, but for i2
182 Threshi2Kbyte - same as ThreshKbyte, but for i2
183 MaxRate - max_rate slice attribute.
184 Maxi2Rate - max_exempt_rate slice attribute.
185 Share - Used by Sirius to loan min rates
186 Sharei2 - Used by Sirius to loan min rates for i2
187 self.emailed - did slice recv email during this recording period
# Builds a slice record: every limit starts at its module-level default, the
# rspec overrides are then applied through updateSliceAttributes, and the
# resulting rates are pushed to bwlimit for this xid.
#   xid   - slice context id handed to bwlimit.set
#   name  - slice name
#   rspec - dict of slice attributes forwarded to updateSliceAttributes
# NOTE(review): lines are missing from this listing (191 -> 197, 206 -> 209
# and after 214), so other attribute initialisations and the end of the
# bwlimit.set call are not visible here.
191 def __init__(self, xid, name, rspec):
197 self.MaxRate = default_MaxRate
198 self.MinRate = default_MinRate
199 self.Maxi2Rate = default_Maxi2Rate
200 self.Mini2Rate = default_Mini2Rate
201 self.MaxKByte = default_MaxKByte
202 self.ThreshKByte = default_ThreshKByte
203 self.Maxi2KByte = default_Maxi2KByte
204 self.Threshi2KByte = default_Threshi2KByte
205 self.Share = default_Share
# The i2 share starts from the same default as the regular share.
206 self.Sharei2 = default_Share
209 self.updateSliceAttributes(rspec)
# Stored rates are multiplied by 1000 before being handed to bwlimit
# (presumably kbit/s -> bit/s; confirm against bwlimit).
210 bwlimit.set(xid = self.xid,
211 minrate = self.MinRate * 1000,
212 maxrate = self.MaxRate * 1000,
213 maxexemptrate = self.Maxi2Rate * 1000,
214 minexemptrate = self.Mini2Rate * 1000,
def updateSliceAttributes(self, rspec):
    """
    Use rspecs from GetSlivers to PLC to populate slice object.  Also
    do some sanity checking.

    rspec - dict of slice attributes (net_min_rate, net_max_rate,
            net_i2_min_rate, net_i2_max_rate, net_max_kbyte,
            net_i2_max_kbyte, net_thresh_kbyte, net_i2_thresh_kbyte,
            net_share, net_i2_share).  Missing keys fall back to the
            module defaults, or to the current bwlimit caps for the two
            max rates.

    An attribute is only assigned, and the change logged, when the new
    value differs from the one currently stored on the slice.
    """

    # Sanity check plus policy decision for MinRate:
    # MinRate can't be greater than 25% of the default MaxRate (node cap).
    MinRate = min(int(rspec.get("net_min_rate", default_MinRate)),
                  int(.25 * default_MaxRate))

    # (attribute name, new value, label used in the log line).  Driving the
    # comparison, the assignment and the log line from one attribute name
    # keeps them from drifting apart.
    updates = (
        ('MinRate', MinRate, "Min Rate"),
        ('MaxRate',
         int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000)),
         "Max Rate"),
        ('Mini2Rate',
         int(rspec.get('net_i2_min_rate', default_Mini2Rate)),
         "Min i2 Rate"),
        ('Maxi2Rate',
         int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000)),
         "Max i2 Rate"),
        ('MaxKByte',
         int(rspec.get('net_max_kbyte', default_MaxKByte)),
         "Max KByte lim"),
        ('Maxi2KByte',
         int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte)),
         "Max i2 KByte"),
        ('ThreshKByte',
         int(rspec.get('net_thresh_kbyte', default_ThreshKByte)),
         "Thresh KByte"),
        ('Threshi2KByte',
         int(rspec.get('net_i2_thresh_kbyte', default_Threshi2KByte)),
         "i2 Thresh KByte"),
        ('Share',
         int(rspec.get('net_share', default_Share)),
         "Net Share"),
        ('Sharei2',
         int(rspec.get('net_i2_share', default_Share)),
         "Net i2 Share"),
    )

    for attr, value, label in updates:
        if value != getattr(self, attr):
            setattr(self, attr, value)
            logger.log("bwmon: Updating %s: %s = %s" % (self.name, label, value))
# Starts a new recording period for this slice: refreshes the limits from the
# rspec, records the current time and byte counters as the new baseline and,
# if the running rates differ from the stored maxima, re-applies the stored
# rates through bwlimit.set.
#   runningmaxrate, runningmaxi2rate - rates currently in effect, compared
#                                      against the stored maxima
#   usedbytes, usedi2bytes           - byte counters stored as the baseline
#   rspec                            - slice attributes for updateSliceAttributes
# NOTE(review): lines are missing from this listing (for example 295 -> 299
# and after 311), so further state changes may not be visible here.
281 def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
283 Begin a new recording period. Remove caps by restoring limits
284 to their default values.
287 # Query Node Manager for max rate overrides
288 self.updateSliceAttributes(rspec)
290 # Reset baseline time
291 self.time = time.time()
293 # Reset baseline byte counts
294 self.bytes = usedbytes
295 self.i2bytes = usedi2bytes
# Stored rates scaled by 1000, as they are everywhere they reach bwlimit.
299 maxrate = self.MaxRate * 1000
300 maxi2rate = self.Maxi2Rate * 1000
# NOTE(review): update() compares the running rates against values scaled by
# 1000, whereas this test uses the unscaled self.MaxRate / self.Maxi2Rate --
# confirm the units match, otherwise this branch is taken on every reset.
302 if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
303 logger.log("bwmon: %s reset to %s/%s" % \
305 bwlimit.format_tc_rate(maxrate),
306 bwlimit.format_tc_rate(maxi2rate)))
307 bwlimit.set(xid = self.xid,
308 minrate = self.MinRate * 1000,
309 maxrate = self.MaxRate * 1000,
310 maxexemptrate = self.Maxi2Rate * 1000,
311 minexemptrate = self.Mini2Rate * 1000,
# Per-run accounting for one slice. For each traffic class (regular and i2)
# the bytes sent since the baseline are compared with the class threshold;
# once over it, the new max rate is the bytes left in the limit, times 8,
# spread over the seconds left in the period, but never below the class
# minimum rate. Changed rates are applied with bwlimit.set and a
# notification is mailed through slicemail.
#   runningmaxrate, runningmaxi2rate - rates currently in effect
#   usedbytes, usedi2bytes           - current byte counters
#   rspec                            - slice attributes for updateSliceAttributes
# NOTE(review): many lines are missing from this listing (the embedded
# numbers skip), including what are presumably the `else:` lines in front of
# 344 and 371 and the initialisation of `message`.
314 def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
316 Update byte counts and check if byte thresholds have been
317 exceeded. If exceeded, cap to remaining bytes in limit over remaining in period.
318 Recalculate every time module runs.
321 # Query Node Manager for max rate overrides
322 self.updateSliceAttributes(rspec)
324 # Prepare message parameters from the template
326 params = {'slice': self.name, 'hostname': socket.gethostname(),
327 'since': time.asctime(time.gmtime(self.time)) + " GMT",
328 'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
329 'date': time.asctime(time.gmtime()) + " GMT",
330 'period': format_period(period)}
# Over the regular-traffic threshold (thresholds and limits are kept in
# KByte, hence the * 1024).
332 if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
334 logger.log("bwmon: %s over thresh %s" \
335 % (self.name, format_bytes(self.ThreshKByte * 1024)))
# NOTE(review): `sum` is not used in any visible line and shadows the builtin.
336 sum = self.bytes + (self.ThreshKByte * 1024)
337 maxbyte = self.MaxKByte * 1024
338 bytesused = usedbytes - self.bytes
339 timeused = int(time.time() - self.time)
# Remaining bytes, in bits, divided by the remaining seconds of the period.
340 new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
# Never cap below the slice's minimum rate.
341 if new_maxrate < (self.MinRate * 1000):
342 new_maxrate = self.MinRate * 1000
344 new_maxrate = self.MaxRate * 1000
346 # Format template parameters for low bandwidth message
347 params['class'] = "low bandwidth"
348 params['bytes'] = format_bytes(usedbytes - self.bytes)
349 params['limit'] = format_bytes(self.MaxKByte * 1024)
350 params['thresh'] = format_bytes(self.ThreshKByte * 1024)
351 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
354 logger.log("bwmon: %(slice)s %(class)s " \
355 "%(bytes)s of %(limit)s max %(thresh)s thresh (%(new_maxrate)s/s maxrate)" % \
358 # Cap low bandwidth burst rate
359 if new_maxrate != runningmaxrate:
360 message += template % params
361 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
# Same computation for i2 traffic, using the i2 limits and minimum rate.
363 if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
364 maxi2byte = self.Maxi2KByte * 1024
365 i2bytesused = usedi2bytes - self.i2bytes
366 timeused = int(time.time() - self.time)
367 new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
368 if new_maxi2rate < (self.Mini2Rate * 1000):
369 new_maxi2rate = self.Mini2Rate * 1000
371 new_maxi2rate = self.Maxi2Rate * 1000
373 # Format template parameters for high bandwidth message
374 params['class'] = "high bandwidth"
375 params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
376 params['limit'] = format_bytes(self.Maxi2KByte * 1024)
377 params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate)
# NOTE(review): this log line, and the message template appended at line 385,
# format %(new_maxrate)s, which still holds the low bandwidth rate stored at
# line 351; new_maxexemptrate looks like the intended value -- confirm.
380 logger.log("bwmon: %(slice)s %(class)s " \
381 "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % params)
383 # Cap high bandwidth burst rate
384 if new_maxi2rate != runningmaxi2rate:
385 message += template % params
386 logger.log("bwmon: %(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params)
# Touch the kernel limits only when one of the two rates actually changed.
389 if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate:
390 bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxi2rate)
# Notify only when there is something to report and self.emailed is still False.
393 if message and self.emailed == False:
394 subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
396 logger.log("bwmon: "+ subject)
397 logger.log("bwmon: "+ message + (footer % params))
400 slicemail(self.name, subject, message + (footer % params))
# Collects the HTB entries reported by bwlimit.get() into a dict keyed by
# xid; each value is a dict of that entry's share, rates and byte counters.
#   root_xid, default_xid - xids excluded from the orphan check below
# NOTE(review): lines are missing from this listing (the initialisation of
# `livehtbs`, the first line of the orphan condition, the action taken on an
# orphan and the return statement are not visible).
402 def gethtbs(root_xid, default_xid):
404 Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
405 Turn off HTBs without names.
408 for params in bwlimit.get():
411 minexemptrate, maxexemptrate,
412 usedbytes, usedi2bytes) = params
# Slice name bwlimit associates with this xid.
414 name = bwlimit.get_slice(xid)
417 and (xid != root_xid) \
418 and (xid != default_xid):
419 # Orphaned (not associated with a slice) class
421 logger.log("bwmon: Found orphaned HTB %s. Removing." %name)
424 livehtbs[xid] = {'share': share,
427 'maxexemptrate': maxexemptrate,
428 'minexemptrate': minexemptrate,
429 'usedbytes': usedbytes,
431 'usedi2bytes': usedi2bytes}
# Body of the periodic sync routine. It reconciles three views of the slices
# on this node: the saved state in `datafile`, the slivers in `nmdbcopy`, and
# the HTBs reported by gethtbs(). It then resets or updates each slice and
# writes the state back to `datafile`.
# NOTE(review): the `def` line and many short lines (`try:`/`else:`/
# `continue`, loop headers, initialisations such as `live`) are missing from
# this listing, so the notes below cover only what is visible.
437 Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.
446 default_ThreshKByte,\
448 default_Threshi2KByte,\
454 # In case the limits have changed.
455 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
456 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
458 # In case default isn't set yet.
459 if default_MaxRate == -1:
460 default_MaxRate = 1000000
# Load the previously saved (version, slices, deaddb) tuple.
# NOTE(review): pickle.load executes whatever the file encodes; this is only
# safe while `datafile` is writable by trusted users alone.
463 f = open(datafile, "r+")
464 logger.log("bwmon: Loading %s" % datafile)
465 (version, slices, deaddb) = pickle.load(f)
467 # Check version of data file
468 if version != "$Id$":
469 logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
476 # Get/set special slice IDs
477 root_xid = bwlimit.get_xid("root")
478 default_xid = bwlimit.get_xid("default")
480 # Since root is required for sanity, its not in the API/plc database, so pass {}
482 if root_xid not in slices.keys():
483 slices[root_xid] = Slice(root_xid, "root", {})
484 slices[root_xid].reset(0, 0, 0, 0, {})
486 # Used by bwlimit. pass {} since there is no rspec (like above).
487 if default_xid not in slices.keys():
488 slices[default_xid] = Slice(default_xid, "default", {})
489 slices[default_xid].reset(0, 0, 0, 0, {})
492 # Get running slivers that should be on this node (from plc). {xid: name}
493 # db keys on name, bwmon keys on xid. db doesnt have xid either.
494 for plcSliver in nmdbcopy.keys():
495 live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
497 logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__())
498 logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__())
500 # Get actual running values from tc.
501 # Update slice totals and bandwidth. {xid: {values}}
502 kernelhtbs = gethtbs(root_xid, default_xid)
503 logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__())
505 # The dat file has HTBs for slices, but the HTBs aren't running
506 nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys())
507 logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__() )
509 for nohtbslice in nohtbslices:
510 if live.has_key(nohtbslice):
511 slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
513 # The dat file doesnt have HTB for the slice, but slice is running and
515 slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
516 logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__() )
# NOTE(review): live[slicenodat] is indexed without a visible membership
# check; an HTB whose xid is not in `live` would raise KeyError here.
517 for slicenodat in slicesnodat:
518 slices[slicenodat] = Slice(slicenodat,
519 live[slicenodat]['name'],
520 live[slicenodat]['_rspec'])
523 # Slices in GetSlivers but not running HTBs
524 newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
525 logger.log("bwmon: Found %s new slices" % newslicesxids.__len__())
528 for newslice in newslicesxids:
529 # Delegated slices dont have xids (which are uids) since they haven't been
531 if newslice != None and live[newslice].has_key('_rspec') == True:
532 if live[newslice]['name'] not in deaddb.keys():
533 logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
534 # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
535 # and made a dict of computed values.
536 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
537 slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
538 # Double check time for dead slice in deaddb is within 24hr recording period.
539 elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
540 deadslice = deaddb[live[newslice]['name']]
541 logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
542 slices[newslice] = deadslice['slice']
543 slices[newslice].xid = newslice
545 slices[newslice].reset(deadslice['slice'].MaxRate,
546 deadslice['slice'].Maxi2Rate,
547 deadslice['htb']['usedbytes'],
548 deadslice['htb']['usedi2bytes'],
549 live[newslice]['_rspec'])
551 slices[newslice].update(deadslice['slice'].MaxRate,
552 deadslice['slice'].Maxi2Rate,
553 deadslice['htb']['usedbytes'],
554 deadslice['htb']['usedi2bytes'],
555 live[newslice]['_rspec'])
556 # Since the slice has been reinitialized, remove from dead database.
# NOTE(review): `deadslice` is the deaddb entry itself (a dict with 'slice'
# and 'htb' keys), not its key; the key used everywhere else is
# live[newslice]['name']. Using the entry as a key looks like it raises
# TypeError (dicts are unhashable) -- confirm and fix.
557 del deaddb[deadslice]
559 logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslice]['name'])
561 # Move dead slices that exist in the pickle file, but
562 # aren't instantiated by PLC into the dead dict until
563 # recording period is over. This is to avoid the case where a slice is dynamically created
564 # and destroyed then recreated to get around byte limits.
565 dead = Set(slices.keys()) - Set(live.keys())
# The "- 2" presumably discounts the root and default entries, which are in
# `slices` but are skipped at line 568.
566 logger.log("bwmon: Found %s dead slices" % (dead.__len__() - 2))
568 if xid == root_xid or xid == default_xid:
570 logger.log("bwmon: removing dead slice %s " % xid)
571 if slices.has_key(xid):
572 # add slice (by name) to deaddb
# NOTE(review): kernelhtbs[xid] is read without a guard here although line
# 575 checks kernelhtbs.has_key(xid); a dead slice with no running HTB would
# raise KeyError.
573 deaddb[slices[xid].name] = {'slice': slices[xid], 'htb': kernelhtbs[xid]}
575 if kernelhtbs.has_key(xid): bwlimit.off(xid)
# NOTE(review): three apparent problems in this purge loop -- confirm:
#  * entries are deleted from deaddb while iterating deaddb.iteritems();
#  * deaddb values are {'slice': ..., 'htb': ...} dicts (line 573), so
#    `deadslice.time()` and `deadslice.name` would fail;
#    deadslice['slice'].time / .name looks like what was meant;
#  * Slice.time is assigned a number elsewhere in this file, so it is not
#    callable.
578 for (deadslicexid, deadslice) in deaddb.iteritems():
579 if (time.time() >= (deadslice.time() + period)):
580 logger.log("bwmon: Removing dead slice %s from dat." % deadslice.name)
581 del deaddb[deadslicexid]
583 # Get actual running values from tc since we've added and removed buckets.
584 # Update slice totals and bandwidth. {xid: {values}}
585 kernelhtbs = gethtbs(root_xid, default_xid)
586 logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__())
588 for (xid, slice) in slices.iteritems():
589 # Monitor only the specified slices
590 if xid == root_xid or xid == default_xid: continue
# NOTE(review): `name` is not bound in any visible line of this routine;
# slice.name looks like the intended value -- confirm.
591 if names and name not in names:
594 if (time.time() >= (slice.time + period)) or \
595 (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
596 (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
597 # Reset to defaults every 24 hours or if it appears
598 # that the byte counters have overflowed (or, more
599 # likely, the node was restarted or the HTB buckets
600 # were re-initialized).
601 slice.reset(kernelhtbs[xid]['maxrate'], \
602 kernelhtbs[xid]['maxexemptrate'], \
603 kernelhtbs[xid]['usedbytes'], \
604 kernelhtbs[xid]['usedi2bytes'], \
607 if debug: logger.log("bwmon: Updating slice %s" % slice.name)
609 slice.update(kernelhtbs[xid]['maxrate'], \
610 kernelhtbs[xid]['maxexemptrate'], \
611 kernelhtbs[xid]['usedbytes'], \
612 kernelhtbs[xid]['usedi2bytes'], \
# Persist the state for the next run, in the tuple layout read at line 465.
615 logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile))
616 f = open(datafile, "w")
617 pickle.dump((version, slices, deaddb), f)
# Event object; per the docstring below, the worker thread waits on it
# before each pass.
# NOTE(review): the `def` lines of the thread body and of the starter that
# calls tools.as_daemon_thread are missing from this listing, as are the
# loop and `try:` lines inside the thread body.
620 lock = threading.Event()
622 """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
623 if debug: logger.log("bwmon: Thread started")
626 if debug: logger.log("bwmon: Event received. Running.")
# Copy the database while holding its lock so the pass works on a private
# snapshot.
# NOTE(review): if deepcopy raises, the release on line 629 is skipped;
# consider try/finally around the copy.
627 database.db_lock.acquire()
628 nmdbcopy = copy.deepcopy(database.db)
629 database.db_lock.release()
# NOTE(review): bare except -- every exception is passed to logger.log_exc()
# rather than propagated.
631 except: logger.log_exc()
# Hands `run` to tools.as_daemon_thread.
635 tools.as_daemon_thread(run)
637 def GetSlivers(*args):