9 from types import StringTypes
10 from sfa.util.namespace import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import SfaRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
# Slices extends SimpleStorage; the methods below use it as a cache of slice
# hrns and as the entry point for creating slices through self.api.plshell.
# NOTE(review): in this excerpt every line starts with a stray number, the
# indentation is gone and the numbering has gaps, so some statements may be
# missing -- compare with the pristine file before changing any logic.
24 class Slices(SimpleStorage):
# Maps an rspec attribute name to the slice tag name used for it; read through
# self.rspec_to_slice_tag in create_slice_aggregate.
26 rspec_to_slice_tag = {'max_rate':'net_max_rate'}
# Set up the on-disk slice cache for one API instance.
#   api        -- read here for config.SFA_DATA_DIR, interface and hrn
#   ttl        -- defaults to .5; not referenced in the visible lines of this
#                 method (the refresh_slices_* methods read self.ttl as hours)
#   origin_hrn -- stored unchanged on self.origin_hrn
# NOTE(review): the visible lines read self.api without assigning it and the
# embedded numbering skips 29-31 and 38, so some statements appear to be
# missing from this excerpt -- confirm against the full file.
28 def __init__(self, api, ttl = .5, origin_hrn=None):
# Cache file lives at <SFA_DATA_DIR>/<interface>.<hrn>.slices
# NOTE(review): `os` is not imported by name in the visible import lines;
# presumably it arrives via a star import or an import outside this excerpt.
32 path = self.api.config.SFA_DATA_DIR
33 filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34 filepath = path + os.sep + filename
35 self.slices_file = filepath
# Hand the file path to the SimpleStorage base class.
36 SimpleStorage.__init__(self, self.slices_file)
37 self.policy = Policy(self.api)
39 self.origin_hrn = origin_hrn
# Collect sliver descriptions for the slice identified by xrn.
#   xrn  -- converted to an hrn with urn_to_hrn, then to a PLC slice name
#   node -- optional (default None); its 'node_id' and 'nodegroup_ids' entries
#           are read when picking per-node and nodegroup tags
# The visible result fields are name, slice_id, instantiation, expires and
# attributes.
# NOTE(review): the embedded numbering has many gaps in this method (loop
# headers, container initialisers, the return statement), so the visible
# lines are not the whole body -- confirm against the full file.
41 def get_slivers(self, xrn, node=None):
# NOTE: the local name `type` shadows the builtin; it is not used afterwards.
42 hrn, type = urn_to_hrn(xrn)
44 slice_name = hrn_to_pl_slicename(hrn)
45 # XX Should we just call PLCAPI.GetSliceTicket(slice_name) instead
46 # of doing all of this?
47 #return self.api.GetSliceTicket(self.auth, slice_name)
49 # from PLCAPI.GetSlivers.get_slivers()
50 slice_fields = ['slice_id', 'name', 'instantiation', 'expires', 'person_ids', 'slice_tag_ids']
51 slices = self.api.plshell.GetSlices(self.api.plauth, slice_name, slice_fields)
52 # Build up list of users and slice attributes
# NOTE(review): person_ids is updated below, but its initialisation and the
# loop that binds `slice` are not visible in this excerpt.
54 all_slice_tag_ids = set()
56 person_ids.update(slice['person_ids'])
57 all_slice_tag_ids.update(slice['slice_tag_ids'])
# Sets are turned into lists before being passed to the plshell calls.
58 person_ids = list(person_ids)
59 all_slice_tag_ids = list(all_slice_tag_ids)
60 # Get user information
# Only enabled persons are requested ('enabled': True in the filter).
61 all_persons_list = self.api.plshell.GetPersons(self.api.plauth, {'person_id':person_ids,'enabled':True}, ['person_id', 'enabled', 'key_ids'])
# Index persons by person_id (the all_persons initialiser is not visible).
63 for person in all_persons_list:
64 all_persons[person['person_id']] = person
66 # Build up list of keys
68 for person in all_persons.values():
69 key_ids.update(person['key_ids'])
70 key_ids = list(key_ids)
71 # Get user account keys
72 all_keys_list = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key_id', 'key', 'key_type'])
# Index keys by key_id (the all_keys initialiser is not visible).
74 for key in all_keys_list:
75 all_keys[key['key_id']] = key
76 # Get slice attributes
77 all_slice_tags_list = self.api.plshell.GetSliceTags(self.api.plauth, all_slice_tag_ids)
# Index slice tags by slice_tag_id (initialiser not visible).
79 for slice_tag in all_slice_tags_list:
80 all_slice_tags[slice_tag['slice_tag_id']] = slice_tag
# Per-slice pass: gather the keys of every known, enabled member.
85 for person_id in slice['person_ids']:
86 if person_id in all_persons:
87 person = all_persons[person_id]
88 if not person['enabled']:
90 for key_id in person['key_ids']:
91 if key_id in all_keys:
92 key = all_keys[key_id]
# NOTE(review): the continuation of this dict literal is not visible here.
93 keys += [{'key_type': key['key_type'],
96 # All (per-node and global) attributes for this slice
98 for slice_tag_id in slice['slice_tag_ids']:
99 if slice_tag_id in all_slice_tags:
100 slice_tags.append(all_slice_tags[slice_tag_id])
101 # Per-node sliver attributes take precedence over global
102 # slice attributes, so set them first.
103 # Then comes nodegroup slice attributes
104 # Followed by global slice attributes
# sliver_attributes records the tag names already set per node so that the
# global pass below can skip them.
105 sliver_attributes = []
# NOTE(review): node defaults to None but is subscripted in the next filter;
# a guard may sit on the line missing from this excerpt -- confirm.
108 for sliver_attribute in filter(lambda a: a['node_id'] == node['node_id'], slice_tags):
109 sliver_attributes.append(sliver_attribute['tagname'])
110 attributes.append({'tagname': sliver_attribute['tagname'],
111 'value': sliver_attribute['value']})
113 # set nodegroup slice attributes
# NOTE(review): node['nodegroup_ids'] is also read here; no guard for
# node=None is visible around this loop -- confirm it cannot be reached with
# node left at its default.
114 for slice_tag in filter(lambda a: a['nodegroup_id'] in node['nodegroup_ids'], slice_tags):
115 # Do not set any nodegroup slice attributes for
116 # which there is at least one sliver attribute
# NOTE(review): slice_tag comes out of slice_tags, so the test below can
# never be true and the append looks unreachable. The global pass further
# down tests slice_tag['tagname'] against sliver_attributes instead, which
# is presumably what was meant here -- confirm intent.
118 if slice_tag not in slice_tags:
119 attributes.append({'tagname': slice_tag['tagname'],
120 'value': slice_tag['value']})
# Global slice attributes: tags whose node_id is None.
122 for slice_tag in filter(lambda a: a['node_id'] is None, slice_tags):
123 # Do not set any global slice attributes for
124 # which there is at least one sliver attribute
126 if slice_tag['tagname'] not in sliver_attributes:
127 attributes.append({'tagname': slice_tag['tagname'],
128 'value': slice_tag['value']})
130 # XXX Sanity check; though technically this should be a system invariant
131 # checked with an assertion
# Clamp the expiry to MAXINT.
# NOTE(review): MAXINT is not defined in the visible lines; presumably a
# module-level constant outside this excerpt.
132 if slice['expires'] > MAXINT: slice['expires']= MAXINT
# Fields of the per-slice result dict (its opening line, any other fields and
# the return statement are not visible in this excerpt).
136 'name': slice['name'],
137 'slice_id': slice['slice_id'],
138 'instantiation': slice['instantiation'],
139 'expires': slice['expires'],
141 'attributes': attributes
# Look for a PLC peer whose record mentions the site authority of xrn and
# bind `peer` to that record's 'shortname'.
#   xrn -- converted to an hrn with urn_to_hrn
# NOTE(review): neither the initial value of `peer` nor the return statement
# is visible in this excerpt (numbering skips 151 and 165-166); the comments
# below suggest the no-match result means "local" -- confirm.
146 def get_peer(self, xrn):
# NOTE: the local name `type` shadows the builtin; it is not used afterwards.
147 hrn, type = urn_to_hrn(xrn)
148 # Becaues of myplc federation, we first need to determine if this
149 # slice belongs to out local plc or a myplc peer. We will assume it
150 # is a local site, unless we find out otherwise
153 # get this slice's authority (site)
154 slice_authority = get_authority(hrn)
156 # get this site's authority (sfa root authority or sub authority)
# Lower-cased so it can be compared with the lower-cased peer fields below.
157 site_authority = get_authority(slice_authority).lower()
159 # check if we are already peered with this site_authority, if so
160 peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
161 for peer_record in peers:
# Compare against every string-valued field of the peer record; non-string
# values (StringTypes is the Python 2 str/unicode tuple) are ignored.
162 names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
163 if site_authority in names:
164 peer = peer_record['shortname']
# Work out which SFA authority owns the slice named by xrn.
#   xrn -- converted to an hrn with urn_to_hrn
# sfa_peer is set to the slice's site authority when that differs from
# self.api.hrn.
# NOTE(review): the initial value of sfa_peer and the return statement are
# not visible in this excerpt (numbering skips 172 and 178-179); the comment
# below says None is used when this API is the authority -- confirm.
168 def get_sfa_peer(self, xrn):
# NOTE: the local name `type` shadows the builtin; it is not used afterwards.
169 hrn, type = urn_to_hrn(xrn)
171 # return the authority for this hrn or None if we are the authority
# Two levels up: slice -> slice authority -> site authority.
173 slice_authority = get_authority(hrn)
174 site_authority = get_authority(slice_authority)
176 if site_authority != self.api.hrn:
177 sfa_peer = site_authority
# Body of the cache-refresh routine.
# NOTE(review): the `def` line and the docstring delimiters for this body are
# not visible in this excerpt (numbering skips 181-182 and 184); the first
# line below is the docstring text -- confirm against the full file.
# The cache is rebuilt when the 'threshold' or 'timestamp' key is absent, or
# when the current time is later than the stored threshold.
183 Update the cached list of slices
185 # Reload components list
186 now = datetime.datetime.now()
# The stored threshold is a string; it is parsed back with
# self.api.time_format before being compared with `now`.
# has_key is the Python 2 dict membership test.
187 if not self.has_key('threshold') or not self.has_key('timestamp') or \
188 now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
# Choose the refresh path from the interface this API instance serves; any
# other interface value refreshes nothing in the visible lines.
189 if self.api.interface in ['aggregate']:
190 self.refresh_slices_aggregate()
191 elif self.api.interface in ['slicemgr']:
192 self.refresh_slices_smgr()
# Rebuild the cached slice list from the PLC database (aggregate interface).
# Stores the slice hrns together with a timestamp and an expiry threshold of
# now + self.ttl hours, both formatted with self.api.time_format.
# NOTE(review): the closing brace of the dict literal and anything after
# self.update are not visible in this excerpt (numbering skips 208 and
# 210-212) -- confirm whether the storage is written out afterwards.
194 def refresh_slices_aggregate(self):
# Only slices whose peer_id is None are requested, and only their names.
195 slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
196 slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
198 # update timestamp and threshold
199 timestamp = datetime.datetime.now()
200 hr_timestamp = timestamp.strftime(self.api.time_format)
# self.ttl is interpreted as a number of hours.
201 delta = datetime.timedelta(hours=self.ttl)
202 threshold = timestamp + delta
203 hr_threshold = threshold.strftime(self.api.time_format)
205 slice_details = {'hrn': slice_hrns,
206 'timestamp': hr_timestamp,
207 'threshold': hr_threshold
# Merge the new values into this storage object.
209 self.update(slice_details)
# Rebuild the cached slice list by asking every known aggregate for its
# slices (slice manager interface), then store the hrns with a timestamp and
# an expiry threshold of now + self.ttl hours.
# NOTE(review): the slice_hrns initialiser, the try/except lines and the dict
# literal's closing brace are not visible in this excerpt (gaps in the
# embedded numbering) -- confirm against the full file.
213 def refresh_slices_smgr(self):
215 aggregates = Aggregates(self.api)
216 credential = self.api.getCredential()
217 for aggregate in aggregates:
219 # request hash is optional so lets try the call without it
221 slices = aggregates[aggregate].get_slices(credential)
222 slice_hrns.extend(slices)
# Failure path: the traceback and the aggregate name go to `log` using the
# Python 2 `print >>` form.
225 print >> log, "%s" % (traceback.format_exc())
226 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
228 # try sending the request hash if the previous call failed
# NOTE(review): arg_list is built but the visible retry below passes
# `credential` directly, which makes it identical to the first call; the
# comment above says the retry should carry a request hash -- confirm.
230 arg_list = [credential]
232 slices = aggregates[aggregate].get_slices(credential)
233 slice_hrns.extend(slices)
236 print >> log, "%s" % (traceback.format_exc())
237 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
239 # update timestamp and threshold
240 timestamp = datetime.datetime.now()
241 hr_timestamp = timestamp.strftime(self.api.time_format)
# self.ttl is interpreted as a number of hours.
242 delta = datetime.timedelta(hours=self.ttl)
243 threshold = timestamp + delta
244 hr_threshold = threshold.strftime(self.api.time_format)
246 slice_details = {'hrn': slice_hrns,
247 'timestamp': hr_timestamp,
248 'threshold': hr_threshold
# Merge the new values into this storage object.
250 self.update(slice_details)
# Make sure the site (authority) that owns slice_hrn exists in the local PLC
# database and return its ids.
#   registry   -- used for resolve() and register_peer_object()
#   credential -- passed through to the registry calls
#   slice_hrn  -- the site is derived from it with get_authority
#   peer       -- peer name handed to BindObjectToPeer
#   sfa_peer   -- recorded as 'peer_authority' in the peer record
# Returns the tuple (site_id, remote_site_id).
# Raises RecordNotFound(authority) on a path whose condition is not visible.
# NOTE(review): the assignment of `site` and the conditions that choose
# between adding a site and reusing an existing one are not visible in this
# excerpt (gaps in the embedded numbering) -- confirm against the full file.
254 def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
255 authority = get_authority(slice_hrn)
256 authority_urn = hrn_to_urn(authority, 'authority')
257 site_records = registry.resolve(credential, authority_urn)
# Select the record of type 'authority' from the resolved records.
260 for site_record in site_records:
261 if site_record['type'] == 'authority':
264 raise RecordNotFound(authority)
# pop() removes 'site_id' from the record before it is passed to AddSite.
265 remote_site_id = site.pop('site_id')
# The local site is looked up by the leaf of the authority hrn.
267 login_base = get_leaf(authority)
268 sites = self.api.plshell.GetSites(self.api.plauth, login_base)
# Site creation path: add it, bind it to the peer, register the peer record.
270 site_id = self.api.plshell.AddSite(self.api.plauth, site)
272 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
273 # mark this site as an sfa peer record
275 peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
276 registry.register_peer_object(credential, peer_dict)
# Existing-site path: reuse the ids of the first matching site.
278 site_id = sites[0]['site_id']
279 remote_site_id = sites[0]['peer_site_id']
282 return (site_id, remote_site_id)
# Make sure the slice named slice_hrn exists in the local PLC database,
# creating or syncing it as needed, then verify its users.
#   registry       -- used for resolve() and register_peer_object()
#   credential     -- passed through to the registry calls
#   slice_hrn      -- slice to verify
#   site_id        -- local site id; overwritten from the slice on the
#                     existing-slice path
#   remote_site_id -- forwarded to verify_persons
#   peer, sfa_peer -- peer name / SFA peer authority used for binding
# NOTE(review): the initialisers of slice_fields and `slice`, the branch
# conditions and the return statement are not visible in this excerpt;
# create_slice_aggregate uses the return value as a slice dict -- confirm.
284 def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
# NOTE: `authority` is not read again in the visible lines.
287 authority = get_authority(slice_hrn)
288 slice_records = registry.resolve(credential, slice_hrn)
# Select the record of type 'slice' from the resolved records.
290 for record in slice_records:
291 if record['type'] in ['slice']:
292 slice_record = record
# NOTE(review): `hrn` is neither a parameter nor assigned in the visible
# lines, so this raise would fail with NameError rather than RecordNotFound;
# slice_hrn looks like the intended argument -- confirm.
294 raise RecordNotFound(hrn)
295 slicename = hrn_to_pl_slicename(slice_hrn)
# NOTE: login_base is not read again in the visible lines.
296 parts = slicename.split("_")
297 login_base = parts[0]
298 slices = self.api.plshell.GetSlices(self.api.plauth, [slicename])
# Copy only the non-empty name/url/description fields into slice_fields.
301 slice_keys = ['name', 'url', 'description']
302 for key in slice_keys:
303 if key in slice_record and slice_record[key]:
304 slice_fields[key] = slice_record[key]
# Creation path: add the slice and remember its new id.
307 slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
309 slice['slice_id'] = slice_id
311 # mark this slice as an sfa peer record
313 peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
314 registry.register_peer_object(credential, peer_dict)
316 #this belongs to a peer
318 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
# A freshly created slice has no nodes yet.
319 slice['node_ids'] = []
# Existing-slice path: take the ids from the slice found locally.
322 slice_id = slice['slice_id']
323 site_id = slice['site_id']
324 #the slice is alredy on the remote agg. Let us update(e.g. expires field) it with the latest info.
325 self.sync_slice(slice, slice_record, peer)
# The registry record's 'pointer' is kept as the peer-side slice id.
327 slice['peer_slice_id'] = slice_record['pointer']
328 self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
# For every researcher listed in slice_record, make sure a matching person
# exists locally, belongs to the slice and the site, and has its keys set.
#   registry       -- used for resolve() and register_peer_object()
#   credential     -- passed through to the registry calls
#   slice_record   -- its 'hrn' and optional 'researcher' list are read
#   site_id        -- site the person is added to
#   remote_site_id -- used when re-binding the site to the peer
#   peer, sfa_peer -- peer name / SFA peer authority used for binding
# NOTE(review): local_person is read but never assigned in the visible lines,
# and most branch conditions are missing from this excerpt (gaps in the
# embedded numbering) -- confirm against the full file.
332 def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
333 # get the list of valid slice users from the registry and make
334 # sure they are added to the slice
335 slicename = hrn_to_pl_slicename(slice_record['hrn'])
# A record without a 'researcher' entry is treated as having no users.
336 researchers = slice_record.get('researcher', [])
337 for researcher in researchers:
339 person_records = registry.resolve(credential, researcher)
# Select the record of type 'user' from the resolved records.
340 for record in person_records:
341 if record['type'] in ['user']:
342 person_record = record
# NOTE(review): the statement guarded by this test is not visible here.
343 if not person_record:
345 person_dict = person_record
# Peer lookup: resolve the peer's id by short name, then search for the
# person by email within that peer.
348 peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
349 persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
# NOTE(review): the next two lookups are textually identical; the conditions
# that separate them are not visible in this excerpt.
351 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
356 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
# Creation path: add the person and enable the account.
359 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
360 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
362 # mark this person as an sfa peer record
364 peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
365 registry.register_peer_object(credential, peer_dict)
368 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
# Existing-person path: reuse the first match.
371 person_id = persons[0]['person_id']
372 key_ids = persons[0]['key_ids']
375 # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
378 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
379 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id, peer)
381 self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
382 self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
# Re-bind to the peer after the membership changes above.
383 if peer and not local_person:
384 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
386 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
388 self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer, local_person)
# Add to the local person every key from person_dict['keys'] that the person
# does not already have.
#   registry, credential -- not referenced in the visible lines
#   person_dict  -- its 'keys', 'key_ids', 'email' and 'pointer' are read
#   key_ids      -- ids of the keys the local person currently has
#   person_id    -- local person id used for (un)binding
#   peer         -- peer name used for (un)binding
#   local_person -- when true the person is not re-bound to the peer
# NOTE(review): some branch conditions and the handler for the `try:` on the
# last line are not visible in this excerpt -- confirm against the full file.
390 def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer, local_person):
# Existing key strings of the local person.
391 keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
392 keys = [key['key'] for key in keylist]
394 #add keys that arent already there
# NOTE: this rebinds the key_ids parameter to the ids in person_dict; they
# are consumed in order by key_ids.pop(0) below.
395 key_ids = person_dict['key_ids']
396 for personkey in person_dict['keys']:
397 if personkey not in keys:
# Every added key is recorded with key_type 'ssh'.
398 key = {'key_type': 'ssh', 'key': personkey}
400 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
401 key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
402 if peer and not local_person:
403 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
# NOTE(review): pop(0) also mutates person_dict['key_ids'], because key_ids
# is the same list object after the rebinding above.
405 try: self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, key_ids.pop(0))
# Create or update the slice named by xrn on this aggregate: verify the site,
# slice and users, then reconcile the slice's nodes and tags with the rspec.
#   xrn   -- converted to an hrn with urn_to_hrn
#   rspec -- not referenced in the visible lines (see note below)
# NOTE(review): `spec` and `nodes` are used but never assigned in the visible
# lines, and several branch conditions are missing from this excerpt (gaps in
# the embedded numbering); presumably `spec` is built from rspec on one of
# the missing lines -- confirm against the full file.
409 def create_slice_aggregate(self, xrn, rspec):
# NOTE: the local name `type` shadows the builtin; it is not used afterwards.
410 hrn, type = urn_to_hrn(xrn)
411 # Determine if this is a peer slice
412 peer = self.get_peer(hrn)
413 sfa_peer = self.get_sfa_peer(hrn)
416 # Get the slice record from sfa
417 slicename = hrn_to_pl_slicename(hrn)
# The registry that serves this API's own hrn is used for all lookups.
420 registries = Registries(self.api)
421 registry = registries[self.api.hrn]
422 credential = self.api.getCredential()
424 site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
425 slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)
427 # find out where this slice is currently running
428 nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
429 hostnames = [node['hostname'] for node in nodelist]
431 # get netspec details
432 nodespecs = spec.getDictsByTagName('NodeSpec')
434 # dict in which to store slice attributes to set for the nodes
# A NodeSpec 'name' may be a list of names or a single string; the same
# attribute-mapping loop is repeated for each case below.
436 for nodespec in nodespecs:
437 if isinstance(nodespec['name'], list):
438 for nodename in nodespec['name']:
# Copy each rspec attribute that has a mapping in rspec_to_slice_tag into
# nodes[nodename] under its slice tag name (has_key is Python 2 only).
440 for k in nodespec.keys():
441 rspec_attribute_value = nodespec[k]
442 if (self.rspec_to_slice_tag.has_key(k)):
443 slice_tag_name = self.rspec_to_slice_tag[k]
444 nodes[nodename][slice_tag_name] = rspec_attribute_value
445 elif isinstance(nodespec['name'], StringTypes):
446 nodename = nodespec['name']
448 for k in nodespec.keys():
449 rspec_attribute_value = nodespec[k]
450 if (self.rspec_to_slice_tag.has_key(k)):
451 slice_tag_name = self.rspec_to_slice_tag[k]
452 nodes[nodename][slice_tag_name] = rspec_attribute_value
# NOTE(review): a third copy of the same loop follows; what separates it from
# the branch above is not visible in this excerpt.
454 for k in nodespec.keys():
455 rspec_attribute_value = nodespec[k]
456 if (self.rspec_to_slice_tag.has_key(k)):
457 slice_tag_name = self.rspec_to_slice_tag[k]
458 nodes[nodename][slice_tag_name] = rspec_attribute_value
460 node_names = nodes.keys()
461 # remove nodes not in rspec
462 deleted_nodes = list(set(hostnames).difference(node_names))
463 # add nodes from rspec
464 added_nodes = list(set(node_names).difference(hostnames))
# The slice is unbound from the peer here and bound again on the last line.
467 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice['slice_id'], peer)
469 self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes)
471 # Add recognized slice tags
472 for node_name in node_names:
473 node = nodes[node_name]
474 for slice_tag in node.keys():
475 value = node[slice_tag]
# NOTE(review): the handling of list values is not visible in this excerpt.
476 if (isinstance(value, list)):
479 self.api.plshell.AddSliceTag(self.api.plauth, slicename, slice_tag, value, node_name)
481 self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
483 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
# Bring the local slice's 'expires' value in line with the registry record.
#   old_record -- local slice; 'expires', 'slice_id', 'peer_slice_id' are read
#   new_record -- registry record; its 'expires' is the value written
#   peer       -- peer name used to unbind and re-bind the slice
# Nothing happens in the visible lines when the two 'expires' values match.
# NOTE(review): the numbering skips 489 and 492, so the unbind/bind calls may
# be guarded by conditions not visible in this excerpt -- confirm.
487 def sync_slice(self, old_record, new_record, peer):
488 if old_record['expires'] != new_record['expires']:
# The slice is unbound from the peer around the update and bound again after.
490 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', old_record['slice_id'], peer)
491 self.api.plshell.UpdateSlice(self.api.plauth, old_record['slice_id'], {'expires' : new_record['expires']})
493 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', old_record['slice_id'], peer, old_record['peer_slice_id'])