import subprocess
from datetime import datetime
-from dateutil import tz
-from time import strftime, gmtime
from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
from sfa.util.sfalogging import logger
from sfa.rspecs.version_manager import VersionManager
from sfa.rspecs.rspec import RSpec
-from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf
-from sfa.planetlab.plxrn import slicename_to_hrn, \
- hostname_to_urn, \
- xrn_to_hostname
+from sfa.util.xrn import hrn_to_urn
+
## thierry: everything that is API-related (i.e. handling incoming requests)
# is taken care of
from sfa.senslab.LDAPapi import LDAPapi
from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab
-from sfa.senslab.slabaggregate import SlabAggregate
+
+from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
+ slab_xrn_object
from sfa.senslab.slabslices import SlabSlices
# GetNodes or GetSites sorts of calls directly
# and thus minimize the differences in the managers with the pl version
class SlabDriver(Driver):
-
+ """ Senslab Driver class inherited from Driver generic class.
+
+ Contains methods compliant with the SFA standard and the testbed
+ infrastructure (calls to LDAP and OAR).
+ """
def __init__(self, config):
Driver.__init__ (self, config)
self.config = config
self.hrn = config.SFA_INTERFACE_HRN
-
self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
-
self.oar = OARrestapi()
self.ldap = LDAPapi()
self.time_format = "%Y-%m-%d %H:%M:%S"
- self.db = SlabDB(config,debug = True)
+ self.db = SlabDB(config, debug = True)
self.cache = None
"""
#First get the slice with the slice hrn
- sl = self.GetSlices(slice_filter = slice_hrn, \
+ slice_list = self.GetSlices(slice_filter = slice_hrn, \
slice_filter_type = 'slice_hrn')
- if len(sl) is 0:
- raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
-
- top_level_status = 'unknown'
- nodes_in_slice = sl['node_ids']
- if len(nodes_in_slice) is 0:
- raise SliverDoesNotExist("No slivers allocated ")
- else:
- top_level_status = 'ready'
+ if len(slice_list) is 0:
+ raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
- logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
- %s \r\n " %(slice_urn, slice_hrn, sl))
-
- if sl['oar_job_id'] is not -1:
- #A job is running on Senslab for this slice
- # report about the local nodes that are in the slice only
-
- nodes_all = self.GetNodes({'hostname':nodes_in_slice},
- ['node_id', 'hostname','site','boot_state'])
- nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
+ #Slice has the same slice hrn for each slice in the slice/lease list
+ #So fetch the info on the user once
+ one_slice = slice_list[0]
+ recuser = dbsession.query(RegRecord).filter_by(record_id = \
+ one_slice['record_id_user']).first()
+
+ #Make a list of all the nodes hostnames in use for this slice
+ slice_nodes_list = []
+ for sl in slice_list:
+ for node in sl['node_ids']:
+ slice_nodes_list.append(node['hostname'])
+ #Get all the corresponding nodes details
+ nodes_all = self.GetNodes({'hostname':slice_nodes_list},
+ ['node_id', 'hostname','site','boot_state'])
+ nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
+
+
+
+ for sl in slice_list:
+ #For compatibility
+ top_level_status = 'empty'
result = {}
+ result.fromkeys(['geni_urn','pl_login','geni_status','geni_resources'],None)
+ result['pl_login'] = recuser.hrn
+ logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
+ %s \r\n " %(slice_urn, slice_hrn, sl))
+ try:
+ nodes_in_slice = sl['node_ids']
+ except KeyError:
+ #No job in the slice
+ result['geni_status'] = top_level_status
+ result['geni_resources'] = []
+ return result
+
+ top_level_status = 'ready'
+
+ #A job is running on Senslab for this slice
+ # report about the local nodes that are in the slice only
+
result['geni_urn'] = slice_urn
- result['pl_login'] = sl['job_user'] #For compatibility
+
- timestamp = float(sl['startTime']) + float(sl['walltime'])
- result['pl_expires'] = strftime(self.time_format, \
- gmtime(float(timestamp)))
+ #timestamp = float(sl['startTime']) + float(sl['walltime'])
+ #result['pl_expires'] = strftime(self.time_format, \
+ #gmtime(float(timestamp)))
#result['slab_expires'] = strftime(self.time_format,\
- #gmtime(float(timestamp)))
+ #gmtime(float(timestamp)))
resources = []
- for node in nodeall_byhostname:
+ for node in sl['node_ids']:
res = {}
#res['slab_hostname'] = node['hostname']
#res['slab_boot_state'] = node['boot_state']
- res['pl_hostname'] = nodeall_byhostname[node]['hostname']
- res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
- res['pl_last_contact'] = strftime(self.time_format, \
- gmtime(float(timestamp)))
- sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
- nodeall_byhostname[node]['node_id'])
+ res['pl_hostname'] = node['hostname']
+ res['pl_boot_state'] = nodeall_byhostname[node['hostname']]['boot_state']
+ #res['pl_last_contact'] = strftime(self.time_format, \
+ #gmtime(float(timestamp)))
+ sliver_id = Xrn(slice_urn, type='slice', \
+ id=nodeall_byhostname[node['hostname']]['node_id'], \
+ authority=self.hrn).urn
+
res['geni_urn'] = sliver_id
- if nodeall_byhostname[node]['boot_state'] == 'Alive':
+ if nodeall_byhostname[node['hostname']]['boot_state'] == 'Alive':
res['geni_status'] = 'ready'
else:
result['geni_status'] = top_level_status
result['geni_resources'] = resources
logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
- %(resources,res))
+ %(resources,res))
return result
-
-
+
+
def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
users, options):
- logger.debug("SLABDRIVER.PY \tcreate_sliver ")
aggregate = SlabAggregate(self)
slices = SlabSlices(self)
# parse rspec
rspec = RSpec(rspec_string)
- logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \
- %(rspec.version))
-
+ logger.debug("SLABDRIVER.PY \t create_sliver \tr spec.version %s slice_record %s " \
+ %(rspec.version,slice_record))
+ #self.synchronize_oar_and_slice_table(slice_hrn)
# ensure site record exists?
# ensure slice record exists
+ #Removed options to verify_slice SA 14/08/12
sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
- sfa_peer, options=options)
- requested_attributes = rspec.version.get_slice_attributes()
-
- if requested_attributes:
- for attrib_dict in requested_attributes:
- if 'timeslot' in attrib_dict and attrib_dict['timeslot'] \
- is not None:
- sfa_slice.update({'timeslot':attrib_dict['timeslot']})
+ sfa_peer)
+
+ #requested_attributes returned by rspec.version.get_slice_attributes()
+ #unused, removed SA 13/08/12
+ rspec.version.get_slice_attributes()
+
logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
# ensure person records exists
- persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
+ #verify_persons returns added persons but since the return value
+ #is not used
+ slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
sfa_peer, options=options)
- # ensure slice attributes exists?
# add/remove slice from nodes
requested_slivers = [node.get('component_name') \
for node in rspec.version.get_nodes_with_slivers()]
+ l = [ node for node in rspec.version.get_nodes_with_slivers() ]
logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
- requested_slivers %s " %(requested_slivers))
-
- nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
+ requested_slivers %s listnodes %s" \
+ %(requested_slivers,l))
+ #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
+ slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
# add/remove leases
- requested_leases = []
+ requested_lease_list = []
kept_leases = []
for lease in rspec.version.get_leases():
- requested_lease = {}
+ single_requested_lease = {}
+ logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
if not lease.get('lease_id'):
- requested_lease['hostname'] = \
- xrn_to_hostname(lease.get('component_id').strip())
- requested_lease['start_time'] = lease.get('start_time')
- requested_lease['duration'] = lease.get('duration')
+ single_requested_lease['hostname'] = \
+ slab_xrn_to_hostname(lease.get('component_id').strip())
+ single_requested_lease['start_time'] = lease.get('start_time')
+ single_requested_lease['duration'] = lease.get('duration')
else:
kept_leases.append(int(lease['lease_id']))
- if requested_lease.get('hostname'):
- requested_leases.append(requested_lease)
+ if single_requested_lease.get('hostname'):
+ requested_lease_list.append(single_requested_lease)
+
+    #Create dict of leases by start_time, regrouping nodes reserved
+ #at the same
+ #time, for the same amount of time = one job on OAR
+ requested_job_dict = {}
+ for lease in requested_lease_list:
+
+ #In case it is an asap experiment start_time is empty
+ if lease['start_time'] == '':
+ lease['start_time'] = '0'
+
+ if lease['start_time'] not in requested_job_dict:
+ if isinstance(lease['hostname'], str):
+ lease['hostname'] = [lease['hostname']]
+
+ requested_job_dict[lease['start_time']] = lease
+
+ else :
+ job_lease = requested_job_dict[lease['start_time']]
+ if lease['duration'] == job_lease['duration'] :
+ job_lease['hostname'].append(lease['hostname'])
+
+
- leases = slices.verify_slice_leases(sfa_slice, \
- requested_leases, kept_leases, peer)
+
+ logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s " %(requested_job_dict))
+ #verify_slice_leases returns the leases , but the return value is unused
+ #here. Removed SA 13/08/12
+ slices.verify_slice_leases(sfa_slice, \
+ requested_job_dict, kept_leases, peer)
return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
def delete_sliver (self, slice_urn, slice_hrn, creds, options):
- sfa_slice = self.GetSlices(slice_filter = slice_hrn, \
+ sfa_slice_list = self.GetSlices(slice_filter = slice_hrn, \
slice_filter_type = 'slice_hrn')
- logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
- if not sfa_slice:
+
+ if not sfa_slice_list:
+ return 1
+
+ #Delete all in the slice
+ for sfa_slice in sfa_slice_list:
+
+
+ logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
+ slices = SlabSlices(self)
+ # determine if this is a peer slice
+
+ peer = slices.get_peer(slice_hrn)
+ #TODO delete_sliver SA : UnBindObjectFromPeer should be
+ #used when there is another
+ #senslab testbed, which is not the case 14/08/12 .
+
+ logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer))
+ try:
+ if peer:
+ self.UnBindObjectFromPeer('slice', \
+ sfa_slice['record_id_slice'], peer,None)
+ self.DeleteSliceFromNodes(sfa_slice)
+ finally:
+ if peer:
+ self.BindObjectToPeer('slice', sfa_slice['record_id_slice'], \
+ peer, sfa_slice['peer_slice_id'])
return 1
-
- slices = SlabSlices(self)
- # determine if this is a peer slice
-
- peer = slices.get_peer(slice_hrn)
- try:
- if peer:
- self.UnBindObjectFromPeer('slice', \
- sfa_slice['record_id_slice'], peer)
- self.DeleteSliceFromNodes(sfa_slice)
- finally:
- if peer:
- self.BindObjectToPeer('slice', sfa_slice['slice_id'], \
- peer, sfa_slice['peer_slice_id'])
- return 1
def AddSlice(self, slice_record):
#return slices
# get data from db
- logger.debug("SLABDRIVER.PY \tlist_slices")
- slices = self.GetSlices()
- slice_hrns = [slicename_to_hrn(self.hrn, slab_slice['slice_hrn']) \
- for slab_slice in slices]
+
+ slices = self.GetSlices()
+ logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
+ slice_hrns = [slab_slice['slice_hrn'] for slab_slice in slices]
+ #slice_hrns = [slicename_to_hrn(self.hrn, slab_slice['slice_hrn']) \
+ #for slab_slice in slices]
slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
for slice_hrn in slice_hrns]
-
+
# cache the result
#if self.cache:
#logger.debug ("SlabDriver.list_slices stores value in cache")
return slice_urns
- #No site or node register supported
+
def register (self, sfa_record, hrn, pub_key):
- record_type = sfa_record['type']
- slab_record = self.sfa_fields_to_slab_fields(record_type, hrn, \
- sfa_record)
-
-
- if record_type == 'slice':
- acceptable_fields = ['url', 'instantiation', 'name', 'description']
- for key in slab_record.keys():
- if key not in acceptable_fields:
- slab_record.pop(key)
- logger.debug("SLABDRIVER.PY register")
- slices = self.GetSlices(slice_filter =slab_record['hrn'], \
- slice_filter_type = 'slice_hrn')
- if not slices:
- pointer = self.AddSlice(slab_record)
- else:
- pointer = slices[0]['slice_id']
-
- elif record_type == 'user':
- persons = self.GetPersons([sfa_record])
- #persons = self.GetPersons([sfa_record['hrn']])
- if not persons:
- pointer = self.AddPerson(dict(sfa_record))
- #add in LDAP
- else:
- pointer = persons[0]['person_id']
-
- #Does this make sense to senslab ?
- #if 'enabled' in sfa_record and sfa_record['enabled']:
- #self.UpdatePerson(pointer, \
- #{'enabled': sfa_record['enabled']})
-
- #TODO register Change this AddPersonToSite stuff 05/07/2012 SA
- # add this person to the site only if
- # she is being added for the first
- # time by sfa and doesnt already exist in plc
- if not persons or not persons[0]['site_ids']:
- login_base = get_leaf(sfa_record['authority'])
- self.AddPersonToSite(pointer, login_base)
-
- # What roles should this user have?
- #TODO : DElete this AddRoleToPerson 04/07/2012 SA
- #Function prototype is :
- #AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email)
- #what's the pointer doing here?
- self.AddRoleToPerson('user', pointer)
- # Add the user's key
- if pub_key:
- self.AddPersonKey(pointer, {'key_type' : 'ssh', \
- 'key' : pub_key})
-
- #No node adding outside OAR
-
- return pointer
+ """
+ Adding new user, slice, node or site should not be handled
+ by SFA.
+
+ Adding nodes = OAR
+ Adding users = LDAP Senslab
+ Adding slice = Import from LDAP users
+ Adding site = OAR
+ """
+ return -1
#No site or node record update allowed
def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
def remove (self, sfa_record):
sfa_record_type = sfa_record['type']
hrn = sfa_record['hrn']
- record_id = sfa_record['record_id']
if sfa_record_type == 'user':
#get user from senslab ldap
elif sfa_record_type == 'slice':
if self.GetSlices(slice_filter = hrn, \
slice_filter_type = 'slice_hrn'):
- self.DeleteSlice(sfa_record_type)
+ self.DeleteSlice(sfa_record)
#elif type == 'authority':
#if self.GetSites(pointer):
logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
%(records_list))
- except:
+ except KeyError:
pass
return_records = records_list
#TODO : Handling OR request in make_ldap_filters_from_records
#instead of the for loop
#over the records' list
- def GetPersons(self, person_filter=None, return_fields_list=None):
+ def GetPersons(self, person_filter=None):
"""
person_filter should be a list of dictionnaries when not set to None.
Returns a list of users whose accounts are enabled found in ldap.
reqdict['method'] = "delete"
reqdict['strval'] = str(job_id)
+
answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
reqdict,username)
- logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s username %s" \
- %(job_id,answer, username))
+ logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
+ username %s" %(job_id,answer, username))
return answer
#assigned_n = ['node', 'node_uri']
req = "GET_jobs_id_resources"
- node_list_k = 'reserved_resources'
+
#Get job resources list from OAR
node_id_list = self.oar.parser.SendRequest(req, job_id, username)
hostname_list = \
self.__get_hostnames_from_oar_node_ids(node_id_list)
- #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
- #node_list_k)
+
#Replaces the previous entry "assigned_network_address" /
#"reserved_resources"
#with "node_ids"
logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\
oar_id_node_dict %s" %(oar_id_node_dict))
- hostname_list = []
+
hostname_dict_list = []
for resource_id in resource_id_list:
- hostname_dict_list.append({'hostname' : \
- oar_id_node_dict[resource_id]['hostname'],
- 'site_id' : oar_id_node_dict[resource_id]['site']})
-
+ #Because jobs requested "asap" do not have defined resources
+ if resource_id is not "Undefined":
+ hostname_dict_list.append({'hostname' : \
+ oar_id_node_dict[resource_id]['hostname'],
+ 'site_id' : oar_id_node_dict[resource_id]['site']})
+
#hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
return hostname_dict_list
- def GetReservedNodes(self):
+ def GetReservedNodes(self,username = None):
#Get the nodes in use and the reserved nodes
reservation_dict_list = \
- self.oar.parser.SendRequest("GET_reserved_nodes")
+ self.oar.parser.SendRequest("GET_reserved_nodes", username = username)
for resa in reservation_dict_list:
return return_site_list
- #warning return_fields_list paramr emoved (Not used)
+
+
+
def GetSlices(self, slice_filter = None, slice_filter_type = None):
#def GetSlices(self, slice_filter = None, slice_filter_type = None, \
#return_fields_list = None):
specified.
"""
+ login = None
return_slice_list = []
slicerec = {}
slicerec_dict = {}
authorized_filter_types_list = ['slice_hrn', 'record_id_user']
- logger.debug("SLABDRIVER \tGetSlices authorized_filter_types_list %s"\
- %(authorized_filter_types_list))
+ slicerec_dictlist = []
+
+
if slice_filter_type in authorized_filter_types_list:
- if slice_filter_type == 'slice_hrn':
- slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()
-
- if slice_filter_type == 'record_id_user':
- slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
+
+
+ def __get_slice_records(slice_filter = None, slice_filter_type = None):
+
+ login = None
+ #Get list of slices based on the slice hrn
+ if slice_filter_type == 'slice_hrn':
+
+ login = slice_filter.split(".")[1].split("_")[0]
+
+ #DO NOT USE RegSlice - reg_researchers to get the hrn of the user
+ #otherwise will mess up the RegRecord in Resolve, don't know
+ #why - SA 08/08/2012
+
+ #Only one entry for one user = one slice in slice_senslab table
+ slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()
+
+ #Get slice based on user id
+ if slice_filter_type == 'record_id_user':
+ slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
+
+ if slicerec is None:
+ return login, []
+ else:
+ fixed_slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
+
+ if login is None :
+ login = fixed_slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
+ return login, fixed_slicerec_dict
- if slicerec:
- #warning pylint OK
- slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
- logger.debug("SLABDRIVER \tGetSlices slicerec_dict %s" \
- %(slicerec_dict))
- #Get login
- login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
- logger.debug("\r\n SLABDRIVER \tGetSlices login %s \
- slice record %s" \
- %(login, slicerec_dict))
- if slicerec_dict['oar_job_id'] is not -1:
- #Check with OAR the status of the job if a job id is in
- #the slice record
- rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \
- username = login)
-
- if rslt :
- slicerec_dict.update(rslt)
- slicerec_dict.update({'hrn':\
- str(slicerec_dict['slice_hrn'])})
- #If GetJobsResources is empty, this means the job is
- #now in the 'Terminated' state
- #Update the slice record
- else :
- self.db.update_job(slice_filter, job_id = -1)
- slicerec_dict['oar_job_id'] = -1
- slicerec_dict.\
- update({'hrn':str(slicerec_dict['slice_hrn'])})
- try:
- slicerec_dict['node_ids'] = slicerec_dict['node_list']
- except KeyError:
- pass
+
+
+ login, fixed_slicerec_dict = __get_slice_records(slice_filter, slice_filter_type)
+ logger.debug(" SLABDRIVER \tGetSlices login %s \
+ slice record %s" \
+ %(login, fixed_slicerec_dict))
+
+
+
+ #One slice can have multiple jobs
+
+ leases_list = self.GetReservedNodes(username = login)
+ #If no job is running or no job scheduled
+ if leases_list == [] :
+ return [fixed_slicerec_dict]
+
+ #Several jobs for one slice
+ for lease in leases_list :
+ slicerec_dict = {}
+
- logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s"\
- %(slicerec_dict))
-
- return slicerec_dict
+ #Check with OAR the status of the job if a job id is in
+ #the slice record
+
+
+
+ slicerec_dict['oar_job_id'] = lease['lease_id']
+ slicerec_dict.update({'node_ids':lease['reserved_nodes']})
+ slicerec_dict.update(fixed_slicerec_dict)
+ slicerec_dict.update({'hrn':\
+ str(fixed_slicerec_dict['slice_hrn'])})
+
+
+ slicerec_dictlist.append(slicerec_dict)
+ logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s slicerec_dictlist %s" %(slicerec_dict, slicerec_dictlist))
+
+ logger.debug("SLABDRIVER.PY \tGetSlices RETURN slicerec_dictlist %s"\
+ %(slicerec_dictlist))
+
+ return slicerec_dictlist
else:
- return_slice_list = slab_dbsession.query(SliceSenslab).all()
-
- logger.debug("SLABDRIVER.PY \tGetSlices slices %s \
+
+ slice_list = slab_dbsession.query(SliceSenslab).all()
+ leases_list = self.GetReservedNodes()
+
+
+ slicerec_dictlist = []
+ return_slice_list = []
+ for record in slice_list:
+ return_slice_list.append(record.dump_sqlalchemyobj_to_dict())
+
+ for fixed_slicerec_dict in return_slice_list:
+ slicerec_dict = {}
+ owner = fixed_slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
+ for lease in leases_list:
+ if owner == lease['user']:
+ slicerec_dict['oar_job_id'] = lease['lease_id']
+ slicerec_dict.update({'node_ids':lease['reserved_nodes']})
+ slicerec_dict.update(fixed_slicerec_dict)
+ slicerec_dict.update({'hrn':\
+ str(fixed_slicerec_dict['slice_hrn'])})
+ slicerec_dictlist.append(slicerec_dict)
+
+ logger.debug("SLABDRIVER.PY \tGetSlices RETURN slices %s \
slice_filter %s " %(return_slice_list, slice_filter))
#if return_fields_list:
#return_slice_list = parse_filter(sliceslist, \
#slice_filter,'slice', return_fields_list)
- return return_slice_list
-
-
-
+ return slicerec_dictlist
def testbed_name (self): return self.hrn
def sfa_fields_to_slab_fields(self, sfa_type, hrn, record):
- def convert_ints(tmpdict, int_fields):
- for field in int_fields:
- if field in tmpdict:
- tmpdict[field] = int(tmpdict[field])
slab_record = {}
#for field in record:
else:
return None
-
- def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
- """ Creates the structure needed for a correct POST on OAR.
- Makes the timestamp transformation into the appropriate format.
- Sends the POST request to create the job with the resources in
- added_nodes.
- """
- site_list = []
- nodeid_list = []
- resource = ""
- reqdict = {}
- slice_name = slice_dict['name']
- try:
- slot = slice_dict['timeslot']
- logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
- slot %s" %(slot))
- except KeyError:
- #Running on default parameters
- #XP immediate , 10 mins
- slot = { 'date':None, 'start_time':None,
- 'timezone':None, 'duration':None }#10 min
-
- reqdict['workdir'] = '/tmp'
- reqdict['resource'] = "{network_address in ("
-
- for node in added_nodes:
- logger.debug("OARrestapi \tLaunchExperimentOnOAR \
- node %s" %(node))
-
- #Get the ID of the node : remove the root auth and put
- # the site in a separate list.
- # NT: it's not clear for me if the nodenames will have the senslab
- #prefix so lets take the last part only, for now.
-
- # Again here it's not clear if nodes will be prefixed with <site>_,
- #lets split and tanke the last part for now.
- #s=lastpart.split("_")
-
- nodeid = node
- reqdict['resource'] += "'" + nodeid + "', "
- nodeid_list.append(nodeid)
-
- custom_length = len(reqdict['resource'])- 2
- reqdict['resource'] = reqdict['resource'][0:custom_length] + \
- ")}/nodes=" + str(len(nodeid_list))
-
- def __process_walltime(duration=None):
- """ Calculates the walltime in seconds from the duration in H:M:S
- specified in the RSpec.
-
- """
- if duration:
- walltime = duration.split(":")
- # Fixing the walltime by adding a few delays. First put the walltime
- # in seconds oarAdditionalDelay = 20; additional delay for
- # /bin/sleep command to
- # take in account prologue and epilogue scripts execution
- # int walltimeAdditionalDelay = 120; additional delay
-
- desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
- int(walltime[2])
- total_walltime = desired_walltime + 140 #+2 min 20
- sleep_walltime = desired_walltime + 20 #+20 sec
- logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
- total_walltime %s sleep_walltime %s "\
- %(desired_walltime, total_walltime, \
- sleep_walltime))
- #Put the walltime back in str form
- #First get the hours
- walltime[0] = str(total_walltime / 3600)
- total_walltime = total_walltime - 3600 * int(walltime[0])
- #Get the remaining minutes
- walltime[1] = str(total_walltime / 60)
- total_walltime = total_walltime - 60 * int(walltime[1])
- #Get the seconds
- walltime[2] = str(total_walltime)
- logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
- %(walltime))
- else:
- #automatically set 10min +2 min 20
- walltime[0] = '0'
- walltime[1] = '12'
- walltime[2] = '20'
- sleep_walltime = '620'
-
- return walltime, sleep_walltime
-
- #if slot['duration']:
- walltime, sleep_walltime = __process_walltime(duration = \
- slot['duration'])
- #else:
- #walltime, sleep_walltime = self.__process_walltime(duration = None)
+
+
+
+ def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
+ lease_start_time, lease_duration, slice_user=None):
+ lease_dict = {}
+ lease_dict['lease_start_time'] = lease_start_time
+ lease_dict['lease_duration'] = lease_duration
+ lease_dict['added_nodes'] = added_nodes
+ lease_dict['slice_name'] = slice_name
+ lease_dict['slice_user'] = slice_user
+ lease_dict['grain'] = self.GetLeaseGranularity()
+ lease_dict['time_format'] = self.time_format
+
+ def __create_job_structure_request_for_OAR(lease_dict):
+ """ Creates the structure needed for a correct POST on OAR.
+ Makes the timestamp transformation into the appropriate format.
+ Sends the POST request to create the job with the resources in
+ added_nodes.
- reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
- ":" + str(walltime[1]) + ":" + str(walltime[2])
- reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
-
-
-
- #In case of a scheduled experiment (not immediate)
- #To run an XP immediately, don't specify date and time in RSpec
- #They will be set to None.
- server_timestamp, server_tz = self.GetTimezone()
- if slot['date'] and slot['start_time']:
- if slot['timezone'] is '' or slot['timezone'] is None:
- #assume it is server timezone
- from_zone = tz.gettz(server_tz)
- logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
- not specified server_tz %s from_zone %s" \
- %(server_tz, from_zone))
- else:
- #Get zone of the user from the reservation time given
- #in the rspec
- from_zone = tz.gettz(slot['timezone'])
-
- date = str(slot['date']) + " " + str(slot['start_time'])
- user_datetime = datetime.strptime(date, self.time_format)
- user_datetime = user_datetime.replace(tzinfo = from_zone)
-
- #Convert to server zone
+ """
- to_zone = tz.gettz(server_tz)
- reservation_date = user_datetime.astimezone(to_zone)
- #Readable time accpeted by OAR
- reqdict['reservation'] = reservation_date.strftime(self.time_format)
-
- logger.debug("SLABDRIVER \tLaunchExperimentOnOAR \
- reqdict['reservation'] %s " %(reqdict['reservation']))
+ nodeid_list = []
+ reqdict = {}
+
- else:
- # Immediate XP. Not need to add special parameters.
- # normally not used in SFA
-
- pass
-
+ reqdict['workdir'] = '/tmp'
+ reqdict['resource'] = "{network_address in ("
+
+ for node in lease_dict['added_nodes']:
+ logger.debug("\r\n \r\n OARrestapi \t __create_job_structure_request_for_OAR \
+ node %s" %(node))
+
+ # Get the ID of the node
+ nodeid = node
+ reqdict['resource'] += "'" + nodeid + "', "
+ nodeid_list.append(nodeid)
+
+ custom_length = len(reqdict['resource'])- 2
+ reqdict['resource'] = reqdict['resource'][0:custom_length] + \
+ ")}/nodes=" + str(len(nodeid_list))
+
+ def __process_walltime(duration):
+ """ Calculates the walltime in seconds from the duration in H:M:S
+ specified in the RSpec.
+
+ """
+ if duration:
+ # Fixing the walltime by adding a few delays.
+ # First put the walltime in seconds oarAdditionalDelay = 20;
+ # additional delay for /bin/sleep command to
+ # take in account prologue and epilogue scripts execution
+ # int walltimeAdditionalDelay = 120; additional delay
+ desired_walltime = duration
+ total_walltime = desired_walltime + 140#+2 min 20
+ sleep_walltime = desired_walltime + 20 #+20 sec
+ walltime = []
+ #Put the walltime back in str form
+ #First get the hours
+ walltime.append(str(total_walltime / 3600))
+ total_walltime = total_walltime - 3600 * int(walltime[0])
+ #Get the remaining minutes
+ walltime.append(str(total_walltime / 60))
+ total_walltime = total_walltime - 60 * int(walltime[1])
+ #Get the seconds
+ walltime.append(str(total_walltime))
+
+ else:
+ logger.log_exc(" __process_walltime duration null")
+
+ return walltime, sleep_walltime
+
- reqdict['type'] = "deploy"
- reqdict['directory'] = ""
- reqdict['name'] = "TestSandrine"
-
-
- # first step : start the OAR job and update the job
+ walltime, sleep_walltime = \
+ __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
+
+
+ reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
+ ":" + str(walltime[1]) + ":" + str(walltime[2])
+ reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
+
+ #In case of a scheduled experiment (not immediate)
+ #To run an XP immediately, don't specify date and time in RSpec
+ #They will be set to None.
+ if lease_dict['lease_start_time'] is not '0':
+ #Readable time accepted by OAR
+ start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
+ strftime(lease_dict['time_format'])
+ reqdict['reservation'] = start_time
+ #If there is not start time, Immediate XP. No need to add special
+ # OAR parameters
+
+
+ reqdict['type'] = "deploy"
+ reqdict['directory'] = ""
+ reqdict['name'] = "SFA_" + lease_dict['slice_user']
+
+ return reqdict
+
+
+ #Create the request for OAR
+ reqdict = __create_job_structure_request_for_OAR(lease_dict)
+ # first step : start the OAR job and update the job
logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
- \r\n site_list %s" %(reqdict, site_list))
+ \r\n " %(reqdict))
answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
reqdict, slice_user)
Impossible to create job %s " %(answer))
return
- logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
- added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
- self.db.update_job( slice_name, jobid, added_nodes)
-
- # second step : configure the experiment
- # we need to store the nodes in a yaml (well...) file like this :
- # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
- job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
- job_file.write('[')
- job_file.write(str(added_nodes[0].strip('node')))
- for node in added_nodes[1:len(added_nodes)] :
- job_file.write(', '+ node.strip('node'))
- job_file.write(']')
- job_file.close()
-
- # third step : call the senslab-experiment wrapper
- #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
- # "+str(jobid)+" "+slice_user
- javacmdline = "/usr/bin/java"
- jarname = \
- "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
- #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
- #str(jobid), slice_user])
- output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
- slice_user],stdout=subprocess.PIPE).communicate()[0]
-
- logger.debug("SLABDRIVER \tLaunchExperimentOnOAR wrapper returns%s " \
- %(output))
- return
-
-
- #Delete the jobs and updates the job id in the senslab table
- #to set it to -1
- #Does not clear the node list
+ def __configure_experiment(jobid, added_nodes):
+ # second step : configure the experiment
+ # we need to store the nodes in a yaml (well...) file like this :
+ # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
+ job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
+ job_file.write('[')
+ job_file.write(str(added_nodes[0].strip('node')))
+ for node in added_nodes[1:len(added_nodes)] :
+ job_file.write(', '+ node.strip('node'))
+ job_file.write(']')
+ job_file.close()
+ return
+
+ def __launch_senslab_experiment(jobid):
+ # third step : call the senslab-experiment wrapper
+ #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
+ # "+str(jobid)+" "+slice_user
+ javacmdline = "/usr/bin/java"
+ jarname = \
+ "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
+ #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
+ #str(jobid), slice_user])
+ output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
+ slice_user],stdout=subprocess.PIPE).communicate()[0]
+
+ logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
+ %(output))
+ return
+
+
+
+ if jobid :
+ logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
+ added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
+
+
+ __configure_experiment(jobid, added_nodes)
+ __launch_senslab_experiment(jobid)
+
+ return
+
+ def AddLeases(self, hostname_list, slice_record, lease_start_time, lease_duration):
+ logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
+ slice_record %s lease_start_time %s lease_duration %s "\
+ %( hostname_list, slice_record , lease_start_time, \
+ lease_duration))
+
+ tmp = slice_record['PI'][0].split(".")
+ username = tmp[(len(tmp)-1)]
+ self.LaunchExperimentOnOAR(hostname_list, slice_record['name'], lease_start_time, lease_duration, username)
+ start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
+ logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
+
+ return
+
+
+ #Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
- # Get user information
-
+
self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
- self.db.update_job(slice_record['hrn'], job_id = -1)
return
grain = 60
return grain
- def GetLeases(self, lease_filter_dict=None, return_fields_list=None):
+ def GetLeases(self, lease_filter_dict=None):
unfiltered_reservation_list = self.GetReservedNodes()
+
+ ##Synchronize slice_table of sfa senslab db
+ #self.synchronize_oar_and_slice_table(unfiltered_reservation_list)
+
reservation_list = []
#Find the slice associated with this user senslab ldap uid
logger.debug(" SLABDRIVER.PY \tGetLeases ")
+        #Create user dict first to avoid looking several times for
+ #the same user in LDAP SA 27/07/12
+ resa_user_dict = {}
for resa in unfiltered_reservation_list:
- ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
- ldap_info = ldap_info[0][1]
-
- user = dbsession.query(RegUser).filter_by(email = \
- ldap_info['mail'][0]).first()
- #Separated in case user not in database : record_id not defined SA 17/07//12
- query_slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id)
- if query_slice_info:
- slice_info = query_slice_info.first()
-
+ logger.debug("SLABDRIVER \tGetLeases USER %s"\
+ %(resa['user']))
+ if resa['user'] not in resa_user_dict:
+ logger.debug("SLABDRIVER \tGetLeases userNOTIN ")
+ ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
+ ldap_info = ldap_info[0][1]
+ user = dbsession.query(RegUser).filter_by(email = \
+ ldap_info['mail'][0]).first()
+                #Separated in case user not in database : record_id not defined SA 17/07/12
+ query_slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id)
+ if query_slice_info:
+ slice_info = query_slice_info.first()
+ else:
+ slice_info = None
+
+ resa_user_dict[resa['user']] = {}
+ resa_user_dict[resa['user']]['ldap_info'] = user
+ resa_user_dict[resa['user']]['slice_info'] = slice_info
+
+ logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
+ %(resa_user_dict))
+ for resa in unfiltered_reservation_list:
+
+
+            #Put the slice_hrn and its urn (slice_id)
+ resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info'].slice_hrn
+ resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
#Put the slice_urn
- resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
+ #resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
resa['component_id_list'] = []
#Transform the hostnames into urns (component ids)
for node in resa['reserved_nodes']:
- resa['component_id_list'].append(hostname_to_urn(self.hrn, \
- self.root_auth, node['hostname']))
-
+ #resa['component_id_list'].append(hostname_to_urn(self.hrn, \
+ #self.root_auth, node['hostname']))
+ slab_xrn = slab_xrn_object(self.root_auth, node['hostname'])
+ resa['component_id_list'].append(slab_xrn.urn)
#Filter the reservation list if necessary
#Returns all the leases associated with a given slice
logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
%(lease_filter_dict))
for resa in unfiltered_reservation_list:
- if lease_filter_dict['name'] == resa['slice_id']:
+ if lease_filter_dict['name'] == resa['slice_hrn']:
reservation_list.append(resa)
else:
reservation_list = unfiltered_reservation_list
#information is in the Senslab's DB.
if str(record['type']) == 'slice':
#Get slab slice record.
- recslice = self.GetSlices(slice_filter = \
+ recslice_list = self.GetSlices(slice_filter = \
str(record['hrn']),\
slice_filter_type = 'slice_hrn')
+
recuser = dbsession.query(RegRecord).filter_by(record_id = \
- recslice['record_id_user']).first()
- logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
- rec %s \r\n \r\n" %(recslice))
+ recslice_list[0]['record_id_user']).first()
+ logger.debug("SLABDRIVER \tfill_record_info TYPE SLICE RECUSER %s " %(recuser))
record.update({'PI':[recuser.hrn],
- 'researcher': [recuser.hrn],
- 'name':record['hrn'],
- 'oar_job_id':recslice['oar_job_id'],
- 'node_ids': [],
- 'person_ids':[recslice['record_id_user']],
- 'geni_urn':'', #For client_helper.py compatibility
- 'keys':'', #For client_helper.py compatibility
- 'key_ids':''}) #For client_helper.py compatibility
+ 'researcher': [recuser.hrn],
+ 'name':record['hrn'],
+ 'oar_job_id':[],
+ 'node_ids': [],
+ 'person_ids':[recslice_list[0]['record_id_user']],
+ 'geni_urn':'', #For client_helper.py compatibility
+ 'keys':'', #For client_helper.py compatibility
+ 'key_ids':''}) #For client_helper.py compatibility
- elif str(record['type']) == 'user':
+ try:
+ for rec in recslice_list:
+ record['oar_job_id'].append(rec['oar_job_id'])
+ except KeyError:
+ pass
+
+ logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
+ recslice_list %s \r\n \t RECORD %s \r\n \r\n" %(recslice_list,record))
+ if str(record['type']) == 'user':
#The record is a SFA user record.
#Get the information about his slice from Senslab's DB
#and add it to the user record.
- recslice = self.GetSlices(\
+ recslice_list = self.GetSlices(\
slice_filter = record['record_id'],\
slice_filter_type = 'record_id_user')
- logger.debug( "SLABDRIVER.PY \t fill_record_info user \
- rec %s \r\n \r\n" %(recslice))
+ logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
+ recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
#Append slice record in records list,
#therefore fetches user and slice info again(one more loop)
#Will update PIs and researcher for the slice
recuser = dbsession.query(RegRecord).filter_by(record_id = \
- recslice['record_id_user']).first()
+ recslice_list[0]['record_id_user']).first()
+ logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
+ recuser %s \r\n \r\n" %(recuser))
+ recslice = {}
+ recslice = recslice_list[0]
recslice.update({'PI':[recuser.hrn],
- 'researcher': [recuser.hrn],
- 'name':record['hrn'],
- 'oar_job_id':recslice['oar_job_id'],
- 'node_ids': [],
- 'person_ids':[recslice['record_id_user']]})
+ 'researcher': [recuser.hrn],
+ 'name':record['hrn'],
+ 'node_ids': [],
+ 'oar_job_id': [],
+ 'person_ids':[recslice_list[0]['record_id_user']]})
+ try:
+ for rec in recslice_list:
+ recslice['oar_job_id'].append(rec['oar_job_id'])
+ except KeyError:
+ pass
+
+ recslice.update({'type':'slice', \
+ 'hrn':recslice_list[0]['slice_hrn']})
+
#GetPersons takes [] as filters
#user_slab = self.GetPersons([{'hrn':recuser.hrn}])
user_slab = self.GetPersons([record])
- recslice.update({'type':'slice', \
- 'hrn':recslice['slice_hrn']})
+
record.update(user_slab[0])
#For client_helper.py compatibility
record.update( { 'geni_urn':'',
logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
INFO TO USER records %s" %(record_list))
-
+ logger.debug("SLABDRIVER.PY \tfill_record_info END \
+ #record %s \r\n \r\n " %(record))
except TypeError, error:
logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
%(error))
-
+ #logger.debug("SLABDRIVER.PY \t fill_record_info ENDENDEND ")
+
return
#self.fill_record_slab_info(records)
-
-
+
+
"""
self.DeleteSliceFromNodes(slice_record)
- self.db.update_job(slice_record['hrn'], job_id = -1, nodes = [])
logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
return
#TODO AddPerson 04/07/2012 SA
- def AddPerson(self, auth, person_fields=None):
- """Adds a new account. Any fields specified in person_fields are used,
+ #def AddPerson(self, auth, person_fields=None):
+ def AddPerson(self, record):#TODO fixing 28/08//2012 SA
+        """Adds a new account. Any fields specified in record are used,
otherwise defaults are used.
Accounts are disabled by default. To enable an account,
use UpdatePerson().
FROM PLC API DOC
"""
- logger.warning("SLABDRIVER AddPerson EMPTY - DO NOTHING \r\n ")
+ ret = self.ldap.LdapAddUser(record)
+ logger.warning("SLABDRIVER AddPerson return code %s \r\n ", ret)
return
#TODO AddPersonToSite 04/07/2012 SA
"""
logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
return
+
+ def DeleteLeases(self, leases_id_list, slice_hrn ):
+ for job_id in leases_id_list:
+ self.DeleteJobs(job_id, slice_hrn)
+
+ logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
+ \r\n " %(leases_id_list, slice_hrn))
+ return