def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
users, options):
- logger.debug("SLABDRIVER.PY \tcreate_sliver ")
aggregate = SlabAggregate(self)
slices = SlabSlices(self)
sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
sfa_peer, options=options)
requested_attributes = rspec.version.get_slice_attributes()
-
- if requested_attributes:
- for attrib_dict in requested_attributes:
- if 'timeslot' in attrib_dict and attrib_dict['timeslot'] \
- is not None:
- sfa_slice.update({'timeslot':attrib_dict['timeslot']})
+
logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
# ensure person records exists
persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
sfa_peer, options=options)
- # ensure slice attributes exists?
# add/remove slice from nodes
requested_slivers = [node.get('component_name') \
for node in rspec.version.get_nodes_with_slivers()]
+ l = [ node for node in rspec.version.get_nodes_with_slivers() ]
logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
- requested_slivers %s " %(requested_slivers))
+ requested_slivers %s listnodes %s" %(requested_slivers,l))
nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
# add/remove leases
- requested_leases = []
+ requested_lease_list = []
kept_leases = []
for lease in rspec.version.get_leases():
- requested_lease = {}
+ single_requested_lease = {}
+ logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
if not lease.get('lease_id'):
- requested_lease['hostname'] = \
+ single_requested_lease['hostname'] = \
slab_xrn_to_hostname(lease.get('component_id').strip())
- requested_lease['start_time'] = lease.get('start_time')
- requested_lease['duration'] = lease.get('duration')
+ single_requested_lease['start_time'] = lease.get('start_time')
+ single_requested_lease['duration'] = lease.get('duration')
else:
kept_leases.append(int(lease['lease_id']))
- if requested_lease.get('hostname'):
- requested_leases.append(requested_lease)
+ if single_requested_lease.get('hostname'):
+ requested_lease_list.append(single_requested_lease)
+
+ #dCreate dict of leases by start_time, regrouping nodes reserved at the same
+ #time, for the same amount of time = one job on OAR
+ requested_job_dict = {}
+ for lease in requested_lease_list:
+
+ #In case it is an asap experiment start_time is empty
+ if lease['start_time'] == '':
+ lease['start_time'] = '0'
+
+ if lease['start_time'] not in requested_job_dict:
+ if isinstance(lease['hostname'], str):
+ lease['hostname'] = [lease['hostname']]
+
+ requested_job_dict[lease['start_time']] = lease
+
+ else :
+ job_lease = requested_job_dict[lease['start_time']]
+ if lease['duration'] == job_lease['duration'] :
+ job_lease['hostname'].append(lease['hostname'])
+
+
+
+ logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s " %(requested_job_dict))
+
leases = slices.verify_slice_leases(sfa_slice, \
- requested_leases, kept_leases, peer)
+ requested_job_dict, kept_leases, peer)
return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
else:
return None
-
- def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
- """ Creates the structure needed for a correct POST on OAR.
- Makes the timestamp transformation into the appropriate format.
- Sends the POST request to create the job with the resources in
- added_nodes.
- """
- site_list = []
- nodeid_list = []
- resource = ""
- reqdict = {}
- slice_name = slice_dict['name']
- try:
- slot = slice_dict['timeslot']
- logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
- slot %s" %(slot))
- except KeyError:
- #Running on default parameters
- #XP immediate , 10 mins
- slot = { 'date':None, 'start_time':None,
- 'timezone':None, 'duration':None }#10 min
-
- reqdict['workdir'] = '/tmp'
- reqdict['resource'] = "{network_address in ("
-
- for node in added_nodes:
- logger.debug("OARrestapi \tLaunchExperimentOnOAR \
- node %s" %(node))
-
- #Get the ID of the node : remove the root auth and put
- # the site in a separate list.
- # NT: it's not clear for me if the nodenames will have the senslab
- #prefix so lets take the last part only, for now.
-
- # Again here it's not clear if nodes will be prefixed with <site>_,
- #lets split and tanke the last part for now.
- #s=lastpart.split("_")
-
- nodeid = node
- reqdict['resource'] += "'" + nodeid + "', "
- nodeid_list.append(nodeid)
-
- custom_length = len(reqdict['resource'])- 2
- reqdict['resource'] = reqdict['resource'][0:custom_length] + \
- ")}/nodes=" + str(len(nodeid_list))
-
- def __process_walltime(duration=None):
- """ Calculates the walltime in seconds from the duration in H:M:S
- specified in the RSpec.
-
- """
- if duration:
- walltime = duration.split(":")
- # Fixing the walltime by adding a few delays. First put the walltime
- # in seconds oarAdditionalDelay = 20; additional delay for
- # /bin/sleep command to
- # take in account prologue and epilogue scripts execution
- # int walltimeAdditionalDelay = 120; additional delay
-
- desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
- int(walltime[2])
- total_walltime = desired_walltime + 140 #+2 min 20
- sleep_walltime = desired_walltime + 20 #+20 sec
- logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
- total_walltime %s sleep_walltime %s "\
- %(desired_walltime, total_walltime, \
- sleep_walltime))
- #Put the walltime back in str form
- #First get the hours
- walltime[0] = str(total_walltime / 3600)
- total_walltime = total_walltime - 3600 * int(walltime[0])
- #Get the remaining minutes
- walltime[1] = str(total_walltime / 60)
- total_walltime = total_walltime - 60 * int(walltime[1])
- #Get the seconds
- walltime[2] = str(total_walltime)
- logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
- %(walltime))
- else:
- #automatically set 10min +2 min 20
- walltime[0] = '0'
- walltime[1] = '12'
- walltime[2] = '20'
- sleep_walltime = '620'
-
- return walltime, sleep_walltime
-
- #if slot['duration']:
- walltime, sleep_walltime = __process_walltime(duration = \
- slot['duration'])
- #else:
- #walltime, sleep_walltime = self.__process_walltime(duration = None)
+
+
+
+ def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
+ lease_start_time, lease_duration, slice_user=None):
+ lease_dict = {}
+ lease_dict['lease_start_time'] = lease_start_time
+ lease_dict['lease_duration'] = lease_duration
+ lease_dict['added_nodes'] = added_nodes
+ lease_dict['slice_name'] = slice_name
+ lease_dict['slice_user'] = slice_user
+ lease_dict['grain'] = self.GetLeaseGranularity()
+ lease_dict['time_format'] = self.time_format
+
+ def __create_job_structure_request_for_OAR(lease_dict):
+ """ Creates the structure needed for a correct POST on OAR.
+ Makes the timestamp transformation into the appropriate format.
+ Sends the POST request to create the job with the resources in
+ added_nodes.
- reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
- ":" + str(walltime[1]) + ":" + str(walltime[2])
- reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
-
-
-
- #In case of a scheduled experiment (not immediate)
- #To run an XP immediately, don't specify date and time in RSpec
- #They will be set to None.
- server_timestamp, server_tz = self.GetTimezone()
- if slot['date'] and slot['start_time']:
- if slot['timezone'] is '' or slot['timezone'] is None:
- #assume it is server timezone
- from_zone = tz.gettz(server_tz)
- logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
- not specified server_tz %s from_zone %s" \
- %(server_tz, from_zone))
- else:
- #Get zone of the user from the reservation time given
- #in the rspec
- from_zone = tz.gettz(slot['timezone'])
-
- date = str(slot['date']) + " " + str(slot['start_time'])
- user_datetime = datetime.strptime(date, self.time_format)
- user_datetime = user_datetime.replace(tzinfo = from_zone)
+ """
+
+ site_list = []
+ nodeid_list = []
+ resource = ""
+ reqdict = {}
+
- #Convert to server zone
+ reqdict['workdir'] = '/tmp'
+ reqdict['resource'] = "{network_address in ("
+
+ for node in lease_dict['added_nodes']:
+ logger.debug("\r\n \r\n OARrestapi \t __create_job_structure_request_for_OAR \
+ node %s" %(node))
+
+ # Get the ID of the node
+ nodeid = node
+ reqdict['resource'] += "'" + nodeid + "', "
+ nodeid_list.append(nodeid)
+
+ custom_length = len(reqdict['resource'])- 2
+ reqdict['resource'] = reqdict['resource'][0:custom_length] + \
+ ")}/nodes=" + str(len(nodeid_list))
+
+ def __process_walltime(duration):
+ """ Calculates the walltime in seconds from the duration in H:M:S
+ specified in the RSpec.
+
+ """
+ if duration:
+ # Fixing the walltime by adding a few delays.
+ # First put the walltime in seconds oarAdditionalDelay = 20;
+ # additional delay for /bin/sleep command to
+ # take in account prologue and epilogue scripts execution
+ # int walltimeAdditionalDelay = 120; additional delay
+ desired_walltime = duration
+ total_walltime = desired_walltime + 140#+2 min 20
+ sleep_walltime = desired_walltime + 20 #+20 sec
+ walltime = []
+ #Put the walltime back in str form
+ #First get the hours
+ walltime.append(str(total_walltime / 3600))
+ total_walltime = total_walltime - 3600 * int(walltime[0])
+ #Get the remaining minutes
+ walltime.append(str(total_walltime / 60))
+ total_walltime = total_walltime - 60 * int(walltime[1])
+ #Get the seconds
+ walltime.append(str(total_walltime))
+
+ else:
+ logger.log_exc(" __process_walltime duration null")
+
+ return walltime, sleep_walltime
+
- to_zone = tz.gettz(server_tz)
- reservation_date = user_datetime.astimezone(to_zone)
- #Readable time accpeted by OAR
- reqdict['reservation'] = reservation_date.strftime(self.time_format)
-
- logger.debug("SLABDRIVER \tLaunchExperimentOnOAR \
- reqdict['reservation'] %s " %(reqdict['reservation']))
-
- else:
- # Immediate XP. Not need to add special parameters.
- # normally not used in SFA
-
- pass
+ walltime, sleep_walltime = \
+ __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
+
+
+ reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
+ ":" + str(walltime[1]) + ":" + str(walltime[2])
+ reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
+
+ #In case of a scheduled experiment (not immediate)
+ #To run an XP immediately, don't specify date and time in RSpec
+ #They will be set to None.
+ if lease_dict['lease_start_time'] is not '0':
+ #Readable time accepted by OAR
+ start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
+ strftime(lease_dict['time_format'])
+ reqdict['reservation'] = start_time
+ #If there is not start time, Immediate XP. No need to add special
+ # OAR parameters
+
+
+ reqdict['type'] = "deploy"
+ reqdict['directory'] = ""
+ reqdict['name'] = "SFA_" + lease_dict['slice_user']
+
+ return reqdict
-
- reqdict['type'] = "deploy"
- reqdict['directory'] = ""
- reqdict['name'] = "TestSandrine"
-
-
- # first step : start the OAR job and update the job
+
+ #Create the request for OAR
+ reqdict = __create_job_structure_request_for_OAR(lease_dict)
+ # first step : start the OAR job and update the job
logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
- \r\n site_list %s" %(reqdict, site_list))
+ \r\n " %(reqdict))
answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
reqdict, slice_user)
Impossible to create job %s " %(answer))
return
- logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
- added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
- self.db.update_job( slice_name, jobid, added_nodes)
-
- # second step : configure the experiment
- # we need to store the nodes in a yaml (well...) file like this :
- # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
- job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
- job_file.write('[')
- job_file.write(str(added_nodes[0].strip('node')))
- for node in added_nodes[1:len(added_nodes)] :
- job_file.write(', '+ node.strip('node'))
- job_file.write(']')
- job_file.close()
-
- # third step : call the senslab-experiment wrapper
- #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
- # "+str(jobid)+" "+slice_user
- javacmdline = "/usr/bin/java"
- jarname = \
- "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
- #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
- #str(jobid), slice_user])
- output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
- slice_user],stdout=subprocess.PIPE).communicate()[0]
-
- logger.debug("SLABDRIVER \tLaunchExperimentOnOAR wrapper returns%s " \
- %(output))
- return
-
-
+ def __configure_experiment(jobid, added_nodes):
+ # second step : configure the experiment
+ # we need to store the nodes in a yaml (well...) file like this :
+ # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
+ job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
+ job_file.write('[')
+ job_file.write(str(added_nodes[0].strip('node')))
+ for node in added_nodes[1:len(added_nodes)] :
+ job_file.write(', '+ node.strip('node'))
+ job_file.write(']')
+ job_file.close()
+ return
+
+ def __launch_senslab_experiment(jobid):
+ # third step : call the senslab-experiment wrapper
+ #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
+ # "+str(jobid)+" "+slice_user
+ javacmdline = "/usr/bin/java"
+ jarname = \
+ "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
+ #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
+ #str(jobid), slice_user])
+ output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
+ slice_user],stdout=subprocess.PIPE).communicate()[0]
+
+ logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
+ %(output))
+ return
+
+
+
+ if jobid :
+ logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
+ added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
+ self.db.update_job( slice_name, jobid, added_nodes)
+
+ __configure_experiment(jobid, added_nodes)
+ __launch_senslab_experiment(jobid)
+
+ return
+
+ def AddLeases(self, hostname_list, slice_record, lease_start_time, lease_duration):
+ logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
+ slice_record %s lease_start_time %s lease_duration %s "\
+ %( hostname_list, slice_record , lease_start_time, \
+ lease_duration))
+
+ tmp = slice_record['PI'][0].split(".")
+ username = tmp[(len(tmp)-1)]
+ self.LaunchExperimentOnOAR(hostname_list, slice_record['name'], lease_start_time, lease_duration, username)
+ start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
+ logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
+
+ return
+
+
#Delete the jobs and updates the job id in the senslab table
#to set it to -1
#Does not clear the node list
reservation_list = []
#Find the slice associated with this user senslab ldap uid
logger.debug(" SLABDRIVER.PY \tGetLeases ")
+ #Create user dict first to avoir looking several times for
+ #the same user in LDAP SA 27/07/12
+ resa_user_dict = {}
for resa in unfiltered_reservation_list:
- ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
- ldap_info = ldap_info[0][1]
-
- user = dbsession.query(RegUser).filter_by(email = \
- ldap_info['mail'][0]).first()
- #Separated in case user not in database : record_id not defined SA 17/07//12
- query_slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id)
- if query_slice_info:
- slice_info = query_slice_info.first()
-
+ logger.debug("SLABDRIVER \tGetLeases USER %s"\
+ %(resa['user']))
+ if resa['user'] not in resa_user_dict:
+ logger.debug("SLABDRIVER \tGetLeases userNOTIN ")
+ ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
+ ldap_info = ldap_info[0][1]
+ user = dbsession.query(RegUser).filter_by(email = \
+ ldap_info['mail'][0]).first()
+ #Separated in case user not in database : record_id not defined SA 17/07//12
+ query_slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id)
+ if query_slice_info:
+ slice_info = query_slice_info.first()
+ else:
+ slice_info = None
+ resa_user_dict[resa['user']] = {}
+ resa_user_dict[resa['user']]['ldap_info'] = user
+ resa_user_dict[resa['user']]['slice_info'] = slice_info
+
+ logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
+ %(resa_user_dict))
+ for resa in unfiltered_reservation_list:
+
+ #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
+ #ldap_info = ldap_info[0][1]
+
+ #user = dbsession.query(RegUser).filter_by(email = \
+ #ldap_info['mail'][0]).first()
+ ##Separated in case user not in database : record_id not defined SA 17/07//12
+ #query_slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id)
+ #if query_slice_info:
+ #slice_info = query_slice_info.first()
+ #Put the slice_urn
+ resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info'].slice_hrn
+ resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
#Put the slice_urn
- resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
+ #resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
resa['component_id_list'] = []
#Transform the hostnames into urns (component ids)
for node in resa['reserved_nodes']:
logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
%(lease_filter_dict))
for resa in unfiltered_reservation_list:
- if lease_filter_dict['name'] == resa['slice_id']:
+ if lease_filter_dict['name'] == resa['slice_hrn']:
reservation_list.append(resa)
else:
reservation_list = unfiltered_reservation_list
"""
logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
return
+
+ def DeleteLeases(self, leases_id_list, slice_hrn ):
+ for job_id in leases_id_list:
+ self.DeleteJobs(job_id, slice_hrn)
+
+ logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \r\n " %(leases_id_list, slice_hrn))
+ return