fix import
[sfa.git] / sfa / openstack / nova_driver.py
1 import time
2 import datetime
3
4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5     RecordNotFound, SfaNotImplemented, SliverDoesNotExist, \
6     SfaInvalidArgument
7
8 from sfa.util.sfalogging import logger
9 from sfa.util.defaultdict import defaultdict
10 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
11 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_sliver_id
12 from sfa.planetlab.plxrn import PlXrn
13 from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename
14 from sfa.util.cache import Cache
15 from sfa.trust.credential import Credential
16 # used to be used in get_ticket
17 #from sfa.trust.sfaticket import SfaTicket
18
19 from sfa.rspecs.version_manager import VersionManager
20 from sfa.rspecs.rspec import RSpec
21
22 # the driver interface, mostly provides default behaviours
23 from sfa.managers.driver import Driver
24 from sfa.openstack.shell import Shell
25 from sfa.openstack.osaggregate import OSAggregate
26 from sfa.planetlab.plslices import PlSlices
27 from sfa.util.osxrn import OSXrn
28
29
30 def list_to_dict(recs, key):
31     """
32     convert a list of dictionaries into a dictionary keyed on the 
33     specified dictionary key 
34     """
35     return dict ( [ (rec[key],rec) for rec in recs ] )
36
37 #
38 # PlShell is just an xmlrpc serverproxy where methods
39 # can be sent as-is; it takes care of authentication
40 # from the global config
41
42 class NovaDriver(Driver):
43
44     # the cache instance is a class member so it survives across incoming requests
45     cache = None
46
47     def __init__ (self, config):
48         Driver.__init__(self, config)
49         self.shell = Shell(config)
50         self.cache=None
51         if config.SFA_AGGREGATE_CACHING:
52             if NovaDriver.cache is None:
53                 NovaDriver.cache = Cache()
54             self.cache = NovaDriver.cache
55  
56     ########################################
57     ########## registry oriented
58     ########################################
59
60     ########## disabled users 
61     def is_enabled (self, record):
62         # all records are enabled
63         return True
64
65     def augment_records_with_testbed_info (self, sfa_records):
66         return self.fill_record_info (sfa_records)
67
68     ########## 
69     def register (self, sfa_record, hrn, pub_key):
70         type = sfa_record['type']
71         
72         #pl_record = self.sfa_fields_to_pl_fields(type     dd , hrn, sfa_record)
73            
74         if type == 'slice':
75             # add slice description, name, researchers, PI 
76             name = hrn_to_os_slicename(hrn)
77             researchers = sfa_record.get('researchers', [])
78             pis = sfa_record.get('pis', [])
79             project_manager = None
80             description = sfa_record.get('description', None)
81             if pis:
82                 project_manager = Xrn(pis[0], 'user').get_leaf()
83             elif researchers:
84                 project_manager = Xrn(researchers[0], 'user').get_leaf()
85             if not project_manager:
86                 err_string = "Cannot create a project without a project manager. " + \
87                              "Please specify at least one PI or researcher for project: " + \
88                              name    
89                 raise SfaInvalidArgument(err_string)
90
91             users = [Xrn(user, 'user').get_leaf() for user in \
92                      pis + researchers]
93             self.shell.auth_manager.create_project(name, project_manager, description, users)
94
95         elif type == 'user':
96             # add person roles, projects and keys
97             name = Xrn(hrn).get_leaf()
98             self.shell.auth_manager.create_user(name)
99             projects = sfa_records.get('slices', [])
100             for project in projects:
101                 project_name = Xrn(project).get_leaf()
102                 self.shell.auth_manager.add_to_project(name, project_name)
103             keys = sfa_records.get('keys', [])
104             for key in keys:
105                 key_dict = {
106                     'user_id': name,
107                     'name': name,
108                     'public': key,
109                 }
110                 self.shell.db.key_pair_create(key_dict)       
111                   
112         return name
113         
114     ##########
115     # xxx actually old_sfa_record comes filled with plc stuff as well in the original code
116     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
117         type = new_sfa_record['type'] 
118         
119         # new_key implemented for users only
120         if new_key and type not in [ 'user' ]:
121             raise UnknownSfaType(type)
122
123         elif type == "slice":
124             # can update project manager and description
125             name = hrn_to_os_slicename(hrn)
126             researchers = sfa_record.get('researchers', [])
127             pis = sfa_record.get('pis', [])
128             project_manager = None
129             description = sfa_record.get('description', None)
130             if pis:
131                 project_manager = Xrn(pis[0], 'user').get_leaf()
132             elif researchers:
133                 project_manager = Xrn(researchers[0], 'user').get_leaf()
134             self.shell.auth_manager.modify_project(name, project_manager, description)
135
136         elif type == "user":
137             # can techinally update access_key and secret_key,
138             # but that is not in our scope, so we do nothing.  
139             pass
140         return True
141         
142
143     ##########
144     def remove (self, sfa_record):
145         type=sfa_record['type']
146         if type == 'user':
147             name = Xrn(sfa_record['hrn']).get_leaf()     
148             if self.shell.auth_manager.get_user(name):
149                 self.shell.auth_manager.delete_user(name)
150         elif type == 'slice':
151             name = hrn_to_os_slicename(sfa_record['hrn'])     
152             if self.shell.auth_manager.get_project(name):
153                 self.shell.auth_manager.delete_project(name)
154         return True
155
156
157     ####################
158     def fill_record_info(self, records):
159         """
160         Given a (list of) SFA record, fill in the PLC specific 
161         and SFA specific fields in the record. 
162         """
163         if not isinstance(records, list):
164             records = [records]
165
166         for record in records:
167             os_record = None
168             if record['type'] == 'user':
169                 name = Xrn(record['hrn']).get_leaf()
170                 os_record = self.shell.auth_manager.get_user(name)
171                 projects = self.shell.db.project_get_by_user(name)
172                 record['slices'] = [self.hrn + "." + proj.name for \
173                                     proj in projects]
174                 record['roles'] = self.shell.db.user_get_roles(name)
175                 keys = self.shell.db.key_pair_get_all_by_user(name)
176                 record['keys'] = [key.public_key for key in keys]     
177             elif record['type'] == 'slice':
178                 name = hrn_to_os_slicename(record['hrn']) 
179                 os_record = self.shell.auth_manager.get_project(name)
180                 record['description'] = os_record.description
181                 record['PI'] = [self.hrn + "." + os_record.project_manager.name]
182                 record['geni_creator'] = record['PI'] 
183                 record['researcher'] = [self.hrn + "." + user for \
184                                          user in os_record.member_ids]
185             else:
186                 continue
187             record['geni_urn'] = hrn_to_urn(record['hrn'], record['type'])
188             record['geni_certificate'] = record['gid'] 
189             record['name'] = os_record.name
190             #if os_record.created_at is not None:    
191             #    record['date_created'] = datetime_to_string(utcparse(os_record.created_at))
192             #if os_record.updated_at is not None:
193             #    record['last_updated'] = datetime_to_string(utcparse(os_record.updated_at))
194  
195         return records
196
197
198     ####################
199     # plcapi works by changes, compute what needs to be added/deleted
200     def update_relation (self, subject_type, target_type, subject_id, target_ids):
201         # hard-wire the code for slice/user for now, could be smarter if needed
202         if subject_type =='slice' and target_type == 'user':
203             subject=self.shell.project_get(subject_id)[0]
204             current_target_ids = [user.name for user in subject.members]
205             add_target_ids = list ( set (target_ids).difference(current_target_ids))
206             del_target_ids = list ( set (current_target_ids).difference(target_ids))
207             logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
208             for target_id in add_target_ids:
209                 self.shell.project_add_member(target_id,subject_id)
210                 logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
211             for target_id in del_target_ids:
212                 logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
213                 self.shell.project_remove_member(target_id, subject_id)
214         else:
215             logger.info('unexpected relation to maintain, %s -> %s'%(subject_type,target_type))
216
217         
218     ########################################
219     ########## aggregate oriented
220     ########################################
221
222     def testbed_name (self): return "openstack"
223
224     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
225     def aggregate_version (self):
226         version_manager = VersionManager()
227         ad_rspec_versions = []
228         request_rspec_versions = []
229         for rspec_version in version_manager.versions:
230             if rspec_version.content_type in ['*', 'ad']:
231                 ad_rspec_versions.append(rspec_version.to_dict())
232             if rspec_version.content_type in ['*', 'request']:
233                 request_rspec_versions.append(rspec_version.to_dict()) 
234         return {
235             'testbed':self.testbed_name(),
236             'geni_request_rspec_versions': request_rspec_versions,
237             'geni_ad_rspec_versions': ad_rspec_versions,
238             }
239
240     def list_slices (self, creds, options):
241         # look in cache first
242         if self.cache:
243             slices = self.cache.get('slices')
244             if slices:
245                 logger.debug("OpenStackDriver.list_slices returns from cache")
246                 return slices
247     
248         # get data from db
249         projs = self.shell.auth_manager.get_projects()
250         slice_urns = [OSXrn(proj.name, 'slice').urn for proj in projs] 
251     
252         # cache the result
253         if self.cache:
254             logger.debug ("OpenStackDriver.list_slices stores value in cache")
255             self.cache.add('slices', slice_urns) 
256     
257         return slice_urns
258         
259     # first 2 args are None in case of resource discovery
260     def list_resources (self, slice_urn, slice_hrn, creds, options):
261         cached_requested = options.get('cached', True) 
262     
263         version_manager = VersionManager()
264         # get the rspec's return format from options
265         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
266         version_string = "rspec_%s" % (rspec_version)
267     
268         #panos adding the info option to the caching key (can be improved)
269         if options.get('info'):
270             version_string = version_string + "_"+options.get('info', 'default')
271     
272         # look in cache first
273         if cached_requested and self.cache and not slice_hrn:
274             rspec = self.cache.get(version_string)
275             if rspec:
276                 logger.debug("OpenStackDriver.ListResources: returning cached advertisement")
277                 return rspec 
278     
279         #panos: passing user-defined options
280         #print "manager options = ",options
281         aggregate = OSAggregate(self)
282         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
283                                      options=options)
284     
285         # cache the result
286         if self.cache and not slice_hrn:
287             logger.debug("OpenStackDriver.ListResources: stores advertisement in cache")
288             self.cache.add(version_string, rspec)
289     
290         return rspec
291     
292     def sliver_status (self, slice_urn, slice_hrn):
293         # find out where this slice is currently running
294         project_name = hrn_to_os_slicename(slice_hrn)
295         project = self.shell.auth_manager.get_project(project_name)
296         instances = self.shell.db.instance_get_all_by_project(project_name)
297         if len(instances) == 0:
298             raise SliverDoesNotExist("You have not allocated any slivers here") 
299         
300         result = {}
301         top_level_status = 'unknown'
302         if instances:
303             top_level_status = 'ready'
304         result['geni_urn'] = slice_urn
305         result['plos_login'] = 'root' 
306         result['plos_expires'] = None
307         
308         resources = []
309         for instance in instances:
310             res = {}
311             # instances are accessed by ip, not hostname. We need to report the ip
312             # somewhere so users know where to ssh to.     
313             res['plos_hostname'] = instance.hostname
314             res['plos_created_at'] = datetime_to_string(utcparse(instance.created_at))    
315             res['plos_boot_state'] = instance.vm_state
316             res['plos_sliver_type'] = instance.instance_type.name 
317             sliver_id =  Xrn(slice_urn).get_sliver_id(instance.project_id, \
318                                                       instance.hostname, instance.id)
319             res['geni_urn'] = sliver_id
320
321             if instance.vm_state == 'running':
322                 res['boot_state'] = 'ready';
323             else:
324                 res['boot_state'] = 'unknown'  
325             resources.append(res)
326             
327         result['geni_status'] = top_level_status
328         result['geni_resources'] = resources
329         return result
330
331     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
332
333         aggregate = OSAggregate(self)
334         rspec = RSpec(rspec_string)
335         instance_name = hrn_to_os_slicename(slice_hrn)
336        
337         # assume first user is the caller and use their context
338         # for the ec2/euca api connection. Also, use the first users
339         # key as the project key.
340         key_name = None
341         if len(users) > 1:
342             key_name = aggregate.create_instance_key(slice_hrn, users[0])
343
344         # collect public keys
345         pubkeys = []
346         for user in users:
347             pubkeys.extend(user['keys'])
348            
349         aggregate.run_instances(instance_name, rspec_string, key_name, pubkeys)    
350    
351         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
352
353     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
354         aggregate = OSAggregate(self)
355         project_name = hrn_to_os_slicename(slice_hrn)
356         return aggregate.delete_instances(project_name)   
357
358     def update_sliver(self, slice_urn, slice_hrn, rspec, creds, options):
359         name = hrn_to_os_slicename(slice_hrn)
360         aggregate = OSAggregate(self)
361         return aggregate.update_instances(name)
362     
363     def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
364         return True
365
366     def start_slice (self, slice_urn, slice_hrn, creds):
367         return 1
368
369     def stop_slice (self, slice_urn, slice_hrn, creds):
370         name = OSXrn(xrn=slice_urn).name
371         aggregate = OSAggregate(self)
372         return aggregate.stop_instances(name) 
373
374     def reset_slice (self, slice_urn, slice_hrn, creds):
375         raise SfaNotImplemented ("reset_slice not available at this interface")
376     
377     # xxx this code is quite old and has not run for ages
378     # it is obviously totally broken and needs a rewrite
379     def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
380         raise SfaNotImplemented,"OpenStackDriver.get_ticket needs a rewrite"
381 # please keep this code for future reference
382 #        slices = PlSlices(self)
383 #        peer = slices.get_peer(slice_hrn)
384 #        sfa_peer = slices.get_sfa_peer(slice_hrn)
385 #    
386 #        # get the slice record
387 #        credential = api.getCredential()
388 #        interface = api.registries[api.hrn]
389 #        registry = api.server_proxy(interface, credential)
390 #        records = registry.Resolve(xrn, credential)
391 #    
392 #        # make sure we get a local slice record
393 #        record = None
394 #        for tmp_record in records:
395 #            if tmp_record['type'] == 'slice' and \
396 #               not tmp_record['peer_authority']:
397 #    #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
398 #                slice_record = SliceRecord(dict=tmp_record)
399 #        if not record:
400 #            raise RecordNotFound(slice_hrn)
401 #        
402 #        # similar to CreateSliver, we must verify that the required records exist
403 #        # at this aggregate before we can issue a ticket
404 #        # parse rspec
405 #        rspec = RSpec(rspec_string)
406 #        requested_attributes = rspec.version.get_slice_attributes()
407 #    
408 #        # ensure site record exists
409 #        site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
410 #        # ensure slice record exists
411 #        slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
412 #        # ensure person records exists
413 #    # xxx users is undefined in this context
414 #        persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
415 #        # ensure slice attributes exists
416 #        slices.verify_slice_attributes(slice, requested_attributes)
417 #        
418 #        # get sliver info
419 #        slivers = slices.get_slivers(slice_hrn)
420 #    
421 #        if not slivers:
422 #            raise SliverDoesNotExist(slice_hrn)
423 #    
424 #        # get initscripts
425 #        initscripts = []
426 #        data = {
427 #            'timestamp': int(time.time()),
428 #            'initscripts': initscripts,
429 #            'slivers': slivers
430 #        }
431 #    
432 #        # create the ticket
433 #        object_gid = record.get_gid_object()
434 #        new_ticket = SfaTicket(subject = object_gid.get_subject())
435 #        new_ticket.set_gid_caller(api.auth.client_gid)
436 #        new_ticket.set_gid_object(object_gid)
437 #        new_ticket.set_issuer(key=api.key, subject=self.hrn)
438 #        new_ticket.set_pubkey(object_gid.get_pubkey())
439 #        new_ticket.set_attributes(data)
440 #        new_ticket.set_rspec(rspec)
441 #        #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
442 #        new_ticket.encode()
443 #        new_ticket.sign()
444 #    
445 #        return new_ticket.save_to_string(save_parents=True)