check in lost code for updating researchers in records
[sfa.git] / geni / slicemgr.py
1 import os
2 import sys
3 import datetime
4 import time
5
6 from geni.util.geniserver import *
7 from geni.util.geniclient import *
8 from geni.util.cert import *
9 from geni.util.credential import Credential
10 from geni.util.trustedroot import *
11 from geni.util.excep import *
12 from geni.util.misc import *
13 from geni.util.config import Config
14 from geni.util.rspec import Rspec
15 from geni.util.specdict import *
16 from geni.util.storage import SimpleStorage
17
class SliceMgr(GeniServer):
    """
    Slice manager server built on top of GeniServer.

    It keeps a connection to each known aggregate, caches nodes, slices and
    policy in SimpleStorage files, and exposes its operations through the
    functions registered in register_functions().
    """

    # hrn of this interface, read from the configuration in __init__
    hrn = None
    # lifetime of the cached node list, used as a number of hours
    nodes_ttl = None
    # SimpleStorage holding the cached node list
    nodes = None
    # SimpleStorage holding the saved rspec of each slice, keyed by slice hrn
    slices = None
    # SimpleStorage holding the 'whitelist' and 'blacklist' node policy
    policy = None
    # storage mapping an aggregate hrn to the GeniClient connected to it
    aggregates = None
    # SimpleStorage holding the time of the last node refresh
    timestamp = None
    # datetime after which the cached node list is refreshed
    threshold = None
    # not used by the code visible in this class
    shell = None
    # registry connection used by loadCredential()
    registry = None
    # private key and certificate filenames handed to every GeniClient
    key_file = None
    cert_file = None
    # credential string sent to the aggregates
    credential = None
