moving towards reservable nodes
authorThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Wed, 9 Jun 2010 17:58:19 +0000 (17:58 +0000)
committerThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Wed, 9 Jun 2010 17:58:19 +0000 (17:58 +0000)
accounts.py
api.py
api_calls.py
bwmon.py
controller.py
database.py
nodemanager.py
plugins/omf_resctl.py
plugins/reservation.py
setup.py
slivermanager.py [moved from sm.py with 89% similarity]

index f07cb19..5bd9b60 100644 (file)
@@ -3,10 +3,12 @@
 
 """Functionality common to all account classes.
 
-Each subclass of Account must provide five methods: create() and
-destroy(), which are static; configure(), start(), and stop(), which
-are not.  configure(), which takes a record as its only argument, does
-things like set up ssh keys.  In addition, an Account subclass must
+Each subclass of Account must provide five methods: 
+  (*) create() and destroy(), which are static; 
+  (*) configure(), start(), and stop(), which are not.  
+
+configure(), which takes a record as its only argument, does
+things like set up ssh keys. In addition, an Account subclass must
 provide static member variables SHELL, which contains the unique shell
 that it uses; and TYPE, a string that is used by the account creation
 code.  For no particular reason, TYPE is divided hierarchically by
@@ -23,10 +25,9 @@ numbers of accounts, this may cause the NM process to run out of
 maximum stack size.
 """
 
-import Queue
+#import Queue
 import os
-import pwd
-import grp
+import pwd, grp
 import threading
 
 import logger
@@ -80,6 +81,7 @@ class Account:
 
     @staticmethod
     def create(name, vref = None): abstract
+
     @staticmethod
     def destroy(name): abstract
 
@@ -123,6 +125,7 @@ class Account:
     def is_running(self): pass
 
 class Worker:
+
     def __init__(self, name):
         self.name = name  # username
         self._acct = None  # the account object currently associated with this worker
diff --git a/api.py b/api.py
index a6cae30..b9ef8d1 100644 (file)
--- a/api.py
+++ b/api.py
@@ -106,6 +106,7 @@ class APIServer_UNIX(APIServer_INET): address_family = socket.AF_UNIX
 
 def start():
     """Start two XMLRPC interfaces: one bound to localhost, the other bound to a Unix domain socket."""
+    logger.log('api.start')
     serv1 = APIServer_INET(('127.0.0.1', API_SERVER_PORT), requestHandler=APIRequestHandler, logRequests=0)
     tools.as_daemon_thread(serv1.serve_forever)
     try: os.unlink(UNIX_ADDR)
index 1e350c7..a175391 100644 (file)
@@ -42,7 +42,7 @@ except: import logger as sliver_vs
 import ticket as ticket_module
 import tools
 
-deliver_ticket = None  # set in sm.py:start()
+deliver_ticket = None  # set in slivermanager.start()
 
 api_method_dict = {}
 nargs_dict = {}
index f18c710..a65ca9d 100644 (file)
--- a/bwmon.py
+++ b/bwmon.py
@@ -46,8 +46,8 @@ try:
     from plc_config import *
 except:
     DEBUG = True
-    logger.log("bwmon:  Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
-    logger.log("bwmon:  Running in DEBUG mode.  Logging to file and not emailing.", 1)
+    logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
+    logger.log("bwmon: Running in DEBUG mode.  Logging to file and not emailing.", 1)
 
 # Constants
 seconds_per_day = 24 * 60 * 60
@@ -240,52 +240,52 @@ class Slice:
             MinRate = int(.25 * default_MaxRate)
         if MinRate != self.MinRate:
             self.MinRate = MinRate
-            logger.log("bwmon:  Updating %s: Min Rate = %s" %(self.name, self.MinRate))
+            logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))
 
         MaxRate = int(rspec.get('net_max_rate', default_MaxRate))
         if MaxRate != self.MaxRate:
             self.MaxRate = MaxRate
