3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
# its instantaneous rate will be capped at the bytes remaining in the limit
7 # over the time remaining in the recording period.
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
# Set DEBUG to True if you don't want to send emails
DEBUG = False
# Set ENABLE to False to setup buckets, but not limit.
ENABLE = True

# Where the pickled accounting state (version, slices, deaddb) lives.
datafile = "/var/lib/misc/bwmon.dat"

try:
    sys.path.append("/etc/planetlab")
    from plc_config import *
except:
    # No site configuration: fall back to DEBUG mode so we log instead of
    # emailing addresses we cannot know.
    DEBUG = True
    logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
    logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1)

# Constants
seconds_per_day = 24 * 60 * 60

# Defaults
# Burst to line rate (or node cap). Set by NM. in KBit/s
default_MaxRate = int(bwlimit.get_bwcap() / 1000)
default_Maxi2Rate = int(bwlimit.bwmax / 1000)
# 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
# 5.4 Gbyte per day max allowed transferred per recording period
default_MaxKByte = 5662310
# 16.4 Gbyte per day max allowed transferred per recording period to I2
default_Maxi2KByte = 17196646
# Default share quanta
default_Share = 1

# Average over 1 day
period = 1 * seconds_per_day

# Message template for a capped traffic class; %-formatted with the params
# dict built in Slice.notify().
template = \
"""
The slice %(slice)s has transmitted more than %(bytes)s from
%(hostname)s to %(class)s destinations
since %(since)s.

Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
until %(until)s.

Please reduce the average %(class)s transmission rate
of the slice to %(limit)s per %(period)s.

""".lstrip()

# One-line trailer appended after the per-class messages.
footer = \
"""
%(date)s %(hostname)s bwcap %(slice)s
""".rstrip()
def format_bytes(bytes, si = True):
    """
    Format a byte count into a human-readable string.

    bytes -- number of bytes (int or float)
    si -- if True use SI units (kilo = 1000), else binary units (kilo = 1024)
    """
    if si:
        kilo = 1000.
    else:
        # Officially, a kibibyte
        kilo = 1024.

    if bytes >= (kilo * kilo * kilo):
        return "%.1f GB" % (bytes / (kilo * kilo * kilo))
    # Use kilo*kilo (not a hard-coded 1000000) so the MB threshold
    # stays consistent with the chosen unit system.
    elif bytes >= (kilo * kilo):
        return "%.1f MB" % (bytes / (kilo * kilo))
    elif bytes >= kilo:
        return "%.1f KB" % (bytes / kilo)
    else:
        return "%.0f bytes" % bytes
def format_period(seconds):
    """
    Format a period in seconds into a human-readable string.

    Exact multiples of a day/hour return the bare unit name ("day", "hour"),
    which reads naturally in the "per %(period)s" message template.
    """
    if seconds == (24 * 60 * 60):
        return "day"
    elif seconds == (60 * 60):
        return "hour"
    elif seconds > (24 * 60 * 60):
        return "%.1f days" % (seconds / 24. / 60. / 60.)
    elif seconds > (60 * 60):
        return "%.1f hours" % (seconds / 60. / 60.)
    elif seconds > 60:
        return "%.1f minutes" % (seconds / 60.)
    else:
        return "%.0f seconds" % seconds
def slicemail(slice, subject, body):
    """
    Front end to sendmail. Sends email to slice alias with given subject and body.

    slice -- slice name, or None/"root" to mail only the support list
    subject -- message subject line
    body -- message body text
    """
    # -N never: suppress delivery status notifications.
    sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")

    # Parsed from MyPLC config
    to = [PLC_MAIL_MOM_LIST_ADDRESS]

    # Also notify the slice alias itself, except for root (no slice alias).
    if slice is not None and slice != "root":
        to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))

    header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
              'to': ", ".join(to),
              'version': sys.version.split(" ")[0],
              'subject': subject}

    # Write headers; sendmail -t reads the recipients from them.
    sendmail.write(
"""
Content-type: text/plain
From: %(from)s
Reply-To: %(from)s
To: %(to)s
Subject: %(subject)s
X-Mailer: Python/%(version)s

""".lstrip() % header)

    # Write body
    sendmail.write(body)
    # Close the pipe so sendmail actually runs and the fd is not leaked.
    sendmail.close()
