include opstates in ad rspec. include ssh-user in manifest rspec
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import Node
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import Services
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.alchemy import dbsession
25 from sfa.storage.model import SliverAllocation
26
27
28 import time
29
30 class PlAggregate:
31
32     def __init__(self, driver):
33         self.driver = driver
34
35     def get_nodes(self, options={}):
36         filter = {'peer_id': None}
37         geni_available = options.get('geni_available')    
38         if geni_available == True:
39             filter['boot_state'] = 'boot'
40         nodes = self.driver.shell.GetNodes(filter)
41        
42         return nodes  
43  
44     def get_sites(self, filter={}):
45         sites = {}
46         for site in self.driver.shell.GetSites(filter):
47             sites[site['site_id']] = site
48         return sites
49
50     def get_interfaces(self, filter={}):
51         interfaces = {}
52         for interface in self.driver.shell.GetInterfaces(filter):
53             iface = Interface()
54             if interface['bwlimit']:
55                 interface['bwlimit'] = str(int(interface['bwlimit'])/1000)
56             interfaces[interface['interface_id']] = interface
57         return interfaces
58
59     def get_links(self, sites, nodes, interfaces):
60         
61         topology = Topology() 
62         links = []
63         for (site_id1, site_id2) in topology:
64             site_id1 = int(site_id1)
65             site_id2 = int(site_id2)
66             link = Link()
67             if not site_id1 in sites or site_id2 not in sites:
68                 continue
69             site1 = sites[site_id1]
70             site2 = sites[site_id2]
71             # get hrns
72             site1_hrn = self.driver.hrn + '.' + site1['login_base']
73             site2_hrn = self.driver.hrn + '.' + site2['login_base']
74
75             for s1_node_id in site1['node_ids']:
76                 for s2_node_id in site2['node_ids']:
77                     if s1_node_id not in nodes or s2_node_id not in nodes:
78                         continue
79                     node1 = nodes[s1_node_id]
80                     node2 = nodes[s2_node_id]
81                     # set interfaces
82                     # just get first interface of the first node
83                     if1_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node1['node_id']))
84                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
85                     if2_xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node2['node_id']))
86                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
87
88                     if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
89                     if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
90
91                     # set link
92                     link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
93                     link['interface1'] = if1
94                     link['interface2'] = if2
95                     link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
96                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
97                     link['component_manager_id'] =  hrn_to_urn(self.driver.hrn, 'authority+am')
98                     links.append(link)
99
100         return links
101
102     def get_node_tags(self, filter={}):
103         node_tags = {}
104         for node_tag in self.driver.shell.GetNodeTags(filter):
105             node_tags[node_tag['node_tag_id']] = node_tag
106         return node_tags
107
108     def get_pl_initscripts(self, filter={}):
109         pl_initscripts = {}
110         filter.update({'enabled': True})
111         for initscript in self.driver.shell.GetInitScripts(filter):
112             pl_initscripts[initscript['initscript_id']] = initscript
113         return pl_initscripts
114
115     def get_slivers(self, urns, options={}):
116         names = set()
117         slice_ids = set()
118         node_ids = []
119         for urn in urns:
120             xrn = PlXrn(xrn=urn)
121             if xrn.type == 'sliver':
122                  # id: slice_id-node_id
123                 try:
124                     sliver_id_parts = xrn.get_sliver_id_parts()
125                     slice_id = int(sliver_id_parts[0]) 
126                     node_id = int(sliver_id_parts[1])
127                     slice_ids.add(slice_id) 
128                     node_ids.append(node_id)
129                 except ValueError:
130                     pass 
131             else:  
132                 names.add(xrn.pl_slicename())
133
134         filter = {}
135         if names:
136             filter['name'] = list(names)
137         if slice_ids:
138             filter['slice_id'] = list(slice_ids)
139         # get slices
140         slices = self.driver.shell.GetSlices(filter)
141         if not slices:
142             return []
143         slice = slices[0]        
144
145         # get sliver users
146         persons = []
147         person_ids = []
148         for slice in slices:
149             person_ids.extend(slice['person_ids'])
150         if person_ids:
151             persons = self.driver.shell.GetPersons(person_ids)
152                  
153         # get user keys
154         keys = {}
155         key_ids = []
156         for person in persons:
157             key_ids.extend(person['key_ids'])
158         
159         if key_ids:
160             key_list = self.driver.shell.GetKeys(key_ids)
161             for key in key_list:
162                 keys[key['key_id']] = key  
163
164         # construct user key info
165         users = []
166         for person in persons:
167             name = person['email'][0:person['email'].index('@')]
168             user = {
169                 'login': slice['name'], 
170                 'user_urn': Xrn('%s.%s' % (self.driver.hrn, name), type='user').urn,
171                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
172             }
173             users.append(user)
174
175         if node_ids:
176             node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']]
177             slice['node_ids'] = node_ids
178         tags_dict = self.get_slice_tags(slice)
179         nodes_dict = self.get_slice_nodes(slice, options)
180         slivers = []
181         for node in nodes_dict.values():
182             node.update(slices[0]) 
183             node['tags'] = tags_dict[node['node_id']]
184             sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id'])
185             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
186             node['urn'] = node['sliver_id'] 
187             node['services_user'] = users
188             slivers.append(node)
189         return slivers
190
191     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}):
192         rspec_node = Node()
193         # xxx how to retrieve site['login_base']
194         site=sites[node['site_id']]
195         rspec_node['component_id'] = PlXrn(self.driver.hrn, hostname=node['hostname']).get_urn()
196         rspec_node['component_name'] = node['hostname']
197         rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
198         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
199         # do not include boot state (<available> element) in the manifest rspec
200         rspec_node['boot_state'] = node['boot_state']
201         if node['boot_state'] == 'boot': 
202             rspec_node['available'] = 'true'
203         else:
204             rspec_node['available'] = 'false'
205         rspec_node['exclusive'] = 'false'
206         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
207                                         HardwareType({'name': 'pc'})]
208         # only doing this because protogeni rspec needs
209         # to advertise available initscripts
210         rspec_node['pl_initscripts'] = pl_initscripts.values()
211         # add site/interface info to nodes.
212         # assumes that sites, interfaces and tags have already been prepared.
213         if site['longitude'] and site['latitude']:
214             location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'})
215             rspec_node['location'] = location
216         # Granularity
217         granularity = Granularity({'grain': grain})
218         rspec_node['granularity'] = granularity
219         rspec_node['interfaces'] = []
220         if_count=0
221         for if_id in node['interface_ids']:
222             interface = Interface(interfaces[if_id])
223             interface['ipv4'] = interface['ip']
224             interface['component_id'] = PlXrn(auth=self.driver.hrn,
225                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
226             # interfaces in the manifest need a client id
227             if slice:
228                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
229             rspec_node['interfaces'].append(interface)
230             if_count+=1
231         tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags]
232         rspec_node['tags'] = tags
233         return rspec_node
234
235     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \
236                              pl_initscripts, sliver_allocations):
237         # get the granularity in second for the reservation system
238         grain = self.driver.shell.GetLeaseGranularity()
239         rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
240         # xxx how to retrieve site['login_base']
241         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
242         # remove interfaces from manifest
243         rspec_node['interfaces'] = []
244         # add sliver info
245         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
246                          'name': sliver['name'],
247                          'type': 'plab-vserver',
248                          'tags': []})
249         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
250         rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id
251         if sliver_allocations[sliver['urn']].component_id:
252             rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id
253         rspec_node['slivers'] = [rspec_sliver]
254
255         # slivers always provide the ssh service
256         login = Login({'authentication': 'ssh-keys', 
257                        'hostname': sliver['hostname'], 
258                        'port':'22', 
259                        'username': sliver['name'],
260                        'login': sliver['name']
261                       })
262         service = Services({'login': login,
263                             'services_user': sliver['services_user']})
264         rspec_node['services'] = [service]    
265         return rspec_node      
266
267     def get_slice_tags(self, slice):
268         slice_tag_ids = []
269         slice_tag_ids.extend(slice['slice_tag_ids'])
270         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids})
271         # sorted by node_id
272         tags_dict = defaultdict(list)
273         for tag in tags:
274             tags_dict[tag['node_id']] = tag
275         return tags_dict
276
277     def get_slice_nodes(self, slice, options={}):
278         nodes_dict = {}
279         filter = {'peer_id': None}
280         tags_filter = {}
281         if slice and slice.get('node_ids'):
282             filter['node_id'] = slice['node_ids']
283         else:
284             # there are no nodes to look up
285             return nodes_dict
286         tags_filter=filter.copy()
287         geni_available = options.get('geni_available')
288         if geni_available == True:
289             filter['boot_state'] = 'boot'
290         nodes = self.driver.shell.GetNodes(filter)
291         for node in nodes:
292             nodes_dict[node['node_id']] = node
293         return nodes_dict
294
295     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations = {}):
296         if rspec_node['sliver_id'] in sliver_allocations:
297             # set sliver allocation and operational status
298             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
299             if sliver_allocation:
300                 allocation_status = sliver_allocation.allocation_state
301                 if allocation_status == 'geni_allocated':
302                     op_status =  'geni_pending_allocation'
303                 elif allocation_status == 'geni_provisioned':
304                     if rspec_node['boot_state'] == 'boot':
305                         op_status = 'geni_ready'
306                     else:
307                         op_status = 'geni_failed'
308                 else:
309                     op_status = 'geni_unknown'
310             else:
311                 allocation_status = 'geni_unallocated'    
312         # required fields
313         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
314                        'geni_expires': rspec_node['expires'],
315                        'geni_allocation_status' : allocation_status,
316                        'geni_operational_status': op_status,
317                        'geni_error': '',
318                        }
319         return geni_sliver        
320
321     def get_leases(self, slice=None, options={}):
322         
323         now = int(time.time())
324         filter={}
325         filter.update({'clip':now})
326         if slice:
327            filter.update({'name':slice['name']})
328         return_fields = ['lease_id', 'hostname', 'site_id', 'name', 't_from', 't_until']
329         leases = self.driver.shell.GetLeases(filter)
330         grain = self.driver.shell.GetLeaseGranularity()
331
332         site_ids = []
333         for lease in leases:
334             site_ids.append(lease['site_id'])
335
336         # get sites
337         sites_dict  = self.get_sites({'site_id': site_ids}) 
338   
339         rspec_leases = []
340         for lease in leases:
341
342             rspec_lease = Lease()
343             
344             # xxx how to retrieve site['login_base']
345             site_id=lease['site_id']
346             site=sites_dict[site_id]
347
348             rspec_lease['lease_id'] = lease['lease_id']
349             rspec_lease['component_id'] = PlXrn(self.driver.hrn, hostname=lease['hostname']).urn
350             slice_hrn = slicename_to_hrn(self.driver.hrn, lease['name'])
351             slice_urn = hrn_to_urn(slice_hrn, 'slice')
352             rspec_lease['slice_id'] = slice_urn
353             rspec_lease['start_time'] = lease['t_from']
354             rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) / grain
355             rspec_leases.append(rspec_lease)
356         return rspec_leases
357
358     
359     def list_resources(self, version = None, options={}):
360
361         version_manager = VersionManager()
362         version = version_manager.get_version(version)
363         rspec_version = version_manager._get_version(version.type, version.version, 'ad')
364         rspec = RSpec(version=rspec_version, user_options=options)
365        
366         if not options.get('list_leases') or options['list_leases'] != 'leases':
367             # get nodes
368             nodes  = self.get_nodes(options)
369             site_ids = []
370             interface_ids = []
371             tag_ids = []
372             nodes_dict = {}
373             for node in nodes:
374                 site_ids.append(node['site_id'])
375                 interface_ids.extend(node['interface_ids'])
376                 tag_ids.extend(node['node_tag_ids'])
377                 nodes_dict[node['node_id']] = node
378             sites = self.get_sites({'site_id': site_ids})
379             interfaces = self.get_interfaces({'interface_id':interface_ids})
380             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
381             pl_initscripts = self.get_pl_initscripts()
382             # convert nodes to rspec nodes
383             rspec_nodes = []
384             for node in nodes:
385                 rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
386                 rspec_nodes.append(rspec_node)
387             rspec.version.add_nodes(rspec_nodes)
388
389             # add links
390             links = self.get_links(sites, nodes_dict, interfaces)        
391             rspec.version.add_links(links)
392         return rspec.toxml()
393
394     def describe(self, urns, version=None, options={}):
395         version_manager = VersionManager()
396         version = version_manager.get_version(version)
397         rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
398         rspec = RSpec(version=rspec_version, user_options=options)
399
400         # get slivers
401         geni_slivers = []
402         slivers = self.get_slivers(urns, options)
403         if slivers:
404             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
405         else:
406             rspec_expires = datetime_to_string(utcparse(time.time()))      
407         rspec.xml.set('expires',  rspec_expires)
408
409         # lookup the sliver allocations
410         geni_urn = urns[0]
411         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
412         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
413         sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
414         sliver_allocation_dict = {}
415         for sliver_allocation in sliver_allocations:
416             geni_urn = sliver_allocation.slice_urn
417             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
418       
419         if not options.get('list_leases') or options['list_leases'] != 'leases':
420             # add slivers
421             site_ids = []
422             interface_ids = []
423             tag_ids = []
424             nodes_dict = {}
425             for sliver in slivers:
426                 site_ids.append(sliver['site_id'])
427                 interface_ids.extend(sliver['interface_ids'])
428                 tag_ids.extend(sliver['node_tag_ids'])
429                 nodes_dict[sliver['node_id']] = sliver
430             sites = self.get_sites({'site_id': site_ids})
431             interfaces = self.get_interfaces({'interface_id':interface_ids})
432             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
433             pl_initscripts = self.get_pl_initscripts()
434             rspec_nodes = []
435             for sliver in slivers:
436                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
437                     continue
438                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, 
439                                                        pl_initscripts, sliver_allocation_dict)
440                 # manifest node element shouldn't contain available attribute
441                 rspec_node.pop('available')
442                 rspec_nodes.append(rspec_node) 
443                 geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
444                 geni_slivers.append(geni_sliver)
445             rspec.version.add_nodes(rspec_nodes)
446
447             # add sliver defaults
448             #default_sliver = slivers.get(None, [])
449             #if default_sliver:
450             #    default_sliver_attribs = default_sliver.get('tags', [])
451             #    for attrib in default_sliver_attribs:
452             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
453
454             # add links 
455             links = self.get_links(sites, nodes_dict, interfaces)        
456             rspec.version.add_links(links)
457
458         if not options.get('list_leases') or options['list_leases'] != 'resources':
459             if slivers:
460                 leases = self.get_leases(slivers[0])
461                 rspec.version.add_leases(leases)
462
463                
464         return {'geni_urn': geni_urn, 
465                 'geni_rspec': rspec.toxml(),
466                 'geni_slivers': geni_slivers}