fix component_id
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import Node
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import Services
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.alchemy import dbsession
25 from sfa.storage.model import SliverAllocation
26
27
28 import time
29
30 class PlAggregate:
31
    def __init__(self, driver):
        """Store the driver; the other methods reach the testbed through it."""
        self.driver = driver
34
35     def get_nodes(self, options={}):
36         filter = {'peer_id': None}
37         geni_available = options.get('geni_available')    
38         if geni_available == True:
39             filter['boot_state'] = 'boot'
40         nodes = self.driver.shell.GetNodes(filter)
41        
42         return nodes  
43  
44     def get_sites(self, filter={}):
45         sites = {}
46         for site in self.driver.shell.GetSites(filter):
47             sites[site['site_id']] = site
48         return sites
49
50     def get_interfaces(self, filter={}):
51         interfaces = {}
52         for interface in self.driver.shell.GetInterfaces(filter):
53             iface = Interface()
54             if interface['bwlimit']:
55                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
56             interfaces[interface['interface_id']] = interface
57         return interfaces
58
59     def get_links(self, sites, nodes, interfaces):
60         
61         topology = Topology() 
62         links = []
63         for (site_id1, site_id2) in topology:
64             site_id1 = int(site_id1)
65             site_id2 = int(site_id2)
66             link = Link()
67             if not site_id1 in sites or site_id2 not in sites:
68                 continue
69             site1 = sites[site_id1]
70             site2 = sites[site_id2]
71             # get hrns
72             site1_hrn = self.driver.hrn + '.' + site1['login_base']
73             site2_hrn = self.driver.hrn + '.' + site2['login_base']
74
75             for s1_node_id in site1['node_ids']:
76                 for s2_node_id in site2['node_ids']:
77                     if s1_node_id not in nodes or s2_node_id not in nodes:
78                         continue
79                     node1 = nodes[s1_node_id]
80                     node2 = nodes[s2_node_id]
81                     # set interfaces
82                     # just get first interface of the first node
83                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
84                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
85                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
86                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
87
88                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
89                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
90
91                     # set link
92                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
93                     link['interface1'] = if1
94                     link['interface2'] = if2
95                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
96                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
97                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
98                     links.append(link)
99
100         return links
101
102     def get_node_tags(self, filter={}):
103         node_tags = {}
104         for node_tag in self.driver.shell.GetNodeTags(filter):
105             node_tags[node_tag['node_tag_id']] = node_tag
106         return node_tags
107
108     def get_pl_initscripts(self, filter={}):
109         pl_initscripts = {}
110         filter.update({'enabled': True})
111         for initscript in self.driver.shell.GetInitScripts(filter):
112             pl_initscripts[initscript['initscript_id']] = initscript
113         return pl_initscripts
114
115     def get_slivers(self, urns, options={}):
116         names = set()
117         slice_ids = set()
118         node_ids = []
119         for urn in urns:
120             xrn = PlXrn(xrn=urn)
121             if xrn.type == 'sliver':
122                  # id: slice_id-node_id
123                 id_parts = xrn.leaf.split('-')
124                 slice_ids.add(id_parts[0]) 
125                 node_ids.append(id_parts[1])
126             else:  
127                 names.add(xrn.pl_slicename())
128             if xrn.id:
129                 ids.add(xrn.id)
130
131         filter = {}
132         if names:
133             filter['name'] = list(names)
134         if slice_ids:
135             filter['slice_id'] = list(slice_ids)
136         slices = self.driver.shell.GetSlices(filter)
137         if not slices:
138             return []
139         slice = slices[0]
140         if node_ids:
141             slice['node_ids'] = node_ids
142         tags_dict = self.get_slice_tags(slice)
143         nodes_dict = self.get_slice_nodes(slice, options)
144         slivers = []
145         for node in nodes_dict.values():
146             node.update(slices[0]) 
147             node['tags'] = tags_dict[node['node_id']]
148             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
149             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
150             node['urn'] = node['sliver_id'] 
151             slivers.append(node)
152         return slivers
153
154     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}):
155         rspec_node = Node()
156         # xxx how to retrieve site['login_base']
157         site=sites[node['site_id']]
158         rspec_node['component_id'] = PlXrn(self.driver.hrn, hostname=node['hostname']).get_urn()
159         rspec_node['component_name'] = node['hostname']
160         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
161         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
162         # do not include boot state (<available> element) in the manifest rspec
163         rspec_node['boot_state'] = node['boot_state']
164         if node['boot_state'] == 'boot': 
165             rspec_node['available'] = 'true'
166         else:
167             rspec_node['available'] = 'false'
168         rspec_node['exclusive'] = 'false'
169         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
170                                         HardwareType({'name': 'pc'})]
171         # only doing this because protogeni rspec needs
172         # to advertise available initscripts
173         rspec_node['pl_initscripts'] = pl_initscripts.values()
174         # add site/interface info to nodes.
175         # assumes that sites, interfaces and tags have already been prepared.
176         if site['longitude'] and site['latitude']:
177             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
178             rspec_node['location'] = location
179         # Granularity
180         granularity = Granularity({'grain': grain})
181         rspec_node['granularity'] = granularity
182         rspec_node['interfaces'] = []
183         if_count=0
184         for if_id in node['interface_ids']:
185             interface = Interface(interfaces[if_id])
186             interface['ipv4'] = interface['ip']
187             interface['component_id'] = PlXrn(auth=self.driver.hrn,
188                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
189             # interfaces in the manifest need a client id
190             if slice:
191                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
192             rspec_node['interfaces'].append(interface)
193             if_count+=1
194         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids']]
195         rspec_node['tags'] = tags
196         return rspec_node
197
198     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
199                              pl_initscripts, sliver_allocations):
200         # get the granularity in second for the reservation system
201         grain = self.driver.shell.GetLeaseGranularity()
202         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
203         # xxx how to retrieve site['login_base']
204         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
205         # remove interfaces from manifest
206         rspec_node['interfaces'] = []
207         # add sliver info
208         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
209                          'name': sliver['name'],
210                          'type': 'plab-vserver',
211                          'tags': []})
212         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
213         rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
214         rspec_node['slivers'] = [rspec_sliver]
215
216         # slivers always provide the ssh service
217         login = Login({'authentication': 'ssh-keys', 'hostname': sliver['hostname'], 'port':'22', 'username': sliver['name']})
218         service = Services({'login': login})
219         rspec_node['services'] = [service]    
220         return rspec_node      
221
222     def get_slice_tags(self, slice):
223         slice_tag_ids = []
224         slice_tag_ids.extend(slice['slice_tag_ids'])
225         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
226         # sorted by node_id
227         tags_dict = defaultdict(list)
228         for tag in tags:
229             tags_dict[tag['node_id']] = tag
230         return tags_dict
231
232     def get_slice_nodes(self, slice, options={}):
233         nodes_dict = {}
234         filter = {'peer_id': None}
235         tags_filter = {}
236         if slice and slice.get('node_ids'):
237             filter['node_id'] = slice['node_ids']
238         else:
239             # there are no nodes to look up
240             return nodes_dict
241         tags_filter=filter.copy()
242         geni_available = options.get('geni_available')
243         if geni_available == True:
244             filter['boot_state'] = 'boot'
245         nodes = self.driver.shell.GetNodes(filter)
246         for node in nodes:
247             nodes_dict[node['node_id']] = node
248         return nodes_dict
249
250     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations = {}):
251         if rspec_node['sliver_id'] in sliver_allocations:
252             # set sliver allocation and operational status
253             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
254             if sliver_allocation:
255                 allocation_status = sliver_allocation.allocation_state
256                 if allocation_status == 'geni_allocated':
257                     op_status =  'geni_pending_allocation'
258                 elif allocation_status == 'geni_provisioned':
259                     if rspec_node['boot_state'] == 'boot':
260                         op_status = 'geni_ready'
261                     else:
262                         op_status = 'geni_failed'
263                 else:
264                     op_status = 'geni_unknown'
265             else:
266                 allocation_status = 'geni_unallocated'    
267         # required fields
268         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
269                        'geni_expires': rspec_node['expires'],
270                        'geni_allocation_status' : allocation_status,
271                        'geni_operational_status': op_status,
272                        'geni_error': None,
273                        }
274         return geni_sliver        
275
276     def get_leases(self, slice=None, options={}):
277         
278         now = int(time.time())
279         filter={}
280         filter.update({'clip':now})
281         if slice:
282            filter.update({'name':slice['name']})
283         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
284         leases = self.driver.shell.GetLeases(filter)
285         grain = self.driver.shell.GetLeaseGranularity()
286
287         site_ids = []
288         for lease in leases:
289             site_ids.append(lease['site_id'])
290
291         # get sites
292         sites_dict  = self.get_sites({'site_id': site_ids}) 
293   
294         rspec_leases = []
295         for lease in leases:
296
297             rspec_lease = Lease()
298             
299             # xxx how to retrieve site['login_base']
300             site_id=lease['site_id']
301             site=sites_dict[site_id]
302
303             rspec_lease['lease_id'] = lease['lease_id']
304             rspec_lease['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], lease['hostname'])
305             slice_hrn = slicename_to_hrn(self.driver.hrn, lease['name'])
306             slice_urn = hrn_to_urn(slice_hrn, 'slice')
307             rspec_lease['slice_id'] = slice_urn
308             rspec_lease['start_time'] = lease['t_from']
309             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
310             rspec_leases.append(rspec_lease)
311         return rspec_leases
312
313     
314     def list_resources(self, version = None, options={}):
315
316         version_manager = VersionManager()
317         version = version_manager.get_version(version)
318         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
319         rspec = RSpec(version=rspec_version, user_options=options)
320        
321         if not options.get('list_leases') or options['list_leases'] != 'leases':
322             # get nodes
323             nodes  = self.get_nodes(options)
324             site_ids = []
325             interface_ids = []
326             tag_ids = []
327             nodes_dict = {}
328             for node in nodes:
329                 site_ids.append(node['site_id'])
330                 interface_ids.extend(node['interface_ids'])
331                 tag_ids.extend(node['node_tag_ids'])
332                 nodes_dict[node['node_id']] = node
333             sites = self.get_sites({'site_id': site_ids})
334             interfaces = self.get_interfaces({'interface_id':interface_ids})
335             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
336             pl_initscripts = self.get_pl_initscripts()
337             # convert nodes to rspec nodes
338             rspec_nodes = []
339             for node in nodes:
340                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
341                 rspec_nodes.append(rspec_node)
342             rspec.version.add_nodes(rspec_nodes)
343
344             # add links
345             links = self.get_links(sites, nodes_dict, interfaces)        
346             rspec.version.add_links(links)
347         return rspec.toxml()
348
349     def describe(self, urns, version=None, options={}):
350         version_manager = VersionManager()
351         version = version_manager.get_version(version)
352         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
353         rspec = RSpec(version=rspec_version, user_options=options)
354
355         # get slivers
356         geni_slivers = []
357         slivers = self.get_slivers(urns, options)
358         if slivers:
359             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
360         else:
361             rspec_expires = datetime_to_string(utcparse(time.time()))      
362         rspec.xml.set('expires',  rspec_expires)
363
364         # lookup the sliver allocations
365         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
366         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
367         sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
368         sliver_allocation_dict = {}
369         for sliver_allocation in sliver_allocations:
370             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
371       
372         if not options.get('list_leases') or options['list_leases'] != 'leases':
373             # add slivers
374             site_ids = []
375             interface_ids = []
376             tag_ids = []
377             nodes_dict = {}
378             for sliver in slivers:
379                 site_ids.append(sliver['site_id'])
380                 interface_ids.extend(sliver['interface_ids'])
381                 tag_ids.extend(sliver['node_tag_ids'])
382                 nodes_dict[sliver['node_id']] = sliver
383             sites = self.get_sites({'site_id': site_ids})
384             interfaces = self.get_interfaces({'interface_id':interface_ids})
385             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
386             pl_initscripts = self.get_pl_initscripts()
387             rspec_nodes = []
388             for sliver in slivers:
389                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
390                     continue
391                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
392                                                        pl_initscripts, sliver_allocation_dict)
393                 # manifest node element shouldn't contain available attribute
394                 rspec_node.pop('available')
395                 rspec_nodes.append(rspec_node) 
396                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
397                 geni_slivers.append(geni_sliver)
398             rspec.version.add_nodes(rspec_nodes)
399
400             # add sliver defaults
401             #default_sliver = slivers.get(None, [])
402             #if default_sliver:
403             #    default_sliver_attribs = default_sliver.get('tags', [])
404             #    for attrib in default_sliver_attribs:
405             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
406
407             # add links 
408             links = self.get_links(sites, nodes_dict, interfaces)        
409             rspec.version.add_links(links)
410
411         if not options.get('list_leases') or options['list_leases'] != 'resources':
412             if slivers:
413                 leases = self.get_leases(slivers[0])
414                 rspec.version.add_leases(leases)
415
416                
417         return {'geni_urn': urns[0], 
418                 'geni_rspec': rspec.toxml(),
419                 'geni_slivers': geni_slivers}