2 Implements what a driver should provide for SFA to work.
4 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
5 from sfa.util.sfalogging import logger
6 from sfa.storage.alchemy import dbsession
7 from sfa.storage.model import RegRecord
9 from sfa.managers.driver import Driver
10 from sfa.rspecs.version_manager import VersionManager
11 from sfa.rspecs.rspec import RSpec
13 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
15 from sfa.cortexlab.cortexlabaggregate import CortexlabAggregate, \
16 cortexlab_xrn_to_hostname
18 from sfa.iotlab.iotlabslices import CortexlabSlices
20 from sfa.cortexlab.cortexlabshell import CortexlabShell
23 class CortexlabDriver(Driver):
24 """ Cortexlab Driver class inherited from Driver generic class.
26 Contains methods compliant with the SFA standard and the testbed
27 infrastructure (calls to LDAP and scheduler to book the nodes).
29 .. seealso::: Driver class
32 def __init__(self, config):
35 Sets the iotlab SFA config parameters,
36 instanciates the testbed api and the iotlab database.
38 :param config: iotlab SFA configuration object
39 :type config: Config object
42 Driver.__init__(self, config)
44 self.testbed_shell = CortexlabShell(config)
47 def augment_records_with_testbed_info(self, record_list):
50 Adds specific testbed info to the records.
52 :param record_list: list of sfa dictionaries records
53 :type record_list: list
54 :returns: list of records with extended information in each record
58 return self.fill_record_info(record_list)
60 def fill_record_info(self, record_list):
63 For each SFA record, fill in the iotlab specific and SFA specific
66 :param record_list: list of sfa dictionaries records
67 :type record_list: list
68 :returns: list of records with extended information in each record
71 .. warning:: Should not be modifying record_list directly because modi
72 fication are kept outside the method's scope. Howerver, there is no
73 other way to do it given the way it's called in registry manager.
77 logger.debug("CORTEXLABDRIVER \tfill_record_info records %s "
79 if not isinstance(record_list, list):
80 record_list = [record_list]
83 for record in record_list:
85 if str(record['type']) == 'node':
86 # look for node info using GetNodes
87 # the record is about one node only
88 filter_dict = {'hrn': [record['hrn']]}
89 node_info = self.testbed_shell.GetNodes(filter_dict)
90 # the node_info is about one node only, but it is formatted
92 record.update(node_info[0])
93 logger.debug("CORTEXLABDRIVER.PY \t \
94 fill_record_info NODE" % (record))
96 #If the record is a SFA slice record, then add information
97 #about the user of this slice. This kind of
98 #information is in the Iotlab's DB.
99 if str(record['type']) == 'slice':
100 if 'reg_researchers' in record and isinstance(record
103 record['reg_researchers'] = \
104 record['reg_researchers'][0].__dict__
106 {'PI': [record['reg_researchers']['hrn']],
107 'researcher': [record['reg_researchers']['hrn']],
108 'name': record['hrn'],
111 'person_ids': [record['reg_researchers']
113 # For client_helper.py compatibility
115 # For client_helper.py compatibility
117 # For client_helper.py compatibility
120 #Get iotlab slice record and oar job id if any.
121 recslice_list = self.testbed_shell.GetSlices(
122 slice_filter=str(record['hrn']),
123 slice_filter_type='slice_hrn')
125 logger.debug("CORTEXLABDRIVER \tfill_record_info \
126 TYPE SLICE RECUSER record['hrn'] %s record['experiment_id']\
127 %s " % (record['hrn'], record['experiment_id']))
128 del record['reg_researchers']
130 for rec in recslice_list:
131 logger.debug("CORTEXLABDRIVER\r\n \t \
132 fill_record_info experiment_id %s "
133 % (rec['experiment_id']))
135 record['node_ids'] = [self.testbed_shell.root_auth +
136 '.' + hostname for hostname
141 logger.debug("CORTEXLABDRIVER.PY \t fill_record_info SLICE \
142 recslice_list %s \r\n \t RECORD %s \r\n \
143 \r\n" % (recslice_list, record))
145 if str(record['type']) == 'user':
146 #The record is a SFA user record.
147 #Get the information about his slice from Iotlab's DB
148 #and add it to the user record.
149 recslice_list = self.testbed_shell.GetSlices(
150 slice_filter=record['record_id'],
151 slice_filter_type='record_id_user')
153 logger.debug("CORTEXLABDRIVER.PY \t fill_record_info \
154 TYPE USER recslice_list %s \r\n \t RECORD %s \r\n"
155 % (recslice_list, record))
156 #Append slice record in records list,
157 #therefore fetches user and slice info again(one more loop)
158 #Will update PIs and researcher for the slice
160 recuser = recslice_list[0]['reg_researchers']
161 logger.debug("CORTEXLABDRIVER.PY \t fill_record_info USER \
162 recuser %s \r\n \r\n" % (recuser))
164 recslice = recslice_list[0]
166 {'PI': [recuser['hrn']],
167 'researcher': [recuser['hrn']],
168 'name': record['hrn'],
171 'person_ids': [recuser['record_id']]})
173 for rec in recslice_list:
174 recslice['experiment_id'].append(rec['experiment_id'])
178 recslice.update({'type': 'slice',
179 'hrn': recslice_list[0]['hrn']})
181 #GetPersons takes [] as filters
182 user_cortexlab = self.testbed_shell.GetPersons([record])
184 record.update(user_cortexlab[0])
185 #For client_helper.py compatibility
190 record_list.append(recslice)
192 logger.debug("CORTEXLABDRIVER.PY \t \
193 fill_record_info ADDING SLICE\
194 INFO TO USER records %s" % (record_list))
196 except TypeError, error:
197 logger.log_exc("CORTEXLABDRIVER \t fill_record_info EXCEPTION %s"
202 def sliver_status(self, slice_urn, slice_hrn):
204 Receive a status request for slice named urn/hrn
205 urn:publicid:IDN+iotlab+nturro_slice hrn iotlab.nturro_slice
206 shall return a structure as described in
207 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
208 NT : not sure if we should implement this or not, but used by sface.
210 :param slice_urn: slice urn
211 :type slice_urn: string
212 :param slice_hrn: slice hrn
213 :type slice_hrn: string
217 #First get the slice with the slice hrn
218 slice_list = self.testbed_shell.GetSlices(slice_filter=slice_hrn,
219 slice_filter_type='slice_hrn')
221 if len(slice_list) == 0:
222 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
224 #Used for fetching the user info witch comes along the slice info
225 one_slice = slice_list[0]
227 #Make a list of all the nodes hostnames in use for this slice
228 slice_nodes_list = []
229 slice_nodes_list = one_slice['node_ids']
230 #Get all the corresponding nodes details
231 nodes_all = self.testbed_shell.GetNodes(
232 {'hostname': slice_nodes_list},
233 ['node_id', 'hostname', 'site', 'boot_state'])
234 nodeall_byhostname = dict([(one_node['hostname'], one_node)
235 for one_node in nodes_all])
237 for single_slice in slice_list:
239 top_level_status = 'empty'
242 ['geni_urn', 'geni_error', 'cortexlab_login', 'geni_status',
243 'geni_resources'], None)
245 # ['geni_urn','geni_error', 'pl_login','geni_status',
246 # 'geni_resources'], None)
247 # result['pl_login'] = one_slice['reg_researchers'][0].hrn
248 result['cortexlab_login'] = one_slice['user']
249 logger.debug("Slabdriver - sliver_status Sliver status \
250 urn %s hrn %s single_slice %s \r\n "
251 % (slice_urn, slice_hrn, single_slice))
253 if 'node_ids' not in single_slice:
255 result['geni_status'] = top_level_status
256 result['geni_resources'] = []
259 top_level_status = 'ready'
261 #A job is running on Iotlab for this slice
262 # report about the local nodes that are in the slice only
264 result['geni_urn'] = slice_urn
267 for node_hostname in single_slice['node_ids']:
269 res['cortexlab_hostname'] = node_hostname
270 res['cortexlab_boot_state'] = \
271 nodeall_byhostname[node_hostname]['boot_state']
274 slice_urn, type='slice',
275 id=nodeall_byhostname[node_hostname]['node_id']).urn
277 res['geni_urn'] = sliver_id
278 #node_name = node['hostname']
279 if nodeall_byhostname[node_hostname]['boot_state'] == 'Alive':
281 res['geni_status'] = 'ready'
283 res['geni_status'] = 'failed'
284 top_level_status = 'failed'
286 res['geni_error'] = ''
288 resources.append(res)
290 result['geni_status'] = top_level_status
291 result['geni_resources'] = resources
292 logger.debug("CORTEXLABDRIVER \tsliver_statusresources %s res %s "
297 def get_user_record(hrn):
300 Returns the user record based on the hrn from the SFA DB .
302 :param hrn: user's hrn
304 :returns: user record from SFA database
308 return dbsession.query(RegRecord).filter_by(hrn=hrn).first()
310 def testbed_name(self):
313 Returns testbed's name.
314 :returns: testbed authority name.
320 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
321 def aggregate_version(self):
324 Returns the testbed's supported rspec advertisement and request
326 :returns: rspec versions supported ad a dictionary.
330 version_manager = VersionManager()
331 ad_rspec_versions = []
332 request_rspec_versions = []
333 for rspec_version in version_manager.versions:
334 if rspec_version.content_type in ['*', 'ad']:
335 ad_rspec_versions.append(rspec_version.to_dict())
336 if rspec_version.content_type in ['*', 'request']:
337 request_rspec_versions.append(rspec_version.to_dict())
339 'testbed': self.testbed_name(),
340 'geni_request_rspec_versions': request_rspec_versions,
341 'geni_ad_rspec_versions': ad_rspec_versions}
343 def _get_requested_leases_list(self, rspec):
345 Process leases in rspec depending on the rspec version (format)
346 type. Find the lease requests in the rspec and creates
347 a lease request list with the mandatory information ( nodes,
348 start time and duration) of the valid leases (duration above or
349 equal to the iotlab experiment minimum duration).
351 :param rspec: rspec request received.
353 :returns: list of lease requests found in the rspec
356 requested_lease_list = []
357 for lease in rspec.version.get_leases():
358 single_requested_lease = {}
359 logger.debug("CORTEXLABDRIVER.PY \t \
360 _get_requested_leases_list lease %s " % (lease))
362 if not lease.get('lease_id'):
363 if get_authority(lease['component_id']) == \
364 self.testbed_shell.root_auth:
365 single_requested_lease['hostname'] = \
366 cortexlab_xrn_to_hostname(\
367 lease.get('component_id').strip())
368 single_requested_lease['start_time'] = \
369 lease.get('start_time')
370 single_requested_lease['duration'] = lease.get('duration')
371 #Check the experiment's duration is valid before adding
372 #the lease to the requested leases list
373 duration_in_seconds = \
374 int(single_requested_lease['duration'])
375 if duration_in_seconds >= self.testbed_shell.GetMinExperimentDurationInGranularity():
376 requested_lease_list.append(single_requested_lease)
378 return requested_lease_list
381 def _group_leases_by_start_time(requested_lease_list):
383 Create dict of leases by start_time, regrouping nodes reserved
384 at the same time, for the same amount of time so as to
385 define one job on OAR.
387 :param requested_lease_list: list of leases
388 :type requested_lease_list: list
389 :returns: Dictionary with key = start time, value = list of leases
390 with the same start time.
395 requested_xp_dict = {}
396 for lease in requested_lease_list:
398 #In case it is an asap experiment start_time is empty
399 if lease['start_time'] == '':
400 lease['start_time'] = '0'
402 if lease['start_time'] not in requested_xp_dict:
403 if isinstance(lease['hostname'], str):
404 lease['hostname'] = [lease['hostname']]
406 requested_xp_dict[lease['start_time']] = lease
409 job_lease = requested_xp_dict[lease['start_time']]
410 if lease['duration'] == job_lease['duration']:
411 job_lease['hostname'].append(lease['hostname'])
413 return requested_xp_dict
415 def _process_requested_xp_dict(self, rspec):
417 Turns the requested leases and information into a dictionary
418 of requested jobs, grouped by starting time.
420 :param rspec: RSpec received
425 requested_lease_list = self._get_requested_leases_list(rspec)
426 logger.debug("CORTEXLABDRIVER _process_requested_xp_dict \
427 requested_lease_list %s" % (requested_lease_list))
428 xp_dict = self._group_leases_by_start_time(requested_lease_list)
429 logger.debug("CORTEXLABDRIVER _process_requested_xp_dict xp_dict\
434 def create_sliver(self, slice_urn, slice_hrn, creds, rspec_string,
436 """Answer to CreateSliver.
438 Creates the leases and slivers for the users from the information
439 found in the rspec string.
440 Launch experiment on OAR if the requested leases is valid. Delete
441 no longer requested leases.
444 :param creds: user's credentials
446 :param users: user record list
451 :returns: a valid Rspec for the slice which has just been
457 aggregate = CortexlabAggregate(self)
459 slices = CortexlabSlices(self)
460 peer = slices.get_peer(slice_hrn)
461 sfa_peer = slices.get_sfa_peer(slice_hrn)
464 if not isinstance(creds, list):
468 slice_record = users[0].get('slice_record', {})
469 logger.debug("CORTEXLABDRIVER.PY \t ===============create_sliver \t\
470 creds %s \r\n \r\n users %s"
472 slice_record['user'] = {'keys': users[0]['keys'],
473 'email': users[0]['email'],
474 'hrn': slice_record['reg-researchers'][0]}
476 rspec = RSpec(rspec_string)
477 logger.debug("CORTEXLABDRIVER.PY \t create_sliver \trspec.version \
478 %s slice_record %s users %s"
479 % (rspec.version, slice_record, users))
481 # ensure site record exists?
482 # ensure slice record exists
483 #Removed options in verify_slice SA 14/08/12
484 #Removed peer record in verify_slice SA 18/07/13
485 sfa_slice = slices.verify_slice(slice_hrn, slice_record, sfa_peer)
487 # ensure person records exists
488 #verify_persons returns added persons but the return value
490 #Removed peer record and sfa_peer in verify_persons SA 18/07/13
491 slices.verify_persons(slice_hrn, sfa_slice, users, options=options)
492 #requested_attributes returned by rspec.version.get_slice_attributes()
493 #unused, removed SA 13/08/12
494 #rspec.version.get_slice_attributes()
496 logger.debug("CORTEXLABDRIVER.PY create_sliver slice %s " % (sfa_slice))
498 # add/remove slice from nodes
500 #requested_slivers = [node.get('component_id') \
501 #for node in rspec.version.get_nodes_with_slivers()\
502 #if node.get('authority_id') is self.testbed_shell.root_auth]
503 #l = [ node for node in rspec.version.get_nodes_with_slivers() ]
504 #logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
505 #requested_slivers %s listnodes %s" \
506 #%(requested_slivers,l))
507 #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
508 #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
510 requested_xp_dict = self._process_requested_xp_dict(rspec)
512 logger.debug("CORTEXLABDRIVER.PY \tcreate_sliver requested_xp_dict %s "
513 % (requested_xp_dict))
514 #verify_slice_leases returns the leases , but the return value is unused
515 #here. Removed SA 13/08/12
516 slices.verify_slice_leases(sfa_slice,
517 requested_xp_dict, peer)
519 return aggregate.get_rspec(slice_xrn=slice_urn,
520 login=sfa_slice['login'],
521 version=rspec.version)
523 def delete_sliver(self, slice_urn, slice_hrn, creds, options):
525 Deletes the lease associated with the slice hrn and the credentials
526 if the slice belongs to iotlab. Answer to DeleteSliver.
528 :param slice_urn: urn of the slice
529 :param slice_hrn: name of the slice
530 :param creds: slice credenials
531 :type slice_urn: string
532 :type slice_hrn: string
533 :type creds: ? unused
535 :returns: 1 if the slice to delete was not found on iotlab,
536 True if the deletion was successful, False otherwise otherwise.
538 .. note:: Should really be named delete_leases because iotlab does
539 not have any slivers, but only deals with leases. However,
540 SFA api only have delete_sliver define so far. SA 13/05/2013
541 .. note:: creds are unused, and are not used either in the dummy driver
545 sfa_slice_list = self.testbed_shell.GetSlices(
546 slice_filter=slice_hrn,
547 slice_filter_type='slice_hrn')
549 if not sfa_slice_list:
552 #Delete all leases in the slice
553 for sfa_slice in sfa_slice_list:
554 logger.debug("CORTEXLABDRIVER.PY delete_sliver slice %s" % (sfa_slice))
555 slices = CortexlabSlices(self)
556 # determine if this is a peer slice
558 peer = slices.get_peer(slice_hrn)
560 logger.debug("CORTEXLABDRIVER.PY delete_sliver peer %s \
561 \r\n \t sfa_slice %s " % (peer, sfa_slice))
563 self.testbed_shell.DeleteSliceFromNodes(sfa_slice)
568 def list_resources (self, slice_urn, slice_hrn, creds, options):
571 List resources from the iotlab aggregate and returns a Rspec
572 advertisement with resources found when slice_urn and slice_hrn are
573 None (in case of resource discovery).
574 If a slice hrn and urn are provided, list experiment's slice
575 nodes in a rspec format. Answer to ListResources.
578 :param slice_urn: urn of the slice
579 :param slice_hrn: name of the slice
580 :param creds: slice credenials
581 :type slice_urn: string
582 :type slice_hrn: string
583 :type creds: ? unused
584 :param options: options used when listing resources (list_leases, info,
586 :returns: rspec string in xml
589 .. note:: creds are unused
592 #cached_requested = options.get('cached', True)
594 version_manager = VersionManager()
595 # get the rspec's return format from options
597 version_manager.get_version(options.get('geni_rspec_version'))
598 version_string = "rspec_%s" % (rspec_version)
600 #panos adding the info option to the caching key (can be improved)
601 if options.get('info'):
602 version_string = version_string + "_" + \
603 options.get('info', 'default')
605 # Adding the list_leases option to the caching key
606 if options.get('list_leases'):
607 version_string = version_string + "_" + \
608 options.get('list_leases', 'default')
610 # Adding geni_available to caching key
611 if options.get('geni_available'):
612 version_string = version_string + "_" + \
613 str(options.get('geni_available'))
615 # look in cache first
616 #if cached_requested and self.cache and not slice_hrn:
617 #rspec = self.cache.get(version_string)
619 #logger.debug("IotlabDriver.ListResources: \
620 #returning cached advertisement")
623 #panos: passing user-defined options
624 aggregate = CortexlabAggregate(self)
626 rspec = aggregate.get_rspec(slice_xrn=slice_urn,
627 version=rspec_version, options=options)
630 #if self.cache and not slice_hrn:
631 #logger.debug("Iotlab.ListResources: stores advertisement in cache")
632 #self.cache.add(version_string, rspec)
637 def list_slices(self, creds, options):
638 """Answer to ListSlices.
640 List slices belonging to iotlab, returns slice urns list.
641 No caching used. Options unused but are defined in the SFA method
644 :returns: slice urns list
647 .. note:: creds are unused
649 # look in cache first
651 #slices = self.cache.get('slices')
653 #logger.debug("PlDriver.list_slices returns from cache")
658 slices = self.testbed_shell.GetSlices()
659 logger.debug("CORTEXLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n"
661 slice_hrns = [iotlab_slice['hrn'] for iotlab_slice in slices]
663 slice_urns = [hrn_to_urn(slice_hrn, 'slice')
664 for slice_hrn in slice_hrns]
668 #logger.debug ("IotlabDriver.list_slices stores value in cache")
669 #self.cache.add('slices', slice_urns)
674 def register(self, sfa_record, hrn, pub_key):
676 Adding new user, slice, node or site should not be handled
679 ..warnings:: should not be used. Different components are in charge of
680 doing this task. Adding nodes = OAR
681 Adding users = LDAP Iotlab
682 Adding slice = Import from LDAP users
685 :param sfa_record: record provided by the client of the
687 :type sfa_record: dict
688 :param pub_key: public key of the user
689 :type pub_key: string
691 .. note:: DOES NOTHING. Returns -1.
697 def update(self, old_sfa_record, new_sfa_record, hrn, new_key):
699 No site or node record update allowed in Cortexlab. The only modifications
700 authorized here are key deletion/addition on an existing user and
701 password change. On an existing user, CAN NOT BE MODIFIED: 'first_name',
702 'last_name', 'email'. DOES NOT EXIST IN LDAP: 'phone', 'url', 'bio',
703 'title', 'accepted_aup'. A slice is bound to its user, so modifying the
704 user's ssh key should nmodify the slice's GID after an import procedure.
706 :param old_sfa_record: what is in the db for this hrn
707 :param new_sfa_record: what was passed to the update call
708 :param new_key: the new user's public key
709 :param hrn: the user's sfa hrn
710 :type old_sfa_record: dict
711 :type new_sfa_record: dict
712 :type new_key: string
716 .. seealso:: update in driver.py.
719 pointer = old_sfa_record['pointer']
720 old_sfa_record_type = old_sfa_record['type']
722 # new_key implemented for users only
723 if new_key and old_sfa_record_type not in ['user']:
724 raise UnknownSfaType(old_sfa_record_type)
726 if old_sfa_record_type == "user":
728 all_fields = new_sfa_record
729 for key in all_fields.keys():
730 if key in ['key', 'password']:
731 update_fields[key] = all_fields[key]
734 # must check this key against the previous one if it exists
735 persons = self.testbed_shell.GetPersons([old_sfa_record])
737 keys = [person['pkey']]
738 #Get all the person's keys
739 keys_dict = self.testbed_shell.GetKeys(keys)
741 # Delete all stale keys, meaning the user has only one key
743 #TODO: do we really want to delete all the other keys?
744 #Is this a problem with the GID generation to have multiple
750 #remove all the other keys
751 for key in keys_dict:
752 self.testbed_shell.DeleteKey(person, key)
753 self.testbed_shell.AddPersonKey(
754 person, {'sshPublicKey': person['pkey']},
755 {'sshPublicKey': new_key})
758 def remove(self, sfa_record):
761 Removes users only. Mark the user as disabled in LDAP. The user and his
762 slice are then deleted from the db by running an import on the registry.
764 :param sfa_record: record is the existing sfa record in the db
765 :type sfa_record: dict
767 ..warning::As fas as the slice is concerned, here only the leases are
768 removed from the slice. The slice is record itself is not removed
773 TODO : REMOVE SLICE FROM THE DB AS WELL? SA 14/05/2013,
775 TODO: return boolean for the slice part
777 sfa_record_type = sfa_record['type']
778 hrn = sfa_record['hrn']
779 if sfa_record_type == 'user':
781 #get user from iotlab ldap
782 person = self.testbed_shell.GetPersons(sfa_record)
783 #No registering at a given site in Iotlab.
784 #Once registered to the LDAP, all iotlab sites are
787 #Mark account as disabled in ldap
788 return self.testbed_shell.DeletePerson(sfa_record)
790 elif sfa_record_type == 'slice':
791 if self.testbed_shell.GetSlices(slice_filter=hrn,
792 slice_filter_type='slice_hrn'):
793 ret = self.testbed_shell.DeleteSlice(sfa_record)