X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fsenslab%2Fslabslices.py;h=188917517f5b65562bc1fe55f60482fb6872f344;hb=da2fe4312309e507c295409557f41ab9d4a86d37;hp=b6a0bcb98c65508a904af963e24cb27e5648045b;hpb=dd07ad19e9c39e121c6ff177a863c9e7da1f1eac;p=sfa.git diff --git a/sfa/senslab/slabslices.py b/sfa/senslab/slabslices.py index b6a0bcb9..18891751 100644 --- a/sfa/senslab/slabslices.py +++ b/sfa/senslab/slabslices.py @@ -7,101 +7,12 @@ MAXINT = 2L**31-1 class SlabSlices: rspec_to_slice_tag = {'max_rate':'net_max_rate'} - - #def __init__(self, api, ttl = .5, origin_hrn=None): - #self.api = api - ##filepath = path + os.sep + filename - #self.policy = Policy(self.api) - #self.origin_hrn = origin_hrn - #self.registry = api.registries[api.hrn] - #self.credential = api.getCredential() - #self.nodes = [] - #self.persons = [] - - + + def __init__(self, driver): self.driver = driver - ##Used in SFACE? - #def get_slivers(self, xrn, node=None): - #hrn, hrn_type = urn_to_hrn(xrn) - - #slice_name = hrn_to_pl_slicename(hrn) - ## XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead - ## of doing all of this? - ##return self.api.driver.GetSliceTicket(self.auth, slice_name) - - - - #sfa_slice = self.driver.GetSlices(slice_filter = slice_name, \ - # slice_filter_type = 'slice_hrn') - - - ## Get user information - ##TODO - #alchemy_person = dbsession.query(RegRecord).filter_by(record_id = \ - #sfa_slice['record_id_user']).first() - - #slivers = [] - #sliver_attributes = [] - - #if sfa_slice['oar_job_id'] is not -1: - #nodes_all = self.driver.GetNodes({'hostname': \ - #sfa_slice['node_ids']}, - #['node_id', 'hostname','site','boot_state']) - #nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all]) - #nodes = sfa_slice['node_ids'] - - #for node in nodes: - ##for sliver_attribute in filter(lambda a: a['node_id'] == \ - #node['node_id'], slice_tags): - #sliver_attribute['tagname'] = 'slab-tag' - #sliver_attribute['value'] = 'slab-value' - #sliver_attributes.append(sliver_attribute['tagname']) - #attributes.append({'tagname': sliver_attribute['tagname'], - #'value': sliver_attribute['value']}) - - ## set nodegroup slice attributes - #for slice_tag in filter(lambda a: a['nodegroup_id'] \ - #in node['nodegroup_ids'], slice_tags): - ## Do not set any nodegroup slice attributes for - ## which there is at least one sliver attribute - ## already set. - #if slice_tag not in slice_tags: - #attributes.append({'tagname': slice_tag['tagname'], - #'value': slice_tag['value']}) - - #for slice_tag in filter(lambda a: a['node_id'] is None, \ - #slice_tags): - ## Do not set any global slice attributes for - ## which there is at least one sliver attribute - ## already set. - #if slice_tag['tagname'] not in sliver_attributes: - #attributes.append({'tagname': slice_tag['tagname'], - #'value': slice_tag['value']}) - - ## XXX Sanity check; though technically this should - ## be a system invariant - ## checked with an assertion - #if sfa_slice['expires'] > MAXINT: sfa_slice['expires']= MAXINT - - #slivers.append({ - #'hrn': hrn, - #'name': sfa_slice['name'], - #'slice_id': sfa_slice['slice_id'], - #'instantiation': sfa_slice['instantiation'], - #'expires': sfa_slice['expires'], - #'keys': keys, - #'attributes': attributes - #}) - - #return slivers - - - - - - #return slivers + def get_peer(self, xrn): hrn, hrn_type = urn_to_hrn(xrn) #Does this slice belong to a local site or a peer senslab site? @@ -115,6 +26,9 @@ class SlabSlices: logger.debug("SLABSLICES \ get_peer slice_authority %s \ site_authority %s hrn %s" %(slice_authority, \ site_authority, hrn)) + #This slice belongs to the current site + if site_authority == self.driver.root_auth : + return None # check if we are already peered with this site_authority, if so #peers = self.driver.GetPeers({}) peers = self.driver.GetPeers(peer_filter = slice_authority) @@ -137,75 +51,154 @@ class SlabSlices: sfa_peer = site_authority return sfa_peer + + def verify_slice_leases(self, sfa_slice, requested_jobs_dict, peer): + + + #First get the list of current leases from OAR + leases = self.driver.GetLeases({'name':sfa_slice['slice_hrn']}) + logger.debug("SLABSLICES verify_slice_leases requested_jobs_dict %s \ + leases %s "%(requested_jobs_dict, leases )) - def verify_slice_leases(self, sfa_slice, requested_leases, kept_leases, \ - peer): + current_nodes_reserved_by_start_time = {} + requested_nodes_by_start_time = {} + leases_by_start_time = {} + reschedule_jobs_dict = {} + - leases = self.driver.GetLeases({'name':sfa_slice['name']}, ['lease_id']) - grain = self.driver.GetLeaseGranularity() - if leases : - current_leases = [lease['lease_id'] for lease in leases] - deleted_leases = list(set(current_leases).difference(kept_leases)) - - try: - if peer: - self.driver.UnBindObjectFromPeer('slice', \ - sfa_slice['slice_id'], peer['shortname']) - deleted = self.driver.DeleteLeases(deleted_leases) - for lease in requested_leases: - added = self.driver.AddLeases(lease['hostname'], \ - sfa_slice['name'], int(lease['start_time']), \ - int(lease['duration'])) - #TODO : catch other exception? - except KeyError: - logger.log_exc('Failed to add/remove slice leases') + #Create reduced dictionary with key start_time and value + # the list of nodes + #-for the leases already registered by OAR first + # then for the new leases requested by the user + + #Leases already scheduled/running in OAR + for lease in leases : + current_nodes_reserved_by_start_time[lease['t_from']] = \ + lease['reserved_nodes'] + leases_by_start_time[lease['t_from']] = lease + + + #Requested jobs + for start_time in requested_jobs_dict: + requested_nodes_by_start_time[int(start_time)] = \ + requested_jobs_dict[start_time]['hostname'] + #Check if there is any difference between the leases already + #registered in OAR and the requested jobs. + #Difference could be: + #-Lease deleted in the requested jobs + #-Added/removed nodes + #-Newly added lease + + logger.debug("SLABSLICES verify_slice_leases \ + requested_nodes_by_start_time %s \ + "%(requested_nodes_by_start_time )) + #Find all deleted leases + start_time_list = \ + list(set(leases_by_start_time.keys()).\ + difference(requested_nodes_by_start_time.keys())) + deleted_leases = [leases_by_start_time[start_time]['lease_id'] \ + for start_time in start_time_list] + + + #Find added or removed nodes in exisiting leases + for start_time in requested_nodes_by_start_time: + logger.debug("SLABSLICES verify_slice_leases start_time %s \ + "%( start_time)) + if start_time in current_nodes_reserved_by_start_time: + + if requested_nodes_by_start_time[start_time] == \ + current_nodes_reserved_by_start_time[start_time]: + continue + + else: + update_node_set = \ + set(requested_nodes_by_start_time[start_time]) + added_nodes = \ + update_node_set.difference(\ + current_nodes_reserved_by_start_time[start_time]) + shared_nodes = \ + update_node_set.intersection(\ + current_nodes_reserved_by_start_time[start_time]) + old_nodes_set = \ + set(\ + current_nodes_reserved_by_start_time[start_time]) + removed_nodes = \ + old_nodes_set.difference(\ + requested_nodes_by_start_time[start_time]) + logger.debug("SLABSLICES verify_slice_leases \ + shared_nodes %s added_nodes %s removed_nodes %s"\ + %(shared_nodes, added_nodes,removed_nodes )) + #If the lease is modified, delete it before + #creating it again. + #Add the deleted lease job id in the list + #WARNING :rescheduling does not work if there is already + # 2 running/scheduled jobs because deleting a job + #takes time SA 18/10/2012 + if added_nodes or removed_nodes: + deleted_leases.append(\ + leases_by_start_time[start_time]['lease_id']) + #Reschedule the job + if added_nodes or shared_nodes: + reschedule_jobs_dict[str(start_time)] = \ + requested_jobs_dict[str(start_time)] + + else: + #New lease + + job = requested_jobs_dict[str(start_time)] + logger.debug("SLABSLICES \ + NEWLEASE slice %s job %s"\ + %(sfa_slice, job)) + self.driver.AddLeases(job['hostname'], \ + sfa_slice, int(job['start_time']), \ + int(job['duration'])) + + #Deleted leases are the ones with lease id not declared in the Rspec + if deleted_leases: + self.driver.DeleteLeases(deleted_leases, sfa_slice['slice_hrn']) + logger.debug("SLABSLICES \ + verify_slice_leases slice %s deleted_leases %s"\ + %(sfa_slice, deleted_leases)) + + + if reschedule_jobs_dict : + for start_time in reschedule_jobs_dict: + job = reschedule_jobs_dict[start_time] + self.driver.AddLeases(job['hostname'], \ + sfa_slice, int(job['start_time']), \ + int(job['duration'])) return leases def verify_slice_nodes(self, sfa_slice, requested_slivers, peer): current_slivers = [] deleted_nodes = [] - - if sfa_slice['node_ids']: - nodes = self.driver.GetNodes(sfa_slice['node_ids'], ['hostname']) + + if 'node_ids' in sfa_slice: + nodes = self.driver.GetNodes(sfa_slice['list_node_ids'], \ + ['hostname']) current_slivers = [node['hostname'] for node in nodes] # remove nodes not in rspec deleted_nodes = list(set(current_slivers).\ difference(requested_slivers)) - - # add nodes from rspec - added_nodes = list(set(requested_slivers).difference(current_slivers)) - try: - #if peer: - #self.driver.UnBindObjectFromPeer('slice', slice['slice_id'], \ - #peer['shortname']) - #PI is a list, get the only username in this list - #so that the OAR/LDAP knows the user: - #remove the authority from the name - tmp = sfa_slice['PI'][0].split(".") - username = tmp[(len(tmp)-1)] - #Update the table with the nodes that populate the slice - self.driver.db.update_job(sfa_slice['name'], nodes = added_nodes) - logger.debug("SLABSLICES \tverify_slice_nodes slice %s "\ - %(sfa_slice)) - #If there is a timeslot specified, then a job can be launched - try: - #slot = sfa_slice['timeslot'] - self.driver.LaunchExperimentOnOAR(sfa_slice, added_nodes, \ - username) - except KeyError: - logger.log_exc("SLABSLICES \verify_slice_nodes KeyError \ - sfa_slice %s " %(sfa_slice)) + # add nodes from rspec + #added_nodes = list(set(requested_slivers).\ + #difference(current_slivers)) + + logger.debug("SLABSLICES \tverify_slice_nodes slice %s\ + \r\n \r\n deleted_nodes %s"\ + %(sfa_slice, deleted_nodes)) if deleted_nodes: - self.driver.DeleteSliceFromNodes(sfa_slice['name'], \ - deleted_nodes) + #Delete the entire experience + self.driver.DeleteSliceFromNodes(sfa_slice) + #self.driver.DeleteSliceFromNodes(sfa_slice['slice_hrn'], \ + #deleted_nodes) + return nodes - except: - logger.log_exc('Failed to add/remove slice from nodes') def free_egre_key(self): @@ -317,31 +310,34 @@ class SlabSlices: #return site - def verify_slice(self, slice_hrn, slice_record, peer, sfa_peer, options={}): + def verify_slice(self, slice_hrn, slice_record, peer, sfa_peer): #login_base = slice_hrn.split(".")[0] slicename = slice_hrn - sl = self.driver.GetSlices(slice_filter = slicename, \ + slices_list = self.driver.GetSlices(slice_filter = slicename, \ slice_filter_type = 'slice_hrn') - if sl: - - logger.debug("SLABSLICE \tverify_slice slicename %s sl %s \ - slice_record %s"%(slicename, sl, slice_record)) - sfa_slice = sl - sfa_slice.update(slice_record) - #del slice['last_updated'] - #del slice['date_created'] - #if peer: - #slice['peer_slice_id'] = slice_record.get('slice_id', None) - ## unbind from peer so we can modify if necessary. - ## Will bind back later - #self.driver.UnBindObjectFromPeer('slice', slice['slice_id'], \ + if slices_list: + for sl in slices_list: + + logger.debug("SLABSLICE \tverify_slice slicename %s slices_list %s sl %s \ + slice_record %s"%(slicename, slices_list,sl, \ + slice_record)) + sfa_slice = sl + sfa_slice.update(slice_record) + #del slice['last_updated'] + #del slice['date_created'] + #if peer: + #slice['peer_slice_id'] = slice_record.get('slice_id', None) + ## unbind from peer so we can modify if necessary. + ## Will bind back later + #self.driver.UnBindObjectFromPeer('slice', \ + #slice['slice_id'], \ #peer['shortname']) - #Update existing record (e.g. expires field) - #it with the latest info. - ##if slice_record and slice['expires'] != slice_record['expires']: - ##self.driver.UpdateSlice( slice['slice_id'], {'expires' : \ - #slice_record['expires']}) + #Update existing record (e.g. expires field) + #it with the latest info. + ##if slice_record and slice['expires'] != slice_record['expires']: + ##self.driver.UpdateSlice( slice['slice_id'], {'expires' : \ + #slice_record['expires']}) else: logger.debug(" SLABSLICES \tverify_slice Oups \ slice_record %s peer %s sfa_peer %s "\ @@ -377,10 +373,22 @@ class SlabSlices: def verify_persons(self, slice_hrn, slice_record, users, peer, sfa_peer, \ options={}): - users_by_id = {} - users_by_hrn = {} + """ + users is a record list. Records can either be local records + or users records from known and trusted federated sites. + If the user is from another site that senslab doesn't trust yet, + then Resolve will raise an error before getting to create_sliver. + """ + #TODO SA 21/08/12 verify_persons Needs review + + + users_by_id = {} + users_by_hrn = {} + #users_dict : dict whose keys can either be the user's hrn or its id. + #Values contains only id and hrn users_dict = {} - + + #First create dicts by hrn and id for each user in the user record list: for user in users: if 'urn' in user and (not 'hrn' in user ) : @@ -395,7 +403,8 @@ class SlabSlices: users_dict[user['hrn']] = {'person_id':user['person_id'], \ 'hrn':user['hrn']} - logger.debug( "SLABSLICE.PY \tverify_person \ + + logger.debug( "SLABSLICE.PY \t verify_person \ users_dict %s \r\n user_by_hrn %s \r\n \ \tusers_by_id %s " \ %(users_dict,users_by_hrn, users_by_id)) @@ -403,74 +412,61 @@ class SlabSlices: existing_user_ids = [] existing_user_hrns = [] existing_users = [] - #Check if user is in LDAP using its hrn. - #Assuming Senslab is centralised : one LDAP for all sites, + # Check if user is in Senslab LDAP using its hrn. + # Assuming Senslab is centralised : one LDAP for all sites, # user_id unknown from LDAP - # LDAP does not provide users id, therfore we rely on hrns + # LDAP does not provide users id, therefore we rely on hrns containing + # the login of the user. + # If the hrn is not a senslab hrn, the user may not be in LDAP. if users_by_hrn: - #Construct the list of filters for GetPersons + #Construct the list of filters (list of dicts) for GetPersons filter_user = [] for hrn in users_by_hrn: - #filter_user.append ( {'hrn':hrn}) filter_user.append (users_by_hrn[hrn]) logger.debug(" SLABSLICE.PY \tverify_person filter_user %s " \ - %(filter_user)) - existing_users = self.driver.GetPersons(filter_user) - #existing_users = self.driver.GetPersons({'hrn': \ - #users_by_hrn.keys()}) - #existing_users = self.driver.GetPersons({'hrn': \ - #users_by_hrn.keys()}, \ - #['hrn','pkey']) + %(filter_user)) + #Check user's in LDAP with GetPersons + #Needed because what if the user has been deleted in LDAP but + #is still in SFA? + existing_users = self.driver.GetPersons(filter_user) + + #User's in senslab LDAP if existing_users: for user in existing_users : - #for k in users_dict[user['hrn']] : existing_user_hrns.append(users_dict[user['hrn']]['hrn']) existing_user_ids.\ append(users_dict[user['hrn']]['person_id']) - #User from another federated site , - #does not have a senslab account yet? - #or have multiple SFA accounts - #Check before adding them to LDAP - + # User from another known trusted federated site. Check + # if a senslab account matching the email has already been created. else: - + req = 'mail=' if isinstance(users, list): - ldap_reslt = self.driver.ldap.LdapSearch(users[0]) + + req += users[0]['email'] else: - ldap_reslt = self.driver.ldap.LdapSearch(users) + req += users['email'] + + ldap_reslt = self.driver.ldap.LdapSearch(req) if ldap_reslt: - existing_users = ldap_reslt[0] - existing_user_hrns.append(users_dict[user['hrn']]['hrn']) - existing_user_ids.\ - append(users_dict[user['hrn']]['person_id']) + logger.debug(" SLABSLICE.PY \tverify_person users \ + USER already in Senslab \t ldap_reslt %s \ + "%( ldap_reslt)) + existing_users.append(ldap_reslt[1]) + else: #User not existing in LDAP - + #TODO SA 21/08/12 raise smthg to add user or add it auto ? logger.debug(" SLABSLICE.PY \tverify_person users \ - not in ldap ... %s \r\n \t ldap_reslt %s " \ - %(users, ldap_reslt)) - - - # requested slice users + not in ldap ...NEW ACCOUNT NEEDED %s \r\n \t \ + ldap_reslt %s " %(users, ldap_reslt)) + requested_user_ids = users_by_id.keys() requested_user_hrns = users_by_hrn.keys() logger.debug("SLABSLICE.PY \tverify_person requested_user_ids %s \ user_by_hrn %s " %(requested_user_ids, users_by_hrn)) - # existing slice users - - #existing_slice_users_filter = {'hrn': slice_record['PI'][0]} - #logger.debug(" SLABSLICE.PY \tverify_person requested_user_ids %s \ - #existing_slice_users_filter %s slice_record %s" %(requested_user_ids,\ - #existing_slice_users_filter,slice_record)) - - #existing_slice_users = \ - #self.driver.GetPersons([existing_slice_users_filter]) - #existing_slice_users = \ - #self.driver.GetPersons(existing_slice_users_filter, \ - #['hrn','pkey']) - #logger.debug("SLABSLICE.PY \tverify_person existing_slice_users %s " \ - #%(existing_slice_users)) + + #Check that the user of the slice in the slice record #matches the existing users try: @@ -484,9 +480,7 @@ class SlabSlices: except KeyError: pass - #existing_slice_user_hrns = [user['hrn'] for \ - #user in existing_slice_users] - + # users to be added, removed or updated #One user in one senslab slice : there should be no need #to remove/ add any user from/to a slice. @@ -505,8 +499,10 @@ class SlabSlices: added_user = users_dict[added_user_hrn] #hrn, type = urn_to_hrn(added_user['urn']) person = { - 'first_name': added_user.get('first_name', hrn), - 'last_name': added_user.get('last_name', hrn), + #'first_name': added_user.get('first_name', hrn), + #'last_name': added_user.get('last_name', hrn), + 'first_name': added_user['first_name'], + 'last_name': added_user['last_name'], 'person_id': added_user['person_id'], 'peer_person_id': None, 'keys': [], @@ -602,13 +598,12 @@ class SlabSlices: removed_keys = set(existing_keys).difference(requested_keys) for existing_key_id in keydict: if keydict[existing_key_id] in removed_keys: - try: - if peer: - self.driver.UnBindObjectFromPeer('key', \ - existing_key_id, peer['shortname']) - self.driver.DeleteKey(existing_key_id) - except: - pass + + if peer: + self.driver.UnBindObjectFromPeer('key', \ + existing_key_id, peer['shortname']) + self.driver.DeleteKey(existing_key_id) + #def verify_slice_attributes(self, slice, requested_slice_attributes, \ #append=False, admin=False): @@ -683,88 +678,4 @@ class SlabSlices: #value: %s, node_id: %s\nCause:%s'\ #% (name, value, node_id, str(error))) - #def create_slice_aggregate(self, xrn, rspec): - #hrn, type = urn_to_hrn(xrn) - ## Determine if this is a peer slice - #peer = self.get_peer(hrn) - #sfa_peer = self.get_sfa_peer(hrn) - - #spec = RSpec(rspec) - ## Get the slice record from sfa - #slicename = hrn_to_pl_slicename(hrn) - #slice = {} - #slice_record = None - #registry = self.api.registries[self.api.hrn] - #credential = self.api.getCredential() - - #site_id, remote_site_id = self.verify_site(registry, \ - #credential, hrn, peer, sfa_peer) - #slice = self.verify_slice(registry, credential, \ - #hrn, site_id, remote_site_id, peer, sfa_peer) - - ## find out where this slice is currently running - #nodelist = self.driver.GetNodes(slice['node_ids'], ['hostname']) - #hostnames = [node['hostname'] for node in nodelist] - - ## get netspec details - #nodespecs = spec.getDictsByTagName('NodeSpec') - - ## dict in which to store slice attributes to set for the nodes - #nodes = {} - #for nodespec in nodespecs: - #if isinstance(nodespec['name'], list): - #for nodename in nodespec['name']: - #nodes[nodename] = {} - #for k in nodespec.keys(): - #rspec_attribute_value = nodespec[k] - #if (self.rspec_to_slice_tag.has_key(k)): - #slice_tag_name = self.rspec_to_slice_tag[k] - #nodes[nodename][slice_tag_name] = \ - #rspec_attribute_value - #elif isinstance(nodespec['name'], StringTypes): - #nodename = nodespec['name'] - #nodes[nodename] = {} - #for k in nodespec.keys(): - #rspec_attribute_value = nodespec[k] - #if (self.rspec_to_slice_tag.has_key(k)): - #slice_tag_name = self.rspec_to_slice_tag[k] - #nodes[nodename][slice_tag_name] = rspec_attribute_value - - #for k in nodespec.keys(): - #rspec_attribute_value = nodespec[k] - #if (self.rspec_to_slice_tag.has_key(k)): - #slice_tag_name = self.rspec_to_slice_tag[k] - #nodes[nodename][slice_tag_name] = rspec_attribute_value - - #node_names = nodes.keys() - ## remove nodes not in rspec - #deleted_nodes = list(set(hostnames).difference(node_names)) - ## add nodes from rspec - #added_nodes = list(set(node_names).difference(hostnames)) - - #try: - #if peer: - #self.driver.UnBindObjectFromPeer('slice', \ - #slice['slice_id'], peer) - - #self.driver.LaunchExperimentOnOAR(slicename, added_nodes) - - ## Add recognized slice tags - #for node_name in node_names: - #node = nodes[node_name] - #for slice_tag in node.keys(): - #value = node[slice_tag] - #if (isinstance(value, list)): - #value = value[0] - - #self.driver.AddSliceTag(slicename, slice_tag, \ - #value, node_name) - - #self.driver.DeleteSliceFromNodes(slicename, deleted_nodes) - #finally: - #if peer: - #self.driver.BindObjectToPeer('slice', slice['slice_id'], \ - #peer, slice['peer_slice_id']) - - #return 1 - +