3 from datetime import datetime
5 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
6 from sfa.util.sfalogging import logger
7 from sfa.storage.alchemy import dbsession
8 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
9 from sqlalchemy.orm import joinedload
11 from sfa.trust.certificate import Keypair, convert_public_key
12 from sfa.trust.gid import create_uuid
13 from sfa.trust.hierarchy import Hierarchy
15 from sfa.managers.driver import Driver
16 from sfa.rspecs.version_manager import VersionManager
17 from sfa.rspecs.rspec import RSpec
19 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
21 from sfa.senslab.OARrestapi import OARrestapi
22 from sfa.senslab.LDAPapi import LDAPapi
24 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
27 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
29 from sfa.senslab.slabslices import SlabSlices
32 from sfa.senslab.slabapi import SlabTestbedAPI
36 class SlabDriver(Driver):
37 """ Senslab Driver class inherited from Driver generic class.
39 Contains methods compliant with the SFA standard and the testbed
40 infrastructure (calls to LDAP and OAR).
def __init__(self, config):
    """Initialise the driver from the SFA configuration object.

    Runs the generic Driver constructor, stores the interface HRN read
    from ``config.SFA_INTERFACE_HRN`` and creates the SlabDB and
    SlabTestbedAPI helpers from the same configuration.
    """
    Driver.__init__ (self, config)
    # HRN of this SFA interface, as given by the configuration.
    self.hrn = config.SFA_INTERFACE_HRN
    # SlabDB handle (from sfa.senslab.slabpostgres), created with debug off.
    self.db = SlabDB(config, debug = False)
    # Testbed API wrapper; the other methods go through it for
    # GetSlices / GetPersons / GetNodes and similar calls.
    self.slab_api = SlabTestbedAPI(config)
def augment_records_with_testbed_info (self, record_list ):
    """Return the given records enriched with testbed-specific info.

    Thin wrapper: the work is delegated to fill_record_info and its
    result is handed back unchanged.
    """
    augmented_records = self.fill_record_info(record_list)
    return augmented_records
55 def fill_record_info(self, record_list):
57 Given a SFA record, fill in the senslab specific and SFA specific
61 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
62 if not isinstance(record_list, list):
63 record_list = [record_list]
66 for record in record_list:
67 #If the record is a SFA slice record, then add information
68 #about the user of this slice. This kind of
69 #information is in the Senslab's DB.
70 if str(record['type']) == 'slice':
71 if 'reg_researchers' in record and \
72 isinstance(record['reg_researchers'], list) :
73 record['reg_researchers'] = record['reg_researchers'][0].__dict__
74 record.update({'PI':[record['reg_researchers']['hrn']],
75 'researcher': [record['reg_researchers']['hrn']],
79 'person_ids':[record['reg_researchers']['record_id']],
80 'geni_urn':'', #For client_helper.py compatibility
81 'keys':'', #For client_helper.py compatibility
82 'key_ids':''}) #For client_helper.py compatibility
85 #Get slab slice record and oar job id if any.
86 recslice_list = self.slab_api.GetSlices(slice_filter = \
88 slice_filter_type = 'slice_hrn')
91 logger.debug("SLABDRIVER \tfill_record_info \
92 TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id']\
93 %s " %(record['hrn'], record['oar_job_id']))
95 for rec in recslice_list:
96 logger.debug("SLABDRIVER\r\n \t fill_record_info oar_job_id %s " %(rec['oar_job_id']))
97 del record['reg_researchers']
98 record['node_ids'] = [ self.slab_api.root_auth + hostname for hostname in rec['node_ids']]
102 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
103 recslice_list %s \r\n \t RECORD %s \r\n \
104 \r\n" %(recslice_list, record))
106 if str(record['type']) == 'user':
107 #The record is a SFA user record.
108 #Get the information about his slice from Senslab's DB
109 #and add it to the user record.
110 recslice_list = self.slab_api.GetSlices(\
111 slice_filter = record['record_id'],\
112 slice_filter_type = 'record_id_user')
114 logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
115 recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
116 #Append slice record in records list,
117 #therefore fetches user and slice info again(one more loop)
118 #Will update PIs and researcher for the slice
120 recuser = recslice_list[0]['reg_researchers']
121 logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
122 recuser %s \r\n \r\n" %(recuser))
124 recslice = recslice_list[0]
125 recslice.update({'PI':[recuser['hrn']],
126 'researcher': [recuser['hrn']],
127 'name':record['hrn'],
130 'person_ids':[recuser['record_id']]})
132 for rec in recslice_list:
133 recslice['oar_job_id'].append(rec['oar_job_id'])
137 recslice.update({'type':'slice', \
138 'hrn':recslice_list[0]['hrn']})
141 #GetPersons takes [] as filters
142 user_slab = self.slab_api.GetPersons([record])
145 record.update(user_slab[0])
146 #For client_helper.py compatibility
147 record.update( { 'geni_urn':'',
150 record_list.append(recslice)
152 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
153 INFO TO USER records %s" %(record_list))
156 except TypeError, error:
157 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
163 def sliver_status(self, slice_urn, slice_hrn):
164 """Receive a status request for slice named urn/hrn
165 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
166 shall return a structure as described in
167 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
168 NT : not sure if we should implement this or not, but used by sface.
172 #First get the slice with the slice hrn
173 slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
174 slice_filter_type = 'slice_hrn')
176 if len(slice_list) is 0:
177 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
179 #Used for fetching the user info witch comes along the slice info
180 one_slice = slice_list[0]
183 #Make a list of all the nodes hostnames in use for this slice
184 slice_nodes_list = []
185 #for single_slice in slice_list:
186 #for node in single_slice['node_ids']:
187 #slice_nodes_list.append(node['hostname'])
188 for node in one_slice:
189 slice_nodes_list.append(node['hostname'])
191 #Get all the corresponding nodes details
192 nodes_all = self.slab_api.GetNodes({'hostname':slice_nodes_list},
193 ['node_id', 'hostname','site','boot_state'])
194 nodeall_byhostname = dict([(one_node['hostname'], one_node) \
195 for one_node in nodes_all])
199 for single_slice in slice_list:
202 top_level_status = 'empty'
205 ['geni_urn','pl_login','geni_status','geni_resources'], None)
206 result['pl_login'] = one_slice['reg_researchers']['hrn']
207 logger.debug("Slabdriver - sliver_status Sliver status \
208 urn %s hrn %s single_slice %s \r\n " \
209 %(slice_urn, slice_hrn, single_slice))
211 if 'node_ids' not in single_slice:
213 result['geni_status'] = top_level_status
214 result['geni_resources'] = []
217 top_level_status = 'ready'
219 #A job is running on Senslab for this slice
220 # report about the local nodes that are in the slice only
222 result['geni_urn'] = slice_urn
225 for node in single_slice['node_ids']:
227 #res['slab_hostname'] = node['hostname']
228 #res['slab_boot_state'] = node['boot_state']
230 res['pl_hostname'] = node['hostname']
231 res['pl_boot_state'] = \
232 nodeall_byhostname[node['hostname']]['boot_state']
233 #res['pl_last_contact'] = strftime(self.time_format, \
234 #gmtime(float(timestamp)))
235 sliver_id = Xrn(slice_urn, type='slice', \
236 id=nodeall_byhostname[node['hostname']]['node_id'], \
237 authority=self.hrn).urn
239 res['geni_urn'] = sliver_id
240 node_name = node['hostname']
241 if nodeall_byhostname[node_name]['boot_state'] == 'Alive':
243 res['geni_status'] = 'ready'
245 res['geni_status'] = 'failed'
246 top_level_status = 'failed'
248 res['geni_error'] = ''
250 resources.append(res)
252 result['geni_status'] = top_level_status
253 result['geni_resources'] = resources
254 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
def get_user_record(hrn):
    """Fetch a record from the SFA DB by its hrn.

    Queries RegRecord through the module-level dbsession, filtered on
    the given hrn, and returns the result of ``first()`` on that query.

    NOTE(review): the signature takes no ``self``; presumably this is
    meant to be a static method -- confirm against the full class.
    """
    records_for_hrn = dbsession.query(RegRecord).filter_by(hrn=hrn)
    return records_for_hrn.first()
