4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5 RecordNotFound, SfaNotImplemented, SliverDoesNotExist
7 from sfa.util.sfalogging import logger
8 from sfa.util.defaultdict import defaultdict
9 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
10 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_hrn
11 from sfa.util.cache import Cache
13 # one would think the driver should not need to mess with the SFA db, but..
14 from sfa.storage.alchemy import dbsession
15 from sfa.storage.model import RegRecord
17 # used to be used in get_ticket
18 #from sfa.trust.sfaticket import SfaTicket
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
23 # the driver interface, mostly provides default behaviours
24 from sfa.managers.driver import Driver
26 from sfa.nitos.nitosshell import NitosShell
27 from sfa.nitos.nitosaggregate import NitosAggregate
28 from sfa.nitos.nitosslices import NitosSlices
30 from sfa.nitos.nitosxrn import NitosXrn, slicename_to_hrn, hostname_to_hrn, hrn_to_nitos_slicename, xrn_to_hostname
32 def list_to_dict(recs, key):
34 convert a list of dictionaries into a dictionary keyed on the
35 specified dictionary key
37 return dict ( [ (rec[key],rec) for rec in recs ] )
40 # NitosShell is just an xmlrpc serverproxy where methods
41 # can be sent as-is; it takes care of authentication
42 # from the global config
44 class NitosDriver (Driver):
46 # the cache instance is a class member so it survives across incoming requests
49 def __init__ (self, config):
50 Driver.__init__ (self, config)
51 self.shell = NitosShell (config)
53 self.testbedInfo = self.shell.getTestbedInfo()
54 # un-comment below lines to enable caching
55 # if config.SFA_AGGREGATE_CACHING:
56 # if NitosDriver.cache is None:
57 # NitosDriver.cache = Cache()
58 # self.cache = NitosDriver.cache
60 ###########################################
61 ########## utility methods for NITOS driver
62 ###########################################
65 def filter_nitos_results (self, listo, filters_dict):
67 the Nitos scheduler API does not provide a get result filtring so we do it here
72 for filter in filters_dict:
73 if filter not in dicto or dicto[filter] != filters_dict[filter]:
78 def convert_id (self, list_of_dict):
80 convert object id retrived in string format to int format
82 for dicto in list_of_dict:
84 if key in ['node_id', 'slice_id', 'user_id', 'channel_id', 'reservation_id'] and isinstance(dicto[key], str):
85 dicto[key] = int(dicto[key])
86 elif key in ['user_ids']:
88 for user_id in dicto['user_ids']:
89 user_ids2.append(int(user_id))
90 dicto['user_ids'] = user_ids2
95 ########################################
96 ########## registry oriented
97 ########################################
99 def augment_records_with_testbed_info (self, sfa_records):
100 return self.fill_record_info (sfa_records)
103 def register (self, sfa_record, hrn, pub_key):
104 type = sfa_record['type']
105 nitos_record = self.sfa_fields_to_nitos_fields(type, hrn, sfa_record)
107 if type == 'authority':
110 elif type == 'slice':
111 slices = self.shell.getSlices()
114 if slice['slice_name'] == nitos_record['name']:
115 slice_id = slice['slice_id']
119 pointer = self.shell.addSlice({'slice_name' : nitos_record['name']})
124 users = self.shell.getUsers()
127 if user['user_name'] == nitos_record['name']:
128 user_id = user['user_id']
131 pointer = self.shell.addUser({'username' : nitos_record['name'], 'email' : nitos_record['email']})
138 self.shell.addUserKey({'user_id' : pointer,'key' : pub_key})
141 nodes = self.shell.GetNodes({}, [])
144 if node['hostname'] == nitos_record['name']:
145 node_id = node['node_id']
149 pointer = self.shell.addNode(nitos_record)
156 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
158 pointer = old_sfa_record['pointer']
159 type = old_sfa_record['type']
160 new_nitos_record = self.sfa_fields_to_nitos_fields(type, hrn, new_sfa_record)
162 # new_key implemented for users only
163 if new_key and type not in [ 'user' ]:
164 raise UnknownSfaType(type)
167 if 'name' in new_sfa_record:
168 self.shell.updateSlice({'slice_id': pointer, 'fields': {'slice_name': new_sfa_record['name']}})
172 if 'name' in new_sfa_record:
173 update_fields['username'] = new_sfa_record['name']
174 if 'email' in new_sfa_record:
175 update_fields['email'] = new_sfa_record['email']
177 self.shell.updateUser({'user_id': pointer, 'fields': update_fields})
180 # needs to be improved
181 self.shell.addUserKey({'user_id': pointer, 'key': new_key})
184 self.shell.updateNode({'node_id': pointer, 'fields': new_sfa_record})
190 def remove (self, sfa_record):
192 type=sfa_record['type']
193 pointer=sfa_record['pointer']
195 self.shell.deleteUser({'user_id': pointer})
196 elif type == 'slice':
197 self.shell.deleteSlice({'slice_id': pointer})
199 self.shell.deleteNode({'node_id': pointer})
208 # Convert SFA fields to NITOS fields for use when registering or updating
209 # registry record in the NITOS Scheduler database
212 def sfa_fields_to_nitos_fields(self, type, hrn, sfa_record):
217 nitos_record["slice_name"] = hrn_to_nitos_slicename(hrn)
219 if "hostname" not in sfa_record:
220 raise MissingSfaInfo("hostname")
221 nitos_record["node_name"] = sfa_record["hostname"]
226 def fill_record_info(self, records):
228 Given a (list of) SFA record, fill in the NITOS specific
229 and SFA specific fields in the record.
231 if not isinstance(records, list):
234 self.fill_record_nitos_info(records)
235 self.fill_record_hrns(records)
236 self.fill_record_sfa_info(records)
239 def fill_record_nitos_info(self, records):
241 Fill in the nitos specific fields of a SFA record. This
242 involves calling the appropriate NITOS API method to retrieve the
243 database record for the object.
245 @param record: record to fill in field (in/out param)
249 node_ids, slice_ids = [], []
250 user_ids, key_ids = [], []
251 type_map = {'node': node_ids, 'slice': slice_ids, 'user': user_ids}
253 for record in records:
254 for type in type_map:
255 if type == record['type']:
256 type_map[type].append(record['pointer'])
259 nodes, slices, users, keys = {}, {}, {}, {}
261 all_nodes = self.convert_id(self.shell.getNodes({}, []))
262 node_list = [node for node in all_nodes if node['node_id'] in node_ids]
263 nodes = list_to_dict(node_list, 'node_id')
265 all_slices = self.convert_id(self.shell.getSlices({}, []))
266 slice_list = [slice for slice in all_slices if slice['slice_id'] in slice_ids]
267 slices = list_to_dict(slice_list, 'slice_id')
269 all_users = self.convert_id(self.shell.getUsers())
270 user_list = [user for user in all_users if user['user_id'] in user_ids]
271 users = list_to_dict(user_list, 'user_id')
273 nitos_records = {'node': nodes, 'slice': slices, 'user': users}
277 for record in records:
278 if record['pointer'] == -1:
281 for type in nitos_records:
282 if record['type'] == type:
283 if record['pointer'] in nitos_records[type]:
284 record.update(nitos_records[type][record['pointer']])
287 if record['type'] == 'user':
288 if record['pointer'] in nitos_records['user']:
289 record['keys'] = nitos_records['user'][record['pointer']]['keys']
294 def fill_record_hrns(self, records):
296 convert nitos ids to hrns
301 slice_ids, user_ids, node_ids = [], [], []
302 for record in records:
303 if 'user_ids' in record:
304 user_ids.extend(record['user_ids'])
305 if 'slice_ids' in record:
306 slice_ids.extend(record['slice_ids'])
307 if 'node_ids' in record:
308 node_ids.extend(record['node_ids'])
311 slices, users, nodes = {}, {}, {}
313 all_nodes = self.convert_id(self.shell.getNodes({}, []))
314 node_list = [node for node in all_nodes if node['node_id'] in node_ids]
315 nodes = list_to_dict(node_list, 'node_id')
317 all_slices = self.convert_id(self.shell.getSlices({}, []))
318 slice_list = [slice for slice in all_slices if slice['slice_id'] in slice_ids]
319 slices = list_to_dict(slice_list, 'slice_id')
321 all_users = self.convert_id(self.shell.getUsers())
322 user_list = [user for user in all_users if user['user_id'] in user_ids]
323 users = list_to_dict(user_list, 'user_id')
326 # convert ids to hrns
327 for record in records:
328 # get all relevant data
329 type = record['type']
330 pointer = record['pointer']
332 testbed_name = self.testbedInfo['name']
335 if 'user_ids' in record:
336 usernames = [users[user_id]['username'] for user_id in record['user_ids'] \
338 user_hrns = [".".join([auth_hrn, testbed_name, username]) for username in usernames]
339 record['users'] = user_hrns
340 if 'slice_ids' in record:
341 slicenames = [slices[slice_id]['slice_name'] for slice_id in record['slice_ids'] \
342 if slice_id in slices]
343 slice_hrns = [slicename_to_hrn(auth_hrn, slicename) for slicename in slicenames]
344 record['slices'] = slice_hrns
345 if 'node_ids' in record:
346 hostnames = [nodes[node_id]['hostname'] for node_id in record['node_ids'] \
348 node_hrns = [hostname_to_hrn(auth_hrn, login_base, hostname) for hostname in hostnames]
349 record['nodes'] = node_hrns
351 if 'expires' in record:
352 date = utcparse(record['expires'])
353 datestring = datetime_to_string(date)
354 record['expires'] = datestring
358 def fill_record_sfa_info(self, records):
360 def startswith(prefix, values):
361 return [value for value in values if value.startswith(prefix)]
365 for record in records:
366 user_ids.extend(record.get("user_ids", []))
368 # get the registry records
369 user_list, users = [], {}
370 user_list = dbsession.query(RegRecord).filter(RegRecord.pointer.in_(user_ids)).all()
371 # create a hrns keyed on the sfa record's pointer.
372 # Its possible for multiple records to have the same pointer so
373 # the dict's value will be a list of hrns.
374 users = defaultdict(list)
375 for user in user_list:
376 users[user.pointer].append(user)
378 # get the nitos records
379 nitos_user_list, nitos_users = [], {}
380 nitos_all_users = self.convert_id(self.shell.getUsers())
381 nitos_user_list = [user for user in nitos_all_users if user['user_id'] in user_ids]
382 nitos_users = list_to_dict(nitos_user_list, 'user_id')
386 for record in records:
387 if record['pointer'] == -1:
391 type = record['type']
392 logger.info("fill_record_sfa_info - incoming record typed %s"%type)
393 if (type == "slice"):
394 # all slice users are researchers
395 record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')
396 record['researcher'] = []
397 for user_id in record.get('user_ids', []):
398 hrns = [user.hrn for user in users[user_id]]
399 record['researcher'].extend(hrns)
401 elif (type == "node"):
402 sfa_info['dns'] = record.get("hostname", "")
403 # xxx TODO: URI, LatLong, IP, DNS
405 elif (type == "user"):
406 logger.info('setting user.email')
407 sfa_info['email'] = record.get("email", "")
408 sfa_info['geni_urn'] = hrn_to_urn(record['hrn'], 'user')
409 sfa_info['geni_certificate'] = record['gid']
410 # xxx TODO: PostalAddress, Phone
411 record.update(sfa_info)
414 def update_relation (self, subject_type, target_type, relation_name, subject_id, target_ids):
416 if subject_type =='slice' and target_type == 'user' and relation_name == 'researcher':
417 subject=self.shell.getSlices ({'slice_id': subject_id}, [])[0]
418 current_target_ids = subject['user_ids']
419 add_target_ids = list ( set (target_ids).difference(current_target_ids))
420 del_target_ids = list ( set (current_target_ids).difference(target_ids))
421 logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
422 for target_id in add_target_ids:
423 self.shell.addUserToSlice ({'user_id': target_id, 'slice_id': subject_id})
424 logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
425 for target_id in del_target_ids:
426 logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
427 self.shell.deleteUserFromSlice ({'user_id': target_id, 'slice_id': subject_id})
429 logger.info('unexpected relation %s to maintain, %s -> %s'%(relation_name,subject_type,target_type))
432 ########################################
433 ########## aggregate oriented
434 ########################################
436 def testbed_name (self): return "nitos"
438 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
439 def aggregate_version (self):
440 version_manager = VersionManager()
441 ad_rspec_versions = []
442 request_rspec_versions = []
443 for rspec_version in version_manager.versions:
444 if rspec_version.content_type in ['*', 'ad']:
445 ad_rspec_versions.append(rspec_version.to_dict())
446 if rspec_version.content_type in ['*', 'request']:
447 request_rspec_versions.append(rspec_version.to_dict())
449 'testbed':self.testbed_name(),
450 'geni_request_rspec_versions': request_rspec_versions,
451 'geni_ad_rspec_versions': ad_rspec_versions,
454 def list_slices (self, creds, options):
455 # look in cache first
457 slices = self.cache.get('slices')
459 logger.debug("NitosDriver.list_slices returns from cache")
463 slices = self.shell.getSlices({}, [])
464 testbed_name = self.testbedInfo['name']
465 slice_hrns = [slicename_to_hrn(self.hrn, testbed_name, slice['slice_name']) for slice in slices]
466 slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
470 logger.debug ("NitosDriver.list_slices stores value in cache")
471 self.cache.add('slices', slice_urns)
475 # first 2 args are None in case of resource discovery
476 def list_resources (self, slice_urn, slice_hrn, creds, options):
477 cached_requested = options.get('cached', True)
478 version_manager = VersionManager()
479 # get the rspec's return format from options
480 #rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
481 # rspec's return format for nitos aggregate is version NITOS 1
482 rspec_version = version_manager.get_version('NITOS 1')
483 version_string = "rspec_%s" % (rspec_version)
485 #panos adding the info option to the caching key (can be improved)
486 if options.get('info'):
487 version_string = version_string + "_"+options.get('info', 'default')
489 # Adding the list_leases option to the caching key
490 if options.get('list_leases'):
491 version_string = version_string + "_"+options.get('list_leases', 'default')
493 # Adding geni_available to caching key
494 if options.get('geni_available'):
495 version_string = version_string + "_" + str(options.get('geni_available'))
497 # look in cache first
498 if cached_requested and self.cache and not slice_hrn:
499 rspec = self.cache.get(version_string)
501 logger.debug("NitosDriver.ListResources: returning cached advertisement")
504 #panos: passing user-defined options
505 #print "manager options = ",options
506 aggregate = NitosAggregate(self)
507 rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
511 if self.cache and not slice_hrn:
512 logger.debug("NitosDriver.ListResources: stores advertisement in cache")
513 self.cache.add(version_string, rspec)
517 def sliver_status (self, slice_urn, slice_hrn):
518 # find out where this slice is currently running
519 slicename = hrn_to_nitos_slicename(slice_hrn)
521 slices = self.shell.getSlices({}, [])
524 raise SliverDoesNotExist("%s (used %s as slicename internally)" % (slice_hrn, slicename))
527 if slice['slice_name'] == slicename:
532 raise SliverDoesNotExist("%s (used %s as slicename internally)" % (slice_hrn, slicename))
534 # report about the reserved nodes only
535 reserved_nodes = self.shell.getReservedNodes({}, [])
536 nodes = self.shell.getNodes({}, [])
538 slice_reserved_nodes = []
539 for r_node in reserved_nodes:
540 if r_node['slice_id'] == slice['slice_id']:
542 if node['node_id'] == r_node['node_id']:
543 slice_reserved_nodes.append(node)
548 if len(slice_reserved_nodes) == 0:
549 raise SliverDoesNotExist("You have not allocated any slivers here")
551 ##### continue from here
555 if slice['user_ids']:
556 users = self.shell.getUsers()
557 # filter users on slice['user_ids']
559 if usr['user_id'] in slice['user_ids']:
560 keys.extend(usr['keys'])
563 user.update({'urn': slice_urn,
564 'login': slice['slice_name'],
571 top_level_status = 'unknown'
572 if slice_reserved_nodes:
573 top_level_status = 'ready'
574 result['geni_urn'] = slice_urn
575 result['nitos_gateway_login'] = slice['slice_name']
576 #result['pl_expires'] = datetime_to_string(utcparse(slice['expires']))
577 #result['geni_expires'] = datetime_to_string(utcparse(slice['expires']))
580 for node in slice_reserved_nodes:
582 res['nitos_hostname'] = node['hostname']
583 sliver_id = Xrn(slice_urn, type='slice', id=node['node_id']).urn
584 res['geni_urn'] = sliver_id
585 res['geni_status'] = 'ready'
586 res['geni_error'] = ''
587 res['users'] = [user]
589 resources.append(res)
591 result['geni_status'] = top_level_status
592 result['geni_resources'] = resources
596 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
598 aggregate = NitosAggregate(self)
599 slices = NitosSlices(self)
600 sfa_peer = slices.get_sfa_peer(slice_hrn)
603 slice_record = users[0].get('slice_record', {})
606 rspec = RSpec(rspec_string, version='NITOS 1')
608 # ensure slice record exists
609 slice = slices.verify_slice(slice_hrn, slice_record, sfa_peer, options=options)
610 # ensure user records exists
611 users = slices.verify_users(slice_hrn, slice, users, sfa_peer, options=options)
613 # add/remove leases (nodes and channels)
614 # a lease in Nitos RSpec case is a reservation of nodes and channels grouped by (slice,timeslot)
615 rspec_requested_leases = rspec.version.get_leases()
616 rspec_requested_nodes = []
617 rspec_requested_channels = []
618 for lease in rspec_requested_leases:
619 if lease['type'] == 'node':
620 lease.pop('type', None)
621 rspec_requested_nodes.append(lease)
623 lease.pop('type', None)
624 rspec_requested_channels.append(lease)
626 nodes = slices.verify_slice_leases_nodes(slice, rspec_requested_nodes)
627 channels = slices.verify_slice_leases_channels(slice, rspec_requested_channels)
629 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
631 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
632 slicename = hrn_to_nitos_slicename(slice_hrn)
633 slices = self.filter_nitos_results(self.shell.getSlices({}, []), {'slice_name': slicename})
638 slice_reserved_nodes = self.filter_nitos_results(self.shell.getReservedNodes({}, []), {'slice_id': slice['slice_id'] })
639 slice_reserved_channels = self.filter_nitos_results(self.shell.getReservedChannels(), {'slice_id': slice['slice_id'] })
641 slice_reserved_nodes_ids = [node['reservation_id'] for node in slice_reserved_nodes]
642 slice_reserved_channels_ids = [channel['reservation_id'] for channel in slice_reserved_channels]
644 # release all reserved nodes and channels for that slice
646 released_nodes = self.shell.releaseNodes({'reservation_ids': slice_reserved_nodes_ids})
647 released_channels = self.shell.releaseChannels({'reservation_ids': slice_reserved_channels_ids})
652 def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
653 slicename = hrn_to_nitos_slicename(slice_hrn)
654 slices = self.shell.GetSlices({'slicename': slicename}, ['slice_id'])
656 raise RecordNotFound(slice_hrn)
658 requested_time = utcparse(expiration_time)
659 record = {'expires': int(datetime_to_epoch(requested_time))}
661 self.shell.UpdateSlice(slice['slice_id'], record)
668 # xxx this code is quite old and has not run for ages
669 # it is obviously totally broken and needs a rewrite
670 def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
671 raise SfaNotImplemented,"NitosDriver.get_ticket needs a rewrite"
672 # please keep this code for future reference
673 # slices = PlSlices(self)
674 # peer = slices.get_peer(slice_hrn)
675 # sfa_peer = slices.get_sfa_peer(slice_hrn)
677 # # get the slice record
678 # credential = api.getCredential()
679 # interface = api.registries[api.hrn]
680 # registry = api.server_proxy(interface, credential)
681 # records = registry.Resolve(xrn, credential)
683 # # make sure we get a local slice record
685 # for tmp_record in records:
686 # if tmp_record['type'] == 'slice' and \
687 # not tmp_record['peer_authority']:
688 # #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
689 # slice_record = SliceRecord(dict=tmp_record)
691 # raise RecordNotFound(slice_hrn)
693 # # similar to CreateSliver, we must verify that the required records exist
694 # # at this aggregate before we can issue a ticket
696 # rspec = RSpec(rspec_string)
697 # requested_attributes = rspec.version.get_slice_attributes()
699 # # ensure site record exists
700 # site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
701 # # ensure slice record exists
702 # slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
703 # # ensure person records exists
704 # # xxx users is undefined in this context
705 # persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
706 # # ensure slice attributes exists
707 # slices.verify_slice_attributes(slice, requested_attributes)
710 # slivers = slices.get_slivers(slice_hrn)
713 # raise SliverDoesNotExist(slice_hrn)
718 # 'timestamp': int(time.time()),
719 # 'initscripts': initscripts,
723 # # create the ticket
724 # object_gid = record.get_gid_object()
725 # new_ticket = SfaTicket(subject = object_gid.get_subject())
726 # new_ticket.set_gid_caller(api.auth.client_gid)
727 # new_ticket.set_gid_object(object_gid)
728 # new_ticket.set_issuer(key=api.key, subject=self.hrn)
729 # new_ticket.set_pubkey(object_gid.get_pubkey())
730 # new_ticket.set_attributes(data)
731 # new_ticket.set_rspec(rspec)
732 # #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
733 # new_ticket.encode()
736 # return new_ticket.save_to_string(save_parents=True)