From: Sandrine Avakian Date: Tue, 14 Aug 2012 14:45:31 +0000 (+0200) Subject: Fixed sfi.py delete X-Git-Tag: sfa-2.1-24~3^2~98 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=58bd1d48bf112396197a00c96a565cf01a574d66;p=sfa.git Fixed sfi.py delete --- diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index 9d1adaf8..a9bfce07 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -1,14 +1,13 @@ import subprocess from datetime import datetime -from dateutil import tz -from time import strftime, gmtime +from time import gmtime from sfa.util.faults import SliverDoesNotExist, UnknownSfaType from sfa.util.sfalogging import logger from sfa.storage.alchemy import dbsession -from sfa.storage.model import RegRecord, RegUser, RegSlice +from sfa.storage.model import RegRecord, RegUser from sfa.trust.credential import Credential @@ -28,8 +27,10 @@ from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf from sfa.senslab.OARrestapi import OARrestapi from sfa.senslab.LDAPapi import LDAPapi -from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, JobSenslab -from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, slab_xrn_object +from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, \ + JobSenslab +from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \ + slab_xrn_object from sfa.senslab.slabslices import SlabSlices @@ -144,14 +145,18 @@ class SlabDriver(Driver): #Get list of leases oar_leases_list = self.GetReservedNodes() - logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table \r\n \r\n : oar_leases_list %s\r\n" %( oar_leases_list)) - #Get list of slices/leases . multiple entry per user depending on number of jobs + logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table :\ + oar_leases_list %s\r\n" %( oar_leases_list)) + #Get list of slices/leases . multiple entry per user depending + #on number of jobs #At this point we don't have the slice_hrn so that's why #we are calling Getslices, which holds a field with slice_hrn if slice_hrn : - sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn') - self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, sfa_slices_list) + sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, \ + slice_filter_type = 'slice_hrn') + self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, \ + oar_leases_list, sfa_slices_list) else : sfa_slices_list = self.GetSlices() @@ -159,23 +164,25 @@ class SlabDriver(Driver): for sfa_slice in sfa_slices_list: if sfa_slice['slice_hrn'] not in sfa_slices_dict_by_slice_hrn: sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']] = [] - sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice) - else : - sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice) + + sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].\ + append(sfa_slice) + for slice_hrn in sfa_slices_dict_by_slice_hrn: list_slices_sfa = sfa_slices_dict_by_slice_hrn[slice_hrn] - if slice_hrn =='senslab2.avakian_slice': - logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table slice_hrn %s list_slices_sfa %s\r\n \r\n" %( slice_hrn,list_slices_sfa)) - self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, list_slices_sfa) + self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, \ + oar_leases_list, list_slices_sfa) return - def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, oar_leases_list, sfa_slices_list): + def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, \ + oar_leases_list, sfa_slices_list): - #Get list of slices/leases . multiple entry per user depending on number of jobs - #sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn') + #Get list of slices/leases . + #multiple entry per user depending on number of jobs + sfa_slices_dict = {} oar_leases_dict = {} login = slice_hrn.split(".")[1].split("_")[0] @@ -185,25 +192,29 @@ class SlabDriver(Driver): for sl in sfa_slices_list: if sl['oar_job_id'] != [] : + #one entry in the dictionnary for each jobid/login, one login + #can have multiple jobs running for oar_jobid in sl['oar_job_id']: if (login, oar_jobid) not in sfa_slices_dict: sfa_slices_dict[(login,oar_jobid)] = sl for lease in oar_leases_list: if (lease['user'], lease['lease_id']) not in oar_leases_dict: - oar_leases_dict[(lease['user'], lease['lease_id'])] = lease + oar_leases_dict[(lease['user'], lease['lease_id'])] = lease #Find missing entries in the sfa slices list dict by comparing #the keys in both dictionnaries #Add the missing entries in the slice sneslab table for lease in oar_leases_dict : - logger.debug(" =============SLABDRIVER \t\t\ synchronize_oar_and_slice_table_for_slice_hrn oar_leases_list %s \r\n \t\t\t SFA_SLICES_DICT %s \r\n \r\n LOGIN %s \r\n " %( oar_leases_list,sfa_slices_dict,login)) - if lease not in sfa_slices_dict and login == lease[0]: - - #if lease in GetReservedNodes not in GetSlices update the db + + if lease not in sfa_slices_dict and login == lease[0]: + #if lease in GetReservedNodes not in GetSlices + #and the login of the job running matches then update the db + #for this login #First get the list of nodes hostnames for this job - oar_reserved_nodes_listdict = oar_leases_dict[lease]['reserved_nodes'] + oar_reserved_nodes_listdict = \ + oar_leases_dict[lease]['reserved_nodes'] oar_reserved_nodes_list = [] for node_dict in oar_reserved_nodes_listdict: oar_reserved_nodes_list.append(node_dict['hostname']) @@ -211,11 +222,11 @@ class SlabDriver(Driver): self.db.add_job(slice_hrn, lease[1], oar_reserved_nodes_list) for lease in sfa_slices_dict: - #Job is now terminated or in Error, either way ot is not going to run again + #Job is now terminated or in Error, + #either way ot is not going to run again #Remove it from the db if lease not in oar_leases_dict: - self.db.delete_job( slice_hrn, lease[1]) - + self.db.delete_job( slice_hrn, lease[1]) return def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \ @@ -241,14 +252,20 @@ class SlabDriver(Driver): self.synchronize_oar_and_slice_table(slice_hrn) # ensure site record exists? # ensure slice record exists + #Removed options to verify_slice SA 14/08/12 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \ - sfa_peer, options=options) - requested_attributes = rspec.version.get_slice_attributes() + sfa_peer) + + #requested_attributes returned by rspec.version.get_slice_attributes() + #unused, removed SA 13/08/12 + rspec.version.get_slice_attributes() logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice)) # ensure person records exists - persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \ + #verify_persons returns added persons but since the return value + #is not used + slices.verify_persons(slice_hrn, sfa_slice, users, peer, \ sfa_peer, options=options) @@ -259,9 +276,10 @@ class SlabDriver(Driver): for node in rspec.version.get_nodes_with_slivers()] l = [ node for node in rspec.version.get_nodes_with_slivers() ] logger.debug("SLADRIVER \tcreate_sliver requested_slivers \ - requested_slivers %s listnodes %s" %(requested_slivers,l)) - - nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer) + requested_slivers %s listnodes %s" \ + %(requested_slivers,l)) + #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12. + slices.verify_slice_nodes(sfa_slice, requested_slivers, peer) # add/remove leases requested_lease_list = [] @@ -279,7 +297,8 @@ class SlabDriver(Driver): if single_requested_lease.get('hostname'): requested_lease_list.append(single_requested_lease) - #dCreate dict of leases by start_time, regrouping nodes reserved at the same + #dCreate dict of leases by start_time, regrouping nodes reserved + #at the same #time, for the same amount of time = one job on OAR requested_job_dict = {} for lease in requested_lease_list: @@ -303,8 +322,9 @@ class SlabDriver(Driver): logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s " %(requested_job_dict)) - - leases = slices.verify_slice_leases(sfa_slice, \ + #verify_slice_leases returns the leases , but the return value is unused + #here. Removed SA 13/08/12 + slices.verify_slice_leases(sfa_slice, \ requested_job_dict, kept_leases, peer) return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version) @@ -312,21 +332,28 @@ class SlabDriver(Driver): def delete_sliver (self, slice_urn, slice_hrn, creds, options): - sfa_slice = self.GetSlices(slice_filter = slice_hrn, \ + sfa_slice_list = self.GetSlices(slice_filter = slice_hrn, \ slice_filter_type = 'slice_hrn') - logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice)) - if not sfa_slice: + + if not sfa_slice_list: return 1 - + + sfa_slice = sfa_slice_list[0] + + logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice)) slices = SlabSlices(self) # determine if this is a peer slice peer = slices.get_peer(slice_hrn) + #TODO delete_sliver SA : UnBindObjectFromPeer should be + #used when there is another + #senslab testbed, which is not the case 14/08/12 . + logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer)) try: if peer: self.UnBindObjectFromPeer('slice', \ - sfa_slice['record_id_slice'], peer) + sfa_slice['record_id_slice'], peer,None) self.DeleteSliceFromNodes(sfa_slice) finally: if peer: @@ -524,7 +551,6 @@ class SlabDriver(Driver): def remove (self, sfa_record): sfa_record_type = sfa_record['type'] hrn = sfa_record['hrn'] - record_id = sfa_record['record_id'] if sfa_record_type == 'user': #get user from senslab ldap @@ -538,7 +564,7 @@ class SlabDriver(Driver): elif sfa_record_type == 'slice': if self.GetSlices(slice_filter = hrn, \ slice_filter_type = 'slice_hrn'): - self.DeleteSlice(sfa_record_type) + self.DeleteSlice(sfa_record) #elif type == 'authority': #if self.GetSites(pointer): @@ -583,7 +609,7 @@ class SlabDriver(Driver): logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \ %(records_list)) - except: + except KeyError: pass return_records = records_list @@ -599,7 +625,7 @@ class SlabDriver(Driver): #TODO : Handling OR request in make_ldap_filters_from_records #instead of the for loop #over the records' list - def GetPersons(self, person_filter=None, return_fields_list=None): + def GetPersons(self, person_filter=None): """ person_filter should be a list of dictionnaries when not set to None. Returns a list of users whose accounts are enabled found in ldap. @@ -643,8 +669,8 @@ class SlabDriver(Driver): self.db.delete_job(slice_hrn, job_id) answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \ reqdict,username) - logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s username %s" \ - %(job_id,answer, username)) + logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \ + username %s" %(job_id,answer, username)) return answer @@ -696,7 +722,7 @@ class SlabDriver(Driver): #assigned_n = ['node', 'node_uri'] req = "GET_jobs_id_resources" - node_list_k = 'reserved_resources' + #Get job resources list from OAR node_id_list = self.oar.parser.SendRequest(req, job_id, username) @@ -705,8 +731,7 @@ class SlabDriver(Driver): hostname_list = \ self.__get_hostnames_from_oar_node_ids(node_id_list) - #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \ - #node_list_k) + #Replaces the previous entry "assigned_network_address" / #"reserved_resources" #with "node_ids" @@ -752,7 +777,7 @@ class SlabDriver(Driver): logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\ oar_id_node_dict %s" %(oar_id_node_dict)) - hostname_list = [] + hostname_dict_list = [] for resource_id in resource_id_list: #Because jobs requested "asap" do not have defined resources @@ -996,10 +1021,6 @@ class SlabDriver(Driver): def sfa_fields_to_slab_fields(self, sfa_type, hrn, record): - def convert_ints(tmpdict, int_fields): - for field in int_fields: - if field in tmpdict: - tmpdict[field] = int(tmpdict[field]) slab_record = {} #for field in record: @@ -1088,10 +1109,8 @@ class SlabDriver(Driver): added_nodes. """ - - site_list = [] + nodeid_list = [] - resource = "" reqdict = {} @@ -1242,13 +1261,10 @@ class SlabDriver(Driver): return - #Delete the jobs and updates the job id in the senslab table - #to set it to -1 - #Does not clear the node list + #Delete the jobs from job_senslab table def DeleteSliceFromNodes(self, slice_record): - # Get user information - - self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn']) + for job_id in slice_record['oar_job_id']: + self.DeleteJobs(job_id, slice_record['hrn']) return @@ -1260,7 +1276,7 @@ class SlabDriver(Driver): grain = 60 return grain - def GetLeases(self, lease_filter_dict=None, return_fields_list=None): + def GetLeases(self, lease_filter_dict=None): unfiltered_reservation_list = self.GetReservedNodes() ##Synchronize slice_table of sfa senslab db @@ -1296,15 +1312,7 @@ class SlabDriver(Driver): %(resa_user_dict)) for resa in unfiltered_reservation_list: - #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')') - #ldap_info = ldap_info[0][1] - - #user = dbsession.query(RegUser).filter_by(email = \ - #ldap_info['mail'][0]).first() - ##Separated in case user not in database : record_id not defined SA 17/07//12 - #query_slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id) - #if query_slice_info: - #slice_info = query_slice_info.first() + #Put the slice_urn resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info'].slice_hrn resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice') @@ -1675,5 +1683,6 @@ class SlabDriver(Driver): for job_id in leases_id_list: self.DeleteJobs(job_id, slice_hrn) - logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \r\n " %(leases_id_list, slice_hrn)) + logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \ + \r\n " %(leases_id_list, slice_hrn)) return diff --git a/sfa/senslab/slabslices.py b/sfa/senslab/slabslices.py index 6b413e5a..0eb91603 100644 --- a/sfa/senslab/slabslices.py +++ b/sfa/senslab/slabslices.py @@ -115,6 +115,9 @@ class SlabSlices: logger.debug("SLABSLICES \ get_peer slice_authority %s \ site_authority %s hrn %s" %(slice_authority, \ site_authority, hrn)) + #This slice belongs to the current site + if site_authority == self.driver.root_auth : + return None # check if we are already peered with this site_authority, if so #peers = self.driver.GetPeers({}) peers = self.driver.GetPeers(peer_filter = slice_authority) @@ -157,7 +160,7 @@ class SlabSlices: self.driver.UnBindObjectFromPeer('senslab2', 'slice', \ sfa_slice['record_id_slice'], peer.hrn) - deleted = self.driver.DeleteLeases(deleted_leases, \ + self.driver.DeleteLeases(deleted_leases, \ sfa_slice['name']) #TODO : catch other exception? @@ -167,7 +170,7 @@ class SlabSlices: #Add new leases for start_time in requested_jobs_dict: job = requested_jobs_dict[start_time] - added = self.driver.AddLeases(job['hostname'], \ + self.driver.AddLeases(job['hostname'], \ sfa_slice, int(job['start_time']), \ int(job['duration'])) @@ -184,39 +187,19 @@ class SlabSlices: # remove nodes not in rspec deleted_nodes = list(set(current_slivers).\ difference(requested_slivers)) - - # add nodes from rspec - added_nodes = list(set(requested_slivers).difference(current_slivers)) - try: - #if peer: - #self.driver.UnBindObjectFromPeer('slice', slice['slice_id'], \ - #peer['shortname']) - #PI is a list, get the only username in this list - #so that the OAR/LDAP knows the user: - #remove the authority from the name - tmp = sfa_slice['PI'][0].split(".") - username = tmp[(len(tmp)-1)] - #Update the table with the nodes that populate the slice - logger.debug("SLABSLICES \tverify_slice_nodes slice %s \r\n \r\n deleted_nodes %s"\ - %(sfa_slice,deleted_nodes)) - #self.driver.db.update_job(sfa_slice['name'], nodes = added_nodes) - - #If there is a timeslot specified, then a job can be launched - #try: - ##slot = sfa_slice['timeslot'] - #self.driver.LaunchExperimentOnOAR(sfa_slice, added_nodes, \ - #username) - #except KeyError: - #logger.log_exc("SLABSLICES \verify_slice_nodes KeyError \ - #sfa_slice %s " %(sfa_slice)) + # add nodes from rspec + #added_nodes = list(set(requested_slivers).difference(current_slivers)) + #Update the table with the nodes that populate the slice + logger.debug("SLABSLICES \tverify_slice_nodes slice %s\ + \r\n \r\n deleted_nodes %s"\ + %(sfa_slice,deleted_nodes)) if deleted_nodes: self.driver.DeleteSliceFromNodes(sfa_slice['name'], \ deleted_nodes) - #return added_nodes - except: - logger.log_exc('Failed to add/remove slice from nodes') + return nodes + def free_egre_key(self): @@ -328,7 +311,7 @@ class SlabSlices: #return site - def verify_slice(self, slice_hrn, slice_record, peer, sfa_peer, options={}): + def verify_slice(self, slice_hrn, slice_record, peer, sfa_peer): #login_base = slice_hrn.split(".")[0] slicename = slice_hrn @@ -407,6 +390,7 @@ class SlabSlices: users_dict[user['hrn']] = {'person_id':user['person_id'], \ 'hrn':user['hrn']} + logger.debug( "SLABSLICE.PY \tverify_person \ users_dict %s \r\n user_by_hrn %s \r\n \ \tusers_by_id %s " \ @@ -436,6 +420,7 @@ class SlabSlices: if existing_users: for user in existing_users : #for k in users_dict[user['hrn']] : + existing_user_hrns.append(users_dict[user['hrn']]['hrn']) existing_user_ids.\ append(users_dict[user['hrn']]['person_id']) @@ -453,6 +438,7 @@ class SlabSlices: ldap_reslt = self.driver.ldap.LdapSearch(users) if ldap_reslt: existing_users = ldap_reslt[0] + #TODO : DEBUG user undefined ? SA 14/08/12 existing_user_hrns.append(users_dict[user['hrn']]['hrn']) existing_user_ids.\ append(users_dict[user['hrn']]['person_id']) @@ -614,13 +600,12 @@ class SlabSlices: removed_keys = set(existing_keys).difference(requested_keys) for existing_key_id in keydict: if keydict[existing_key_id] in removed_keys: - try: - if peer: - self.driver.UnBindObjectFromPeer('key', \ - existing_key_id, peer['shortname']) - self.driver.DeleteKey(existing_key_id) - except: - pass + + if peer: + self.driver.UnBindObjectFromPeer('key', \ + existing_key_id, peer['shortname']) + self.driver.DeleteKey(existing_key_id) + #def verify_slice_attributes(self, slice, requested_slice_attributes, \ #append=False, admin=False): @@ -695,88 +680,4 @@ class SlabSlices: #value: %s, node_id: %s\nCause:%s'\ #% (name, value, node_id, str(error))) - #def create_slice_aggregate(self, xrn, rspec): - #hrn, type = urn_to_hrn(xrn) - ## Determine if this is a peer slice - #peer = self.get_peer(hrn) - #sfa_peer = self.get_sfa_peer(hrn) - - #spec = RSpec(rspec) - ## Get the slice record from sfa - #slicename = hrn_to_pl_slicename(hrn) - #slice = {} - #slice_record = None - #registry = self.api.registries[self.api.hrn] - #credential = self.api.getCredential() - - #site_id, remote_site_id = self.verify_site(registry, \ - #credential, hrn, peer, sfa_peer) - #slice = self.verify_slice(registry, credential, \ - #hrn, site_id, remote_site_id, peer, sfa_peer) - - ## find out where this slice is currently running - #nodelist = self.driver.GetNodes(slice['node_ids'], ['hostname']) - #hostnames = [node['hostname'] for node in nodelist] - - ## get netspec details - #nodespecs = spec.getDictsByTagName('NodeSpec') - - ## dict in which to store slice attributes to set for the nodes - #nodes = {} - #for nodespec in nodespecs: - #if isinstance(nodespec['name'], list): - #for nodename in nodespec['name']: - #nodes[nodename] = {} - #for k in nodespec.keys(): - #rspec_attribute_value = nodespec[k] - #if (self.rspec_to_slice_tag.has_key(k)): - #slice_tag_name = self.rspec_to_slice_tag[k] - #nodes[nodename][slice_tag_name] = \ - #rspec_attribute_value - #elif isinstance(nodespec['name'], StringTypes): - #nodename = nodespec['name'] - #nodes[nodename] = {} - #for k in nodespec.keys(): - #rspec_attribute_value = nodespec[k] - #if (self.rspec_to_slice_tag.has_key(k)): - #slice_tag_name = self.rspec_to_slice_tag[k] - #nodes[nodename][slice_tag_name] = rspec_attribute_value - - #for k in nodespec.keys(): - #rspec_attribute_value = nodespec[k] - #if (self.rspec_to_slice_tag.has_key(k)): - #slice_tag_name = self.rspec_to_slice_tag[k] - #nodes[nodename][slice_tag_name] = rspec_attribute_value - - #node_names = nodes.keys() - ## remove nodes not in rspec - #deleted_nodes = list(set(hostnames).difference(node_names)) - ## add nodes from rspec - #added_nodes = list(set(node_names).difference(hostnames)) - - #try: - #if peer: - #self.driver.UnBindObjectFromPeer('slice', \ - #slice['slice_id'], peer) - - #self.driver.LaunchExperimentOnOAR(slicename, added_nodes) - - ## Add recognized slice tags - #for node_name in node_names: - #node = nodes[node_name] - #for slice_tag in node.keys(): - #value = node[slice_tag] - #if (isinstance(value, list)): - #value = value[0] - - #self.driver.AddSliceTag(slicename, slice_tag, \ - #value, node_name) - - #self.driver.DeleteSliceFromNodes(slicename, deleted_nodes) - #finally: - #if peer: - #self.driver.BindObjectToPeer('slice', slice['slice_id'], \ - #peer, slice['peer_slice_id']) - - #return 1 - + \ No newline at end of file