added request_hash to get_slices() call
[sfa.git] / sfa / plc / slices.py
1 ### $Id$
2 ### $URL$
3
4 import datetime
5 import time
6 import traceback
7 import sys
8
9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
21
22 class Slices(SimpleStorage):
23
24     def __init__(self, api, ttl = .5, caller_cred=None):
25         self.api = api
26         self.ttl = ttl
27         self.threshold = None
28         path = self.api.config.SFA_BASE_DIR
29         filename = ".".join([self.api.interface, self.api.hrn, "slices"])
30         filepath = path + os.sep + filename
31         self.slices_file = filepath
32         SimpleStorage.__init__(self, self.slices_file)
33         self.policy = Policy(self.api)    
34         self.load()
35         self.caller_cred=caller_cred
36
37
38     def get_peer(self, hrn):
39         # Becaues of myplc federation,  we first need to determine if this
40         # slice belongs to out local plc or a myplc peer. We will assume it 
41         # is a local site, unless we find out otherwise  
42         peer = None
43
44         # get this slice's authority (site)
45         slice_authority = get_authority(hrn)
46
47         # get this site's authority (sfa root authority or sub authority)
48         site_authority = get_authority(slice_authority).lower()
49
50         # check if we are already peered with this site_authority, if so
51         peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
52         for peer_record in peers:
53             names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
54             if site_authority in names:
55                 peer = peer_record['shortname']
56
57         return peer
58
59     def get_sfa_peer(self, hrn):
60         # return the authority for this hrn or None if we are the authority
61         sfa_peer = None
62         slice_authority = get_authority(hrn)
63         site_authority = get_authority(slice_authority)
64
65         if site_authority != self.api.hrn:
66             sfa_peer = site_authority
67
68         return sfa_peer 
69
70     def refresh(self):
71         """
72         Update the cached list of slices
73         """
74         # Reload components list
75         now = datetime.datetime.now()
76         if not self.has_key('threshold') or not self.has_key('timestamp') or \
77            now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
78             if self.api.interface in ['aggregate']:
79                 self.refresh_slices_aggregate()
80             elif self.api.interface in ['slicemgr']:
81                 self.refresh_slices_smgr()
82
83     def refresh_slices_aggregate(self):
84         slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
85         slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
86
87          # update timestamp and threshold
88         timestamp = datetime.datetime.now()
89         hr_timestamp = timestamp.strftime(self.api.time_format)
90         delta = datetime.timedelta(hours=self.ttl)
91         threshold = timestamp + delta
92         hr_threshold = threshold.strftime(self.api.time_format)
93         
94         slice_details = {'hrn': slice_hrns,
95                          'timestamp': hr_timestamp,
96                          'threshold': hr_threshold
97                         }
98         self.update(slice_details)
99         self.write()     
100         
101
102     def refresh_slices_smgr(self):
103         slice_hrns = []
104         aggregates = Aggregates(self.api)
105         credential = self.api.getCredential()
106         arg_list = [credential]
107         request_hash = self.api.key.compute_hash(arg_list)
108         for aggregate in aggregates:
109             try:
110                 slices = aggregates[aggregate].get_slices(credential, request_hash)
111                 slice_hrns.extend(slices)
112             except:
113                 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
114          # update timestamp and threshold
115         timestamp = datetime.datetime.now()
116         hr_timestamp = timestamp.strftime(self.api.time_format)
117         delta = datetime.timedelta(hours=self.ttl)
118         threshold = timestamp + delta
119         hr_threshold = threshold.strftime(self.api.time_format)
120
121         slice_details = {'hrn': slice_hrns,
122                          'timestamp': hr_timestamp,
123                          'threshold': hr_threshold
124                         }
125         self.update(slice_details)
126         self.write()
127
128
129     def delete_slice(self, hrn):
130         if self.api.interface in ['aggregate']:
131             self.delete_slice_aggregate(hrn)
132         elif self.api.interface in ['slicemgr']:
133             self.delete_slice_smgr(hrn)
134         
135     def delete_slice_aggregate(self, hrn):
136
137         slicename = hrn_to_pl_slicename(hrn)
138         slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename})
139         if not slices:
140             return 1        
141         slice = slices[0]
142
143         # determine if this is a peer slice
144         peer = self.get_peer(hrn)
145         if peer:
146             self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
147         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, slice['node_ids'])
148         if peer:
149             self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
150         return 1
151
152     def delete_slice_smgr(self, hrn):
153         credential = self.api.getCredential()
154         aggregates = Aggregates(self.api)
155         for aggregate in aggregates:
156             try:
157                 aggregates[aggregate].delete_slice(credential, hrn, caller_cred=self.caller_cred)
158             except:
159                 print >> log, "Error calling list nodes at aggregate %s" % aggregate
160                 traceback.print_exc(log)
161                 exc_type, exc_value, exc_traceback = sys.exc_info()
162                 print exc_type, exc_value, exc_traceback
163
164     def create_slice(self, hrn, rspec):
165         
166         # check our slice policy before we procede
167         whitelist = self.policy['slice_whitelist']     
168         blacklist = self.policy['slice_blacklist']
169        
170         if whitelist and hrn not in whitelist or \
171            blacklist and hrn in blacklist:
172             policy_file = self.policy.policy_file
173             print >> log, "Slice %(hrn)s not allowed by policy %(policy_file)s" % locals()
174             return 1
175
176         if self.api.interface in ['aggregate']:     
177             self.create_slice_aggregate(hrn, rspec)
178         elif self.api.interface in ['slicemgr']:
179             self.create_slice_smgr(hrn, rspec)
180
181     def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
182         authority = get_authority(slice_hrn)
183         site_records = registry.resolve(credential, authority)
184         site = {}
185         for site_record in site_records:
186             if site_record['type'] == 'authority':
187                 site = site_record.as_dict()
188         if not site:
189             raise RecordNotFound(authority)
190         remote_site_id = site.pop('site_id')    
191                 
192         login_base = get_leaf(authority)
193         sites = self.api.plshell.GetSites(self.api.plauth, login_base)
194         if not sites:
195             site_id = self.api.plshell.AddSite(self.api.plauth, site)
196             if peer:
197                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)   
198             # mark this site as an sfa peer record
199             if sfa_peer:
200                 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id} 
201                 registry.register_peer_object(credential, peer_dict)
202                 pass
203         else:
204             site_id = sites[0]['site_id']
205             remote_site_id = sites[0]['peer_site_id']
206
207
208         return (site_id, remote_site_id) 
209
210     def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
211         slice = {}
212         slice_record = None
213         authority = get_authority(slice_hrn)
214         slice_records = registry.resolve(credential, slice_hrn)
215         for record in slice_records:
216             if record['type'] in ['slice']:
217                 slice_record = record
218         if not slice_record:
219             raise RecordNotFound(hrn)
220         slicename = hrn_to_pl_slicename(slice_hrn)
221         parts = slicename.split("_")
222         login_base = parts[0]
223         slices = self.api.plshell.GetSlices(self.api.plauth, [slicename], ['slice_id', 'node_ids', 'site_id']) 
224         if not slices:
225             slice_fields = {}
226             slice_keys = ['name', 'url', 'description']
227             for key in slice_keys:
228                 if key in slice_record and slice_record[key]:
229                     slice_fields[key] = slice_record[key]
230
231             # add the slice  
232             slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
233             slice = slice_fields
234             slice['slice_id'] = slice_id
235
236             # mark this slice as an sfa peer record
237             if sfa_peer:
238                 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id} 
239                 registry.register_peer_object(credential, peer_dict)
240                 pass
241
242             #this belongs to a peer
243             if peer:
244                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
245             slice['node_ids'] = []
246         else:
247             slice = slices[0]
248             slice_id = slice['slice_id']
249             site_id = slice['site_id']
250
251         slice['peer_slice_id'] = slice_record['pointer']
252         self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
253     
254         return slice        
255
256     def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
257         # get the list of valid slice users from the registry and make 
258         # sure they are added to the slice 
259         slicename = hrn_to_pl_slicename(slice_record['hrn'])
260         researchers = slice_record.get('researcher', [])
261         for researcher in researchers:
262             person_record = {}
263             person_records = registry.resolve(credential, researcher)
264             for record in person_records:
265                 if record['type'] in ['user']:
266                     person_record = record
267             if not person_record:
268                 pass
269             person_dict = person_record.as_dict()
270             if peer:
271                 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
272                 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
273
274             else:
275                 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])   
276         
277             if not persons:
278                 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
279                 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
280                 
281                 # mark this person as an sfa peer record
282                 if sfa_peer:
283                     peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id} 
284                     registry.register_peer_object(credential, peer_dict)
285                     pass
286
287                 if peer:
288                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
289                 key_ids = []
290             else:
291                 person_id = persons[0]['person_id']
292                 key_ids = persons[0]['key_ids']
293
294
295             # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
296             # an error
297             if peer:
298                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
299                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id,  peer)
300
301             self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
302             self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
303             if peer:
304                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
305                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
306             
307             self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer)
308
309     def verify_keys(self, registry, credential, person_dict, key_ids, person_id,  peer):
310         keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
311         keys = [key['key'] for key in keylist]
312         
313         #add keys that arent already there
314         key_ids = person_dict['key_ids']
315         for personkey in person_dict['keys']:
316             if personkey not in keys:
317                 key = {'key_type': 'ssh', 'key': personkey}
318                 if peer:
319                     self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
320                 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
321                 if peer:
322                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
323                     try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
324
325                     except: pass   
326
327     def create_slice_aggregate(self, hrn, rspec):
328
329         # Determine if this is a peer slice
330         peer = self.get_peer(hrn)
331         sfa_peer = self.get_sfa_peer(hrn)
332
333         spec = Rspec(rspec)
334         # Get the slice record from sfa
335         slicename = hrn_to_pl_slicename(hrn) 
336         slice = {}
337         slice_record = None
338         registries = Registries(self.api)
339         registry = registries[self.api.hrn]
340         credential = self.api.getCredential()
341
342         site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
343         slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
344
345         # find out where this slice is currently running
346         nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
347         hostnames = [node['hostname'] for node in nodelist]
348
349         # get netspec details
350         nodespecs = spec.getDictsByTagName('NodeSpec')
351         nodes = []
352         for nodespec in nodespecs:
353             if isinstance(nodespec['name'], list):
354                 nodes.extend(nodespec['name'])
355             elif isinstance(nodespec['name'], StringTypes):
356                 nodes.append(nodespec['name'])
357
358         # remove nodes not in rspec
359         deleted_nodes = list(set(hostnames).difference(nodes))
360         # add nodes from rspec
361         added_nodes = list(set(nodes).difference(hostnames))
362
363         if peer:
364             self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
365         self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes) 
366         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
367         if peer:
368             self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
369
370         return 1
371
372     def create_slice_smgr(self, hrn, rspec):
373         spec = Rspec()
374         tempspec = Rspec()
375         spec.parseString(rspec)
376         slicename = hrn_to_pl_slicename(hrn)
377         specDict = spec.toDict()
378         if specDict.has_key('Rspec'): specDict = specDict['Rspec']
379         if specDict.has_key('start_time'): start_time = specDict['start_time']
380         else: start_time = 0
381         if specDict.has_key('end_time'): end_time = specDict['end_time']
382         else: end_time = 0
383
384         rspecs = {}
385         aggregates = Aggregates(self.api)
386         credential = self.api.getCredential()
387
388         # split the netspecs into individual rspecs
389         netspecs = spec.getDictsByTagName('NetSpec')
390         for netspec in netspecs:
391             net_hrn = netspec['name']
392             resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
393             resourceDict = {'Rspec': resources}
394             tempspec.parseDict(resourceDict)
395             rspecs[net_hrn] = tempspec.toxml()
396
397         # send each rspec to the appropriate aggregate/sm 
398         for net_hrn in rspecs:
399             try:
400                 # if we are directly connected to the aggregate then we can just send them the rspec
401                 # if not, then we may be connected to an sm thats connected to the aggregate  
402                 if net_hrn in aggregates:
403                     # send the whloe rspec to the local aggregate
404                     if net_hrn in [self.api.hrn]:
405                         aggregates[net_hrn].create_slice(credential, hrn, rspec, caller_cred=self.caller_cred)
406                     else:
407                         aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], caller_cred=self.caller_cred)
408                 else:
409                     # lets forward this rspec to a sm that knows about the network    
410                     for aggregate in aggregates:
411                         network_found = aggregates[aggregate].get_aggregates(credential, net_hrn)
412                         if network_networks:
413                             aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], caller_cred=self.caller_cred)
414                      
415             except:
416                 print >> log, "Error creating slice %(hrn)s at aggregate %(net_hrn)s" % locals()
417                 traceback.print_exc()
418         return 1
419
420
421     def start_slice(self, hrn):
422         if self.api.interface in ['aggregate']:
423             self.start_slice_aggregate(hrn)
424         elif self.api.interface in ['slicemgr']:
425             self.start_slice_smgr(hrn)
426
427     def start_slice_aggregate(self, hrn):
428         slicename = hrn_to_pl_slicename(hrn)
429         slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
430         if not slices:
431             raise RecordNotFound(hrn)
432         slice_id = slices[0]
433         attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
434         attribute_id = attreibutes[0]['slice_attribute_id']
435         self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "1" )
436         return 1
437
438     def start_slice_smgr(self, hrn):
439         credential = self.api.getCredential()
440         aggregates = Aggregates(self.api)
441         for aggregate in aggregates:
442             aggregates[aggregate].start_slice(credential, hrn)
443         return 1
444
445
446     def stop_slice(self, hrn):
447         if self.api.interface in ['aggregate']:
448             self.stop_slice_aggregate(hrn)
449         elif self.api.interface in ['slicemgr']:
450             self.stop_slice_smgr(hrn)
451
452     def stop_slice_aggregate(self, hrn):
453         slicename = hrn_to_pl_slicename(hrn)
454         slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
455         if not slices:
456             raise RecordNotFound(hrn)
457         slice_id = slices[0]['slice_id']
458         attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
459         attribute_id = attributes[0]['slice_attribute_id']
460         self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "0")
461         return 1
462
463     def stop_slice_smgr(self, hrn):
464         credential = self.api.getCredential()
465         aggregates = Aggregates(self.api)
466         for aggregate in aggregates:
467             aggregates[aggregate].stop_slice(credential, hrn)  
468