3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
# its instantaneous rate will be capped at the bytes remaining in the limit
7 # over the time remaining in the recording period.
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
# Pickled state shared between runs: (version, slices-by-xid, dead-slice db).
datafile = "/var/lib/misc/bwmon.dat"

# Pull in the MyPLC configuration (PLC_NAME, PLC_MAIL_* constants).
sys.path.append("/etc/planetlab")
from plc_config import *
# NOTE(review): the two logger.log() calls below read like the body of an
# except-branch guarding the import above -- the try/except lines appear to
# be missing from this chunk; confirm against the full file.
logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1)

seconds_per_day = 24 * 60 * 60

# Burst to line rate (or node cap). Set by NM. in KBit/s
default_MaxRate = int(bwlimit.get_bwcap() / 1000)
default_Maxi2Rate = int(bwlimit.bwmax / 1000)
# 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
# 5.4 Gbyte per day max allowed transferred per recording period
default_MaxKByte = 5662310
# 16.4 Gbyte per day max allowed transferred per recording period to I2
default_Maxi2KByte = 17196646
# Default share quanta
# NOTE(review): the default_Share assignment this comment describes is not
# visible in this chunk -- confirm it exists in the full file.

# Length of one recording period: 24 hours.
period = 1 * seconds_per_day
68 The slice %(slice)s has transmitted more than %(bytes)s from
69 %(hostname)s to %(class)s destinations
72 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
75 Please reduce the average %(class)s transmission rate
76 of the slice to %(limit)s per %(period)s.
82 %(date)s %(hostname)s bwcap %(slice)s
def format_bytes(bytes, si = True):
    """
    Formats bytes into a human-readable string.

    bytes -- byte count to format.
    si -- if True, use SI (decimal, kilo=1000) units;
          if False, use binary (kibi, kilo=1024) units.

    Returns a string such as "1.5 KB", "2.0 MB", "3.0 GB" or "500 bytes".
    """
    if si:
        kilo = 1000.
    else:
        # Officially, a kibibyte
        kilo = 1024.

    if bytes >= (kilo * kilo * kilo):
        return "%.1f GB" % (bytes / (kilo * kilo * kilo))
    elif bytes >= (kilo * kilo):
        # BUG FIX: threshold was hard-coded 1000000, which disagreed with
        # the chosen scale whenever si=False (binary units).
        return "%.1f MB" % (bytes / (kilo * kilo))
    elif bytes >= kilo:
        return "%.1f KB" % (bytes / kilo)
    else:
        return "%.0f bytes" % bytes
def format_period(seconds):
    """
    Formats a period in seconds into a human-readable string.

    Exact matches for one day / one hour return "day" / "hour" so email
    text can read "per day"; other values are scaled to the largest
    sensible unit.
    """
    if seconds == (24 * 60 * 60):
        return "day"
    elif seconds == (60 * 60):
        return "hour"
    elif seconds > (24 * 60 * 60):
        return "%.1f days" % (seconds / 24. / 60. / 60.)
    elif seconds > (60 * 60):
        return "%.1f hours" % (seconds / 60. / 60.)
    elif seconds > 60:
        return "%.1f minutes" % (seconds / 60.)
    else:
        return "%.0f seconds" % seconds
def slicemail(slice, subject, body):
    """
    Front end to sendmail. Sends email to slice alias with given subject and body.

    slice -- slice name, or None; "root" and None are not CC'd an alias.
    subject -- message subject line.
    body -- message body text.
    """
    # -N never: no delivery status notifications; -t: take recipients from
    # the message headers; envelope sender is the PLC support address.
    sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")

    # Parsed from MyPLC config
    to = [PLC_MAIL_MOM_LIST_ADDRESS]

    if slice is not None and slice != "root":
        to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))

    header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
              'to': ", ".join(to),
              'version': sys.version.split(" ")[0],
              'subject': subject}

    # Write headers
    sendmail.write(
"""
Content-type: text/plain
From: %(from)s
Reply-To: %(from)s
To: %(to)s
Subject: %(subject)s
X-Mailer: Python/%(version)s

""".lstrip() % header)

    # Write body
    sendmail.write(body)

    # BUG FIX: the chunk never wrote the body or closed the pipe, so the
    # message was never handed to sendmail and the pipe leaked.
    sendmail.close()
