9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
22 class Slices(SimpleStorage):
24 rspec_to_slice_tag = {'max_rate':'net_max_rate'}
def __init__(self, api, ttl = .5, caller_cred=None):
    """Set up the slice cache for one SFA interface.

    api         -- API object; its config.SFA_DATA_DIR, interface and hrn
                   attributes determine where the cache file lives.
    ttl         -- cache lifetime; the refresh_slices_* methods pass it to
                   datetime.timedelta(hours=...).
    caller_cred -- credential of the original caller; the *_smgr methods
                   forward it to the aggregates.
    """
    self.api = api
    self.ttl = ttl

    # The cache file is <SFA_DATA_DIR>/<interface>.<hrn>.slices
    data_dir = self.api.config.SFA_DATA_DIR
    cache_name = ".".join([self.api.interface, self.api.hrn, "slices"])
    self.slices_file = os.sep.join([data_dir, cache_name])

    SimpleStorage.__init__(self, self.slices_file)
    self.policy = Policy(self.api)
    # NOTE(review): restored from a short line dropped by the extraction
    # (presumably loads the stored cache) -- confirm against the file.
    self.load()
    self.caller_cred = caller_cred
def get_peer(self, hrn):
    """Return the shortname of the MyPLC peer that owns *hrn*, or None.

    Because of MyPLC federation we first need to determine whether this
    slice belongs to our local PLC or to a MyPLC peer.  The slice is
    assumed to be local (None) unless a peer record names its site
    authority.
    """
    # slice hrn -> slice authority (site) -> site authority (sfa root
    # authority or sub authority); the comparison is case-insensitive
    site_authority = get_authority(get_authority(hrn)).lower()

    # check whether we are already peered with this site authority
    fields = ['peer_id', 'peername', 'shortname', 'hrn_root']
    peer_records = self.api.plshell.GetPeers(self.api.plauth, {}, fields)

    matched = None
    for record in peer_records:
        lowered = [value.lower() for value in record.values()
                   if isinstance(value, StringTypes)]
        if site_authority in lowered:
            # no early exit: if several peers match, the last one wins
            matched = record['shortname']
    return matched
def get_sfa_peer(self, hrn):
    """Return the site authority of *hrn*, or None if we are that authority."""
    site_authority = get_authority(get_authority(hrn))
    if site_authority == self.api.hrn:
        return None
    return site_authority
def refresh(self):
    """
    Update the cached list of slices when the cache is empty or expired.

    NOTE(review): the ``def`` line was dropped by the extraction; the name
    ``refresh`` is inferred from the refresh_slices_* helpers -- confirm.
    """
    now = datetime.datetime.now()

    # The cache is stale when either bookkeeping key is missing ...
    stale = not (self.has_key('threshold') and self.has_key('timestamp'))
    if not stale:
        # ... or when the stored expiry time lies in the past.
        parsed = time.strptime(self['threshold'], self.api.time_format)
        expiry = datetime.datetime.fromtimestamp(time.mktime(parsed))
        stale = now > expiry
    if not stale:
        return

    if self.api.interface in ['aggregate']:
        self.refresh_slices_aggregate()
    elif self.api.interface in ['slicemgr']:
        self.refresh_slices_smgr()
def refresh_slices_aggregate(self):
    """Rebuild the cache from the slices PLC reports with peer_id None."""
    api = self.api
    local_slices = api.plshell.GetSlices(api.plauth, {'peer_id': None}, ['name'])
    hrns = []
    for record in local_slices:
        hrns.append(slicename_to_hrn(api.hrn, record['name']))

    # update timestamp and threshold (threshold = now + ttl hours)
    fetched_at = datetime.datetime.now()
    stamp = fetched_at.strftime(api.time_format)
    expires_at = fetched_at + datetime.timedelta(hours=self.ttl)
    expiry = expires_at.strftime(api.time_format)

    self.update({'hrn': hrns, 'timestamp': stamp, 'threshold': expiry})
    # NOTE(review): restored from a short line dropped by the extraction
    # (presumably persists the cache) -- confirm against the file.
    self.write()
