complete connectRegistry. Also minor updates and fixes
[sfa.git] / geni / slicemgr.py
1 import os
2 import sys
3 import datetime
4 import time
5
6 from geni.util.geniserver import *
7 from geni.util.geniclient import *
8 from geni.util.cert import *
9 from geni.util.trustedroot import *
10 from geni.util.excep import *
11 from geni.util.misc import *
12 from geni.util.config import Config
13 from geni.util.rspec import Rspec
14 from geni.util.specdict import *
15 from geni.util.storage import SimpleStorage
16
17 class SliceMgr(GeniServer):
18
19     hrn = None
20     key_file = None
21     cert_file = None
22     nodes_ttl = None
23     nodes = None
24     slices = None
25     policy = None
26     aggregates = None
27     timestamp = None
28     threshold = None    
29     shell = None
30     registry = None
31     
32   
33     ##
34     # Create a new slice manager object.
35     #
36     # @param ip the ip address to listen on
37     # @param port the port to listen on
38     # @param key_file private key filename of registry
39     # @param cert_file certificate filename containing public key (could be a GID file)     
40
41     def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/geni/util/geni_config"):
42         GeniServer.__init__(ip, port, key_file, cert_file)
43         self.key_file = key_file
44         self.cert_file = cert_file
45         self.conf = Config(config)
46         basedir = self.conf.GENI_BASE_DIR + os.sep
47         server_basedir = basedir + os.sep + "geni" + os.sep
48         self.hrn = conf.GENI_INTERFACE_HRN    
49
50         # Get list of aggregates this sm talks to
51         # XX do we use simplestorage to maintain this file manually?
52         aggregates_file = server_basedir + os.sep + 'aggregates'
53         self.aggregates = SimpleStorage(aggregates_file)
54         self.connect_aggregates(aggregates_file) 
55         
56         nodes_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.components'])
57         self.nodes = SimpleStorage(nodes_file)
58         self.nodes.load()
59         
60         slices_file = os.sep.join([server_basedir, 'slicemgr' + self.hrn + '.slices'])
61         self.slices = SimpleStorage(slices_file)
62         self.slices.load()
63
64         policy_file = os.sep.join([server_basedir, 'smgr.policy'])
65         self.policy = SimpleStorage(policy_file)
66         self.policy.load()
67
68         timestamp_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.timestamp'])
69         self.timestamp = SimpleStorage(timestamp_file)
70  
71         self.nodes_ttl = 1
72         self.connectAggregates()
73         self.connectRegistry()
74
75     def connect_aggregates(self, aggregates_file):
76         """
77         Get info about the aggregates available to us from file and create 
78         an xmlrpc connection to each. If any info is invalid, skip it. 
79         """
80         lines = []
81         try:
82             f = open(aggregates_file, 'r')
83             lines = f.readlines()
84             f.close()
85         except: raise 
86     
87         for line in lines:
88             # Skip comments
89             if line.strip.startswith("#"):
90                 continue
91             agg_info = line.split("\t").split(" ")
92         
93             # skip invalid info
94             if len(agg_info) != 3:
95                 continue
96
97             # create xmlrpc connection using GeniClient
98             hrn, address, port = agg_info[0], agg_info[1], agg_info[2]
99             url = 'https://%(address)s:%(port)s' % locals()
100             self.aggregates[hrn] = GeniClient(url, self.key_file, self.cert_file)
101
102
103     def item_hrns(self, items):
104         """
105         Take a list of items (components or slices) and return a dictionary where
106         the key is the authoritative hrn and the value is a list of items at that 
107         hrn.
108         """
109         item_hrns = {}
110         agg_hrns = self.aggregates.keys()
111         for agg_hrn in agg_hrns:
112             item_hrns[agg_hrn] = []
113         for item in items:
114             for agg_hrn in agg_hrns:
115                 if item.startswith(agg_hrn):
116                     item_hrns[agg_hrn] = item
117
118         return item_hrns    
119              
120
121     def hostname_to_hrn(self, login_base, hostname):
122         """
123         Convert hrn to plantelab name.
124         """
125         genihostname = "_".join(hostname.split("."))
126         return ".".join([self.hrn, login_base, genihostname])
127
128     def slicename_to_hrn(self, slicename):
129         """
130         Convert hrn to planetlab name.
131         """
132         slicename = slicename.replace("_", ".")
133         return ".".join([self.hrn, slicename])
134
135     def refresh_components(self):
136         """
137         Update the cached list of nodes.
138         """
139         print "refreshing"
140     
141         aggregates = self.aggregates.keys()
142         all_nodes = []
143         nodedict = {}
144         for aggregate in aggregates:
145             try:
146                 # resolve components hostnames
147                 nodes = self.aggregates[aggregate].get_components()
148                 all_nodes.extend(nodes)    
149             except:
150                 # XX print out to some error log
151                 pass    
152    
153         for node in all_nodes:
154             if self.polciy['whitelist'] and node not in self.polciy['whitelist']:
155                 continue
156             if self.polciy['blacklist'] and node in self.policy['blacklist']:
157                 continue
158
159             nodedict[node] = node
160
161         self.nodes = SimpleStorate(self.nodes.db_filename, nodedict)
162         self.nodes.write()
163
164         # update timestamp and threshold
165         self.timestamp['timestamp'] = datetime.datetime.now()
166         delta = datetime.timedelta(hours=self.nodes_tt1)
167         self.threshold = self.timestamp['timestamp'] + delta
168         self.timestamp.write()
169         
170  
171     def load_components(self):
172         """
173         Read cached list of nodes and slices.
174         """
175         print "loading nodes"
176         # Read component list from cached file 
177         self.nodes.load()
178         self.timestamp.load()
179         time_format = "%Y-%m-%d %H:%M:%S"
180         timestamp = self.timestamp['timestamp']
181         self.timestamp['timestamp'] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format)))
182         delta = datetime.timedelta(hours=self.nodes_ttl)
183         self.threshold = self.timestamp['timestamp'] + delta
184
185     def load_policy(self):
186         """
187         Read the list of blacklisted and whitelisted nodes.
188         """
189         self.policy.load()
190  
191     def load_slices(self):
192         """
193          Read current slice instantiation states.
194         """
195         print "loading slices"
196         self.slices.load()
197
198
199     def get_components(self):
200         """
201         Return a list of components managed by this slice manager.
202         """
203         # Reload components list
204         now = datetime.datetime.now()
205         #self.load_components()
206         if not self.threshold or not self.timestamp or now > self.threshold:
207             self.refresh_components()
208         elif now < self.threshold and not self.components: 
209             self.load_components()
210         return self.nodes.keys()
211    
212      
213     def get_slices(self):
214         """
215         Return a list of instnatiated managed by this slice manager.
216         """
217         return dict(self.slices)
218
219     def get_resources(self, slice_hrn):
220         """
221         Return the current rspec for the specified slice.
222         """
223
224         cred = None
225         if slice_hrn in self.slices.keys():
226             # check if we alreay have this slices state saved
227             rspec = self.slices[slice_hrn]
228         else:
229             # request this slices state from all  known aggregates
230             rspecdicts = []
231             for hrn in self.aggregates.keys():
232                 # XX need to use the right credentials for this call
233                 # check if the slice has resources at this hrn
234                 tempresources = self.aggregates[hrn].resources(cred, slice_hrn)
235                 temprspec = Rspec()
236                 temprspec.parseString(temprspec)
237                 if temprspec.getDictsByTagName('NodeSpec'):
238                     # append this rspec to the list of rspecs
239                     rspecdicts.append(temprspec.toDict())
240                 
241             # merge all these rspecs into one
242             start_time = int(self.timestamp['timestamp'].strftime("%s"))
243             end_time = int(self.duration.strftime("%s"))
244             duration = end_time - start_time
245                 
246             # create a plc dict 
247             networks = [rspecdict['networks'][0] for rspecdict in rspecdicts]
248             resources = {'networks': networks, 'start_time': start_time, 'duration': duration}
249             # convert the plc dict to an rspec dict
250             resourceDict = RspecDict(resources)
251             resourceSpec = Rspec()
252             resourceSpec.parseDict(resourceDict)
253             rspec = resourceSpec.toxml() 
254             # save this slices resources
255             self.slices[slice_hrn] = rspec
256             self.slices.write()
257          
258         return rspec
259  
260     def create_slice(self, slice_hrn, rspec, attributes):
261         """
262         Instantiate the specified slice according to whats defined in the rspec.
263         """
264         # XX need to gget the correct credentials
265         cred = None
266
267         # save slice state locally
268         # we can assume that spec object has been validated so its safer to
269         # save this instead of the unvalidated rspec the user gave us
270         self.slices[slice_hrn] = spec.toxml()
271         self.slices.write()
272
273         # extract network list from the rspec and create a separate
274         # rspec for each network
275         slicename = self.hrn_to_plcslicename(slice_hrn)
276         spec = Rspec()
277         spec.parseString(rspec)
278         specDict = spec.toDict()
279         start_time = specDict['start_time']
280         end_time = specDict['end_time']
281
282         rspecs = {}
283         # only attempt to extract information about the aggregates we know about
284         for hrn in self.aggregates.keys():
285             netspec = spec.getDictByTagNameValue('NetSpec', 'hrn')
286             if netspec:
287                 # creat a plc dict 
288                 tempdict = {'start_time': star_time, 'end_time': end_time, 'networks': netspec}
289                 #convert the plc dict to rpsec dict
290                 resourceDict = RspecDict(tempdict)
291                 # parse rspec dict
292                 tempspec = Rspec()
293                 tempspec.parseDict(resourceDict)
294                 rspecs[hrn] = tempspec.toxml()
295
296         # notify the aggregates
297         for hrn in self.rspecs.keys():
298             self.aggregates[hrn].createSlice(cred, rspecs[hrn])
299             
300         return 1
301
302     def update_slice(self, slice_hrn, rspec, attributes = []):
303         """
304         Update the specifed slice
305         """
306         self.create_slice(slice_hrn, rspec, attributes)
307     
308     def delete_slice_(self, slice_hrn):
309         """
310         Remove this slice from all components it was previouly associated with and 
311         free up the resources it was using.
312         """
313         # XX need to get the correct credential
314         cred = None
315         
316         if self.slices.has_key(slice_hrn):
317             self.slices.pop(slice_hrn)
318             self.slices.write()
319
320         for hrn in self.aggregates.keys():
321             self.aggregates[hrn].deleteSlice(cred, slice_hrn)
322
323         return 1
324
325     def start_slice(self, slice_hrn):
326         """
327         Stop the slice at plc.
328         """
329         # XX need to get the correct credential
330         cred = None
331
332         for hrn in self.aggregates.keys():
333             self.aggregates[hrn].startSlice(cred, slice_hrn)
334         return 1
335
336     def stop_slice(self, slice_hrn):
337         """
338         Stop the slice at plc
339         """
340         for hrn in self.aggregates.keys():
341             self.aggregates[hrn].startSlice(cred, slice_hrn)
342         return 1
343
344     def reset_slice(self, slice_hrn):
345         """
346         Reset the slice
347         """
348         # XX not yet implemented
349         return 1
350
351     def get_policy(self):
352         """
353         Return the policy of this slice manager.
354         """
355     
356         return self.policy
357         
358     
359
360 ##############################
361 ## Server methods here for now
362 ##############################
363
364     def nodes(self):
365         return self.get_components()
366
367     def slices(self):
368         return self.get_slices()
369
370     def resources(self, cred, hrn):
371         self.decode_authentication(cred, 'info')
372         self.verify_object_belongs_to_me(hrn)
373
374         return self.get_resources(hrn)
375
376     def create(self, cred, hrn, rspec):
377         self.decode_authentication(cred, 'embed')
378         self.verify_object_belongs_to_me(hrn, rspec)
379         return self.create(hrn)
380
381     def delete(self, cred, hrn):
382         self.decode_authentication(cred, 'embed')
383         self.verify_object_belongs_to_me(hrn)
384         return self.delete_slice(hrn)
385
386     def start(self, cred, hrn):
387         self.decode_authentication(cred, 'control')
388         return self.start(hrn)
389
390     def stop(self, cred, hrn):
391         self.decode_authentication(cred, 'control')
392         return self.stop(hrn)
393
394     def reset(self, cred, hrn):
395         self.decode_authentication(cred, 'control')
396         return self.reset(hrn)
397
398     def policy(self, cred):
399         self.decode_authentication(cred, 'info')
400         return self.get_policy()
401
402     def register_functions(self):
403         GeniServer.register_functions(self)
404
405         # Aggregate interface methods
406         self.server.register_function(self.components)
407         self.server.register_function(self.slices)
408         self.server.register_function(self.resources)
409         self.server.register_function(self.create)
410         self.server.register_function(self.delete)
411         self.server.register_function(self.start)
412         self.server.register_function(self.stop)
413         self.server.register_function(self.reset)
414         self.server.register_function(self.policy)
415