log subprocess calls.
[nodemanager.git] / bwmon.py
index 10cdd95..ce33e19 100644 (file)
--- a/bwmon.py
+++ b/bwmon.py
@@ -33,14 +33,18 @@ import database
 from sets import Set
 
 # Defaults
 from sets import Set
 
 # Defaults
-debug = False
-verbose = False
+# Set DEBUG to True if you don't want to send emails
+DEBUG = False
+# Set ENABLE to False to setup buckets, but not limit.
+ENABLE = True
+
 datafile = "/var/lib/misc/bwmon.dat"
 
 try:
     sys.path.append("/etc/planetlab")
     from plc_config import *
 except:
 datafile = "/var/lib/misc/bwmon.dat"
 
 try:
     sys.path.append("/etc/planetlab")
     from plc_config import *
 except:
+    DEBUG = True
     logger.log("bwmon:  Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
     logger.log("bwmon:  Running in DEBUG mode.  Logging to file and not emailing.", 1)
 
     logger.log("bwmon:  Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
     logger.log("bwmon:  Running in DEBUG mode.  Logging to file and not emailing.", 1)
 
@@ -221,7 +225,7 @@ class Slice:
             self.MinRate = MinRate
             logger.log("bwmon:  Updating %s: Min Rate = %s" %(self.name, self.MinRate))
 
             self.MinRate = MinRate
             logger.log("bwmon:  Updating %s: Min Rate = %s" %(self.name, self.MinRate))
 
-        MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000))
+        MaxRate = int(rspec.get('net_max_rate', default_MaxRate))
         if MaxRate != self.MaxRate:
             self.MaxRate = MaxRate
             logger.log("bwmon:  Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
         if MaxRate != self.MaxRate:
             self.MaxRate = MaxRate
             logger.log("bwmon:  Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
@@ -231,7 +235,7 @@ class Slice:
             self.Mini2Rate = Mini2Rate 
             logger.log("bwmon:  Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
 
             self.Mini2Rate = Mini2Rate 
             logger.log("bwmon:  Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
 
-        Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000))
+        Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate))
         if Maxi2Rate != self.Maxi2Rate:
             self.Maxi2Rate = Maxi2Rate
             logger.log("bwmon:  Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
         if Maxi2Rate != self.Maxi2Rate:
             self.Maxi2Rate = Maxi2Rate
             logger.log("bwmon:  Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
@@ -272,7 +276,6 @@ class Slice:
         Begin a new recording period. Remove caps by restoring limits
         to their default values.
         """
         Begin a new recording period. Remove caps by restoring limits
         to their default values.
         """
-        
         # Query Node Manager for max rate overrides
         self.updateSliceAttributes(rspec)    
 
         # Query Node Manager for max rate overrides
         self.updateSliceAttributes(rspec)    
 
@@ -314,7 +317,7 @@ class Slice:
                   'date': time.asctime(time.gmtime()) + " GMT",
                   'period': format_period(period)}
 
                   'date': time.asctime(time.gmtime()) + " GMT",
                   'period': format_period(period)}
 
-        if new_maxrate != self.MaxRate:
+        if new_maxrate != (self.MaxRate * 1000):
             # Format template parameters for low bandwidth message
             params['class'] = "low bandwidth"
             params['bytes'] = format_bytes(usedbytes - self.bytes)
             # Format template parameters for low bandwidth message
             params['class'] = "low bandwidth"
             params['bytes'] = format_bytes(usedbytes - self.bytes)
