3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
6 # its instantaneous rate will be capped at the bytes remaining in the limit
7 # over the time remaining in the recording period.
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
38 datafile = "/var/lib/misc/bwmon.dat"
41 sys.path.append("/etc/planetlab")
42 from plc_config import *
44 logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
45 logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1)
48 seconds_per_day = 24 * 60 * 60
51 # Burst to line rate (or node cap). Set by NM. in KBit/s
52 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
53 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
57 # 5.4 Gbyte per day, expressed in KByte: 5.4 * 1024 * 1024
58 # 5.4 Gbyte per day max allowed transferred per recording period
59 default_MaxKByte = 5662310
60 # 16.4 Gbyte per day max allowed transferred per recording period to I2
61 default_Maxi2KByte = 17196646
62 # Default share quanta
66 period = 1 * seconds_per_day
71 The slice %(slice)s has transmitted more than %(bytes)s from
72 %(hostname)s to %(class)s destinations
75 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
78 Please reduce the average %(class)s transmission rate
79 of the slice to %(limit)s per %(period)s.
85 %(date)s %(hostname)s bwcap %(slice)s
def format_bytes(bytes, si = True):
    """
    Formats a byte count into a human-readable string.

    bytes - number of bytes to format
    si    - if True (the default) scale by powers of 1000, otherwise
            scale by powers of 1024

    Returns the count with one decimal place and a GB, MB or KB suffix,
    or as a whole number of "bytes" when it is below one kilo unit.
    """
    if si:
        kilo = 1000.
    else:
        # Officially, a kibibyte
        kilo = 1024.

    # Derive every threshold from kilo so the unit boundaries always agree
    # with the divisor, in SI mode and in binary mode alike.
    mega = kilo * kilo
    giga = mega * kilo

    if bytes >= giga:
        return "%.1f GB" % (bytes / giga)
    elif bytes >= mega:
        return "%.1f MB" % (bytes / mega)
    elif bytes >= kilo:
        return "%.1f KB" % (bytes / kilo)
    else:
        return "%.0f bytes" % bytes
def format_period(seconds):
    """
    Formats a period in seconds into a string.

    seconds - length of the period in seconds

    Exactly one day or one hour is returned as the bare unit name so the
    result reads naturally after "per" in the notification text.  Longer
    or shorter periods are returned as a number with one decimal place
    followed by the largest fitting unit (days, hours, minutes), and
    anything of a minute or less as whole seconds.
    """
    if seconds == (24 * 60 * 60):
        return "day"
    elif seconds == (60 * 60):
        return "hour"
    elif seconds > (24 * 60 * 60):
        return "%.1f days" % (seconds / 24. / 60. / 60.)
    elif seconds > (60 * 60):
        return "%.1f hours" % (seconds / 60. / 60.)
    elif seconds > 60:
        return "%.1f minutes" % (seconds / 60.)
    else:
        return "%.0f seconds" % seconds
