4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5 RecordNotFound, SfaNotImplemented, SliverDoesNotExist, \
8 from sfa.util.sfalogging import logger
9 from sfa.util.defaultdict import defaultdict
10 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
11 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_sliver_id
12 from sfa.planetlab.plxrn import PlXrn
13 from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename, hrn_to_os_tenant_name
14 from sfa.util.cache import Cache
15 from sfa.trust.credential import Credential
16 # used to be used in get_ticket
17 #from sfa.trust.sfaticket import SfaTicket
19 from sfa.rspecs.version_manager import VersionManager
20 from sfa.rspecs.rspec import RSpec
22 # the driver interface, mostly provides default behaviours
23 from sfa.managers.driver import Driver
24 from sfa.openstack.shell import Shell
25 from sfa.openstack.osaggregate import OSAggregate
26 from sfa.planetlab.plslices import PlSlices
27 from sfa.util.osxrn import OSXrn
30 def list_to_dict(recs, key):
32 convert a list of dictionaries into a dictionary keyed on the
33 specified dictionary key
35 return dict ( [ (rec[key],rec) for rec in recs ] )
38 # PlShell is just an xmlrpc serverproxy where methods
39 # can be sent as-is; it takes care of authentication
40 # from the global config
42 class NovaDriver(Driver):
44 # the cache instance is a class member so it survives across incoming requests
47 def __init__ (self, config):
48 Driver.__init__(self, config)
49 self.shell = Shell(config)
51 if config.SFA_AGGREGATE_CACHING:
52 if NovaDriver.cache is None:
53 NovaDriver.cache = Cache()
54 self.cache = NovaDriver.cache
56 ########################################
57 ########## registry oriented
58 ########################################
60 ########## disabled users
61 def is_enabled (self, record):
62 # all records are enabled
65 def augment_records_with_testbed_info (self, sfa_records):
66 return self.fill_record_info (sfa_records)
69 def register (self, sfa_record, hrn, pub_key):
70 type = sfa_record['type']
72 #pl_record = self.sfa_fields_to_pl_fields(type dd , hrn, sfa_record)
75 # add slice description, name, researchers, PI
76 name = hrn_to_os_slicename(hrn)
77 researchers = sfa_record.get('researchers', [])
78 pis = sfa_record.get('pis', [])
79 project_manager = None
80 description = sfa_record.get('description', None)
82 project_manager = Xrn(pis[0], 'user').get_leaf()
84 project_manager = Xrn(researchers[0], 'user').get_leaf()
85 if not project_manager:
86 err_string = "Cannot create a project without a project manager. " + \
87 "Please specify at least one PI or researcher for project: " + \
89 raise SfaInvalidArgument(err_string)
91 users = [Xrn(user, 'user').get_leaf() for user in \
93 self.shell.auth_manager.create_project(name, project_manager, description, users)
96 # add person roles, projects and keys
97 name = Xrn(hrn).get_leaf()
98 self.shell.auth_manager.create_user(name)
99 projects = sfa_records.get('slices', [])
100 for project in projects:
101 project_name = Xrn(project).get_leaf()
102 self.shell.auth_manager.add_to_project(name, project_name)
103 keys = sfa_records.get('keys', [])
110 self.shell.db.key_pair_create(key_dict)
115 # xxx actually old_sfa_record comes filled with plc stuff as well in the original code
116 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
117 type = new_sfa_record['type']
119 # new_key implemented for users only
120 if new_key and type not in [ 'user' ]:
121 raise UnknownSfaType(type)
123 elif type == "slice":
124 # can update project manager and description
125 name = hrn_to_os_slicename(hrn)
126 researchers = sfa_record.get('researchers', [])
127 pis = sfa_record.get('pis', [])
128 project_manager = None
129 description = sfa_record.get('description', None)
131 project_manager = Xrn(pis[0], 'user').get_leaf()
133 project_manager = Xrn(researchers[0], 'user').get_leaf()
134 self.shell.auth_manager.modify_project(name, project_manager, description)
137 # can techinally update access_key and secret_key,
138 # but that is not in our scope, so we do nothing.
144 def remove (self, sfa_record):
145 type=sfa_record['type']
147 name = Xrn(sfa_record['hrn']).get_leaf()
148 if self.shell.auth_manager.get_user(name):
149 self.shell.auth_manager.delete_user(name)
150 elif type == 'slice':
151 name = hrn_to_os_slicename(sfa_record['hrn'])
152 if self.shell.auth_manager.get_project(name):
153 self.shell.auth_manager.delete_project(name)
158 def fill_record_info(self, records):
160 Given a (list of) SFA record, fill in the PLC specific
161 and SFA specific fields in the record.
163 if not isinstance(records, list):
166 for record in records:
167 if record['type'] == 'user':
168 record = self.fill_user_record_info(record)
169 elif record['type'] == 'slice':
170 record = self.fill_slice_record_info(record)
171 elif record['type'].startswith('authority'):
172 record = self.fill_auth_record_info(record)
175 record['geni_urn'] = hrn_to_urn(record['hrn'], record['type'])
176 record['geni_certificate'] = record['gid']
177 #if os_record.created_at is not None:
178 # record['date_created'] = datetime_to_string(utcparse(os_record.created_at))
179 #if os_record.updated_at is not None:
180 # record['last_updated'] = datetime_to_string(utcparse(os_record.updated_at))
184 def fill_user_record_info(self, record):
185 xrn = Xrn(record['hrn'])
186 name = xrn.get_leaf()
187 record['name'] = name
188 user = self.shell.auth_manager.users.find(name=name)
189 record['email'] = user.email
190 tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
192 all_tenants = self.shell.auth_manager.tenants.list()
193 for tmp_tenant in all_tenants:
194 if tmp_tenant.name.startswith(tenant.name +"."):
195 for tmp_user in tmp_tenant.list_users():
196 if tmp_user.name == user.name:
197 slice_hrn = ".".join([self.hrn, tmp_tenant.name])
198 slices.append(slice_hrn)
199 record['slices'] = slices
200 roles = self.shell.auth_manager.roles.roles_for_user(user, tenant)
201 record['roles'] = [role.name for role in roles]
202 keys = self.shell.nova_manager.keypairs.findall(name=record['hrn'])
203 record['keys'] = [key.public_key for key in keys]
206 def fill_slice_record_info(self, record):
207 tenant_name = hrn_to_os_tenant_name(record['hrn'])
208 tenant = self.shell.auth_manager.tenants.find(name=tenant_name)
209 parent_tenant_name = OSXrn(xrn=tenant_name).get_authority_hrn()
210 parent_tenant = self.shell.auth_manager.tenants.find(name=parent_tenant_name)
214 # look for users and pis in slice tenant
215 for user in tenant.list_users():
216 for role in self.shell.auth_manager.roles.roles_for_user(user, tenant):
217 if role.name.lower() == 'pi':
218 user_tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
219 hrn = ".".join([self.hrn, user_tenant.name, user.name])
221 elif role.name.lower() in ['user', 'member']:
222 user_tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
223 hrn = ".".join([self.hrn, user_tenant.name, user.name])
224 researchers.append(hrn)
226 # look for pis in the slice's parent (site/organization) tenant
227 for user in parent_tenant.list_users():
228 for role in self.shell.auth_manager.roles.roles_for_user(user, parent_tenant):
229 if role.name.lower() == 'pi':
230 user_tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
231 hrn = ".".join([self.hrn, user_tenant.name, user.name])
233 record['name'] = tenant_name
234 record['description'] = tenant.description
237 record['geni_creator'] = pis[0]
239 record['geni_creator'] = None
240 record['researcher'] = researchers
243 def fill_auth_record_info(self, record):
244 tenant_name = hrn_to_os_tenant_name(record['hrn'])
245 tenant = self.shell.auth_manager.tenants.find(name=tenant_name)
249 # look for users and pis in slice tenant
250 for user in tenant.list_users():
251 for role in self.shell.auth_manager.roles.roles_for_user(user, tenant):
252 hrn = ".".join([self.hrn, tenant.name, user.name])
253 if role.name.lower() == 'pi':
255 elif role.name.lower() in ['user', 'member']:
256 researchers.append(hrn)
260 all_tenants = self.shell.auth_manager.tenants.list()
261 for tmp_tenant in all_tenants:
262 if tmp_tenant.name.startswith(tenant.name+"."):
263 slices.append(".".join([self.hrn, tmp_tenant.name]))
265 record['name'] = tenant_name
266 record['description'] = tenant.description
268 record['enabled'] = tenant.enabled
269 record['researchers'] = researchers
270 record['slices'] = slices
274 # plcapi works by changes, compute what needs to be added/deleted
275 def update_relation (self, subject_type, target_type, subject_id, target_ids):
276 # hard-wire the code for slice/user for now, could be smarter if needed
277 if subject_type =='slice' and target_type == 'user':
278 subject=self.shell.project_get(subject_id)[0]
279 current_target_ids = [user.name for user in subject.members]
280 add_target_ids = list ( set (target_ids).difference(current_target_ids))
281 del_target_ids = list ( set (current_target_ids).difference(target_ids))
282 logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
283 for target_id in add_target_ids:
284 self.shell.project_add_member(target_id,subject_id)
285 logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
286 for target_id in del_target_ids:
287 logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
288 self.shell.project_remove_member(target_id, subject_id)
290 logger.info('unexpected relation to maintain, %s -> %s'%(subject_type,target_type))
293 ########################################
294 ########## aggregate oriented
295 ########################################
297 def testbed_name (self): return "openstack"
299 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
300 def aggregate_version (self):
301 version_manager = VersionManager()
302 ad_rspec_versions = []
303 request_rspec_versions = []
304 for rspec_version in version_manager.versions:
305 if rspec_version.content_type in ['*', 'ad']:
306 ad_rspec_versions.append(rspec_version.to_dict())
307 if rspec_version.content_type in ['*', 'request']:
308 request_rspec_versions.append(rspec_version.to_dict())
310 'testbed':self.testbed_name(),
311 'geni_request_rspec_versions': request_rspec_versions,
312 'geni_ad_rspec_versions': ad_rspec_versions,
315 def list_slices (self, creds, options):
316 # look in cache first
318 slices = self.cache.get('slices')
320 logger.debug("OpenStackDriver.list_slices returns from cache")
324 projs = self.shell.auth_manager.get_projects()
325 slice_urns = [OSXrn(proj.name, 'slice').urn for proj in projs]
329 logger.debug ("OpenStackDriver.list_slices stores value in cache")
330 self.cache.add('slices', slice_urns)
334 # first 2 args are None in case of resource discovery
335 def list_resources (self, slice_urn, slice_hrn, creds, options):
336 cached_requested = options.get('cached', True)
338 version_manager = VersionManager()
339 # get the rspec's return format from options
340 rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
341 version_string = "rspec_%s" % (rspec_version)
343 #panos adding the info option to the caching key (can be improved)
344 if options.get('info'):
345 version_string = version_string + "_"+options.get('info', 'default')
347 # look in cache first
348 if cached_requested and self.cache and not slice_hrn:
349 rspec = self.cache.get(version_string)
351 logger.debug("OpenStackDriver.ListResources: returning cached advertisement")
354 #panos: passing user-defined options
355 #print "manager options = ",options
356 aggregate = OSAggregate(self)
357 rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
361 if self.cache and not slice_hrn:
362 logger.debug("OpenStackDriver.ListResources: stores advertisement in cache")
363 self.cache.add(version_string, rspec)
367 def sliver_status (self, slice_urn, slice_hrn):
368 # find out where this slice is currently running
369 project_name = hrn_to_os_slicename(slice_hrn)
370 project = self.shell.auth_manager.get_project(project_name)
371 instances = self.shell.db.instance_get_all_by_project(project_name)
372 if len(instances) == 0:
373 raise SliverDoesNotExist("You have not allocated any slivers here")
376 top_level_status = 'unknown'
378 top_level_status = 'ready'
379 result['geni_urn'] = slice_urn
380 result['plos_login'] = 'root'
381 result['plos_expires'] = None
384 for instance in instances:
386 # instances are accessed by ip, not hostname. We need to report the ip
387 # somewhere so users know where to ssh to.
388 res['plos_hostname'] = instance.hostname
389 res['plos_created_at'] = datetime_to_string(utcparse(instance.created_at))
390 res['plos_boot_state'] = instance.vm_state
391 res['plos_sliver_type'] = instance.instance_type.name
392 sliver_id = Xrn(slice_urn).get_sliver_id(instance.project_id, \
393 instance.hostname, instance.id)
394 res['geni_urn'] = sliver_id
396 if instance.vm_state == 'running':
397 res['boot_state'] = 'ready';
399 res['boot_state'] = 'unknown'
400 resources.append(res)
402 result['geni_status'] = top_level_status
403 result['geni_resources'] = resources
406 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
408 aggregate = OSAggregate(self)
409 rspec = RSpec(rspec_string)
410 instance_name = hrn_to_os_slicename(slice_hrn)
412 # assume first user is the caller and use their context
413 # for the ec2/euca api connection. Also, use the first users
414 # key as the project key.
417 key_name = aggregate.create_instance_key(slice_hrn, users[0])
419 # collect public keys
422 pubkeys.extend(user['keys'])
424 aggregate.run_instances(instance_name, rspec_string, key_name, pubkeys)
426 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
428 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
429 aggregate = OSAggregate(self)
430 project_name = hrn_to_os_slicename(slice_hrn)
431 return aggregate.delete_instances(project_name)
433 def update_sliver(self, slice_urn, slice_hrn, rspec, creds, options):
434 name = hrn_to_os_slicename(slice_hrn)
435 aggregate = OSAggregate(self)
436 return aggregate.update_instances(name)
438 def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
441 def start_slice (self, slice_urn, slice_hrn, creds):
444 def stop_slice (self, slice_urn, slice_hrn, creds):
445 name = OSXrn(xrn=slice_urn).name
446 aggregate = OSAggregate(self)
447 return aggregate.stop_instances(name)
449 def reset_slice (self, slice_urn, slice_hrn, creds):
450 raise SfaNotImplemented ("reset_slice not available at this interface")
452 # xxx this code is quite old and has not run for ages
453 # it is obviously totally broken and needs a rewrite
454 def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
455 raise SfaNotImplemented,"OpenStackDriver.get_ticket needs a rewrite"
456 # please keep this code for future reference
457 # slices = PlSlices(self)
458 # peer = slices.get_peer(slice_hrn)
459 # sfa_peer = slices.get_sfa_peer(slice_hrn)
461 # # get the slice record
462 # credential = api.getCredential()
463 # interface = api.registries[api.hrn]
464 # registry = api.server_proxy(interface, credential)
465 # records = registry.Resolve(xrn, credential)
467 # # make sure we get a local slice record
469 # for tmp_record in records:
470 # if tmp_record['type'] == 'slice' and \
471 # not tmp_record['peer_authority']:
472 # #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
473 # slice_record = SliceRecord(dict=tmp_record)
475 # raise RecordNotFound(slice_hrn)
477 # # similar to CreateSliver, we must verify that the required records exist
478 # # at this aggregate before we can issue a ticket
480 # rspec = RSpec(rspec_string)
481 # requested_attributes = rspec.version.get_slice_attributes()
483 # # ensure site record exists
484 # site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
485 # # ensure slice record exists
486 # slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
487 # # ensure person records exists
488 # # xxx users is undefined in this context
489 # persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
490 # # ensure slice attributes exists
491 # slices.verify_slice_attributes(slice, requested_attributes)
494 # slivers = slices.get_slivers(slice_hrn)
497 # raise SliverDoesNotExist(slice_hrn)
502 # 'timestamp': int(time.time()),
503 # 'initscripts': initscripts,
507 # # create the ticket
508 # object_gid = record.get_gid_object()
509 # new_ticket = SfaTicket(subject = object_gid.get_subject())
510 # new_ticket.set_gid_caller(api.auth.client_gid)
511 # new_ticket.set_gid_object(object_gid)
512 # new_ticket.set_issuer(key=api.key, subject=self.hrn)
513 # new_ticket.set_pubkey(object_gid.get_pubkey())
514 # new_ticket.set_attributes(data)
515 # new_ticket.set_rspec(rspec)
516 # #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
517 # new_ticket.encode()
520 # return new_ticket.save_to_string(save_parents=True)