9 from types import StringTypes
10 from sfa.util.namespace import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import SfaRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
# Slice-related operations carried out through self.api.plshell, with storage
# behaviour inherited from SimpleStorage.
24 class Slices(SimpleStorage):
# Maps an rspec attribute name to the slice tag name it is recorded under.
# create_slice_aggregate copies only the node-spec keys that appear here.
26 rspec_to_slice_tag = {'max_rate':'net_max_rate'}
# Set up the slices storage file and the policy object for the given api.
#   api        -- presumably becomes self.api, whose config, interface and
#                 hrn are read below -- TODO confirm
#   ttl        -- presumably a lifetime in hours: the refresh_slices_* methods
#                 pass self.ttl to datetime.timedelta(hours=...) -- TODO confirm
#                 it is stored on self in a line not visible here
#   origin_hrn -- stored unchanged as self.origin_hrn
28 def __init__(self, api, ttl = .5, origin_hrn=None):
# NOTE(review): self.api is read here but no visible line assigns it --
# confirm it is set before this point.
# The storage file is <SFA_DATA_DIR>/<interface>.<hrn>.slices
32 path = self.api.config.SFA_DATA_DIR
33 filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34 filepath = path + os.sep + filename
35 self.slices_file = filepath
36 SimpleStorage.__init__(self, self.slices_file)
37 self.policy = Policy(self.api)
39 self.origin_hrn = origin_hrn
# Gather the data describing a slice's slivers: its users' keys and its
# slice tags, fetched through self.api.plshell.
#   xrn  -- converted to an hrn with urn_to_hrn, then to a PL slice name
#   node -- optional; its 'node_id' and 'nodegroup_ids' entries are read when
#           the slice tags are filtered below
# NOTE(review): the return statement is not among the visible lines; the
# trailing 'name'/'slice_id'/... entries look like the per-slice result --
# confirm against the full file.
41 def get_slivers(self, xrn, node=None):
42 hrn, type = urn_to_hrn(xrn)
44 slice_name = hrn_to_pl_slicename(hrn)
45 # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
46 # of doing all of this?
47 #return self.api.GetSliceTicket(self.auth, slice_name)
49 # from PLCAPI.GetSlivers.get_slivers()
50 slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
51 slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)
52 # Build up list of users and slice attributes
54 all_slice_tag_ids = set()
56 person_ids.update(slice['person_ids'])
57 all_slice_tag_ids.update(slice['slice_tag_ids'])
58 person_ids = list(person_ids)
59 all_slice_tag_ids = list(all_slice_tag_ids)
60 # Get user information
# Only enabled persons are requested; the results are indexed by person_id.
61 all_persons_list = self.api.plshell.GetPersons(self.api.plauth, {'person_id':person_ids,'enabled':True}, ['person_id', 'enabled', 'key_ids'])
63 for person in all_persons_list:
64 all_persons[person['person_id']] = person
66 # Build up list of keys
68 for person in all_persons.values():
69 key_ids.update(person['key_ids'])
70 key_ids = list(key_ids)
71 # Get user account keys
# Keys are indexed by key_id for lookup in the per-person loop below.
72 all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
74 for key in all_keys_list:
75 all_keys[key['key_id']] = key
76 # Get slice attributes
# Slice tags are indexed by slice_tag_id.
77 all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
79 for slice_tag in all_slice_tags_list:
80 all_slice_tags[slice_tag['slice_tag_id']] = slice_tag
# Collect the keys of each person on the slice that was returned above.
85 for person_id in slice['person_ids']:
86 if person_id in all_persons:
87 person = all_persons[person_id]
88 if not person['enabled']:
90 for key_id in person['key_ids']:
91 if key_id in all_keys:
92 key = all_keys[key_id]
93 keys += [{'key_type': key['key_type'],
96 # All (per-node and global) attributes for this slice
98 for slice_tag_id in slice['slice_tag_ids']:
99 if slice_tag_id in all_slice_tags:
100 slice_tags.append(all_slice_tags[slice_tag_id])
101 # Per-node sliver attributes take precedence over global
102 # slice attributes, so set them first.
103 # Then comes nodegroup slice attributes
104 # Followed by global slice attributes
# sliver_attributes records the tag names already set per node, so that the
# global-attribute loop below can skip them.
105 sliver_attributes = []
108 for sliver_attribute in filter(lambda a: a['node_id'] == node['node_id'], slice_tags):
109 sliver_attributes.append(sliver_attribute['tagname'])
110 attributes.append({'tagname': sliver_attribute['tagname'],
111 'value': sliver_attribute['value']})
113 # set nodegroup slice attributes
114 for slice_tag in filter(lambda a: a['nodegroup_id'] in node['nodegroup_ids'], slice_tags):
115 # Do not set any nodegroup slice attributes for
116 # which there is at least one sliver attribute
# NOTE(review): slice_tag is drawn from slice_tags, so as written this test
# can never be true and no nodegroup attribute is appended. The global loop
# below tests slice_tag['tagname'] against sliver_attributes instead --
# confirm which was intended.
118 if slice_tag not in slice_tags:
119 attributes.append({'tagname': slice_tag['tagname'],
120 'value': slice_tag['value']})
122 for slice_tag in filter(lambda a: a['node_id'] is None, slice_tags):
123 # Do not set any global slice attributes for
124 # which there is at least one sliver attribute
126 if slice_tag['tagname'] not in sliver_attributes:
127 attributes.append({'tagname': slice_tag['tagname'],
128 'value': slice_tag['value']})
130 # XXX Sanity check; though technically this should be a system invariant
131 # checked with an assertion
# Clamp the expiry to MAXINT (MAXINT is not defined in the visible lines).
132 if slice['expires'] > MAXINT: slice['expires']= MAXINT
136 'name': slice['name'],
137 'slice_id': slice['slice_id'],
138 'instantiation': slice['instantiation'],
139 'expires': slice['expires'],
141 'attributes': attributes
# Find the PLC peer, if any, whose peer record matches the site authority of
# the slice named by xrn.
#   xrn -- converted to an hrn with urn_to_hrn
# On a match, peer is set to that record's 'shortname'.
# NOTE(review): the return statement is not among the visible lines.
146 def get_peer(self, xrn):
147 hrn, type = urn_to_hrn(xrn)
148 # Because of myplc federation, we first need to determine if this
149 # slice belongs to our local plc or a myplc peer. We will assume it
150 # is a local site, unless we find out otherwise
153 # get this slice's authority (site)
154 slice_authority = get_authority(hrn)
156 # get this site's authority (sfa root authority or sub authority)
157 site_authority = get_authority(slice_authority).lower()
159 # check if we are already peered with this site_authority, if so
160 peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
161 for peer_record in peers:
# Compare case-insensitively against every string-valued field of the record.
162 names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
163 if site_authority in names:
164 peer = peer_record['shortname']
# Determine the sfa peer authority for the slice named by xrn.
#   xrn -- converted to an hrn with urn_to_hrn
# sfa_peer is set to the slice's site authority when that differs from
# self.api.hrn.
# NOTE(review): the initial value of sfa_peer and the return statement are
# not among the visible lines.
168 def get_sfa_peer(self, xrn):
169 hrn, type = urn_to_hrn(xrn)
171 # return the authority for this hrn or None if we are the authority
173 slice_authority = get_authority(hrn)
174 site_authority = get_authority(slice_authority)
176 if site_authority != self.api.hrn:
177 sfa_peer = site_authority
183 Update the cached list of slices
185 # Reload components list
# NOTE(review): the def line and docstring quotes of this method are not
# among the visible lines; the text above looks like its docstring.
# The cache is refreshed when 'threshold' or 'timestamp' is missing from the
# store, or when the current time is past the stored threshold (parsed with
# self.api.time_format). The refresh method is chosen by self.api.interface.
186 now = datetime.datetime.now()
187 if not self.has_key('threshold') or not self.has_key('timestamp') or \
188 now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
189 if self.api.interface in ['aggregate']:
190 self.refresh_slices_aggregate()
191 elif self.api.interface in ['slicemgr']:
192 self.refresh_slices_smgr()
# Rebuild the stored slice list from the slices whose peer_id is None, and
# stamp it with a new timestamp and expiry threshold.
194 def refresh_slices_aggregate(self):
195 slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
# Convert each PL slice name to an hrn under self.api.hrn.
196 slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
198 # update timestamp and threshold
# threshold = now + self.ttl hours; both are stored as strings formatted
# with self.api.time_format.
199 timestamp = datetime.datetime.now()
200 hr_timestamp = timestamp.strftime(self.api.time_format)
201 delta = datetime.timedelta(hours=self.ttl)
202 threshold = timestamp + delta
203 hr_threshold = threshold.strftime(self.api.time_format)
205 slice_details = {'hrn': slice_hrns,
206 'timestamp': hr_timestamp,
207 'threshold': hr_threshold
209 self.update(slice_details)
# Rebuild the stored slice list by asking every known aggregate for its
# slices, then stamp it with a new timestamp and expiry threshold.
# Tracebacks and an error message are written to `log`.
# NOTE(review): the try/except lines around the calls are not among the
# visible lines.
213 def refresh_slices_smgr(self):
215 aggregates = Aggregates(self.api)
216 credential = self.api.getCredential()
217 for aggregate in aggregates:
219 # request hash is optional so lets try the call without it
221 slices = aggregates[aggregate].get_slices(credential)
222 slice_hrns.extend(slices)
225 print >> log, "%s" % (traceback.format_exc())
226 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
228 # try sending the request hash if the previous call failed
# NOTE(review): in the visible lines arg_list is never used, and the call
# below passes the same single argument as the first attempt -- confirm
# whether the retry was meant to differ.
230 arg_list = [credential]
232 slices = aggregates[aggregate].get_slices(credential)
233 slice_hrns.extend(slices)
236 print >> log, "%s" % (traceback.format_exc())
237 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
239 # update timestamp and threshold
# threshold = now + self.ttl hours; both are stored as strings formatted
# with self.api.time_format.
240 timestamp = datetime.datetime.now()
241 hr_timestamp = timestamp.strftime(self.api.time_format)
242 delta = datetime.timedelta(hours=self.ttl)
243 threshold = timestamp + delta
244 hr_threshold = threshold.strftime(self.api.time_format)
246 slice_details = {'hrn': slice_hrns,
247 'timestamp': hr_timestamp,
248 'threshold': hr_threshold
250 self.update(slice_details)
# Make sure the site that owns slice_hrn is known to PLC, and return its ids.
#   registry   -- used to resolve the authority record and to register peer objects
#   credential -- passed through to the registry calls
#   slice_hrn  -- its authority identifies the site; the authority's leaf is
#                 used as the login_base
#   peer       -- passed to BindObjectToPeer and sync_site
#   sfa_peer   -- recorded as 'peer_authority' in the registered peer object
# Returns (site_id, remote_site_id).
# Raises RecordNotFound(authority); presumably when no 'authority' record
# resolves, the guarding condition is not among the visible lines.
# NOTE(review): the branch conditions (site missing vs. existing, peer set or
# not) and the assignments of `site` and `old_site` are not visible here.
254 def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
255 authority = get_authority(slice_hrn)
256 authority_urn = hrn_to_urn(authority, 'authority')
257 site_records = registry.resolve(credential, authority_urn)
260 for site_record in site_records:
261 if site_record['type'] == 'authority':
264 raise RecordNotFound(authority)
# 'site_id' is removed from the resolved record and kept as the remote id.
265 remote_site_id = site.pop('site_id')
267 login_base = get_leaf(authority)
268 sites = self.api.plshell.GetSites(self.api.plauth, login_base)
270 site_id = self.api.plshell.AddSite(self.api.plauth, site)
272 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
273 # mark this site as an sfa peer record
275 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
276 registry.register_peer_object(credential, peer_dict)
278 site_id = sites[0]['site_id']
279 remote_site_id = sites[0]['peer_site_id']
281 # the site is already on the remote agg. Let us update (e.g. max_slices field) it with the latest info.
282 self.sync_site(old_site, site, peer)
285 return (site_id, remote_site_id)
# Make sure the slice named by slice_hrn exists in PLC, then verify its users.
#   registry       -- used to resolve the slice record and register peer objects
#   credential     -- passed through to the registry calls
#   slice_hrn      -- hrn of the slice; converted to a PL slice name
#   site_id        -- overwritten with slice['site_id'] in the existing-slice path
#   remote_site_id -- passed through to verify_persons
#   peer           -- passed to BindObjectToPeer, sync_slice and verify_persons
#   sfa_peer       -- recorded as 'peer_authority' in the registered peer object
# Raises RecordNotFound; presumably when no 'slice' record resolves, the
# guarding condition is not among the visible lines.
# NOTE(review): the branch conditions, the initialisation of slice/slice_fields
# and the return statement are not visible here.
287 def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
290 authority = get_authority(slice_hrn)
291 slice_records = registry.resolve(credential, slice_hrn)
293 for record in slice_records:
294 if record['type'] in ['slice']:
295 slice_record = record
# NOTE(review): `hrn` is not bound in any visible line of this method; the
# parameter is named slice_hrn -- confirm which name was intended.
297 raise RecordNotFound(hrn)
298 slicename = hrn_to_pl_slicename(slice_hrn)
# The part of the slice name before the first "_" is taken as the login_base.
299 parts = slicename.split("_")
300 login_base = parts[0]
301 slices = self.api.plshell.GetSlices(self.api.plauth, [slicename])
# Copy only the non-empty name/url/description fields from the registry record.
304 slice_keys = ['name', 'url', 'description']
305 for key in slice_keys:
306 if key in slice_record and slice_record[key]:
307 slice_fields[key] = slice_record[key]
310 slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
312 slice['slice_id'] = slice_id
314 # mark this slice as an sfa peer record
316 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
317 registry.register_peer_object(credential, peer_dict)
319 #this belongs to a peer
321 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
322 slice['node_ids'] = []
325 slice_id = slice['slice_id']
326 site_id = slice['site_id']
327 # the slice is already on the remote agg. Let us update (e.g. expires field) it with the latest info.
328 self.sync_slice(slice, slice_record, peer)
330 slice['peer_slice_id'] = slice_record['pointer']
331 self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
# For each researcher listed in slice_record, resolve the user record from
# the registry, look the person up in PLC by email, add the person to the
# slice and the site, and then verify the person's keys.
#   registry       -- used to resolve researchers and register peer objects
#   credential     -- passed through to the registry calls
#   slice_record   -- its 'hrn' and optional 'researcher' list are read
#   site_id        -- site the person is added to
#   remote_site_id -- used when re-binding the site to the peer
#   peer           -- peer shortname, used for the peer lookup and bind/unbind calls
#   sfa_peer       -- recorded as 'peer_authority' in the registered peer object
# NOTE(review): the branch conditions, the handling of a missing
# person_record and the assignment of local_person are not among the visible
# lines. AddPerson is presumably called only when no person was found --
# confirm.
335 def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
336 # get the list of valid slice users from the registry and make
337 # sure they are added to the slice
338 slicename = hrn_to_pl_slicename(slice_record['hrn'])
339 researchers = slice_record.get('researcher', [])
340 for researcher in researchers:
342 person_records = registry.resolve(credential, researcher)
# Only enabled records of type 'user' are accepted.
343 for record in person_records:
344 if record['type'] in ['user'] and record['enabled']:
345 person_record = record
346 if not person_record:
348 person_dict = person_record
# Look the person up by email; the first query restricts to the peer's peer_id.
351 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
352 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
354 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
359 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
# The newly added person is enabled in a separate UpdatePerson call.
362 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
363 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
365 # mark this person as an sfa peer record
367 peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
368 registry.register_peer_object(credential, peer_dict)
371 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
374 person_id = persons[0]['person_id']
375 key_ids = persons[0]['key_ids']
378 # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
381 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
382 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id, peer)
384 self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
385 self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
# Re-bind to the peer after the membership changes.
386 if peer and not local_person:
387 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
389 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
391 self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
# Add to PLC any key in person_dict['keys'] that the person does not have yet.
#   registry, credential -- not used in the visible lines
#   person_dict  -- its 'keys', 'key_ids', 'email' and 'pointer' entries are read
#   key_ids      -- ids of the person's existing PLC keys, fetched for comparison
#   person_id    -- used for the peer bind/unbind calls
#   peer         -- peer the person and new keys are bound to
#   local_person -- when true, the person is not re-bound to the peer
# NOTE(review): the conditions guarding the unbind and key-bind calls, and
# the except clause of the final try, are not among the visible lines.
393 def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer, local_person):
394 keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
395 keys = [key['key'] for key in keylist]
397 #add keys that arent already there
# key_ids is rebound here to the list held in person_dict; the pop(0) below
# therefore removes entries from person_dict['key_ids'] itself.
398 key_ids = person_dict['key_ids']
399 for personkey in person_dict['keys']:
400 if personkey not in keys:
# Every added key is registered with key_type 'ssh'.
401 key = {'key_type': 'ssh', 'key': personkey}
403 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
404 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
405 if peer and not local_person:
406 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
408 try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
# Bring the slice's nodes and slice tags on this aggregate in line with the
# rspec: nodes named in the rspec but not yet hosting the slice are added,
# and nodes hosting the slice but absent from the rspec are removed.
#   xrn   -- converted to an hrn with urn_to_hrn
#   rspec -- not read in the visible lines; presumably parsed into `spec` --
#            TODO confirm
# The site and slice are verified first through verify_site and verify_slice.
# NOTE(review): the assignments of `spec` and `nodes`, the per-node entry
# creation, the peer conditions and the return statement are not among the
# visible lines.
412 def create_slice_aggregate(self, xrn, rspec):
413 hrn, type = urn_to_hrn(xrn)
414 # Determine if this is a peer slice
415 peer = self.get_peer(hrn)
416 sfa_peer = self.get_sfa_peer(hrn)
419 # Get the slice record from sfa
420 slicename = hrn_to_pl_slicename(hrn)
423 registries = Registries(self.api)
424 registry = registries[self.api.hrn]
425 credential = self.api.getCredential()
427 site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
428 slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
430 # find out where this slice is currently running
431 nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
432 hostnames = [node['hostname'] for node in nodelist]
434 # get netspec details
435 nodespecs = spec.getDictsByTagName('NodeSpec')
437 # dict in which to store slice attributes to set for the nodes
# A nodespec 'name' may be a list of node names or a single string. For each
# node, only the keys found in rspec_to_slice_tag are recorded, under the
# mapped slice tag name.
439 for nodespec in nodespecs:
440 if isinstance(nodespec['name'], list):
441 for nodename in nodespec['name']:
443 for k in nodespec.keys():
444 rspec_attribute_value = nodespec[k]
445 if (self.rspec_to_slice_tag.has_key(k)):
446 slice_tag_name = self.rspec_to_slice_tag[k]
447 nodes[nodename][slice_tag_name] = rspec_attribute_value
448 elif isinstance(nodespec['name'], StringTypes):
449 nodename = nodespec['name']
451 for k in nodespec.keys():
452 rspec_attribute_value = nodespec[k]
453 if (self.rspec_to_slice_tag.has_key(k)):
454 slice_tag_name = self.rspec_to_slice_tag[k]
455 nodes[nodename][slice_tag_name] = rspec_attribute_value
# NOTE(review): the statement that introduces this third copy of the loop is
# not visible -- confirm which case it handles.
457 for k in nodespec.keys():
458 rspec_attribute_value = nodespec[k]
459 if (self.rspec_to_slice_tag.has_key(k)):
460 slice_tag_name = self.rspec_to_slice_tag[k]
461 nodes[nodename][slice_tag_name] = rspec_attribute_value
463 node_names = nodes.keys()
464 # remove nodes not in rspec
465 deleted_nodes = list(set(hostnames).difference(node_names))
466 # add nodes from rspec
467 added_nodes = list(set(node_names).difference(hostnames))
# The slice is unbound from the peer before the changes and re-bound at the end.
470 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
472 self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes)
474 # Add recognized slice tags
475 for node_name in node_names:
476 node = nodes[node_name]
477 for slice_tag in node.keys():
478 value = node[slice_tag]
479 if (isinstance(value, list)):
482 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
484 self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
486 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
# Update the PLC site's max_slices and max_slivers when they differ from the
# new record.
#   old_record -- current site; 'site_id', 'peer_site_id', 'max_slices' and
#                 'max_slivers' are read
#   new_record -- source of the new 'max_slices' and 'max_slivers' values
#   peer       -- peer the site is unbound from before, and re-bound to after,
#                 the update
# NOTE(review): the conditions guarding the unbind/bind calls are not among
# the visible lines.
490 def sync_site(self, old_record, new_record, peer):
491 if old_record['max_slices'] != new_record['max_slices'] or old_record['max_slivers'] != new_record['max_slivers']:
493 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', old_record['site_id'], peer)
# Each field is updated in its own UpdateSite call, and only if it changed.
494 if old_record['max_slices'] != new_record['max_slices']:
495 self.api.plshell.UpdateSite(self.api.plauth, old_record['site_id'], {'max_slices' : new_record['max_slices']})
496 if old_record['max_slivers'] != new_record['max_slivers']:
497 self.api.plshell.UpdateSite(self.api.plauth, old_record['site_id'], {'max_slivers' : new_record['max_slivers']})
499 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', old_record['site_id'], peer, old_record['peer_site_id'])
# Update the PLC slice's 'expires' value when it differs from the new record.
#   old_record -- current slice; 'slice_id', 'peer_slice_id' and 'expires' are read
#   new_record -- source of the new 'expires' value
#   peer       -- peer the slice is unbound from before, and re-bound to after,
#                 the update
# NOTE(review): the conditions guarding the unbind/bind calls are not among
# the visible lines.
502 def sync_slice(self, old_record, new_record, peer):
503 if old_record['expires'] != new_record['expires']:
505 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
506 self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires' : new_record['expires']})
508 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])