4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5 RecordNotFound, SfaNotImplemented, SliverDoesNotExist
7 from sfa.util.sfalogging import logger
8 from sfa.util.defaultdict import defaultdict
9 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
10 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_hrn
11 from sfa.util.cache import Cache
13 # one would think the driver should not need to mess with the SFA db, but..
14 from sfa.storage.model import RegRecord
16 # used to be used in get_ticket
17 #from sfa.trust.sfaticket import SfaTicket
19 from sfa.rspecs.version_manager import VersionManager
20 from sfa.rspecs.rspec import RSpec
22 # the driver interface, mostly provides default behaviours
23 from sfa.managers.driver import Driver
25 from sfa.nitos.nitosshell import NitosShell
26 from sfa.nitos.nitosaggregate import NitosAggregate
27 from sfa.nitos.nitosslices import NitosSlices
29 from sfa.nitos.nitosxrn import NitosXrn, slicename_to_hrn, hostname_to_hrn, hrn_to_nitos_slicename, xrn_to_hostname
32 def list_to_dict(recs, key):
34 convert a list of dictionaries into a dictionary keyed on the
35 specified dictionary key
37 return dict([(rec[key], rec) for rec in recs])
40 # NitosShell is just an xmlrpc serverproxy where methods
41 # can be sent as-is; it takes care of authentication
42 # from the global config
46 class NitosDriver (Driver):
48 # the cache instance is a class member so it survives across incoming
52 def __init__(self, api):
53 Driver.__init__(self, api)
55 self.shell = NitosShell(config)
57 self.testbedInfo = self.shell.getTestbedInfo()
58 # un-comment below lines to enable caching
59 # if config.SFA_AGGREGATE_CACHING:
60 # if NitosDriver.cache is None:
61 # NitosDriver.cache = Cache()
62 # self.cache = NitosDriver.cache
64 ###########################################
65 # utility methods for NITOS driver
66 ###########################################
68 def filter_nitos_results(self, listo, filters_dict):
70 the Nitos scheduler API does not provide a get result filtring so we do it here
75 for filter in filters_dict:
76 if filter not in dicto or dicto[filter] != filters_dict[filter]:
81 def convert_id(self, list_of_dict):
83 convert object id retrived in string format to int format
85 for dicto in list_of_dict:
87 if key in ['node_id', 'slice_id', 'user_id', 'channel_id', 'reservation_id'] and isinstance(dicto[key], str):
88 dicto[key] = int(dicto[key])
89 elif key in ['user_ids']:
91 for user_id in dicto['user_ids']:
92 user_ids2.append(int(user_id))
93 dicto['user_ids'] = user_ids2
96 ########################################
98 ########################################
100 def augment_records_with_testbed_info(self, sfa_records):
101 return self.fill_record_info(sfa_records)
104 def register(self, sfa_record, hrn, pub_key):
105 type = sfa_record['type']
106 nitos_record = self.sfa_fields_to_nitos_fields(type, hrn, sfa_record)
108 if type == 'authority':
111 elif type == 'slice':
112 slices = self.shell.getSlices()
115 if slice['slice_name'] == nitos_record['name']:
116 slice_id = slice['slice_id']
120 pointer = self.shell.addSlice(
121 {'slice_name': nitos_record['name']})
126 users = self.shell.getUsers()
129 if user['user_name'] == nitos_record['name']:
130 user_id = user['user_id']
133 pointer = self.shell.addUser(
134 {'username': nitos_record['name'], 'email': nitos_record['email']})
140 self.shell.addUserKey({'user_id': pointer, 'key': pub_key})
143 nodes = self.shell.GetNodes({}, [])
146 if node['hostname'] == nitos_record['name']:
147 node_id = node['node_id']
151 pointer = self.shell.addNode(nitos_record)
158 def update(self, old_sfa_record, new_sfa_record, hrn, new_key):
160 pointer = old_sfa_record['pointer']
161 type = old_sfa_record['type']
162 new_nitos_record = self.sfa_fields_to_nitos_fields(
163 type, hrn, new_sfa_record)
165 # new_key implemented for users only
166 if new_key and type not in ['user']:
167 raise UnknownSfaType(type)
170 if 'name' in new_sfa_record:
171 self.shell.updateSlice({'slice_id': pointer, 'fields': {
172 'slice_name': new_sfa_record['name']}})
176 if 'name' in new_sfa_record:
177 update_fields['username'] = new_sfa_record['name']
178 if 'email' in new_sfa_record:
179 update_fields['email'] = new_sfa_record['email']
181 self.shell.updateUser(
182 {'user_id': pointer, 'fields': update_fields})
185 # needs to be improved
186 self.shell.addUserKey({'user_id': pointer, 'key': new_key})
189 self.shell.updateNode(
190 {'node_id': pointer, 'fields': new_sfa_record})
195 def remove(self, sfa_record):
197 type = sfa_record['type']
198 pointer = sfa_record['pointer']
200 self.shell.deleteUser({'user_id': pointer})
201 elif type == 'slice':
202 self.shell.deleteSlice({'slice_id': pointer})
204 self.shell.deleteNode({'node_id': pointer})
209 # Convert SFA fields to NITOS fields for use when registering or updating
210 # registry record in the NITOS Scheduler database
213 def sfa_fields_to_nitos_fields(self, type, hrn, sfa_record):
218 nitos_record["slice_name"] = hrn_to_nitos_slicename(hrn)
220 if "hostname" not in sfa_record:
221 raise MissingSfaInfo("hostname")
222 nitos_record["node_name"] = sfa_record["hostname"]
227 def fill_record_info(self, records):
229 Given a (list of) SFA record, fill in the NITOS specific
230 and SFA specific fields in the record.
232 if not isinstance(records, list):
235 self.fill_record_nitos_info(records)
236 self.fill_record_hrns(records)
237 self.fill_record_sfa_info(records)
240 def fill_record_nitos_info(self, records):
242 Fill in the nitos specific fields of a SFA record. This
243 involves calling the appropriate NITOS API method to retrieve the
244 database record for the object.
246 @param record: record to fill in field (in/out param)
250 node_ids, slice_ids = [], []
251 user_ids, key_ids = [], []
252 type_map = {'node': node_ids, 'slice': slice_ids, 'user': user_ids}
254 for record in records:
255 for type in type_map:
256 if type == record['type']:
257 type_map[type].append(record['pointer'])
260 nodes, slices, users, keys = {}, {}, {}, {}
262 all_nodes = self.convert_id(self.shell.getNodes({}, []))
263 node_list = [node for node in all_nodes if node[
264 'node_id'] in node_ids]
265 nodes = list_to_dict(node_list, 'node_id')
267 all_slices = self.convert_id(self.shell.getSlices({}, []))
268 slice_list = [slice for slice in all_slices if slice[
269 'slice_id'] in slice_ids]
270 slices = list_to_dict(slice_list, 'slice_id')
272 all_users = self.convert_id(self.shell.getUsers())
273 user_list = [user for user in all_users if user[
274 'user_id'] in user_ids]
275 users = list_to_dict(user_list, 'user_id')
277 nitos_records = {'node': nodes, 'slice': slices, 'user': users}
280 for record in records:
281 if record['pointer'] == -1:
284 for type in nitos_records:
285 if record['type'] == type:
286 if record['pointer'] in nitos_records[type]:
287 record.update(nitos_records[type][record['pointer']])
290 if record['type'] == 'user':
291 if record['pointer'] in nitos_records['user']:
292 record['keys'] = nitos_records[
293 'user'][record['pointer']]['keys']
297 def fill_record_hrns(self, records):
299 convert nitos ids to hrns
303 slice_ids, user_ids, node_ids = [], [], []
304 for record in records:
305 if 'user_ids' in record:
306 user_ids.extend(record['user_ids'])
307 if 'slice_ids' in record:
308 slice_ids.extend(record['slice_ids'])
309 if 'node_ids' in record:
310 node_ids.extend(record['node_ids'])
313 slices, users, nodes = {}, {}, {}
315 all_nodes = self.convert_id(self.shell.getNodes({}, []))
316 node_list = [node for node in all_nodes if node[
317 'node_id'] in node_ids]
318 nodes = list_to_dict(node_list, 'node_id')
320 all_slices = self.convert_id(self.shell.getSlices({}, []))
321 slice_list = [slice for slice in all_slices if slice[
322 'slice_id'] in slice_ids]
323 slices = list_to_dict(slice_list, 'slice_id')
325 all_users = self.convert_id(self.shell.getUsers())
326 user_list = [user for user in all_users if user[
327 'user_id'] in user_ids]
328 users = list_to_dict(user_list, 'user_id')
330 # convert ids to hrns
331 for record in records:
332 # get all relevant data
333 type = record['type']
334 pointer = record['pointer']
336 testbed_name = self.testbedInfo['name']
339 if 'user_ids' in record:
340 usernames = [users[user_id]['username'] for user_id in record['user_ids']
342 user_hrns = [".".join([auth_hrn, testbed_name, username])
343 for username in usernames]
344 record['users'] = user_hrns
345 if 'slice_ids' in record:
346 slicenames = [slices[slice_id]['slice_name'] for slice_id in record['slice_ids']
347 if slice_id in slices]
348 slice_hrns = [slicename_to_hrn(
349 auth_hrn, slicename) for slicename in slicenames]
350 record['slices'] = slice_hrns
351 if 'node_ids' in record:
352 hostnames = [nodes[node_id]['hostname'] for node_id in record['node_ids']
354 node_hrns = [hostname_to_hrn(
355 auth_hrn, login_base, hostname) for hostname in hostnames]
356 record['nodes'] = node_hrns
358 if 'expires' in record:
359 date = utcparse(record['expires'])
360 datestring = datetime_to_string(date)
361 record['expires'] = datestring
365 def fill_record_sfa_info(self, records):
367 def startswith(prefix, values):
368 return [value for value in values if value.startswith(prefix)]
372 for record in records:
373 user_ids.extend(record.get("user_ids", []))
375 # get the registry records
376 user_list, users = [], {}
377 user_list = self.api.dbsession().query(RegRecord).filter(
378 RegRecord.pointer.in_(user_ids)).all()
379 # create a hrns keyed on the sfa record's pointer.
380 # Its possible for multiple records to have the same pointer so
381 # the dict's value will be a list of hrns.
382 users = defaultdict(list)
383 for user in user_list:
384 users[user.pointer].append(user)
386 # get the nitos records
387 nitos_user_list, nitos_users = [], {}
388 nitos_all_users = self.convert_id(self.shell.getUsers())
390 user for user in nitos_all_users if user['user_id'] in user_ids]
391 nitos_users = list_to_dict(nitos_user_list, 'user_id')
394 for record in records:
395 if record['pointer'] == -1:
399 type = record['type']
401 "fill_record_sfa_info - incoming record typed %s" % type)
402 if (type == "slice"):
403 # all slice users are researchers
404 record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')
405 record['researcher'] = []
406 for user_id in record.get('user_ids', []):
407 hrns = [user.hrn for user in users[user_id]]
408 record['researcher'].extend(hrns)
410 elif (type == "node"):
411 sfa_info['dns'] = record.get("hostname", "")
412 # xxx TODO: URI, LatLong, IP, DNS
414 elif (type == "user"):
415 logger.info('setting user.email')
416 sfa_info['email'] = record.get("email", "")
417 sfa_info['geni_urn'] = hrn_to_urn(record['hrn'], 'user')
418 sfa_info['geni_certificate'] = record['gid']
419 # xxx TODO: PostalAddress, Phone
420 record.update(sfa_info)
423 def update_relation(self, subject_type, target_type, relation_name, subject_id, target_ids):
425 if subject_type == 'slice' and target_type == 'user' and relation_name == 'researcher':
426 subject = self.shell.getSlices({'slice_id': subject_id}, [])[0]
427 current_target_ids = subject['user_ids']
428 add_target_ids = list(
429 set(target_ids).difference(current_target_ids))
430 del_target_ids = list(
431 set(current_target_ids).difference(target_ids))
432 logger.debug("subject_id = %s (type=%s)" %
433 (subject_id, type(subject_id)))
434 for target_id in add_target_ids:
435 self.shell.addUserToSlice(
436 {'user_id': target_id, 'slice_id': subject_id})
437 logger.debug("add_target_id = %s (type=%s)" %
438 (target_id, type(target_id)))
439 for target_id in del_target_ids:
440 logger.debug("del_target_id = %s (type=%s)" %
441 (target_id, type(target_id)))
442 self.shell.deleteUserFromSlice(
443 {'user_id': target_id, 'slice_id': subject_id})
445 logger.info('unexpected relation %s to maintain, %s -> %s' %
446 (relation_name, subject_type, target_type))
448 ########################################
450 ########################################
452 def testbed_name(self): return "nitos"
454 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
455 def aggregate_version(self):
456 version_manager = VersionManager()
457 ad_rspec_versions = []
458 request_rspec_versions = []
459 for rspec_version in version_manager.versions:
460 if rspec_version.content_type in ['*', 'ad']:
461 ad_rspec_versions.append(rspec_version.to_dict())
462 if rspec_version.content_type in ['*', 'request']:
463 request_rspec_versions.append(rspec_version.to_dict())
465 'testbed': self.testbed_name(),
466 'geni_request_rspec_versions': request_rspec_versions,
467 'geni_ad_rspec_versions': ad_rspec_versions,
470 def list_slices(self, creds, options):
471 # look in cache first
473 slices = self.cache.get('slices')
475 logger.debug("NitosDriver.list_slices returns from cache")
479 slices = self.shell.getSlices({}, [])
480 testbed_name = self.testbedInfo['name']
481 slice_hrns = [slicename_to_hrn(self.hrn, testbed_name, slice[
482 'slice_name']) for slice in slices]
483 slice_urns = [hrn_to_urn(slice_hrn, 'slice')
484 for slice_hrn in slice_hrns]
488 logger.debug("NitosDriver.list_slices stores value in cache")
489 self.cache.add('slices', slice_urns)
493 # first 2 args are None in case of resource discovery
494 def list_resources(self, slice_urn, slice_hrn, creds, options):
495 cached_requested = options.get('cached', True)
496 version_manager = VersionManager()
497 # get the rspec's return format from options
498 #rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
499 # rspec's return format for nitos aggregate is version NITOS 1
500 rspec_version = version_manager.get_version('NITOS 1')
501 version_string = "rspec_%s" % (rspec_version)
503 # panos adding the info option to the caching key (can be improved)
504 if options.get('info'):
505 version_string = version_string + \
506 "_" + options.get('info', 'default')
508 # Adding the list_leases option to the caching key
509 if options.get('list_leases'):
510 version_string = version_string + "_" + \
511 options.get('list_leases', 'default')
513 # Adding geni_available to caching key
514 if options.get('geni_available'):
515 version_string = version_string + "_" + \
516 str(options.get('geni_available'))
518 # look in cache first
519 if cached_requested and self.cache and not slice_hrn:
520 rspec = self.cache.get(version_string)
523 "NitosDriver.ListResources: returning cached advertisement")
526 # panos: passing user-defined options
527 # print "manager options = ",options
528 aggregate = NitosAggregate(self)
529 rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
533 if self.cache and not slice_hrn:
535 "NitosDriver.ListResources: stores advertisement in cache")
536 self.cache.add(version_string, rspec)
540 def sliver_status(self, slice_urn, slice_hrn):
541 # find out where this slice is currently running
542 slicename = hrn_to_nitos_slicename(slice_hrn)
544 slices = self.shell.getSlices({}, [])
547 raise SliverDoesNotExist(
548 "%s (used %s as slicename internally)" % (slice_hrn, slicename))
551 if slice['slice_name'] == slicename:
556 raise SliverDoesNotExist(
557 "%s (used %s as slicename internally)" % (slice_hrn, slicename))
559 # report about the reserved nodes only
560 reserved_nodes = self.shell.getReservedNodes({}, [])
561 nodes = self.shell.getNodes({}, [])
563 slice_reserved_nodes = []
564 for r_node in reserved_nodes:
565 if r_node['slice_id'] == slice['slice_id']:
567 if node['node_id'] == r_node['node_id']:
568 slice_reserved_nodes.append(node)
570 if len(slice_reserved_nodes) == 0:
571 raise SliverDoesNotExist("You have not allocated any slivers here")
577 if slice['user_ids']:
578 users = self.shell.getUsers()
579 # filter users on slice['user_ids']
581 if usr['user_id'] in slice['user_ids']:
582 keys.extend(usr['keys'])
584 user.update({'urn': slice_urn,
585 'login': slice['slice_name'],
591 top_level_status = 'unknown'
592 if slice_reserved_nodes:
593 top_level_status = 'ready'
594 result['geni_urn'] = slice_urn
595 result['nitos_gateway_login'] = slice['slice_name']
596 #result['pl_expires'] = datetime_to_string(utcparse(slice['expires']))
597 #result['geni_expires'] = datetime_to_string(utcparse(slice['expires']))
600 for node in slice_reserved_nodes:
602 res['nitos_hostname'] = node['hostname']
603 sliver_id = Xrn(slice_urn, type='slice', id=node['node_id']).urn
604 res['geni_urn'] = sliver_id
605 res['geni_status'] = 'ready'
606 res['geni_error'] = ''
607 res['users'] = [user]
609 resources.append(res)
611 result['geni_status'] = top_level_status
612 result['geni_resources'] = resources
616 def create_sliver(self, slice_urn, slice_hrn, creds, rspec_string, users, options):
618 aggregate = NitosAggregate(self)
619 slices = NitosSlices(self)
620 sfa_peer = slices.get_sfa_peer(slice_hrn)
623 slice_record = users[0].get('slice_record', {})
626 rspec = RSpec(rspec_string, version='NITOS 1')
628 # ensure slice record exists
629 slice = slices.verify_slice(
630 slice_hrn, slice_record, sfa_peer, options=options)
631 # ensure user records exists
632 users = slices.verify_users(
633 slice_hrn, slice, users, sfa_peer, options=options)
635 # add/remove leases (nodes and channels)
636 # a lease in Nitos RSpec case is a reservation of nodes and channels
637 # grouped by (slice,timeslot)
638 rspec_requested_leases = rspec.version.get_leases()
639 rspec_requested_nodes = []
640 rspec_requested_channels = []
641 for lease in rspec_requested_leases:
642 if lease['type'] == 'node':
643 lease.pop('type', None)
644 rspec_requested_nodes.append(lease)
646 lease.pop('type', None)
647 rspec_requested_channels.append(lease)
649 nodes = slices.verify_slice_leases_nodes(slice, rspec_requested_nodes)
650 channels = slices.verify_slice_leases_channels(
651 slice, rspec_requested_channels)
653 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
655 def delete_sliver(self, slice_urn, slice_hrn, creds, options):
656 slicename = hrn_to_nitos_slicename(slice_hrn)
657 slices = self.filter_nitos_results(
658 self.shell.getSlices({}, []), {'slice_name': slicename})
663 slice_reserved_nodes = self.filter_nitos_results(
664 self.shell.getReservedNodes({}, []), {'slice_id': slice['slice_id']})
665 slice_reserved_channels = self.filter_nitos_results(
666 self.shell.getReservedChannels(), {'slice_id': slice['slice_id']})
668 slice_reserved_nodes_ids = [node['reservation_id']
669 for node in slice_reserved_nodes]
670 slice_reserved_channels_ids = [
671 channel['reservation_id'] for channel in slice_reserved_channels]
673 # release all reserved nodes and channels for that slice
675 released_nodes = self.shell.releaseNodes(
676 {'reservation_ids': slice_reserved_nodes_ids})
677 released_channels = self.shell.releaseChannels(
678 {'reservation_ids': slice_reserved_channels_ids})
683 def renew_sliver(self, slice_urn, slice_hrn, creds, expiration_time, options):
684 slicename = hrn_to_nitos_slicename(slice_hrn)
685 slices = self.shell.GetSlices({'slicename': slicename}, ['slice_id'])
687 raise RecordNotFound(slice_hrn)
689 requested_time = utcparse(expiration_time)
690 record = {'expires': int(datetime_to_epoch(requested_time))}
692 self.shell.UpdateSlice(slice['slice_id'], record)
698 # xxx this code is quite old and has not run for ages
699 # it is obviously totally broken and needs a rewrite
700 def get_ticket(self, slice_urn, slice_hrn, creds, rspec_string, options):
701 raise SfaNotImplemented("NitosDriver.get_ticket needs a rewrite")
702 # please keep this code for future reference
703 # slices = PlSlices(self)
704 # peer = slices.get_peer(slice_hrn)
705 # sfa_peer = slices.get_sfa_peer(slice_hrn)
707 # # get the slice record
708 # credential = api.getCredential()
709 # interface = api.registries[api.hrn]
710 # registry = api.server_proxy(interface, credential)
711 # records = registry.Resolve(xrn, credential)
713 # # make sure we get a local slice record
715 # for tmp_record in records:
716 # if tmp_record['type'] == 'slice' and \
717 # not tmp_record['peer_authority']:
718 # #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
719 # slice_record = SliceRecord(dict=tmp_record)
721 # raise RecordNotFound(slice_hrn)
723 # # similar to CreateSliver, we must verify that the required records exist
724 # # at this aggregate before we can issue a ticket
726 # rspec = RSpec(rspec_string)
727 # requested_attributes = rspec.version.get_slice_attributes()
729 # # ensure site record exists
730 # site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
731 # # ensure slice record exists
732 # slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
733 # # ensure person records exists
734 # # xxx users is undefined in this context
735 # persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
736 # # ensure slice attributes exists
737 # slices.verify_slice_attributes(slice, requested_attributes)
740 # slivers = slices.get_slivers(slice_hrn)
743 # raise SliverDoesNotExist(slice_hrn)
748 # 'timestamp': int(time.time()),
749 # 'initscripts': initscripts,
753 # # create the ticket
754 # object_gid = record.get_gid_object()
755 # new_ticket = SfaTicket(subject = object_gid.get_subject())
756 # new_ticket.set_gid_caller(api.auth.client_gid)
757 # new_ticket.set_gid_object(object_gid)
758 # new_ticket.set_issuer(key=api.key, subject=self.hrn)
759 # new_ticket.set_pubkey(object_gid.get_pubkey())
760 # new_ticket.set_attributes(data)
761 # new_ticket.set_rspec(rspec)
762 # #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
763 # new_ticket.encode()
766 # return new_ticket.save_to_string(save_parents=True)