def refresh_slices_smgr(self):
    """Rebuild the cache from the slice lists of every known aggregate.

    Each aggregate is asked without a request hash first and, if that
    call fails, once more with one.  Failures are logged and the
    aggregate is skipped; they never abort the refresh.
    """
    collected = []
    aggregates = Aggregates(self.api)
    credential = self.api.getCredential()
    for aggregate in aggregates:
        answered = False
        # request hash is optional so let's try the call without it
        try:
            collected.extend(aggregates[aggregate].get_slices(credential))
            answered = True
        except:
            print >> log, "%s" % (traceback.format_exc())
            print >> log, "Error calling slices at aggregate %(aggregate)s" % {'aggregate': aggregate}
        if answered:
            continue

        # try sending the request hash since the previous call failed
        request_hash = self.api.key.compute_hash([credential])
        try:
            collected.extend(aggregates[aggregate].get_slices(credential, request_hash))
        except:
            print >> log, "%s" % (traceback.format_exc())
            print >> log, "Error calling slices at aggregate %(aggregate)s" % {'aggregate': aggregate}

    # update timestamp and threshold (threshold = now + ttl hours)
    fetched_at = datetime.datetime.now()
    stamp = fetched_at.strftime(self.api.time_format)
    expires_at = fetched_at + datetime.timedelta(hours=self.ttl)
    expiry = expires_at.strftime(self.api.time_format)

    self.update({'hrn': collected, 'timestamp': stamp, 'threshold': expiry})
    # NOTE(review): restored from a short line dropped by the extraction
    # (presumably persists the cache) -- confirm against the file.
    self.write()
def delete_slice(self, hrn):
    """Delete slice *hrn* via the handler for this interface type.

    Interfaces other than 'aggregate' and 'slicemgr' are ignored.
    """
    interface = self.api.interface
    if interface == 'aggregate':
        self.delete_slice_aggregate(hrn)
        return
    if interface == 'slicemgr':
        self.delete_slice_smgr(hrn)
def delete_slice_aggregate(self, hrn):
    """Remove slice *hrn* from all of its nodes in the local PLC.

    NOTE(review): the empty-result guard and the ``return 1`` lines were
    dropped by the extraction and are restored here -- confirm.
    """
    plshell = self.api.plshell
    plauth = self.api.plauth

    slicename = hrn_to_pl_slicename(hrn)
    matches = plshell.GetSlices(plauth, {'name': slicename})
    if not matches:
        return 1
    record = matches[0]

    # determine if this is a peer slice; a peer slice is unbound from its
    # peer around the node removal and re-bound afterwards
    peer = self.get_peer(hrn)
    if peer:
        plshell.UnBindObjectFromPeer(plauth, 'slice', record['slice_id'], peer)
    plshell.DeleteSliceFromNodes(plauth, slicename, record['node_ids'])
    if peer:
        plshell.BindObjectToPeer(plauth, 'slice', record['slice_id'], peer, record['peer_slice_id'])
    return 1
def delete_slice_smgr(self, hrn):
    """Ask every known aggregate to delete slice *hrn*.

    Each aggregate is called without a request hash first and, if that
    call fails, once more with one.  Failures are logged and the
    remaining aggregates are still contacted.
    """
    credential = self.api.getCredential()
    caller_cred = self.caller_cred
    aggregates = Aggregates(self.api)
    for aggregate in aggregates:
        deleted = False
        # request hash is optional so let's try the call without it
        try:
            aggregates[aggregate].delete_slice(credential, hrn, None, caller_cred)
            deleted = True
        except Exception:
            print >> log, "%s" % (traceback.format_exc())
            # the message used to say "list nodes" (copy/paste slip)
            print >> log, "Error calling delete slice at aggregate %s" % aggregate
        if deleted:
            continue

        # try sending the request hash since the previous call failed
        try:
            request_hash = self.api.key.compute_hash([credential, hrn])
            aggregates[aggregate].delete_slice(credential, hrn, request_hash, caller_cred)
        except Exception:
            print >> log, "%s" % (traceback.format_exc())
            print >> log, "Error calling delete slice at aggregate %s" % aggregate
def create_slice(self, hrn, rspec):
    """Create slice *hrn* from *rspec* if the slice policy permits it.

    A slice is rejected when a whitelist exists and does not contain it,
    or when a blacklist exists and contains it.  Rejections are logged.
    """
    # check our slice policy before we proceed
    whitelist = self.policy['slice_whitelist']
    blacklist = self.policy['slice_blacklist']

    if (whitelist and hrn not in whitelist) or (blacklist and hrn in blacklist):
        policy_file = self.policy.policy_file
        print >> log, "Slice %(hrn)s not allowed by policy %(policy_file)s" % {'hrn': hrn, 'policy_file': policy_file}
        # NOTE(review): restored from a short line dropped by the
        # extraction; without it a rejected slice would still be created.
        return 1

    interface = self.api.interface
    if interface == 'aggregate':
        self.create_slice_aggregate(hrn, rspec)
    elif interface == 'slicemgr':
        self.create_slice_smgr(hrn, rspec)
