Support for local_ scripts
[nodemanager.git] / bwmon.py
index c3fe1d5..bc9191f 100644 (file)
--- a/bwmon.py
+++ b/bwmon.py
@@ -32,26 +32,22 @@ import database
 
 from sets import Set
 
 
 from sets import Set
 
+# Defaults
+debug = False
+verbose = False
+datafile = "/var/lib/misc/bwmon.dat"
+
 try:
     sys.path.append("/etc/planetlab")
     from plc_config import *
 except:
 try:
     sys.path.append("/etc/planetlab")
     from plc_config import *
 except:
-    logger.log("bwmon:  Warning: Configuration file /etc/planetlab/plc_config.py not found")
-    PLC_NAME = "PlanetLab"
-    PLC_SLICE_PREFIX = "pl"
-    PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org"
-    PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org"
+    logger.log("bwmon:  Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
+    logger.log("bwmon:  Running in DEBUG mode.  Logging to file and not emailing.", 1)
 
 # Constants
 seconds_per_day = 24 * 60 * 60
 bits_per_byte = 8
 
 
 # Constants
 seconds_per_day = 24 * 60 * 60
 bits_per_byte = 8
 
-# Defaults
-debug = True
-verbose = False
-datafile = "/var/lib/misc/bwmon.dat"
-#nm = None
-
 # Burst to line rate (or node cap).  Set by NM. in KBit/s
 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
 # Burst to line rate (or node cap).  Set by NM. in KBit/s
 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
@@ -133,11 +129,8 @@ def slicemail(slice, subject, body):
 
     sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
 
 
     sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
 
-    # PLC has a separate list for pl_mom messages
-    if PLC_MAIL_SUPPORT_ADDRESS == "support@planet-lab.org":
-        to = ["pl-mom@planet-lab.org"]
-    else:
-        to = [PLC_MAIL_SUPPORT_ADDRESS]
+    # Parsed from MyPLC config
+    to = [PLC_MAIL_MOM_LIST_ADDRESS]
 
     if slice is not None and slice != "root":
         to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
 
     if slice is not None and slice != "root":
         to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
@@ -197,9 +190,9 @@ class Slice:
         self.Maxi2Rate = default_Maxi2Rate
         self.Mini2Rate = default_Mini2Rate
         self.MaxKByte = default_MaxKByte
         self.Maxi2Rate = default_Maxi2Rate
         self.Mini2Rate = default_Mini2Rate
         self.MaxKByte = default_MaxKByte
-        self.ThreshKByte = default_ThreshKByte
+        self.ThreshKByte = (.8 * self.MaxKByte)
         self.Maxi2KByte = default_Maxi2KByte
         self.Maxi2KByte = default_Maxi2KByte
-        self.Threshi2KByte = default_Threshi2KByte
+        self.Threshi2KByte = (.8 * self.Maxi2KByte)
         self.Share = default_Share
         self.Sharei2 = default_Share
         self.emailed = False
         self.Share = default_Share
         self.Sharei2 = default_Share
         self.emailed = False
@@ -304,7 +297,7 @@ class Slice:
             logger.log("bwmon:  %s reset to %s/%s" % \
                   (self.name,
                    bwlimit.format_tc_rate(maxrate),
             logger.log("bwmon:  %s reset to %s/%s" % \
                   (self.name,
                    bwlimit.format_tc_rate(maxrate),
-                   bwlimit.format_tc_rate(maxi2rate)))
+                   bwlimit.format_tc_rate(maxi2rate)), 1)
             bwlimit.set(xid = self.xid, 
                 minrate = self.MinRate * 1000, 
                 maxrate = self.MaxRate * 1000, 
             bwlimit.set(xid = self.xid, 
                 minrate = self.MinRate * 1000, 
                 maxrate = self.MaxRate * 1000, 
@@ -312,7 +305,7 @@ class Slice:
                 minexemptrate = self.Mini2Rate * 1000,
                 share = self.Share)
 
                 minexemptrate = self.Mini2Rate * 1000,
                 share = self.Share)
 
-    def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes)
+    def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
         """
         Notify the slice it's being capped.
         """
         """
         Notify the slice it's being capped.
         """
@@ -322,8 +315,9 @@ class Slice:
                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
                   'date': time.asctime(time.gmtime()) + " GMT",
                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
                   'date': time.asctime(time.gmtime()) + " GMT",
-                  'period': format_period(period)} 
-        if new_maxrate ! = self.MaxRate:
+                  'period': format_period(period)}
+
+        if new_maxrate != self.MaxRate:
             # Format template parameters for low bandwidth message
             params['class'] = "low bandwidth"
             params['bytes'] = format_bytes(usedbytes - self.bytes)
             # Format template parameters for low bandwidth message
             params['class'] = "low bandwidth"
             params['bytes'] = format_bytes(usedbytes - self.bytes)
