X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fsenslab%2Fslabdriver.py;h=e7c95d7746f7eec207245e0598f953dcf12f6936;hb=9c93ec5573a58c63a6f572434913562b35e61f90;hp=444f1584b5329bb9bbe840f6cf616be4de0f7242;hpb=354b3d2a3cdee55528f5d156d4ed2615efb916eb;p=sfa.git diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index 444f1584..e7c95d77 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -28,7 +28,7 @@ from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf from sfa.senslab.OARrestapi import OARrestapi from sfa.senslab.LDAPapi import LDAPapi -from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab +from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, JobSenslab from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, slab_xrn_object from sfa.senslab.slabslices import SlabSlices @@ -135,6 +135,84 @@ class SlabDriver(Driver): return result + def synchronize_oar_and_slice_table(self, slice_hrn = None): + #Get list of leases + oar_leases_list = self.GetReservedNodes() + + logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table \r\n \r\n : oar_leases_list %s\r\n" %( oar_leases_list)) + #Get list of slices/leases . multiple entry per user depending on number of jobs + #At this point we don't have the slice_hrn so that's why + #we are calling Getslices, which holds a field with slice_hrn + + if slice_hrn : + sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn') + self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, sfa_slices_list) + else : + sfa_slices_list = self.GetSlices() + + sfa_slices_dict_by_slice_hrn = {} + for sfa_slice in sfa_slices_list: + if sfa_slice['slice_hrn'] not in sfa_slices_dict_by_slice_hrn: + sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']] = [] + sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice) + else : + sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice) + + for slice_hrn in sfa_slices_dict_by_slice_hrn: + list_slices_sfa = sfa_slices_dict_by_slice_hrn[slice_hrn] + if slice_hrn =='senslab2.avakian_slice': + logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table slice_hrn %s list_slices_sfa %s\r\n \r\n" %( slice_hrn,list_slices_sfa)) + self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, list_slices_sfa) + + return + + + def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, oar_leases_list, sfa_slices_list): + + #Get list of slices/leases . multiple entry per user depending on number of jobs + #sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn') + sfa_slices_dict = {} + oar_leases_dict = {} + login = slice_hrn.split(".")[1].split("_")[0] + + #Create dictionnaries based on the tuple user login/ job id + #for the leases list and the slices list + + for sl in sfa_slices_list: + if sl['oar_job_id'] != [] : + for oar_jobid in sl['oar_job_id']: + if (login, oar_jobid) not in sfa_slices_dict: + sfa_slices_dict[(login,oar_jobid)] = sl + + for lease in oar_leases_list: + if (lease['user'], lease['lease_id']) not in oar_leases_dict: + oar_leases_dict[(lease['user'], lease['lease_id'])] = lease + + #Find missing entries in the sfa slices list dict by comparing + #the keys in both dictionnaries + #Add the missing entries in the slice sneslab table + + for lease in oar_leases_dict : + logger.debug(" =============SLABDRIVER \t\t\ synchronize_oar_and_slice_table_for_slice_hrn oar_leases_list %s \r\n \t\t\t SFA_SLICES_DICT %s \r\n \r\n LOGIN %s \r\n " %( oar_leases_list,sfa_slices_dict,login)) + if lease not in sfa_slices_dict and login == lease[0]: + + #if lease in GetReservedNodes not in GetSlices update the db + #First get the list of nodes hostnames for this job + oar_reserved_nodes_listdict = oar_leases_dict[lease]['reserved_nodes'] + oar_reserved_nodes_list = [] + for node_dict in oar_reserved_nodes_listdict: + oar_reserved_nodes_list.append(node_dict['hostname']) + #And update the db with slice hrn, job id and node list + self.db.add_job(slice_hrn, lease[1], oar_reserved_nodes_list) + + for lease in sfa_slices_dict: + #Job is now terminated or in Error, either way ot is not going to run again + #Remove it from the db + if lease not in oar_leases_dict: + self.db.delete_job( slice_hrn, lease[1]) + + return + def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \ users, options): aggregate = SlabAggregate(self) @@ -155,7 +233,7 @@ class SlabDriver(Driver): logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \ %(rspec.version)) - + self.synchronize_oar_and_slice_table(slice_hrn) # ensure site record exists? # ensure slice record exists sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \ @@ -557,6 +635,7 @@ class SlabDriver(Driver): reqdict['method'] = "delete" reqdict['strval'] = str(job_id) + self.db.delete_job(slice_hrn, job_id) answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \ reqdict,username) logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s username %s" \ @@ -775,71 +854,96 @@ class SlabDriver(Driver): if slice_filter_type in authorized_filter_types_list: #Get list of slices based on the slice hrn if slice_filter_type == 'slice_hrn': - #There can be several jobs running for one slices + login = slice_filter.split(".")[1].split("_")[0] #DO NOT USE RegSlice - reg_researchers to get the hrn of the user #otherwise will mess up the RegRecord in Resolve, don't know #why - SA 08/08/2012 - slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).all() + #Only one entry for one user = one slice in slice_senslab table + slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first() - #Get list of slices base on user id + #Get slice based on user id if slice_filter_type == 'record_id_user': - slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).all() + slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first() - if slicerec is []: + if slicerec is None: return [] - slicerec_dictlist = [] - for record in slicerec: - slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict()) - if login is None : - login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0] - - - + #slicerec_dictlist = [] + slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict() + if login is None : + login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0] + + #for record in slicerec: + #slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict()) + #if login is None : + #login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0] + + #One slice can have multiple jobs + sqljob_list = slab_dbsession.query(JobSenslab).filter_by( slice_hrn=slicerec_dict['slice_hrn']).all() + job_list = [] + for job in sqljob_list: + job_list.append(job.dump_sqlalchemyobj_to_dict()) + logger.debug("\r\n SLABDRIVER \tGetSlices login %s \ slice record %s" \ - %(login, slicerec_dictlist)) - for slicerec_dict in slicerec_dictlist : - if slicerec_dict['oar_job_id'] is not -1: - #Check with OAR the status of the job if a job id is in - #the slice record - rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \ - username = login) - logger.debug("SLABDRIVER.PY \tGetSlices rslt fromn GetJobsResources %s"\ - %(rslt)) - if rslt : - slicerec_dict.update(rslt) - slicerec_dict.update({'hrn':\ - str(slicerec_dict['slice_hrn'])}) - #If GetJobsResources is empty, this means the job is - #now in the 'Terminated' state - #Update the slice record - else : - self.db.update_job(slice_filter, job_id = -1) - slicerec_dict['oar_job_id'] = -1 - slicerec_dict.\ - update({'hrn':str(slicerec_dict['slice_hrn'])}) + %(login, slicerec_dict)) + #Several jobs for one slice + slicerec_dict['oar_job_id'] = [] + for job in job_list : + #if slicerec_dict['oar_job_id'] is not -1: + #Check with OAR the status of the job if a job id is in + #the slice record + + rslt = self.GetJobsResources(job['oar_job_id'], \ + username = login) + logger.debug("SLABDRIVER.PY \tGetSlices rslt fromn GetJobsResources %s"\ + %(rslt)) + if rslt : + slicerec_dict['oar_job_id'].append(job['oar_job_id']) + slicerec_dict.update(rslt) + slicerec_dict.update({'hrn':\ + str(slicerec_dict['slice_hrn'])}) + #If GetJobsResources is empty, this means the job is + #now in the 'Terminated' state + #Update the slice record + else : + self.db.delete_job(slice_filter, job['oar_job_id']) + slicerec_dict.\ + update({'hrn':str(slicerec_dict['slice_hrn'])}) + try: - slicerec_dict['node_ids'] = slicerec_dict['node_list'] + slicerec_dict['node_ids'] = job['node_list'] except KeyError: pass - - logger.debug("SLABDRIVER.PY \tGetSlices RETURN slicerec_dictlist %s"\ - %(slicerec_dictlist)) - - return slicerec_dictlist - + + logger.debug("SLABDRIVER.PY \tGetSlices RETURN slicerec_dict %s"\ + %(slicerec_dict)) + + return [slicerec_dict] + else: slice_list = slab_dbsession.query(SliceSenslab).all() + sqljob_list = slab_dbsession.query(JobSenslab).all() + + job_list = [] + for job in sqljob_list: + job_list.append(job.dump_sqlalchemyobj_to_dict()) + return_slice_list = [] for record in slice_list: return_slice_list.append(record.dump_sqlalchemyobj_to_dict()) - + + for slicerec_dict in return_slice_list: + slicerec_dict['oar_job_id'] = [] + for job in job_list: + if slicerec_dict['slice_hrn'] in job: + slicerec_dict['oar_job_id'].append(job['oar_job_id']) + logger.debug("SLABDRIVER.PY \tGetSlices RETURN slices %s \ slice_filter %s " %(return_slice_list, slice_filter)) @@ -1111,7 +1215,7 @@ class SlabDriver(Driver): if jobid : logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \ added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user)) - self.db.update_job( slice_name, jobid, added_nodes) + self.db.add_job( slice_name, jobid, added_nodes) __configure_experiment(jobid, added_nodes) __launch_senslab_experiment(jobid) @@ -1140,7 +1244,7 @@ class SlabDriver(Driver): # Get user information self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn']) - self.db.update_job(slice_record['hrn'], job_id = -1) + return @@ -1153,6 +1257,10 @@ class SlabDriver(Driver): def GetLeases(self, lease_filter_dict=None, return_fields_list=None): unfiltered_reservation_list = self.GetReservedNodes() + + ##Synchronize slice_table of sfa senslab db + #self.synchronize_oar_and_slice_table(unfiltered_reservation_list) + reservation_list = [] #Find the slice associated with this user senslab ldap uid logger.debug(" SLABDRIVER.PY \tGetLeases ") @@ -1174,6 +1282,7 @@ class SlabDriver(Driver): slice_info = query_slice_info.first() else: slice_info = None + resa_user_dict[resa['user']] = {} resa_user_dict[resa['user']]['ldap_info'] = user resa_user_dict[resa['user']]['slice_info'] = slice_info @@ -1315,8 +1424,8 @@ class SlabDriver(Driver): return #self.fill_record_slab_info(records) - - + + @@ -1505,7 +1614,6 @@ class SlabDriver(Driver): """ self.DeleteSliceFromNodes(slice_record) - self.db.update_job(slice_record['hrn'], job_id = -1) logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record)) return