"""
Implements what a driver should provide for SFA to work.
"""
+from datetime import datetime
from sfa.util.faults import SliverDoesNotExist, Forbidden
from sfa.util.sfalogging import logger
from sfa.rspecs.version_manager import VersionManager
from sfa.rspecs.rspec import RSpec
-from sfa.iotlab.iotlabxrn import IotlabXrn, xrn_object
+from sfa.iotlab.iotlabxrn import IotlabXrn, xrn_object, xrn_to_hostname
from sfa.util.xrn import Xrn, hrn_to_urn, get_authority, urn_to_hrn
from sfa.iotlab.iotlabaggregate import IotlabAggregate
-from sfa.iotlab.iotlabxrn import xrn_to_hostname
+
from sfa.iotlab.iotlabslices import IotlabSlices
from sfa.trust.credential import Credential
from sfa.iotlab.iotlabshell import IotlabShell
from sqlalchemy.orm import joinedload
+from sfa.iotlab.iotlabpostgres import LeaseTableXP
class IotlabDriver(Driver):
""" Iotlab Driver class inherited from Driver generic class.
"""
Sets the iotlab SFA config parameters,
- instanciates the testbed api and the iotlab database.
+ instanciates the testbed api .
- :param config: iotlab SFA configuration object
- :type config: Config object
+ :param api: SfaApi configuration object. Holds reference to the
+ database.
+ :type api: SfaApi object
"""
Driver.__init__(self, api)
existing_records = {}
existing_hrns_by_types = {}
logger.debug("IOTLAB_API \tGetPeers peer_filter %s " % (peer_filter))
- all_records = self.api.dbsession().query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
+ query = self.api.dbsession().query(RegRecord)
+ all_records = query.filter(RegRecord.type.like('%authority%')).all()
for record in all_records:
existing_records[(record.hrn, record.type)] = record
" %(user_dict))
hrn = user_dict['hrn']
person_urn = hrn_to_urn(hrn, 'user')
- pubkey = user_dict['pkey']
try:
+ pubkey = user_dict['pkey']
pkey = convert_public_key(pubkey)
except TypeError:
#key not good. create another pkey
- logger.warn('__add_person_to_db: unable to convert public \
+ logger.warn('__add_person_to_db: no public key or unable to convert public \
key for %s' %(hrn ))
pkey = Keypair(create=True)
user_record = RegUser(hrn=hrn , pointer= '-1', \
authority=get_authority(hrn), \
email=user_dict['email'], gid = person_gid)
- user_record.reg_keys = [RegKey(user_dict['pkey'])]
+ #user_record.reg_keys = [RegKey(user_dict['pkey'])]
user_record.just_created()
self.api.dbsession().add (user_record)
self.api.dbsession().commit()
"""
Get the slice record based on the slice hrn. Fetch the record of the
user associated with the slice by using joinedload based on the
- reg_researcher relationship.
+ reg_researchers relationship.
:param slice_filter: the slice hrn we are looking for
:type slice_filter: string
#Only one entry for one user = one slice in testbed_xp table
#slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
+
raw_slicerec = self.api.dbsession().query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn=slice_filter).first()
#raw_slicerec = self.api.dbsession().query(RegRecord).filter_by(hrn = slice_filter).first()
if raw_slicerec:
- #load_reg_researcher
+ #load_reg_researchers
#raw_slicerec.reg_researchers
raw_slicerec = raw_slicerec.__dict__
logger.debug(" IOTLAB_API \t _sql_get_slice_info slice_filter %s \
#jobs associated to this slice
leases_list = []
- leases_list = self.testbed_shell.GetLeases(login=login)
+ leases_list = self.GetLeases(login=login)
#If no job is running or no job scheduled
#return only the slice record
if leases_list == [] and fixed_slicerec_dict:
return_slicerec_dictlist.append(slicerec_dict)
- logger.debug("IOTLAB_API.PY \tGetSlices \
- OHOHOHOH %s" %(return_slicerec_dictlist))
logger.debug("IOTLAB_API.PY \tGetSlices \
slicerec_dict %s return_slicerec_dictlist %s \
return return_slicerec_dictlist
+ def AddLeases(self, hostname_list, slice_record,
+ lease_start_time, lease_duration):
+
+ """Creates a job in OAR corresponding to the information provided
+ as parameters. Adds the job id and the slice hrn in the iotlab
+ database so that we are able to know which slice has which nodes.
+
+ :param hostname_list: list of nodes' OAR hostnames.
+ :param slice_record: sfa slice record, must contain login and hrn.
+ :param lease_start_time: starting time , unix timestamp format
+ :param lease_duration: duration in minutes
+
+ :type hostname_list: list
+ :type slice_record: dict
+ :type lease_start_time: integer
+ :type lease_duration: integer
+ :returns: job_id, can be None if the job request failed.
+
+ """
+ logger.debug("IOTLAB_API \r\n \r\n \t AddLeases hostname_list %s \
+ slice_record %s lease_start_time %s lease_duration %s "\
+ %( hostname_list, slice_record , lease_start_time, \
+ lease_duration))
+
+ #tmp = slice_record['reg-researchers'][0].split(".")
+ username = slice_record['login']
+ #username = tmp[(len(tmp)-1)]
+ job_id = self.testbed_shell.LaunchExperimentOnOAR(hostname_list, \
+ slice_record['hrn'], \
+ lease_start_time, lease_duration, \
+ username)
+ if job_id is not None:
+ start_time = \
+ datetime.fromtimestamp(int(lease_start_time)).\
+ strftime(self.testbed_shell.time_format)
+ end_time = lease_start_time + lease_duration
+
+
+ logger.debug("IOTLAB_API \r\n \r\n \t AddLeases TURN ON LOGGING SQL \
+ %s %s %s "%(slice_record['hrn'], job_id, end_time))
+
+
+ logger.debug("IOTLAB_API \r\n \r\n \t AddLeases %s %s %s " \
+ %(type(slice_record['hrn']), type(job_id), type(end_time)))
+
+ iotlab_ex_row = LeaseTableXP(slice_hrn = slice_record['hrn'],
+ experiment_id=job_id,
+ end_time= end_time)
+
+ logger.debug("IOTLAB_API \r\n \r\n \t AddLeases iotlab_ex_row %s" \
+ %(iotlab_ex_row))
+ self.api.dbsession().add(iotlab_ex_row)
+ self.api.dbsession().commit()
+
+ logger.debug("IOTLAB_API \t AddLeases hostname_list start_time %s "
+ %(start_time))
+
+ return job_id
+
+ def GetLeases(self, lease_filter_dict=None, login=None):
+ """
+
+ Get the list of leases from OAR with complete information
+ about which slice owns which jobs and nodes.
+ Two purposes:
+ -Fetch all the jobs from OAR (running, waiting..)
+ complete the reservation information with slice hrn
+ found in lease_table . If not available in the table,
+ assume it is a iotlab slice.
+ -Updates the iotlab table, deleting jobs when necessary.
+
+ :returns: reservation_list, list of dictionaries with 'lease_id',
+ 'reserved_nodes','slice_id', 'state', 'user', 'component_id_list',
+ 'slice_hrn', 'resource_ids', 't_from', 't_until'
+ :rtype: list
+
+ """
+
+ unfiltered_reservation_list = self.testbed_shell.GetReservedNodes(login)
+
+ reservation_list = []
+ #Find the slice associated with this user iotlab ldap uid
+ logger.debug(" IOTLAB_API.PY \tGetLeases login %s\
+ unfiltered_reservation_list %s "
+ % (login, unfiltered_reservation_list))
+ #Create user dict first to avoid looking several times for
+ #the same user in LDAP SA 27/07/12
+ job_oar_list = []
+ jobs_psql_query = self.api.dbsession().query(LeaseTableXP).all()
+ jobs_psql_dict = dict([(row.experiment_id, row.__dict__)
+ for row in jobs_psql_query])
+ #jobs_psql_dict = jobs_psql_dict)
+ logger.debug("IOTLAB_API \tGetLeases jobs_psql_dict %s"
+ % (jobs_psql_dict))
+ jobs_psql_id_list = [row.experiment_id for row in jobs_psql_query]
+
+ for resa in unfiltered_reservation_list:
+ logger.debug("IOTLAB_API \tGetLeases USER %s"
+ % (resa['user']))
+ #Construct list of jobs (runing, waiting..) in oar
+ job_oar_list.append(resa['lease_id'])
+ #If there is information on the job in IOTLAB DB ]
+ #(slice used and job id)
+ if resa['lease_id'] in jobs_psql_dict:
+ job_info = jobs_psql_dict[resa['lease_id']]
+ logger.debug("IOTLAB_API \tGetLeases job_info %s"
+ % (job_info))
+ resa['slice_hrn'] = job_info['slice_hrn']
+ resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
+
+ #otherwise, assume it is a iotlab slice:
+ else:
+ resa['slice_id'] = hrn_to_urn(self.testbed_shell.root_auth \
+ + '.' + resa['user'] + "_slice",
+ 'slice')
+ resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()
+
+ resa['component_id_list'] = []
+ #Transform the hostnames into urns (component ids)
+ for node in resa['reserved_nodes']:
+
+ iotlab_xrn = xrn_object(self.testbed_shell.root_auth, node)
+ resa['component_id_list'].append(iotlab_xrn.urn)
+
+ if lease_filter_dict:
+ logger.debug("IOTLAB_API \tGetLeases \
+ \r\n leasefilter %s" % ( lease_filter_dict))
+
+ # filter_dict_functions = {
+ # 'slice_hrn' : IotlabShell.filter_lease_name,
+ # 't_from' : IotlabShell.filter_lease_start_time
+ # }
+ reservation_list = list(unfiltered_reservation_list)
+ for filter_type in lease_filter_dict:
+ logger.debug("IOTLAB_API \tGetLeases reservation_list %s" \
+ % (reservation_list))
+ reservation_list = self.testbed_shell.filter_lease(
+ reservation_list,filter_type,
+ lease_filter_dict[filter_type] )
+
+ # Filter the reservation list with a maximum timespan so that the
+ # leases and jobs running after this timestamp do not appear
+ # in the result leases.
+ # if 'start_time' in :
+ # if resa['start_time'] < lease_filter_dict['start_time']:
+ # reservation_list.append(resa)
+
+
+ # if 'name' in lease_filter_dict and \
+ # lease_filter_dict['name'] == resa['slice_hrn']:
+ # reservation_list.append(resa)
+
+
+ if lease_filter_dict is None:
+ reservation_list = unfiltered_reservation_list
+
+ self.update_experiments_in_lease_table(job_oar_list, jobs_psql_id_list)
+
+ logger.debug(" IOTLAB_API.PY \tGetLeases reservation_list %s"
+ % (reservation_list))
+ return reservation_list
+
+
+
+ def update_experiments_in_lease_table(self,
+ experiment_list_from_testbed, experiment_list_in_db):
+ """ Cleans the lease_table by deleting expired and cancelled jobs.
+
+ Compares the list of experiment ids given by the testbed with the
+ experiment ids that are already in the database, deletes the
+ experiments that are no longer in the testbed experiment id list.
+
+ :param experiment_list_from_testbed: list of experiment ids coming
+ from testbed
+ :type experiment_list_from_testbed: list
+ :param experiment_list_in_db: list of experiment ids from the sfa
+ additionnal database.
+ :type experiment_list_in_db: list
+
+ :returns: None
+ """
+ #Turn the list into a set
+ set_experiment_list_in_db = set(experiment_list_in_db)
+
+ kept_experiments = set(experiment_list_from_testbed).intersection(set_experiment_list_in_db)
+ logger.debug("\r\n \t update_experiments_in_lease_table \
+ experiment_list_in_db %s \r\n \
+ experiment_list_from_testbed %s \
+ kept_experiments %s "
+ % (set_experiment_list_in_db,
+ experiment_list_from_testbed, kept_experiments))
+ deleted_experiments = set_experiment_list_in_db.difference(
+ kept_experiments)
+ deleted_experiments = list(deleted_experiments)
+ if len(deleted_experiments) > 0:
+ request = self.api.dbsession().query(LeaseTableXP)
+ request.filter(LeaseTableXP.experiment_id.in_(deleted_experiments)).delete(synchronize_session='fetch')
+ self.api.dbsession().commit()
+ return
+
+
def AddSlice(self, slice_record, user_record):
"""
sfa_record = RegSlice(hrn=slice_record['hrn'],
gid=slice_record['gid'],
- pointer=slice_record['slice_id'],
+ #pointer=slice_record['slice_id'],
authority=slice_record['authority'])
logger.debug("IOTLAB_API.PY AddSlice sfa_record %s user_record %s"
% (sfa_record, user_record))
sfa_record.just_created()
self.api.dbsession().add(sfa_record)
self.api.dbsession().commit()
- #Update the reg-researcher dependance table
+ #Update the reg-researchers dependency table
sfa_record.reg_researchers = [user_record]
self.api.dbsession().commit()
recslice.update(
{'PI': [recuser['hrn']],
'researcher': [recuser['hrn']],
- 'name': record['hrn'],
+ 'name': recuser['hrn'],
'node_ids': [],
'oar_job_id': [],
'person_ids': [recuser['record_id']]})
- def delete(self, slice_urns, options={}):
+ def delete(self, slice_urns, options=None):
"""
Deletes the lease associated with the slice hrn and the credentials
if the slice belongs to iotlab. Answer to DeleteSliver.
.. note:: creds are unused, and are not used either in the dummy driver
delete_sliver .
"""
+ if options is None: options={}
# collect sliver ids so we can update sliver allocation states after
# we remove the slivers.
aggregate = IotlabAggregate(self)
'geni_expires': datetime_to_string(utcparse(sliver['expires']))})
return geni_slivers
- # def list_resources (self, slice_urn, slice_hrn, creds, options):
- # """
-
- # List resources from the iotlab aggregate and returns a Rspec
- # advertisement with resources found when slice_urn and slice_hrn are
- # None (in case of resource discovery).
- # If a slice hrn and urn are provided, list experiment's slice
- # nodes in a rspec format. Answer to ListResources.
- # Caching unused.
-
- # :param slice_urn: urn of the slice
- # :param slice_hrn: name of the slice
- # :param creds: slice credenials
- # :type slice_urn: string
- # :type slice_hrn: string
- # :type creds: ? unused
- # :param options: options used when listing resources (list_leases, info,
- # geni_available)
- # :returns: rspec string in xml
- # :rtype: string
-
- # .. note:: creds are unused
- # """
-
- # #cached_requested = options.get('cached', True)
-
- # version_manager = VersionManager()
- # # get the rspec's return format from options
- # rspec_version = \
- # version_manager.get_version(options.get('geni_rspec_version'))
- # version_string = "rspec_%s" % (rspec_version)
-
- # #panos adding the info option to the caching key (can be improved)
- # if options.get('info'):
- # version_string = version_string + "_" + \
- # options.get('info', 'default')
-
- # # Adding the list_leases option to the caching key
- # if options.get('list_leases'):
- # version_string = version_string + "_" + \
- # options.get('list_leases', 'default')
-
- # # Adding geni_available to caching key
- # if options.get('geni_available'):
- # version_string = version_string + "_" + \
- # str(options.get('geni_available'))
-
- # # look in cache first
- # #if cached_requested and self.cache and not slice_hrn:
- # #rspec = self.cache.get(version_string)
- # #if rspec:
- # #logger.debug("IotlabDriver.ListResources: \
- # #returning cached advertisement")
- # #return rspec
-
- # #panos: passing user-defined options
- # aggregate = IotlabAggregate(self)
-
- # rspec = aggregate.get_rspec(slice_xrn=slice_urn,
- # version=rspec_version, options=options)
-
- # # cache the result
- # #if self.cache and not slice_hrn:
- # #logger.debug("Iotlab.ListResources: stores advertisement in cache")
- # #self.cache.add(version_string, rspec)
-
- # return rspec
+
def list_slices(self, creds, options):
TODO: needs review
.. warning:: SA 12/12/13 - Removed. should be done in iotlabimporter
- since users, keys and slice are managed by the LDAP.
+ since users, keys and slice are managed by the LDAP.
"""
# pointer = old_sfa_record['pointer']
'geni_ad_rspec_versions': ad_rspec_versions}
# first 2 args are None in case of resource discovery
- def list_resources (self, version=None, options={}):
+ def list_resources (self, version=None, options=None):
+ if options is None: options={}
aggregate = IotlabAggregate(self)
rspec = aggregate.list_resources(version=version, options=options)
return rspec
aggregate = IotlabAggregate(self)
return aggregate.describe(urns, version=version, options=options)
- def status (self, urns, options={}):
+ def status (self, urns, options=None):
+ if options is None: options={}
aggregate = IotlabAggregate(self)
desc = aggregate.describe(urns, version='GENI 3')
status = {'geni_urn': desc['geni_urn'],
return status
- def allocate (self, urn, rspec_string, expiration, options={}):
+ def allocate (self, urn, rspec_string, expiration, options=None):
+ if options is None: options={}
xrn = Xrn(urn)
aggregate = IotlabAggregate(self)
peer = slices.get_peer(xrn.get_hrn())
sfa_peer = slices.get_sfa_peer(xrn.get_hrn())
+ caller_hrn = options.get('actual_caller_hrn', [])
+ caller_xrn = Xrn(caller_hrn)
+ caller_urn = caller_xrn.get_urn()
- slice_record = None
- users = options.get('geni_users', [])
+ logger.debug("IOTLABDRIVER.PY :: Allocate caller = %s" % (caller_urn))
+ slice_record = {}
+ users = options.get('geni_users', [])
sfa_users = options.get('sfa_users', [])
+
if sfa_users:
- slice_record = sfa_users[0].get('slice_record', [])
+ user = None
+ # Looking for the user who actually called the Allocate function in the list of users of the slice
+ for u in sfa_users:
+ if 'urn' in u and u['urn'] == caller_urn:
+ user = u
+ logger.debug("user = %s" % u)
+ # If we find the user in the list we use it, else we take the 1st in the list as before
+ if user:
+ user_hrn = caller_hrn
+ else:
+ user = sfa_users[0]
+ # XXX Always empty ??? no slice_record in the Allocate call
+ #slice_record = sfa_users[0].get('slice_record', [])
+ user_xrn = Xrn(sfa_users[0]['urn'])
+ user_hrn = user_xrn.get_hrn()
+
+ slice_record = user.get('slice_record', {})
+ slice_record['user'] = {'keys': user['keys'],
+ 'email': user['email'],
+ 'hrn': user_hrn}
+ slice_record['authority'] = xrn.get_authority_hrn()
+
+ logger.debug("IOTLABDRIVER.PY \t urn %s allocate options %s "
+ % (urn, options))
# parse rspec
rspec = RSpec(rspec_string)
# requested_attributes = rspec.version.get_slice_attributes()
# ensure site record exists
- # site = slices.verify_site(xrn.hrn, slice_record, peer, sfa_peer, options=options)
- # ensure slice record exists
- current_slice = slices.verify_slice(xrn.hrn, slice_record, sfa_peer)
- logger.debug("IOTLABDRIVER.PY \t ===============allocate \t\
- \r\n \r\n current_slice %s" % (current_slice))
# ensure person records exists
+ for user in users:
+ # XXX LOIC using hrn is a workaround because the function
+ # Xrn.get_urn returns 'urn:publicid:IDN+onelab:upmc+timur_friedman'
+ # Instead of this 'urn:publicid:IDN+onelab:upmc+user+timur_friedman'
+ user['hrn'] = urn_to_hrn(user['urn'])[0]
+ # XXX LOIC adding the users of the slice to reg-researchers
+ # reg-researchers is used in iotlabslices.py verify_slice in order to add the slice
+ if 'reg-researchers' not in slice_record:
+ slice_record['reg-researchers'] = list()
+ slice_record['reg-researchers'].append(user['hrn'])
+ if caller_hrn == user['hrn']:
+ #hierarchical_user = user['hrn'].split(".")
+ #user['login'] = hierarchical_user[-1]
+ #slice_record['login'] = user['login']
+ slice_record['user']=user
# oui c'est degueulasse, le slice_record se retrouve modifie
# dans la methode avec les infos du user, els infos sont propagees
# dans verify_slice_leases
+ logger.debug("IOTLABDRIVER.PY BEFORE slices.verify_persons")
+ logger.debug("LOIC - slice_record[user] = %s" % slice_record['user'])
persons = slices.verify_persons(xrn.hrn, slice_record, users,
options=options)
+ logger.debug("IOTLABDRIVER.PY AFTER slices.verify_persons")
+ logger.debug("LOIC - slice_record[user] = %s" % slice_record['user'])
+
+ # ensure slice record exists
+ current_slice = slices.verify_slice(xrn.hrn, slice_record, sfa_peer)
+ logger.debug("LOIC - AFTER verify_slice - slice_record[user] = %s" % slice_record['user'])
+ logger.debug("IOTLABDRIVER.PY \t ===============allocate \t\
+ \r\n \r\n current_slice %s" % (current_slice))
+
# ensure slice attributes exists
# slices.verify_slice_attributes(slice, requested_attributes,
# options=options)
logger.debug("IOTLABDRIVER.PY \tallocate requested_xp_dict %s "
% (requested_xp_dict))
request_nodes = rspec.version.get_nodes_with_slivers()
+
+
+ # JORDAN: nodes_list will contain a list of newly allocated nodes
nodes_list = []
for start_time in requested_xp_dict:
lease = requested_xp_dict[start_time]
# add/remove leases
rspec_requested_leases = rspec.version.get_leases()
- leases = slices.verify_slice_leases(slice_record, requested_xp_dict, peer)
+ leases = slices.verify_slice_leases(slice_record,
+ requested_xp_dict, peer)
logger.debug("IOTLABDRIVER.PY \tallocate leases %s \
rspec_requested_leases %s" % (leases,
rspec_requested_leases))
client_id = hostname
node_urn = xrn_object(self.testbed_shell.root_auth, hostname).urn
component_id = node_urn
- slice_urn = current_slice['reg-urn']
+ if 'reg-urn' in current_slice:
+ slice_urn = current_slice['reg-urn']
+ else:
+ slice_urn = current_slice['urn']
+
+ # JORDAN: We loop over leases previously in the slice
for lease in leases:
if hostname in lease['reserved_nodes']:
index = lease['reserved_nodes'].index(hostname)
sliver_hrn = '%s.%s-%s' % (self.hrn, lease['lease_id'],
lease['resource_ids'][index] )
- sliver_id = Xrn(sliver_hrn, type='sliver').urn
- record = SliverAllocation(sliver_id=sliver_id, client_id=client_id,
+ sliver_id = Xrn(sliver_hrn, type='sliver').urn
+ record = SliverAllocation(sliver_id=sliver_id, client_id=client_id,
component_id=component_id,
slice_urn = slice_urn,
allocation_state='geni_allocated')
- record.sync(self.api.dbsession())
+ record.sync(self.api.dbsession())
+ # JORDAN : added describe_options which was not specified at all
+ describe_options = {
+ 'geni_slice_urn': urn,
+ 'list_leases': 'all',
+ }
return aggregate.describe([xrn.get_urn()], version=rspec.version)
- def provision(self, urns, options={}):
+ def provision(self, urns, options=None):
+ if options is None: options={}
# update users
slices = IotlabSlices(self)
aggregate = IotlabAggregate(self)
version_manager = VersionManager()
rspec_version = version_manager.get_version(options[
'geni_rspec_version'])
+ # JORDAN : added describe_options instead of options
+ # urns at the begining ???
+ describe_options = {
+ 'geni_slice_urn': current_slice['urn'],
+ 'list_leases': 'all',
+ }
return self.describe(urns, rspec_version, options=options)