9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
22 class Slices(SimpleStorage):
def __init__(self, api, ttl = .5, caller_cred=None):
    """
    Prepare the slice cache for one SFA interface.

    api         -- server api object; its config.SFA_BASE_DIR, interface
                   and hrn attributes decide where the cache file lives
    ttl         -- cache lifetime; it is handed to datetime.timedelta as
                   hours when a refresh computes the threshold
    caller_cred -- credential kept on the instance and forwarded to the
                   aggregates by the slice manager code paths
    """
    # NOTE(review): the listing this block was rebuilt from has numbering
    # gaps at the top of the body and just before caller_cred is stored.
    # Only the assignments the visible code depends on are restored here;
    # confirm nothing else (e.g. loading the cache file) was dropped.
    self.api = api
    self.ttl = ttl

    # cache file: <SFA_BASE_DIR>/<interface>.<hrn>.slices
    cache_name = ".".join([self.api.interface, self.api.hrn, "slices"])
    self.slices_file = os.sep.join([self.api.config.SFA_BASE_DIR, cache_name])
    SimpleStorage.__init__(self, self.slices_file)
    self.policy = Policy(self.api)
    self.caller_cred = caller_cred
def get_peer(self, hrn):
    """
    Find the myplc peer, if any, that this slice hrn belongs to.

    Returns the 'shortname' of the matching peer record, or None when no
    peer matches and the slice is therefore treated as local.
    """
    # Because of myplc federation the slice may belong to a peer rather
    # than to our local plc; treat it as local until a peer record matches.
    peer = None

    # slice hrn -> slice authority (site) -> site authority (sfa root
    # authority or sub authority), compared case-insensitively
    site_authority = get_authority(get_authority(hrn)).lower()

    wanted_fields = ['peer_id', 'peername', 'shortname', 'hrn_root']
    for peer_record in self.api.plshell.GetPeers(self.api.plauth, {}, wanted_fields):
        # only the string-valued fields can name an authority
        lowered = [value.lower() for value in peer_record.values()
                   if isinstance(value, StringTypes)]
        if site_authority in lowered:
            # no early exit: the last matching record wins
            peer = peer_record['shortname']
    return peer
def get_sfa_peer(self, hrn):
    """
    Return the site authority of this hrn, or None when that authority
    is our own hrn (api.hrn).
    """
    site_authority = get_authority(get_authority(hrn))
    if site_authority == self.api.hrn:
        # we are the authority ourselves
        return None
    return site_authority
72 Update the cached list of slices
74 # Reload components list
75 now = datetime.datetime.now()
76 if not self.has_key('threshold') or not self.has_key('timestamp') or \
77 now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
78 if self.api.interface in ['aggregate']:
79 self.refresh_slices_aggregate()
80 elif self.api.interface in ['slicemgr']:
81 self.refresh_slices_smgr()
def refresh_slices_aggregate(self):
    """
    Rebuild the cached slice list from the local PLC.

    Stores the hrns of every slice whose peer_id is None, together with
    the time of this refresh and the threshold (refresh time + ttl hours),
    both rendered with api.time_format.
    """
    local_slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
    slice_hrns = []
    for local_slice in local_slices:
        slice_hrns.append(slicename_to_hrn(self.api.hrn, local_slice['name']))

    # stamp the cache and work out when it goes stale
    time_format = self.api.time_format
    refreshed_at = datetime.datetime.now()
    stale_at = refreshed_at + datetime.timedelta(hours=self.ttl)

    # NOTE(review): nothing visible here writes the cache back to
    # slices_file - confirm whether SimpleStorage.update() persists it.
    self.update({
        'hrn': slice_hrns,
        'timestamp': refreshed_at.strftime(time_format),
        'threshold': stale_at.strftime(time_format),
    })
def refresh_slices_smgr(self):
    """
    Rebuild the cached slice list by asking every known aggregate.

    The get_slices() result of each aggregate is appended to one list; an
    aggregate whose call fails is reported on the debug log and skipped.
    The list is stored with the time of this refresh and the threshold
    (refresh time + ttl hours), both rendered with api.time_format.
    """
    aggregates = Aggregates(self.api)
    credential = self.api.getCredential()
    request_hash = self.api.key.compute_hash([credential])

    slice_hrns = []
    for aggregate in aggregates:
        try:
            slice_hrns.extend(aggregates[aggregate].get_slices(credential, request_hash))
        except:
            # best effort: one unreachable aggregate must not stop the refresh
            print >> log, "Error calling slices at aggregate %(aggregate)s" % {'aggregate': aggregate}

    # stamp the cache and work out when it goes stale
    time_format = self.api.time_format
    refreshed_at = datetime.datetime.now()
    stale_at = refreshed_at + datetime.timedelta(hours=self.ttl)

    # NOTE(review): nothing visible here writes the cache back to
    # slices_file - confirm whether SimpleStorage.update() persists it.
    self.update({
        'hrn': slice_hrns,
        'timestamp': refreshed_at.strftime(time_format),
        'threshold': stale_at.strftime(time_format),
    })
