return the correct error codes
[sfa.git] / sfa / managers / aggregate_manager.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
7 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
8 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
9 from sfa.util.version import version_core
10 from sfa.util.sfatime import utcparse
11 from sfa.util.callids import Callids
12
13 from sfa.trust.sfaticket import SfaTicket
14 from sfa.trust.credential import Credential
15 from sfa.rspecs.version_manager import VersionManager
16 from sfa.rspecs.rspec import RSpec
17
18 from sfa.server.sfaapi import SfaApi
19
20 import sfa.plc.peers as peers
21 from sfa.plc.aggregate import Aggregate
22 from sfa.plc.slices import Slices
23
24 class AggregateManager:
25
26     def __init__ (self):
27         # xxx Thierry : caching at the aggregate level sounds wrong...
28         self.caching=True
29         #self.caching=False
30     
31     def GetVersion(self, api, options={}):
32     
33         version_manager = VersionManager()
34         ad_rspec_versions = []
35         request_rspec_versions = []
36         for rspec_version in version_manager.versions:
37             if rspec_version.content_type in ['*', 'ad']:
38                 ad_rspec_versions.append(rspec_version.to_dict())
39             if rspec_version.content_type in ['*', 'request']:
40                 request_rspec_versions.append(rspec_version.to_dict()) 
41         xrn=Xrn(api.hrn)
42         version_more = {'interface':'aggregate',
43                         'sfa': 2,
44                         'geni_api': api.config.SFA_AGGREGATE_API_VERSION,
45                         'testbed':'myplc',
46                         'hrn':xrn.get_hrn(),
47                         'geni_request_rspec_versions': request_rspec_versions,
48                         'geni_ad_rspec_versions': ad_rspec_versions,
49                         }
50         return version_core(version_more)
51     
52     def _get_registry_objects(self, slice_xrn, creds, users):
53         """
54     
55         """
56         hrn, _ = urn_to_hrn(slice_xrn)
57     
58         hrn_auth = get_authority(hrn)
59     
60         # Build up objects that an SFA registry would return if SFA
61         # could contact the slice's registry directly
62         reg_objects = None
63     
64         if users:
65             # dont allow special characters in the site login base
66             #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
67             #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
68             slicename = hrn_to_pl_slicename(hrn)
69             login_base = slicename.split('_')[0]
70             reg_objects = {}
71             site = {}
72             site['site_id'] = 0
73             site['name'] = 'geni.%s' % login_base 
74             site['enabled'] = True
75             site['max_slices'] = 100
76     
77             # Note:
78             # Is it okay if this login base is the same as one already at this myplc site?
79             # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
80             site['login_base'] = login_base
81             site['abbreviated_name'] = login_base
82             site['max_slivers'] = 1000
83             reg_objects['site'] = site
84     
85             slice = {}
86             
87             # get_expiration always returns a normalized datetime - no need to utcparse
88             extime = Credential(string=creds[0]).get_expiration()
89             # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
90             if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
91                 extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
92             slice['expires'] = int(time.mktime(extime.timetuple()))
93             slice['hrn'] = hrn
94             slice['name'] = hrn_to_pl_slicename(hrn)
95             slice['url'] = hrn
96             slice['description'] = hrn
97             slice['pointer'] = 0
98             reg_objects['slice_record'] = slice
99     
100             reg_objects['users'] = {}
101             for user in users:
102                 user['key_ids'] = []
103                 hrn, _ = urn_to_hrn(user['urn'])
104                 user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
105                 user['first_name'] = hrn
106                 user['last_name'] = hrn
107                 reg_objects['users'][user['email']] = user
108     
109             return reg_objects
110     
111     def SliverStatus(self, api, slice_xrn, creds, options={}):
112         call_id = options.get('call_id')
113         if Callids().already_handled(call_id): return {}
114     
115         (hrn, _) = urn_to_hrn(slice_xrn)
116         # find out where this slice is currently running
117         slicename = hrn_to_pl_slicename(hrn)
118         
119         slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
120         if len(slices) == 0:        
121             raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
122         slice = slices[0]
123         
124         # report about the local nodes only
125         nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
126                                      ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
127         site_ids = [node['site_id'] for node in nodes]
128     
129         result = {}
130         top_level_status = 'unknown'
131         if nodes:
132             top_level_status = 'ready'
133         slice_urn = Xrn(slice_xrn, 'slice').get_urn()
134         result['geni_urn'] = slice_urn
135         result['pl_login'] = slice['name']
136         result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
137         
138         resources = []
139         for node in nodes:
140             res = {}
141             res['pl_hostname'] = node['hostname']
142             res['pl_boot_state'] = node['boot_state']
143             res['pl_last_contact'] = node['last_contact']
144             if node['last_contact'] is not None:
145                 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
146             sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
147             res['geni_urn'] = sliver_id
148             if node['boot_state'] == 'boot':
149                 res['geni_status'] = 'ready'
150             else:
151                 res['geni_status'] = 'failed'
152                 top_level_status = 'failed' 
153                 
154             res['geni_error'] = ''
155     
156             resources.append(res)
157             
158         result['geni_status'] = top_level_status
159         result['geni_resources'] = resources
160         return result
161     
162     def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options={}):
163         """
164         Create the sliver[s] (slice) at this aggregate.    
165         Verify HRN and initialize the slice record in PLC if necessary.
166         """
167         call_id = options.get('call_id')
168         if Callids().already_handled(call_id): return ""
169     
170         aggregate = Aggregate(api)
171         slices = Slices(api)
172         (hrn, _) = urn_to_hrn(slice_xrn)
173         peer = slices.get_peer(hrn)
174         sfa_peer = slices.get_sfa_peer(hrn)
175         slice_record=None    
176         if users:
177             slice_record = users[0].get('slice_record', {})
178     
179         # parse rspec
180         rspec = RSpec(rspec_string)
181         requested_attributes = rspec.version.get_slice_attributes()
182         
183         # ensure site record exists
184         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
185         # ensure slice record exists
186         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
187         # ensure person records exists
188         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
189         # ensure slice attributes exists
190         slices.verify_slice_attributes(slice, requested_attributes)
191         
192         # add/remove slice from nodes
193         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
194         slices.verify_slice_nodes(slice, requested_slivers, peer) 
195    
196         # add/remove links links 
197         slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
198     
199         # handle MyPLC peer association.
200         # only used by plc and ple.
201         slices.handle_peer(site, slice, persons, peer)
202         
203         return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
204     
205     
206     def RenewSliver(self, api, xrn, creds, expiration_time, options={}):
207         call_id = options.get('call_id')
208         if Callids().already_handled(call_id): return True
209         (hrn, _) = urn_to_hrn(xrn)
210         slicename = hrn_to_pl_slicename(hrn)
211         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
212         if not slices:
213             raise RecordNotFound(hrn)
214         slice = slices[0]
215         requested_time = utcparse(expiration_time)
216         record = {'expires': int(time.mktime(requested_time.timetuple()))}
217         try:
218             api.driver.UpdateSlice(slice['slice_id'], record)
219             return True
220         except:
221             return False
222     
223     def start_slice(self, api, xrn, creds):
224         (hrn, _) = urn_to_hrn(xrn)
225         slicename = hrn_to_pl_slicename(hrn)
226         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
227         if not slices:
228             raise RecordNotFound(hrn)
229         slice_id = slices[0]['slice_id']
230         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
231         # just remove the tag if it exists
232         if slice_tags:
233             api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
234     
235         return 1
236      
237     def stop_slice(self, api, xrn, creds):
238         hrn, _ = urn_to_hrn(xrn)
239         slicename = hrn_to_pl_slicename(hrn)
240         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
241         if not slices:
242             raise RecordNotFound(hrn)
243         slice_id = slices[0]['slice_id']
244         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
245         if not slice_tags:
246             api.driver.AddSliceTag(slice_id, 'enabled', '0')
247         elif slice_tags[0]['value'] != "0":
248             tag_id = slice_tags[0]['slice_tag_id']
249             api.driver.UpdateSliceTag(tag_id, '0')
250         return 1
251     
252     def reset_slice(self, api, xrn):
253         # XX not implemented at this interface
254         return 1
255     
256     def DeleteSliver(self, api, xrn, creds, options={}):
257         call_id = options.get('call_id')
258         if Callids().already_handled(call_id): return ""
259         (hrn, _) = urn_to_hrn(xrn)
260         slicename = hrn_to_pl_slicename(hrn)
261         slices = api.driver.GetSlices({'name': slicename})
262         if not slices:
263             return 1
264         slice = slices[0]
265     
266         # determine if this is a peer slice
267         peer = peers.get_peer(api, hrn)
268         try:
269             if peer:
270                 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
271             api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
272         finally:
273             if peer:
274                 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
275         return 1
276     
277     def ListSlices(self, api, creds, options={}):
278         call_id = options.get('call_id')
279         if Callids().already_handled(call_id): return []
280         # look in cache first
281         if self.caching and api.cache:
282             slices = api.cache.get('slices')
283             if slices:
284                 return slices
285     
286         # get data from db 
287         slices = api.driver.GetSlices({'peer_id': None}, ['name'])
288         slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
289         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
290     
291         # cache the result
292         if self.caching and api.cache:
293             api.cache.add('slices', slice_urns) 
294     
295         return slice_urns
296         
297     def ListResources(self, api, creds, options={}):
298         call_id = options.get('call_id')
299         if Callids().already_handled(call_id): return ""
300         # get slice's hrn from options
301         xrn = options.get('geni_slice_urn', None)
302         cached = options.get('cached', True) 
303         (hrn, _) = urn_to_hrn(xrn)
304     
305         version_manager = VersionManager()
306         # get the rspec's return format from options
307         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
308         version_string = "rspec_%s" % (rspec_version)
309     
310         #panos adding the info option to the caching key (can be improved)
311         if options.get('info'):
312             version_string = version_string + "_"+options.get('info', 'default')
313     
314         # look in cache first
315         if self.caching and api.cache and not xrn and cached:
316             rspec = api.cache.get(version_string)
317             if rspec:
318                 api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
319                 return rspec 
320     
321         #panos: passing user-defined options
322         #print "manager options = ",options
323         aggregate = Aggregate(api)
324         rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version, options=options)
325     
326         # cache the result
327         if self.caching and api.cache and not xrn:
328             api.cache.add(version_string, rspec)
329     
330         return rspec
331     
332     
333     def GetTicket(self, api, xrn, creds, rspec, users, options={}):
334     
335         (slice_hrn, _) = urn_to_hrn(xrn)
336         slices = Slices(api)
337         peer = slices.get_peer(slice_hrn)
338         sfa_peer = slices.get_sfa_peer(slice_hrn)
339     
340         # get the slice record
341         credential = api.getCredential()
342         interface = api.registries[api.hrn]
343         registry = api.server_proxy(interface, credential)
344         records = registry.Resolve(xrn, credential)
345     
346         # make sure we get a local slice record
347         record = None
348         for tmp_record in records:
349             if tmp_record['type'] == 'slice' and \
350                not tmp_record['peer_authority']:
351     #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
352                 record = SliceRecord(dict=tmp_record)
353         if not record:
354             raise RecordNotFound(slice_hrn)
355         
356         # similar to CreateSliver, we must verify that the required records exist
357         # at this aggregate before we can issue a ticket
358         # parse rspec
359         rspec = RSpec(rspec_string)
360         requested_attributes = rspec.version.get_slice_attributes()
361     
362         # ensure site record exists
363         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
364         # ensure slice record exists
365         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
366         # ensure person records exists
367         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
368         # ensure slice attributes exists
369         slices.verify_slice_attributes(slice, requested_attributes)
370         
371         # get sliver info
372         slivers = slices.get_slivers(slice_hrn)
373     
374         if not slivers:
375             raise SliverDoesNotExist(slice_hrn)
376     
377         # get initscripts
378         initscripts = []
379         data = {
380             'timestamp': int(time.time()),
381             'initscripts': initscripts,
382             'slivers': slivers
383         }
384     
385         # create the ticket
386         object_gid = record.get_gid_object()
387         new_ticket = SfaTicket(subject = object_gid.get_subject())
388         new_ticket.set_gid_caller(api.auth.client_gid)
389         new_ticket.set_gid_object(object_gid)
390         new_ticket.set_issuer(key=api.key, subject=api.hrn)
391         new_ticket.set_pubkey(object_gid.get_pubkey())
392         new_ticket.set_attributes(data)
393         new_ticket.set_rspec(rspec)
394         #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
395         new_ticket.encode()
396         new_ticket.sign()
397     
398         return new_ticket.save_to_string(save_parents=True)