2 Implements what a driver should provide for SFA to work.
4 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
5 from sfa.util.sfalogging import logger
6 from sfa.storage.alchemy import dbsession
7 from sfa.storage.model import RegRecord
9 from sfa.managers.driver import Driver
10 from sfa.rspecs.version_manager import VersionManager
11 from sfa.rspecs.rspec import RSpec
13 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
15 from sfa.cortexlab.cortexlabaggregate import CortexlabAggregate, \
16 cortexlab_xrn_to_hostname
18 from sfa.cortexlab.cortexlabslices import CortexlabSlices
20 from sfa.cortexlab.cortexlabshell import CortexlabShell
24 class CortexlabDriver(Driver):
25 """ Cortexlab Driver class inherited from Driver generic class.
27 Contains methods compliant with the SFA standard and the testbed
28 infrastructure (calls to LDAP and scheduler to book the nodes).
30 .. seealso:: Driver class
# Constructor. Runs the generic Driver initialisation with `config`, then
# builds the CortexlabShell kept in self.testbed_shell, the handle the
# methods below use for their testbed calls.
# NOTE(review): the embedded line numbering skips 44, 46 and 47 here, so
# statements may be missing from this listing - confirm against the real file.
33 def __init__(self, config):
36 Sets the iotlab SFA config parameters,
37 instanciates the testbed api and the iotlab database.
39 :param config: iotlab SFA configuration object
40 :type config: Config object
43 Driver.__init__(self, config)
45 self.testbed_shell = CortexlabShell(config)
def augment_records_with_testbed_info(self, record_list):
    """
    Extend SFA records with the information held by the testbed.

    Thin entry point: the records are handed over to fill_record_info and
    its result is returned unchanged.

    :param record_list: list of sfa dictionaries records
    :type record_list: list
    :returns: list of records with extended information in each record
    """
    augmented_records = self.fill_record_info(record_list)
    return augmented_records
# Enriches SFA records with testbed data, dispatching on record['type']:
#  - 'node':  looks the node up with testbed_shell.GetNodes (filtered on its
#             hrn) and merges the first result into the record;
#  - 'slice': queries testbed_shell.GetSlices by slice hrn for the matching
#             testbed slice records;
#  - 'user':  queries GetSlices by the user's record_id, merges the first
#             GetPersons entry into the record and appends a slice record
#             to record_list.
# A record_list that is not a list is first wrapped into a one-element list.
# NOTE(review): record_list.append() appears to run while record_list is being
# iterated, so the appended slice record would be visited by the same loop;
# the in-code comment ("one more loop") suggests this is intentional.
# NOTE(review): many lines of this method are absent from this listing (see
# the gaps in the embedded numbering, e.g. the matching "try:"), so this
# header only summarises the statements that are visible.
61 def fill_record_info(self, record_list):
64 For each SFA record, fill in the iotlab specific and SFA specific
67 :param record_list: list of sfa dictionaries records
68 :type record_list: list
69 :returns: list of records with extended information in each record
72 .. warning:: Should not be modifying record_list directly because modi
73 fication are kept outside the method's scope. Howerver, there is no
74 other way to do it given the way it's called in registry manager.
78 logger.debug("CORTEXLABDRIVER \tfill_record_info records %s "
80 if not isinstance(record_list, list):
81 record_list = [record_list]
84 for record in record_list:
86 if str(record['type']) == 'node':
87 # look for node info using GetNodes
88 # the record is about one node only
89 filter_dict = {'hrn': [record['hrn']]}
90 node_info = self.testbed_shell.GetNodes(filter_dict)
91 # the node_info is about one node only, but it is formatted
93 record.update(node_info[0])
# NOTE(review): the format string below has no %s placeholder, so the record
# never appears in the log message; if record is not a mapping, the "%"
# operation raises TypeError ("not all arguments converted").
94 logger.debug("CORTEXLABDRIVER.PY \t \
95 fill_record_info NODE" % (record))
97 #If the record is a SFA slice record, then add information
98 #about the user of this slice. This kind of
99 #information is in the Iotlab's DB.
100 if str(record['type']) == 'slice':
101 if 'reg_researchers' in record and isinstance(record
104 record['reg_researchers'] = \
105 record['reg_researchers'][0].__dict__
107 {'PI': [record['reg_researchers']['hrn']],
108 'researcher': [record['reg_researchers']['hrn']],
109 'name': record['hrn'],
112 'person_ids': [record['reg_researchers']
114 # For client_helper.py compatibility
116 # For client_helper.py compatibility
118 # For client_helper.py compatibility
121 #Get iotlab slice record and oar job id if any.
122 recslice_list = self.testbed_shell.GetSlices(
123 slice_filter=str(record['hrn']),
124 slice_filter_type='slice_hrn')
126 logger.debug("CORTEXLABDRIVER \tfill_record_info \
127 TYPE SLICE RECUSER record['hrn'] %s record['experiment_id']\
128 %s " % (record['hrn'], record['experiment_id']))
129 del record['reg_researchers']
131 for rec in recslice_list:
132 logger.debug("CORTEXLABDRIVER\r\n \t \
133 fill_record_info experiment_id %s "
134 % (rec['experiment_id']))
136 record['node_ids'] = [self.testbed_shell.root_auth +
137 '.' + hostname for hostname
142 logger.debug("CORTEXLABDRIVER.PY \t fill_record_info SLICE \
143 recslice_list %s \r\n \t RECORD %s \r\n \
144 \r\n" % (recslice_list, record))
146 if str(record['type']) == 'user':
147 #The record is a SFA user record.
148 #Get the information about his slice from Iotlab's DB
149 #and add it to the user record.
150 recslice_list = self.testbed_shell.GetSlices(
151 slice_filter=record['record_id'],
152 slice_filter_type='record_id_user')
154 logger.debug("CORTEXLABDRIVER.PY \t fill_record_info \
155 TYPE USER recslice_list %s \r\n \t RECORD %s \r\n"
156 % (recslice_list, record))
157 #Append slice record in records list,
158 #therefore fetches user and slice info again(one more loop)
159 #Will update PIs and researcher for the slice
161 recuser = recslice_list[0]['reg_researchers']
162 logger.debug("CORTEXLABDRIVER.PY \t fill_record_info USER \
163 recuser %s \r\n \r\n" % (recuser))
165 recslice = recslice_list[0]
167 {'PI': [recuser['hrn']],
168 'researcher': [recuser['hrn']],
169 'name': record['hrn'],
172 'person_ids': [recuser['record_id']]})
174 for rec in recslice_list:
175 recslice['experiment_id'].append(rec['experiment_id'])
179 recslice.update({'type': 'slice',
180 'hrn': recslice_list[0]['hrn']})
182 #GetPersons takes [] as filters
183 user_cortexlab = self.testbed_shell.GetPersons([record])
185 record.update(user_cortexlab[0])
186 #For client_helper.py compatibility
191 record_list.append(recslice)
193 logger.debug("CORTEXLABDRIVER.PY \t \
194 fill_record_info ADDING SLICE\
195 INFO TO USER records %s" % (record_list))
# NOTE(review): "except TypeError, error" is Python 2-only syntax
# (Python 3 requires "except TypeError as error").
197 except TypeError, error:
198 logger.log_exc("CORTEXLABDRIVER \t fill_record_info EXCEPTION %s"
# Answers SliverStatus for the slice identified by slice_urn / slice_hrn.
# Fetches the slice records with testbed_shell.GetSlices (filtered on the
# hrn) and raises SliverDoesNotExist when none is returned. It then asks
# GetNodes for node_id / hostname / site / boot_state of the first slice's
# node_ids and indexes the answers by hostname. For each slice record a
# result is filled with cortexlab_login, geni_urn, geni_status and
# geni_resources; each node contributes a resource entry whose geni_status is
# 'ready' when its boot_state is 'Alive'. A 'failed' status is also assigned
# in the visible code, presumably on the else branch (that line is missing).
# NOTE(review): slice_nodes_list = [] is overwritten by the very next
# statement, so the first assignment is dead.
# NOTE(review): node details are fetched for one_slice['node_ids'] only, but
# the loop indexes nodeall_byhostname with every single_slice['node_ids'];
# a later slice record using other nodes would raise KeyError - confirm.
# NOTE(review): several statements (creation of result / res / resources, the
# else branches, the return) are absent from this listing.
203 def sliver_status(self, slice_urn, slice_hrn):
205 Receive a status request for slice named urn/hrn
206 urn:publicid:IDN+iotlab+nturro_slice hrn iotlab.nturro_slice
207 shall return a structure as described in
208 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
209 NT : not sure if we should implement this or not, but used by sface.
211 :param slice_urn: slice urn
212 :type slice_urn: string
213 :param slice_hrn: slice hrn
214 :type slice_hrn: string
218 #First get the slice with the slice hrn
219 slice_list = self.testbed_shell.GetSlices(slice_filter=slice_hrn,
220 slice_filter_type='slice_hrn')
222 if len(slice_list) == 0:
223 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
225 #Used for fetching the user info witch comes along the slice info
226 one_slice = slice_list[0]
228 #Make a list of all the nodes hostnames in use for this slice
229 slice_nodes_list = []
230 slice_nodes_list = one_slice['node_ids']
231 #Get all the corresponding nodes details
232 nodes_all = self.testbed_shell.GetNodes(
233 {'hostname': slice_nodes_list},
234 ['node_id', 'hostname', 'site', 'boot_state'])
235 nodeall_byhostname = dict([(one_node['hostname'], one_node)
236 for one_node in nodes_all])
238 for single_slice in slice_list:
240 top_level_status = 'empty'
243 ['geni_urn', 'geni_error', 'cortexlab_login', 'geni_status',
244 'geni_resources'], None)
246 # ['geni_urn','geni_error', 'pl_login','geni_status',
247 # 'geni_resources'], None)
248 # result['pl_login'] = one_slice['reg_researchers'][0].hrn
249 result['cortexlab_login'] = one_slice['user']
250 logger.debug("Slabdriver - sliver_status Sliver status \
251 urn %s hrn %s single_slice %s \r\n "
252 % (slice_urn, slice_hrn, single_slice))
254 if 'node_ids' not in single_slice:
256 result['geni_status'] = top_level_status
257 result['geni_resources'] = []
260 top_level_status = 'ready'
262 #A job is running on Iotlab for this slice
263 # report about the local nodes that are in the slice only
265 result['geni_urn'] = slice_urn
268 for node_hostname in single_slice['node_ids']:
270 res['cortexlab_hostname'] = node_hostname
271 res['cortexlab_boot_state'] = \
272 nodeall_byhostname[node_hostname]['boot_state']
275 slice_urn, type='slice',
276 id=nodeall_byhostname[node_hostname]['node_id']).urn
278 res['geni_urn'] = sliver_id
279 #node_name = node['hostname']
280 if nodeall_byhostname[node_hostname]['boot_state'] == 'Alive':
282 res['geni_status'] = 'ready'
284 res['geni_status'] = 'failed'
285 top_level_status = 'failed'
287 res['geni_error'] = ''
289 resources.append(res)
291 result['geni_status'] = top_level_status
292 result['geni_resources'] = resources
293 logger.debug("CORTEXLABDRIVER \tsliver_statusresources %s res %s "
def get_user_record(self, hrn):
    """
    Look a record up in the SFA DB by its hrn.

    :param hrn: user's hrn
    :returns: user record from SFA database (the first RegRecord whose hrn
        matches, as returned by the query's ``first()``).
    """
    records_matching_hrn = dbsession.query(RegRecord).filter_by(hrn=hrn)
    return records_matching_hrn.first()
# Returns the testbed's name - described by the docstring as the testbed
# authority name.
# NOTE(review): the return statement itself is not part of this listing.
311 def testbed_name(self):
314 Returns testbed's name.
315 :returns: testbed authority name.
321 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version(self):
    """
    Describe the rspec versions this testbed supports.

    Every version known to the VersionManager is sorted by content type:
    'ad' versions are advertised, 'request' versions are accepted as
    requests, and '*' versions count as both.

    :returns: dictionary with the testbed name ('testbed') and the supported
        versions ('geni_request_rspec_versions', 'geni_ad_rspec_versions'),
        each version being given as a dictionary.
    :rtype: dict
    """
    supported_as_ad = []
    supported_as_request = []
    for known_version in VersionManager().versions:
        if known_version.content_type in ('*', 'ad'):
            supported_as_ad.append(known_version.to_dict())
        if known_version.content_type in ('*', 'request'):
            supported_as_request.append(known_version.to_dict())

    version_description = {
        'testbed': self.testbed_name(),
        'geni_request_rspec_versions': supported_as_request,
        'geni_ad_rspec_versions': supported_as_ad,
    }
    return version_description
def _get_requested_leases_list(self, rspec):
    """
    Extract the valid lease requests from an rspec.

    Walks the leases exposed by ``rspec.version.get_leases()`` and keeps,
    for each lease that has no ``lease_id`` and whose component belongs to
    this testbed's root authority, the mandatory information (hostname,
    start time and duration). A lease is only returned when its duration is
    at least ``self.testbed_shell.GetMinExperimentDurationInGranularity()``.

    :param rspec: rspec request received.
    :returns: list of lease requests found in the rspec, each one a dict
        with the keys 'hostname', 'start_time' and 'duration'.
    :rtype: list
    :raises KeyError: if a lease without lease_id has no 'component_id'.
    :raises TypeError, ValueError: if the duration of a lease of this
        testbed cannot be converted with int().
    """
    requested_lease_list = []
    for lease in rspec.version.get_leases():
        logger.debug("CORTEXLABDRIVER.PY \t "
                     "_get_requested_leases_list lease %s " % (lease))

        # Leases that already carry a lease_id are not processed here.
        if lease.get('lease_id'):
            continue

        # Strip the component id once and use the same value for both the
        # authority check and the hostname conversion. Checking the
        # authority on the raw value made a whitespace-padded id fail the
        # comparison, so the lease was silently dropped.
        component_id = lease['component_id'].strip()
        if get_authority(component_id) != self.testbed_shell.root_auth:
            continue

        single_requested_lease = {
            'hostname': cortexlab_xrn_to_hostname(component_id),
            'start_time': lease.get('start_time'),
            'duration': lease.get('duration'),
        }

        # Check the experiment's duration is valid before adding the lease
        # to the requested leases list.
        # NOTE(review): assumes the rspec duration is expressed in the same
        # unit as GetMinExperimentDurationInGranularity() - confirm.
        requested_duration = int(single_requested_lease['duration'])
        if requested_duration >= \
                self.testbed_shell.GetMinExperimentDurationInGranularity():
            requested_lease_list.append(single_requested_lease)

    return requested_lease_list
382 def _group_leases_by_start_time(requested_lease_list):
384 Create dict of leases by start_time, regrouping nodes reserved
385 at the same time, for the same amount of time so as to
386 define one job on OAR.
388 :param requested_lease_list: list of leases
389 :type requested_lease_list: list
390 :returns: Dictionary with key = start time, value = list of leases
391 with the same start time.
396 requested_xp_dict = {}
397 for lease in requested_lease_list:
399 #In case it is an asap experiment start_time is empty
400 if lease['start_time'] == '':
401 lease['start_time'] = '0'
403 if lease['start_time'] not in requested_xp_dict:
404 if isinstance(lease['hostname'], str):
405 lease['hostname'] = [lease['hostname']]
407 requested_xp_dict[lease['start_time']] = lease
410 job_lease = requested_xp_dict[lease['start_time']]
411 if lease['duration'] == job_lease['duration']:
412 job_lease['hostname'].append(lease['hostname'])
414 return requested_xp_dict
# Builds the requested-lease list from the rspec with
# _get_requested_leases_list, then groups it by start time with
# _group_leases_by_start_time; both intermediate results are logged at debug
# level.
# NOTE(review): the end of the last logger.debug call and the return
# statement are not part of this listing.
416 def _process_requested_xp_dict(self, rspec):
418 Turns the requested leases and information into a dictionary
419 of requested jobs, grouped by starting time.
421 :param rspec: RSpec received
426 requested_lease_list = self._get_requested_leases_list(rspec)
427 logger.debug("CORTEXLABDRIVER _process_requested_xp_dict \
428 requested_lease_list %s" % (requested_lease_list))
429 xp_dict = self._group_leases_by_start_time(requested_lease_list)
430 logger.debug("CORTEXLABDRIVER _process_requested_xp_dict xp_dict\
# Answers CreateSliver. Visible steps, in order:
#  1. builds a CortexlabAggregate and a CortexlabSlices helper and resolves
#     the peer / sfa peer of slice_hrn;
#  2. takes the slice record from users[0] and attaches a 'user' entry
#     (keys, email, hrn) to it;
#  3. parses rspec_string into an RSpec;
#  4. checks the slice and its persons through slices.verify_slice and
#     slices.verify_persons;
#  5. turns the rspec leases into jobs grouped by start time
#     (_process_requested_xp_dict) and hands them to
#     slices.verify_slice_leases;
#  6. returns aggregate.get_rspec(...) for the slice, using
#     sfa_slice['login'] and the version of the received rspec.
# NOTE(review): the user hrn is read from slice_record['reg-researchers']
# (hyphen) whereas fill_record_info uses 'reg_researchers' (underscore) -
# confirm which key the incoming slice record really carries.
# NOTE(review): in the visible code users[0] is indexed unconditionally, so
# an empty users list would raise IndexError.
# NOTE(review): the end of the signature and several statements are absent
# from this listing (gaps in the embedded numbering).
435 def create_sliver(self, slice_urn, slice_hrn, creds, rspec_string,
437 """Answer to CreateSliver.
439 Creates the leases and slivers for the users from the information
440 found in the rspec string.
441 Launch experiment on OAR if the requested leases is valid. Delete
442 no longer requested leases.
445 :param creds: user's credentials
447 :param users: user record list
452 :returns: a valid Rspec for the slice which has just been
458 aggregate = CortexlabAggregate(self)
460 slices = CortexlabSlices(self)
461 peer = slices.get_peer(slice_hrn)
462 sfa_peer = slices.get_sfa_peer(slice_hrn)
465 if not isinstance(creds, list):
469 slice_record = users[0].get('slice_record', {})
470 logger.debug("CORTEXLABDRIVER.PY \t ===============create_sliver \t\
471 creds %s \r\n \r\n users %s"
473 slice_record['user'] = {'keys': users[0]['keys'],
474 'email': users[0]['email'],
475 'hrn': slice_record['reg-researchers'][0]}
477 rspec = RSpec(rspec_string)
478 logger.debug("CORTEXLABDRIVER.PY \t create_sliver \trspec.version \
479 %s slice_record %s users %s"
480 % (rspec.version, slice_record, users))
482 # ensure site record exists?
483 # ensure slice record exists
484 #Removed options in verify_slice SA 14/08/12
485 #Removed peer record in verify_slice SA 18/07/13
486 sfa_slice = slices.verify_slice(slice_hrn, slice_record, sfa_peer)
488 # ensure person records exists
489 #verify_persons returns added persons but the return value
491 #Removed peer record and sfa_peer in verify_persons SA 18/07/13
492 slices.verify_persons(slice_hrn, sfa_slice, users, options=options)
493 #requested_attributes returned by rspec.version.get_slice_attributes()
494 #unused, removed SA 13/08/12
495 #rspec.version.get_slice_attributes()
497 logger.debug("CORTEXLABDRIVER.PY create_sliver slice %s " % (sfa_slice))
499 # add/remove slice from nodes
501 #requested_slivers = [node.get('component_id') \
502 #for node in rspec.version.get_nodes_with_slivers()\
503 #if node.get('authority_id') is self.testbed_shell.root_auth]
504 #l = [ node for node in rspec.version.get_nodes_with_slivers() ]
505 #logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
506 #requested_slivers %s listnodes %s" \
507 #%(requested_slivers,l))
508 #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
509 #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
511 requested_xp_dict = self._process_requested_xp_dict(rspec)
513 logger.debug("CORTEXLABDRIVER.PY \tcreate_sliver requested_xp_dict %s "
514 % (requested_xp_dict))
515 #verify_slice_leases returns the leases , but the return value is unused
516 #here. Removed SA 13/08/12
517 slices.verify_slice_leases(sfa_slice,
518 requested_xp_dict, peer)
520 return aggregate.get_rspec(slice_xrn=slice_urn,
521 login=sfa_slice['login'],
522 version=rspec.version)
# Answers DeleteSliver. Looks the slice up with testbed_shell.GetSlices
# (filtered on slice_hrn); for every slice record returned it logs the
# record, resolves the peer of slice_hrn through a CortexlabSlices helper and
# calls testbed_shell.DeleteSliceFromNodes on the record.
# NOTE(review): creds and options are not used by the visible code.
# NOTE(review): the CortexlabSlices helper and the peer depend only on
# `self` / slice_hrn; if they are really built inside the loop (indentation
# is lost in this listing) they could be computed once before it.
# NOTE(review): the statements following "if not sfa_slice_list:" and the
# final return are not part of this listing.
524 def delete_sliver(self, slice_urn, slice_hrn, creds, options):
526 Deletes the lease associated with the slice hrn and the credentials
527 if the slice belongs to iotlab. Answer to DeleteSliver.
529 :param slice_urn: urn of the slice
530 :param slice_hrn: name of the slice
531 :param creds: slice credenials
532 :type slice_urn: string
533 :type slice_hrn: string
534 :type creds: ? unused
536 :returns: 1 if the slice to delete was not found on iotlab,
537 True if the deletion was successful, False otherwise otherwise.
539 .. note:: Should really be named delete_leases because iotlab does
540 not have any slivers, but only deals with leases. However,
541 SFA api only have delete_sliver define so far. SA 13/05/2013
542 .. note:: creds are unused, and are not used either in the dummy driver
546 sfa_slice_list = self.testbed_shell.GetSlices(
547 slice_filter=slice_hrn,
548 slice_filter_type='slice_hrn')
550 if not sfa_slice_list:
553 #Delete all leases in the slice
554 for sfa_slice in sfa_slice_list:
555 logger.debug("CORTEXLABDRIVER.PY delete_sliver slice %s" % (sfa_slice))
556 slices = CortexlabSlices(self)
557 # determine if this is a peer slice
559 peer = slices.get_peer(slice_hrn)
561 logger.debug("CORTEXLABDRIVER.PY delete_sliver peer %s \
562 \r\n \t sfa_slice %s " % (peer, sfa_slice))
564 self.testbed_shell.DeleteSliceFromNodes(sfa_slice)
# Answers ListResources. Resolves the rspec version requested through
# options['geni_rspec_version'] with a VersionManager, then returns the rspec
# produced by CortexlabAggregate.get_rspec for slice_urn, that version and
# the caller's options.
# version_string is assembled as a cache key (rspec version plus the 'info',
# 'list_leases' and 'geni_available' options), but every cache access is
# commented out, so the key is not consumed by any visible statement.
# NOTE(review): options.get('info') and options.get('list_leases') are
# concatenated to a string without str(), unlike 'geni_available'; a
# non-string value (e.g. a boolean) would raise TypeError - confirm the
# option types.
# NOTE(review): the line binding rspec_version and the return statement are
# not part of this listing.
569 def list_resources (self, slice_urn, slice_hrn, creds, options):
572 List resources from the iotlab aggregate and returns a Rspec
573 advertisement with resources found when slice_urn and slice_hrn are
574 None (in case of resource discovery).
575 If a slice hrn and urn are provided, list experiment's slice
576 nodes in a rspec format. Answer to ListResources.
579 :param slice_urn: urn of the slice
580 :param slice_hrn: name of the slice
581 :param creds: slice credenials
582 :type slice_urn: string
583 :type slice_hrn: string
584 :type creds: ? unused
585 :param options: options used when listing resources (list_leases, info,
587 :returns: rspec string in xml
590 .. note:: creds are unused
593 #cached_requested = options.get('cached', True)
595 version_manager = VersionManager()
596 # get the rspec's return format from options
598 version_manager.get_version(options.get('geni_rspec_version'))
599 version_string = "rspec_%s" % (rspec_version)
601 #panos adding the info option to the caching key (can be improved)
602 if options.get('info'):
603 version_string = version_string + "_" + \
604 options.get('info', 'default')
606 # Adding the list_leases option to the caching key
607 if options.get('list_leases'):
608 version_string = version_string + "_" + \
609 options.get('list_leases', 'default')
611 # Adding geni_available to caching key
612 if options.get('geni_available'):
613 version_string = version_string + "_" + \
614 str(options.get('geni_available'))
616 # look in cache first
617 #if cached_requested and self.cache and not slice_hrn:
618 #rspec = self.cache.get(version_string)
620 #logger.debug("IotlabDriver.ListResources: \
621 #returning cached advertisement")
624 #panos: passing user-defined options
625 aggregate = CortexlabAggregate(self)
627 rspec = aggregate.get_rspec(slice_xrn=slice_urn,
628 version=rspec_version, options=options)
631 #if self.cache and not slice_hrn:
632 #logger.debug("Iotlab.ListResources: stores advertisement in cache")
633 #self.cache.add(version_string, rspec)
# Answers ListSlices: fetches every slice record with
# testbed_shell.GetSlices(), takes each record's 'hrn' and converts it to a
# slice urn with hrn_to_urn(hrn, 'slice'). The cache lookups are commented
# out, so no caching takes place.
# NOTE(review): creds and options are not used by the visible code.
# NOTE(review): the end of the logger.debug call and the return statement are
# not part of this listing.
638 def list_slices(self, creds, options):
639 """Answer to ListSlices.
641 List slices belonging to iotlab, returns slice urns list.
642 No caching used. Options unused but are defined in the SFA method
645 :returns: slice urns list
648 .. note:: creds are unused
650 # look in cache first
652 #slices = self.cache.get('slices')
654 #logger.debug("PlDriver.list_slices returns from cache")
659 slices = self.testbed_shell.GetSlices()
660 logger.debug("CORTEXLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n"
662 slice_hrns = [iotlab_slice['hrn'] for iotlab_slice in slices]
664 slice_urns = [hrn_to_urn(slice_hrn, 'slice')
665 for slice_hrn in slice_hrns]
669 #logger.debug ("IotlabDriver.list_slices stores value in cache")
670 #self.cache.add('slices', slice_urns)
# Registration hook that, according to its own docstring, does nothing and
# returns -1: nodes, users and slices are created by other components (OAR,
# the LDAP, the import from LDAP users).
# NOTE(review): no statement of the body is part of this listing, so the
# returned value cannot be confirmed from here.
675 def register(self, sfa_record, hrn, pub_key):
677 Adding new user, slice, node or site should not be handled
680 ..warnings:: should not be used. Different components are in charge of
681 doing this task. Adding nodes = OAR
682 Adding users = LDAP Iotlab
683 Adding slice = Import from LDAP users
686 :param sfa_record: record provided by the client of the
688 :type sfa_record: dict
689 :param pub_key: public key of the user
690 :type pub_key: string
692 .. note:: DOES NOTHING. Returns -1.
# Updates an existing record; per the docstring only key addition/deletion
# and password change on an existing user are allowed.
# Visible logic: a new_key supplied for a record whose type is not 'user'
# raises UnknownSfaType. For a 'user' record the 'key' and 'password' entries
# of new_sfa_record are copied into update_fields, the person is looked up
# with testbed_shell.GetPersons, the person's keys are fetched with GetKeys,
# each of them is removed with DeleteKey and new_key is installed through
# AddPersonKey (replacing person['pkey']).
# NOTE(review): `pointer` is assigned but not used by any visible statement.
# NOTE(review): the initialisation of update_fields, the binding of `person`,
# the conditions guarding the key replacement and the return value are among
# the lines missing from this listing.
698 def update(self, old_sfa_record, new_sfa_record, hrn, new_key):
700 No site or node record update allowed in Cortexlab. The only modifications
701 authorized here are key deletion/addition on an existing user and
702 password change. On an existing user, CAN NOT BE MODIFIED: 'first_name',
703 'last_name', 'email'. DOES NOT EXIST IN LDAP: 'phone', 'url', 'bio',
704 'title', 'accepted_aup'. A slice is bound to its user, so modifying the
705 user's ssh key should nmodify the slice's GID after an import procedure.
707 :param old_sfa_record: what is in the db for this hrn
708 :param new_sfa_record: what was passed to the update call
709 :param new_key: the new user's public key
710 :param hrn: the user's sfa hrn
711 :type old_sfa_record: dict
712 :type new_sfa_record: dict
713 :type new_key: string
717 .. seealso:: update in driver.py.
720 pointer = old_sfa_record['pointer']
721 old_sfa_record_type = old_sfa_record['type']
723 # new_key implemented for users only
724 if new_key and old_sfa_record_type not in ['user']:
725 raise UnknownSfaType(old_sfa_record_type)
727 if old_sfa_record_type == "user":
729 all_fields = new_sfa_record
730 for key in all_fields.keys():
731 if key in ['key', 'password']:
732 update_fields[key] = all_fields[key]
735 # must check this key against the previous one if it exists
736 persons = self.testbed_shell.GetPersons([old_sfa_record])
738 keys = [person['pkey']]
739 #Get all the person's keys
740 keys_dict = self.testbed_shell.GetKeys(keys)
742 # Delete all stale keys, meaning the user has only one key
744 #TODO: do we really want to delete all the other keys?
745 #Is this a problem with the GID generation to have multiple
751 #remove all the other keys
752 for key in keys_dict:
753 self.testbed_shell.DeleteKey(person, key)
754 self.testbed_shell.AddPersonKey(
755 person, {'sshPublicKey': person['pkey']},
756 {'sshPublicKey': new_key})
759 def remove(self, sfa_record):
762 Removes users only. Mark the user as disabled in LDAP. The user and his
763 slice are then deleted from the db by running an import on the registry.
765 :param sfa_record: record is the existing sfa record in the db
766 :type sfa_record: dict
768 .. warning:: As far as the slice is concerned, here only the leases are
769 removed from the slice. The slice record itself is not removed
774 TODO : REMOVE SLICE FROM THE DB AS WELL? SA 14/05/2013,
776 TODO: return boolean for the slice part
778 sfa_record_type = sfa_record['type']
779 hrn = sfa_record['hrn']
780 if sfa_record_type == 'user':
782 #get user from iotlab ldap
783 person = self.testbed_shell.GetPersons(sfa_record)
784 #No registering at a given site in Iotlab.
785 #Once registered to the LDAP, all iotlab sites are
788 #Mark account as disabled in ldap
789 return self.testbed_shell.DeletePerson(sfa_record)
791 elif sfa_record_type == 'slice':
792 if self.testbed_shell.GetSlices(slice_filter=hrn,
793 slice_filter_type='slice_hrn'):
794 ret = self.testbed_shell.DeleteSlice(sfa_record)