From: Sandrine Avakian Date: Thu, 7 Jun 2012 15:42:30 +0000 (+0200) Subject: SlabDriver cleaning mainly. X-Git-Tag: sfa-2.1-24~3^2~162 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=ee23aa89c674d3dad0ab3af906c452b9c7493f20;p=sfa.git SlabDriver cleaning mainly. --- diff --git a/sfa/senslab/OARrestapi.py b/sfa/senslab/OARrestapi.py index 8f5aa621..b4afd07e 100644 --- a/sfa/senslab/OARrestapi.py +++ b/sfa/senslab/OARrestapi.py @@ -57,8 +57,11 @@ class OARrestapi: self.oarserver['uri'] = None self.oarserver['postformat'] = 'json' - self.jobstates = ["Terminated", "Running", "Error", "Waiting", "Launching","Hold"] - + + self.jobstates = ['Terminated','Hold','Waiting','toLaunch','toError',\ + 'toAckReservation','Launching','Finishing',\ + 'Running','Suspended','Resuming','Error'] + self.parser = OARGETParser(self) @@ -329,7 +332,7 @@ class OARGETParser: #Retourne liste de dictionnaires contenant attributs des sites def ParseSites(self): nodes_per_site = {} - + config = Config() # Create a list of nodes per site_id for node_id in self.node_dictlist.keys(): node = self.node_dictlist[node_id] diff --git a/sfa/senslab/slabaggregate.py b/sfa/senslab/slabaggregate.py index e5949828..040c54f1 100644 --- a/sfa/senslab/slabaggregate.py +++ b/sfa/senslab/slabaggregate.py @@ -54,7 +54,7 @@ class SlabAggregate: slice_hrn, _ = urn_to_hrn(slice_xrn) slice_name = slice_hrn print >>sys.stderr,"\r\n \r\n \t\t_____________ Slabaggregate api get_slice_and_slivers " - slices = self.driver.GetSlices(slice_filter= str(slice_name), filter_type = 'slice_hrn') + slices = self.driver.GetSlices(slice_filter= str(slice_name), slice_filter_type = 'slice_hrn') print >>sys.stderr,"\r\n \r\n \t\t_____________ Slabaggregate api get_slice_and_slivers slices %s " %(slices) if not slices: return (slice, slivers) diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index d2c98d9b..d4fac712 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -47,7 +47,7 @@ class SlabDriver(Driver): Driver.__init__ (self, config) self.config=config self.hrn = config.SFA_INTERFACE_HRN - + self.root_auth = config.SFA_REGISTRY_ROOT_AUTH self.oar = OARrestapi() @@ -67,7 +67,7 @@ class SlabDriver(Driver): """ #First get the slice with the slice hrn - sl = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn') + sl = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn') if len(sl) is 0: raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn)) @@ -179,7 +179,7 @@ class SlabDriver(Driver): def delete_sliver (self, slice_urn, slice_hrn, creds, options): - slice = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn') + slice = self.GetSlices(slice_filter= slice_hrn, slice_filter_type = 'slice_hrn') print>>sys.stderr, "\r\n \r\n \t\t SLABDRIVER.PY delete_sliver slice %s" %(slice) if not slice: return 1 @@ -276,7 +276,7 @@ class SlabDriver(Driver): if key not in acceptable_fields: slab_record.pop(key) print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register" - slices = self.GetSlices(slice_filter =slab_record['hrn'], filter_type = 'slice_hrn') + slices = self.GetSlices(slice_filter =slab_record['hrn'], slice_filter_type = 'slice_hrn') if not slices: pointer = self.AddSlice(slab_record) else: @@ -372,7 +372,7 @@ class SlabDriver(Driver): if persons and persons[0]['site_ids']: self.DeletePerson(username) elif type == 'slice': - if self.GetSlices(slice_filter = hrn, filter_type = 'slice_hrn'): + if self.GetSlices(slice_filter = hrn, slice_filter_type = 'slice_hrn'): self.DeleteSlice(hrn) #elif type == 'authority': @@ -462,53 +462,87 @@ class SlabDriver(Driver): answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username) print>>sys.stderr, "\r\n \r\n jobid DeleteJobs %s " %(answer) - - def GetJobs(self,job_id= None, resources=True,return_fields_list=None, username = None): - #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\ - #'api_timestamp'] - #assigned_res = ['resource_id', 'resource_uri'] - #assigned_n = ['node', 'node_uri'] - - if job_id and resources is False: - req = "GET_jobs_id" - node_list_k = 'assigned_network_address' - - if job_id and resources : - req = "GET_jobs_id_resources" - node_list_k = 'reserved_resources' - + def GetJobsId(self, job_id, username = None ): + """ + Details about a specific job. + Includes details about submission time, jot type, state, events, + owner, assigned ressources, walltime etc... + + """ + req = "GET_jobs_id" + node_list_k = 'assigned_network_address' #Get job info from OAR job_info = self.oar.parser.SendRequest(req, job_id, username) - print>>sys.stderr, "\r\n \r\n \t\t GetJobs %s " %(job_info) - if 'state' in job_info : + logger.debug("SLABDRIVER \t GetJobs %s " %(job_info)) + try: if job_info['state'] == 'Terminated': - print>>sys.stderr, "\r\n \r\n \t\t GetJobs TERMINELEBOUSIN " + logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\ + %(job_id)) return None if job_info['state'] == 'Error': - print>>sys.stderr, "\r\n \r\n \t\t GetJobs ERROR message %s " %(job_info) + logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\ + %(job_info)) return None + + except KeyError: + logger.error("SLABDRIVER \tGetJobsId KeyError") + return None - #Get a dict of nodes . Key :hostname of the node - node_list = self.GetNodes() - node_hostname_list = [] - for node in node_list: - node_hostname_list.append(node['hostname']) - node_dict = dict(zip(node_hostname_list,node_list)) - try : - liste =job_info[node_list_k] - for k in range(len(liste)): - job_info[node_list_k][k] = node_dict[job_info[node_list_k][k]]['hostname'] - - #Replaces the previous entry "assigned_network_address" / "reserved_resources" - #with "node_ids" - job_info.update({'node_ids':job_info[node_list_k]}) - del job_info[node_list_k] - return job_info + parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k) + #Replaces the previous entry "assigned_network_address" / "reserved_resources" + #with "node_ids" + job_info.update({'node_ids':parsed_job_info[node_list_k]}) + del job_info[node_list_k] + logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info)) + return job_info + + + def GetJobsResources(self,job_id, return_fields_list=None, username = None): + #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\ + #'api_timestamp'] + #assigned_res = ['resource_id', 'resource_uri'] + #assigned_n = ['node', 'node_uri'] + + req = "GET_jobs_id_resources" + node_list_k = 'reserved_resources' + + #Get job info from OAR + job_info = self.oar.parser.SendRequest(req, job_id, username) + logger.debug("SLABDRIVER \t GetJobsResources %s " %(job_info)) + + parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k) + #Replaces the previous entry "assigned_network_address" / "reserved_resources" + #with "node_ids" + job_info.update({'node_ids':parsed_job_info[node_list_k]}) + del job_info[node_list_k] + return job_info + + def get_info_on_reserved_nodes(self,job_info,node_list_name): + #Get the list of the testbed nodes records and make a + #dictionnary keyed on the hostname out of it + node_list_dict = self.GetNodes() + #node_hostname_list = [] + node_hostname_list = [node['hostname'] for node in node_list_dict] + #for node in node_list_dict: + #node_hostname_list.append(node['hostname']) + node_dict = dict(zip(node_hostname_list,node_list_dict)) + try : + reserved_node_hostname_list = [] + for index in range(len(job_info[node_list_name])): + #job_info[node_list_name][k] = + reserved_node_hostname_list[index] = \ + node_dict[job_info[node_list_name][index]]['hostname'] + + logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \ + reserved_node_hostname_list %s" \ + %(reserved_node_hostname_list)) except KeyError: - print>>sys.stderr, "\r\n \r\n \t\t GetJobs KEYERROR " + logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " ) + return reserved_node_hostname_list + def GetReservedNodes(self): # this function returns a list of all the nodes already involved in an oar job #jobs=self.oar.parser.SendRequest("GET_reserved_nodes") @@ -580,28 +614,37 @@ class SlabDriver(Driver): return return_site_list - def GetSlices(self,slice_filter = None, filter_type = None, return_fields_list=None): + def GetSlices(self, slice_filter = None, slice_filter_type = None, \ + return_fields_list=None): return_slice_list = [] slicerec = {} rec = {} - ftypes = ['slice_hrn', 'record_id_user'] - if filter_type and filter_type in ftypes: - if filter_type == 'slice_hrn': - slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first() - if filter_type == 'record_id_user': - slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first() + authorized_filter_types_list = ['slice_hrn', 'record_id_user'] + print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices authorized_filter_types_list %s" %(authorized_filter_types_list) + if slice_filter_type in authorized_filter_types_list: + if slice_filter_type == 'slice_hrn': + slicerec = slab_dbsession.query(SliceSenslab).\ + filter_by(slice_hrn = slice_filter).first() + + if slice_filter_type == 'record_id_user': + slicerec = slab_dbsession.query(SliceSenslab).\ + filter_by(record_id_user = slice_filter).first() if slicerec: - rec = slicerec.dumpquerytodict() + rec = slicerec.dump_sqlalchemyobj_to_dict() + print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices rec %s" %(rec) + #Get login login = slicerec.slice_hrn.split(".")[1].split("_")[0] - #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY slicerec GetSlices %s " %(slicerec) + logger.debug("\r\n SLABDRIVER \tGetSlices login %s slice record %s"\ + %(login,rec)) if slicerec.oar_job_id is not -1: - rslt = self.GetJobs( slicerec.oar_job_id, resources=False, username = login ) - #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices GetJobs %s " %(rslt) + #Check with OAR the status of the job if a job id is in + #the slice record + rslt = self.GetJobsId(slicerec.oar_job_id,username = login) if rslt : rec.update(rslt) rec.update({'hrn':str(rec['slice_hrn'])}) - #If GetJobs is empty, this means the job is now in the 'Terminated' state + #If GetJobsResources is empty, this means the job is now in the 'Terminated' state #Update the slice record else : self.db.update_job(slice_filter, job_id = -1) @@ -884,84 +927,81 @@ class SlabDriver(Driver): def augment_records_with_testbed_info (self, sfa_records): return self.fill_record_info (sfa_records) - def fill_record_info(self, records): + def fill_record_info(self, record_list): """ Given a SFA record, fill in the senslab specific and SFA specific fields in the record. """ - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info 000000000 fill_record_info %s " %(records) - if not isinstance(records, list): - records = [records] - - parkour = records + logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list)) + if not isinstance(record_list, list): + record_list = [record_list] + try: - for record in parkour: - + for record in record_list: + #If the record is a SFA slice record, then add information + #about the user of this slice. This kind of information is in the + #Senslab's DB. if str(record['type']) == 'slice': - #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t \t record %s" %(record) - #sfatable = SfaTable() - - #existing_records_by_id = {} - #all_records = dbsession.query(RegRecord).all() - #for rec in all_records: - #existing_records_by_id[rec.record_id] = rec - #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t existing_records_by_id %s" %(existing_records_by_id[record['record_id']]) - - #recslice = self.db.find('slice',{'slice_hrn':str(record['hrn'])}) - #recslice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = str(record['hrn'])).first() - recslice = self.GetSlices(slice_filter = str(record['hrn']), filter_type = 'slice_hrn') - #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t HOY HOY reclise %s" %(recslice) - #if isinstance(recslice,list) and len(recslice) == 1: - #recslice = recslice[0] - - recuser = dbsession.query(RegRecord).filter_by(record_id = recslice['record_id_user']).first() - #existing_records_by_id[recslice['record_id_user']] - #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t recuser %s" %(recuser) - - + #Get slab slice record. + recslice = self.GetSlices(slice_filter = \ + str(record['hrn']),\ + slice_filter_type = 'slice_hrn') + recuser = dbsession.query(RegRecord).filter_by(record_id = \ + recslice['record_id_user']).first() + logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \ + rec %s \r\n \r\n" %(recslice)) record.update({'PI':[recuser.hrn], - 'researcher': [recuser.hrn], - 'name':record['hrn'], - 'oar_job_id':recslice['oar_job_id'], - 'node_ids': [], - 'person_ids':[recslice['record_id_user']], - 'geni_urn':'', #For client_helper.py compatibility - 'keys':'', #For client_helper.py compatibility - 'key_ids':''}) #For client_helper.py compatibility + 'researcher': [recuser.hrn], + 'name':record['hrn'], + 'oar_job_id':recslice['oar_job_id'], + 'node_ids': [], + 'person_ids':[recslice['record_id_user']], + 'geni_urn':'', #For client_helper.py compatibility + 'keys':'', #For client_helper.py compatibility + 'key_ids':''}) #For client_helper.py compatibility elif str(record['type']) == 'user': - #Add the data about slice - rec = self.GetSlices(slice_filter = record['record_id'], filter_type = 'record_id_user') - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info USEEEEEEEEEERDESU! rec %s \r\n \t rec['record_id_user'] %s " %(rec,rec['record_id_user']) - #Append record in records list, therfore fetches user and slice info again(one more loop) + #The record is a SFA user record. + #Get the information about his slice from Senslab's DB + #and add it to the user record. + recslice = self.GetSlices(slice_filter = \ + record['record_id'],\ + slice_filter_type = 'record_id_user') + + logger.debug( "SLABDRIVER.PY \t fill_record_info user \ + rec %s \r\n \r\n" %(recslice)) + #Append slice record in records list, + #therefore fetches user and slice info again(one more loop) #Will update PIs and researcher for the slice - recuser = dbsession.query(RegRecord).filter_by(record_id = rec['record_id_user']).first() - rec.update({'PI':[recuser.hrn], + recuser = dbsession.query(RegRecord).filter_by(record_id = \ + recslice['record_id_user']).first() + recslice.update({'PI':[recuser.hrn], 'researcher': [recuser.hrn], 'name':record['hrn'], - 'oar_job_id':rec['oar_job_id'], + 'oar_job_id':recslice['oar_job_id'], 'node_ids': [], - 'person_ids':[rec['record_id_user']]}) - #retourne une liste 100512 - + 'person_ids':[recslice['record_id_user']]}) + #GetPersons takes [] as filters user_slab = self.GetPersons([{'hrn':recuser.hrn}]) - - rec.update({'type':'slice','hrn':rec['slice_hrn']}) + + recslice.update({'type':'slice','hrn':recslice['slice_hrn']}) record.update(user_slab[0]) #For client_helper.py compatibility record.update( { 'geni_urn':'', 'keys':'', 'key_ids':'' }) - records.append(rec) - - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info ADDING SLICEINFO TO USER records %s" %(records) + record_list.append(recslice) - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info OKrecords %s" %(records) - except TypeError: - print >>sys.stderr, "\r\n \t\t SLABDRIVER fill_record_info EXCEPTION RECORDS : %s" %(records) + logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\ + INFO TO USER records %s" %(record_list)) + + + except TypeError,e: + logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s" %(e)) + return #self.fill_record_slab_info(records) diff --git a/sfa/senslab/slabpostgres.py b/sfa/senslab/slabpostgres.py index a1338332..ae7f1157 100644 --- a/sfa/senslab/slabpostgres.py +++ b/sfa/senslab/slabpostgres.py @@ -69,7 +69,7 @@ class SliceSenslab (SlabBase): result += ">" return result - def dumpquerytodict(self): + def dump_sqlalchemyobj_to_dict(self): dict = {'slice_hrn':self.slice_hrn, 'peer_authority':self.peer_authority, 'record_id':self.record_id_slice, diff --git a/sfa/senslab/slabslices.py b/sfa/senslab/slabslices.py index 37d3ebca..49c226e0 100644 --- a/sfa/senslab/slabslices.py +++ b/sfa/senslab/slabslices.py @@ -45,7 +45,7 @@ class SlabSlices: - slice = self.driver.GetSlices(slice_filter = slice_name, filter_type = 'slice_hrn') + slice = self.driver.GetSlices(slice_filter = slice_name, slice_filter_type = 'slice_hrn') # Get user information @@ -276,7 +276,7 @@ class SlabSlices: login_base = slice_hrn.split(".")[0] slicename = slice_hrn - sl = self.driver.GetSlices(slice_filter=slicename, filter_type = 'slice_hrn') + sl = self.driver.GetSlices(slice_filter=slicename, slice_filter_type = 'slice_hrn') if sl: print>>sys.stderr, " \r\n \r\rn Slices.py verify_slice slicename %s sl %s slice_record %s"%(slicename ,sl, slice_record)