From 9c93ec5573a58c63a6f572434913562b35e61f90 Mon Sep 17 00:00:00 2001 From: Sandrine Avakian Date: Mon, 13 Aug 2012 12:29:26 +0200 Subject: [PATCH] Add new table JobSenslab so that one slice can have multiple jobs. --- sfa/importer/slabimporter.py | 2 +- sfa/senslab/config/bash_nukem | 2 + sfa/senslab/slabaggregate.py | 2 + sfa/senslab/slabdriver.py | 206 ++++++++++++++++++++++++++-------- sfa/senslab/slabpostgres.py | 105 +++++++++++------ sfa/senslab/slabslices.py | 5 +- 6 files changed, 233 insertions(+), 89 deletions(-) diff --git a/sfa/importer/slabimporter.py b/sfa/importer/slabimporter.py index 84e6ebf8..325e526b 100644 --- a/sfa/importer/slabimporter.py +++ b/sfa/importer/slabimporter.py @@ -265,7 +265,7 @@ class SlabImporter: #Get it sl_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(slice_hrn)).all() - slab_slice = SliceSenslab( slice_hrn = slice_hrn, oar_job_id = -1, record_id_slice=sl_rec[0].record_id, record_id_user= user_record.record_id) + slab_slice = SliceSenslab( slice_hrn = slice_hrn, record_id_slice=sl_rec[0].record_id, record_id_user= user_record.record_id) print>>sys.stderr, "\r\n \r\n SLAB IMPORTER SLICE IMPORT NOTslice_record %s \r\n slab_slice %s" %(sl_rec,slab_slice) slab_dbsession.add(slab_slice) slab_dbsession.commit() diff --git a/sfa/senslab/config/bash_nukem b/sfa/senslab/config/bash_nukem index 3830a132..c4deedbf 100755 --- a/sfa/senslab/config/bash_nukem +++ b/sfa/senslab/config/bash_nukem @@ -33,8 +33,10 @@ sudo sfaadmin.py registry nuke # Drop table in slab_sfa # to avoid duplicates. +psql -d slab_sfa -U sfa -W -q -c "drop table job_senslab;" psql -d slab_sfa -U sfa -W -q -c "drop table slice_senslab;" + # ATTENTION :Save the config file /etc/sfa/sfa_config # before continuing diff --git a/sfa/senslab/slabaggregate.py b/sfa/senslab/slabaggregate.py index 895c287b..a64669b2 100644 --- a/sfa/senslab/slabaggregate.py +++ b/sfa/senslab/slabaggregate.py @@ -253,6 +253,8 @@ class SlabAggregate: now = int(time.time()) lease_filter = {'clip': now } + + self.driver.synchronize_oar_and_slice_table() #if slice_record: #lease_filter.update({'name': slice_record['name']}) return_fields = ['lease_id', 'hostname', 'site_id', \ diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index 444f1584..e7c95d77 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -28,7 +28,7 @@ from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf from sfa.senslab.OARrestapi import OARrestapi from sfa.senslab.LDAPapi import LDAPapi -from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab +from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, JobSenslab from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, slab_xrn_object from sfa.senslab.slabslices import SlabSlices @@ -135,6 +135,84 @@ class SlabDriver(Driver): return result + def synchronize_oar_and_slice_table(self, slice_hrn = None): + #Get list of leases + oar_leases_list = self.GetReservedNodes() + + logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table \r\n \r\n : oar_leases_list %s\r\n" %( oar_leases_list)) + #Get list of slices/leases . multiple entry per user depending on number of jobs + #At this point we don't have the slice_hrn so that's why + #we are calling Getslices, which holds a field with slice_hrn + + if slice_hrn : + sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn') + self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, sfa_slices_list) + else : + sfa_slices_list = self.GetSlices() + + sfa_slices_dict_by_slice_hrn = {} + for sfa_slice in sfa_slices_list: + if sfa_slice['slice_hrn'] not in sfa_slices_dict_by_slice_hrn: + sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']] = [] + sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice) + else : + sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice) + + for slice_hrn in sfa_slices_dict_by_slice_hrn: + list_slices_sfa = sfa_slices_dict_by_slice_hrn[slice_hrn] + if slice_hrn =='senslab2.avakian_slice': + logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table slice_hrn %s list_slices_sfa %s\r\n \r\n" %( slice_hrn,list_slices_sfa)) + self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, list_slices_sfa) + + return + + + def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, oar_leases_list, sfa_slices_list): + + #Get list of slices/leases . multiple entry per user depending on number of jobs + #sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn') + sfa_slices_dict = {} + oar_leases_dict = {} + login = slice_hrn.split(".")[1].split("_")[0] + + #Create dictionnaries based on the tuple user login/ job id + #for the leases list and the slices list + + for sl in sfa_slices_list: + if sl['oar_job_id'] != [] : + for oar_jobid in sl['oar_job_id']: + if (login, oar_jobid) not in sfa_slices_dict: + sfa_slices_dict[(login,oar_jobid)] = sl + + for lease in oar_leases_list: + if (lease['user'], lease['lease_id']) not in oar_leases_dict: + oar_leases_dict[(lease['user'], lease['lease_id'])] = lease + + #Find missing entries in the sfa slices list dict by comparing + #the keys in both dictionnaries + #Add the missing entries in the slice sneslab table + + for lease in oar_leases_dict : + logger.debug(" =============SLABDRIVER \t\t\ synchronize_oar_and_slice_table_for_slice_hrn oar_leases_list %s \r\n \t\t\t SFA_SLICES_DICT %s \r\n \r\n LOGIN %s \r\n " %( oar_leases_list,sfa_slices_dict,login)) + if lease not in sfa_slices_dict and login == lease[0]: + + #if lease in GetReservedNodes not in GetSlices update the db + #First get the list of nodes hostnames for this job + oar_reserved_nodes_listdict = oar_leases_dict[lease]['reserved_nodes'] + oar_reserved_nodes_list = [] + for node_dict in oar_reserved_nodes_listdict: + oar_reserved_nodes_list.append(node_dict['hostname']) + #And update the db with slice hrn, job id and node list + self.db.add_job(slice_hrn, lease[1], oar_reserved_nodes_list) + + for lease in sfa_slices_dict: + #Job is now terminated or in Error, either way ot is not going to run again + #Remove it from the db + if lease not in oar_leases_dict: + self.db.delete_job( slice_hrn, lease[1]) + + return + def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \ users, options): aggregate = SlabAggregate(self) @@ -155,7 +233,7 @@ class SlabDriver(Driver): logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \ %(rspec.version)) - + self.synchronize_oar_and_slice_table(slice_hrn) # ensure site record exists? # ensure slice record exists sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \ @@ -557,6 +635,7 @@ class SlabDriver(Driver): reqdict['method'] = "delete" reqdict['strval'] = str(job_id) + self.db.delete_job(slice_hrn, job_id) answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \ reqdict,username) logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s username %s" \ @@ -775,71 +854,96 @@ class SlabDriver(Driver): if slice_filter_type in authorized_filter_types_list: #Get list of slices based on the slice hrn if slice_filter_type == 'slice_hrn': - #There can be several jobs running for one slices + login = slice_filter.split(".")[1].split("_")[0] #DO NOT USE RegSlice - reg_researchers to get the hrn of the user #otherwise will mess up the RegRecord in Resolve, don't know #why - SA 08/08/2012 - slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).all() + #Only one entry for one user = one slice in slice_senslab table + slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first() - #Get list of slices base on user id + #Get slice based on user id if slice_filter_type == 'record_id_user': - slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).all() + slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first() - if slicerec is []: + if slicerec is None: return [] - slicerec_dictlist = [] - for record in slicerec: - slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict()) - if login is None : - login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0] - - - + #slicerec_dictlist = [] + slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict() + if login is None : + login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0] + + #for record in slicerec: + #slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict()) + #if login is None : + #login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0] + + #One slice can have multiple jobs + sqljob_list = slab_dbsession.query(JobSenslab).filter_by( slice_hrn=slicerec_dict['slice_hrn']).all() + job_list = [] + for job in sqljob_list: + job_list.append(job.dump_sqlalchemyobj_to_dict()) + logger.debug("\r\n SLABDRIVER \tGetSlices login %s \ slice record %s" \ - %(login, slicerec_dictlist)) - for slicerec_dict in slicerec_dictlist : - if slicerec_dict['oar_job_id'] is not -1: - #Check with OAR the status of the job if a job id is in - #the slice record - rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \ - username = login) - logger.debug("SLABDRIVER.PY \tGetSlices rslt fromn GetJobsResources %s"\ - %(rslt)) - if rslt : - slicerec_dict.update(rslt) - slicerec_dict.update({'hrn':\ - str(slicerec_dict['slice_hrn'])}) - #If GetJobsResources is empty, this means the job is - #now in the 'Terminated' state - #Update the slice record - else : - self.db.update_job(slice_filter, job_id = -1) - slicerec_dict['oar_job_id'] = -1 - slicerec_dict.\ - update({'hrn':str(slicerec_dict['slice_hrn'])}) + %(login, slicerec_dict)) + #Several jobs for one slice + slicerec_dict['oar_job_id'] = [] + for job in job_list : + #if slicerec_dict['oar_job_id'] is not -1: + #Check with OAR the status of the job if a job id is in + #the slice record + + rslt = self.GetJobsResources(job['oar_job_id'], \ + username = login) + logger.debug("SLABDRIVER.PY \tGetSlices rslt fromn GetJobsResources %s"\ + %(rslt)) + if rslt : + slicerec_dict['oar_job_id'].append(job['oar_job_id']) + slicerec_dict.update(rslt) + slicerec_dict.update({'hrn':\ + str(slicerec_dict['slice_hrn'])}) + #If GetJobsResources is empty, this means the job is + #now in the 'Terminated' state + #Update the slice record + else : + self.db.delete_job(slice_filter, job['oar_job_id']) + slicerec_dict.\ + update({'hrn':str(slicerec_dict['slice_hrn'])}) + try: - slicerec_dict['node_ids'] = slicerec_dict['node_list'] + slicerec_dict['node_ids'] = job['node_list'] except KeyError: pass - - logger.debug("SLABDRIVER.PY \tGetSlices RETURN slicerec_dictlist %s"\ - %(slicerec_dictlist)) - - return slicerec_dictlist - + + logger.debug("SLABDRIVER.PY \tGetSlices RETURN slicerec_dict %s"\ + %(slicerec_dict)) + + return [slicerec_dict] + else: slice_list = slab_dbsession.query(SliceSenslab).all() + sqljob_list = slab_dbsession.query(JobSenslab).all() + + job_list = [] + for job in sqljob_list: + job_list.append(job.dump_sqlalchemyobj_to_dict()) + return_slice_list = [] for record in slice_list: return_slice_list.append(record.dump_sqlalchemyobj_to_dict()) - + + for slicerec_dict in return_slice_list: + slicerec_dict['oar_job_id'] = [] + for job in job_list: + if slicerec_dict['slice_hrn'] in job: + slicerec_dict['oar_job_id'].append(job['oar_job_id']) + logger.debug("SLABDRIVER.PY \tGetSlices RETURN slices %s \ slice_filter %s " %(return_slice_list, slice_filter)) @@ -1111,7 +1215,7 @@ class SlabDriver(Driver): if jobid : logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \ added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user)) - self.db.update_job( slice_name, jobid, added_nodes) + self.db.add_job( slice_name, jobid, added_nodes) __configure_experiment(jobid, added_nodes) __launch_senslab_experiment(jobid) @@ -1140,7 +1244,7 @@ class SlabDriver(Driver): # Get user information self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn']) - self.db.update_job(slice_record['hrn'], job_id = -1) + return @@ -1153,6 +1257,10 @@ class SlabDriver(Driver): def GetLeases(self, lease_filter_dict=None, return_fields_list=None): unfiltered_reservation_list = self.GetReservedNodes() + + ##Synchronize slice_table of sfa senslab db + #self.synchronize_oar_and_slice_table(unfiltered_reservation_list) + reservation_list = [] #Find the slice associated with this user senslab ldap uid logger.debug(" SLABDRIVER.PY \tGetLeases ") @@ -1174,6 +1282,7 @@ class SlabDriver(Driver): slice_info = query_slice_info.first() else: slice_info = None + resa_user_dict[resa['user']] = {} resa_user_dict[resa['user']]['ldap_info'] = user resa_user_dict[resa['user']]['slice_info'] = slice_info @@ -1315,8 +1424,8 @@ class SlabDriver(Driver): return #self.fill_record_slab_info(records) - - + + @@ -1505,7 +1614,6 @@ class SlabDriver(Driver): """ self.DeleteSliceFromNodes(slice_record) - self.db.update_job(slice_record['hrn'], job_id = -1) logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record)) return diff --git a/sfa/senslab/slabpostgres.py b/sfa/senslab/slabpostgres.py index 40bba7c9..4db5b898 100644 --- a/sfa/senslab/slabpostgres.py +++ b/sfa/senslab/slabpostgres.py @@ -9,7 +9,7 @@ from sfa.util.sfalogging import logger from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Table, Column, MetaData, join, ForeignKey import sfa.storage.model as model - +from sfa.storage.model import RegSlice from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship, backref @@ -20,6 +20,7 @@ from sqlalchemy import MetaData, Table from sqlalchemy.exc import NoSuchTableError from sqlalchemy import String +from sfa.storage.alchemy import dbsession #Dict holding the columns names of the table as keys #and their type, used for creation of the table @@ -40,27 +41,20 @@ SlabBase = declarative_base() class SliceSenslab (SlabBase): __tablename__ = 'slice_senslab' #record_id_user = Column(Integer, primary_key=True) - # Multiple primary key aka composite primary key - # so that we can have several job id for a given slice hrn + slice_hrn = Column(String,primary_key=True) - oar_job_id = Column( Integer, primary_key=True) peer_authority = Column( String,nullable = True) record_id_slice = Column(Integer) record_id_user = Column(Integer) #oar_job_id = Column( Integer,default = -1) - node_list = Column(postgresql.ARRAY(String), nullable =True) + #node_list = Column(postgresql.ARRAY(String), nullable =True) - def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None,peer_authority=None): - self.node_list = [] + def __init__ (self, slice_hrn =None, record_id_slice=None, record_id_user= None,peer_authority=None): if record_id_slice: self.record_id_slice = record_id_slice if slice_hrn: self.slice_hrn = slice_hrn - if oar_job_id: - self.oar_job_id = oar_job_id - if slice_hrn: - self.slice_hrn = slice_hrn if record_id_user: self.record_id_user= record_id_user if peer_authority: @@ -68,8 +62,8 @@ class SliceSenslab (SlabBase): def __repr__(self): - result=">sys.stderr, " \r\n \r\n \t SLABPOSTGRES update_job slice_rec %s"%(slice_rec) - #if job_id is not None: - #slice_rec.oar_job_id = job_id - #if nodes is not None : - #slice_rec.node_list = nodes - #slab_dbsession.commit() + + def find (self, name = None, filter_dict = None): print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_dict %s"%(filter_dict) diff --git a/sfa/senslab/slabslices.py b/sfa/senslab/slabslices.py index c45a6a41..6b413e5a 100644 --- a/sfa/senslab/slabslices.py +++ b/sfa/senslab/slabslices.py @@ -197,9 +197,10 @@ class SlabSlices: tmp = sfa_slice['PI'][0].split(".") username = tmp[(len(tmp)-1)] #Update the table with the nodes that populate the slice - self.driver.db.update_job(sfa_slice['name'], nodes = added_nodes) logger.debug("SLABSLICES \tverify_slice_nodes slice %s \r\n \r\n deleted_nodes %s"\ %(sfa_slice,deleted_nodes)) + #self.driver.db.update_job(sfa_slice['name'], nodes = added_nodes) + #If there is a timeslot specified, then a job can be launched #try: ##slot = sfa_slice['timeslot'] @@ -213,7 +214,7 @@ class SlabSlices: if deleted_nodes: self.driver.DeleteSliceFromNodes(sfa_slice['name'], \ deleted_nodes) - + #return added_nodes except: logger.log_exc('Failed to add/remove slice from nodes') -- 2.43.0