4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5 RecordNotFound, SfaNotImplemented, SliverDoesNotExist
7 from sfa.util.sfalogging import logger
8 from sfa.util.defaultdict import defaultdict
9 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
10 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_sliver_id
11 from sfa.util.cache import Cache
13 # one would think the driver should not need to mess with the SFA db, but..
15 # used to be used in get_ticket
16 #from sfa.trust.sfaticket import SfaTicket
18 from sfa.rspecs.version_manager import VersionManager
19 from sfa.rspecs.rspec import RSpec
21 # the driver interface, mostly provides default behaviours
22 from sfa.managers.driver import Driver
24 from sfa.openstack.openstack_shell import OpenstackShell
25 import sfa.plc.peers as peers
26 from sfa.plc.plaggregate import PlAggregate
27 from sfa.plc.plslices import PlSlices
28 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename, hrn_to_pl_login_base
def list_to_dict(recs, key):
    """
    Index a list of dictionaries by one of their fields.

    recs -- iterable of dictionaries, each of which must contain `key`
    key  -- name of the field whose value becomes the key of the result

    Returns a dictionary mapping rec[key] to rec; when several records
    carry the same value, the last one wins.
    Raises KeyError if a record has no `key` entry.
    """
    indexed = {}
    for rec in recs:
        indexed[rec[key]] = rec
    return indexed
39 # PlShell is just an xmlrpc serverproxy where methods
40 # can be sent as-is; it takes care of authentication
41 # from the global config
43 class OpenstackDriver (Driver):
45 # the cache instance is a class member so it survives across incoming requests
def __init__ (self, config):
    """
    Set up the driver.

    Initialises the Driver base class, creates the OpenstackShell used to
    talk to the testbed and, when config.SFA_AGGREGATE_CACHING is set,
    attaches the Cache instance shared by all OpenstackDriver objects.
    """
    Driver.__init__ (self, config)
    self.shell = OpenstackShell (config)
    # always define the attribute, so that the 'if self.cache' checks made
    # by the other methods are safe when caching is disabled
    self.cache = None
    if config.SFA_AGGREGATE_CACHING:
        # the cache is stored on the class so it survives across incoming
        # requests; it is created lazily by the first driver that needs it
        if getattr(OpenstackDriver, 'cache', None) is None:
            OpenstackDriver.cache = Cache()
        self.cache = OpenstackDriver.cache
57 ########################################
58 ########## registry oriented
59 ########################################
61 ########## disabled users
def is_enabled (self, record):
    """
    Tell whether `record` is enabled.

    This driver has no notion of disabled records: every record is
    considered enabled, whatever its content.
    """
    return True
def augment_records_with_testbed_info (self, sfa_records):
    """
    Enrich SFA records with testbed data.

    Thin wrapper: hands `sfa_records` to fill_record_info() and returns
    whatever that method returns.
    """
    augmented = self.fill_record_info (sfa_records)
    return augmented
def register (self, sfa_record, hrn, pub_key):
    """
    Registry entry point for a new record.

    The visible code only reads the record type and translates the SFA
    fields through self.sfa_fields_to_pl_fields(); the per-type work
    described by the comments below is not implemented here.
    NOTE(review): no return statement is visible in this view - confirm
    what callers expect to get back.
    """
    type = sfa_record['type']
    pl_record = self.sfa_fields_to_pl_fields(type, hrn, sfa_record)
    # NOTE(review): these fields look slice-specific; presumably this
    # belongs to a 'slice' branch - confirm
    acceptable_fields=['url', 'instantiation', 'name', 'description']
    # add slice description, name, researchers, PI
    # NOTE(review): pub_key is not used by the visible code; presumably it
    # is meant for the user case below - confirm
    # add person roles, projects and keys
85 # xxx actually old_sfa_record comes filled with plc stuff as well in the original code
def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
    """
    Registry entry point for updating an existing record.

    Raises UnknownSfaType when a new key is supplied for a record whose
    type is not 'user'.  The per-type updates described by the comments
    below are not implemented in the visible code.
    NOTE(review): no return statement is visible in this view - confirm
    what callers expect to get back.
    """
    # NOTE(review): pointer is read but not used by the visible code
    pointer = old_sfa_record['pointer']
    type = old_sfa_record['type']
    # new_key implemented for users only
    if new_key and type not in [ 'user' ]:
        raise UnknownSfaType(type)
    # can update description, researchers and PI
    # can update slices, keys and roles
