more consistency between API method names and corresponding manager implementation
[sfa.git] / sfa / managers / aggregate_manager.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
7 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
8 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
9 from sfa.util.version import version_core
10 from sfa.util.sfatime import utcparse
11 from sfa.util.callids import Callids
12
13 from sfa.trust.sfaticket import SfaTicket
14 from sfa.trust.credential import Credential
15 from sfa.rspecs.version_manager import VersionManager
16 from sfa.rspecs.rspec import RSpec
17
18 from sfa.server.sfaapi import SfaApi
19
20 import sfa.plc.peers as peers
21 from sfa.plc.aggregate import Aggregate
22 from sfa.plc.slices import Slices
23
24 class AggregateManager:
25
26     def __init__ (self):
27         # xxx Thierry : caching at the aggregate level sounds wrong...
28         #self.caching=True
29         self.caching=False
30     
31     def GetVersion(self, api):
32     
33         version_manager = VersionManager()
34         ad_rspec_versions = []
35         request_rspec_versions = []
36         for rspec_version in version_manager.versions:
37             if rspec_version.content_type in ['*', 'ad']:
38                 ad_rspec_versions.append(rspec_version.to_dict())
39             if rspec_version.content_type in ['*', 'request']:
40                 request_rspec_versions.append(rspec_version.to_dict()) 
41         default_rspec_version = version_manager.get_version("sfa 1").to_dict()
42         xrn=Xrn(api.hrn)
43         version_more = {'interface':'aggregate',
44                         'testbed':'myplc',
45                         'hrn':xrn.get_hrn(),
46                         'request_rspec_versions': request_rspec_versions,
47                         'ad_rspec_versions': ad_rspec_versions,
48                         'default_ad_rspec': default_rspec_version
49                         }
50         return version_core(version_more)
51     
52     def _get_registry_objects(self, slice_xrn, creds, users):
53         """
54     
55         """
56         hrn, _ = urn_to_hrn(slice_xrn)
57     
58         hrn_auth = get_authority(hrn)
59     
60         # Build up objects that an SFA registry would return if SFA
61         # could contact the slice's registry directly
62         reg_objects = None
63     
64         if users:
65             # dont allow special characters in the site login base
66             #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
67             #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
68             slicename = hrn_to_pl_slicename(hrn)
69             login_base = slicename.split('_')[0]
70             reg_objects = {}
71             site = {}
72             site['site_id'] = 0
73             site['name'] = 'geni.%s' % login_base 
74             site['enabled'] = True
75             site['max_slices'] = 100
76     
77             # Note:
78             # Is it okay if this login base is the same as one already at this myplc site?
79             # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
80             site['login_base'] = login_base
81             site['abbreviated_name'] = login_base
82             site['max_slivers'] = 1000
83             reg_objects['site'] = site
84     
85             slice = {}
86             
87             # get_expiration always returns a normalized datetime - no need to utcparse
88             extime = Credential(string=creds[0]).get_expiration()
89             # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
90             if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
91                 extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
92             slice['expires'] = int(time.mktime(extime.timetuple()))
93             slice['hrn'] = hrn
94             slice['name'] = hrn_to_pl_slicename(hrn)
95             slice['url'] = hrn
96             slice['description'] = hrn
97             slice['pointer'] = 0
98             reg_objects['slice_record'] = slice
99     
100             reg_objects['users'] = {}
101             for user in users:
102                 user['key_ids'] = []
103                 hrn, _ = urn_to_hrn(user['urn'])
104                 user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
105                 user['first_name'] = hrn
106                 user['last_name'] = hrn
107                 reg_objects['users'][user['email']] = user
108     
109             return reg_objects
110     
111     def SliverStatus(self, api, slice_xrn, creds, call_id):
112         if Callids().already_handled(call_id): return {}
113     
114         (hrn, _) = urn_to_hrn(slice_xrn)
115         # find out where this slice is currently running
116         slicename = hrn_to_pl_slicename(hrn)
117         
118         slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
119         if len(slices) == 0:        
120             raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
121         slice = slices[0]
122         
123         # report about the local nodes only
124         nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
125                                      ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
126         site_ids = [node['site_id'] for node in nodes]
127     
128         result = {}
129         top_level_status = 'unknown'
130         if nodes:
131             top_level_status = 'ready'
132         slice_urn = Xrn(slice_xrn, 'slice').get_urn()
133         result['geni_urn'] = slice_urn
134         result['pl_login'] = slice['name']
135         result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
136         
137         resources = []
138         for node in nodes:
139             res = {}
140             res['pl_hostname'] = node['hostname']
141             res['pl_boot_state'] = node['boot_state']
142             res['pl_last_contact'] = node['last_contact']
143             if node['last_contact'] is not None:
144                 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
145             sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
146             res['geni_urn'] = sliver_id
147             if node['boot_state'] == 'boot':
148                 res['geni_status'] = 'ready'
149             else:
150                 res['geni_status'] = 'failed'
151                 top_level_status = 'failed' 
152                 
153             res['geni_error'] = ''
154     
155             resources.append(res)
156             
157         result['geni_status'] = top_level_status
158         result['geni_resources'] = resources
159         return result
160     
161     def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, call_id):
162         """
163         Create the sliver[s] (slice) at this aggregate.    
164         Verify HRN and initialize the slice record in PLC if necessary.
165         """
166         if Callids().already_handled(call_id): return ""
167     
168         aggregate = Aggregate(api)
169         slices = Slices(api)
170         (hrn, _) = urn_to_hrn(slice_xrn)
171         peer = slices.get_peer(hrn)
172         sfa_peer = slices.get_sfa_peer(hrn)
173         slice_record=None    
174         if users:
175             slice_record = users[0].get('slice_record', {})
176     
177         # parse rspec
178         rspec = RSpec(rspec_string)
179         requested_attributes = rspec.version.get_slice_attributes()
180         
181         # ensure site record exists
182         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
183         # ensure slice record exists
184         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
185         # ensure person records exists
186         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
187         # ensure slice attributes exists
188         slices.verify_slice_attributes(slice, requested_attributes)
189         
190         # add/remove slice from nodes
191         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
192         slices.verify_slice_nodes(slice, requested_slivers, peer) 
193    
194         # add/remove links links 
195         slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
196     
197         # handle MyPLC peer association.
198         # only used by plc and ple.
199         slices.handle_peer(site, slice, persons, peer)
200         
201         return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
202     
203     
204     def RenewSliver(self, api, xrn, creds, expiration_time, call_id):
205         if Callids().already_handled(call_id): return True
206         (hrn, _) = urn_to_hrn(xrn)
207         slicename = hrn_to_pl_slicename(hrn)
208         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
209         if not slices:
210             raise RecordNotFound(hrn)
211         slice = slices[0]
212         requested_time = utcparse(expiration_time)
213         record = {'expires': int(time.mktime(requested_time.timetuple()))}
214         try:
215             api.driver.UpdateSlice(slice['slice_id'], record)
216             return True
217         except:
218             return False
219     
220     def start_slice(self, api, xrn, creds):
221         (hrn, _) = urn_to_hrn(xrn)
222         slicename = hrn_to_pl_slicename(hrn)
223         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
224         if not slices:
225             raise RecordNotFound(hrn)
226         slice_id = slices[0]['slice_id']
227         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
228         # just remove the tag if it exists
229         if slice_tags:
230             api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
231     
232         return 1
233      
234     def stop_slice(self, api, xrn, creds):
235         hrn, _ = urn_to_hrn(xrn)
236         slicename = hrn_to_pl_slicename(hrn)
237         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
238         if not slices:
239             raise RecordNotFound(hrn)
240         slice_id = slices[0]['slice_id']
241         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
242         if not slice_tags:
243             api.driver.AddSliceTag(slice_id, 'enabled', '0')
244         elif slice_tags[0]['value'] != "0":
245             tag_id = slice_tags[0]['slice_tag_id']
246             api.driver.UpdateSliceTag(tag_id, '0')
247         return 1
248     
249     def reset_slice(self, api, xrn):
250         # XX not implemented at this interface
251         return 1
252     
253     def DeleteSliver(self, api, xrn, creds, call_id):
254         if Callids().already_handled(call_id): return ""
255         (hrn, _) = urn_to_hrn(xrn)
256         slicename = hrn_to_pl_slicename(hrn)
257         slices = api.driver.GetSlices({'name': slicename})
258         if not slices:
259             return 1
260         slice = slices[0]
261     
262         # determine if this is a peer slice
263         peer = peers.get_peer(api, hrn)
264         try:
265             if peer:
266                 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
267             api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
268         finally:
269             if peer:
270                 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
271         return 1
272     
273     def ListSlices(self, api, creds, call_id):
274         if Callids().already_handled(call_id): return []
275         # look in cache first
276         if self.caching and api.cache:
277             slices = api.cache.get('slices')
278             if slices:
279                 return slices
280     
281         # get data from db 
282         slices = api.driver.GetSlices({'peer_id': None}, ['name'])
283         slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
284         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
285     
286         # cache the result
287         if self.caching and api.cache:
288             api.cache.add('slices', slice_urns) 
289     
290         return slice_urns
291         
292     def ListResources(self, api, creds, options, call_id):
293         if Callids().already_handled(call_id): return ""
294         # get slice's hrn from options
295         xrn = options.get('geni_slice_urn', None)
296         (hrn, _) = urn_to_hrn(xrn)
297     
298         version_manager = VersionManager()
299         # get the rspec's return format from options
300         rspec_version = version_manager.get_version(options.get('rspec_version'))
301         version_string = "rspec_%s" % (rspec_version.to_string())
302     
303         #panos adding the info option to the caching key (can be improved)
304         if options.get('info'):
305             version_string = version_string + "_"+options.get('info', 'default')
306     
307         # look in cache first
308         if self.caching and api.cache and not xrn:
309             rspec = api.cache.get(version_string)
310             if rspec:
311                 api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
312                 return rspec 
313     
314         #panos: passing user-defined options
315         #print "manager options = ",options
316         aggregate = Aggregate(api, options)
317         rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
318     
319         # cache the result
320         if self.caching and api.cache and not xrn:
321             api.cache.add(version_string, rspec)
322     
323         return rspec
324     
325     
326     def GetTicket(self, api, xrn, creds, rspec, users):
327     
328         (slice_hrn, _) = urn_to_hrn(xrn)
329         slices = Slices(api)
330         peer = slices.get_peer(slice_hrn)
331         sfa_peer = slices.get_sfa_peer(slice_hrn)
332     
333         # get the slice record
334         credential = api.getCredential()
335         interface = api.registries[api.hrn]
336         registry = api.server_proxy(interface, credential)
337         records = registry.Resolve(xrn, credential)
338     
339         # make sure we get a local slice record
340         record = None
341         for tmp_record in records:
342             if tmp_record['type'] == 'slice' and \
343                not tmp_record['peer_authority']:
344     #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
345                 record = SliceRecord(dict=tmp_record)
346         if not record:
347             raise RecordNotFound(slice_hrn)
348         
349         # similar to CreateSliver, we must verify that the required records exist
350         # at this aggregate before we can issue a ticket
351         # parse rspec
352         rspec = RSpec(rspec_string)
353         requested_attributes = rspec.version.get_slice_attributes()
354     
355         # ensure site record exists
356         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
357         # ensure slice record exists
358         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
359         # ensure person records exists
360         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
361         # ensure slice attributes exists
362         slices.verify_slice_attributes(slice, requested_attributes)
363         
364         # get sliver info
365         slivers = slices.get_slivers(slice_hrn)
366     
367         if not slivers:
368             raise SliverDoesNotExist(slice_hrn)
369     
370         # get initscripts
371         initscripts = []
372         data = {
373             'timestamp': int(time.time()),
374             'initscripts': initscripts,
375             'slivers': slivers
376         }
377     
378         # create the ticket
379         object_gid = record.get_gid_object()
380         new_ticket = SfaTicket(subject = object_gid.get_subject())
381         new_ticket.set_gid_caller(api.auth.client_gid)
382         new_ticket.set_gid_object(object_gid)
383         new_ticket.set_issuer(key=api.key, subject=api.hrn)
384         new_ticket.set_pubkey(object_gid.get_pubkey())
385         new_ticket.set_attributes(data)
386         new_ticket.set_rspec(rspec)
387         #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
388         new_ticket.encode()
389         new_ticket.sign()
390     
391         return new_ticket.save_to_string(save_parents=True)