Better mgt of external slices/users/sites + fixes
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn, xrn_to_ext_slicename, top_auth
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.alchemy import dbsession
25 from sfa.storage.model import SliverAllocation
26
27
28 import time
29
30 class PlAggregate:
31
32     def __init__(self, driver):
33         self.driver = driver
34
35     def get_nodes(self, options={}):
36         filter = {'peer_id': None}
37         geni_available = options.get('geni_available')    
38         if geni_available == True:
39             filter['boot_state'] = 'boot'
40         nodes = self.driver.shell.GetNodes(filter)
41        
42         return nodes  
43  
44     def get_sites(self, filter={}):
45         sites = {}
46         for site in self.driver.shell.GetSites(filter):
47             sites[site['site_id']] = site
48         return sites
49
50     def get_interfaces(self, filter={}):
51         interfaces = {}
52         for interface in self.driver.shell.GetInterfaces(filter):
53             iface = Interface()
54             if interface['bwlimit']:
55                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
56             interfaces[interface['interface_id']] = interface
57         return interfaces
58
59     def get_links(self, sites, nodes, interfaces):
60         
61         topology = Topology() 
62         links = []
63         for (site_id1, site_id2) in topology:
64             site_id1 = int(site_id1)
65             site_id2 = int(site_id2)
66             link = Link()
67             if not site_id1 in sites or site_id2 not in sites:
68                 continue
69             site1 = sites[site_id1]
70             site2 = sites[site_id2]
71             # get hrns
72             site1_hrn = self.driver.hrn + '.' + site1['login_base']
73             site2_hrn = self.driver.hrn + '.' + site2['login_base']
74
75             for s1_node_id in site1['node_ids']:
76                 for s2_node_id in site2['node_ids']:
77                     if s1_node_id not in nodes or s2_node_id not in nodes:
78                         continue
79                     node1 = nodes[s1_node_id]
80                     node2 = nodes[s2_node_id]
81                     # set interfaces
82                     # just get first interface of the first node
83                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
84                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
85                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
86                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
87
88                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
89                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
90
91                     # set link
92                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
93                     link['interface1'] = if1
94                     link['interface2'] = if2
95                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
96                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
97                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
98                     links.append(link)
99
100         return links
101
102     def get_node_tags(self, filter={}):
103         node_tags = {}
104         for node_tag in self.driver.shell.GetNodeTags(filter):
105             node_tags[node_tag['node_tag_id']] = node_tag
106         return node_tags
107
108     def get_pl_initscripts(self, filter={}):
109         pl_initscripts = {}
110         filter.update({'enabled': True})
111         for initscript in self.driver.shell.GetInitScripts(filter):
112             pl_initscripts[initscript['initscript_id']] = initscript
113         return pl_initscripts
114
115     def get_slivers(self, urns, options={}):
116         names = set()
117         slice_ids = set()
118         node_ids = []
119         for urn in urns:
120             xrn = PlXrn(xrn=urn)
121             if xrn.type == 'sliver':
122                  # id: slice_id-node_id
123                 try:
124                     sliver_id_parts = xrn.get_sliver_id_parts()
125                     slice_id = int(sliver_id_parts[0]) 
126                     node_id = int(sliver_id_parts[1])
127                     slice_ids.add(slice_id) 
128                     node_ids.append(node_id)
129                 except ValueError:
130                     pass 
131             else:  
132                 slice_hrn = xrn.get_hrn()
133                 top_auth_hrn = top_auth(slice_hrn)
134                 if top_auth_hrn == self.driver.hrn:
135                     slice_name = hrn_to_pl_slicename(slice_hrn)
136                 else:
137                     slice_name = xrn_to_ext_slicename(slice_hrn) 
138                 names.add(slice_name)
139
140         filter = {}
141         if names:
142             filter['name'] = list(names)
143         if slice_ids:
144             filter['slice_id'] = list(slice_ids)
145         # get slices
146         slices = self.driver.shell.GetSlices(filter)
147         if not slices:
148             return []
149         slice = slices[0]     
150         slice['hrn'] = slice_hrn   
151
152         # get sliver users
153         persons = []
154         person_ids = []
155         for slice in slices:
156             person_ids.extend(slice['person_ids'])
157         if person_ids:
158             persons = self.driver.shell.GetPersons(person_ids)
159                  
160         # get user keys
161         keys = {}
162         key_ids = []
163         for person in persons:
164             key_ids.extend(person['key_ids'])
165         
166         if key_ids:
167             key_list = self.driver.shell.GetKeys(key_ids)
168             for key in key_list:
169                 keys[key['key_id']] = key  
170
171         # construct user key info
172         users = []
173         for person in persons:
174             person_urn = hrn_to_urn(self.driver.shell.GetPersonHrn(int(person['person_id'])), 'user')
175             user = {
176                 'login': slice['name'], 
177                 'user_urn': person_urn,
178                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
179             }
180             users.append(user)
181
182         if node_ids:
183             node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']]
184             slice['node_ids'] = node_ids
185         tags_dict = self.get_slice_tags(slice)
186         nodes_dict = self.get_slice_nodes(slice, options)
187         slivers = []
188         for node in nodes_dict.values():
189             node.update(slice) 
190             node['tags'] = tags_dict[node['node_id']]
191             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
192             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
193             node['urn'] = node['sliver_id'] 
194             node['services_user'] = users
195             slivers.append(node)
196         return slivers
197
198     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}):
199         rspec_node = NodeElement()
200         # xxx how to retrieve site['login_base']
201         site=sites[node['site_id']]
202         rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
203         rspec_node['component_name'] = node['hostname']
204         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
205         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
206         # do not include boot state (<available> element) in the manifest rspec
207         rspec_node['boot_state'] = node['boot_state']
208         if node['boot_state'] == 'boot': 
209             rspec_node['available'] = 'true'
210         else:
211             rspec_node['available'] = 'false'
212
213         #distinguish between Shared and Reservable nodes
214         if node['node_type'] == 'reservable':
215             rspec_node['exclusive'] = 'true'
216         else:
217             rspec_node['exclusive'] = 'false'
218
219         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
220                                         HardwareType({'name': 'pc'})]
221         # only doing this because protogeni rspec needs
222         # to advertise available initscripts
223         rspec_node['pl_initscripts'] = pl_initscripts.values()
224         # add site/interface info to nodes.
225         # assumes that sites, interfaces and tags have already been prepared.
226         if site['longitude'] and site['latitude']:
227             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
228             rspec_node['location'] = location
229         # Granularity
230         granularity = Granularity({'grain': grain})
231         rspec_node['granularity'] = granularity
232         rspec_node['interfaces'] = []
233         if_count=0
234         for if_id in node['interface_ids']:
235             interface = Interface(interfaces[if_id])
236             interface['ipv4'] = interface['ip']
237             interface['component_id'] = PlXrn(auth=self.driver.hrn,
238                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
239             # interfaces in the manifest need a client id
240             if slice:
241                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
242             rspec_node['interfaces'].append(interface)
243             if_count+=1
244         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags]
245         rspec_node['tags'] = tags
246         return rspec_node
247
248     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
249                              pl_initscripts, sliver_allocations):
250         # get the granularity in second for the reservation system
251         grain = self.driver.shell.GetLeaseGranularity()
252         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
253         # xxx how to retrieve site['login_base']
254         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
255         # remove interfaces from manifest
256         rspec_node['interfaces'] = []
257         # add sliver info
258         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
259                          'name': sliver['name'],
260                          'type': 'plab-vserver',
261                          'tags': []})
262         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
263         if sliver['urn'] in sliver_allocations:
264             rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
265             if sliver_allocations[sliver['urn']].component_id:
266                 rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id
267         rspec_node['slivers'] = [rspec_sliver]
268
269         # slivers always provide the ssh service
270         login = Login({'authentication': 'ssh-keys', 
271                        'hostname': sliver['hostname'], 
272                        'port':'22', 
273                        'username': sliver['name'],
274                        'login': sliver['name']
275                       })
276         service = ServicesElement({'login': login,
277                             'services_user': sliver['services_user']})
278         rspec_node['services'] = [service]    
279         return rspec_node      
280
281     def get_slice_tags(self, slice):
282         slice_tag_ids = []
283         slice_tag_ids.extend(slice['slice_tag_ids'])
284         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
285         # sorted by node_id
286         tags_dict = defaultdict(list)
287         for tag in tags:
288             tags_dict[tag['node_id']] = tag
289         return tags_dict
290
291     def get_slice_nodes(self, slice, options={}):
292         nodes_dict = {}
293         filter = {'peer_id': None}
294         tags_filter = {}
295         if slice and slice.get('node_ids'):
296             filter['node_id'] = slice['node_ids']
297         else:
298             # there are no nodes to look up
299             return nodes_dict
300         tags_filter=filter.copy()
301         geni_available = options.get('geni_available')
302         if geni_available == True:
303             filter['boot_state'] = 'boot'
304         nodes = self.driver.shell.GetNodes(filter)
305         for node in nodes:
306             nodes_dict[node['node_id']] = node
307         return nodes_dict
308
309     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations = {}):
310         if rspec_node['sliver_id'] in sliver_allocations:
311             # set sliver allocation and operational status
312             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
313             if sliver_allocation:
314                 allocation_status = sliver_allocation.allocation_state
315                 if allocation_status == 'geni_allocated':
316                     op_status =  'geni_pending_allocation'
317                 elif allocation_status == 'geni_provisioned':
318                     if rspec_node['boot_state'] == 'boot':
319                         op_status = 'geni_ready'
320                     else:
321                         op_status = 'geni_failed'
322                 else:
323                     op_status = 'geni_unknown'
324             else:
325                 allocation_status = 'geni_unallocated'    
326         else:
327             allocation_status = 'geni_unallocated'
328             op_status = 'geni_failed'
329         # required fields
330         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
331                        'geni_expires': rspec_node['expires'],
332                        'geni_allocation_status' : allocation_status,
333                        'geni_operational_status': op_status,
334                        'geni_error': '',
335                        }
336         return geni_sliver        
337
338     def get_leases(self, slice=None, options={}):
339         
340         now = int(time.time())
341         filter={}
342         filter.update({'clip':now})
343         if slice:
344            filter.update({'name':slice['name']})
345         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
346         leases = self.driver.shell.GetLeases(filter)
347         grain = self.driver.shell.GetLeaseGranularity()
348
349         site_ids = []
350         for lease in leases:
351             site_ids.append(lease['site_id'])
352
353         # get sites
354         sites_dict  = self.get_sites({'site_id': site_ids}) 
355   
356         rspec_leases = []
357         for lease in leases:
358
359             rspec_lease = Lease()
360             
361             # xxx how to retrieve site['login_base']
362             site_id=lease['site_id']
363             site=sites_dict[site_id]
364
365             rspec_lease['component_id'] = hrn_to_urn(self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
366             slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
367             slice_urn = hrn_to_urn(slice_hrn, 'slice')
368             rspec_lease['slice_id'] = slice_urn
369             rspec_lease['start_time'] = lease['t_from']
370             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
371             rspec_leases.append(rspec_lease)
372         return rspec_leases
373
374     
375     def list_resources(self, version = None, options={}):
376
377         version_manager = VersionManager()
378         version = version_manager.get_version(version)
379         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
380         rspec = RSpec(version=rspec_version, user_options=options)
381        
382         if not options.get('list_leases') or options['list_leases'] != 'leases':
383             # get nodes
384             nodes  = self.get_nodes(options)
385             site_ids = []
386             interface_ids = []
387             tag_ids = []
388             nodes_dict = {}
389             for node in nodes:
390                 site_ids.append(node['site_id'])
391                 interface_ids.extend(node['interface_ids'])
392                 tag_ids.extend(node['node_tag_ids'])
393                 nodes_dict[node['node_id']] = node
394             sites = self.get_sites({'site_id': site_ids})
395             interfaces = self.get_interfaces({'interface_id':interface_ids})
396             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
397             pl_initscripts = self.get_pl_initscripts()
398             # convert nodes to rspec nodes
399             rspec_nodes = []
400             for node in nodes:
401                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
402                 rspec_nodes.append(rspec_node)
403             rspec.version.add_nodes(rspec_nodes)
404
405             # add links
406             links = self.get_links(sites, nodes_dict, interfaces)        
407             rspec.version.add_links(links)
408
409         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
410            leases = self.get_leases()
411            rspec.version.add_leases(leases)
412
413         return rspec.toxml()
414
415     def describe(self, urns, version=None, options={}):
416         version_manager = VersionManager()
417         version = version_manager.get_version(version)
418         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
419         rspec = RSpec(version=rspec_version, user_options=options)
420
421         # get slivers
422         geni_slivers = []
423         slivers = self.get_slivers(urns, options)
424         if slivers:
425             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
426         else:
427             rspec_expires = datetime_to_string(utcparse(time.time()))      
428         rspec.xml.set('expires',  rspec_expires)
429
430         # lookup the sliver allocations
431         geni_urn = urns[0]
432         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
433         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
434         sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
435         sliver_allocation_dict = {}
436         for sliver_allocation in sliver_allocations:
437             geni_urn = sliver_allocation.slice_urn
438             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
439       
440         if not options.get('list_leases') or options['list_leases'] != 'leases':
441             # add slivers
442             site_ids = []
443             interface_ids = []
444             tag_ids = []
445             nodes_dict = {}
446             for sliver in slivers:
447                 site_ids.append(sliver['site_id'])
448                 interface_ids.extend(sliver['interface_ids'])
449                 tag_ids.extend(sliver['node_tag_ids'])
450                 nodes_dict[sliver['node_id']] = sliver
451             sites = self.get_sites({'site_id': site_ids})
452             interfaces = self.get_interfaces({'interface_id':interface_ids})
453             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
454             pl_initscripts = self.get_pl_initscripts()
455             rspec_nodes = []
456             for sliver in slivers:
457                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
458                     continue
459                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
460                                                        pl_initscripts, sliver_allocation_dict)
461                 # manifest node element shouldn't contain available attribute
462                 rspec_node.pop('available')
463                 rspec_nodes.append(rspec_node) 
464                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
465                 geni_slivers.append(geni_sliver)
466             rspec.version.add_nodes(rspec_nodes)
467
468             # add sliver defaults
469             #default_sliver = slivers.get(None, [])
470             #if default_sliver:
471             #    default_sliver_attribs = default_sliver.get('tags', [])
472             #    for attrib in default_sliver_attribs:
473             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
474
475             # add links 
476             links = self.get_links(sites, nodes_dict, interfaces)        
477             rspec.version.add_links(links)
478
479         if not options.get('list_leases') or options['list_leases'] != 'resources':
480             if slivers:
481                 leases = self.get_leases(slivers[0])
482                 rspec.version.add_leases(leases)
483
484                
485         return {'geni_urn': geni_urn, 
486                 'geni_rspec': rspec.toxml(),
487                 'geni_slivers': geni_slivers}