complete connectRegistry. Also minor updates and fixes
[sfa.git] / geni / aggregate.py
1 import os
2 import sys
3 import datetime
4 import time
5 import xmlrpclib
6
7 from geni.util.geniserver import GeniServer
8 from geni.util.geniclient import *
9 from geni.util.cert import Keypair, Certificate
10 from geni.util.trustedroot import TrustedRootList
11 from geni.util.excep import *
12 from geni.util.misc import *
13 from geni.util.config import Config
14 from geni.util.rspec import Rspec
15 from geni.util.specdict import *
16 from geni.util.storage import SimpleStorage
17
18 class Aggregate(GeniServer):
19
20     hrn = None
21     nodes_ttl = None
22     nodes = None
23     slices = None 
24     policy = None
25     timestamp = None
26     threshold = None    
27     shell = None
28     registry = None
29   
30     ##
31     # Create a new aggregate object.
32     #
33     # @param ip the ip address to listen on
34     # @param port the port to listen on
35     # @param key_file private key filename of registry
36     # @param cert_file certificate filename containing public key (could be a GID file)     
37
38     def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/geni/util/geni_config"):
39         GeniServer.__init__(self, ip, port, key_file, cert_file)
40         self.key_file = key_file
41         self.cert_file = cert_file
42         self.conf = Config(config)
43         basedir = self.conf.GENI_BASE_DIR + os.sep
44         server_basedir = basedir + os.sep + "geni" + os.sep
45         self.hrn = self.conf.GENI_INTERFACE_HRN
46         
47         nodes_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.components'])
48         self.nodes = SimpleStorage(nodes_file)
49         self.nodes.load()
50        
51         slices_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.slices'])
52         self.slices = SimpleStorage(slices_file)
53         self.slices.load()
54  
55         policy_file = os.sep.join([server_basedir, 'agg.policy'])
56         self.policy = SimpleStorage(policy_file)
57         self.policy.load()
58         
59         timestamp_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.timestamp']) 
60         self.timestamp = SimpleStorage(timestamp_file)
61
62         self.nodes_ttl = 1
63         self.connectPLC()
64         self.connectRegistry()
65
66     def connectRegistry(self):
67         """
68         Connect to the registry
69         """
70         # connect to registry using GeniClient
71         address = self.config.GENI_REGISTRY_HOSTNAME
72         port = self.config.GENI_REGISTRY_PORT
73         url = 'https://%(address)s:%(port)s' % locals()
74         self.registry = GeniClient(url, self.key_file, self.cert_file) 
75     
76     def connectPLC(self):
77         """
78         Connect to the plc api interface. First attempt to impor thte shell, if that fails
79         try to connect to the xmlrpc server.
80         """
81         self.auth = {'Username': self.conf.GENI_PLC_USER,
82                      'AuthMethod': 'password',
83                      'AuthString': self.conf.GENI_PLC_PASSWORD}
84
85         try:
86            # try to import PLC.Shell directly
87             sys.path.append(self.conf.GENI_PLC_SHELL_PATH) 
88             import PLC.Shell
89             self.shell = PLC.Shell.Shell(globals())
90             self.shell.AuthCheck()
91         except ImportError:
92             # connect to plc api via xmlrpc
93             plc_host = self.conf.GENI_PLC_HOST
94             plc_port = self.conf.GENI_PLC_PORT
95             plc_api_path = self.conf.GENI_PLC_API_PATH                 
96             url = "https://%(plc_host)s:%(plc_port)s/%(plc_api_path)s/" % locals()
97             self.auth = {'Username': self.conf.GENI_PLC_USER,
98                  'AuthMethod': 'password',
99                  'AuthString': self.conf.GENI_PLC_PASSWORD} 
100
101             self.shell = xmlrpclib.Server(url, verbose = 0, allow_none = True) 
102             self.shell.AuthCheck(self.auth) 
103
104     def hostname_to_hrn(self, login_base, hostname):
105         """
106         Convert hrn to plantelab name.
107         """
108         genihostname = "_".join(hostname.split("."))
109         return ".".join([self.hrn, login_base, genihostname])
110
111     def slicename_to_hrn(self, slicename):
112         """
113         Convert hrn to planetlab name.
114         """
115         slicename = slicename.replace("_", ".")
116         return ".".join([self.hrn, slicename])
117
118     def refresh_components(self):
119         """
120         Update the cached list of nodes.
121         """
122         # resolve component hostnames 
123         nodes = self.shell.GetNodes(self.auth, {}, ['hostname', 'site_id'])
124     
125         # resolve site login_bases
126         site_ids = [node['site_id'] for node in nodes]
127         sites = self.shell.GetSites(self.auth, site_ids, ['site_id', 'login_base'])
128         site_dict = {}
129         for site in sites:
130             site_dict[site['site_id']] = site['login_base']
131
132         # convert plc names to geni hrn
133         nodedict = {}
134         for node in nodes:
135             node_hrn = self.hostname_to_hrn(site_dict[node['site_id']], node['hostname'])
136             # apply policy. 
137             # Do not allow nodes found in blacklist, only allow nodes found in whitelist
138             if self.polciy['whitelist'] and node_hrn not in self.polciy['whitelist']:
139                 continue
140             if self.polciy['blacklist'] and node_hrn in self.policy['blacklist']:
141                 continue
142             nodedict[node_hrn] = node['hostname']
143         
144         self.nodes = SimpleStorage(self.nodes.db_filename, nodedict)
145         self.nodes.write()
146
147         # update timestamp and threshold
148         self.timestamp['timestamp'] =  datetime.datetime.now()
149         delta = datetime.timedelta(hours=self.nodes_ttl)
150         self.threshold = self.timestamp['timestamp'] + delta 
151         self.timestamp.write()        
152  
153     def load_components(self):
154         """
155         Read cached list of nodes.
156         """
157         # Read component list from cached file 
158         self.nodes.load()
159         self.timestamp.load() 
160         time_format = "%Y-%m-%d %H:%M:%S"
161         timestamp = self.timestamp['timestamp']
162         self.timestamp['timestamp'] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format)))
163         delta = datetime.timedelta(hours=self.nodes_ttl)
164         self.threshold = self.timestamp['timestamp'] + delta
165
166     def load_policy(self):
167         """
168         Read the list of blacklisted and whitelisted nodes.
169         """
170         self.policy.load()
171
172
173     def get_components(self):
174         """
175         Return a list of components at this aggregate.
176         """
177         # Reload components list
178         now = datetime.datetime.now()
179         #self.load_components()
180         if not self.threshold or not self.timestamp['timestamp'] or now > self.threshold:
181             self.refresh_components()
182         elif now < self.threshold and not self.nodes.keys(): 
183             self.load_components()
184         return self.nodes.keys()
185      
186     def get_rspec(self, hrn, type):
187         """
188         Get resource information from PLC
189         """
190         
191         # Get the required nodes
192         if type in ['aggregate']:
193             nodes = self.shell.GetNodes(self.auth)
194         elif type in ['slice']:
195             slicename = hrn_to_pl_slicename(hrn)
196             slices = self.shell.GetSlices(self.auth, [slicename])
197             node_ids = slices[0]['node_ids']
198             nodes = self.shell.GetNodes(self.auth, node_ids) 
199         
200         # Get all network interfaces
201         interface_ids = []
202         for node in nodes:
203             interface_ids.extend(node['nodenetwork_ids'])
204         interfaces = self.shell.GetNodeNetworks(self.auth, interface_ids)
205         interface_dict = {}
206         for interface in interfaces:
207             interface_dict[interface['nodenetwork_id']] = interface
208         
209         # join nodes with thier interfaces
210         for node in nodes:
211             node['interfaces'] = []
212             for nodenetwork_id in node['nodenetwork_ids']:
213                 node['interfaces'].append(interface_dict[nodenetwork_id])
214
215         # convert and threshold to ints
216         timestamp = self.timestamp['timestamp']
217         start_time = int(self.timestamp['timestamp'].strftime("%s"))
218         end_time = int(self.threshold.strftime("%s"))
219         duration = end_time - start_time
220
221         # create the plc dict
222         networks = {'nodes': nodes, 'name': self.hrn, 'start_time': start_time, 'duration': duration} 
223         resources = {'networks': networks, 'start_time': start_time, 'duration': duration}
224
225         # convert the plc dict to an rspec dict
226         resouceDict = RspecDict(resources)
227
228         # convert the rspec dict to xml
229         rspec = Rspec()
230         rspec.parseDict(resourceDict)
231         return rspec.toxml()
232
233     def get_resources(self, slice_hrn):
234         """
235         Return the current rspec for the specified slice.
236         """
237         slicename = hrn_to_plcslicename(slice_hrn)
238         rspec = self.get_rspec(slicenamem, 'slice')
239         
240         return rspec
241  
242     def create_slice(self, slice_hrn, rspec, attributes = []):
243         """
244         Instantiate the specified slice according to whats defined in the rspec.
245         """
246
247         # save slice state locally
248         # we can assume that spec object has been validated so its safer to
249         # save this instead of the unvalidated rspec the user gave us
250         self.slices[slice_hrn] = spec.toxml()
251         self.slices.write()
252
253         # extract node list from rspec
254         slicename = self.hrn_to_plcslicename(slice_hrn)
255         spec = Rspec(rspec)
256         nodespecs = spec.getDictsByTagName('NodeSpec')
257         nodes = [nodespec['name'] for nodespec in nodespecs]
258
259         # add slice to nodes at plc    
260         self.shell.AddSliceToNodes(self.auth, slicename, nodes)
261         for attribute in attributes:
262             type, value, node, nodegroup = attribute['type'], attribute['value'], attribute['node'], attribute['nodegroup']
263             shell.AddSliceAttribute(self.auth, slicename, type, value, node, nodegroup)
264
265         # XX contact the registry to get the list of users on this slice and
266         # their keys.
267         #slice_record = self.registry.resolve(slice_hrn)
268         #person_records = slice_record['users']
269         # for person in person_record:
270         #    email = person['email']
271         #    self.shell.AddPersonToSlice(self.auth, email, slicename) 
272      
273
274         return 1
275
276     def update_slice(self, slice_hrn, rspec, attributes = []):
277         """
278         Update the specified slice.
279         """
280         # Get slice info
281         slicename = self.hrn_to_plcslicename(slice_hrn)
282         slices = self.shell.GetSlices(self.auth, [slicename], ['node_ids'])
283         if not slice:
284             raise RecordNotFound(slice_hrn)
285         slice = slices[0]
286
287         # find out where this slice is currently running
288         nodes = self.shell.GetNodes(self.auth, slice['node_ids'], ['hostname'])
289         hostnames = [node['hostname'] for node in nodes]
290
291         # get netspec details
292         spec = Rspec(rspec)
293         nodespecs = spec.getDictsByTagName('NodeSpec')
294         nodes = [nodespec['name'] for nodespec in nodespecs]    
295        
296         # save slice state locally
297         # we can assume that spec object has been validated so its safer to 
298         # save this instead of the unvalidated rspec the user gave us
299         self.slices[slice_hrn] = spec.toxml()
300         self.slices.write()
301
302         # remove nodes not in rspec
303         delete_nodes = set(hostnames).difference(nodes)
304         # add nodes from rspec
305         added_nodes = set(nodes).difference(hostnames)
306     
307         shell.AddSliceToNodes(self.auth, slicename, added_nodes)
308         shell.DeleteSliceFromNodes(self.auth, slicename, deleted_nodes)
309
310         for attribute in attributes:
311             type, value, node, nodegroup = attribute['type'], attribute['value'], attribute['node'], attribute['nodegroup']
312             shell.AddSliceAttribute(self.auth, slicename, type, value, node, nodegroup)
313     
314         # contact registry to get slice users and add them to the slice
315         # slice_record = self.registry.resolve(slice_hrn)
316         # persons = slice_record['users']
317         
318         #for person in persons:
319         #    shell.AddPersonToSlice(person['email'], slice_name)
320
321          
322     def delete_slice_(self, slice_hrn):
323         """
324         Remove this slice from all components it was previouly associated with and 
325         free up the resources it was using.
326         """
327         if self.slices.has_key(slice_hrn):
328             self.slices.pop(slice_hrn)
329             self.slices.write()
330
331         slicename = self.hrn_to_plcslicename(slice_hrn)
332         slices = shell.GetSlices(self.auth, [slicename])
333         if not slice:
334             return 1  
335         slice = slices[0]
336       
337         shell.DeleteSliceFromNodes(self.auth, slicename, slice['node_ids'])
338         return 1
339
340     def start_slice(self, slice_hrn):
341         """
342         Stop the slice at plc.
343         """
344         slicename = hrn_to_plcslicename(slice_hrn)
345         slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id'])
346         if not slices:
347             #raise RecordNotFound(slice_hrn)
348             return 1 
349         slice_id = slices[0]
350         atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
351         attribute_id = attreibutes[0] 
352         self.shell.UpdateSliceAttribute(self.auth, attribute_id, "1" )
353         return 1
354
355     def stop_slice(self, slice_hrn):
356         """
357         Stop the slice at plc
358         """
359         slicename = hrn_to_plcslicename(slice_hrn)
360         slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id'])
361         if not slices:
362             #raise RecordNotFound(slice_hrn)
363             return 1
364         slice_id = slices[0]
365         atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
366         attribute_id = attreibutes[0]
367         self.shell.UpdateSliceAttribute(self.auth, attribute_id, "0")
368         return 1
369
370
371     def reset_slice(self, slice_hrn):
372         """
373         Reset the slice
374         """
375         # XX not yet implemented
376         return 1
377
378     def get_policy(self):
379         """
380         Return this aggregates policy.
381         """
382     
383         return self.policy
384         
385     
386
387 ##############################
388 ## Server methods here for now
389 ##############################
390
391     def components(self):
392         return self.get_components()
393
394     #def slices(self):
395     #    return self.get_slices()
396
397     def resources(self, cred, hrn):
398         self.decode_authentication(cred, 'info')
399         self.verify_object_belongs_to_me(hrn)
400
401         return self.get_resources(hrn)
402
403     def createSlice(self, cred, hrn, rspec):
404         self.decode_authentication(cred, 'embed')
405         self.verify_object_belongs_to_me(hrn)
406         return self.create_slice(hrn)
407
408     def updateSlice(self, cred, hrn, rspec):
409         self.decode_authentication(cred, 'embed')
410         self.verify_object_belongs_to_me(hrn)
411         return self.update_slice(hrn)    
412
413     def deleteSlice(self, cred, hrn):
414         self.decode_authentication(cred, 'embed')
415         self.verify_object_belongs_to_me(hrn)
416         return self.delete_slice(hrn)
417
418     def startSlice(self, cred, hrn):
419         self.decode_authentication(cred, 'control')
420         return self.start_slice(hrn)
421
422     def stopSlice(self, cred, hrn):
423         self.decode_authentication(cred, 'control')
424         return self.stop(hrn)
425
426     def resetSlice(self, cred, hrn):
427         self.decode_authentication(cred, 'control')
428         return self.reset(hrn)
429
430     def policy(self, cred):
431         self.decode_authentication(cred, 'info')
432         return self.get_policy()
433
434     def register_functions(self):
435         GeniServer.register_functions(self)
436
437         # Aggregate interface methods
438         self.server.register_function(self.components)
439         #self.server.register_function(self.slices)
440         self.server.register_function(self.resources)
441         self.server.register_function(self.createSlice)
442         self.server.register_function(self.deleteSlice)
443         self.server.register_function(self.startSlice)
444         self.server.register_function(self.stopSlice)
445         self.server.register_function(self.resetSlice)
446         self.server.register_function(self.policy)
447