9 from types import StringTypes
10 from sfa.util.namespace import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
class Slices(SimpleStorage):
    """
    Cached list of slice hrns, kept through SimpleStorage in a file under
    SFA_DATA_DIR, plus the helpers used to mirror registry records (site,
    slice, persons, keys) into the local PLC database when a slice is
    created on this aggregate.
    """

    # NOTE(review): this class was restored from a listing that had lost its
    # indentation and all of its short lines (initialisers, `if peer:`
    # guards, `try`/`except`, `else:`, `return`).  Those statements were put
    # back from context; the ones that cannot be derived from the surrounding
    # code carry their own NOTE(review) marker and should be confirmed.

    # rspec NodeSpec attribute name -> PLC slice tag name
    rspec_to_slice_tag = {'max_rate': 'net_max_rate'}

    def __init__(self, api, ttl=.5, origin_hrn=None):
        """
        api        -- interface object; config, interface, hrn, plshell,
                      plauth and time_format are read from it
        ttl        -- lifetime of the cached slice list, in hours
        origin_hrn -- stored unchanged on the instance
        """
        self.api = api
        self.ttl = ttl
        # NOTE(review): attribute is not read anywhere in this class; kept
        # because the listing had a third dropped initialiser here - confirm.
        self.threshold = None
        path = self.api.config.SFA_DATA_DIR
        filename = ".".join([self.api.interface, self.api.hrn, "slices"])
        self.slices_file = path + os.sep + filename
        SimpleStorage.__init__(self, self.slices_file)
        self.policy = Policy(self.api)
        # NOTE(review): presumably SimpleStorage.load() reads the stored
        # cache back in - confirm against sfa.util.storage.
        self.load()
        self.origin_hrn = origin_hrn

    def get_slivers(self, hrn, node=None):
        """
        Describe the slivers of the slice named by hrn.

        hrn  -- slice hrn, converted with hrn_to_pl_slicename()
        node -- optional node record; its 'node_id' and 'nodegroup_ids'
                select the per-node and nodegroup tags.  With None only
                slice-wide tags are reported.

        Returns a list with one dict per matching slice, carrying the slice
        fields, the ssh keys of its enabled users and its attributes.
        """
        slice_name = hrn_to_pl_slicename(hrn)
        # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
        # of doing all of this?

        # from PLCAPI.GetSlivers.get_slivers()
        slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
        slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)

        # Build up list of users and slice attributes
        person_ids = set()
        all_slice_tag_ids = set()
        for slice_rec in slices:
            person_ids.update(slice_rec['person_ids'])
            all_slice_tag_ids.update(slice_rec['slice_tag_ids'])
        person_ids = list(person_ids)
        all_slice_tag_ids = list(all_slice_tag_ids)

        # Get user information
        all_persons_list = self.api.plshell.GetPersons(
            self.api.plauth,
            {'person_id': person_ids, 'enabled': True},
            ['person_id', 'enabled', 'key_ids'])
        all_persons = dict((person['person_id'], person) for person in all_persons_list)

        # Build up list of keys
        key_ids = set()
        for person in all_persons.values():
            key_ids.update(person['key_ids'])
        key_ids = list(key_ids)

        # Get user account keys
        all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
        all_keys = dict((key['key_id'], key) for key in all_keys_list)

        # Get slice attributes
        all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
        all_slice_tags = dict((tag['slice_tag_id'], tag) for tag in all_slice_tags_list)

        slivers = []
        for slice_rec in slices:
            keys = []
            for person_id in slice_rec['person_ids']:
                person = all_persons.get(person_id)
                if person is None or not person['enabled']:
                    continue
                for key_id in person['key_ids']:
                    if key_id in all_keys:
                        key = all_keys[key_id]
                        keys.append({'key_type': key['key_type'],
                                     'key': key['key']})

            # All (per-node, nodegroup and global) attributes for this slice
            slice_tags = [all_slice_tags[tag_id]
                          for tag_id in slice_rec['slice_tag_ids']
                          if tag_id in all_slice_tags]

            # Per-node sliver attributes take precedence over global
            # slice attributes, so set them first.
            # Then comes nodegroup slice attributes
            # Followed by global slice attributes
            attributes = []
            sliver_attributes = []   # tagnames already set for this sliver

            # node is optional: without it there is nothing to match
            # per-node or nodegroup tags against.
            if node is not None:
                for tag in slice_tags:
                    if tag['node_id'] == node['node_id']:
                        sliver_attributes.append(tag['tagname'])
                        attributes.append({'tagname': tag['tagname'],
                                           'value': tag['value']})

                # set nodegroup slice attributes
                for tag in slice_tags:
                    if tag['nodegroup_id'] is None:
                        continue
                    if tag['nodegroup_id'] not in node['nodegroup_ids']:
                        continue
                    # Do not set any nodegroup slice attributes for
                    # which there is at least one sliver attribute
                    # already set.  (The previous test compared the tag with
                    # the list it was taken from, so it never passed.)
                    if tag['tagname'] not in sliver_attributes:
                        attributes.append({'tagname': tag['tagname'],
                                           'value': tag['value']})

            for tag in slice_tags:
                # Global tags are bound to neither a node nor a nodegroup;
                # nodegroup tags were handled above.
                if tag['node_id'] is not None or tag['nodegroup_id'] is not None:
                    continue
                # Do not set any global slice attributes for
                # which there is at least one sliver attribute
                # already set.
                if tag['tagname'] not in sliver_attributes:
                    attributes.append({'tagname': tag['tagname'],
                                       'value': tag['value']})

            # XXX Sanity check; though technically this should be a system invariant
            # checked with an assertion
            # MAXINT is expected at module level (not visible in this listing).
            if slice_rec['expires'] > MAXINT:
                slice_rec['expires'] = MAXINT

            slivers.append({
                # NOTE(review): the 'hrn' and 'keys' entries sat on dropped
                # lines; 'keys' is otherwise unused, 'hrn' should be confirmed.
                'hrn': hrn,
                'name': slice_rec['name'],
                'slice_id': slice_rec['slice_id'],
                'instantiation': slice_rec['instantiation'],
                'expires': slice_rec['expires'],
                'keys': keys,
                'attributes': attributes
            })

        return slivers

    def get_peer(self, hrn):
        """
        Return the shortname of the PLC peer the slice belongs to, or None
        when no peer record mentions the slice's site authority.
        """
        # Because of myplc federation, we first need to determine if this
        # slice belongs to our local plc or a myplc peer. We will assume it
        # is a local site, unless we find out otherwise
        peer = None

        # get this slice's authority (site)
        slice_authority = get_authority(hrn)

        # get this site's authority (sfa root authority or sub authority)
        site_authority = get_authority(slice_authority).lower()

        # check if we are already peered with this site_authority; if so,
        # remember the peer's shortname (the last matching record wins)
        peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
        for peer_record in peers:
            names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
            if site_authority in names:
                peer = peer_record['shortname']

        return peer

    def get_sfa_peer(self, hrn):
        """
        Return the authority for this hrn, or None if we are the authority.
        """
        slice_authority = get_authority(hrn)
        site_authority = get_authority(slice_authority)

        if site_authority != self.api.hrn:
            return site_authority
        return None

    # NOTE(review): the `def` line of this method was dropped from the
    # listing; the name `refresh` is inferred from refresh_slices_aggregate /
    # refresh_slices_smgr - confirm against the callers.
    def refresh(self):
        """
        Update the cached list of slices
        """
        # Reload the slice list when nothing is cached yet or the stored
        # threshold has passed.
        now = datetime.datetime.now()
        stale = not self.has_key('threshold') or not self.has_key('timestamp')
        if not stale:
            threshold = datetime.datetime.fromtimestamp(
                time.mktime(time.strptime(self['threshold'], self.api.time_format)))
            stale = now > threshold

        if stale:
            if self.api.interface in ['aggregate']:
                self.refresh_slices_aggregate()
            elif self.api.interface in ['slicemgr']:
                self.refresh_slices_smgr()

    def _store_slice_hrns(self, slice_hrns):
        # Store the hrn list with its refresh time and the time it expires
        # (refresh time + ttl hours), both formatted with api.time_format.
        timestamp = datetime.datetime.now()
        threshold = timestamp + datetime.timedelta(hours=self.ttl)
        self.update({'hrn': slice_hrns,
                     'timestamp': timestamp.strftime(self.api.time_format),
                     'threshold': threshold.strftime(self.api.time_format)})
        # NOTE(review): presumably SimpleStorage.write() persists the cache;
        # the call sat on a dropped line - confirm against sfa.util.storage.
        self.write()

    def refresh_slices_aggregate(self):
        """
        Rebuild the cache from the local (non-peer) PLC slices.
        """
        slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
        slice_hrns = [slicename_to_hrn(self.api.hrn, slice_rec['name']) for slice_rec in slices]

        # update timestamp and threshold
        self._store_slice_hrns(slice_hrns)

    def refresh_slices_smgr(self):
        """
        Rebuild the cache from the slices reported by every known aggregate.
        Aggregates that fail twice are logged and skipped.
        """
        slice_hrns = []
        aggregates = Aggregates(self.api)
        credential = self.api.getCredential()
        for aggregate in aggregates:
            # The old "retry with the request hash" step issued exactly the
            # same call as the first attempt, so this is a plain single retry.
            for _attempt in range(2):
                try:
                    slices = aggregates[aggregate].get_slices(credential)
                    slice_hrns.extend(slices)
                    break
                except Exception:
                    # `aggregate` must stay a local name: the message below is
                    # formatted from locals().
                    print >> log, "%s" % (traceback.format_exc())
                    print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()

        # update timestamp and threshold
        self._store_slice_hrns(slice_hrns)

    def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
        """
        Make sure the slice's site exists in the local PLC, adding it from
        the registry's authority record when it does not.

        Returns (site_id, remote_site_id).
        Raises RecordNotFound when the registry has no authority record.
        """
        authority = get_authority(slice_hrn)
        site_records = registry.resolve(credential, authority)

        site = {}
        for site_record in site_records:
            if site_record['type'] == 'authority':
                site = site_record
        if not site:
            raise RecordNotFound(authority)
        # taken out of the record before the record is handed to AddSite
        remote_site_id = site.pop('site_id')

        login_base = get_leaf(authority)
        sites = self.api.plshell.GetSites(self.api.plauth, login_base)
        if not sites:
            site_id = self.api.plshell.AddSite(self.api.plauth, site)
            if peer:
                self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
            # mark this site as an sfa peer record
            if sfa_peer:
                peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
                registry.register_peer_object(credential, peer_dict)
        else:
            site_id = sites[0]['site_id']
            remote_site_id = sites[0]['peer_site_id']

        return (site_id, remote_site_id)

    def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
        """
        Make sure the slice exists in the local PLC (adding or syncing it
        from the registry's slice record) and that its users are in place.

        Returns the local slice dict, with 'peer_slice_id' set to the
        registry record's pointer.
        Raises RecordNotFound when the registry has no slice record.
        """
        slice_record = None
        for record in registry.resolve(credential, slice_hrn):
            if record['type'] in ['slice']:
                slice_record = record
        if not slice_record:
            # was RecordNotFound(hrn): `hrn` is not defined in this method
            raise RecordNotFound(slice_hrn)

        slicename = hrn_to_pl_slicename(slice_hrn)
        slices = self.api.plshell.GetSlices(self.api.plauth, [slicename])
        if not slices:
            slice_fields = {}
            for key in ['name', 'url', 'description']:
                if key in slice_record and slice_record[key]:
                    slice_fields[key] = slice_record[key]

            # add the slice
            slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
            slice_rec = slice_fields
            slice_rec['slice_id'] = slice_id

            # mark this slice as an sfa peer record
            if sfa_peer:
                peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
                registry.register_peer_object(credential, peer_dict)

            # this belongs to a peer
            if peer:
                self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
            slice_rec['node_ids'] = []
        else:
            slice_rec = slices[0]
            slice_id = slice_rec['slice_id']
            site_id = slice_rec['site_id']
            # the slice is already on the remote agg. Let us update (e.g.
            # expires field) it with the latest info.
            self.sync_slice(slice_rec, slice_record, peer)

        slice_rec['peer_slice_id'] = slice_record['pointer']
        self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)

        return slice_rec

    def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
        """
        Make sure every researcher listed in slice_record exists in the
        local PLC and is attached to the slice and its site, then sync
        their keys.
        """
        # get the list of valid slice users from the registry and make
        # sure they are added to the slice
        slicename = hrn_to_pl_slicename(slice_record['hrn'])
        researchers = slice_record.get('researcher', [])
        for researcher in researchers:
            person_record = {}
            person_records = registry.resolve(credential, researcher)
            for record in person_records:
                if record['type'] in ['user']:
                    person_record = record
            if not person_record:
                # No user record in the registry: nothing to add.  Going on
                # would fail on person_dict['email'] below.
                continue
            person_dict = person_record

            # local_person: the person exists locally but is not bound to peer
            local_person = False
            if peer:
                peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
                persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
                if not persons:
                    persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
                    if persons:
                        local_person = True
            else:
                persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])

            if not persons:
                person_id = self.api.plshell.AddPerson(self.api.plauth, person_dict)
                self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled': True})

                # mark this person as an sfa peer record
                if sfa_peer:
                    peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
                    registry.register_peer_object(credential, peer_dict)

                if peer:
                    self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
                key_ids = []
            else:
                person_id = persons[0]['person_id']
                key_ids = persons[0]['key_ids']

            # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
            # an error
            if peer:
                self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
                self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id, peer)

            self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
            self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
            if peer and not local_person:
                self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
            if peer:
                self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)

            self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)

    def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer, local_person):
        """
        Add to the local person every key of person_dict['keys'] that is
        not among the keys identified by key_ids.
        """
        keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
        keys = [key['key'] for key in keylist]

        # add keys that aren't already there
        # From here on key_ids is the registry record's own list; one entry
        # is popped off its front for each key bound to the peer.
        key_ids = person_dict['key_ids']
        for personkey in person_dict['keys']:
            if personkey in keys:
                continue
            key = {'key_type': 'ssh', 'key': personkey}
            if peer:
                self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
            key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
            if peer and not local_person:
                self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
            if peer:
                # Best effort: a failed key binding (including running out
                # of remote key ids) must not abort the remaining keys.
                try:
                    self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
                except Exception:
                    pass

    def create_slice_aggregate(self, hrn, rspec):
        """
        Create or update the slice hrn on this aggregate so that it runs on
        exactly the nodes named by the rspec's NodeSpec elements, and set
        the slice tags recognised through rspec_to_slice_tag.
        """
        # Determine if this is a peer slice
        peer = self.get_peer(hrn)
        sfa_peer = self.get_sfa_peer(hrn)

        # NOTE(review): the statement building `spec` sat on a dropped line;
        # presumably the RSpec class from sfa.util.rspec - confirm.
        spec = RSpec(rspec)

        # Get the slice record from sfa
        slicename = hrn_to_pl_slicename(hrn)
        registries = Registries(self.api)
        registry = registries[self.api.hrn]
        credential = self.api.getCredential()

        site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
        slice_rec = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)

        # find out where this slice is currently running
        nodelist = self.api.plshell.GetNodes(self.api.plauth, slice_rec['node_ids'], ['hostname'])
        hostnames = [node['hostname'] for node in nodelist]

        # get netspec details
        nodespecs = spec.getDictsByTagName('NodeSpec')

        # dict in which to store slice attributes to set for the nodes
        nodes = {}
        for nodespec in nodespecs:
            # a NodeSpec names either one node or a list of nodes
            nodenames = nodespec['name']
            if isinstance(nodenames, StringTypes):
                nodenames = [nodenames]
            elif not isinstance(nodenames, list):
                continue
            for nodename in nodenames:
                node_tags = {}
                for k in nodespec.keys():
                    if k in self.rspec_to_slice_tag:
                        node_tags[self.rspec_to_slice_tag[k]] = nodespec[k]
                nodes[nodename] = node_tags

        node_names = nodes.keys()
        # remove nodes not in rspec
        deleted_nodes = list(set(hostnames).difference(node_names))
        # add nodes from rspec
        added_nodes = list(set(node_names).difference(hostnames))

        # a slice bound to a peer is unbound while it is modified and
        # bound again afterwards
        if peer:
            self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice_rec['slice_id'], peer)

        self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes)

        # Add recognized slice tags
        for node_name in node_names:
            node_tags = nodes[node_name]
            for slice_tag in node_tags.keys():
                value = node_tags[slice_tag]
                if isinstance(value, list):
                    value = value[0]

                self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)

        self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
        if peer:
            self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_rec['slice_id'], peer, slice_rec['peer_slice_id'])

        # NOTE(review): the return statement sat on a dropped line; 1 is
        # assumed as the success value - confirm.
        return 1

    def sync_slice(self, old_record, new_record, peer):
        """
        Bring the local slice's 'expires' in line with new_record,
        unbinding the slice from peer for the duration of the update.
        """
        if old_record['expires'] != new_record['expires']:
            if peer:
                self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
            self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires': new_record['expires']})
            if peer:
                self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])