160 Stores the last recorded bandwidth parameters of a slice.
162 xid - slice context/VServer ID
164 time - beginning of recording period in UNIX seconds
165 bytes - low bandwidth bytes transmitted at the beginning of the recording period
166 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
167 MaxKByte - total volume of data allowed
168 ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
169 Maxi2KByte - same as MaxKByte, but for i2
Threshi2KByte - same as ThreshKByte, but for i2
171 MaxRate - max_rate slice attribute.
172 Maxi2Rate - max_exempt_rate slice attribute.
173 Share - Used by Sirius to loan min rates
174 Sharei2 - Used by Sirius to loan min rates for i2
175 self.emailed - did slice recv email during this recording period
def __init__(self, xid, name, rspec):
    # NOTE(review): the assignments of self.xid / self.name (and the
    # baseline time/byte/emailed state) appear to be on lines missing from
    # this chunk -- bwlimit.set() below reads self.xid; confirm against
    # the full file.

    # Start from module-wide defaults; rates are in Kbit/s, volumes in
    # KBytes (bwlimit's bwmin/bwcap are bit/s, hence the / 1000).
    self.MaxRate = default_MaxRate
    self.MinRate = bwlimit.bwmin / 1000
    self.Maxi2Rate = default_Maxi2Rate
    self.Mini2Rate = bwlimit.bwmin / 1000
    self.MaxKByte = default_MaxKByte
    # Capping kicks in at 80% of the per-period byte budget.
    self.ThreshKByte = int(.8 * self.MaxKByte)
    self.Maxi2KByte = default_Maxi2KByte
    self.Threshi2KByte = int(.8 * self.Maxi2KByte)
    self.Share = default_Share
    self.Sharei2 = default_Share

    # Let PLC-provided attributes override the defaults, then push the
    # resulting limits into tc (bwlimit works in bit/s, hence * 1000).
    self.updateSliceAttributes(rspec)
    bwlimit.set(xid = self.xid,
        minrate = self.MinRate * 1000,
        maxrate = self.MaxRate * 1000,
        maxexemptrate = self.Maxi2Rate * 1000,
        minexemptrate = self.Mini2Rate * 1000,
def updateSliceAttributes(self, rspec):
    """
    Use rspecs from GetSlivers to PLC to populate slice object.  Also
    do some sanity checking.

    rspec -- dict of slice attributes computed by NM from PLC data;
             rates in Kbit/s, volumes in KByte. Missing keys fall back
             to the module defaults.
    """
    # Sanity check plus policy decision for MinRate:
    # Minrate cant be greater than 25% of MaxRate or NodeCap.
    MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000))
    if MinRate > int(.25 * default_MaxRate):
        MinRate = int(.25 * default_MaxRate)
    if MinRate != self.MinRate:
        self.MinRate = MinRate
        logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))

    MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000))
    if MaxRate != self.MaxRate:
        self.MaxRate = MaxRate
        logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))

    Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
    if Mini2Rate != self.Mini2Rate:
        self.Mini2Rate = Mini2Rate
        logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))

    Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000))
    if Maxi2Rate != self.Maxi2Rate:
        self.Maxi2Rate = Maxi2Rate
        logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))

    MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
    if MaxKByte != self.MaxKByte:
        self.MaxKByte = MaxKByte
        logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))

    Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
    if Maxi2KByte != self.Maxi2KByte:
        self.Maxi2KByte = Maxi2KByte
        logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))

    # Thresholds default to 80% of the (possibly just-updated) volume caps.
    ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
    if ThreshKByte != self.ThreshKByte:
        self.ThreshKByte = ThreshKByte
        logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))

    Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
    if Threshi2KByte != self.Threshi2KByte:
        self.Threshi2KByte = Threshi2KByte
        logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))

    Share = int(rspec.get('net_share', default_Share))
    if Share != self.Share:
        # BUG FIX: the assignment was missing, so a changed net_share was
        # logged but never applied.
        self.Share = Share
        logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))

    Sharei2 = int(rspec.get('net_i2_share', default_Share))
    if Sharei2 != self.Sharei2:
        self.Sharei2 = Sharei2
        # BUG FIX: the log line referenced self.i2Share, an attribute that
        # is never defined (AttributeError whenever net_i2_share changed).
        logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.Sharei2))
