287669481adf92ebb64df105bfcbcebd67666342
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn, top_auth, hash_loginbase
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.model import SliverAllocation
25
26
27 import time
28
29 class PlAggregate:
30
31     def __init__(self, driver):
32         self.driver = driver
33
34     def get_nodes(self, options=None):
35         if options is None: options={}
36         filter = {'peer_id': None}
37         geni_available = options.get('geni_available')    
38         if geni_available == True:
39             filter['boot_state'] = 'boot'
40         nodes = self.driver.shell.GetNodes(filter)
41        
42         return nodes  
43  
44     def get_sites(self, filter=None):
45         if filter is None: filter={}
46         sites = {}
47         for site in self.driver.shell.GetSites(filter):
48             sites[site['site_id']] = site
49         return sites
50
51     def get_interfaces(self, filter=None):
52         if filter is None: filter={}
53         interfaces = {}
54         for interface in self.driver.shell.GetInterfaces(filter):
55             iface = Interface()
56             if interface['bwlimit']:
57                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
58             interfaces[interface['interface_id']] = interface
59         return interfaces
60
61     def get_links(self, sites, nodes, interfaces):
62         
63         topology = Topology() 
64         links = []
65         for (site_id1, site_id2) in topology:
66             site_id1 = int(site_id1)
67             site_id2 = int(site_id2)
68             link = Link()
69             if not site_id1 in sites or site_id2 not in sites:
70                 continue
71             site1 = sites[site_id1]
72             site2 = sites[site_id2]
73             # get hrns
74             site1_hrn = self.driver.hrn + '.' + site1['login_base']
75             site2_hrn = self.driver.hrn + '.' + site2['login_base']
76
77             for s1_node_id in site1['node_ids']:
78                 for s2_node_id in site2['node_ids']:
79                     if s1_node_id not in nodes or s2_node_id not in nodes:
80                         continue
81                     node1 = nodes[s1_node_id]
82                     node2 = nodes[s2_node_id]
83                     # set interfaces
84                     # just get first interface of the first node
85                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
86                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
87                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
88                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
89
90                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
91                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
92
93                     # set link
94                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
95                     link['interface1'] = if1
96                     link['interface2'] = if2
97                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
98                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
99                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
100                     links.append(link)
101
102         return links
103
104     def get_node_tags(self, filter=None):
105         if filter is None: filter={}
106         node_tags = {}
107         for node_tag in self.driver.shell.GetNodeTags(filter):
108             node_tags[node_tag['node_tag_id']] = node_tag
109         return node_tags
110
111     def get_pl_initscripts(self, filter=None):
112         if filter is None: filter={}
113         pl_initscripts = {}
114         filter.update({'enabled': True})
115         for initscript in self.driver.shell.GetInitScripts(filter):
116             pl_initscripts[initscript['initscript_id']] = initscript
117         return pl_initscripts
118
119     def get_slivers(self, urns, options=None):
120         if options is None: options={}
121         names = set()
122         slice_ids = set()
123         node_ids = []
124         slice_hrn = None
125         for urn in urns:
126             xrn = PlXrn(xrn=urn)
127             if xrn.type == 'sliver':
128                  # id: slice_id-node_id
129                 try:
130                     sliver_id_parts = xrn.get_sliver_id_parts()
131                     slice_id = int(sliver_id_parts[0]) 
132                     node_id = int(sliver_id_parts[1])
133                     slice_ids.add(slice_id) 
134                     node_ids.append(node_id)
135                 except ValueError:
136                     pass 
137             else:  
138                 slice_hrn = xrn.get_hrn()
139
140         filter = {}
141         filter['peer_id'] = None
142         if slice_ids:
143             filter['slice_id'] = list(slice_ids)
144         # get all slices
145         all_slices = self.driver.shell.GetSlices(filter, ['slice_id', 'name', 'hrn', 'person_ids', 'node_ids', 'slice_tag_ids', 'expires'])
146         if slice_hrn:
147             slices = [slice for slice in all_slices if slice['hrn'] == slice_hrn]
148         else:
149             slices = all_slices
150       
151         if not slices:
152             return []
153         slice = slices[0]     
154         slice['hrn'] = slice_hrn   
155
156         # get sliver users
157         persons = []
158         person_ids = []
159         for slice in slices:
160             person_ids.extend(slice['person_ids'])
161         if person_ids:
162             persons = self.driver.shell.GetPersons(person_ids)
163                  
164         # get user keys
165         keys = {}
166         key_ids = []
167         for person in persons:
168             key_ids.extend(person['key_ids'])
169         
170         if key_ids:
171             key_list = self.driver.shell.GetKeys(key_ids)
172             for key in key_list:
173                 keys[key['key_id']] = key  
174
175         # construct user key info
176         users = []
177         for person in persons:
178             person_urn = hrn_to_urn(self.driver.shell.GetPersonHrn(int(person['person_id'])), 'user')
179             user = {
180                 'login': slice['name'], 
181                 'user_urn': person_urn,
182                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
183             }
184             users.append(user)
185
186         if node_ids:
187             node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']]
188             slice['node_ids'] = node_ids
189         tags_dict = self.get_slice_tags(slice)
190         nodes_dict = self.get_slice_nodes(slice, options)
191         slivers = []
192         for node in nodes_dict.values():
193             node.update(slice) 
194             node['tags'] = tags_dict[node['node_id']]
195             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
196             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
197             node['urn'] = node['sliver_id'] 
198             node['services_user'] = users
199             slivers.append(node)
200         return slivers
201
202     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=None, grain=None, options=None):
203         if pl_initscripts is None: pl_initscripts=[]
204         if options is None: options={}
205         rspec_node = NodeElement()
206         # xxx how to retrieve site['login_base']
207         site=sites[node['site_id']]
208         rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
209         rspec_node['component_name'] = node['hostname']
210         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
211         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
212         # do not include boot state (<available> element) in the manifest rspec
213         rspec_node['boot_state'] = node['boot_state']
214         if node['boot_state'] == 'boot': 
215             rspec_node['available'] = 'true'
216         else:
217             rspec_node['available'] = 'false'
218
219         #distinguish between Shared and Reservable nodes
220         if node['node_type'] == 'reservable':
221             rspec_node['exclusive'] = 'true'
222         else:
223             rspec_node['exclusive'] = 'false'
224
225         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
226                                         HardwareType({'name': 'pc'})]
227         # only doing this because protogeni rspec needs
228         # to advertise available initscripts
229         rspec_node['pl_initscripts'] = pl_initscripts.values()
230         # add site/interface info to nodes.
231         # assumes that sites, interfaces and tags have already been prepared.
232         if site['longitude'] and site['latitude']:
233             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
234             rspec_node['location'] = location
235         # Granularity
236         granularity = Granularity({'grain': grain})
237         rspec_node['granularity'] = granularity
238         rspec_node['interfaces'] = []
239         if_count=0
240         for if_id in node['interface_ids']:
241             interface = Interface(interfaces[if_id])
242             interface['ipv4'] = interface['ip']
243             interface['component_id'] = PlXrn(auth=self.driver.hrn,
244                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
245             # interfaces in the manifest need a client id
246             if slice:
247                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
248             rspec_node['interfaces'].append(interface)
249             if_count+=1
250         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags]
251         rspec_node['tags'] = tags
252         return rspec_node
253
254     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
255                              pl_initscripts, sliver_allocations):
256         # get the granularity in second for the reservation system
257         grain = self.driver.shell.GetLeaseGranularity()
258         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
259         # xxx how to retrieve site['login_base']
260         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
261         # remove interfaces from manifest
262         rspec_node['interfaces'] = []
263         # add sliver info
264         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
265                          'name': sliver['name'],
266                          'type': 'plab-vserver',
267                          'tags': []})
268         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
269         if sliver['urn'] in sliver_allocations:
270             rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
271             if sliver_allocations[sliver['urn']].component_id:
272                 rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id
273         rspec_node['slivers'] = [rspec_sliver]
274
275         # slivers always provide the ssh service
276         login = Login({'authentication': 'ssh-keys', 
277                        'hostname': sliver['hostname'], 
278                        'port':'22', 
279                        'username': sliver['name'],
280                        'login': sliver['name']
281                       })
282         service = ServicesElement({'login': login,
283                             'services_user': sliver['services_user']})
284         rspec_node['services'] = [service]    
285         return rspec_node      
286
287     def get_slice_tags(self, slice):
288         slice_tag_ids = []
289         slice_tag_ids.extend(slice['slice_tag_ids'])
290         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
291         # sorted by node_id
292         tags_dict = defaultdict(list)
293         for tag in tags:
294             tags_dict[tag['node_id']] = tag
295         return tags_dict
296
297     def get_slice_nodes(self, slice, options=None):
298         if options is None: options={}
299         nodes_dict = {}
300         filter = {'peer_id': None}
301         tags_filter = {}
302         if slice and slice.get('node_ids'):
303             filter['node_id'] = slice['node_ids']
304         else:
305             # there are no nodes to look up
306             return nodes_dict
307         tags_filter=filter.copy()
308         geni_available = options.get('geni_available')
309         if geni_available == True:
310             filter['boot_state'] = 'boot'
311         nodes = self.driver.shell.GetNodes(filter)
312         for node in nodes:
313             nodes_dict[node['node_id']] = node
314         return nodes_dict
315
316     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations=None):
317         if sliver_allocations is None: sliver_allocations={}
318         if rspec_node['sliver_id'] in sliver_allocations:
319             # set sliver allocation and operational status
320             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
321             if sliver_allocation:
322                 allocation_status = sliver_allocation.allocation_state
323                 if allocation_status == 'geni_allocated':
324                     op_status =  'geni_pending_allocation'
325                 elif allocation_status == 'geni_provisioned':
326                     if rspec_node['boot_state'] == 'boot':
327                         op_status = 'geni_ready'
328                     else:
329                         op_status = 'geni_failed'
330                 else:
331                     op_status = 'geni_unknown'
332             else:
333                 allocation_status = 'geni_unallocated'    
334         else:
335             allocation_status = 'geni_unallocated'
336             op_status = 'geni_failed'
337         # required fields
338         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
339                        'geni_expires': rspec_node['expires'],
340                        'geni_allocation_status' : allocation_status,
341                        'geni_operational_status': op_status,
342                        'geni_error': '',
343                        }
344         return geni_sliver        
345
346     def get_leases(self, slice=None, options=None):
347         if options is None: options={}
348         
349         now = int(time.time())
350         filter={}
351         filter.update({'clip':now})
352         if slice:
353            filter.update({'name':slice['name']})
354         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
355         leases = self.driver.shell.GetLeases(filter)
356         grain = self.driver.shell.GetLeaseGranularity()
357
358         site_ids = []
359         for lease in leases:
360             site_ids.append(lease['site_id'])
361
362         # get sites
363         sites_dict  = self.get_sites({'site_id': site_ids}) 
364   
365         rspec_leases = []
366         for lease in leases:
367
368             rspec_lease = Lease()
369             
370             # xxx how to retrieve site['login_base']
371             site_id=lease['site_id']
372             site=sites_dict[site_id]
373
374             rspec_lease['component_id'] = hrn_to_urn(self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
375             slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
376             slice_urn = hrn_to_urn(slice_hrn, 'slice')
377             rspec_lease['slice_id'] = slice_urn
378             rspec_lease['start_time'] = lease['t_from']
379             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
380             rspec_leases.append(rspec_lease)
381         return rspec_leases
382
383     
384     def list_resources(self, version = None, options=None):
385         if options is None: options={}
386
387         version_manager = VersionManager()
388         version = version_manager.get_version(version)
389         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
390         rspec = RSpec(version=rspec_version, user_options=options)
391        
392         if not options.get('list_leases') or options['list_leases'] != 'leases':
393             # get nodes
394             nodes  = self.get_nodes(options)
395             site_ids = []
396             interface_ids = []
397             tag_ids = []
398             nodes_dict = {}
399             for node in nodes:
400                 site_ids.append(node['site_id'])
401                 interface_ids.extend(node['interface_ids'])
402                 tag_ids.extend(node['node_tag_ids'])
403                 nodes_dict[node['node_id']] = node
404             sites = self.get_sites({'site_id': site_ids})
405             interfaces = self.get_interfaces({'interface_id':interface_ids})
406             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
407             pl_initscripts = self.get_pl_initscripts()
408             # convert nodes to rspec nodes
409             rspec_nodes = []
410             for node in nodes:
411                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
412                 rspec_nodes.append(rspec_node)
413             rspec.version.add_nodes(rspec_nodes)
414
415             # add links
416             links = self.get_links(sites, nodes_dict, interfaces)        
417             rspec.version.add_links(links)
418
419         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
420            leases = self.get_leases()
421            rspec.version.add_leases(leases)
422
423         return rspec.toxml()
424
425     def describe(self, urns, version=None, options=None):
426         if options is None: options={}
427         version_manager = VersionManager()
428         version = version_manager.get_version(version)
429         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
430         rspec = RSpec(version=rspec_version, user_options=options)
431
432         # get slivers
433         geni_slivers = []
434         slivers = self.get_slivers(urns, options)
435         if slivers:
436             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
437         else:
438             rspec_expires = datetime_to_string(utcparse(time.time()))      
439         rspec.xml.set('expires',  rspec_expires)
440
441         # lookup the sliver allocations
442         geni_urn = urns[0]
443         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
444         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
445         sliver_allocations = self.driver.api.dbsession().query(SliverAllocation).filter(constraint)
446         sliver_allocation_dict = {}
447         for sliver_allocation in sliver_allocations:
448             geni_urn = sliver_allocation.slice_urn
449             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
450       
451         if not options.get('list_leases') or options['list_leases'] != 'leases':
452             # add slivers
453             site_ids = []
454             interface_ids = []
455             tag_ids = []
456             nodes_dict = {}
457             for sliver in slivers:
458                 site_ids.append(sliver['site_id'])
459                 interface_ids.extend(sliver['interface_ids'])
460                 tag_ids.extend(sliver['node_tag_ids'])
461                 nodes_dict[sliver['node_id']] = sliver
462             sites = self.get_sites({'site_id': site_ids})
463             interfaces = self.get_interfaces({'interface_id':interface_ids})
464             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
465             pl_initscripts = self.get_pl_initscripts()
466             rspec_nodes = []
467             for sliver in slivers:
468                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
469                     continue
470                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
471                                                        pl_initscripts, sliver_allocation_dict)
472                 # manifest node element shouldn't contain available attribute
473                 rspec_node.pop('available')
474                 rspec_nodes.append(rspec_node) 
475                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
476                 geni_slivers.append(geni_sliver)
477             rspec.version.add_nodes(rspec_nodes)
478
479             # add sliver defaults
480             #default_sliver = slivers.get(None, [])
481             #if default_sliver:
482             #    default_sliver_attribs = default_sliver.get('tags', [])
483             #    for attrib in default_sliver_attribs:
484             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
485
486             # add links 
487             links = self.get_links(sites, nodes_dict, interfaces)        
488             rspec.version.add_links(links)
489
490         if not options.get('list_leases') or options['list_leases'] != 'resources':
491             if slivers:
492                 leases = self.get_leases(slivers[0])
493                 rspec.version.add_leases(leases)
494
495                
496         return {'geni_urn': geni_urn, 
497                 'geni_rspec': rspec.toxml(),
498                 'geni_slivers': geni_slivers}