4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5 RecordNotFound, SfaNotImplemented, SliverDoesNotExist
7 from sfa.util.sfalogging import logger
8 from sfa.util.defaultdict import defaultdict
9 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
10 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_hrn
11 from sfa.util.cache import Cache
13 # one would think the driver should not need to mess with the SFA db, but..
14 from sfa.storage.alchemy import dbsession
15 from sfa.storage.model import RegRecord
17 # used to be used in get_ticket
18 #from sfa.trust.sfaticket import SfaTicket
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
23 # the driver interface, mostly provides default behaviours
24 from sfa.managers.driver import Driver
26 from sfa.nitos.nitosshell import NitosShell
27 from sfa.nitos.nitosaggregate import NitosAggregate
28 from sfa.nitos.nitosslices import NitosSlices
30 from sfa.nitos.nitosxrn import NitosXrn, slicename_to_hrn, hostname_to_hrn, hrn_to_nitos_slicename, xrn_to_hostname
def list_to_dict(recs, key):
    """
    Index a list of dictionaries on one of their fields.

    Returns a dictionary mapping rec[key] to rec for every rec in recs;
    when several records carry the same value the last one wins.
    """
    indexed = {}
    for rec in recs:
        indexed[rec[key]] = rec
    return indexed
40 # NitosShell is just an xmlrpc serverproxy where methods
41 # can be sent as-is; it takes care of authentication
42 # from the global config
class NitosDriver (Driver):
    """SFA driver for the NITOS testbed; the scheduler is reached through NitosShell."""

    # the cache instance is a class member so it survives across incoming requests
    cache = None

    def __init__ (self, config):
        """
        Initialise the generic driver, open the shell to the NITOS
        scheduler and fetch the testbed description once.

        @param config: the global SFA configuration object
        """
        Driver.__init__ (self, config)
        self.shell = NitosShell (config)
        # list_slices / list_resources test self.cache, so it must always exist;
        # it stays None (caching off) unless the block below is enabled
        self.cache = None
        self.testbedInfo = self.shell.getTestbedInfo()
        # un-comment below lines to enable caching
        # if config.SFA_AGGREGATE_CACHING:
        #     if NitosDriver.cache is None:
        #         NitosDriver.cache = Cache()
        #     self.cache = NitosDriver.cache
60 ###########################################
61 ########## utility methods for NITOS driver
62 ###########################################
def filter_nitos_results (self, listo, filters_dict):
    """
    Keep only the entries of listo that satisfy every filter.

    The Nitos scheduler API does not provide result filtering, so it is
    done here: an entry is kept when, for each key of filters_dict, it
    holds that key with an equal value.

    @param listo: list of dictionaries as returned by the scheduler
    @param filters_dict: field name -> required value; empty keeps everything
    @return: listo itself, pruned
    """
    def matches(entry):
        return all(name in entry and entry[name] == wanted
                   for name, wanted in filters_dict.items())

    # NOTE(review): the listing omits the loop scaffolding of this method;
    # pruning the caller's list in place (and returning it) is assumed.
    listo[:] = [entry for entry in listo if matches(entry)]
    return listo
def convert_id (self, list_of_dict):
    """
    Convert object ids retrieved in string format to int format.

    The single-id fields are converted only when they hold a string;
    every element of 'user_ids' is passed through int().  The
    dictionaries are updated in place and the same list is returned.
    """
    single_id_fields = ('node_id', 'slice_id', 'user_id', 'channel_id', 'reservation_id')
    for entry in list_of_dict:
        for field in single_id_fields:
            if field in entry and isinstance(entry[field], str):
                entry[field] = int(entry[field])
        if 'user_ids' in entry:
            entry['user_ids'] = [int(user_id) for user_id in entry['user_ids']]
    return list_of_dict
95 ########################################
96 ########## registry oriented
97 ########################################
def augment_records_with_testbed_info (self, sfa_records):
    """Enrich sfa_records with testbed data; simply delegates to fill_record_info."""
    augmented = self.fill_record_info (sfa_records)
    return augmented
