1 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
2 from sfa.util.sfalogging import logger
3 from sfa.storage.alchemy import dbsession
4 from sfa.storage.model import RegRecord
8 from sfa.managers.driver import Driver
9 from sfa.rspecs.version_manager import VersionManager
10 from sfa.rspecs.rspec import RSpec
12 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
15 from sfa.senslab.slabpostgres import SlabDB
18 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname
20 from sfa.senslab.slabslices import SlabSlices
23 from sfa.senslab.slabapi import SlabTestbedAPI
27 class SlabDriver(Driver):
28 """ Senslab Driver class inherited from Driver generic class.
30 Contains methods compliant with the SFA standard and the testbed
31 infrastructure (calls to LDAP and OAR).
33 .. seealso:: Driver class
36 def __init__(self, config):
39 Sets the senslab SFA config parameters ,
40 instanciates the testbed api and the senslab database.
42 :param config: senslab SFA configuration object
43 :type config: Config object
45 Driver.__init__ (self, config)
48 self.db = SlabDB(config, debug = False)
49 self.slab_api = SlabTestbedAPI(config)
52 def augment_records_with_testbed_info (self, record_list ):
55 Adds specific testbed info to the records.
57 :param record_list: list of sfa dictionaries records
58 :type record_list: list
59 :return: list of records with extended information in each record
62 return self.fill_record_info (record_list)
64 def fill_record_info(self, record_list):
66 For each SFA record, fill in the senslab specific and SFA specific
69 :param record_list: list of sfa dictionaries records
70 :type record_list: list
71 :return: list of records with extended information in each record
74 .. warnings:: Should not be modifying record_list directly because modi
75 fication are kept outside the method's scope. Howerver, there is no
76 other way to do it given the way it's called in registry manager.
79 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
80 if not isinstance(record_list, list):
81 record_list = [record_list]
85 for record in record_list:
86 #If the record is a SFA slice record, then add information
87 #about the user of this slice. This kind of
88 #information is in the Senslab's DB.
89 if str(record['type']) == 'slice':
90 if 'reg_researchers' in record and \
91 isinstance(record['reg_researchers'], list) :
92 record['reg_researchers'] = \
93 record['reg_researchers'][0].__dict__
94 record.update({'PI':[record['reg_researchers']['hrn']],
95 'researcher': [record['reg_researchers']['hrn']],
99 'person_ids':[record['reg_researchers']['record_id']],
100 'geni_urn':'', #For client_helper.py compatibility
101 'keys':'', #For client_helper.py compatibility
102 'key_ids':''}) #For client_helper.py compatibility
105 #Get slab slice record and oar job id if any.
106 recslice_list = self.slab_api.GetSlices(slice_filter = \
108 slice_filter_type = 'slice_hrn')
111 logger.debug("SLABDRIVER \tfill_record_info \
112 TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id']\
113 %s " %(record['hrn'], record['oar_job_id']))
114 del record['reg_researchers']
116 for rec in recslice_list:
117 logger.debug("SLABDRIVER\r\n \t \
118 fill_record_info oar_job_id %s " \
119 %(rec['oar_job_id']))
121 record['node_ids'] = [ self.slab_api.root_auth + \
122 hostname for hostname in rec['node_ids']]
127 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
128 recslice_list %s \r\n \t RECORD %s \r\n \
129 \r\n" %(recslice_list, record))
131 if str(record['type']) == 'user':
132 #The record is a SFA user record.
133 #Get the information about his slice from Senslab's DB
134 #and add it to the user record.
135 recslice_list = self.slab_api.GetSlices(\
136 slice_filter = record['record_id'],\
137 slice_filter_type = 'record_id_user')
139 logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
140 recslice_list %s \r\n \t RECORD %s \r\n" \
141 %(recslice_list , record))
142 #Append slice record in records list,
143 #therefore fetches user and slice info again(one more loop)
144 #Will update PIs and researcher for the slice
146 recuser = recslice_list[0]['reg_researchers']
147 logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
148 recuser %s \r\n \r\n" %(recuser))
150 recslice = recslice_list[0]
151 recslice.update({'PI':[recuser['hrn']],
152 'researcher': [recuser['hrn']],
153 'name':record['hrn'],
156 'person_ids':[recuser['record_id']]})
158 for rec in recslice_list:
159 recslice['oar_job_id'].append(rec['oar_job_id'])
163 recslice.update({'type':'slice', \
164 'hrn':recslice_list[0]['hrn']})
167 #GetPersons takes [] as filters
168 user_slab = self.slab_api.GetPersons([record])
171 record.update(user_slab[0])
172 #For client_helper.py compatibility
173 record.update( { 'geni_urn':'',
176 record_list.append(recslice)
178 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
179 INFO TO USER records %s" %(record_list))
182 except TypeError, error:
183 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
189 def sliver_status(self, slice_urn, slice_hrn):
192 Receive a status request for slice named urn/hrn
193 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
194 shall return a structure as described in
195 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
196 NT : not sure if we should implement this or not, but used by sface.
198 :param slice_urn: slice urn
199 :type slice_urn: string
200 :param slice_hrn: slice hrn
201 :type slice_hrn: string
203 .. note:: UNUSED. sface deprecated. SA May 7th 2013
208 #First get the slice with the slice hrn
209 slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
210 slice_filter_type = 'slice_hrn')
212 if len(slice_list) is 0:
213 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
215 #Used for fetching the user info witch comes along the slice info
216 one_slice = slice_list[0]
219 #Make a list of all the nodes hostnames in use for this slice
220 slice_nodes_list = []
221 #for single_slice in slice_list:
222 #for node in single_slice['node_ids']:
223 #slice_nodes_list.append(node['hostname'])
224 for node in one_slice:
225 slice_nodes_list.append(node['hostname'])
227 #Get all the corresponding nodes details
228 nodes_all = self.slab_api.GetNodes({'hostname':slice_nodes_list},
229 ['node_id', 'hostname','site','boot_state'])
230 nodeall_byhostname = dict([(one_node['hostname'], one_node) \
231 for one_node in nodes_all])
235 for single_slice in slice_list:
238 top_level_status = 'empty'
241 ['geni_urn','pl_login','geni_status','geni_resources'], None)
242 result['pl_login'] = one_slice['reg_researchers']['hrn']
243 logger.debug("Slabdriver - sliver_status Sliver status \
244 urn %s hrn %s single_slice %s \r\n " \
245 %(slice_urn, slice_hrn, single_slice))
247 if 'node_ids' not in single_slice:
249 result['geni_status'] = top_level_status
250 result['geni_resources'] = []
253 top_level_status = 'ready'
255 #A job is running on Senslab for this slice
256 # report about the local nodes that are in the slice only
258 result['geni_urn'] = slice_urn
261 for node in single_slice['node_ids']:
263 #res['slab_hostname'] = node['hostname']
264 #res['slab_boot_state'] = node['boot_state']
266 res['pl_hostname'] = node['hostname']
267 res['pl_boot_state'] = \
268 nodeall_byhostname[node['hostname']]['boot_state']
269 #res['pl_last_contact'] = strftime(self.time_format, \
270 #gmtime(float(timestamp)))
271 sliver_id = Xrn(slice_urn, type='slice', \
272 id=nodeall_byhostname[node['hostname']]['node_id'], \
273 authority=self.hrn).urn
275 res['geni_urn'] = sliver_id
276 node_name = node['hostname']
277 if nodeall_byhostname[node_name]['boot_state'] == 'Alive':
279 res['geni_status'] = 'ready'
281 res['geni_status'] = 'failed'
282 top_level_status = 'failed'
284 res['geni_error'] = ''
286 resources.append(res)
288 result['geni_status'] = top_level_status
289 result['geni_resources'] = resources
290 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
295 def get_user_record(hrn):
298 Returns the user record based on the hrn from the SFA DB .
300 :param hrn: user's hrn
302 :return : user record from SFA database
306 return dbsession.query(RegRecord).filter_by(hrn = hrn).first()
309 def testbed_name (self):
312 Returns testbed's name.
318 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
319 def aggregate_version (self):
322 Returns the testbed's supported rspec advertisement and
327 version_manager = VersionManager()
328 ad_rspec_versions = []
329 request_rspec_versions = []
330 for rspec_version in version_manager.versions:
331 if rspec_version.content_type in ['*', 'ad']:
332 ad_rspec_versions.append(rspec_version.to_dict())
333 if rspec_version.content_type in ['*', 'request']:
334 request_rspec_versions.append(rspec_version.to_dict())
336 'testbed':self.testbed_name(),
337 'geni_request_rspec_versions': request_rspec_versions,
338 'geni_ad_rspec_versions': ad_rspec_versions,
343 def _get_requested_leases_list(self, rspec):
345 Process leases in rspec depending on the rspec version (format)
346 type. Find the lease requests in the rspec and creates
347 a lease request list with the mandatory information ( nodes,
348 start time and duration) of the valid leases (duration above or equal
349 to the senslab experiment minimum duration).
351 :param rspec: rspec request received.
353 :return: list of lease requests found in the rspec
356 requested_lease_list = []
357 for lease in rspec.version.get_leases():
358 single_requested_lease = {}
359 logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
361 if not lease.get('lease_id'):
362 if get_authority(lease['component_id']) == \
363 self.slab_api.root_auth:
364 single_requested_lease['hostname'] = \
365 slab_xrn_to_hostname(\
366 lease.get('component_id').strip())
367 single_requested_lease['start_time'] = \
368 lease.get('start_time')
369 single_requested_lease['duration'] = lease.get('duration')
370 #Check the experiment's duration is valid before adding
371 #the lease to the requested leases list
372 duration_in_seconds = \
373 int(single_requested_lease['duration'])*60
374 if duration_in_seconds > self.slab_api.GetLeaseGranularity():
375 requested_lease_list.append(single_requested_lease)
377 return requested_lease_list
380 def _group_leases_by_start_time(requested_lease_list):
382 Create dict of leases by start_time, regrouping nodes reserved
383 at the same time, for the same amount of time so as to
384 define one job on OAR.
386 :param requested_lease_list: list of leases
387 :type requested_lease_list: list
388 :return: Dictionary with key = start time, value = list of leases
389 with the same start time.
393 requested_job_dict = {}
394 for lease in requested_lease_list:
396 #In case it is an asap experiment start_time is empty
397 if lease['start_time'] == '':
398 lease['start_time'] = '0'
400 if lease['start_time'] not in requested_job_dict:
401 if isinstance(lease['hostname'], str):
402 #lease['hostname'] = [lease['hostname']]
403 lease['hostname'] = list(lease['hostname'])
405 requested_job_dict[lease['start_time']] = lease
408 job_lease = requested_job_dict[lease['start_time']]
409 if lease['duration'] == job_lease['duration'] :
410 job_lease['hostname'].append(lease['hostname'])
412 return requested_job_dict
414 def _process_requested_jobs(self, rspec):
416 Turns the requested leases and information into a dictionary
417 of requested jobs, grouped by starting time.
419 :param rspec: RSpec received
423 requested_lease_list = self._get_requested_leases_list(rspec)
424 job_dict = self._group_leases_by_start_time(requested_lease_list)
428 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
432 Creates the leases and slivers for the users from the information
433 found in the rspec string.
434 Launch experiment on OAR if the requested leases is valid. Delete
435 no longer requested leases.
438 :param creds: user's credentials
440 :param users: user record list
447 aggregate = SlabAggregate(self)
449 slices = SlabSlices(self)
450 peer = slices.get_peer(slice_hrn)
451 sfa_peer = slices.get_sfa_peer(slice_hrn)
454 if not isinstance(creds, list):
458 slice_record = users[0].get('slice_record', {})
459 logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\
460 creds %s \r\n \r\n users %s" \
462 slice_record['user'] = {'keys':users[0]['keys'], \
463 'email':users[0]['email'], \
464 'hrn':slice_record['reg-researchers'][0]}
466 rspec = RSpec(rspec_string)
467 logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \
468 %s slice_record %s users %s" \
469 %(rspec.version,slice_record, users))
472 # ensure site record exists?
473 # ensure slice record exists
474 #Removed options to verify_slice SA 14/08/12
475 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
478 # ensure person records exists
479 #verify_persons returns added persons but since the return value
481 slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
482 sfa_peer, options=options)
483 #requested_attributes returned by rspec.version.get_slice_attributes()
484 #unused, removed SA 13/08/12
485 #rspec.version.get_slice_attributes()
487 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
489 # add/remove slice from nodes
491 #requested_slivers = [node.get('component_id') \
492 #for node in rspec.version.get_nodes_with_slivers()\
493 #if node.get('authority_id') is self.slab_api.root_auth]
494 #l = [ node for node in rspec.version.get_nodes_with_slivers() ]
495 #logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
496 #requested_slivers %s listnodes %s" \
497 #%(requested_slivers,l))
498 #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
499 #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
502 requested_job_dict = self._process_requested_jobs(rspec)
505 logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "\
506 %(requested_job_dict))
507 #verify_slice_leases returns the leases , but the return value is unused
508 #here. Removed SA 13/08/12
509 slices.verify_slice_leases(sfa_slice, \
510 requested_job_dict, peer)
512 return aggregate.get_rspec(slice_xrn=slice_urn, \
513 login=sfa_slice['login'], version=rspec.version)
516 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
518 Deletes the lease associated with the slice hrn and the credentials
519 if the slice belongs to senslab.
521 :return: 1 if the slice to delete was not found on senslab,
522 True if the deletion was successful, False otherwise otherwise.
524 .. note:: Should really be named delete_leases because senslab does
525 not have any slivers, but only deals with leases. However, SFA api only
526 have delete_sliver define so far. SA 13.05/2013
529 sfa_slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
530 slice_filter_type = 'slice_hrn')
532 if not sfa_slice_list:
535 #Delete all leases in the slice
536 for sfa_slice in sfa_slice_list:
539 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
540 slices = SlabSlices(self)
541 # determine if this is a peer slice
543 peer = slices.get_peer(slice_hrn)
545 logger.debug("SLABDRIVER.PY delete_sliver peer %s \
546 \r\n \t sfa_slice %s " %(peer, sfa_slice))
549 self.slab_api.DeleteSliceFromNodes(sfa_slice)
555 def list_resources (self, slice_urn, slice_hrn, creds, options):
557 List resources from the senslab aggregate and returns a Rspec
558 with resources found when slice_urn and slice_hrn are None
559 (in case of resource discovery).
560 If a slice hrn and urn are provided, list experiment's slice
561 nodes in a rspec format.
563 :return: rspec string in xml
567 #cached_requested = options.get('cached', True)
569 version_manager = VersionManager()
570 # get the rspec's return format from options
572 version_manager.get_version(options.get('geni_rspec_version'))
573 version_string = "rspec_%s" % (rspec_version)
575 #panos adding the info option to the caching key (can be improved)
576 if options.get('info'):
577 version_string = version_string + "_" + \
578 options.get('info', 'default')
580 # Adding the list_leases option to the caching key
581 if options.get('list_leases'):
582 version_string = version_string + "_" + \
583 options.get('list_leases', 'default')
585 # Adding geni_available to caching key
586 if options.get('geni_available'):
587 version_string = version_string + "_" + \
588 str(options.get('geni_available'))
590 # look in cache first
591 #if cached_requested and self.cache and not slice_hrn:
592 #rspec = self.cache.get(version_string)
594 #logger.debug("SlabDriver.ListResources: \
595 #returning cached advertisement")
598 #panos: passing user-defined options
599 aggregate = SlabAggregate(self)
600 #origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
601 #options.update({'origin_hrn':origin_hrn})
602 rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
603 version=rspec_version, options=options)
606 #if self.cache and not slice_hrn:
607 #logger.debug("Slab.ListResources: stores advertisement in cache")
608 #self.cache.add(version_string, rspec)
613 def list_slices (self, creds, options):
614 # look in cache first
616 #slices = self.cache.get('slices')
618 #logger.debug("PlDriver.list_slices returns from cache")
623 slices = self.slab_api.GetSlices()
624 logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
625 slice_hrns = [slab_slice['hrn'] for slab_slice in slices]
627 slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
628 for slice_hrn in slice_hrns]
632 #logger.debug ("SlabDriver.list_slices stores value in cache")
633 #self.cache.add('slices', slice_urns)
638 def register (self, sfa_record, hrn, pub_key):
640 Adding new user, slice, node or site should not be handled
644 Adding users = LDAP Senslab
645 Adding slice = Import from LDAP users
651 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
652 """No site or node record update allowed in Senslab."""
654 pointer = old_sfa_record['pointer']
655 old_sfa_record_type = old_sfa_record['type']
657 # new_key implemented for users only
658 if new_key and old_sfa_record_type not in [ 'user' ]:
659 raise UnknownSfaType(old_sfa_record_type)
661 #if (type == "authority"):
662 #self.shell.UpdateSite(pointer, new_sfa_record)
664 if old_sfa_record_type == "slice":
665 slab_record = self.slab_api.sfa_fields_to_slab_fields(old_sfa_record_type, \
667 if 'name' in slab_record:
668 slab_record.pop('name')
669 #Prototype should be UpdateSlice(self,
670 #auth, slice_id_or_name, slice_fields)
671 #Senslab cannot update slice since slice = job
672 #so we must delete and create another job
673 self.slab_api.UpdateSlice(pointer, slab_record)
675 elif old_sfa_record_type == "user":
677 all_fields = new_sfa_record
678 for key in all_fields.keys():
679 if key in ['first_name', 'last_name', 'title', 'email',
680 'password', 'phone', 'url', 'bio', 'accepted_aup',
682 update_fields[key] = all_fields[key]
683 self.slab_api.UpdatePerson(pointer, update_fields)
686 # must check this key against the previous one if it exists
687 persons = self.slab_api.GetPersons(['key_ids'])
689 keys = person['key_ids']
690 keys = self.slab_api.GetKeys(person['key_ids'])
692 # Delete all stale keys
695 if new_key != key['key']:
696 self.slab_api.DeleteKey(key['key_id'])
700 self.slab_api.AddPersonKey(pointer, {'key_type': 'ssh', \
707 def remove (self, sfa_record):
708 sfa_record_type = sfa_record['type']
709 hrn = sfa_record['hrn']
710 if sfa_record_type == 'user':
712 #get user from senslab ldap
713 person = self.slab_api.GetPersons(sfa_record)
714 #No registering at a given site in Senslab.
715 #Once registered to the LDAP, all senslab sites are
718 #Mark account as disabled in ldap
719 self.slab_api.DeletePerson(sfa_record)
720 elif sfa_record_type == 'slice':
721 if self.slab_api.GetSlices(slice_filter = hrn, \
722 slice_filter_type = 'slice_hrn'):
723 self.slab_api.DeleteSlice(sfa_record)
725 #elif type == 'authority':
726 #if self.GetSites(pointer):
727 #self.DeleteSite(pointer)