164 Stores the last recorded bandwidth parameters of a slice.
166 xid - slice context/VServer ID
168 time - beginning of recording period in UNIX seconds
169 bytes - low bandwidth bytes transmitted at the beginning of the recording period
170 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
171 MaxKByte - total volume of data allowed
172 ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
173 Maxi2KByte - same as MaxKByte, but for i2
    Threshi2Kbyte - same as ThreshKByte, but for i2
175 MaxRate - max_rate slice attribute.
176 Maxi2Rate - max_exempt_rate slice attribute.
177 Share - Used by Sirius to loan min rates
178 Sharei2 - Used by Sirius to loan min rates for i2
179 self.emailed - did slice recv email during this recording period
183 def __init__(self, xid, name, rspec):
189 self.MaxRate = default_MaxRate
190 self.MinRate = bwlimit.bwmin / 1000
191 self.Maxi2Rate = default_Maxi2Rate
192 self.Mini2Rate = bwlimit.bwmin / 1000
193 self.MaxKByte = default_MaxKByte
194 self.ThreshKByte = int(.8 * self.MaxKByte)
195 self.Maxi2KByte = default_Maxi2KByte
196 self.Threshi2KByte = int(.8 * self.Maxi2KByte)
197 self.Share = default_Share
198 self.Sharei2 = default_Share
202 self.updateSliceAttributes(rspec)
203 bwlimit.set(xid = self.xid,
204 minrate = self.MinRate * 1000,
205 maxrate = self.MaxRate * 1000,
206 maxexemptrate = self.Maxi2Rate * 1000,
207 minexemptrate = self.Mini2Rate * 1000,
213 def updateSliceAttributes(self, rspec):
215 Use respects from GetSlivers to PLC to populate slice object. Also
216 do some sanity checking.
219 # Sanity check plus policy decision for MinRate:
220 # Minrate cant be greater than 25% of MaxRate or NodeCap.
221 MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000))
222 if MinRate > int(.25 * default_MaxRate):
223 MinRate = int(.25 * default_MaxRate)
224 if MinRate != self.MinRate:
225 self.MinRate = MinRate
226 logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))
228 MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000))
229 if MaxRate != self.MaxRate:
230 self.MaxRate = MaxRate
231 logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
233 Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
234 if Mini2Rate != self.Mini2Rate:
235 self.Mini2Rate = Mini2Rate
236 logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
238 Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000))
239 if Maxi2Rate != self.Maxi2Rate:
240 self.Maxi2Rate = Maxi2Rate
241 logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
243 MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
244 if MaxKByte != self.MaxKByte:
245 self.MaxKByte = MaxKByte
246 logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))
248 Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
249 if Maxi2KByte != self.Maxi2KByte:
250 self.Maxi2KByte = Maxi2KByte
251 logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))
253 ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
254 if ThreshKByte != self.ThreshKByte:
255 self.ThreshKByte = ThreshKByte
256 logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))
258 Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
259 if Threshi2KByte != self.Threshi2KByte:
260 self.Threshi2KByte = Threshi2KByte
261 logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))
263 Share = int(rspec.get('net_share', default_Share))
264 if Share != self.Share:
266 logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))
268 Sharei2 = int(rspec.get('net_i2_share', default_Share))
269 if Sharei2 != self.Sharei2:
270 self.Sharei2 = Sharei2
271 logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
274 def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
276 Begin a new recording period. Remove caps by restoring limits
277 to their default values.
280 # Query Node Manager for max rate overrides
281 self.updateSliceAttributes(rspec)
283 # Reset baseline time
284 self.time = time.time()
286 # Reset baseline byte coutns
287 self.bytes = usedbytes
288 self.i2bytes = usedi2bytes
295 maxrate = self.MaxRate * 1000
296 maxi2rate = self.Maxi2Rate * 1000
297 if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
298 logger.log("bwmon: %s reset to %s/%s" % \
300 bwlimit.format_tc_rate(maxrate),
301 bwlimit.format_tc_rate(maxi2rate)), 1)
302 bwlimit.set(xid = self.xid,
303 minrate = self.MinRate * 1000,
304 maxrate = self.MaxRate * 1000,
305 maxexemptrate = self.Maxi2Rate * 1000,
306 minexemptrate = self.Mini2Rate * 1000,
309 def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
311 Notify the slice it's being capped.
313 # Prepare message parameters from the template
315 params = {'slice': self.name, 'hostname': socket.gethostname(),
316 'since': time.asctime(time.gmtime(self.time)) + " GMT",
317 'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
318 'date': time.asctime(time.gmtime()) + " GMT",
319 'period': format_period(period)}
321 if new_maxrate != (self.MaxRate * 1000):
322 # Format template parameters for low bandwidth message
323 params['class'] = "low bandwidth"
324 params['bytes'] = format_bytes(usedbytes - self.bytes)
325 params['limit'] = format_bytes(self.MaxKByte * 1024)
326 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
328 # Cap low bandwidth burst rate
329 message += template % params
330 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
332 if new_maxexemptrate != (self.Maxi2Rate * 1000):
333 # Format template parameters for high bandwidth message
334 params['class'] = "high bandwidth"
335 params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
336 params['limit'] = format_bytes(self.Maxi2KByte * 1024)
337 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
339 message += template % params
340 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
343 if self.emailed == False:
344 subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
346 logger.log("bwmon: "+ subject)
347 logger.log("bwmon: "+ message + (footer % params))
350 logger.log("bwmon: Emailing %s" % self.name)
351 slicemail(self.name, subject, message + (footer % params))
354 def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec):
356 Update byte counts and check if byte thresholds have been
357 exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.
358 Recalculate every time module runs.
361 # Query Node Manager for max rate overrides
362 self.updateSliceAttributes(rspec)
365 if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
366 sum = self.bytes + (self.ThreshKByte * 1024)
367 maxbyte = self.MaxKByte * 1024
368 bytesused = usedbytes - self.bytes
369 timeused = int(time.time() - self.time)
371 new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
372 # Never go under MinRate
373 if new_maxrate < (self.MinRate * 1000):
374 new_maxrate = self.MinRate * 1000
375 # State information. I'm capped.
379 new_maxrate = self.MaxRate * 1000
382 if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
383 maxi2byte = self.Maxi2KByte * 1024
384 i2bytesused = usedi2bytes - self.i2bytes
385 timeused = int(time.time() - self.time)
387 new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
388 # Never go under MinRate
389 if new_maxi2rate < (self.Mini2Rate * 1000):
390 new_maxi2rate = self.Mini2Rate * 1000
391 # State information. I'm capped.
395 new_maxi2rate = self.Maxi2Rate * 1000
399 bwlimit.set(xid = self.xid,
400 minrate = self.MinRate * 1000,
401 maxrate = new_maxrate,
402 minexemptrate = self.Mini2Rate * 1000,
403 maxexemptrate = new_maxi2rate,
407 if self.capped == True:
408 self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
def gethtbs(root_xid, default_xid):
    """
    Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
    Turn off HTBs without names.

    root_xid, default_xid -- special xids that are kept even without a slice name
    """
    livehtbs = {}
    for params in bwlimit.get():
        (xid, share,
         minrate, maxrate,
         minexemptrate, maxexemptrate,
         usedbytes, usedi2bytes) = params

        name = bwlimit.get_slice(xid)

        if (name is None) \
        and (xid != root_xid) \
        and (xid != default_xid):
            # Orphaned (not associated with a slice) class
            name = "%d?" % xid
            logger.log("bwmon: Found orphaned HTB %s. Removing." %name, 1)
            bwlimit.off(xid)

        livehtbs[xid] = {'share': share,
            'minrate': minrate,
            'maxrate': maxrate,
            'maxexemptrate': maxexemptrate,
            'minexemptrate': minexemptrate,
            'usedbytes': usedbytes,
            'name': name,
            'usedi2bytes': usedi2bytes}

    return livehtbs
def sync(nmdbcopy):
    """
    Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.

    nmdbcopy -- deep copy of the Node Manager database, keyed on slice name
    """
    # Defaults (refreshed each run, so declare them global).
    global datafile, \
        period, \
        default_MaxRate, \
        default_Maxi2Rate, \
        default_MaxKByte, \
        default_Maxi2KByte, \
        default_Share

    # Limit monitoring to these slice names (empty = monitor all).
    names = []

    # Incase the limits have changed.
    default_MaxRate = int(bwlimit.get_bwcap() / 1000)
    default_Maxi2Rate = int(bwlimit.bwmax / 1000)

    # Incase default isn't set yet.
    if default_MaxRate == -1:
        default_MaxRate = 1000000

    try:
        f = open(datafile, "r+")
        logger.log("bwmon: Loading %s" % datafile, 2)
        (version, slices, deaddb) = pickle.load(f)
        f.close()
        # Check version of data file
        if version != "$Id$":
            logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
            raise Exception
    except Exception:
        # Missing/stale/corrupt data file: start accounting from scratch.
        version = "$Id$"
        slices = {}
        deaddb = {}

    # Get/set special slice IDs
    root_xid = bwlimit.get_xid("root")
    default_xid = bwlimit.get_xid("default")

    # Since root is required for sanity, its not in the API/plc database, so pass {}
    # to use defaults.
    if root_xid not in slices.keys():
        slices[root_xid] = Slice(root_xid, "root", {})
        slices[root_xid].reset(0, 0, 0, 0, {})

    # Used by bwlimit. pass {} since there is no rspec (like above).
    if default_xid not in slices.keys():
        slices[default_xid] = Slice(default_xid, "default", {})
        slices[default_xid].reset(0, 0, 0, 0, {})

    live = {}
    # Get running slivers that should be on this node (from plc). {xid: name}
    # db keys on name, bwmon keys on xid. db doesnt have xid either.
    for plcSliver in nmdbcopy.keys():
        live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]

    logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__(), 2)
    logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__(), 2)

    # Get actual running values from tc.
    # Update slice totals and bandwidth. {xid: {values}}
    kernelhtbs = gethtbs(root_xid, default_xid)
    logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)

    # The dat file has HTBs for slices, but the HTBs aren't running
    nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys())
    logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
    # Reset tc counts for still-live slices; drop the rest.
    for nohtbslice in nohtbslices:
        if live.has_key(nohtbslice):
            slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
        else:
            logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
            del slices[nohtbslice]

    # The dat file doesnt have HTB for the slice but kern has HTB
    slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
    logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
    for slicenodat in slicesnodat:
        # But slice is running
        if live.has_key(slicenodat):
            # init the slice. which means start accounting over since kernel
            # htb was already there.
            slices[slicenodat] = Slice(slicenodat,
                live[slicenodat]['name'],
                live[slicenodat]['_rspec'])

    # Get new slices.
    # Slices in GetSlivers but not running HTBs
    newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
    logger.log("bwmon: Found %s new slices" % newslicesxids.__len__(), 2)

    # Setup new slices
    for newslice in newslicesxids:
        # Delegated slices dont have xids (which are uids) since they haven't been
        # created by NM yet.
        if newslice != None and live[newslice].has_key('_rspec') == True:
            # Check to see if we recently deleted this slice.
            if live[newslice]['name'] not in deaddb.keys():
                logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
                # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
                # and made a dict of computed values.
                slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
                slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
            # Double check time for dead slice in deaddb is within 24hr recording period.
            elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
                deadslice = deaddb[live[newslice]['name']]
                logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
                slices[newslice] = deadslice['slice']
                slices[newslice].xid = newslice
                # Start the HTB with the saved rates and byte counts so the
                # slice cannot evade its cap by being deleted and recreated.
                slices[newslice].reset(deadslice['slice'].MaxRate,
                    deadslice['slice'].Maxi2Rate,
                    deadslice['htb']['usedbytes'],
                    deadslice['htb']['usedi2bytes'],
                    live[newslice]['_rspec'])
                # Bring byte counts up to date.
                slices[newslice].update(deadslice['slice'].MaxRate,
                    deadslice['slice'].Maxi2Rate,
                    deadslice['htb']['usedbytes'],
                    deadslice['htb']['usedi2bytes'],
                    deadslice['htb']['share'],
                    live[newslice]['_rspec'])
                # Since the slice has been reinitialed, remove from dead database.
                del deaddb[deadslice['slice'].name]
        else:
            logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name'])

    # Move dead slices that exist in the pickle file, but
    # aren't instantiated by PLC into the dead dict until
    # recording period is over. This is to avoid the case where a slice is dynamically created
    # and destroyed then recreated to get around byte limits.
    deadxids = Set(slices.keys()) - Set(live.keys())
    # Subtract 2 for the always-present root and default xids.
    logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2), 2)
    for deadxid in deadxids:
        if deadxid == root_xid or deadxid == default_xid:
            continue
        logger.log("bwmon: removing dead slice %s " % deadxid)
        if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
            # add slice (by name) to deaddb
            logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
            deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
            del slices[deadxid]
        if kernelhtbs.has_key(deadxid):
            logger.log("bwmon: Removing HTB for %s." % deadxid, 2)
            bwlimit.off(deadxid)

    # Clean up dead slices whose recording period has expired.
    for deadslice in deaddb.keys():
        if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
            logger.log("bwmon: Removing dead slice %s from dat." \
                        % deaddb[deadslice]['slice'].name)
            del deaddb[deadslice]

    # Get actual running values from tc since we've added and removed buckets.
    # Update slice totals and bandwidth. {xid: {values}}
    kernelhtbs = gethtbs(root_xid, default_xid)
    logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2)

    # Update all byte limites on all slices
    for (xid, slice) in slices.iteritems():
        # Monitor only the specified slices
        if xid == root_xid or xid == default_xid: continue
        # Fix: compare the slice's own name (bare `name` was undefined here).
        if names and slice.name not in names:
            continue

        if (time.time() >= (slice.time + period)) or \
           (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
           (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
            # Reset to defaults every 24 hours or if it appears
            # that the byte counters have overflowed (or, more
            # likely, the node was restarted or the HTB buckets
            # were re-initialized).
            slice.reset(kernelhtbs[xid]['maxrate'], \
                kernelhtbs[xid]['maxexemptrate'], \
                kernelhtbs[xid]['usedbytes'], \
                kernelhtbs[xid]['usedi2bytes'], \
                live[xid]['_rspec'])
        else:
            logger.log("bwmon: Updating slice %s" % slice.name, 2)
            # Update byte counts
            slice.update(kernelhtbs[xid]['maxrate'], \
                kernelhtbs[xid]['maxexemptrate'], \
                kernelhtbs[xid]['usedbytes'], \
                kernelhtbs[xid]['usedi2bytes'], \
                kernelhtbs[xid]['share'],
                live[xid]['_rspec'])

    logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(), datafile), 2)
    f = open(datafile, "w")
    pickle.dump((version, slices, deaddb), f)
    # Fix: close the handle so the dat file is flushed to disk.
    f.close()
# Event used to kick the monitor thread into a sync pass.
lock = threading.Event()

def run():
    """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
    logger.log("bwmon: Thread started", 2)
    while True:
        lock.wait()
        logger.log("bwmon: Event received. Running.", 2)
        # Snapshot the NM database under its lock so sync() can work
        # without holding the lock for the whole pass.
        database.db_lock.acquire()
        nmdbcopy = copy.deepcopy(database.db)
        database.db_lock.release()
        try:
            sync(nmdbcopy)
        except: logger.log_exc()
        # Re-arm for the next event.
        lock.clear()

def start(*args):
    """Start the monitor loop as a daemon thread."""
    tools.as_daemon_thread(run)
656 def GetSlivers(*args):