9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
# Slice bookkeeping built on SimpleStorage; the methods below read and write it like a
# dict (has_key / item access / update).
# NOTE(review): this listing carries a leading line-number token on every line and has
# lost its indentation; gaps in those numbers suggest short statements were dropped too.
# Compare against the pristine file before relying on any control flow shown here.
22 class Slices(SimpleStorage):
# Maps an RSpec node attribute name to the slice tag name that create_slice_aggregate
# hands to AddSliceTag.
24 rspec_to_slice_tag = {'max_rate':'net_max_rate'}
# Set up the slice store for one API instance.
#   api         -- object whose config, interface and hrn attributes are read below
#   ttl         -- defaults to .5; refresh_slices_* pass self.ttl to timedelta(hours=...)
#   caller_cred -- kept on the instance and forwarded by the *_smgr methods
# NOTE(review): the embedded numbering jumps 26 -> 30 and 35 -> 37. Neither self.api nor
# self.ttl is assigned in the visible lines although both are read; presumably the
# assignments sit in the first gap -- confirm against the pristine file.
26 def __init__(self, api, ttl = .5, caller_cred=None):
# Backing file is <SFA_BASE_DIR>/<interface>.<hrn>.slices
30 path = self.api.config.SFA_BASE_DIR
31 filename = ".".join([self.api.interface, self.api.hrn, "slices"])
32 filepath = path + os.sep + filename
33 self.slices_file = filepath
34 SimpleStorage.__init__(self, self.slices_file)
# Policy supplies the slice_whitelist / slice_blacklist entries read by create_slice.
35 self.policy = Policy(self.api)
37 self.caller_cred=caller_cred
def get_peer(self, hrn):
    """
    Return the shortname of the MyPLC peer that owns `hrn`, or None.

    Because of MyPLC federation we first need to determine whether the
    slice belongs to our local PLC or to a MyPLC peer. It is assumed to
    be local (None) unless one of the known peer records carries the
    slice's site authority in any of its string fields.
    """
    # Local unless a peer record proves otherwise.
    peer = None

    # get this slice's authority (site)
    slice_authority = get_authority(hrn)

    # get this site's authority (sfa root authority or sub authority)
    site_authority = get_authority(slice_authority).lower()

    # check if we are already peered with this site_authority
    peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
    for peer_record in peers:
        # Compare case-insensitively against every string field of the record.
        names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
        if site_authority in names:
            # No break on purpose: if several records match, the last one wins.
            peer = peer_record['shortname']

    return peer
def get_sfa_peer(self, hrn):
    """
    Return the SFA authority responsible for `hrn`, or None if this
    API instance (self.api.hrn) is itself that authority.
    """
    sfa_peer = None

    # hrn -> slice authority (site) -> site authority
    slice_authority = get_authority(hrn)
    site_authority = get_authority(slice_authority)

    if site_authority != self.api.hrn:
        sfa_peer = site_authority

    return sfa_peer