@@ -325,7 +328,7 @@ class Slice:
             message += template % params
             logger.log("bwmon:   ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
 
             message += template % params
             logger.log("bwmon:   ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
 
-        if new_maxexemptrate != self.Maxi2Rate:
+        if new_maxexemptrate != (self.Maxi2Rate * 1000):
             # Format template parameters for high bandwidth message
             params['class'] = "high bandwidth"
             params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
             # Format template parameters for high bandwidth message
             params['class'] = "high bandwidth"
             params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
@@ -336,13 +339,14 @@ class Slice:
             logger.log("bwmon:   ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
        
         # Notify slice
             logger.log("bwmon:   ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
        
         # Notify slice
-        if message and self.emailed == False:
+        if self.emailed == False:
             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
-            if debug:
+            if DEBUG:
                 logger.log("bwmon:  "+ subject)
                 logger.log("bwmon:  "+ message + (footer % params))
             else:
                 self.emailed = True
                 logger.log("bwmon:  "+ subject)
                 logger.log("bwmon:  "+ message + (footer % params))
             else:
                 self.emailed = True
+                logger.log("bwmon:  Emailing %s" % self.name)
                 slicemail(self.name, subject, message + (footer % params))
 
 
                 slicemail(self.name, subject, message + (footer % params))
 
 
@@ -352,7 +356,13 @@ class Slice:
         exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.  
         Recalculate every time module runs.
         """
         exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.  
         Recalculate every time module runs.
         """
-    
+         
+        # copy self.Min* and self.*share values for comparison later.
+        runningMinRate = self.MinRate
+        runningMini2Rate = self.Mini2Rate
+        runningshare = self.Share
+        runningsharei2 = self.Sharei2
+
         # Query Node Manager for max rate overrides
         self.updateSliceAttributes(rspec)    
 
         # Query Node Manager for max rate overrides
         self.updateSliceAttributes(rspec)    
 
@@ -362,17 +372,17 @@ class Slice:
             maxbyte = self.MaxKByte * 1024
             bytesused = usedbytes - self.bytes
             timeused = int(time.time() - self.time)
             maxbyte = self.MaxKByte * 1024
             bytesused = usedbytes - self.bytes
             timeused = int(time.time() - self.time)
-            # Calcuate new rate.
+            # Calcuate new rate. in bit/s
             new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
             # Never go under MinRate
             if new_maxrate < (self.MinRate * 1000):
                 new_maxrate = self.MinRate * 1000
             # State information.  I'm capped.
             new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
             # Never go under MinRate
             if new_maxrate < (self.MinRate * 1000):
                 new_maxrate = self.MinRate * 1000
             # State information.  I'm capped.
-            self.capped = True
+            self.capped += True
         else:
             # Sanity Check
             new_maxrate = self.MaxRate * 1000
         else:
             # Sanity Check
             new_maxrate = self.MaxRate * 1000
-            self.capped = False
+            self.capped += False
  
         if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
             maxi2byte = self.Maxi2KByte * 1024
  
         if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
             maxi2byte = self.Maxi2KByte * 1024
@@ -384,14 +394,21 @@ class Slice:
             if new_maxi2rate < (self.Mini2Rate * 1000):
                 new_maxi2rate = self.Mini2Rate * 1000
             # State information.  I'm capped.
             if new_maxi2rate < (self.Mini2Rate * 1000):
                 new_maxi2rate = self.Mini2Rate * 1000
             # State information.  I'm capped.
-            self.capped = True
+            self.capped += True
         else:
             # Sanity
             new_maxi2rate = self.Maxi2Rate * 1000
         else:
             # Sanity
             new_maxi2rate = self.Maxi2Rate * 1000
-            self.capped = False
-
-        # Apply parameters
-        bwlimit.set(xid = self.xid, 
+            self.capped += False
+
+        # Check running values against newly calculated values so as not to run tc
+        # unnecessarily
+        if (runningmaxrate != new_maxrate) or \
+        (runningMinRate != self.MinRate) or \
+        (runningmaxi2rate != new_maxi2rate) or \
+        (runningMini2Rate != self.Mini2Rate) or \
+        (runningshare != self.Share):
+            # Apply parameters
+            bwlimit.set(xid = self.xid, 
                 minrate = self.MinRate * 1000, 
                 maxrate = new_maxrate,
                 minexemptrate = self.Mini2Rate * 1000,
                 minrate = self.MinRate * 1000, 
                 maxrate = new_maxrate,
                 minexemptrate = self.Mini2Rate * 1000,
@@ -399,7 +416,7 @@ class Slice:
                 share = self.Share)
 
         # Notify slice
                 share = self.Share)
 
         # Notify slice
-        if self.capped == True and self.emailed == False:
+        if self.capped == True:
             self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
 
 
             self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
 
 
@@ -447,8 +464,7 @@ def sync(nmdbcopy):
         default_Maxi2Rate, \
         default_MaxKByte,\
         default_Maxi2KByte,\
         default_Maxi2Rate, \
         default_MaxKByte,\
         default_Maxi2KByte,\
-        default_Share,\
-        verbose
+        default_Share
 
     # All slices
     names = []
 
     # All slices
     names = []
@@ -458,7 +474,7 @@ def sync(nmdbcopy):
 
     # Incase default isn't set yet.
     if default_MaxRate == -1:
 
     # Incase default isn't set yet.
     if default_MaxRate == -1:
-        default_MaxRate = 1000000
+        default_MaxRate = 10000000
 
     try:
         f = open(datafile, "r+")
 
     try:
         f = open(datafile, "r+")
@@ -598,6 +614,7 @@ def sync(nmdbcopy):
     kernelhtbs = gethtbs(root_xid, default_xid)
     logger.log("bwmon:  now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
 
     kernelhtbs = gethtbs(root_xid, default_xid)
     logger.log("bwmon:  now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
 
+    # Update all byte limites on all slices
     for (xid, slice) in slices.iteritems():
         # Monitor only the specified slices
         if xid == root_xid or xid == default_xid: continue
     for (xid, slice) in slices.iteritems():
         # Monitor only the specified slices
         if xid == root_xid or xid == default_xid: continue
@@ -616,7 +633,7 @@ def sync(nmdbcopy):
                 kernelhtbs[xid]['usedbytes'], \
                 kernelhtbs[xid]['usedi2bytes'], \
                 live[xid]['_rspec'])
                 kernelhtbs[xid]['usedbytes'], \
                 kernelhtbs[xid]['usedi2bytes'], \
                 live[xid]['_rspec'])
-        else:
+        elif ENABLE:
             logger.log("bwmon:  Updating slice %s" % slice.name, 2)
             # Update byte counts
             slice.update(kernelhtbs[xid]['maxrate'], \
             logger.log("bwmon:  Updating slice %s" % slice.name, 2)
             # Update byte counts
             slice.update(kernelhtbs[xid]['maxrate'], \
@@ -631,9 +648,40 @@ def sync(nmdbcopy):
     pickle.dump((version, slices, deaddb), f)
     f.close()
 
     pickle.dump((version, slices, deaddb), f)
     f.close()
 
+
+def getDefaults(nmdbcopy):
+    '''
+    Get defaults from default slice's slice attributes.
+    '''
+    status = True
+    # default slice
+    dfltslice = nmdbcopy.get(PLC_SLICE_PREFIX+"_default")
+    if dfltslice: 
+        if dfltslice['rspec']['net_max_rate'] == -1:
+            allOff()
+            status = False
+    return status
+
+
+def allOff():
+    """
+    Turn off all slice HTBs
+    """
+    # Get/set special slice IDs
+    root_xid = bwlimit.get_xid("root")
+    default_xid = bwlimit.get_xid("default")
+    kernelhtbs = gethtbs(root_xid, default_xid)
+    if len(kernelhtbs):
+        logger.log("bwlimit:  Disabling all running HTBs.")
+        for htb in kernelhtbs.keys(): bwlimit.off(htb) 
+
+
 lock = threading.Event()
 def run():
 lock = threading.Event()
 def run():
-    """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
+    """
+    When run as a thread, wait for event, lock db, deep copy it, release it, 
+    run bwmon.GetSlivers(), then go back to waiting.
+    """
     logger.log("bwmon:  Thread started", 2)
     while True:
         lock.wait()
     logger.log("bwmon:  Thread started", 2)
     while True:
         lock.wait()
@@ -641,7 +689,11 @@ def run():
         database.db_lock.acquire()
         nmdbcopy = copy.deepcopy(database.db)
         database.db_lock.release()
         database.db_lock.acquire()
         nmdbcopy = copy.deepcopy(database.db)
         database.db_lock.release()
-        try:  sync(nmdbcopy)
+        try:  
+            if getDefaults(nmdbcopy) and len(bwlimit.tc("class show dev eth0")) > 0:
+                # class show to check if net:InitNodeLimit:bwlimit.init has run.
+                sync(nmdbcopy)
+            else: logger.log("bwmon:  BW limits DISABLED.")
         except: logger.log_exc()
         lock.clear()
 
         except: logger.log_exc()
         lock.clear()