3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit,
6 # its instantaneous rate will be capped at the bytes remaining in the limit
7 # divided by the time remaining in the recording period.
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
# Site configuration used by the notification e-mail code (slicemail).
# The PLC_* assignments below are fallbacks for when
# /etc/planetlab/plc_config.py cannot be loaded.
# NOTE(review): the try/except around the import is not visible here;
# presumably it catches ImportError -- confirm.
36 sys.path.append("/etc/planetlab")
37 from plc_config import *
39 logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found")
40 PLC_NAME = "PlanetLab"
41 PLC_SLICE_PREFIX = "pl"
42 PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org"
43 PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org"  # "SLICE" is replaced by the slice name
46 seconds_per_day = 24 * 60 * 60
52 datafile = "/var/lib/misc/bwmon.dat"  # pickled (version, slices, deaddb) state
55 # Burst to line rate (or node cap). Set by NM. in KBit/s
56 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
57 default_Maxi2Rate = int(bwlimit.bwmax / 1000)  # exempt (I2) burst rate, same units as default_MaxRate
61 # 5.4 GiB per day, expressed in KiB: 5.4 * 1024 * 1024 ~= 5662310
62 # i.e. the most low-bandwidth data allowed per recording period
63 default_MaxKByte = 5662310
64 # 16.4 GiB per day (16.4 * 1024 * 1024 KiB) max allowed to I2 per recording period
65 default_Maxi2KByte = 17196646
66 # Default share quanta
70 period = 1 * seconds_per_day  # length of one recording period, in seconds
75 The slice %(slice)s has transmitted more than %(bytes)s from
76 %(hostname)s to %(class)s destinations
79 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
82 Please reduce the average %(class)s transmission rate
83 of the slice to %(limit)s per %(period)s.
89 %(date)s %(hostname)s bwcap %(slice)s
92 def format_bytes(bytes, si = True):
94 Formats bytes into a string
99 # Officially, a kibibyte
102 if bytes >= (kilo * kilo * kilo):
103 return "%.1f GB" % (bytes / (kilo * kilo * kilo))
104 elif bytes >= 1000000:
105 return "%.1f MB" % (bytes / (kilo * kilo))
107 return "%.1f KB" % (bytes / kilo)
109 return "%.0f bytes" % bytes
# Format a duration in seconds for the notification e-mail.
# Exactly one day and exactly one hour are special-cased; their return
# values are not visible here.
# Longer spans are shown in days or hours with one decimal; shorter ones are
# shown in minutes (one decimal) or whole seconds.
111 def format_period(seconds):
113 Formats a period in seconds into a string
116 if seconds == (24 * 60 * 60):
118 elif seconds == (60 * 60):
120 elif seconds > (24 * 60 * 60):
121 return "%.1f days" % (seconds / 24. / 60. / 60.)
122 elif seconds > (60 * 60):
123 return "%.1f hours" % (seconds / 60. / 60.)
125 return "%.1f minutes" % (seconds / 60.)
127 return "%.0f seconds" % seconds
# Send `subject` / `body` through the local sendmail binary.
# Recipients are the support list (pl-mom@planet-lab.org when the support
# address is the planet-lab.org default) plus the slice's alias.
# The slice alias is skipped when slice is None or "root".
129 def slicemail(slice, subject, body):
131 Front end to sendmail. Sends email to slice alias with given subject and body.
134 sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")  # NOTE(review): os.popen runs this via a shell; the interpolated address comes from plc_config, not user input
136 # PLC has a separate list for pl_mom messages
137 if PLC_MAIL_SUPPORT_ADDRESS == "support@planet-lab.org":
138 to = ["pl-mom@planet-lab.org"]
140 to = [PLC_MAIL_SUPPORT_ADDRESS]
142 if slice is not None and slice != "root":
143 to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
145 header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
147 'version': sys.version.split(" ")[0],
153 Content-type: text/plain
157 X-Mailer: Python/%(version)s
160 """.lstrip() % header)
170 Stores the last recorded bandwidth parameters of a slice.
172 xid - slice context/VServer ID
174 time - beginning of recording period in UNIX seconds
175 bytes - low bandwidth bytes transmitted at the beginning of the recording period
176 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
177 MaxKByte - total volume of data allowed
178 ThreshKByte - after this many KB, cap the rate to (MaxKByte - bytes used)/(time left in period)
179 Maxi2KByte - same as MaxKByte, but for i2
180 Threshi2KByte - same as ThreshKByte, but for i2
181 MaxRate - max_rate slice attribute.
182 Maxi2Rate - max_exempt_rate slice attribute.
183 Share - Used by Sirius to loan min rates
184 Sharei2 - Used by Sirius to loan min rates for i2
185 self.emailed - did slice recv email during this recording period
# Build the record for slice `name` with context id `xid`.
# Start from the module-wide defaults, override them from `rspec`, then
# program the resulting rates with bwlimit.set. The *Rate attributes are
# multiplied by 1000 when handed to bwlimit.
189 def __init__(self, xid, name, rspec):
195 self.MaxRate = default_MaxRate
196 self.MinRate = default_MinRate
197 self.Maxi2Rate = default_Maxi2Rate
198 self.Mini2Rate = default_Mini2Rate
199 self.MaxKByte = default_MaxKByte
200 self.ThreshKByte = default_ThreshKByte
201 self.Maxi2KByte = default_Maxi2KByte
202 self.Threshi2KByte = default_Threshi2KByte
203 self.Share = default_Share
204 self.Sharei2 = default_Share  # no separate i2 default; default_Share is reused
208 self.updateSliceAttributes(rspec)  # rspec values override the defaults above
209 bwlimit.set(xid = self.xid,
210 minrate = self.MinRate * 1000,
211 maxrate = self.MaxRate * 1000,
212 maxexemptrate = self.Maxi2Rate * 1000,
213 minexemptrate = self.Mini2Rate * 1000,
219 def updateSliceAttributes(self, rspec):
221 Use respects from GetSlivers to PLC to populate slice object. Also
222 do some sanity checking.
225 # Sanity check plus policy decision for MinRate:
226 # Minrate cant be greater than 25% of MaxRate or NodeCap.
227 MinRate = int(rspec.get("net_min_rate", default_MinRate))
228 if MinRate > int(.25 * default_MaxRate):
229 MinRate = int(.25 * default_MaxRate)
230 if MinRate != self.MinRate:
231 self.MinRate = MinRate
232 logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))
234 MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000))
235 if MaxRate != self.MaxRate:
236 self.MaxRate = MaxRate
237 logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
239 Mini2Rate = int(rspec.get('net_i2_min_rate', default_Mini2Rate))
240 if Mini2Rate != self.Mini2Rate:
241 self.Mini2Rate = Mini2Rate
242 logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
244 Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000))
245 if Maxi2Rate != self.Maxi2Rate:
246 self.Maxi2Rate = Maxi2Rate
247 logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
249 MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
250 if MaxKByte != self.MaxKByte:
251 self.MaxKByte = MaxKByte
252 logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))
254 Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
255 if Maxi2KByte != self.Maxi2KByte:
256 self.Maxi2KByte = Maxi2KByte
257 logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))
259 ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
260 if ThreshKByte != self.ThreshKByte:
261 self.ThreshKByte = ThreshKByte
262 logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))
264 Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
265 if Threshi2KByte != self.Threshi2KByte:
266 self.Threshi2KByte = Threshi2KByte
267 logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))
269 Share = int(rspec.get('net_share', default_Share))
270 if Share != self.Share:
272 logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))
274 Sharei2 = int(rspec.get('net_i2_share', default_Share))
275 if Sharei2 != self.Sharei2:
276 self.Sharei2 = Sharei2
277 logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
# Begin a new recording period.
# Re-read limits from rspec, make the given byte counters the new baseline,
# and log / reprogram the full rates through bwlimit.set. Rates given to
# bwlimit are the KBit/s attributes multiplied by 1000.
280 def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
282 Begin a new recording period. Remove caps by restoring limits
283 to their default values.
286 # Query Node Manager for max rate overrides
287 self.updateSliceAttributes(rspec)
289 # Reset baseline time
290 self.time = time.time()
292 # Reset baseline byte counts
293 self.bytes = usedbytes
294 self.i2bytes = usedi2bytes
301 maxrate = self.MaxRate * 1000
302 maxi2rate = self.Maxi2Rate * 1000
# NOTE(review): this compares the KBit/s attributes with the running rates.
# update() instead compares the same running rates against MaxRate * 1000,
# so this test is true almost whenever a rate is non-zero.
# Comparing maxrate/maxi2rate looks intended, but it could also stop MinRate
# changes from being re-applied here when the max rates are unchanged.
# Confirm before changing.
303 if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
304 logger.log("bwmon: %s reset to %s/%s" % \
306 bwlimit.format_tc_rate(maxrate),
307 bwlimit.format_tc_rate(maxi2rate)))
308 bwlimit.set(xid = self.xid,
309 minrate = self.MinRate * 1000,
310 maxrate = self.MaxRate * 1000,
311 maxexemptrate = self.Maxi2Rate * 1000,
312 minexemptrate = self.Mini2Rate * 1000,
315 def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes)
317 Notify the slice it's being capped.
319 # Prepare message parameters from the template
321 params = {'slice': self.name, 'hostname': socket.gethostname(),
322 'since': time.asctime(time.gmtime(self.time)) + " GMT",
323 'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
324 'date': time.asctime(time.gmtime()) + " GMT",
325 'period': format_period(period)}
326 if new_maxrate ! = self.MaxRate:
327 # Format template parameters for low bandwidth message
328 params['class'] = "low bandwidth"
329 params['bytes'] = format_bytes(usedbytes - self.bytes)
330 params['limit'] = format_bytes(self.MaxKByte * 1024)
331 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
333 # Cap low bandwidth burst rate
334 message += template % params
335 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
337 if new_maxexemptrate != self.Maxi2Rate:
338 # Format template parameters for high bandwidth message
339 params['class'] = "high bandwidth"
340 params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
341 params['limit'] = format_bytes(self.Maxi2KByte * 1024)
342 params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate)
344 message += template % params
345 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
348 if message and self.emailed == False:
349 subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
351 logger.log("bwmon: "+ subject)
352 logger.log("bwmon: "+ message + (footer % params))
355 slicemail(self.name, subject, message + (footer % params))
358 def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
360 Update byte counts and check if byte thresholds have been
361 exceeded. If exceeded, cap to remaining bytes in limit over remaining in period.
362 Recalculate every time module runs.
365 # Query Node Manager for max rate overrides
366 self.updateSliceAttributes(rspec)
368 # Prepare message parameters from the template
370 #params = {'slice': self.name, 'hostname': socket.gethostname(),
371 # 'since': time.asctime(time.gmtime(self.time)) + " GMT",
372 # 'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
373 # 'date': time.asctime(time.gmtime()) + " GMT",
374 # 'period': format_period(period)}
377 if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
378 sum = self.bytes + (self.ThreshKByte * 1024)
379 maxbyte = self.MaxKByte * 1024
380 bytesused = usedbytes - self.bytes
381 timeused = int(time.time() - self.time)
383 new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
384 # Never go under MinRate
385 if new_maxrate < (self.MinRate * 1000):
386 new_maxrate = self.MinRate * 1000
387 # State information. I'm capped.
391 new_maxrate = self.MaxRate * 1000
394 ## Format template parameters for low bandwidth message
395 #params['class'] = "low bandwidth"
396 #params['bytes'] = format_bytes(usedbytes - self.bytes)
397 #params['limit'] = format_bytes(self.MaxKByte * 1024)
398 #params['thresh'] = format_bytes(self.ThreshKByte * 1024)
399 #params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
401 # Cap low bandwidth burst rate
402 #if new_maxrate != runningmaxrate:
403 # message += template % params
404 # logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
406 if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
407 maxi2byte = self.Maxi2KByte * 1024
408 i2bytesused = usedi2bytes - self.i2bytes
409 timeused = int(time.time() - self.time)
411 new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
412 # Never go under MinRate
413 if new_maxi2rate < (self.Mini2Rate * 1000):
414 new_maxi2rate = self.Mini2Rate * 1000
415 # State information. I'm capped.
419 new_maxi2rate = self.Maxi2Rate * 1000
422 # Format template parameters for high bandwidth message
423 #params['class'] = "high bandwidth"
424 #params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
425 #params['limit'] = format_bytes(self.Maxi2KByte * 1024)
426 #params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate)
428 # Cap high bandwidth burst rate
429 #if new_maxi2rate != runningmaxi2rate:
430 # message += template % params
431 # logger.log("bwmon: %(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params)
434 if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate:
435 bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxi2rate)
438 if self.capped == True and self.emailed == False:
439 self.notify(newmaxrate, newmaxexemptrate, usedbytes, usedi2bytes)
440 # subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
442 # logger.log("bwmon: "+ subject)
443 # logger.log("bwmon: "+ message + (footer % params))
445 # self.emailed = True
446 # slicemail(self.name, subject, message + (footer % params))
# Return {xid: {...}} describing the HTB classes reported by bwlimit.get(),
# with each class's share, rates and byte counters.
# Per the docstring, classes without a slice name (other than root/default)
# are treated as orphans, logged and turned off.
448 def gethtbs(root_xid, default_xid):
450 Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
451 Turn off HTBs without names.
454 for params in bwlimit.get():  # one tuple per HTB class
457 minexemptrate, maxexemptrate,
458 usedbytes, usedi2bytes) = params
460 name = bwlimit.get_slice(xid)  # slice name for this xid; the orphan test presumably checks it for None
463 and (xid != root_xid) \
464 and (xid != default_xid):
465 # Orphaned (not associated with a slice) class
467 logger.log("bwmon: Found orphaned HTB %s. Removing." %name)
470 livehtbs[xid] = {'share': share,
473 'maxexemptrate': maxexemptrate,
474 'minexemptrate': minexemptrate,
475 'usedbytes': usedbytes,
477 'usedi2bytes': usedi2bytes}
483 Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.
492 default_ThreshKByte,\
494 default_Threshi2KByte,\
500 # Incase the limits have changed.
501 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
502 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
504 # Incase default isn't set yet.
505 if default_MaxRate == -1:
506 default_MaxRate = 1000000
509 f = open(datafile, "r+")
510 logger.log("bwmon: Loading %s" % datafile)
511 (version, slices, deaddb) = pickle.load(f)
513 # Check version of data file
514 if version != "$Id$":
515 logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
522 # Get/set special slice IDs
523 root_xid = bwlimit.get_xid("root")
524 default_xid = bwlimit.get_xid("default")
526 # Since root is required for sanity, its not in the API/plc database, so pass {}
528 if root_xid not in slices.keys():
529 slices[root_xid] = Slice(root_xid, "root", {})
530 slices[root_xid].reset(0, 0, 0, 0, {})
532 # Used by bwlimit. pass {} since there is no rspec (like above).
533 if default_xid not in slices.keys():
534 slices[default_xid] = Slice(default_xid, "default", {})
535 slices[default_xid].reset(0, 0, 0, 0, {})
538 # Get running slivers that should be on this node (from plc). {xid: name}
539 # db keys on name, bwmon keys on xid. db doesnt have xid either.
540 for plcSliver in nmdbcopy.keys():
541 live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
543 logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__())
544 logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__())
546 # Get actual running values from tc.
547 # Update slice totals and bandwidth. {xid: {values}}
548 kernelhtbs = gethtbs(root_xid, default_xid)
549 logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__())
551 # The dat file has HTBs for slices, but the HTBs aren't running
552 nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys())
553 logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__() )
555 for nohtbslice in nohtbslices:
556 if live.has_key(nohtbslice):
557 slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
559 # The dat file doesnt have HTB for the slice, but slice is running and
561 slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
562 logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__() )
563 for slicenodat in slicesnodat:
564 slices[slicenodat] = Slice(slicenodat,
565 live[slicenodat]['name'],
566 live[slicenodat]['_rspec'])
569 # Slices in GetSlivers but not running HTBs
570 newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
571 logger.log("bwmon: Found %s new slices" % newslicesxids.__len__())
574 for newslice in newslicesxids:
575 # Delegated slices dont have xids (which are uids) since they haven't been
577 if newslice != None and live[newslice].has_key('_rspec') == True:
578 # Check to see if we recently deleted this slice.
579 if live[newslice]['name'] not in deaddb.keys():
580 logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
581 # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
582 # and made a dict of computed values.
583 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
584 slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
585 # Double check time for dead slice in deaddb is within 24hr recording period.
586 elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
587 deadslice = deaddb[live[newslice]['name']]
588 logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
589 slices[newslice] = deadslice['slice']
590 slices[newslice].xid = newslice
592 slices[newslice].reset(deadslice['slice'].MaxRate,
593 deadslice['slice'].Maxi2Rate,
594 deadslice['htb']['usedbytes'],
595 deadslice['htb']['usedi2bytes'],
596 live[newslice]['_rspec'])
598 slices[newslice].update(deadslice['slice'].MaxRate,
599 deadslice['slice'].Maxi2Rate,
600 deadslice['htb']['usedbytes'],
601 deadslice['htb']['usedi2bytes'],
602 live[newslice]['_rspec'])
603 # Since the slice has been reinitialed, remove from dead database.
604 del deaddb[deadslice]
606 logger.log("bwmon Slice %s doesn't have xid. Must be delegated."\
607 "Skipping." % live[newslice]['name'])
609 # Move dead slices that exist in the pickle file, but
610 # aren't instantiated by PLC into the dead dict until
611 # recording period is over. This is to avoid the case where a slice is dynamically created
612 # and destroyed then recreated to get around byte limits.
613 deadxids = Set(slices.keys()) - Set(live.keys())
614 logger.log("bwmon: Found %s dead slices" % (dead.__len__() - 2))
615 for deadxid in deadxids:
616 if deadxid == root_xid or deadxid == default_xid:
618 logger.log("bwmon: removing dead slice %s " % deadxid)
619 if slices.has_key(deadxid):
620 # add slice (by name) to deaddb
621 deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
623 if kernelhtbs.has_key(deadxid):
627 for (deadslice, deadhtb) in deaddb.iteritems():
628 if (time.time() >= (deadslice.time() + period)):
629 logger.log("bwmon: Removing dead slice %s from dat." % deadslice.name)
630 del deaddb[deadslice.name]
632 # Get actual running values from tc since we've added and removed buckets.
633 # Update slice totals and bandwidth. {xid: {values}}
634 kernelhtbs = gethtbs(root_xid, default_xid)
635 logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__())
637 for (xid, slice) in slices.iteritems():
638 # Monitor only the specified slices
639 if xid == root_xid or xid == default_xid: continue
640 if names and name not in names:
643 if (time.time() >= (slice.time + period)) or \
644 (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
645 (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
646 # Reset to defaults every 24 hours or if it appears
647 # that the byte counters have overflowed (or, more
648 # likely, the node was restarted or the HTB buckets
649 # were re-initialized).
650 slice.reset(kernelhtbs[xid]['maxrate'], \
651 kernelhtbs[xid]['maxexemptrate'], \
652 kernelhtbs[xid]['usedbytes'], \
653 kernelhtbs[xid]['usedi2bytes'], \
656 if debug: logger.log("bwmon: Updating slice %s" % slice.name)
658 slice.update(kernelhtbs[xid]['maxrate'], \
659 kernelhtbs[xid]['maxexemptrate'], \
660 kernelhtbs[xid]['usedbytes'], \
661 kernelhtbs[xid]['usedi2bytes'], \
664 logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile))
665 f = open(datafile, "w")
666 pickle.dump((version, slices, deaddb), f)
# Background worker plumbing:
# - an Event used to wake the monitoring thread;
# - the thread body, which copies database.db under database.db_lock and
#   then, per its docstring, runs bwmon.GetSlivers();
# - the call that starts `run` as a daemon thread.
669 lock = threading.Event()
671 """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
672 if debug: logger.log("bwmon: Thread started")
675 if debug: logger.log("bwmon: Event received. Running.")
676 database.db_lock.acquire()
677 nmdbcopy = copy.deepcopy(database.db)
678 database.db_lock.release()  # NOTE(review): no try/finally is visible here; an exception in deepcopy would leave db_lock held
680 except: logger.log_exc()  # bare except keeps the thread alive on any error; the traceback is logged
684 tools.as_daemon_thread(run)
686 def GetSlivers(*args):