move 'append' from users struct to options struct
[sfa.git] / sfa / managers / aggregate_manager.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
7 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
8 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
9 from sfa.util.version import version_core
10 from sfa.util.sfatime import utcparse
11 from sfa.util.callids import Callids
12
13 from sfa.trust.sfaticket import SfaTicket
14 from sfa.trust.credential import Credential
15
16 from sfa.rspecs.version_manager import VersionManager
17 from sfa.rspecs.rspec import RSpec
18
19 from sfa.server.sfaapi import SfaApi
20
21 import sfa.plc.peers as peers
22 from sfa.plc.plaggregate import PlAggregate
23 from sfa.plc.plslices import PlSlices
24
25 class AggregateManager:
26
27     def __init__ (self):
28         # xxx Thierry : caching at the aggregate level sounds wrong...
29         self.caching=True
30         #self.caching=False
31     
32     def GetVersion(self, api, options={}):
33     
34         version_manager = VersionManager()
35         ad_rspec_versions = []
36         request_rspec_versions = []
37         for rspec_version in version_manager.versions:
38             if rspec_version.content_type in ['*', 'ad']:
39                 ad_rspec_versions.append(rspec_version.to_dict())
40             if rspec_version.content_type in ['*', 'request']:
41                 request_rspec_versions.append(rspec_version.to_dict()) 
42         xrn=Xrn(api.hrn)
43         version_more = {'interface':'aggregate',
44                         'sfa': 2,
45                         'geni_api': api.config.SFA_AGGREGATE_API_VERSION,
46                         'testbed':'myplc',
47                         'hrn':xrn.get_hrn(),
48                         'geni_request_rspec_versions': request_rspec_versions,
49                         'geni_ad_rspec_versions': ad_rspec_versions,
50                         }
51         return version_core(version_more)
52     
53     def SliverStatus(self, api, slice_xrn, creds, options={}):
54         call_id = options.get('call_id')
55         if Callids().already_handled(call_id): return {}
56     
57         (hrn, _) = urn_to_hrn(slice_xrn)
58         # find out where this slice is currently running
59         slicename = hrn_to_pl_slicename(hrn)
60         
61         slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
62         if len(slices) == 0:        
63             raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
64         slice = slices[0]
65         
66         # report about the local nodes only
67         nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
68                                      ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
69         site_ids = [node['site_id'] for node in nodes]
70     
71         result = {}
72         top_level_status = 'unknown'
73         if nodes:
74             top_level_status = 'ready'
75         slice_urn = Xrn(slice_xrn, 'slice').get_urn()
76         result['geni_urn'] = slice_urn
77         result['pl_login'] = slice['name']
78         result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
79         
80         resources = []
81         for node in nodes:
82             res = {}
83             res['pl_hostname'] = node['hostname']
84             res['pl_boot_state'] = node['boot_state']
85             res['pl_last_contact'] = node['last_contact']
86             if node['last_contact'] is not None:
87                 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
88             sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
89             res['geni_urn'] = sliver_id
90             if node['boot_state'] == 'boot':
91                 res['geni_status'] = 'ready'
92             else:
93                 res['geni_status'] = 'failed'
94                 top_level_status = 'failed' 
95                 
96             res['geni_error'] = ''
97     
98             resources.append(res)
99             
100         result['geni_status'] = top_level_status
101         result['geni_resources'] = resources
102         return result
103     
104     def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options={}):
105         """
106         Create the sliver[s] (slice) at this aggregate.    
107         Verify HRN and initialize the slice record in PLC if necessary.
108         """
109         call_id = options.get('call_id')
110         if Callids().already_handled(call_id): return ""
111     
112         aggregate = PlAggregate(self.driver)
113         slices = PlSlices(api)
114         (hrn, _) = urn_to_hrn(slice_xrn)
115         peer = slices.get_peer(hrn)
116         sfa_peer = slices.get_sfa_peer(hrn)
117         slice_record=None    
118         if users:
119             slice_record = users[0].get('slice_record', {})
120     
121         # parse rspec
122         rspec = RSpec(rspec_string)
123         requested_attributes = rspec.version.get_slice_attributes()
124         
125         # ensure site record exists
126         site = slices.verify_site(hrn, slice_record, peer, sfa_peer, options=options)
127         # ensure slice record exists
128         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer, options=options)
129         # ensure person records exists
130         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer, options=options)
131         # ensure slice attributes exists
132         slices.verify_slice_attributes(slice, requested_attributes, options=options)
133         
134         # add/remove slice from nodes
135         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
136         slices.verify_slice_nodes(slice, requested_slivers, peer) 
137    
138         # add/remove links links 
139         slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
140     
141         # handle MyPLC peer association.
142         # only used by plc and ple.
143         slices.handle_peer(site, slice, persons, peer)
144         
145         return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
146     
147     
148     def RenewSliver(self, api, xrn, creds, expiration_time, options={}):
149         call_id = options.get('call_id')
150         if Callids().already_handled(call_id): return True
151         (hrn, _) = urn_to_hrn(xrn)
152         slicename = hrn_to_pl_slicename(hrn)
153         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
154         if not slices:
155             raise RecordNotFound(hrn)
156         slice = slices[0]
157         requested_time = utcparse(expiration_time)
158         record = {'expires': int(time.mktime(requested_time.timetuple()))}
159         try:
160             api.driver.UpdateSlice(slice['slice_id'], record)
161             return True
162         except:
163             return False
164     
165     def start_slice(self, api, xrn, creds):
166         (hrn, _) = urn_to_hrn(xrn)
167         slicename = hrn_to_pl_slicename(hrn)
168         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
169         if not slices:
170             raise RecordNotFound(hrn)
171         slice_id = slices[0]['slice_id']
172         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
173         # just remove the tag if it exists
174         if slice_tags:
175             api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
176     
177         return 1
178      
179     def stop_slice(self, api, xrn, creds):
180         hrn, _ = urn_to_hrn(xrn)
181         slicename = hrn_to_pl_slicename(hrn)
182         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
183         if not slices:
184             raise RecordNotFound(hrn)
185         slice_id = slices[0]['slice_id']
186         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
187         if not slice_tags:
188             api.driver.AddSliceTag(slice_id, 'enabled', '0')
189         elif slice_tags[0]['value'] != "0":
190             tag_id = slice_tags[0]['slice_tag_id']
191             api.driver.UpdateSliceTag(tag_id, '0')
192         return 1
193     
194     def reset_slice(self, api, xrn):
195         # XX not implemented at this interface
196         return 1
197     
198     def DeleteSliver(self, api, xrn, creds, options={}):
199         call_id = options.get('call_id')
200         if Callids().already_handled(call_id): return ""
201         (hrn, _) = urn_to_hrn(xrn)
202         slicename = hrn_to_pl_slicename(hrn)
203         slices = api.driver.GetSlices({'name': slicename})
204         if not slices:
205             return 1
206         slice = slices[0]
207     
208         # determine if this is a peer slice
209         peer = peers.get_peer(api, hrn)
210         try:
211             if peer:
212                 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
213             api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
214         finally:
215             if peer:
216                 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
217         return 1
218     
219     def ListSlices(self, api, creds, options={}):
220         call_id = options.get('call_id')
221         if Callids().already_handled(call_id): return []
222         # look in cache first
223         if self.caching and api.cache:
224             slices = api.cache.get('slices')
225             if slices:
226                 return slices
227     
228         # get data from db 
229         slices = api.driver.GetSlices({'peer_id': None}, ['name'])
230         slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
231         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
232     
233         # cache the result
234         if self.caching and api.cache:
235             api.cache.add('slices', slice_urns) 
236     
237         return slice_urns
238         
239     def ListResources(self, api, creds, options={}):
240         call_id = options.get('call_id')
241         if Callids().already_handled(call_id): return ""
242         # get slice's hrn from options
243         xrn = options.get('geni_slice_urn', None)
244         cached = options.get('cached', True) 
245         (hrn, _) = urn_to_hrn(xrn)
246     
247         version_manager = VersionManager()
248         # get the rspec's return format from options
249         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
250         version_string = "rspec_%s" % (rspec_version)
251     
252         #panos adding the info option to the caching key (can be improved)
253         if options.get('info'):
254             version_string = version_string + "_"+options.get('info', 'default')
255     
256         # look in cache first
257         if self.caching and api.cache and not xrn and cached:
258             rspec = api.cache.get(version_string)
259             if rspec:
260                 api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
261                 return rspec 
262     
263         #panos: passing user-defined options
264         #print "manager options = ",options
265         aggregate = PlAggregate(self.driver)
266         rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version, options=options)
267     
268         # cache the result
269         if self.caching and api.cache and not xrn:
270             api.cache.add(version_string, rspec)
271     
272         return rspec
273     
274     
275     def GetTicket(self, api, xrn, creds, rspec, users, options={}):
276     
277         (slice_hrn, _) = urn_to_hrn(xrn)
278         slices = PlSlices(api)
279         peer = slices.get_peer(slice_hrn)
280         sfa_peer = slices.get_sfa_peer(slice_hrn)
281     
282         # get the slice record
283         credential = api.getCredential()
284         interface = api.registries[api.hrn]
285         registry = api.server_proxy(interface, credential)
286         records = registry.Resolve(xrn, credential)
287     
288         # make sure we get a local slice record
289         record = None
290         for tmp_record in records:
291             if tmp_record['type'] == 'slice' and \
292                not tmp_record['peer_authority']:
293     #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
294                 record = SliceRecord(dict=tmp_record)
295         if not record:
296             raise RecordNotFound(slice_hrn)
297         
298         # similar to CreateSliver, we must verify that the required records exist
299         # at this aggregate before we can issue a ticket
300         # parse rspec
301         rspec = RSpec(rspec_string)
302         requested_attributes = rspec.version.get_slice_attributes()
303     
304         # ensure site record exists
305         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
306         # ensure slice record exists
307         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
308         # ensure person records exists
309         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
310         # ensure slice attributes exists
311         slices.verify_slice_attributes(slice, requested_attributes)
312         
313         # get sliver info
314         slivers = slices.get_slivers(slice_hrn)
315     
316         if not slivers:
317             raise SliverDoesNotExist(slice_hrn)
318     
319         # get initscripts
320         initscripts = []
321         data = {
322             'timestamp': int(time.time()),
323             'initscripts': initscripts,
324             'slivers': slivers
325         }
326     
327         # create the ticket
328         object_gid = record.get_gid_object()
329         new_ticket = SfaTicket(subject = object_gid.get_subject())
330         new_ticket.set_gid_caller(api.auth.client_gid)
331         new_ticket.set_gid_object(object_gid)
332         new_ticket.set_issuer(key=api.key, subject=api.hrn)
333         new_ticket.set_pubkey(object_gid.get_pubkey())
334         new_ticket.set_attributes(data)
335         new_ticket.set_rspec(rspec)
336         #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
337         new_ticket.encode()
338         new_ticket.sign()
339     
340         return new_ticket.save_to_string(save_parents=True)