taking out debug statements from the previous commit
[sfa.git] / sfa / plc / slices.py
1 ### $Id$
2 ### $URL$
3
4 import datetime
5 import time
6 import traceback
7 import sys
8
9 from types import StringTypes
10 from sfa.util.namespace import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import SfaRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
21
# Largest value that fits in a signed 32-bit integer.  get_slivers() caps
# slice expiration times at this value.
MAXINT =  2L**31-1
23
class Slices(SimpleStorage):
    """
    Storage-backed cache of slice hrns, plus helpers that create and
    synchronise sites, slices, persons and keys through the PLC shell
    (self.api.plshell).
    """

    # maps an rspec attribute name to the slice tag name that is set on nodes
    rspec_to_slice_tag = {'max_rate':'net_max_rate'}
27
28     def __init__(self, api, ttl = .5, origin_hrn=None):
29         self.api = api
30         self.ttl = ttl
31         self.threshold = None
32         path = self.api.config.SFA_DATA_DIR
33         filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34         filepath = path + os.sep + filename
35         self.slices_file = filepath
36         SimpleStorage.__init__(self, self.slices_file)
37         self.policy = Policy(self.api)    
38         self.load()
39         self.origin_hrn = origin_hrn
40
41     def get_slivers(self, xrn, node=None):
42         hrn, type = urn_to_hrn(xrn)
43          
44         slice_name = hrn_to_pl_slicename(hrn)
45         # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
46         # of doing all of this?
47         #return self.api.GetSliceTicket(self.auth, slice_name) 
48         
49         # from PLCAPI.GetSlivers.get_slivers()
50         slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
51         slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)
52         # Build up list of users and slice attributes
53         person_ids = set()
54         all_slice_tag_ids = set()
55         for slice in slices:
56             person_ids.update(slice['person_ids'])
57             all_slice_tag_ids.update(slice['slice_tag_ids'])
58         person_ids = list(person_ids)
59         all_slice_tag_ids = list(all_slice_tag_ids)
60         # Get user information
61         all_persons_list = self.api.plshell.GetPersons(self.api.plauth, {'person_id':person_ids,'enabled':True}, ['person_id', 'enabled', 'key_ids'])
62         all_persons = {}
63         for person in all_persons_list:
64             all_persons[person['person_id']] = person        
65
66         # Build up list of keys
67         key_ids = set()
68         for person in all_persons.values():
69             key_ids.update(person['key_ids'])
70         key_ids = list(key_ids)
71         # Get user account keys
72         all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
73         all_keys = {}
74         for key in all_keys_list:
75             all_keys[key['key_id']] = key
76         # Get slice attributes
77         all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
78         all_slice_tags = {}
79         for slice_tag in all_slice_tags_list:
80             all_slice_tags[slice_tag['slice_tag_id']] = slice_tag
81            
82         slivers = []
83         for slice in slices:
84             keys = []
85             for person_id in slice['person_ids']:
86                 if person_id in all_persons:
87                     person = all_persons[person_id]
88                     if not person['enabled']:
89                         continue
90                     for key_id in person['key_ids']:
91                         if key_id in all_keys:
92                             key = all_keys[key_id]
93                             keys += [{'key_type': key['key_type'],
94                                     'key': key['key']}]
95             attributes = []
96             # All (per-node and global) attributes for this slice
97             slice_tags = []
98             for slice_tag_id in slice['slice_tag_ids']:
99                 if slice_tag_id in all_slice_tags:
100                     slice_tags.append(all_slice_tags[slice_tag_id]) 
101             # Per-node sliver attributes take precedence over global
102             # slice attributes, so set them first.
103             # Then comes nodegroup slice attributes
104             # Followed by global slice attributes
105             sliver_attributes = []
106
107             if node is not None:
108                 for sliver_attribute in filter(lambda a: a['node_id'] == node['node_id'], slice_tags):
109                     sliver_attributes.append(sliver_attribute['tagname'])
110                     attributes.append({'tagname': sliver_attribute['tagname'],
111                                     'value': sliver_attribute['value']})
112
113             # set nodegroup slice attributes
114             for slice_tag in filter(lambda a: a['nodegroup_id'] in node['nodegroup_ids'], slice_tags):
115                 # Do not set any nodegroup slice attributes for
116                 # which there is at least one sliver attribute
117                 # already set.
118                 if slice_tag not in slice_tags:
119                     attributes.append({'tagname': slice_tag['tagname'],
120                         'value': slice_tag['value']})
121
122             for slice_tag in filter(lambda a: a['node_id'] is None, slice_tags):
123                 # Do not set any global slice attributes for
124                 # which there is at least one sliver attribute
125                 # already set.
126                 if slice_tag['tagname'] not in sliver_attributes:
127                     attributes.append({'tagname': slice_tag['tagname'],
128                                    'value': slice_tag['value']})
129
130             # XXX Sanity check; though technically this should be a system invariant
131             # checked with an assertion
132             if slice['expires'] > MAXINT:  slice['expires']= MAXINT
133             
134             slivers.append({
135                 'hrn': hrn,
136                 'name': slice['name'],
137                 'slice_id': slice['slice_id'],
138                 'instantiation': slice['instantiation'],
139                 'expires': slice['expires'],
140                 'keys': keys,
141                 'attributes': attributes
142             })
143
144         return slivers
145  
146     def get_peer(self, xrn):
147         hrn, type = urn_to_hrn(xrn)
148         # Becaues of myplc federation,  we first need to determine if this
149         # slice belongs to out local plc or a myplc peer. We will assume it 
150         # is a local site, unless we find out otherwise  
151         peer = None
152
153         # get this slice's authority (site)
154         slice_authority = get_authority(hrn)
155
156         # get this site's authority (sfa root authority or sub authority)
157         site_authority = get_authority(slice_authority).lower()
158
159         # check if we are already peered with this site_authority, if so
160         peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
161         for peer_record in peers:
162             names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
163             if site_authority in names:
164                 peer = peer_record['shortname']
165
166         return peer
167
168     def get_sfa_peer(self, xrn):
169         hrn, type = urn_to_hrn(xrn)
170
171         # return the authority for this hrn or None if we are the authority
172         sfa_peer = None
173         slice_authority = get_authority(hrn)
174         site_authority = get_authority(slice_authority)
175
176         if site_authority != self.api.hrn:
177             sfa_peer = site_authority
178
179         return sfa_peer 
180
181     def refresh(self):
182         """
183         Update the cached list of slices
184         """
185         # Reload components list
186         now = datetime.datetime.now()
187         if not self.has_key('threshold') or not self.has_key('timestamp') or \
188            now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
189             if self.api.interface in ['aggregate']:
190                 self.refresh_slices_aggregate()
191             elif self.api.interface in ['slicemgr']:
192                 self.refresh_slices_smgr()
193
194     def refresh_slices_aggregate(self):
195         slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
196         slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
197
198          # update timestamp and threshold
199         timestamp = datetime.datetime.now()
200         hr_timestamp = timestamp.strftime(self.api.time_format)
201         delta = datetime.timedelta(hours=self.ttl)
202         threshold = timestamp + delta
203         hr_threshold = threshold.strftime(self.api.time_format)
204         
205         slice_details = {'hrn': slice_hrns,
206                          'timestamp': hr_timestamp,
207                          'threshold': hr_threshold
208                         }
209         self.update(slice_details)
210         self.write()     
211         
212
213     def refresh_slices_smgr(self):
214         slice_hrns = []
215         aggregates = Aggregates(self.api)
216         credential = self.api.getCredential()
217         for aggregate in aggregates:
218             success = False
219             # request hash is optional so lets try the call without it 
220             try:
221                 slices = aggregates[aggregate].get_slices(credential)
222                 slice_hrns.extend(slices)
223                 success = True
224             except:
225                 print >> log, "%s" % (traceback.format_exc())
226                 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
227
228             # try sending the request hash if the previous call failed 
229             if not success:
230                 arg_list = [credential]
231                 try:
232                     slices = aggregates[aggregate].get_slices(credential)
233                     slice_hrns.extend(slices)
234                     success = True
235                 except:
236                     print >> log, "%s" % (traceback.format_exc())
237                     print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
238
239         # update timestamp and threshold
240         timestamp = datetime.datetime.now()
241         hr_timestamp = timestamp.strftime(self.api.time_format)
242         delta = datetime.timedelta(hours=self.ttl)
243         threshold = timestamp + delta
244         hr_threshold = threshold.strftime(self.api.time_format)
245
246         slice_details = {'hrn': slice_hrns,
247                          'timestamp': hr_timestamp,
248                          'threshold': hr_threshold
249                         }
250         self.update(slice_details)
251         self.write()
252
253
254     def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
255         authority = get_authority(slice_hrn)
256         authority_urn = hrn_to_urn(authority, 'authority')
257         site_records = registry.resolve(credential, authority_urn)
258             
259         site = {}
260         for site_record in site_records:
261             if site_record['type'] == 'authority':
262                 site = site_record
263         if not site:
264             raise RecordNotFound(authority)
265         remote_site_id = site.pop('site_id')    
266                 
267         login_base = get_leaf(authority)
268         sites = self.api.plshell.GetSites(self.api.plauth, login_base)
269         if not sites:
270             site_id = self.api.plshell.AddSite(self.api.plauth, site)
271             if peer:
272                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)   
273             # mark this site as an sfa peer record
274             if sfa_peer:
275                 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
276                 registry.register_peer_object(credential, peer_dict)
277         else:
278             site_id = sites[0]['site_id']
279             remote_site_id = sites[0]['peer_site_id']
280             old_site = sites[0]
281             #the site is alredy on the remote agg. Let us update(e.g. max_slices field) it with the latest info.
282             self.sync_site(old_site, site, peer)
283
284
285         return (site_id, remote_site_id) 
286
287     def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
288         slice = {}
289         slice_record = None
290         authority = get_authority(slice_hrn)
291         slice_records = registry.resolve(credential, slice_hrn)
292
293         for record in slice_records:
294             if record['type'] in ['slice']:
295                 slice_record = record
296         if not slice_record:
297             raise RecordNotFound(hrn)
298         slicename = hrn_to_pl_slicename(slice_hrn)
299         parts = slicename.split("_")
300         login_base = parts[0]
301         slices = self.api.plshell.GetSlices(self.api.plauth, [slicename]) 
302         if not slices:
303             slice_fields = {}
304             slice_keys = ['name', 'url', 'description']
305             for key in slice_keys:
306                 if key in slice_record and slice_record[key]:
307                     slice_fields[key] = slice_record[key]
308
309             # add the slice  
310             slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
311             slice = slice_fields
312             slice['slice_id'] = slice_id
313
314             # mark this slice as an sfa peer record
315             if sfa_peer:
316                 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
317                 registry.register_peer_object(credential, peer_dict)
318
319             #this belongs to a peer
320             if peer:
321                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
322             slice['node_ids'] = []
323         else:
324             slice = slices[0]
325             slice_id = slice['slice_id']
326             site_id = slice['site_id']
327             #the slice is alredy on the remote agg. Let us update(e.g. expires field) it with the latest info.
328             self.sync_slice(slice, slice_record, peer)
329
330         slice['peer_slice_id'] = slice_record['pointer']
331         self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
332     
333         return slice        
334
335     def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
336         # get the list of valid slice users from the registry and make 
337         # sure they are added to the slice 
338         slicename = hrn_to_pl_slicename(slice_record['hrn'])
339         researchers = slice_record.get('researcher', [])
340         for researcher in researchers:
341             person_record = {}
342             person_records = registry.resolve(credential, researcher)
343             for record in person_records:
344                 if record['type'] in ['user']:
345                     person_record = record
346             if not person_record:
347                 pass
348             person_dict = person_record
349             local_person=False
350             if peer:
351                 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
352                 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
353                 if not persons:
354                     persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
355                     if persons:
356                        local_person=True
357
358             else:
359                 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])   
360         
361             if not persons:
362                 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
363                 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
364                 
365                 # mark this person as an sfa peer record
366                 if sfa_peer:
367                     peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
368                     registry.register_peer_object(credential, peer_dict)
369
370                 if peer:
371                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
372                 key_ids = []
373             else:
374                 person_id = persons[0]['person_id']
375                 key_ids = persons[0]['key_ids']
376
377
378             # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
379             # an error
380             if peer:
381                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
382                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id,  peer)
383
384             self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
385             self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
386             if peer and not local_person:
387                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
388             if peer:
389                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
390             
391             self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
392
393     def verify_keys(self, registry, credential, person_dict, key_ids, person_id,  peer, local_person):
394         keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
395         keys = [key['key'] for key in keylist]
396         
397         #add keys that arent already there
398         key_ids = person_dict['key_ids']
399         for personkey in person_dict['keys']:
400             if personkey not in keys:
401                 key = {'key_type': 'ssh', 'key': personkey}
402                 if peer:
403                     self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
404                 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
405                 if peer and not local_person:
406                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
407                 if peer:
408                     try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
409
410                     except: pass   
411
412     def create_slice_aggregate(self, xrn, rspec):
413         hrn, type = urn_to_hrn(xrn)
414         # Determine if this is a peer slice
415         peer = self.get_peer(hrn)
416         sfa_peer = self.get_sfa_peer(hrn)
417
418         spec = RSpec(rspec)
419         # Get the slice record from sfa
420         slicename = hrn_to_pl_slicename(hrn) 
421         slice = {}
422         slice_record = None
423         registries = Registries(self.api)
424         registry = registries[self.api.hrn]
425         credential = self.api.getCredential()
426
427         site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
428         slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
429
430         # find out where this slice is currently running
431         nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
432         hostnames = [node['hostname'] for node in nodelist]
433
434         # get netspec details
435         nodespecs = spec.getDictsByTagName('NodeSpec')
436
437         # dict in which to store slice attributes to set for the nodes
438         nodes = {}
439         for nodespec in nodespecs:
440             if isinstance(nodespec['name'], list):
441                 for nodename in nodespec['name']:
442                     nodes[nodename] = {}
443                     for k in nodespec.keys():
444                         rspec_attribute_value = nodespec[k]
445                         if (self.rspec_to_slice_tag.has_key(k)):
446                             slice_tag_name = self.rspec_to_slice_tag[k]
447                             nodes[nodename][slice_tag_name] = rspec_attribute_value
448             elif isinstance(nodespec['name'], StringTypes):
449                 nodename = nodespec['name']
450                 nodes[nodename] = {}
451                 for k in nodespec.keys():
452                     rspec_attribute_value = nodespec[k]
453                     if (self.rspec_to_slice_tag.has_key(k)):
454                         slice_tag_name = self.rspec_to_slice_tag[k]
455                         nodes[nodename][slice_tag_name] = rspec_attribute_value
456
457                 for k in nodespec.keys():
458                     rspec_attribute_value = nodespec[k]
459                     if (self.rspec_to_slice_tag.has_key(k)):
460                         slice_tag_name = self.rspec_to_slice_tag[k]
461                         nodes[nodename][slice_tag_name] = rspec_attribute_value
462
463         node_names = nodes.keys()
464         # remove nodes not in rspec
465         deleted_nodes = list(set(hostnames).difference(node_names))
466         # add nodes from rspec
467         added_nodes = list(set(node_names).difference(hostnames))
468
469         if peer:
470             self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
471
472         self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes) 
473
474         # Add recognized slice tags
475         for node_name in node_names:
476             node = nodes[node_name]
477             for slice_tag in node.keys():
478                 value = node[slice_tag]
479                 if (isinstance(value, list)):
480                     value = value[0]
481
482                 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
483
484         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
485         if peer:
486             self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
487
488         return 1
489
490     def sync_site(self, old_record, new_record, peer):
491         if old_record['max_slices'] != new_record['max_slices']:
492             if peer:
493                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', old_record['site_id'], peer)
494                 self.api.plshell.UpdateSite(self.api.plauth, old_record['site_id'], {'max_slices' : new_record['max_slices']})
495             if peer:
496                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', old_record['site_id'], peer, old_record['peer_site_id'])
497         return 1
498
499     def sync_slice(self, old_record, new_record, peer):
500         if old_record['expires'] != new_record['expires']:
501             if peer:
502                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
503                 self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires' : new_record['expires']})
504             if peer:
505                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])
506         return 1