def register (self, sfa_record, hrn, pub_key):
    """
    Find or create the testbed object backing a new registry record.

    @param sfa_record: the SFA record being registered; its 'type' selects
        the kind of testbed object (authority, slice, user or node)
    @param hrn: hrn of the record
    @param pub_key: optional public key, attached to the user when given
    @return: id of the testbed object, to be stored as the record's
        pointer; -1 when the record has no testbed counterpart
    """
    record_type = sfa_record['type']
    nitos_record = self.sfa_fields_to_nitos_fields(record_type, hrn, sfa_record)

    # NOTE(review): the lookups below read nitos_record['name'] (and 'email'),
    # while sfa_fields_to_nitos_fields is only seen filling 'slice_name' and
    # 'node_name' -- confirm which keys are really meant here.
    def find_existing(entries, name_field, id_field):
        # id of the first testbed entry already bearing this name, else None
        # (the earlier code left the id unbound when nothing matched)
        for entry in entries:
            if entry[name_field] == nitos_record['name']:
                return entry[id_field]
        return None

    pointer = -1
    if record_type == 'authority':
        # authorities have no testbed object
        pointer = -1

    elif record_type == 'slice':
        pointer = find_existing(self.shell.getSlices(), 'slice_name', 'slice_id')
        if not pointer:
            pointer = self.shell.addSlice({'slice_name' : nitos_record['name']})

    elif record_type == 'user':
        # NOTE(review): matched on 'user_name' although users are created and
        # read with 'username' elsewhere in this file -- confirm the field name.
        pointer = find_existing(self.shell.getUsers(), 'user_name', 'user_id')
        if not pointer:
            pointer = self.shell.addUser({'username' : nitos_record['name'], 'email' : nitos_record['email']})

        # Add the user's key
        if pub_key:
            self.shell.addUserKey({'user_id' : pointer,'key' : pub_key})

    elif record_type == 'node':
        # getNodes, spelled as everywhere else in this driver
        pointer = find_existing(self.shell.getNodes({}, []), 'hostname', 'node_id')
        if not pointer:
            pointer = self.shell.addNode(nitos_record)

    return pointer
def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
    """
    Push the changes of a registry record to its testbed object.

    @param old_sfa_record: current record; provides 'pointer' and 'type'
    @param new_sfa_record: dictionary holding the new field values
    @param hrn: hrn of the record
    @param new_key: optional new public key (users only)
    @raise UnknownSfaType: when new_key is given for a non-user record
    """
    testbed_id = old_sfa_record['pointer']
    kind = old_sfa_record['type']
    # the converted record is not used further, but the conversion still
    # rejects a node record that lacks its hostname
    self.sfa_fields_to_nitos_fields(kind, hrn, new_sfa_record)

    # new_key implemented for users only
    if new_key and kind != 'user':
        raise UnknownSfaType(kind)

    if kind == "slice":
        if 'name' in new_sfa_record:
            self.shell.updateSlice({'slice_id': testbed_id, 'fields': {'slice_name': new_sfa_record['name']}})

    elif kind == "user":
        changes = {}
        for sfa_field, nitos_field in (('name', 'username'), ('email', 'email')):
            if sfa_field in new_sfa_record:
                changes[nitos_field] = new_sfa_record[sfa_field]

        self.shell.updateUser({'user_id': testbed_id, 'fields': changes})

        if new_key:
            # needs to be improved
            self.shell.addUserKey({'user_id': testbed_id, 'key': new_key})

    elif kind == "node":
        self.shell.updateNode({'node_id': testbed_id, 'fields': new_sfa_record})

    # NOTE(review): the return statement is not visible in the listing; True assumed
    return True
def remove (self, sfa_record):
    """
    Delete the testbed object a registry record points to.

    Users, slices and nodes are deleted through the matching shell call;
    any other record type is left alone.
    """
    # record type -> (shell method, name of its id argument)
    deleters = {
        'user': ('deleteUser', 'user_id'),
        'slice': ('deleteSlice', 'slice_id'),
        'node': ('deleteNode', 'node_id'),
    }
    kind = sfa_record['type']
    testbed_id = sfa_record['pointer']
    if kind in deleters:
        method_name, id_field = deleters[kind]
        getattr(self.shell, method_name)({id_field: testbed_id})

    # NOTE(review): the return statement is not visible in the listing; True assumed
    return True
