6fabf79ac6275992ac6611a2cd0d3654d7d11d5d
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.model import SliverAllocation
25
26
27 import time
28
class PlAggregate:
    """Aggregate-manager view of a PlanetLab testbed.

    Every query goes through ``driver.shell`` (the testbed API) and the
    results are converted into RSpec elements.  ``driver.hrn`` is used as
    the authority when URNs are built, and ``driver.api.dbsession()`` is
    used by :meth:`describe` to look up ``SliverAllocation`` records.
    """

    def __init__(self, driver):
        """Remember the driver that gives access to the testbed."""
        self.driver = driver

    def get_nodes(self, options=None):
        """Return the node records with ``peer_id`` None.

        When ``options['geni_available']`` equals True only nodes whose
        ``boot_state`` is ``'boot'`` are requested.
        """
        if options is None:
            options = {}
        node_filter = {'peer_id': None}
        # '== True' (not 'is True') is kept on purpose: it also accepts an
        # integer 1, exactly like the historical behaviour.
        if options.get('geni_available') == True:
            node_filter['boot_state'] = 'boot'
        return self.driver.shell.GetNodes(node_filter)

    def get_sites(self, filter=None):
        """Return the sites matching ``filter`` as ``{site_id: site}``."""
        if filter is None:
            filter = {}
        return {site['site_id']: site
                for site in self.driver.shell.GetSites(filter)}

    def get_interfaces(self, filter=None):
        """Return the interfaces matching ``filter`` as ``{interface_id: interface}``.

        A truthy ``bwlimit`` is replaced, in the returned record, by the
        string form of its integer value divided by 1000.
        """
        if filter is None:
            filter = {}
        interfaces = {}
        for interface in self.driver.shell.GetInterfaces(filter):
            if interface['bwlimit']:
                # '//' keeps the integer result on both Python 2 and 3
                # ('/' would yield e.g. '2500.0' on Python 3).
                interface['bwlimit'] = str(int(interface['bwlimit']) // 1000)
            interfaces[interface['interface_id']] = interface
        return interfaces

    def _link_endpoint(self, node, interfaces):
        """Return the Interface element for the first interface of ``node``.

        Returns None when the node has no interface or when its first
        interface is not part of ``interfaces``.
        """
        interface_ids = node['interface_ids']
        if not interface_ids or interface_ids[0] not in interfaces:
            return None
        xrn = PlXrn(auth=self.driver.hrn,
                    interface='node%s:eth0' % (node['node_id']))
        return Interface({'component_id': xrn.urn,
                          'ipv4': interfaces[interface_ids[0]]['ip']})

    def get_links(self, sites, nodes, interfaces):
        """Return one Link per pair of known nodes on connected sites.

        ``sites``, ``nodes`` and ``interfaces`` are dicts keyed by their
        respective ids.  Site pairs come from ``Topology()``; pairs that
        reference a site or node missing from the given dicts are skipped,
        and so are nodes without a usable first interface.
        """
        links = []
        component_manager_id = hrn_to_urn(self.driver.hrn, 'authority+am')
        for (site_id1, site_id2) in Topology():
            site_id1 = int(site_id1)
            site_id2 = int(site_id2)
            if site_id1 not in sites or site_id2 not in sites:
                continue
            site1 = sites[site_id1]
            site2 = sites[site_id2]
            # name and id only depend on the two sites, not on the nodes
            component_name = "%s:%s" % (site1['login_base'], site2['login_base'])
            component_id = PlXrn(auth=self.driver.hrn,
                                 interface=component_name).get_urn()

            for s1_node_id in site1['node_ids']:
                if s1_node_id not in nodes:
                    continue
                # just use the first interface of each node
                if1 = self._link_endpoint(nodes[s1_node_id], interfaces)
                if if1 is None:
                    continue
                for s2_node_id in site2['node_ids']:
                    if s2_node_id not in nodes:
                        continue
                    if2 = self._link_endpoint(nodes[s2_node_id], interfaces)
                    if if2 is None:
                        continue
                    link = Link({'capacity': '1000000', 'latency': '0',
                                 'packet_loss': '0', 'type': 'ipv4'})
                    link['interface1'] = if1
                    link['interface2'] = if2
                    link['component_name'] = component_name
                    link['component_id'] = component_id
                    link['component_manager_id'] = component_manager_id
                    links.append(link)
        return links

    def get_node_tags(self, filter=None):
        """Return the node tags matching ``filter`` as ``{node_tag_id: tag}``."""
        if filter is None:
            filter = {}
        return {node_tag['node_tag_id']: node_tag
                for node_tag in self.driver.shell.GetNodeTags(filter)}

    def get_pl_initscripts(self, filter=None):
        """Return the enabled initscripts as ``{initscript_id: initscript}``.

        ``filter`` is combined with ``{'enabled': True}``; the caller's dict
        is left untouched.
        """
        # work on a copy so that the caller's filter is not modified
        script_filter = dict(filter) if filter else {}
        script_filter['enabled'] = True
        return {initscript['initscript_id']: initscript
                for initscript in self.driver.shell.GetInitScripts(script_filter)}

    def get_slivers(self, urns, options=None):
        """Return the sliver records designated by ``urns``.

        Sliver URNs (id of the form ``slice_id-node_id``) select a slice and
        restrict the result to the given nodes; any other URN is taken as
        the slice hrn.  Each returned record is the node record updated with
        the fields of the first matching slice, plus ``tags``,
        ``sliver_id``, ``urn`` and ``services_user``.  An empty list is
        returned when no slice matches.
        """
        if options is None:
            options = {}
        slice_ids = set()
        node_ids = []
        slice_hrn = None
        for urn in urns:
            xrn = PlXrn(xrn=urn)
            if xrn.type != 'sliver':
                slice_hrn = xrn.get_hrn()
                continue
            # id: slice_id-node_id
            try:
                sliver_id_parts = xrn.get_sliver_id_parts()
                slice_id = int(sliver_id_parts[0])
                node_id = int(sliver_id_parts[1])
            except ValueError:
                # malformed ids are still ignored, but no longer silently
                logger.warning("PlAggregate.get_slivers : ignoring malformed sliver urn {}".format(urn))
                continue
            slice_ids.add(slice_id)
            node_ids.append(node_id)

        slice_filter = {'peer_id': None}
        if slice_ids:
            slice_filter['slice_id'] = list(slice_ids)
        fields = ['slice_id', 'name', 'hrn', 'person_ids', 'node_ids', 'slice_tag_ids', 'expires']
        all_slices = self.driver.shell.GetSlices(slice_filter, fields)
        if slice_hrn:
            slices = [candidate for candidate in all_slices
                      if candidate['hrn'] == slice_hrn]
        else:
            slices = all_slices

        if not slices:
            if slice_hrn:
                logger.error("PlAggregate.get_slivers : no slice found with hrn {}".format(slice_hrn))
            else:
                logger.error("PlAggregate.get_slivers : no sliver found with urns {}".format(urns))
            return []
        # The slivers are built from the first matching slice.  Its 'hrn' is
        # left as returned by the testbed: it already equals slice_hrn when
        # one was requested, and must not be reset to None otherwise.
        slice_record = slices[0]

        # get sliver users; a dedicated loop variable keeps slice_record
        # pointing at the first slice
        person_ids = []
        for each_slice in slices:
            person_ids.extend(each_slice['person_ids'])
        persons = []
        if person_ids:
            persons = self.driver.shell.GetPersons(person_ids)

        # get user keys
        key_ids = []
        for person in persons:
            key_ids.extend(person['key_ids'])
        keys = {}
        if key_ids:
            for key in self.driver.shell.GetKeys(key_ids):
                keys[key['key_id']] = key

        # construct user key info
        users = []
        for person in persons:
            person_hrn = self.driver.shell.GetPersonHrn(int(person['person_id']))
            users.append({
                'login': slice_record['name'],
                'user_urn': hrn_to_urn(person_hrn, 'user'),
                'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys],
            })

        if node_ids:
            # keep only the requested nodes that really belong to the slice
            slice_node_ids = set(slice_record['node_ids'])
            slice_record['node_ids'] = [node_id for node_id in node_ids
                                        if node_id in slice_node_ids]
        tags_dict = self.get_slice_tags(slice_record)
        nodes_dict = self.get_slice_nodes(slice_record, options)
        slivers = []
        for node in nodes_dict.values():
            node.update(slice_record)
            node['tags'] = tags_dict[node['node_id']]
            sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice_record['slice_id'], node['node_id'])
            node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
            node['urn'] = node['sliver_id']
            node['services_user'] = users
            slivers.append(node)
        if not slivers:
            logger.warning("PlAggregate.get_slivers : slice(s) found but with no sliver {}".format(urns))
        return slivers

    def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=None, grain=None, options=None):
        """Convert a node record into a NodeElement.

        ``sites``, ``interfaces`` and ``node_tags`` are dicts keyed by id and
        must already contain the entries referenced by ``node``.
        ``pl_initscripts`` is a dict whose values are advertised on the
        node; ``grain`` is stored in the node's Granularity element.
        ``options`` is accepted but not used.
        """
        # must be a dict: its values() are advertised below
        if pl_initscripts is None:
            pl_initscripts = {}
        if options is None:
            options = {}
        rspec_node = NodeElement()
        site = sites[node['site_id']]
        rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
        rspec_node['component_name'] = node['hostname']
        rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
        rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
        rspec_node['boot_state'] = node['boot_state']
        rspec_node['available'] = 'true' if node['boot_state'] == 'boot' else 'false'

        # distinguish between Shared and Reservable nodes
        rspec_node['exclusive'] = 'true' if node['node_type'] == 'reservable' else 'false'

        rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
                                        HardwareType({'name': 'pc'})]
        # only doing this because protogeni rspec needs
        # to advertise available initscripts
        rspec_node['pl_initscripts'] = list(pl_initscripts.values())
        # add site/interface info to nodes.
        # assumes that sites, interfaces and tags have already been prepared.
        if site['longitude'] and site['latitude']:
            rspec_node['location'] = Location({'longitude': site['longitude'],
                                               'latitude': site['latitude'],
                                               'country': 'unknown'})
        rspec_node['granularity'] = Granularity({'grain': grain})
        rspec_node['interfaces'] = []
        for if_count, if_id in enumerate(node['interface_ids']):
            interface = Interface(interfaces[if_id])
            interface['ipv4'] = interface['ip']
            interface['component_id'] = PlXrn(auth=self.driver.hrn,
                                              interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
            # interfaces in the manifest need a client id; it is always set
            interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
            rspec_node['interfaces'].append(interface)
        rspec_node['tags'] = [PLTag(node_tags[tag_id])
                              for tag_id in node['node_tag_ids'] if tag_id in node_tags]
        return rspec_node

    def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags,
                             pl_initscripts, sliver_allocations):
        """Convert a sliver record into a manifest NodeElement.

        The node part is built by :meth:`node_to_rspec_node`; interfaces are
        then removed, and the expiration, sliver element, ssh login service
        and (when known in ``sliver_allocations``) client/component ids are
        added.
        """
        # get the granularity in second for the reservation system
        grain = self.driver.shell.GetLeaseGranularity()
        rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
        rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
        # remove interfaces from manifest
        rspec_node['interfaces'] = []
        # add sliver info
        sliver_urn = sliver['urn']
        rspec_sliver = Sliver({'sliver_id': sliver_urn,
                               'name': sliver['name'],
                               'type': 'plab-vserver',
                               'tags': []})
        rspec_node['sliver_id'] = rspec_sliver['sliver_id']
        if sliver_urn in sliver_allocations:
            allocation = sliver_allocations[sliver_urn]
            rspec_node['client_id'] = allocation.client_id
            if allocation.component_id:
                rspec_node['component_id'] = allocation.component_id
        rspec_node['slivers'] = [rspec_sliver]

        # slivers always provide the ssh service
        login = Login({'authentication': 'ssh-keys',
                       'hostname': sliver['hostname'],
                       'port': '22',
                       'username': sliver['name'],
                       'login': sliver['name']})
        service = ServicesElement({'login': login,
                                   'services_user': sliver['services_user']})
        rspec_node['services'] = [service]
        return rspec_node

    def get_slice_tags(self, slice):
        """Return the tags of ``slice`` grouped by node.

        The result is a ``defaultdict(list)`` mapping each ``node_id`` to
        the list of tags carrying that node id; unknown node ids yield an
        empty list.
        """
        slice_tag_ids = list(slice['slice_tag_ids'])
        tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
        tags_dict = defaultdict(list)
        for tag in tags:
            # append (not assign) so that every tag of a node is kept
            tags_dict[tag['node_id']].append(tag)
        return tags_dict

    def get_slice_nodes(self, slice, options=None):
        """Return the nodes of ``slice`` as ``{node_id: node}``.

        An empty dict is returned when the slice has no ``node_ids``.  When
        ``options['geni_available']`` equals True only nodes whose
        ``boot_state`` is ``'boot'`` are requested.
        """
        if options is None:
            options = {}
        if not slice or not slice.get('node_ids'):
            # there are no nodes to look up
            return {}
        node_filter = {'peer_id': None, 'node_id': slice['node_ids']}
        if options.get('geni_available') == True:
            node_filter['boot_state'] = 'boot'
        return {node['node_id']: node
                for node in self.driver.shell.GetNodes(node_filter)}

    def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations=None):
        """Return the GENI sliver status dict for ``rspec_node``.

        ``sliver_allocations`` maps sliver ids to allocation records.  With
        no (or an empty) record the sliver is reported as
        ``geni_unallocated`` / ``geni_failed``; otherwise the operational
        status is derived from the record's ``allocation_state`` and the
        node's ``boot_state``.
        """
        if sliver_allocations is None:
            sliver_allocations = {}
        # defaults cover both "no entry" and "entry without a record", so
        # that op_status is always bound
        allocation_status = 'geni_unallocated'
        op_status = 'geni_failed'
        sliver_allocation = sliver_allocations.get(rspec_node['sliver_id'])
        if sliver_allocation:
            allocation_status = sliver_allocation.allocation_state
            if allocation_status == 'geni_allocated':
                op_status = 'geni_pending_allocation'
            elif allocation_status == 'geni_provisioned':
                if rspec_node['boot_state'] == 'boot':
                    op_status = 'geni_ready'
                else:
                    op_status = 'geni_failed'
            else:
                op_status = 'geni_unknown'
        # required fields
        return {'geni_sliver_urn': rspec_node['sliver_id'],
                'geni_expires': rspec_node['expires'],
                'geni_allocation_status': allocation_status,
                'geni_operational_status': op_status,
                'geni_error': '',
                }

    def get_leases(self, slice=None, options=None):
        """Return the Lease elements for the leases clipped at the current time.

        When ``slice`` is given only the leases with that slice name are
        requested.  ``duration`` is expressed in units of the lease
        granularity.  ``options`` is accepted but not used.
        """
        if options is None:
            options = {}

        lease_filter = {'clip': int(time.time())}
        if slice:
            lease_filter['name'] = slice['name']
        leases = self.driver.shell.GetLeases(lease_filter)
        grain = self.driver.shell.GetLeaseGranularity()

        rspec_leases = []
        for lease in leases:
            rspec_lease = Lease()
            node_hrn = self.driver.shell.GetNodeHrn(lease['hostname'])
            rspec_lease['component_id'] = hrn_to_urn(node_hrn, 'node')
            slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
            rspec_lease['slice_id'] = hrn_to_urn(slice_hrn, 'slice')
            rspec_lease['start_time'] = lease['t_from']
            # '//' keeps an integer number of grains on Python 2 and 3
            rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) // grain
            rspec_leases.append(rspec_lease)
        return rspec_leases

    def list_resources(self, version=None, options=None):
        """Return the advertisement RSpec (XML) of the testbed.

        ``options['list_leases']`` selects the content: ``'leases'`` leaves
        out nodes and links, ``'resources'`` leaves out leases, anything
        else (or nothing) includes both.
        """
        if options is None:
            options = {}

        version_manager = VersionManager()
        version = version_manager.get_version(version)
        rspec_version = version_manager._get_version(version.type, version.version, 'ad')
        rspec = RSpec(version=rspec_version, user_options=options)

        list_leases = options.get('list_leases')
        if list_leases != 'leases':
            # get nodes
            nodes = self.get_nodes(options)
            site_ids = []
            interface_ids = []
            tag_ids = []
            nodes_dict = {}
            for node in nodes:
                site_ids.append(node['site_id'])
                interface_ids.extend(node['interface_ids'])
                tag_ids.extend(node['node_tag_ids'])
                nodes_dict[node['node_id']] = node
            sites = self.get_sites({'site_id': site_ids})
            interfaces = self.get_interfaces({'interface_id': interface_ids})
            node_tags = self.get_node_tags({'node_tag_id': tag_ids})
            pl_initscripts = self.get_pl_initscripts()
            # convert nodes to rspec nodes
            rspec_nodes = [self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
                           for node in nodes]
            rspec.version.add_nodes(rspec_nodes)

            # add links
            links = self.get_links(sites, nodes_dict, interfaces)
            rspec.version.add_links(links)

        if list_leases != 'resources':
            leases = self.get_leases()
            rspec.version.add_leases(leases)

        return rspec.toxml()

    def describe(self, urns, version=None, options=None):
        """Return the manifest description of the slivers designated by ``urns``.

        The result is a dict with ``geni_urn``, ``geni_rspec`` (manifest
        XML) and ``geni_slivers`` (one status dict per sliver).
        ``options['list_leases']`` selects the content as in
        :meth:`list_resources`.
        """
        if options is None:
            options = {}
        version_manager = VersionManager()
        version = version_manager.get_version(version)
        rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
        rspec = RSpec(version=rspec_version, user_options=options)

        # get slivers
        geni_slivers = []
        slivers = self.get_slivers(urns, options)
        if slivers:
            rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
        else:
            rspec_expires = datetime_to_string(utcparse(time.time()))
        rspec.xml.set('expires', rspec_expires)

        # lookup the sliver allocations; the slice urn recorded with an
        # allocation, when there is one, replaces the first requested urn
        geni_urn = urns[0] if urns else None
        sliver_ids = [sliver['sliver_id'] for sliver in slivers]
        sliver_allocation_dict = {}
        if sliver_ids:
            # no need to query the database with an empty id list
            constraint = SliverAllocation.sliver_id.in_(sliver_ids)
            sliver_allocations = self.driver.api.dbsession().query(SliverAllocation).filter(constraint)
            for sliver_allocation in sliver_allocations:
                geni_urn = sliver_allocation.slice_urn
                sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation

        list_leases = options.get('list_leases')
        if list_leases != 'leases':
            # add slivers
            site_ids = []
            interface_ids = []
            tag_ids = []
            nodes_dict = {}
            for sliver in slivers:
                site_ids.append(sliver['site_id'])
                interface_ids.extend(sliver['interface_ids'])
                tag_ids.extend(sliver['node_tag_ids'])
                nodes_dict[sliver['node_id']] = sliver
            sites = self.get_sites({'site_id': site_ids})
            interfaces = self.get_interfaces({'interface_id': interface_ids})
            node_tags = self.get_node_tags({'node_tag_id': tag_ids})
            pl_initscripts = self.get_pl_initscripts()
            rspec_nodes = []
            for sliver in slivers:
                # skip nodes whose whitelist does not include this slice
                if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
                    continue
                rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags,
                                                       pl_initscripts, sliver_allocation_dict)
                # manifest node element shouldn't contain available attribute
                rspec_node.pop('available')
                rspec_nodes.append(rspec_node)
                geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
                geni_slivers.append(geni_sliver)
            rspec.version.add_nodes(rspec_nodes)

            # add links
            links = self.get_links(sites, nodes_dict, interfaces)
            rspec.version.add_links(links)

        if list_leases != 'resources':
            if slivers:
                leases = self.get_leases(slivers[0])
                rspec.version.add_leases(leases)

        return {'geni_urn': geni_urn,
                'geni_rspec': rspec.toxml(),
                'geni_slivers': geni_slivers}