import subprocess
from datetime import datetime
-from dateutil import tz
-from time import strftime, gmtime
+from time import gmtime
from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
from sfa.util.sfalogging import logger
from sfa.storage.alchemy import dbsession
-from sfa.storage.model import RegRecord, RegUser, RegSlice
+from sfa.storage.model import RegRecord, RegUser
from sfa.trust.credential import Credential
from sfa.senslab.OARrestapi import OARrestapi
from sfa.senslab.LDAPapi import LDAPapi
-from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, JobSenslab
-from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, slab_xrn_object
+from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, \
+ JobSenslab
+from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
+ slab_xrn_object
from sfa.senslab.slabslices import SlabSlices
#Get list of leases
oar_leases_list = self.GetReservedNodes()
- logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table \r\n \r\n : oar_leases_list %s\r\n" %( oar_leases_list))
- #Get list of slices/leases . multiple entry per user depending on number of jobs
+ logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table :\
+ oar_leases_list %s\r\n" %( oar_leases_list))
+ #Get list of slices/leases . multiple entry per user depending
+ #on number of jobs
#At this point we don't have the slice_hrn so that's why
#we are calling Getslices, which holds a field with slice_hrn
if slice_hrn :
- sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
- self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, sfa_slices_list)
+ sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, \
+ slice_filter_type = 'slice_hrn')
+ self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, \
+ oar_leases_list, sfa_slices_list)
else :
sfa_slices_list = self.GetSlices()
for sfa_slice in sfa_slices_list:
if sfa_slice['slice_hrn'] not in sfa_slices_dict_by_slice_hrn:
sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']] = []
- sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice)
- else :
- sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice)
+
+ sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].\
+ append(sfa_slice)
+
for slice_hrn in sfa_slices_dict_by_slice_hrn:
list_slices_sfa = sfa_slices_dict_by_slice_hrn[slice_hrn]
- if slice_hrn =='senslab2.avakian_slice':
- logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table slice_hrn %s list_slices_sfa %s\r\n \r\n" %( slice_hrn,list_slices_sfa))
- self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, list_slices_sfa)
+ self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, \
+ oar_leases_list, list_slices_sfa)
return
- def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, oar_leases_list, sfa_slices_list):
+ def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, \
+ oar_leases_list, sfa_slices_list):
- #Get list of slices/leases . multiple entry per user depending on number of jobs
- #sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
+ #Get list of slices/leases .
+ #multiple entry per user depending on number of jobs
+
sfa_slices_dict = {}
oar_leases_dict = {}
login = slice_hrn.split(".")[1].split("_")[0]
for sl in sfa_slices_list:
if sl['oar_job_id'] != [] :
+ #one entry in the dictionnary for each jobid/login, one login
+ #can have multiple jobs running
for oar_jobid in sl['oar_job_id']:
if (login, oar_jobid) not in sfa_slices_dict:
sfa_slices_dict[(login,oar_jobid)] = sl
for lease in oar_leases_list:
if (lease['user'], lease['lease_id']) not in oar_leases_dict:
- oar_leases_dict[(lease['user'], lease['lease_id'])] = lease
+ oar_leases_dict[(lease['user'], lease['lease_id'])] = lease
#Find missing entries in the sfa slices list dict by comparing
#the keys in both dictionnaries
#Add the missing entries in the slice sneslab table
for lease in oar_leases_dict :
- logger.debug(" =============SLABDRIVER \t\t\ synchronize_oar_and_slice_table_for_slice_hrn oar_leases_list %s \r\n \t\t\t SFA_SLICES_DICT %s \r\n \r\n LOGIN %s \r\n " %( oar_leases_list,sfa_slices_dict,login))
- if lease not in sfa_slices_dict and login == lease[0]:
-
- #if lease in GetReservedNodes not in GetSlices update the db
+
+ if lease not in sfa_slices_dict and login == lease[0]:
+ #if lease in GetReservedNodes not in GetSlices
+ #and the login of the job running matches then update the db
+ #for this login
#First get the list of nodes hostnames for this job
- oar_reserved_nodes_listdict = oar_leases_dict[lease]['reserved_nodes']
+ oar_reserved_nodes_listdict = \
+ oar_leases_dict[lease]['reserved_nodes']
oar_reserved_nodes_list = []
for node_dict in oar_reserved_nodes_listdict:
oar_reserved_nodes_list.append(node_dict['hostname'])
self.db.add_job(slice_hrn, lease[1], oar_reserved_nodes_list)
for lease in sfa_slices_dict:
- #Job is now terminated or in Error, either way ot is not going to run again
+ #Job is now terminated or in Error,
+ #either way ot is not going to run again
#Remove it from the db
if lease not in oar_leases_dict:
- self.db.delete_job( slice_hrn, lease[1])
-
+ self.db.delete_job( slice_hrn, lease[1])
return
def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
self.synchronize_oar_and_slice_table(slice_hrn)
# ensure site record exists?
# ensure slice record exists
+ #Removed options to verify_slice SA 14/08/12
sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
- sfa_peer, options=options)
- requested_attributes = rspec.version.get_slice_attributes()
+ sfa_peer)
+
+ #requested_attributes returned by rspec.version.get_slice_attributes()
+ #unused, removed SA 13/08/12
+ rspec.version.get_slice_attributes()
logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
# ensure person records exists
- persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
+ #verify_persons returns added persons but since the return value
+ #is not used
+ slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
sfa_peer, options=options)
for node in rspec.version.get_nodes_with_slivers()]
l = [ node for node in rspec.version.get_nodes_with_slivers() ]
logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
- requested_slivers %s listnodes %s" %(requested_slivers,l))
-
- nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
+ requested_slivers %s listnodes %s" \
+ %(requested_slivers,l))
+ #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
+ slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
# add/remove leases
requested_lease_list = []
if single_requested_lease.get('hostname'):
requested_lease_list.append(single_requested_lease)
- #dCreate dict of leases by start_time, regrouping nodes reserved at the same
+ #dCreate dict of leases by start_time, regrouping nodes reserved
+ #at the same
#time, for the same amount of time = one job on OAR
requested_job_dict = {}
for lease in requested_lease_list:
logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s " %(requested_job_dict))
-
- leases = slices.verify_slice_leases(sfa_slice, \
+ #verify_slice_leases returns the leases , but the return value is unused
+ #here. Removed SA 13/08/12
+ slices.verify_slice_leases(sfa_slice, \
requested_job_dict, kept_leases, peer)
return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
def delete_sliver (self, slice_urn, slice_hrn, creds, options):
- sfa_slice = self.GetSlices(slice_filter = slice_hrn, \
+ sfa_slice_list = self.GetSlices(slice_filter = slice_hrn, \
slice_filter_type = 'slice_hrn')
- logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
- if not sfa_slice:
+
+ if not sfa_slice_list:
return 1
-
+
+ sfa_slice = sfa_slice_list[0]
+
+ logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
slices = SlabSlices(self)
# determine if this is a peer slice
peer = slices.get_peer(slice_hrn)
+ #TODO delete_sliver SA : UnBindObjectFromPeer should be
+ #used when there is another
+ #senslab testbed, which is not the case 14/08/12 .
+
logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer))
try:
if peer:
self.UnBindObjectFromPeer('slice', \
- sfa_slice['record_id_slice'], peer)
+ sfa_slice['record_id_slice'], peer,None)
self.DeleteSliceFromNodes(sfa_slice)
finally:
if peer:
def remove (self, sfa_record):
sfa_record_type = sfa_record['type']
hrn = sfa_record['hrn']
- record_id = sfa_record['record_id']
if sfa_record_type == 'user':
#get user from senslab ldap
elif sfa_record_type == 'slice':
if self.GetSlices(slice_filter = hrn, \
slice_filter_type = 'slice_hrn'):
- self.DeleteSlice(sfa_record_type)
+ self.DeleteSlice(sfa_record)
#elif type == 'authority':
#if self.GetSites(pointer):
logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
%(records_list))
- except:
+ except KeyError:
pass
return_records = records_list
#TODO : Handling OR request in make_ldap_filters_from_records
#instead of the for loop
#over the records' list
- def GetPersons(self, person_filter=None, return_fields_list=None):
+ def GetPersons(self, person_filter=None):
"""
person_filter should be a list of dictionnaries when not set to None.
Returns a list of users whose accounts are enabled found in ldap.
self.db.delete_job(slice_hrn, job_id)
answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
reqdict,username)
- logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s username %s" \
- %(job_id,answer, username))
+ logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
+ username %s" %(job_id,answer, username))
return answer
#assigned_n = ['node', 'node_uri']
req = "GET_jobs_id_resources"
- node_list_k = 'reserved_resources'
+
#Get job resources list from OAR
node_id_list = self.oar.parser.SendRequest(req, job_id, username)
hostname_list = \
self.__get_hostnames_from_oar_node_ids(node_id_list)
- #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
- #node_list_k)
+
#Replaces the previous entry "assigned_network_address" /
#"reserved_resources"
#with "node_ids"
logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\
oar_id_node_dict %s" %(oar_id_node_dict))
- hostname_list = []
+
hostname_dict_list = []
for resource_id in resource_id_list:
#Because jobs requested "asap" do not have defined resources
def sfa_fields_to_slab_fields(self, sfa_type, hrn, record):
- def convert_ints(tmpdict, int_fields):
- for field in int_fields:
- if field in tmpdict:
- tmpdict[field] = int(tmpdict[field])
slab_record = {}
#for field in record:
added_nodes.
"""
-
- site_list = []
+
nodeid_list = []
- resource = ""
reqdict = {}
return
- #Delete the jobs and updates the job id in the senslab table
- #to set it to -1
- #Does not clear the node list
+ #Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
- # Get user information
-
- self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
+ for job_id in slice_record['oar_job_id']:
+ self.DeleteJobs(job_id, slice_record['hrn'])
return
grain = 60
return grain
- def GetLeases(self, lease_filter_dict=None, return_fields_list=None):
+ def GetLeases(self, lease_filter_dict=None):
unfiltered_reservation_list = self.GetReservedNodes()
##Synchronize slice_table of sfa senslab db
%(resa_user_dict))
for resa in unfiltered_reservation_list:
- #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
- #ldap_info = ldap_info[0][1]
-
- #user = dbsession.query(RegUser).filter_by(email = \
- #ldap_info['mail'][0]).first()
- ##Separated in case user not in database : record_id not defined SA 17/07//12
- #query_slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id)
- #if query_slice_info:
- #slice_info = query_slice_info.first()
+
#Put the slice_urn
resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info'].slice_hrn
resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
for job_id in leases_id_list:
self.DeleteJobs(job_id, slice_hrn)
- logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \r\n " %(leases_id_list, slice_hrn))
+ logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
+ \r\n " %(leases_id_list, slice_hrn))
return
logger.debug("SLABSLICES \ get_peer slice_authority %s \
site_authority %s hrn %s" %(slice_authority, \
site_authority, hrn))
+ #This slice belongs to the current site
+ if site_authority == self.driver.root_auth :
+ return None
# check if we are already peered with this site_authority, if so
#peers = self.driver.GetPeers({})
peers = self.driver.GetPeers(peer_filter = slice_authority)
self.driver.UnBindObjectFromPeer('senslab2', 'slice', \
sfa_slice['record_id_slice'], peer.hrn)
- deleted = self.driver.DeleteLeases(deleted_leases, \
+ self.driver.DeleteLeases(deleted_leases, \
sfa_slice['name'])
#TODO : catch other exception?
#Add new leases
for start_time in requested_jobs_dict:
job = requested_jobs_dict[start_time]
- added = self.driver.AddLeases(job['hostname'], \
+ self.driver.AddLeases(job['hostname'], \
sfa_slice, int(job['start_time']), \
int(job['duration']))
# remove nodes not in rspec
deleted_nodes = list(set(current_slivers).\
difference(requested_slivers))
-
- # add nodes from rspec
- added_nodes = list(set(requested_slivers).difference(current_slivers))
- try:
- #if peer:
- #self.driver.UnBindObjectFromPeer('slice', slice['slice_id'], \
- #peer['shortname'])
- #PI is a list, get the only username in this list
- #so that the OAR/LDAP knows the user:
- #remove the authority from the name
- tmp = sfa_slice['PI'][0].split(".")
- username = tmp[(len(tmp)-1)]
- #Update the table with the nodes that populate the slice
- logger.debug("SLABSLICES \tverify_slice_nodes slice %s \r\n \r\n deleted_nodes %s"\
- %(sfa_slice,deleted_nodes))
- #self.driver.db.update_job(sfa_slice['name'], nodes = added_nodes)
-
- #If there is a timeslot specified, then a job can be launched
- #try:
- ##slot = sfa_slice['timeslot']
- #self.driver.LaunchExperimentOnOAR(sfa_slice, added_nodes, \
- #username)
- #except KeyError:
- #logger.log_exc("SLABSLICES \verify_slice_nodes KeyError \
- #sfa_slice %s " %(sfa_slice))
+ # add nodes from rspec
+ #added_nodes = list(set(requested_slivers).difference(current_slivers))
+ #Update the table with the nodes that populate the slice
+ logger.debug("SLABSLICES \tverify_slice_nodes slice %s\
+ \r\n \r\n deleted_nodes %s"\
+ %(sfa_slice,deleted_nodes))
if deleted_nodes:
self.driver.DeleteSliceFromNodes(sfa_slice['name'], \
deleted_nodes)
- #return added_nodes
- except:
- logger.log_exc('Failed to add/remove slice from nodes')
+ return nodes
+
def free_egre_key(self):
#return site
- def verify_slice(self, slice_hrn, slice_record, peer, sfa_peer, options={}):
+ def verify_slice(self, slice_hrn, slice_record, peer, sfa_peer):
#login_base = slice_hrn.split(".")[0]
slicename = slice_hrn
users_dict[user['hrn']] = {'person_id':user['person_id'], \
'hrn':user['hrn']}
+
logger.debug( "SLABSLICE.PY \tverify_person \
users_dict %s \r\n user_by_hrn %s \r\n \
\tusers_by_id %s " \
if existing_users:
for user in existing_users :
#for k in users_dict[user['hrn']] :
+
existing_user_hrns.append(users_dict[user['hrn']]['hrn'])
existing_user_ids.\
append(users_dict[user['hrn']]['person_id'])
ldap_reslt = self.driver.ldap.LdapSearch(users)
if ldap_reslt:
existing_users = ldap_reslt[0]
+ #TODO : DEBUG user undefined ? SA 14/08/12
existing_user_hrns.append(users_dict[user['hrn']]['hrn'])
existing_user_ids.\
append(users_dict[user['hrn']]['person_id'])
removed_keys = set(existing_keys).difference(requested_keys)
for existing_key_id in keydict:
if keydict[existing_key_id] in removed_keys:
- try:
- if peer:
- self.driver.UnBindObjectFromPeer('key', \
- existing_key_id, peer['shortname'])
- self.driver.DeleteKey(existing_key_id)
- except:
- pass
+
+ if peer:
+ self.driver.UnBindObjectFromPeer('key', \
+ existing_key_id, peer['shortname'])
+ self.driver.DeleteKey(existing_key_id)
+
#def verify_slice_attributes(self, slice, requested_slice_attributes, \
#append=False, admin=False):
#value: %s, node_id: %s\nCause:%s'\
#% (name, value, node_id, str(error)))
- #def create_slice_aggregate(self, xrn, rspec):
- #hrn, type = urn_to_hrn(xrn)
- ## Determine if this is a peer slice
- #peer = self.get_peer(hrn)
- #sfa_peer = self.get_sfa_peer(hrn)
-
- #spec = RSpec(rspec)
- ## Get the slice record from sfa
- #slicename = hrn_to_pl_slicename(hrn)
- #slice = {}
- #slice_record = None
- #registry = self.api.registries[self.api.hrn]
- #credential = self.api.getCredential()
-
- #site_id, remote_site_id = self.verify_site(registry, \
- #credential, hrn, peer, sfa_peer)
- #slice = self.verify_slice(registry, credential, \
- #hrn, site_id, remote_site_id, peer, sfa_peer)
-
- ## find out where this slice is currently running
- #nodelist = self.driver.GetNodes(slice['node_ids'], ['hostname'])
- #hostnames = [node['hostname'] for node in nodelist]
-
- ## get netspec details
- #nodespecs = spec.getDictsByTagName('NodeSpec')
-
- ## dict in which to store slice attributes to set for the nodes
- #nodes = {}
- #for nodespec in nodespecs:
- #if isinstance(nodespec['name'], list):
- #for nodename in nodespec['name']:
- #nodes[nodename] = {}
- #for k in nodespec.keys():
- #rspec_attribute_value = nodespec[k]
- #if (self.rspec_to_slice_tag.has_key(k)):
- #slice_tag_name = self.rspec_to_slice_tag[k]
- #nodes[nodename][slice_tag_name] = \
- #rspec_attribute_value
- #elif isinstance(nodespec['name'], StringTypes):
- #nodename = nodespec['name']
- #nodes[nodename] = {}
- #for k in nodespec.keys():
- #rspec_attribute_value = nodespec[k]
- #if (self.rspec_to_slice_tag.has_key(k)):
- #slice_tag_name = self.rspec_to_slice_tag[k]
- #nodes[nodename][slice_tag_name] = rspec_attribute_value
-
- #for k in nodespec.keys():
- #rspec_attribute_value = nodespec[k]
- #if (self.rspec_to_slice_tag.has_key(k)):
- #slice_tag_name = self.rspec_to_slice_tag[k]
- #nodes[nodename][slice_tag_name] = rspec_attribute_value
-
- #node_names = nodes.keys()
- ## remove nodes not in rspec
- #deleted_nodes = list(set(hostnames).difference(node_names))
- ## add nodes from rspec
- #added_nodes = list(set(node_names).difference(hostnames))
-
- #try:
- #if peer:
- #self.driver.UnBindObjectFromPeer('slice', \
- #slice['slice_id'], peer)
-
- #self.driver.LaunchExperimentOnOAR(slicename, added_nodes)
-
- ## Add recognized slice tags
- #for node_name in node_names:
- #node = nodes[node_name]
- #for slice_tag in node.keys():
- #value = node[slice_tag]
- #if (isinstance(value, list)):
- #value = value[0]
-
- #self.driver.AddSliceTag(slicename, slice_tag, \
- #value, node_name)
-
- #self.driver.DeleteSliceFromNodes(slicename, deleted_nodes)
- #finally:
- #if peer:
- #self.driver.BindObjectToPeer('slice', slice['slice_id'], \
- #peer, slice['peer_slice_id'])
-
- #return 1
-
+
\ No newline at end of file