208 # Convert SFA fields to NITOS fields for use when registering or updating
209 # registry record in the NITOS Scheduler database
def sfa_fields_to_nitos_fields(self, type, hrn, sfa_record):
    """
    Build the NITOS-side fields for a registry record.

    A slice gets 'slice_name' derived from hrn, a node gets 'node_name'
    copied from the record's hostname; other types yield an empty dict.

    @raise MissingSfaInfo: for a node record without 'hostname'
    """
    if type == "slice":
        return {"slice_name": hrn_to_nitos_slicename(hrn)}
    if type == "node":
        if "hostname" not in sfa_record:
            raise MissingSfaInfo("hostname")
        return {"node_name": sfa_record["hostname"]}
    return {}
def fill_record_info(self, records):
    """
    Given a (list of) SFA record, fill in the NITOS specific
    and SFA specific fields in the record.

    A single record is wrapped into a list first; the list of records,
    updated in place, is returned.
    """
    if not isinstance(records, list):
        records = [records]

    # order matters: the hrn and sfa passes read what the nitos pass adds
    for fill in (self.fill_record_nitos_info,
                 self.fill_record_hrns,
                 self.fill_record_sfa_info):
        fill(records)
    return records
def fill_record_nitos_info(self, records):
    """
    Fill in the nitos specific fields of a SFA record. This
    involves calling the appropriate NITOS API method to retrieve the
    database record for the object.

    @param records: records to fill in (in/out param)
    """
    # pointers of the incoming records, grouped by record type
    wanted = {'node': [], 'slice': [], 'user': []}
    for record in records:
        if record['type'] in wanted:
            wanted[record['type']].append(record['pointer'])

    # get nitos records, but only for the types that were actually asked for;
    # the scheduler returns everything, so the selection happens here
    nodes, slices, users = {}, {}, {}
    if wanted['node']:
        fetched = self.convert_id(self.shell.getNodes({}, []))
        nodes = list_to_dict([node for node in fetched if node['node_id'] in wanted['node']], 'node_id')
    if wanted['slice']:
        fetched = self.convert_id(self.shell.getSlices({}, []))
        slices = list_to_dict([slice for slice in fetched if slice['slice_id'] in wanted['slice']], 'slice_id')
    if wanted['user']:
        fetched = self.convert_id(self.shell.getUsers())
        users = list_to_dict([user for user in fetched if user['user_id'] in wanted['user']], 'user_id')

    nitos_records = {'node': nodes, 'slice': slices, 'user': users}

    # fill record info
    for record in records:
        if record['pointer'] == -1:
            continue

        same_type = nitos_records.get(record['type'], {})
        if record['pointer'] in same_type:
            record.update(same_type[record['pointer']])

        # fill in key info
        if record['type'] == 'user':
            if record['pointer'] in users:
                record['keys'] = users[record['pointer']]['keys']

    # NOTE(review): the return statement is not visible in the listing
    return records
def fill_record_hrns(self, records):
    """
    convert nitos ids to hrns

    For every record with a testbed pointer, the 'user_ids', 'slice_ids'
    and 'node_ids' lists are turned into 'users', 'slices' and 'nodes'
    lists of hrns (ids unknown to the testbed are skipped) and 'expires'
    is normalised to the SFA date string.  Records are updated in place.
    """
    # get ids
    slice_ids, user_ids, node_ids = [], [], []
    for record in records:
        user_ids.extend(record.get('user_ids', []))
        slice_ids.extend(record.get('slice_ids', []))
        node_ids.extend(record.get('node_ids', []))

    # get nitos records, only for the kinds of ids that were referenced
    slices, users, nodes = {}, {}, {}
    if node_ids:
        all_nodes = self.convert_id(self.shell.getNodes({}, []))
        nodes = list_to_dict([node for node in all_nodes if node['node_id'] in node_ids], 'node_id')
    if slice_ids:
        all_slices = self.convert_id(self.shell.getSlices({}, []))
        slices = list_to_dict([slice for slice in all_slices if slice['slice_id'] in slice_ids], 'slice_id')
    if user_ids:
        all_users = self.convert_id(self.shell.getUsers())
        users = list_to_dict([user for user in all_users if user['user_id'] in user_ids], 'user_id')

    # convert ids to hrns
    for record in records:
        auth_hrn = self.hrn
        testbed_name = self.testbedInfo['name']
        if record['pointer'] == -1:
            continue

        if 'user_ids' in record:
            usernames = [users[user_id]['username'] for user_id in record['user_ids']
                         if user_id in users]
            record['users'] = [".".join([auth_hrn, testbed_name, username]) for username in usernames]
        if 'slice_ids' in record:
            slicenames = [slices[slice_id]['slice_name'] for slice_id in record['slice_ids']
                          if slice_id in slices]
            # NOTE(review): called with the testbed name as in list_slices; the
            # earlier two-argument call disagreed with it -- confirm the signature.
            record['slices'] = [slicename_to_hrn(auth_hrn, testbed_name, slicename) for slicename in slicenames]
        if 'node_ids' in record:
            hostnames = [nodes[node_id]['hostname'] for node_id in record['node_ids']
                         if node_id in nodes]
            # the undefined 'login_base' used to be passed here; the testbed name
            # plays that role for user and slice hrns -- NOTE(review): confirm
            record['nodes'] = [hostname_to_hrn(auth_hrn, testbed_name, hostname) for hostname in hostnames]

        if 'expires' in record:
            record['expires'] = datetime_to_string(utcparse(record['expires']))

    # NOTE(review): the return statement is not visible in the listing
    return records
