Cleaning slabslices and slapostgres.
authorSandrine Avakian <sandrine.avakian@inria.fr>
Fri, 19 Oct 2012 11:59:19 +0000 (13:59 +0200)
committerSandrine Avakian <sandrine.avakian@inria.fr>
Fri, 19 Oct 2012 12:05:28 +0000 (14:05 +0200)
sfa/senslab/slabdriver.py
sfa/senslab/slabpostgres.py
sfa/senslab/slabslices.py

index c8b58df..2f68bc9 100644 (file)
@@ -1174,6 +1174,7 @@ class SlabDriver(Driver):
  
     def GetLeaseGranularity(self):
         """ Returns the granularity of Senslab testbed.
+        OAR returns seconds for experiments duration.
         Defined in seconds. """
         
         grain = 60 
index b49416e..412d066 100644 (file)
@@ -1,5 +1,3 @@
-import sys
-
 from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
 
@@ -7,32 +5,31 @@ from sfa.util.config import Config
 from sfa.util.sfalogging import logger
 
 from sqlalchemy import Column, Integer, String
-from sqlalchemy import Table, Column, MetaData
+from sqlalchemy import Table, MetaData
 from sqlalchemy.ext.declarative import declarative_base
 
 from sqlalchemy.dialects import postgresql
 
-from sqlalchemy import MetaData, Table
 from sqlalchemy.exc import NoSuchTableError
 
-from sqlalchemy import String
 
 #Dict holding the columns names of the table as keys
 #and their type, used for creation of the table
-slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE \
+slice_table = {'record_id_user': 'integer PRIMARY KEY references X ON DELETE \
 CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  \
 'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
 
 #Dict with all the specific senslab tables
 tablenames_dict = {'slice_senslab': slice_table}
 
-##############################
-
-
 
 SlabBase = declarative_base()
 
 class SliceSenslab (SlabBase):
+    """ SQL alchemy class to manipulate slice_senslab table in 
+    slab_sfa database.
+    
+    """
     __tablename__ = 'slice_senslab' 
     #record_id_user = Column(Integer, primary_key=True)
 
@@ -54,7 +51,7 @@ class SliceSenslab (SlabBase):
         if slice_hrn:
             self.slice_hrn = slice_hrn
         if record_id_user: 
-            self.record_id_user= record_id_user
+            self.record_id_user = record_id_user
         if peer_authority:
             self.peer_authority = peer_authority
             
@@ -83,16 +80,19 @@ class SliceSenslab (SlabBase):
           
           
 class SlabDB:
+    """ SQL Aclehmy connection class.
+    From alchemy.py
+    """
     def __init__(self,config, debug = False):
         self.sl_base = SlabBase
-        dbname="slab_sfa"
+        dbname = "slab_sfa"
         if debug == True :
             l_echo_pool = True
             l_echo=True 
         else :
             l_echo_pool = False
             l_echo = False 
-        # will be created lazily on-demand
+
         self.slab_session = None
         # the former PostgreSQL.py used the psycopg2 directly and was doing
         #self.connection.set_client_encoding("UNICODE")
@@ -125,8 +125,11 @@ class SlabDB:
     
     
     def check (self):
-        self.slab_engine.execute ("select 1").scalar()
+        """ Cehck if a table exists by trying a selection
+        on the table. 
         
+        """
+        self.slab_engine.execute ("select 1").scalar()
         
         
     def session (self):
@@ -160,9 +163,9 @@ class SlabDB:
        
         try:
             metadata = MetaData (bind=self.slab_engine)
-            table=Table (tablename, metadata, autoload=True)
-           
+            table = Table (tablename, metadata, autoload=True)
             return True
+        
         except NoSuchTableError:
             logger.log_exc("SLABPOSTGRES tablename %s does not exists" \
                             %(tablename))
@@ -184,6 +187,6 @@ class SlabDB:
 
 from sfa.util.config import Config
 
-slab_alchemy= SlabDB(Config())
-slab_engine=slab_alchemy.slab_engine
-slab_dbsession=slab_alchemy.session()
+slab_alchemy = SlabDB(Config())
+slab_engine = slab_alchemy.slab_engine
+slab_dbsession = slab_alchemy.session()
index 447b9c9..ded7a13 100644 (file)
@@ -132,31 +132,34 @@ class SlabSlices:
         return sfa_peer
         
         
-    def verify_slice_leases(self, sfa_slice, requested_jobs_dict, kept_leases, \
-        peer):
+    def verify_slice_leases(self, sfa_slice, requested_jobs_dict, peer):
 
        
         #First get the list of current leases from OAR          
         leases = self.driver.GetLeases({'name':sfa_slice['slice_hrn']})
-        logger.debug("SLABSLICES  verify_slice_leases requested_jobs_dict %s leases %s "%(requested_jobs_dict, leases ))
-        #leases = self.driver.GetLeases({'name':sfa_slice['name']},\
-                                     #['lease_id'])
+        logger.debug("SLABSLICES verify_slice_leases requested_jobs_dict %s \
+                        leases %s "%(requested_jobs_dict, leases ))
+        
+        
         if leases : 
             current_nodes_reserved_by_start_time = {}
             requested_nodes_by_start_time = {}
             leases_by_start_time = {}
-            #Create reduced dictionary with key start_time and value list of nodes
+            #Create reduced dictionary with key start_time and value 
+            # the list of nodes
             #-for the leases already registered by OAR first
             # then for the new leases requested by the user
             
             #Leases already scheduled/running in OAR
             for lease in leases :
-                current_nodes_reserved_by_start_time[lease['t_from']] = lease['reserved_nodes']
+                current_nodes_reserved_by_start_time[lease['t_from']] = \
+                        lease['reserved_nodes']
                 leases_by_start_time[lease['t_from']] = lease
                 
             #Requested jobs     
             for start_time in requested_jobs_dict:
-                requested_nodes_by_start_time[int(start_time)]  = requested_jobs_dict[start_time]['hostname']
+                requested_nodes_by_start_time[int(start_time)]  = \
+                        requested_jobs_dict[start_time]['hostname']
                      
             #Check if there is any difference between the leases already
             #registered in OAR and the requested jobs.   
@@ -165,14 +168,17 @@ class SlabSlices:
             #-Added/removed nodes
             #-Newly added lease 
 
-            logger.debug("SLABSLICES  verify_slice_leases  requested_nodes_by_start_time %s current_nodes_reserved_by_start_time %s " %(requested_nodes_by_start_time,current_nodes_reserved_by_start_time))  
             
             #Find all deleted leases
-            start_time_list = list(set(leases_by_start_time.keys()).difference(requested_nodes_by_start_time.keys()))
-            deleted_leases = [leases_by_start_time[start_time]['lease_id'] for start_time in start_time_list]
+            start_time_list = \
+                list(set(leases_by_start_time.keys()).\
+                difference(requested_nodes_by_start_time.keys()))
+            deleted_leases = [leases_by_start_time[start_time]['lease_id'] \
+                                for start_time in start_time_list]
 
-            
-            reschedule = {}
+  
+            reschedule_jobs_dict = {}
+            #Find added or removed nodes in exisiting leases
             for start_time in requested_nodes_by_start_time:
                 if start_time in current_nodes_reserved_by_start_time:
                     
@@ -181,74 +187,60 @@ class SlabSlices:
                         continue
                     
                     else:
-                        update_node_set = set(requested_nodes_by_start_time[start_time])
-                        added_nodes = update_node_set.difference(current_nodes_reserved_by_start_time[start_time])
-                        shared_nodes = update_node_set.intersection(current_nodes_reserved_by_start_time[start_time])
-                        old_nodes_set = set(current_nodes_reserved_by_start_time[start_time])
-                        removed_nodes = old_nodes_set.difference(requested_nodes_by_start_time[start_time])
-                        logger.debug("SLABSLICES  verify_slice_leases shared_nodes %s  added_nodes %s removed_nodes %s"%(shared_nodes, added_nodes,removed_nodes ))
-                        #If the lease is modified, delete it before creating it again.
+                        update_node_set = \
+                                set(requested_nodes_by_start_time[start_time])
+                        added_nodes = \
+                            update_node_set.difference(\
+                            current_nodes_reserved_by_start_time[start_time])
+                        shared_nodes = \
+                            update_node_set.intersection(\
+                            current_nodes_reserved_by_start_time[start_time])
+                        old_nodes_set = \
+                            set(\
+                            current_nodes_reserved_by_start_time[start_time])
+                        removed_nodes = \
+                            old_nodes_set.difference(\
+                            requested_nodes_by_start_time[start_time])
+                        logger.debug("SLABSLICES verify_slice_leases \
+                            shared_nodes %s  added_nodes %s removed_nodes %s"\
+                            %(shared_nodes, added_nodes,removed_nodes ))
+                        #If the lease is modified, delete it before 
+                        #creating it again.
                         #Add the deleted lease job id in the list
+                        #WARNING :rescheduling does not work if there is already  
+                        # 2 running/scheduled jobs because deleting a job 
+                        #takes time SA 18/10/2012
                         if added_nodes or removed_nodes:
-                            deleted_leases.append(leases_by_start_time[start_time]['lease_id'])
-                            #Reschedule the job
+                            deleted_leases.append(\
+                                leases_by_start_time[start_time]['lease_id'])
+                            #Reschedule the job 
                             if added_nodes or shared_nodes:
-                                reschedule[str(start_time)] = requested_jobs_dict[str(start_time)]
-                                #logger.debug("SLABSLICES  verify_slice_leases RESCHEDULE!!!!!")
-                                #job = requested_jobs_dict[str(start_time)]
-                                #self.driver.AddLeases(job['hostname'], \
-                                #sfa_slice, int(job['start_time']), \
-                                #int(job['duration']))
+                                reschedule_jobs_dict[str(start_time)] = \
+                                            requested_jobs_dict[str(start_time)]
+
                 else: 
                         #New lease 
-                        logger.debug("SLABSLICES  verify_slice_leases NEW LEASE")
+                        logger.debug("SLABSLICES verify_slice_leases NEW LEASE")
                         job = requested_jobs_dict[str(start_time)]
+                        
                         self.driver.AddLeases(job['hostname'], \
-                        sfa_slice, int(job['start_time']), \
-                        int(job['duration']))
-         
-            current_leases = [lease['lease_id'] for lease in leases]
-            logger.debug("SLABSLICES  verify_slice_leases  current_leases %s kept_leases %s requested_jobs_dict %s"\
-                                        %(current_leases,kept_leases,requested_jobs_dict))
+                                sfa_slice, int(job['start_time']), \
+                                int(job['duration']))
+
             #Deleted leases are the ones with lease id not declared in the Rspec
