9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
class Slices(SimpleStorage):
    """Slice cache and slice life-cycle operations for one SFA interface.

    The storage holds three keys, written by ``_store_slices``: ``'hrn'``
    (list of slice HRNs), ``'timestamp'`` and ``'threshold'`` (both
    formatted with ``self.api.time_format``).

    Every public operation dispatches on ``self.api.interface``:
    ``'aggregate'`` works against the local PLC through
    ``self.api.plshell``; ``'slicemgr'`` forwards the call to the entries
    of ``Aggregates(self.api)``.

    NOTE(review): the copy of this class that was reviewed had lost its
    indentation and its shortest statements (initialisers, ``try:`` /
    ``except:`` / ``else:`` / ``if peer:`` headers, ``return`` lines,
    ``self.load()`` / ``self.write()``).  They were reconstructed from the
    surrounding code; diff against version control before merging.
    """

    def __init__(self, api, ttl=.5, caller_cred=None):
        """Bind the storage to ``<SFA_BASE_DIR>/<interface>.<hrn>.slices``.

        api         -- API object; this class reads its config, interface,
                       hrn, time_format, plshell, plauth, key and
                       getCredential()
        ttl         -- lifetime of the cached slice list, in hours
        caller_cred -- credential passed along to aggregates by the
                       slice manager code paths
        """
        self.api = api
        self.ttl = ttl
        path = self.api.config.SFA_BASE_DIR
        filename = ".".join([self.api.interface, self.api.hrn, "slices"])
        self.slices_file = path + os.sep + filename
        SimpleStorage.__init__(self, self.slices_file)
        self.policy = Policy(self.api)
        self.load()
        self.caller_cred = caller_cred

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _request_hash(self, *args):
        """Return the request hash of *args*, computed with the API key."""
        return self.api.key.compute_hash(list(args))

    def _last_record_of_type(self, records, record_type):
        """Return the last record in *records* whose 'type' is *record_type*.

        Returns None when no record matches.
        """
        match = None
        for record in records:
            if record['type'] == record_type:
                match = record
        return match

    def _store_slices(self, slice_hrns):
        """Cache *slice_hrns* with a fresh timestamp/threshold and persist.

        The threshold is ``now + ttl`` hours; ``refresh`` reloads the list
        once it has passed.
        """
        timestamp = datetime.datetime.now()
        threshold = timestamp + datetime.timedelta(hours=self.ttl)
        slice_details = {
            'hrn': slice_hrns,
            'timestamp': timestamp.strftime(self.api.time_format),
            'threshold': threshold.strftime(self.api.time_format),
        }
        self.update(slice_details)
        self.write()

    # ------------------------------------------------------------------
    # peer lookup
    # ------------------------------------------------------------------

    def get_peer(self, hrn):
        """Return the shortname of the MyPLC peer owning *hrn*, or None.

        Because of MyPLC federation we first need to determine whether the
        slice belongs to our local PLC or to a MyPLC peer.  It is assumed
        to be local unless a peer record names the slice's site authority.
        """
        # slice authority (site) -> site authority (sfa root or sub authority)
        slice_authority = get_authority(hrn)
        site_authority = get_authority(slice_authority).lower()

        # check whether we are already peered with this site authority
        peers = self.api.plshell.GetPeers(
            self.api.plauth, {},
            ['peer_id', 'peername', 'shortname', 'hrn_root'])
        peer = None
        for peer_record in peers:
            names = [name.lower() for name in peer_record.values()
                     if isinstance(name, StringTypes)]
            if site_authority in names:
                # no break: as before, the last matching peer wins
                peer = peer_record['shortname']
        return peer

    def get_sfa_peer(self, hrn):
        """Return the authority for *hrn*, or None if we are the authority."""
        site_authority = get_authority(get_authority(hrn))
        if site_authority != self.api.hrn:
            return site_authority
        return None

    # ------------------------------------------------------------------
    # cached slice list
    # ------------------------------------------------------------------

    def refresh(self):
        """
        Update the cached list of slices
        """
        now = datetime.datetime.now()
        stale = (not self.has_key('threshold')
                 or not self.has_key('timestamp')
                 or now > datetime.datetime.fromtimestamp(
                     time.mktime(time.strptime(self['threshold'],
                                               self.api.time_format))))
        if not stale:
            return
        if self.api.interface in ['aggregate']:
            self.refresh_slices_aggregate()
        elif self.api.interface in ['slicemgr']:
            self.refresh_slices_smgr()

    def refresh_slices_aggregate(self):
        """Rebuild the cache from the PLC slices that have no peer_id."""
        slices = self.api.plshell.GetSlices(
            self.api.plauth, {'peer_id': None}, ['name'])
        slice_hrns = [slicename_to_hrn(self.api.hrn, pl_slice['name'])
                      for pl_slice in slices]
        self._store_slices(slice_hrns)

    def refresh_slices_smgr(self):
        """Rebuild the cache from the slice lists of every aggregate.

        An aggregate that fails is logged and skipped.
        """
        slice_hrns = []
        aggregates = Aggregates(self.api)
        credential = self.api.getCredential()
        request_hash = self._request_hash(credential)
        for aggregate in aggregates:
            try:
                slices = aggregates[aggregate].get_slices(credential, request_hash)
                slice_hrns.extend(slices)
            except Exception:
                print >> log, "Error calling slices at aggregate %s" % aggregate
        self._store_slices(slice_hrns)

    # ------------------------------------------------------------------
    # delete
    # ------------------------------------------------------------------

    def delete_slice(self, hrn):
        """Delete slice *hrn* through the handler for this interface."""
        if self.api.interface in ['aggregate']:
            self.delete_slice_aggregate(hrn)
        elif self.api.interface in ['slicemgr']:
            self.delete_slice_smgr(hrn)

    def delete_slice_aggregate(self, hrn):
        """Remove slice *hrn* from all of its nodes on the local PLC.

        Returns 1, also when the slice does not exist.
        """
        shell, auth = self.api.plshell, self.api.plauth
        slicename = hrn_to_pl_slicename(hrn)
        slices = shell.GetSlices(auth, {'name': slicename})
        if not slices:
            return 1
        pl_slice = slices[0]

        # a peer slice is unbound from its peer around the modification
        # and bound again afterwards
        peer = self.get_peer(hrn)
        if peer:
            shell.UnBindObjectFromPeer(auth, 'slice', pl_slice['slice_id'], peer)
        shell.DeleteSliceFromNodes(auth, slicename, pl_slice['node_ids'])
        if peer:
            shell.BindObjectToPeer(auth, 'slice', pl_slice['slice_id'], peer,
                                   pl_slice['peer_slice_id'])
        return 1

    def delete_slice_smgr(self, hrn):
        """Ask every aggregate to delete slice *hrn*.

        A failing aggregate is logged (message and traceback) and skipped.
        """
        credential = self.api.getCredential()
        caller_cred = self.caller_cred
        aggregates = Aggregates(self.api)
        request_hash = self._request_hash(credential, hrn)
        for aggregate in aggregates:
            try:
                aggregates[aggregate].delete_slice(credential, hrn, request_hash, caller_cred)
            except Exception:
                print >> log, "Error calling delete slice at aggregate %s" % aggregate
                # file= is required: the first positional argument of
                # print_exc is the traceback depth limit, not the stream
                traceback.print_exc(file=log)

    # ------------------------------------------------------------------
    # create
    # ------------------------------------------------------------------

    def create_slice(self, hrn, rspec):
        """Create or update slice *hrn* from *rspec*, subject to policy.

        A slice rejected by the whitelist/blacklist is logged and not
        created.
        """
        # check our slice policy before we proceed
        whitelist = self.policy['slice_whitelist']
        blacklist = self.policy['slice_blacklist']

        if (whitelist and hrn not in whitelist) or \
           (blacklist and hrn in blacklist):
            policy_file = self.policy.policy_file
            print >> log, "Slice %s not allowed by policy %s" % (hrn, policy_file)
            return 1

        if self.api.interface in ['aggregate']:
            self.create_slice_aggregate(hrn, rspec)
        elif self.api.interface in ['slicemgr']:
            self.create_slice_smgr(hrn, rspec)

    def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
        """Make sure the site owning *slice_hrn* exists on the local PLC.

        The authority record is resolved through *registry*; a missing
        site is added, bound to *peer* (if any) and registered as a peer
        object for *sfa_peer* (if any).

        Returns (site_id, remote_site_id).
        Raises RecordNotFound when the registry has no authority record.
        """
        shell, auth = self.api.plshell, self.api.plauth
        authority = get_authority(slice_hrn)
        request_hash = self._request_hash(credential, authority)
        site_records = registry.resolve(credential, authority, request_hash)
        site = self._last_record_of_type(site_records, 'authority')
        if not site:
            raise RecordNotFound(authority)
        # taken out of the record before the record is handed to AddSite
        remote_site_id = site.pop('site_id')

        login_base = get_leaf(authority)
        sites = shell.GetSites(auth, login_base)
        if not sites:
            site_id = shell.AddSite(auth, site)
            if peer:
                shell.BindObjectToPeer(auth, 'site', site_id, peer, remote_site_id)
            # mark this site as an sfa peer record
            if sfa_peer:
                peer_dict = {'type': 'authority', 'hrn': authority,
                             'peer_authority': sfa_peer, 'pointer': site_id}
                request_hash = self._request_hash(credential)
                registry.register_peer_object(credential, peer_dict, request_hash)
        else:
            site_id = sites[0]['site_id']
            remote_site_id = sites[0]['peer_site_id']

        return (site_id, remote_site_id)

    def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
        """Make sure slice *slice_hrn* and its users exist on the local PLC.

        Returns the slice dict, which always carries 'slice_id',
        'node_ids' and 'peer_slice_id'.
        Raises RecordNotFound when the registry has no slice record.
        """
        shell, auth = self.api.plshell, self.api.plauth
        request_hash = self._request_hash(credential, slice_hrn)
        slice_records = registry.resolve(credential, slice_hrn, request_hash)
        slice_record = self._last_record_of_type(slice_records, 'slice')
        if not slice_record:
            raise RecordNotFound(slice_hrn)

        slicename = hrn_to_pl_slicename(slice_hrn)
        slices = shell.GetSlices(auth, [slicename],
                                 ['slice_id', 'node_ids', 'site_id'])
        if not slices:
            # copy the non-empty descriptive fields and add the slice
            pl_slice = {}
            for key in ['name', 'url', 'description']:
                if key in slice_record and slice_record[key]:
                    pl_slice[key] = slice_record[key]
            slice_id = shell.AddSlice(auth, pl_slice)
            pl_slice['slice_id'] = slice_id

            # mark this slice as an sfa peer record
            if sfa_peer:
                peer_dict = {'type': 'slice', 'hrn': slice_hrn,
                             'peer_authority': sfa_peer, 'pointer': slice_id}
                request_hash = self._request_hash(credential)
                registry.register_peer_object(credential, peer_dict, request_hash)

            # this belongs to a peer
            if peer:
                shell.BindObjectToPeer(auth, 'slice', slice_id, peer,
                                       slice_record['pointer'])
            pl_slice['node_ids'] = []
        else:
            pl_slice = slices[0]
            # the site the existing slice lives in overrides the argument
            site_id = pl_slice['site_id']

        pl_slice['peer_slice_id'] = slice_record['pointer']
        self.verify_persons(registry, credential, slice_record, site_id,
                            remote_site_id, peer, sfa_peer)

        return pl_slice

    def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
        """Add every researcher of *slice_record* to the slice and the site.

        Researchers are resolved through *registry*; unknown persons are
        added to the local PLC and enabled.  A researcher for which the
        registry returns no user record is skipped.
        """
        shell, auth = self.api.plshell, self.api.plauth
        slicename = hrn_to_pl_slicename(slice_record['hrn'])
        researchers = slice_record.get('researcher', [])
        if not researchers:
            return

        # the peer id is the same for every researcher: look it up once
        peer_id = None
        if peer:
            peer_id = shell.GetPeers(auth, {'shortname': peer}, ['peer_id'])[0]['peer_id']

        for researcher in researchers:
            request_hash = self._request_hash(credential, researcher)
            person_records = registry.resolve(credential, researcher, request_hash)
            person_dict = self._last_record_of_type(person_records, 'user')
            if not person_dict:
                # nothing to add without a user record
                continue

            if peer:
                persons = shell.GetPersons(
                    auth,
                    {'email': [person_dict['email']], 'peer_id': peer_id},
                    ['person_id', 'key_ids'])
            else:
                persons = shell.GetPersons(auth, [person_dict['email']],
                                           ['person_id', 'key_ids'])

            if not persons:
                person_id = shell.AddPerson(auth, person_dict)
                shell.UpdatePerson(auth, person_id, {'enabled': True})

                # mark this person as an sfa peer record
                if sfa_peer:
                    peer_dict = {'type': 'user', 'hrn': researcher,
                                 'peer_authority': sfa_peer, 'pointer': person_id}
                    request_hash = self._request_hash(credential)
                    registry.register_peer_object(credential, peer_dict, request_hash)

                if peer:
                    shell.BindObjectToPeer(auth, 'person', person_id, peer,
                                           person_dict['pointer'])
                key_ids = []
            else:
                person_id = persons[0]['person_id']
                key_ids = persons[0]['key_ids']

            # if this is a peer person, we must unbind them from the peer
            # or PLCAPI will throw an error
            if peer:
                shell.UnBindObjectFromPeer(auth, 'person', person_id, peer)
                shell.UnBindObjectFromPeer(auth, 'site', site_id, peer)

            shell.AddPersonToSlice(auth, person_dict['email'], slicename)
            shell.AddPersonToSite(auth, person_dict['email'], site_id)
            if peer:
                shell.BindObjectToPeer(auth, 'person', person_id, peer,
                                       person_dict['pointer'])
                shell.BindObjectToPeer(auth, 'site', site_id, peer, remote_site_id)

            self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer)

    def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer):
        """Add the keys of *person_dict* that the local person lacks.

        key_ids -- ids of the keys the local person already has
        For a peer person each added key is also bound to the peer, paired
        in order with the ids in ``person_dict['key_ids']``; a failure of
        that key binding is logged and otherwise ignored.
        """
        shell, auth = self.api.plshell, self.api.plauth
        keylist = shell.GetKeys(auth, key_ids, ['key'])
        known_keys = [key['key'] for key in keylist]

        # consume a copy so the registry record itself is left intact
        remote_key_ids = list(person_dict['key_ids'])
        for personkey in person_dict['keys']:
            # add keys that aren't already there
            if personkey in known_keys:
                continue
            key = {'key_type': 'ssh', 'key': personkey}
            if peer:
                shell.UnBindObjectFromPeer(auth, 'person', person_id, peer)
            key_id = shell.AddPersonKey(auth, person_dict['email'], key)
            if peer:
                shell.BindObjectToPeer(auth, 'person', person_id, peer,
                                       person_dict['pointer'])
                try:
                    shell.BindObjectToPeer(auth, 'key', key_id, peer,
                                           remote_key_ids.pop(0))
                except Exception:
                    # best effort, as before, but no longer silent
                    print >> log, "Error binding key %s to peer %s" % (key_id, peer)
                    traceback.print_exc(file=log)

    def create_slice_aggregate(self, hrn, rspec):
        """Create/update slice *hrn* on the local PLC to match *rspec*.

        After the site, slice and users have been verified, the slice is
        added to the nodes named by the rspec's NodeSpec entries and
        removed from every other node it currently runs on.  Returns 1.
        """
        shell, auth = self.api.plshell, self.api.plauth

        # Determine if this is a peer slice
        peer = self.get_peer(hrn)
        sfa_peer = self.get_sfa_peer(hrn)

        spec = Rspec(rspec)
        # Get the slice record from sfa
        slicename = hrn_to_pl_slicename(hrn)
        registries = Registries(self.api)
        registry = registries[self.api.hrn]
        credential = self.api.getCredential()

        site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
        pl_slice = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)

        # find out where this slice is currently running
        nodelist = shell.GetNodes(auth, pl_slice['node_ids'], ['hostname'])
        hostnames = [node['hostname'] for node in nodelist]

        # get netspec details; a NodeSpec name is a string or a list of them
        nodes = []
        for nodespec in spec.getDictsByTagName('NodeSpec'):
            name = nodespec['name']
            if isinstance(name, list):
                nodes.extend(name)
            elif isinstance(name, StringTypes):
                nodes.append(name)

        # remove nodes not in rspec
        deleted_nodes = list(set(hostnames).difference(nodes))
        # add nodes from rspec
        added_nodes = list(set(nodes).difference(hostnames))

        if peer:
            shell.UnBindObjectFromPeer(auth, 'slice', pl_slice['slice_id'], peer)
        shell.AddSliceToNodes(auth, slicename, added_nodes)
        shell.DeleteSliceFromNodes(auth, slicename, deleted_nodes)
        if peer:
            shell.BindObjectToPeer(auth, 'slice', pl_slice['slice_id'], peer,
                                   pl_slice['peer_slice_id'])

        return 1

    def create_slice_smgr(self, hrn, rspec):
        """Split *rspec* per network and send each part where it belongs.

        A network we are directly connected to receives its own rspec (the
        local aggregate receives the whole rspec).  Any other network is
        forwarded to every connected slice manager that reports knowing
        it.  Failures are logged per network and do not stop the others.
        Returns 1.
        """
        spec = Rspec()
        tempspec = Rspec()
        spec.parseString(rspec)
        spec_dict = spec.toDict()
        if 'Rspec' in spec_dict:
            spec_dict = spec_dict['Rspec']
        start_time = spec_dict.get('start_time', 0)
        end_time = spec_dict.get('end_time', 0)

        aggregates = Aggregates(self.api)
        credential = self.api.getCredential()

        # split the netspecs into individual rspecs
        rspecs = {}
        for netspec in spec.getDictsByTagName('NetSpec'):
            net_hrn = netspec['name']
            resources = {'start_time': start_time, 'end_time': end_time,
                         'networks': netspec}
            tempspec.parseDict({'Rspec': resources})
            rspecs[net_hrn] = tempspec.toxml()

        # send each rspec to the appropriate aggregate/sm
        caller_cred = self.caller_cred
        for net_hrn in rspecs:
            net_rspec = rspecs[net_hrn]
            try:
                # if we are directly connected to the aggregate then we can
                # just send them the rspec; if not, then we may be connected
                # to an sm that's connected to the aggregate
                if net_hrn in aggregates:
                    if net_hrn == self.api.hrn:
                        # send the whole rspec to the local aggregate
                        payload = rspec
                    else:
                        payload = net_rspec
                    request_hash = self._request_hash(credential, hrn, payload)
                    aggregates[net_hrn].create_slice(credential, hrn, payload, request_hash, caller_cred)
                else:
                    # let's forward this rspec to a sm that knows about the
                    # network.  The two hashes are kept apart so the lookup
                    # hash stays valid for every iteration.
                    lookup_hash = self._request_hash(credential, net_hrn)
                    create_hash = self._request_hash(credential, hrn, net_rspec)
                    for aggregate in aggregates:
                        network_found = aggregates[aggregate].get_aggregates(credential, net_hrn, lookup_hash)
                        if network_found:
                            aggregates[aggregate].create_slice(credential, hrn, net_rspec, create_hash, caller_cred)
            except Exception:
                print >> log, "Error creating slice %s at aggregate %s" % (hrn, net_hrn)
                traceback.print_exc(file=log)

        return 1

    # ------------------------------------------------------------------
    # start / stop
    # ------------------------------------------------------------------

    def _set_slice_enabled(self, hrn, value):
        """Set the 'enabled' slice attribute of *hrn* to *value* on the PLC.

        Raises RecordNotFound when the slice does not exist.  Returns 1.
        """
        shell, auth = self.api.plshell, self.api.plauth
        slicename = hrn_to_pl_slicename(hrn)
        slices = shell.GetSlices(auth, {'name': slicename}, ['slice_id'])
        if not slices:
            raise RecordNotFound(hrn)
        slice_id = slices[0]['slice_id']
        attributes = shell.GetSliceAttributes(
            auth, {'slice_id': slice_id, 'name': 'enabled'},
            ['slice_attribute_id'])
        attribute_id = attributes[0]['slice_attribute_id']
        shell.UpdateSliceAttribute(auth, attribute_id, value)
        return 1

    def start_slice(self, hrn):
        """Start slice *hrn* through the handler for this interface."""
        if self.api.interface in ['aggregate']:
            self.start_slice_aggregate(hrn)
        elif self.api.interface in ['slicemgr']:
            self.start_slice_smgr(hrn)

    def start_slice_aggregate(self, hrn):
        """Set the 'enabled' attribute of slice *hrn* to "1".  Returns 1."""
        return self._set_slice_enabled(hrn, "1")

    def start_slice_smgr(self, hrn):
        """Ask every aggregate to start slice *hrn*.  Returns 1."""
        credential = self.api.getCredential()
        aggregates = Aggregates(self.api)
        for aggregate in aggregates:
            # NOTE(review): unlike stop_slice_smgr no request hash is sent
            # here; left as found -- confirm what the remote side expects.
            aggregates[aggregate].start_slice(credential, hrn)
        return 1

    def stop_slice(self, hrn):
        """Stop slice *hrn* through the handler for this interface."""
        if self.api.interface in ['aggregate']:
            self.stop_slice_aggregate(hrn)
        elif self.api.interface in ['slicemgr']:
            self.stop_slice_smgr(hrn)

    def stop_slice_aggregate(self, hrn):
        """Set the 'enabled' attribute of slice *hrn* to "0".  Returns 1."""
        return self._set_slice_enabled(hrn, "0")

    def stop_slice_smgr(self, hrn):
        """Ask every aggregate to stop slice *hrn*.  Returns 1."""
        credential = self.api.getCredential()
        aggregates = Aggregates(self.api)
        request_hash = self._request_hash(credential, hrn)
        for aggregate in aggregates:
            aggregates[aggregate].stop_slice(credential, hrn, request_hash)
        return 1