3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
# its instantaneous rate will be capped at the bytes remaining in the limit
7 # over the time remaining in the recording period.
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
# On-disk state: pickled (version, slices, deaddb) written by sync().
datafile = "/var/lib/misc/bwmon.dat"

try:
    # Pull in the MyPLC configuration (PLC_NAME, PLC_MAIL_* addresses, DEBUG).
    sys.path.append("/etc/planetlab")
    from plc_config import *
except:
    # No MyPLC config on this node: fall back to DEBUG mode so notify()
    # logs instead of emailing.
    DEBUG = True
    logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
    logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1)
# Constants
seconds_per_day = 24 * 60 * 60

# Burst to line rate (or node cap). Set by NM. in KBit/s
default_MaxRate = int(bwlimit.get_bwcap() / 1000)
default_Maxi2Rate = int(bwlimit.bwmax / 1000)
# 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
# 5.4 Gbyte per day max allowed transfered per recording period
default_MaxKByte = 5662310
# 16.4 Gbyte per day max allowed transfered per recording period to I2
default_Maxi2KByte = 17196646
# Default share quanta (Sirius loan unit); referenced by Slice.__init__
# and updateSliceAttributes.
default_Share = 1

# Recording period length in seconds (one day).
period = 1 * seconds_per_day
# Message template used by notify(); keys are filled from the params dict
# built there (slice, bytes, hostname, class, since, until, new_maxrate,
# limit, period).
template = \
"""
The slice %(slice)s has transmitted more than %(bytes)s from
%(hostname)s to %(class)s destinations
since %(since)s.

Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
until %(until)s.

Please reduce the average %(class)s transmission rate
of the slice to %(limit)s per %(period)s.

""".lstrip()

# One-line audit footer appended to every notification.
footer = \
"""
%(date)s %(hostname)s bwcap %(slice)s
""".lstrip()
def format_bytes(bytes, si = True):
    """
    Formats bytes into a string.

    bytes -- byte count to render
    si -- if True use SI (1000-based) units, otherwise binary (1024-based)

    Returns e.g. "2.0 GB", "1.5 MB", "1.0 KB" or "999 bytes".
    """
    if si:
        kilo = 1000.
    else:
        # Officially, a kibibyte
        kilo = 1024.
    if bytes >= (kilo * kilo * kilo):
        return "%.1f GB" % (bytes / (kilo * kilo * kilo))
    elif bytes >= (kilo * kilo):
        # BUGFIX: thresholds were hard-coded as 1000000/1000, which
        # disagreed with the 1024-based divisor when si=False; compare
        # against the same kilo used for division.
        return "%.1f MB" % (bytes / (kilo * kilo))
    elif bytes >= kilo:
        return "%.1f KB" % (bytes / kilo)
    else:
        return "%.0f bytes" % bytes
def format_period(seconds):
    """
    Formats a period in seconds into a human-readable string.

    Exactly one day/hour map to "day"/"hour"; other values are scaled
    to the largest suitable unit.
    """
    if seconds == (24 * 60 * 60):
        return "day"
    elif seconds == (60 * 60):
        return "hour"
    elif seconds > (24 * 60 * 60):
        return "%.1f days" % (seconds / 24. / 60. / 60.)
    elif seconds > (60 * 60):
        return "%.1f hours" % (seconds / 60. / 60.)
    elif seconds > 60:
        return "%.1f minutes" % (seconds / 60.)
    else:
        return "%.0f seconds" % seconds
def slicemail(slice, subject, body):
    """
    Front end to sendmail. Sends email to slice alias with given subject and body.

    slice -- slice name; the SLICE@... alias is added unless the slice is
             None or "root" (those go only to the support list).
    """
    # -N never: suppress delivery status notifications back to the sender.
    sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")

    # Parsed from MyPLC config
    to = [PLC_MAIL_MOM_LIST_ADDRESS]

    if slice is not None and slice != "root":
        to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))

    header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
              'to': ", ".join(to),
              'version': sys.version.split(" ")[0],
              'subject': subject}

    # Write headers
    sendmail.write(
"""
Content-type: text/plain
From: %(from)s
Reply-To: %(from)s
To: %(to)s
X-Mailer: Python/%(version)s
Subject: %(subject)s

""".lstrip() % header)

    # Write body
    sendmail.write(body)
    # Done
    sendmail.close()
