9 from sfa.util.misc import *
10 from sfa.util.rspec import *
11 from sfa.util.specdict import *
12 from sfa.util.faults import *
13 from sfa.util.storage import *
14 from sfa.util.policy import Policy
15 from sfa.util.debug import log
16 from sfa.server.aggregate import Aggregates
17 from sfa.server.registry import Registries
19 class Slices(SimpleStorage):
def __init__(self, api, ttl=.5):
    """Set up the on-disk slice cache for one API instance.

    api -- SFA api object. Its config.SFA_BASE_DIR, interface and hrn
           determine the path of the cache file, and it is also handed
           to Policy.
    ttl -- lifetime of the cached slice list in hours; the refresh
           helpers pass it to datetime.timedelta(hours=...).
    """
    # Store both arguments first: the lines below and the refresh helpers
    # read them back as self.api / self.ttl.
    self.api = api
    self.ttl = ttl

    # The cache file is <SFA_BASE_DIR>/<interface>.<hrn>.slices
    base_dir = self.api.config.SFA_BASE_DIR
    filename = ".".join([self.api.interface, self.api.hrn, "slices"])
    self.slices_file = os.path.join(base_dir, filename)

    SimpleStorage.__init__(self, self.slices_file)
    self.policy = Policy(self.api)
def get_peer(self, hrn):
    """Return the shortname of the PLC peer that slice *hrn* belongs to.

    Because of MyPLC federation, a slice may belong to the local PLC or
    to a MyPLC peer. The slice is assumed to be local unless one of the
    peer records returned by GetPeers contains the slice's site
    authority among its values.

    Returns the 'shortname' of the matching peer record, or None when no
    peer record matches. If several records match, the last one wins.
    """
    # Assume a local slice until a peer record says otherwise.
    peer = None

    # get this slice's authority (site)
    slice_authority = get_authority(hrn)

    # get this site's authority (sfa root authority or sub authority)
    site_authority = get_authority(slice_authority)

    # check if we are already peered with this site_authority
    peers = self.api.plshell.GetPeers(
        self.api.plauth, {},
        ['peer_id', 'peername', 'shortname', 'hrn_root'])
    for peer_record in peers:
        if site_authority in peer_record.values():
            peer = peer_record['shortname']

    return peer
# NOTE(review): the `def` line and the docstring delimiters of this method
# are not visible in this view. Judging by the helpers it calls, this is
# presumably the cache-refresh entry point -- confirm against the full file.
56 Update the cached list of slices
58 # Reload the slice list when the cache is stale
59 now = datetime.datetime.now()
# The cache counts as stale when it has no 'threshold' or no 'timestamp'
# entry, or when `now` is later than the stored 'threshold' string parsed
# with self.api.time_format.
60 if not self.has_key('threshold') or not self.has_key('timestamp') or \
61 now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
# The rebuild is delegated to the helper that matches self.api.interface;
# any other interface value triggers no refresh.
62 if self.api.interface in ['aggregate']:
63 self.refresh_slices_aggregate()
64 elif self.api.interface in ['slicemgr']:
65 self.refresh_slices_smgr()
def refresh_slices_aggregate(self):
    """Rebuild the cached slice list from the local PLC.

    Fetches the names of the slices PLC returns for the filter
    {'peer_id': None}, converts each name to an hrn under self.api.hrn,
    and stores the list under 'hrn' together with a fresh 'timestamp'
    and a 'threshold' (timestamp + self.ttl hours). Both times are
    stored as strings formatted with self.api.time_format.
    """
    local_slices = self.api.plshell.GetSlices(
        self.api.plauth, {'peer_id': None}, ['name'])
    slice_hrns = [slicename_to_hrn(self.api.hrn, record['name'])
                  for record in local_slices]

    # update timestamp and threshold
    timestamp = datetime.datetime.now()
    threshold = timestamp + datetime.timedelta(hours=self.ttl)

    slice_details = {
        'hrn': slice_hrns,
        'timestamp': timestamp.strftime(self.api.time_format),
        'threshold': threshold.strftime(self.api.time_format),
    }
    self.update(slice_details)
def refresh_slices_smgr(self):
    """Rebuild the cached slice list by asking every aggregate.

    Calls get_slices(credential) on each entry of Aggregates(self.api)
    and concatenates the results. The combined list is stored under
    'hrn' together with a fresh 'timestamp' and a 'threshold'
    (timestamp + self.ttl hours), both formatted with
    self.api.time_format.
    """
    slice_hrns = []
    aggregates = Aggregates(self.api)
    credential = self.api.getCredential()
    for aggregate in aggregates:
        try:
            slices = aggregates[aggregate].get_slices(credential)
            slice_hrns.extend(slices)
        except Exception:
            # Best effort: an aggregate that fails is logged and skipped
            # so the remaining aggregates are still queried.
            print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()

    # update timestamp and threshold
    timestamp = datetime.datetime.now()
    threshold = timestamp + datetime.timedelta(hours=self.ttl)

    slice_details = {
        'hrn': slice_hrns,
        'timestamp': timestamp.strftime(self.api.time_format),
        'threshold': threshold.strftime(self.api.time_format),
    }
    self.update(slice_details)
