fix delete_sliver. cleanup
[sfa.git] / sfa / openstack / nova_driver.py
import time
import datetime

from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
    RecordNotFound, SfaNotImplemented, SliverDoesNotExist

from sfa.util.sfalogging import logger
from sfa.util.defaultdict import defaultdict
from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_sliver_id
from sfa.util.cache import Cache
from sfa.trust.credential import Credential
# used to be used in get_ticket
#from sfa.trust.sfaticket import SfaTicket

from sfa.rspecs.version_manager import VersionManager
from sfa.rspecs.rspec import RSpec

# the driver interface, mostly provides default behaviours
from sfa.managers.driver import Driver
from sfa.openstack.nova_shell import NovaShell
from sfa.openstack.euca_shell import EucaShell
from sfa.openstack.osaggregate import OSAggregate
from sfa.plc.plslices import PlSlices
from sfa.util.osxrn import OSXrn


def list_to_dict(recs, key):
    """
    convert a list of dictionaries into a dictionary keyed on the
    specified dictionary key
    """
    return dict([(rec[key], rec) for rec in recs])
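# a quick illustration of the helper above:
# list_to_dict([{'hrn': 'a'}, {'hrn': 'b'}], 'hrn') -> {'a': {'hrn': 'a'}, 'b': {'hrn': 'b'}}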

#
# PlShell is just an xmlrpc serverproxy where methods
# can be sent as-is; it takes care of authentication
# from the global config

class NovaDriver (Driver):

    # the cache instance is a class member so it survives across incoming requests
    cache = None

    def __init__ (self, config):
        Driver.__init__ (self, config)
        self.shell = NovaShell (config)
        self.euca_shell = EucaShell(config)
        self.cache = None
        if config.SFA_AGGREGATE_CACHING:
            if NovaDriver.cache is None:
                NovaDriver.cache = Cache()
            self.cache = NovaDriver.cache

    ########################################
    ########## registry oriented
    ########################################

    ########## disabled users
    def is_enabled (self, record):
        # all records are enabled
        return True

    def augment_records_with_testbed_info (self, sfa_records):
        return self.fill_record_info (sfa_records)

    ##########
    def register (self, sfa_record, hrn, pub_key):
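        """
        Create the testbed-side object for a new SFA record.
        Slice and user handling is still a stub: nothing is actually
        created in OpenStack yet.
        """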
        type = sfa_record['type']
        # xxx pl_record is not actually used below
        pl_record = self.sfa_fields_to_pl_fields(type, hrn, sfa_record)

        if type == 'slice':
            acceptable_fields=['url', 'instantiation', 'name', 'description']
            # add slice description, name, researchers, PI
            pass

        elif type == 'user':
            # add person roles, projects and keys
            pass
        # nothing is created on the OpenStack side yet, so there is no testbed
        # pointer to return; -1 conventionally stands for 'no pointer'
        return -1

    ##########
    # xxx actually old_sfa_record comes filled with plc stuff as well in the original code
    def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
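        """
        Update an existing record. Apart from rejecting a new key for
        anything but a user record, slice and user updates are not
        implemented yet.
        """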
        pointer = old_sfa_record['pointer']
        type = old_sfa_record['type']

        # new_key implemented for users only
        if new_key and type not in [ 'user' ]:
            raise UnknownSfaType(type)

        elif type == "slice":
            # can update description, researchers and PI
            pass
        elif type == "user":
            # can update slices, keys and roles
            pass
        return True


    ##########
    def remove (self, sfa_record):
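        """
        Delete the OpenStack object behind an SFA record: the nova user
        for 'user' records, the nova project for 'slice' records.
        """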
        type = sfa_record['type']
        name = Xrn(sfa_record['hrn']).get_leaf()
        if type == 'user':
            if self.shell.auth_manager.get_user(name):
                self.shell.auth_manager.delete_user(name)
        elif type == 'slice':
            if self.shell.auth_manager.get_project(name):
                self.shell.auth_manager.delete_project(name)
        return True


    ####################
    def fill_record_info(self, records):
        """
        Given a (list of) SFA record, fill in the OpenStack specific
        and SFA specific fields in the record.
        """
        if not isinstance(records, list):
            records = [records]

        for record in records:
            name = Xrn(record['hrn']).get_leaf()
            os_record = None
            if record['type'] == 'user':
                os_record = self.shell.auth_manager.get_user(name)
                projects = self.shell.db.project_get_by_user(name)
                record['slices'] = [self.hrn + "." + proj.name for \
                                    proj in projects]
                record['roles'] = self.shell.db.user_get_roles(name)
                keys = self.shell.db.key_pair_get_all_by_user(name)
                record['keys'] = [key.public_key for key in keys]
            elif record['type'] == 'slice':
                os_record = self.shell.auth_manager.get_project(name)
                record['description'] = os_record.description
                record['PI'] = [self.hrn + "." + os_record.project_manager.name]
                record['geni_creator'] = record['PI']
                record['researcher'] = [self.hrn + "." + user for \
                                        user in os_record.member_ids]
            else:
                continue
            record['geni_urn'] = hrn_to_urn(record['hrn'], record['type'])
            record['geni_certificate'] = record['gid']
            record['name'] = os_record.name
            #if os_record.created_at is not None:
            #    record['date_created'] = datetime_to_string(utcparse(os_record.created_at))
            #if os_record.updated_at is not None:
            #    record['last_updated'] = datetime_to_string(utcparse(os_record.updated_at))

        return records


    ####################
    # relations are maintained by diff: compute what needs to be added/deleted
    def update_relation (self, subject_type, target_type, subject_id, target_ids):
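        """
        Reconcile project membership: compare the project's current members
        with target_ids and add or remove members so that both match.
        Only the slice -> user relation is handled.
        """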
        # hard-wire the code for slice/user for now, could be smarter if needed
        if subject_type =='slice' and target_type == 'user':
            subject=self.shell.project_get(subject_id)[0]
            current_target_ids = [user.name for user in subject.members]
            add_target_ids = list ( set (target_ids).difference(current_target_ids))
            del_target_ids = list ( set (current_target_ids).difference(target_ids))
            logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
            for target_id in add_target_ids:
                self.shell.project_add_member(target_id,subject_id)
                logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
            for target_id in del_target_ids:
                logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
                self.shell.project_remove_member(target_id, subject_id)
        else:
            logger.info('unexpected relation to maintain, %s -> %s'%(subject_type,target_type))


    ########################################
    ########## aggregate oriented
    ########################################

    def testbed_name (self): return "openstack"

    # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
    def aggregate_version (self):
        version_manager = VersionManager()
        ad_rspec_versions = []
        request_rspec_versions = []
        for rspec_version in version_manager.versions:
            if rspec_version.content_type in ['*', 'ad']:
                ad_rspec_versions.append(rspec_version.to_dict())
            if rspec_version.content_type in ['*', 'request']:
                request_rspec_versions.append(rspec_version.to_dict())
        return {
            'testbed':self.testbed_name(),
            'geni_request_rspec_versions': request_rspec_versions,
            'geni_ad_rspec_versions': ad_rspec_versions,
            }

    def list_slices (self, creds, options):
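        """
        Return the URNs of all local projects, advertised as slices.
        Results are cached when aggregate caching is enabled.
        """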
        # look in cache first
        if self.cache:
            slices = self.cache.get('slices')
            if slices:
                logger.debug("OpenStackDriver.list_slices returns from cache")
                return slices

        # get data from db
        projs = self.shell.auth_manager.get_projects()
        slice_urns = [OSXrn(proj.name, 'slice').urn for proj in projs]

        # cache the result
        if self.cache:
            logger.debug ("OpenStackDriver.list_slices stores value in cache")
            self.cache.add('slices', slice_urns)

        return slice_urns

    # first 2 args are None in case of resource discovery
    def list_resources (self, slice_urn, slice_hrn, creds, options):
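        """
        Return an rspec for this aggregate: an advertisement when no slice
        is specified (possibly served from the cache), or the manifest of
        the given slice otherwise.
        """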
        cached_requested = options.get('cached', True)

        version_manager = VersionManager()
        # get the rspec's return format from options
        rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
        version_string = "rspec_%s" % (rspec_version)

        #panos adding the info option to the caching key (can be improved)
        if options.get('info'):
            version_string = version_string + "_" + options.get('info', 'default')

        # look in cache first
        if cached_requested and self.cache and not slice_hrn:
            rspec = self.cache.get(version_string)
            if rspec:
                logger.debug("OpenStackDriver.ListResources: returning cached advertisement")
                return rspec

        #panos: passing user-defined options
        #print "manager options = ",options
        aggregate = OSAggregate(self)
        rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
                                    options=options)

        # cache the result
        if self.cache and not slice_hrn:
            logger.debug("OpenStackDriver.ListResources: stores advertisement in cache")
            self.cache.add(version_string, rspec)

        return rspec

    def sliver_status (self, slice_urn, slice_hrn):
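        """
        Report the GENI status of the instances running in this slice's
        project. Raises SliverDoesNotExist when the project has no
        instances.
        """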
        # find out where this slice is currently running
        project_name = Xrn(slice_urn).get_leaf()
        project = self.shell.auth_manager.get_project(project_name)
        instances = self.shell.db.instance_get_all_by_project(project_name)
        if len(instances) == 0:
            raise SliverDoesNotExist("You have not allocated any slivers here")

        result = {}
        top_level_status = 'unknown'
        if instances:
            top_level_status = 'ready'
        result['geni_urn'] = slice_urn
        result['plos_login'] = 'root'
        result['plos_expires'] = None

        resources = []
        for instance in instances:
            res = {}
            # instances are accessed by ip, not hostname. We need to report the ip
            # somewhere so users know where to ssh to.
            res['plos_hostname'] = instance.hostname
            res['plos_created_at'] = datetime_to_string(utcparse(instance.created_at))
            res['plos_boot_state'] = instance.vm_state
            res['plos_sliver_type'] = instance.instance_type.name
            sliver_id = Xrn(slice_urn).get_sliver_id(instance.project_id, \
                                                     instance.hostname, instance.id)
            res['geni_urn'] = sliver_id

            if instance.vm_state == 'running':
                res['boot_state'] = 'ready'
            else:
                res['boot_state'] = 'unknown'
            resources.append(res)

        result['geni_status'] = top_level_status
        result['geni_resources'] = resources
        return result

    def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
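        """
        Instantiate the slice: make sure the project and its users exist
        locally, collect the users' public keys, boot the instances
        described in the request rspec and return the resulting manifest.
        """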

        project_name = get_leaf(slice_hrn)
        aggregate = OSAggregate(self)
        # parse rspec
        rspec = RSpec(rspec_string)

        # ensure project and users exist in local db
        aggregate.create_project(project_name, users, options=options)

        # collect public keys
        pubkeys = []
        project_key = None
        for user in users:
            pubkeys.extend(user['keys'])
            # assume the first user is the caller and use their context
            # for the ec2/euca api connection. Also, use the first user's
            # key as the project key.
            if not project_key:
                username = Xrn(user['urn']).get_leaf()
                user_keys = self.shell.db.key_pair_get_all_by_user(username)
                if user_keys:
                    project_key = user_keys[0].name

        # initialize the euca/ec2 connection in the project's context and boot the requested instances
        self.euca_shell.init_context(project_name)
        aggregate.run_instances(project_name, rspec_string, project_key, pubkeys)

        return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)

    def delete_sliver (self, slice_urn, slice_hrn, creds, options):
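        """
        Tear down all instances belonging to this slice's project. The
        euca/ec2 context is initialized with the project first so the
        call runs on the slice's behalf.
        """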
        # we need to do this using the context of one of the slice users
        project_name = Xrn(slice_urn).get_leaf()
        self.euca_shell.init_context(project_name)
        name = OSXrn(xrn=slice_urn).name
        aggregate = OSAggregate(self)
        return aggregate.delete_instances(name)

    def update_sliver(self, slice_urn, slice_hrn, rspec, creds, options):
        name = OSXrn(xrn=slice_urn).name
        aggregate = OSAggregate(self)
        return aggregate.update_instances(name)

    def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
        return True

    def start_slice (self, slice_urn, slice_hrn, creds):
        return 1

    def stop_slice (self, slice_urn, slice_hrn, creds):
        name = OSXrn(xrn=slice_urn).name
        aggregate = OSAggregate(self)
        return aggregate.stop_instances(name)

    def reset_slice (self, slice_urn, slice_hrn, creds):
        raise SfaNotImplemented ("reset_slice not available at this interface")

    # xxx this code is quite old and has not run for ages
    # it is obviously totally broken and needs a rewrite
    def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
        raise SfaNotImplemented("OpenStackDriver.get_ticket needs a rewrite")
# please keep this code for future reference
#        slices = PlSlices(self)
#        peer = slices.get_peer(slice_hrn)
#        sfa_peer = slices.get_sfa_peer(slice_hrn)
#
#        # get the slice record
#        credential = api.getCredential()
#        interface = api.registries[api.hrn]
#        registry = api.server_proxy(interface, credential)
#        records = registry.Resolve(xrn, credential)
#
#        # make sure we get a local slice record
#        record = None
#        for tmp_record in records:
#            if tmp_record['type'] == 'slice' and \
#               not tmp_record['peer_authority']:
#    #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
#                slice_record = SliceRecord(dict=tmp_record)
#        if not record:
#            raise RecordNotFound(slice_hrn)
#
#        # similar to CreateSliver, we must verify that the required records exist
#        # at this aggregate before we can issue a ticket
#        # parse rspec
#        rspec = RSpec(rspec_string)
#        requested_attributes = rspec.version.get_slice_attributes()
#
#        # ensure site record exists
#        site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
#        # ensure slice record exists
#        slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
#        # ensure person records exists
#    # xxx users is undefined in this context
#        persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
#        # ensure slice attributes exists
#        slices.verify_slice_attributes(slice, requested_attributes)
#
#        # get sliver info
#        slivers = slices.get_slivers(slice_hrn)
#
#        if not slivers:
#            raise SliverDoesNotExist(slice_hrn)
#
#        # get initscripts
#        initscripts = []
#        data = {
#            'timestamp': int(time.time()),
#            'initscripts': initscripts,
#            'slivers': slivers
#        }
#
#        # create the ticket
#        object_gid = record.get_gid_object()
#        new_ticket = SfaTicket(subject = object_gid.get_subject())
#        new_ticket.set_gid_caller(api.auth.client_gid)
#        new_ticket.set_gid_object(object_gid)
#        new_ticket.set_issuer(key=api.key, subject=self.hrn)
#        new_ticket.set_pubkey(object_gid.get_pubkey())
#        new_ticket.set_attributes(data)
#        new_ticket.set_rspec(rspec)
#        #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
#        new_ticket.encode()
#        new_ticket.sign()
#
#        return new_ticket.save_to_string(save_parents=True)