def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
    """
    Begin a new recording period. Remove caps by restoring limits
    to their default values.

    runningmaxrate -- tc's current low-bandwidth max rate (bit/s)
    runningmaxi2rate -- tc's current exempt (I2) max rate (bit/s)
    usedbytes -- byte counter to use as the new period's baseline
    usedi2bytes -- exempt byte counter to use as the new period's baseline
    rspec -- attribute dict from PLC (see updateSliceAttributes)
    """
    # Query Node Manager for max rate overrides
    self.updateSliceAttributes(rspec)

    # Reset baseline time
    self.time = time.time()

    # Reset baseline byte counts
    self.bytes = usedbytes
    self.i2bytes = usedi2bytes

    # New period: allow the slice to be emailed again.
    # NOTE(review): this line is absent from the garbled chunk but is
    # required by the self.emailed gate in update()/notify(); confirm.
    self.emailed = False

    maxrate = self.MaxRate * 1000
    maxi2rate = self.Maxi2Rate * 1000
    # Only touch tc when the running rates differ from the recorded limits.
    if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
        logger.log("bwmon: %s reset to %s/%s" % \
            (self.name,
             bwlimit.format_tc_rate(maxrate),
             bwlimit.format_tc_rate(maxi2rate)), 1)
        bwlimit.set(xid = self.xid,
            minrate = self.MinRate * 1000,
            maxrate = self.MaxRate * 1000,
            maxexemptrate = self.Maxi2Rate * 1000,
            minexemptrate = self.Mini2Rate * 1000,
            share = self.Share)
def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
    """
    Notify the slice it's being capped.

    new_maxrate -- new low-bandwidth cap (bit/s)
    new_maxexemptrate -- new exempt (I2) cap (bit/s)
    usedbytes -- current low-bandwidth byte counter
    usedi2bytes -- current exempt (I2) byte counter
    """
    # Prepare message parameters from the template
    message = ""
    params = {'slice': self.name, 'hostname': socket.gethostname(),
              'since': time.asctime(time.gmtime(self.time)) + " GMT",
              'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
              'date': time.asctime(time.gmtime()) + " GMT",
              'period': format_period(period)}

    if new_maxrate != self.MaxRate:
        # Format template parameters for low bandwidth message
        params['class'] = "low bandwidth"
        params['bytes'] = format_bytes(usedbytes - self.bytes)
        params['limit'] = format_bytes(self.MaxKByte * 1024)
        params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)

        # Cap low bandwidth burst rate
        message += template % params
        logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)

    if new_maxexemptrate != self.Maxi2Rate:
        # Format template parameters for high bandwidth message
        params['class'] = "high bandwidth"
        params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
        params['limit'] = format_bytes(self.Maxi2KByte * 1024)
        # BUG FIX: formatted the undefined name new_maxi2rate (NameError)
        # into params['new_maxexemptrate'], while the template and the log
        # line below read %(new_maxrate)s -- fill the key actually used.
        params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)

        message += template % params
        logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)

    # Email the slice contact at most once per recording period.
    if message and self.emailed == False:
        subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
        # DEBUG is set at module load; in debug mode log instead of mailing.
        # NOTE(review): the if/else around these lines was lost in the
        # garbled chunk; reconstructed -- confirm against the full file.
        if DEBUG:
            logger.log("bwmon: "+ subject)
            logger.log("bwmon: "+ message + (footer % params))
        else:
            self.emailed = True
            slicemail(self.name, subject, message + (footer % params))
def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec):
    """
    Update byte counts and check if byte thresholds have been
    exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.
    Recalculate every time module runs.

    runningmaxrate -- tc's current low-bandwidth cap (bit/s)
    runningmaxi2rate -- tc's current exempt (I2) cap (bit/s)
    usedbytes -- current low-bandwidth byte counter from tc
    usedi2bytes -- current exempt byte counter from tc
    runningshare -- current share quantum from tc
    rspec -- attribute dict from PLC (see updateSliceAttributes)
    """
    # Query Node Manager for max rate overrides
    self.updateSliceAttributes(rspec)

    # Not capped unless one of the threshold checks below fires.
    self.capped = False

    # Low bandwidth: has the slice burned through ThreshKByte this period?
    if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
        maxbyte = self.MaxKByte * 1024
        bytesused = usedbytes - self.bytes
        timeused = int(time.time() - self.time)
        # Spread the remaining byte allowance over the time left in the
        # recording period (result in bit/s).
        new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
        # Never go under MinRate
        if new_maxrate < (self.MinRate * 1000):
            new_maxrate = self.MinRate * 1000
        # State information. I'm capped.
        self.capped = True
    else:
        # Under threshold: restore the configured maximum.
        new_maxrate = self.MaxRate * 1000

    # Same computation for exempt (I2) traffic.
    if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
        maxi2byte = self.Maxi2KByte * 1024
        i2bytesused = usedi2bytes - self.i2bytes
        timeused = int(time.time() - self.time)
        new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
        # Never go under MinRate
        if new_maxi2rate < (self.Mini2Rate * 1000):
            new_maxi2rate = self.Mini2Rate * 1000
        # State information. I'm capped.
        self.capped = True
    else:
        new_maxi2rate = self.Maxi2Rate * 1000

    # Only run tc when the computed caps differ from what's running.
    if (runningmaxrate != new_maxrate) or (runningmaxi2rate != new_maxi2rate):
        bwlimit.set(xid = self.xid,
            minrate = self.MinRate * 1000,
            maxrate = new_maxrate,
            minexemptrate = self.Mini2Rate * 1000,
            maxexemptrate = new_maxi2rate,
            share = self.Share)

    # Notify slice (at most once per recording period).
    if self.capped == True and self.emailed == False:
        # BUG FIX: called self.notify(newmaxrate, newmaxexemptrate, ...)
        # with two names that are undefined here (NameError).
        self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
