request hash is optional
[sfa.git] / sfa / plc / slices.py
1 ### $Id$
2 ### $URL$
3
4 import datetime
5 import time
6 import traceback
7 import sys
8
9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
21
22 class Slices(SimpleStorage):
23
24     def __init__(self, api, ttl = .5, caller_cred=None):
25         self.api = api
26         self.ttl = ttl
27         self.threshold = None
28         path = self.api.config.SFA_BASE_DIR
29         filename = ".".join([self.api.interface, self.api.hrn, "slices"])
30         filepath = path + os.sep + filename
31         self.slices_file = filepath
32         SimpleStorage.__init__(self, self.slices_file)
33         self.policy = Policy(self.api)    
34         self.load()
35         self.caller_cred=caller_cred
36
37
38     def get_peer(self, hrn):
39         # Becaues of myplc federation,  we first need to determine if this
40         # slice belongs to out local plc or a myplc peer. We will assume it 
41         # is a local site, unless we find out otherwise  
42         peer = None
43
44         # get this slice's authority (site)
45         slice_authority = get_authority(hrn)
46
47         # get this site's authority (sfa root authority or sub authority)
48         site_authority = get_authority(slice_authority).lower()
49
50         # check if we are already peered with this site_authority, if so
51         peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
52         for peer_record in peers:
53             names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
54             if site_authority in names:
55                 peer = peer_record['shortname']
56
57         return peer
58
59     def get_sfa_peer(self, hrn):
60         # return the authority for this hrn or None if we are the authority
61         sfa_peer = None
62         slice_authority = get_authority(hrn)
63         site_authority = get_authority(slice_authority)
64
65         if site_authority != self.api.hrn:
66             sfa_peer = site_authority
67
68         return sfa_peer 
69
70     def refresh(self):
71         """
72         Update the cached list of slices
73         """
74         # Reload components list
75         now = datetime.datetime.now()
76         if not self.has_key('threshold') or not self.has_key('timestamp') or \
77            now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
78             if self.api.interface in ['aggregate']:
79                 self.refresh_slices_aggregate()
80             elif self.api.interface in ['slicemgr']:
81                 self.refresh_slices_smgr()
82
83     def refresh_slices_aggregate(self):
84         slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
85         slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
86
87          # update timestamp and threshold
88         timestamp = datetime.datetime.now()
89         hr_timestamp = timestamp.strftime(self.api.time_format)
90         delta = datetime.timedelta(hours=self.ttl)
91         threshold = timestamp + delta
92         hr_threshold = threshold.strftime(self.api.time_format)
93         
94         slice_details = {'hrn': slice_hrns,
95                          'timestamp': hr_timestamp,
96                          'threshold': hr_threshold
97                         }
98         self.update(slice_details)
99         self.write()     
100         
101
102     def refresh_slices_smgr(self):
103         slice_hrns = []
104         aggregates = Aggregates(self.api)
105         credential = self.api.getCredential()
106         for aggregate in aggregates:
107             success = False
108             # request hash is optional so lets try the call without it 
109             try:
110                 slices = aggregates[aggregate].get_slices(credential)
111                 slice_hrns.extend(slices)
112                 success = True
113             except:
114                 print >> log, "%s" % (traceback.format_exc())
115                 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
116
117             # try sending the request hash if the previous call failed 
118             if not success:
119                 arg_list = [credential]
120                 request_hash = self.api.key.compute_hash(arg_list)
121                 try:
122                     slices = aggregates[aggregate].get_slices(credential, request_hash)
123                     slice_hrns.extend(slices)
124                     success = True
125                 except:
126                     print >> log, "%s" % (traceback.format_exc())
127                     print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
128
129         # update timestamp and threshold
130         timestamp = datetime.datetime.now()
131         hr_timestamp = timestamp.strftime(self.api.time_format)
132         delta = datetime.timedelta(hours=self.ttl)
133         threshold = timestamp + delta
134         hr_threshold = threshold.strftime(self.api.time_format)
135
136         slice_details = {'hrn': slice_hrns,
137                          'timestamp': hr_timestamp,
138                          'threshold': hr_threshold
139                         }
140         self.update(slice_details)
141         self.write()
142
143
144     def delete_slice(self, hrn):
145         if self.api.interface in ['aggregate']:
146             self.delete_slice_aggregate(hrn)
147         elif self.api.interface in ['slicemgr']:
148             self.delete_slice_smgr(hrn)
149         
150     def delete_slice_aggregate(self, hrn):
151
152         slicename = hrn_to_pl_slicename(hrn)
153         slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename})
154         if not slices:
155             return 1        
156         slice = slices[0]
157
158         # determine if this is a peer slice
159         peer = self.get_peer(hrn)
160         if peer:
161             self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
162         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, slice['node_ids'])
163         if peer:
164             self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
165         return 1
166
167     def delete_slice_smgr(self, hrn):
168         credential = self.api.getCredential()
169         caller_cred = self.caller_cred
170         aggregates = Aggregates(self.api)
171         for aggregate in aggregates:
172             success = False
173             # request hash is optional so lets try the call without it
174             try:
175                 aggregates[aggregate].delete_slice(credential, hrn, caller_cred)
176                 success = True
177             except:
178                 print >> log, "%s" % (traceback.format_exc())
179                 print >> log, "Error calling list nodes at aggregate %s" % aggregate
180             
181             # try sending the request hash if the previous call failed 
182             if not success:
183                 try:
184                     arg_list = [credential, hrn]
185                     request_hash = self.api.key.compute_hash(arg_list)
186                     aggregates[aggregate].delete_slice(credential, hrn, request_hash, caller_cred)
187                     success = True
188                 except:
189                     print >> log, "%s" % (traceback.format_exc())
190                     print >> log, "Error calling list nodes at aggregate %s" % aggregate
191                         
192     def create_slice(self, hrn, rspec):
193         
194         # check our slice policy before we procede
195         whitelist = self.policy['slice_whitelist']     
196         blacklist = self.policy['slice_blacklist']
197        
198         if whitelist and hrn not in whitelist or \
199            blacklist and hrn in blacklist:
200             policy_file = self.policy.policy_file
201             print >> log, "Slice %(hrn)s not allowed by policy %(policy_file)s" % locals()
202             return 1
203
204         if self.api.interface in ['aggregate']:     
205             self.create_slice_aggregate(hrn, rspec)
206         elif self.api.interface in ['slicemgr']:
207             self.create_slice_smgr(hrn, rspec)
208
209     def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
210         authority = get_authority(slice_hrn)
211         try:
212             site_records = registry.resolve(credential, authority)
213         except:
214             arg_list = [credential, authority]
215             request_hash = self.api.key.compute_hash(arg_list)
216             site_records = registry.resolve(credential, authority, request_hash)
217             
218         site = {}
219         for site_record in site_records:
220             if site_record['type'] == 'authority':
221                 site = site_record
222         if not site:
223             raise RecordNotFound(authority)
224         remote_site_id = site.pop('site_id')    
225                 
226         login_base = get_leaf(authority)
227         sites = self.api.plshell.GetSites(self.api.plauth, login_base)
228         if not sites:
229             site_id = self.api.plshell.AddSite(self.api.plauth, site)
230             if peer:
231                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)   
232             # mark this site as an sfa peer record
233             if sfa_peer:
234                 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
235                 try:
236                     registry.register_peer_object(credential, peer_dict)
237                 except:
238                     arg_list = [credential]
239                     request_hash = self.api.key.compute_hash(arg_list) 
240                     registry.register_peer_object(credential, peer_dict, request_hash)
241         else:
242             site_id = sites[0]['site_id']
243             remote_site_id = sites[0]['peer_site_id']
244
245
246         return (site_id, remote_site_id) 
247
248     def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
249         slice = {}
250         slice_record = None
251         authority = get_authority(slice_hrn)
252         try:
253             slice_records = registry.resolve(credential, slice_hrn)
254         except:    
255             arg_list = [credential, slice_hrn]
256             request_hash = self.api.key.compute_hash(arg_list)
257             slice_records = registry.resolve(credential, slice_hrn, request_hash)
258
259         for record in slice_records:
260             if record['type'] in ['slice']:
261                 slice_record = record
262         if not slice_record:
263             raise RecordNotFound(hrn)
264         slicename = hrn_to_pl_slicename(slice_hrn)
265         parts = slicename.split("_")
266         login_base = parts[0]
267         slices = self.api.plshell.GetSlices(self.api.plauth, [slicename], ['slice_id', 'node_ids', 'site_id']) 
268         if not slices:
269             slice_fields = {}
270             slice_keys = ['name', 'url', 'description']
271             for key in slice_keys:
272                 if key in slice_record and slice_record[key]:
273                     slice_fields[key] = slice_record[key]
274
275             # add the slice  
276             slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
277             slice = slice_fields
278             slice['slice_id'] = slice_id
279
280             # mark this slice as an sfa peer record
281             if sfa_peer:
282                 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
283                 try:
284                     registry.register_peer_object(credential, peer_dict)
285                 except:
286                     arg_list = [credential]
287                     request_hash = self.api.key.compute_hash(arg_list) 
288                     registry.register_peer_object(credential, peer_dict, request_hash)
289
290             #this belongs to a peer
291             if peer:
292                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
293             slice['node_ids'] = []
294         else:
295             slice = slices[0]
296             slice_id = slice['slice_id']
297             site_id = slice['site_id']
298
299         slice['peer_slice_id'] = slice_record['pointer']
300         self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
301     
302         return slice        
303
304     def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
305         # get the list of valid slice users from the registry and make 
306         # sure they are added to the slice 
307         slicename = hrn_to_pl_slicename(slice_record['hrn'])
308         researchers = slice_record.get('researcher', [])
309         for researcher in researchers:
310             person_record = {}
311             try:
312                 person_records = registry.resolve(credential, researcher)
313             except:
314                 arg_list = [credential, researcher]
315                 request_hash = self.api.key.compute_hash(arg_list) 
316                 person_records = registry.resolve(credential, researcher, request_hash)
317             for record in person_records:
318                 if record['type'] in ['user']:
319                     person_record = record
320             if not person_record:
321                 pass
322             person_dict = person_record
323             local_person=False
324             if peer:
325                 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
326                 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
327                 if not persons:
328                     persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
329                     if persons:
330                        local_person=True
331
332             else:
333                 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])   
334         
335             if not persons:
336                 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
337                 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
338                 
339                 # mark this person as an sfa peer record
340                 if sfa_peer:
341                     peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
342                     try:
343                         registry.register_peer_object(credential, peer_dict)
344                     except:
345                         arg_list = [credential]
346                         request_hash = self.api.key.compute_hash(arg_list) 
347                         registry.register_peer_object(credential, peer_dict, request_hash)
348
349                 if peer:
350                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
351                 key_ids = []
352             else:
353                 person_id = persons[0]['person_id']
354                 key_ids = persons[0]['key_ids']
355
356
357             # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
358             # an error
359             if peer:
360                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
361                 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id,  peer)
362
363             self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
364             self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
365             if peer and not local_person:
366                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
367             if peer:
368                 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
369             
370             self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
371
372     def verify_keys(self, registry, credential, person_dict, key_ids, person_id,  peer, local_person):
373         keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
374         keys = [key['key'] for key in keylist]
375         
376         #add keys that arent already there
377         key_ids = person_dict['key_ids']
378         for personkey in person_dict['keys']:
379             if personkey not in keys:
380                 key = {'key_type': 'ssh', 'key': personkey}
381                 if peer:
382                     self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
383                 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
384                 if peer and not local_person:
385                     self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
386                 if peer:
387                     try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
388
389                     except: pass   
390
391     def create_slice_aggregate(self, hrn, rspec):
392
393         # Determine if this is a peer slice
394         peer = self.get_peer(hrn)
395         sfa_peer = self.get_sfa_peer(hrn)
396
397         spec = RSpec(rspec)
398         # Get the slice record from sfa
399         slicename = hrn_to_pl_slicename(hrn) 
400         slice = {}
401         slice_record = None
402         registries = Registries(self.api)
403         registry = registries[self.api.hrn]
404         credential = self.api.getCredential()
405
406         site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
407         slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
408
409         # find out where this slice is currently running
410         nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
411         hostnames = [node['hostname'] for node in nodelist]
412
413         # get netspec details
414         nodespecs = spec.getDictsByTagName('NodeSpec')
415         nodes = []
416         for nodespec in nodespecs:
417             if isinstance(nodespec['name'], list):
418                 nodes.extend(nodespec['name'])
419             elif isinstance(nodespec['name'], StringTypes):
420                 nodes.append(nodespec['name'])
421
422         # remove nodes not in rspec
423         deleted_nodes = list(set(hostnames).difference(nodes))
424         # add nodes from rspec
425         added_nodes = list(set(nodes).difference(hostnames))
426
427         if peer:
428             self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
429         self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes) 
430         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
431         if peer:
432             self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
433
434         return 1
435
436     def create_slice_smgr(self, hrn, rspec):
437         spec = RSpec()
438         tempspec = RSpec()
439         spec.parseString(rspec)
440         slicename = hrn_to_pl_slicename(hrn)
441         specDict = spec.toDict()
442         if specDict.has_key('RSpec'): specDict = specDict['RSpec']
443         if specDict.has_key('start_time'): start_time = specDict['start_time']
444         else: start_time = 0
445         if specDict.has_key('end_time'): end_time = specDict['end_time']
446         else: end_time = 0
447
448         rspecs = {}
449         aggregates = Aggregates(self.api)
450         credential = self.api.getCredential()
451
452         # split the netspecs into individual rspecs
453         netspecs = spec.getDictsByTagName('NetSpec')
454         for netspec in netspecs:
455             net_hrn = netspec['name']
456             resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
457             resourceDict = {'RSpec': resources}
458             tempspec.parseDict(resourceDict)
459             rspecs[net_hrn] = tempspec.toxml()
460
461         # send each rspec to the appropriate aggregate/sm
462         caller_cred = self.caller_cred 
463         for net_hrn in rspecs:
464             try:
465                 # if we are directly connected to the aggregate then we can just send them the rspec
466                 # if not, then we may be connected to an sm thats connected to the aggregate
467                 if net_hrn in aggregates:
468                     # send the whloe rspec to the local aggregate
469                     if net_hrn in [self.api.hrn]:
470                         try:
471                             aggregates[net_hrn].create_slice(credential, hrn, rspec, caller_cred)
472                         except:
473                             arg_list = [credential,hrn,rspec]
474                             request_hash = self.api.key.compute_hash(arg_list)
475                             aggregates[net_hrn].create_slice(credential, hrn, rspec, request_hash, caller_cred)
476                     else:
477                         try:
478                             aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], caller_cred)
479                         except:
480                             arg_list = [credential,hrn,rspecs[net_hrn]]
481                             request_hash = self.api.key.compute_hash(arg_list)
482                             aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
483                 else:
484                     # lets forward this rspec to a sm that knows about the network
485                     arg_list = [credential, net_hrn]
486                     request_hash = self.api.compute_hash(arg_list)    
487                     for aggregate in aggregates:
488                         try:
489                             network_found = aggregates[aggregate].get_aggregates(credential, net_hrn)
490                         except:
491                             network_found = aggregates[aggregate].get_aggregates(credential, net_hrn, request_hash)
492                         if network_networks:
493                             try:
494                                 aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], caller_cred)
495                             except:
496                                 arg_list = [credential, hrn, rspecs[net_hrn]]
497                                 request_hash = self.api.key.compute_hash(arg_list) 
498                                 aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
499                      
500             except:
501                 print >> log, "Error creating slice %(hrn)s at aggregate %(net_hrn)s" % locals()
502                 traceback.print_exc()
503         return 1
504
505
506     def start_slice(self, hrn):
507         if self.api.interface in ['aggregate']:
508             self.start_slice_aggregate(hrn)
509         elif self.api.interface in ['slicemgr']:
510             self.start_slice_smgr(hrn)
511
512     def start_slice_aggregate(self, hrn):
513         slicename = hrn_to_pl_slicename(hrn)
514         slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
515         if not slices:
516             raise RecordNotFound(hrn)
517         slice_id = slices[0]
518         attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
519         attribute_id = attreibutes[0]['slice_attribute_id']
520         self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "1" )
521         return 1
522
523     def start_slice_smgr(self, hrn):
524         credential = self.api.getCredential()
525         aggregates = Aggregates(self.api)
526         for aggregate in aggregates:
527             aggregates[aggregate].start_slice(credential, hrn)
528         return 1
529
530
531     def stop_slice(self, hrn):
532         if self.api.interface in ['aggregate']:
533             self.stop_slice_aggregate(hrn)
534         elif self.api.interface in ['slicemgr']:
535             self.stop_slice_smgr(hrn)
536
537     def stop_slice_aggregate(self, hrn):
538         slicename = hrn_to_pl_slicename(hrn)
539         slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
540         if not slices:
541             raise RecordNotFound(hrn)
542         slice_id = slices[0]['slice_id']
543         attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
544         attribute_id = attributes[0]['slice_attribute_id']
545         self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "0")
546         return 1
547
548     def stop_slice_smgr(self, hrn):
549         credential = self.api.getCredential()
550         aggregates = Aggregates(self.api)
551         arg_list = [credential, hrn]
552         request_hash = self.api.key.compute_hash(arg_list)
553         for aggregate in aggregates:
554             try:
555                 aggregates[aggregate].stop_slice(credential, hrn)
556             except:  
557                 aggregates[aggregate].stop_slice(credential, hrn, request_hash)
558