def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
    """Make sure the site that owns *slice_hrn* exists in the local PLC.

    The site's 'authority' record is resolved through *registry*.  If PLC
    has no site with the matching login base, the site is added, bound to
    *peer* (when given) and registered as an sfa peer record (when
    *sfa_peer* is given).

    Returns a (site_id, remote_site_id) tuple.
    Raises RecordNotFound if the registry has no authority record.
    """
    authority = get_authority(slice_hrn)

    # request hash is optional: retry with it only if the plain call fails
    try:
        site_records = registry.resolve(credential, authority)
    except:
        request_hash = self.api.key.compute_hash([credential, authority])
        site_records = registry.resolve(credential, authority, request_hash)

    # keep the last record of type 'authority'
    site = {}
    for candidate in site_records:
        if candidate['type'] == 'authority':
            site = candidate
    if not site:
        raise RecordNotFound(authority)
    # site_id is removed from the record before it is handed to AddSite
    remote_site_id = site.pop('site_id')

    plshell = self.api.plshell
    plauth = self.api.plauth
    existing = plshell.GetSites(plauth, get_leaf(authority))
    if existing:
        return (existing[0]['site_id'], existing[0]['peer_site_id'])

    site_id = plshell.AddSite(plauth, site)
    if peer:
        plshell.BindObjectToPeer(plauth, 'site', site_id, peer, remote_site_id)

    # mark this site as an sfa peer record
    if sfa_peer:
        peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
        try:
            registry.register_peer_object(credential, peer_dict)
        except:
            request_hash = self.api.key.compute_hash([credential])
            registry.register_peer_object(credential, peer_dict, request_hash)

    return (site_id, remote_site_id)
def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
    """Make sure slice *slice_hrn* exists in the local PLC and return it.

    The slice record is resolved through *registry*.  If PLC does not
    know the slice, it is created from the record's name/url/description,
    registered as an sfa peer record (when *sfa_peer* is given) and bound
    to *peer* (when given).  The slice's users are then verified through
    verify_persons().

    Returns a dict holding at least 'slice_id', 'node_ids' and
    'peer_slice_id'.
    Raises RecordNotFound if the registry has no slice record.
    """
    # request hash is optional: retry with it only if the plain call fails
    try:
        slice_records = registry.resolve(credential, slice_hrn)
    except Exception:
        request_hash = self.api.key.compute_hash([credential, slice_hrn])
        slice_records = registry.resolve(credential, slice_hrn, request_hash)

    # keep the last record of type 'slice'
    slice_record = None
    for record in slice_records:
        if record['type'] in ['slice']:
            slice_record = record
    if not slice_record:
        # was RecordNotFound(hrn): 'hrn' is undefined in this method
        raise RecordNotFound(slice_hrn)

    plshell = self.api.plshell
    plauth = self.api.plauth
    slicename = hrn_to_pl_slicename(slice_hrn)
    slices = plshell.GetSlices(plauth, [slicename], ['slice_id', 'node_ids', 'site_id'])
    if not slices:
        # copy only the fields that are present and non-empty
        slice_info = {}
        for key in ['name', 'url', 'description']:
            if key in slice_record and slice_record[key]:
                slice_info[key] = slice_record[key]

        # add the slice
        slice_id = plshell.AddSlice(plauth, slice_info)
        slice_info['slice_id'] = slice_id

        # mark this slice as an sfa peer record
        if sfa_peer:
            peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
            try:
                registry.register_peer_object(credential, peer_dict)
            except Exception:
                request_hash = self.api.key.compute_hash([credential])
                registry.register_peer_object(credential, peer_dict, request_hash)

        # this belongs to a peer
        if peer:
            plshell.BindObjectToPeer(plauth, 'slice', slice_id, peer, slice_record['pointer'])
        slice_info['node_ids'] = []
    else:
        slice_info = slices[0]
        # an existing slice keeps the site PLC already assigned to it
        site_id = slice_info['site_id']

    slice_info['peer_slice_id'] = slice_record['pointer']
    self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)

    return slice_info