-            #deleted_leases = list(set(current_leases).difference(kept_leases))
             if deleted_leases:
                 self.driver.DeleteLeases(deleted_leases, sfa_slice['slice_hrn'])
-                
-            if reschedule : 
+                logger.debug("SLABSLICES \
+                        verify_slice_leases slice %s deleted_leases %s"\
+                        %(sfa_slice, deleted_leases))
+                        
+                        
+            if reschedule_jobs_dict : 
                 for start_time in reschedule :
-                    job = reschedule[start_time]
+                    job = reschedule_jobs_dict[start_time]
                     self.driver.AddLeases(job['hostname'], \
                         sfa_slice, int(job['start_time']), \
                         int(job['duration']))
-                       
-            #try:
-                ##if peer:
-                    ##peer = RegAuyhority object is unsubscriptable
-                    ##TODO :UnBindObjectFromPeer Quick and dirty 
-                    ##auth='senslab2 SA 27/07/12
-                    
-                    ##Commented out UnBindObjectFromPeer SA 09/10/12
-                    ##self.driver.UnBindObjectFromPeer('senslab2', 'slice', \
-                                    ##sfa_slice['record_id_slice'], peer.hrn)
-                #logger.debug("SLABSLICES  verify_slice_leases slice %s deleted_leases %s"\
-                                        #%(sfa_slice, deleted_leases))
-                #self.driver.DeleteLeases(deleted_leases, \
-                                        #sfa_slice['name'])
-                ##self.driver.DeleteLeases(deleted_leases, \
-                                        ##sfa_slice['name'])
-               
-            ##TODO verify_slice_leases: catch other exception?
-            #except KeyError: 
-                #logger.log_exc('Failed to remove slice leases')
-                
-        ##Add new leases        
-        #for start_time in requested_jobs_dict:
-            #job = requested_jobs_dict[start_time]
-            #self.driver.AddLeases(job['hostname'], \
-                        #sfa_slice, int(job['start_time']), \
-                        #int(job['duration']))
-                        
         return leases
 
     def verify_slice_nodes(self, sfa_slice, requested_slivers, peer):
@@ -256,7 +248,8 @@ class SlabSlices:
         deleted_nodes = []
 
         if 'node_ids' in sfa_slice:
-            nodes = self.driver.GetNodes(sfa_slice['list_node_ids'], ['hostname'])
+            nodes = self.driver.GetNodes(sfa_slice['list_node_ids'], \
+                ['hostname'])
             current_slivers = [node['hostname'] for node in nodes]
     
             # remove nodes not in rspec
@@ -269,7 +262,7 @@ class SlabSlices:
             #Update the table with the nodes that populate the slice
             logger.debug("SLABSLICES \tverify_slice_nodes slice %s\
                                          \r\n \r\n deleted_nodes %s"\
-                                        %(sfa_slice,deleted_nodes))
+                                        %(sfa_slice, deleted_nodes))
 
             if deleted_nodes:
                 #Delete the entire experience