def delete_slice(self, hrn):
    """Delete slice *hrn* using the handler for the running interface.

    'aggregate' goes to delete_slice_aggregate and 'slicemgr' goes to
    delete_slice_smgr; any other interface value does nothing.
    """
    interface = self.api.interface
    if interface == 'aggregate':
        self.delete_slice_aggregate(hrn)
        return
    if interface == 'slicemgr':
        self.delete_slice_smgr(hrn)
def delete_slice_aggregate(self, hrn):
    """Remove slice *hrn* from all of its nodes at the local PLC.

    Looks the slice up by the PLC slice name derived from *hrn* and does
    nothing when PLC returns no such slice. For a slice that belongs to
    a peer (see get_peer) the slice is unbound from the peer before the
    nodes are removed and bound again afterwards.
    """
    slicename = hrn_to_pl_slicename(hrn)
    slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename})
    if not slices:
        # nothing to delete
        return
    slice_record = slices[0]
    slice_id = slice_record['slice_id']

    # determine if this is a peer slice
    peer = self.get_peer(hrn)

    # NOTE(review): presumably PLCAPI refuses to modify an object that is
    # still bound to a peer, hence the unbind/rebind pair -- confirm.
    if peer:
        self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice_id, peer)
    self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, slice_record['node_ids'])
    if peer:
        self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['peer_slice_id'])
def delete_slice_smgr(self, hrn):
    """Ask every aggregate to delete slice *hrn*.

    Calls delete_slice(credential, hrn) on each entry of
    Aggregates(self.api). A failing aggregate is logged, with its
    traceback, and skipped so the remaining aggregates are still called.
    """
    credential = self.api.getCredential()
    aggregates = Aggregates(self.api)
    for aggregate in aggregates:
        try:
            aggregates[aggregate].delete_slice(credential, hrn)
        except Exception:
            print >> log, "Error calling delete slice at aggregate %s" % aggregate
            # print_exc's first positional parameter is `limit`; the log
            # must be passed as `file` for the traceback to reach it.
            traceback.print_exc(file=log)
            exc_type, exc_value, exc_traceback = sys.exc_info()
            # Same text the former print statement sent to stdout.
            sys.stdout.write("%s %s %s\n" % (exc_type, exc_value, exc_traceback))
def create_slice(self, hrn, rspec):
    """Create slice *hrn* from *rspec* if the slice policy allows it.

    The slice is refused when a non-empty 'slice_whitelist' does not
    contain *hrn*, or when a non-empty 'slice_blacklist' does. A refusal
    is written to the log and nothing is created. Otherwise the request
    goes to create_slice_aggregate or create_slice_smgr depending on
    self.api.interface; any other interface value does nothing.
    """
    # check our slice policy before we proceed
    whitelist = self.policy['slice_whitelist']
    blacklist = self.policy['slice_blacklist']

    not_whitelisted = whitelist and hrn not in whitelist
    blacklisted = blacklist and hrn in blacklist
    if not_whitelisted or blacklisted:
        policy_file = self.policy.policy_file
        print >> log, "Slice %(hrn)s not allowed by policy %(policy_file)s" % locals()
        # Stop here: a refused slice must not reach the create handlers.
        return

    if self.api.interface in ['aggregate']:
        self.create_slice_aggregate(hrn, rspec)
    elif self.api.interface in ['slicemgr']:
        self.create_slice_smgr(hrn, rspec)