74 Update the cached list of slices
# NOTE(review): the `def` header and docstring delimiters of this routine are not part of
# this listing (embedded numbers jump 68 -> 74); the name `refresh` is presumed from the
# refresh_slices_* helpers it calls -- confirm.
# The cache is rebuilt when either the 'threshold' or 'timestamp' entry is absent, or when
# the current time is later than the stored 'threshold' (parsed with self.api.time_format).
76 # Reload the slice list if the cache is missing or past its threshold
77 now = datetime.datetime.now()
78 if not self.has_key('threshold') or not self.has_key('timestamp') or \
79 now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
# Which refresh runs depends on the interface this API instance serves; any other
# interface value performs no refresh.
80 if self.api.interface in ['aggregate']:
81 self.refresh_slices_aggregate()
82 elif self.api.interface in ['slicemgr']:
83 self.refresh_slices_smgr()
# Rebuild the cached slice list from the local PLC database.
85 def refresh_slices_aggregate(self):
# Only slices with peer_id None are requested; each slice name is converted to an hrn
# under self.api.hrn.
86 slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
87 slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
89 # update timestamp and threshold
90 timestamp = datetime.datetime.now()
91 hr_timestamp = timestamp.strftime(self.api.time_format)
# threshold = now + ttl hours; both values are stored as strings in self.api.time_format.
92 delta = datetime.timedelta(hours=self.ttl)
93 threshold = timestamp + delta
94 hr_threshold = threshold.strftime(self.api.time_format)
96 slice_details = {'hrn': slice_hrns,
97 'timestamp': hr_timestamp,
98 'threshold': hr_threshold
# NOTE(review): the closing brace of the literal above is not in this listing (numbers
# jump 98 -> 100), and nothing visible persists the store after update() -- check the
# pristine file for lines 99 and 101.
100 self.update(slice_details)
# Rebuild the cached slice list by asking every known aggregate for its slices.
# NOTE(review): slice_hrns is extended below but its initialisation is not visible, and
# the try/except lines that separate the two attempts are missing from this listing
# (gaps at 105, 109, 111, 114-115, 120, 123, 126-127).
104 def refresh_slices_smgr(self):
106 aggregates = Aggregates(self.api)
107 credential = self.api.getCredential()
108 for aggregate in aggregates:
110 # request hash is optional so lets try the call without it
112 slices = aggregates[aggregate].get_slices(credential)
113 slice_hrns.extend(slices)
# The traceback and a per-aggregate message go to `log`; the loop does not re-raise in
# the visible lines.
116 print >> log, "%s" % (traceback.format_exc())
117 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
119 # try sending the request hash if the previous call failed
121 arg_list = [credential]
122 request_hash = self.api.key.compute_hash(arg_list)
124 slices = aggregates[aggregate].get_slices(credential, request_hash)
125 slice_hrns.extend(slices)
128 print >> log, "%s" % (traceback.format_exc())
129 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
131 # update timestamp and threshold
132 timestamp = datetime.datetime.now()
133 hr_timestamp = timestamp.strftime(self.api.time_format)
# threshold = now + ttl hours, stored as a string in self.api.time_format.
134 delta = datetime.timedelta(hours=self.ttl)
135 threshold = timestamp + delta
136 hr_threshold = threshold.strftime(self.api.time_format)
138 slice_details = {'hrn': slice_hrns,
139 'timestamp': hr_timestamp,
140 'threshold': hr_threshold
# NOTE(review): closing brace of the literal above is not in this listing (139 -> 142 gap).
142 self.update(slice_details)
def delete_slice(self, hrn):
    """
    Delete slice `hrn` through the handler that matches this API's
    interface ('aggregate' or 'slicemgr').

    Any other interface value is ignored. Returns None.
    """
    interface = self.api.interface
    if interface == 'aggregate':
        self.delete_slice_aggregate(hrn)
    elif interface == 'slicemgr':
        self.delete_slice_smgr(hrn)
# Remove slice `hrn` from all of its nodes in the local PLC database.
# NOTE(review): `slice` is indexed below but never bound in the visible lines, and the
# embedded numbering has gaps at 156-158, 162, 165 and 167. Presumably these hold the
# empty-result handling, `slice = slices[0]` and the `if peer:` guards -- confirm.
152 def delete_slice_aggregate(self, hrn):
154 slicename = hrn_to_pl_slicename(hrn)
155 slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename})
160 # determine if this is a peer slice
161 peer = self.get_peer(hrn)
# The slice is unbound from its peer before the node removal and bound again afterwards
# using the stored peer_slice_id.
163 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
164 self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, slice['node_ids'])
166 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
# Forward the delete request for `hrn` to every known aggregate, passing along the
# credential of the original caller.
# NOTE(review): the try/except lines separating the two attempts are missing from this
# listing (gaps at 174, 176, 178-179, 184-185, 189-190).
169 def delete_slice_smgr(self, hrn):
170 credential = self.api.getCredential()
171 caller_cred = self.caller_cred
172 aggregates = Aggregates(self.api)
173 for aggregate in aggregates:
175 # request hash is optional so lets try the call without it
177 aggregates[aggregate].delete_slice(credential, hrn, caller_cred)
# NOTE(review): the message below says "list nodes" although the call is delete_slice --
# looks copy-pasted from another method; the runtime text is left unchanged here.
180 print >> log, "%s" % (traceback.format_exc())
181 print >> log, "Error calling list nodes at aggregate %s" % aggregate
183 # try sending the request hash if the previous call failed
# The hash covers [credential, hrn]; caller_cred is not part of it.
186 arg_list = [credential, hrn]
187 request_hash = self.api.key.compute_hash(arg_list)
188 aggregates[aggregate].delete_slice(credential, hrn, request_hash, caller_cred)
191 print >> log, "%s" % (traceback.format_exc())
192 print >> log, "Error calling list nodes at aggregate %s" % aggregate
def create_slice(self, hrn, rspec):
    """
    Create (or update) slice `hrn` from `rspec` after checking the
    slice policy.

    A slice is refused when a non-empty whitelist does not contain it
    or when a non-empty blacklist does. A refusal is logged and nothing
    is created. Otherwise the request goes to the handler matching
    this API's interface ('aggregate' or 'slicemgr'); other interface
    values are ignored.
    """
    # check our slice policy before we proceed
    whitelist = self.policy['slice_whitelist']
    blacklist = self.policy['slice_blacklist']

    # Parenthesised to make the and/or precedence explicit.
    denied = (whitelist and hrn not in whitelist) or \
             (blacklist and hrn in blacklist)
    if denied:
        policy_file = self.policy.policy_file
        # Same output as `print >> log, ...`, but also valid on Python 3.
        log.write("Slice %(hrn)s not allowed by policy %(policy_file)s\n" % locals())
        # Stop here: a refused slice must not reach the handlers below.
        # NOTE(review): the truthy 1 is assumed to match the convention used
        # by the sibling methods -- confirm against the pristine file.
        return 1

    interface = self.api.interface
    if interface == 'aggregate':
        self.create_slice_aggregate(hrn, rspec)
    elif interface == 'slicemgr':
        self.create_slice_smgr(hrn, rspec)
