fix bug in fill_slice_record_info
[sfa.git] / sfa / openstack / nova_driver.py
1 import time
2 import datetime
3
4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5     RecordNotFound, SfaNotImplemented, SliverDoesNotExist, \
6     SfaInvalidArgument
7
8 from sfa.util.sfalogging import logger
9 from sfa.util.defaultdict import defaultdict
10 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
11 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_sliver_id
12 from sfa.planetlab.plxrn import PlXrn
13 from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename, hrn_to_os_tenant_name
14 from sfa.util.cache import Cache
15 from sfa.trust.credential import Credential
16 # used to be used in get_ticket
17 #from sfa.trust.sfaticket import SfaTicket
18
19 from sfa.rspecs.version_manager import VersionManager
20 from sfa.rspecs.rspec import RSpec
21
22 # the driver interface, mostly provides default behaviours
23 from sfa.managers.driver import Driver
24 from sfa.openstack.shell import Shell
25 from sfa.openstack.osaggregate import OSAggregate
26 from sfa.planetlab.plslices import PlSlices
27 from sfa.util.osxrn import OSXrn
28
29
30 def list_to_dict(recs, key):
31     """
32     convert a list of dictionaries into a dictionary keyed on the 
33     specified dictionary key 
34     """
35     return dict ( [ (rec[key],rec) for rec in recs ] )
36
37 #
38 # PlShell is just an xmlrpc serverproxy where methods
39 # can be sent as-is; it takes care of authentication
40 # from the global config
41
42 class NovaDriver(Driver):
43
44     # the cache instance is a class member so it survives across incoming requests
45     cache = None
46
47     def __init__ (self, config):
48         Driver.__init__(self, config)
49         self.shell = Shell(config)
50         self.cache=None
51         if config.SFA_AGGREGATE_CACHING:
52             if NovaDriver.cache is None:
53                 NovaDriver.cache = Cache()
54             self.cache = NovaDriver.cache
55  
56     ########################################
57     ########## registry oriented
58     ########################################
59
60     ########## disabled users 
61     def is_enabled (self, record):
62         # all records are enabled
63         return True
64
65     def augment_records_with_testbed_info (self, sfa_records):
66         return self.fill_record_info (sfa_records)
67
68     ########## 
69     def register (self, sfa_record, hrn, pub_key):
70         type = sfa_record['type']
71         
72         #pl_record = self.sfa_fields_to_pl_fields(type     dd , hrn, sfa_record)
73            
74         if type == 'slice':
75             # add slice description, name, researchers, PI 
76             name = hrn_to_os_slicename(hrn)
77             researchers = sfa_record.get('researchers', [])
78             pis = sfa_record.get('pis', [])
79             project_manager = None
80             description = sfa_record.get('description', None)
81             if pis:
82                 project_manager = Xrn(pis[0], 'user').get_leaf()
83             elif researchers:
84                 project_manager = Xrn(researchers[0], 'user').get_leaf()
85             if not project_manager:
86                 err_string = "Cannot create a project without a project manager. " + \
87                              "Please specify at least one PI or researcher for project: " + \
88                              name    
89                 raise SfaInvalidArgument(err_string)
90
91             users = [Xrn(user, 'user').get_leaf() for user in \
92                      pis + researchers]
93             self.shell.auth_manager.create_project(name, project_manager, description, users)
94
95         elif type == 'user':
96             # add person roles, projects and keys
97             name = Xrn(hrn).get_leaf()
98             self.shell.auth_manager.create_user(name)
99             projects = sfa_records.get('slices', [])
100             for project in projects:
101                 project_name = Xrn(project).get_leaf()
102                 self.shell.auth_manager.add_to_project(name, project_name)
103             keys = sfa_records.get('keys', [])
104             for key in keys:
105                 key_dict = {
106                     'user_id': name,
107                     'name': name,
108                     'public': key,
109                 }
110                 self.shell.db.key_pair_create(key_dict)       
111                   
112         return name
113         
114     ##########
115     # xxx actually old_sfa_record comes filled with plc stuff as well in the original code
116     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
117         type = new_sfa_record['type'] 
118         
119         # new_key implemented for users only
120         if new_key and type not in [ 'user' ]:
121             raise UnknownSfaType(type)
122
123         elif type == "slice":
124             # can update project manager and description
125             name = hrn_to_os_slicename(hrn)
126             researchers = sfa_record.get('researchers', [])
127             pis = sfa_record.get('pis', [])
128             project_manager = None
129             description = sfa_record.get('description', None)
130             if pis:
131                 project_manager = Xrn(pis[0], 'user').get_leaf()
132             elif researchers:
133                 project_manager = Xrn(researchers[0], 'user').get_leaf()
134             self.shell.auth_manager.modify_project(name, project_manager, description)
135
136         elif type == "user":
137             # can techinally update access_key and secret_key,
138             # but that is not in our scope, so we do nothing.  
139             pass
140         return True
141         
142
143     ##########
144     def remove (self, sfa_record):
145         type=sfa_record['type']
146         if type == 'user':
147             name = Xrn(sfa_record['hrn']).get_leaf()     
148             if self.shell.auth_manager.get_user(name):
149                 self.shell.auth_manager.delete_user(name)
150         elif type == 'slice':
151             name = hrn_to_os_slicename(sfa_record['hrn'])     
152             if self.shell.auth_manager.get_project(name):
153                 self.shell.auth_manager.delete_project(name)
154         return True
155
156
157     ####################
158     def fill_record_info(self, records):
159         """
160         Given a (list of) SFA record, fill in the PLC specific 
161         and SFA specific fields in the record. 
162         """
163         if not isinstance(records, list):
164             records = [records]
165
166         for record in records:
167             if record['type'] == 'user':
168                 record = self.fill_user_record_info(record)
169             elif record['type'] == 'slice':
170                 record = self.fill_slice_record_info(record)
171             elif record['type'].startswith('authority'):
172                 record = self.fill_auth_record_info(record)
173             else:
174                 continue
175             record['geni_urn'] = hrn_to_urn(record['hrn'], record['type'])
176             record['geni_certificate'] = record['gid'] 
177             #if os_record.created_at is not None:    
178             #    record['date_created'] = datetime_to_string(utcparse(os_record.created_at))
179             #if os_record.updated_at is not None:
180             #    record['last_updated'] = datetime_to_string(utcparse(os_record.updated_at))
181  
182         return records
183
184     def fill_user_record_info(self, record):
185         xrn = Xrn(record['hrn'])
186         name = xrn.get_leaf()
187         record['name'] = name
188         user = self.shell.auth_manager.users.find(name=name)
189         record['email'] = user.email
190         tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
191         slices = []
192         all_tenants = self.shell.auth_manager.tenants.list()
193         for tmp_tenant in all_tenants:
194             if tmp_tenant.name.startswith(tenant.name +"."):
195                 for tmp_user in tmp_tenant.list_users():
196                     if tmp_user.name == user.name:
197                         slice_hrn = ".".join([self.hrn, tmp_tenant.name]) 
198                         slices.append(slice_hrn)   
199         record['slices'] = slices
200         roles = self.shell.auth_manager.roles.roles_for_user(user, tenant)
201         record['roles'] = [role.name for role in roles] 
202         keys = self.shell.nova_manager.keypairs.findall(name=record['hrn'])
203         record['keys'] = [key.public_key for key in keys]
204         return record
205
206     def fill_slice_record_info(self, record):
207         tenant_name = hrn_to_os_tenant_name(record['hrn'])
208         tenant = self.shell.auth_manager.tenants.find(name=tenant_name)
209         parent_tenant_name = OSXrn(xrn=tenant_name).get_authority_hrn()
210         parent_tenant = self.shell.auth_manager.tenants.find(name=parent_tenant_name)
211         researchers = []
212         pis = []
213
214         # look for users and pis in slice tenant
215         for user in tenant.list_users():
216             for role in self.shell.auth_manager.roles.roles_for_user(user, tenant):
217                 if role.name.lower() == 'pi':
218                     user_tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
219                     hrn = ".".join([self.hrn, user_tenant.name, user.name])
220                     pis.append(hrn)
221                 elif role.name.lower() in ['user', 'member']:
222                     user_tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
223                     hrn = ".".join([self.hrn, user_tenant.name, user.name])
224                     researchers.append(hrn)
225
226         # look for pis in the slice's parent (site/organization) tenant
227         for user in parent_tenant.list_users():
228             for role in self.shell.auth_manager.roles.roles_for_user(user, parent_tenant):
229                 if role.name.lower() == 'pi':
230                     user_tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
231                     hrn = ".".join([self.hrn, user_tenant.name, user.name])
232                     pis.append(hrn)
233         record['name'] = tenant_name
234         record['description'] = tenant.description
235         record['PI'] = pis
236         if pis:
237             record['geni_creator'] = pis[0]
238         else:
239             record['geni_creator'] = None
240         record['researcher'] = researchers
241         return record
242
243     def fill_auth_record_info(self, record):
244         tenant_name = hrn_to_os_tenant_name(record['hrn'])
245         tenant = self.shell.auth_manager.tenants.find(name=tenant_name)
246         researchers = []
247         pis = []
248
249         # look for users and pis in slice tenant
250         for user in tenant.list_users():
251             for role in self.shell.auth_manager.roles.roles_for_user(user, tenant):
252                 hrn = ".".join([self.hrn, tenant.name, user.name])
253                 if role.name.lower() == 'pi':
254                     pis.append(hrn)
255                 elif role.name.lower() in ['user', 'member']:
256                     researchers.append(hrn)
257
258         # look for slices
259         slices = []
260         all_tenants = self.shell.auth_manager.tenants.list() 
261         for tmp_tenant in all_tenants:
262             if tmp_tenant.name.startswith(tenant.name+"."):
263                 slices.append(".".join([self.hrn, tmp_tenant.name])) 
264
265         record['name'] = tenant_name
266         record['description'] = tenant.description
267         record['PI'] = pis
268         record['enabled'] = tenant.enabled
269         record['researchers'] = researchers
270         record['slices'] = slices
271         return record
272
273     ####################
274     # plcapi works by changes, compute what needs to be added/deleted
275     def update_relation (self, subject_type, target_type, subject_id, target_ids):
276         # hard-wire the code for slice/user for now, could be smarter if needed
277         if subject_type =='slice' and target_type == 'user':
278             subject=self.shell.project_get(subject_id)[0]
279             current_target_ids = [user.name for user in subject.members]
280             add_target_ids = list ( set (target_ids).difference(current_target_ids))
281             del_target_ids = list ( set (current_target_ids).difference(target_ids))
282             logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
283             for target_id in add_target_ids:
284                 self.shell.project_add_member(target_id,subject_id)
285                 logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
286             for target_id in del_target_ids:
287                 logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
288                 self.shell.project_remove_member(target_id, subject_id)
289         else:
290             logger.info('unexpected relation to maintain, %s -> %s'%(subject_type,target_type))
291
292         
293     ########################################
294     ########## aggregate oriented
295     ########################################
296
297     def testbed_name (self): return "openstack"
298
299     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
300     def aggregate_version (self):
301         version_manager = VersionManager()
302         ad_rspec_versions = []
303         request_rspec_versions = []
304         for rspec_version in version_manager.versions:
305             if rspec_version.content_type in ['*', 'ad']:
306                 ad_rspec_versions.append(rspec_version.to_dict())
307             if rspec_version.content_type in ['*', 'request']:
308                 request_rspec_versions.append(rspec_version.to_dict()) 
309         return {
310             'testbed':self.testbed_name(),
311             'geni_request_rspec_versions': request_rspec_versions,
312             'geni_ad_rspec_versions': ad_rspec_versions,
313             }
314
315     def list_slices (self, creds, options):
316         # look in cache first
317         if self.cache:
318             slices = self.cache.get('slices')
319             if slices:
320                 logger.debug("OpenStackDriver.list_slices returns from cache")
321                 return slices
322     
323         # get data from db
324         projs = self.shell.auth_manager.get_projects()
325         slice_urns = [OSXrn(proj.name, 'slice').urn for proj in projs] 
326     
327         # cache the result
328         if self.cache:
329             logger.debug ("OpenStackDriver.list_slices stores value in cache")
330             self.cache.add('slices', slice_urns) 
331     
332         return slice_urns
333         
334     # first 2 args are None in case of resource discovery
335     def list_resources (self, slice_urn, slice_hrn, creds, options):
336         cached_requested = options.get('cached', True) 
337     
338         version_manager = VersionManager()
339         # get the rspec's return format from options
340         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
341         version_string = "rspec_%s" % (rspec_version)
342     
343         #panos adding the info option to the caching key (can be improved)
344         if options.get('info'):
345             version_string = version_string + "_"+options.get('info', 'default')
346     
347         # look in cache first
348         if cached_requested and self.cache and not slice_hrn:
349             rspec = self.cache.get(version_string)
350             if rspec:
351                 logger.debug("OpenStackDriver.ListResources: returning cached advertisement")
352                 return rspec 
353     
354         #panos: passing user-defined options
355         #print "manager options = ",options
356         aggregate = OSAggregate(self)
357         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
358                                      options=options)
359     
360         # cache the result
361         if self.cache and not slice_hrn:
362             logger.debug("OpenStackDriver.ListResources: stores advertisement in cache")
363             self.cache.add(version_string, rspec)
364     
365         return rspec
366     
367     def sliver_status (self, slice_urn, slice_hrn):
368         # find out where this slice is currently running
369         project_name = hrn_to_os_slicename(slice_hrn)
370         project = self.shell.auth_manager.get_project(project_name)
371         instances = self.shell.db.instance_get_all_by_project(project_name)
372         if len(instances) == 0:
373             raise SliverDoesNotExist("You have not allocated any slivers here") 
374         
375         result = {}
376         top_level_status = 'unknown'
377         if instances:
378             top_level_status = 'ready'
379         result['geni_urn'] = slice_urn
380         result['plos_login'] = 'root' 
381         result['plos_expires'] = None
382         
383         resources = []
384         for instance in instances:
385             res = {}
386             # instances are accessed by ip, not hostname. We need to report the ip
387             # somewhere so users know where to ssh to.     
388             res['plos_hostname'] = instance.hostname
389             res['plos_created_at'] = datetime_to_string(utcparse(instance.created_at))    
390             res['plos_boot_state'] = instance.vm_state
391             res['plos_sliver_type'] = instance.instance_type.name 
392             sliver_id =  Xrn(slice_urn).get_sliver_id(instance.project_id, \
393                                                       instance.hostname, instance.id)
394             res['geni_urn'] = sliver_id
395
396             if instance.vm_state == 'running':
397                 res['boot_state'] = 'ready';
398             else:
399                 res['boot_state'] = 'unknown'  
400             resources.append(res)
401             
402         result['geni_status'] = top_level_status
403         result['geni_resources'] = resources
404         return result
405
406     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
407
408         aggregate = OSAggregate(self)
409         rspec = RSpec(rspec_string)
410         instance_name = hrn_to_os_slicename(slice_hrn)
411        
412         # assume first user is the caller and use their context
413         # for the ec2/euca api connection. Also, use the first users
414         # key as the project key.
415         key_name = None
416         if len(users) > 1:
417             key_name = aggregate.create_instance_key(slice_hrn, users[0])
418
419         # collect public keys
420         pubkeys = []
421         for user in users:
422             pubkeys.extend(user['keys'])
423            
424         aggregate.run_instances(instance_name, rspec_string, key_name, pubkeys)    
425    
426         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
427
428     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
429         aggregate = OSAggregate(self)
430         project_name = hrn_to_os_slicename(slice_hrn)
431         return aggregate.delete_instances(project_name)   
432
433     def update_sliver(self, slice_urn, slice_hrn, rspec, creds, options):
434         name = hrn_to_os_slicename(slice_hrn)
435         aggregate = OSAggregate(self)
436         return aggregate.update_instances(name)
437     
438     def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
439         return True
440
441     def start_slice (self, slice_urn, slice_hrn, creds):
442         return 1
443
444     def stop_slice (self, slice_urn, slice_hrn, creds):
445         name = OSXrn(xrn=slice_urn).name
446         aggregate = OSAggregate(self)
447         return aggregate.stop_instances(name) 
448
449     def reset_slice (self, slice_urn, slice_hrn, creds):
450         raise SfaNotImplemented ("reset_slice not available at this interface")
451     
452     # xxx this code is quite old and has not run for ages
453     # it is obviously totally broken and needs a rewrite
454     def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
455         raise SfaNotImplemented,"OpenStackDriver.get_ticket needs a rewrite"
456 # please keep this code for future reference
457 #        slices = PlSlices(self)
458 #        peer = slices.get_peer(slice_hrn)
459 #        sfa_peer = slices.get_sfa_peer(slice_hrn)
460 #    
461 #        # get the slice record
462 #        credential = api.getCredential()
463 #        interface = api.registries[api.hrn]
464 #        registry = api.server_proxy(interface, credential)
465 #        records = registry.Resolve(xrn, credential)
466 #    
467 #        # make sure we get a local slice record
468 #        record = None
469 #        for tmp_record in records:
470 #            if tmp_record['type'] == 'slice' and \
471 #               not tmp_record['peer_authority']:
472 #    #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
473 #                slice_record = SliceRecord(dict=tmp_record)
474 #        if not record:
475 #            raise RecordNotFound(slice_hrn)
476 #        
477 #        # similar to CreateSliver, we must verify that the required records exist
478 #        # at this aggregate before we can issue a ticket
479 #        # parse rspec
480 #        rspec = RSpec(rspec_string)
481 #        requested_attributes = rspec.version.get_slice_attributes()
482 #    
483 #        # ensure site record exists
484 #        site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
485 #        # ensure slice record exists
486 #        slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
487 #        # ensure person records exists
488 #    # xxx users is undefined in this context
489 #        persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
490 #        # ensure slice attributes exists
491 #        slices.verify_slice_attributes(slice, requested_attributes)
492 #        
493 #        # get sliver info
494 #        slivers = slices.get_slivers(slice_hrn)
495 #    
496 #        if not slivers:
497 #            raise SliverDoesNotExist(slice_hrn)
498 #    
499 #        # get initscripts
500 #        initscripts = []
501 #        data = {
502 #            'timestamp': int(time.time()),
503 #            'initscripts': initscripts,
504 #            'slivers': slivers
505 #        }
506 #    
507 #        # create the ticket
508 #        object_gid = record.get_gid_object()
509 #        new_ticket = SfaTicket(subject = object_gid.get_subject())
510 #        new_ticket.set_gid_caller(api.auth.client_gid)
511 #        new_ticket.set_gid_object(object_gid)
512 #        new_ticket.set_issuer(key=api.key, subject=self.hrn)
513 #        new_ticket.set_pubkey(object_gid.get_pubkey())
514 #        new_ticket.set_attributes(data)
515 #        new_ticket.set_rspec(rspec)
516 #        #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
517 #        new_ticket.encode()
518 #        new_ticket.sign()
519 #    
520 #        return new_ticket.save_to_string(save_parents=True)