# Create or update slice `hrn` at the local PLC from `rspec`.
# Visible steps: resolve the slice record at the registry, add the site and
# the slice to PLC, add the slice's researchers and their ssh keys, then
# reconcile the slice's nodes with the NodeSpec entries of the rspec.
# NOTE(review): several control-flow lines of this method (guards such as
# `if peer:` / `if not ...:`, `else:` branches, initialisers) are not
# visible in this view, so the notes below describe only the visible lines.
163 def create_slice_aggregate(self, hrn, rspec):
165 # Determine if this is a peer slice
166 peer = self.get_peer(hrn)
169 # Get the slice record from geni
172 registries = Registries(self.api)
173 registry = registries[self.api.hrn]
174 credential = self.api.getCredential()
175 records = registry.resolve(credential, hrn)
# Keep the dict form of the record whose type is 'slice'.
176 for record in records:
177 if record.get_type() in ['slice']:
178 slice_record = record.as_dict()
# NOTE(review): the condition guarding this raise is not visible here.
180 raise RecordNotFound(hrn)
182 # Make sure slice exists at plc, if it doesnt add it
183 slicename = hrn_to_pl_slicename(hrn)
184 slices = self.api.plshell.GetSlices(self.api.plauth, [slicename], ['slice_id', 'node_ids'])
# The part of the slice name before the first "_" is used as the site's
# login_base.
186 parts = slicename.split("_")
187 login_base = parts[0]
188 # if site doesnt exist add it
189 sites = self.api.plshell.GetSites(self.api.plauth, [login_base])
191 authority = get_authority(hrn)
192 site_records = registry.resolve(credential, authority)
# NOTE(review): the condition guarding this raise is not visible here.
195 raise RecordNotFound(authority)
196 site_record = site_records[0]
197 site = site_record.as_dict()
# 'site_id' is removed from the dict passed to AddSite and kept as the
# remote id for the peer binding below.
200 remote_site_id = site.pop('site_id')
201 site_id = self.api.plshell.AddSite(self.api.plauth, site)
202 # this belongs to a peer
204 self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
208 # create slice object
# Only non-empty 'name', 'url' and 'description' values are copied from the
# registry record into the fields passed to AddSlice.
210 slice_keys = ['name', 'url', 'description']
211 for key in slice_keys:
212 if key in slice_record and slice_record[key]:
213 slice_fields[key] = slice_record[key]
216 slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
219 #this belongs to a peer
221 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
# NOTE(review): the binding of `slice` is not visible in this view; without
# one the name refers to the builtin `slice` -- confirm.
222 slice['node_ids'] = []
225 slice_id = slice['slice_id']
226 # get the list of valid slice users from the registry and make sure
227 # they are added to the slice
# NOTE(review): `record` here is whatever the loop over `records` above
# left bound, which is not necessarily the slice record; `slice_record`
# looks like the intended source -- confirm.
228 researchers = record.get('researcher', [])
229 for researcher in researchers:
231 person_records = registry.resolve(credential, researcher)
232 for record in person_records:
233 if record.get_type() in ['user']:
234 person_record = record
235 if not person_record:
237 person_dict = person_record.as_dict()
238 persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
240 # Create the person record
242 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
244 # The line below enables the user account on the remote
245 # aggregate soon after it is created. without this the
246 # user key is not transferred to the slice (as GetSlivers
247 # returns key of only enabled users), which prevents the
248 # user from login to the slice. We may do additional checks
249 # before enabling the user.
251 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
253 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_record['pointer'])
256 person_id = persons[0]['person_id']
257 key_ids = persons[0]['key_ids']
259 # if this is a peer person, we must unbind them from the peer or PLCAPI will throw
262 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'person', person_id, peer)
263 self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)
265 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_record['pointer'])
267 # Get this users local keys
268 keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
269 keys = [key['key'] for key in keylist]
271 # add keys that arent already there
272 for personkey in person_dict['keys']:
273 if personkey not in keys:
274 key = {'key_type': 'ssh', 'key': personkey}
# NOTE(review): elsewhere in this method a modification is preceded by
# UnBindObjectFromPeer and followed by BindObjectToPeer; the call before
# AddPersonKey below is a Bind, which looks like it should be an UnBind --
# confirm.
276 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_record['pointer'])
277 self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
279 self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_record['pointer'])
281 # find out where this slice is currently running
282 nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
283 hostnames = [node['hostname'] for node in nodelist]
285 # get netspec details
# NOTE(review): the creation of `spec` and `nodes` is not visible in this
# view.
286 nodespecs = spec.getDictsByTagName('NodeSpec')
# A NodeSpec 'name' may be a list of hostnames or a single string.
288 for nodespec in nodespecs:
289 if isinstance(nodespec['name'], list):
290 nodes.extend(nodespec['name'])
291 elif isinstance(nodespec['name'], StringTypes):
292 nodes.append(nodespec['name'])
294 # remove nodes not in rspec
295 deleted_nodes = list(set(hostnames).difference(nodes))
296 # add nodes from rspec
297 added_nodes = list(set(nodes).difference(hostnames))
# The slice is unbound from its peer, its node set is updated, and it is
# bound to the peer again.
300 self.api.plshell.UnBindObjectFromPeer(self.api.plauth, 'slice', slice_id, peer)
301 self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes)
302 self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
304 self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
# Split `rspec` per aggregate and ask each aggregate to create slice `hrn`.
# NOTE(review): the creation of `spec`, `tempspec` and `rspecs`, and some
# control-flow lines (presumably a try/except around the aggregate calls
# and an else branch), are not visible in this view.
308 def create_slice_smgr(self, hrn, rspec):
311 spec.parseString(rspec)
312 slicename = hrn_to_pl_slicename(hrn)
313 specDict = spec.toDict()
# Work on the contents of the top-level 'Rspec' entry when there is one.
314 if specDict.has_key('Rspec'): specDict = specDict['Rspec']
# NOTE(review): in the visible lines start_time and end_time are bound only
# when the keys exist, yet both are read unconditionally below -- confirm
# that defaults are set on the lines missing from this view.
315 if specDict.has_key('start_time'): start_time = specDict['start_time']
317 if specDict.has_key('end_time'): end_time = specDict['end_time']
321 aggregates = Aggregates(self.api)
322 credential = self.api.getCredential()
323 # only attempt to extract information about the aggregates we know about
324 for aggregate in aggregates:
325 netspec = spec.getDictByTagNameValue('NetSpec', aggregate)
# Build a per-aggregate rspec holding the shared start/end times and this
# aggregate's NetSpec, and store it as XML under the aggregate's name.
328 resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
329 resourceDict = {'Rspec': resources}
330 tempspec.parseDict(resourceDict)
331 rspecs[aggregate] = tempspec.toxml()
333 # notify the aggregates
334 for aggregate in rspecs.keys():
336 # send the whole rspec to the local aggregate
# The aggregate named self.api.hrn receives the original rspec; every other
# aggregate receives the rspec built for it above.
337 if aggregate in [self.api.hrn]:
338 aggregates[aggregate].create_slice(credential, hrn, rspec)
340 aggregates[aggregate].create_slice(credential, hrn, rspecs[aggregate])
342 print >> log, "Error creating slice %(hrn)s at aggregate %(aggregate)s" % locals()
def start_slice(self, hrn):
    """Start slice *hrn* using the handler for the running interface.

    'aggregate' goes to start_slice_aggregate and 'slicemgr' goes to
    start_slice_smgr; any other interface value does nothing.
    """
    interface = self.api.interface
    if interface == 'aggregate':
        self.start_slice_aggregate(hrn)
        return
    if interface == 'slicemgr':
        self.start_slice_smgr(hrn)