# Make sure the site (authority) owning slice_hrn exists in the local PLC database and
# return the pair (site_id, remote_site_id).
# NOTE(review): `site` is read below without a visible binding and the try/except/else
# lines are missing from this listing (gaps at 213, 215, 219-220, 223-224, 230, 232, 235,
# 237, 239, 243). The notes on individual lines describe only what is visible.
211 def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
212 authority = get_authority(slice_hrn)
# First lookup without a request hash ...
214 site_records = registry.resolve(credential, authority)
# ... second lookup passes a hash computed over the same arguments (presumably the
# fallback when the first call fails -- confirm).
216 arg_list = [credential, authority]
217 request_hash = self.api.key.compute_hash(arg_list)
218 site_records = registry.resolve(credential, authority, request_hash)
# Only records of type 'authority' are of interest.
221 for site_record in site_records:
222 if site_record['type'] == 'authority':
225 raise RecordNotFound(authority)
# pop() removes 'site_id' from the record, so the dict passed to AddSite below no longer
# carries the remote id.
226 remote_site_id = site.pop('site_id')
228 login_base = get_leaf(authority)
229 sites = self.api.plshell.GetSites(self.api.plauth, login_base)
231 site_id = self.api.plshell.AddSite(self.api.plauth, site)
233 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
234 # mark this site as an sfa peer record
236 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
238 registry.register_peer_object(credential, peer_dict)
# NOTE(review): this hash covers only [credential], not peer_dict -- confirm that is what
# the registry expects.
240 arg_list = [credential]
241 request_hash = self.api.key.compute_hash(arg_list)
242 registry.register_peer_object(credential, peer_dict, request_hash)
# Ids taken from the first record returned by GetSites (presumably the branch for a site
# that already exists locally).
244 site_id = sites[0]['site_id']
245 remote_site_id = sites[0]['peer_site_id']
248 return (site_id, remote_site_id)
# Make sure the slice named by slice_hrn exists in the local PLC database, then hand its
# registry record to verify_persons so the slice users are set up as well.
# NOTE(review): `slice`, `slice_fields` and the initial value of `slice_record` are used
# without a visible binding, and the try/except/if/else lines are missing from this
# listing (gaps at 251-252, 254, 256, 264, 270-271, 277, 279, 283, 285, 287, 293,
# 296-297, 303-304). No return statement is visible although create_slice_aggregate
# uses the result.
250 def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
# `authority` is not read again in the visible lines.
253 authority = get_authority(slice_hrn)
255 slice_records = registry.resolve(credential, slice_hrn)
# Second lookup passes a hash of the same arguments (presumably the fallback path).
257 arg_list = [credential, slice_hrn]
258 request_hash = self.api.key.compute_hash(arg_list)
259 slice_records = registry.resolve(credential, slice_hrn, request_hash)
# Without a break, the last record of type 'slice' is the one kept.
261 for record in slice_records:
262 if record['type'] in ['slice']:
263 slice_record = record
# NOTE(review): `hrn` is neither a parameter nor a visible local of this method, so this
# raise would itself fail with NameError; `slice_hrn` is presumably intended.
265 raise RecordNotFound(hrn)
266 slicename = hrn_to_pl_slicename(slice_hrn)
# `parts` / `login_base` are not read again in the visible lines.
267 parts = slicename.split("_")
268 login_base = parts[0]
269 slices = self.api.plshell.GetSlices(self.api.plauth, [slicename], ['slice_id', 'node_ids', 'site_id'])
# Copy only the non-empty name/url/description fields from the registry record.
272 slice_keys = ['name', 'url', 'description']
273 for key in slice_keys:
274 if key in slice_record and slice_record[key]:
275 slice_fields[key] = slice_record[key]
278 slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
280 slice['slice_id'] = slice_id
282 # mark this slice as an sfa peer record
284 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
286 registry.register_peer_object(credential, peer_dict)
288 arg_list = [credential]
289 request_hash = self.api.key.compute_hash(arg_list)
290 registry.register_peer_object(credential, peer_dict, request_hash)
292 #this belongs to a peer
294 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
# A slice that was just added has no nodes yet.
295 slice['node_ids'] = []
# Overrides the site_id argument with the one stored on the slice record.
298 slice_id = slice['slice_id']
299 site_id = slice['site_id']
# The registry pointer is remembered so callers can re-bind the slice to its peer.
301 slice['peer_slice_id'] = slice_record['pointer']
302 self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
# For every researcher listed on slice_record: resolve the user in the registry, make
# sure a matching person exists in the local PLC database, attach that person to the
# slice and the site, then synchronise keys through verify_keys.
# NOTE(review): `person_record`, `local_person`, `person_id` and `key_ids` are read
# without a visible binding in some paths, and the try/except/if/else lines are missing
# from this listing (many gaps between 311 and 371). The notes on individual lines
# describe only what is visible.
306 def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
307 # get the list of valid slice users from the registry and make
308 # sure they are added to the slice
309 slicename = hrn_to_pl_slicename(slice_record['hrn'])
# A record without a 'researcher' entry yields no iterations.
310 researchers = slice_record.get('researcher', [])
311 for researcher in researchers:
314 person_records = registry.resolve(credential, researcher)
# Second lookup passes a hash of the same arguments (presumably the fallback path).
316 arg_list = [credential, researcher]
317 request_hash = self.api.key.compute_hash(arg_list)
318 person_records = registry.resolve(credential, researcher, request_hash)
# Without a break, the last record of type 'user' is the one kept.
319 for record in person_records:
320 if record['type'] in ['user']:
321 person_record = record
322 if not person_record:
324 person_dict = person_record
# Three lookups by e-mail follow: one restricted to the peer's peer_id and two
# unrestricted; the conditions selecting between them are not visible.
327 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
328 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
330 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
335 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
# A newly added person is enabled immediately.
338 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
339 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
341 # mark this person as an sfa peer record
343 peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
345 registry.register_peer_object(credential, peer_dict)
347 arg_list = [credential]
348 request_hash = self.api.key.compute_hash(arg_list)
349 registry.register_peer_object(credential, peer_dict, request_hash)
352 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
# Ids taken from the first GetPersons result.
355 person_id = persons[0]['person_id']
356 key_ids = persons[0]['key_ids']
359 # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
362 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
363 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id, peer)
365 self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
366 self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
# Restore the peer bindings that were removed above.
367 if peer and not local_person:
368 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
370 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
372 self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
# Add to the PLC person every key from person_dict['keys'] that is not already among the
# keys fetched for key_ids.
# NOTE(review): guard lines and the except clause belonging to the one-line `try:` at
# the end are missing from this listing (gaps at 377, 383, 388, 390-392).
374 def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer, local_person):
# Key strings currently stored for the person.
375 keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
376 keys = [key['key'] for key in keylist]
378 #add keys that arent already there
# NOTE(review): the key_ids parameter is rebound here to the ids carried by the registry
# record; from this point on it no longer refers to the local PLC key ids.
379 key_ids = person_dict['key_ids']
380 for personkey in person_dict['keys']:
381 if personkey not in keys:
382 key = {'key_type': 'ssh', 'key': personkey}
# The person is unbound from the peer around AddPersonKey and re-bound afterwards.
384 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
385 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
386 if peer and not local_person:
387 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
# The new key is bound to the peer using the next remote id popped from the front of key_ids.
389 try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
# Create or update slice `hrn` on this aggregate: verify site, slice and users, then
# bring the slice's node set and slice tags in line with the node specs.
# NOTE(review): `spec` and `nodes` are used without a visible binding, the `rspec`
# parameter is not referenced in the visible lines (presumably `spec` is built from it),
# and guard lines are missing from this listing (gaps at 398-399, 402-403, 414, 419, 423,
# 431, 437, 449-450, 452, 461-462, 464, 466, 468-469).
393 def create_slice_aggregate(self, hrn, rspec):
395 # Determine if this is a peer slice
396 peer = self.get_peer(hrn)
397 sfa_peer = self.get_sfa_peer(hrn)
400 # Get the slice record from sfa
401 slicename = hrn_to_pl_slicename(hrn)
# The registry is the one registered under this API's own hrn.
404 registries = Registries(self.api)
405 registry = registries[self.api.hrn]
406 credential = self.api.getCredential()
408 site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
409 slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
411 # find out where this slice is currently running
412 nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
413 hostnames = [node['hostname'] for node in nodelist]
415 # get netspec details
416 nodespecs = spec.getDictsByTagName('NodeSpec')
418 # dict in which to store slice attributes to set for the nodes
# A NodeSpec 'name' may be a list of hostnames or a single string; in both cases the
# attributes found in rspec_to_slice_tag are copied into nodes[<hostname>] under the
# mapped tag name.
420 for nodespec in nodespecs:
421 if isinstance(nodespec['name'], list):
422 for nodename in nodespec['name']:
424 for k in nodespec.keys():
425 rspec_attribute_value = nodespec[k]
426 if (self.rspec_to_slice_tag.has_key(k)):
427 slice_tag_name = self.rspec_to_slice_tag[k]
428 nodes[nodename][slice_tag_name] = rspec_attribute_value
429 elif isinstance(nodespec['name'], StringTypes):
430 nodename = nodespec['name']
432 for k in nodespec.keys():
433 rspec_attribute_value = nodespec[k]
434 if (self.rspec_to_slice_tag.has_key(k)):
435 slice_tag_name = self.rspec_to_slice_tag[k]
436 nodes[nodename][slice_tag_name] = rspec_attribute_value
# NOTE(review): the loop below repeats the previous one verbatim; whether that is
# intentional cannot be determined from this listing.
438 for k in nodespec.keys():
439 rspec_attribute_value = nodespec[k]
440 if (self.rspec_to_slice_tag.has_key(k)):
441 slice_tag_name = self.rspec_to_slice_tag[k]
442 nodes[nodename][slice_tag_name] = rspec_attribute_value
444 node_names = nodes.keys()
445 # remove nodes not in rspec
446 deleted_nodes = list(set(hostnames).difference(node_names))
447 # add nodes from rspec
448 added_nodes = list(set(node_names).difference(hostnames))
# The slice is unbound from its peer for the node/tag changes and re-bound at the end.
451 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
453 self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes)
455 # Add recognized slice tags
456 for node_name in node_names:
457 node = nodes[node_name]
458 for slice_tag in node.keys():
459 value = node[slice_tag]
460 if (isinstance(value, list)):
463 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
465 self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
467 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
# Slice-manager side of slice creation: split the incoming rspec into one rspec per
# NetSpec and send each to the aggregate (or slice manager) responsible for that network.
# NOTE(review): `spec`, `tempspec` and `rspecs` are used without a visible binding, and
# the try/except/else lines are missing from this listing (gaps at 472-473, 479, 481,
# 483, 499, 505, 507, 511-512, 514, 518, 523, 525, 527-528, 530, 535, 538).
471 def create_slice_smgr(self, hrn, rspec):
474 spec.parseString(rspec)
# `slicename` is not read again in the visible lines.
475 slicename = hrn_to_pl_slicename(hrn)
476 specDict = spec.toDict()
477 if specDict.has_key('RSpec'): specDict = specDict['RSpec']
# start_time / end_time are bound here only when the key is present; the fallback values
# presumably sit in the missing lines 479 and 481.
478 if specDict.has_key('start_time'): start_time = specDict['start_time']
480 if specDict.has_key('end_time'): end_time = specDict['end_time']
484 aggregates = Aggregates(self.api)
485 credential = self.api.getCredential()
487 # split the netspecs into individual rspecs
488 netspecs = spec.getDictsByTagName('NetSpec')
489 for netspec in netspecs:
490 net_hrn = netspec['name']
491 resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
492 resourceDict = {'RSpec': resources}
# The same tempspec object is re-parsed for each network; its XML is stored per net_hrn.
493 tempspec.parseDict(resourceDict)
494 rspecs[net_hrn] = tempspec.toxml()
496 # send each rspec to the appropriate aggregate/sm
497 caller_cred = self.caller_cred
498 for net_hrn in rspecs:
500 # if we are directly connected to the aggregate then we can just send them the rspec
501 # if not, then we may be connected to an sm thats connected to the aggregate
502 if net_hrn in aggregates:
503 # send the whole rspec to the local aggregate
504 if net_hrn in [self.api.hrn]:
506 aggregates[net_hrn].create_slice(credential, hrn, rspec, caller_cred)
508 arg_list = [credential,hrn,rspec]
509 request_hash = self.api.key.compute_hash(arg_list)
510 aggregates[net_hrn].create_slice(credential, hrn, rspec, request_hash, caller_cred)
# Any other directly connected aggregate receives only its own per-network rspec.
513 aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], caller_cred)
515 arg_list = [credential,hrn,rspecs[net_hrn]]
516 request_hash = self.api.key.compute_hash(arg_list)
517 aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
519 # lets forward this rspec to a sm that knows about the network
520 arg_list = [credential, net_hrn]
# NOTE(review): every other call site uses self.api.key.compute_hash; the missing `.key`
# here looks like a typo and would raise AttributeError unless the api object happens to
# expose compute_hash itself.
521 request_hash = self.api.compute_hash(arg_list)
522 for aggregate in aggregates:
524 network_found = aggregates[aggregate].get_aggregates(credential, net_hrn)
526 network_found = aggregates[aggregate].get_aggregates(credential, net_hrn, request_hash)
529 aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], caller_cred)
531 arg_list = [credential, hrn, rspecs[net_hrn]]
532 request_hash = self.api.key.compute_hash(arg_list)
533 aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
# A message goes to `log`, while the traceback is printed via traceback.print_exc().
536 print >> log, "Error creating slice %(hrn)s at aggregate %(net_hrn)s" % locals()
537 traceback.print_exc()
def start_slice(self, hrn):
    """
    Start slice `hrn` through the handler that matches this API's
    interface ('aggregate' or 'slicemgr').

    Any other interface value is ignored. Returns None.
    """
    interface = self.api.interface
    if interface == 'aggregate':
        self.start_slice_aggregate(hrn)
    elif interface == 'slicemgr':
        self.start_slice_smgr(hrn)