@@ -355,7 +349,7 @@ class Slice:
                 slicemail(self.name, subject, message + (footer % params))
 
 
                 slicemail(self.name, subject, message + (footer % params))
 
 
-    def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
+    def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec):
         """
         Update byte counts and check if byte thresholds have been
         exceeded. If exceeded, cap to  remaining bytes in limit over remaining in period.  
         """
         Update byte counts and check if byte thresholds have been
         exceeded. If exceeded, cap to  remaining bytes in limit over remaining in period.  
@@ -364,7 +358,17 @@ class Slice:
     
         # Query Node Manager for max rate overrides
         self.updateSliceAttributes(rspec)    
     
         # Query Node Manager for max rate overrides
         self.updateSliceAttributes(rspec)    
-     
+
+        # Check shares for Sirius loans.
+        if runningshare != self.Share:
+            logger.log("bwmon:  Updating share to %s" % self.share)
+            bwlimit.set(xid = self.xid, 
+                minrate = self.MinRate * 1000, 
+                maxrate = self.MaxRate * 1000, 
+                maxexemptrate = self.Maxi2Rate * 1000,
+                minexemptrate = self.Mini2Rate * 1000,
+                share = self.Share)
+
         # Prepare message parameters from the template
         #message = ""
         #params = {'slice': self.name, 'hostname': socket.gethostname(),
         # Prepare message parameters from the template
         #message = ""
         #params = {'slice': self.name, 'hostname': socket.gethostname(),
@@ -391,18 +395,7 @@ class Slice:
             new_maxrate = self.MaxRate * 1000
             self.capped = False
 
             new_maxrate = self.MaxRate * 1000
             self.capped = False
 
-        ## Format template parameters for low bandwidth message
-        #params['class'] = "low bandwidth"
-        #params['bytes'] = format_bytes(usedbytes - self.bytes)
-        #params['limit'] = format_bytes(self.MaxKByte * 1024)
-        #params['thresh'] = format_bytes(self.ThreshKByte * 1024)
-        #params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
-
-        # Cap low bandwidth burst rate
-        #if new_maxrate != runningmaxrate:
-        #    message += template % params
-        #    logger.log("bwmon:   ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
-    
+   
         if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
             maxi2byte = self.Maxi2KByte * 1024
             i2bytesused = usedi2bytes - self.i2bytes
         if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
             maxi2byte = self.Maxi2KByte * 1024
             i2bytesused = usedi2bytes - self.i2bytes
@@ -419,17 +412,6 @@ class Slice:
             new_maxi2rate = self.Maxi2Rate * 1000
             self.capped = False
 
             new_maxi2rate = self.Maxi2Rate * 1000
             self.capped = False
 
-        # Format template parameters for high bandwidth message
-        #params['class'] = "high bandwidth"
-        #params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
-        #params['limit'] = format_bytes(self.Maxi2KByte * 1024)
-        #params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate)
-
-        # Cap high bandwidth burst rate
-        #if new_maxi2rate != runningmaxi2rate:
-        #    message += template % params
-        #    logger.log("bwmon:  %(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params)
-
         # Apply parameters
         if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate:
             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxi2rate)
         # Apply parameters
         if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate:
             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxi2rate)
@@ -437,13 +419,7 @@ class Slice:
         # Notify slice
         if self.capped == True and self.emailed == False:
             self.notify(newmaxrate, newmaxexemptrate, usedbytes, usedi2bytes)
         # Notify slice
         if self.capped == True and self.emailed == False:
             self.notify(newmaxrate, newmaxexemptrate, usedbytes, usedi2bytes)
-        #    subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
-        #    if debug:
-        #        logger.log("bwmon:  "+ subject)
-        #        logger.log("bwmon:  "+ message + (footer % params))
-        #    else:
-        #        self.emailed = True
-        #        slicemail(self.name, subject, message + (footer % params))
+
 
 def gethtbs(root_xid, default_xid):
     """
 
 def gethtbs(root_xid, default_xid):
     """
@@ -464,7 +440,7 @@ def gethtbs(root_xid, default_xid):
         and (xid != default_xid):
             # Orphaned (not associated with a slice) class
             name = "%d?" % xid
         and (xid != default_xid):
             # Orphaned (not associated with a slice) class
             name = "%d?" % xid