160 Stores the last recorded bandwidth parameters of a slice.
162 xid - slice context/VServer ID
164 time - beginning of recording period in UNIX seconds
165 bytes - low bandwidth bytes transmitted at the beginning of the recording period
166 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
167 MaxKByte - total volume of data allowed
168 ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
169 Maxi2KByte - same as MaxKByte, but for i2
Threshi2KByte - same as ThreshKByte, but for i2
171 MaxRate - max_rate slice attribute.
172 Maxi2Rate - max_exempt_rate slice attribute.
173 Share - Used by Sirius to loan min rates
174 Sharei2 - Used by Sirius to loan min rates for i2
175 self.emailed - did slice recv email during this recording period
def __init__(self, xid, name, rspec):
    """
    Initialize limits from module-level defaults, apply the slice's own
    rspec overrides, then push the resulting limits into tc via bwlimit.
    """
    self.xid = xid
    self.name = name
    # Baseline for the recording period; reset() stamps the real values.
    self.time = 0
    self.bytes = 0
    self.i2bytes = 0
    self.MaxRate = default_MaxRate
    self.MinRate = bwlimit.bwmin / 1000
    self.Maxi2Rate = default_Maxi2Rate
    self.Mini2Rate = bwlimit.bwmin / 1000
    self.MaxKByte = default_MaxKByte
    self.ThreshKByte = int(.8 * self.MaxKByte)
    self.Maxi2KByte = default_Maxi2KByte
    self.Threshi2KByte = int(.8 * self.Maxi2KByte)
    self.Share = default_Share
    self.Sharei2 = default_Share
    # notify() reads these; must exist before the first update().
    self.emailed = False
    self.capped = False

    # Apply slice attribute overrides from the rspec, then program tc.
    self.updateSliceAttributes(rspec)
    bwlimit.set(xid = self.xid,
            minrate = self.MinRate * 1000,
            maxrate = self.MaxRate * 1000,
            maxexemptrate = self.Maxi2Rate * 1000,
            minexemptrate = self.Mini2Rate * 1000,
            share = self.Share)
def updateSliceAttributes(self, rspec):
    """
    Use rspec (from GetSlivers to PLC) to populate slice object. Also
    do some sanity checking. Each changed value is logged.
    """
    # Sanity check plus policy decision for MinRate:
    # Minrate cant be greater than 25% of MaxRate or NodeCap.
    MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000))
    if MinRate > int(.25 * default_MaxRate):
        MinRate = int(.25 * default_MaxRate)
    if MinRate != self.MinRate:
        self.MinRate = MinRate
        logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))

    MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000))
    if MaxRate != self.MaxRate:
        self.MaxRate = MaxRate
        logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))

    Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
    if Mini2Rate != self.Mini2Rate:
        self.Mini2Rate = Mini2Rate
        logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))

    Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000))
    if Maxi2Rate != self.Maxi2Rate:
        self.Maxi2Rate = Maxi2Rate
        logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))

    MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
    if MaxKByte != self.MaxKByte:
        self.MaxKByte = MaxKByte
        logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))

    Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
    if Maxi2KByte != self.Maxi2KByte:
        self.Maxi2KByte = Maxi2KByte
        logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))

    # Thresholds default to 80% of the (possibly just-updated) byte limits.
    ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
    if ThreshKByte != self.ThreshKByte:
        self.ThreshKByte = ThreshKByte
        logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))

    Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
    if Threshi2KByte != self.Threshi2KByte:
        self.Threshi2KByte = Threshi2KByte
        logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))

    Share = int(rspec.get('net_share', default_Share))
    if Share != self.Share:
        # BUGFIX: the assignment was missing, so a changed share was
        # logged but never applied.
        self.Share = Share
        logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))

    Sharei2 = int(rspec.get('net_i2_share', default_Share))
    if Sharei2 != self.Sharei2:
        self.Sharei2 = Sharei2
        # BUGFIX: logged self.i2Share, an attribute that does not exist
        # (AttributeError whenever the i2 share changed).
        logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.Sharei2))