406 def gethtbs(root_xid, default_xid):
408 Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
409 Turn off HTBs without names.
412 for params in bwlimit.get():
415 minexemptrate, maxexemptrate,
416 usedbytes, usedi2bytes) = params
418 name = bwlimit.get_slice(xid)
421 and (xid != root_xid) \
422 and (xid != default_xid):
423 # Orphaned (not associated with a slice) class
425 logger.log("bwmon: Found orphaned HTB %s. Removing." %name, 1)
428 livehtbs[xid] = {'share': share,
431 'maxexemptrate': maxexemptrate,
432 'minexemptrate': minexemptrate,
433 'usedbytes': usedbytes,
435 'usedi2bytes': usedi2bytes}
# NOTE(review): the enclosing `def sync(nmdbcopy):` header, its `global`
# declarations, `live = {}` initialization and the docstring's quotes all
# appear to be on lines missing from this chunk. Hedged notes below mark
# each visible gap; confirm everything against the full file.
Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.

# In case the limits have changed.
default_MaxRate = int(bwlimit.get_bwcap() / 1000)
default_Maxi2Rate = int(bwlimit.bwmax / 1000)

# In case default isn't set yet.
if default_MaxRate == -1:
    default_MaxRate = 1000000

# Load the pickled state from the previous run.
# NOTE(review): presumably wrapped in try/except in the full file.
f = open(datafile, "r+")
logger.log("bwmon: Loading %s" % datafile, 2)
(version, slices, deaddb) = pickle.load(f)
# Check version of data file
if version != "$Id$":
    logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))

# Get/set special slice IDs
root_xid = bwlimit.get_xid("root")
default_xid = bwlimit.get_xid("default")

# Since root is required for sanity, its not in the API/plc database, so pass {}
if root_xid not in slices.keys():
    slices[root_xid] = Slice(root_xid, "root", {})
    slices[root_xid].reset(0, 0, 0, 0, {})

# Used by bwlimit. pass {} since there is no rspec (like above).
if default_xid not in slices.keys():
    slices[default_xid] = Slice(default_xid, "default", {})
    slices[default_xid].reset(0, 0, 0, 0, {})

# Get running slivers that should be on this node (from plc). {xid: name}
# db keys on name, bwmon keys on xid. db doesnt have xid either.
for plcSliver in nmdbcopy.keys():
    live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]

logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__(), 2)
logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__(), 2)

# Get actual running values from tc.
# Update slice totals and bandwidth. {xid: {values}}
kernelhtbs = gethtbs(root_xid, default_xid)
logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)

# The dat file has HTBs for slices, but the HTBs aren't running
nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys())
logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
for nohtbslice in nohtbslices:
    if live.has_key(nohtbslice):
        slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
    # NOTE(review): an `else:` guarding the two lines below appears to be
    # missing from this chunk (they should only run for non-live slices).
    logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
    del slices[nohtbslice]

# The dat file doesnt have HTB for the slice but kern has HTB
slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
for slicenodat in slicesnodat:
    # But slice is running
    if live.has_key(slicenodat):
        # init the slice. which means start accounting over since kernel
        # htb was already there.
        slices[slicenodat] = Slice(slicenodat,
            live[slicenodat]['name'],
            live[slicenodat]['_rspec'])

# Slices in GetSlivers but not running HTBs
newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
logger.log("bwmon: Found %s new slices" % newslicesxids.__len__(), 2)