264 def testbed_name (self):
267 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
    """Describe this aggregate and the RSpec versions it handles.

    Walks the versions known to VersionManager and sorts them by
    content type: '*' and 'ad' versions are advertised, '*' and
    'request' versions are accepted as requests.

    :returns: dict with the keys 'testbed' (from testbed_name()),
        'geni_request_rspec_versions' and 'geni_ad_rspec_versions'
        (lists of ``to_dict()`` results).
    """
    version_manager = VersionManager()
    ad_rspec_versions = []
    request_rspec_versions = []
    for rspec_version in version_manager.versions:
        # A '*' content type counts for both lists, hence two
        # independent tests rather than if/elif.
        if rspec_version.content_type in ['*', 'ad']:
            ad_rspec_versions.append(rspec_version.to_dict())
        if rspec_version.content_type in ['*', 'request']:
            request_rspec_versions.append(rspec_version.to_dict())
    # The entries must live inside a returned dict literal; left bare
    # they are a syntax error and the caller gets nothing.
    return {
        'testbed':self.testbed_name(),
        'geni_request_rspec_versions': request_rspec_versions,
        'geni_ad_rspec_versions': ad_rspec_versions,
    }
284 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
287 aggregate = SlabAggregate(self)
289 slices = SlabSlices(self)
290 peer = slices.get_peer(slice_hrn)
291 sfa_peer = slices.get_sfa_peer(slice_hrn)
294 if not isinstance(creds, list):
298 slice_record = users[0].get('slice_record', {})
299 logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\
300 creds %s \r\n \r\n users %s" \
302 slice_record['user'] = {'keys':users[0]['keys'], \
303 'email':users[0]['email'], \
304 'hrn':slice_record['reg-researchers'][0]}
306 rspec = RSpec(rspec_string)
307 logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \
308 %s slice_record %s users %s" \
309 %(rspec.version,slice_record, users))
312 # ensure site record exists?
313 # ensure slice record exists
314 #Removed options to verify_slice SA 14/08/12
315 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
318 # ensure person records exists
319 #verify_persons returns added persons but since the return value
321 slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
322 sfa_peer, options=options)
323 #requested_attributes returned by rspec.version.get_slice_attributes()
324 #unused, removed SA 13/08/12
325 rspec.version.get_slice_attributes()
327 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
329 # add/remove slice from nodes
331 requested_slivers = [node.get('component_id') \
332 for node in rspec.version.get_nodes_with_slivers()\
333 if node.get('authority_id') is self.slab_api.root_auth]
334 l = [ node for node in rspec.version.get_nodes_with_slivers() ]
335 logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
336 requested_slivers %s listnodes %s" \
337 %(requested_slivers,l))
338 #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
339 #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
342 requested_lease_list = []
346 for lease in rspec.version.get_leases():
347 single_requested_lease = {}
348 logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
350 if not lease.get('lease_id'):
351 if get_authority(lease['component_id']) == self.slab_api.root_auth:
352 single_requested_lease['hostname'] = \
353 slab_xrn_to_hostname(\
354 lease.get('component_id').strip())
355 single_requested_lease['start_time'] = \
356 lease.get('start_time')
357 single_requested_lease['duration'] = lease.get('duration')
358 #Check the experiment's duration is valid before adding
359 #the lease to the requested leases list
360 duration_in_seconds = \
361 int(single_requested_lease['duration'])*60
362 if duration_in_seconds > self.slab_api.GetLeaseGranularity():
363 requested_lease_list.append(single_requested_lease)
365 #Create dict of leases by start_time, regrouping nodes reserved
367 #time, for the same amount of time = one job on OAR
368 requested_job_dict = {}
369 for lease in requested_lease_list:
371 #In case it is an asap experiment start_time is empty
372 if lease['start_time'] == '':
373 lease['start_time'] = '0'
375 if lease['start_time'] not in requested_job_dict:
376 if isinstance(lease['hostname'], str):
377 lease['hostname'] = [lease['hostname']]
379 requested_job_dict[lease['start_time']] = lease
382 job_lease = requested_job_dict[lease['start_time']]
383 if lease['duration'] == job_lease['duration'] :
384 job_lease['hostname'].append(lease['hostname'])
389 logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "\
390 %(requested_job_dict))
391 #verify_slice_leases returns the leases , but the return value is unused
392 #here. Removed SA 13/08/12
393 slices.verify_slice_leases(sfa_slice, \
394 requested_job_dict, peer)
396 return aggregate.get_rspec(slice_xrn=slice_urn, \
397 login=sfa_slice['login'], version=rspec.version)
400 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
402 sfa_slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
403 slice_filter_type = 'slice_hrn')
405 if not sfa_slice_list:
408 #Delete all in the slice
409 for sfa_slice in sfa_slice_list:
412 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
413 slices = SlabSlices(self)
414 # determine if this is a peer slice
416 peer = slices.get_peer(slice_hrn)
417 #TODO delete_sliver SA : UnBindObjectFromPeer should be
418 #used when there is another
419 #senslab testbed, which is not the case 14/08/12 .
421 logger.debug("SLABDRIVER.PY delete_sliver peer %s \r\n \t sfa_slice %s " %(peer, sfa_slice))
424 #self.slab_api.UnBindObjectFromPeer('slice', \
425 #sfa_slice['record_id_slice'], \
427 self.slab_api.DeleteSliceFromNodes(sfa_slice)
433 #self.slab_api.BindObjectToPeer('slice', \
434 #sfa_slice['record_id_slice'], \
435 #peer, sfa_slice['peer_slice_id'])
439 # first 2 args are None in case of resource discovery
def list_resources (self, slice_urn, slice_hrn, creds, options):
    """Build the RSpec listing resources, optionally for one slice.

    slice_urn and slice_hrn are None in case of resource discovery.

    :param slice_urn: slice URN, forwarded to the aggregate as
        ``slice_xrn``.
    :param slice_hrn: slice HRN (not used by the active code path).
    :param creds: caller credentials (not used by the active code path).
    :param options: dict of call options. 'geni_rspec_version' selects
        the RSpec version; the whole dict is passed on to the aggregate.
    :returns: the value produced by SlabAggregate.get_rspec.
    """
    version_manager = VersionManager()
    # get the rspec's return format from options
    # The result must be kept: it is used for the cache key and for
    # the get_rspec call below.
    rspec_version = \
        version_manager.get_version(options.get('geni_rspec_version'))

    # Caching key: the rspec version plus every option that changes the
    # output (info, list_leases, geni_available). str() guards against
    # non-string truthy values such as True, which would otherwise make
    # the concatenation raise TypeError.
    # NOTE: the cache lookup/store is currently disabled, so the key is
    # only kept ready for when it is re-enabled.
    version_string = "rspec_%s" % (rspec_version)
    for option_name in ('info', 'list_leases', 'geni_available'):
        option_value = options.get(option_name)
        if option_value:
            version_string = version_string + "_" + str(option_value)

    #panos: passing user-defined options
    aggregate = SlabAggregate(self)
    rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
                                version=rspec_version, options=options)
    return rspec