def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
    """
    Begin a new recording period. Remove caps by restoring limits
    to their default values.
    """
    # Query Node Manager for max rate overrides
    self.updateSliceAttributes(rspec)

    # Reset baseline time
    self.time = time.time()

    # Reset baseline byte counts
    self.bytes = usedbytes
    self.i2bytes = usedi2bytes

    # New period: the slice may be emailed/capped again.
    self.emailed = False
    self.capped = False

    # Reset rates to the configured maxima.
    maxrate = self.MaxRate * 1000
    maxi2rate = self.Maxi2Rate * 1000
    # Only touch tc if the running rates differ from the configured ones.
    if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
        logger.log("bwmon: %s reset to %s/%s" % \
              (self.name,
               bwlimit.format_tc_rate(maxrate),
               bwlimit.format_tc_rate(maxi2rate)), 1)
        bwlimit.set(xid = self.xid,
                minrate = self.MinRate * 1000,
                maxrate = self.MaxRate * 1000,
                maxexemptrate = self.Maxi2Rate * 1000,
                minexemptrate = self.Mini2Rate * 1000,
                share = self.Share)
def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
    """
    Notify the slice it's being capped.

    Builds a message from the module-level template for whichever of the
    two classes (low bandwidth / i2) was actually capped, then emails it
    once per recording period (or just logs it in DEBUG mode).
    """
    # Prepare message parameters from the template
    # BUGFIX: message must start empty; the += below otherwise raises
    # NameError.
    message = ""
    params = {'slice': self.name, 'hostname': socket.gethostname(),
              'since': time.asctime(time.gmtime(self.time)) + " GMT",
              'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
              'date': time.asctime(time.gmtime()) + " GMT",
              'period': format_period(period)}

    if new_maxrate != (self.MaxRate * 1000):
        # Format template parameters for low bandwidth message
        params['class'] = "low bandwidth"
        params['bytes'] = format_bytes(usedbytes - self.bytes)
        params['limit'] = format_bytes(self.MaxKByte * 1024)
        params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)

        # Cap low bandwidth burst rate
        message += template % params
        logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)

    if new_maxexemptrate != (self.Maxi2Rate * 1000):
        # Format template parameters for high bandwidth message
        params['class'] = "high bandwidth"
        params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
        params['limit'] = format_bytes(self.Maxi2KByte * 1024)
        params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)

        message += template % params
        logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)

    # Email only once per recording period.
    if self.emailed == False:
        subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
        if DEBUG:
            logger.log("bwmon: "+ subject)
            logger.log("bwmon: "+ message + (footer % params))
        else:
            self.emailed = True
            logger.log("bwmon: Emailing %s" % self.name)
            slicemail(self.name, subject, message + (footer % params))
def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec):
    """
    Update byte counts and check if byte thresholds have been
    exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.
    Recalculate every time module runs.
    """
    # Query Node Manager for max rate overrides
    self.updateSliceAttributes(rspec)

    # Check bytes sent against the low bandwidth threshold.
    if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
        maxbyte = self.MaxKByte * 1024
        bytesused = usedbytes - self.bytes
        timeused = int(time.time() - self.time)
        # Spread the remaining byte budget over the remaining period
        # (result in bits/s, matching tc rates).
        new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
        # Never go under MinRate
        if new_maxrate < (self.MinRate * 1000):
            new_maxrate = self.MinRate * 1000
        # State information. I'm capped.
        self.capped = True
    else:
        # Under threshold: run at the configured maximum.
        new_maxrate = self.MaxRate * 1000

    # Same check for the I2 (exempt) byte budget.
    if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
        maxi2byte = self.Maxi2KByte * 1024
        i2bytesused = usedi2bytes - self.i2bytes
        timeused = int(time.time() - self.time)
        new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
        # Never go under MinRate
        if new_maxi2rate < (self.Mini2Rate * 1000):
            new_maxi2rate = self.Mini2Rate * 1000
        # State information. I'm capped.
        self.capped = True
    else:
        new_maxi2rate = self.Maxi2Rate * 1000

    # Apply parameters only if something changed since the last run.
    if (new_maxrate != runningmaxrate) or (new_maxi2rate != runningmaxi2rate):
        bwlimit.set(xid = self.xid,
                minrate = self.MinRate * 1000,
                maxrate = new_maxrate,
                minexemptrate = self.Mini2Rate * 1000,
                maxexemptrate = new_maxi2rate,
                share = self.Share)

    # Notify slice it's being capped.
    if self.capped == True:
        self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
