fix merge problem
[sfa.git] / sfa / managers / aggregate_manager.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
6 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
7 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
8 from sfa.util.version import version_core
9 from sfa.util.sfatime import utcparse
10 from sfa.util.callids import Callids
11
12 from sfa.trust.sfaticket import SfaTicket
13 from sfa.trust.credential import Credential
14 from sfa.rspecs.version_manager import VersionManager
15 from sfa.rspecs.rspec import RSpec
16
17 from sfa.server.sfaapi import SfaApi
18
19 import sfa.plc.peers as peers
20 from sfa.plc.aggregate import Aggregate
21 from sfa.plc.slices import Slices
22
23 class AggregateManager:
24
25     def __init__ (self):
26         # xxx Thierry : caching at the aggregate level sounds wrong...
27         #self.caching=True
28         self.caching=False
29     
30     def GetVersion(self, api):
31     
32         version_manager = VersionManager()
33         ad_rspec_versions = []
34         request_rspec_versions = []
35         for rspec_version in version_manager.versions:
36             if rspec_version.content_type in ['*', 'ad']:
37                 ad_rspec_versions.append(rspec_version.to_dict())
38             if rspec_version.content_type in ['*', 'request']:
39                 request_rspec_versions.append(rspec_version.to_dict()) 
40         default_rspec_version = version_manager.get_version("sfa 1").to_dict()
41         xrn=Xrn(api.hrn)
42         version_more = {'interface':'aggregate',
43                         'testbed':'myplc',
44                         'hrn':xrn.get_hrn(),
45                         'request_rspec_versions': request_rspec_versions,
46                         'ad_rspec_versions': ad_rspec_versions,
47                         'default_ad_rspec': default_rspec_version
48                         }
49         return version_core(version_more)
50     
51     def _get_registry_objects(self, slice_xrn, creds, users):
52         """
53     
54         """
55         hrn, _ = urn_to_hrn(slice_xrn)
56     
57         hrn_auth = get_authority(hrn)
58     
59         # Build up objects that an SFA registry would return if SFA
60         # could contact the slice's registry directly
61         reg_objects = None
62     
63         if users:
64             # dont allow special characters in the site login base
65             #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
66             #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
67             slicename = hrn_to_pl_slicename(hrn)
68             login_base = slicename.split('_')[0]
69             reg_objects = {}
70             site = {}
71             site['site_id'] = 0
72             site['name'] = 'geni.%s' % login_base 
73             site['enabled'] = True
74             site['max_slices'] = 100
75     
76             # Note:
77             # Is it okay if this login base is the same as one already at this myplc site?
78             # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
79             site['login_base'] = login_base
80             site['abbreviated_name'] = login_base
81             site['max_slivers'] = 1000
82             reg_objects['site'] = site
83     
84             slice = {}
85             
86             # get_expiration always returns a normalized datetime - no need to utcparse
87             extime = Credential(string=creds[0]).get_expiration()
88             # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
89             if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
90                 extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
91             slice['expires'] = int(time.mktime(extime.timetuple()))
92             slice['hrn'] = hrn
93             slice['name'] = hrn_to_pl_slicename(hrn)
94             slice['url'] = hrn
95             slice['description'] = hrn
96             slice['pointer'] = 0
97             reg_objects['slice_record'] = slice
98     
99             reg_objects['users'] = {}
100             for user in users:
101                 user['key_ids'] = []
102                 hrn, _ = urn_to_hrn(user['urn'])
103                 user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
104                 user['first_name'] = hrn
105                 user['last_name'] = hrn
106                 reg_objects['users'][user['email']] = user
107     
108             return reg_objects
109     
110     def SliverStatus(self, api, slice_xrn, creds, call_id):
111         if Callids().already_handled(call_id): return {}
112     
113         (hrn, _) = urn_to_hrn(slice_xrn)
114         # find out where this slice is currently running
115         slicename = hrn_to_pl_slicename(hrn)
116         
117         slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
118         if len(slices) == 0:        
119             raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
120         slice = slices[0]
121         
122         # report about the local nodes only
123         nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
124                                      ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
125         site_ids = [node['site_id'] for node in nodes]
126     
127         result = {}
128         top_level_status = 'unknown'
129         if nodes:
130             top_level_status = 'ready'
131         slice_urn = Xrn(slice_xrn, 'slice').get_urn()
132         result['geni_urn'] = slice_urn
133         result['pl_login'] = slice['name']
134         result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
135         
136         resources = []
137         for node in nodes:
138             res = {}
139             res['pl_hostname'] = node['hostname']
140             res['pl_boot_state'] = node['boot_state']
141             res['pl_last_contact'] = node['last_contact']
142             if node['last_contact'] is not None:
143                 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
144             sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
145             res['geni_urn'] = sliver_id
146             if node['boot_state'] == 'boot':
147                 res['geni_status'] = 'ready'
148             else:
149                 res['geni_status'] = 'failed'
150                 top_level_status = 'failed' 
151                 
152             res['geni_error'] = ''
153     
154             resources.append(res)
155             
156         result['geni_status'] = top_level_status
157         result['geni_resources'] = resources
158         return result
159     
160     def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, call_id):
161         """
162         Create the sliver[s] (slice) at this aggregate.    
163         Verify HRN and initialize the slice record in PLC if necessary.
164         """
165         if Callids().already_handled(call_id): return ""
166     
167         aggregate = Aggregate(api)
168         slices = Slices(api)
169         (hrn, _) = urn_to_hrn(slice_xrn)
170         peer = slices.get_peer(hrn)
171         sfa_peer = slices.get_sfa_peer(hrn)
172         slice_record=None    
173         if users:
174             slice_record = users[0].get('slice_record', {})
175     
176         # parse rspec
177         rspec = RSpec(rspec_string)
178         requested_attributes = rspec.version.get_slice_attributes()
179         
180         # ensure site record exists
181         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
182         # ensure slice record exists
183         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
184         # ensure person records exists
185         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
186         # ensure slice attributes exists
187         slices.verify_slice_attributes(slice, requested_attributes)
188         
189         # add/remove slice from nodes
190         requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()]
191         slices.verify_slice_nodes(slice, requested_slivers, peer) 
192     
193         aggregate.prepare_nodes({'hostname': requested_slivers})
194         aggregate.prepare_interfaces({'node_id': aggregate.nodes.keys()})    
195         slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
196     
197         # handle MyPLC peer association.
198         # only used by plc and ple.
199         slices.handle_peer(site, slice, persons, peer)
200         
201         return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
202     
203     
204     def RenewSliver(self, api, xrn, creds, expiration_time, call_id):
205         if Callids().already_handled(call_id): return True
206         (hrn, _) = urn_to_hrn(xrn)
207         slicename = hrn_to_pl_slicename(hrn)
208         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
209         if not slices:
210             raise RecordNotFound(hrn)
211         slice = slices[0]
212         requested_time = utcparse(expiration_time)
213         record = {'expires': int(time.mktime(requested_time.timetuple()))}
214         try:
215             api.driver.UpdateSlice(slice['slice_id'], record)
216             return True
217         except:
218             return False
219     
220     def start_slice(self, api, xrn, creds):
221         (hrn, _) = urn_to_hrn(xrn)
222         slicename = hrn_to_pl_slicename(hrn)
223         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
224         if not slices:
225             raise RecordNotFound(hrn)
226         slice_id = slices[0]['slice_id']
227         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
228         # just remove the tag if it exists
229         if slice_tags:
230             api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
231     
232         return 1
233      
234     def stop_slice(self, api, xrn, creds):
235         hrn, _ = urn_to_hrn(xrn)
236         slicename = hrn_to_pl_slicename(hrn)
237         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
238         if not slices:
239             raise RecordNotFound(hrn)
240         slice_id = slices[0]['slice_id']
241         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
242         if not slice_tags:
243             api.driver.AddSliceTag(slice_id, 'enabled', '0')
244         elif slice_tags[0]['value'] != "0":
245             tag_id = slice_tags[0]['slice_tag_id']
246             api.driver.UpdateSliceTag(tag_id, '0')
247         return 1
248     
249     def reset_slice(self, api, xrn):
250         # XX not implemented at this interface
251         return 1
252     
253     def DeleteSliver(self, api, xrn, creds, call_id):
254         if Callids().already_handled(call_id): return ""
255         (hrn, _) = urn_to_hrn(xrn)
256         slicename = hrn_to_pl_slicename(hrn)
257         slices = api.driver.GetSlices({'name': slicename})
258         if not slices:
259             return 1
260         slice = slices[0]
261     
262         # determine if this is a peer slice
263         peer = peers.get_peer(api, hrn)
264         try:
265             if peer:
266                 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
267             api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
268         finally:
269             if peer:
270                 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
271         return 1
272     
273     def ListSlices(self, api, creds, call_id):
274         if Callids().already_handled(call_id): return []
275         # look in cache first
276         if self.caching and api.cache:
277             slices = api.cache.get('slices')
278             if slices:
279                 return slices
280     
281         # get data from db 
282         slices = api.driver.GetSlices({'peer_id': None}, ['name'])
283         slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
284         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
285     
286         # cache the result
287         if self.caching and api.cache:
288             api.cache.add('slices', slice_urns) 
289     
290         return slice_urns
291         
292     def ListResources(self, api, creds, options, call_id):
293         if Callids().already_handled(call_id): return ""
294         # get slice's hrn from options
295         xrn = options.get('geni_slice_urn', None)
296         (hrn, _) = urn_to_hrn(xrn)
297     
298         version_manager = VersionManager()
299         # get the rspec's return format from options
300         rspec_version = version_manager.get_version(options.get('rspec_version'))
301         version_string = "rspec_%s" % (rspec_version.to_string())
302     
303         #panos adding the info option to the caching key (can be improved)
304         if options.get('info'):
305             version_string = version_string + "_"+options.get('info', 'default')
306     
307         # look in cache first
308         if self.caching and api.cache and not xrn:
309             rspec = api.cache.get(version_string)
310             if rspec:
311                 api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
312                 return rspec 
313     
314         #panos: passing user-defined options
315         #print "manager options = ",options
316         aggregate = Aggregate(api, options)
317         rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
318     
319         # cache the result
320         if self.caching and api.cache and not xrn:
321             api.cache.add(version_string, rspec)
322     
323         return rspec
324     
325     
326     def get_ticket(self, api, xrn, creds, rspec, users):
327     
328         (slice_hrn, _) = urn_to_hrn(xrn)
329         slices = Slices(api)
330         peer = slices.get_peer(slice_hrn)
331         sfa_peer = slices.get_sfa_peer(slice_hrn)
332     
333         # get the slice record
334         credential = api.getCredential()
335         interface = api.registries[api.hrn]
336         registry = api.server_proxy(interface, credential)
337         records = registry.Resolve(xrn, credential)
338     
339         # make sure we get a local slice record
340         record = None
341         for tmp_record in records:
342             if tmp_record['type'] == 'slice' and \
343                not tmp_record['peer_authority']:
344     #Error (E0602, get_ticket): Undefined variable 'SliceRecord'
345                 record = SliceRecord(dict=tmp_record)
346         if not record:
347             raise RecordNotFound(slice_hrn)
348         
349         # similar to CreateSliver, we must verify that the required records exist
350         # at this aggregate before we can issue a ticket
351         # parse rspec
352         rspec = RSpec(rspec_string)
353         requested_attributes = rspec.version.get_slice_attributes()
354     
355         # ensure site record exists
356         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
357         # ensure slice record exists
358         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
359         # ensure person records exists
360         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
361         # ensure slice attributes exists
362         slices.verify_slice_attributes(slice, requested_attributes)
363         
364         # get sliver info
365         slivers = slices.get_slivers(slice_hrn)
366     
367         if not slivers:
368             raise SliverDoesNotExist(slice_hrn)
369     
370         # get initscripts
371         initscripts = []
372         data = {
373             'timestamp': int(time.time()),
374             'initscripts': initscripts,
375             'slivers': slivers
376         }
377     
378         # create the ticket
379         object_gid = record.get_gid_object()
380         new_ticket = SfaTicket(subject = object_gid.get_subject())
381         new_ticket.set_gid_caller(api.auth.client_gid)
382         new_ticket.set_gid_object(object_gid)
383         new_ticket.set_issuer(key=api.key, subject=api.hrn)
384         new_ticket.set_pubkey(object_gid.get_pubkey())
385         new_ticket.set_attributes(data)
386         new_ticket.set_rspec(rspec)
387         #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
388         new_ticket.encode()
389         new_ticket.sign()
390     
391         return new_ticket.save_to_string(save_parents=True)