added filter method used to remove elements from the dom tree
[sfa.git] / geni / slicemgr.py
1 import os
2 import sys
3 import datetime
4 import time
5
6 from geni.util.geniserver import *
7 from geni.util.geniclient import *
8 from geni.util.cert import *
9 from geni.util.credential import Credential
10 from geni.util.trustedroot import *
11 from geni.util.excep import *
12 from geni.util.misc import *
13 from geni.util.config import Config
14 from geni.util.rspec import Rspec
15 from geni.util.specdict import *
16 from geni.util.storage import SimpleStorage
17
class SliceMgr(GeniServer):
    # Slice manager server built on GeniServer. It keeps a connection to each
    # configured aggregate and caches component, slice and policy data in
    # SimpleStorage objects.
    #
    # The assignments below are class-level defaults; the real values are set
    # on the instance by __init__ and the methods of this class.
    #
    # NOTE(review): `nodes`, `slices` and `policy` are also the names of server
    # methods defined further down in this class body, so at class level these
    # three defaults are rebound to those methods, while __init__ stores data
    # objects under the same names on the instance.

    # hrn of this interface, read from the configuration in __init__
    hrn = None
    # lifetime of the cached node list, used as a number of hours
    nodes_ttl = None
    # cached components (SimpleStorage assigned in __init__)
    nodes = None
    # saved rspec per slice hrn (SimpleStorage assigned in __init__)
    slices = None
    # node policy, read through its 'whitelist' and 'blacklist' keys
    policy = None
    # aggregate hrn -> client connection, filled by connect_aggregates()
    aggregates = None
    # storage holding the 'timestamp' of the last component refresh
    timestamp = None
    # point in time after which the cached node list is refreshed
    threshold = None
    # not referenced by the code in this class
    shell = None
    # registry connection used by loadCredential() to request credentials
    registry = None
    # private key filename given to the constructor
    key_file = None
    # certificate filename given to the constructor
    cert_file = None
    # credential string set by loadCredential()
    credential = None