-            logger.log("bwmon:  Found orphaned HTB %s. Removing." %name)
+            logger.log("bwmon:  Found orphaned HTB %s. Removing." %name, 1)
             bwlimit.off(xid)
 
         livehtbs[xid] = {'share': share,
             bwlimit.off(xid)
 
         livehtbs[xid] = {'share': share,
@@ -507,7 +483,7 @@ def sync(nmdbcopy):
 
     try:
         f = open(datafile, "r+")
 
     try:
         f = open(datafile, "r+")
-        logger.log("bwmon:  Loading %s" % datafile)
+        logger.log("bwmon:  Loading %s" % datafile, 2)
         (version, slices, deaddb) = pickle.load(f)
         f.close()
         # Check version of data file
         (version, slices, deaddb) = pickle.load(f)
         f.close()
         # Check version of data file
@@ -540,35 +516,41 @@ def sync(nmdbcopy):
     for plcSliver in nmdbcopy.keys():
         live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
 
     for plcSliver in nmdbcopy.keys():
         live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
 
-    logger.log("bwmon:  Found %s instantiated slices" % live.keys().__len__())
-    logger.log("bwmon:  Found %s slices in dat file" % slices.values().__len__())
+    logger.log("bwmon:  Found %s instantiated slices" % live.keys().__len__(), 2)
+    logger.log("bwmon:  Found %s slices in dat file" % slices.values().__len__(), 2)
 
     # Get actual running values from tc.
     # Update slice totals and bandwidth. {xid: {values}}
     kernelhtbs = gethtbs(root_xid, default_xid)
 
     # Get actual running values from tc.
     # Update slice totals and bandwidth. {xid: {values}}
     kernelhtbs = gethtbs(root_xid, default_xid)
-    logger.log("bwmon:  Found %s running HTBs" % kernelhtbs.keys().__len__())
+    logger.log("bwmon:  Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)
 
     # The dat file has HTBs for slices, but the HTBs aren't running
     nohtbslices =  Set(slices.keys()) - Set(kernelhtbs.keys())
 
     # The dat file has HTBs for slices, but the HTBs aren't running
     nohtbslices =  Set(slices.keys()) - Set(kernelhtbs.keys())
-    logger.log( "bwmon:  Found %s slices in dat but not running." % nohtbslices.__len__() )
+    logger.log( "bwmon:  Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
     # Reset tc counts.
     for nohtbslice in nohtbslices:
         if live.has_key(nohtbslice): 
             slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
     # Reset tc counts.
     for nohtbslice in nohtbslices:
         if live.has_key(nohtbslice): 
             slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
+        else:
+            logger.log("bwmon:  Removing abondoned slice %s from dat." % nohtbslice)
+            del slices[nohtbslice]
 
 
-    # The dat file doesnt have HTB for the slice, but slice is running and
-    # HTB exists
+    # The dat file doesnt have HTB for the slice but kern has HTB
     slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
     slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
-    logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__() )
+    logger.log( "bwmon:  Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
     for slicenodat in slicesnodat:
     for slicenodat in slicesnodat:
-        slices[slicenodat] = Slice(slicenodat, 
-                                   live[slicenodat]['name'], 
-                                   live[slicenodat]['_rspec'])
+        # But slice is running 
+        if live.has_key(slicenodat): 
+            # init the slice.  which means start accounting over since kernel
+            # htb was already there.
+            slices[slicenodat] = Slice(slicenodat, 
+                live[slicenodat]['name'], 
+                live[slicenodat]['_rspec'])
 
     # Get new slices.
     # Slices in GetSlivers but not running HTBs
     newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
 
     # Get new slices.
     # Slices in GetSlivers but not running HTBs
     newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
-    logger.log("bwmon:  Found %s new slices" % newslicesxids.__len__())
+    logger.log("bwmon:  Found %s new slices" % newslicesxids.__len__(), 2)
        
     # Setup new slices
     for newslice in newslicesxids:
        
     # Setup new slices
     for newslice in newslicesxids:
@@ -599,40 +581,43 @@ def sync(nmdbcopy):
                                     deadslice['slice'].Maxi2Rate, 
                                     deadslice['htb']['usedbytes'], 
                                     deadslice['htb']['usedi2bytes'], 
                                     deadslice['slice'].Maxi2Rate, 
                                     deadslice['htb']['usedbytes'], 
                                     deadslice['htb']['usedi2bytes'], 
+                                    deadslice['htb']['share'], 
                                     live[newslice]['_rspec'])
                 # Since the slice has been reinitialed, remove from dead database.
                                     live[newslice]['_rspec'])
                 # Since the slice has been reinitialed, remove from dead database.
-                del deaddb[deadslice]
+                del deaddb[deadslice['slice'].name]
         else:
         else:
-            logger.log("bwmon  Slice %s doesn't have xid.  Must be delegated."\
-                       "Skipping." % live[newslice]['name'])
+            logger.log("bwmon:  Slice %s doesn't have xid.  Skipping." % live[newslice]['name'])
 
     # Move dead slices that exist in the pickle file, but
     # aren't instantiated by PLC into the dead dict until
     # recording period is over.  This is to avoid the case where a slice is dynamically created
     # and destroyed then recreated to get around byte limits.
     deadxids = Set(slices.keys()) - Set(live.keys())
 
     # Move dead slices that exist in the pickle file, but
     # aren't instantiated by PLC into the dead dict until
     # recording period is over.  This is to avoid the case where a slice is dynamically created
     # and destroyed then recreated to get around byte limits.
     deadxids = Set(slices.keys()) - Set(live.keys())
-    logger.log("bwmon:  Found %s dead slices" % (dead.__len__() - 2))
+    logger.log("bwmon:  Found %s dead slices" % (deadxids.__len__() - 2), 2)
     for deadxid in deadxids:
         if deadxid == root_xid or deadxid == default_xid:
             continue
     for deadxid in deadxids:
         if deadxid == root_xid or deadxid == default_xid:
             continue
-        logger.log("bwmon:  removing dead slice  %s " % deadxid)
-        if slices.has_key(deadxid):
+        logger.log("bwmon:  removing dead slice %s " % deadxid)
+        if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
             # add slice (by name) to deaddb
             # add slice (by name) to deaddb
+            logger.log("bwmon:  Saving bandwidth totals for %s." % slices[deadxid].name)
             deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
             del slices[deadxid]
         if kernelhtbs.has_key(deadxid): 
             deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
             del slices[deadxid]
         if kernelhtbs.has_key(deadxid): 
-            bwlimit.off(xid)
-       
-       # Clean up deaddb
-       for (deadslice, deadhtb) in deaddb.iteritems():
-               if (time.time() >= (deadslice.time() + period)):
-                       logger.log("bwmon: Removing dead slice %s from dat." % deadslice.name)
-                       del deaddb[deadslice.name]
+            logger.log("bwmon:  Removing HTB for %s." % deadxid, 2)
+            bwlimit.off(deadxid)
+    
+    # Clean up deaddb
+    for deadslice in deaddb.keys():
+        if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
+            logger.log("bwmon:  Removing dead slice %s from dat." \
+                        % deaddb[deadslice]['slice'].name)
+            del deaddb[deadslice]
 
     # Get actual running values from tc since we've added and removed buckets.
     # Update slice totals and bandwidth. {xid: {values}}
     kernelhtbs = gethtbs(root_xid, default_xid)
 
     # Get actual running values from tc since we've added and removed buckets.
     # Update slice totals and bandwidth. {xid: {values}}
     kernelhtbs = gethtbs(root_xid, default_xid)
-    logger.log("bwmon:  now %s running HTBs" % kernelhtbs.keys().__len__())
+    logger.log("bwmon:  now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
 
     for (xid, slice) in slices.iteritems():
         # Monitor only the specified slices
 
     for (xid, slice) in slices.iteritems():
         # Monitor only the specified slices
@@ -653,15 +638,16 @@ def sync(nmdbcopy):
                 kernelhtbs[xid]['usedi2bytes'], \
                 live[xid]['_rspec'])
         else:
                 kernelhtbs[xid]['usedi2bytes'], \
                 live[xid]['_rspec'])
         else:
-            if debug:  logger.log("bwmon: Updating slice %s" % slice.name)
+            logger.log("bwmon:  Updating slice %s" % slice.name, 2)
             # Update byte counts
             slice.update(kernelhtbs[xid]['maxrate'], \
                 kernelhtbs[xid]['maxexemptrate'], \
                 kernelhtbs[xid]['usedbytes'], \
                 kernelhtbs[xid]['usedi2bytes'], \
             # Update byte counts
             slice.update(kernelhtbs[xid]['maxrate'], \
                 kernelhtbs[xid]['maxexemptrate'], \
                 kernelhtbs[xid]['usedbytes'], \
                 kernelhtbs[xid]['usedi2bytes'], \
+                kernelhtbs[xid]['share'],
                 live[xid]['_rspec'])
 
                 live[xid]['_rspec'])
 
-    logger.log("bwmon:  Saving %s slices in %s" % (slices.keys().__len__(),datafile))
+    logger.log("bwmon:  Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2)
     f = open(datafile, "w")
     pickle.dump((version, slices, deaddb), f)
     f.close()
     f = open(datafile, "w")
     pickle.dump((version, slices, deaddb), f)
     f.close()
@@ -669,10 +655,10 @@ def sync(nmdbcopy):
 lock = threading.Event()
 def run():
     """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
 lock = threading.Event()
 def run():
     """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
-    if debug:  logger.log("bwmon:  Thread started")
+    logger.log("bwmon:  Thread started", 2)
     while True:
         lock.wait()
     while True:
         lock.wait()
-        if debug: logger.log("bwmon:  Event received.  Running.")
+        logger.log("bwmon:  Event received.  Running.", 2)
         database.db_lock.acquire()
         nmdbcopy = copy.deepcopy(database.db)
         database.db_lock.release()
         database.db_lock.acquire()
         nmdbcopy = copy.deepcopy(database.db)
         database.db_lock.release()