CreateSliver uses new node object
[sfa.git] / sfa / managers / aggregate_manager.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
6 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
7 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
8 from sfa.util.version import version_core
9 from sfa.util.sfatime import utcparse
10 from sfa.util.callids import Callids
11
12 from sfa.trust.sfaticket import SfaTicket
13 from sfa.trust.credential import Credential
14 from sfa.rspecs.version_manager import VersionManager
15 from sfa.rspecs.rspec import RSpec
16
17 from sfa.server.sfaapi import SfaApi
18
19 import sfa.plc.peers as peers
20 from sfa.plc.aggregate import Aggregate
21 from sfa.plc.slices import Slices
22
23 class AggregateManager:
24
25     def __init__ (self):
26         # xxx Thierry : caching at the aggregate level sounds wrong...
27         #self.caching=True
28         self.caching=False
29     
30     def GetVersion(self, api):
31     
32         version_manager = VersionManager()
33         ad_rspec_versions = []
34         request_rspec_versions = []
35         for rspec_version in version_manager.versions:
36             if rspec_version.content_type in ['*', 'ad']:
37                 ad_rspec_versions.append(rspec_version.to_dict())
38             if rspec_version.content_type in ['*', 'request']:
39                 request_rspec_versions.append(rspec_version.to_dict()) 
40         default_rspec_version = version_manager.get_version("sfa 1").to_dict()
41         xrn=Xrn(api.hrn)
42         version_more = {'interface':'aggregate',
43                         'testbed':'myplc',
44                         'hrn':xrn.get_hrn(),
45                         'request_rspec_versions': request_rspec_versions,
46                         'ad_rspec_versions': ad_rspec_versions,
47                         'default_ad_rspec': default_rspec_version
48                         }
49         return version_core(version_more)
50     
51     def _get_registry_objects(self, slice_xrn, creds, users):
52         """
53     
54         """
55         hrn, _ = urn_to_hrn(slice_xrn)
56     
57         hrn_auth = get_authority(hrn)
58     
59         # Build up objects that an SFA registry would return if SFA
60         # could contact the slice's registry directly
61         reg_objects = None
62     
63         if users:
64             # dont allow special characters in the site login base
65             #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
66             #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
67             slicename = hrn_to_pl_slicename(hrn)
68             login_base = slicename.split('_')[0]
69             reg_objects = {}
70             site = {}
71             site['site_id'] = 0
72             site['name'] = 'geni.%s' % login_base 
73             site['enabled'] = True
74             site['max_slices'] = 100
75     
76             # Note:
77             # Is it okay if this login base is the same as one already at this myplc site?
78             # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
79             site['login_base'] = login_base
80             site['abbreviated_name'] = login_base
81             site['max_slivers'] = 1000
82             reg_objects['site'] = site
83     
84             slice = {}
85             
86             # get_expiration always returns a normalized datetime - no need to utcparse
87             extime = Credential(string=creds[0]).get_expiration()
88             # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
89             if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
90                 extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
91             slice['expires'] = int(time.mktime(extime.timetuple()))
92             slice['hrn'] = hrn
93             slice['name'] = hrn_to_pl_slicename(hrn)
94             slice['url'] = hrn
95             slice['description'] = hrn
96             slice['pointer'] = 0
97             reg_objects['slice_record'] = slice
98     
99             reg_objects['users'] = {}
100             for user in users:
101                 user['key_ids'] = []
102                 hrn, _ = urn_to_hrn(user['urn'])
103                 user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
104                 user['first_name'] = hrn
105                 user['last_name'] = hrn
106                 reg_objects['users'][user['email']] = user
107     
108             return reg_objects
109     
110     def SliverStatus(self, api, slice_xrn, creds, call_id):
111         if Callids().already_handled(call_id): return {}
112     
113         (hrn, _) = urn_to_hrn(slice_xrn)
114         # find out where this slice is currently running
115         slicename = hrn_to_pl_slicename(hrn)
116         
117         slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
118         if len(slices) == 0:        
119             raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
120         slice = slices[0]
121         
122         # report about the local nodes only
123         nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
124                                      ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
125         site_ids = [node['site_id'] for node in nodes]
126     
127         result = {}
128         top_level_status = 'unknown'
129         if nodes:
130             top_level_status = 'ready'
131         slice_urn = Xrn(slice_xrn, 'slice').get_urn()
132         result['geni_urn'] = slice_urn
133         result['pl_login'] = slice['name']
134         result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
135         
136         resources = []
137         for node in nodes:
138             res = {}
139             res['pl_hostname'] = node['hostname']
140             res['pl_boot_state'] = node['boot_state']
141             res['pl_last_contact'] = node['last_contact']
142             if node['last_contact'] is not None:
143                 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
144             sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
145             res['geni_urn'] = sliver_id
146             if node['boot_state'] == 'boot':
147                 res['geni_status'] = 'ready'
148             else:
149                 res['geni_status'] = 'failed'
150                 top_level_status = 'failed' 
151                 
152             res['geni_error'] = ''
153     
154             resources.append(res)
155             
156         result['geni_status'] = top_level_status
157         result['geni_resources'] = resources
158         return result
159     
160     def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, call_id):
161         """
162         Create the sliver[s] (slice) at this aggregate.    
163         Verify HRN and initialize the slice record in PLC if necessary.
164         """
165         if Callids().already_handled(call_id): return ""
166     
167         aggregate = Aggregate(api)
168         slices = Slices(api)
169         (hrn, _) = urn_to_hrn(slice_xrn)
170         peer = slices.get_peer(hrn)
171         sfa_peer = slices.get_sfa_peer(hrn)
172         slice_record=None    
173         if users:
174             slice_record = users[0].get('slice_record', {})
175     
176         # parse rspec
177         rspec = RSpec(rspec_string)
178         requested_attributes = rspec.version.get_slice_attributes()
179         
180         # ensure site record exists
181         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
182         # ensure slice record exists
183         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
184         # ensure person records exists
185         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
186         # ensure slice attributes exists
187         slices.verify_slice_attributes(slice, requested_attributes)
188         
189         # add/remove slice from nodes
190         requested_slivers = [node['component_name'] for node in rspec.version.get_nodes_with_slivers()]
191         slices.verify_slice_nodes(slice, requested_slivers, peer) 
192    
193         # add/remove links links 
194         slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
195     
196         # handle MyPLC peer association.
197         # only used by plc and ple.
198         slices.handle_peer(site, slice, persons, peer)
199         
200         return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
201     
202     
203     def RenewSliver(self, api, xrn, creds, expiration_time, call_id):
204         if Callids().already_handled(call_id): return True
205         (hrn, _) = urn_to_hrn(xrn)
206         slicename = hrn_to_pl_slicename(hrn)
207         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
208         if not slices:
209             raise RecordNotFound(hrn)
210         slice = slices[0]
211         requested_time = utcparse(expiration_time)
212         record = {'expires': int(time.mktime(requested_time.timetuple()))}
213         try:
214             api.driver.UpdateSlice(slice['slice_id'], record)
215             return True
216         except:
217             return False
218     
219     def start_slice(self, api, xrn, creds):
220         (hrn, _) = urn_to_hrn(xrn)
221         slicename = hrn_to_pl_slicename(hrn)
222         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
223         if not slices:
224             raise RecordNotFound(hrn)
225         slice_id = slices[0]['slice_id']
226         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
227         # just remove the tag if it exists
228         if slice_tags:
229             api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
230     
231         return 1
232      
233     def stop_slice(self, api, xrn, creds):
234         hrn, _ = urn_to_hrn(xrn)
235         slicename = hrn_to_pl_slicename(hrn)
236         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
237         if not slices:
238             raise RecordNotFound(hrn)
239         slice_id = slices[0]['slice_id']
240         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
241         if not slice_tags:
242             api.driver.AddSliceTag(slice_id, 'enabled', '0')
243         elif slice_tags[0]['value'] != "0":
244             tag_id = slice_tags[0]['slice_tag_id']
245             api.driver.UpdateSliceTag(tag_id, '0')
246         return 1
247     
248     def reset_slice(self, api, xrn):
249         # XX not implemented at this interface
250         return 1
251     
252     def DeleteSliver(self, api, xrn, creds, call_id):
253         if Callids().already_handled(call_id): return ""
254         (hrn, _) = urn_to_hrn(xrn)
255         slicename = hrn_to_pl_slicename(hrn)
256         slices = api.driver.GetSlices({'name': slicename})
257         if not slices:
258             return 1
259         slice = slices[0]
260     
261         # determine if this is a peer slice
262         peer = peers.get_peer(api, hrn)
263         try:
264             if peer:
265                 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
266             api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
267         finally:
268             if peer:
269                 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
270         return 1
271     
272     def ListSlices(self, api, creds, call_id):
273         if Callids().already_handled(call_id): return []
274         # look in cache first
275         if self.caching and api.cache:
276             slices = api.cache.get('slices')
277             if slices:
278                 return slices
279     
280         # get data from db 
281         slices = api.driver.GetSlices({'peer_id': None}, ['name'])
282         slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
283         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
284     
285         # cache the result
286         if self.caching and api.cache:
287             api.cache.add('slices', slice_urns) 
288     
289         return slice_urns
290         
291     def ListResources(self, api, creds, options, call_id):
292         if Callids().already_handled(call_id): return ""
293         # get slice's hrn from options
294         xrn = options.get('geni_slice_urn', None)
295         (hrn, _) = urn_to_hrn(xrn)
296     
297         version_manager = VersionManager()
298         # get the rspec's return format from options
299         rspec_version = version_manager.get_version(options.get('rspec_version'))
300         version_string = "rspec_%s" % (rspec_version.to_string())
301     
302         #panos adding the info option to the caching key (can be improved)
303         if options.get('info'):
304             version_string = version_string + "_"+options.get('info', 'default')
305     
306         # look in cache first
307         if self.caching and api.cache and not xrn:
308             rspec = api.cache.get(version_string)
309             if rspec:
310                 api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
311                 return rspec 
312     
313         #panos: passing user-defined options
314         #print "manager options = ",options
315         aggregate = Aggregate(api, options)
316         rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
317     
318         # cache the result
319         if self.caching and api.cache and not xrn:
320             api.cache.add(version_string, rspec)
321     
322         return rspec
323     
324     
325     def get_ticket(self, api, xrn, creds, rspec, users):
326     
327         (slice_hrn, _) = urn_to_hrn(xrn)
328         slices = Slices(api)
329         peer = slices.get_peer(slice_hrn)
330         sfa_peer = slices.get_sfa_peer(slice_hrn)
331     
332         # get the slice record
333         credential = api.getCredential()
334         interface = api.registries[api.hrn]
335         registry = api.server_proxy(interface, credential)
336         records = registry.Resolve(xrn, credential)
337     
338         # make sure we get a local slice record
339         record = None
340         for tmp_record in records:
341             if tmp_record['type'] == 'slice' and \
342                not tmp_record['peer_authority']:
343     #Error (E0602, get_ticket): Undefined variable 'SliceRecord'
344                 record = SliceRecord(dict=tmp_record)
345         if not record:
346             raise RecordNotFound(slice_hrn)
347         
348         # similar to CreateSliver, we must verify that the required records exist
349         # at this aggregate before we can issue a ticket
350         # parse rspec
351         rspec = RSpec(rspec_string)
352         requested_attributes = rspec.version.get_slice_attributes()
353     
354         # ensure site record exists
355         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
356         # ensure slice record exists
357         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
358         # ensure person records exists
359         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
360         # ensure slice attributes exists
361         slices.verify_slice_attributes(slice, requested_attributes)
362         
363         # get sliver info
364         slivers = slices.get_slivers(slice_hrn)
365     
366         if not slivers:
367             raise SliverDoesNotExist(slice_hrn)
368     
369         # get initscripts
370         initscripts = []
371         data = {
372             'timestamp': int(time.time()),
373             'initscripts': initscripts,
374             'slivers': slivers
375         }
376     
377         # create the ticket
378         object_gid = record.get_gid_object()
379         new_ticket = SfaTicket(subject = object_gid.get_subject())
380         new_ticket.set_gid_caller(api.auth.client_gid)
381         new_ticket.set_gid_object(object_gid)
382         new_ticket.set_issuer(key=api.key, subject=api.hrn)
383         new_ticket.set_pubkey(object_gid.get_pubkey())
384         new_ticket.set_attributes(data)
385         new_ticket.set_rspec(rspec)
386         #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
387         new_ticket.encode()
388         new_ticket.sign()
389     
390         return new_ticket.save_to_string(save_parents=True)