def fill_record_sfa_info(self, records):
    """
    Fill in the SFA specific fields of the records, in place.

    Slices get 'geni_urn' and 'researcher' (hrns of the registry records
    whose pointer is one of the slice's user_ids); nodes get 'dns'; users
    get 'email', 'geni_urn' and 'geni_certificate'.  Records whose
    pointer is -1 are left untouched.
    """
    # get user ids
    user_ids = []
    for record in records:
        user_ids.extend(record.get("user_ids", []))

    # registry records keyed on the sfa record's pointer.
    # It is possible for multiple records to have the same pointer so
    # the dict's value is a list of records.
    users = defaultdict(list)
    if user_ids:
        # nothing to look up otherwise, and it avoids an empty IN () query
        for user in dbsession.query(RegRecord).filter(RegRecord.pointer.in_(user_ids)).all():
            users[user.pointer].append(user)

    # the testbed user list used to be fetched here as well, but its result
    # was never read, so that remote call is gone

    # fill sfa info
    for record in records:
        if record['pointer'] == -1:
            continue

        sfa_info = {}
        record_type = record['type']
        logger.info("fill_record_sfa_info - incoming record typed %s"%record_type)
        if record_type == "slice":
            # all slice users are researchers
            record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')
            record['researcher'] = []
            for user_id in record.get('user_ids', []):
                record['researcher'].extend([user.hrn for user in users[user_id]])

        elif record_type == "node":
            sfa_info['dns'] = record.get("hostname", "")
            # xxx TODO: URI, LatLong, IP, DNS

        elif record_type == "user":
            logger.info('setting user.email')
            sfa_info['email'] = record.get("email", "")
            sfa_info['geni_urn'] = hrn_to_urn(record['hrn'], 'user')
            sfa_info['geni_certificate'] = record['gid']
            # xxx TODO: PostalAddress, Phone
        record.update(sfa_info)
def update_relation (self, subject_type, target_type, relation_name, subject_id, target_ids):
    """
    Bring a relation kept on the testbed in line with the registry.

    Only slice -> user 'researcher' is maintained: users in target_ids
    that the slice lacks are added, users of the slice that are not in
    target_ids are removed.  Any other relation is only logged.

    @param subject_id: testbed id of the slice
    @param target_ids: testbed ids of the users the slice should have
    """
    if not (subject_type == 'slice' and target_type == 'user' and relation_name == 'researcher'):
        logger.info('unexpected relation %s to maintain, %s -> %s'%(relation_name,subject_type,target_type))
        return

    # The scheduler is not relied upon to apply the filter, and ids may come
    # back as strings: normalise them and select the slice locally so that
    # the right slice is compared against target_ids.
    # NOTE(review): assumes subject_id and target_ids are ints -- confirm.
    candidates = self.convert_id(self.shell.getSlices ({'slice_id': subject_id}, []))
    subject = self.filter_nitos_results(candidates, {'slice_id': subject_id})[0]
    current_target_ids = subject.get('user_ids', [])
    add_target_ids = list ( set (target_ids).difference(current_target_ids))
    del_target_ids = list ( set (current_target_ids).difference(target_ids))
    logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
    for target_id in add_target_ids:
        self.shell.addUserToSlice ({'user_id': target_id, 'slice_id': subject_id})
        logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
    for target_id in del_target_ids:
        logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
        self.shell.deleteUserFromSlice ({'user_id': target_id, 'slice_id': subject_id})