def start_slice_aggregate(self, hrn):
    """Enable slice *hrn* at the local PLC.

    Looks the slice up by the PLC slice name derived from *hrn* and sets
    its 'enabled' slice attribute to "1". This mirrors
    stop_slice_aggregate, which sets the same attribute to "0".

    Raises RecordNotFound when PLC returns no slice with that name.
    """
    slicename = hrn_to_pl_slicename(hrn)
    slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
    if not slices:
        raise RecordNotFound(hrn)
    slice_id = slices[0]['slice_id']

    attributes = self.api.plshell.GetSliceAttributes(
        self.api.plauth,
        {'slice_id': slice_id, 'name': 'enabled'},
        ['slice_attribute_id'])
    attribute_id = attributes[0]['slice_attribute_id']
    self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "1")
def start_slice_smgr(self, hrn):
    """Ask every aggregate in Aggregates(self.api) to start slice *hrn*.

    An exception raised by an aggregate call is not caught here, so it
    ends the loop and propagates to the caller.
    """
    cred = self.api.getCredential()
    known_aggregates = Aggregates(self.api)
    for name in known_aggregates:
        server = known_aggregates[name]
        server.start_slice(cred, hrn)
def stop_slice(self, hrn):
    """Stop slice *hrn* using the handler for the running interface.

    'aggregate' goes to stop_slice_aggregate and 'slicemgr' goes to
    stop_slice_smgr; any other interface value does nothing.
    """
    interface = self.api.interface
    if interface == 'aggregate':
        self.stop_slice_aggregate(hrn)
        return
    if interface == 'slicemgr':
        self.stop_slice_smgr(hrn)
def stop_slice_aggregate(self, hrn):
    """Disable slice *hrn* at the local PLC.

    Looks the slice up by the PLC slice name derived from *hrn* and sets
    its 'enabled' slice attribute to "0".

    Raises RecordNotFound when PLC returns no slice with that name.
    """
    slicename = hrn_to_pl_slicename(hrn)
    slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
    if not slices:
        raise RecordNotFound(hrn)
    slice_id = slices[0]['slice_id']

    attributes = self.api.plshell.GetSliceAttributes(
        self.api.plauth,
        {'slice_id': slice_id, 'name': 'enabled'},
        ['slice_attribute_id'])
    attribute_id = attributes[0]['slice_attribute_id']
    self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "0")
def stop_slice_smgr(self, hrn):
    """Ask every aggregate in Aggregates(self.api) to stop slice *hrn*.

    An exception raised by an aggregate call is not caught here, so it
    ends the loop and propagates to the caller.
    """
    cred = self.api.getCredential()
    known_aggregates = Aggregates(self.api)
    for name in known_aggregates:
        server = known_aggregates[name]
        server.stop_slice(cred, hrn)