this change is designed for illustrative purposes only
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.elements.memory import Memory
20 from sfa.rspecs.version_manager import VersionManager
21
22 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn, top_auth, hash_loginbase
23 from sfa.planetlab.vlink import get_tc_rate
24 from sfa.planetlab.topology import Topology
25 from sfa.storage.model import SliverAllocation
26
27
28 import time
29
30 class PlAggregate:
31
    def __init__(self, driver):
        """Keep a reference to the driver this aggregate works through.

        The other methods of this class call ``self.driver.shell`` and read
        ``self.driver.hrn``; ``describe`` also uses ``self.driver.api``.
        """
        self.driver = driver
34
35     def get_nodes(self, options=None):
36         if options is None: options={}
37         filter = {'peer_id': None}
38         geni_available = options.get('geni_available')    
39         if geni_available == True:
40             filter['boot_state'] = 'boot'
41         nodes = self.driver.shell.GetNodes(filter)
42        
43         return nodes  
44  
45     def get_sites(self, filter=None):
46         if filter is None: filter={}
47         sites = {}
48         for site in self.driver.shell.GetSites(filter):
49             sites[site['site_id']] = site
50         return sites
51
52     def get_interfaces(self, filter=None):
53         if filter is None: filter={}
54         interfaces = {}
55         for interface in self.driver.shell.GetInterfaces(filter):
56             iface = Interface()
57             if interface['bwlimit']:
58                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
59             interfaces[interface['interface_id']] = interface
60         return interfaces
61
62     def get_links(self, sites, nodes, interfaces):
63         
64         topology = Topology() 
65         links = []
66         for (site_id1, site_id2) in topology:
67             site_id1 = int(site_id1)
68             site_id2 = int(site_id2)
69             link = Link()
70             if not site_id1 in sites or site_id2 not in sites:
71                 continue
72             site1 = sites[site_id1]
73             site2 = sites[site_id2]
74             # get hrns
75             site1_hrn = self.driver.hrn + '.' + site1['login_base']
76             site2_hrn = self.driver.hrn + '.' + site2['login_base']
77
78             for s1_node_id in site1['node_ids']:
79                 for s2_node_id in site2['node_ids']:
80                     if s1_node_id not in nodes or s2_node_id not in nodes:
81                         continue
82                     node1 = nodes[s1_node_id]
83                     node2 = nodes[s2_node_id]
84                     # set interfaces
85                     # just get first interface of the first node
86                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
87                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
88                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
89                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
90
91                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
92                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
93
94                     # set link
95                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
96                     link['interface1'] = if1
97                     link['interface2'] = if2
98                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
99                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
100                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
101                     links.append(link)
102
103         return links
104
105     def get_node_tags(self, filter=None):
106         if filter is None: filter={}
107         node_tags = {}
108         for node_tag in self.driver.shell.GetNodeTags(filter):
109             node_tags[node_tag['node_tag_id']] = node_tag
110         return node_tags
111
112     def get_pl_initscripts(self, filter=None):
113         if filter is None: filter={}
114         pl_initscripts = {}
115         filter.update({'enabled': True})
116         for initscript in self.driver.shell.GetInitScripts(filter):
117             pl_initscripts[initscript['initscript_id']] = initscript
118         return pl_initscripts
119
120     def get_slivers(self, urns, options=None):
121         if options is None: options={}
122         names = set()
123         slice_ids = set()
124         node_ids = []
125         slice_hrn = None
126         for urn in urns:
127             xrn = PlXrn(xrn=urn)
128             if xrn.type == 'sliver':
129                  # id: slice_id-node_id
130                 try:
131                     sliver_id_parts = xrn.get_sliver_id_parts()
132                     slice_id = int(sliver_id_parts[0]) 
133                     node_id = int(sliver_id_parts[1])
134                     slice_ids.add(slice_id) 
135                     node_ids.append(node_id)
136                 except ValueError:
137                     pass 
138             else:  
139                 slice_hrn = xrn.get_hrn()
140
141         filter = {}
142         filter['peer_id'] = None
143         if slice_ids:
144             filter['slice_id'] = list(slice_ids)
145         # get all slices
146         all_slices = self.driver.shell.GetSlices(filter, ['slice_id', 'name', 'hrn', 'person_ids', 'node_ids', 'slice_tag_ids', 'expires'])
147         if slice_hrn:
148             slices = [slice for slice in all_slices if slice['hrn'] == slice_hrn]
149         else:
150             slices = all_slices
151       
152         if not slices:
153             return []
154         slice = slices[0]     
155         slice['hrn'] = slice_hrn   
156
157         # get sliver users
158         persons = []
159         person_ids = []
160         for slice in slices:
161             person_ids.extend(slice['person_ids'])
162         if person_ids:
163             persons = self.driver.shell.GetPersons(person_ids)
164                  
165         # get user keys
166         keys = {}
167         key_ids = []
168         for person in persons:
169             key_ids.extend(person['key_ids'])
170         
171         if key_ids:
172             key_list = self.driver.shell.GetKeys(key_ids)
173             for key in key_list:
174                 keys[key['key_id']] = key  
175
176         # construct user key info
177         users = []
178         for person in persons:
179             person_urn = hrn_to_urn(self.driver.shell.GetPersonHrn(int(person['person_id'])), 'user')
180             user = {
181                 'login': slice['name'], 
182                 'user_urn': person_urn,
183                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
184             }
185             users.append(user)
186
187         if node_ids:
188             node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']]
189             slice['node_ids'] = node_ids
190         tags_dict = self.get_slice_tags(slice)
191         nodes_dict = self.get_slice_nodes(slice, options)
192         slivers = []
193         for node in nodes_dict.values():
194             node.update(slice) 
195             node['tags'] = tags_dict[node['node_id']]
196             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
197             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
198             node['urn'] = node['sliver_id'] 
199             node['services_user'] = users
200             slivers.append(node)
201         return slivers
202
203     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=None, grain=None, options=None):
204         if pl_initscripts is None: pl_initscripts=[]
205         if options is None: options={}
206         rspec_node = NodeElement()
207         # xxx how to retrieve site['login_base']
208         site=sites[node['site_id']]
209         rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
210         rspec_node['component_name'] = node['hostname']
211         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
212         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
213         # do not include boot state (<available> element) in the manifest rspec
214         rspec_node['boot_state'] = node['boot_state']
215         if node['boot_state'] == 'boot': 
216             rspec_node['available'] = 'true'
217         else:
218             rspec_node['available'] = 'false'
219
220         #distinguish between Shared and Reservable nodes
221         if node['node_type'] == 'reservable':
222             rspec_node['exclusive'] = 'true'
223         else:
224             rspec_node['exclusive'] = 'false'
225
226         # this mostly is a sample code, not designed for production but more for
227         # illustrative purposes, that gives an example of how you can extend the node's
228         # rspec to expose their amount of memory
229         # in this example I chose to always expose a <memory> tag
230         # also by default the exposed amount will be 4 Gb
231         # but this value can be overridden by setting a 'memory' tag on the node
232         memory_in_gb='4'
233         # let's scan the node tags to find for any 'memory' tag
234         for id,node_tag in node_tags.items():
235             if node_tag['tagname']=='memory':
236                 memory_in_gb = node_tag['value']
237                 # note that in this case it would make sense to delete the node_tag
238                 # so that the XML does not contain a duplicate information
239         # always add a 'memory' xml tag
240         # this will be rendered by pgv2node.py
241         rspec_node['memory'] = Memory({'Gb':memory_in_gb})
242         
243         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
244                                         HardwareType({'name': 'pc'})]
245         # only doing this because protogeni rspec needs
246         # to advertise available initscripts
247         rspec_node['pl_initscripts'] = pl_initscripts.values()
248         # add site/interface info to nodes.
249         # assumes that sites, interfaces and tags have already been prepared.
250         if site['longitude'] and site['latitude']:
251             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
252             rspec_node['location'] = location
253         # Granularity
254         granularity = Granularity({'grain': grain})
255         rspec_node['granularity'] = granularity
256         rspec_node['interfaces'] = []
257         if_count=0
258         for if_id in node['interface_ids']:
259             interface = Interface(interfaces[if_id])
260             interface['ipv4'] = interface['ip']
261             interface['component_id'] = PlXrn(auth=self.driver.hrn,
262                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
263             # interfaces in the manifest need a client id
264             if slice:
265                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
266             rspec_node['interfaces'].append(interface)
267             if_count+=1
268         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags]
269         rspec_node['tags'] = tags
270         return rspec_node
271
272     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
273                              pl_initscripts, sliver_allocations):
274         # get the granularity in second for the reservation system
275         grain = self.driver.shell.GetLeaseGranularity()
276         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
277         # xxx how to retrieve site['login_base']
278         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
279         # remove interfaces from manifest
280         rspec_node['interfaces'] = []
281         # add sliver info
282         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
283                          'name': sliver['name'],
284                          'type': 'plab-vserver',
285                          'tags': []})
286         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
287         if sliver['urn'] in sliver_allocations:
288             rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
289             if sliver_allocations[sliver['urn']].component_id:
290                 rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id
291         rspec_node['slivers'] = [rspec_sliver]
292
293         # slivers always provide the ssh service
294         login = Login({'authentication': 'ssh-keys', 
295                        'hostname': sliver['hostname'], 
296                        'port':'22', 
297                        'username': sliver['name'],
298                        'login': sliver['name']
299                       })
300         service = ServicesElement({'login': login,
301                             'services_user': sliver['services_user']})
302         rspec_node['services'] = [service]    
303         return rspec_node      
304
305     def get_slice_tags(self, slice):
306         slice_tag_ids = []
307         slice_tag_ids.extend(slice['slice_tag_ids'])
308         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
309         # sorted by node_id
310         tags_dict = defaultdict(list)
311         for tag in tags:
312             tags_dict[tag['node_id']] = tag
313         return tags_dict
314
315     def get_slice_nodes(self, slice, options=None):
316         if options is None: options={}
317         nodes_dict = {}
318         filter = {'peer_id': None}
319         tags_filter = {}
320         if slice and slice.get('node_ids'):
321             filter['node_id'] = slice['node_ids']
322         else:
323             # there are no nodes to look up
324             return nodes_dict
325         tags_filter=filter.copy()
326         geni_available = options.get('geni_available')
327         if geni_available == True:
328             filter['boot_state'] = 'boot'
329         nodes = self.driver.shell.GetNodes(filter)
330         for node in nodes:
331             nodes_dict[node['node_id']] = node
332         return nodes_dict
333
334     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations=None):
335         if sliver_allocations is None: sliver_allocations={}
336         if rspec_node['sliver_id'] in sliver_allocations:
337             # set sliver allocation and operational status
338             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
339             if sliver_allocation:
340                 allocation_status = sliver_allocation.allocation_state
341                 if allocation_status == 'geni_allocated':
342                     op_status =  'geni_pending_allocation'
343                 elif allocation_status == 'geni_provisioned':
344                     if rspec_node['boot_state'] == 'boot':
345                         op_status = 'geni_ready'
346                     else:
347                         op_status = 'geni_failed'
348                 else:
349                     op_status = 'geni_unknown'
350             else:
351                 allocation_status = 'geni_unallocated'    
352         else:
353             allocation_status = 'geni_unallocated'
354             op_status = 'geni_failed'
355         # required fields
356         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
357                        'geni_expires': rspec_node['expires'],
358                        'geni_allocation_status' : allocation_status,
359                        'geni_operational_status': op_status,
360                        'geni_error': '',
361                        }
362         return geni_sliver        
363
364     def get_leases(self, slice=None, options=None):
365         if options is None: options={}
366         
367         now = int(time.time())
368         filter={}
369         filter.update({'clip':now})
370         if slice:
371            filter.update({'name':slice['name']})
372         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
373         leases = self.driver.shell.GetLeases(filter)
374         grain = self.driver.shell.GetLeaseGranularity()
375
376         site_ids = []
377         for lease in leases:
378             site_ids.append(lease['site_id'])
379
380         # get sites
381         sites_dict  = self.get_sites({'site_id': site_ids}) 
382   
383         rspec_leases = []
384         for lease in leases:
385
386             rspec_lease = Lease()
387             
388             # xxx how to retrieve site['login_base']
389             site_id=lease['site_id']
390             site=sites_dict[site_id]
391
392             rspec_lease['component_id'] = hrn_to_urn(self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
393             slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
394             slice_urn = hrn_to_urn(slice_hrn, 'slice')
395             rspec_lease['slice_id'] = slice_urn
396             rspec_lease['start_time'] = lease['t_from']
397             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
398             rspec_leases.append(rspec_lease)
399         return rspec_leases
400
401     
402     def list_resources(self, version = None, options=None):
403         if options is None: options={}
404
405         version_manager = VersionManager()
406         version = version_manager.get_version(version)
407         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
408         rspec = RSpec(version=rspec_version, user_options=options)
409        
410         if not options.get('list_leases') or options['list_leases'] != 'leases':
411             # get nodes
412             nodes  = self.get_nodes(options)
413             site_ids = []
414             interface_ids = []
415             tag_ids = []
416             nodes_dict = {}
417             for node in nodes:
418                 site_ids.append(node['site_id'])
419                 interface_ids.extend(node['interface_ids'])
420                 tag_ids.extend(node['node_tag_ids'])
421                 nodes_dict[node['node_id']] = node
422             sites = self.get_sites({'site_id': site_ids})
423             interfaces = self.get_interfaces({'interface_id':interface_ids})
424             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
425             pl_initscripts = self.get_pl_initscripts()
426             # convert nodes to rspec nodes
427             rspec_nodes = []
428             for node in nodes:
429                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
430                 rspec_nodes.append(rspec_node)
431             rspec.version.add_nodes(rspec_nodes)
432
433             # add links
434             links = self.get_links(sites, nodes_dict, interfaces)        
435             rspec.version.add_links(links)
436
437         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
438            leases = self.get_leases()
439            rspec.version.add_leases(leases)
440
441         return rspec.toxml()
442
443     def describe(self, urns, version=None, options=None):
444         if options is None: options={}
445         version_manager = VersionManager()
446         version = version_manager.get_version(version)
447         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
448         rspec = RSpec(version=rspec_version, user_options=options)
449
450         # get slivers
451         geni_slivers = []
452         slivers = self.get_slivers(urns, options)
453         if slivers:
454             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
455         else:
456             rspec_expires = datetime_to_string(utcparse(time.time()))      
457         rspec.xml.set('expires',  rspec_expires)
458
459         # lookup the sliver allocations
460         geni_urn = urns[0]
461         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
462         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
463         sliver_allocations = self.driver.api.dbsession().query(SliverAllocation).filter(constraint)
464         sliver_allocation_dict = {}
465         for sliver_allocation in sliver_allocations:
466             geni_urn = sliver_allocation.slice_urn
467             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
468       
469         if not options.get('list_leases') or options['list_leases'] != 'leases':
470             # add slivers
471             site_ids = []
472             interface_ids = []
473             tag_ids = []
474             nodes_dict = {}
475             for sliver in slivers:
476                 site_ids.append(sliver['site_id'])
477                 interface_ids.extend(sliver['interface_ids'])
478                 tag_ids.extend(sliver['node_tag_ids'])
479                 nodes_dict[sliver['node_id']] = sliver
480             sites = self.get_sites({'site_id': site_ids})
481             interfaces = self.get_interfaces({'interface_id':interface_ids})
482             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
483             pl_initscripts = self.get_pl_initscripts()
484             rspec_nodes = []
485             for sliver in slivers:
486                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
487                     continue
488                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
489                                                        pl_initscripts, sliver_allocation_dict)
490                 # manifest node element shouldn't contain available attribute
491                 rspec_node.pop('available')
492                 rspec_nodes.append(rspec_node) 
493                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
494                 geni_slivers.append(geni_sliver)
495             rspec.version.add_nodes(rspec_nodes)
496
497             # add sliver defaults
498             #default_sliver = slivers.get(None, [])
499             #if default_sliver:
500             #    default_sliver_attribs = default_sliver.get('tags', [])
501             #    for attrib in default_sliver_attribs:
502             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
503
504             # add links 
505             links = self.get_links(sites, nodes_dict, interfaces)        
506             rspec.version.add_links(links)
507
508         if not options.get('list_leases') or options['list_leases'] != 'resources':
509             if slivers:
510                 leases = self.get_leases(slivers[0])
511                 rspec.version.add_leases(leases)
512
513                
514         return {'geni_urn': geni_urn, 
515                 'geni_rspec': rspec.toxml(),
516                 'geni_slivers': geni_slivers}