125 def slicemail(slice, subject, body):
127 Front end to sendmail. Sends email to slice alias with given subject and body.
130 sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
132 # Parsed from MyPLC config
133 to = [PLC_MAIL_MOM_LIST_ADDRESS]
135 if slice is not None and slice != "root":
136 to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
138 header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
140 'version': sys.version.split(" ")[0],
146 Content-type: text/plain
150 X-Mailer: Python/%(version)s
153 """.lstrip() % header)
163 Stores the last recorded bandwidth parameters of a slice.
165 xid - slice context/VServer ID
167 time - beginning of recording period in UNIX seconds
168 bytes - low bandwidth bytes transmitted at the beginning of the recording period
169 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
170 MaxKByte - total volume of data allowed
171 ThreshKByte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
172 Maxi2KByte - same as MaxKByte, but for i2
173 Threshi2KByte - same as ThreshKByte, but for i2
174 MaxRate - max_rate slice attribute.
175 Maxi2Rate - max_exempt_rate slice attribute.
176 Share - Used by Sirius to loan min rates
177 Sharei2 - Used by Sirius to loan min rates for i2
178 self.emailed - did slice recv email during this recording period
182 def __init__(self, xid, name, rspec):
188 self.MaxRate = default_MaxRate
189 self.MinRate = default_MinRate
190 self.Maxi2Rate = default_Maxi2Rate
191 self.Mini2Rate = default_Mini2Rate
192 self.MaxKByte = default_MaxKByte
193 self.ThreshKByte = (.8 * self.MaxKByte)
194 self.Maxi2KByte = default_Maxi2KByte
195 self.Threshi2KByte = (.8 * self.Maxi2KByte)
196 self.Share = default_Share
197 self.Sharei2 = default_Share
201 self.updateSliceAttributes(rspec)
202 bwlimit.set(xid = self.xid,
203 minrate = self.MinRate * 1000,
204 maxrate = self.MaxRate * 1000,
205 maxexemptrate = self.Maxi2Rate * 1000,
206 minexemptrate = self.Mini2Rate * 1000,
def updateSliceAttributes(self, rspec):
    """
    Use rspecs from GetSlivers to PLC to populate slice object.  Also
    do some sanity checking.

    rspec - dict of slice attributes.  The keys read here are
            net_min_rate, net_max_rate, net_i2_min_rate,
            net_i2_max_rate, net_max_kbyte, net_i2_max_kbyte,
            net_thresh_kbyte, net_i2_thresh_kbyte, net_share and
            net_i2_share.  A missing key falls back to its default.

    Every attribute whose value differs from the one currently held is
    stored on the slice and the change is logged.
    """
    def _apply(attr, value, label):
        # Store and log a value only when it differs from the current one.
        if value != getattr(self, attr):
            setattr(self, attr, value)
            logger.log("bwmon: Updating %s: %s = %s" % (self.name, label, value))

    # Sanity check plus policy decision for MinRate:
    # MinRate can't be greater than 25% of MaxRate or NodeCap.
    MinRate = int(rspec.get("net_min_rate", default_MinRate))
    if MinRate > int(.25 * default_MaxRate):
        MinRate = int(.25 * default_MaxRate)
    _apply("MinRate", MinRate, "Min Rate")

    MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000))
    _apply("MaxRate", MaxRate, "Max Rate")

    Mini2Rate = int(rspec.get('net_i2_min_rate', default_Mini2Rate))
    _apply("Mini2Rate", Mini2Rate, "Min i2 Rate")

    Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000))
    _apply("Maxi2Rate", Maxi2Rate, "Max i2 Rate")

    MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
    _apply("MaxKByte", MaxKByte, "Max KByte lim")

    Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
    _apply("Maxi2KByte", Maxi2KByte, "Max i2 KByte")

    # Thresholds default to 80% of the byte limits read just above.
    ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
    _apply("ThreshKByte", ThreshKByte, "Thresh KByte")

    Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
    _apply("Threshi2KByte", Threshi2KByte, "i2 Thresh KByte")

    # The new share must be stored as well as logged.
    Share = int(rspec.get('net_share', default_Share))
    _apply("Share", Share, "Net Share")

    # The attribute is Sharei2; logging self.i2Share raised AttributeError.
    Sharei2 = int(rspec.get('net_i2_share', default_Share))
    _apply("Sharei2", Sharei2, "Net i2 Share")
273 def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
275 Begin a new recording period. Remove caps by restoring limits
276 to their default values.
279 # Query Node Manager for max rate overrides
280 self.updateSliceAttributes(rspec)
282 # Reset baseline time
283 self.time = time.time()
285 # Reset baseline byte counts
286 self.bytes = usedbytes
287 self.i2bytes = usedi2bytes
294 maxrate = self.MaxRate * 1000
295 maxi2rate = self.Maxi2Rate * 1000
296 if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
297 logger.log("bwmon: %s reset to %s/%s" % \
299 bwlimit.format_tc_rate(maxrate),
300 bwlimit.format_tc_rate(maxi2rate)), 1)
301 bwlimit.set(xid = self.xid,
302 minrate = self.MinRate * 1000,
303 maxrate = self.MaxRate * 1000,
304 maxexemptrate = self.Maxi2Rate * 1000,
305 minexemptrate = self.Mini2Rate * 1000,
308 def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
310 Notify the slice it's being capped.
312 # Prepare message parameters from the template
314 params = {'slice': self.name, 'hostname': socket.gethostname(),
315 'since': time.asctime(time.gmtime(self.time)) + " GMT",
316 'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
317 'date': time.asctime(time.gmtime()) + " GMT",
318 'period': format_period(period)}
320 if new_maxrate != self.MaxRate:
321 # Format template parameters for low bandwidth message
322 params['class'] = "low bandwidth"
323 params['bytes'] = format_bytes(usedbytes - self.bytes)
324 params['limit'] = format_bytes(self.MaxKByte * 1024)
325 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
327 # Cap low bandwidth burst rate
328 message += template % params
329 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
331 if new_maxexemptrate != self.Maxi2Rate:
332 # Format template parameters for high bandwidth message
333 params['class'] = "high bandwidth"
334 params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
335 params['limit'] = format_bytes(self.Maxi2KByte * 1024)
336 params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate)
338 message += template % params
339 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
342 if message and self.emailed == False:
343 subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
345 logger.log("bwmon: "+ subject)
346 logger.log("bwmon: "+ message + (footer % params))
349 slicemail(self.name, subject, message + (footer % params))
352 def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec):
354 Update byte counts and check if byte thresholds have been
355 exceeded. If exceeded, cap to remaining bytes in limit over remaining in period.
356 Recalculate every time module runs.
359 # Query Node Manager for max rate overrides
360 self.updateSliceAttributes(rspec)
362 # Check shares for Sirius loans.
363 if runningshare != self.Share:
364 logger.log("bwmon: Updating share to %s" % self.share)
365 bwlimit.set(xid = self.xid,
366 minrate = self.MinRate * 1000,
367 maxrate = self.MaxRate * 1000,
368 maxexemptrate = self.Maxi2Rate * 1000,
369 minexemptrate = self.Mini2Rate * 1000,
372 # Prepare message parameters from the template
374 #params = {'slice': self.name, 'hostname': socket.gethostname(),
375 # 'since': time.asctime(time.gmtime(self.time)) + " GMT",
376 # 'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
377 # 'date': time.asctime(time.gmtime()) + " GMT",
378 # 'period': format_period(period)}
381 if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
382 sum = self.bytes + (self.ThreshKByte * 1024)
383 maxbyte = self.MaxKByte * 1024
384 bytesused = usedbytes - self.bytes
385 timeused = int(time.time() - self.time)
387 new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
388 # Never go under MinRate
389 if new_maxrate < (self.MinRate * 1000):
390 new_maxrate = self.MinRate * 1000
391 # State information. I'm capped.
395 new_maxrate = self.MaxRate * 1000
399 if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
400 maxi2byte = self.Maxi2KByte * 1024
401 i2bytesused = usedi2bytes - self.i2bytes
402 timeused = int(time.time() - self.time)
404 new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
405 # Never go under MinRate
406 if new_maxi2rate < (self.Mini2Rate * 1000):
407 new_maxi2rate = self.Mini2Rate * 1000
408 # State information. I'm capped.
412 new_maxi2rate = self.Maxi2Rate * 1000
416 if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate:
417 bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxi2rate)
420 if self.capped == True and self.emailed == False:
421 self.notify(newmaxrate, newmaxexemptrate, usedbytes, usedi2bytes)
424 def gethtbs(root_xid, default_xid):
426 Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
427 Turn off HTBs without names.
430 for params in bwlimit.get():
433 minexemptrate, maxexemptrate,
434 usedbytes, usedi2bytes) = params
436 name = bwlimit.get_slice(xid)
439 and (xid != root_xid) \
440 and (xid != default_xid):
441 # Orphaned (not associated with a slice) class
443 logger.log("bwmon: Found orphaned HTB %s. Removing." %name, 1)
446 livehtbs[xid] = {'share': share,
449 'maxexemptrate': maxexemptrate,
450 'minexemptrate': minexemptrate,
451 'usedbytes': usedbytes,
453 'usedi2bytes': usedi2bytes}
459 Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.
468 default_ThreshKByte,\
470 default_Threshi2KByte,\
476 # In case the limits have changed.
477 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
478 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
480 # In case default isn't set yet.
481 if default_MaxRate == -1:
482 default_MaxRate = 1000000
485 f = open(datafile, "r+")
486 logger.log("bwmon: Loading %s" % datafile, 2)
487 (version, slices, deaddb) = pickle.load(f)
489 # Check version of data file
490 if version != "$Id$":
491 logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
498 # Get/set special slice IDs
499 root_xid = bwlimit.get_xid("root")
500 default_xid = bwlimit.get_xid("default")
502 # Since root is required for sanity, it's not in the API/plc database, so pass {}
504 if root_xid not in slices.keys():
505 slices[root_xid] = Slice(root_xid, "root", {})
506 slices[root_xid].reset(0, 0, 0, 0, {})
508 # Used by bwlimit. pass {} since there is no rspec (like above).
509 if default_xid not in slices.keys():
510 slices[default_xid] = Slice(default_xid, "default", {})
511 slices[default_xid].reset(0, 0, 0, 0, {})
514 # Get running slivers that should be on this node (from plc). {xid: name}
515 # db keys on name, bwmon keys on xid. db doesnt have xid either.
516 for plcSliver in nmdbcopy.keys():
517 live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
519 logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__(), 2)
520 logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__(), 2)
522 # Get actual running values from tc.
523 # Update slice totals and bandwidth. {xid: {values}}
524 kernelhtbs = gethtbs(root_xid, default_xid)
525 logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)
527 # The dat file has HTBs for slices, but the HTBs aren't running
528 nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys())
529 logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
531 for nohtbslice in nohtbslices:
532 if live.has_key(nohtbslice):
533 slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
535 logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
536 del slices[nohtbslice]
538 # The dat file doesnt have HTB for the slice but kern has HTB
539 slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
540 logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
541 for slicenodat in slicesnodat:
542 # But slice is running
543 if live.has_key(slicenodat):
544 # init the slice. which means start accounting over since kernel
545 # htb was already there.
546 slices[slicenodat] = Slice(slicenodat,
547 live[slicenodat]['name'],
548 live[slicenodat]['_rspec'])
551 # Slices in GetSlivers but not running HTBs
552 newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
553 logger.log("bwmon: Found %s new slices" % newslicesxids.__len__(), 2)
556 for newslice in newslicesxids:
557 # Delegated slices dont have xids (which are uids) since they haven't been
559 if newslice != None and live[newslice].has_key('_rspec') == True:
560 # Check to see if we recently deleted this slice.
561 if live[newslice]['name'] not in deaddb.keys():
562 logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
563 # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
564 # and made a dict of computed values.
565 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
566 slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
567 # Double check time for dead slice in deaddb is within 24hr recording period.
568 elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
569 deadslice = deaddb[live[newslice]['name']]
570 logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
571 slices[newslice] = deadslice['slice']
572 slices[newslice].xid = newslice
574 slices[newslice].reset(deadslice['slice'].MaxRate,
575 deadslice['slice'].Maxi2Rate,
576 deadslice['htb']['usedbytes'],
577 deadslice['htb']['usedi2bytes'],
578 live[newslice]['_rspec'])
580 slices[newslice].update(deadslice['slice'].MaxRate,
581 deadslice['slice'].Maxi2Rate,
582 deadslice['htb']['usedbytes'],
583 deadslice['htb']['usedi2bytes'],
584 deadslice['htb']['share'],
585 live[newslice]['_rspec'])
586 # Since the slice has been reinitialized, remove from dead database.
587 del deaddb[deadslice['slice'].name]
589 logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name'])
591 # Move dead slices that exist in the pickle file, but
592 # aren't instantiated by PLC into the dead dict until
593 # recording period is over. This is to avoid the case where a slice is dynamically created
594 # and destroyed then recreated to get around byte limits.
595 deadxids = Set(slices.keys()) - Set(live.keys())
596 logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2), 2)
597 for deadxid in deadxids:
598 if deadxid == root_xid or deadxid == default_xid:
600 logger.log("bwmon: removing dead slice %s " % deadxid)
601 if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
602 # add slice (by name) to deaddb
603 logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
604 deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
606 if kernelhtbs.has_key(deadxid):
607 logger.log("bwmon: Removing HTB for %s." % deadxid, 2)
611 for deadslice in deaddb.keys():
612 if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
613 logger.log("bwmon: Removing dead slice %s from dat." \
614 % deaddb[deadslice]['slice'].name)
615 del deaddb[deadslice]
617 # Get actual running values from tc since we've added and removed buckets.
618 # Update slice totals and bandwidth. {xid: {values}}
619 kernelhtbs = gethtbs(root_xid, default_xid)
620 logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
622 for (xid, slice) in slices.iteritems():
623 # Monitor only the specified slices
624 if xid == root_xid or xid == default_xid: continue
625 if names and name not in names:
628 if (time.time() >= (slice.time + period)) or \
629 (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
630 (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
631 # Reset to defaults every 24 hours or if it appears
632 # that the byte counters have overflowed (or, more
633 # likely, the node was restarted or the HTB buckets
634 # were re-initialized).
635 slice.reset(kernelhtbs[xid]['maxrate'], \
636 kernelhtbs[xid]['maxexemptrate'], \
637 kernelhtbs[xid]['usedbytes'], \
638 kernelhtbs[xid]['usedi2bytes'], \
641 logger.log("bwmon: Updating slice %s" % slice.name, 2)
643 slice.update(kernelhtbs[xid]['maxrate'], \
644 kernelhtbs[xid]['maxexemptrate'], \
645 kernelhtbs[xid]['usedbytes'], \
646 kernelhtbs[xid]['usedi2bytes'], \
647 kernelhtbs[xid]['share'],
650 logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2)
651 f = open(datafile, "w")
652 pickle.dump((version, slices, deaddb), f)
655 lock = threading.Event()
657 """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
658 logger.log("bwmon: Thread started", 2)
661 logger.log("bwmon: Event received. Running.", 2)
662 database.db_lock.acquire()
663 nmdbcopy = copy.deepcopy(database.db)
664 database.db_lock.release()
666 except: logger.log_exc()
670 tools.as_daemon_thread(run)
672 def GetSlivers(*args):