Merge branch 'master' of ssh://git.onelab.eu/git/sfa
[sfa.git] / sfa / openstack / nova_driver.py
1 import time
2 import datetime
3
4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5     RecordNotFound, SfaNotImplemented, SliverDoesNotExist, \
6     SfaInvalidArgument
7
8 from sfa.util.sfalogging import logger
9 from sfa.util.defaultdict import defaultdict
10 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
11 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf
12 from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename, hrn_to_os_tenant_name
13 from sfa.util.cache import Cache
14 from sfa.trust.credential import Credential
15 # used to be used in get_ticket
16 #from sfa.trust.sfaticket import SfaTicket
17
18 from sfa.rspecs.version_manager import VersionManager
19 from sfa.rspecs.rspec import RSpec
20
21 # the driver interface, mostly provides default behaviours
22 from sfa.managers.driver import Driver
23 from sfa.openstack.shell import Shell
24 from sfa.openstack.osaggregate import OSAggregate
25 from sfa.planetlab.plslices import PlSlices
26
27 def list_to_dict(recs, key):
28     """
29     convert a list of dictionaries into a dictionary keyed on the 
30     specified dictionary key 
31     """
32     return dict ( [ (rec[key],rec) for rec in recs ] )
33
34 #
35 # PlShell is just an xmlrpc serverproxy where methods
36 # can be sent as-is; it takes care of authentication
37 # from the global config
38
39 class NovaDriver(Driver):
40
41     # the cache instance is a class member so it survives across incoming requests
42     cache = None
43
44     def __init__ (self, config):
45         Driver.__init__(self, config)
46         self.shell = Shell(config=config)
47         self.cache=None
48         if config.SFA_AGGREGATE_CACHING:
49             if NovaDriver.cache is None:
50                 NovaDriver.cache = Cache()
51             self.cache = NovaDriver.cache
52  
53     ########################################
54     ########## registry oriented
55     ########################################
56
57     ########## disabled users 
58     def is_enabled (self, record):
59         # all records are enabled
60         return True
61
62     def augment_records_with_testbed_info (self, sfa_records):
63         return self.fill_record_info (sfa_records)
64
65     ########## 
66     def register (self, sfa_record, hrn, pub_key):
67         
68         if sfa_record['type'] == 'slice':
69             record = self.register_slice(sfa_record, hrn)         
70         elif sfa_record['type'] == 'user':
71             record = self.register_user(sfa_record, hrn, pub_key)
72         elif sfa_record['type'].startswith('authority'): 
73             record = self.register_authority(sfa_record, hrn)
74         # We should be returning the records id as a pointer but
75         # this is a string and the records table expects this to be an 
76         # int.
77         #return record.id
78         return -1
79
80     def register_slice(self, sfa_record, hrn):
81         # add slice description, name, researchers, PI
82         name = hrn_to_os_tenant_name(hrn)
83         description = sfa_record.get('description', None)
84         self.shell.auth_manager.tenants.create(name, description)
85         tenant = self.shell.auth_manager.tenants.find(name=name)
86         auth_hrn = OSXrn(xrn=hrn, type='slice').get_authority_hrn()
87         parent_tenant_name = OSXrn(xrn=auth_hrn, type='slice').get_tenant_name()
88         parent_tenant = self.shell.auth_manager.tenants.find(name=parent_tenant_name)
89         researchers = sfa_record.get('researchers', [])
90         for researcher in researchers:
91             name = Xrn(researcher).get_leaf()
92             user = self.shell.auth_manager.users.find(name=name)
93             self.shell.auth_manager.roles.add_user_role(user, 'Member', tenant)
94             self.shell.auth_manager.roles.add_user_role(user, 'user', tenant)
95             
96
97         pis = sfa_record.get('pis', [])
98         for pi in pis:
99             name = Xrn(pi).get_leaf()
100             user = self.shell.auth_manager.users.find(name=name)
101             self.shell.auth_manager.roles.add_user_role(user, 'pi', tenant)
102             self.shell.auth_manager.roles.add_user_role(user, 'pi', parent_tenant)
103
104         return tenant
105        
106     def register_user(self, sfa_record, hrn, pub_key):
107         # add person roles, projects and keys
108         email = sfa_record.get('email', None)
109         xrn = Xrn(hrn)
110         name = xrn.get_leaf()
111         auth_hrn = xrn.get_authority_hrn()
112         tenant_name = OSXrn(xrn=auth_hrn, type='authority').get_tenant_name()  
113         tenant = self.shell.auth_manager.tenants.find(name=tenant_name)  
114         self.shell.auth_manager.users.create(name, email=email, tenant_id=tenant.id)
115         user = self.shell.auth_manager.users.find(name=name)
116         slices = sfa_records.get('slices', [])
117         for slice in projects:
118             slice_tenant_name = OSXrn(xrn=slice, type='slice').get_tenant_name()
119             slice_tenant = self.shell.auth_manager.tenants.find(name=slice_tenant_name)
120             self.shell.auth_manager.roles.add_user_role(user, slice_tenant, 'user')
121         keys = sfa_records.get('keys', [])
122         for key in keys:
123             keyname = OSXrn(xrn=hrn, type='user').get_slicename()
124             self.shell.nova_client.keypairs.create(keyname, key)
125         return user
126
127     def register_authority(self, sfa_record, hrn):
128         name = OSXrn(xrn=hrn, type='authority').get_tenant_name()
129         self.shell.auth_manager.tenants.create(name, sfa_record.get('description', ''))
130         tenant = self.shell.auth_manager.tenants.find(name=name)
131         return tenant
132         
133         
134     ##########
135     # xxx actually old_sfa_record comes filled with plc stuff as well in the original code
136     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
137         type = new_sfa_record['type'] 
138         
139         # new_key implemented for users only
140         if new_key and type not in [ 'user' ]:
141             raise UnknownSfaType(type)
142
143         elif type == "slice":
144             # can update project manager and description
145             name = hrn_to_os_slicename(hrn)
146             researchers = sfa_record.get('researchers', [])
147             pis = sfa_record.get('pis', [])
148             project_manager = None
149             description = sfa_record.get('description', None)
150             if pis:
151                 project_manager = Xrn(pis[0], 'user').get_leaf()
152             elif researchers:
153                 project_manager = Xrn(researchers[0], 'user').get_leaf()
154             self.shell.auth_manager.modify_project(name, project_manager, description)
155
156         elif type == "user":
157             # can techinally update access_key and secret_key,
158             # but that is not in our scope, so we do nothing.  
159             pass
160         return True
161         
162
163     ##########
164     def remove (self, sfa_record):
165         type=sfa_record['type']
166         if type == 'user':
167             name = Xrn(sfa_record['hrn']).get_leaf()     
168             if self.shell.auth_manager.get_user(name):
169                 self.shell.auth_manager.delete_user(name)
170         elif type == 'slice':
171             name = hrn_to_os_slicename(sfa_record['hrn'])     
172             if self.shell.auth_manager.get_project(name):
173                 self.shell.auth_manager.delete_project(name)
174         return True
175
176
177     ####################
178     def fill_record_info(self, records):
179         """
180         Given a (list of) SFA record, fill in the PLC specific 
181         and SFA specific fields in the record. 
182         """
183         if not isinstance(records, list):
184             records = [records]
185
186         for record in records:
187             if record['type'] == 'user':
188                 record = self.fill_user_record_info(record)
189             elif record['type'] == 'slice':
190                 record = self.fill_slice_record_info(record)
191             elif record['type'].startswith('authority'):
192                 record = self.fill_auth_record_info(record)
193             else:
194                 continue
195             record['geni_urn'] = hrn_to_urn(record['hrn'], record['type'])
196             record['geni_certificate'] = record['gid'] 
197             #if os_record.created_at is not None:    
198             #    record['date_created'] = datetime_to_string(utcparse(os_record.created_at))
199             #if os_record.updated_at is not None:
200             #    record['last_updated'] = datetime_to_string(utcparse(os_record.updated_at))
201  
202         return records
203
204     def fill_user_record_info(self, record):
205         xrn = Xrn(record['hrn'])
206         name = xrn.get_leaf()
207         record['name'] = name
208         user = self.shell.auth_manager.users.find(name=name)
209         record['email'] = user.email
210         tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
211         slices = []
212         all_tenants = self.shell.auth_manager.tenants.list()
213         for tmp_tenant in all_tenants:
214             if tmp_tenant.name.startswith(tenant.name +"."):
215                 for tmp_user in tmp_tenant.list_users():
216                     if tmp_user.name == user.name:
217                         slice_hrn = ".".join([self.hrn, tmp_tenant.name]) 
218                         slices.append(slice_hrn)   
219         record['slices'] = slices
220         roles = self.shell.auth_manager.roles.roles_for_user(user, tenant)
221         record['roles'] = [role.name for role in roles] 
222         keys = self.shell.nova_manager.keypairs.findall(name=record['hrn'])
223         record['keys'] = [key.public_key for key in keys]
224         return record
225
226     def fill_slice_record_info(self, record):
227         tenant_name = hrn_to_os_tenant_name(record['hrn'])
228         tenant = self.shell.auth_manager.tenants.find(name=tenant_name)
229         parent_tenant_name = OSXrn(xrn=tenant_name).get_authority_hrn()
230         parent_tenant = self.shell.auth_manager.tenants.find(name=parent_tenant_name)
231         researchers = []
232         pis = []
233
234         # look for users and pis in slice tenant
235         for user in tenant.list_users():
236             for role in self.shell.auth_manager.roles.roles_for_user(user, tenant):
237                 if role.name.lower() == 'pi':
238                     user_tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
239                     hrn = ".".join([self.hrn, user_tenant.name, user.name])
240                     pis.append(hrn)
241                 elif role.name.lower() in ['user', 'member']:
242                     user_tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
243                     hrn = ".".join([self.hrn, user_tenant.name, user.name])
244                     researchers.append(hrn)
245
246         # look for pis in the slice's parent (site/organization) tenant
247         for user in parent_tenant.list_users():
248             for role in self.shell.auth_manager.roles.roles_for_user(user, parent_tenant):
249                 if role.name.lower() == 'pi':
250                     user_tenant = self.shell.auth_manager.tenants.find(id=user.tenantId)
251                     hrn = ".".join([self.hrn, user_tenant.name, user.name])
252                     pis.append(hrn)
253         record['name'] = tenant_name
254         record['description'] = tenant.description
255         record['PI'] = pis
256         if pis:
257             record['geni_creator'] = pis[0]
258         else:
259             record['geni_creator'] = None
260         record['researcher'] = researchers
261         return record
262
263     def fill_auth_record_info(self, record):
264         tenant_name = hrn_to_os_tenant_name(record['hrn'])
265         tenant = self.shell.auth_manager.tenants.find(name=tenant_name)
266         researchers = []
267         pis = []
268
269         # look for users and pis in slice tenant
270         for user in tenant.list_users():
271             for role in self.shell.auth_manager.roles.roles_for_user(user, tenant):
272                 hrn = ".".join([self.hrn, tenant.name, user.name])
273                 if role.name.lower() == 'pi':
274                     pis.append(hrn)
275                 elif role.name.lower() in ['user', 'member']:
276                     researchers.append(hrn)
277
278         # look for slices
279         slices = []
280         all_tenants = self.shell.auth_manager.tenants.list() 
281         for tmp_tenant in all_tenants:
282             if tmp_tenant.name.startswith(tenant.name+"."):
283                 slices.append(".".join([self.hrn, tmp_tenant.name])) 
284
285         record['name'] = tenant_name
286         record['description'] = tenant.description
287         record['PI'] = pis
288         record['enabled'] = tenant.enabled
289         record['researchers'] = researchers
290         record['slices'] = slices
291         return record
292
293     ####################
294     # plcapi works by changes, compute what needs to be added/deleted
295     def update_relation (self, subject_type, target_type, subject_id, target_ids):
296         # hard-wire the code for slice/user for now, could be smarter if needed
297         if subject_type =='slice' and target_type == 'user':
298             subject=self.shell.project_get(subject_id)[0]
299             current_target_ids = [user.name for user in subject.members]
300             add_target_ids = list ( set (target_ids).difference(current_target_ids))
301             del_target_ids = list ( set (current_target_ids).difference(target_ids))
302             logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
303             for target_id in add_target_ids:
304                 self.shell.project_add_member(target_id,subject_id)
305                 logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
306             for target_id in del_target_ids:
307                 logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
308                 self.shell.project_remove_member(target_id, subject_id)
309         else:
310             logger.info('unexpected relation to maintain, %s -> %s'%(subject_type,target_type))
311
312         
313     ########################################
314     ########## aggregate oriented
315     ########################################
316
317     def testbed_name (self): return "openstack"
318
319     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
320     def aggregate_version (self):
321         version_manager = VersionManager()
322         ad_rspec_versions = []
323         request_rspec_versions = []
324         for rspec_version in version_manager.versions:
325             if rspec_version.content_type in ['*', 'ad']:
326                 ad_rspec_versions.append(rspec_version.to_dict())
327             if rspec_version.content_type in ['*', 'request']:
328                 request_rspec_versions.append(rspec_version.to_dict()) 
329         return {
330             'testbed':self.testbed_name(),
331             'geni_request_rspec_versions': request_rspec_versions,
332             'geni_ad_rspec_versions': ad_rspec_versions,
333             }
334
335     def list_slices (self, creds, options):
336         # look in cache first
337         if self.cache:
338             slices = self.cache.get('slices')
339             if slices:
340                 logger.debug("OpenStackDriver.list_slices returns from cache")
341                 return slices
342     
343         # get data from db
344         instance_urns = []
345         instances = self.shell.nova_manager.servers.findall()
346         for instance in instances:
347             if instance.name not in instance_urns:
348                 instance_urns.append(OSXrn(instance.name, type='slice').urn)
349
350         # cache the result
351         if self.cache:
352             logger.debug ("OpenStackDriver.list_slices stores value in cache")
353             self.cache.add('slices', instance_urns) 
354     
355         return instance_urns
356         
357     # first 2 args are None in case of resource discovery
358     def list_resources (self, slice_urn, slice_hrn, creds, options):
359         cached_requested = options.get('cached', True) 
360     
361         version_manager = VersionManager()
362         # get the rspec's return format from options
363         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
364         version_string = "rspec_%s" % (rspec_version)
365     
366         #panos adding the info option to the caching key (can be improved)
367         if options.get('info'):
368             version_string = version_string + "_"+options.get('info', 'default')
369     
370         # look in cache first
371         if cached_requested and self.cache and not slice_hrn:
372             rspec = self.cache.get(version_string)
373             if rspec:
374                 logger.debug("OpenStackDriver.ListResources: returning cached advertisement")
375                 return rspec 
376     
377         #panos: passing user-defined options
378         #print "manager options = ",options
379         aggregate = OSAggregate(self)
380         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
381                                      options=options)
382     
383         # cache the result
384         if self.cache and not slice_hrn:
385             logger.debug("OpenStackDriver.ListResources: stores advertisement in cache")
386             self.cache.add(version_string, rspec)
387     
388         return rspec
389     
390     def sliver_status (self, slice_urn, slice_hrn):
391         # update nova connection
392         tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name()
393         self.shell.nova_manager.connect(tenant=tenant_name)
394
395         # find out where this slice is currently running
396         project_name = hrn_to_os_slicename(slice_hrn)
397         instances = self.shell.nova_manager.servers.findall(name=project_name)
398         if len(instances) == 0:
399             raise SliverDoesNotExist("You have not allocated any slivers here") 
400         
401         result = {}
402         top_level_status = 'ready'
403         result['geni_urn'] = slice_urn
404         result['plos_login'] = 'root'
405         # do we need real dates here? 
406         result['plos_expires'] = None
407         result['geni_expires'] = None
408         
409         resources = []
410         for instance in instances:
411             res = {}
412             # instances are accessed by ip, not hostname. We need to report the ip
413             # somewhere so users know where to ssh to.     
414             res['geni_expires'] = None
415             #res['plos_hostname'] = instance.hostname
416             res['plos_created_at'] = datetime_to_string(utcparse(instance.created))    
417             res['plos_boot_state'] = instance.status
418             res['plos_sliver_type'] = self.shell.nova_manager.flavors.find(id=instance.flavor['id']).name 
419             res['geni_urn'] =  Xrn(slice_urn, type='slice', id=instance.id).get_urn()
420
421             if instance.status.lower() == 'active':
422                 res['boot_state'] = 'ready'
423                 res['geni_status'] = 'ready'
424             elif instance.status.lower() == 'error':
425                 res['boot_state'] = 'failed'
426                 res['geni_status'] = 'failed'
427                 top_level_status = 'failed'
428             else:
429                 res['boot_state'] = 'notready'  
430                 res['geni_status'] = 'notready'
431                 top_level_status = 'notready'
432             resources.append(res)
433             
434         result['geni_status'] = top_level_status
435         result['geni_resources'] = resources
436         return result
437
438     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
439    
440         aggregate = OSAggregate(self)
441
442         # assume first user is the caller and use their context
443         # for the ec2/euca api connection. Also, use the first users
444         # key as the project key.
445         key_name = None
446         if len(users) > 1:
447             key_name = aggregate.create_instance_key(slice_hrn, users[0])
448
449         # collect public keys
450         pubkeys = []
451         for user in users:
452             pubkeys.extend(user['keys'])
453            
454         rspec = RSpec(rspec_string)
455         instance_name = hrn_to_os_slicename(slice_hrn)
456         tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name()
457         instances = aggregate.run_instances(instance_name, tenant_name, rspec_string, key_name, pubkeys)
458         rspec_nodes = []
459         for instance in instances:
460             rspec_nodes.append(aggregate.instance_to_rspec_node(slice_urn, instance))    
461         version_manager = VersionManager()
462         manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
463         manifest_rspec = RSpec(version=manifest_version, user_options=options)
464         manifest_rspec.version.add_nodes(rspec_nodes) 
465          
466         return manifest_rspec.toxml()
467
468     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
469         aggregate = OSAggregate(self)
470         tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name()
471         project_name = hrn_to_os_slicename(slice_hrn)
472         return aggregate.delete_instances(project_name, tenant_name)   
473
474     def update_sliver(self, slice_urn, slice_hrn, rspec, creds, options):
475         name = hrn_to_os_slicename(slice_hrn)
476         tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name()
477         aggregate = OSAggregate(self)
478         return aggregate.update_instances(name)
479     
480     def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
481         return True
482
483     def start_slice (self, slice_urn, slice_hrn, creds):
484         return 1
485
486     def stop_slice (self, slice_urn, slice_hrn, creds):
487         tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name()
488         name = OSXrn(xrn=slice_urn).name
489         aggregate = OSAggregate(self)
490         return aggregate.stop_instances(name, tenant_name) 
491
492     def reset_slice (self, slice_urn, slice_hrn, creds):
493         raise SfaNotImplemented ("reset_slice not available at this interface")
494     
495     # xxx this code is quite old and has not run for ages
496     # it is obviously totally broken and needs a rewrite
497     def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
498         raise SfaNotImplemented,"OpenStackDriver.get_ticket needs a rewrite"
499 # please keep this code for future reference
500 #        slices = PlSlices(self)
501 #        peer = slices.get_peer(slice_hrn)
502 #        sfa_peer = slices.get_sfa_peer(slice_hrn)
503 #    
504 #        # get the slice record
505 #        credential = api.getCredential()
506 #        interface = api.registries[api.hrn]
507 #        registry = api.server_proxy(interface, credential)
508 #        records = registry.Resolve(xrn, credential)
509 #    
510 #        # make sure we get a local slice record
511 #        record = None
512 #        for tmp_record in records:
513 #            if tmp_record['type'] == 'slice' and \
514 #               not tmp_record['peer_authority']:
515 #    #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
516 #                slice_record = SliceRecord(dict=tmp_record)
517 #        if not record:
518 #            raise RecordNotFound(slice_hrn)
519 #        
520 #        # similar to CreateSliver, we must verify that the required records exist
521 #        # at this aggregate before we can issue a ticket
522 #        # parse rspec
523 #        rspec = RSpec(rspec_string)
524 #        requested_attributes = rspec.version.get_slice_attributes()
525 #    
526 #        # ensure site record exists
527 #        site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
528 #        # ensure slice record exists
529 #        slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
530 #        # ensure person records exists
531 #    # xxx users is undefined in this context
532 #        persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
533 #        # ensure slice attributes exists
534 #        slices.verify_slice_attributes(slice, requested_attributes)
535 #        
536 #        # get sliver info
537 #        slivers = slices.get_slivers(slice_hrn)
538 #    
539 #        if not slivers:
540 #            raise SliverDoesNotExist(slice_hrn)
541 #    
542 #        # get initscripts
543 #        initscripts = []
544 #        data = {
545 #            'timestamp': int(time.time()),
546 #            'initscripts': initscripts,
547 #            'slivers': slivers
548 #        }
549 #    
550 #        # create the ticket
551 #        object_gid = record.get_gid_object()
552 #        new_ticket = SfaTicket(subject = object_gid.get_subject())
553 #        new_ticket.set_gid_caller(api.auth.client_gid)
554 #        new_ticket.set_gid_object(object_gid)
555 #        new_ticket.set_issuer(key=api.key, subject=self.hrn)
556 #        new_ticket.set_pubkey(object_gid.get_pubkey())
557 #        new_ticket.set_attributes(data)
558 #        new_ticket.set_rspec(rspec)
559 #        #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
560 #        new_ticket.encode()
561 #        new_ticket.sign()
562 #    
563 #        return new_ticket.save_to_string(save_parents=True)