plshell: explicitly cast to string PL Username and AuthString
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn, top_auth, hash_loginbase
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.alchemy import dbsession
25 from sfa.storage.model import SliverAllocation
26
27
28 import time
29
30 class PlAggregate:
31
32     def __init__(self, driver):
33         self.driver = driver
34
35     def get_nodes(self, options={}):
36         filter = {'peer_id': None}
37         geni_available = options.get('geni_available')    
38         if geni_available == True:
39             filter['boot_state'] = 'boot'
40         nodes = self.driver.shell.GetNodes(filter)
41        
42         return nodes  
43  
44     def get_sites(self, filter={}):
45         sites = {}
46         for site in self.driver.shell.GetSites(filter):
47             sites[site['site_id']] = site
48         return sites
49
50     def get_interfaces(self, filter={}):
51         interfaces = {}
52         for interface in self.driver.shell.GetInterfaces(filter):
53             iface = Interface()
54             if interface['bwlimit']:
55                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
56             interfaces[interface['interface_id']] = interface
57         return interfaces
58
59     def get_links(self, sites, nodes, interfaces):
60         
61         topology = Topology() 
62         links = []
63         for (site_id1, site_id2) in topology:
64             site_id1 = int(site_id1)
65             site_id2 = int(site_id2)
66             link = Link()
67             if not site_id1 in sites or site_id2 not in sites:
68                 continue
69             site1 = sites[site_id1]
70             site2 = sites[site_id2]
71             # get hrns
72             site1_hrn = self.driver.hrn + '.' + site1['login_base']
73             site2_hrn = self.driver.hrn + '.' + site2['login_base']
74
75             for s1_node_id in site1['node_ids']:
76                 for s2_node_id in site2['node_ids']:
77                     if s1_node_id not in nodes or s2_node_id not in nodes:
78                         continue
79                     node1 = nodes[s1_node_id]
80                     node2 = nodes[s2_node_id]
81                     # set interfaces
82                     # just get first interface of the first node
83                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
84                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
85                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
86                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
87
88                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
89                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
90
91                     # set link
92                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
93                     link['interface1'] = if1
94                     link['interface2'] = if2
95                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
96                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
97                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
98                     links.append(link)
99
100         return links
101
102     def get_node_tags(self, filter={}):
103         node_tags = {}
104         for node_tag in self.driver.shell.GetNodeTags(filter):
105             node_tags[node_tag['node_tag_id']] = node_tag
106         return node_tags
107
108     def get_pl_initscripts(self, filter={}):
109         pl_initscripts = {}
110         filter.update({'enabled': True})
111         for initscript in self.driver.shell.GetInitScripts(filter):
112             pl_initscripts[initscript['initscript_id']] = initscript
113         return pl_initscripts
114
115     def get_slivers(self, urns, options={}):
116         names = set()
117         slice_ids = set()
118         node_ids = []
119         for urn in urns:
120             xrn = PlXrn(xrn=urn)
121             if xrn.type == 'sliver':
122                  # id: slice_id-node_id
123                 try:
124                     sliver_id_parts = xrn.get_sliver_id_parts()
125                     slice_id = int(sliver_id_parts[0]) 
126                     node_id = int(sliver_id_parts[1])
127                     slice_ids.add(slice_id) 
128                     node_ids.append(node_id)
129                 except ValueError:
130                     pass 
131             else:  
132                 slice_hrn = xrn.get_hrn()
133                 top_auth_hrn = top_auth(slice_hrn)
134                 site_hrn = '.'.join(slice_hrn.split('.')[:-1])
135                 slice_part = slice_hrn.split('.')[-1]
136                 if top_auth_hrn == self.driver.hrn:
137                     login_base = slice_hrn.split('.')[-2][:12]
138                 else:
139                     login_base = hash_loginbase(site_hrn)
140
141                 slice_name = '_'.join([login_base, slice_part])
142                 names.add(slice_name)
143
144         filter = {}
145         if names:
146             filter['name'] = list(names)
147         if slice_ids:
148             filter['slice_id'] = list(slice_ids)
149         # get slices
150         slices = self.driver.shell.GetSlices(filter)
151         if not slices:
152             return []
153         slice = slices[0]     
154         slice['hrn'] = slice_hrn   
155
156         # get sliver users
157         persons = []
158         person_ids = []
159         for slice in slices:
160             person_ids.extend(slice['person_ids'])
161         if person_ids:
162             persons = self.driver.shell.GetPersons(person_ids)
163                  
164         # get user keys
165         keys = {}
166         key_ids = []
167         for person in persons:
168             key_ids.extend(person['key_ids'])
169         
170         if key_ids:
171             key_list = self.driver.shell.GetKeys(key_ids)
172             for key in key_list:
173                 keys[key['key_id']] = key  
174
175         # construct user key info
176         users = []
177         for person in persons:
178             person_urn = hrn_to_urn(self.driver.shell.GetPersonHrn(int(person['person_id'])), 'user')
179             user = {
180                 'login': slice['name'], 
181                 'user_urn': person_urn,
182                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
183             }
184             users.append(user)
185
186         if node_ids:
187             node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']]
188             slice['node_ids'] = node_ids
189         tags_dict = self.get_slice_tags(slice)
190         nodes_dict = self.get_slice_nodes(slice, options)
191         slivers = []
192         for node in nodes_dict.values():
193             node.update(slice) 
194             node['tags'] = tags_dict[node['node_id']]
195             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
196             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
197             node['urn'] = node['sliver_id'] 
198             node['services_user'] = users
199             slivers.append(node)
200         return slivers
201
202     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}):
203         rspec_node = NodeElement()
204         # xxx how to retrieve site['login_base']
205         site=sites[node['site_id']]
206         rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
207         rspec_node['component_name'] = node['hostname']
208         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
209         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
210         # do not include boot state (<available> element) in the manifest rspec
211         rspec_node['boot_state'] = node['boot_state']
212         if node['boot_state'] == 'boot': 
213             rspec_node['available'] = 'true'
214         else:
215             rspec_node['available'] = 'false'
216
217         #distinguish between Shared and Reservable nodes
218         if node['node_type'] == 'reservable':
219             rspec_node['exclusive'] = 'true'
220         else:
221             rspec_node['exclusive'] = 'false'
222
223         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
224                                         HardwareType({'name': 'pc'})]
225         # only doing this because protogeni rspec needs
226         # to advertise available initscripts
227         rspec_node['pl_initscripts'] = pl_initscripts.values()
228         # add site/interface info to nodes.
229         # assumes that sites, interfaces and tags have already been prepared.
230         if site['longitude'] and site['latitude']:
231             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
232             rspec_node['location'] = location
233         # Granularity
234         granularity = Granularity({'grain': grain})
235         rspec_node['granularity'] = granularity
236         rspec_node['interfaces'] = []
237         if_count=0
238         for if_id in node['interface_ids']:
239             interface = Interface(interfaces[if_id])
240             interface['ipv4'] = interface['ip']
241             interface['component_id'] = PlXrn(auth=self.driver.hrn,
242                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
243             # interfaces in the manifest need a client id
244             if slice:
245                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
246             rspec_node['interfaces'].append(interface)
247             if_count+=1
248         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags]
249         rspec_node['tags'] = tags
250         return rspec_node
251
252     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
253                              pl_initscripts, sliver_allocations):
254         # get the granularity in second for the reservation system
255         grain = self.driver.shell.GetLeaseGranularity()
256         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
257         # xxx how to retrieve site['login_base']
258         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
259         # remove interfaces from manifest
260         rspec_node['interfaces'] = []
261         # add sliver info
262         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
263                          'name': sliver['name'],
264                          'type': 'plab-vserver',
265                          'tags': []})
266         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
267         if sliver['urn'] in sliver_allocations:
268             rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
269             if sliver_allocations[sliver['urn']].component_id:
270                 rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id
271         rspec_node['slivers'] = [rspec_sliver]
272
273         # slivers always provide the ssh service
274         login = Login({'authentication': 'ssh-keys', 
275                        'hostname': sliver['hostname'], 
276                        'port':'22', 
277                        'username': sliver['name'],
278                        'login': sliver['name']
279                       })
280         service = ServicesElement({'login': login,
281                             'services_user': sliver['services_user']})
282         rspec_node['services'] = [service]    
283         return rspec_node      
284
285     def get_slice_tags(self, slice):
286         slice_tag_ids = []
287         slice_tag_ids.extend(slice['slice_tag_ids'])
288         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
289         # sorted by node_id
290         tags_dict = defaultdict(list)
291         for tag in tags:
292             tags_dict[tag['node_id']] = tag
293         return tags_dict
294
295     def get_slice_nodes(self, slice, options={}):
296         nodes_dict = {}
297         filter = {'peer_id': None}
298         tags_filter = {}
299         if slice and slice.get('node_ids'):
300             filter['node_id'] = slice['node_ids']
301         else:
302             # there are no nodes to look up
303             return nodes_dict
304         tags_filter=filter.copy()
305         geni_available = options.get('geni_available')
306         if geni_available == True:
307             filter['boot_state'] = 'boot'
308         nodes = self.driver.shell.GetNodes(filter)
309         for node in nodes:
310             nodes_dict[node['node_id']] = node
311         return nodes_dict
312
313     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations = {}):
314         if rspec_node['sliver_id'] in sliver_allocations:
315             # set sliver allocation and operational status
316             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
317             if sliver_allocation:
318                 allocation_status = sliver_allocation.allocation_state
319                 if allocation_status == 'geni_allocated':
320                     op_status =  'geni_pending_allocation'
321                 elif allocation_status == 'geni_provisioned':
322                     if rspec_node['boot_state'] == 'boot':
323                         op_status = 'geni_ready'
324                     else:
325                         op_status = 'geni_failed'
326                 else:
327                     op_status = 'geni_unknown'
328             else:
329                 allocation_status = 'geni_unallocated'    
330         else:
331             allocation_status = 'geni_unallocated'
332             op_status = 'geni_failed'
333         # required fields
334         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
335                        'geni_expires': rspec_node['expires'],
336                        'geni_allocation_status' : allocation_status,
337                        'geni_operational_status': op_status,
338                        'geni_error': '',
339                        }
340         return geni_sliver        
341
342     def get_leases(self, slice=None, options={}):
343         
344         now = int(time.time())
345         filter={}
346         filter.update({'clip':now})
347         if slice:
348            filter.update({'name':slice['name']})
349         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
350         leases = self.driver.shell.GetLeases(filter)
351         grain = self.driver.shell.GetLeaseGranularity()
352
353         site_ids = []
354         for lease in leases:
355             site_ids.append(lease['site_id'])
356
357         # get sites
358         sites_dict  = self.get_sites({'site_id': site_ids}) 
359   
360         rspec_leases = []
361         for lease in leases:
362
363             rspec_lease = Lease()
364             
365             # xxx how to retrieve site['login_base']
366             site_id=lease['site_id']
367             site=sites_dict[site_id]
368
369             rspec_lease['component_id'] = hrn_to_urn(self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
370             slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
371             slice_urn = hrn_to_urn(slice_hrn, 'slice')
372             rspec_lease['slice_id'] = slice_urn
373             rspec_lease['start_time'] = lease['t_from']
374             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
375             rspec_leases.append(rspec_lease)
376         return rspec_leases
377
378     
379     def list_resources(self, version = None, options={}):
380
381         version_manager = VersionManager()
382         version = version_manager.get_version(version)
383         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
384         rspec = RSpec(version=rspec_version, user_options=options)
385        
386         if not options.get('list_leases') or options['list_leases'] != 'leases':
387             # get nodes
388             nodes  = self.get_nodes(options)
389             site_ids = []
390             interface_ids = []
391             tag_ids = []
392             nodes_dict = {}
393             for node in nodes:
394                 site_ids.append(node['site_id'])
395                 interface_ids.extend(node['interface_ids'])
396                 tag_ids.extend(node['node_tag_ids'])
397                 nodes_dict[node['node_id']] = node
398             sites = self.get_sites({'site_id': site_ids})
399             interfaces = self.get_interfaces({'interface_id':interface_ids})
400             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
401             pl_initscripts = self.get_pl_initscripts()
402             # convert nodes to rspec nodes
403             rspec_nodes = []
404             for node in nodes:
405                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
406                 rspec_nodes.append(rspec_node)
407             rspec.version.add_nodes(rspec_nodes)
408
409             # add links
410             links = self.get_links(sites, nodes_dict, interfaces)        
411             rspec.version.add_links(links)
412
413         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
414            leases = self.get_leases()
415            rspec.version.add_leases(leases)
416
417         return rspec.toxml()
418
419     def describe(self, urns, version=None, options={}):
420         version_manager = VersionManager()
421         version = version_manager.get_version(version)
422         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
423         rspec = RSpec(version=rspec_version, user_options=options)
424
425         # get slivers
426         geni_slivers = []
427         slivers = self.get_slivers(urns, options)
428         if slivers:
429             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
430         else:
431             rspec_expires = datetime_to_string(utcparse(time.time()))      
432         rspec.xml.set('expires',  rspec_expires)
433
434         # lookup the sliver allocations
435         geni_urn = urns[0]
436         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
437         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
438         sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
439         sliver_allocation_dict = {}
440         for sliver_allocation in sliver_allocations:
441             geni_urn = sliver_allocation.slice_urn
442             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
443       
444         if not options.get('list_leases') or options['list_leases'] != 'leases':
445             # add slivers
446             site_ids = []
447             interface_ids = []
448             tag_ids = []
449             nodes_dict = {}
450             for sliver in slivers:
451                 site_ids.append(sliver['site_id'])
452                 interface_ids.extend(sliver['interface_ids'])
453                 tag_ids.extend(sliver['node_tag_ids'])
454                 nodes_dict[sliver['node_id']] = sliver
455             sites = self.get_sites({'site_id': site_ids})
456             interfaces = self.get_interfaces({'interface_id':interface_ids})
457             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
458             pl_initscripts = self.get_pl_initscripts()
459             rspec_nodes = []
460             for sliver in slivers:
461                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
462                     continue
463                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
464                                                        pl_initscripts, sliver_allocation_dict)
465                 # manifest node element shouldn't contain available attribute
466                 rspec_node.pop('available')
467                 rspec_nodes.append(rspec_node) 
468                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
469                 geni_slivers.append(geni_sliver)
470             rspec.version.add_nodes(rspec_nodes)
471
472             # add sliver defaults
473             #default_sliver = slivers.get(None, [])
474             #if default_sliver:
475             #    default_sliver_attribs = default_sliver.get('tags', [])
476             #    for attrib in default_sliver_attribs:
477             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
478
479             # add links 
480             links = self.get_links(sites, nodes_dict, interfaces)        
481             rspec.version.add_links(links)
482
483         if not options.get('list_leases') or options['list_leases'] != 'resources':
484             if slivers:
485                 leases = self.get_leases(slivers[0])
486                 rspec.version.add_leases(leases)
487
488                
489         return {'geni_urn': geni_urn, 
490                 'geni_rspec': rspec.toxml(),
491                 'geni_slivers': geni_slivers}