def remove (self, sfa_record):
    """
    Delete the testbed object that backs an SFA record.

    The object name is the leaf of the record's hrn.  A 'user' record
    maps to a user and a 'slice' record to a project; in both cases the
    object is deleted only if the shell reports that it exists.  Records
    of any other type are left alone.

    Returns True once the record has been processed.
    """
    record_type = sfa_record['type']
    name = Xrn(sfa_record['hrn']).get_leaf()
    if record_type == 'user':
        if self.shell.user_get(name):
            self.shell.user_delete(name)
    elif record_type == 'slice':
        if self.shell.project_get(name):
            self.shell.project_delete(name)
    return True
def fill_record_info(self, records):
    """
    Given a (list of) SFA record, fill in the testbed specific
    and SFA specific fields in the record.

    records -- a single record (dict) or a list of records; each needs at
               least the 'hrn', 'type' and 'gid' entries

    'user' records get 'slices', 'roles' and 'keys'; 'slice' records get
    'description', 'PI', 'geni_creator' and 'researcher'.  Both also get
    'geni_urn', 'geni_certificate', 'name' and, when the testbed object
    has them, 'date_created' / 'last_updated'.  Records of any other type
    are left untouched.

    The records are modified in place and returned as a list.
    """
    # accept a single record as well as a list
    if not isinstance(records, list):
        records = [records]

    for record in records:
        name = Xrn(record['hrn']).get_leaf()
        record_type = record['type']
        if record_type == 'user':
            os_record = self.shell.user_get(name)
            record['slices'] = [self.hrn + "." + proj.name
                                for proj in os_record.projects]
            record['roles'] = list(os_record.roles)
            keys = self.shell.key_pair_get_all_by_user(name)
            record['keys'] = [key.public_key for key in keys]
        elif record_type == 'slice':
            os_record = self.shell.project_get(name)
            record['description'] = os_record.description
            record['PI'] = self.hrn + "." + os_record.project_manager
            record['geni_creator'] = record['PI']
            record['researcher'] = [self.hrn + "." + user.name
                                    for user in os_record.members]
        else:
            # no testbed object to look up for other record types
            continue

        # fields common to users and slices
        record['geni_urn'] = hrn_to_urn(record['hrn'], record_type)
        record['geni_certificate'] = record['gid']
        record['name'] = os_record.name
        if os_record.created_at is not None:
            record['date_created'] = datetime_to_string(utcparse(os_record.created_at))
        if os_record.updated_at is not None:
            record['last_updated'] = datetime_to_string(utcparse(os_record.updated_at))

    return records
156 # plcapi works by changes, compute what needs to be added/deleted
def update_relation (self, subject_type, target_type, subject_id, target_ids):
    """
    Bring the members of a project in line with `target_ids`.

    Only the 'slice' -> 'user' relation is handled: users listed in
    target_ids that are not yet members are added to the project, and
    current members that are not listed are removed.  Any other pair of
    types is only reported in the log.
    """
    # hard-wire the code for slice/user for now, could be smarter if needed
    if not (subject_type == 'slice' and target_type == 'user'):
        logger.info('unexpected relation to maintain, %s -> %s'%(subject_type,target_type))
        return

    # NOTE(review): fill_record_info uses the result of project_get()
    # directly while it is indexed with [0] here - confirm which is right
    project = self.shell.project_get(subject_id)[0]
    current = set([member.name for member in project.members])
    wanted = set(target_ids)
    to_add = list(wanted - current)
    to_remove = list(current - wanted)

    logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
    for target_id in to_add:
        self.shell.project_add_member(target_id,subject_id)
        logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
    for target_id in to_remove:
        logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
        self.shell.project_remove_member(target_id, subject_id)
175 ########################################
176 ########## aggregate oriented
177 ########################################
179 def testbed_name (self): return "openstack"
181 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
    """
    Describe this aggregate for a version request.

    Returns a dictionary with the testbed name and the rspec versions
    known to the VersionManager, split into those usable for requests
    and those usable for advertisements; a version whose content_type
    is '*' appears in both lists.
    """
    known_versions = list(VersionManager().versions)
    ad_rspec_versions = [version.to_dict() for version in known_versions
                         if version.content_type in ['*', 'ad']]
    request_rspec_versions = [version.to_dict() for version in known_versions
                              if version.content_type in ['*', 'request']]
    # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
    return {
        'testbed':self.testbed_name(),
        'geni_request_rspec_versions': request_rspec_versions,
        'geni_ad_rspec_versions': ad_rspec_versions,
        }