432 ########################################
433 ########## aggregate oriented
434 ########################################
436 def testbed_name (self): return "nitos"
438 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
    """
    Describe this aggregate: testbed name plus the rspec versions
    accepted for requests and produced for advertisements.

    A version whose content type is '*' is listed on both sides.
    """
    advertised, requestable = [], []
    for candidate in VersionManager().versions:
        if candidate.content_type in ('*', 'ad'):
            advertised.append(candidate.to_dict())
        if candidate.content_type in ('*', 'request'):
            requestable.append(candidate.to_dict())
    return {
        'testbed':self.testbed_name(),
        'geni_request_rspec_versions': requestable,
        'geni_ad_rspec_versions': advertised,
        }
def list_slices (self, creds, options):
    """
    Return the urns of all slices known to the testbed.

    The answer comes from the cache when one is configured and holds a
    non-empty entry; otherwise it is computed from the scheduler's slice
    list and stored in the cache, if any.
    """
    cache = self.cache
    # look in cache first
    if cache:
        cached_urns = cache.get('slices')
        if cached_urns:
            logger.debug("NitosDriver.list_slices returns from cache")
            return cached_urns

    # get data from the scheduler
    known_slices = self.shell.getSlices({}, [])
    testbed_name = self.testbedInfo['name']
    slice_urns = []
    for entry in known_slices:
        slice_hrn = slicename_to_hrn(self.hrn, testbed_name, entry['slice_name'])
        slice_urns.append(hrn_to_urn(slice_hrn, 'slice'))

    # cache the result
    if cache:
        logger.debug ("NitosDriver.list_slices stores value in cache")
        cache.add('slices', slice_urns)

    return slice_urns
475 # first 2 args are None in case of resource discovery
def list_resources (self, slice_urn, slice_hrn, creds, options):
    """
    Return the rspec of the whole aggregate or of one slice.

    slice_urn and slice_hrn are None in case of resource discovery; only
    that slice-less answer is read from, or stored in, the cache.

    @param options: may carry 'cached', 'geni_rspec_version', 'info',
        'list_leases' and 'geni_available'; passed on to the aggregate
    """
    cached_requested = options.get('cached', True)
    version_manager = VersionManager()
    # get the rspec's return format from options
    rspec_version = version_manager.get_version(options.get('geni_rspec_version'))

    # every option that shapes the answer is part of the caching key; values
    # go through str() since they are not necessarily strings
    key_parts = ["rspec_%s" % (rspec_version)]
    for option in ('info', 'list_leases', 'geni_available'):
        if options.get(option):
            key_parts.append(str(options.get(option)))
    version_string = "_".join(key_parts)

    # look in cache first
    if cached_requested and self.cache and not slice_hrn:
        rspec = self.cache.get(version_string)
        if rspec:
            logger.debug("NitosDriver.ListResources: returning cached advertisement")
            return rspec

    #panos: passing user-defined options
    aggregate = NitosAggregate(self)
    rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
                                options=options)

    # cache the result
    if self.cache and not slice_hrn:
        logger.debug("NitosDriver.ListResources: stores advertisement in cache")
        self.cache.add(version_string, rspec)

    return rspec
