comment out BindObjectToPeer() calls until things are working
[sfa.git] / sfa / plc / slices.py
1 ### $Id$
2 ### $URL$
3
4 import datetime
5 import time
6
7 from sfa.util.misc import *
8 from sfa.util.rspec import *
9 from sfa.util.specdict import *
10 from sfa.util.faults import *
11 from sfa.util.storage import *
12 from sfa.util.policy import Policy
13 from sfa.util.debug import log
14 from sfa.server.aggregate import Aggregates
15 from sfa.server.registry import Registries
16 import traceback
17 class Slices(SimpleStorage):
18
19     def __init__(self, api, ttl = .5):
20         self.api = api
21         self.ttl = ttl
22         self.threshold = None
23         path = self.api.config.SFA_BASE_DIR
24         filename = ".".join([self.api.interface, self.api.hrn, "slices"])
25         filepath = path + os.sep + filename
26         self.slices_file = filepath
27         SimpleStorage.__init__(self, self.slices_file)
28         self.policy = Policy(self.api)    
29         self.load()
30
31
32     def refresh(self):
33         """
34         Update the cached list of slices
35         """
36         # Reload components list
37         now = datetime.datetime.now()
38         if not self.has_key('threshold') or not self.has_key('timestamp') or \
39            now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
40             if self.api.interface in ['aggregate']:
41                 self.refresh_slices_aggregate()
42             elif self.api.interface in ['slicemgr']:
43                 self.refresh_slices_smgr()
44
45     def refresh_slices_aggregate(self):
46         slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name'])
47         slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices]
48
49          # update timestamp and threshold
50         timestamp = datetime.datetime.now()
51         hr_timestamp = timestamp.strftime(self.api.time_format)
52         delta = datetime.timedelta(hours=self.ttl)
53         threshold = timestamp + delta
54         hr_threshold = threshold.strftime(self.api.time_format)
55         
56         slice_details = {'hrn': slice_hrns,
57                          'timestamp': hr_timestamp,
58                          'threshold': hr_threshold
59                         }
60         self.update(slice_details)
61         self.write()     
62         
63
64     def refresh_slices_smgr(self):
65         slice_hrns = []
66         aggregates = Aggregates(self.api)
67         credential = self.api.getCredential()
68         for aggregate in aggregates:
69             try:
70                 slices = aggregates[aggregate].get_slices(credential)
71                 slice_hrns.extend(slices)
72             except:
73                 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
74          # update timestamp and threshold
75         timestamp = datetime.datetime.now()
76         hr_timestamp = timestamp.strftime(self.api.time_format)
77         delta = datetime.timedelta(hours=self.ttl)
78         threshold = timestamp + delta
79         hr_threshold = threshold.strftime(self.api.time_format)
80
81         slice_details = {'hrn': slice_hrns,
82                          'timestamp': hr_timestamp,
83                          'threshold': hr_threshold
84                         }
85         self.update(slice_details)
86         self.write()
87
88
89     def delete_slice(self, hrn):
90         if self.api.interface in ['aggregate']:
91             self.delete_slice_aggregate(hrn)
92         elif self.api.interface in ['slicemgr']:
93             self.delete_slice_smgr(hrn)
94         
95     def delete_slice_aggregate(self, hrn):
96         slicename = hrn_to_pl_slicename(hrn)
97         slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None, 'name': slicename})
98         if not slices:
99             return 1        
100         slice = slices[0]
101
102         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, slice['node_ids'])
103         return 1
104
105     def delete_slice_smgr(self, hrn):
106         credential = self.api.getCredential()
107         aggregates = Aggregates(self.api)
108         for aggregate in aggregates:
109             aggregates[aggregate].delete_slice(credential, hrn)
110
111     def create_slice(self, hrn, rspec):
112         
113         # check our slice policy before we procede
114         whitelist = self.policy['slice_whitelist']     
115         blacklist = self.policy['slice_blacklist']
116        
117         if whitelist and hrn not in whitelist or \
118            blacklist and hrn in blacklist:
119             policy_file = self.policy.policy_file
120             print >> log, "Slice %(hrn)s not allowed by policy %(policy_file)s" % locals()
121             return 1
122
123         if self.api.interface in ['aggregate']:     
124             self.create_slice_aggregate(hrn, rspec)
125         elif self.api.interface in ['slicemgr']:
126             self.create_slice_smgr(hrn, rspec)
127
128     def create_slice_aggregate(self, hrn, rspec):
129         # Becaues of myplc federation,  we first need to determine if this
130         # slice belongs to out local plc or a myplc peer. We will assume it 
131         # is a local site, unless we find out otherwise  
132         peer = None
133         
134         # get this slice's authority (site)
135         #slice_authority = get_authority(hrn)
136         
137         # get this site's authority (sfa root authority or sub authority)
138         #site_authority = get_authority(slice_authority)
139         
140         # check if we are already peered with this site_authority at ple, if so
141         #peers = self.api.plshell.GetPeers(self.api.plauth, {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
142         #for peer_record in peers:
143         #    if site_authority in peer_record.values():
144         #        peer = peer_record['shortname']                                            
145
146         spec = Rspec(rspec)
147         # Get the slice record from geni
148         slice = {}
149         slice_record = None
150         registries = Registries(self.api)
151         registry = registries[self.api.hrn]
152         credential = self.api.getCredential()
153         records = registry.resolve(credential, hrn)
154         for record in records:
155             if record.get_type() in ['slice']:
156                 slice_record = record.as_dict()
157         if not slice_record:
158             raise RecordNotFound(hrn)   
159
160         # Make sure slice exists at plc, if it doesnt add it
161         slicename = hrn_to_pl_slicename(hrn)
162         slices = self.api.plshell.GetSlices(self.api.plauth, [slicename], ['node_ids'])
163         if not slices:
164             parts = slicename.split("_")
165             login_base = parts[0]
166             # if site doesnt exist add it
167             sites = self.api.plshell.GetSites(self.api.plauth, [login_base])
168             if not sites:
169                 authority = get_authority(hrn)
170                 site_records = registry.resolve(credential, authority)
171                 site_record = {}
172                 if not site_records:
173                     raise RecordNotFound(authority)
174                 site_record = site_records[0]
175                 site = site_record.as_dict()
176                 
177                  # add the site
178                 remote_site_id = site.pop('site_id')
179                 site_id = self.api.plshell.AddSite(self.api.plauth, site)
180                 # this belongs to a peer 
181                 #if peer:
182                 #    print site_id, peer, remote_site_id
183                 #    self.api.plshell.BindObjectToPeer(self.api.plauth, 'site', site_id, peer, remote_site_id)
184             else:
185                 site = sites[0]
186             
187             # create slice object
188             slice_fields = {}
189             slice_keys = ['name', 'url', 'description']
190             for key in slice_keys:
191                 if key in slice_record and slice_record[key]:
192                     slice_fields[key] = slice_record[key]
193
194             # add the slice  
195             slice_id = self.api.plshell.AddSlice(self.api.plauth, slice_fields)
196             slice = slice_fields
197             
198             #this belongs to a peer
199             #if peer:
200             #    self.api.plshell.BindObjectToPeer(self.api.plauth, 'slice', slice_id, peer, slice_record['pointer'])
201             slice['node_ids'] = 0
202         else:
203             slice = slices[0]    
204         # get the list of valid slice users from the registry and make 
205         # they are added to the slice 
206         researchers = record.get('researcher', [])
207         for researcher in researchers:
208             person_record = {}
209             person_records = registry.resolve(credential, researcher)
210             for record in person_records:
211                 if record.get_type() in ['user']:
212                     person_record = record
213             if not person_record:
214                 pass
215             person_dict = person_record.as_dict()
216             persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids'])
217
218             # Create the person record 
219             if not persons:
220                 person_id=self.api.plshell.AddPerson(self.api.plauth, person_dict)
221
222                 # The line below enables the user account on the remote 
223                 # aggregate soon after it is created. without this the 
224                 # user key is not transfered to the slice (as GetSlivers 
225                 # returns key of only enabled users), which prevents the 
226                 # user from login to the slice. We may do additional checks 
227                 # before enabling the user.
228
229                 self.api.plshell.UpdatePerson(self.api.plauth, person_id, {'enabled' : True})
230                 #if peer:
231                 #    self.api.plshell.BindObjectToPeer(self.api.plauth, 'person', person_id, peer, person_record['pointer'])
232                 key_ids = []
233             else:
234                 key_ids = persons[0]['key_ids']
235
236             self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename)        
237             # Get this users local keys
238             keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key'])
239             keys = [key['key'] for key in keylist]
240
241             # add keys that arent already there 
242             for personkey in person_dict['keys']:
243                 if personkey not in keys:
244                     key = {'key_type': 'ssh', 'key': personkey}
245                     #if peer:
246                         # XX Need to get the key_id from remote registry somehow 
247                         #self.api.plshell.BindObjectToPeer(self.api.plauth, 'key', None, peer, key_id)   
248                         #pass
249                     self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key)
250
251         # find out where this slice is currently running
252         nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname'])
253         hostnames = [node['hostname'] for node in nodelist]
254
255         # get netspec details
256         nodespecs = spec.getDictsByTagName('NodeSpec')
257         nodes = []
258         for nodespec in nodespecs:
259             if isinstance(nodespec['name'], list):
260                 nodes.extend(nodespec['name'])
261             elif isinstance(nodespec['name'], StringTypes):
262                 nodes.append(nodespec['name'])
263
264         # remove nodes not in rspec
265         deleted_nodes = list(set(hostnames).difference(nodes))
266         # add nodes from rspec
267         added_nodes = list(set(nodes).difference(hostnames))
268
269         self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes) 
270         self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes)
271
272         return 1
273
274     def create_slice_smgr(self, hrn, rspec):
275         spec = Rspec()
276         tempspec = Rspec()
277         spec.parseString(rspec)
278         slicename = hrn_to_pl_slicename(hrn)
279         specDict = spec.toDict()
280         if specDict.has_key('Rspec'): specDict = specDict['Rspec']
281         if specDict.has_key('start_time'): start_time = specDict['start_time']
282         else: start_time = 0
283         if specDict.has_key('end_time'): end_time = specDict['end_time']
284         else: end_time = 0
285
286         rspecs = {}
287         aggregates = Aggregates(self.api)
288         credential = self.api.getCredential()
289         # only attempt to extract information about the aggregates we know about
290         for aggregate in aggregates:
291             netspec = spec.getDictByTagNameValue('NetSpec', aggregate)
292             if netspec:
293                 # creat a plc dict 
294                 resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
295                 resourceDict = {'Rspec': resources}
296                 tempspec.parseDict(resourceDict)
297                 rspecs[aggregate] = tempspec.toxml()
298
299         # notify the aggregates
300         for aggregate in rspecs.keys():
301             try:
302                 # send the whloe rspec to the local aggregate
303                 if aggregate in [self.api.hrn]:
304                     aggregates[aggregate].create_slice(credential, hrn, rspec)
305                 else:
306                     aggregates[aggregate].create_slice(credential, hrn, rspecs[aggregate])
307             except:
308                 print >> log, "Error creating slice %(hrn)s at aggregate %(aggregate)s" % locals()
309         return 1
310
311
312     def start_slice(self, hrn):
313         if self.api.interface in ['aggregate']:
314             self.start_slice_aggregate(hrn)
315         elif self.api.interface in ['slicemgr']:
316             self.start_slice_smgr(hrn)
317
318     def start_slice_aggregate(self, hrn):
319         slicename = hrn_to_pl_slicename(hrn)
320         slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
321         if not slices:
322             raise RecordNotFound(hrn)
323         slice_id = slices[0]
324         attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
325         attribute_id = attreibutes[0]['slice_attribute_id']
326         self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "1" )
327         return 1
328
329     def start_slice_smgr(self, hrn):
330         credential = self.api.getCredential()
331         aggregates = Aggregates(self.api)
332         for aggregate in aggregates:
333             aggregates[aggregate].start_slice(credential, hrn)
334         return 1
335
336
337     def stop_slice(self, hrn):
338         if self.api.interface in ['aggregate']:
339             self.stop_slice_aggregate(hrn)
340         elif self.api.interface in ['slicemgr']:
341             self.stop_slice_smgr(hrn)
342
343     def stop_slice_aggregate(self, hrn):
344         slicename = hrn_to_pl_slicename(hrn)
345         slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id'])
346         if not slices:
347             raise RecordNotFound(hrn)
348         slice_id = slices[0]['slice_id']
349         attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
350         attribute_id = attributes[0]['slice_attribute_id']
351         self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "0")
352         return 1
353
354     def stop_slice_smgr(self, hrn):
355         credential = self.api.getCredential()
356         aggregates = Aggregates(self.api)
357         for aggregate in aggregates:
358             aggregates[aggregate].stop_slice(credential, hrn)  
359