Merge Master in geni-v3 conflict resolution
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.alchemy import dbsession
25 from sfa.storage.model import SliverAllocation
26
27
28 import time
29
30 class PlAggregate:
31
    def __init__(self, driver):
        """Keep a reference to the driver used by every other method.

        The methods of this class reach the backend through
        ``self.driver.shell`` and read the authority name from
        ``self.driver.hrn``.
        """
        self.driver = driver
34
35     def get_nodes(self, options={}):
36         filter = {'peer_id': None}
37         geni_available = options.get('geni_available')    
38         if geni_available == True:
39             filter['boot_state'] = 'boot'
40         nodes = self.driver.shell.GetNodes(filter)
41        
42         return nodes  
43  
44     def get_sites(self, filter={}):
45         sites = {}
46         for site in self.driver.shell.GetSites(filter):
47             sites[site['site_id']] = site
48         return sites
49
50     def get_interfaces(self, filter={}):
51         interfaces = {}
52         for interface in self.driver.shell.GetInterfaces(filter):
53             iface = Interface()
54             if interface['bwlimit']:
55                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
56             interfaces[interface['interface_id']] = interface
57         return interfaces
58
59     def get_links(self, sites, nodes, interfaces):
60         
61         topology = Topology() 
62         links = []
63         for (site_id1, site_id2) in topology:
64             site_id1 = int(site_id1)
65             site_id2 = int(site_id2)
66             link = Link()
67             if not site_id1 in sites or site_id2 not in sites:
68                 continue
69             site1 = sites[site_id1]
70             site2 = sites[site_id2]
71             # get hrns
72             site1_hrn = self.driver.hrn + '.' + site1['login_base']
73             site2_hrn = self.driver.hrn + '.' + site2['login_base']
74
75             for s1_node_id in site1['node_ids']:
76                 for s2_node_id in site2['node_ids']:
77                     if s1_node_id not in nodes or s2_node_id not in nodes:
78                         continue
79                     node1 = nodes[s1_node_id]
80                     node2 = nodes[s2_node_id]
81                     # set interfaces
82                     # just get first interface of the first node
83                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
84                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
85                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
86                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
87
88                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
89                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
90
91                     # set link
92                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
93                     link['interface1'] = if1
94                     link['interface2'] = if2
95                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
96                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
97                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
98                     links.append(link)
99
100         return links
101
102     def get_node_tags(self, filter={}):
103         node_tags = {}
104         for node_tag in self.driver.shell.GetNodeTags(filter):
105             node_tags[node_tag['node_tag_id']] = node_tag
106         return node_tags
107
108     def get_pl_initscripts(self, filter={}):
109         pl_initscripts = {}
110         filter.update({'enabled': True})
111         for initscript in self.driver.shell.GetInitScripts(filter):
112             pl_initscripts[initscript['initscript_id']] = initscript
113         return pl_initscripts
114
115     def get_slivers(self, urns, options={}):
116         names = set()
117         slice_ids = set()
118         node_ids = []
119         for urn in urns:
120             xrn = PlXrn(xrn=urn)
121             if xrn.type == 'sliver':
122                  # id: slice_id-node_id
123                 try:
124                     sliver_id_parts = xrn.get_sliver_id_parts()
125                     slice_id = int(sliver_id_parts[0]) 
126                     node_id = int(sliver_id_parts[1])
127                     slice_ids.add(slice_id) 
128                     node_ids.append(node_id)
129                 except ValueError:
130                     pass 
131             else:  
132                 names.add(xrn.pl_slicename())
133
134         filter = {}
135         if names:
136             filter['name'] = list(names)
137         if slice_ids:
138             filter['slice_id'] = list(slice_ids)
139         # get slices
140         slices = self.driver.shell.GetSlices(filter)
141         if not slices:
142             return []
143         slice = slices[0]     
144         slice['hrn'] = PlXrn(auth=self.driver.hrn, slicename=slice['name']).hrn   
145
146         # get sliver users
147         persons = []
148         person_ids = []
149         for slice in slices:
150             person_ids.extend(slice['person_ids'])
151         if person_ids:
152             persons = self.driver.shell.GetPersons(person_ids)
153                  
154         # get user keys
155         keys = {}
156         key_ids = []
157         for person in persons:
158             key_ids.extend(person['key_ids'])
159         
160         if key_ids:
161             key_list = self.driver.shell.GetKeys(key_ids)
162             for key in key_list:
163                 keys[key['key_id']] = key  
164
165         # construct user key info
166         users = []
167         for person in persons:
168             name = person['email'][0:person['email'].index('@')]
169             user = {
170                 'login': slice['name'], 
171                 'user_urn': Xrn('%s.%s' % (self.driver.hrn, name), type='user').urn,
172                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
173             }
174             users.append(user)
175
176         if node_ids:
177             node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']]
178             slice['node_ids'] = node_ids
179         tags_dict = self.get_slice_tags(slice)
180         nodes_dict = self.get_slice_nodes(slice, options)
181         slivers = []
182         for node in nodes_dict.values():
183             node.update(slice) 
184             node['tags'] = tags_dict[node['node_id']]
185             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
186             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
187             node['urn'] = node['sliver_id'] 
188             node['services_user'] = users
189             slivers.append(node)
190         return slivers
191
192     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}):
193         rspec_node = NodeElement()
194         # xxx how to retrieve site['login_base']
195         site=sites[node['site_id']]
196         rspec_node['component_id'] = PlXrn(self.driver.hrn, hostname=node['hostname']).get_urn()
197         rspec_node['component_name'] = node['hostname']
198         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
199         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
200         # do not include boot state (<available> element) in the manifest rspec
201         rspec_node['boot_state'] = node['boot_state']
202         if node['boot_state'] == 'boot': 
203             rspec_node['available'] = 'true'
204         else:
205             rspec_node['available'] = 'false'
206         rspec_node['exclusive'] = 'false'
207         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
208                                         HardwareType({'name': 'pc'})]
209         # only doing this because protogeni rspec needs
210         # to advertise available initscripts
211         rspec_node['pl_initscripts'] = pl_initscripts.values()
212         # add site/interface info to nodes.
213         # assumes that sites, interfaces and tags have already been prepared.
214         if site['longitude'] and site['latitude']:
215             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
216             rspec_node['location'] = location
217         # Granularity
218         granularity = Granularity({'grain': grain})
219         rspec_node['granularity'] = granularity
220         rspec_node['interfaces'] = []
221         if_count=0
222         for if_id in node['interface_ids']:
223             interface = Interface(interfaces[if_id])
224             interface['ipv4'] = interface['ip']
225             interface['component_id'] = PlXrn(auth=self.driver.hrn,
226                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
227             # interfaces in the manifest need a client id
228             if slice:
229                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
230             rspec_node['interfaces'].append(interface)
231             if_count+=1
232         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags]
233         rspec_node['tags'] = tags
234         return rspec_node
235
236     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
237                              pl_initscripts, sliver_allocations):
238         # get the granularity in second for the reservation system
239         grain = self.driver.shell.GetLeaseGranularity()
240         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
241         # xxx how to retrieve site['login_base']
242         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
243         # remove interfaces from manifest
244         rspec_node['interfaces'] = []
245         # add sliver info
246         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
247                          'name': sliver['name'],
248                          'type': 'plab-vserver',
249                          'tags': []})
250         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
251         rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
252         if sliver_allocations[sliver['urn']].component_id:
253             rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id
254         rspec_node['slivers'] = [rspec_sliver]
255
256         # slivers always provide the ssh service
257         login = Login({'authentication': 'ssh-keys', 
258                        'hostname': sliver['hostname'], 
259                        'port':'22', 
260                        'username': sliver['name'],
261                        'login': sliver['name']
262                       })
263         service = ServicesElement({'login': login,
264                             'services_user': sliver['services_user']})
265         rspec_node['services'] = [service]    
266         return rspec_node      
267
268     def get_slice_tags(self, slice):
269         slice_tag_ids = []
270         slice_tag_ids.extend(slice['slice_tag_ids'])
271         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
272         # sorted by node_id
273         tags_dict = defaultdict(list)
274         for tag in tags:
275             tags_dict[tag['node_id']] = tag
276         return tags_dict
277
278     def get_slice_nodes(self, slice, options={}):
279         nodes_dict = {}
280         filter = {'peer_id': None}
281         tags_filter = {}
282         if slice and slice.get('node_ids'):
283             filter['node_id'] = slice['node_ids']
284         else:
285             # there are no nodes to look up
286             return nodes_dict
287         tags_filter=filter.copy()
288         geni_available = options.get('geni_available')
289         if geni_available == True:
290             filter['boot_state'] = 'boot'
291         nodes = self.driver.shell.GetNodes(filter)
292         for node in nodes:
293             nodes_dict[node['node_id']] = node
294         return nodes_dict
295
296     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations = {}):
297         if rspec_node['sliver_id'] in sliver_allocations:
298             # set sliver allocation and operational status
299             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
300             if sliver_allocation:
301                 allocation_status = sliver_allocation.allocation_state
302                 if allocation_status == 'geni_allocated':
303                     op_status =  'geni_pending_allocation'
304                 elif allocation_status == 'geni_provisioned':
305                     if rspec_node['boot_state'] == 'boot':
306                         op_status = 'geni_ready'
307                     else:
308                         op_status = 'geni_failed'
309                 else:
310                     op_status = 'geni_unknown'
311             else:
312                 allocation_status = 'geni_unallocated'    
313         # required fields
314         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
315                        'geni_expires': rspec_node['expires'],
316                        'geni_allocation_status' : allocation_status,
317                        'geni_operational_status': op_status,
318                        'geni_error': '',
319                        }
320         return geni_sliver        
321
322     def get_leases(self, slice_xrn=None, slice=None, options={}):
323         
324         if slice_xrn and not slice:
325             return []
326
327         now = int(time.time())
328         filter={}
329         filter.update({'clip':now})
330         if slice:
331            filter.update({'name':slice['name']})
332         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
333         leases = self.driver.shell.GetLeases(filter)
334         grain = self.driver.shell.GetLeaseGranularity()
335
336         site_ids = []
337         for lease in leases:
338             site_ids.append(lease['site_id'])
339
340         # get sites
341         sites_dict  = self.get_sites({'site_id': site_ids}) 
342   
343         rspec_leases = []
344         for lease in leases:
345
346             rspec_lease = Lease()
347             
348             # xxx how to retrieve site['login_base']
349             site_id=lease['site_id']
350             site=sites_dict[site_id]
351
352             rspec_lease['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], lease['hostname'])
353             if slice_xrn:
354                 slice_urn = slice_xrn
355                 slice_hrn = urn_to_hrn(slice_urn)
356             else:
357                 slice_hrn = slicename_to_hrn(self.driver.hrn, lease['name'])
358                 slice_urn = hrn_to_urn(slice_hrn, 'slice')
359             rspec_lease['slice_id'] = slice_urn
360             rspec_lease['start_time'] = lease['t_from']
361             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
362             rspec_leases.append(rspec_lease)
363         return rspec_leases
364
365     
366     def list_resources(self, version = None, options={}):
367
368         version_manager = VersionManager()
369         version = version_manager.get_version(version)
370         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
371         rspec = RSpec(version=rspec_version, user_options=options)
372        
373         if not options.get('list_leases') or options['list_leases'] != 'leases':
374             # get nodes
375             nodes  = self.get_nodes(options)
376             site_ids = []
377             interface_ids = []
378             tag_ids = []
379             nodes_dict = {}
380             for node in nodes:
381                 site_ids.append(node['site_id'])
382                 interface_ids.extend(node['interface_ids'])
383                 tag_ids.extend(node['node_tag_ids'])
384                 nodes_dict[node['node_id']] = node
385             sites = self.get_sites({'site_id': site_ids})
386             interfaces = self.get_interfaces({'interface_id':interface_ids})
387             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
388             pl_initscripts = self.get_pl_initscripts()
389             # convert nodes to rspec nodes
390             rspec_nodes = []
391             for node in nodes:
392                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
393                 rspec_nodes.append(rspec_node)
394             rspec.version.add_nodes(rspec_nodes)
395
396             # add links
397             links = self.get_links(sites, nodes_dict, interfaces)        
398             rspec.version.add_links(links)
399
400         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
401            leases = self.get_leases(slice_xrn, slice)
402            rspec.version.add_leases(leases)
403
404         return rspec.toxml()
405
406     def describe(self, urns, version=None, options={}):
407         version_manager = VersionManager()
408         version = version_manager.get_version(version)
409         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
410         rspec = RSpec(version=rspec_version, user_options=options)
411
412         # get slivers
413         geni_slivers = []
414         slivers = self.get_slivers(urns, options)
415         if slivers:
416             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
417         else:
418             rspec_expires = datetime_to_string(utcparse(time.time()))      
419         rspec.xml.set('expires',  rspec_expires)
420
421         # lookup the sliver allocations
422         geni_urn = urns[0]
423         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
424         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
425         sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
426         sliver_allocation_dict = {}
427         for sliver_allocation in sliver_allocations:
428             geni_urn = sliver_allocation.slice_urn
429             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
430       
431         if not options.get('list_leases') or options['list_leases'] != 'leases':
432             # add slivers
433             site_ids = []
434             interface_ids = []
435             tag_ids = []
436             nodes_dict = {}
437             for sliver in slivers:
438                 site_ids.append(sliver['site_id'])
439                 interface_ids.extend(sliver['interface_ids'])
440                 tag_ids.extend(sliver['node_tag_ids'])
441                 nodes_dict[sliver['node_id']] = sliver
442             sites = self.get_sites({'site_id': site_ids})
443             interfaces = self.get_interfaces({'interface_id':interface_ids})
444             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
445             pl_initscripts = self.get_pl_initscripts()
446             rspec_nodes = []
447             for sliver in slivers:
448                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
449                     continue
450                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
451                                                        pl_initscripts, sliver_allocation_dict)
452                 # manifest node element shouldn't contain available attribute
453                 rspec_node.pop('available')
454                 rspec_nodes.append(rspec_node) 
455                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
456                 geni_slivers.append(geni_sliver)
457             rspec.version.add_nodes(rspec_nodes)
458
459             # add sliver defaults
460             #default_sliver = slivers.get(None, [])
461             #if default_sliver:
462             #    default_sliver_attribs = default_sliver.get('tags', [])
463             #    for attrib in default_sliver_attribs:
464             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
465
466             # add links 
467             links = self.get_links(sites, nodes_dict, interfaces)        
468             rspec.version.add_links(links)
469
470         if not options.get('list_leases') or options['list_leases'] != 'resources':
471             if slivers:
472                 leases = self.get_leases(slivers[0])
473                 rspec.version.add_leases(leases)
474
475                
476         return {'geni_urn': geni_urn, 
477                 'geni_rspec': rspec.toxml(),
478                 'geni_slivers': geni_slivers}