def delete_slice(self, hrn):
    """
    Delete the slice through the implementation that matches
    api.interface ('aggregate' or 'slicemgr'); any other interface value
    does nothing.
    """
    handlers = {
        'aggregate': self.delete_slice_aggregate,
        'slicemgr': self.delete_slice_smgr,
    }
    handler = handlers.get(self.api.interface)
    if handler is not None:
        handler(hrn)
def delete_slice_aggregate(self, hrn):
    """
    Remove the slice from all of its nodes on the local PLC.

    Does nothing when PLC has no slice with the name derived from hrn.
    A slice that belongs to a myplc peer is unbound from that peer for
    the duration of the change and bound again afterwards.
    """
    slicename = hrn_to_pl_slicename(hrn)
    matches = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename})
    if not matches:
        # nothing to delete
        return
    slice_info = matches[0]

    # determine if this is a peer slice
    peer = self.get_peer(hrn)
    if peer:
        self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice_info['slice_id'], peer)
    self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, slice_info['node_ids'])
    if peer:
        self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_info['slice_id'], peer, slice_info['peer_slice_id'])
def delete_slice_smgr(self, hrn):
    """
    Ask every known aggregate to delete the slice.

    The caller's credential (caller_cred) is forwarded with each request.
    A failing aggregate is reported on the debug log, with its traceback,
    and the loop carries on with the next one.
    """
    credential = self.api.getCredential()
    aggregates = Aggregates(self.api)
    for aggregate in aggregates:
        try:
            aggregates[aggregate].delete_slice(credential, hrn, caller_cred=self.caller_cred)
        except Exception:
            # the message used to say "list nodes", copied from another call
            print >> log, "Error calling delete slice at aggregate %s" % aggregate
            traceback.print_exc(log)
            # the exception triple is still echoed on stdout as before
            exc_type, exc_value, exc_traceback = sys.exc_info()
            sys.stdout.write("%s %s %s\n" % (exc_type, exc_value, exc_traceback))
def create_slice(self, hrn, rspec):
    """
    Create the slice described by rspec unless the slice policy forbids it.

    The hrn is rejected when a non-empty 'slice_whitelist' does not
    contain it, or when a non-empty 'slice_blacklist' does.  A rejection
    is written to the debug log and nothing is created.  Otherwise the
    request goes to the aggregate or slice manager implementation,
    depending on api.interface.
    """
    # check our slice policy before we proceed
    whitelist = self.policy['slice_whitelist']
    blacklist = self.policy['slice_blacklist']

    if (whitelist and hrn not in whitelist) or (blacklist and hrn in blacklist):
        details = {'hrn': hrn, 'policy_file': self.policy.policy_file}
        print >> log, "Slice %(hrn)s not allowed by policy %(policy_file)s" % details
        return

    interface = self.api.interface
    if interface == 'aggregate':
        self.create_slice_aggregate(hrn, rspec)
    elif interface == 'slicemgr':
        self.create_slice_smgr(hrn, rspec)
def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer):
    """
    Make sure the site that owns slice_hrn exists in the local PLC.

    registry   -- registry used to resolve the authority record and to
                  register peer objects
    credential -- credential passed on every registry call
    slice_hrn  -- hrn of the slice; its authority is the site
    peer       -- myplc peer shortname, or a false value for a local site
    sfa_peer   -- sfa peer authority, or a false value

    Returns the tuple (site_id, remote_site_id).
    Raises RecordNotFound when the registry returns no record of type
    'authority' for the slice's authority.
    """
    authority = get_authority(slice_hrn)

    # the last record of type 'authority' is the one that gets used
    site = {}
    for candidate in registry.resolve(credential, authority):
        if candidate['type'] == 'authority':
            site = candidate.as_dict()
    if not site:
        raise RecordNotFound(authority)
    remote_site_id = site.pop('site_id')

    existing = self.api.plshell.GetSites(self.api.plauth, get_leaf(authority))
    if existing:
        # already known locally: report what PLC has on file
        return (existing[0]['site_id'], existing[0]['peer_site_id'])

    # unknown site: add it, then tie it to the peers it came from
    site_id = self.api.plshell.AddSite(self.api.plauth, site)
    if peer:
        self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
    if sfa_peer:
        # mark this site as an sfa peer record
        peer_dict = {'type': 'authority', 'hrn': authority, 'peer_authority': sfa_peer, 'pointer': site_id}
        registry.register_peer_object(credential, peer_dict)
    return (site_id, remote_site_id)
