From f0991937bc0af0f4fbdc72c71f972ab44ab1aac6 Mon Sep 17 00:00:00 2001 From: Sandrine Avakian Date: Fri, 19 Oct 2012 13:59:19 +0200 Subject: [PATCH] Cleaning slabslices and slapostgres. --- sfa/senslab/slabdriver.py | 1 + sfa/senslab/slabpostgres.py | 39 ++++++----- sfa/senslab/slabslices.py | 132 +++++++++++++++++------------------- 3 files changed, 84 insertions(+), 88 deletions(-) diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index c8b58dff..2f68bc98 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -1174,6 +1174,7 @@ class SlabDriver(Driver): def GetLeaseGranularity(self): """ Returns the granularity of Senslab testbed. + OAR returns seconds for experiments duration. Defined in seconds. """ grain = 60 diff --git a/sfa/senslab/slabpostgres.py b/sfa/senslab/slabpostgres.py index b49416eb..412d0663 100644 --- a/sfa/senslab/slabpostgres.py +++ b/sfa/senslab/slabpostgres.py @@ -1,5 +1,3 @@ -import sys - from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker @@ -7,32 +5,31 @@ from sfa.util.config import Config from sfa.util.sfalogging import logger from sqlalchemy import Column, Integer, String -from sqlalchemy import Table, Column, MetaData +from sqlalchemy import Table, MetaData from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.dialects import postgresql -from sqlalchemy import MetaData, Table from sqlalchemy.exc import NoSuchTableError -from sqlalchemy import String #Dict holding the columns names of the table as keys #and their type, used for creation of the table -slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE \ +slice_table = {'record_id_user': 'integer PRIMARY KEY references X ON DELETE \ CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1', \ 'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'} #Dict with all the specific senslab tables tablenames_dict = {'slice_senslab': slice_table} -############################## - - SlabBase = declarative_base() class SliceSenslab (SlabBase): + """ SQL alchemy class to manipulate slice_senslab table in + slab_sfa database. + + """ __tablename__ = 'slice_senslab' #record_id_user = Column(Integer, primary_key=True) @@ -54,7 +51,7 @@ class SliceSenslab (SlabBase): if slice_hrn: self.slice_hrn = slice_hrn if record_id_user: - self.record_id_user= record_id_user + self.record_id_user = record_id_user if peer_authority: self.peer_authority = peer_authority @@ -83,16 +80,19 @@ class SliceSenslab (SlabBase): class SlabDB: + """ SQL Aclehmy connection class. + From alchemy.py + """ def __init__(self,config, debug = False): self.sl_base = SlabBase - dbname="slab_sfa" + dbname = "slab_sfa" if debug == True : l_echo_pool = True l_echo=True else : l_echo_pool = False l_echo = False - # will be created lazily on-demand + self.slab_session = None # the former PostgreSQL.py used the psycopg2 directly and was doing #self.connection.set_client_encoding("UNICODE") @@ -125,8 +125,11 @@ class SlabDB: def check (self): - self.slab_engine.execute ("select 1").scalar() + """ Cehck if a table exists by trying a selection + on the table. + """ + self.slab_engine.execute ("select 1").scalar() def session (self): @@ -160,9 +163,9 @@ class SlabDB: try: metadata = MetaData (bind=self.slab_engine) - table=Table (tablename, metadata, autoload=True) - + table = Table (tablename, metadata, autoload=True) return True + except NoSuchTableError: logger.log_exc("SLABPOSTGRES tablename %s does not exists" \ %(tablename)) @@ -184,6 +187,6 @@ class SlabDB: from sfa.util.config import Config -slab_alchemy= SlabDB(Config()) -slab_engine=slab_alchemy.slab_engine -slab_dbsession=slab_alchemy.session() +slab_alchemy = SlabDB(Config()) +slab_engine = slab_alchemy.slab_engine +slab_dbsession = slab_alchemy.session() diff --git a/sfa/senslab/slabslices.py b/sfa/senslab/slabslices.py index 447b9c97..b00cb930 100644 --- a/sfa/senslab/slabslices.py +++ b/sfa/senslab/slabslices.py @@ -132,31 +132,34 @@ class SlabSlices: return sfa_peer - def verify_slice_leases(self, sfa_slice, requested_jobs_dict, kept_leases, \ - peer): + def verify_slice_leases(self, sfa_slice, requested_jobs_dict, peer): #First get the list of current leases from OAR leases = self.driver.GetLeases({'name':sfa_slice['slice_hrn']}) - logger.debug("SLABSLICES verify_slice_leases requested_jobs_dict %s leases %s "%(requested_jobs_dict, leases )) - #leases = self.driver.GetLeases({'name':sfa_slice['name']},\ - #['lease_id']) + logger.debug("SLABSLICES verify_slice_leases requested_jobs_dict %s \ + leases %s "%(requested_jobs_dict, leases )) + + if leases : current_nodes_reserved_by_start_time = {} requested_nodes_by_start_time = {} leases_by_start_time = {} - #Create reduced dictionary with key start_time and value list of nodes + #Create reduced dictionary with key start_time and value + # the list of nodes #-for the leases already registered by OAR first # then for the new leases requested by the user #Leases already scheduled/running in OAR for lease in leases : - current_nodes_reserved_by_start_time[lease['t_from']] = lease['reserved_nodes'] + current_nodes_reserved_by_start_time[lease['t_from']] = \ + lease['reserved_nodes'] leases_by_start_time[lease['t_from']] = lease #Requested jobs for start_time in requested_jobs_dict: - requested_nodes_by_start_time[int(start_time)] = requested_jobs_dict[start_time]['hostname'] + requested_nodes_by_start_time[int(start_time)] = \ + requested_jobs_dict[start_time]['hostname'] #Check if there is any difference between the leases already #registered in OAR and the requested jobs. @@ -165,14 +168,17 @@ class SlabSlices: #-Added/removed nodes #-Newly added lease - logger.debug("SLABSLICES verify_slice_leases requested_nodes_by_start_time %s current_nodes_reserved_by_start_time %s " %(requested_nodes_by_start_time,current_nodes_reserved_by_start_time)) #Find all deleted leases - start_time_list = list(set(leases_by_start_time.keys()).difference(requested_nodes_by_start_time.keys())) - deleted_leases = [leases_by_start_time[start_time]['lease_id'] for start_time in start_time_list] + start_time_list = \ + list(set(leases_by_start_time.keys()).\ + difference(requested_nodes_by_start_time.keys())) + deleted_leases = [leases_by_start_time[start_time]['lease_id'] \ + for start_time in start_time_list] - - reschedule = {} + + reschedule_jobs_dict = {} + #Find added or removed nodes in exisiting leases for start_time in requested_nodes_by_start_time: if start_time in current_nodes_reserved_by_start_time: @@ -181,74 +187,59 @@ class SlabSlices: continue else: - update_node_set = set(requested_nodes_by_start_time[start_time]) - added_nodes = update_node_set.difference(current_nodes_reserved_by_start_time[start_time]) - shared_nodes = update_node_set.intersection(current_nodes_reserved_by_start_time[start_time]) - old_nodes_set = set(current_nodes_reserved_by_start_time[start_time]) - removed_nodes = old_nodes_set.difference(requested_nodes_by_start_time[start_time]) - logger.debug("SLABSLICES verify_slice_leases shared_nodes %s added_nodes %s removed_nodes %s"%(shared_nodes, added_nodes,removed_nodes )) - #If the lease is modified, delete it before creating it again. + update_node_set = \ + set(requested_nodes_by_start_time[start_time]) + added_nodes = \ + update_node_set.difference(\ + current_nodes_reserved_by_start_time[start_time]) + shared_nodes = \ + update_node_set.intersection(\ + current_nodes_reserved_by_start_time[start_time]) + old_nodes_set = \ + set(\ + current_nodes_reserved_by_start_time[start_time]) + removed_nodes = \ + old_nodes_set.difference(\ + requested_nodes_by_start_time[start_time]) + logger.debug("SLABSLICES verify_slice_leases \ + shared_nodes %s added_nodes %s removed_nodes %s"\ + %(shared_nodes, added_nodes,removed_nodes )) + #If the lease is modified, delete it before + #creating it again. #Add the deleted lease job id in the list + #WARNING :rescheduling does not work if there is already + # 2 running/scheduled jobs because deleting a job + #takes time SA 18/10/2012 if added_nodes or removed_nodes: - deleted_leases.append(leases_by_start_time[start_time]['lease_id']) - #Reschedule the job + deleted_leases.append(\ + leases_by_start_time[start_time]['lease_id']) + #Reschedule the job if added_nodes or shared_nodes: - reschedule[str(start_time)] = requested_jobs_dict[str(start_time)] - #logger.debug("SLABSLICES verify_slice_leases RESCHEDULE!!!!!") - #job = requested_jobs_dict[str(start_time)] - #self.driver.AddLeases(job['hostname'], \ - #sfa_slice, int(job['start_time']), \ - #int(job['duration'])) + reschedule_jobs_dict[str(start_time)] = \ + requested_jobs_dict[str(start_time)] + else: #New lease - logger.debug("SLABSLICES verify_slice_leases NEW LEASE") + logger.debug("SLABSLICES verify_slice_leases NEW LEASE") job = requested_jobs_dict[str(start_time)] - self.driver.AddLeases(job['hostname'], \ - sfa_slice, int(job['start_time']), \ - int(job['duration'])) - - current_leases = [lease['lease_id'] for lease in leases] - logger.debug("SLABSLICES verify_slice_leases current_leases %s kept_leases %s requested_jobs_dict %s"\ - %(current_leases,kept_leases,requested_jobs_dict)) + self.driver.AddLeases(job['hostname'], \ + sfa_slice, int(job['start_time']), \ + int(job['duration'])) + #Deleted leases are the ones with lease id not declared in the Rspec - #deleted_leases = list(set(current_leases).difference(kept_leases)) if deleted_leases: self.driver.DeleteLeases(deleted_leases, sfa_slice['slice_hrn']) - - if reschedule : + logger.debug("SLABSLICES \ + verify_slice_leases slice %s deleted_leases %s"\ + %(sfa_slice, deleted_leases)) + + + if reschedule_jobs_dict : for start_time in reschedule : - job = reschedule[start_time] + job = reschedule_jobs_dict[start_time] self.driver.AddLeases(job['hostname'], \ sfa_slice, int(job['start_time']), \ int(job['duration'])) - - #try: - ##if peer: - ##peer = RegAuyhority object is unsubscriptable - ##TODO :UnBindObjectFromPeer Quick and dirty - ##auth='senslab2 SA 27/07/12 - - ##Commented out UnBindObjectFromPeer SA 09/10/12 - ##self.driver.UnBindObjectFromPeer('senslab2', 'slice', \ - ##sfa_slice['record_id_slice'], peer.hrn) - #logger.debug("SLABSLICES verify_slice_leases slice %s deleted_leases %s"\ - #%(sfa_slice, deleted_leases)) - #self.driver.DeleteLeases(deleted_leases, \ - #sfa_slice['name']) - ##self.driver.DeleteLeases(deleted_leases, \ - ##sfa_slice['name']) - - ##TODO verify_slice_leases: catch other exception? - #except KeyError: - #logger.log_exc('Failed to remove slice leases') - - ##Add new leases - #for start_time in requested_jobs_dict: - #job = requested_jobs_dict[start_time] - #self.driver.AddLeases(job['hostname'], \ - #sfa_slice, int(job['start_time']), \ - #int(job['duration'])) - return leases def verify_slice_nodes(self, sfa_slice, requested_slivers, peer): @@ -256,7 +247,8 @@ class SlabSlices: deleted_nodes = [] if 'node_ids' in sfa_slice: - nodes = self.driver.GetNodes(sfa_slice['list_node_ids'], ['hostname']) + nodes = self.driver.GetNodes(sfa_slice['list_node_ids'], \ + ['hostname']) current_slivers = [node['hostname'] for node in nodes] # remove nodes not in rspec @@ -269,7 +261,7 @@ class SlabSlices: #Update the table with the nodes that populate the slice logger.debug("SLABSLICES \tverify_slice_nodes slice %s\ \r\n \r\n deleted_nodes %s"\ - %(sfa_slice,deleted_nodes)) + %(sfa_slice, deleted_nodes)) if deleted_nodes: #Delete the entire experience -- 2.43.0