def gethtbs(root_xid, default_xid):
    """
    Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
    Turn off HTBs without names.

    root_xid/default_xid -- special classes that are kept even without a
    slice name.
    """
    livehtbs = {}
    for params in bwlimit.get():
        (xid, share,
         minrate, maxrate,
         minexemptrate, maxexemptrate,
         usedbytes, usedi2bytes) = params

        name = bwlimit.get_slice(xid)

        if (name is None) \
        and (xid != root_xid) \
        and (xid != default_xid):
            # Orphaned (not associated with a slice) class
            name = "%d?" % xid
            logger.log("bwmon: Found orphaned HTB %s. Removing." %name, 1)
            bwlimit.off(xid)

        livehtbs[xid] = {'share': share,
            'minrate': minrate,
            'maxrate': maxrate,
            'maxexemptrate': maxexemptrate,
            'minexemptrate': minexemptrate,
            'usedbytes': usedbytes,
            'name': name,
            'usedi2bytes': usedi2bytes}

    return livehtbs
def sync(nmdbcopy):
    """
    Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.

    nmdbcopy -- deep copy of the NM database, keyed on slice name.
    """
    # Defaults (module-level limits may be refreshed below).
    global datafile, \
        period, \
        default_MaxRate, \
        default_Maxi2Rate, \
        default_MaxKByte, \
        default_Maxi2KByte, \
        default_Share

    # Limit monitoring to these slice names; empty means all.
    names = []

    # Incase the limits have changed.
    default_MaxRate = int(bwlimit.get_bwcap() / 1000)
    default_Maxi2Rate = int(bwlimit.bwmax / 1000)

    # Incase default isn't set yet.
    if default_MaxRate == -1:
        default_MaxRate = 1000000

    # Load the saved state; start fresh on any failure or version change.
    try:
        f = open(datafile, "r+")
        logger.log("bwmon: Loading %s" % datafile, 2)
        (version, slices, deaddb) = pickle.load(f)
        f.close()
        # Check version of data file
        if version != "$Id$":
            logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
            raise Exception
    except Exception:
        version = "$Id$"
        slices = {}
        deaddb = {}

    # Get/set special slice IDs
    root_xid = bwlimit.get_xid("root")
    default_xid = bwlimit.get_xid("default")

    # Since root is required for sanity, its not in the API/plc database, so pass {}
    if root_xid not in slices.keys():
        slices[root_xid] = Slice(root_xid, "root", {})
        slices[root_xid].reset(0, 0, 0, 0, {})

    # Used by bwlimit. pass {} since there is no rspec (like above).
    if default_xid not in slices.keys():
        slices[default_xid] = Slice(default_xid, "default", {})
        slices[default_xid].reset(0, 0, 0, 0, {})

    live = {}
    # Get running slivers that should be on this node (from plc). {xid: name}
    # db keys on name, bwmon keys on xid. db doesnt have xid either.
    for plcSliver in nmdbcopy.keys():
        live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]

    logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__(), 2)
    logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__(), 2)

    # Get actual running values from tc.
    # Update slice totals and bandwidth. {xid: {values}}
    kernelhtbs = gethtbs(root_xid, default_xid)
    logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)

    # The dat file has HTBs for slices, but the HTBs aren't running
    nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys())
    logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
    # Restart accounting for live ones; drop the rest from the dat file.
    for nohtbslice in nohtbslices:
        if live.has_key(nohtbslice):
            slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
        else:
            logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
            del slices[nohtbslice]

    # The dat file doesnt have HTB for the slice but kern has HTB
    slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
    logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
    for slicenodat in slicesnodat:
        # But slice is running
        if live.has_key(slicenodat):
            # init the slice. which means start accounting over since kernel
            # htb was already there.
            slices[slicenodat] = Slice(slicenodat,
                live[slicenodat]['name'],
                live[slicenodat]['_rspec'])

    # Get new slices.
    # Slices in GetSlivers but not running HTBs
    newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
    logger.log("bwmon: Found %s new slices" % newslicesxids.__len__(), 2)

    # Set up new slices
    for newslice in newslicesxids:
        # Delegated slices dont have xids (which are uids) since they haven't been
        # instantiated yet.
        if newslice != None and live[newslice].has_key('_rspec') == True:
            # Check to see if we recently deleted this slice.
            if live[newslice]['name'] not in deaddb.keys():
                logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
                # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
                # and made a dict of computed values.
                slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
                slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
            # Double check time for dead slice in deaddb is within 24hr recording period.
            elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
                deadslice = deaddb[live[newslice]['name']]
                logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
                slices[newslice] = deadslice['slice']
                slices[newslice].xid = newslice
                # Start the HTB
                slices[newslice].reset(deadslice['slice'].MaxRate,
                    deadslice['slice'].Maxi2Rate,
                    deadslice['htb']['usedbytes'],
                    deadslice['htb']['usedi2bytes'],
                    live[newslice]['_rspec'])
                # Bring byte counts up to date
                slices[newslice].update(deadslice['slice'].MaxRate,
                    deadslice['slice'].Maxi2Rate,
                    deadslice['htb']['usedbytes'],
                    deadslice['htb']['usedi2bytes'],
                    deadslice['htb']['share'],
                    live[newslice]['_rspec'])
                # Since the slice has been reinitialed, remove from dead database.
                del deaddb[deadslice['slice'].name]
        else:
            logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name'])

    # Move dead slices that exist in the pickle file, but
    # aren't instantiated by PLC into the dead dict until
    # recording period is over. This is to avoid the case where a slice is dynamically created
    # and destroyed then recreated to get around byte limits.
    deadxids = Set(slices.keys()) - Set(live.keys())
    # -2 accounts for the always-present root and default classes.
    logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2), 2)
    for deadxid in deadxids:
        if deadxid == root_xid or deadxid == default_xid:
            continue
        logger.log("bwmon: removing dead slice %s " % deadxid)
        if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
            # add slice (by name) to deaddb
            logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
            deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
            del slices[deadxid]
        if kernelhtbs.has_key(deadxid):
            logger.log("bwmon: Removing HTB for %s." % deadxid, 2)
            bwlimit.off(deadxid)

    # Clean up expired entries in the dead database.
    for deadslice in deaddb.keys():
        if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
            logger.log("bwmon: Removing dead slice %s from dat." \
                        % deaddb[deadslice]['slice'].name)
            del deaddb[deadslice]

    # Get actual running values from tc since we've added and removed buckets.
    # Update slice totals and bandwidth. {xid: {values}}
    kernelhtbs = gethtbs(root_xid, default_xid)
    logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2)

    for (xid, slice) in slices.iteritems():
        # Monitor only the specified slices
        if xid == root_xid or xid == default_xid: continue
        # BUGFIX: was "name not in names" with `name` undefined.
        if names and slice.name not in names:
            continue

        if (time.time() >= (slice.time + period)) or \
           (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
           (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
            # Reset to defaults every 24 hours or if it appears
            # that the byte counters have overflowed (or, more
            # likely, the node was restarted or the HTB buckets
            # were re-initialized).
            slice.reset(kernelhtbs[xid]['maxrate'], \
                kernelhtbs[xid]['maxexemptrate'], \
                kernelhtbs[xid]['usedbytes'], \
                kernelhtbs[xid]['usedi2bytes'], \
                live[xid]['_rspec'])
        else:
            logger.log("bwmon: Updating slice %s" % slice.name, 2)
            # Update byte counts
            slice.update(kernelhtbs[xid]['maxrate'], \
                kernelhtbs[xid]['maxexemptrate'], \
                kernelhtbs[xid]['usedbytes'], \
                kernelhtbs[xid]['usedi2bytes'], \
                kernelhtbs[xid]['share'],
                live[xid]['_rspec'])

    # Persist state for the next run.
    logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(), datafile), 2)
    f = open(datafile, "w")
    pickle.dump((version, slices, deaddb), f)
    f.close()
# Event used by start()/GetSlivers() to wake the worker thread.
lock = threading.Event()
def run():
    """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
    logger.log("bwmon: Thread started", 2)
    while True:
        lock.wait()
        logger.log("bwmon: Event received. Running.", 2)
        # Hold the NM db lock only long enough to snapshot the db.
        database.db_lock.acquire()
        nmdbcopy = copy.deepcopy(database.db)
        database.db_lock.release()
        try:
            sync(nmdbcopy)
        except: logger.log_exc()
        lock.clear()

def start(*args):
    """Launch run() as a daemon thread."""
    tools.as_daemon_thread(run)
652 def GetSlivers(*args):