def start_slice_aggregate(self, hrn):
    """
    Enable slice `hrn` in the local PLC database by setting its
    'enabled' slice attribute to "1".

    Raises RecordNotFound when no slice with the derived PLC slice
    name exists.
    """
    slicename = hrn_to_pl_slicename(hrn)
    slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
    if not slices:
        raise RecordNotFound(hrn)
    # Use the id, not the whole record, exactly as stop_slice_aggregate does.
    slice_id = slices[0]['slice_id']
    attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
    # Was misspelled `attreibutes`, which raised NameError on every call.
    attribute_id = attributes[0]['slice_attribute_id']
    self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "1")
    # NOTE(review): the truthy success value is assumed -- the line following
    # the update is missing from the listing; confirm against the pristine file.
    return 1
# Ask every known aggregate to start slice `hrn`.
# Unlike stop_slice_smgr, no request hash is computed or sent in the visible lines.
# NOTE(review): one line is missing from this listing after the loop (gap at 563).
558 def start_slice_smgr(self, hrn):
559 credential = self.api.getCredential()
560 aggregates = Aggregates(self.api)
561 for aggregate in aggregates:
562 aggregates[aggregate].start_slice(credential, hrn)
def stop_slice(self, hrn):
    """
    Stop slice `hrn` through the handler that matches this API's
    interface ('aggregate' or 'slicemgr').

    Any other interface value is ignored. Returns None.
    """
    interface = self.api.interface
    if interface == 'aggregate':
        self.stop_slice_aggregate(hrn)
    elif interface == 'slicemgr':
        self.stop_slice_smgr(hrn)