def sliver_status (self, slice_urn, slice_hrn):
    """
    Report the status of a slice's slivers, i.e. of its reserved nodes.

    @return: dictionary with 'geni_urn', 'nitos_gateway_login',
        'geni_status' and 'geni_resources' (one entry per reserved node)
    @raise SliverDoesNotExist: when the slice is unknown to the testbed or
        has no node reserved
    """
    # find out where this slice is currently running
    slicename = hrn_to_nitos_slicename(slice_hrn)

    # starts as None so that an unknown slice raises SliverDoesNotExist below
    # instead of failing on an unbound name
    user_slice = None
    for candidate in self.shell.getSlices({}, []):
        if candidate['slice_name'] == slicename:
            user_slice = candidate
            break
    if not user_slice:
        raise SliverDoesNotExist("%s (used %s as slicename internally)" % (slice_hrn, slicename))

    # report about the reserved nodes only
    reserved_nodes = self.shell.getReservedNodes({}, [])
    nodes = self.shell.getNodes({}, [])

    slice_reserved_nodes = []
    for r_node in reserved_nodes:
        if r_node['slice_id'] == user_slice['slice_id']:
            for node in nodes:
                if node['node_id'] == r_node['node_id']:
                    slice_reserved_nodes.append(node)

    if not slice_reserved_nodes:
        raise SliverDoesNotExist("You have not allocated any slivers here")

    # get login info
    user = {}
    keys = []
    if user_slice['user_ids']:
        # collect the keys of the users attached to the slice
        for usr in self.shell.getUsers():
            if usr['user_id'] in user_slice['user_ids']:
                keys.extend(usr['keys'])

        # NOTE(review): only 'urn' and 'login' are visible in the listing; the
        # 'protocol', 'port' and 'keys' entries are reconstructed -- confirm.
        user.update({'urn': slice_urn,
                     'login': user_slice['slice_name'],
                     'protocol': ['ssh'],
                     'port': ['22'],
                     'keys': keys})

    result = {}
    top_level_status = 'unknown'
    if slice_reserved_nodes:
        top_level_status = 'ready'
    result['geni_urn'] = slice_urn
    result['nitos_gateway_login'] = user_slice['slice_name']
    #result['pl_expires'] = datetime_to_string(utcparse(slice['expires']))
    #result['geni_expires'] = datetime_to_string(utcparse(slice['expires']))

    resources = []
    for node in slice_reserved_nodes:
        res = {}
        res['nitos_hostname'] = node['hostname']
        sliver_id = Xrn(slice_urn, type='slice', id=node['node_id']).urn
        res['geni_urn'] = sliver_id
        res['geni_status'] = 'ready'
        res['geni_error'] = ''
        res['users'] = [user]

        resources.append(res)

    result['geni_status'] = top_level_status
    result['geni_resources'] = resources

    return result
def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
    """
    Apply a request rspec to a slice and return the resulting rspec.

    The slice is verified on the testbed, then the node and channel
    leases found in the rspec are reconciled with the slice's current
    reservations.

    @param users: list of user dictionaries; the first one may carry the
        'slice_record'
    """
    aggregate = NitosAggregate(self)
    slices = NitosSlices(self)
    sfa_peer = slices.get_sfa_peer(slice_hrn)

    if users:
        slice_record = users[0].get('slice_record', {})
    else:
        slice_record = None

    # parse rspec
    rspec = RSpec(rspec_string, version='NITOS 1')

    # ensure slice record exists
    slice = slices.verify_slice(slice_hrn, slice_record, sfa_peer, options=options)
    # ensure user records exists
    #users = slices.verify_users(slice_hrn, slice, users, sfa_peer, options=options)

    # add/remove leases (nodes and channels)
    # a lease in Nitos RSpec case is a reservation of nodes and channels grouped by (slice,timeslot)
    requested_nodes, requested_channels = rspec.version.get_leases()
    slices.verify_slice_leases_nodes(slice, requested_nodes)
    slices.verify_slice_leases_channels(slice, requested_channels)

    return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
def delete_sliver (self, slice_urn, slice_hrn, creds, options):
    """
    Release every node and channel reserved for a slice.

    A slice unknown to the testbed is treated as already deleted.
    Releasing is best effort: a failure is logged, not raised.
    """
    slicename = hrn_to_nitos_slicename(slice_hrn)
    slices = self.filter_nitos_results(self.shell.getSlices({}, []), {'slice_name': slicename})
    # NOTE(review): the early exit and the return values are not visible in
    # the listing; returning 1 in both cases is assumed.
    if not slices:
        return 1
    slice_id = slices[0]['slice_id']

    slice_reserved_nodes = self.filter_nitos_results(self.shell.getReservedNodes({}, []), {'slice_id': slice_id })
    slice_reserved_channels = self.filter_nitos_results(self.shell.getReservedChannels(), {'slice_id': slice_id })

    slice_reserved_nodes_ids = [node['reservation_id'] for node in slice_reserved_nodes]
    slice_reserved_channels_ids = [channel['reservation_id'] for channel in slice_reserved_channels]

    # release all reserved nodes and channels for that slice
    # (the ids go to the log rather than to stdout)
    logger.debug("Nodes: %s\nChannels: %s" %(slice_reserved_nodes_ids, slice_reserved_channels_ids))
    try:
        self.shell.releaseNodes({'reservation_ids': slice_reserved_nodes_ids})
        self.shell.releaseChannels({'reservation_ids': slice_reserved_channels_ids})
    except Exception as error:
        logger.error("NitosDriver.delete_sliver: could not release reservations of %s: %s" % (slicename, error))
    return 1
