4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5 RecordNotFound, SfaNotImplemented, SliverDoesNotExist, \
8 from sfa.util.sfalogging import logger
9 from sfa.util.defaultdict import defaultdict
10 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
11 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_sliver_id
12 from sfa.planetlab.plxrn import PlXrn
13 from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename, hrn_to_os_tenant_name
14 from sfa.util.cache import Cache
15 from sfa.trust.credential import Credential
16 # used to be used in get_ticket
17 #from sfa.trust.sfaticket import SfaTicket
19 from sfa.rspecs.version_manager import VersionManager
20 from sfa.rspecs.rspec import RSpec
22 # the driver interface, mostly provides default behaviours
23 from sfa.managers.driver import Driver
24 from sfa.openstack.shell import Shell
25 from sfa.openstack.osaggregate import OSAggregate
26 from sfa.planetlab.plslices import PlSlices
27 from sfa.util.osxrn import OSXrn
30 def list_to_dict(recs, key):
32 convert a list of dictionaries into a dictionary keyed on the
33 specified dictionary key
35 return dict ( [ (rec[key],rec) for rec in recs ] )
38 # PlShell is just an xmlrpc serverproxy where methods
39 # can be sent as-is; it takes care of authentication
40 # from the global config
42 class NovaDriver(Driver):
44 # the cache instance is a class member so it survives across incoming requests
47 def __init__ (self, config):
48 Driver.__init__(self, config)
49 self.shell = Shell(config)
51 if config.SFA_AGGREGATE_CACHING:
52 if NovaDriver.cache is None:
53 NovaDriver.cache = Cache()
54 self.cache = NovaDriver.cache
56 ########################################
57 ########## registry oriented
58 ########################################
60 ########## disabled users
61 def is_enabled (self, record):
62 # all records are enabled
65 def augment_records_with_testbed_info (self, sfa_records):
66 return self.fill_record_info (sfa_records)
69 def register (self, sfa_record, hrn, pub_key):
70 type = sfa_record['type']
72 #pl_record = self.sfa_fields_to_pl_fields(type dd , hrn, sfa_record)
75 # add slice description, name, researchers, PI
76 name = hrn_to_os_slicename(hrn)
77 researchers = sfa_record.get('researchers', [])
78 pis = sfa_record.get('pis', [])
79 project_manager = None
80 description = sfa_record.get('description', None)
82 project_manager = Xrn(pis[0], 'user').get_leaf()
84 project_manager = Xrn(researchers[0], 'user').get_leaf()
85 if not project_manager:
86 err_string = "Cannot create a project without a project manager. " + \
87 "Please specify at least one PI or researcher for project: " + \
89 raise SfaInvalidArgument(err_string)
91 users = [Xrn(user, 'user').get_leaf() for user in \
93 self.shell.auth_manager.create_project(name, project_manager, description, users)
96 # add person roles, projects and keys
97 name = Xrn(hrn).get_leaf()
98 self.shell.auth_manager.create_user(name)
99 projects = sfa_records.get('slices', [])
100 for project in projects:
101 project_name = Xrn(project).get_leaf()
102 self.shell.auth_manager.add_to_project(name, project_name)
103 keys = sfa_records.get('keys', [])
110 self.shell.db.key_pair_create(key_dict)
115 # xxx actually old_sfa_record comes filled with plc stuff as well in the original code
116 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
117 type = new_sfa_record['type']
119 # new_key implemented for users only
120 if new_key and type not in [ 'user' ]:
121 raise UnknownSfaType(type)
123 elif type == "slice":
124 # can update project manager and description
125 name = hrn_to_os_slicename(hrn)
126 researchers = sfa_record.get('researchers', [])
127 pis = sfa_record.get('pis', [])
128 project_manager = None
129 description = sfa_record.get('description', None)
131 project_manager = Xrn(pis[0], 'user').get_leaf()
133 project_manager = Xrn(researchers[0], 'user').get_leaf()
134 self.shell.auth_manager.modify_project(name, project_manager, description)
137 # can techinally update access_key and secret_key,
138 # but that is not in our scope, so we do nothing.
144 def remove (self, sfa_record):
145 type=sfa_record['type']
147 name = Xrn(sfa_record['hrn']).get_leaf()
148 if self.shell.auth_manager.get_user(name):
149 self.shell.auth_manager.delete_user(name)
150 elif type == 'slice':
151 name = hrn_to_os_slicename(sfa_record['hrn'])
152 if self.shell.auth_manager.get_project(name):
153 self.shell.auth_manager.delete_project(name)
158 def fill_record_info(self, records):
160 Given a (list of) SFA record, fill in the PLC specific
161 and SFA specific fields in the record.
163 if not isinstance(records, list):
166 for record in records:
167 if record['type'] == 'user':
168 record = self.fill_user_record_info(record)
169 elif record['type'] == 'slice':
170 record = self.fill_slice_record_info(record)
171 elif record['type'].startswith('authority'):
172 record = self.fill_auth_record_info(record)
175 record['geni_urn'] = hrn_to_urn(record['hrn'], record['type'])
176 record['geni_certificate'] = record['gid']
177 #if os_record.created_at is not None:
178 # record['date_created'] = datetime_to_string(utcparse(os_record.created_at))
179 #if os_record.updated_at is not None:
180 # record['last_updated'] = datetime_to_string(utcparse(os_record.updated_at))
184 def fill_user_record_info(self, record):
185 xrn = Xrn(record['hrn'])
186 name = xrn.get_leaf()
187 record['name'] = name
188 user = self.shell.auth_manager.users.find(name=name)
189 record['email'] = user.email
190 tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
192 all_tenants = self.shell.auth_manager.tenants.list()
193 for tmp_tenant in all_tenants:
194 if tmp_tenant.name.startswith(tenant.name +"."):
195 for tmp_user in tmp_tenant.list_users():
196 if tmp_user.name == user.name:
197 slice_hrn = ".",join([self.hrn, tmp_tenant.name])
198 slices.append(slice_hrn)
199 record['slices'] = slices
200 roles = self.shell.auth_manager.roles.roles_for_user(user, tenant)
201 record['roles'] = [role.name for role in roles]
202 keys = self.shell.nova_manager.keypairs.findall(name=record['hrn'])
203 record['keys'] = [key.public_key for key in keys]
206 def fill_slice_record_info(self, record):
207 tenant_name = hrn_to_os_tenant_name(record['hrn'])
208 tenant = self.shell.auth_manager.tenants.find(name=tenant_name)
209 parent_tenant_name = OSXrn(xrn=tenant_name).get_authority_hrn()
210 parent_tenant = self.shell.auth_manager.tenants.find(name=parent_tenant_name)
214 # look for users and pis in slice tenant
215 for user in tenant.list_users():
216 for role in self.shell.auth_manager.roles.roles_for_user(user, tenant):
217 hrn = ".".join([self.hrn, tenant.name, user.name])
218 if role.name.lower() == 'pi':
220 elif role.name.lower() in ['user', 'member']:
221 researchers.append(hrn)
223 # look for pis in the slice's parent (site/organization) tenant
224 for user in parent_tenant.list_users():
225 for role in self.shell.auth_manager.roles.roles_for_user(user, parent_tenant):
226 if role.name.lower() == 'pi':
227 hrn = ".".join([self.hrn, tenant.name, user.name])
229 record['name'] = tenant_name
230 record['description'] = tenant.description
233 record['geni_creator'] = pis[0]
235 record['geni_creator'] = None
236 record['researcher'] = researchers
239 def fill_auth_record_info(self, record):
240 tenant_name = hrn_to_os_tenant_name(record['hrn'])
241 tenant = self.shell.auth_manager.tenants.find(name=tenant_name)
245 # look for users and pis in slice tenant
246 for user in tenant.list_users():
247 for role in self.shell.auth_manager.roles.roles_for_user(user, tenant):
248 hrn = ".".join([self.hrn, tenant.name, user.name])
249 if role.name.lower() == 'pi':
251 elif role.name.lower() in ['user', 'member']:
252 researchers.append(hrn)
256 all_tenants = self.shell.auth_manager.tenants.list()
257 for tmp_tenant in all_tenants:
258 if tmp_tenant.name.startswith(tenant.name+"."):
259 slices.append(".".join([self.hrn, tmp_tenant.name]))
261 record['name'] = tenant_name
262 record['description'] = tenant.description
264 record['enabled'] = tenant.enabled
265 record['researchers'] = researchers
266 record['slices'] = slices
270 # plcapi works by changes, compute what needs to be added/deleted
271 def update_relation (self, subject_type, target_type, subject_id, target_ids):
272 # hard-wire the code for slice/user for now, could be smarter if needed
273 if subject_type =='slice' and target_type == 'user':
274 subject=self.shell.project_get(subject_id)[0]
275 current_target_ids = [user.name for user in subject.members]
276 add_target_ids = list ( set (target_ids).difference(current_target_ids))
277 del_target_ids = list ( set (current_target_ids).difference(target_ids))
278 logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
279 for target_id in add_target_ids:
280 self.shell.project_add_member(target_id,subject_id)
281 logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
282 for target_id in del_target_ids:
283 logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
284 self.shell.project_remove_member(target_id, subject_id)
286 logger.info('unexpected relation to maintain, %s -> %s'%(subject_type,target_type))
289 ########################################
290 ########## aggregate oriented
291 ########################################
293 def testbed_name (self): return "openstack"
295 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
296 def aggregate_version (self):
297 version_manager = VersionManager()
298 ad_rspec_versions = []
299 request_rspec_versions = []
300 for rspec_version in version_manager.versions:
301 if rspec_version.content_type in ['*', 'ad']:
302 ad_rspec_versions.append(rspec_version.to_dict())
303 if rspec_version.content_type in ['*', 'request']:
304 request_rspec_versions.append(rspec_version.to_dict())
306 'testbed':self.testbed_name(),
307 'geni_request_rspec_versions': request_rspec_versions,
308 'geni_ad_rspec_versions': ad_rspec_versions,
311 def list_slices (self, creds, options):
312 # look in cache first
314 slices = self.cache.get('slices')
316 logger.debug("OpenStackDriver.list_slices returns from cache")
320 projs = self.shell.auth_manager.get_projects()
321 slice_urns = [OSXrn(proj.name, 'slice').urn for proj in projs]
325 logger.debug ("OpenStackDriver.list_slices stores value in cache")
326 self.cache.add('slices', slice_urns)
330 # first 2 args are None in case of resource discovery
331 def list_resources (self, slice_urn, slice_hrn, creds, options):
332 cached_requested = options.get('cached', True)
334 version_manager = VersionManager()
335 # get the rspec's return format from options
336 rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
337 version_string = "rspec_%s" % (rspec_version)
339 #panos adding the info option to the caching key (can be improved)
340 if options.get('info'):
341 version_string = version_string + "_"+options.get('info', 'default')
343 # look in cache first
344 if cached_requested and self.cache and not slice_hrn:
345 rspec = self.cache.get(version_string)
347 logger.debug("OpenStackDriver.ListResources: returning cached advertisement")
350 #panos: passing user-defined options
351 #print "manager options = ",options
352 aggregate = OSAggregate(self)
353 rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
357 if self.cache and not slice_hrn:
358 logger.debug("OpenStackDriver.ListResources: stores advertisement in cache")
359 self.cache.add(version_string, rspec)
363 def sliver_status (self, slice_urn, slice_hrn):
364 # find out where this slice is currently running
365 project_name = hrn_to_os_slicename(slice_hrn)
366 project = self.shell.auth_manager.get_project(project_name)
367 instances = self.shell.db.instance_get_all_by_project(project_name)
368 if len(instances) == 0:
369 raise SliverDoesNotExist("You have not allocated any slivers here")
372 top_level_status = 'unknown'
374 top_level_status = 'ready'
375 result['geni_urn'] = slice_urn
376 result['plos_login'] = 'root'
377 result['plos_expires'] = None
380 for instance in instances:
382 # instances are accessed by ip, not hostname. We need to report the ip
383 # somewhere so users know where to ssh to.
384 res['plos_hostname'] = instance.hostname
385 res['plos_created_at'] = datetime_to_string(utcparse(instance.created_at))
386 res['plos_boot_state'] = instance.vm_state
387 res['plos_sliver_type'] = instance.instance_type.name
388 sliver_id = Xrn(slice_urn).get_sliver_id(instance.project_id, \
389 instance.hostname, instance.id)
390 res['geni_urn'] = sliver_id
392 if instance.vm_state == 'running':
393 res['boot_state'] = 'ready';
395 res['boot_state'] = 'unknown'
396 resources.append(res)
398 result['geni_status'] = top_level_status
399 result['geni_resources'] = resources
402 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
404 aggregate = OSAggregate(self)
405 rspec = RSpec(rspec_string)
406 instance_name = hrn_to_os_slicename(slice_hrn)
408 # assume first user is the caller and use their context
409 # for the ec2/euca api connection. Also, use the first users
410 # key as the project key.
413 key_name = aggregate.create_instance_key(slice_hrn, users[0])
415 # collect public keys
418 pubkeys.extend(user['keys'])
420 aggregate.run_instances(instance_name, rspec_string, key_name, pubkeys)
422 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
424 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
425 aggregate = OSAggregate(self)
426 project_name = hrn_to_os_slicename(slice_hrn)
427 return aggregate.delete_instances(project_name)
429 def update_sliver(self, slice_urn, slice_hrn, rspec, creds, options):
430 name = hrn_to_os_slicename(slice_hrn)
431 aggregate = OSAggregate(self)
432 return aggregate.update_instances(name)
434 def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
437 def start_slice (self, slice_urn, slice_hrn, creds):
440 def stop_slice (self, slice_urn, slice_hrn, creds):
441 name = OSXrn(xrn=slice_urn).name
442 aggregate = OSAggregate(self)
443 return aggregate.stop_instances(name)
445 def reset_slice (self, slice_urn, slice_hrn, creds):
446 raise SfaNotImplemented ("reset_slice not available at this interface")
448 # xxx this code is quite old and has not run for ages
449 # it is obviously totally broken and needs a rewrite
450 def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
451 raise SfaNotImplemented,"OpenStackDriver.get_ticket needs a rewrite"
452 # please keep this code for future reference
453 # slices = PlSlices(self)
454 # peer = slices.get_peer(slice_hrn)
455 # sfa_peer = slices.get_sfa_peer(slice_hrn)
457 # # get the slice record
458 # credential = api.getCredential()
459 # interface = api.registries[api.hrn]
460 # registry = api.server_proxy(interface, credential)
461 # records = registry.Resolve(xrn, credential)
463 # # make sure we get a local slice record
465 # for tmp_record in records:
466 # if tmp_record['type'] == 'slice' and \
467 # not tmp_record['peer_authority']:
468 # #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
469 # slice_record = SliceRecord(dict=tmp_record)
471 # raise RecordNotFound(slice_hrn)
473 # # similar to CreateSliver, we must verify that the required records exist
474 # # at this aggregate before we can issue a ticket
476 # rspec = RSpec(rspec_string)
477 # requested_attributes = rspec.version.get_slice_attributes()
479 # # ensure site record exists
480 # site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
481 # # ensure slice record exists
482 # slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
483 # # ensure person records exists
484 # # xxx users is undefined in this context
485 # persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
486 # # ensure slice attributes exists
487 # slices.verify_slice_attributes(slice, requested_attributes)
490 # slivers = slices.get_slivers(slice_hrn)
493 # raise SliverDoesNotExist(slice_hrn)
498 # 'timestamp': int(time.time()),
499 # 'initscripts': initscripts,
503 # # create the ticket
504 # object_gid = record.get_gid_object()
505 # new_ticket = SfaTicket(subject = object_gid.get_subject())
506 # new_ticket.set_gid_caller(api.auth.client_gid)
507 # new_ticket.set_gid_object(object_gid)
508 # new_ticket.set_issuer(key=api.key, subject=self.hrn)
509 # new_ticket.set_pubkey(object_gid.get_pubkey())
510 # new_ticket.set_attributes(data)
511 # new_ticket.set_rspec(rspec)
512 # #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
513 # new_ticket.encode()
516 # return new_ticket.save_to_string(save_parents=True)