def stop_slice_aggregate(self, hrn):
    """
    Disable slice `hrn` in the local PLC database by setting its
    'enabled' slice attribute to "0".

    Raises RecordNotFound when no slice with the derived PLC slice
    name exists.
    """
    slicename = hrn_to_pl_slicename(hrn)
    slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
    # Only an empty lookup is an error; without this guard the raise would
    # fire unconditionally.
    if not slices:
        raise RecordNotFound(hrn)
    slice_id = slices[0]['slice_id']
    attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
    attribute_id = attributes[0]['slice_attribute_id']
    self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "0")
    # NOTE(review): the truthy success value is assumed -- the line following
    # the update is missing from the listing; confirm against the pristine file.
    return 1
# Ask every known aggregate to stop slice `hrn`.
# NOTE(review): the try/except lines separating the two calls in the loop are missing
# from this listing (gaps at 589, 591, 593).
583 def stop_slice_smgr(self, hrn):
584 credential = self.api.getCredential()
585 aggregates = Aggregates(self.api)
# The hash covers [credential, hrn] and is computed once, outside the loop.
586 arg_list = [credential, hrn]
587 request_hash = self.api.key.compute_hash(arg_list)
588 for aggregate in aggregates:
# First attempt without the request hash ...
590 aggregates[aggregate].stop_slice(credential, hrn)
# ... second call includes it (presumably the fallback when the first one fails).
592 aggregates[aggregate].stop_slice(credential, hrn, request_hash)