fix bugs
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import Node
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import Services
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24
25 import time
26
27 class PlAggregate:
28
29     def __init__(self, driver):
30         self.driver = driver
31
32     def get_nodes(self, options={}):
33         filter = {'peer_id': None}
34         geni_available = options.get('geni_available')    
35         if geni_available == True:
36             filter['boot_state'] = 'boot'
37         nodes = self.driver.shell.GetNodes(filter)
38        
39         return nodes  
40  
41     def get_sites(self, filter={}):
42         sites = {}
43         for site in self.driver.shell.GetSites(filter):
44             sites[site['site_id']] = site
45         return sites
46
47     def get_interfaces(self, filter={}):
48         interfaces = {}
49         for interface in self.driver.shell.GetInterfaces(filter):
50             iface = Interface()
51             if interface['bwlimit']:
52                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
53             interfaces[interface['interface_id']] = interface
54         return interfaces
55
56     def get_links(self, sites, nodes, interfaces):
57         
58         topology = Topology() 
59         links = []
60         for (site_id1, site_id2) in topology:
61             site_id1 = int(site_id1)
62             site_id2 = int(site_id2)
63             link = Link()
64             if not site_id1 in sites or site_id2 not in sites:
65                 continue
66             site1 = sites[site_id1]
67             site2 = sites[site_id2]
68             # get hrns
69             site1_hrn = self.driver.hrn + '.' + site1['login_base']
70             site2_hrn = self.driver.hrn + '.' + site2['login_base']
71
72             for s1_node_id in site1['node_ids']:
73                 for s2_node_id in site2['node_ids']:
74                     if s1_node_id not in nodes or s2_node_id not in nodes:
75                         continue
76                     node1 = nodes[s1_node_id]
77                     node2 = nodes[s2_node_id]
78                     # set interfaces
79                     # just get first interface of the first node
80                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
81                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
82                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
83                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
84
85                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
86                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
87
88                     # set link
89                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
90                     link['interface1'] = if1
91                     link['interface2'] = if2
92                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
93                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
94                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
95                     links.append(link)
96
97         return links
98
99     def get_node_tags(self, filter={}):
100         node_tags = {}
101         for node_tag in self.driver.shell.GetNodeTags(filter):
102             node_tags[node_tag['node_tag_id']] = node_tag
103         return node_tags
104
105     def get_pl_initscripts(self, filter={}):
106         pl_initscripts = {}
107         filter.update({'enabled': True})
108         for initscript in self.driver.shell.GetInitScripts(filter):
109             pl_initscripts[initscript['initscript_id']] = initscript
110         return pl_initscripts
111
112     def get_slivers(self, urns, options):
113         names = set()
114         slice_ids = set()
115         node_ids = []
116         for urn in urns:
117             xrn = PlXrn(xrn=urn)
118             if xrn.type == 'sliver':
119                  # id: slice_id-node_id
120                 id_parts = xrn.leaf.split('-')
121                 slice_ids.add(id_parts[0]) 
122                 node_ids.append(id_parts[1])
123             else:  
124                 names.add(xrn.pl_slicename())
125             if xrn.id:
126                 ids.add(xrn.id)
127
128         filter = {}
129         if names:
130             filter['name'] = list(names)
131         if slice_ids:
132             filter['slice_id'] = list(slice_ids)
133         logger.debug('plaggregate.get_slivers: filter: %s' % str(filter)) 
134         slices = self.driver.shell.GetSlices(filter)
135         if not slices:
136             return []
137         slice = slices[0]
138         if node_ids:
139             slice['node_ids'] = node_ids
140         tags_dict = self.get_slice_tags(slice)
141         nodes_dict = self.get_slice_nodes(slice, options)
142         slivers = []
143         for node in nodes_dict.values():
144             node.update(slices[0]) 
145             node['tags'] = tags_dict[node['node_id']]
146             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
147             node['urn'] = PlXrn(xrn=sliver_hrn, type='sliver').get_urn() 
148             slivers.append(node)
149         return slivers
150
151     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}):
152         rspec_node = Node()
153         # xxx how to retrieve site['login_base']
154         site=sites[node['site_id']]
155         rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
156         rspec_node['component_name'] = node['hostname']
157         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
158         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
159         # do not include boot state (<available> element) in the manifest rspec
160         rspec_node['boot_state'] = node['boot_state']
161         rspec_node['exclusive'] = 'false'
162         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
163                                         HardwareType({'name': 'pc'})]
164         # only doing this because protogeni rspec needs
165         # to advertise available initscripts
166         rspec_node['pl_initscripts'] = pl_initscripts.values()
167         # add site/interface info to nodes.
168         # assumes that sites, interfaces and tags have already been prepared.
169         if site['longitude'] and site['latitude']:
170             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
171             rspec_node['location'] = location
172         # Granularity
173         granularity = Granularity({'grain': grain})
174         rspec_node['granularity'] = granularity
175         rspec_node['interfaces'] = []
176         if_count=0
177         for if_id in node['interface_ids']:
178             interface = Interface(interfaces[if_id])
179             interface['ipv4'] = interface['ip']
180             interface['component_id'] = PlXrn(auth=self.driver.hrn,
181                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
182             # interfaces in the manifest need a client id
183             if slice:
184                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
185             rspec_node['interfaces'].append(interface)
186             if_count+=1
187         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids']]
188         rspec_node['tags'] = tags
189         return rspec_node
190
191     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, pl_initscripts):
192         # get the granularity in second for the reservation system
193         grain = self.driver.shell.GetLeaseGranularity()
194         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
195         # xxx how to retrieve site['login_base']
196         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
197         # remove interfaces from manifest
198         rspec_node['interfaces'] = []
199         # add sliver info
200         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
201                          'name': sliver['name'],
202                          'type': 'plab-vserver',
203                          'tags': []})
204         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
205         rspec_node['client_id'] = sliver['hostname']
206         rspec_node['slivers'] = [rspec_sliver]
207
208         # slivers always provide the ssh service
209         login = Login({'authentication': 'ssh-keys', 'hostname': sliver['hostname'], 'port':'22', 'username': sliver['name']})
210         service = Services({'login': login})
211         rspec_node['services'] = [service]    
212         return rspec_node      
213
214     def get_slice_tags(self, slice):
215         slice_tag_ids = []
216         slice_tag_ids.extend(slice['slice_tag_ids'])
217         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
218         # sorted by node_id
219         tags_dict = defaultdict(list)
220         for tag in tags:
221             tags_dict[tag['node_id']] = tag
222         return tags_dict
223
224     def get_slice_nodes(self, slice, options={}):
225         nodes_dict = {}
226         filter = {'peer_id': None}
227         tags_filter = {}
228         if slice and slice.get('node_ids'):
229             filter['node_id'] = slice['node_ids']
230         else:
231             # there are no nodes to look up
232             return nodes_dict
233         tags_filter=filter.copy()
234         geni_available = options.get('geni_available')
235         if geni_available == True:
236             filter['boot_state'] = 'boot'
237         nodes = self.driver.shell.GetNodes(filter)
238         for node in nodes:
239             nodes_dict[node['node_id']] = node
240         return nodes_dict
241
242     def rspec_node_to_geni_sliver(self, rspec_node):
243         op_status = "geni_unknown"
244         state = rspec_node['boot_state'].lower()
245         if state == 'boot':
246             op_status = 'geni_ready'
247         else:
248             op_status =' geni_failed'
249
250         # required fields
251         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
252                        'geni_expires': rspec_node['expires'],
253                        'geni_allocation_status': 'geni_provisioned',
254                        'geni_operational_status': op_status,
255                        'geni_error': None,
256                        }
257         return geni_sliver        
258
259     def get_leases(self, slice=None, options={}):
260         
261         now = int(time.time())
262         filter={}
263         filter.update({'clip':now})
264         if slice:
265            filter.update({'name':slice['name']})
266         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
267         leases = self.driver.shell.GetLeases(filter)
268         grain = self.driver.shell.GetLeaseGranularity()
269
270         site_ids = []
271         for lease in leases:
272             site_ids.append(lease['site_id'])
273
274         # get sites
275         sites_dict  = self.get_sites({'site_id': site_ids}) 
276   
277         rspec_leases = []
278         for lease in leases:
279
280             rspec_lease = Lease()
281             
282             # xxx how to retrieve site['login_base']
283             site_id=lease['site_id']
284             site=sites_dict[site_id]
285
286             rspec_lease['lease_id'] = lease['lease_id']
287             rspec_lease['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], lease['hostname'])
288             slice_hrn = slicename_to_hrn(self.driver.hrn, lease['name'])
289             slice_urn = hrn_to_urn(slice_hrn, 'slice')
290             rspec_lease['slice_id'] = slice_urn
291             rspec_lease['start_time'] = lease['t_from']
292             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
293             rspec_leases.append(rspec_lease)
294         return rspec_leases
295
296     
297     def list_resources(self, version = None, options={}):
298
299         version_manager = VersionManager()
300         version = version_manager.get_version(version)
301         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
302         rspec = RSpec(version=rspec_version, user_options=options)
303        
304         if not options.get('list_leases') or options['list_leases'] != 'leases':
305             # get nodes
306             nodes  = self.get_nodes(options)
307             site_ids = []
308             interface_ids = []
309             tag_ids = []
310             nodes_dict = {}
311             for node in nodes:
312                 site_ids.append(node['site_id'])
313                 interface_ids.extend(node['interface_ids'])
314                 tag_ids.extend(node['node_tag_ids'])
315                 nodes_dict[node['node_id']] = node
316             sites = self.get_sites({'site_id': site_ids})
317             interfaces = self.get_interfaces({'interface_id':interface_ids})
318             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
319             pl_initscripts = self.get_pl_initscripts()
320             # convert nodes to rspec nodes
321             rspec_nodes = []
322             for node in nodes:
323                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
324                 rspec_nodes.append(rspec_node)
325             rspec.version.add_nodes(rspec_nodes)
326
327             # add links
328             links = self.get_links(sites, nodes_dict, interfaces)        
329             rspec.version.add_links(links)
330         return rspec.toxml()
331
332     def describe(self, urns, version=None, options={}):
333         version_manager = VersionManager()
334         version = version_manager.get_version(version)
335         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
336         rspec = RSpec(version=version, user_options=options)
337
338         # get slivers
339         geni_slivers = []
340         slivers = self.get_slivers(urns, options) 
341         if len(slivers) == 0:
342             raise SliverDoesNotExist("You have not allocated any slivers here for %s" % str(urns))
343         rspec.xml.set('expires',  datetime_to_string(utcparse(slivers[0]['expires'])))
344       
345         if not options.get('list_leases') or options['list_leases'] != 'leases':
346             # add slivers
347             site_ids = []
348             interface_ids = []
349             tag_ids = []
350             nodes_dict = {}
351             for sliver in slivers:
352                 site_ids.append(sliver['site_id'])
353                 interface_ids.extend(sliver['interface_ids'])
354                 tag_ids.extend(sliver['node_tag_ids'])
355                 nodes_dict[sliver['node_id']] = sliver
356             sites = self.get_sites({'site_id': site_ids})
357             interfaces = self.get_interfaces({'interface_id':interface_ids})
358             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
359             pl_initscripts = self.get_pl_initscripts()
360             rspec_nodes = []
361             for sliver in slivers:
362                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
363                     continue
364                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts)
365                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node)
366                 rspec_nodes.append(rspec_node) 
367                 geni_slivers.append(geni_sliver)
368             rspec.version.add_nodes(rspec_nodes)
369
370             # add sliver defaults
371             #default_sliver = slivers.get(None, [])
372             #if default_sliver:
373             #    default_sliver_attribs = default_sliver.get('tags', [])
374             #    for attrib in default_sliver_attribs:
375             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
376
377             # add links 
378             links = self.get_links(sites, nodes_dict, interfaces)        
379             rspec.version.add_links(links)
380
381         if not options.get('list_leases') or options['list_leases'] != 'resources':
382             leases = self.get_leases(slivers[0])
383             rspec.version.add_leases(leases)
384
385                
386         return {'geni_urn': urns[0], 
387                 'geni_rspec': rspec.toxml(),
388                 'geni_slivers': geni_slivers}