9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
24 class Slices(SimpleStorage):
26 rspec_to_slice_tag = {'max_rate':'net_max_rate'}
28 def __init__(self, api, ttl = .5, gid_origin_caller=None):
32 path = self.api.config.SFA_DATA_DIR
33 filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34 filepath = path + os.sep + filename
35 self.slices_file = filepath
36 SimpleStorage.__init__(self, self.slices_file)
37 self.policy = Policy(self.api)
39 self.gid_origin_caller=gid_origin_caller
41 def get_slivers(self, hrn, node=None):
43 slice_name = hrn_to_pl_slicename(hrn)
44 # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
45 # of doing all of this?
46 #return self.api.GetSliceTicket(self.auth, slice_name)
48 # from PLCAPI.GetSlivers.get_slivers()
49 slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
50 slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)
51 # Build up list of users and slice attributes
53 all_slice_tag_ids = set()
55 person_ids.update(slice['person_ids'])
56 all_slice_tag_ids.update(slice['slice_tag_ids'])
57 person_ids = list(person_ids)
58 all_slice_tag_ids = list(all_slice_tag_ids)
59 # Get user information
60 all_persons_list = self.api.plshell.GetPersons(self.api.plauth, {'person_id':person_ids,'enabled':True}, ['person_id', 'enabled', 'key_ids'])
62 for person in all_persons_list:
63 all_persons[person['person_id']] = person
65 # Build up list of keys
67 for person in all_persons.values():
68 key_ids.update(person['key_ids'])
69 key_ids = list(key_ids)
70 # Get user account keys
71 all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
73 for key in all_keys_list:
74 all_keys[key['key_id']] = key
75 # Get slice attributes
76 all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
78 for slice_tag in all_slice_tags_list:
79 all_slice_tags[slice_tag['slice_tag_id']] = slice_tag
84 for person_id in slice['person_ids']:
85 if person_id in all_persons:
86 person = all_persons[person_id]
87 if not person['enabled']:
89 for key_id in person['key_ids']:
90 if key_id in all_keys:
91 key = all_keys[key_id]
92 keys += [{'key_type': key['key_type'],
95 # All (per-node and global) attributes for this slice
97 for slice_tag_id in slice['slice_tag_ids']:
98 if slice_tag_id in all_slice_tags:
99 slice_tags.append(all_slice_tags[slice_tag_id])
100 # Per-node sliver attributes take precedence over global
101 # slice attributes, so set them first.
102 # Then comes nodegroup slice attributes
103 # Followed by global slice attributes
104 sliver_attributes = []
107 for sliver_attribute in filter(lambda a: a['node_id'] == node['node_id'], slice_tags):
108 sliver_attributes.append(sliver_attribute['tagname'])
109 attributes.append({'tagname': sliver_attribute['tagname'],
110 'value': sliver_attribute['value']})
112 # set nodegroup slice attributes
113 for slice_tag in filter(lambda a: a['nodegroup_id'] in node['nodegroup_ids'], slice_tags):
114 # Do not set any nodegroup slice attributes for
115 # which there is at least one sliver attribute
117 if slice_tag not in slice_tags:
118 attributes.append({'tagname': slice_tag['tagname'],
119 'value': slice_tag['value']})
121 for slice_tag in filter(lambda a: a['node_id'] is None, slice_tags):
122 # Do not set any global slice attributes for
123 # which there is at least one sliver attribute
125 if slice_tag['tagname'] not in sliver_attributes:
126 attributes.append({'tagname': slice_tag['tagname'],
127 'value': slice_tag['value']})
129 # XXX Sanity check; though technically this should be a system invariant
130 # checked with an assertion
131 if slice['expires'] > MAXINT: slice['expires']= MAXINT
135 'name': slice['name'],
136 'slice_id': slice['slice_id'],
137 'instantiation': slice['instantiation'],
138 'expires': slice['expires'],
140 'attributes': attributes
145 def get_peer(self, hrn):
146 # Becaues of myplc federation, we first need to determine if this
147 # slice belongs to out local plc or a myplc peer. We will assume it
148 # is a local site, unless we find out otherwise
151 # get this slice's authority (site)
152 slice_authority = get_authority(hrn)
154 # get this site's authority (sfa root authority or sub authority)
155 site_authority = get_authority(slice_authority).lower()
157 # check if we are already peered with this site_authority, if so
158 peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
159 for peer_record in peers:
160 names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
161 if site_authority in names:
162 peer = peer_record['shortname']
166 def get_sfa_peer(self, hrn):
167 # return the authority for this hrn or None if we are the authority
169 slice_authority = get_authority(hrn)
170 site_authority = get_authority(slice_authority)
172 if site_authority != self.api.hrn:
173 sfa_peer = site_authority
179 Update the cached list of slices
181 # Reload components list
182 now = datetime.datetime.now()
183 if not self.has_key('threshold') or not self.has_key('timestamp') or \
184 now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
185 if self.api.interface in ['aggregate']:
186 self.refresh_slices_aggregate()
187 elif self.api.interface in ['slicemgr']:
188 self.refresh_slices_smgr()
190 def refresh_slices_aggregate(self):
191 slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
192 slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
194 # update timestamp and threshold
195 timestamp = datetime.datetime.now()
196 hr_timestamp = timestamp.strftime(self.api.time_format)
197 delta = datetime.timedelta(hours=self.ttl)
198 threshold = timestamp + delta
199 hr_threshold = threshold.strftime(self.api.time_format)
201 slice_details = {'hrn': slice_hrns,
202 'timestamp': hr_timestamp,
203 'threshold': hr_threshold
205 self.update(slice_details)
209 def refresh_slices_smgr(self):
211 aggregates = Aggregates(self.api)
212 credential = self.api.getCredential()
213 for aggregate in aggregates:
215 # request hash is optional so lets try the call without it
218 slices = aggregates[aggregate].get_slices(credential, request_hash)
219 slice_hrns.extend(slices)
222 print >> log, "%s" % (traceback.format_exc())
223 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
225 # try sending the request hash if the previous call failed
227 arg_list = [credential]
228 request_hash = self.api.key.compute_hash(arg_list)
230 slices = aggregates[aggregate].get_slices(credential, request_hash)
231 slice_hrns.extend(slices)
234 print >> log, "%s" % (traceback.format_exc())
235 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
237 # update timestamp and threshold
238 timestamp = datetime.datetime.now()
239 hr_timestamp = timestamp.strftime(self.api.time_format)
240 delta = datetime.timedelta(hours=self.ttl)
241 threshold = timestamp + delta
242 hr_threshold = threshold.strftime(self.api.time_format)
244 slice_details = {'hrn': slice_hrns,
245 'timestamp': hr_timestamp,
246 'threshold': hr_threshold
248 self.update(slice_details)
252 def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
253 authority = get_authority(slice_hrn)
255 site_records = registry.resolve(credential, authority)
257 arg_list = [credential, authority]
258 request_hash = self.api.key.compute_hash(arg_list)
259 site_records = registry.resolve(credential, authority, request_hash)
262 for site_record in site_records:
263 if site_record['type'] == 'authority':
266 raise RecordNotFound(authority)
267 remote_site_id = site.pop('site_id')
269 login_base = get_leaf(authority)
270 sites = self.api.plshell.GetSites(self.api.plauth, login_base)
272 site_id = self.api.plshell.AddSite(self.api.plauth, site)
274 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
275 # mark this site as an sfa peer record
277 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
279 registry.register_peer_object(credential, peer_dict)
281 arg_list = [credential]
282 request_hash = self.api.key.compute_hash(arg_list)
283 registry.register_peer_object(credential, peer_dict, request_hash)
285 site_id = sites[0]['site_id']
286 remote_site_id = sites[0]['peer_site_id']
289 return (site_id, remote_site_id)
291 def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
294 authority = get_authority(slice_hrn)
296 slice_records = registry.resolve(credential, slice_hrn)
298 arg_list = [credential, slice_hrn]
299 request_hash = self.api.key.compute_hash(arg_list)
300 slice_records = registry.resolve(credential, slice_hrn, request_hash)
302 for record in slice_records:
303 if record['type'] in ['slice']:
304 slice_record = record
306 raise RecordNotFound(hrn)
307 slicename = hrn_to_pl_slicename(slice_hrn)
308 parts = slicename.split("_")
309 login_base = parts[0]
310 slices = self.api.plshell.GetSlices(self.api.plauth, [slicename])
313 slice_keys = ['name', 'url', 'description']
314 for key in slice_keys:
315 if key in slice_record and slice_record[key]:
316 slice_fields[key] = slice_record[key]
319 slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
321 slice['slice_id'] = slice_id
323 # mark this slice as an sfa peer record
325 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
327 registry.register_peer_object(credential, peer_dict)
329 arg_list = [credential]
330 request_hash = self.api.key.compute_hash(arg_list)
331 registry.register_peer_object(credential, peer_dict, request_hash)
333 #this belongs to a peer
335 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
336 slice['node_ids'] = []
339 slice_id = slice['slice_id']
340 site_id = slice['site_id']
341 #the slice is alredy on the remote agg. Let us update(e.g. expires field) it with the latest info.
342 self.sync_slice(slice, slice_record, peer)
344 slice['peer_slice_id'] = slice_record['pointer']
345 self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
349 def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
350 # get the list of valid slice users from the registry and make
351 # sure they are added to the slice
352 slicename = hrn_to_pl_slicename(slice_record['hrn'])
353 researchers = slice_record.get('researcher', [])
354 for researcher in researchers:
357 person_records = registry.resolve(credential, researcher)
359 arg_list = [credential, researcher]
360 request_hash = self.api.key.compute_hash(arg_list)
361 person_records = registry.resolve(credential, researcher, request_hash)
362 for record in person_records:
363 if record['type'] in ['user']:
364 person_record = record
365 if not person_record:
367 person_dict = person_record
370 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
371 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
373 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
378 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
381 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
382 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
384 # mark this person as an sfa peer record
386 peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
388 registry.register_peer_object(credential, peer_dict)
390 arg_list = [credential]
391 request_hash = self.api.key.compute_hash(arg_list)
392 registry.register_peer_object(credential, peer_dict, request_hash)
395 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
398 person_id = persons[0]['person_id']
399 key_ids = persons[0]['key_ids']
402 # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
405 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
406 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id, peer)
408 self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
409 self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
410 if peer and not local_person:
411 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
413 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
415 self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
417 def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer, local_person):
418 keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
419 keys = [key['key'] for key in keylist]
421 #add keys that arent already there
422 key_ids = person_dict['key_ids']
423 for personkey in person_dict['keys']:
424 if personkey not in keys:
425 key = {'key_type': 'ssh', 'key': personkey}
427 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
428 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
429 if peer and not local_person:
430 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
432 try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
436 def create_slice_aggregate(self, hrn, rspec):
438 # Determine if this is a peer slice
439 peer = self.get_peer(hrn)
440 sfa_peer = self.get_sfa_peer(hrn)
443 # Get the slice record from sfa
444 slicename = hrn_to_pl_slicename(hrn)
447 registries = Registries(self.api)
448 registry = registries[self.api.hrn]
449 credential = self.api.getCredential()
451 site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
452 slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
454 # find out where this slice is currently running
455 nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
456 hostnames = [node['hostname'] for node in nodelist]
458 # get netspec details
459 nodespecs = spec.getDictsByTagName('NodeSpec')
461 # dict in which to store slice attributes to set for the nodes
463 for nodespec in nodespecs:
464 if isinstance(nodespec['name'], list):
465 for nodename in nodespec['name']:
467 for k in nodespec.keys():
468 rspec_attribute_value = nodespec[k]
469 if (self.rspec_to_slice_tag.has_key(k)):
470 slice_tag_name = self.rspec_to_slice_tag[k]
471 nodes[nodename][slice_tag_name] = rspec_attribute_value
472 elif isinstance(nodespec['name'], StringTypes):
473 nodename = nodespec['name']
475 for k in nodespec.keys():
476 rspec_attribute_value = nodespec[k]
477 if (self.rspec_to_slice_tag.has_key(k)):
478 slice_tag_name = self.rspec_to_slice_tag[k]
479 nodes[nodename][slice_tag_name] = rspec_attribute_value
481 for k in nodespec.keys():
482 rspec_attribute_value = nodespec[k]
483 if (self.rspec_to_slice_tag.has_key(k)):
484 slice_tag_name = self.rspec_to_slice_tag[k]
485 nodes[nodename][slice_tag_name] = rspec_attribute_value
487 node_names = nodes.keys()
488 # remove nodes not in rspec
489 deleted_nodes = list(set(hostnames).difference(node_names))
490 # add nodes from rspec
491 added_nodes = list(set(node_names).difference(hostnames))
494 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
496 self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes)
498 # Add recognized slice tags
499 for node_name in node_names:
500 node = nodes[node_name]
501 for slice_tag in node.keys():
502 value = node[slice_tag]
503 if (isinstance(value, list)):
506 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
508 self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
510 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
514 def sync_slice(self, old_record, new_record, peer):
515 if old_record['expires'] != new_record['expires']:
517 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
518 self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires' : new_record['expires']})
520 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])