3 from datetime import datetime
5 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
6 from sfa.util.sfalogging import logger
7 from sfa.storage.alchemy import dbsession
8 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
9 from sqlalchemy.orm import joinedload
11 from sfa.trust.certificate import Keypair, convert_public_key
12 from sfa.trust.gid import create_uuid
13 from sfa.trust.hierarchy import Hierarchy
15 from sfa.managers.driver import Driver
16 from sfa.rspecs.version_manager import VersionManager
17 from sfa.rspecs.rspec import RSpec
19 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
21 from sfa.senslab.OARrestapi import OARrestapi
22 from sfa.senslab.LDAPapi import LDAPapi
24 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
27 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
29 from sfa.senslab.slabslices import SlabSlices
32 from sfa.senslab.slabapi import SlabTestbedAPI
36 class SlabDriver(Driver):
37 """ Senslab Driver class inherited from Driver generic class.
39 Contains methods compliant with the SFA standard and the testbed
40 infrastructure (calls to LDAP and OAR).
42 def __init__(self, config):
43 Driver.__init__ (self, config)
45 self.hrn = config.SFA_INTERFACE_HRN
47 self.db = SlabDB(config, debug = False)
48 self.slab_api = SlabTestbedAPI(config)
51 def augment_records_with_testbed_info (self, record_list ):
52 """ Adds specific testbed info to the records. """
53 return self.fill_record_info (record_list)
55 def fill_record_info(self, record_list):
57 Given a SFA record, fill in the senslab specific and SFA specific
61 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
62 if not isinstance(record_list, list):
63 record_list = [record_list]
66 for record in record_list:
67 #If the record is a SFA slice record, then add information
68 #about the user of this slice. This kind of
69 #information is in the Senslab's DB.
70 if str(record['type']) == 'slice':
71 if 'reg_researchers' in record and \
72 isinstance(record['reg_researchers'], list) :
73 record['reg_researchers'] = record['reg_researchers'][0].__dict__
74 record.update({'PI':[record['reg_researchers']['hrn']],
75 'researcher': [record['reg_researchers']['hrn']],
79 'person_ids':[record['reg_researchers']['record_id']],
80 'geni_urn':'', #For client_helper.py compatibility
81 'keys':'', #For client_helper.py compatibility
82 'key_ids':''}) #For client_helper.py compatibility
85 #Get slab slice record and oar job id if any.
86 recslice_list = self.slab_api.GetSlices(slice_filter = \
88 slice_filter_type = 'slice_hrn')
91 logger.debug("SLABDRIVER \tfill_record_info \
92 TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id']\
93 %s " %(record['hrn'], record['oar_job_id']))
94 del record['reg_researchers']
96 for rec in recslice_list:
97 logger.debug("SLABDRIVER\r\n \t fill_record_info oar_job_id %s " %(rec['oar_job_id']))
99 record['node_ids'] = [ self.slab_api.root_auth + hostname for hostname in rec['node_ids']]
104 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
105 recslice_list %s \r\n \t RECORD %s \r\n \
106 \r\n" %(recslice_list, record))
108 if str(record['type']) == 'user':
109 #The record is a SFA user record.
110 #Get the information about his slice from Senslab's DB
111 #and add it to the user record.
112 recslice_list = self.slab_api.GetSlices(\
113 slice_filter = record['record_id'],\
114 slice_filter_type = 'record_id_user')
116 logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
117 recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
118 #Append slice record in records list,
119 #therefore fetches user and slice info again(one more loop)
120 #Will update PIs and researcher for the slice
122 recuser = recslice_list[0]['reg_researchers']
123 logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
124 recuser %s \r\n \r\n" %(recuser))
126 recslice = recslice_list[0]
127 recslice.update({'PI':[recuser['hrn']],
128 'researcher': [recuser['hrn']],
129 'name':record['hrn'],
132 'person_ids':[recuser['record_id']]})
134 for rec in recslice_list:
135 recslice['oar_job_id'].append(rec['oar_job_id'])
139 recslice.update({'type':'slice', \
140 'hrn':recslice_list[0]['hrn']})
143 #GetPersons takes [] as filters
144 user_slab = self.slab_api.GetPersons([record])
147 record.update(user_slab[0])
148 #For client_helper.py compatibility
149 record.update( { 'geni_urn':'',
152 record_list.append(recslice)
154 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
155 INFO TO USER records %s" %(record_list))
158 except TypeError, error:
159 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
165 def sliver_status(self, slice_urn, slice_hrn):
166 """Receive a status request for slice named urn/hrn
167 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
168 shall return a structure as described in
169 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
170 NT : not sure if we should implement this or not, but used by sface.
174 #First get the slice with the slice hrn
175 slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
176 slice_filter_type = 'slice_hrn')
178 if len(slice_list) is 0:
179 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
181 #Used for fetching the user info witch comes along the slice info
182 one_slice = slice_list[0]
185 #Make a list of all the nodes hostnames in use for this slice
186 slice_nodes_list = []
187 #for single_slice in slice_list:
188 #for node in single_slice['node_ids']:
189 #slice_nodes_list.append(node['hostname'])
190 for node in one_slice:
191 slice_nodes_list.append(node['hostname'])
193 #Get all the corresponding nodes details
194 nodes_all = self.slab_api.GetNodes({'hostname':slice_nodes_list},
195 ['node_id', 'hostname','site','boot_state'])
196 nodeall_byhostname = dict([(one_node['hostname'], one_node) \
197 for one_node in nodes_all])
201 for single_slice in slice_list:
204 top_level_status = 'empty'
207 ['geni_urn','pl_login','geni_status','geni_resources'], None)
208 result['pl_login'] = one_slice['reg_researchers']['hrn']
209 logger.debug("Slabdriver - sliver_status Sliver status \
210 urn %s hrn %s single_slice %s \r\n " \
211 %(slice_urn, slice_hrn, single_slice))
213 if 'node_ids' not in single_slice:
215 result['geni_status'] = top_level_status
216 result['geni_resources'] = []
219 top_level_status = 'ready'
221 #A job is running on Senslab for this slice
222 # report about the local nodes that are in the slice only
224 result['geni_urn'] = slice_urn
227 for node in single_slice['node_ids']:
229 #res['slab_hostname'] = node['hostname']
230 #res['slab_boot_state'] = node['boot_state']
232 res['pl_hostname'] = node['hostname']
233 res['pl_boot_state'] = \
234 nodeall_byhostname[node['hostname']]['boot_state']
235 #res['pl_last_contact'] = strftime(self.time_format, \
236 #gmtime(float(timestamp)))
237 sliver_id = Xrn(slice_urn, type='slice', \
238 id=nodeall_byhostname[node['hostname']]['node_id'], \
239 authority=self.hrn).urn
241 res['geni_urn'] = sliver_id
242 node_name = node['hostname']
243 if nodeall_byhostname[node_name]['boot_state'] == 'Alive':
245 res['geni_status'] = 'ready'
247 res['geni_status'] = 'failed'
248 top_level_status = 'failed'
250 res['geni_error'] = ''
252 resources.append(res)
254 result['geni_status'] = top_level_status
255 result['geni_resources'] = resources
256 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
261 def get_user_record( hrn):
262 """ Returns the user record based on the hrn from the SFA DB """
263 return dbsession.query(RegRecord).filter_by(hrn = hrn).first()
266 def testbed_name (self):
269 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
270 def aggregate_version (self):
271 version_manager = VersionManager()
272 ad_rspec_versions = []
273 request_rspec_versions = []
274 for rspec_version in version_manager.versions:
275 if rspec_version.content_type in ['*', 'ad']:
276 ad_rspec_versions.append(rspec_version.to_dict())
277 if rspec_version.content_type in ['*', 'request']:
278 request_rspec_versions.append(rspec_version.to_dict())
280 'testbed':self.testbed_name(),
281 'geni_request_rspec_versions': request_rspec_versions,
282 'geni_ad_rspec_versions': ad_rspec_versions,
286 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
289 aggregate = SlabAggregate(self)
291 slices = SlabSlices(self)
292 peer = slices.get_peer(slice_hrn)
293 sfa_peer = slices.get_sfa_peer(slice_hrn)
296 if not isinstance(creds, list):
300 slice_record = users[0].get('slice_record', {})
301 logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\
302 creds %s \r\n \r\n users %s" \
304 slice_record['user'] = {'keys':users[0]['keys'], \
305 'email':users[0]['email'], \
306 'hrn':slice_record['reg-researchers'][0]}
308 rspec = RSpec(rspec_string)
309 logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \
310 %s slice_record %s users %s" \
311 %(rspec.version,slice_record, users))
314 # ensure site record exists?
315 # ensure slice record exists
316 #Removed options to verify_slice SA 14/08/12
317 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
320 # ensure person records exists
321 #verify_persons returns added persons but since the return value
323 slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
324 sfa_peer, options=options)
325 #requested_attributes returned by rspec.version.get_slice_attributes()
326 #unused, removed SA 13/08/12
327 rspec.version.get_slice_attributes()
329 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
331 # add/remove slice from nodes
333 requested_slivers = [node.get('component_id') \
334 for node in rspec.version.get_nodes_with_slivers()\
335 if node.get('authority_id') is self.slab_api.root_auth]
336 l = [ node for node in rspec.version.get_nodes_with_slivers() ]
337 logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
338 requested_slivers %s listnodes %s" \
339 %(requested_slivers,l))
340 #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
341 #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
344 requested_lease_list = []
348 for lease in rspec.version.get_leases():
349 single_requested_lease = {}
350 logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
352 if not lease.get('lease_id'):
353 if get_authority(lease['component_id']) == self.slab_api.root_auth:
354 single_requested_lease['hostname'] = \
355 slab_xrn_to_hostname(\
356 lease.get('component_id').strip())
357 single_requested_lease['start_time'] = \
358 lease.get('start_time')
359 single_requested_lease['duration'] = lease.get('duration')
360 #Check the experiment's duration is valid before adding
361 #the lease to the requested leases list
362 duration_in_seconds = \
363 int(single_requested_lease['duration'])*60
364 if duration_in_seconds > self.slab_api.GetLeaseGranularity():
365 requested_lease_list.append(single_requested_lease)
367 #Create dict of leases by start_time, regrouping nodes reserved
369 #time, for the same amount of time = one job on OAR
370 requested_job_dict = {}
371 for lease in requested_lease_list:
373 #In case it is an asap experiment start_time is empty
374 if lease['start_time'] == '':
375 lease['start_time'] = '0'
377 if lease['start_time'] not in requested_job_dict:
378 if isinstance(lease['hostname'], str):
379 lease['hostname'] = [lease['hostname']]
381 requested_job_dict[lease['start_time']] = lease
384 job_lease = requested_job_dict[lease['start_time']]
385 if lease['duration'] == job_lease['duration'] :
386 job_lease['hostname'].append(lease['hostname'])
391 logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "\
392 %(requested_job_dict))
393 #verify_slice_leases returns the leases , but the return value is unused
394 #here. Removed SA 13/08/12
395 slices.verify_slice_leases(sfa_slice, \
396 requested_job_dict, peer)
398 return aggregate.get_rspec(slice_xrn=slice_urn, \
399 login=sfa_slice['login'], version=rspec.version)
402 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
404 sfa_slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
405 slice_filter_type = 'slice_hrn')
407 if not sfa_slice_list:
410 #Delete all in the slice
411 for sfa_slice in sfa_slice_list:
414 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
415 slices = SlabSlices(self)
416 # determine if this is a peer slice
418 peer = slices.get_peer(slice_hrn)
419 #TODO delete_sliver SA : UnBindObjectFromPeer should be
420 #used when there is another
421 #senslab testbed, which is not the case 14/08/12 .
423 logger.debug("SLABDRIVER.PY delete_sliver peer %s \r\n \t sfa_slice %s " %(peer, sfa_slice))
426 #self.slab_api.UnBindObjectFromPeer('slice', \
427 #sfa_slice['record_id_slice'], \
429 self.slab_api.DeleteSliceFromNodes(sfa_slice)
435 #self.slab_api.BindObjectToPeer('slice', \
436 #sfa_slice['record_id_slice'], \
437 #peer, sfa_slice['peer_slice_id'])
441 # first 2 args are None in case of resource discovery
442 def list_resources (self, slice_urn, slice_hrn, creds, options):
443 #cached_requested = options.get('cached', True)
445 version_manager = VersionManager()
446 # get the rspec's return format from options
448 version_manager.get_version(options.get('geni_rspec_version'))
449 version_string = "rspec_%s" % (rspec_version)
451 #panos adding the info option to the caching key (can be improved)
452 if options.get('info'):
453 version_string = version_string + "_" + \
454 options.get('info', 'default')
456 # Adding the list_leases option to the caching key
457 if options.get('list_leases'):
458 version_string = version_string + "_"+options.get('list_leases', 'default')
460 # Adding geni_available to caching key
461 if options.get('geni_available'):
462 version_string = version_string + "_" + str(options.get('geni_available'))
464 # look in cache first
465 #if cached_requested and self.cache and not slice_hrn:
466 #rspec = self.cache.get(version_string)
468 #logger.debug("SlabDriver.ListResources: \
469 #returning cached advertisement")
472 #panos: passing user-defined options
473 aggregate = SlabAggregate(self)
474 #origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
475 #options.update({'origin_hrn':origin_hrn})
476 rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
477 version=rspec_version, options=options)
480 #if self.cache and not slice_hrn:
481 #logger.debug("Slab.ListResources: stores advertisement in cache")
482 #self.cache.add(version_string, rspec)
487 def list_slices (self, creds, options):
488 # look in cache first
490 #slices = self.cache.get('slices')
492 #logger.debug("PlDriver.list_slices returns from cache")
497 slices = self.slab_api.GetSlices()
498 logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
499 slice_hrns = [slab_slice['hrn'] for slab_slice in slices]
501 slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
502 for slice_hrn in slice_hrns]
506 #logger.debug ("SlabDriver.list_slices stores value in cache")
507 #self.cache.add('slices', slice_urns)
512 def register (self, sfa_record, hrn, pub_key):
514 Adding new user, slice, node or site should not be handled
518 Adding users = LDAP Senslab
519 Adding slice = Import from LDAP users
525 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
526 """No site or node record update allowed in Senslab."""
528 pointer = old_sfa_record['pointer']
529 old_sfa_record_type = old_sfa_record['type']
531 # new_key implemented for users only
532 if new_key and old_sfa_record_type not in [ 'user' ]:
533 raise UnknownSfaType(old_sfa_record_type)
535 #if (type == "authority"):
536 #self.shell.UpdateSite(pointer, new_sfa_record)
538 if old_sfa_record_type == "slice":
539 slab_record = self.slab_api.sfa_fields_to_slab_fields(old_sfa_record_type, \
541 if 'name' in slab_record:
542 slab_record.pop('name')
543 #Prototype should be UpdateSlice(self,
544 #auth, slice_id_or_name, slice_fields)
545 #Senslab cannot update slice since slice = job
546 #so we must delete and create another job
547 self.slab_api.UpdateSlice(pointer, slab_record)
549 elif old_sfa_record_type == "user":
551 all_fields = new_sfa_record
552 for key in all_fields.keys():
553 if key in ['first_name', 'last_name', 'title', 'email',
554 'password', 'phone', 'url', 'bio', 'accepted_aup',
556 update_fields[key] = all_fields[key]
557 self.slab_api.UpdatePerson(pointer, update_fields)
560 # must check this key against the previous one if it exists
561 persons = self.slab_api.GetPersons(['key_ids'])
563 keys = person['key_ids']
564 keys = self.slab_api.GetKeys(person['key_ids'])
566 # Delete all stale keys
569 if new_key != key['key']:
570 self.slab_api.DeleteKey(key['key_id'])
574 self.slab_api.AddPersonKey(pointer, {'key_type': 'ssh', \
581 def remove (self, sfa_record):
582 sfa_record_type = sfa_record['type']
583 hrn = sfa_record['hrn']
584 if sfa_record_type == 'user':
586 #get user from senslab ldap
587 person = self.slab_api.GetPersons(sfa_record)
588 #No registering at a given site in Senslab.
589 #Once registered to the LDAP, all senslab sites are
592 #Mark account as disabled in ldap
593 self.slab_api.DeletePerson(sfa_record)
594 elif sfa_record_type == 'slice':
595 if self.slab_api.GetSlices(slice_filter = hrn, \
596 slice_filter_type = 'slice_hrn'):
597 self.slab_api.DeleteSlice(sfa_record)
599 #elif type == 'authority':
600 #if self.GetSites(pointer):
601 #self.DeleteSite(pointer)