9 from types import StringTypes
10 from sfa.util.namespace import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import SfaRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
24 class Slices(SimpleStorage):
26 rspec_to_slice_tag = {'max_rate':'net_max_rate'}
28 def __init__(self, api, ttl = .5, origin_hrn=None):
32 path = self.api.config.SFA_DATA_DIR
33 filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34 filepath = path + os.sep + filename
35 self.slices_file = filepath
36 SimpleStorage.__init__(self, self.slices_file)
37 self.policy = Policy(self.api)
39 self.origin_hrn = origin_hrn
41 def get_slivers(self, xrn, node=None):
42 hrn, type = urn_to_hrn(xrn)
44 slice_name = hrn_to_pl_slicename(hrn)
45 # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
46 # of doing all of this?
47 #return self.api.GetSliceTicket(self.auth, slice_name)
49 # from PLCAPI.GetSlivers.get_slivers()
50 slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
51 slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)
52 # Build up list of users and slice attributes
54 all_slice_tag_ids = set()
56 person_ids.update(slice['person_ids'])
57 all_slice_tag_ids.update(slice['slice_tag_ids'])
58 person_ids = list(person_ids)
59 all_slice_tag_ids = list(all_slice_tag_ids)
60 # Get user information
61 all_persons_list = self.api.plshell.GetPersons(self.api.plauth, {'person_id':person_ids,'enabled':True}, ['person_id', 'enabled', 'key_ids'])
63 for person in all_persons_list:
64 all_persons[person['person_id']] = person
66 # Build up list of keys
68 for person in all_persons.values():
69 key_ids.update(person['key_ids'])
70 key_ids = list(key_ids)
71 # Get user account keys
72 all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
74 for key in all_keys_list:
75 all_keys[key['key_id']] = key
76 # Get slice attributes
77 all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
79 for slice_tag in all_slice_tags_list:
80 all_slice_tags[slice_tag['slice_tag_id']] = slice_tag
85 for person_id in slice['person_ids']:
86 if person_id in all_persons:
87 person = all_persons[person_id]
88 if not person['enabled']:
90 for key_id in person['key_ids']:
91 if key_id in all_keys:
92 key = all_keys[key_id]
93 keys += [{'key_type': key['key_type'],
96 # All (per-node and global) attributes for this slice
98 for slice_tag_id in slice['slice_tag_ids']:
99 if slice_tag_id in all_slice_tags:
100 slice_tags.append(all_slice_tags[slice_tag_id])
101 # Per-node sliver attributes take precedence over global
102 # slice attributes, so set them first.
103 # Then comes nodegroup slice attributes
104 # Followed by global slice attributes
105 sliver_attributes = []
108 for sliver_attribute in filter(lambda a: a['node_id'] == node['node_id'], slice_tags):
109 sliver_attributes.append(sliver_attribute['tagname'])
110 attributes.append({'tagname': sliver_attribute['tagname'],
111 'value': sliver_attribute['value']})
113 # set nodegroup slice attributes
114 for slice_tag in filter(lambda a: a['nodegroup_id'] in node['nodegroup_ids'], slice_tags):
115 # Do not set any nodegroup slice attributes for
116 # which there is at least one sliver attribute
118 if slice_tag not in slice_tags:
119 attributes.append({'tagname': slice_tag['tagname'],
120 'value': slice_tag['value']})
122 for slice_tag in filter(lambda a: a['node_id'] is None, slice_tags):
123 # Do not set any global slice attributes for
124 # which there is at least one sliver attribute
126 if slice_tag['tagname'] not in sliver_attributes:
127 attributes.append({'tagname': slice_tag['tagname'],
128 'value': slice_tag['value']})
130 # XXX Sanity check; though technically this should be a system invariant
131 # checked with an assertion
132 if slice['expires'] > MAXINT: slice['expires']= MAXINT
136 'name': slice['name'],
137 'slice_id': slice['slice_id'],
138 'instantiation': slice['instantiation'],
139 'expires': slice['expires'],
141 'attributes': attributes
146 def get_peer(self, xrn):
147 hrn, type = urn_to_hrn(xrn)
148 # Becaues of myplc federation, we first need to determine if this
149 # slice belongs to out local plc or a myplc peer. We will assume it
150 # is a local site, unless we find out otherwise
153 # get this slice's authority (site)
154 slice_authority = get_authority(hrn)
156 # get this site's authority (sfa root authority or sub authority)
157 site_authority = get_authority(slice_authority).lower()
159 # check if we are already peered with this site_authority, if so
160 peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
161 for peer_record in peers:
162 names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
163 if site_authority in names:
164 peer = peer_record['shortname']
168 def get_sfa_peer(self, xrn):
169 hrn, type = urn_to_hrn(xrn)
171 # return the authority for this hrn or None if we are the authority
173 slice_authority = get_authority(hrn)
174 site_authority = get_authority(slice_authority)
176 if site_authority != self.api.hrn:
177 sfa_peer = site_authority
183 Update the cached list of slices
185 # Reload components list
186 now = datetime.datetime.now()
187 if not self.has_key('threshold') or not self.has_key('timestamp') or \
188 now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
189 if self.api.interface in ['aggregate']:
190 self.refresh_slices_aggregate()
191 elif self.api.interface in ['slicemgr']:
192 self.refresh_slices_smgr()
194 def refresh_slices_aggregate(self):
195 slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
196 slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
198 # update timestamp and threshold
199 timestamp = datetime.datetime.now()
200 hr_timestamp = timestamp.strftime(self.api.time_format)
201 delta = datetime.timedelta(hours=self.ttl)
202 threshold = timestamp + delta
203 hr_threshold = threshold.strftime(self.api.time_format)
205 slice_details = {'hrn': slice_hrns,
206 'timestamp': hr_timestamp,
207 'threshold': hr_threshold
209 self.update(slice_details)
213 def refresh_slices_smgr(self):
215 aggregates = Aggregates(self.api)
216 credential = self.api.getCredential()
217 for aggregate in aggregates:
219 # request hash is optional so lets try the call without it
221 slices = aggregates[aggregate].get_slices(credential)
222 slice_hrns.extend(slices)
225 print >> log, "%s" % (traceback.format_exc())
226 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
228 # try sending the request hash if the previous call failed
230 arg_list = [credential]
232 slices = aggregates[aggregate].get_slices(credential)
233 slice_hrns.extend(slices)
236 print >> log, "%s" % (traceback.format_exc())
237 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
239 # update timestamp and threshold
240 timestamp = datetime.datetime.now()
241 hr_timestamp = timestamp.strftime(self.api.time_format)
242 delta = datetime.timedelta(hours=self.ttl)
243 threshold = timestamp + delta
244 hr_threshold = threshold.strftime(self.api.time_format)
246 slice_details = {'hrn': slice_hrns,
247 'timestamp': hr_timestamp,
248 'threshold': hr_threshold
250 self.update(slice_details)
254 def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
257 authority = get_authority(slice_hrn)
258 authority_urn = hrn_to_urn(authority, 'authority')
259 site_records = registry.resolve(credential, authority_urn)
262 for site_record in site_records:
263 if site_record['type'] == 'authority':
266 raise RecordNotFound(authority)
267 remote_site_id = site.pop('site_id')
269 login_base = get_leaf(authority)
270 sites = self.api.plshell.GetSites(self.api.plauth, login_base)
272 site_id = self.api.plshell.AddSite(self.api.plauth, site)
274 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
275 # mark this site as an sfa peer record
277 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
278 registry.register_peer_object(credential, peer_dict)
280 site_id = sites[0]['site_id']
281 remote_site_id = sites[0]['peer_site_id']
283 #the site is alredy on the remote agg. Let us update(e.g. max_slices field) it with the latest info.
284 self.sync_site(old_site, site, peer)
287 return (site_id, remote_site_id)
289 def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
292 authority = get_authority(slice_hrn)
293 slice_records = registry.resolve(credential, slice_hrn)
295 for record in slice_records:
296 if record['type'] in ['slice']:
297 slice_record = record
299 raise RecordNotFound(hrn)
300 slicename = hrn_to_pl_slicename(slice_hrn)
301 parts = slicename.split("_")
302 login_base = parts[0]
303 slices = self.api.plshell.GetSlices(self.api.plauth, [slicename])
306 slice_keys = ['name', 'url', 'description']
307 for key in slice_keys:
308 if key in slice_record and slice_record[key]:
309 slice_fields[key] = slice_record[key]
312 slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
314 slice['slice_id'] = slice_id
316 # mark this slice as an sfa peer record
318 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
319 registry.register_peer_object(credential, peer_dict)
321 #this belongs to a peer
323 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
324 slice['node_ids'] = []
327 slice_id = slice['slice_id']
328 site_id = slice['site_id']
329 #the slice is alredy on the remote agg. Let us update(e.g. expires field) it with the latest info.
330 self.sync_slice(slice, slice_record, peer)
332 slice['peer_slice_id'] = slice_record['pointer']
333 self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
337 def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
338 # get the list of valid slice users from the registry and make
339 # sure they are added to the slice
340 slicename = hrn_to_pl_slicename(slice_record['hrn'])
341 researchers = slice_record.get('researcher', [])
342 for researcher in researchers:
344 person_records = registry.resolve(credential, researcher)
345 for record in person_records:
346 if record['type'] in ['user']:
347 person_record = record
348 if not person_record:
350 person_dict = person_record
353 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
354 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
356 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
361 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
364 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
365 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
367 # mark this person as an sfa peer record
369 peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
370 registry.register_peer_object(credential, peer_dict)
373 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
376 person_id = persons[0]['person_id']
377 key_ids = persons[0]['key_ids']
380 # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
383 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
384 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id, peer)
386 self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
387 self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
388 if peer and not local_person:
389 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
391 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
393 self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
395 def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer, local_person):
396 keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
397 keys = [key['key'] for key in keylist]
399 #add keys that arent already there
400 key_ids = person_dict['key_ids']
401 for personkey in person_dict['keys']:
402 if personkey not in keys:
403 key = {'key_type': 'ssh', 'key': personkey}
405 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
406 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
407 if peer and not local_person:
408 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
410 try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
414 def create_slice_aggregate(self, xrn, rspec):
415 hrn, type = urn_to_hrn(xrn)
416 # Determine if this is a peer slice
417 peer = self.get_peer(hrn)
418 sfa_peer = self.get_sfa_peer(hrn)
421 # Get the slice record from sfa
422 slicename = hrn_to_pl_slicename(hrn)
425 registries = Registries(self.api)
426 registry = registries[self.api.hrn]
427 credential = self.api.getCredential()
429 site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
430 slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
432 # find out where this slice is currently running
433 nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
434 hostnames = [node['hostname'] for node in nodelist]
436 # get netspec details
437 nodespecs = spec.getDictsByTagName('NodeSpec')
439 # dict in which to store slice attributes to set for the nodes
441 for nodespec in nodespecs:
442 if isinstance(nodespec['name'], list):
443 for nodename in nodespec['name']:
445 for k in nodespec.keys():
446 rspec_attribute_value = nodespec[k]
447 if (self.rspec_to_slice_tag.has_key(k)):
448 slice_tag_name = self.rspec_to_slice_tag[k]
449 nodes[nodename][slice_tag_name] = rspec_attribute_value
450 elif isinstance(nodespec['name'], StringTypes):
451 nodename = nodespec['name']
453 for k in nodespec.keys():
454 rspec_attribute_value = nodespec[k]
455 if (self.rspec_to_slice_tag.has_key(k)):
456 slice_tag_name = self.rspec_to_slice_tag[k]
457 nodes[nodename][slice_tag_name] = rspec_attribute_value
459 for k in nodespec.keys():
460 rspec_attribute_value = nodespec[k]
461 if (self.rspec_to_slice_tag.has_key(k)):
462 slice_tag_name = self.rspec_to_slice_tag[k]
463 nodes[nodename][slice_tag_name] = rspec_attribute_value
465 node_names = nodes.keys()
466 # remove nodes not in rspec
467 deleted_nodes = list(set(hostnames).difference(node_names))
468 # add nodes from rspec
469 added_nodes = list(set(node_names).difference(hostnames))
472 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
474 self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes)
476 # Add recognized slice tags
477 for node_name in node_names:
478 node = nodes[node_name]
479 for slice_tag in node.keys():
480 value = node[slice_tag]
481 if (isinstance(value, list)):
484 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
486 self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
488 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
492 def sync_site(self, old_record, new_record, peer):
493 if old_record['max_slices'] != new_record['max_slices']:
495 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', old_record['site_id'], peer)
496 self.api.plshell.UpdateSite(self.api.plauth, old_record['site_id'], {'max_slices' : new_record['max_slices']})
498 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', old_record['site_id'], peer, old_record['peer_site_id'])
501 def sync_slice(self, old_record, new_record, peer):
502 if old_record['expires'] != new_record['expires']:
504 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
505 self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires' : new_record['expires']})
507 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])