def verify_slice(self, registry, credential, slice_hrn, site_id, remote_site_id, peer, sfa_peer):
    """
    Make sure the slice exists in the local PLC and that its researchers
    are set up, creating the slice when PLC does not know it yet.

    registry       -- registry used to resolve the slice record and to
                      register peer objects
    credential     -- credential passed on every registry call
    slice_hrn      -- hrn of the slice
    site_id        -- local site id; replaced by the slice's own site_id
                      when the slice already exists
    remote_site_id -- site id at the peer, handed on to verify_persons
    peer           -- myplc peer shortname, or a false value
    sfa_peer       -- sfa peer authority, or a false value

    Returns a dict describing the slice with at least the keys
    'slice_id', 'node_ids' and 'peer_slice_id'.
    Raises RecordNotFound when the registry returns no record of type
    'slice' for slice_hrn.
    """
    # the last record of type 'slice' is the one that gets used
    slice_record = None
    for record in registry.resolve(credential, slice_hrn):
        if record['type'] in ['slice']:
            slice_record = record
    if not slice_record:
        # this used to reference `hrn`, which is not defined in this method
        raise RecordNotFound(slice_hrn)

    slicename = hrn_to_pl_slicename(slice_hrn)
    slices = self.api.plshell.GetSlices(self.api.plauth, [slicename], ['slice_id', 'node_ids', 'site_id'])
    if not slices:
        # new slice: copy over the descriptive fields that have a value
        slice_fields = {}
        for key in ['name', 'url', 'description']:
            if key in slice_record and slice_record[key]:
                slice_fields[key] = slice_record[key]

        slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
        slice_info = slice_fields
        slice_info['slice_id'] = slice_id

        # mark this slice as an sfa peer record
        if sfa_peer:
            peer_dict = {'type': 'slice', 'hrn': slice_hrn, 'peer_authority': sfa_peer, 'pointer': slice_id}
            registry.register_peer_object(credential, peer_dict)

        # this belongs to a peer
        if peer:
            self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
        # a freshly added slice is not on any node yet
        slice_info['node_ids'] = []
    else:
        slice_info = slices[0]
        site_id = slice_info['site_id']

    slice_info['peer_slice_id'] = slice_record['pointer']
    self.verify_persons(registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer)
    return slice_info
def verify_persons(self, registry, credential, slice_record, site_id, remote_site_id, peer, sfa_peer):
    """
    Make sure every researcher of the slice exists in the local PLC and
    is attached to the slice and to its site.

    registry       -- registry used to resolve researchers and to
                      register peer objects
    credential     -- credential passed on every registry call
    slice_record   -- slice record; its 'hrn' and optional 'researcher'
                      list are read
    site_id        -- local id of the slice's site
    remote_site_id -- id of that site at the peer
    peer           -- myplc peer shortname, or a false value
    sfa_peer       -- sfa peer authority, or a false value

    A researcher for which the registry returns no record of type 'user'
    is skipped.
    """
    slicename = hrn_to_pl_slicename(slice_record['hrn'])
    for researcher in slice_record.get('researcher', []):
        # start from scratch for each researcher so that a record found
        # for the previous one can never be reused by mistake
        person_record = None
        for record in registry.resolve(credential, researcher):
            if record['type'] in ['user']:
                person_record = record
        if not person_record:
            # nothing to work with; as_dict() below would fail on it
            continue
        person_dict = person_record.as_dict()

        # look the person up by email, restricted to the peer if there is one
        if peer:
            peer_id = self.api.plshell.GetPeers(self.api.plauth, {'shortname': peer}, ['peer_id'])[0]['peer_id']
            persons = self.api.plshell.GetPersons(self.api.plauth, {'email': [person_dict['email']], 'peer_id': peer_id}, ['person_id', 'key_ids'])
        else:
            persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])

        if not persons:
            person_id = self.api.plshell.AddPerson(self.api.plauth, person_dict)
            self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})

            # mark this person as an sfa peer record
            if sfa_peer:
                peer_dict = {'type': 'user', 'hrn': researcher, 'peer_authority': sfa_peer, 'pointer': person_id}
                registry.register_peer_object(credential, peer_dict)

            if peer:
                self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
            # a person we have just added has no local keys yet
            key_ids = []
        else:
            person_id = persons[0]['person_id']
            key_ids = persons[0]['key_ids']

        # if this is a peer person, we must unbind them from the peer or
        # PLCAPI will throw
        if peer:
            self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
            self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'site', site_id, peer)

        self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
        self.api.plshell.AddPersonToSite(self.api.plauth, person_dict['email'], site_id)
        if peer:
            self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
            self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)

        self.verify_keys(registry, credential, person_dict, key_ids, person_id, peer)
