IndentationError
[sfa.git] / sfa / plc / slices.py
1 ### $Id$
2 ### $URL$
3
4 import datetime
5 import time
6 import traceback
7 import sys
8
9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
21
22 MAXINT =  2L**31-1
23
24 class Slices(SimpleStorage):
25
26     rspec_to_slice_tag = {'max_rate':'net_max_rate'}
27
28     def __init__(self, api, ttl = .5, gid_origin_caller=None):
29         self.api = api
30         self.ttl = ttl
31         self.threshold = None
32         path = self.api.config.SFA_DATA_DIR
33         filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34         filepath = path + os.sep + filename
35         self.slices_file = filepath
36         SimpleStorage.__init__(self, self.slices_file)
37         self.policy = Policy(self.api)    
38         self.load()
39         self.gid_origin_caller=gid_origin_caller
40
41     def get_slivers(self, hrn, node=None):
42          
43         slice_name = hrn_to_pl_slicename(hrn)
44         # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
45         # of doing all of this?
46         #return self.api.GetSliceTicket(self.auth, slice_name) 
47         
48         # from PLCAPI.GetSlivers.get_slivers()
49         slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
50         slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)
51         # Build up list of users and slice attributes
52         person_ids = set()
53         all_slice_tag_ids = set()
54         for slice in slices:
55             person_ids.update(slice['person_ids'])
56             all_slice_tag_ids.update(slice['slice_tag_ids'])
57         person_ids = list(person_ids)
58         all_slice_tag_ids = list(all_slice_tag_ids)
59         # Get user information
60         all_persons_list = self.api.plshell.GetPersons(self.api.plauth, {'person_id':person_ids,'enabled':True}, ['person_id', 'enabled', 'key_ids'])
61         all_persons = {}
62         for person in all_persons_list:
63             all_persons[person['person_id']] = person        
64
65         # Build up list of keys
66         key_ids = set()
67         for person in all_persons.values():
68             key_ids.update(person['key_ids'])
69         key_ids = list(key_ids)
70         # Get user account keys
71         all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
72         all_keys = {}
73         for key in all_keys_list:
74             all_keys[key['key_id']] = key
75         # Get slice attributes
76         all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
77         all_slice_tags = {}
78         for slice_tag in all_slice_tags_list:
79             all_slice_tags[slice_tag['slice_tag_id']] = slice_tag
80            
81         slivers = []
82         for slice in slices:
83             keys = []
84             for person_id in slice['person_ids']:
85                 if person_id in all_persons:
86                     person = all_persons[person_id]
87                     if not person['enabled']:
88                         continue
89                     for key_id in person['key_ids']:
90                         if key_id in all_keys:
91                             key = all_keys[key_id]
92                             keys += [{'key_type': key['key_type'],
93                                     'key': key['key']}]
94             attributes = []
95             # All (per-node and global) attributes for this slice
96             slice_tags = []
97             for slice_tag_id in slice['slice_tag_ids']:
98                 if slice_tag_id in all_slice_tags:
99                     slice_tags.append(all_slice_tags[slice_tag_id]) 
100             # Per-node sliver attributes take precedence over global
101             # slice attributes, so set them first.
102             # Then comes nodegroup slice attributes
103             # Followed by global slice attributes
104             sliver_attributes = []
105
106             if node is not None:
107                 for sliver_attribute in filter(lambda a: a['node_id'] == node['node_id'], slice_tags):
108                     sliver_attributes.append(sliver_attribute['tagname'])
109                     attributes.append({'tagname': sliver_attribute['tagname'],
110                                     'value': sliver_attribute['value']})
111
112             # set nodegroup slice attributes
113             for slice_tag in filter(lambda a: a['nodegroup_id'] in node['nodegroup_ids'], slice_tags):
114                 # Do not set any nodegroup slice attributes for
115                 # which there is at least one sliver attribute
116                 # already set.
117                 if slice_tag not in slice_tags:
118                     attributes.append({'tagname': slice_tag['tagname'],
119                         'value': slice_tag['value']})
120
121             for slice_tag in filter(lambda a: a['node_id'] is None, slice_tags):
122                 # Do not set any global slice attributes for
123                 # which there is at least one sliver attribute
124                 # already set.
125                 if slice_tag['tagname'] not in sliver_attributes:
126                     attributes.append({'tagname': slice_tag['tagname'],
127                                    'value': slice_tag['value']})
128
129             # XXX Sanity check; though technically this should be a system invariant
130             # checked with an assertion
131             if slice['expires'] > MAXINT:  slice['expires']= MAXINT
132             
133             slivers.append({
134                 'hrn': hrn,
135                 'name': slice['name'],
136                 'slice_id': slice['slice_id'],
137                 'instantiation': slice['instantiation'],
138                 'expires': slice['expires'],
139                 'keys': keys,
140                 'attributes': attributes
141             })
142
143         return slivers
144  
145     def get_peer(self, hrn):
146         # Becaues of myplc federation,  we first need to determine if this
147         # slice belongs to out local plc or a myplc peer. We will assume it 
148         # is a local site, unless we find out otherwise  
149         peer = None
150
151         # get this slice's authority (site)
152         slice_authority = get_authority(hrn)
153
154         # get this site's authority (sfa root authority or sub authority)
155         site_authority = get_authority(slice_authority).lower()
156
157         # check if we are already peered with this site_authority, if so
158         peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
159         for peer_record in peers:
160             names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
161             if site_authority in names:
162                 peer = peer_record['shortname']
163
164         return peer
165
166     def get_sfa_peer(self, hrn):
167         # return the authority for this hrn or None if we are the authority
168         sfa_peer = None
169         slice_authority = get_authority(hrn)
170         site_authority = get_authority(slice_authority)
171
172         if site_authority != self.api.hrn:
173             sfa_peer = site_authority
174
175         return sfa_peer 
176
177     def refresh(self):
178         """
179         Update the cached list of slices
180         """
181         # Reload components list
182         now = datetime.datetime.now()
183         if not self.has_key('threshold') or not self.has_key('timestamp') or \
184            now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
185             if self.api.interface in ['aggregate']:
186                 self.refresh_slices_aggregate()
187             elif self.api.interface in ['slicemgr']:
188                 self.refresh_slices_smgr()
189
190     def refresh_slices_aggregate(self):
191         slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
192         slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
193
194          # update timestamp and threshold
195         timestamp = datetime.datetime.now()
196         hr_timestamp = timestamp.strftime(self.api.time_format)
197         delta = datetime.timedelta(hours=self.ttl)
198         threshold = timestamp + delta
199         hr_threshold = threshold.strftime(self.api.time_format)
200         
201         slice_details = {'hrn': slice_hrns,
202                          'timestamp': hr_timestamp,
203                          'threshold': hr_threshold
204                         }
205         self.update(slice_details)
206         self.write()     
207         
208
209     def refresh_slices_smgr(self):
210         slice_hrns = []
211         aggregates = Aggregates(self.api)
212         credential = self.api.getCredential()
213         for aggregate in aggregates:
214             success = False
215             # request hash is optional so lets try the call without it 
216             try:
217                 request_hash=None
218                 slices = aggregates[aggregate].get_slices(credential, request_hash)
219                 slice_hrns.extend(slices)
220                 success = True
221             except:
222                 print >> log, "%s" % (traceback.format_exc())
223                 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
224
225             # try sending the request hash if the previous call failed 
226             if not success:
227                 arg_list = [credential]
228                 request_hash = self.api.key.compute_hash(arg_list)
229                 try:
230                     slices = aggregates[aggregate].get_slices(credential, request_hash)
231                     slice_hrns.extend(slices)
232                     success = True
233                 except:
234                     print >> log, "%s" % (traceback.format_exc())
235                     print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
236
237         # update timestamp and threshold
238         timestamp = datetime.datetime.now()
239         hr_timestamp = timestamp.strftime(self.api.time_format)
240         delta = datetime.timedelta(hours=self.ttl)
241         threshold = timestamp + delta
242         hr_threshold = threshold.strftime(self.api.time_format)
243
244         slice_details = {'hrn': slice_hrns,
245                          'timestamp': hr_timestamp,
246                          'threshold': hr_threshold
247                         }
248         self.update(slice_details)
249         self.write()
250
251
252     def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
253         authority = get_authority(slice_hrn)
254         try:
255             site_records = registry.resolve(credential, authority)
256         except:
257             arg_list = [credential, authority]
258             request_hash = self.api.key.compute_hash(arg_list)
259             site_records = registry.resolve(credential, authority, request_hash)
260             
261         site = {}
262         for site_record in site_records:
263             if site_record['type'] == 'authority':
264                 site = site_record
265         if not site:
266             raise RecordNotFound(authority)
267         remote_site_id = site.pop('site_id')    
268                 
269         login_base = get_leaf(authority)
270         sites = self.api.plshell.GetSites(self.api.plauth, login_base)
271         if not sites:
272             site_id = self.api.plshell.AddSite(self.api.plauth, site)
273             if peer:
274                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)   
275             # mark this site as an sfa peer record
276             if sfa_peer:
277                 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
278                 try:
279                     registry.register_peer_object(credential, peer_dict)
280                 except:
281                     arg_list = [credential]
282                     request_hash = self.api.key.compute_hash(arg_list) 
283                     registry.register_peer_object(credential, peer_dict, request_hash)
284         else:
285             site_id = sites[0]['site_id']
286             remote_site_id = sites[0]['peer_site_id']
287
288
289         return (site_id, remote_site_id) 
290
291     def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
292         slice = {}
293         slice_record = None
294         authority = get_authority(slice_hrn)
295         try:
296             slice_records = registry.resolve(credential, slice_hrn)
297         except:    
298             arg_list = [credential, slice_hrn]
299             request_hash = self.api.key.compute_hash(arg_list)
300             slice_records = registry.resolve(credential, slice_hrn, request_hash)
301
302         for record in slice_records:
303             if record['type'] in ['slice']:
304                 slice_record = record
305         if not slice_record:
306             raise RecordNotFound(hrn)
307         slicename = hrn_to_pl_slicename(slice_hrn)
308         parts = slicename.split("_")
309         login_base = parts[0]
310         slices = self.api.plshell.GetSlices(self.api.plauth, [slicename]) 
311         if not slices:
312             slice_fields = {}
313             slice_keys = ['name', 'url', 'description']
314             for key in slice_keys:
315                 if key in slice_record and slice_record[key]:
316                     slice_fields[key] = slice_record[key]
317
318             # add the slice  
319             slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
320             slice = slice_fields
321             slice['slice_id'] = slice_id
322
323             # mark this slice as an sfa peer record
324             if sfa_peer:
325                 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
326                 try:
327                     registry.register_peer_object(credential, peer_dict)
328                 except:
329                     arg_list = [credential]
330                     request_hash = self.api.key.compute_hash(arg_list) 
331                     registry.register_peer_object(credential, peer_dict, request_hash)
332
333             #this belongs to a peer
334             if peer:
335                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
336             slice['node_ids'] = []
337         else:
338             slice = slices[0]
339             slice_id = slice['slice_id']
340             site_id = slice['site_id']
341             #the slice is alredy on the remote agg. Let us update(e.g. expires field) it with the latest info.
342             self.sync_slice(slice, slice_record, peer)
343
344         slice['peer_slice_id'] = slice_record['pointer']
345         self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
346     
347         return slice        
348
349     def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
350         # get the list of valid slice users from the registry and make 
351         # sure they are added to the slice 
352         slicename = hrn_to_pl_slicename(slice_record['hrn'])
353         researchers = slice_record.get('researcher', [])
354         for researcher in researchers:
355             person_record = {}
356             try:
357                 person_records = registry.resolve(credential, researcher)
358             except:
359                 arg_list = [credential, researcher]
360                 request_hash = self.api.key.compute_hash(arg_list) 
361                 person_records = registry.resolve(credential, researcher, request_hash)
362             for record in person_records:
363                 if record['type'] in ['user']:
364                     person_record = record
365             if not person_record:
366                 pass
367             person_dict = person_record
368             local_person=False
369             if peer:
370                 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
371                 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
372                 if not persons:
373                     persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
374                     if persons:
375                        local_person=True
376
377             else:
378                 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])   
379         
380             if not persons:
381                 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
382                 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
383                 
384                 # mark this person as an sfa peer record
385                 if sfa_peer:
386                     peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
387                     try:
388                         registry.register_peer_object(credential, peer_dict)
389                     except:
390                         arg_list = [credential]
391                         request_hash = self.api.key.compute_hash(arg_list) 
392                         registry.register_peer_object(credential, peer_dict, request_hash)
393
394                 if peer:
395                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
396                 key_ids = []
397             else:
398                 person_id = persons[0]['person_id']
399                 key_ids = persons[0]['key_ids']
400
401
402             # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
403             # an error
404             if peer:
405                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
406                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id,  peer)
407
408             self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
409             self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
410             if peer and not local_person:
411                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
412             if peer:
413                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
414             
415             self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
416
417     def verify_keys(self, registry, credential, person_dict, key_ids, person_id,  peer, local_person):
418         keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
419         keys = [key['key'] for key in keylist]
420         
421         #add keys that arent already there
422         key_ids = person_dict['key_ids']
423         for personkey in person_dict['keys']:
424             if personkey not in keys:
425                 key = {'key_type': 'ssh', 'key': personkey}
426                 if peer:
427                     self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
428                 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
429                 if peer and not local_person:
430                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
431                 if peer:
432                     try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
433
434                     except: pass   
435
436     def create_slice_aggregate(self, hrn, rspec):
437
438         # Determine if this is a peer slice
439         peer = self.get_peer(hrn)
440         sfa_peer = self.get_sfa_peer(hrn)
441
442         spec = RSpec(rspec)
443         # Get the slice record from sfa
444         slicename = hrn_to_pl_slicename(hrn) 
445         slice = {}
446         slice_record = None
447         registries = Registries(self.api)
448         registry = registries[self.api.hrn]
449         credential = self.api.getCredential()
450
451         site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
452         slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
453
454         # find out where this slice is currently running
455         nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
456         hostnames = [node['hostname'] for node in nodelist]
457
458         # get netspec details
459         nodespecs = spec.getDictsByTagName('NodeSpec')
460
461         # dict in which to store slice attributes to set for the nodes
462         nodes = {}
463         for nodespec in nodespecs:
464             if isinstance(nodespec['name'], list):
465                 for nodename in nodespec['name']:
466                     nodes[nodename] = {}
467                     for k in nodespec.keys():
468                         rspec_attribute_value = nodespec[k]
469                         if (self.rspec_to_slice_tag.has_key(k)):
470                             slice_tag_name = self.rspec_to_slice_tag[k]
471                             nodes[nodename][slice_tag_name] = rspec_attribute_value
472             elif isinstance(nodespec['name'], StringTypes):
473                 nodename = nodespec['name']
474                 nodes[nodename] = {}
475                 for k in nodespec.keys():
476                     rspec_attribute_value = nodespec[k]
477                     if (self.rspec_to_slice_tag.has_key(k)):
478                         slice_tag_name = self.rspec_to_slice_tag[k]
479                         nodes[nodename][slice_tag_name] = rspec_attribute_value
480
481                 for k in nodespec.keys():
482                     rspec_attribute_value = nodespec[k]
483                     if (self.rspec_to_slice_tag.has_key(k)):
484                         slice_tag_name = self.rspec_to_slice_tag[k]
485                         nodes[nodename][slice_tag_name] = rspec_attribute_value
486
487         node_names = nodes.keys()
488         # remove nodes not in rspec
489         deleted_nodes = list(set(hostnames).difference(node_names))
490         # add nodes from rspec
491         added_nodes = list(set(node_names).difference(hostnames))
492
493         if peer:
494             self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
495
496         self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes) 
497
498         # Add recognized slice tags
499         for node_name in node_names:
500             node = nodes[node_name]
501             for slice_tag in node.keys():
502                 value = node[slice_tag]
503                 if (isinstance(value, list)):
504                     value = value[0]
505
506                 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
507
508         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
509         if peer:
510             self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
511
512         return 1
513
514     def sync_slice(self, old_record, new_record, peer):
515         if old_record['expires'] != new_record['expires']:
516             if peer:
517                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
518                 self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires' : new_record['expires']})
519             if peer:
520                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])
521         return 1