d869cc1a01c7a5fcb38079c09792fe4c1afcdf77
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.alchemy import dbsession
25 from sfa.storage.model import SliverAllocation
26
27
28 import time
29
30 class PlAggregate:
31
32     def __init__(self, driver):
33         self.driver = driver
34
35     def get_nodes(self, options={}):
36         filter = {'peer_id': None}
37         geni_available = options.get('geni_available')    
38         if geni_available == True:
39             filter['boot_state'] = 'boot'
40         nodes = self.driver.shell.GetNodes(filter)
41        
42         return nodes  
43  
44     def get_sites(self, filter={}):
45         sites = {}
46         for site in self.driver.shell.GetSites(filter):
47             sites[site['site_id']] = site
48         return sites
49
50     def get_interfaces(self, filter={}):
51         interfaces = {}
52         for interface in self.driver.shell.GetInterfaces(filter):
53             iface = Interface()
54             if interface['bwlimit']:
55                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
56             interfaces[interface['interface_id']] = interface
57         return interfaces
58
59     def get_links(self, sites, nodes, interfaces):
60         
61         topology = Topology() 
62         links = []
63         for (site_id1, site_id2) in topology:
64             site_id1 = int(site_id1)
65             site_id2 = int(site_id2)
66             link = Link()
67             if not site_id1 in sites or site_id2 not in sites:
68                 continue
69             site1 = sites[site_id1]
70             site2 = sites[site_id2]
71             # get hrns
72             site1_hrn = self.driver.hrn + '.' + site1['login_base']
73             site2_hrn = self.driver.hrn + '.' + site2['login_base']
74
75             for s1_node_id in site1['node_ids']:
76                 for s2_node_id in site2['node_ids']:
77                     if s1_node_id not in nodes or s2_node_id not in nodes:
78                         continue
79                     node1 = nodes[s1_node_id]
80                     node2 = nodes[s2_node_id]
81                     # set interfaces
82                     # just get first interface of the first node
83                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
84                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
85                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
86                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
87
88                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
89                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
90
91                     # set link
92                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
93                     link['interface1'] = if1
94                     link['interface2'] = if2
95                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
96                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
97                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
98                     links.append(link)
99
100         return links
101
102     def get_node_tags(self, filter={}):
103         node_tags = {}
104         for node_tag in self.driver.shell.GetNodeTags(filter):
105             node_tags[node_tag['node_tag_id']] = node_tag
106         return node_tags
107
108     def get_pl_initscripts(self, filter={}):
109         pl_initscripts = {}
110         filter.update({'enabled': True})
111         for initscript in self.driver.shell.GetInitScripts(filter):
112             pl_initscripts[initscript['initscript_id']] = initscript
113         return pl_initscripts
114
115     def get_slivers(self, urns, options={}):
116         names = set()
117         slice_ids = set()
118         node_ids = []
119         for urn in urns:
120             xrn = PlXrn(xrn=urn)
121             if xrn.type == 'sliver':
122                  # id: slice_id-node_id
123                 try:
124                     sliver_id_parts = xrn.get_sliver_id_parts()
125                     slice_id = int(sliver_id_parts[0]) 
126                     node_id = int(sliver_id_parts[1])
127                     slice_ids.add(slice_id) 
128                     node_ids.append(node_id)
129                 except ValueError:
130                     pass 
131             else:  
132                 names.add(xrn.pl_slicename())
133
134         filter = {}
135         if names:
136             filter['name'] = list(names)
137         if slice_ids:
138             filter['slice_id'] = list(slice_ids)
139         # get slices
140         slices = self.driver.shell.GetSlices(filter)
141         if not slices:
142             return []
143         slice = slices[0]     
144         slice['hrn'] = PlXrn(auth=self.driver.hrn, slicename=slice['name']).hrn   
145
146         # get sliver users
147         persons = []
148         person_ids = []
149         for slice in slices:
150             person_ids.extend(slice['person_ids'])
151         if person_ids:
152             persons = self.driver.shell.GetPersons(person_ids)
153                  
154         # get user keys
155         keys = {}
156         key_ids = []
157         for person in persons:
158             key_ids.extend(person['key_ids'])
159         
160         if key_ids:
161             key_list = self.driver.shell.GetKeys(key_ids)
162             for key in key_list:
163                 keys[key['key_id']] = key  
164
165         # construct user key info
166         users = []
167         for person in persons:
168             name = person['email'][0:person['email'].index('@')]
169             user = {
170                 'login': slice['name'], 
171                 'user_urn': Xrn('%s.%s' % (self.driver.hrn, name), type='user').urn,
172                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
173             }
174             users.append(user)
175
176         if node_ids:
177             node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']]
178             slice['node_ids'] = node_ids
179         tags_dict = self.get_slice_tags(slice)
180         nodes_dict = self.get_slice_nodes(slice, options)
181         slivers = []
182         for node in nodes_dict.values():
183             node.update(slice) 
184             node['tags'] = tags_dict[node['node_id']]
185             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
186             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
187             node['urn'] = node['sliver_id'] 
188             node['services_user'] = users
189             slivers.append(node)
190         return slivers
191
192     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}):
193         rspec_node = NodeElement()
194         # xxx how to retrieve site['login_base']
195         site=sites[node['site_id']]
196         rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
197         rspec_node['component_name'] = node['hostname']
198         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
199         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
200         # do not include boot state (<available> element) in the manifest rspec
201         rspec_node['boot_state'] = node['boot_state']
202         if node['boot_state'] == 'boot': 
203             rspec_node['available'] = 'true'
204         else:
205             rspec_node['available'] = 'false'
206
207         #distinguish between Shared and Reservable nodes
208         if node['node_type'] == 'reservable':
209             rspec_node['exclusive'] = 'true'
210         else:
211             rspec_node['exclusive'] = 'false'
212
213         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
214                                         HardwareType({'name': 'pc'})]
215         # only doing this because protogeni rspec needs
216         # to advertise available initscripts
217         rspec_node['pl_initscripts'] = pl_initscripts.values()
218         # add site/interface info to nodes.
219         # assumes that sites, interfaces and tags have already been prepared.
220         if site['longitude'] and site['latitude']:
221             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
222             rspec_node['location'] = location
223         # Granularity
224         granularity = Granularity({'grain': grain})
225         rspec_node['granularity'] = granularity
226         rspec_node['interfaces'] = []
227         if_count=0
228         for if_id in node['interface_ids']:
229             interface = Interface(interfaces[if_id])
230             interface['ipv4'] = interface['ip']
231             interface['component_id'] = PlXrn(auth=self.driver.hrn,
232                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
233             # interfaces in the manifest need a client id
234             if slice:
235                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
236             rspec_node['interfaces'].append(interface)
237             if_count+=1
238         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags]
239         rspec_node['tags'] = tags
240         return rspec_node
241
242     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
243                              pl_initscripts, sliver_allocations):
244         # get the granularity in second for the reservation system
245         grain = self.driver.shell.GetLeaseGranularity()
246         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
247         # xxx how to retrieve site['login_base']
248         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
249         # remove interfaces from manifest
250         rspec_node['interfaces'] = []
251         # add sliver info
252         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
253                          'name': sliver['name'],
254                          'type': 'plab-vserver',
255                          'tags': []})
256         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
257         if sliver['urn'] in sliver_allocations:
258             rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
259             if sliver_allocations[sliver['urn']].component_id:
260                 rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id
261         rspec_node['slivers'] = [rspec_sliver]
262
263         # slivers always provide the ssh service
264         login = Login({'authentication': 'ssh-keys', 
265                        'hostname': sliver['hostname'], 
266                        'port':'22', 
267                        'username': sliver['name'],
268                        'login': sliver['name']
269                       })
270         service = ServicesElement({'login': login,
271                             'services_user': sliver['services_user']})
272         rspec_node['services'] = [service]    
273         return rspec_node      
274
275     def get_slice_tags(self, slice):
276         slice_tag_ids = []
277         slice_tag_ids.extend(slice['slice_tag_ids'])
278         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
279         # sorted by node_id
280         tags_dict = defaultdict(list)
281         for tag in tags:
282             tags_dict[tag['node_id']] = tag
283         return tags_dict
284
285     def get_slice_nodes(self, slice, options={}):
286         nodes_dict = {}
287         filter = {'peer_id': None}
288         tags_filter = {}
289         if slice and slice.get('node_ids'):
290             filter['node_id'] = slice['node_ids']
291         else:
292             # there are no nodes to look up
293             return nodes_dict
294         tags_filter=filter.copy()
295         geni_available = options.get('geni_available')
296         if geni_available == True:
297             filter['boot_state'] = 'boot'
298         nodes = self.driver.shell.GetNodes(filter)
299         for node in nodes:
300             nodes_dict[node['node_id']] = node
301         return nodes_dict
302
303     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations = {}):
304         if rspec_node['sliver_id'] in sliver_allocations:
305             # set sliver allocation and operational status
306             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
307             if sliver_allocation:
308                 allocation_status = sliver_allocation.allocation_state
309                 if allocation_status == 'geni_allocated':
310                     op_status =  'geni_pending_allocation'
311                 elif allocation_status == 'geni_provisioned':
312                     if rspec_node['boot_state'] == 'boot':
313                         op_status = 'geni_ready'
314                     else:
315                         op_status = 'geni_failed'
316                 else:
317                     op_status = 'geni_unknown'
318             else:
319                 allocation_status = 'geni_unallocated'    
320         else:
321             allocation_status = 'geni_unallocated'
322             op_status = 'geni_failed'
323         # required fields
324         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
325                        'geni_expires': rspec_node['expires'],
326                        'geni_allocation_status' : allocation_status,
327                        'geni_operational_status': op_status,
328                        'geni_error': '',
329                        }
330         return geni_sliver        
331
332     def get_leases(self, slice=None, options={}):
333         
334         now = int(time.time())
335         filter={}
336         filter.update({'clip':now})
337         if slice:
338            filter.update({'name':slice['name']})
339         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
340         leases = self.driver.shell.GetLeases(filter)
341         grain = self.driver.shell.GetLeaseGranularity()
342
343         site_ids = []
344         for lease in leases:
345             site_ids.append(lease['site_id'])
346
347         # get sites
348         sites_dict  = self.get_sites({'site_id': site_ids}) 
349   
350         rspec_leases = []
351         for lease in leases:
352
353             rspec_lease = Lease()
354             
355             # xxx how to retrieve site['login_base']
356             site_id=lease['site_id']
357             site=sites_dict[site_id]
358
359             rspec_lease['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], lease['hostname'])
360             slice_hrn = slicename_to_hrn(self.driver.hrn, lease['name'])
361             slice_urn = hrn_to_urn(slice_hrn, 'slice')
362             rspec_lease['slice_id'] = slice_urn
363             rspec_lease['start_time'] = lease['t_from']
364             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
365             rspec_leases.append(rspec_lease)
366         return rspec_leases
367
368     
369     def list_resources(self, version = None, options={}):
370
371         version_manager = VersionManager()
372         version = version_manager.get_version(version)
373         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
374         rspec = RSpec(version=rspec_version, user_options=options)
375        
376         if not options.get('list_leases') or options['list_leases'] != 'leases':
377             # get nodes
378             nodes  = self.get_nodes(options)
379             site_ids = []
380             interface_ids = []
381             tag_ids = []
382             nodes_dict = {}
383             for node in nodes:
384                 site_ids.append(node['site_id'])
385                 interface_ids.extend(node['interface_ids'])
386                 tag_ids.extend(node['node_tag_ids'])
387                 nodes_dict[node['node_id']] = node
388             sites = self.get_sites({'site_id': site_ids})
389             interfaces = self.get_interfaces({'interface_id':interface_ids})
390             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
391             pl_initscripts = self.get_pl_initscripts()
392             # convert nodes to rspec nodes
393             rspec_nodes = []
394             for node in nodes:
395                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
396                 rspec_nodes.append(rspec_node)
397             rspec.version.add_nodes(rspec_nodes)
398
399             # add links
400             links = self.get_links(sites, nodes_dict, interfaces)        
401             rspec.version.add_links(links)
402
403         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
404            leases = self.get_leases()
405            rspec.version.add_leases(leases)
406
407         return rspec.toxml()
408
409     def describe(self, urns, version=None, options={}):
410         version_manager = VersionManager()
411         version = version_manager.get_version(version)
412         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
413         rspec = RSpec(version=rspec_version, user_options=options)
414
415         # get slivers
416         geni_slivers = []
417         slivers = self.get_slivers(urns, options)
418         if slivers:
419             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
420         else:
421             rspec_expires = datetime_to_string(utcparse(time.time()))      
422         rspec.xml.set('expires',  rspec_expires)
423
424         # lookup the sliver allocations
425         geni_urn = urns[0]
426         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
427         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
428         sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
429         sliver_allocation_dict = {}
430         for sliver_allocation in sliver_allocations:
431             geni_urn = sliver_allocation.slice_urn
432             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
433       
434         if not options.get('list_leases') or options['list_leases'] != 'leases':
435             # add slivers
436             site_ids = []
437             interface_ids = []
438             tag_ids = []
439             nodes_dict = {}
440             for sliver in slivers:
441                 site_ids.append(sliver['site_id'])
442                 interface_ids.extend(sliver['interface_ids'])
443                 tag_ids.extend(sliver['node_tag_ids'])
444                 nodes_dict[sliver['node_id']] = sliver
445             sites = self.get_sites({'site_id': site_ids})
446             interfaces = self.get_interfaces({'interface_id':interface_ids})
447             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
448             pl_initscripts = self.get_pl_initscripts()
449             rspec_nodes = []
450             for sliver in slivers:
451                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
452                     continue
453                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
454                                                        pl_initscripts, sliver_allocation_dict)
455                 # manifest node element shouldn't contain available attribute
456                 rspec_node.pop('available')
457                 rspec_nodes.append(rspec_node) 
458                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
459                 geni_slivers.append(geni_sliver)
460             rspec.version.add_nodes(rspec_nodes)
461
462             # add sliver defaults
463             #default_sliver = slivers.get(None, [])
464             #if default_sliver:
465             #    default_sliver_attribs = default_sliver.get('tags', [])
466             #    for attrib in default_sliver_attribs:
467             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
468
469             # add links 
470             links = self.get_links(sites, nodes_dict, interfaces)        
471             rspec.version.add_links(links)
472
473         if not options.get('list_leases') or options['list_leases'] != 'resources':
474             if slivers:
475                 leases = self.get_leases(slivers[0])
476                 rspec.version.add_leases(leases)
477
478                
479         return {'geni_urn': geni_urn, 
480                 'geni_rspec': rspec.toxml(),
481                 'geni_slivers': geni_slivers}