33   
34     ##
35     # Create a new slice manager object.
36     #
37     # @param ip the ip address to listen on
38     # @param port the port to listen on
39     # @param key_file private key filename of registry
40     # @param cert_file certificate filename containing public key (could be a GID file)     
41
42     def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/geni/util/geni_config"):
43         GeniServer.__init__(ip, port, key_file, cert_file)
44         self.key_file = key_file
45         self.cert_file = cert_file
46         self.conf = Config(config)
47         basedir = self.conf.GENI_BASE_DIR + os.sep
48         server_basedir = basedir + os.sep + "geni" + os.sep
49         self.hrn = conf.GENI_INTERFACE_HRN    
50
51         # Get list of aggregates this sm talks to
52         # XX do we use simplestorage to maintain this file manually?
53         aggregates_file = server_basedir + os.sep + 'aggregates'
54         self.aggregates = SimpleStorage(aggregates_file)
55         self.connect_aggregates(aggregates_file) 
56         
57         nodes_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.components'])
58         self.nodes = SimpleStorage(nodes_file)
59         self.nodes.load()
60         
61         slices_file = os.sep.join([server_basedir, 'slicemgr' + self.hrn + '.slices'])
62         self.slices = SimpleStorage(slices_file)
63         self.slices.load()
64
65         policy_file = os.sep.join([server_basedir, 'smgr.policy'])
66         self.policy = SimpleStorage(policy_file)
67         self.policy.load()
68
69         timestamp_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.timestamp'])
70         self.timestamp = SimpleStorage(timestamp_file)
71  
72         self.nodes_ttl = 1
73         self.connectAggregates()
74         self.connectRegistry()
75         self.loadCredential()
76
77
78     def loadCredential(self):
79         """
80         Attempt to load credential from file if it exists. If it doesnt get
81         credential from registry.
82         """
83
84         self_cred_filename = self.server_basedir + os.sep + "smgr." + self.hrn + ".cred"
85         ma_cred_filename = self.server_basedir + os.sep + "smgr." + self.hrn + ".sa.cred"
86
87         # see if this file exists
88         try:
89             cred = Credential(filename = ma_cred_filename)
90             self.credential = cred.save_to_string()
91         except IOError:
92             # get self credential
93             self_cred = self.registry.get_credential(None, 'ma', self.hrn)
94             self_credential = Credential(string = self_cred)
95             self_credential.save_to_file(self_cred_filename)
96
97             # get ma credential
98             ma_cred = self.registry.get_gredential(self_cred)
99             ma_credential = Credential(string = ma_cred)
100             ma_credential.save_to_file(ma_cred_filename)
101             self.credential = ma_cred        
102
103     def connect_aggregates(self, aggregates_file):
104         """
105         Get info about the aggregates available to us from file and create 
106         an xmlrpc connection to each. If any info is invalid, skip it. 
107         """
108         lines = []
109         try:
110             f = open(aggregates_file, 'r')
111             lines = f.readlines()
112             f.close()
113         except: raise 
114     
115         for line in lines:
116             # Skip comments
117             if line.strip.startswith("#"):
118                 continue
119             agg_info = line.split("\t").split(" ")
120         
121             # skip invalid info
122             if len(agg_info) != 3:
123                 continue
124
125             # create xmlrpc connection using GeniClient
126             hrn, address, port = agg_info[0], agg_info[1], agg_info[2]
127             url = 'https://%(address)s:%(port)s' % locals()
128             self.aggregates[hrn] = GeniClient(url, self.key_file, self.cert_file)
129
130
131     def item_hrns(self, items):
132         """
133         Take a list of items (components or slices) and return a dictionary where
134         the key is the authoritative hrn and the value is a list of items at that 
135         hrn.
136         """
137         item_hrns = {}
138         agg_hrns = self.aggregates.keys()
139         for agg_hrn in agg_hrns:
140             item_hrns[agg_hrn] = []
141         for item in items:
142             for agg_hrn in agg_hrns:
143                 if item.startswith(agg_hrn):
144                     item_hrns[agg_hrn] = item
145
146         return item_hrns    
147              
148
149     def hostname_to_hrn(self, login_base, hostname):
150         """
151         Convert hrn to plantelab name.
152         """
153         genihostname = "_".join(hostname.split("."))
154         return ".".join([self.hrn, login_base, genihostname])
155
156     def slicename_to_hrn(self, slicename):
157         """
158         Convert hrn to planetlab name.
159         """
160         slicename = slicename.replace("_", ".")
161         return ".".join([self.hrn, slicename])
162
163     def refresh_components(self):
164         """
165         Update the cached list of nodes.
166         """
167         print "refreshing"
168     
169         aggregates = self.aggregates.keys()
170         all_nodes = []
171         nodedict = {}
172         for aggregate in aggregates:
173             try:
174                 # resolve components hostnames
175                 nodes = self.aggregates[aggregate].get_components()
176                 all_nodes.extend(nodes)    
177             except:
178                 # XX print out to some error log
179                 pass    
180    
181         for node in all_nodes:
182             if self.polciy['whitelist'] and node not in self.polciy['whitelist']:
183                 continue
184             if self.polciy['blacklist'] and node in self.policy['blacklist']:
185                 continue
186
187             nodedict[node] = node
188
189         self.nodes = SimpleStorate(self.nodes.db_filename, nodedict)
190         self.nodes.write()
191
192         # update timestamp and threshold
193         self.timestamp['timestamp'] = datetime.datetime.now()
194         delta = datetime.timedelta(hours=self.nodes_tt1)
195         self.threshold = self.timestamp['timestamp'] + delta
196         self.timestamp.write()
197         
198  
199     def load_components(self):
200         """
201         Read cached list of nodes and slices.
202         """
203         print "loading nodes"
204         # Read component list from cached file 
205         self.nodes.load()
206         self.timestamp.load()
207         time_format = "%Y-%m-%d %H:%M:%S"
208         timestamp = self.timestamp['timestamp']
209         self.timestamp['timestamp'] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format)))
210         delta = datetime.timedelta(hours=self.nodes_ttl)
211         self.threshold = self.timestamp['timestamp'] + delta
212
213     def load_policy(self):
214         """
215         Read the list of blacklisted and whitelisted nodes.
216         """
217         self.policy.load()
218  
219     def load_slices(self):
220         """
221         Read current slice instantiation states.
222         """
223         print "loading slices"
224         self.slices.load()
225
226
227     def get_components(self):
228         """
229         Return a list of components managed by this slice manager.
230         """
231         # Reload components list
232         now = datetime.datetime.now()
233         #self.load_components()
234         if not self.threshold or not self.timestamp or now > self.threshold:
235             self.refresh_components()
236         elif now < self.threshold and not self.components: 
237             self.load_components()
238         return self.nodes.keys()
239    
240      
241     def get_slices(self):
242         """
243         Return a list of instnatiated managed by this slice manager.
244         """
245         return dict(self.slices)
246
247     def get_resources(self, slice_hrn):
248         """
249         Return the current rspec for the specified slice.
250         """
251         cred = self.credential
252
253         if slice_hrn in self.slices.keys():
254             # check if we alreay have this slices state saved
255             rspec = self.slices[slice_hrn]
256         else:
257             # request this slices state from all  known aggregates
258             rspecdicts = []
259             for hrn in self.aggregates.keys():
260                 # XX need to use the right credentials for this call
261                 # check if the slice has resources at this hrn
262                 tempresources = self.aggregates[hrn].resources(cred, slice_hrn)
263                 temprspec = Rspec()
264                 temprspec.parseString(temprspec)
265                 if temprspec.getDictsByTagName('NodeSpec'):
266                     # append this rspec to the list of rspecs
267                     rspecdicts.append(temprspec.toDict())
268                 
269             # merge all these rspecs into one
270             start_time = int(self.timestamp['timestamp'].strftime("%s"))
271             end_time = int(self.duration.strftime("%s"))
272             duration = end_time - start_time
273                 
274             # create a plc dict 
275             networks = [rspecdict['networks'][0] for rspecdict in rspecdicts]
276             resources = {'networks': networks, 'start_time': start_time, 'duration': duration}
277             # convert the plc dict to an rspec dict
278             resourceDict = RspecDict(resources)
279             resourceSpec = Rspec()
280             resourceSpec.parseDict(resourceDict)
281             rspec = resourceSpec.toxml() 
282             # save this slices resources
283             self.slices[slice_hrn] = rspec
284             self.slices.write()
285          
286         return rspec
287  
288     def create_slice(self, slice_hrn, rspec, attributes):
289         """
290         Instantiate the specified slice according to whats defined in the rspec.
291         """
292         # XX need to gget the correct credentials
293         cred = self.credential
294
295         # save slice state locally
296         # we can assume that spec object has been validated so its safer to
297         # save this instead of the unvalidated rspec the user gave us
298         self.slices[slice_hrn] = spec.toxml()
299         self.slices.write()
300
301         # extract network list from the rspec and create a separate
302         # rspec for each network
303         slicename = self.hrn_to_plcslicename(slice_hrn)
304         spec = Rspec()
305         spec.parseString(rspec)
306         specDict = spec.toDict()
307         start_time = specDict['start_time']
308         end_time = specDict['end_time']
309
310         rspecs = {}
311         # only attempt to extract information about the aggregates we know about
312         for hrn in self.aggregates.keys():
313             netspec = spec.getDictByTagNameValue('NetSpec', 'hrn')
314             if netspec:
315                 # creat a plc dict 
316                 tempdict = {'start_time': star_time, 'end_time': end_time, 'networks': netspec}
317                 #convert the plc dict to rpsec dict
318                 resourceDict = RspecDict(tempdict)
319                 # parse rspec dict
320                 tempspec = Rspec()
321                 tempspec.parseDict(resourceDict)
322                 rspecs[hrn] = tempspec.toxml()
323
324         # notify the aggregates
325         for hrn in self.rspecs.keys():
326             self.aggregates[hrn].createSlice(cred, rspecs[hrn])
327             
328         return 1
329
330     def update_slice(self, slice_hrn, rspec, attributes = []):
331         """
332         Update the specifed slice
333         """
334         self.create_slice(slice_hrn, rspec, attributes)
335     
336     def delete_slice_(self, slice_hrn):
337         """
338         Remove this slice from all components it was previouly associated with and 
339         free up the resources it was using.
340         """
341         # XX need to get the correct credential
342         cred = self.credential
343         
344         if self.slices.has_key(slice_hrn):
345             self.slices.pop(slice_hrn)
346             self.slices.write()
347
348         for hrn in self.aggregates.keys():
349             self.aggregates[hrn].deleteSlice(cred, slice_hrn)
350
351         return 1
352
353     def start_slice(self, slice_hrn):
354         """
355         Stop the slice at plc.
356         """
357         cred = self.credential
358
359         for hrn in self.aggregates.keys():
360             self.aggregates[hrn].startSlice(cred, slice_hrn)
361         return 1
362
363     def stop_slice(self, slice_hrn):
364         """
365         Stop the slice at plc
366         """
367         cred = self.credential
368         for hrn in self.aggregates.keys():
369             self.aggregates[hrn].startSlice(cred, slice_hrn)
370         return 1
371
372     def reset_slice(self, slice_hrn):
373         """
374         Reset the slice
375         """
376         # XX not yet implemented
377         return 1
378
379     def get_policy(self):
380         """
381         Return the policy of this slice manager.
382         """
383     
384         return self.policy
385         
386     
387
388 ##############################
389 ## Server methods here for now
390 ##############################
391
392     def nodes(self):
393         return self.get_components()
394
395     def slices(self):
396         return self.get_slices()
397
398     def resources(self, cred, hrn):
399         self.decode_authentication(cred, 'info')
400         self.verify_object_belongs_to_me(hrn)
401
402         return self.get_resources(hrn)
403
404     def create(self, cred, hrn, rspec):
405         self.decode_authentication(cred, 'embed')
406         self.verify_object_belongs_to_me(hrn)
407         return self.create(hrn)
408
409     def delete(self, cred, hrn):
410         self.decode_authentication(cred, 'embed')
411         self.verify_object_belongs_to_me(hrn)
412         return self.delete_slice(hrn)
413
414     def start(self, cred, hrn):
415         self.decode_authentication(cred, 'control')
416         return self.start(hrn)
417
418     def stop(self, cred, hrn):
419         self.decode_authentication(cred, 'control')
420         return self.stop(hrn)
421
422     def reset(self, cred, hrn):
423         self.decode_authentication(cred, 'control')
424         return self.reset(hrn)
425
426     def policy(self, cred):
427         self.decode_authentication(cred, 'info')
428         return self.get_policy()
429
430     def register_functions(self):
431         GeniServer.register_functions(self)
432
433         # Aggregate interface methods
434         self.server.register_function(self.components)
435         self.server.register_function(self.slices)
436         self.server.register_function(self.resources)
437         self.server.register_function(self.create)
438         self.server.register_function(self.delete)
439         self.server.register_function(self.start)
440         self.server.register_function(self.stop)
441         self.server.register_function(self.reset)
442         self.server.register_function(self.policy)
443