def list_slices (self, creds, options):
    """Return the URNs of the slices reported by the testbed API.

    :param creds: caller credentials (not used by the active code path).
    :param options: call options (not used by the active code path).
    :returns: list of slice URNs, one per record returned by
        ``slab_api.GetSlices()``, built from each record's 'hrn'.
    """
    # NOTE: result caching (formerly under the 'slices' cache key) is
    # disabled; every call queries the testbed API.
    slices = self.slab_api.GetSlices()
    logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
    slice_urns = [hrn_to_urn(slab_slice['hrn'], 'slice') \
                                            for slab_slice in slices]
    # Hand the list back: without this the method returns None.
    return slice_urns
510 def register (self, sfa_record, hrn, pub_key):
512 Adding new user, slice, node or site should not be handled
516 Adding users = LDAP Senslab
517 Adding slice = Import from LDAP users
523 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
524 """No site or node record update allowed in Senslab."""
526 pointer = old_sfa_record['pointer']
527 old_sfa_record_type = old_sfa_record['type']
529 # new_key implemented for users only
530 if new_key and old_sfa_record_type not in [ 'user' ]:
531 raise UnknownSfaType(old_sfa_record_type)
533 #if (type == "authority"):
534 #self.shell.UpdateSite(pointer, new_sfa_record)
536 if old_sfa_record_type == "slice":
537 slab_record = self.slab_api.sfa_fields_to_slab_fields(old_sfa_record_type, \
539 if 'name' in slab_record:
540 slab_record.pop('name')
541 #Prototype should be UpdateSlice(self,
542 #auth, slice_id_or_name, slice_fields)
543 #Senslab cannot update slice since slice = job
544 #so we must delete and create another job
545 self.slab_api.UpdateSlice(pointer, slab_record)
547 elif old_sfa_record_type == "user":
549 all_fields = new_sfa_record
550 for key in all_fields.keys():
551 if key in ['first_name', 'last_name', 'title', 'email',
552 'password', 'phone', 'url', 'bio', 'accepted_aup',
554 update_fields[key] = all_fields[key]
555 self.slab_api.UpdatePerson(pointer, update_fields)
558 # must check this key against the previous one if it exists
559 persons = self.slab_api.GetPersons(['key_ids'])
561 keys = person['key_ids']
562 keys = self.slab_api.GetKeys(person['key_ids'])
564 # Delete all stale keys
567 if new_key != key['key']:
568 self.slab_api.DeleteKey(key['key_id'])
572 self.slab_api.AddPersonKey(pointer, {'key_type': 'ssh', \
579 def remove (self, sfa_record):
580 sfa_record_type = sfa_record['type']
581 hrn = sfa_record['hrn']
582 if sfa_record_type == 'user':
584 #get user from senslab ldap
585 person = self.slab_api.GetPersons(sfa_record)
586 #No registering at a given site in Senslab.
587 #Once registered to the LDAP, all senslab sites are
590 #Mark account as disabled in ldap
591 self.slab_api.DeletePerson(sfa_record)
592 elif sfa_record_type == 'slice':
593 if self.slab_api.GetSlices(slice_filter = hrn, \
594 slice_filter_type = 'slice_hrn'):
595 self.slab_api.DeleteSlice(sfa_record)
597 #elif type == 'authority':
598 #if self.GetSites(pointer):
599 #self.DeleteSite(pointer)