from sfa.senslab.OARrestapi import OARrestapi
from sfa.senslab.LDAPapi import LDAPapi
-from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, \
- JobSenslab
+from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab
+
from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
slab_xrn_object
from sfa.senslab.slabslices import SlabSlices
%(resources,res))
return result
-
- def synchronize_oar_and_slice_table(self, slice_hrn = None):
- #Get list of leases
- oar_leases_list = self.GetReservedNodes()
-
- logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table :\
- oar_leases_list %s\r\n" %( oar_leases_list))
- #Get list of slices/leases . multiple entry per user depending
- #on number of jobs
- #At this point we don't have the slice_hrn so that's why
- #we are calling Getslices, which holds a field with slice_hrn
-
- if slice_hrn :
- sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, \
- slice_filter_type = 'slice_hrn')
- self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, \
- oar_leases_list, sfa_slices_list)
- else :
- sfa_slices_list = self.GetSlices()
-
- sfa_slices_dict_by_slice_hrn = {}
- for sfa_slice in sfa_slices_list:
- if sfa_slice['slice_hrn'] not in sfa_slices_dict_by_slice_hrn:
- sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']] = []
-
- sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].\
- append(sfa_slice)
-
-
- for slice_hrn in sfa_slices_dict_by_slice_hrn:
- list_slices_sfa = sfa_slices_dict_by_slice_hrn[slice_hrn]
- self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, \
- oar_leases_list, list_slices_sfa)
-
- return
-
-
- def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, \
- oar_leases_list, sfa_slices_list):
-
- #Get list of slices/leases .
- #multiple entry per user depending on number of jobs
-
- sfa_slices_dict = {}
- oar_leases_dict = {}
- login = slice_hrn.split(".")[1].split("_")[0]
-
- #Create dictionnaries based on the tuple user login/ job id
- #for the leases list and the slices list
-
- for sl in sfa_slices_list:
- if sl['oar_job_id'] != [] :
- #one entry in the dictionnary for each jobid/login, one login
- #can have multiple jobs running
- #for oar_jobid in sl['oar_job_id']:
- if (login, sl['oar_job_id']) not in sfa_slices_dict:
- sfa_slices_dict[(login,sl['oar_job_id'])] = sl
-
- for lease in oar_leases_list:
- if (lease['user'], lease['lease_id']) not in oar_leases_dict:
- oar_leases_dict[(lease['user'], lease['lease_id'])] = lease
-
- #Find missing entries in the sfa slices list dict by comparing
- #the keys in both dictionnaries
- #Add the missing entries in the slice sneslab table
-
- for lease in oar_leases_dict :
-
- if lease not in sfa_slices_dict and login == lease[0]:
- #if lease in GetReservedNodes not in GetSlices
- #and the login of the job running matches then update the db
- #for this login
- #First get the list of nodes hostnames for this job
- oar_reserved_nodes_listdict = \
- oar_leases_dict[lease]['reserved_nodes']
- oar_reserved_nodes_list = []
- for node_dict in oar_reserved_nodes_listdict:
- oar_reserved_nodes_list.append(node_dict['hostname'])
- #And update the db with slice hrn, job id and node list
- self.db.add_job(slice_hrn, lease[1], oar_reserved_nodes_list)
-
- for lease in sfa_slices_dict:
- #Job is now terminated or in Error,
- #either way ot is not going to run again
- #Remove it from the db
- if lease not in oar_leases_dict:
- self.db.delete_job( slice_hrn, lease[1])
- return
def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
users, options):
logger.debug("SLABDRIVER.PY \t create_sliver \tr spec.version %s slice_record %s " \
%(rspec.version,slice_record))
- self.synchronize_oar_and_slice_table(slice_hrn)
+ #self.synchronize_oar_and_slice_table(slice_hrn)
# ensure site record exists?
# ensure slice record exists
#Removed options to verify_slice SA 14/08/12
if not sfa_slice_list:
return 1
- sfa_slice = sfa_slice_list[0]
+ #Delete all in the slice
+ for sfa_slice in sfa_slice_list:
+
- logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
- slices = SlabSlices(self)
- # determine if this is a peer slice
-
- peer = slices.get_peer(slice_hrn)
- #TODO delete_sliver SA : UnBindObjectFromPeer should be
- #used when there is another
- #senslab testbed, which is not the case 14/08/12 .
+ logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
+ slices = SlabSlices(self)
+ # determine if this is a peer slice
- logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer))
- try:
- if peer:
- self.UnBindObjectFromPeer('slice', \
- sfa_slice['record_id_slice'], peer,None)
- self.DeleteSliceFromNodes(sfa_slice)
- finally:
- if peer:
- self.BindObjectToPeer('slice', sfa_slice['record_id_slice'], \
- peer, sfa_slice['peer_slice_id'])
- return 1
+ peer = slices.get_peer(slice_hrn)
+ #TODO delete_sliver SA : UnBindObjectFromPeer should be
+ #used when there is another
+ #senslab testbed, which is not the case 14/08/12 .
+
+ logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer))
+ try:
+ if peer:
+ self.UnBindObjectFromPeer('slice', \
+ sfa_slice['record_id_slice'], peer,None)
+ self.DeleteSliceFromNodes(sfa_slice)
+ finally:
+ if peer:
+ self.BindObjectToPeer('slice', sfa_slice['record_id_slice'], \
+ peer, sfa_slice['peer_slice_id'])
+ return 1
def AddSlice(self, slice_record):
#hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
return hostname_dict_list
- def GetReservedNodes(self):
+ def GetReservedNodes(self,username = None):
#Get the nodes in use and the reserved nodes
reservation_dict_list = \
- self.oar.parser.SendRequest("GET_reserved_nodes")
+ self.oar.parser.SendRequest("GET_reserved_nodes", username = username)
for resa in reservation_dict_list:
return return_site_list
- #warning return_fields_list paramr emoved (Not used)
+
+
+
def GetSlices(self, slice_filter = None, slice_filter_type = None):
#def GetSlices(self, slice_filter = None, slice_filter_type = None, \
#return_fields_list = None):
slicerec_dict = {}
authorized_filter_types_list = ['slice_hrn', 'record_id_user']
slicerec_dictlist = []
+
+
if slice_filter_type in authorized_filter_types_list:
- #Get list of slices based on the slice hrn
- if slice_filter_type == 'slice_hrn':
-
- login = slice_filter.split(".")[1].split("_")[0]
-
- #DO NOT USE RegSlice - reg_researchers to get the hrn of the user
- #otherwise will mess up the RegRecord in Resolve, don't know
- #why - SA 08/08/2012
-
- #Only one entry for one user = one slice in slice_senslab table
- slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()
-
- #Get slice based on user id
- if slice_filter_type == 'record_id_user':
- slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
-
- if slicerec is None:
- return []
- #slicerec_dictlist = []
- fixed_slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
- if login is None :
- login = fixed_slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
-
- #for record in slicerec:
- #slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict())
- #if login is None :
- #login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0]
-
- #One slice can have multiple jobs
- sqljob_list = slab_dbsession.query(JobSenslab).filter_by( slice_hrn=fixed_slicerec_dict['slice_hrn']).all()
- job_list = []
- for job in sqljob_list:
- job_list.append(job.dump_sqlalchemyobj_to_dict())
+
+ def __get_slice_records(slice_filter = None, slice_filter_type = None):
+
+ login = None
+ #Get list of slices based on the slice hrn
+ if slice_filter_type == 'slice_hrn':
+
+ login = slice_filter.split(".")[1].split("_")[0]
+
+ #DO NOT USE RegSlice - reg_researchers to get the hrn of the user
+ #otherwise will mess up the RegRecord in Resolve, don't know
+ #why - SA 08/08/2012
+
+ #Only one entry for one user = one slice in slice_senslab table
+ slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()
+
+ #Get slice based on user id
+ if slice_filter_type == 'record_id_user':
+ slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
+
+ if slicerec is None:
+ return login, []
+ else:
+ fixed_slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
+
+ if login is None :
+ login = fixed_slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
+ return login, fixed_slicerec_dict
+
+
+
+ login, fixed_slicerec_dict = __get_slice_records(slice_filter, slice_filter_type)
logger.debug(" SLABDRIVER \tGetSlices login %s \
slice record %s" \
%(login, fixed_slicerec_dict))
+
+
+
+ #One slice can have multiple jobs
+
+ leases_list = self.GetReservedNodes(username = login)
#Several jobs for one slice
- #TODO : Modify to make a diff with jobs not terminated = 1 OAR request SA 20/08/12
- for job in job_list :
+
+ for lease in leases_list :
slicerec_dict = {}
- slicerec_dict['oar_job_id'] = []
-
+
+
#Check with OAR the status of the job if a job id is in
#the slice record
- rslt = self.GetJobsResources(job['oar_job_id'], \
- username = login)
- logger.debug("SLABDRIVER.PY \tGetSlices job %s \trslt fromn GetJobsResources %s \r\n"\
- %(job, rslt))
- if rslt :
- slicerec_dict['oar_job_id']= job['oar_job_id']
- slicerec_dict.update(rslt)
- slicerec_dict.update(fixed_slicerec_dict)
- slicerec_dict.update({'hrn':\
- str(fixed_slicerec_dict['slice_hrn'])})
-
+
+
+ slicerec_dict['oar_job_id'] = lease['lease_id']
+ slicerec_dict.update({'node_ids':lease['reserved_nodes']})
+ slicerec_dict.update(fixed_slicerec_dict)
+ slicerec_dict.update({'hrn':\
+ str(fixed_slicerec_dict['slice_hrn'])})
- #If GetJobsResources is empty, this means the job is
- #now in the 'Terminated' state
- #Update the slice record
- else :
- self.db.delete_job(slice_filter, job['oar_job_id'])
- slicerec_dict.\
- update({'hrn':str(fixed_slicerec_dict['slice_hrn'])})
-
- try:
- slicerec_dict['node_ids'] = job['node_list']
- except KeyError:
- pass
-
+
slicerec_dictlist.append(slicerec_dict)
logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s slicerec_dictlist %s" %(slicerec_dict, slicerec_dictlist))
%(slicerec_dictlist))
return slicerec_dictlist
-
+
else:
+
slice_list = slab_dbsession.query(SliceSenslab).all()
- sqljob_list = slab_dbsession.query(JobSenslab).all()
+ leases_list = self.GetReservedNodes()
- job_list = []
- for job in sqljob_list:
- job_list.append(job.dump_sqlalchemyobj_to_dict())
-
+
+ slicerec_dictlist = []
return_slice_list = []
for record in slice_list:
return_slice_list.append(record.dump_sqlalchemyobj_to_dict())
- for slicerec_dict in return_slice_list:
- slicerec_dict['oar_job_id'] = []
- for job in job_list:
- if slicerec_dict['slice_hrn'] in job:
- slicerec_dict['oar_job_id'].append(job['oar_job_id'])
-
+ for fixed_slicerec_dict in return_slice_list:
+ slicerec_dict = {}
+ owner = fixed_slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
+ for lease in leases_list:
+ if owner == lease['user']:
+ slicerec_dict['oar_job_id'] = lease['lease_id']
+ slicerec_dict.update({'node_ids':lease['reserved_nodes']})
+ slicerec_dict.update(fixed_slicerec_dict)
+ slicerec_dict.update({'hrn':\
+ str(fixed_slicerec_dict['slice_hrn'])})
+ slicerec_dictlist.append(slicerec_dict)
+
logger.debug("SLABDRIVER.PY \tGetSlices RETURN slices %s \
slice_filter %s " %(return_slice_list, slice_filter))
#return_slice_list = parse_filter(sliceslist, \
#slice_filter,'slice', return_fields_list)
- return return_slice_list
-
-
-
+ return slicerec_dictlist
def testbed_name (self): return self.hrn
#Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
- for job_id in slice_record['oar_job_id']:
- self.DeleteJobs(job_id, slice_record['hrn'])
+ self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
return
'record_id_slice':self.record_id_slice, }
return dict
-
-class JobSenslab (SlabBase):
- __tablename__ = 'job_senslab'
- #record_id_user = Column(Integer, primary_key=True)
- # Multiple primary key aka composite primary key
- # so that we can have several job id for a given slice hrn
- slice_hrn = Column(String,ForeignKey('slice_senslab.slice_hrn'))
- oar_job_id = Column( Integer, primary_key=True)
- record_id_slice = Column(Integer)
- record_id_user = Column(Integer)
-
- #oar_job_id = Column( Integer,default = -1)
- node_list = Column(postgresql.ARRAY(String), nullable =True)
-
- slice_complete = relationship("SliceSenslab", backref=backref('job_senslab', order_by=slice_hrn))
-
- def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None):
- self.node_list = []
- if record_id_slice:
- self.record_id_slice = record_id_slice
- if slice_hrn:
- self.slice_hrn = slice_hrn
- if oar_job_id:
- self.oar_job_id = oar_job_id
- if record_id_user:
- self.record_id_user= record_id_user
-
-
- def __repr__(self):
- result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s node_list =%s "% \
- (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list)
- result += ">"
- return result
-
- def dump_sqlalchemyobj_to_dict(self):
- dict = {'slice_hrn':self.slice_hrn,
- 'record_id_user':self.record_id_user,
- 'oar_job_id':self.oar_job_id,
- 'record_id_slice':self.record_id_slice,
- 'node_list':self.node_list}
- return dict
+
#class PeerSenslab(SlabBase):
#__tablename__ = 'peer_senslab'
SlabBase.metadata.create_all(slab_engine)
return
- def add_job (self, hrn, job_id, nodes = None ):
- job_row = slab_dbsession.query(JobSenslab).filter_by(oar_job_id=job_id).first()
- if job_row is None:
- slice_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(hrn)).first()
- if slice_rec :
- user_record = slice_rec.reg_researchers
- slab_slice = JobSenslab(slice_hrn = hrn, oar_job_id = job_id, \
- record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id)
- #slab_slice = SliceSenslab(slice_hrn = hrn, oar_job_id = job_id, \
- #record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id)
- logger.debug("============SLABPOSTGRES \t add_job slab_slice %s" %(slab_slice))
- slab_dbsession.add(slab_slice)
- slab_slice.node_list = nodes
- slab_dbsession.commit()
- else:
- return
-
-
- def delete_job (self, hrn, job_id):
- #slab_slice =
- slab_dbsession.query(JobSenslab).filter_by(slice_hrn = hrn).filter_by(oar_job_id =job_id).delete()
- #slab_dbsession.delete(slab_slice)
- slab_dbsession.commit()
-
- #Updates the job_id and the nodes list
- #The nodes list is never erased.
-
-
-
- def find (self, name = None, filter_dict = None):
- print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_dict %s"%(filter_dict)
+
- #Filter_by can not handle more than one argument, hence these functions
- def filter_id_user(query, user_id):
- print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_id_user"
- return query.filter_by(record_id_user = user_id)
-
- def filter_job(query, job):
- print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job "
- return query.filter_by(oar_job_id = job)
-
- def filer_id_slice (query, id_slice):
- print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filer_id_slice"
- return query.filter_by(record_id_slice = id_slice)
-
- def filter_slice_hrn(query, hrn):
- print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_slice_hrn"
- return query.filter_by(slice_hrn = hrn)
-
-
- extended_filter = {'record_id_user': filter_id_user,
- 'oar_job_id':filter_job,
- 'record_id_slice': filer_id_slice,
- 'slice_hrn': filter_slice_hrn}
-
- Q = slab_dbsession.query(SliceSenslab)
-
- if filter_dict is not None:
- for k in filter_dict:
- try:
- newQ= extended_filter[k](Q, filter_dict[k])
- Q = newQ
- except KeyError:
- print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!"
- print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first())
- rec = Q.first()
- print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find rec %s" %(rec)
- return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))
- #reclist = []
- ##for rec in Q.all():
- #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])))
- #return reclist
-
def verify_persons(self, slice_hrn, slice_record, users, peer, sfa_peer, \
options={}):
+ """
+ users is a record list. Records can either be local records
+ or users records from known and trusted federated sites.
+ If the user is from another site that senslab doesn't trust yet,
+ then Resolve will raise an error before getting to create_sliver.
+ """
#TODO SA 21/08/12 verify_persons Needs review
- users_by_id = {}
- users_by_hrn = {}
+
+
+ users_by_id = {}
+ users_by_hrn = {}
+ #users_dict : dict whose keys can either be the user's hrn or its id.
+ #Values contains only id and hrn
users_dict = {}
-
+
+ #First create dicts by hrn and id for each user in the user record list:
for user in users:
if 'urn' in user and (not 'hrn' in user ) :
'hrn':user['hrn']}
- logger.debug( "SLABSLICE.PY \tverify_person \
+ logger.debug( "SLABSLICE.PY \t verify_person \
users_dict %s \r\n user_by_hrn %s \r\n \
\tusers_by_id %s " \
%(users_dict,users_by_hrn, users_by_id))
existing_user_ids = []
existing_user_hrns = []
existing_users = []
- #Check if user is in Senslab LDAP using its hrn.
- #Assuming Senslab is centralised : one LDAP for all sites,
+ # Check if user is in Senslab LDAP using its hrn.
+ # Assuming Senslab is centralised : one LDAP for all sites,
# user_id unknown from LDAP
- # LDAP does not provide users id, therefore we rely on hrns
+ # LDAP does not provide users id, therefore we rely on hrns containing
+ # the login of the user.
# If the hrn is not a senslab hrn, the user may not be in LDAP.
if users_by_hrn:
- #Construct the list of filters for GetPersons
+ #Construct the list of filters (list of dicts) for GetPersons
filter_user = []
for hrn in users_by_hrn:
filter_user.append (users_by_hrn[hrn])
logger.debug(" SLABSLICE.PY \tverify_person filter_user %s " \
- %(filter_user))
- existing_users = self.driver.GetPersons(filter_user)
+ %(filter_user))
+ #Check user's in LDAP with GetPersons
+ #Needed because what if the user has been deleted in LDAP but
+ #is still in SFA?
+ existing_users = self.driver.GetPersons(filter_user)
+
+ #User's in senslab LDAP
if existing_users:
for user in existing_users :
existing_user_hrns.append(users_dict[user['hrn']]['hrn'])
existing_user_ids.\
append(users_dict[user['hrn']]['person_id'])
- #User from another federated site ,
- #does not have a senslab account yet?
- #Check in the LDAP if we know email,
- #maybe he has multiple SFA accounts = multiple hrns.
- #Check before adding them to LDAP
+ # User from another known trusted federated site. Check
+ # if a senslab account matching the email has already been created.
else:
req = 'mail='
if isinstance(users, list):
+
req += users[0]['email']
else:
req += users['email']
except KeyError:
pass
- #existing_slice_user_hrns = [user['hrn'] for \
- #user in existing_slice_users]
-
+
# users to be added, removed or updated
#One user in one senslab slice : there should be no need
#to remove/ add any user from/to a slice.