9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
22 class Slices(SimpleStorage):
24 def __init__(self, api, ttl = .5, caller_cred=None):
28 path = self.api.config.SFA_BASE_DIR
29 filename = ".".join([self.api.interface, self.api.hrn, "slices"])
30 filepath = path + os.sep + filename
31 self.slices_file = filepath
32 SimpleStorage.__init__(self, self.slices_file)
33 self.policy = Policy(self.api)
35 self.caller_cred=caller_cred
38 def get_peer(self, hrn):
39 # Becaues of myplc federation, we first need to determine if this
40 # slice belongs to out local plc or a myplc peer. We will assume it
41 # is a local site, unless we find out otherwise
44 # get this slice's authority (site)
45 slice_authority = get_authority(hrn)
47 # get this site's authority (sfa root authority or sub authority)
48 site_authority = get_authority(slice_authority).lower()
50 # check if we are already peered with this site_authority, if so
51 peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
52 for peer_record in peers:
53 names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
54 if site_authority in names:
55 peer = peer_record['shortname']
59 def get_sfa_peer(self, hrn):
60 # return the authority for this hrn or None if we are the authority
62 slice_authority = get_authority(hrn)
63 site_authority = get_authority(slice_authority)
65 if site_authority != self.api.hrn:
66 sfa_peer = site_authority
72 Update the cached list of slices
74 # Reload components list
75 now = datetime.datetime.now()
76 if not self.has_key('threshold') or not self.has_key('timestamp') or \
77 now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
78 if self.api.interface in ['aggregate']:
79 self.refresh_slices_aggregate()
80 elif self.api.interface in ['slicemgr']:
81 self.refresh_slices_smgr()
83 def refresh_slices_aggregate(self):
84 slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
85 slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
87 # update timestamp and threshold
88 timestamp = datetime.datetime.now()
89 hr_timestamp = timestamp.strftime(self.api.time_format)
90 delta = datetime.timedelta(hours=self.ttl)
91 threshold = timestamp + delta
92 hr_threshold = threshold.strftime(self.api.time_format)
94 slice_details = {'hrn': slice_hrns,
95 'timestamp': hr_timestamp,
96 'threshold': hr_threshold
98 self.update(slice_details)
102 def refresh_slices_smgr(self):
104 aggregates = Aggregates(self.api)
105 credential = self.api.getCredential()
106 for aggregate in aggregates:
108 # request hash is optional so lets try the call without it
110 slices = aggregates[aggregate].get_slices(credential)
111 slice_hrns.extend(slices)
114 print >> log, "%s" % (traceback.format_exc())
115 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
117 # try sending the request hash if the previous call failed
119 arg_list = [credential]
120 request_hash = self.api.key.compute_hash(arg_list)
122 slices = aggregates[aggregate].get_slices(credential, request_hash)
123 slice_hrns.extend(slices)
126 print >> log, "%s" % (traceback.format_exc())
127 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
129 # update timestamp and threshold
130 timestamp = datetime.datetime.now()
131 hr_timestamp = timestamp.strftime(self.api.time_format)
132 delta = datetime.timedelta(hours=self.ttl)
133 threshold = timestamp + delta
134 hr_threshold = threshold.strftime(self.api.time_format)
136 slice_details = {'hrn': slice_hrns,
137 'timestamp': hr_timestamp,
138 'threshold': hr_threshold
140 self.update(slice_details)
144 def delete_slice(self, hrn):
145 if self.api.interface in ['aggregate']:
146 self.delete_slice_aggregate(hrn)
147 elif self.api.interface in ['slicemgr']:
148 self.delete_slice_smgr(hrn)
150 def delete_slice_aggregate(self, hrn):
152 slicename = hrn_to_pl_slicename(hrn)
153 slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename})
158 # determine if this is a peer slice
159 peer = self.get_peer(hrn)
161 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
162 self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, slice['node_ids'])
164 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
167 def delete_slice_smgr(self, hrn):
168 credential = self.api.getCredential()
169 caller_cred = self.caller_cred
170 aggregates = Aggregates(self.api)
171 for aggregate in aggregates:
173 # request hash is optional so lets try the call without it
175 aggregates[aggregate].delete_slice(credential, hrn, caller_cred)
178 print >> log, "%s" % (traceback.format_exc())
179 print >> log, "Error calling list nodes at aggregate %s" % aggregate
181 # try sending the request hash if the previous call failed
184 arg_list = [credential, hrn]
185 request_hash = self.api.key.compute_hash(arg_list)
186 aggregates[aggregate].delete_slice(credential, hrn, request_hash, caller_cred)
189 print >> log, "%s" % (traceback.format_exc())
190 print >> log, "Error calling list nodes at aggregate %s" % aggregate
192 def create_slice(self, hrn, rspec):
194 # check our slice policy before we procede
195 whitelist = self.policy['slice_whitelist']
196 blacklist = self.policy['slice_blacklist']
198 if whitelist and hrn not in whitelist or \
199 blacklist and hrn in blacklist:
200 policy_file = self.policy.policy_file
201 print >> log, "Slice %(hrn)s not allowed by policy %(policy_file)s" % locals()
204 if self.api.interface in ['aggregate']:
205 self.create_slice_aggregate(hrn, rspec)
206 elif self.api.interface in ['slicemgr']:
207 self.create_slice_smgr(hrn, rspec)
209 def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
210 authority = get_authority(slice_hrn)
212 site_records = registry.resolve(credential, authority)
214 arg_list = [credential, authority]
215 request_hash = self.api.key.compute_hash(arg_list)
216 site_records = registry.resolve(credential, authority, request_hash)
219 for site_record in site_records:
220 if site_record['type'] == 'authority':
223 raise RecordNotFound(authority)
224 remote_site_id = site.pop('site_id')
226 login_base = get_leaf(authority)
227 sites = self.api.plshell.GetSites(self.api.plauth, login_base)
229 site_id = self.api.plshell.AddSite(self.api.plauth, site)
231 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
232 # mark this site as an sfa peer record
234 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
236 registry.register_peer_object(credential, peer_dict)
238 arg_list = [credential]
239 request_hash = self.api.key.compute_hash(arg_list)
240 registry.register_peer_object(credential, peer_dict, request_hash)
242 site_id = sites[0]['site_id']
243 remote_site_id = sites[0]['peer_site_id']
246 return (site_id, remote_site_id)
248 def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
251 authority = get_authority(slice_hrn)
253 slice_records = registry.resolve(credential, slice_hrn)
255 arg_list = [credential, slice_hrn]
256 request_hash = self.api.key.compute_hash(arg_list)
257 slice_records = registry.resolve(credential, slice_hrn, request_hash)
259 for record in slice_records:
260 if record['type'] in ['slice']:
261 slice_record = record
263 raise RecordNotFound(hrn)
264 slicename = hrn_to_pl_slicename(slice_hrn)
265 parts = slicename.split("_")
266 login_base = parts[0]
267 slices = self.api.plshell.GetSlices(self.api.plauth, [slicename], ['slice_id', 'node_ids', 'site_id'])
270 slice_keys = ['name', 'url', 'description']
271 for key in slice_keys:
272 if key in slice_record and slice_record[key]:
273 slice_fields[key] = slice_record[key]
276 slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
278 slice['slice_id'] = slice_id
280 # mark this slice as an sfa peer record
282 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
284 registry.register_peer_object(credential, peer_dict)
286 arg_list = [credential]
287 request_hash = self.api.key.compute_hash(arg_list)
288 registry.register_peer_object(credential, peer_dict, request_hash)
290 #this belongs to a peer
292 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
293 slice['node_ids'] = []
296 slice_id = slice['slice_id']
297 site_id = slice['site_id']
299 slice['peer_slice_id'] = slice_record['pointer']
300 self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
304 def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
305 # get the list of valid slice users from the registry and make
306 # sure they are added to the slice
307 slicename = hrn_to_pl_slicename(slice_record['hrn'])
308 researchers = slice_record.get('researcher', [])
309 for researcher in researchers:
312 person_records = registry.resolve(credential, researcher)
314 arg_list = [credential, researcher]
315 request_hash = self.api.key.compute_hash(arg_list)
316 person_records = registry.resolve(credential, researcher, request_hash)
317 for record in person_records:
318 if record['type'] in ['user']:
319 person_record = record
320 if not person_record:
322 person_dict = person_record
325 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
326 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
328 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
333 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
336 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
337 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
339 # mark this person as an sfa peer record
341 peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
343 registry.register_peer_object(credential, peer_dict)
345 arg_list = [credential]
346 request_hash = self.api.key.compute_hash(arg_list)
347 registry.register_peer_object(credential, peer_dict, request_hash)
350 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
353 person_id = persons[0]['person_id']
354 key_ids = persons[0]['key_ids']
357 # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
360 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
361 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id, peer)
363 self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
364 self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
365 if peer and not local_person:
366 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
368 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
370 self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
372 def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer, local_person):
373 keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
374 keys = [key['key'] for key in keylist]
376 #add keys that arent already there
377 key_ids = person_dict['key_ids']
378 for personkey in person_dict['keys']:
379 if personkey not in keys:
380 key = {'key_type': 'ssh', 'key': personkey}
382 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
383 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
384 if peer and not local_person:
385 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
387 try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
391 def create_slice_aggregate(self, hrn, rspec):
393 # Determine if this is a peer slice
394 peer = self.get_peer(hrn)
395 sfa_peer = self.get_sfa_peer(hrn)
398 # Get the slice record from sfa
399 slicename = hrn_to_pl_slicename(hrn)
402 registries = Registries(self.api)
403 registry = registries[self.api.hrn]
404 credential = self.api.getCredential()
406 site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
407 slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
409 # find out where this slice is currently running
410 nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
411 hostnames = [node['hostname'] for node in nodelist]
413 # get netspec details
414 nodespecs = spec.getDictsByTagName('NodeSpec')
416 for nodespec in nodespecs:
417 if isinstance(nodespec['name'], list):
418 nodes.extend(nodespec['name'])
419 elif isinstance(nodespec['name'], StringTypes):
420 nodes.append(nodespec['name'])
422 # remove nodes not in rspec
423 deleted_nodes = list(set(hostnames).difference(nodes))
424 # add nodes from rspec
425 added_nodes = list(set(nodes).difference(hostnames))
428 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
429 self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes)
430 self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
432 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
436 def create_slice_smgr(self, hrn, rspec):
439 spec.parseString(rspec)
440 slicename = hrn_to_pl_slicename(hrn)
441 specDict = spec.toDict()
442 if specDict.has_key('RSpec'): specDict = specDict['RSpec']
443 if specDict.has_key('start_time'): start_time = specDict['start_time']
445 if specDict.has_key('end_time'): end_time = specDict['end_time']
449 aggregates = Aggregates(self.api)
450 credential = self.api.getCredential()
452 # split the netspecs into individual rspecs
453 netspecs = spec.getDictsByTagName('NetSpec')
454 for netspec in netspecs:
455 net_hrn = netspec['name']
456 resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
457 resourceDict = {'RSpec': resources}
458 tempspec.parseDict(resourceDict)
459 rspecs[net_hrn] = tempspec.toxml()
461 # send each rspec to the appropriate aggregate/sm
462 caller_cred = self.caller_cred
463 for net_hrn in rspecs:
465 # if we are directly connected to the aggregate then we can just send them the rspec
466 # if not, then we may be connected to an sm thats connected to the aggregate
467 if net_hrn in aggregates:
468 # send the whloe rspec to the local aggregate
469 if net_hrn in [self.api.hrn]:
471 aggregates[net_hrn].create_slice(credential, hrn, rspec, caller_cred)
473 arg_list = [credential,hrn,rspec]
474 request_hash = self.api.key.compute_hash(arg_list)
475 aggregates[net_hrn].create_slice(credential, hrn, rspec, request_hash, caller_cred)
478 aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], caller_cred)
480 arg_list = [credential,hrn,rspecs[net_hrn]]
481 request_hash = self.api.key.compute_hash(arg_list)
482 aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
484 # lets forward this rspec to a sm that knows about the network
485 arg_list = [credential, net_hrn]
486 request_hash = self.api.compute_hash(arg_list)
487 for aggregate in aggregates:
489 network_found = aggregates[aggregate].get_aggregates(credential, net_hrn)
491 network_found = aggregates[aggregate].get_aggregates(credential, net_hrn, request_hash)
494 aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], caller_cred)
496 arg_list = [credential, hrn, rspecs[net_hrn]]
497 request_hash = self.api.key.compute_hash(arg_list)
498 aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
501 print >> log, "Error creating slice %(hrn)s at aggregate %(net_hrn)s" % locals()
502 traceback.print_exc()
506 def start_slice(self, hrn):
507 if self.api.interface in ['aggregate']:
508 self.start_slice_aggregate(hrn)
509 elif self.api.interface in ['slicemgr']:
510 self.start_slice_smgr(hrn)
512 def start_slice_aggregate(self, hrn):
513 slicename = hrn_to_pl_slicename(hrn)
514 slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
516 raise RecordNotFound(hrn)
518 attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
519 attribute_id = attreibutes[0]['slice_attribute_id']
520 self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "1" )
523 def start_slice_smgr(self, hrn):
524 credential = self.api.getCredential()
525 aggregates = Aggregates(self.api)
526 for aggregate in aggregates:
527 aggregates[aggregate].start_slice(credential, hrn)
531 def stop_slice(self, hrn):
532 if self.api.interface in ['aggregate']:
533 self.stop_slice_aggregate(hrn)
534 elif self.api.interface in ['slicemgr']:
535 self.stop_slice_smgr(hrn)
537 def stop_slice_aggregate(self, hrn):
538 slicename = hrn_to_pl_slicename(hrn)
539 slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
541 raise RecordNotFound(hrn)
542 slice_id = slices[0]['slice_id']
543 attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
544 attribute_id = attributes[0]['slice_attribute_id']
545 self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "0")
548 def stop_slice_smgr(self, hrn):
549 credential = self.api.getCredential()
550 aggregates = Aggregates(self.api)
551 arg_list = [credential, hrn]
552 request_hash = self.api.key.compute_hash(arg_list)
553 for aggregate in aggregates:
555 aggregates[aggregate].stop_slice(credential, hrn)
557 aggregates[aggregate].stop_slice(credential, hrn, request_hash)