def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
    """Add the researchers listed in *slice_record* to the slice and site.

    Every researcher hrn is resolved through *registry*.  Persons unknown
    to PLC are added and enabled, registered as sfa peer records (when
    *sfa_peer* is given) and bound to *peer* (when given).  Each person
    is then added to the slice and the site, and their keys are checked
    through verify_keys().  Researchers without a 'user' record are
    skipped.
    """
    plshell = self.api.plshell
    plauth = self.api.plauth
    person_fields = ['person_id', 'key_ids']

    # get the list of valid slice users from the registry and make
    # sure they are added to the slice
    slicename = hrn_to_pl_slicename(slice_record['hrn'])
    for researcher in slice_record.get('researcher', []):
        # request hash is optional: retry with it only if the plain call fails
        try:
            person_records = registry.resolve(credential, researcher)
        except Exception:
            request_hash = self.api.key.compute_hash([credential, researcher])
            person_records = registry.resolve(credential, researcher, request_hash)

        # keep the last record of type 'user'
        person_dict = {}
        for record in person_records:
            if record['type'] in ['user']:
                person_dict = record
        if not person_dict:
            # nothing to add; going on would fail on person_dict['email']
            continue
        email = person_dict['email']

        # local_person: the account exists in PLC but is not owned by the peer
        local_person = False
        if peer:
            peer_id = plshell.GetPeers(plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
            persons = plshell.GetPersons(plauth, {'email': [email], 'peer_id': peer_id}, person_fields)
            if not persons:
                persons = plshell.GetPersons(plauth, [email], person_fields)
                if persons:
                    local_person = True
        else:
            persons = plshell.GetPersons(plauth, [email], person_fields)

        if not persons:
            person_id = plshell.AddPerson(plauth, person_dict)
            plshell.UpdatePerson(plauth, person_id, {'enabled' : True})

            # mark this person as an sfa peer record
            if sfa_peer:
                peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
                try:
                    registry.register_peer_object(credential, peer_dict)
                except Exception:
                    request_hash = self.api.key.compute_hash([credential])
                    registry.register_peer_object(credential, peer_dict, request_hash)

            if peer:
                plshell.BindObjectToPeer(plauth, 'person', person_id, peer, person_dict['pointer'])
            key_ids = []
        else:
            person_id = persons[0]['person_id']
            key_ids = persons[0]['key_ids']

        # if this is a peer person, we must unbind them from the peer or
        # PLCAPI will throw an error
        if peer:
            plshell.UnBindObjectFromPeer(plauth, 'person', person_id, peer)
            plshell.UnBindObjectFromPeer(plauth, 'site', site_id, peer)

        plshell.AddPersonToSlice(plauth, email, slicename)
        plshell.AddPersonToSite(plauth, email, site_id)
        if peer and not local_person:
            plshell.BindObjectToPeer(plauth, 'person', person_id, peer, person_dict['pointer'])
        if peer:
            plshell.BindObjectToPeer(plauth, 'site', site_id, peer, remote_site_id)

        self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer, local_person):
    """Add the keys in person_dict['keys'] that PLC does not hold yet.

    *key_ids* are the PLC key ids already attached to the person.  For a
    peer person each newly added key is also bound to the peer, using
    the ids in person_dict['key_ids'] in order; a failed key binding is
    logged and does not stop the remaining keys.
    """
    plshell = self.api.plshell
    plauth = self.api.plauth

    known_keys = [entry['key'] for entry in plshell.GetKeys(plauth, key_ids, ['key'])]

    # work on a copy so the caller's record is not consumed by pop()
    # NOTE(review): ids are taken in order for each *new* key; confirm
    # that this matches the ordering of person_dict['keys'].
    remote_key_ids = list(person_dict['key_ids'])

    # add keys that aren't already there
    for personkey in person_dict['keys']:
        if personkey in known_keys:
            continue
        key = {'key_type': 'ssh', 'key': personkey}
        # a peer person must be unbound while the key is added
        if peer:
            plshell.UnBindObjectFromPeer(plauth, 'person', person_id, peer)
        key_id = plshell.AddPersonKey(plauth, person_dict['email'], key)
        if peer and not local_person:
            plshell.BindObjectToPeer(plauth, 'person', person_id, peer, person_dict['pointer'])
        if peer:
            # best effort: also covers running out of remote key ids
            try:
                plshell.BindObjectToPeer(plauth, 'key', key_id, peer, remote_key_ids.pop(0))
            except Exception:
                print >> log, "%s" % (traceback.format_exc())