-            logger.log("bwmon:  Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
+            logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
 
         Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
         if Mini2Rate != self.Mini2Rate:
             self.Mini2Rate = Mini2Rate 
-            logger.log("bwmon:  Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
+            logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
 
         Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate))
         if Maxi2Rate != self.Maxi2Rate:
             self.Maxi2Rate = Maxi2Rate
-            logger.log("bwmon:  Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
+            logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
                           
         MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
         if MaxKByte != self.MaxKByte:
             self.MaxKByte = MaxKByte
-            logger.log("bwmon:  Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))
+            logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))
                           
         Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
         if Maxi2KByte != self.Maxi2KByte:
             self.Maxi2KByte = Maxi2KByte
-            logger.log("bwmon:  Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))
+            logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))
                           
         ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
         if ThreshKByte != self.ThreshKByte:
             self.ThreshKByte = ThreshKByte
-            logger.log("bwmon:  Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))
+            logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))
                           
         Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
         if Threshi2KByte != self.Threshi2KByte:    
             self.Threshi2KByte = Threshi2KByte
-            logger.log("bwmon:  Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))
+            logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))
  
         Share = int(rspec.get('net_share', default_Share))
         if Share != self.Share:
             self.Share = Share
-            logger.log("bwmon:  Updating %s: Net Share = %s" %(self.name, self.Share))
+            logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))
 
         Sharei2 = int(rspec.get('net_i2_share', default_Share))
         if Sharei2 != self.Sharei2:
             self.Sharei2 = Sharei2 
