If the site configuration (e.g. max_slices) is changed in the local aggregate, we...
[sfa.git] / sfa / plc / slices.py
1 ### $Id$
2 ### $URL$
3
4 import datetime
5 import time
6 import traceback
7 import sys
8
9 from types import StringTypes
10 from sfa.util.namespace import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import SfaRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
21
# Largest signed 32-bit integer; used to clamp slice expiration times.
# Written without the Python 2 only "L" suffix: ints promote automatically,
# so the value is the same on every interpreter.
MAXINT = 2**31 - 1
23
24 class Slices(SimpleStorage):
25
26     rspec_to_slice_tag = {'max_rate':'net_max_rate'}
27
28     def __init__(self, api, ttl = .5, origin_hrn=None):
29         self.api = api
30         self.ttl = ttl
31         self.threshold = None
32         path = self.api.config.SFA_DATA_DIR
33         filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34         filepath = path + os.sep + filename
35         self.slices_file = filepath
36         SimpleStorage.__init__(self, self.slices_file)
37         self.policy = Policy(self.api)    
38         self.load()
39         self.origin_hrn = origin_hrn
40
41     def get_slivers(self, xrn, node=None):
42         hrn, type = urn_to_hrn(xrn)
43          
44         slice_name = hrn_to_pl_slicename(hrn)
45         # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
46         # of doing all of this?
47         #return self.api.GetSliceTicket(self.auth, slice_name) 
48         
49         # from PLCAPI.GetSlivers.get_slivers()
50         slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
51         slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)
52         # Build up list of users and slice attributes
53         person_ids = set()
54         all_slice_tag_ids = set()
55         for slice in slices:
56             person_ids.update(slice['person_ids'])
57             all_slice_tag_ids.update(slice['slice_tag_ids'])
58         person_ids = list(person_ids)
59         all_slice_tag_ids = list(all_slice_tag_ids)
60         # Get user information
61         all_persons_list = self.api.plshell.GetPersons(self.api.plauth, {'person_id':person_ids,'enabled':True}, ['person_id', 'enabled', 'key_ids'])
62         all_persons = {}
63         for person in all_persons_list:
64             all_persons[person['person_id']] = person        
65
66         # Build up list of keys
67         key_ids = set()
68         for person in all_persons.values():
69             key_ids.update(person['key_ids'])
70         key_ids = list(key_ids)
71         # Get user account keys
72         all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
73         all_keys = {}
74         for key in all_keys_list:
75             all_keys[key['key_id']] = key
76         # Get slice attributes
77         all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
78         all_slice_tags = {}
79         for slice_tag in all_slice_tags_list:
80             all_slice_tags[slice_tag['slice_tag_id']] = slice_tag
81            
82         slivers = []
83         for slice in slices:
84             keys = []
85             for person_id in slice['person_ids']:
86                 if person_id in all_persons:
87                     person = all_persons[person_id]
88                     if not person['enabled']:
89                         continue
90                     for key_id in person['key_ids']:
91                         if key_id in all_keys:
92                             key = all_keys[key_id]
93                             keys += [{'key_type': key['key_type'],
94                                     'key': key['key']}]
95             attributes = []
96             # All (per-node and global) attributes for this slice
97             slice_tags = []
98             for slice_tag_id in slice['slice_tag_ids']:
99                 if slice_tag_id in all_slice_tags:
100                     slice_tags.append(all_slice_tags[slice_tag_id]) 
101             # Per-node sliver attributes take precedence over global
102             # slice attributes, so set them first.
103             # Then comes nodegroup slice attributes
104             # Followed by global slice attributes
105             sliver_attributes = []
106
107             if node is not None:
108                 for sliver_attribute in filter(lambda a: a['node_id'] == node['node_id'], slice_tags):
109                     sliver_attributes.append(sliver_attribute['tagname'])
110                     attributes.append({'tagname': sliver_attribute['tagname'],
111                                     'value': sliver_attribute['value']})
112
113             # set nodegroup slice attributes
114             for slice_tag in filter(lambda a: a['nodegroup_id'] in node['nodegroup_ids'], slice_tags):
115                 # Do not set any nodegroup slice attributes for
116                 # which there is at least one sliver attribute
117                 # already set.
118                 if slice_tag not in slice_tags:
119                     attributes.append({'tagname': slice_tag['tagname'],
120                         'value': slice_tag['value']})
121
122             for slice_tag in filter(lambda a: a['node_id'] is None, slice_tags):
123                 # Do not set any global slice attributes for
124                 # which there is at least one sliver attribute
125                 # already set.
126                 if slice_tag['tagname'] not in sliver_attributes:
127                     attributes.append({'tagname': slice_tag['tagname'],
128                                    'value': slice_tag['value']})
129
130             # XXX Sanity check; though technically this should be a system invariant
131             # checked with an assertion
132             if slice['expires'] > MAXINT:  slice['expires']= MAXINT
133             
134             slivers.append({
135                 'hrn': hrn,
136                 'name': slice['name'],
137                 'slice_id': slice['slice_id'],
138                 'instantiation': slice['instantiation'],
139                 'expires': slice['expires'],
140                 'keys': keys,
141                 'attributes': attributes
142             })
143
144         return slivers
145  
146     def get_peer(self, xrn):
147         hrn, type = urn_to_hrn(xrn)
148         # Becaues of myplc federation,  we first need to determine if this
149         # slice belongs to out local plc or a myplc peer. We will assume it 
150         # is a local site, unless we find out otherwise  
151         peer = None
152
153         # get this slice's authority (site)
154         slice_authority = get_authority(hrn)
155
156         # get this site's authority (sfa root authority or sub authority)
157         site_authority = get_authority(slice_authority).lower()
158
159         # check if we are already peered with this site_authority, if so
160         peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
161         for peer_record in peers:
162             names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
163             if site_authority in names:
164                 peer = peer_record['shortname']
165
166         return peer
167
168     def get_sfa_peer(self, xrn):
169         hrn, type = urn_to_hrn(xrn)
170
171         # return the authority for this hrn or None if we are the authority
172         sfa_peer = None
173         slice_authority = get_authority(hrn)
174         site_authority = get_authority(slice_authority)
175
176         if site_authority != self.api.hrn:
177             sfa_peer = site_authority
178
179         return sfa_peer 
180
181     def refresh(self):
182         """
183         Update the cached list of slices
184         """
185         # Reload components list
186         now = datetime.datetime.now()
187         if not self.has_key('threshold') or not self.has_key('timestamp') or \
188            now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
189             if self.api.interface in ['aggregate']:
190                 self.refresh_slices_aggregate()
191             elif self.api.interface in ['slicemgr']:
192                 self.refresh_slices_smgr()
193
194     def refresh_slices_aggregate(self):
195         slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
196         slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
197
198          # update timestamp and threshold
199         timestamp = datetime.datetime.now()
200         hr_timestamp = timestamp.strftime(self.api.time_format)
201         delta = datetime.timedelta(hours=self.ttl)
202         threshold = timestamp + delta
203         hr_threshold = threshold.strftime(self.api.time_format)
204         
205         slice_details = {'hrn': slice_hrns,
206                          'timestamp': hr_timestamp,
207                          'threshold': hr_threshold
208                         }
209         self.update(slice_details)
210         self.write()     
211         
212
213     def refresh_slices_smgr(self):
214         slice_hrns = []
215         aggregates = Aggregates(self.api)
216         credential = self.api.getCredential()
217         for aggregate in aggregates:
218             success = False
219             # request hash is optional so lets try the call without it 
220             try:
221                 slices = aggregates[aggregate].get_slices(credential)
222                 slice_hrns.extend(slices)
223                 success = True
224             except:
225                 print >> log, "%s" % (traceback.format_exc())
226                 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
227
228             # try sending the request hash if the previous call failed 
229             if not success:
230                 arg_list = [credential]
231                 try:
232                     slices = aggregates[aggregate].get_slices(credential)
233                     slice_hrns.extend(slices)
234                     success = True
235                 except:
236                     print >> log, "%s" % (traceback.format_exc())
237                     print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
238
239         # update timestamp and threshold
240         timestamp = datetime.datetime.now()
241         hr_timestamp = timestamp.strftime(self.api.time_format)
242         delta = datetime.timedelta(hours=self.ttl)
243         threshold = timestamp + delta
244         hr_threshold = threshold.strftime(self.api.time_format)
245
246         slice_details = {'hrn': slice_hrns,
247                          'timestamp': hr_timestamp,
248                          'threshold': hr_threshold
249                         }
250         self.update(slice_details)
251         self.write()
252
253
254     def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
255         import pdb
256         pdb.set_trace()
257         authority = get_authority(slice_hrn)
258         authority_urn = hrn_to_urn(authority, 'authority')
259         site_records = registry.resolve(credential, authority_urn)
260             
261         site = {}
262         for site_record in site_records:
263             if site_record['type'] == 'authority':
264                 site = site_record
265         if not site:
266             raise RecordNotFound(authority)
267         remote_site_id = site.pop('site_id')    
268                 
269         login_base = get_leaf(authority)
270         sites = self.api.plshell.GetSites(self.api.plauth, login_base)
271         if not sites:
272             site_id = self.api.plshell.AddSite(self.api.plauth, site)
273             if peer:
274                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)   
275             # mark this site as an sfa peer record
276             if sfa_peer:
277                 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
278                 registry.register_peer_object(credential, peer_dict)
279         else:
280             site_id = sites[0]['site_id']
281             remote_site_id = sites[0]['peer_site_id']
282             old_site = sites[0]
283             #the site is alredy on the remote agg. Let us update(e.g. max_slices field) it with the latest info.
284             self.sync_site(old_site, site, peer)
285
286
287         return (site_id, remote_site_id) 
288
289     def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
290         slice = {}
291         slice_record = None
292         authority = get_authority(slice_hrn)
293         slice_records = registry.resolve(credential, slice_hrn)
294
295         for record in slice_records:
296             if record['type'] in ['slice']:
297                 slice_record = record
298         if not slice_record:
299             raise RecordNotFound(hrn)
300         slicename = hrn_to_pl_slicename(slice_hrn)
301         parts = slicename.split("_")
302         login_base = parts[0]
303         slices = self.api.plshell.GetSlices(self.api.plauth, [slicename]) 
304         if not slices:
305             slice_fields = {}
306             slice_keys = ['name', 'url', 'description']
307             for key in slice_keys:
308                 if key in slice_record and slice_record[key]:
309                     slice_fields[key] = slice_record[key]
310
311             # add the slice  
312             slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
313             slice = slice_fields
314             slice['slice_id'] = slice_id
315
316             # mark this slice as an sfa peer record
317             if sfa_peer:
318                 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
319                 registry.register_peer_object(credential, peer_dict)
320
321             #this belongs to a peer
322             if peer:
323                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
324             slice['node_ids'] = []
325         else:
326             slice = slices[0]
327             slice_id = slice['slice_id']
328             site_id = slice['site_id']
329             #the slice is alredy on the remote agg. Let us update(e.g. expires field) it with the latest info.
330             self.sync_slice(slice, slice_record, peer)
331
332         slice['peer_slice_id'] = slice_record['pointer']
333         self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
334     
335         return slice        
336
337     def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
338         # get the list of valid slice users from the registry and make 
339         # sure they are added to the slice 
340         slicename = hrn_to_pl_slicename(slice_record['hrn'])
341         researchers = slice_record.get('researcher', [])
342         for researcher in researchers:
343             person_record = {}
344             person_records = registry.resolve(credential, researcher)
345             for record in person_records:
346                 if record['type'] in ['user']:
347                     person_record = record
348             if not person_record:
349                 pass
350             person_dict = person_record
351             local_person=False
352             if peer:
353                 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
354                 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
355                 if not persons:
356                     persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
357                     if persons:
358                        local_person=True
359
360             else:
361                 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])   
362         
363             if not persons:
364                 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
365                 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
366                 
367                 # mark this person as an sfa peer record
368                 if sfa_peer:
369                     peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
370                     registry.register_peer_object(credential, peer_dict)
371
372                 if peer:
373                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
374                 key_ids = []
375             else:
376                 person_id = persons[0]['person_id']
377                 key_ids = persons[0]['key_ids']
378
379
380             # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
381             # an error
382             if peer:
383                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
384                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id,  peer)
385
386             self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
387             self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
388             if peer and not local_person:
389                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
390             if peer:
391                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
392             
393             self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
394
395     def verify_keys(self, registry, credential, person_dict, key_ids, person_id,  peer, local_person):
396         keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
397         keys = [key['key'] for key in keylist]
398         
399         #add keys that arent already there
400         key_ids = person_dict['key_ids']
401         for personkey in person_dict['keys']:
402             if personkey not in keys:
403                 key = {'key_type': 'ssh', 'key': personkey}
404                 if peer:
405                     self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
406                 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
407                 if peer and not local_person:
408                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
409                 if peer:
410                     try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
411
412                     except: pass   
413
414     def create_slice_aggregate(self, xrn, rspec):
415         hrn, type = urn_to_hrn(xrn)
416         # Determine if this is a peer slice
417         peer = self.get_peer(hrn)
418         sfa_peer = self.get_sfa_peer(hrn)
419
420         spec = RSpec(rspec)
421         # Get the slice record from sfa
422         slicename = hrn_to_pl_slicename(hrn) 
423         slice = {}
424         slice_record = None
425         registries = Registries(self.api)
426         registry = registries[self.api.hrn]
427         credential = self.api.getCredential()
428
429         site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
430         slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
431
432         # find out where this slice is currently running
433         nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
434         hostnames = [node['hostname'] for node in nodelist]
435
436         # get netspec details
437         nodespecs = spec.getDictsByTagName('NodeSpec')
438
439         # dict in which to store slice attributes to set for the nodes
440         nodes = {}
441         for nodespec in nodespecs:
442             if isinstance(nodespec['name'], list):
443                 for nodename in nodespec['name']:
444                     nodes[nodename] = {}
445                     for k in nodespec.keys():
446                         rspec_attribute_value = nodespec[k]
447                         if (self.rspec_to_slice_tag.has_key(k)):
448                             slice_tag_name = self.rspec_to_slice_tag[k]
449                             nodes[nodename][slice_tag_name] = rspec_attribute_value
450             elif isinstance(nodespec['name'], StringTypes):
451                 nodename = nodespec['name']
452                 nodes[nodename] = {}
453                 for k in nodespec.keys():
454                     rspec_attribute_value = nodespec[k]
455                     if (self.rspec_to_slice_tag.has_key(k)):
456                         slice_tag_name = self.rspec_to_slice_tag[k]
457                         nodes[nodename][slice_tag_name] = rspec_attribute_value
458
459                 for k in nodespec.keys():
460                     rspec_attribute_value = nodespec[k]
461                     if (self.rspec_to_slice_tag.has_key(k)):
462                         slice_tag_name = self.rspec_to_slice_tag[k]
463                         nodes[nodename][slice_tag_name] = rspec_attribute_value
464
465         node_names = nodes.keys()
466         # remove nodes not in rspec
467         deleted_nodes = list(set(hostnames).difference(node_names))
468         # add nodes from rspec
469         added_nodes = list(set(node_names).difference(hostnames))
470
471         if peer:
472             self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
473
474         self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes) 
475
476         # Add recognized slice tags
477         for node_name in node_names:
478             node = nodes[node_name]
479             for slice_tag in node.keys():
480                 value = node[slice_tag]
481                 if (isinstance(value, list)):
482                     value = value[0]
483
484                 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
485
486         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
487         if peer:
488             self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
489
490         return 1
491
492     def sync_site(self, old_record, new_record, peer):
493         if old_record['max_slices'] != new_record['max_slices']:
494             if peer:
495                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', old_record['site_id'], peer)
496                 self.api.plshell.UpdateSite(self.api.plauth, old_record['site_id'], {'max_slices' : new_record['max_slices']})
497             if peer:
498                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', old_record['site_id'], peer, old_record['peer_site_id'])
499         return 1
500
501     def sync_slice(self, old_record, new_record, peer):
502         if old_record['expires'] != new_record['expires']:
503             if peer:
504                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
505                 self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires' : new_record['expires']})
506             if peer:
507                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])
508         return 1