for newslice in newslicesxids:
    # Delegated slices dont have xids (which are uids) since they haven't been
    if newslice != None and live[newslice].has_key('_rspec') == True:
        # Check to see if we recently deleted this slice.
        if live[newslice]['name'] not in deaddb.keys():
            logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
            # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
            # and made a dict of computed values.
            slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
            slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
        # Double check time for dead slice in deaddb is within 24hr recording period.
        elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
            deadslice = deaddb[live[newslice]['name']]
            logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
            # Restore the saved accounting state under the new xid.
            slices[newslice] = deadslice['slice']
            slices[newslice].xid = newslice
            # Re-create the HTB with the saved limits and counters.
            slices[newslice].reset(deadslice['slice'].MaxRate,
                deadslice['slice'].Maxi2Rate,
                deadslice['htb']['usedbytes'],
                deadslice['htb']['usedi2bytes'],
                live[newslice]['_rspec'])
            # Bring byte accounting (and any cap) up to date immediately.
            slices[newslice].update(deadslice['slice'].MaxRate,
                deadslice['slice'].Maxi2Rate,
                deadslice['htb']['usedbytes'],
                deadslice['htb']['usedi2bytes'],
                deadslice['htb']['share'],
                live[newslice]['_rspec'])
            # Since the slice has been reinitialed, remove from dead database.
            del deaddb[deadslice['slice'].name]
    # NOTE(review): an `else:` guarding this log line appears to be missing
    # from this chunk (it should only fire for xid-less delegated slices).
    logger.log("bwmon: Slice %s doesn't have xid.  Skipping." % live[newslice]['name'])

# Move dead slices that exist in the pickle file, but
# aren't instantiated by PLC into the dead dict until
# recording period is over. This is to avoid the case where a slice is dynamically created
# and destroyed then recreated to get around byte limits.
deadxids = Set(slices.keys()) - Set(live.keys())
# (- 2 discounts the always-present root and default xids)
logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2), 2)
for deadxid in deadxids:
    # NOTE(review): the body of this if (presumably a `continue`) is
    # missing from this chunk.
    if deadxid == root_xid or deadxid == default_xid:
    logger.log("bwmon: removing dead slice %s " % deadxid)
    if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
        # add slice (by name) to deaddb
        logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
        deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
    if kernelhtbs.has_key(deadxid):
        logger.log("bwmon: Removing HTB for %s." % deadxid, 2)
        # NOTE(review): the bwlimit.off(deadxid) call is missing from this chunk.

# Expire dead-slice records once their recording period has elapsed.
for deadslice in deaddb.keys():
    if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
        logger.log("bwmon: Removing dead slice %s from dat." \
            % deaddb[deadslice]['slice'].name)
        del deaddb[deadslice]

# Get actual running values from tc since we've added and removed buckets.
# Update slice totals and bandwidth. {xid: {values}}
kernelhtbs = gethtbs(root_xid, default_xid)
logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2)

for (xid, slice) in slices.iteritems():
    # Monitor only the specified slices
    if xid == root_xid or xid == default_xid: continue
    # NOTE(review): `names`/`name` are not defined anywhere in this chunk,
    # and the branch body is missing; confirm against the full file.
    if names and name not in names:

    if (time.time() >= (slice.time + period)) or \
       (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
       (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
        # Reset to defaults every 24 hours or if it appears
        # that the byte counters have overflowed (or, more
        # likely, the node was restarted or the HTB buckets
        # were re-initialized).
        # NOTE(review): the final rspec argument of this reset() call (and
        # the else: before the update branch) is missing from this chunk.
        slice.reset(kernelhtbs[xid]['maxrate'], \
            kernelhtbs[xid]['maxexemptrate'], \
            kernelhtbs[xid]['usedbytes'], \
            kernelhtbs[xid]['usedi2bytes'], \
    logger.log("bwmon: Updating slice %s" % slice.name, 2)
    # Update byte counts (recomputes caps, runs tc, emails if needed).
    # NOTE(review): the final rspec argument of this call is missing from
    # this chunk.
    slice.update(kernelhtbs[xid]['maxrate'], \
        kernelhtbs[xid]['maxexemptrate'], \
        kernelhtbs[xid]['usedbytes'], \
        kernelhtbs[xid]['usedi2bytes'], \
        kernelhtbs[xid]['share'],

# Persist state for the next run.
logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2)
f = open(datafile, "w")
pickle.dump((version, slices, deaddb), f)
# Event used to wake the bwmon worker thread (set when new data arrives).
lock = threading.Event()

# NOTE(review): the `def run():` header, its wait-loop (lock.wait()/clear())
# and the try: matching the except below appear to be on lines missing
# from this chunk; confirm against the full file.
"""When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
logger.log("bwmon: Thread started", 2)
logger.log("bwmon: Event received.  Running.", 2)
# Snapshot the NM database under its lock so the sync work can run on a
# consistent copy without holding the lock.
database.db_lock.acquire()
nmdbcopy = copy.deepcopy(database.db)
database.db_lock.release()
# Bare except: logs the traceback and keeps the monitor thread alive.
except: logger.log_exc()
# Presumably the body of start(): launch the worker as a daemon thread.
tools.as_daemon_thread(run)
651 def GetSlivers(*args):