def create_slice_aggregate(self, hrn, rspec):
    """Create or update slice *hrn* in the local PLC to match *rspec*.

    The site, slice and slice users are verified first.  The slice is
    then added to the nodes named by the rspec's NodeSpec elements and
    removed from nodes it currently runs on that the rspec no longer
    names.  NodeSpec attributes listed in rspec_to_slice_tag are added
    as slice tags on each node.
    """
    plshell = self.api.plshell
    plauth = self.api.plauth

    # Determine if this is a peer slice
    peer = self.get_peer(hrn)
    sfa_peer = self.get_sfa_peer(hrn)

    # NOTE(review): the line that builds 'spec' was dropped by the
    # extraction; confirm the RSpec construction against the file.
    spec = RSpec()
    spec.parseString(rspec)

    # Get the slice record from sfa
    slicename = hrn_to_pl_slicename(hrn)
    registries = Registries(self.api)
    registry = registries[self.api.hrn]
    credential = self.api.getCredential()

    site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
    slice_info = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)

    # find out where this slice is currently running
    nodelist = plshell.GetNodes(plauth, slice_info['node_ids'], ['hostname'])
    hostnames = [node['hostname'] for node in nodelist]

    # get netspec details
    nodespecs = spec.getDictsByTagName('NodeSpec')

    # dict in which to store slice attributes to set for the nodes:
    # {hostname: {slice tag name: rspec value}}
    tag_names = self.rspec_to_slice_tag
    nodes = {}
    for nodespec in nodespecs:
        # 'name' holds either one hostname or a list of hostnames
        names = nodespec['name']
        if isinstance(names, StringTypes):
            names = [names]
        elif not isinstance(names, list):
            continue
        # translate the recognized rspec attributes once per NodeSpec
        tags = dict((tag_names[k], nodespec[k]) for k in nodespec.keys() if k in tag_names)
        for nodename in names:
            nodes[nodename] = dict(tags)

    node_names = nodes.keys()
    # remove nodes not in rspec
    deleted_nodes = list(set(hostnames).difference(node_names))
    # add nodes from rspec
    added_nodes = list(set(node_names).difference(hostnames))

    # a peer slice is unbound from its peer while it is modified
    if peer:
        plshell.UnBindObjectFromPeer(plauth, 'slice', slice_info['slice_id'], peer)

    plshell.AddSliceToNodes(plauth, slicename, added_nodes)

    # Add recognized slice tags
    for node_name in node_names:
        for slice_tag, value in nodes[node_name].items():
            if isinstance(value, list):
                # NOTE(review): restored from a dropped line -- confirm
                value = value[0]
            plshell.AddSliceTag(plauth, slicename, slice_tag, value, node_name)

    plshell.DeleteSliceFromNodes(plauth, slicename, deleted_nodes)
    if peer:
        plshell.BindObjectToPeer(plauth, 'slice', slice_info['slice_id'], peer, slice_info['peer_slice_id'])

    return 1
def create_slice_smgr(self, hrn, rspec):
    """Split *rspec* per network and send each part to its aggregate.

    The local aggregate receives the whole rspec; other directly
    connected aggregates receive only their own NetSpec.  A network with
    no directly connected aggregate is forwarded to every aggregate that
    reports knowing it.  A failure for one network is logged and the
    remaining networks are still processed.
    """
    # NOTE(review): the lines that build 'spec' and 'tempspec' and the
    # defaults for start_time/end_time were dropped by the extraction;
    # confirm the constructors and the default of 0 against the file.
    spec = RSpec()
    tempspec = RSpec()
    spec.parseString(rspec)
    specDict = spec.toDict()
    if 'RSpec' in specDict:
        specDict = specDict['RSpec']
    start_time = specDict.get('start_time', 0)
    end_time = specDict.get('end_time', 0)

    aggregates = Aggregates(self.api)
    credential = self.api.getCredential()
    caller_cred = self.caller_cred

    def send_rspec(server, payload):
        # request hash is optional: try without it, then retry with it
        try:
            server.create_slice(credential, hrn, payload, None, caller_cred)
        except Exception:
            request_hash = self.api.key.compute_hash([credential, hrn, payload])
            server.create_slice(credential, hrn, payload, request_hash, caller_cred)

    # split the netspecs into individual rspecs
    rspecs = {}
    for netspec in spec.getDictsByTagName('NetSpec'):
        net_hrn = netspec['name']
        resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
        tempspec.parseDict({'RSpec': resources})
        rspecs[net_hrn] = tempspec.toxml()

    # send each rspec to the appropriate aggregate/sm
    for net_hrn in rspecs:
        try:
            # if we are directly connected to the aggregate then we can
            # just send them the rspec; if not, then we may be connected
            # to an sm that's connected to the aggregate
            if net_hrn in aggregates:
                if net_hrn in [self.api.hrn]:
                    # send the whole rspec to the local aggregate
                    send_rspec(aggregates[net_hrn], rspec)
                else:
                    send_rspec(aggregates[net_hrn], rspecs[net_hrn])
            else:
                # let's forward this rspec to a sm that knows about the
                # network.  The lookup hash has its own name so the
                # create_slice retry cannot overwrite it between
                # aggregates (and it comes from self.api.key like every
                # other hash, not from self.api).
                lookup_hash = self.api.key.compute_hash([credential, net_hrn])
                for aggregate in aggregates:
                    try:
                        network_found = aggregates[aggregate].get_aggregates(credential, net_hrn)
                    except Exception:
                        network_found = aggregates[aggregate].get_aggregates(credential, net_hrn, lookup_hash)
                    if network_found:
                        send_rspec(aggregates[aggregate], rspecs[net_hrn])
        except Exception:
            print >> log, "Error creating slice %(hrn)s at aggregate %(net_hrn)s" % {'hrn': hrn, 'net_hrn': net_hrn}
            traceback.print_exc()

    return 1