-            logger.log("bwmon:  Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
+            logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
 
 
     def reset(self, runningrates, rspec):
@@ -321,7 +321,7 @@ class Slice:
          (maxi2rate != runningrates.get('maxexemptrate', 0)) or \
          (mini2rate != runningrates.get('minexemptrate', 0)) or \
          (self.Share != runningrates.get('share', 0)):
-            logger.log("bwmon:  %s reset to %s/%s" % \
+            logger.log("bwmon: %s reset to %s/%s" % \
                   (self.name,
                    bwlimit.format_tc_rate(maxrate),
                    bwlimit.format_tc_rate(maxi2rate)), 1)
@@ -353,7 +353,7 @@ class Slice:
 
             # Cap low bandwidth burst rate
             message += template % params
-            logger.log("bwmon:   ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
+            logger.log("bwmon:  ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
 
         if new_maxexemptrate != (self.Maxi2Rate * 1000):
             # Format template parameters for high bandwidth message
@@ -363,17 +363,17 @@ class Slice:
             params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
  
             message += template % params
-            logger.log("bwmon:   ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
+            logger.log("bwmon:  ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
        
         # Notify slice
         if self.emailed == False:
             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
             if DEBUG:
-                logger.log("bwmon:  "+ subject)
-                logger.log("bwmon:  "+ message + (footer % params))
+                logger.log("bwmon: "+ subject)
+                logger.log("bwmon: "+ message + (footer % params))
             else:
                 self.emailed = True
-                logger.log("bwmon:  Emailing %s" % self.name)
+                logger.log("bwmon: Emailing %s" % self.name)
                 slicemail(self.name, subject, message + (footer % params))
 
 
@@ -465,7 +465,7 @@ def gethtbs(root_xid, default_xid):
         and (xid != default_xid):
             # Orphaned (not associated with a slice) class
             name = "%d?" % xid
-            logger.log("bwmon:  Found orphaned HTB %s. Removing." %name, 1)
+            logger.log("bwmon: Found orphaned HTB %s. Removing." %name, 1)
             bwlimit.off(xid)
 
         livehtbs[xid] = {'share': share,
@@ -506,12 +506,12 @@ def sync(nmdbcopy):
 
     try:
         f = open(DB_FILE, "r+")
-        logger.log("bwmon:  Loading %s" % DB_FILE, 2)
+        logger.log("bwmon: Loading %s" % DB_FILE, 2)
         (version, slices, deaddb) = pickle.load(f)
         f.close()
         # Check version of data file
         if version != "$Id$":
-            logger.log("bwmon:  Not using old version '%s' data file %s" % (version, DB_FILE))
+            logger.log("bwmon: Not using old version '%s' data file %s" % (version, DB_FILE))
             raise Exception
     except Exception:
         version = "$Id$"
@@ -539,28 +539,28 @@ def sync(nmdbcopy):
     for plcSliver in nmdbcopy.keys():
         live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
 
-    logger.log("bwmon:  Found %s instantiated slices" % live.keys().__len__(), 2)
-    logger.log("bwmon:  Found %s slices in dat file" % slices.values().__len__(), 2)
+    logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__(), 2)
+    logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__(), 2)
 
     # Get actual running values from tc.
     # Update slice totals and bandwidth. {xid: {values}}
     kernelhtbs = gethtbs(root_xid, default_xid)
-    logger.log("bwmon:  Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)
+    logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)
 
     # The dat file has HTBs for slices, but the HTBs aren't running
     nohtbslices =  set(slices.keys()) - set(kernelhtbs.keys())
-    logger.log( "bwmon:  Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
+    logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
     # Reset tc counts.
     for nohtbslice in nohtbslices:
         if live.has_key(nohtbslice): 
             slices[nohtbslice].reset( {}, live[nohtbslice]['_rspec'] )
         else:
-            logger.log("bwmon:  Removing abondoned slice %s from dat." % nohtbslice)
+            logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
             del slices[nohtbslice]
 
     # The dat file doesnt have HTB for the slice but kern has HTB
     slicesnodat = set(kernelhtbs.keys()) - set(slices.keys())
-    logger.log( "bwmon:  Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
+    logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
     for slicenodat in slicesnodat:
         # But slice is running 
         if live.has_key(slicenodat): 
@@ -573,7 +573,7 @@ def sync(nmdbcopy):
     # Get new slices.
     # Slices in GetSlivers but not running HTBs
     newslicesxids = set(live.keys()) - set(kernelhtbs.keys())
-    logger.log("bwmon:  Found %s new slices" % newslicesxids.__len__(), 2)
+    logger.log("bwmon: Found %s new slices" % newslicesxids.__len__(), 2)
        
     # Setup new slices
     for newslice in newslicesxids:
@@ -607,38 +607,38 @@ def sync(nmdbcopy):
                 del deaddb[deadslice['slice'].name]
                 del newvals
         else:
-            logger.log("bwmon:  Slice %s doesn't have xid.  Skipping." % live[newslice]['name'])
+            logger.log("bwmon: Slice %s doesn't have xid.  Skipping." % live[newslice]['name'])
 
     # Move dead slices that exist in the pickle file, but
     # aren't instantiated by PLC into the dead dict until
     # recording period is over.  This is to avoid the case where a slice is dynamically created
     # and destroyed then recreated to get around byte limits.
     deadxids = set(slices.keys()) - set(live.keys())
-    logger.log("bwmon:  Found %s dead slices" % (deadxids.__len__() - 2), 2)
+    logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2), 2)
     for deadxid in deadxids:
         if deadxid == root_xid or deadxid == default_xid:
             continue
-        logger.log("bwmon:  removing dead slice %s " % deadxid)
+        logger.log("bwmon: removing dead slice %s " % deadxid)
         if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
             # add slice (by name) to deaddb
-            logger.log("bwmon:  Saving bandwidth totals for %s." % slices[deadxid].name)
+            logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
             deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
             del slices[deadxid]
         if kernelhtbs.has_key(deadxid): 
-            logger.log("bwmon:  Removing HTB for %s." % deadxid, 2)
+            logger.log("bwmon: Removing HTB for %s." % deadxid, 2)
             bwlimit.off(deadxid)
     
     # Clean up deaddb
     for deadslice in deaddb.keys():
         if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
-            logger.log("bwmon:  Removing dead slice %s from dat." \
+            logger.log("bwmon: Removing dead slice %s from dat." \
                         % deaddb[deadslice]['slice'].name)
             del deaddb[deadslice]
 
     # Get actual running values from tc since we've added and removed buckets.
     # Update slice totals and bandwidth. {xid: {values}}
     kernelhtbs = gethtbs(root_xid, default_xid)
-    logger.log("bwmon:  now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
+    logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
 
     # Update all byte limites on all slices
     for (xid, slice) in slices.iteritems():
@@ -656,11 +656,11 @@ def sync(nmdbcopy):
             # were re-initialized).
             slice.reset(kernelhtbs[xid], live[xid]['_rspec'])
         elif ENABLE:
-            logger.log("bwmon:  Updating slice %s" % slice.name, 2)
+            logger.log("bwmon: Updating slice %s" % slice.name, 2)
             # Update byte counts
             slice.update(kernelhtbs[xid], live[xid]['_rspec'])
 
-    logger.log("bwmon:  Saving %s slices in %s" % (slices.keys().__len__(),DB_FILE), 2)
+    logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),DB_FILE), 2)
     f = open(DB_FILE, "w")
     pickle.dump((version, slices, deaddb), f)
     f.close()
@@ -690,7 +690,7 @@ def allOff():
     default_xid = bwlimit.get_xid("default")
     kernelhtbs = gethtbs(root_xid, default_xid)
     if len(kernelhtbs):
-        logger.log("bwmon:  Disabling all running HTBs.")
+        logger.log("bwmon: Disabling all running HTBs.")
         for htb in kernelhtbs.keys(): bwlimit.off(htb) 
 
 
@@ -700,10 +700,10 @@ def run():
     When run as a thread, wait for event, lock db, deep copy it, release it, 
     run bwmon.GetSlivers(), then go back to waiting.
     """
-    logger.log("bwmon:  Thread started", 2)
+    logger.log("bwmon: Thread started", 2)
     while True:
         lock.wait()
-        logger.log("bwmon:  Event received.  Running.", 2)
+        logger.log("bwmon: Event received.  Running.", 2)
         database.db_lock.acquire()
         nmdbcopy = copy.deepcopy(database.db)
         database.db_lock.release()
@@ -711,7 +711,7 @@ def run():
             if getDefaults(nmdbcopy) and len(bwlimit.tc("class show dev %s" % dev_default)) > 0:
                 # class show to check if net:InitNodeLimit:bwlimit.init has run.
                 sync(nmdbcopy)
-            else: logger.log("bwmon:  BW limits DISABLED.")
+            else: logger.log("bwmon: BW limits DISABLED.")
         except: logger.log_exc("bwmon failed")
         lock.clear()
 
index 42f7514..ab8e521 100644 (file)
@@ -1,7 +1,8 @@
 # $Id$
 # $URL$
 
-"""Delegate accounts are used to provide secure access to the XMLRPC API.  They are normal Unix accounts with a shell that tunnels XMLRPC requests to the API server."""
+"""Delegate accounts are used to provide secure access to the XMLRPC API.  
+They are normal Unix accounts with a shell that tunnels XMLRPC requests to the API server."""
 
 import accounts
 import logger
index ba6fbb4..16ca104 100644 (file)
@@ -182,4 +182,5 @@ It proceeds to handle dump requests forever."""
     except:
         logger.log_exc("database: failed in start")
         db = Database()
+    logger.log('database.start')
     tools.as_daemon_thread(run)
index 386776f..11be5f9 100755 (executable)
@@ -32,8 +32,6 @@ import random
 
 class NodeManager:
 
-    id="$Id$"
-
     PLUGIN_PATH = "/usr/share/NodeManager/plugins"
 
     DB_FILE = "/var/lib/nodemanager/getslivers.pickle"
@@ -42,7 +40,7 @@ class NodeManager:
     # NOTE: modules listed here will also be loaded in this order
     # once loaded, they get re-ordered after their priority (lower comes first) 
     # for determining the runtime order
-    core_modules=['net','conf_files', 'sm', 'bwmon']
+    core_modules=['net', 'conf_files', 'slivermanager', 'bwmon']
 
     default_period=600
     default_random=301
@@ -84,7 +82,7 @@ class NodeManager:
 
 
     def GetSlivers(self, config, plc):
-        """Run call backs defined in modules"""
+        """Retrieves GetSlivers at PLC and triggers callbacks defined in modules/plugins"""
         try: 
             logger.log("nodemanager: Syncing w/ PLC")
             # retrieve GetSlivers from PLC
@@ -93,10 +91,10 @@ class NodeManager:
             self.getPLCDefaults(data, config)
             # tweak the 'vref' attribute from GetSliceFamily
             self.setSliversVref (data)
-            # log it for debug purposes, no matter what verbose is
-            logger.log_slivers(data)
             # dump it too, so it can be retrieved later in case of comm. failure
             self.dumpSlivers(data)
+            # log it for debug purposes, no matter what verbose is
+            logger.log_slivers(data)
             logger.verbose("nodemanager: Sync w/ PLC done")
             last_data=data
         except: 
@@ -108,7 +106,7 @@ class NodeManager:
             last_data=self.loadSlivers()
         #  Invoke GetSlivers() functions from the callback modules
         for module in self.loaded_modules:
-            logger.verbose('triggering GetSlivers callback for module %s'%module.__name__)
+            logger.verbose('nodemanager: triggering %s.GetSlivers'%module.__name__)
             try:        
                 callback = getattr(module, 'GetSlivers')
                 module_data=data
@@ -195,6 +193,7 @@ class NodeManager:
             for module in self.modules:
                 try:
                     m = __import__(module)
+                    logger.verbose("nodemanager: triggering %s.start"%m.__name__)
                     m.start(self.options, config)
                     self.loaded_modules.append(m)
                 except ImportError, err:
@@ -245,7 +244,7 @@ class NodeManager:
         except: logger.log_exc("nodemanager: failed in run")
 
 def run():
-    logger.log("======================================== Entering nodemanager.py "+NodeManager.id)
+    logger.log("======================================== Entering nodemanager.py")
     NodeManager().run()
     
 if __name__ == '__main__':
index bd09c05..f1a6d2a 100644 (file)
@@ -5,7 +5,7 @@
 # NodeManager plugin - first step of handling omf_controlled slices
 
 """
-Overwrites the 'resctl' tag of slivers controlled by OMF so sm.py does the right thing
+Overwrites the 'resctl' tag of slivers controlled by OMF so slivermanager.py does the right thing
 """
 
 import os
index 21e3e55..ddb974b 100644 (file)
 # $Id$
 # $URL$
 #
-# NodeManager plugin - first step of handling omf_controlled slices
+# NodeManager plugin - first step of handling reservable nodes
 
 """
-Overwrites the 'resctl' tag of slivers controlled by OMF so sm.py does the right thing
+Manages running slices when reservation_policy is 'lease_or_idle' or 'lease_or_shared'
 """
 
+import time
+import threading
+
 import logger
 
 priority = 45
-# this instructs nodemanager that we want to use the latest known data when the plc link is down
+
+# this instructs nodemanager that we want to use the latest known data in case the plc link is down
 persistent_data = True
 
+# of course things would be simpler if node manager was to create one instance of the plugins 
+# instead of blindly caling functions in the module...
+
+##############################
+# rough implementation for a singleton class
+def Singleton (klass,*args,**kwds):
+    if not hasattr(klass,'_instance'):
+        klass._instance=klass(*args,**kwds)
+    return klass._instance
+
 def start(options, conf):
-    logger.log("reservation: plugin starting up...")
+    return Singleton(reservation).start(options,conf)
 
 def GetSlivers(data, conf = None, plc = None):
+    return Singleton(reservation).GetSlivers(data, conf, plc)
+
+##############################
+class reservation:
+
+    def __init__ (self):
+        # the last snapshot of data exposed by GetSlivers
+        self.data = None
+        # this is a dict mapping a raounded timestamp to the corr. Timer object
+        self.timers = {}
+    # the granularity is set in the API (initial value is 15 minutes)
+    # and it used to round all leases start/until times
+    # changing this dynamically can have some weird effects of course..
+    def granularity (self):
+        try:
+            return self.data['lease_granularity']
+        # in case we'd try to access this before it's populated..
+        except:
+            return 60*60
+
+    # round to granularity
+    def round_time (self, time):
+        granularity=self.granularity()
+        return ((int(time)+granularity/2)/granularity)*granularity
+
+    def clear_timers (self):
+        for timer in self.timers.values():
+            timer.cancel()
+        self.timers={}
+
+    def clear_timer (self,timestamp):
+        round=self.round_time(timestamp)
+        if self.timers.has_key(round):
+            timer=self.timers[round]
+            timer.cancel()
+            del self.timers[round]
+
+    def sync_timers_from_leases (self):
+        self.clear_timers()
+        for lease in self.data['leases']:
+            self.ensure_timer(lease['t_from'])
+            self.ensure_timer(lease['t_until'])
+
+    def ensure_timer(self, timestamp):
+        now=time.time()
+        # forget about past events
+        if timestamp < now: return
+        round=self.round_time(timestamp)
+        if self.timers.has_key(round): return
+        def this_closure ():
+            self.round_time_callback (round)
+        timer=threading.Timer(timestamp-now,this_closure)
+        self.timers[round]=timer
+        timer.start()
+
+    def round_time_callback (self, time_arg):
+        now=time.time()
+        round_now=self.round_time(now)
+        logger.log('reservation.round_time_callback now=%f round_now=%d arg=%d...'%(now,round_now,time_arg))
+        leases_text="leases=%r"%self.data['leases']
+        logger.log(leases_text)
+
+    def show_time (self, timestamp):
+        return time.strftime ("%Y-%m-%d %H:%M %Z",time.gmtime(timestamp))
+
+    ####################
+    def start(self,options,conf):
+        logger.log("reservation: plugin performing dummy start...")
+
+    # this method is entirely about making sure that we have events scheduled 
+    # at the <granularity> intervals where there is a lease that starts or ends
+    def GetSlivers (self, data, conf=None, plc=None):
+    
+        # check we're using a compliant GetSlivers
+        if 'reservation_policy' not in data: 
+            logger.log_missing_data("reservation.GetSlivers",'reservation_policy')
+            return
+        reservation_policy=data['reservation_policy']
+        if 'leases' not in data: 
+            logger.log_missing_data("reservation.GetSlivers",'leases')
+            return
+    
+
+        # store data locally
+        # since we've asked for persistent_data, we should not get an empty data here
+        if data: self.data = data
 
-    if 'reservation_policy' not in data: 
-        logger.log_missing_data("reservation.GetSlivers",'reservation_policy')
-        return
-    reservation_policy=data['reservation_policy']
-
-    if 'leases' not in data: 
-        logger.log_missing_data("reservation.GetSlivers",'leases')
-        return
-
-    if reservation_policy in ['lease_or_idle','lease_or_shared']:
-        logger.log( 'reservation.GetSlivers - scaffolding...')
-    elif reservation_policy == 'none':
-        return
-    else:
-        logger.log("reservation: ignoring -- unexpected value for reservation_policy %r"%reservation_policy)
-        return
+        # regular nodes are not affected
+        if reservation_policy == 'none':
+            return
+        elif reservation_policy not in ['lease_or_idle','lease_or_shared']:
+            logger.log("reservation: ignoring -- unexpected value for reservation_policy %r"%reservation_policy)
+            return
+        # at this point we have reservation_policy in ['lease_or_idle','lease_or_shared']
+        # we make no difference for now
+        logger.verbose('reservation.GetSlivers : reservable node -- listing timers ')
+        
+        self.sync_timers_from_leases()
+        for timestamp in self.timers.keys():
+            logger.verbose('TIMER armed for %s'%self.show_time(timestamp))
+           
+        logger.verbose('reservation.GetSlivers : end listing timers')
+        
index ba3f4e8..bc10f60 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -31,7 +31,7 @@ setup(
         'plcapi',
         'safexmlrpc',
         'sliver_vs',
-        'sm',
+        'slivermanager',
         'ticket',
         'tools',
         ],
similarity index 89%
rename from sm.py
rename to slivermanager.py
index ae3b01f..56f2c48 100644 (file)
--- a/sm.py
@@ -10,21 +10,20 @@ also to make inter-sliver resource loans.  The sliver manager is also
 responsible for handling delegation accounts.
 """
 
-# $Id$
+priority=10
 
-try: from bwlimit import bwmin, bwmax
-except ImportError: bwmin, bwmax = 8, 1000*1000*1000
+import string,re
+
+import logger
 import accounts
-import api
-import api_calls
+import api, api_calls
 import database
 import controller
-import logger
 import sliver_vs
-import string,re
 
+try: from bwlimit import bwmin, bwmax
+except ImportError: bwmin, bwmax = 8, 1000*1000*1000
 
-priority=10
 
 DEFAULT_ALLOCATION = {
     'enabled': 1,
@@ -65,16 +64,16 @@ def GetSlivers(data, config = None, plc=None, fullupdate=True):
     in, use the GetSlivers() heartbeat as a cue to scan for expired
     slivers."""
 
-    logger.verbose("sm: Entering GetSlivers with fullupdate=%r"%fullupdate)
+    logger.verbose("slivermanager: Entering GetSlivers with fullupdate=%r"%fullupdate)
     for key in data.keys():
-        logger.verbose('sm: GetSlivers key : ' + key)
+        logger.verbose('slivermanager: GetSlivers key : ' + key)
 
     node_id = None
     try:
         f = open('/etc/planetlab/node_id')
         try: node_id = int(f.read())
         finally: f.close()
-    except: logger.log_exc("sm: GetSlivers failed to read /etc/planetlab/node_id")
+    except: logger.log_exc("slivermanager: GetSlivers failed to read /etc/planetlab/node_id")
 
     if data.has_key('node_id') and data['node_id'] != node_id: return
 
@@ -85,15 +84,15 @@ def GetSlivers(data, config = None, plc=None, fullupdate=True):
 
     # Take intscripts (global) returned by API, make dict
     if 'initscripts' not in data:
-        logger.log_missing_data("sm.GetSlivers",'initscripts')
+        logger.log_missing_data("slivermanager.GetSlivers",'initscripts')
         return
     initscripts = {}
     for is_rec in data['initscripts']:
-        logger.verbose("sm: initscript: %s" % is_rec['name'])
+        logger.verbose("slivermanager: initscript: %s" % is_rec['name'])
         initscripts[str(is_rec['name'])] = is_rec['script']
 
     for sliver in data['slivers']:
-        logger.verbose("sm: %s: sm:GetSlivers in slivers loop"%sliver['name'])
+        logger.verbose("slivermanager: %s: slivermanager.GetSlivers in slivers loop"%sliver['name'])
         rec = sliver.copy()
         rec.setdefault('timestamp', data['timestamp'])
 
@@ -148,8 +147,8 @@ def GetSlivers(data, config = None, plc=None, fullupdate=True):
     database.db.sync()
     accounts.Startingup = False
 
-def deliver_ticket(data): return GetSlivers(data, fullupdate=False)
-
+def deliver_ticket(data): 
+    return GetSlivers(data, fullupdate=False)
 
 def start(options, config):
     for resname, default_amt in sliver_vs.DEFAULT_ALLOCATION.iteritems():