def verify_keys(self, registry, credential, person_dict, key_ids, person_id, peer):
    """
    Add to the local person every key listed in person_dict['keys'] that
    PLC does not already hold for them.

    registry, credential -- accepted but not used by this method
    person_dict -- person record as a dict; 'email', 'keys', 'key_ids'
                   and 'pointer' are read, and 'key_ids' is consumed
                   (popped from the front) as keys are bound to the peer
    key_ids     -- ids of the keys PLC already has for the person
    person_id   -- local id of the person
    peer        -- myplc peer shortname, or a false value
    """
    existing = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
    known_keys = [entry['key'] for entry in existing]

    # from here on the ids of interest are the ones listed in person_dict
    remote_key_ids = person_dict['key_ids']
    for personkey in person_dict['keys']:
        if personkey in known_keys:
            continue

        # add keys that aren't already there
        new_key = {'key_type': 'ssh', 'key': personkey}
        if peer:
            self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
        key_id = self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], new_key)
        if peer:
            self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_dict['pointer'])
            try:
                self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', key_id, peer, remote_key_ids.pop(0))
            except:
                # binding the key is best effort
                pass
def create_slice_aggregate(self, hrn, rspec):
    """
    Create or update the slice on the local PLC so that it runs on
    exactly the nodes named in the rspec.

    The site, slice and researchers are verified (and created when
    missing) first.  The slice is then added to the rspec nodes it is not
    on yet and removed from the nodes the rspec no longer names.
    """
    # Determine if this is a peer slice
    peer = self.get_peer(hrn)
    sfa_peer = self.get_sfa_peer(hrn)

    # NOTE(review): Rspec is assumed to be the parser class exported by
    # sfa.util.rspec - confirm the constructor call.
    spec = Rspec()
    spec.parseString(rspec)

    slicename = hrn_to_pl_slicename(hrn)
    registry = Registries(self.api)[self.api.hrn]
    credential = self.api.getCredential()

    site_id, remote_site_id = self.verify_site(registry, credential, hrn, peer, sfa_peer)
    slice_info = self.verify_slice(registry, credential, hrn, site_id, remote_site_id, peer, sfa_peer)

    # find out where this slice is currently running
    current = self.api.plshell.GetNodes(self.api.plauth, slice_info['node_ids'], ['hostname'])
    hostnames = [node['hostname'] for node in current]

    # collect the node names requested by the rspec; a NodeSpec name may
    # be a single string or a list of them
    requested = []
    for nodespec in spec.getDictsByTagName('NodeSpec'):
        name = nodespec['name']
        if isinstance(name, list):
            requested.extend(name)
        elif isinstance(name, StringTypes):
            requested.append(name)

    # nodes to drop are the ones not in the rspec, nodes to add are the
    # ones the slice is not on yet
    deleted_nodes = list(set(hostnames).difference(requested))
    added_nodes = list(set(requested).difference(hostnames))

    if peer:
        self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice_info['slice_id'], peer)
    self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes)
    self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
    if peer:
        self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_info['slice_id'], peer, slice_info['peer_slice_id'])