def start_slice(self, hrn):
    """Start slice *hrn* via the handler for this interface type.

    Interfaces other than 'aggregate' and 'slicemgr' are ignored.
    """
    interface = self.api.interface
    if interface == 'aggregate':
        self.start_slice_aggregate(hrn)
        return
    if interface == 'slicemgr':
        self.start_slice_smgr(hrn)
def start_slice_aggregate(self, hrn):
    """Set the 'enabled' slice attribute of slice *hrn* to "1".

    Raises RecordNotFound if PLC has no slice with that name.
    """
    plshell = self.api.plshell
    plauth = self.api.plauth

    slicename = hrn_to_pl_slicename(hrn)
    slices = plshell.GetSlices(plauth, {'name': slicename}, ['slice_id'])
    if not slices:
        raise RecordNotFound(hrn)
    # same lookup as stop_slice_aggregate
    slice_id = slices[0]['slice_id']
    attributes = plshell.GetSliceAttributes(plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
    # was 'attreibutes[0]', a typo that raised NameError
    attribute_id = attributes[0]['slice_attribute_id']
    plshell.UpdateSliceAttribute(plauth, attribute_id, "1")
    return 1
def start_slice_smgr(self, hrn):
    """Ask every known aggregate to start slice *hrn*."""
    credential = self.api.getCredential()
    servers = Aggregates(self.api)
    for name in servers:
        servers[name].start_slice(credential, hrn)
    # NOTE(review): restored from a short line dropped by the extraction
    return 1
def stop_slice(self, hrn):
    """Stop slice *hrn* via the handler for this interface type.

    Interfaces other than 'aggregate' and 'slicemgr' are ignored.
    """
    interface = self.api.interface
    if interface == 'aggregate':
        self.stop_slice_aggregate(hrn)
        return
    if interface == 'slicemgr':
        self.stop_slice_smgr(hrn)
def stop_slice_aggregate(self, hrn):
    """Set the 'enabled' slice attribute of slice *hrn* to "0".

    Raises RecordNotFound if PLC has no slice with that name.
    """
    plshell = self.api.plshell
    plauth = self.api.plauth

    name_filter = {'name': hrn_to_pl_slicename(hrn)}
    matches = plshell.GetSlices(plauth, name_filter, ['slice_id'])
    if not matches:
        raise RecordNotFound(hrn)

    attribute_filter = {'slice_id': matches[0]['slice_id'], 'name': 'enabled'}
    enabled = plshell.GetSliceAttributes(plauth, attribute_filter, ['slice_attribute_id'])
    plshell.UpdateSliceAttribute(plauth, enabled[0]['slice_attribute_id'], "0")
    # NOTE(review): restored from a short line dropped by the extraction
    return 1
def stop_slice_smgr(self, hrn):
    """Ask every known aggregate to stop slice *hrn*.

    Each aggregate is called without a request hash first; if that call
    raises, it is retried once with the hash, and an error from the
    retry propagates to the caller.
    """
    credential = self.api.getCredential()
    servers = Aggregates(self.api)
    # the hash is computed once, up front, and reused for every retry
    request_hash = self.api.key.compute_hash([credential, hrn])
    for name in servers:
        try:
            servers[name].stop_slice(credential, hrn)
        except:
            servers[name].stop_slice(credential, hrn, request_hash)