Merge branch 'geni-v3' into dbsession
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn, top_auth, hash_loginbase
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.model import SliverAllocation
25
26
27 import time
28
29 class PlAggregate:
30
31     def __init__(self, driver):
32         self.driver = driver
33
34     def get_nodes(self, options={}):
35         filter = {'peer_id': None}
36         geni_available = options.get('geni_available')    
37         if geni_available == True:
38             filter['boot_state'] = 'boot'
39         nodes = self.driver.shell.GetNodes(filter)
40        
41         return nodes  
42  
43     def get_sites(self, filter={}):
44         sites = {}
45         for site in self.driver.shell.GetSites(filter):
46             sites[site['site_id']] = site
47         return sites
48
49     def get_interfaces(self, filter={}):
50         interfaces = {}
51         for interface in self.driver.shell.GetInterfaces(filter):
52             iface = Interface()
53             if interface['bwlimit']:
54                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
55             interfaces[interface['interface_id']] = interface
56         return interfaces
57
58     def get_links(self, sites, nodes, interfaces):
59         
60         topology = Topology() 
61         links = []
62         for (site_id1, site_id2) in topology:
63             site_id1 = int(site_id1)
64             site_id2 = int(site_id2)
65             link = Link()
66             if not site_id1 in sites or site_id2 not in sites:
67                 continue
68             site1 = sites[site_id1]
69             site2 = sites[site_id2]
70             # get hrns
71             site1_hrn = self.driver.hrn + '.' + site1['login_base']
72             site2_hrn = self.driver.hrn + '.' + site2['login_base']
73
74             for s1_node_id in site1['node_ids']:
75                 for s2_node_id in site2['node_ids']:
76                     if s1_node_id not in nodes or s2_node_id not in nodes:
77                         continue
78                     node1 = nodes[s1_node_id]
79                     node2 = nodes[s2_node_id]
80                     # set interfaces
81                     # just get first interface of the first node
82                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
83                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
84                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
85                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
86
87                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
88                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
89
90                     # set link
91                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
92                     link['interface1'] = if1
93                     link['interface2'] = if2
94                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
95                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
96                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
97                     links.append(link)
98
99         return links
100
101     def get_node_tags(self, filter={}):
102         node_tags = {}
103         for node_tag in self.driver.shell.GetNodeTags(filter):
104             node_tags[node_tag['node_tag_id']] = node_tag
105         return node_tags
106
107     def get_pl_initscripts(self, filter={}):
108         pl_initscripts = {}
109         filter.update({'enabled': True})
110         for initscript in self.driver.shell.GetInitScripts(filter):
111             pl_initscripts[initscript['initscript_id']] = initscript
112         return pl_initscripts
113
114     def get_slivers(self, urns, options={}):
115         names = set()
116         slice_ids = set()
117         node_ids = []
118         for urn in urns:
119             xrn = PlXrn(xrn=urn)
120             if xrn.type == 'sliver':
121                  # id: slice_id-node_id
122                 try:
123                     sliver_id_parts = xrn.get_sliver_id_parts()
124                     slice_id = int(sliver_id_parts[0]) 
125                     node_id = int(sliver_id_parts[1])
126                     slice_ids.add(slice_id) 
127                     node_ids.append(node_id)
128                 except ValueError:
129                     pass 
130             else:  
131                 slice_hrn = xrn.get_hrn()
132                 top_auth_hrn = top_auth(slice_hrn)
133                 site_hrn = '.'.join(slice_hrn.split('.')[:-1])
134                 slice_part = slice_hrn.split('.')[-1]
135                 if top_auth_hrn == self.driver.hrn:
136                     login_base = slice_hrn.split('.')[-2][:12]
137                 else:
138                     login_base = hash_loginbase(site_hrn)
139
140                 slice_name = '_'.join([login_base, slice_part])
141                 names.add(slice_name)
142
143         filter = {}
144         if names:
145             filter['name'] = list(names)
146         if slice_ids:
147             filter['slice_id'] = list(slice_ids)
148         # get slices
149         slices = self.driver.shell.GetSlices(filter)
150         if not slices:
151             return []
152         slice = slices[0]     
153         slice['hrn'] = slice_hrn   
154
155         # get sliver users
156         persons = []
157         person_ids = []
158         for slice in slices:
159             person_ids.extend(slice['person_ids'])
160         if person_ids:
161             persons = self.driver.shell.GetPersons(person_ids)
162                  
163         # get user keys
164         keys = {}
165         key_ids = []
166         for person in persons:
167             key_ids.extend(person['key_ids'])
168         
169         if key_ids:
170             key_list = self.driver.shell.GetKeys(key_ids)
171             for key in key_list:
172                 keys[key['key_id']] = key  
173
174         # construct user key info
175         users = []
176         for person in persons:
177             person_urn = hrn_to_urn(self.driver.shell.GetPersonHrn(int(person['person_id'])), 'user')
178             user = {
179                 'login': slice['name'], 
180                 'user_urn': person_urn,
181                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
182             }
183             users.append(user)
184
185         if node_ids:
186             node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']]
187             slice['node_ids'] = node_ids
188         tags_dict = self.get_slice_tags(slice)
189         nodes_dict = self.get_slice_nodes(slice, options)
190         slivers = []
191         for node in nodes_dict.values():
192             node.update(slice) 
193             node['tags'] = tags_dict[node['node_id']]
194             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
195             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
196             node['urn'] = node['sliver_id'] 
197             node['services_user'] = users
198             slivers.append(node)
199         return slivers
200
201     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}):
202         rspec_node = NodeElement()
203         # xxx how to retrieve site['login_base']
204         site=sites[node['site_id']]
205         rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
206         rspec_node['component_name'] = node['hostname']
207         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
208         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
209         # do not include boot state (<available> element) in the manifest rspec
210         rspec_node['boot_state'] = node['boot_state']
211         if node['boot_state'] == 'boot': 
212             rspec_node['available'] = 'true'
213         else:
214             rspec_node['available'] = 'false'
215
216         #distinguish between Shared and Reservable nodes
217         if node['node_type'] == 'reservable':
218             rspec_node['exclusive'] = 'true'
219         else:
220             rspec_node['exclusive'] = 'false'
221
222         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
223                                         HardwareType({'name': 'pc'})]
224         # only doing this because protogeni rspec needs
225         # to advertise available initscripts
226         rspec_node['pl_initscripts'] = pl_initscripts.values()
227         # add site/interface info to nodes.
228         # assumes that sites, interfaces and tags have already been prepared.
229         if site['longitude'] and site['latitude']:
230             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
231             rspec_node['location'] = location
232         # Granularity
233         granularity = Granularity({'grain': grain})
234         rspec_node['granularity'] = granularity
235         rspec_node['interfaces'] = []
236         if_count=0
237         for if_id in node['interface_ids']:
238             interface = Interface(interfaces[if_id])
239             interface['ipv4'] = interface['ip']
240             interface['component_id'] = PlXrn(auth=self.driver.hrn,
241                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
242             # interfaces in the manifest need a client id
243             if slice:
244                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
245             rspec_node['interfaces'].append(interface)
246             if_count+=1
247         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags]
248         rspec_node['tags'] = tags
249         return rspec_node
250
251     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
252                              pl_initscripts, sliver_allocations):
253         # get the granularity in second for the reservation system
254         grain = self.driver.shell.GetLeaseGranularity()
255         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
256         # xxx how to retrieve site['login_base']
257         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
258         # remove interfaces from manifest
259         rspec_node['interfaces'] = []
260         # add sliver info
261         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
262                          'name': sliver['name'],
263                          'type': 'plab-vserver',
264                          'tags': []})
265         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
266         if sliver['urn'] in sliver_allocations:
267             rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
268             if sliver_allocations[sliver['urn']].component_id:
269                 rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id
270         rspec_node['slivers'] = [rspec_sliver]
271
272         # slivers always provide the ssh service
273         login = Login({'authentication': 'ssh-keys', 
274                        'hostname': sliver['hostname'], 
275                        'port':'22', 
276                        'username': sliver['name'],
277                        'login': sliver['name']
278                       })
279         service = ServicesElement({'login': login,
280                             'services_user': sliver['services_user']})
281         rspec_node['services'] = [service]    
282         return rspec_node      
283
284     def get_slice_tags(self, slice):
285         slice_tag_ids = []
286         slice_tag_ids.extend(slice['slice_tag_ids'])
287         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
288         # sorted by node_id
289         tags_dict = defaultdict(list)
290         for tag in tags:
291             tags_dict[tag['node_id']] = tag
292         return tags_dict
293
294     def get_slice_nodes(self, slice, options={}):
295         nodes_dict = {}
296         filter = {'peer_id': None}
297         tags_filter = {}
298         if slice and slice.get('node_ids'):
299             filter['node_id'] = slice['node_ids']
300         else:
301             # there are no nodes to look up
302             return nodes_dict
303         tags_filter=filter.copy()
304         geni_available = options.get('geni_available')
305         if geni_available == True:
306             filter['boot_state'] = 'boot'
307         nodes = self.driver.shell.GetNodes(filter)
308         for node in nodes:
309             nodes_dict[node['node_id']] = node
310         return nodes_dict
311
312     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations = {}):
313         if rspec_node['sliver_id'] in sliver_allocations:
314             # set sliver allocation and operational status
315             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
316             if sliver_allocation:
317                 allocation_status = sliver_allocation.allocation_state
318                 if allocation_status == 'geni_allocated':
319                     op_status =  'geni_pending_allocation'
320                 elif allocation_status == 'geni_provisioned':
321                     if rspec_node['boot_state'] == 'boot':
322                         op_status = 'geni_ready'
323                     else:
324                         op_status = 'geni_failed'
325                 else:
326                     op_status = 'geni_unknown'
327             else:
328                 allocation_status = 'geni_unallocated'    
329         else:
330             allocation_status = 'geni_unallocated'
331             op_status = 'geni_failed'
332         # required fields
333         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
334                        'geni_expires': rspec_node['expires'],
335                        'geni_allocation_status' : allocation_status,
336                        'geni_operational_status': op_status,
337                        'geni_error': '',
338                        }
339         return geni_sliver        
340
341     def get_leases(self, slice=None, options={}):
342         
343         now = int(time.time())
344         filter={}
345         filter.update({'clip':now})
346         if slice:
347            filter.update({'name':slice['name']})
348         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
349         leases = self.driver.shell.GetLeases(filter)
350         grain = self.driver.shell.GetLeaseGranularity()
351
352         site_ids = []
353         for lease in leases:
354             site_ids.append(lease['site_id'])
355
356         # get sites
357         sites_dict  = self.get_sites({'site_id': site_ids}) 
358   
359         rspec_leases = []
360         for lease in leases:
361
362             rspec_lease = Lease()
363             
364             # xxx how to retrieve site['login_base']
365             site_id=lease['site_id']
366             site=sites_dict[site_id]
367
368             rspec_lease['component_id'] = hrn_to_urn(self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
369             slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
370             slice_urn = hrn_to_urn(slice_hrn, 'slice')
371             rspec_lease['slice_id'] = slice_urn
372             rspec_lease['start_time'] = lease['t_from']
373             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
374             rspec_leases.append(rspec_lease)
375         return rspec_leases
376
377     
378     def list_resources(self, version = None, options={}):
379
380         version_manager = VersionManager()
381         version = version_manager.get_version(version)
382         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
383         rspec = RSpec(version=rspec_version, user_options=options)
384        
385         if not options.get('list_leases') or options['list_leases'] != 'leases':
386             # get nodes
387             nodes  = self.get_nodes(options)
388             site_ids = []
389             interface_ids = []
390             tag_ids = []
391             nodes_dict = {}
392             for node in nodes:
393                 site_ids.append(node['site_id'])
394                 interface_ids.extend(node['interface_ids'])
395                 tag_ids.extend(node['node_tag_ids'])
396                 nodes_dict[node['node_id']] = node
397             sites = self.get_sites({'site_id': site_ids})
398             interfaces = self.get_interfaces({'interface_id':interface_ids})
399             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
400             pl_initscripts = self.get_pl_initscripts()
401             # convert nodes to rspec nodes
402             rspec_nodes = []
403             for node in nodes:
404                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
405                 rspec_nodes.append(rspec_node)
406             rspec.version.add_nodes(rspec_nodes)
407
408             # add links
409             links = self.get_links(sites, nodes_dict, interfaces)        
410             rspec.version.add_links(links)
411
412         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
413            leases = self.get_leases()
414            rspec.version.add_leases(leases)
415
416         return rspec.toxml()
417
418     def describe(self, urns, version=None, options={}):
419         version_manager = VersionManager()
420         version = version_manager.get_version(version)
421         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
422         rspec = RSpec(version=rspec_version, user_options=options)
423
424         # get slivers
425         geni_slivers = []
426         slivers = self.get_slivers(urns, options)
427         if slivers:
428             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
429         else:
430             rspec_expires = datetime_to_string(utcparse(time.time()))      
431         rspec.xml.set('expires',  rspec_expires)
432
433         # lookup the sliver allocations
434         geni_urn = urns[0]
435         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
436         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
437         sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
438         sliver_allocation_dict = {}
439         for sliver_allocation in sliver_allocations:
440             geni_urn = sliver_allocation.slice_urn
441             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
442       
443         if not options.get('list_leases') or options['list_leases'] != 'leases':
444             # add slivers
445             site_ids = []
446             interface_ids = []
447             tag_ids = []
448             nodes_dict = {}
449             for sliver in slivers:
450                 site_ids.append(sliver['site_id'])
451                 interface_ids.extend(sliver['interface_ids'])
452                 tag_ids.extend(sliver['node_tag_ids'])
453                 nodes_dict[sliver['node_id']] = sliver
454             sites = self.get_sites({'site_id': site_ids})
455             interfaces = self.get_interfaces({'interface_id':interface_ids})
456             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
457             pl_initscripts = self.get_pl_initscripts()
458             rspec_nodes = []
459             for sliver in slivers:
460                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
461                     continue
462                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
463                                                        pl_initscripts, sliver_allocation_dict)
464                 # manifest node element shouldn't contain available attribute
465                 rspec_node.pop('available')
466                 rspec_nodes.append(rspec_node) 
467                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
468                 geni_slivers.append(geni_sliver)
469             rspec.version.add_nodes(rspec_nodes)
470
471             # add sliver defaults
472             #default_sliver = slivers.get(None, [])
473             #if default_sliver:
474             #    default_sliver_attribs = default_sliver.get('tags', [])
475             #    for attrib in default_sliver_attribs:
476             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
477
478             # add links 
479             links = self.get_links(sites, nodes_dict, interfaces)        
480             rspec.version.add_links(links)
481
482         if not options.get('list_leases') or options['list_leases'] != 'resources':
483             if slivers:
484                 leases = self.get_leases(slivers[0])
485                 rspec.version.add_leases(leases)
486
487                
488         return {'geni_urn': geni_urn, 
489                 'geni_rspec': rspec.toxml(),
490                 'geni_slivers': geni_slivers}