33   
34     ##
35     # Create a new slice manager object.
36     #
37     # @param ip the ip address to listen on
38     # @param port the port to listen on
39     # @param key_file private key filename of registry
40     # @param cert_file certificate filename containing public key (could be a GID file)     
41
42     def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/geni/util/geni_config"):
43         GeniServer.__init__(ip, port, key_file, cert_file)
44         self.key_file = key_file
45         self.cert_file = cert_file
46         self.conf = Config(config)
47         basedir = self.conf.GENI_BASE_DIR + os.sep
48         server_basedir = basedir + os.sep + "geni" + os.sep
49         self.hrn = conf.GENI_INTERFACE_HRN    
50
51         # Get list of aggregates this sm talks to
52         # XX do we use simplestorage to maintain this file manually?
53         aggregates_file = server_basedir + os.sep + 'aggregates'
54         self.aggregates = SimpleStorage(aggregates_file)
55         self.connect_aggregates(aggregates_file) 
56         
57         nodes_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.components'])
58         self.nodes = SimpleStorage(nodes_file)
59         self.nodes.load()
60         
61         slices_file = os.sep.join([server_basedir, 'smgr' + self.hrn + '.slices'])
62         self.slices = SimpleStorage(slices_file)
63         self.slices.load()
64
65         policy_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.policy'])
66         self.policy = SimpleStorage(policy_file)
67         self.policy.load()
68
69         timestamp_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.timestamp'])
70         self.timestamp = SimpleStorage(timestamp_file)
71  
72         self.nodes_ttl = 1
73         self.connectAggregates()
74         self.connectRegistry()
75         self.loadCredential()
76
77
78     def loadCredential(self):
79         """
80         Attempt to load credential from file if it exists. If it doesnt get
81         credential from registry.
82         """
83
84         self_cred_filename = self.server_basedir + os.sep + "smgr." + self.hrn + ".cred"
85         ma_cred_filename = self.server_basedir + os.sep + "smgr." + self.hrn + ".sa.cred"
86
87         # see if this file exists
88         try:
89             cred = Credential(filename = ma_cred_filename)
90             self.credential = cred.save_to_string()
91         except IOError:
92             # get self credential
93             self_cred = self.registry.get_credential(None, 'ma', self.hrn)
94             self_credential = Credential(string = self_cred)
95             self_credential.save_to_file(self_cred_filename)
96
97             # get ma credential
98             ma_cred = self.registry.get_gredential(self_cred)
99             ma_credential = Credential(string = ma_cred)
100             ma_credential.save_to_file(ma_cred_filename)
101             self.credential = ma_cred        
102
103     def connect_aggregates(self, aggregates_file):
104         """
105         Get info about the aggregates available to us from file and create 
106         an xmlrpc connection to each. If any info is invalid, skip it. 
107         """
108         lines = []
109         try:
110             f = open(aggregates_file, 'r')
111             lines = f.readlines()
112             f.close()
113         except: raise 
114     
115         for line in lines:
116             # Skip comments
117             if line.strip.startswith("#"):
118                 continue
119             agg_info = line.split("\t").split(" ")
120         
121             # skip invalid info
122             if len(agg_info) != 3:
123                 continue
124
125             # create xmlrpc connection using GeniClient
126             hrn, address, port = agg_info[0], agg_info[1], agg_info[2]
127             url = 'https://%(address)s:%(port)s' % locals()
128             self.aggregates[hrn] = GeniClient(url, self.key_file, self.cert_file)
129
130
131     def item_hrns(self, items):
132         """
133         Take a list of items (components or slices) and return a dictionary where
134         the key is the authoritative hrn and the value is a list of items at that 
135         hrn.
136         """
137         item_hrns = {}
138         agg_hrns = self.aggregates.keys()
139         for agg_hrn in agg_hrns:
140             item_hrns[agg_hrn] = []
141         for item in items:
142             for agg_hrn in agg_hrns:
143                 if item.startswith(agg_hrn):
144                     item_hrns[agg_hrn] = item
145
146         return item_hrns    
147              
148
149     def hostname_to_hrn(self, login_base, hostname):
150         """
151         Convert hrn to plantelab name.
152         """
153         genihostname = "_".join(hostname.split("."))
154         return ".".join([self.hrn, login_base, genihostname])
155
156     def slicename_to_hrn(self, slicename):
157         """
158         Convert hrn to planetlab name.
159         """
160         slicename = slicename.replace("_", ".")
161         return ".".join([self.hrn, slicename])
162
163     def refresh_components(self):
164         """
165         Update the cached list of nodes.
166         """
167         print "refreshing"
168     
169         aggregates = self.aggregates.keys()
170         all_nodes = []
171         nodedict = {}
172         for aggregate in aggregates:
173             try:
174                 # resolve components hostnames
175                 nodes = self.aggregates[aggregate].get_components()
176                 all_nodes.extend(nodes)    
177             except:
178                 # XX print out to some error log
179                 pass    
180    
181         for node in all_nodes:
182             if self.polciy['whitelist'] and node not in self.polciy['whitelist']:
183                 continue
184             if self.polciy['blacklist'] and node in self.policy['blacklist']:
185                 continue
186
187             nodedict[node] = node
188
189         self.nodes = SimpleStorate(self.nodes.db_filename, nodedict)
190         self.nodes.write()
191
192         # update timestamp and threshold
193         self.timestamp['timestamp'] = datetime.datetime.now()
194         delta = datetime.timedelta(hours=self.nodes_tt1)
195         self.threshold = self.timestamp['timestamp'] + delta
196         self.timestamp.write()
197         
198  
199     def load_components(self):
200         """
201         Read cached list of nodes and slices.
202         """
203         print "loading nodes"
204         # Read component list from cached file 
205         self.nodes.load()
206         self.timestamp.load()
207         time_format = "%Y-%m-%d %H:%M:%S"
208         timestamp = self.timestamp['timestamp']
209         self.timestamp['timestamp'] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format)))
210         delta = datetime.timedelta(hours=self.nodes_ttl)
211         self.threshold = self.timestamp['timestamp'] + delta
212
213     def load_policy(self):
214         """
215         Read the list of blacklisted and whitelisted nodes.
216         """
217         self.policy.load()
218  
219     def load_slices(self):
220         """
221         Read current slice instantiation states.
222         """
223         print "loading slices"
224         self.slices.load()
225
226
227     def getComponents(self):
228         """
229         Return a list of components managed by this slice manager.
230         """
231         # Reload components list
232         now = datetime.datetime.now()
233         #self.load_components()
234         if not self.threshold or not self.timestamp or now > self.threshold:
235             self.refresh_components()
236         elif now < self.threshold and not self.components: 
237             self.load_components()
238         return self.nodes.keys()
239    
240      
241     def getSlices(self):
242         """
243         Return a list of instnatiated managed by this slice manager.
244         """
245         # XX return only the slices at the specified hrn
246         return dict(self.slices)
247
248     def getResources(self, slice_hrn):
249         """
250         Return the current rspec for the specified slice.
251         """
252         cred = self.credential
253
254         if slice_hrn in self.slices.keys():
255             # check if we alreay have this slices state saved
256             rspec = self.slices[slice_hrn]
257         else:
258             # request this slices state from all  known aggregates
259             rspecdicts = []
260             for hrn in self.aggregates.keys():
261                 # XX need to use the right credentials for this call
262                 # check if the slice has resources at this hrn
263                 tempresources = self.aggregates[hrn].resources(cred, slice_hrn)
264                 temprspec = Rspec()
265                 temprspec.parseString(temprspec)
266                 if temprspec.getDictsByTagName('NodeSpec'):
267                     # append this rspec to the list of rspecs
268                     rspecdicts.append(temprspec.toDict())
269                 
270             # merge all these rspecs into one
271             start_time = int(self.timestamp['timestamp'].strftime("%s"))
272             end_time = int(self.duration.strftime("%s"))
273             duration = end_time - start_time
274                 
275             # create a plc dict 
276             networks = [rspecdict['networks'][0] for rspecdict in rspecdicts]
277             resources = {'networks': networks, 'start_time': start_time, 'duration': duration}
278             # convert the plc dict to an rspec dict
279             resourceDict = RspecDict(resources)
280             resourceSpec = Rspec()
281             resourceSpec.parseDict(resourceDict)
282             rspec = resourceSpec.toxml() 
283             # save this slices resources
284             self.slices[slice_hrn] = rspec
285             self.slices.write()
286          
287         return rspec
288  
289     def createSlice(self, slice_hrn, rspec, attributes):
290         """
291         Instantiate the specified slice according to whats defined in the rspec.
292         """
293         # XX need to gget the correct credentials
294         cred = self.credential
295
296         # save slice state locally
297         # we can assume that spec object has been validated so its safer to
298         # save this instead of the unvalidated rspec the user gave us
299         self.slices[slice_hrn] = spec.toxml()
300         self.slices.write()
301
302         # extract network list from the rspec and create a separate
303         # rspec for each network
304         slicename = self.hrn_to_plcslicename(slice_hrn)
305         spec = Rspec()
306         spec.parseString(rspec)
307         specDict = spec.toDict()
308         start_time = specDict['start_time']
309         end_time = specDict['end_time']
310
311         rspecs = {}
312         # only attempt to extract information about the aggregates we know about
313         for hrn in self.aggregates.keys():
314             netspec = spec.getDictByTagNameValue('NetSpec', 'hrn')
315             if netspec:
316                 # creat a plc dict 
317                 tempdict = {'start_time': star_time, 'end_time': end_time, 'networks': netspec}
318                 #convert the plc dict to rpsec dict
319                 resourceDict = RspecDict(tempdict)
320                 # parse rspec dict
321                 tempspec = Rspec()
322                 tempspec.parseDict(resourceDict)
323                 rspecs[hrn] = tempspec.toxml()
324
325         # notify the aggregates
326         for hrn in self.rspecs.keys():
327             self.aggregates[hrn].createSlice(cred, rspecs[hrn])
328             
329         return 1
330
331     def updateSlice(self, slice_hrn, rspec, attributes = []):
332         """
333         Update the specifed slice
334         """
335         self.create_slice(slice_hrn, rspec, attributes)
336     
337     def deleteSlice_(self, slice_hrn):
338         """
339         Remove this slice from all components it was previouly associated with and 
340         free up the resources it was using.
341         """
342         # XX need to get the correct credential
343         cred = self.credential
344         
345         if self.slices.has_key(slice_hrn):
346             self.slices.pop(slice_hrn)
347             self.slices.write()
348
349         for hrn in self.aggregates.keys():
350             self.aggregates[hrn].deleteSlice(cred, slice_hrn)
351
352         return 1
353
354     def startSlice(self, slice_hrn):
355         """
356         Stop the slice at plc.
357         """
358         cred = self.credential
359
360         for hrn in self.aggregates.keys():
361             self.aggregates[hrn].startSlice(cred, slice_hrn)
362         return 1
363
364     def stopSlice(self, slice_hrn):
365         """
366         Stop the slice at plc
367         """
368         cred = self.credential
369         for hrn in self.aggregates.keys():
370             self.aggregates[hrn].startSlice(cred, slice_hrn)
371         return 1
372
373     def resetSlice(self, slice_hrn):
374         """
375         Reset the slice
376         """
377         # XX not yet implemented
378         return 1
379
380     def getPolicy(self):
381         """
382         Return the policy of this slice manager.
383         """
384     
385         return self.policy
386         
387     
388
389 ##############################
390 ## Server methods here for now
391 ##############################
392
393     def list_components(self):
394         return self.getComponents()
395
396     def list_slices(self, cred, hrn):
397         self.decode_authentication(cred, 'list')
398         return self.getSlices(hrn)
399
400     def get_resources(self, cred, hrn):
401         self.decode_authentication(cred, 'info')
402         return self.getResources(hrn)
403
404     def get_policy(self, cred):
405         self.decode_authentication(cred, 'info')
406         return self.getPolicy()
407
408     def create_slice(self, cred, hrn, rspec):
409         self.decode_authentication(cred, 'embed')
410         return self.createSlice(hrn)
411
412     def delete_slice(self, cred, hrn):
413         self.decode_authentication(cred, 'embed')
414         return self.deleteSlice(hrn)
415
416     def start_slice(self, cred, hrn):
417         self.decode_authentication(cred, 'control')
418         return self.startSlice(hrn)
419
420     def stop_slice(self, cred, hrn):
421         self.decode_authentication(cred, 'control')
422         return self.stopSlice(hrn)
423
424     def reset_slice(self, cred, hrn):
425         self.decode_authentication(cred, 'control')
426         return self.resetSlice(hrn)
427
428     def register_functions(self):
429         GeniServer.register_functions(self)
430
431         # Aggregate interface methods
432         self.server.register_function(self.list_components)
433         self.server.register_function(self.list_slices)
434         self.server.register_function(self.get_resources)
435         self.server.register_function(self.get_policy)
436         self.server.register_function(self.create_slice)
437         self.server.register_function(self.delete_slice)
438         self.server.register_function(self.start_slice)
439         self.server.register_function(self.stop_slice)
440         self.server.register_function(self.reset_slice)
441