added support for urn name format. urn is the default name format used over the wire
[sfa.git] / sfa / plc / slices.py
1 ### $Id$
2 ### $URL$
3
4 import datetime
5 import time
6 import traceback
7 import sys
8
9 from types import StringTypes
10 from sfa.util.namespace import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import SfaRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
21
22 MAXINT =  2L**31-1
23
24 class Slices(SimpleStorage):
25
26     rspec_to_slice_tag = {'max_rate':'net_max_rate'}
27
28     def __init__(self, api, ttl = .5, origin_hrn=None):
29         self.api = api
30         self.ttl = ttl
31         self.threshold = None
32         path = self.api.config.SFA_DATA_DIR
33         filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34         filepath = path + os.sep + filename
35         self.slices_file = filepath
36         SimpleStorage.__init__(self, self.slices_file)
37         self.policy = Policy(self.api)    
38         self.load()
39         self.origin_hrn = origin_hrn
40
41     def get_slivers(self, xrn, node=None):
42         hrn, type = urn_to_hrn(xrn)
43          
44         slice_name = hrn_to_pl_slicename(hrn)
45         # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
46         # of doing all of this?
47         #return self.api.GetSliceTicket(self.auth, slice_name) 
48         
49         # from PLCAPI.GetSlivers.get_slivers()
50         slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
51         slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)
52         # Build up list of users and slice attributes
53         person_ids = set()
54         all_slice_tag_ids = set()
55         for slice in slices:
56             person_ids.update(slice['person_ids'])
57             all_slice_tag_ids.update(slice['slice_tag_ids'])
58         person_ids = list(person_ids)
59         all_slice_tag_ids = list(all_slice_tag_ids)
60         # Get user information
61         all_persons_list = self.api.plshell.GetPersons(self.api.plauth, {'person_id':person_ids,'enabled':True}, ['person_id', 'enabled', 'key_ids'])
62         all_persons = {}
63         for person in all_persons_list:
64             all_persons[person['person_id']] = person        
65
66         # Build up list of keys
67         key_ids = set()
68         for person in all_persons.values():
69             key_ids.update(person['key_ids'])
70         key_ids = list(key_ids)
71         # Get user account keys
72         all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
73         all_keys = {}
74         for key in all_keys_list:
75             all_keys[key['key_id']] = key
76         # Get slice attributes
77         all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
78         all_slice_tags = {}
79         for slice_tag in all_slice_tags_list:
80             all_slice_tags[slice_tag['slice_tag_id']] = slice_tag
81            
82         slivers = []
83         for slice in slices:
84             keys = []
85             for person_id in slice['person_ids']:
86                 if person_id in all_persons:
87                     person = all_persons[person_id]
88                     if not person['enabled']:
89                         continue
90                     for key_id in person['key_ids']:
91                         if key_id in all_keys:
92                             key = all_keys[key_id]
93                             keys += [{'key_type': key['key_type'],
94                                     'key': key['key']}]
95             attributes = []
96             # All (per-node and global) attributes for this slice
97             slice_tags = []
98             for slice_tag_id in slice['slice_tag_ids']:
99                 if slice_tag_id in all_slice_tags:
100                     slice_tags.append(all_slice_tags[slice_tag_id]) 
101             # Per-node sliver attributes take precedence over global
102             # slice attributes, so set them first.
103             # Then comes nodegroup slice attributes
104             # Followed by global slice attributes
105             sliver_attributes = []
106
107             if node is not None:
108                 for sliver_attribute in filter(lambda a: a['node_id'] == node['node_id'], slice_tags):
109                     sliver_attributes.append(sliver_attribute['tagname'])
110                     attributes.append({'tagname': sliver_attribute['tagname'],
111                                     'value': sliver_attribute['value']})
112
113             # set nodegroup slice attributes
114             for slice_tag in filter(lambda a: a['nodegroup_id'] in node['nodegroup_ids'], slice_tags):
115                 # Do not set any nodegroup slice attributes for
116                 # which there is at least one sliver attribute
117                 # already set.
118                 if slice_tag not in slice_tags:
119                     attributes.append({'tagname': slice_tag['tagname'],
120                         'value': slice_tag['value']})
121
122             for slice_tag in filter(lambda a: a['node_id'] is None, slice_tags):
123                 # Do not set any global slice attributes for
124                 # which there is at least one sliver attribute
125                 # already set.
126                 if slice_tag['tagname'] not in sliver_attributes:
127                     attributes.append({'tagname': slice_tag['tagname'],
128                                    'value': slice_tag['value']})
129
130             # XXX Sanity check; though technically this should be a system invariant
131             # checked with an assertion
132             if slice['expires'] > MAXINT:  slice['expires']= MAXINT
133             
134             slivers.append({
135                 'hrn': hrn,
136                 'name': slice['name'],
137                 'slice_id': slice['slice_id'],
138                 'instantiation': slice['instantiation'],
139                 'expires': slice['expires'],
140                 'keys': keys,
141                 'attributes': attributes
142             })
143
144         return slivers
145  
146     def get_peer(self, xrn):
147         hrn, type = urn_to_hrn(xrn)
148         # Becaues of myplc federation,  we first need to determine if this
149         # slice belongs to out local plc or a myplc peer. We will assume it 
150         # is a local site, unless we find out otherwise  
151         peer = None
152
153         # get this slice's authority (site)
154         slice_authority = get_authority(hrn)
155
156         # get this site's authority (sfa root authority or sub authority)
157         site_authority = get_authority(slice_authority).lower()
158
159         # check if we are already peered with this site_authority, if so
160         peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
161         for peer_record in peers:
162             names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
163             if site_authority in names:
164                 peer = peer_record['shortname']
165
166         return peer
167
168     def get_sfa_peer(self, xrn):
169         hrn, type = urn_to_hrn(xrn)
170
171         # return the authority for this hrn or None if we are the authority
172         sfa_peer = None
173         slice_authority = get_authority(hrn)
174         site_authority = get_authority(slice_authority)
175
176         if site_authority != self.api.hrn:
177             sfa_peer = site_authority
178
179         return sfa_peer 
180
181     def refresh(self):
182         """
183         Update the cached list of slices
184         """
185         # Reload components list
186         now = datetime.datetime.now()
187         if not self.has_key('threshold') or not self.has_key('timestamp') or \
188            now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
189             if self.api.interface in ['aggregate']:
190                 self.refresh_slices_aggregate()
191             elif self.api.interface in ['slicemgr']:
192                 self.refresh_slices_smgr()
193
194     def refresh_slices_aggregate(self):
195         slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
196         slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
197
198          # update timestamp and threshold
199         timestamp = datetime.datetime.now()
200         hr_timestamp = timestamp.strftime(self.api.time_format)
201         delta = datetime.timedelta(hours=self.ttl)
202         threshold = timestamp + delta
203         hr_threshold = threshold.strftime(self.api.time_format)
204         
205         slice_details = {'hrn': slice_hrns,
206                          'timestamp': hr_timestamp,
207                          'threshold': hr_threshold
208                         }
209         self.update(slice_details)
210         self.write()     
211         
212
213     def refresh_slices_smgr(self):
214         slice_hrns = []
215         aggregates = Aggregates(self.api)
216         credential = self.api.getCredential()
217         for aggregate in aggregates:
218             success = False
219             # request hash is optional so lets try the call without it 
220             try:
221                 slices = aggregates[aggregate].get_slices(credential)
222                 slice_hrns.extend(slices)
223                 success = True
224             except:
225                 print >> log, "%s" % (traceback.format_exc())
226                 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
227
228             # try sending the request hash if the previous call failed 
229             if not success:
230                 arg_list = [credential]
231                 try:
232                     slices = aggregates[aggregate].get_slices(credential)
233                     slice_hrns.extend(slices)
234                     success = True
235                 except:
236                     print >> log, "%s" % (traceback.format_exc())
237                     print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
238
239         # update timestamp and threshold
240         timestamp = datetime.datetime.now()
241         hr_timestamp = timestamp.strftime(self.api.time_format)
242         delta = datetime.timedelta(hours=self.ttl)
243         threshold = timestamp + delta
244         hr_threshold = threshold.strftime(self.api.time_format)
245
246         slice_details = {'hrn': slice_hrns,
247                          'timestamp': hr_timestamp,
248                          'threshold': hr_threshold
249                         }
250         self.update(slice_details)
251         self.write()
252
253
254     def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
255         authority = get_authority(slice_hrn)
256         authority_urn = hrn_to_urn(authority, 'authority')
257         site_records = registry.resolve(credential, authority_urn)
258             
259         site = {}
260         for site_record in site_records:
261             if site_record['type'] == 'authority':
262                 site = site_record
263         if not site:
264             raise RecordNotFound(authority)
265         remote_site_id = site.pop('site_id')    
266                 
267         login_base = get_leaf(authority)
268         sites = self.api.plshell.GetSites(self.api.plauth, login_base)
269         if not sites:
270             site_id = self.api.plshell.AddSite(self.api.plauth, site)
271             if peer:
272                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)   
273             # mark this site as an sfa peer record
274             if sfa_peer:
275                 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
276                 registry.register_peer_object(credential, peer_dict)
277         else:
278             site_id = sites[0]['site_id']
279             remote_site_id = sites[0]['peer_site_id']
280
281
282         return (site_id, remote_site_id) 
283
284     def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
285         slice = {}
286         slice_record = None
287         authority = get_authority(slice_hrn)
288         slice_records = registry.resolve(credential, slice_hrn)
289
290         for record in slice_records:
291             if record['type'] in ['slice']:
292                 slice_record = record
293         if not slice_record:
294             raise RecordNotFound(hrn)
295         slicename = hrn_to_pl_slicename(slice_hrn)
296         parts = slicename.split("_")
297         login_base = parts[0]
298         slices = self.api.plshell.GetSlices(self.api.plauth, [slicename]) 
299         if not slices:
300             slice_fields = {}
301             slice_keys = ['name', 'url', 'description']
302             for key in slice_keys:
303                 if key in slice_record and slice_record[key]:
304                     slice_fields[key] = slice_record[key]
305
306             # add the slice  
307             slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
308             slice = slice_fields
309             slice['slice_id'] = slice_id
310
311             # mark this slice as an sfa peer record
312             if sfa_peer:
313                 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
314                 registry.register_peer_object(credential, peer_dict)
315
316             #this belongs to a peer
317             if peer:
318                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
319             slice['node_ids'] = []
320         else:
321             slice = slices[0]
322             slice_id = slice['slice_id']
323             site_id = slice['site_id']
324             #the slice is alredy on the remote agg. Let us update(e.g. expires field) it with the latest info.
325             self.sync_slice(slice, slice_record, peer)
326
327         slice['peer_slice_id'] = slice_record['pointer']
328         self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
329     
330         return slice        
331
332     def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
333         # get the list of valid slice users from the registry and make 
334         # sure they are added to the slice 
335         slicename = hrn_to_pl_slicename(slice_record['hrn'])
336         researchers = slice_record.get('researcher', [])
337         for researcher in researchers:
338             person_record = {}
339             person_records = registry.resolve(credential, researcher)
340             for record in person_records:
341                 if record['type'] in ['user']:
342                     person_record = record
343             if not person_record:
344                 pass
345             person_dict = person_record
346             local_person=False
347             if peer:
348                 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
349                 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
350                 if not persons:
351                     persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
352                     if persons:
353                        local_person=True
354
355             else:
356                 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])   
357         
358             if not persons:
359                 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
360                 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
361                 
362                 # mark this person as an sfa peer record
363                 if sfa_peer:
364                     peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
365                     registry.register_peer_object(credential, peer_dict)
366
367                 if peer:
368                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
369                 key_ids = []
370             else:
371                 person_id = persons[0]['person_id']
372                 key_ids = persons[0]['key_ids']
373
374
375             # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
376             # an error
377             if peer:
378                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
379                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id,  peer)
380
381             self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
382             self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
383             if peer and not local_person:
384                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
385             if peer:
386                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
387             
388             self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
389
390     def verify_keys(self, registry, credential, person_dict, key_ids, person_id,  peer, local_person):
391         keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
392         keys = [key['key'] for key in keylist]
393         
394         #add keys that arent already there
395         key_ids = person_dict['key_ids']
396         for personkey in person_dict['keys']:
397             if personkey not in keys:
398                 key = {'key_type': 'ssh', 'key': personkey}
399                 if peer:
400                     self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
401                 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
402                 if peer and not local_person:
403                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
404                 if peer:
405                     try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
406
407                     except: pass   
408
409     def create_slice_aggregate(self, xrn, rspec):
410         hrn, type = urn_to_hrn(xrn)
411         # Determine if this is a peer slice
412         peer = self.get_peer(hrn)
413         sfa_peer = self.get_sfa_peer(hrn)
414
415         spec = RSpec(rspec)
416         # Get the slice record from sfa
417         slicename = hrn_to_pl_slicename(hrn) 
418         slice = {}
419         slice_record = None
420         registries = Registries(self.api)
421         registry = registries[self.api.hrn]
422         credential = self.api.getCredential()
423
424         site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
425         slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
426
427         # find out where this slice is currently running
428         nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
429         hostnames = [node['hostname'] for node in nodelist]
430
431         # get netspec details
432         nodespecs = spec.getDictsByTagName('NodeSpec')
433
434         # dict in which to store slice attributes to set for the nodes
435         nodes = {}
436         for nodespec in nodespecs:
437             if isinstance(nodespec['name'], list):
438                 for nodename in nodespec['name']:
439                     nodes[nodename] = {}
440                     for k in nodespec.keys():
441                         rspec_attribute_value = nodespec[k]
442                         if (self.rspec_to_slice_tag.has_key(k)):
443                             slice_tag_name = self.rspec_to_slice_tag[k]
444                             nodes[nodename][slice_tag_name] = rspec_attribute_value
445             elif isinstance(nodespec['name'], StringTypes):
446                 nodename = nodespec['name']
447                 nodes[nodename] = {}
448                 for k in nodespec.keys():
449                     rspec_attribute_value = nodespec[k]
450                     if (self.rspec_to_slice_tag.has_key(k)):
451                         slice_tag_name = self.rspec_to_slice_tag[k]
452                         nodes[nodename][slice_tag_name] = rspec_attribute_value
453
454                 for k in nodespec.keys():
455                     rspec_attribute_value = nodespec[k]
456                     if (self.rspec_to_slice_tag.has_key(k)):
457                         slice_tag_name = self.rspec_to_slice_tag[k]
458                         nodes[nodename][slice_tag_name] = rspec_attribute_value
459
460         node_names = nodes.keys()
461         # remove nodes not in rspec
462         deleted_nodes = list(set(hostnames).difference(node_names))
463         # add nodes from rspec
464         added_nodes = list(set(node_names).difference(hostnames))
465
466         if peer:
467             self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
468
469         self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes) 
470
471         # Add recognized slice tags
472         for node_name in node_names:
473             node = nodes[node_name]
474             for slice_tag in node.keys():
475                 value = node[slice_tag]
476                 if (isinstance(value, list)):
477                     value = value[0]
478
479                 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
480
481         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
482         if peer:
483             self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
484
485         return 1
486
487     def sync_slice(self, old_record, new_record, peer):
488         if old_record['expires'] != new_record['expires']:
489             if peer:
490                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
491                 self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires' : new_record['expires']})
492             if peer:
493                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])
494         return 1