def list_slices (self, creds, options):
    """
    Return the urns of the local slices.

    The list is served from the cache when one is configured and holds a
    'slices' entry; otherwise it is built from the slices reported by the
    shell and stored in the cache for later calls.
    """
    cache = self.cache

    # look in cache first
    if cache:
        cached_urns = cache.get('slices')
        if cached_urns:
            logger.debug("PlDriver.list_slices returns from cache")
            return cached_urns

    # ask the testbed, then turn slice names into hrns and hrns into urns
    local_slices = self.shell.GetSlices({'peer_id': None}, ['name'])
    slice_urns = []
    for local_slice in local_slices:
        slice_hrn = slicename_to_hrn(self.hrn, local_slice['name'])
        slice_urns.append(hrn_to_urn(slice_hrn, 'slice'))

    # cache the result
    if cache:
        logger.debug ("PlDriver.list_slices stores value in cache")
        cache.add('slices', slice_urns)

    return slice_urns
217 # first 2 args are None in case of resource discovery
def list_resources (self, slice_urn, slice_hrn, creds, options):
    """
    Return the rspec describing the resources of this aggregate.

    slice_urn and slice_hrn are None in case of resource discovery (an
    advertisement); only advertisements go through the cache.  The
    relevant options are 'geni_rspec_version' (rspec format), 'info'
    (made part of the cache key) and 'cached' (defaults to True; a false
    value bypasses the cache lookup, the result is still stored).
    """
    # get the rspec's return format from options
    version_manager = VersionManager()
    rspec_version = version_manager.get_version(options.get('geni_rspec_version'))

    # the cache key depends on the rspec version and on the 'info' option
    cache_key = "rspec_%s" % (rspec_version)
    info = options.get('info')
    if info:
        cache_key = cache_key + "_" + info

    # advertisements only, and only when a cache is configured
    cacheable = self.cache and not slice_hrn

    # look in cache first
    if cacheable and options.get('cached', True):
        cached_rspec = self.cache.get(cache_key)
        if cached_rspec:
            logger.debug("PlDriver.ListResources: returning cached advertisement")
            return cached_rspec

    # passing the user-defined options down to the aggregate
    aggregate = PlAggregate(self)
    rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
                                options=options)

    # cache the result
    if cacheable:
        logger.debug("PlDriver.ListResources: stores advertisement in cache")
        self.cache.add(cache_key, rspec)

    return rspec
