From 8c4c5f8f1d71375c44b27d89c94bec8468ea7157 Mon Sep 17 00:00:00 2001 From: Sandrine Avakian Date: Wed, 22 Aug 2012 16:46:25 +0200 Subject: [PATCH] Modified GetSlices. Removed JobSenslab table, now using GetReservedNodes instead. Added parameter username to GetReservedNodes to get direclty the jobs by user from OAR. Modified slightly OAR restapi. Bug : creatimng job does not work using SFA format rspec. --- sfa/senslab/OARrestapi.py | 6 + sfa/senslab/config/bash_nukem | 2 +- sfa/senslab/slabaggregate.py | 2 +- sfa/senslab/slabdriver.py | 294 ++++++++++++---------------------- sfa/senslab/slabpostgres.py | 115 +------------ sfa/senslab/slabslices.py | 49 +++--- 6 files changed, 142 insertions(+), 326 deletions(-) diff --git a/sfa/senslab/OARrestapi.py b/sfa/senslab/OARrestapi.py index d2c8f982..e4e499b3 100644 --- a/sfa/senslab/OARrestapi.py +++ b/sfa/senslab/OARrestapi.py @@ -47,6 +47,9 @@ class OARrestapi: def GETRequestToOARRestAPI(self, request, strval=None , username = None ): self.oarserver['uri'] = \ OARGETParser.OARrequests_uri_dict[request]['uri'] + #Get job details with username + if 'owner' in OARGETParser.OARrequests_uri_dict[request] and username: + self.oarserver['uri'] += OARGETParser.OARrequests_uri_dict[request]['owner'] + username headers = {} data = json.dumps({}) logger.debug("OARrestapi \tGETRequestToOARRestAPI %s" %(request)) @@ -507,7 +510,10 @@ class OARGETParser: 'GET_reserved_nodes': {'uri': '/oarapi/jobs/details.json?state=Running,Waiting,Launching',\ + 'owner':'&user=', 'parse_func':ParseReservedNodes}, + + 'GET_running_jobs': {'uri':'/oarapi/jobs/details.json?state=Running',\ 'parse_func':ParseRunningJobs}, diff --git a/sfa/senslab/config/bash_nukem b/sfa/senslab/config/bash_nukem index c4deedbf..b1a56d52 100755 --- a/sfa/senslab/config/bash_nukem +++ b/sfa/senslab/config/bash_nukem @@ -33,7 +33,7 @@ sudo sfaadmin.py registry nuke # Drop table in slab_sfa # to avoid duplicates. -psql -d slab_sfa -U sfa -W -q -c "drop table job_senslab;" + psql -d slab_sfa -U sfa -W -q -c "drop table slice_senslab;" diff --git a/sfa/senslab/slabaggregate.py b/sfa/senslab/slabaggregate.py index 6ade1477..0a42bf52 100644 --- a/sfa/senslab/slabaggregate.py +++ b/sfa/senslab/slabaggregate.py @@ -262,7 +262,7 @@ class SlabAggregate: now = int(time.time()) lease_filter = {'clip': now } - self.driver.synchronize_oar_and_slice_table() + #self.driver.synchronize_oar_and_slice_table() #if slice_record: #lease_filter.update({'name': slice_record['name']}) return_fields = ['lease_id', 'hostname', 'site_id', \ diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index 8c193f38..9a89f425 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -27,8 +27,8 @@ from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf from sfa.senslab.OARrestapi import OARrestapi from sfa.senslab.LDAPapi import LDAPapi -from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, \ - JobSenslab +from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab + from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \ slab_xrn_object from sfa.senslab.slabslices import SlabSlices @@ -140,94 +140,6 @@ class SlabDriver(Driver): %(resources,res)) return result - - def synchronize_oar_and_slice_table(self, slice_hrn = None): - #Get list of leases - oar_leases_list = self.GetReservedNodes() - - logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table :\ - oar_leases_list %s\r\n" %( oar_leases_list)) - #Get list of slices/leases . multiple entry per user depending - #on number of jobs - #At this point we don't have the slice_hrn so that's why - #we are calling Getslices, which holds a field with slice_hrn - - if slice_hrn : - sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, \ - slice_filter_type = 'slice_hrn') - self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, \ - oar_leases_list, sfa_slices_list) - else : - sfa_slices_list = self.GetSlices() - - sfa_slices_dict_by_slice_hrn = {} - for sfa_slice in sfa_slices_list: - if sfa_slice['slice_hrn'] not in sfa_slices_dict_by_slice_hrn: - sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']] = [] - - sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].\ - append(sfa_slice) - - - for slice_hrn in sfa_slices_dict_by_slice_hrn: - list_slices_sfa = sfa_slices_dict_by_slice_hrn[slice_hrn] - self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, \ - oar_leases_list, list_slices_sfa) - - return - - - def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, \ - oar_leases_list, sfa_slices_list): - - #Get list of slices/leases . - #multiple entry per user depending on number of jobs - - sfa_slices_dict = {} - oar_leases_dict = {} - login = slice_hrn.split(".")[1].split("_")[0] - - #Create dictionnaries based on the tuple user login/ job id - #for the leases list and the slices list - - for sl in sfa_slices_list: - if sl['oar_job_id'] != [] : - #one entry in the dictionnary for each jobid/login, one login - #can have multiple jobs running - #for oar_jobid in sl['oar_job_id']: - if (login, sl['oar_job_id']) not in sfa_slices_dict: - sfa_slices_dict[(login,sl['oar_job_id'])] = sl - - for lease in oar_leases_list: - if (lease['user'], lease['lease_id']) not in oar_leases_dict: - oar_leases_dict[(lease['user'], lease['lease_id'])] = lease - - #Find missing entries in the sfa slices list dict by comparing - #the keys in both dictionnaries - #Add the missing entries in the slice sneslab table - - for lease in oar_leases_dict : - - if lease not in sfa_slices_dict and login == lease[0]: - #if lease in GetReservedNodes not in GetSlices - #and the login of the job running matches then update the db - #for this login - #First get the list of nodes hostnames for this job - oar_reserved_nodes_listdict = \ - oar_leases_dict[lease]['reserved_nodes'] - oar_reserved_nodes_list = [] - for node_dict in oar_reserved_nodes_listdict: - oar_reserved_nodes_list.append(node_dict['hostname']) - #And update the db with slice hrn, job id and node list - self.db.add_job(slice_hrn, lease[1], oar_reserved_nodes_list) - - for lease in sfa_slices_dict: - #Job is now terminated or in Error, - #either way ot is not going to run again - #Remove it from the db - if lease not in oar_leases_dict: - self.db.delete_job( slice_hrn, lease[1]) - return def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \ users, options): @@ -249,7 +161,7 @@ class SlabDriver(Driver): logger.debug("SLABDRIVER.PY \t create_sliver \tr spec.version %s slice_record %s " \ %(rspec.version,slice_record)) - self.synchronize_oar_and_slice_table(slice_hrn) + #self.synchronize_oar_and_slice_table(slice_hrn) # ensure site record exists? # ensure slice record exists #Removed options to verify_slice SA 14/08/12 @@ -338,28 +250,30 @@ class SlabDriver(Driver): if not sfa_slice_list: return 1 - sfa_slice = sfa_slice_list[0] + #Delete all in the slice + for sfa_slice in sfa_slice_list: + - logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice)) - slices = SlabSlices(self) - # determine if this is a peer slice - - peer = slices.get_peer(slice_hrn) - #TODO delete_sliver SA : UnBindObjectFromPeer should be - #used when there is another - #senslab testbed, which is not the case 14/08/12 . + logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice)) + slices = SlabSlices(self) + # determine if this is a peer slice - logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer)) - try: - if peer: - self.UnBindObjectFromPeer('slice', \ - sfa_slice['record_id_slice'], peer,None) - self.DeleteSliceFromNodes(sfa_slice) - finally: - if peer: - self.BindObjectToPeer('slice', sfa_slice['record_id_slice'], \ - peer, sfa_slice['peer_slice_id']) - return 1 + peer = slices.get_peer(slice_hrn) + #TODO delete_sliver SA : UnBindObjectFromPeer should be + #used when there is another + #senslab testbed, which is not the case 14/08/12 . + + logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer)) + try: + if peer: + self.UnBindObjectFromPeer('slice', \ + sfa_slice['record_id_slice'], peer,None) + self.DeleteSliceFromNodes(sfa_slice) + finally: + if peer: + self.BindObjectToPeer('slice', sfa_slice['record_id_slice'], \ + peer, sfa_slice['peer_slice_id']) + return 1 def AddSlice(self, slice_record): @@ -789,10 +703,10 @@ class SlabDriver(Driver): #hostname_list.append(oar_id_node_dict[resource_id]['hostname']) return hostname_dict_list - def GetReservedNodes(self): + def GetReservedNodes(self,username = None): #Get the nodes in use and the reserved nodes reservation_dict_list = \ - self.oar.parser.SendRequest("GET_reserved_nodes") + self.oar.parser.SendRequest("GET_reserved_nodes", username = username) for resa in reservation_dict_list: @@ -864,7 +778,9 @@ class SlabDriver(Driver): return return_site_list - #warning return_fields_list paramr emoved (Not used) + + + def GetSlices(self, slice_filter = None, slice_filter_type = None): #def GetSlices(self, slice_filter = None, slice_filter_type = None, \ #return_fields_list = None): @@ -881,80 +797,71 @@ class SlabDriver(Driver): slicerec_dict = {} authorized_filter_types_list = ['slice_hrn', 'record_id_user'] slicerec_dictlist = [] + + if slice_filter_type in authorized_filter_types_list: - #Get list of slices based on the slice hrn - if slice_filter_type == 'slice_hrn': - - login = slice_filter.split(".")[1].split("_")[0] - - #DO NOT USE RegSlice - reg_researchers to get the hrn of the user - #otherwise will mess up the RegRecord in Resolve, don't know - #why - SA 08/08/2012 - - #Only one entry for one user = one slice in slice_senslab table - slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first() - - #Get slice based on user id - if slice_filter_type == 'record_id_user': - slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first() - - if slicerec is None: - return [] - #slicerec_dictlist = [] - fixed_slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict() - if login is None : - login = fixed_slicerec_dict['slice_hrn'].split(".")[1].split("_")[0] - - #for record in slicerec: - #slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict()) - #if login is None : - #login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0] - - #One slice can have multiple jobs - sqljob_list = slab_dbsession.query(JobSenslab).filter_by( slice_hrn=fixed_slicerec_dict['slice_hrn']).all() - job_list = [] - for job in sqljob_list: - job_list.append(job.dump_sqlalchemyobj_to_dict()) + + def __get_slice_records(slice_filter = None, slice_filter_type = None): + + login = None + #Get list of slices based on the slice hrn + if slice_filter_type == 'slice_hrn': + + login = slice_filter.split(".")[1].split("_")[0] + + #DO NOT USE RegSlice - reg_researchers to get the hrn of the user + #otherwise will mess up the RegRecord in Resolve, don't know + #why - SA 08/08/2012 + + #Only one entry for one user = one slice in slice_senslab table + slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first() + + #Get slice based on user id + if slice_filter_type == 'record_id_user': + slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first() + + if slicerec is None: + return login, [] + else: + fixed_slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict() + + if login is None : + login = fixed_slicerec_dict['slice_hrn'].split(".")[1].split("_")[0] + return login, fixed_slicerec_dict + + + + login, fixed_slicerec_dict = __get_slice_records(slice_filter, slice_filter_type) logger.debug(" SLABDRIVER \tGetSlices login %s \ slice record %s" \ %(login, fixed_slicerec_dict)) + + + + #One slice can have multiple jobs + + leases_list = self.GetReservedNodes(username = login) #Several jobs for one slice - #TODO : Modify to make a diff with jobs not terminated = 1 OAR request SA 20/08/12 - for job in job_list : + + for lease in leases_list : slicerec_dict = {} - slicerec_dict['oar_job_id'] = [] - + + #Check with OAR the status of the job if a job id is in #the slice record - rslt = self.GetJobsResources(job['oar_job_id'], \ - username = login) - logger.debug("SLABDRIVER.PY \tGetSlices job %s \trslt fromn GetJobsResources %s \r\n"\ - %(job, rslt)) - if rslt : - slicerec_dict['oar_job_id']= job['oar_job_id'] - slicerec_dict.update(rslt) - slicerec_dict.update(fixed_slicerec_dict) - slicerec_dict.update({'hrn':\ - str(fixed_slicerec_dict['slice_hrn'])}) - + + + slicerec_dict['oar_job_id'] = lease['lease_id'] + slicerec_dict.update({'node_ids':lease['reserved_nodes']}) + slicerec_dict.update(fixed_slicerec_dict) + slicerec_dict.update({'hrn':\ + str(fixed_slicerec_dict['slice_hrn'])}) - #If GetJobsResources is empty, this means the job is - #now in the 'Terminated' state - #Update the slice record - else : - self.db.delete_job(slice_filter, job['oar_job_id']) - slicerec_dict.\ - update({'hrn':str(fixed_slicerec_dict['slice_hrn'])}) - - try: - slicerec_dict['node_ids'] = job['node_list'] - except KeyError: - pass - + slicerec_dictlist.append(slicerec_dict) logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s slicerec_dictlist %s" %(slicerec_dict, slicerec_dictlist)) @@ -962,26 +869,31 @@ class SlabDriver(Driver): %(slicerec_dictlist)) return slicerec_dictlist - + else: + slice_list = slab_dbsession.query(SliceSenslab).all() - sqljob_list = slab_dbsession.query(JobSenslab).all() + leases_list = self.GetReservedNodes() - job_list = [] - for job in sqljob_list: - job_list.append(job.dump_sqlalchemyobj_to_dict()) - + + slicerec_dictlist = [] return_slice_list = [] for record in slice_list: return_slice_list.append(record.dump_sqlalchemyobj_to_dict()) - for slicerec_dict in return_slice_list: - slicerec_dict['oar_job_id'] = [] - for job in job_list: - if slicerec_dict['slice_hrn'] in job: - slicerec_dict['oar_job_id'].append(job['oar_job_id']) - + for fixed_slicerec_dict in return_slice_list: + slicerec_dict = {} + owner = fixed_slicerec_dict['slice_hrn'].split(".")[1].split("_")[0] + for lease in leases_list: + if owner == lease['user']: + slicerec_dict['oar_job_id'] = lease['lease_id'] + slicerec_dict.update({'node_ids':lease['reserved_nodes']}) + slicerec_dict.update(fixed_slicerec_dict) + slicerec_dict.update({'hrn':\ + str(fixed_slicerec_dict['slice_hrn'])}) + slicerec_dictlist.append(slicerec_dict) + logger.debug("SLABDRIVER.PY \tGetSlices RETURN slices %s \ slice_filter %s " %(return_slice_list, slice_filter)) @@ -989,10 +901,7 @@ class SlabDriver(Driver): #return_slice_list = parse_filter(sliceslist, \ #slice_filter,'slice', return_fields_list) - return return_slice_list - - - + return slicerec_dictlist def testbed_name (self): return self.hrn @@ -1271,9 +1180,8 @@ class SlabDriver(Driver): #Delete the jobs from job_senslab table def DeleteSliceFromNodes(self, slice_record): - for job_id in slice_record['oar_job_id']: - self.DeleteJobs(job_id, slice_record['hrn']) + self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn']) return diff --git a/sfa/senslab/slabpostgres.py b/sfa/senslab/slabpostgres.py index 4db5b898..96c94983 100644 --- a/sfa/senslab/slabpostgres.py +++ b/sfa/senslab/slabpostgres.py @@ -75,47 +75,7 @@ class SliceSenslab (SlabBase): 'record_id_slice':self.record_id_slice, } return dict - -class JobSenslab (SlabBase): - __tablename__ = 'job_senslab' - #record_id_user = Column(Integer, primary_key=True) - # Multiple primary key aka composite primary key - # so that we can have several job id for a given slice hrn - slice_hrn = Column(String,ForeignKey('slice_senslab.slice_hrn')) - oar_job_id = Column( Integer, primary_key=True) - record_id_slice = Column(Integer) - record_id_user = Column(Integer) - - #oar_job_id = Column( Integer,default = -1) - node_list = Column(postgresql.ARRAY(String), nullable =True) - - slice_complete = relationship("SliceSenslab", backref=backref('job_senslab', order_by=slice_hrn)) - - def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None): - self.node_list = [] - if record_id_slice: - self.record_id_slice = record_id_slice - if slice_hrn: - self.slice_hrn = slice_hrn - if oar_job_id: - self.oar_job_id = oar_job_id - if record_id_user: - self.record_id_user= record_id_user - - - def __repr__(self): - result=">sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_dict %s"%(filter_dict) + - #Filter_by can not handle more than one argument, hence these functions - def filter_id_user(query, user_id): - print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_id_user" - return query.filter_by(record_id_user = user_id) - - def filter_job(query, job): - print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job " - return query.filter_by(oar_job_id = job) - - def filer_id_slice (query, id_slice): - print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filer_id_slice" - return query.filter_by(record_id_slice = id_slice) - - def filter_slice_hrn(query, hrn): - print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_slice_hrn" - return query.filter_by(slice_hrn = hrn) - - - extended_filter = {'record_id_user': filter_id_user, - 'oar_job_id':filter_job, - 'record_id_slice': filer_id_slice, - 'slice_hrn': filter_slice_hrn} - - Q = slab_dbsession.query(SliceSenslab) - - if filter_dict is not None: - for k in filter_dict: - try: - newQ= extended_filter[k](Q, filter_dict[k]) - Q = newQ - except KeyError: - print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!" - print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first()) - rec = Q.first() - print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find rec %s" %(rec) - return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])) - #reclist = [] - ##for rec in Q.all(): - #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))) - #return reclist - diff --git a/sfa/senslab/slabslices.py b/sfa/senslab/slabslices.py index 1b8859af..7779fcb1 100644 --- a/sfa/senslab/slabslices.py +++ b/sfa/senslab/slabslices.py @@ -373,11 +373,22 @@ class SlabSlices: def verify_persons(self, slice_hrn, slice_record, users, peer, sfa_peer, \ options={}): + """ + users is a record list. Records can either be local records + or users records from known and trusted federated sites. + If the user is from another site that senslab doesn't trust yet, + then Resolve will raise an error before getting to create_sliver. + """ #TODO SA 21/08/12 verify_persons Needs review - users_by_id = {} - users_by_hrn = {} + + + users_by_id = {} + users_by_hrn = {} + #users_dict : dict whose keys can either be the user's hrn or its id. + #Values contains only id and hrn users_dict = {} - + + #First create dicts by hrn and id for each user in the user record list: for user in users: if 'urn' in user and (not 'hrn' in user ) : @@ -393,7 +404,7 @@ class SlabSlices: 'hrn':user['hrn']} - logger.debug( "SLABSLICE.PY \tverify_person \ + logger.debug( "SLABSLICE.PY \t verify_person \ users_dict %s \r\n user_by_hrn %s \r\n \ \tusers_by_id %s " \ %(users_dict,users_by_hrn, users_by_id)) @@ -401,33 +412,37 @@ class SlabSlices: existing_user_ids = [] existing_user_hrns = [] existing_users = [] - #Check if user is in Senslab LDAP using its hrn. - #Assuming Senslab is centralised : one LDAP for all sites, + # Check if user is in Senslab LDAP using its hrn. + # Assuming Senslab is centralised : one LDAP for all sites, # user_id unknown from LDAP - # LDAP does not provide users id, therefore we rely on hrns + # LDAP does not provide users id, therefore we rely on hrns containing + # the login of the user. # If the hrn is not a senslab hrn, the user may not be in LDAP. if users_by_hrn: - #Construct the list of filters for GetPersons + #Construct the list of filters (list of dicts) for GetPersons filter_user = [] for hrn in users_by_hrn: filter_user.append (users_by_hrn[hrn]) logger.debug(" SLABSLICE.PY \tverify_person filter_user %s " \ - %(filter_user)) - existing_users = self.driver.GetPersons(filter_user) + %(filter_user)) + #Check user's in LDAP with GetPersons + #Needed because what if the user has been deleted in LDAP but + #is still in SFA? + existing_users = self.driver.GetPersons(filter_user) + + #User's in senslab LDAP if existing_users: for user in existing_users : existing_user_hrns.append(users_dict[user['hrn']]['hrn']) existing_user_ids.\ append(users_dict[user['hrn']]['person_id']) - #User from another federated site , - #does not have a senslab account yet? - #Check in the LDAP if we know email, - #maybe he has multiple SFA accounts = multiple hrns. - #Check before adding them to LDAP + # User from another known trusted federated site. Check + # if a senslab account matching the email has already been created. else: req = 'mail=' if isinstance(users, list): + req += users[0]['email'] else: req += users['email'] @@ -464,9 +479,7 @@ class SlabSlices: except KeyError: pass - #existing_slice_user_hrns = [user['hrn'] for \ - #user in existing_slice_users] - + # users to be added, removed or updated #One user in one senslab slice : there should be no need #to remove/ add any user from/to a slice. -- 2.47.0