from sfa.util.sfalogging import logger
from sfa.storage.alchemy import dbsession
-from sfa.storage.model import RegRecord, RegUser
+from sfa.storage.model import RegRecord, RegUser, RegSlice
from sfa.trust.credential import Credential
from sfa.rspecs.rspec import RSpec
from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf
-from sfa.planetlab.plxrn import slicename_to_hrn, hrn_to_pl_slicename, \
- hostname_to_urn, \
- xrn_to_hostname
+
## thierry: everything that is API-related (i.e. handling incoming requests)
# is taken care of
from sfa.senslab.OARrestapi import OARrestapi
from sfa.senslab.LDAPapi import LDAPapi
-from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab
-from sfa.senslab.slabaggregate import SlabAggregate
+from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, JobSenslab
+from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, slab_xrn_object
from sfa.senslab.slabslices import SlabSlices
top_level_status = 'unknown'
nodes_in_slice = sl['node_ids']
-
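+ #Fetch the user's RegRecord from the sfa db to expose his hrn
+ #in the slice record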
+ recuser = dbsession.query(RegRecord).filter_by(record_id = \
+ sl['record_id_user']).first()
+ sl.update({'user':recuser.hrn})
if len(nodes_in_slice) is 0:
raise SliverDoesNotExist("No slivers allocated ")
else:
result = {}
result['geni_urn'] = slice_urn
- result['pl_login'] = sl['job_user'] #For compatibility
+ result['pl_login'] = sl['user'] #For compatibility
timestamp = float(sl['startTime']) + float(sl['walltime'])
return result
+ def synchronize_oar_and_slice_table(self, slice_hrn = None):
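+ """ Synchronizes the OAR leases with the senslab slice table.
+ If a slice_hrn is given only that slice is considered, otherwise
+ all the slices returned by GetSlices are processed, grouped by
+ slice hrn.
+ """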
+ #Get list of leases
+ oar_leases_list = self.GetReservedNodes()
+
+ logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table \r\n \r\n : oar_leases_list %s\r\n" %( oar_leases_list))
+ #Get the list of slices/leases; multiple entries per user, depending on the number of jobs
+ #At this point we don't have the slice_hrn, which is why
+ #we call GetSlices: its records hold a slice_hrn field
+
+ if slice_hrn :
+ sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
+ self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, sfa_slices_list)
+ else :
+ sfa_slices_list = self.GetSlices()
+
+ sfa_slices_dict_by_slice_hrn = {}
+ for sfa_slice in sfa_slices_list:
+ if sfa_slice['slice_hrn'] not in sfa_slices_dict_by_slice_hrn:
+ sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']] = []
+ sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice)
+
+ for slice_hrn in sfa_slices_dict_by_slice_hrn:
+ list_slices_sfa = sfa_slices_dict_by_slice_hrn[slice_hrn]
+ if slice_hrn =='senslab2.avakian_slice':
+ logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table slice_hrn %s list_slices_sfa %s\r\n \r\n" %( slice_hrn,list_slices_sfa))
+ self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, list_slices_sfa)
+
+ return
+
+
+ def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, oar_leases_list, sfa_slices_list):
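+ """ Compares the OAR leases and the senslab db jobs for one slice,
+ and updates the db accordingly : adds the jobs known to OAR but
+ missing from the db, removes the db jobs that OAR no longer knows.
+ """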
+
+ #Get the list of slices/leases; multiple entries per user, depending on the number of jobs
+ #sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
+ sfa_slices_dict = {}
+ oar_leases_dict = {}
+ login = slice_hrn.split(".")[1].split("_")[0]
+
+ #Create dictionaries keyed by the (user login, job id) tuple
+ #for both the leases list and the slices list
+
+ for sl in sfa_slices_list:
+ if sl['oar_job_id'] != [] :
+ for oar_jobid in sl['oar_job_id']:
+ if (login, oar_jobid) not in sfa_slices_dict:
+ sfa_slices_dict[(login,oar_jobid)] = sl
+
+ for lease in oar_leases_list:
+ if (lease['user'], lease['lease_id']) not in oar_leases_dict:
+ oar_leases_dict[(lease['user'], lease['lease_id'])] = lease
+
+ #Find missing entries in the sfa slices dict by comparing
+ #the keys in both dictionaries
+ #Add the missing entries to the slice senslab table
+
+ logger.debug(" =============SLABDRIVER \t\t synchronize_oar_and_slice_table_for_slice_hrn oar_leases_list %s \r\n \t\t\t SFA_SLICES_DICT %s \r\n \r\n LOGIN %s \r\n " %( oar_leases_list, sfa_slices_dict, login))
+ for lease in oar_leases_dict :
+ if lease not in sfa_slices_dict and login == lease[0]:
+
+ #if lease in GetReservedNodes not in GetSlices update the db
+ #First get the list of nodes hostnames for this job
+ oar_reserved_nodes_listdict = oar_leases_dict[lease]['reserved_nodes']
+ oar_reserved_nodes_list = []
+ for node_dict in oar_reserved_nodes_listdict:
+ oar_reserved_nodes_list.append(node_dict['hostname'])
+ #And update the db with slice hrn, job id and node list
+ self.db.add_job(slice_hrn, lease[1], oar_reserved_nodes_list)
+
+ for lease in sfa_slices_dict:
+ #Job is now terminated or in error; either way it is not going to run again
+ #Remove it from the db
+ if lease not in oar_leases_dict:
+ self.db.delete_job( slice_hrn, lease[1])
+
+ return
+
def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
users, options):
- logger.debug("SLABDRIVER.PY \tcreate_sliver ")
aggregate = SlabAggregate(self)
slices = SlabSlices(self)
logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \
%(rspec.version))
-
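+ #Make sure the senslab slice table is in sync with OAR
+ #before working on the slice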
+ self.synchronize_oar_and_slice_table(slice_hrn)
# ensure site record exists?
# ensure slice record exists
sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
sfa_peer, options=options)
requested_attributes = rspec.version.get_slice_attributes()
-
- if requested_attributes:
- for attrib_dict in requested_attributes:
- if 'timeslot' in attrib_dict and attrib_dict['timeslot'] \
- is not None:
- sfa_slice.update({'timeslot':attrib_dict['timeslot']})
+
logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
# ensure person records exists
persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
sfa_peer, options=options)
- # ensure slice attributes exists?
# add/remove slice from nodes
requested_slivers = [node.get('component_name') \
for node in rspec.version.get_nodes_with_slivers()]
+ nodes_with_slivers = rspec.version.get_nodes_with_slivers()
logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
- requested_slivers %s " %(requested_slivers))
+ requested_slivers %s nodes_with_slivers %s" %(requested_slivers, nodes_with_slivers))
nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
# add/remove leases
- requested_leases = []
+ requested_lease_list = []
kept_leases = []
for lease in rspec.version.get_leases():
- requested_lease = {}
+ single_requested_lease = {}
+ logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
if not lease.get('lease_id'):
- requested_lease['hostname'] = \
- xrn_to_hostname(lease.get('component_id').strip())
- requested_lease['start_time'] = lease.get('start_time')
- requested_lease['duration'] = lease.get('duration')
+ single_requested_lease['hostname'] = \
+ slab_xrn_to_hostname(lease.get('component_id').strip())
+ single_requested_lease['start_time'] = lease.get('start_time')
+ single_requested_lease['duration'] = lease.get('duration')
else:
kept_leases.append(int(lease['lease_id']))
- if requested_lease.get('hostname'):
- requested_leases.append(requested_lease)
+ if single_requested_lease.get('hostname'):
+ requested_lease_list.append(single_requested_lease)
+
+ #Create a dict of leases keyed by start_time, regrouping nodes reserved
+ #at the same time, for the same duration = one job on OAR
+ requested_job_dict = {}
+ for lease in requested_lease_list:
+
+ #In case of an asap experiment, the start_time is empty
+ if lease['start_time'] == '':
+ lease['start_time'] = '0'
+
+ if lease['start_time'] not in requested_job_dict:
+ if isinstance(lease['hostname'], str):
+ lease['hostname'] = [lease['hostname']]
+
+ requested_job_dict[lease['start_time']] = lease
+ else :
+ job_lease = requested_job_dict[lease['start_time']]
+ if lease['duration'] == job_lease['duration'] :
+ job_lease['hostname'].append(lease['hostname'])
+
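+ #Example with hypothetical values : two leases on node1 and node2,
+ #both with start_time '1000' and duration '2', are merged into
+ #requested_job_dict['1000'] = {'start_time':'1000', 'duration':'2',
+ #'hostname':['node1', 'node2']}
+ #Note : a lease sharing a start_time but with a different duration
+ #is dropped here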
+ logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s " %(requested_job_dict))
+
leases = slices.verify_slice_leases(sfa_slice, \
- requested_leases, kept_leases, peer)
+ requested_job_dict, kept_leases, peer)
return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
slices = SlabSlices(self)
# determine if this is a peer slice
- peer = slices.get_peer(slice_hrn)
+ peer = slices.get_peer(slice_hrn)
+ logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer))
try:
if peer:
self.UnBindObjectFromPeer('slice', \
self.DeleteSliceFromNodes(sfa_slice)
finally:
if peer:
- self.BindObjectToPeer('slice', sfa_slice['slice_id'], \
+ self.BindObjectToPeer('slice', sfa_slice['record_id_slice'], \
peer, sfa_slice['peer_slice_id'])
return 1
#return rspec
#panos: passing user-defined options
- logger.debug("SLABDRIVER \tlist_resources rspec " )
aggregate = SlabAggregate(self)
origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
options.update({'origin_hrn':origin_hrn})
#return slices
# get data from db
- logger.debug("SLABDRIVER.PY \tlist_slices")
- slices = self.GetSlices()
- slice_hrns = [slicename_to_hrn(self.hrn, slab_slice['slice_hrn']) \
- for slab_slice in slices]
+
+ slices = self.GetSlices()
+ logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
+ slice_hrns = [slab_slice['slice_hrn'] for slab_slice in slices]
+ #slice_hrns = [slicename_to_hrn(self.hrn, slab_slice['slice_hrn']) \
+ #for slab_slice in slices]
slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
for slice_hrn in slice_hrns]
-
+
# cache the result
#if self.cache:
#logger.debug ("SlabDriver.list_slices stores value in cache")
reqdict['method'] = "delete"
reqdict['strval'] = str(job_id)
+ self.db.delete_job(slice_hrn, job_id)
answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
reqdict,username)
- logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s " \
- %(job_id,answer))
+ logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s username %s" \
+ %(job_id,answer, username))
return answer
hostname_list = []
hostname_dict_list = []
for resource_id in resource_id_list:
- hostname_dict_list.append({'hostname' : \
- oar_id_node_dict[resource_id]['hostname'],
- 'site_id' : oar_id_node_dict[resource_id]['site']})
-
+ #Because jobs requested "asap" do not have defined resources
+ if resource_id is not "Undefined":
+ hostname_dict_list.append({'hostname' : \
+ oar_id_node_dict[resource_id]['hostname'],
+ 'site_id' : oar_id_node_dict[resource_id]['site']})
+
#hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
return hostname_dict_list
specified.
"""
+ login = None
return_slice_list = []
slicerec = {}
slicerec_dict = {}
authorized_filter_types_list = ['slice_hrn', 'record_id_user']
- logger.debug("SLABDRIVER \tGetSlices authorized_filter_types_list %s"\
- %(authorized_filter_types_list))
+
if slice_filter_type in authorized_filter_types_list:
+ #Get list of slices based on the slice hrn
if slice_filter_type == 'slice_hrn':
+
+ login = slice_filter.split(".")[1].split("_")[0]
+
+ #DO NOT USE RegSlice - reg_researchers to get the hrn of the user,
+ #otherwise it will mess up the RegRecord in Resolve; don't know
+ #why - SA 08/08/2012
+
+ #Only one entry for one user = one slice in slice_senslab table
slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()
-
+
+ #Get slice based on user id
if slice_filter_type == 'record_id_user':
slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
- if slicerec:
- #warning pylint OK
- slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
- logger.debug("SLABDRIVER \tGetSlices slicerec_dict %s" \
- %(slicerec_dict))
- #Get login
- login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
- logger.debug("\r\n SLABDRIVER \tGetSlices login %s \
- slice record %s" \
- %(login, slicerec_dict))
- if slicerec_dict['oar_job_id'] is not -1:
- #Check with OAR the status of the job if a job id is in
- #the slice record
- rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \
- username = login)
-
- if rslt :
- slicerec_dict.update(rslt)
- slicerec_dict.update({'hrn':\
- str(slicerec_dict['slice_hrn'])})
- #If GetJobsResources is empty, this means the job is
- #now in the 'Terminated' state
- #Update the slice record
- else :
- self.db.update_job(slice_filter, job_id = -1)
- slicerec_dict['oar_job_id'] = -1
- slicerec_dict.\
- update({'hrn':str(slicerec_dict['slice_hrn'])})
+ if slicerec is None:
+ return []
+ #slicerec_dictlist = []
+ slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
+ if login is None :
+ login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
+
+ #for record in slicerec:
+ #slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict())
+ #if login is None :
+ #login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0]
+
+ #One slice can have multiple jobs
+ sqljob_list = slab_dbsession.query(JobSenslab).filter_by( slice_hrn=slicerec_dict['slice_hrn']).all()
+ job_list = []
+ for job in sqljob_list:
+ job_list.append(job.dump_sqlalchemyobj_to_dict())
+
+ logger.debug("\r\n SLABDRIVER \tGetSlices login %s \
+ slice record %s" \
+ %(login, slicerec_dict))
+
+ #Several jobs for one slice
+ slicerec_dict['oar_job_id'] = []
+ for job in job_list :
+ #Check with OAR the status of each job attached to the slice
+
+ rslt = self.GetJobsResources(job['oar_job_id'], \
+ username = login)
+ logger.debug("SLABDRIVER.PY \tGetSlices rslt fromn GetJobsResources %s"\
+ %(rslt))
+ if rslt :
+ slicerec_dict['oar_job_id'].append(job['oar_job_id'])
+ slicerec_dict.update(rslt)
+ slicerec_dict.update({'hrn':\
+ str(slicerec_dict['slice_hrn'])})
+ #If GetJobsResources is empty, this means the job is
+ #now in the 'Terminated' state
+ #Update the slice record
+ else :
+ self.db.delete_job(slice_filter, job['oar_job_id'])
+ slicerec_dict.\
+ update({'hrn':str(slicerec_dict['slice_hrn'])})
+
try:
- slicerec_dict['node_ids'] = slicerec_dict['node_list']
+ slicerec_dict['node_ids'] = job['node_list']
except KeyError:
pass
+
+ logger.debug("SLABDRIVER.PY \tGetSlices RETURN slicerec_dict %s"\
+ %(slicerec_dict))
+
+ return [slicerec_dict]
+
- logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s"\
- %(slicerec_dict))
-
- return slicerec_dict
+ else:
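+ #No filter : return all the slices, each with the list of its
+ #oar job ids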
+ slice_list = slab_dbsession.query(SliceSenslab).all()
+ sqljob_list = slab_dbsession.query(JobSenslab).all()
+
+ job_list = []
+ for job in sqljob_list:
+ job_list.append(job.dump_sqlalchemyobj_to_dict())
+ return_slice_list = []
+ for record in slice_list:
+ return_slice_list.append(record.dump_sqlalchemyobj_to_dict())
- else:
- return_slice_list = slab_dbsession.query(SliceSenslab).all()
-
- logger.debug("SLABDRIVER.PY \tGetSlices slices %s \
+ for slicerec_dict in return_slice_list:
+ slicerec_dict['oar_job_id'] = []
+ for job in job_list:
+ if job['slice_hrn'] == slicerec_dict['slice_hrn']:
+ slicerec_dict['oar_job_id'].append(job['oar_job_id'])
+
+ logger.debug("SLABDRIVER.PY \tGetSlices RETURN slices %s \
slice_filter %s " %(return_slice_list, slice_filter))
#if return_fields_list:
#instantion used in get_slivers ?
if not "instantiation" in slab_record:
slab_record["instantiation"] = "senslab-instantiated"
- slab_record["hrn"] = hrn_to_pl_slicename(hrn)
+ #slab_record["hrn"] = hrn_to_pl_slicename(hrn)
+ #hrn_to_pl_slicename unused because Slab's hrn is already in the appropriate form SA 23/07/12
+ slab_record["hrn"] = hrn
logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
- slab_record %s hrn_to_pl_slicename(hrn) hrn %s " \
- %(slab_record['hrn'], hrn))
+ slab_record %s " %(slab_record['hrn']))
if "url" in record:
slab_record["url"] = record["url"]
if "description" in record:
else:
return None
-
- def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
- """ Creates the structure needed for a correct POST on OAR.
- Makes the timestamp transformation into the appropriate format.
- Sends the POST request to create the job with the resources in
- added_nodes.
- """
- site_list = []
- nodeid_list = []
- resource = ""
- reqdict = {}
- slice_name = slice_dict['name']
- try:
- slot = slice_dict['timeslot']
- logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
- slot %s" %(slot))
- except KeyError:
- #Running on default parameters
- #XP immediate , 10 mins
- slot = { 'date':None, 'start_time':None,
- 'timezone':None, 'duration':None }#10 min
-
- reqdict['workdir'] = '/tmp'
- reqdict['resource'] = "{network_address in ("
-
- for node in added_nodes:
- logger.debug("OARrestapi \tLaunchExperimentOnOAR \
- node %s" %(node))
-
- #Get the ID of the node : remove the root auth and put
- # the site in a separate list.
- # NT: it's not clear for me if the nodenames will have the senslab
- #prefix so lets take the last part only, for now.
-
- # Again here it's not clear if nodes will be prefixed with <site>_,
- #lets split and tanke the last part for now.
- #s=lastpart.split("_")
-
- nodeid = node
- reqdict['resource'] += "'" + nodeid + "', "
- nodeid_list.append(nodeid)
-
- custom_length = len(reqdict['resource'])- 2
- reqdict['resource'] = reqdict['resource'][0:custom_length] + \
- ")}/nodes=" + str(len(nodeid_list))
-
- def __process_walltime(duration=None):
- """ Calculates the walltime in seconds from the duration in H:M:S
- specified in the RSpec.
-
- """
- if duration:
- walltime = duration.split(":")
- # Fixing the walltime by adding a few delays. First put the walltime
- # in seconds oarAdditionalDelay = 20; additional delay for
- # /bin/sleep command to
- # take in account prologue and epilogue scripts execution
- # int walltimeAdditionalDelay = 120; additional delay
-
- desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
- int(walltime[2])
- total_walltime = desired_walltime + 140 #+2 min 20
- sleep_walltime = desired_walltime + 20 #+20 sec
- logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
- total_walltime %s sleep_walltime %s "\
- %(desired_walltime, total_walltime, \
- sleep_walltime))
- #Put the walltime back in str form
- #First get the hours
- walltime[0] = str(total_walltime / 3600)
- total_walltime = total_walltime - 3600 * int(walltime[0])
- #Get the remaining minutes
- walltime[1] = str(total_walltime / 60)
- total_walltime = total_walltime - 60 * int(walltime[1])
- #Get the seconds
- walltime[2] = str(total_walltime)
- logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
- %(walltime))
- else:
- #automatically set 10min +2 min 20
- walltime[0] = '0'
- walltime[1] = '12'
- walltime[2] = '20'
- sleep_walltime = '620'
-
- return walltime, sleep_walltime
-
- #if slot['duration']:
- walltime, sleep_walltime = __process_walltime(duration = \
- slot['duration'])
- #else:
- #walltime, sleep_walltime = self.__process_walltime(duration = None)
+
+
+
+ def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
+ lease_start_time, lease_duration, slice_user=None):
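+ """ Creates a job in OAR to reserve added_nodes for the slice,
+ at lease_start_time (unix timestamp) and for lease_duration
+ expressed in lease granularity units, then configures and
+ launches the associated senslab experiment.
+ """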
+ lease_dict = {}
+ lease_dict['lease_start_time'] = lease_start_time
+ lease_dict['lease_duration'] = lease_duration
+ lease_dict['added_nodes'] = added_nodes
+ lease_dict['slice_name'] = slice_name
+ lease_dict['slice_user'] = slice_user
+ lease_dict['grain'] = self.GetLeaseGranularity()
+ lease_dict['time_format'] = self.time_format
+
+ def __create_job_structure_request_for_OAR(lease_dict):
+ """ Creates the structure needed for a correct POST on OAR.
+ Makes the timestamp transformation into the appropriate format.
+ Sends the POST request to create the job with the resources in
+ added_nodes.
- reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
- ":" + str(walltime[1]) + ":" + str(walltime[2])
- reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
-
-
-
- #In case of a scheduled experiment (not immediate)
- #To run an XP immediately, don't specify date and time in RSpec
- #They will be set to None.
- server_timestamp, server_tz = self.GetTimezone()
- if slot['date'] and slot['start_time']:
- if slot['timezone'] is '' or slot['timezone'] is None:
- #assume it is server timezone
- from_zone = tz.gettz(server_tz)
- logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
- not specified server_tz %s from_zone %s" \
- %(server_tz, from_zone))
- else:
- #Get zone of the user from the reservation time given
- #in the rspec
- from_zone = tz.gettz(slot['timezone'])
-
- date = str(slot['date']) + " " + str(slot['start_time'])
- user_datetime = datetime.strptime(date, self.time_format)
- user_datetime = user_datetime.replace(tzinfo = from_zone)
+ """
+
+ nodeid_list = []
+ reqdict = {}
+
- #Convert to server zone
+ reqdict['workdir'] = '/tmp'
+ reqdict['resource'] = "{network_address in ("
+
+ for node in lease_dict['added_nodes']:
+ logger.debug("\r\n \r\n OARrestapi \t __create_job_structure_request_for_OAR \
+ node %s" %(node))
+
+ # Get the ID of the node
+ nodeid = node
+ reqdict['resource'] += "'" + nodeid + "', "
+ nodeid_list.append(nodeid)
+
+ custom_length = len(reqdict['resource'])- 2
+ reqdict['resource'] = reqdict['resource'][0:custom_length] + \
+ ")}/nodes=" + str(len(nodeid_list))
+
+ def __process_walltime(duration):
+ """ Calculates the walltime in seconds from the duration in H:M:S
+ specified in the RSpec.
+
+ """
+ if duration:
+ # Fixing the walltime by adding a few delays.
+ # First put the walltime in seconds oarAdditionalDelay = 20;
+ # additional delay for /bin/sleep command to
+ # take in account prologue and epilogue scripts execution
+ # int walltimeAdditionalDelay = 120; additional delay
+ desired_walltime = duration
+ total_walltime = desired_walltime + 140 #+2 min 20
+ sleep_walltime = desired_walltime + 20 #+20 sec
+ walltime = []
+ #Put the walltime back in str form
+ #First get the hours
+ walltime.append(str(total_walltime / 3600))
+ total_walltime = total_walltime - 3600 * int(walltime[0])
+ #Get the remaining minutes
+ walltime.append(str(total_walltime / 60))
+ total_walltime = total_walltime - 60 * int(walltime[1])
+ #Get the seconds
+ walltime.append(str(total_walltime))
+
+ else:
+ #No duration given : default to a 10 min experiment
+ #(+ 2 min 20 delay), so that walltime is always defined
+ logger.log_exc(" __process_walltime duration null")
+ walltime = ['0', '12', '20']
+ sleep_walltime = 620
+
+ return walltime, sleep_walltime
+
- to_zone = tz.gettz(server_tz)
- reservation_date = user_datetime.astimezone(to_zone)
- #Readable time accpeted by OAR
- reqdict['reservation'] = reservation_date.strftime(self.time_format)
-
- logger.debug("SLABDRIVER \tLaunchExperimentOnOAR \
- reqdict['reservation'] %s " %(reqdict['reservation']))
-
- else:
- # Immediate XP. Not need to add special parameters.
- # normally not used in SFA
-
- pass
+ walltime, sleep_walltime = \
+ __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
+
+
+ reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
+ ":" + str(walltime[1]) + ":" + str(walltime[2])
+ reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
+
+ #In case of a scheduled experiment (not immediate)
+ #To run an XP immediately, don't specify date and time in RSpec
+ #They will be set to None.
+ if lease_dict['lease_start_time'] != '0':
+ #Readable time accepted by OAR
+ start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
+ strftime(lease_dict['time_format'])
+ reqdict['reservation'] = start_time
+ #If there is no start time, the experiment starts immediately.
+ #No need to add special OAR parameters
+
+
+ reqdict['type'] = "deploy"
+ reqdict['directory'] = ""
+ reqdict['name'] = "SFA_" + lease_dict['slice_user']
+
+ return reqdict
-
- reqdict['type'] = "deploy"
- reqdict['directory'] = ""
- reqdict['name'] = "TestSandrine"
-
-
- # first step : start the OAR job and update the job
+
+ #Create the request for OAR
+ reqdict = __create_job_structure_request_for_OAR(lease_dict)
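+ #The reqdict built above looks like (hypothetical values) :
+ #{'workdir': '/tmp',
+ #'resource': "{network_address in ('node1')}/nodes=1,walltime=0:22:20",
+ #'script_path': '/bin/sleep 1220', 'type': 'deploy', 'directory': '',
+ #'name': 'SFA_someuser', 'reservation': '2012-08-08 14:00:00'}
+ #('reservation' is only set for scheduled, non-immediate jobs)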
+ # first step : start the OAR job and update the job
logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
- \r\n site_list %s" %(reqdict, site_list))
+ \r\n " %(reqdict))
answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
reqdict, slice_user)
Impossible to create job %s " %(answer))
return
- logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
- added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
- self.db.update_job( slice_name, jobid, added_nodes)
-
- # second step : configure the experiment
- # we need to store the nodes in a yaml (well...) file like this :
- # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
- job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
- job_file.write('[')
- job_file.write(str(added_nodes[0].strip('node')))
- for node in added_nodes[1:len(added_nodes)] :
- job_file.write(', '+ node.strip('node'))
- job_file.write(']')
- job_file.close()
-
- # third step : call the senslab-experiment wrapper
- #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
- # "+str(jobid)+" "+slice_user
- javacmdline = "/usr/bin/java"
- jarname = \
- "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
- #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
- #str(jobid), slice_user])
- output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
- slice_user],stdout=subprocess.PIPE).communicate()[0]
-
- logger.debug("SLABDRIVER \tLaunchExperimentOnOAR wrapper returns%s " \
- %(output))
- return
-
-
+ def __configure_experiment(jobid, added_nodes):
+ # second step : configure the experiment
+ # we need to store the nodes in a yaml (well...) file like this :
+ # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
+ job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
+ job_file.write('[')
+ job_file.write(str(added_nodes[0].strip('node')))
+ for node in added_nodes[1:len(added_nodes)] :
+ job_file.write(', '+ node.strip('node'))
+ job_file.write(']')
+ job_file.close()
+ return
+
+ def __launch_senslab_experiment(jobid):
+ # third step : call the senslab-experiment wrapper
+ #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
+ # "+str(jobid)+" "+slice_user
+ javacmdline = "/usr/bin/java"
+ jarname = \
+ "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
+ #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
+ #str(jobid), slice_user])
+ output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
+ slice_user],stdout=subprocess.PIPE).communicate()[0]
+
+ logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
+ %(output))
+ return
+
+
+
+ if jobid :
+ logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
+ added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
+ self.db.add_job( slice_name, jobid, added_nodes)
+
+ __configure_experiment(jobid, added_nodes)
+ __launch_senslab_experiment(jobid)
+
+ return
+
+ def AddLeases(self, hostname_list, slice_record, lease_start_time, lease_duration):
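+ """ Creates a lease on the nodes in hostname_list for the slice
+ in slice_record, starting at lease_start_time (unix timestamp)
+ and lasting lease_duration, expressed in lease granularity units.
+ """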
+ logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
+ slice_record %s lease_start_time %s lease_duration %s "\
+ %( hostname_list, slice_record , lease_start_time, \
+ lease_duration))
+
+ tmp = slice_record['PI'][0].split(".")
+ username = tmp[(len(tmp)-1)]
+ self.LaunchExperimentOnOAR(hostname_list, slice_record['name'], lease_start_time, lease_duration, username)
+ start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
+ logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
+
+ return
+
+
- #Delete the jobs and updates the job id in the senslab table
- #to set it to -1
- #Does not clear the node list
+ #Delete the jobs, both in OAR and in the senslab job table
# Get user information
self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
- self.db.update_job(slice_record['hrn'], job_id = -1)
+
return
def GetLeases(self, lease_filter_dict=None, return_fields_list=None):
unfiltered_reservation_list = self.GetReservedNodes()
+
+ ##Synchronize slice_table of sfa senslab db
+ #self.synchronize_oar_and_slice_table(unfiltered_reservation_list)
+
reservation_list = []
#Find the slice associated with this user senslab ldap uid
logger.debug(" SLABDRIVER.PY \tGetLeases ")
+ #Create the user dict first, to avoid looking up the same user
+ #several times in LDAP SA 27/07/12
+ resa_user_dict = {}
for resa in unfiltered_reservation_list:
- ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
- ldap_info = ldap_info[0][1]
-
- user = dbsession.query(RegUser).filter_by(email = \
- ldap_info['mail'][0]).first()
-
- slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id).first()
+ logger.debug("SLABDRIVER \tGetLeases USER %s"\
+ %(resa['user']))
+ if resa['user'] not in resa_user_dict:
+ logger.debug("SLABDRIVER \tGetLeases userNOTIN ")
+ ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
+ ldap_info = ldap_info[0][1]
+ user = dbsession.query(RegUser).filter_by(email = \
+ ldap_info['mail'][0]).first()
+ #Separated in case the user is not in the database : record_id not defined SA 17/07/12
+ query_slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id)
+ #A Query object is always truthy : call first() and test the result instead
+ slice_info = query_slice_info.first()
+
+ resa_user_dict[resa['user']] = {}
+ resa_user_dict[resa['user']]['ldap_info'] = user
+ resa_user_dict[resa['user']]['slice_info'] = slice_info
+
+ logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
+ %(resa_user_dict))
+ for resa in unfiltered_reservation_list:
+
+ #Put the slice_urn
+ resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info'].slice_hrn
+ resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
#Put the slice_urn
- resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
resa['component_id_list'] = []
#Transform the hostnames into urns (component ids)
for node in resa['reserved_nodes']:
- resa['component_id_list'].append(hostname_to_urn(self.hrn, \
- self.root_auth, node['hostname']))
-
+ #resa['component_id_list'].append(hostname_to_urn(self.hrn, \
+ #self.root_auth, node['hostname']))
+ slab_xrn = slab_xrn_object(self.root_auth, node['hostname'])
+ resa['component_id_list'].append(slab_xrn.urn)
#Filter the reservation list if necessary
#Returns all the leases associated with a given slice
logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
%(lease_filter_dict))
for resa in unfiltered_reservation_list:
- if lease_filter_dict['name'] == resa['slice_id']:
+ if lease_filter_dict['name'] == resa['slice_hrn']:
reservation_list.append(resa)
else:
reservation_list = unfiltered_reservation_list
#information is in the Senslab's DB.
if str(record['type']) == 'slice':
#Get slab slice record.
- recslice = self.GetSlices(slice_filter = \
+ recslice_list = self.GetSlices(slice_filter = \
str(record['hrn']),\
slice_filter_type = 'slice_hrn')
+
recuser = dbsession.query(RegRecord).filter_by(record_id = \
- recslice['record_id_user']).first()
- logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
- rec %s \r\n \r\n" %(recslice))
+ recslice_list[0]['record_id_user']).first()
+ logger.debug("SLABDRIVER \tfill_record_info TYPE SLICE RECUSER %s " %(recuser))
record.update({'PI':[recuser.hrn],
- 'researcher': [recuser.hrn],
- 'name':record['hrn'],
- 'oar_job_id':recslice['oar_job_id'],
- 'node_ids': [],
- 'person_ids':[recslice['record_id_user']],
- 'geni_urn':'', #For client_helper.py compatibility
- 'keys':'', #For client_helper.py compatibility
- 'key_ids':''}) #For client_helper.py compatibility
-
- elif str(record['type']) == 'user':
+ 'researcher': [recuser.hrn],
+ 'name':record['hrn'],
+ 'oar_job_id':[job_id for rec in recslice_list for job_id in rec['oar_job_id']],
+ 'node_ids': [],
+ 'person_ids':[recslice_list[0]['record_id_user']],
+ 'geni_urn':'', #For client_helper.py compatibility
+ 'keys':'', #For client_helper.py compatibility
+ 'key_ids':''}) #For client_helper.py compatibility
+
+ #for rec in recslice_list:
+ #record['oar_job_id'].append(rec['oar_job_id'])
+ logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
+ recslice_list %s \r\n \t RECORD %s \r\n \r\n" %(recslice_list,record))
+ if str(record['type']) == 'user':
#The record is a SFA user record.
#Get the information about his slice from Senslab's DB
#and add it to the user record.
- recslice = self.GetSlices(\
+ recslice_list = self.GetSlices(\
slice_filter = record['record_id'],\
slice_filter_type = 'record_id_user')
- logger.debug( "SLABDRIVER.PY \t fill_record_info user \
- rec %s \r\n \r\n" %(recslice))
+ logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
+ recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
#Append slice record in records list,
#therefore fetches user and slice info again(one more loop)
#Will update PIs and researcher for the slice
recuser = dbsession.query(RegRecord).filter_by(record_id = \
- recslice['record_id_user']).first()
+ recslice_list[0]['record_id_user']).first()
+ logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
+ recuser %s \r\n \r\n" %(recuser))
+ recslice = recslice_list[0]
recslice.update({'PI':[recuser.hrn],
- 'researcher': [recuser.hrn],
- 'name':record['hrn'],
- 'oar_job_id':recslice['oar_job_id'],
- 'node_ids': [],
- 'person_ids':[recslice['record_id_user']]})
+ 'researcher': [recuser.hrn],
+ 'name':record['hrn'],
+ 'node_ids': [],
+ 'oar_job_id': [job_id for rec in recslice_list for job_id in rec['oar_job_id']],
+ 'person_ids':[recslice_list[0]['record_id_user']]})
+ recslice.update({'type':'slice', \
+ 'hrn':recslice_list[0]['slice_hrn']})
+ #for rec in recslice_list:
+ #recslice['oar_job_id'].append(rec['oar_job_id'])
#GetPersons takes [] as filters
#user_slab = self.GetPersons([{'hrn':recuser.hrn}])
user_slab = self.GetPersons([record])
- recslice.update({'type':'slice', \
- 'hrn':recslice['slice_hrn']})
+
record.update(user_slab[0])
#For client_helper.py compatibility
record.update( { 'geni_urn':'',
logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
INFO TO USER records %s" %(record_list))
-
+ logger.debug("SLABDRIVER.PY \tfill_record_info END \
+ #record %s \r\n \r\n " %(record))
except TypeError, error:
logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
%(error))
-
+ #logger.debug("SLABDRIVER.PY \t fill_record_info ENDENDEND ")
+
return
#self.fill_record_slab_info(records)
-
-
+
+
"""
self.DeleteSliceFromNodes(slice_record)
- self.db.update_job(slice_record['hrn'], job_id = -1, nodes = [])
logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
return
"""
logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
return
+
+ def DeleteLeases(self, leases_id_list, slice_hrn ):
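+ """ Deletes several jobs at once, from the OAR job ids in
+ leases_id_list belonging to the slice slice_hrn.
+ """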
+ for job_id in leases_id_list:
+ self.DeleteJobs(job_id, slice_hrn)
+
+ logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \r\n " %(leases_id_list, slice_hrn))
+ return