def sliver_status (self, slice_urn, slice_hrn):
    """
    Report the status of the slivers of a slice on the local nodes.

    Returns a dictionary with 'geni_urn', 'pl_login', 'pl_expires',
    'geni_status' and 'geni_resources'; the latter holds one dictionary
    per node.  A node counts as 'ready' when its boot_state is 'boot' and
    as 'failed' otherwise; a single failed node makes the overall status
    'failed'.

    Raises SliverDoesNotExist when the slice is unknown or has no local
    node.
    """
    # find out where this slice is currently running
    slicename = hrn_to_pl_slicename(slice_hrn)

    slices = self.shell.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
    if not slices:
        raise SliverDoesNotExist("%s (used %s as slicename internally)" % (slice_hrn, slicename))
    slice_rec = slices[0]

    # report about the local nodes only
    nodes = self.shell.GetNodes({'node_id':slice_rec['node_ids'],'peer_id':None},
                                ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
    if not nodes:
        raise SliverDoesNotExist("You have not allocated any slivers here")

    top_level_status = 'unknown'
    if nodes:
        top_level_status = 'ready'

    result = {}
    result['geni_urn'] = slice_urn
    result['pl_login'] = slice_rec['name']
    result['pl_expires'] = datetime_to_string(utcparse(slice_rec['expires']))

    resources = []
    for node in nodes:
        res = {}
        res['pl_hostname'] = node['hostname']
        res['pl_boot_state'] = node['boot_state']
        # last_contact is reported as a formatted date, or None when unknown
        res['pl_last_contact'] = node['last_contact']
        if node['last_contact'] is not None:
            res['pl_last_contact'] = datetime_to_string(utcparse(node['last_contact']))
        res['geni_urn'] = urn_to_sliver_id(slice_urn, slice_rec['slice_id'], node['node_id'])
        if node['boot_state'] == 'boot':
            res['geni_status'] = 'ready'
        else:
            res['geni_status'] = 'failed'
            # one failed node is enough to fail the whole slice
            top_level_status = 'failed'
        res['geni_error'] = ''
        resources.append(res)

    result['geni_status'] = top_level_status
    result['geni_resources'] = resources
    return result
def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
    """
    Instantiate the slivers requested in `rspec_string` for a slice.

    Makes sure the site, slice, person and slice attribute records exist,
    adjusts the nodes and links attached to the slice according to the
    request, and returns the rspec of the slice as computed by the
    aggregate afterwards.
    """
    aggregate = PlAggregate(self)
    slices = PlSlices(self)
    peer = slices.get_peer(slice_hrn)
    sfa_peer = slices.get_sfa_peer(slice_hrn)

    # the slice record is taken from the first user, when there is one
    slice_record = None
    if users:
        slice_record = users[0].get('slice_record', {})

    # parse the request
    rspec = RSpec(rspec_string)
    request = rspec.version
    requested_attributes = request.get_slice_attributes()

    # ensure site record exists
    site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer, options=options)
    # ensure slice record exists
    slice_rec = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
    # ensure person records exists
    persons = slices.verify_persons(slice_hrn, slice_rec, users, peer, sfa_peer, options=options)
    # ensure slice attributes exists
    slices.verify_slice_attributes(slice_rec, requested_attributes, options=options)

    # add/remove slice from nodes
    requested_slivers = []
    for node in request.get_nodes_with_slivers():
        requested_slivers.append(node.get('component_name'))
    nodes = slices.verify_slice_nodes(slice_rec, requested_slivers, peer)

    # add/remove links
    slices.verify_slice_links(slice_rec, request.get_link_requests(), nodes)

    # handle MyPLC peer association.
    # only used by plc and ple.
    slices.handle_peer(site, slice_rec, persons, peer)

    return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
def delete_sliver (self, slice_urn, slice_hrn, creds, options):
    """
    Remove a slice from all the nodes it is attached to.

    A slice that belongs to a peer is unbound from that peer for the
    duration of the operation and bound again afterwards, even when the
    removal fails.  Returns 1, also when the slice is unknown.
    """
    slicename = hrn_to_pl_slicename(slice_hrn)
    matches = self.shell.GetSlices({'name': slicename})
    if not matches:
        return 1
    slice_rec = matches[0]

    # determine if this is a peer slice
    # xxx I wonder if this would not need to use PlSlices.get_peer instead
    # in which case plc.peers could be deprecated as this here
    # is the only/last call to this last method in plc.peers
    peer = peers.get_peer(self, slice_hrn)
    try:
        if peer:
            self.shell.UnBindObjectFromPeer('slice', slice_rec['slice_id'], peer)
        self.shell.DeleteSliceFromNodes(slicename, slice_rec['node_ids'])
    finally:
        if peer:
            self.shell.BindObjectToPeer('slice', slice_rec['slice_id'], peer, slice_rec['peer_slice_id'])
    return 1
def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
    """
    Set the expiration date of a slice to `expiration_time`.

    expiration_time is parsed with utcparse() and sent to the testbed as
    an integer number of seconds since the epoch.

    Returns True when the slice was updated and False when the update
    failed.  Raises RecordNotFound when the slice is unknown.
    """
    import sys

    slicename = hrn_to_pl_slicename(slice_hrn)
    slices = self.shell.GetSlices({'name': slicename}, ['slice_id'])
    if not slices:
        raise RecordNotFound(slice_hrn)
    slice_rec = slices[0]

    requested_time = utcparse(expiration_time)
    record = {'expires': int(datetime_to_epoch(requested_time))}
    try:
        self.shell.UpdateSlice(slice_rec['slice_id'], record)
    except Exception:
        # a failed renewal is reported to the caller through the return
        # value; keep a trace of the reason in the log
        logger.info("renew_sliver: could not renew %s: %s" % (slice_hrn, sys.exc_info()[1]))
        return False
    return True
