fix indentation error
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn, xrn_to_ext_slicename, top_auth
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.model import SliverAllocation
25
26
27 import time
28
29 class PlAggregate:
30
31     def __init__(self, driver):
32         self.driver = driver
33
34     def get_nodes(self, options={}):
35         filter = {'peer_id': None}
36         geni_available = options.get('geni_available')    
37         if geni_available == True:
38             filter['boot_state'] = 'boot'
39         nodes = self.driver.shell.GetNodes(filter)
40        
41         return nodes  
42  
43     def get_sites(self, filter={}):
44         sites = {}
45         for site in self.driver.shell.GetSites(filter):
46             sites[site['site_id']] = site
47         return sites
48
49     def get_interfaces(self, filter={}):
50         interfaces = {}
51         for interface in self.driver.shell.GetInterfaces(filter):
52             iface = Interface()
53             if interface['bwlimit']:
54                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
55             interfaces[interface['interface_id']] = interface
56         return interfaces
57
58     def get_links(self, sites, nodes, interfaces):
59         
60         topology = Topology() 
61         links = []
62         for (site_id1, site_id2) in topology:
63             site_id1 = int(site_id1)
64             site_id2 = int(site_id2)
65             link = Link()
66             if not site_id1 in sites or site_id2 not in sites:
67                 continue
68             site1 = sites[site_id1]
69             site2 = sites[site_id2]
70             # get hrns
71             site1_hrn = self.driver.hrn + '.' + site1['login_base']
72             site2_hrn = self.driver.hrn + '.' + site2['login_base']
73
74             for s1_node_id in site1['node_ids']:
75                 for s2_node_id in site2['node_ids']:
76                     if s1_node_id not in nodes or s2_node_id not in nodes:
77                         continue
78                     node1 = nodes[s1_node_id]
79                     node2 = nodes[s2_node_id]
80                     # set interfaces
81                     # just get first interface of the first node
82                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
83                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
84                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
85                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
86
87                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
88                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
89
90                     # set link
91                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
92                     link['interface1'] = if1
93                     link['interface2'] = if2
94                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
95                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
96                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
97                     links.append(link)
98
99         return links
100
101     def get_node_tags(self, filter={}):
102         node_tags = {}
103         for node_tag in self.driver.shell.GetNodeTags(filter):
104             node_tags[node_tag['node_tag_id']] = node_tag
105         return node_tags
106
107     def get_pl_initscripts(self, filter={}):
108         pl_initscripts = {}
109         filter.update({'enabled': True})
110         for initscript in self.driver.shell.GetInitScripts(filter):
111             pl_initscripts[initscript['initscript_id']] = initscript
112         return pl_initscripts
113
114     def get_slivers(self, urns, options={}):
115         names = set()
116         slice_ids = set()
117         node_ids = []
118         for urn in urns:
119             xrn = PlXrn(xrn=urn)
120             if xrn.type == 'sliver':
121                  # id: slice_id-node_id
122                 try:
123                     sliver_id_parts = xrn.get_sliver_id_parts()
124                     slice_id = int(sliver_id_parts[0]) 
125                     node_id = int(sliver_id_parts[1])
126                     slice_ids.add(slice_id) 
127                     node_ids.append(node_id)
128                 except ValueError:
129                     pass 
130             else:  
131                 slice_hrn = xrn.get_hrn()
132                 top_auth_hrn = top_auth(slice_hrn)
133                 if top_auth_hrn == self.driver.hrn:
134                     slice_name = hrn_to_pl_slicename(slice_hrn)
135                 else:
136                     slice_name = xrn_to_ext_slicename(slice_hrn) 
137                 names.add(slice_name)
138
139         filter = {}
140         if names:
141             filter['name'] = list(names)
142         if slice_ids:
143             filter['slice_id'] = list(slice_ids)
144         # get slices
145         slices = self.driver.shell.GetSlices(filter)
146         if not slices:
147             return []
148         slice = slices[0]     
149         slice['hrn'] = slice_hrn   
150
151         # get sliver users
152         persons = []
153         person_ids = []
154         for slice in slices:
155             person_ids.extend(slice['person_ids'])
156         if person_ids:
157             persons = self.driver.shell.GetPersons(person_ids)
158                  
159         # get user keys
160         keys = {}
161         key_ids = []
162         for person in persons:
163             key_ids.extend(person['key_ids'])
164         
165         if key_ids:
166             key_list = self.driver.shell.GetKeys(key_ids)
167             for key in key_list:
168                 keys[key['key_id']] = key  
169
170         # construct user key info
171         users = []
172         for person in persons:
173             person_urn = hrn_to_urn(self.driver.shell.GetPersonHrn(int(person['person_id'])), 'user')
174             user = {
175                 'login': slice['name'], 
176                 'user_urn': person_urn,
177                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
178             }
179             users.append(user)
180
181         if node_ids:
182             node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']]
183             slice['node_ids'] = node_ids
184         tags_dict = self.get_slice_tags(slice)
185         nodes_dict = self.get_slice_nodes(slice, options)
186         slivers = []
187         for node in nodes_dict.values():
188             node.update(slice) 
189             node['tags'] = tags_dict[node['node_id']]
190             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
191             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
192             node['urn'] = node['sliver_id'] 
193             node['services_user'] = users
194             slivers.append(node)
195         return slivers
196
197     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}):
198         rspec_node = NodeElement()
199         # xxx how to retrieve site['login_base']
200         site=sites[node['site_id']]
201         rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
202         rspec_node['component_name'] = node['hostname']
203         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
204         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
205         # do not include boot state (<available> element) in the manifest rspec
206         rspec_node['boot_state'] = node['boot_state']
207         if node['boot_state'] == 'boot': 
208             rspec_node['available'] = 'true'
209         else:
210             rspec_node['available'] = 'false'
211
212         #distinguish between Shared and Reservable nodes
213         if node['node_type'] == 'reservable':
214             rspec_node['exclusive'] = 'true'
215         else:
216             rspec_node['exclusive'] = 'false'
217
218         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
219                                         HardwareType({'name': 'pc'})]
220         # only doing this because protogeni rspec needs
221         # to advertise available initscripts
222         rspec_node['pl_initscripts'] = pl_initscripts.values()
223         # add site/interface info to nodes.
224         # assumes that sites, interfaces and tags have already been prepared.
225         if site['longitude'] and site['latitude']:
226             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
227             rspec_node['location'] = location
228         # Granularity
229         granularity = Granularity({'grain': grain})
230         rspec_node['granularity'] = granularity
231         rspec_node['interfaces'] = []
232         if_count=0
233         for if_id in node['interface_ids']:
234             interface = Interface(interfaces[if_id])
235             interface['ipv4'] = interface['ip']
236             interface['component_id'] = PlXrn(auth=self.driver.hrn,
237                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
238             # interfaces in the manifest need a client id
239             if slice:
240                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
241             rspec_node['interfaces'].append(interface)
242             if_count+=1
243         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags]
244         rspec_node['tags'] = tags
245         return rspec_node
246
247     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
248                              pl_initscripts, sliver_allocations):
249         # get the granularity in second for the reservation system
250         grain = self.driver.shell.GetLeaseGranularity()
251         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
252         # xxx how to retrieve site['login_base']
253         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
254         # remove interfaces from manifest
255         rspec_node['interfaces'] = []
256         # add sliver info
257         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
258                          'name': sliver['name'],
259                          'type': 'plab-vserver',
260                          'tags': []})
261         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
262         if sliver['urn'] in sliver_allocations:
263             rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
264             if sliver_allocations[sliver['urn']].component_id:
265                 rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id
266         rspec_node['slivers'] = [rspec_sliver]
267
268         # slivers always provide the ssh service
269         login = Login({'authentication': 'ssh-keys', 
270                        'hostname': sliver['hostname'], 
271                        'port':'22', 
272                        'username': sliver['name'],
273                        'login': sliver['name']
274                       })
275         service = ServicesElement({'login': login,
276                             'services_user': sliver['services_user']})
277         rspec_node['services'] = [service]    
278         return rspec_node      
279
280     def get_slice_tags(self, slice):
281         slice_tag_ids = []
282         slice_tag_ids.extend(slice['slice_tag_ids'])
283         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
284         # sorted by node_id
285         tags_dict = defaultdict(list)
286         for tag in tags:
287             tags_dict[tag['node_id']] = tag
288         return tags_dict
289
290     def get_slice_nodes(self, slice, options={}):
291         nodes_dict = {}
292         filter = {'peer_id': None}
293         tags_filter = {}
294         if slice and slice.get('node_ids'):
295             filter['node_id'] = slice['node_ids']
296         else:
297             # there are no nodes to look up
298             return nodes_dict
299         tags_filter=filter.copy()
300         geni_available = options.get('geni_available')
301         if geni_available == True:
302             filter['boot_state'] = 'boot'
303         nodes = self.driver.shell.GetNodes(filter)
304         for node in nodes:
305             nodes_dict[node['node_id']] = node
306         return nodes_dict
307
308     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations = {}):
309         if rspec_node['sliver_id'] in sliver_allocations:
310             # set sliver allocation and operational status
311             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
312             if sliver_allocation:
313                 allocation_status = sliver_allocation.allocation_state
314                 if allocation_status == 'geni_allocated':
315                     op_status =  'geni_pending_allocation'
316                 elif allocation_status == 'geni_provisioned':
317                     if rspec_node['boot_state'] == 'boot':
318                         op_status = 'geni_ready'
319                     else:
320                         op_status = 'geni_failed'
321                 else:
322                     op_status = 'geni_unknown'
323             else:
324                 allocation_status = 'geni_unallocated'    
325         else:
326             allocation_status = 'geni_unallocated'
327             op_status = 'geni_failed'
328         # required fields
329         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
330                        'geni_expires': rspec_node['expires'],
331                        'geni_allocation_status' : allocation_status,
332                        'geni_operational_status': op_status,
333                        'geni_error': '',
334                        }
335         return geni_sliver        
336
337     def get_leases(self, slice=None, options={}):
338         
339         now = int(time.time())
340         filter={}
341         filter.update({'clip':now})
342         if slice:
343            filter.update({'name':slice['name']})
344         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
345         leases = self.driver.shell.GetLeases(filter)
346         grain = self.driver.shell.GetLeaseGranularity()
347
348         site_ids = []
349         for lease in leases:
350             site_ids.append(lease['site_id'])
351
352         # get sites
353         sites_dict  = self.get_sites({'site_id': site_ids}) 
354   
355         rspec_leases = []
356         for lease in leases:
357
358             rspec_lease = Lease()
359             
360             # xxx how to retrieve site['login_base']
361             site_id=lease['site_id']
362             site=sites_dict[site_id]
363
364             rspec_lease['component_id'] = hrn_to_urn(self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
365             slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
366             slice_urn = hrn_to_urn(slice_hrn, 'slice')
367             rspec_lease['slice_id'] = slice_urn
368             rspec_lease['start_time'] = lease['t_from']
369             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
370             rspec_leases.append(rspec_lease)
371         return rspec_leases
372
373     
374     def list_resources(self, version = None, options={}):
375
376         version_manager = VersionManager()
377         version = version_manager.get_version(version)
378         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
379         rspec = RSpec(version=rspec_version, user_options=options)
380        
381         if not options.get('list_leases') or options['list_leases'] != 'leases':
382             # get nodes
383             nodes  = self.get_nodes(options)
384             site_ids = []
385             interface_ids = []
386             tag_ids = []
387             nodes_dict = {}
388             for node in nodes:
389                 site_ids.append(node['site_id'])
390                 interface_ids.extend(node['interface_ids'])
391                 tag_ids.extend(node['node_tag_ids'])
392                 nodes_dict[node['node_id']] = node
393             sites = self.get_sites({'site_id': site_ids})
394             interfaces = self.get_interfaces({'interface_id':interface_ids})
395             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
396             pl_initscripts = self.get_pl_initscripts()
397             # convert nodes to rspec nodes
398             rspec_nodes = []
399             for node in nodes:
400                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
401                 rspec_nodes.append(rspec_node)
402             rspec.version.add_nodes(rspec_nodes)
403
404             # add links
405             links = self.get_links(sites, nodes_dict, interfaces)        
406             rspec.version.add_links(links)
407
408         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
409            leases = self.get_leases()
410            rspec.version.add_leases(leases)
411
412         return rspec.toxml()
413
414     def describe(self, urns, version=None, options={}):
415         version_manager = VersionManager()
416         version = version_manager.get_version(version)
417         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
418         rspec = RSpec(version=rspec_version, user_options=options)
419
420         # get slivers
421         geni_slivers = []
422         slivers = self.get_slivers(urns, options)
423         if slivers:
424             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
425         else:
426             rspec_expires = datetime_to_string(utcparse(time.time()))      
427         rspec.xml.set('expires',  rspec_expires)
428
429         # lookup the sliver allocations
430         geni_urn = urns[0]
431         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
432         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
433         sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
434         sliver_allocation_dict = {}
435         for sliver_allocation in sliver_allocations:
436             geni_urn = sliver_allocation.slice_urn
437             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
438       
439         if not options.get('list_leases') or options['list_leases'] != 'leases':
440             # add slivers
441             site_ids = []
442             interface_ids = []
443             tag_ids = []
444             nodes_dict = {}
445             for sliver in slivers:
446                 site_ids.append(sliver['site_id'])
447                 interface_ids.extend(sliver['interface_ids'])
448                 tag_ids.extend(sliver['node_tag_ids'])
449                 nodes_dict[sliver['node_id']] = sliver
450             sites = self.get_sites({'site_id': site_ids})
451             interfaces = self.get_interfaces({'interface_id':interface_ids})
452             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
453             pl_initscripts = self.get_pl_initscripts()
454             rspec_nodes = []
455             for sliver in slivers:
456                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
457                     continue
458                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
459                                                        pl_initscripts, sliver_allocation_dict)
460                 # manifest node element shouldn't contain available attribute
461                 rspec_node.pop('available')
462                 rspec_nodes.append(rspec_node) 
463                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
464                 geni_slivers.append(geni_sliver)
465             rspec.version.add_nodes(rspec_nodes)
466
467             # add sliver defaults
468             #default_sliver = slivers.get(None, [])
469             #if default_sliver:
470             #    default_sliver_attribs = default_sliver.get('tags', [])
471             #    for attrib in default_sliver_attribs:
472             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
473
474             # add links 
475             links = self.get_links(sites, nodes_dict, interfaces)        
476             rspec.version.add_links(links)
477
478         if not options.get('list_leases') or options['list_leases'] != 'resources':
479             if slivers:
480                 leases = self.get_leases(slivers[0])
481                 rspec.version.add_leases(leases)
482
483                
484         return {'geni_urn': geni_urn, 
485                 'geni_rspec': rspec.toxml(),
486                 'geni_slivers': geni_slivers}