removed another bunch of references to geni
[sfa.git] / sfa / plc / slices.py
1 ### $Id$
2 ### $URL$
3
4 import datetime
5 import time
6 import traceback
7 import sys
8
9 from types import StringTypes
10 from sfa.util.namespace import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import SfaRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
21
22 MAXINT =  2L**31-1
23
24 class Slices(SimpleStorage):
25
26     rspec_to_slice_tag = {'max_rate':'net_max_rate'}
27
28     def __init__(self, api, ttl = .5, origin_hrn=None):
29         self.api = api
30         self.ttl = ttl
31         self.threshold = None
32         path = self.api.config.SFA_DATA_DIR
33         filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34         filepath = path + os.sep + filename
35         self.slices_file = filepath
36         SimpleStorage.__init__(self, self.slices_file)
37         self.policy = Policy(self.api)    
38         self.load()
39         self.origin_hrn = origin_hrn
40
41     def get_slivers(self, hrn, node=None):
42          
43         slice_name = hrn_to_pl_slicename(hrn)
44         # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
45         # of doing all of this?
46         #return self.api.GetSliceTicket(self.auth, slice_name) 
47         
48         # from PLCAPI.GetSlivers.get_slivers()
49         slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
50         slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)
51         # Build up list of users and slice attributes
52         person_ids = set()
53         all_slice_tag_ids = set()
54         for slice in slices:
55             person_ids.update(slice['person_ids'])
56             all_slice_tag_ids.update(slice['slice_tag_ids'])
57         person_ids = list(person_ids)
58         all_slice_tag_ids = list(all_slice_tag_ids)
59         # Get user information
60         all_persons_list = self.api.plshell.GetPersons(self.api.plauth, {'person_id':person_ids,'enabled':True}, ['person_id', 'enabled', 'key_ids'])
61         all_persons = {}
62         for person in all_persons_list:
63             all_persons[person['person_id']] = person        
64
65         # Build up list of keys
66         key_ids = set()
67         for person in all_persons.values():
68             key_ids.update(person['key_ids'])
69         key_ids = list(key_ids)
70         # Get user account keys
71         all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
72         all_keys = {}
73         for key in all_keys_list:
74             all_keys[key['key_id']] = key
75         # Get slice attributes
76         all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
77         all_slice_tags = {}
78         for slice_tag in all_slice_tags_list:
79             all_slice_tags[slice_tag['slice_tag_id']] = slice_tag
80            
81         slivers = []
82         for slice in slices:
83             keys = []
84             for person_id in slice['person_ids']:
85                 if person_id in all_persons:
86                     person = all_persons[person_id]
87                     if not person['enabled']:
88                         continue
89                     for key_id in person['key_ids']:
90                         if key_id in all_keys:
91                             key = all_keys[key_id]
92                             keys += [{'key_type': key['key_type'],
93                                     'key': key['key']}]
94             attributes = []
95             # All (per-node and global) attributes for this slice
96             slice_tags = []
97             for slice_tag_id in slice['slice_tag_ids']:
98                 if slice_tag_id in all_slice_tags:
99                     slice_tags.append(all_slice_tags[slice_tag_id]) 
100             # Per-node sliver attributes take precedence over global
101             # slice attributes, so set them first.
102             # Then comes nodegroup slice attributes
103             # Followed by global slice attributes
104             sliver_attributes = []
105
106             if node is not None:
107                 for sliver_attribute in filter(lambda a: a['node_id'] == node['node_id'], slice_tags):
108                     sliver_attributes.append(sliver_attribute['tagname'])
109                     attributes.append({'tagname': sliver_attribute['tagname'],
110                                     'value': sliver_attribute['value']})
111
112             # set nodegroup slice attributes
113             for slice_tag in filter(lambda a: a['nodegroup_id'] in node['nodegroup_ids'], slice_tags):
114                 # Do not set any nodegroup slice attributes for
115                 # which there is at least one sliver attribute
116                 # already set.
117                 if slice_tag not in slice_tags:
118                     attributes.append({'tagname': slice_tag['tagname'],
119                         'value': slice_tag['value']})
120
121             for slice_tag in filter(lambda a: a['node_id'] is None, slice_tags):
122                 # Do not set any global slice attributes for
123                 # which there is at least one sliver attribute
124                 # already set.
125                 if slice_tag['tagname'] not in sliver_attributes:
126                     attributes.append({'tagname': slice_tag['tagname'],
127                                    'value': slice_tag['value']})
128
129             # XXX Sanity check; though technically this should be a system invariant
130             # checked with an assertion
131             if slice['expires'] > MAXINT:  slice['expires']= MAXINT
132             
133             slivers.append({
134                 'hrn': hrn,
135                 'name': slice['name'],
136                 'slice_id': slice['slice_id'],
137                 'instantiation': slice['instantiation'],
138                 'expires': slice['expires'],
139                 'keys': keys,
140                 'attributes': attributes
141             })
142
143         return slivers
144  
145     def get_peer(self, hrn):
146         # Becaues of myplc federation,  we first need to determine if this
147         # slice belongs to out local plc or a myplc peer. We will assume it 
148         # is a local site, unless we find out otherwise  
149         peer = None
150
151         # get this slice's authority (site)
152         slice_authority = get_authority(hrn)
153
154         # get this site's authority (sfa root authority or sub authority)
155         site_authority = get_authority(slice_authority).lower()
156
157         # check if we are already peered with this site_authority, if so
158         peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
159         for peer_record in peers:
160             names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
161             if site_authority in names:
162                 peer = peer_record['shortname']
163
164         return peer
165
166     def get_sfa_peer(self, hrn):
167         # return the authority for this hrn or None if we are the authority
168         sfa_peer = None
169         slice_authority = get_authority(hrn)
170         site_authority = get_authority(slice_authority)
171
172         if site_authority != self.api.hrn:
173             sfa_peer = site_authority
174
175         return sfa_peer 
176
177     def refresh(self):
178         """
179         Update the cached list of slices
180         """
181         # Reload components list
182         now = datetime.datetime.now()
183         if not self.has_key('threshold') or not self.has_key('timestamp') or \
184            now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
185             if self.api.interface in ['aggregate']:
186                 self.refresh_slices_aggregate()
187             elif self.api.interface in ['slicemgr']:
188                 self.refresh_slices_smgr()
189
190     def refresh_slices_aggregate(self):
191         slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
192         slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
193
194          # update timestamp and threshold
195         timestamp = datetime.datetime.now()
196         hr_timestamp = timestamp.strftime(self.api.time_format)
197         delta = datetime.timedelta(hours=self.ttl)
198         threshold = timestamp + delta
199         hr_threshold = threshold.strftime(self.api.time_format)
200         
201         slice_details = {'hrn': slice_hrns,
202                          'timestamp': hr_timestamp,
203                          'threshold': hr_threshold
204                         }
205         self.update(slice_details)
206         self.write()     
207         
208
209     def refresh_slices_smgr(self):
210         slice_hrns = []
211         aggregates = Aggregates(self.api)
212         credential = self.api.getCredential()
213         for aggregate in aggregates:
214             success = False
215             # request hash is optional so lets try the call without it 
216             try:
217                 slices = aggregates[aggregate].get_slices(credential)
218                 slice_hrns.extend(slices)
219                 success = True
220             except:
221                 print >> log, "%s" % (traceback.format_exc())
222                 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
223
224             # try sending the request hash if the previous call failed 
225             if not success:
226                 arg_list = [credential]
227                 try:
228                     slices = aggregates[aggregate].get_slices(credential)
229                     slice_hrns.extend(slices)
230                     success = True
231                 except:
232                     print >> log, "%s" % (traceback.format_exc())
233                     print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
234
235         # update timestamp and threshold
236         timestamp = datetime.datetime.now()
237         hr_timestamp = timestamp.strftime(self.api.time_format)
238         delta = datetime.timedelta(hours=self.ttl)
239         threshold = timestamp + delta
240         hr_threshold = threshold.strftime(self.api.time_format)
241
242         slice_details = {'hrn': slice_hrns,
243                          'timestamp': hr_timestamp,
244                          'threshold': hr_threshold
245                         }
246         self.update(slice_details)
247         self.write()
248
249
250     def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
251         authority = get_authority(slice_hrn)
252         site_records = registry.resolve(credential, authority)
253             
254         site = {}
255         for site_record in site_records:
256             if site_record['type'] == 'authority':
257                 site = site_record
258         if not site:
259             raise RecordNotFound(authority)
260         remote_site_id = site.pop('site_id')    
261                 
262         login_base = get_leaf(authority)
263         sites = self.api.plshell.GetSites(self.api.plauth, login_base)
264         if not sites:
265             site_id = self.api.plshell.AddSite(self.api.plauth, site)
266             if peer:
267                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)   
268             # mark this site as an sfa peer record
269             if sfa_peer:
270                 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
271                 registry.register_peer_object(credential, peer_dict)
272         else:
273             site_id = sites[0]['site_id']
274             remote_site_id = sites[0]['peer_site_id']
275
276
277         return (site_id, remote_site_id) 
278
279     def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
280         slice = {}
281         slice_record = None
282         authority = get_authority(slice_hrn)
283         slice_records = registry.resolve(credential, slice_hrn)
284
285         for record in slice_records:
286             if record['type'] in ['slice']:
287                 slice_record = record
288         if not slice_record:
289             raise RecordNotFound(hrn)
290         slicename = hrn_to_pl_slicename(slice_hrn)
291         parts = slicename.split("_")
292         login_base = parts[0]
293         slices = self.api.plshell.GetSlices(self.api.plauth, [slicename]) 
294         if not slices:
295             slice_fields = {}
296             slice_keys = ['name', 'url', 'description']
297             for key in slice_keys:
298                 if key in slice_record and slice_record[key]:
299                     slice_fields[key] = slice_record[key]
300
301             # add the slice  
302             slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
303             slice = slice_fields
304             slice['slice_id'] = slice_id
305
306             # mark this slice as an sfa peer record
307             if sfa_peer:
308                 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
309                 registry.register_peer_object(credential, peer_dict)
310
311             #this belongs to a peer
312             if peer:
313                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
314             slice['node_ids'] = []
315         else:
316             slice = slices[0]
317             slice_id = slice['slice_id']
318             site_id = slice['site_id']
319             #the slice is alredy on the remote agg. Let us update(e.g. expires field) it with the latest info.
320             self.sync_slice(slice, slice_record, peer)
321
322         slice['peer_slice_id'] = slice_record['pointer']
323         self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
324     
325         return slice        
326
327     def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
328         # get the list of valid slice users from the registry and make 
329         # sure they are added to the slice 
330         slicename = hrn_to_pl_slicename(slice_record['hrn'])
331         researchers = slice_record.get('researcher', [])
332         for researcher in researchers:
333             person_record = {}
334             person_records = registry.resolve(credential, researcher)
335             for record in person_records:
336                 if record['type'] in ['user']:
337                     person_record = record
338             if not person_record:
339                 pass
340             person_dict = person_record
341             local_person=False
342             if peer:
343                 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
344                 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
345                 if not persons:
346                     persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
347                     if persons:
348                        local_person=True
349
350             else:
351                 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])   
352         
353             if not persons:
354                 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
355                 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
356                 
357                 # mark this person as an sfa peer record
358                 if sfa_peer:
359                     peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
360                     registry.register_peer_object(credential, peer_dict)
361
362                 if peer:
363                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
364                 key_ids = []
365             else:
366                 person_id = persons[0]['person_id']
367                 key_ids = persons[0]['key_ids']
368
369
370             # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
371             # an error
372             if peer:
373                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
374                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id,  peer)
375
376             self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
377             self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
378             if peer and not local_person:
379                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
380             if peer:
381                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
382             
383             self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
384
385     def verify_keys(self, registry, credential, person_dict, key_ids, person_id,  peer, local_person):
386         keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
387         keys = [key['key'] for key in keylist]
388         
389         #add keys that arent already there
390         key_ids = person_dict['key_ids']
391         for personkey in person_dict['keys']:
392             if personkey not in keys:
393                 key = {'key_type': 'ssh', 'key': personkey}
394                 if peer:
395                     self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
396                 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
397                 if peer and not local_person:
398                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
399                 if peer:
400                     try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
401
402                     except: pass   
403
404     def create_slice_aggregate(self, hrn, rspec):
405
406         # Determine if this is a peer slice
407         peer = self.get_peer(hrn)
408         sfa_peer = self.get_sfa_peer(hrn)
409
410         spec = RSpec(rspec)
411         # Get the slice record from sfa
412         slicename = hrn_to_pl_slicename(hrn) 
413         slice = {}
414         slice_record = None
415         registries = Registries(self.api)
416         registry = registries[self.api.hrn]
417         credential = self.api.getCredential()
418
419         site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
420         slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
421
422         # find out where this slice is currently running
423         nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
424         hostnames = [node['hostname'] for node in nodelist]
425
426         # get netspec details
427         nodespecs = spec.getDictsByTagName('NodeSpec')
428
429         # dict in which to store slice attributes to set for the nodes
430         nodes = {}
431         for nodespec in nodespecs:
432             if isinstance(nodespec['name'], list):
433                 for nodename in nodespec['name']:
434                     nodes[nodename] = {}
435                     for k in nodespec.keys():
436                         rspec_attribute_value = nodespec[k]
437                         if (self.rspec_to_slice_tag.has_key(k)):
438                             slice_tag_name = self.rspec_to_slice_tag[k]
439                             nodes[nodename][slice_tag_name] = rspec_attribute_value
440             elif isinstance(nodespec['name'], StringTypes):
441                 nodename = nodespec['name']
442                 nodes[nodename] = {}
443                 for k in nodespec.keys():
444                     rspec_attribute_value = nodespec[k]
445                     if (self.rspec_to_slice_tag.has_key(k)):
446                         slice_tag_name = self.rspec_to_slice_tag[k]
447                         nodes[nodename][slice_tag_name] = rspec_attribute_value
448
449                 for k in nodespec.keys():
450                     rspec_attribute_value = nodespec[k]
451                     if (self.rspec_to_slice_tag.has_key(k)):
452                         slice_tag_name = self.rspec_to_slice_tag[k]
453                         nodes[nodename][slice_tag_name] = rspec_attribute_value
454
455         node_names = nodes.keys()
456         # remove nodes not in rspec
457         deleted_nodes = list(set(hostnames).difference(node_names))
458         # add nodes from rspec
459         added_nodes = list(set(node_names).difference(hostnames))
460
461         if peer:
462             self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
463
464         self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes) 
465
466         # Add recognized slice tags
467         for node_name in node_names:
468             node = nodes[node_name]
469             for slice_tag in node.keys():
470                 value = node[slice_tag]
471                 if (isinstance(value, list)):
472                     value = value[0]
473
474                 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
475
476         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
477         if peer:
478             self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
479
480         return 1
481
482     def sync_slice(self, old_record, new_record, peer):
483         if old_record['expires'] != new_record['expires']:
484             if peer:
485                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
486                 self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires' : new_record['expires']})
487             if peer:
488                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])
489         return 1