from sfa.senslab.OARrestapi import OARrestapi
from sfa.senslab.LDAPapi import LDAPapi
-from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab
+from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, JobSenslab
from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, slab_xrn_object
from sfa.senslab.slabslices import SlabSlices
return result
+ def synchronize_oar_and_slice_table(self, slice_hrn = None):
+ #Get list of leases
+ oar_leases_list = self.GetReservedNodes()
+
+ logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table \r\n \r\n : oar_leases_list %s\r\n" %( oar_leases_list))
+ #Get list of slices/leases . multiple entry per user depending on number of jobs
+ #At this point we don't have the slice_hrn so that's why
+ #we are calling Getslices, which holds a field with slice_hrn
+
+ if slice_hrn :
+ sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
+ self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, sfa_slices_list)
+ else :
+ sfa_slices_list = self.GetSlices()
+
+ sfa_slices_dict_by_slice_hrn = {}
+ for sfa_slice in sfa_slices_list:
+ if sfa_slice['slice_hrn'] not in sfa_slices_dict_by_slice_hrn:
+ sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']] = []
+ sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice)
+ else :
+ sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice)
+
+ for slice_hrn in sfa_slices_dict_by_slice_hrn:
+ list_slices_sfa = sfa_slices_dict_by_slice_hrn[slice_hrn]
+ if slice_hrn =='senslab2.avakian_slice':
+ logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table slice_hrn %s list_slices_sfa %s\r\n \r\n" %( slice_hrn,list_slices_sfa))
+ self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, list_slices_sfa)
+
+ return
+
+
+ def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, oar_leases_list, sfa_slices_list):
+
+ #Get list of slices/leases . multiple entry per user depending on number of jobs
+ #sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
+ sfa_slices_dict = {}
+ oar_leases_dict = {}
+ login = slice_hrn.split(".")[1].split("_")[0]
+
+ #Create dictionnaries based on the tuple user login/ job id
+ #for the leases list and the slices list
+
+ for sl in sfa_slices_list:
+ if sl['oar_job_id'] != [] :
+ for oar_jobid in sl['oar_job_id']:
+ if (login, oar_jobid) not in sfa_slices_dict:
+ sfa_slices_dict[(login,oar_jobid)] = sl
+
+ for lease in oar_leases_list:
+ if (lease['user'], lease['lease_id']) not in oar_leases_dict:
+ oar_leases_dict[(lease['user'], lease['lease_id'])] = lease
+
+ #Find missing entries in the sfa slices list dict by comparing
+ #the keys in both dictionnaries
+ #Add the missing entries in the slice sneslab table
+
+ for lease in oar_leases_dict :
+ logger.debug(" =============SLABDRIVER \t\t\ synchronize_oar_and_slice_table_for_slice_hrn oar_leases_list %s \r\n \t\t\t SFA_SLICES_DICT %s \r\n \r\n LOGIN %s \r\n " %( oar_leases_list,sfa_slices_dict,login))
+ if lease not in sfa_slices_dict and login == lease[0]:
+
+ #if lease in GetReservedNodes not in GetSlices update the db
+ #First get the list of nodes hostnames for this job
+ oar_reserved_nodes_listdict = oar_leases_dict[lease]['reserved_nodes']
+ oar_reserved_nodes_list = []
+ for node_dict in oar_reserved_nodes_listdict:
+ oar_reserved_nodes_list.append(node_dict['hostname'])
+ #And update the db with slice hrn, job id and node list
+ self.db.add_job(slice_hrn, lease[1], oar_reserved_nodes_list)
+
+ for lease in sfa_slices_dict:
+ #Job is now terminated or in Error, either way ot is not going to run again
+ #Remove it from the db
+ if lease not in oar_leases_dict:
+ self.db.delete_job( slice_hrn, lease[1])
+
+ return
+
def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
users, options):
aggregate = SlabAggregate(self)
logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \
%(rspec.version))
-
+ self.synchronize_oar_and_slice_table(slice_hrn)
# ensure site record exists?
# ensure slice record exists
sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
reqdict['method'] = "delete"
reqdict['strval'] = str(job_id)
+ self.db.delete_job(slice_hrn, job_id)
answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
reqdict,username)
logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s username %s" \
if slice_filter_type in authorized_filter_types_list:
#Get list of slices based on the slice hrn
if slice_filter_type == 'slice_hrn':
- #There can be several jobs running for one slices
+
login = slice_filter.split(".")[1].split("_")[0]
#DO NOT USE RegSlice - reg_researchers to get the hrn of the user
#otherwise will mess up the RegRecord in Resolve, don't know
#why - SA 08/08/2012
- slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).all()
+ #Only one entry for one user = one slice in slice_senslab table
+ slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()
- #Get list of slices base on user id
+ #Get slice based on user id
if slice_filter_type == 'record_id_user':
- slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).all()
+ slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
- if slicerec is []:
+ if slicerec is None:
return []
- slicerec_dictlist = []
- for record in slicerec:
- slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict())
- if login is None :
- login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0]
-
-
-
+ #slicerec_dictlist = []
+ slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
+ if login is None :
+ login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
+
+ #for record in slicerec:
+ #slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict())
+ #if login is None :
+ #login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0]
+
+ #One slice can have multiple jobs
+ sqljob_list = slab_dbsession.query(JobSenslab).filter_by( slice_hrn=slicerec_dict['slice_hrn']).all()
+ job_list = []
+ for job in sqljob_list:
+ job_list.append(job.dump_sqlalchemyobj_to_dict())
+
logger.debug("\r\n SLABDRIVER \tGetSlices login %s \
slice record %s" \
- %(login, slicerec_dictlist))
- for slicerec_dict in slicerec_dictlist :
- if slicerec_dict['oar_job_id'] is not -1:
- #Check with OAR the status of the job if a job id is in
- #the slice record
- rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \
- username = login)
- logger.debug("SLABDRIVER.PY \tGetSlices rslt fromn GetJobsResources %s"\
- %(rslt))
- if rslt :
- slicerec_dict.update(rslt)
- slicerec_dict.update({'hrn':\
- str(slicerec_dict['slice_hrn'])})
- #If GetJobsResources is empty, this means the job is
- #now in the 'Terminated' state
- #Update the slice record
- else :
- self.db.update_job(slice_filter, job_id = -1)
- slicerec_dict['oar_job_id'] = -1
- slicerec_dict.\
- update({'hrn':str(slicerec_dict['slice_hrn'])})
+ %(login, slicerec_dict))
+ #Several jobs for one slice
+ slicerec_dict['oar_job_id'] = []
+ for job in job_list :
+ #if slicerec_dict['oar_job_id'] is not -1:
+ #Check with OAR the status of the job if a job id is in
+ #the slice record
+
+ rslt = self.GetJobsResources(job['oar_job_id'], \
+ username = login)
+ logger.debug("SLABDRIVER.PY \tGetSlices rslt fromn GetJobsResources %s"\
+ %(rslt))
+ if rslt :
+ slicerec_dict['oar_job_id'].append(job['oar_job_id'])
+ slicerec_dict.update(rslt)
+ slicerec_dict.update({'hrn':\
+ str(slicerec_dict['slice_hrn'])})
+ #If GetJobsResources is empty, this means the job is
+ #now in the 'Terminated' state
+ #Update the slice record
+ else :
+ self.db.delete_job(slice_filter, job['oar_job_id'])
+ slicerec_dict.\
+ update({'hrn':str(slicerec_dict['slice_hrn'])})
+
try:
- slicerec_dict['node_ids'] = slicerec_dict['node_list']
+ slicerec_dict['node_ids'] = job['node_list']
except KeyError:
pass
-
- logger.debug("SLABDRIVER.PY \tGetSlices RETURN slicerec_dictlist %s"\
- %(slicerec_dictlist))
-
- return slicerec_dictlist
-
+
+ logger.debug("SLABDRIVER.PY \tGetSlices RETURN slicerec_dict %s"\
+ %(slicerec_dict))
+
+ return [slicerec_dict]
+
else:
slice_list = slab_dbsession.query(SliceSenslab).all()
+ sqljob_list = slab_dbsession.query(JobSenslab).all()
+
+ job_list = []
+ for job in sqljob_list:
+ job_list.append(job.dump_sqlalchemyobj_to_dict())
+
return_slice_list = []
for record in slice_list:
return_slice_list.append(record.dump_sqlalchemyobj_to_dict())
-
+
+ for slicerec_dict in return_slice_list:
+ slicerec_dict['oar_job_id'] = []
+ for job in job_list:
+ if slicerec_dict['slice_hrn'] in job:
+ slicerec_dict['oar_job_id'].append(job['oar_job_id'])
+
logger.debug("SLABDRIVER.PY \tGetSlices RETURN slices %s \
slice_filter %s " %(return_slice_list, slice_filter))
if jobid :
logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
- self.db.update_job( slice_name, jobid, added_nodes)
+ self.db.add_job( slice_name, jobid, added_nodes)
__configure_experiment(jobid, added_nodes)
__launch_senslab_experiment(jobid)
# Get user information
self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
- self.db.update_job(slice_record['hrn'], job_id = -1)
+
return
def GetLeases(self, lease_filter_dict=None, return_fields_list=None):
unfiltered_reservation_list = self.GetReservedNodes()
+
+ ##Synchronize slice_table of sfa senslab db
+ #self.synchronize_oar_and_slice_table(unfiltered_reservation_list)
+
reservation_list = []
#Find the slice associated with this user senslab ldap uid
logger.debug(" SLABDRIVER.PY \tGetLeases ")
slice_info = query_slice_info.first()
else:
slice_info = None
+
resa_user_dict[resa['user']] = {}
resa_user_dict[resa['user']]['ldap_info'] = user
resa_user_dict[resa['user']]['slice_info'] = slice_info
return
#self.fill_record_slab_info(records)
-
-
+
+
"""
self.DeleteSliceFromNodes(slice_record)
- self.db.update_job(slice_record['hrn'], job_id = -1)
logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
return
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Table, Column, MetaData, join, ForeignKey
import sfa.storage.model as model
-
+from sfa.storage.model import RegSlice
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, backref
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy import String
+from sfa.storage.alchemy import dbsession
#Dict holding the columns names of the table as keys
#and their type, used for creation of the table
class SliceSenslab (SlabBase):
__tablename__ = 'slice_senslab'
#record_id_user = Column(Integer, primary_key=True)
- # Multiple primary key aka composite primary key
- # so that we can have several job id for a given slice hrn
+
slice_hrn = Column(String,primary_key=True)
- oar_job_id = Column( Integer, primary_key=True)
peer_authority = Column( String,nullable = True)
record_id_slice = Column(Integer)
record_id_user = Column(Integer)
#oar_job_id = Column( Integer,default = -1)
- node_list = Column(postgresql.ARRAY(String), nullable =True)
+ #node_list = Column(postgresql.ARRAY(String), nullable =True)
- def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None,peer_authority=None):
- self.node_list = []
+ def __init__ (self, slice_hrn =None, record_id_slice=None, record_id_user= None,peer_authority=None):
if record_id_slice:
self.record_id_slice = record_id_slice
if slice_hrn:
self.slice_hrn = slice_hrn
- if oar_job_id:
- self.oar_job_id = oar_job_id
- if slice_hrn:
- self.slice_hrn = slice_hrn
if record_id_user:
self.record_id_user= record_id_user
if peer_authority:
def __repr__(self):
- result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s node_list =%s peer_authority =%s"% \
- (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list, self.peer_authority)
+ result="<Record id user =%s, slice hrn=%s, Record id slice =%s ,peer_authority =%s"% \
+ (self.record_id_user, self.slice_hrn, self.record_id_slice, self.peer_authority)
result += ">"
return result
'peer_authority':self.peer_authority,
'record_id':self.record_id_slice,
'record_id_user':self.record_id_user,
+ 'record_id_slice':self.record_id_slice, }
+ return dict
+
+
+class JobSenslab (SlabBase):
+ __tablename__ = 'job_senslab'
+ #record_id_user = Column(Integer, primary_key=True)
+ # Multiple primary key aka composite primary key
+ # so that we can have several job id for a given slice hrn
+ slice_hrn = Column(String,ForeignKey('slice_senslab.slice_hrn'))
+ oar_job_id = Column( Integer, primary_key=True)
+ record_id_slice = Column(Integer)
+ record_id_user = Column(Integer)
+
+ #oar_job_id = Column( Integer,default = -1)
+ node_list = Column(postgresql.ARRAY(String), nullable =True)
+
+ slice_complete = relationship("SliceSenslab", backref=backref('job_senslab', order_by=slice_hrn))
+
+ def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None):
+ self.node_list = []
+ if record_id_slice:
+ self.record_id_slice = record_id_slice
+ if slice_hrn:
+ self.slice_hrn = slice_hrn
+ if oar_job_id:
+ self.oar_job_id = oar_job_id
+ if record_id_user:
+ self.record_id_user= record_id_user
+
+
+ def __repr__(self):
+ result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s node_list =%s "% \
+ (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list)
+ result += ">"
+ return result
+
+ def dump_sqlalchemyobj_to_dict(self):
+ dict = {'slice_hrn':self.slice_hrn,
+ 'record_id_user':self.record_id_user,
'oar_job_id':self.oar_job_id,
'record_id_slice':self.record_id_slice,
'node_list':self.node_list}
return dict
+
#class PeerSenslab(SlabBase):
#__tablename__ = 'peer_senslab'
#peername = Column(String, nullable = False)
return
def add_job (self, hrn, job_id, nodes = None ):
- slice_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(hrn)).first()
- if slice_rec :
- user_record = slice_rec.reg_researchers
- slab_slice = SliceSenslab(slice_hrn = hrn, oar_job_id = job_id, \
- record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id, nodes_list = nodes)
- logger.debug("============SLABPOSTGRES \t add_job slab_slice %s" %(slab_slice))
- slab_dbsession.add(slab_slice)
- slab_dbsession.commit()
+ job_row = slab_dbsession.query(JobSenslab).filter_by(oar_job_id=job_id).first()
+ if job_row is None:
+ slice_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(hrn)).first()
+ if slice_rec :
+ user_record = slice_rec.reg_researchers
+ slab_slice = JobSenslab(slice_hrn = hrn, oar_job_id = job_id, \
+ record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id)
+ #slab_slice = SliceSenslab(slice_hrn = hrn, oar_job_id = job_id, \
+ #record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id)
+ logger.debug("============SLABPOSTGRES \t add_job slab_slice %s" %(slab_slice))
+ slab_dbsession.add(slab_slice)
+ slab_slice.node_list = nodes
+ slab_dbsession.commit()
+ else:
+ return
def delete_job (self, hrn, job_id):
- slab_slice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = hrn).filter_by(oar_job_id =job_id).first()
- slab_dbsession.delete(slab_slice)
+ #slab_slice =
+ slab_dbsession.query(JobSenslab).filter_by(slice_hrn = hrn).filter_by(oar_job_id =job_id).delete()
+ #slab_dbsession.delete(slab_slice)
slab_dbsession.commit()
#Updates the job_id and the nodes list
#The nodes list is never erased.
- def update_job(self, hrn, job_id, nodes = None ):
-
- if job_id == -1:
- #Delete the job in DB
- self.delete_job(hrn, job_id)
- else :
- self.add_job(hrn, job_id, nodes)
- #slice_rec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = hrn).first()
- #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES update_job slice_rec %s"%(slice_rec)
- #if job_id is not None:
- #slice_rec.oar_job_id = job_id
- #if nodes is not None :
- #slice_rec.node_list = nodes
- #slab_dbsession.commit()
+
+
def find (self, name = None, filter_dict = None):
print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_dict %s"%(filter_dict)