372 # remove the 'enabled' tag
def start_slice (self, slice_urn, slice_hrn, creds):
    """
    Start a slice by removing its 'enabled' tag, if it has one.

    Returns 1.  Raises RecordNotFound when the slice is unknown.
    """
    slicename = hrn_to_pl_slicename(slice_hrn)
    matches = self.shell.GetSlices({'name': slicename}, ['slice_id'])
    if not matches:
        raise RecordNotFound(slice_hrn)
    slice_id = matches[0]['slice_id']

    enabled_tags = self.shell.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
    # just remove the tag if it exists
    if enabled_tags:
        self.shell.DeleteSliceTag(enabled_tags[0]['slice_tag_id'])
    return 1
385 # set the 'enabled' tag to 0
def stop_slice (self, slice_urn, slice_hrn, creds):
    """
    Stop a slice by setting its 'enabled' tag to '0'.

    The tag is created when the slice has none, and updated when its
    value is not already "0".  Returns 1.  Raises RecordNotFound when the
    slice is unknown.
    """
    slicename = hrn_to_pl_slicename(slice_hrn)
    matches = self.shell.GetSlices({'name': slicename}, ['slice_id'])
    if not matches:
        raise RecordNotFound(slice_hrn)
    slice_id = matches[0]['slice_id']

    enabled_tags = self.shell.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
    if not enabled_tags:
        self.shell.AddSliceTag(slice_id, 'enabled', '0')
    else:
        enabled_tag = enabled_tags[0]
        if enabled_tag['value'] != "0":
            self.shell.UpdateSliceTag(enabled_tag['slice_tag_id'], '0')
    return 1
def reset_slice (self, slice_urn, slice_hrn, creds):
    """
    Not supported by this driver: always raises SfaNotImplemented.
    """
    raise SfaNotImplemented ("reset_slice not available at this interface")
403 # xxx this code is quite old and has not run for ages
404 # it is obviously totally broken and needs a rewrite
def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
    """
    Not supported by this driver: always raises SfaNotImplemented.

    The former implementation is kept below, commented out, for future
    reference.
    """
    # call form of raise: same style as reset_slice, and valid syntax in
    # both Python 2 and Python 3 (the 'raise X, "msg"' form is Python 2 only)
    raise SfaNotImplemented("PlDriver.get_ticket needs a rewrite")
407 # please keep this code for future reference
408 # slices = PlSlices(self)
409 # peer = slices.get_peer(slice_hrn)
410 # sfa_peer = slices.get_sfa_peer(slice_hrn)
412 # # get the slice record
413 # credential = api.getCredential()
414 # interface = api.registries[api.hrn]
415 # registry = api.server_proxy(interface, credential)
416 # records = registry.Resolve(xrn, credential)
418 # # make sure we get a local slice record
420 # for tmp_record in records:
421 # if tmp_record['type'] == 'slice' and \
422 # not tmp_record['peer_authority']:
423 # #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
424 # slice_record = SliceRecord(dict=tmp_record)
426 # raise RecordNotFound(slice_hrn)
428 # # similar to CreateSliver, we must verify that the required records exist
429 # # at this aggregate before we can issue a ticket
431 # rspec = RSpec(rspec_string)
432 # requested_attributes = rspec.version.get_slice_attributes()
434 # # ensure site record exists
435 # site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
436 # # ensure slice record exists
437 # slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
438 # # ensure person records exists
439 # # xxx users is undefined in this context
440 # persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
441 # # ensure slice attributes exists
442 # slices.verify_slice_attributes(slice, requested_attributes)
445 # slivers = slices.get_slivers(slice_hrn)
448 # raise SliverDoesNotExist(slice_hrn)
453 # 'timestamp': int(time.time()),
454 # 'initscripts': initscripts,
458 # # create the ticket
459 # object_gid = record.get_gid_object()
460 # new_ticket = SfaTicket(subject = object_gid.get_subject())
461 # new_ticket.set_gid_caller(api.auth.client_gid)
462 # new_ticket.set_gid_object(object_gid)
463 # new_ticket.set_issuer(key=api.key, subject=self.hrn)
464 # new_ticket.set_pubkey(object_gid.get_pubkey())
465 # new_ticket.set_attributes(data)
466 # new_ticket.set_rspec(rspec)
467 # #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
468 # new_ticket.encode()
471 # return new_ticket.save_to_string(save_parents=True)