def create_slice_smgr(self, hrn, rspec):
    """
    Split the rspec by network and send each part to the aggregate, or
    to a slice manager, that is responsible for that network.

    A network we are directly connected to receives its own part of the
    rspec; our own network (api.hrn) receives the whole rspec.  For any
    other network every aggregate is asked whether it knows the network,
    and the part is forwarded to each one that does.  Failures are logged
    per network, with the traceback, and do not stop the remaining
    networks.
    """
    # NOTE(review): Rspec is assumed to be the parser class exported by
    # sfa.util.rspec - confirm the constructor call.
    spec = Rspec()
    tempspec = Rspec()
    spec.parseString(rspec)

    specDict = spec.toDict()
    if specDict.has_key('Rspec'):
        specDict = specDict['Rspec']
    # NOTE(review): the fallback of 0 for a missing time is an assumption;
    # the listing does not show what the original used.
    if specDict.has_key('start_time'):
        start_time = specDict['start_time']
    else:
        start_time = 0
    if specDict.has_key('end_time'):
        end_time = specDict['end_time']
    else:
        end_time = 0

    aggregates = Aggregates(self.api)
    credential = self.api.getCredential()

    # split the netspecs into individual rspecs, keyed by network hrn
    rspecs = {}
    for netspec in spec.getDictsByTagName('NetSpec'):
        net_hrn = netspec['name']
        resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
        tempspec.parseDict({'Rspec': resources})
        rspecs[net_hrn] = tempspec.toxml()

    # send each rspec to the appropriate aggregate/sm
    for net_hrn in rspecs:
        try:
            if net_hrn in aggregates:
                # directly connected: the local aggregate gets the whole
                # rspec, any other aggregate gets only its own part
                if net_hrn in [self.api.hrn]:
                    aggregates[net_hrn].create_slice(credential, hrn, rspec, caller_cred=self.caller_cred)
                else:
                    aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], caller_cred=self.caller_cred)
            else:
                # forward this rspec to every sm that knows about the network
                for aggregate in aggregates:
                    network_found = aggregates[aggregate].get_aggregates(credential, net_hrn)
                    if network_found:
                        aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], caller_cred=self.caller_cred)
        except Exception:
            print >> log, "Error creating slice %(hrn)s at aggregate %(net_hrn)s" % {'hrn': hrn, 'net_hrn': net_hrn}
            # keep the traceback next to the message, as delete_slice_smgr does
            traceback.print_exc(log)
def start_slice(self, hrn):
    """
    Start the slice through the implementation that matches
    api.interface ('aggregate' or 'slicemgr'); any other interface value
    does nothing.
    """
    handlers = {
        'aggregate': self.start_slice_aggregate,
        'slicemgr': self.start_slice_smgr,
    }
    handler = handlers.get(self.api.interface)
    if handler is not None:
        handler(hrn)
def start_slice_aggregate(self, hrn):
    """
    Start the slice on the local PLC by setting its 'enabled' slice
    attribute to "1".

    Raises RecordNotFound when PLC has no slice with the name derived
    from hrn.
    """
    slicename = hrn_to_pl_slicename(hrn)
    slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
    if not slices:
        raise RecordNotFound(hrn)
    # take the id from the record, exactly as stop_slice_aggregate does
    slice_id = slices[0]['slice_id']
    attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
    # this line used to read `attreibutes`, an undefined name
    attribute_id = attributes[0]['slice_attribute_id']
    self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "1")
def start_slice_smgr(self, hrn):
    """
    Ask every known aggregate, in turn, to start the slice.

    Nothing is caught here: an error raised by one aggregate ends the loop.
    """
    credential = self.api.getCredential()
    known = Aggregates(self.api)
    for name in known:
        known[name].start_slice(credential, hrn)
def stop_slice(self, hrn):
    """
    Stop the slice through the implementation that matches
    api.interface ('aggregate' or 'slicemgr'); any other interface value
    does nothing.
    """
    handlers = {
        'aggregate': self.stop_slice_aggregate,
        'slicemgr': self.stop_slice_smgr,
    }
    handler = handlers.get(self.api.interface)
    if handler is not None:
        handler(hrn)
def stop_slice_aggregate(self, hrn):
    """
    Stop the slice on the local PLC by setting its 'enabled' slice
    attribute to "0".

    Raises RecordNotFound when PLC has no slice with the name derived
    from hrn.
    """
    found = self.api.plshell.GetSlices(self.api.plauth, {'name': hrn_to_pl_slicename(hrn)}, ['slice_id'])
    if not found:
        raise RecordNotFound(hrn)

    # the 'enabled' attribute of this slice is the switch we flip
    enabled_filter = {'slice_id': found[0]['slice_id'], 'name': 'enabled'}
    enabled_attrs = self.api.plshell.GetSliceAttributes(self.api.plauth, enabled_filter, ['slice_attribute_id'])
    self.api.plshell.UpdateSliceAttribute(self.api.plauth, enabled_attrs[0]['slice_attribute_id'], "0")
def stop_slice_smgr(self, hrn):
    """
    Ask every known aggregate, in turn, to stop the slice.

    Nothing is caught here: an error raised by one aggregate ends the loop.
    """
    credential = self.api.getCredential()
    known = Aggregates(self.api)
    for name in known:
        known[name].stop_slice(credential, hrn)