def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
    """
    Set a new expiration time on a slice.

    @param expiration_time: requested expiration, in any format utcparse accepts
    @return: True when the update went through, False otherwise
    @raise RecordNotFound: when the slice is unknown to the testbed
    """
    slicename = hrn_to_nitos_slicename(slice_hrn)
    # same lookup as in delete_sliver: fetch everything, select locally.
    # This used to call GetSlices/UpdateSlice, spellings and argument shapes
    # that no other shell call in this driver uses.
    slices = self.filter_nitos_results(self.shell.getSlices({}, []), {'slice_name': slicename})
    if not slices:
        raise RecordNotFound(slice_hrn)

    requested_time = utcparse(expiration_time)
    record = {'expires': int(datetime_to_epoch(requested_time))}
    # NOTE(review): whether the scheduler accepts an 'expires' field is not
    # shown in this file; the True/False reporting is reconstructed -- confirm.
    try:
        self.shell.updateSlice({'slice_id': slices[0]['slice_id'], 'fields': record})
    except Exception as error:
        logger.error("NitosDriver.renew_sliver: could not update %s: %s" % (slicename, error))
        return False
    return True
658 # xxx this code is quite old and has not run for ages
659 # it is obviously totally broken and needs a rewrite
def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
    """
    Not supported by this driver.

    @raise SfaNotImplemented: always
    """
    # call form of raise: valid on both Python 2 and Python 3
    raise SfaNotImplemented("NitosDriver.get_ticket needs a rewrite")
662 # please keep this code for future reference
663 # slices = PlSlices(self)
664 # peer = slices.get_peer(slice_hrn)
665 # sfa_peer = slices.get_sfa_peer(slice_hrn)
667 # # get the slice record
668 # credential = api.getCredential()
669 # interface = api.registries[api.hrn]
670 # registry = api.server_proxy(interface, credential)
671 # records = registry.Resolve(xrn, credential)
673 # # make sure we get a local slice record
675 # for tmp_record in records:
676 # if tmp_record['type'] == 'slice' and \
677 # not tmp_record['peer_authority']:
678 # #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
679 # slice_record = SliceRecord(dict=tmp_record)
681 # raise RecordNotFound(slice_hrn)
683 # # similar to CreateSliver, we must verify that the required records exist
684 # # at this aggregate before we can issue a ticket
686 # rspec = RSpec(rspec_string)
687 # requested_attributes = rspec.version.get_slice_attributes()
689 # # ensure site record exists
690 # site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
691 # # ensure slice record exists
692 # slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
693 # # ensure person records exists
694 # # xxx users is undefined in this context
695 # persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
696 # # ensure slice attributes exists
697 # slices.verify_slice_attributes(slice, requested_attributes)
700 # slivers = slices.get_slivers(slice_hrn)
703 # raise SliverDoesNotExist(slice_hrn)
708 # 'timestamp': int(time.time()),
709 # 'initscripts': initscripts,
713 # # create the ticket
714 # object_gid = record.get_gid_object()
715 # new_ticket = SfaTicket(subject = object_gid.get_subject())
716 # new_ticket.set_gid_caller(api.auth.client_gid)
717 # new_ticket.set_gid_object(object_gid)
718 # new_ticket.set_issuer(key=api.key, subject=self.hrn)
719 # new_ticket.set_pubkey(object_gid.get_pubkey())
720 # new_ticket.set_attributes(data)
721 # new_ticket.set_rspec(rspec)
722 # #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
723 # new_ticket.encode()
726 # return new_ticket.save_to_string(save_parents=True)