32d6d933e7cecd36972137428d5377d9456bfdbc
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.model import SliverAllocation
25
26
27 import time
28
class PlAggregate:
    """
    Aggregate-manager helper for the PlanetLab driver.

    Reads nodes, sites, interfaces, tags, slices and leases through
    self.driver.shell and turns them into rspec elements, advertisement
    rspecs (list_resources) and manifest rspecs (describe).
    """

    def __init__(self, driver):
        """Keep a reference to the driver; its shell, hrn and api are used later."""
        self.driver = driver

    def get_nodes(self, options=None):
        """
        Return the local (peer_id is None) node records from the shell.

        When options['geni_available'] is set, only nodes whose boot_state
        is 'boot' are requested.
        """
        if options is None:
            options = {}
        node_filter = {'peer_id': None}
        # '== True' rather than 'is True' on purpose: an integer 1 must keep matching
        if options.get('geni_available') == True:
            node_filter['boot_state'] = 'boot'
        return self.driver.shell.GetNodes(node_filter)

    def get_sites(self, filter=None):
        """Return the site records matching filter, as a dict keyed by site_id."""
        if filter is None:
            filter = {}
        return dict((site['site_id'], site)
                    for site in self.driver.shell.GetSites(filter))

    def get_interfaces(self, filter=None):
        """
        Return the interface records matching filter, keyed by interface_id.

        A non-empty 'bwlimit' is divided by 1000 and stored back as a string.
        """
        if filter is None:
            filter = {}
        interfaces = {}
        for interface in self.driver.shell.GetInterfaces(filter):
            if interface['bwlimit']:
                # floor division: keeps an integer string ('1000', not '1000.0')
                # whatever the division semantics of the interpreter
                interface['bwlimit'] = str(int(interface['bwlimit']) // 1000)
            interfaces[interface['interface_id']] = interface
        return interfaces

    def _first_interface(self, node, interfaces):
        """
        Build the Interface element for the first interface of node,
        or return None when the node has no interface known to interfaces.
        """
        interface_ids = node['interface_ids']
        if not interface_ids or interface_ids[0] not in interfaces:
            return None
        xrn = PlXrn(auth=self.driver.hrn, interface='node%s:eth0' % (node['node_id']))
        return Interface({'component_id': xrn.urn,
                          'ipv4': interfaces[interface_ids[0]]['ip']})

    def get_links(self, sites, nodes, interfaces):
        """
        Return one Link per pair of known nodes located on two sites that are
        adjacent in the Topology.

        sites, nodes and interfaces are dicts keyed by site_id, node_id and
        interface_id. Site pairs or nodes missing from these dicts are
        skipped, and so are nodes without a usable first interface.
        """
        links = []
        for (site_id1, site_id2) in Topology():
            site_id1 = int(site_id1)
            site_id2 = int(site_id2)
            if site_id1 not in sites or site_id2 not in sites:
                continue
            site1 = sites[site_id1]
            site2 = sites[site_id2]

            for s1_node_id in site1['node_ids']:
                for s2_node_id in site2['node_ids']:
                    if s1_node_id not in nodes or s2_node_id not in nodes:
                        continue
                    # just use the first interface of each node; a fresh
                    # Interface element is built for every link
                    if1 = self._first_interface(nodes[s1_node_id], interfaces)
                    if2 = self._first_interface(nodes[s2_node_id], interfaces)
                    if if1 is None or if2 is None:
                        # a node without interface cannot be a link endpoint
                        continue

                    link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
                    link['interface1'] = if1
                    link['interface2'] = if2
                    link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
                    link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link['component_name']).get_urn()
                    link['component_manager_id'] = hrn_to_urn(self.driver.hrn, 'authority+am')
                    links.append(link)

        return links

    def get_node_tags(self, filter=None):
        """Return the node tags matching filter, as a dict keyed by node_tag_id."""
        if filter is None:
            filter = {}
        columns = ['tagname', 'value', 'node_id', 'node_tag_id']
        node_tags = {}
        for node_tag in self.driver.shell.GetNodeTags(filter, columns):
            node_tags[node_tag['node_tag_id']] = node_tag
        return node_tags

    def get_pl_initscripts(self, filter=None):
        """
        Return the enabled initscripts matching filter, keyed by initscript_id.

        The caller's filter is left untouched: 'enabled': True is added on a copy.
        """
        enabled_filter = dict(filter) if filter is not None else {}
        enabled_filter['enabled'] = True
        pl_initscripts = {}
        for initscript in self.driver.shell.GetInitScripts(enabled_filter):
            pl_initscripts[initscript['initscript_id']] = initscript
        return pl_initscripts

    @staticmethod
    def _sliver_tags(pltags_dict, node_id):
        """
        Return a NEW list holding the slice-global tags followed by the tags
        specific to node_id, leaving the lists stored in pltags_dict unchanged.
        """
        return list(pltags_dict.get('slice-global', [])) + list(pltags_dict.get(node_id, []))

    def get_slivers(self, urns, options=None):
        """
        Return the sliver records designated by urns.

        urns may hold sliver urns (whose id parts are slice_id and node_id)
        and/or a slice urn. Each returned record is a node record updated
        with the fields of the selected slice, plus 'slice-tags',
        'sliver_id', 'urn' and 'services_user'.
        Returns [] when no usable urn was given or no slice matches.
        """
        if options is None:
            options = {}
        slice_ids = set()
        node_ids = []
        slice_hrn = None
        for urn in urns:
            xrn = PlXrn(xrn=urn)
            if xrn.type != 'sliver':
                slice_hrn = xrn.get_hrn()
                continue
            # id: slice_id-node_id
            try:
                sliver_id_parts = xrn.get_sliver_id_parts()
                slice_id = int(sliver_id_parts[0])
                node_id = int(sliver_id_parts[1])
            except ValueError:
                # still best-effort, but no longer silent
                logger.warning("PlAggregate.get_slivers : ignoring malformed sliver urn {}".format(urn))
                continue
            slice_ids.add(slice_id)
            node_ids.append(node_id)

        if not slice_ids and not slice_hrn:
            # without any constraint the query below would match every local
            # slice and an unrelated one would be picked
            logger.error("PlAggregate.get_slivers : no usable urn in {}".format(urns))
            return []

        slice_filter = {'peer_id': None}
        if slice_ids:
            slice_filter['slice_id'] = list(slice_ids)
        fields = ['slice_id', 'name', 'hrn', 'person_ids', 'node_ids', 'slice_tag_ids', 'expires']
        all_slices = self.driver.shell.GetSlices(slice_filter, fields)
        if slice_hrn:
            slices = [candidate for candidate in all_slices if candidate['hrn'] == slice_hrn]
        else:
            slices = all_slices

        if not slices:
            if slice_hrn:
                logger.error("PlAggregate.get_slivers : no slice found with hrn {}".format(slice_hrn))
            else:
                logger.error("PlAggregate.get_slivers : no sliver found with urns {}".format(urns))
            return []
        # the slice described by the slivers; its own 'hrn' is kept as returned
        # by the shell (it already equals slice_hrn whenever one was given)
        slice_record = slices[0]

        # get sliver users - the loop variable is deliberately not named like
        # the selected slice, so that the selection above is not overwritten
        person_ids = []
        for each_slice in slices:
            person_ids.extend(each_slice['person_ids'])
        persons = []
        if person_ids:
            persons = self.driver.shell.GetPersons(person_ids)

        # get user keys
        key_ids = []
        for person in persons:
            key_ids.extend(person['key_ids'])
        keys = {}
        if key_ids:
            for key in self.driver.shell.GetKeys(key_ids):
                keys[key['key_id']] = key

        # construct user key info
        users = []
        for person in persons:
            person_hrn = self.driver.shell.GetPersonHrn(int(person['person_id']))
            users.append({
                'login': slice_record['name'],
                'user_urn': hrn_to_urn(person_hrn, 'user'),
                'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys],
            })

        if node_ids:
            # restrict to the requested nodes that actually belong to the slice
            slice_record['node_ids'] = [node_id for node_id in node_ids
                                        if node_id in slice_record['node_ids']]
        pltags_dict = self.get_pltags_by_node_id(slice_record)
        nodes_dict = self.get_slice_nodes(slice_record, options)
        slivers = []
        for node in nodes_dict.values():
            node.update(slice_record)
            # slice-global tags followed by this sliver's tags, in a list of
            # its own so that tags do not accumulate from one node to the next
            # xxx this is where the nodegroup slice tags could maybe be added
            node['slice-tags'] = self._sliver_tags(pltags_dict, node['node_id'])
            sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice_record['slice_id'], node['node_id'])
            node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
            node['urn'] = node['sliver_id']
            node['services_user'] = users
            slivers.append(node)
        if not slivers:
            logger.warning("PlAggregate.get_slivers : slice(s) found but with no sliver {}".format(urns))
        return slivers

    def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=None, grain=None, options=None):
        """
        Convert a node record into a NodeElement.

        sites, interfaces and node_tags are dicts keyed by id and must already
        hold the records this node refers to; pl_initscripts is a dict keyed
        by initscript_id; grain is stored in the node's Granularity element.
        options is accepted but not used here.
        """
        if pl_initscripts is None:
            # must be a dict: its values() are advertised below
            pl_initscripts = {}
        if options is None:
            options = {}
        rspec_node = NodeElement()
        site = sites[node['site_id']]
        rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname'])
        rspec_node['component_name'] = node['hostname']
        rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn()
        rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa')
        # 'available' is always set here; describe() removes it again because
        # the manifest rspec should not carry it
        rspec_node['boot_state'] = node['boot_state']
        if node['boot_state'] == 'boot':
            rspec_node['available'] = 'true'
        else:
            rspec_node['available'] = 'false'

        # distinguish between Shared and Reservable nodes
        if node['node_type'] == 'reservable':
            rspec_node['exclusive'] = 'true'
        else:
            rspec_node['exclusive'] = 'false'

        rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
                                        HardwareType({'name': 'pc'})]
        # only doing this because protogeni rspec needs
        # to advertise available initscripts
        rspec_node['pl_initscripts'] = list(pl_initscripts.values())
        # add site/interface info to nodes.
        # assumes that sites, interfaces and tags have already been prepared.
        if site['longitude'] and site['latitude']:
            rspec_node['location'] = Location({'longitude': site['longitude'],
                                               'latitude': site['latitude'],
                                               'country': 'unknown'})
        rspec_node['granularity'] = Granularity({'grain': grain})
        rspec_node['interfaces'] = []
        for if_count, if_id in enumerate(node['interface_ids']):
            interface = Interface(interfaces[if_id])
            interface['ipv4'] = interface['ip']
            interface['component_id'] = PlXrn(auth=self.driver.hrn,
                                              interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
            # interfaces in the manifest need a client id; it is set unconditionally
            interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
            rspec_node['interfaces'].append(interface)
        # this is what describes a particular node
        rspec_node['tags'] = [PLTag(node_tags[tag_id])
                              for tag_id in node['node_tag_ids'] if tag_id in node_tags]
        return rspec_node

    def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, sliver_pltags,
                             pl_initscripts, sliver_allocations):
        """
        Convert a sliver record (as built by get_slivers) into a NodeElement
        for a manifest: the node part comes from node_to_rspec_node, then
        expiration, sliver, client/component ids and the ssh login service
        are added and the interfaces are removed.

        sliver_allocations is a dict of allocation records keyed by sliver urn.
        """
        # get the granularity in second for the reservation system
        grain = self.driver.shell.GetLeaseGranularity()
        rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain)
        for pltag in sliver_pltags:
            logger.debug("Need to expose {}".format(pltag))
        rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
        # remove interfaces from manifest
        rspec_node['interfaces'] = []
        # add sliver info
        sliver_urn = sliver['urn']
        rspec_sliver = Sliver({'sliver_id': sliver_urn,
                               'name': sliver['name'],
                               'type': 'plab-vserver',
                               'tags': sliver_pltags,
                               })
        rspec_node['sliver_id'] = rspec_sliver['sliver_id']
        if sliver_urn in sliver_allocations:
            allocation = sliver_allocations[sliver_urn]
            rspec_node['client_id'] = allocation.client_id
            if allocation.component_id:
                rspec_node['component_id'] = allocation.component_id
        rspec_node['slivers'] = [rspec_sliver]

        # slivers always provide the ssh service
        login = Login({'authentication': 'ssh-keys',
                       'hostname': sliver['hostname'],
                       'port': '22',
                       'username': sliver['name'],
                       'login': sliver['name']
                       })
        service = ServicesElement({'login': login,
                                   'services_user': sliver['services_user']})
        rspec_node['services'] = [service]
        return rspec_node

    def get_pltags_by_node_id(self, slice):
        """
        Return the tags of slice as a defaultdict(list) of PLTag.

        Keys are a node_id for tags attached to a node (scope 'sliver'),
        'nodegroup' for tags attached to a nodegroup, and 'slice-global'
        for all the others.
        """
        slice_tag_ids = list(slice['slice_tag_ids'])
        tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids},
                                              ['tagname', 'value', 'node_id', 'nodegroup_id'])
        pltags_dict = defaultdict(list)
        for tag in tags:
            if tag['node_id']:
                # specific to a node
                tag['scope'] = 'sliver'
                bucket = tag['node_id']
            elif tag['nodegroup_id']:
                # restricted to a nodegroup
                # for now such tags are not exposed to describe
                # xxx we should also expose the nodegroup name in this case to be complete..
                tag['scope'] = 'nodegroup'
                bucket = 'nodegroup'
            else:
                # this tag is global to the slice
                tag['scope'] = 'slice'
                bucket = 'slice-global'
            pltags_dict[bucket].append(PLTag(tag))
        return pltags_dict

    def get_slice_nodes(self, slice, options=None):
        """
        Return the local nodes listed in slice['node_ids'], keyed by node_id.

        Returns {} without calling the shell when the slice has no node.
        With options['geni_available'] set, only nodes in 'boot' state are kept.
        """
        if options is None:
            options = {}
        if not slice or not slice.get('node_ids'):
            # there are no nodes to look up
            return {}
        node_filter = {'peer_id': None, 'node_id': slice['node_ids']}
        # '== True' rather than 'is True' on purpose: an integer 1 must keep matching
        if options.get('geni_available') == True:
            node_filter['boot_state'] = 'boot'
        nodes_dict = {}
        for node in self.driver.shell.GetNodes(node_filter):
            nodes_dict[node['node_id']] = node
        return nodes_dict

    def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations=None):
        """
        Build the geni sliver status dict for rspec_node.

        sliver_allocations maps sliver urns to allocation records. Without a
        (non-empty) record for this sliver the result is
        'geni_unallocated' / 'geni_failed'; otherwise the allocation status is
        the record's allocation_state and the operational status derives from
        it and from the node's boot_state.
        """
        if sliver_allocations is None:
            sliver_allocations = {}
        # defaults cover both "no entry" and "empty entry", so that op_status
        # is always defined
        allocation_status = 'geni_unallocated'
        op_status = 'geni_failed'
        sliver_allocation = sliver_allocations.get(rspec_node['sliver_id'])
        if sliver_allocation:
            allocation_status = sliver_allocation.allocation_state
            if allocation_status == 'geni_allocated':
                op_status = 'geni_pending_allocation'
            elif allocation_status == 'geni_provisioned':
                op_status = 'geni_ready' if rspec_node['boot_state'] == 'boot' else 'geni_failed'
            else:
                op_status = 'geni_unknown'
        # required fields
        return {'geni_sliver_urn': rspec_node['sliver_id'],
                'geni_expires': rspec_node['expires'],
                'geni_allocation_status': allocation_status,
                'geni_operational_status': op_status,
                'geni_error': '',
                }

    def get_leases(self, slice=None, options=None):
        """
        Return the Lease elements for the leases clipped at the current time,
        restricted to slice['name'] when a slice is given.

        'duration' is expressed in units of the lease granularity.
        options is accepted but not used here.
        """
        if options is None:
            options = {}

        lease_filter = {'clip': int(time.time())}
        if slice:
            lease_filter['name'] = slice['name']
        leases = self.driver.shell.GetLeases(lease_filter)
        grain = self.driver.shell.GetLeaseGranularity()

        rspec_leases = []
        for lease in leases:
            rspec_lease = Lease()
            rspec_lease['component_id'] = hrn_to_urn(self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
            slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
            rspec_lease['slice_id'] = hrn_to_urn(slice_hrn, 'slice')
            rspec_lease['start_time'] = lease['t_from']
            # whole number of grains, whatever the interpreter's '/' semantics
            rspec_lease['duration'] = (lease['t_until'] - lease['t_from']) // grain
            rspec_leases.append(rspec_lease)
        return rspec_leases

    def list_resources(self, version=None, options=None):
        """
        Return the advertisement rspec, as xml.

        options['list_leases'] selects the contents: 'leases' skips nodes and
        links, 'resources' skips leases, anything else includes everything.
        """
        if options is None:
            options = {}

        version_manager = VersionManager()
        version = version_manager.get_version(version)
        rspec_version = version_manager._get_version(version.type, version.version, 'ad')
        rspec = RSpec(version=rspec_version, user_options=options)

        list_leases = options.get('list_leases')

        if list_leases != 'leases':
            # get nodes
            nodes = self.get_nodes(options)
            site_ids = []
            interface_ids = []
            tag_ids = []
            nodes_dict = {}
            for node in nodes:
                site_ids.append(node['site_id'])
                interface_ids.extend(node['interface_ids'])
                tag_ids.extend(node['node_tag_ids'])
                nodes_dict[node['node_id']] = node
            sites = self.get_sites({'site_id': site_ids})
            interfaces = self.get_interfaces({'interface_id': interface_ids})
            node_tags = self.get_node_tags({'node_tag_id': tag_ids})
            pl_initscripts = self.get_pl_initscripts()
            # convert nodes to rspec nodes
            rspec_nodes = [self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts)
                           for node in nodes]
            rspec.version.add_nodes(rspec_nodes)

            # add links
            links = self.get_links(sites, nodes_dict, interfaces)
            rspec.version.add_links(links)

        if list_leases != 'resources':
            leases = self.get_leases()
            rspec.version.add_leases(leases)

        return rspec.toxml()

    def describe(self, urns, version=None, options=None):
        """
        Return the manifest description of the slivers designated by urns, as
        a dict with 'geni_urn', 'geni_rspec' (xml) and 'geni_slivers'.

        options['list_leases'] selects the contents: 'leases' skips nodes and
        links, 'resources' skips leases, anything else includes everything.
        """
        if options is None:
            options = {}
        version_manager = VersionManager()
        version = version_manager.get_version(version)
        rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
        rspec = RSpec(version=rspec_version, user_options=options)

        # get slivers
        geni_slivers = []
        slivers = self.get_slivers(urns, options)
        if slivers:
            rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
        else:
            rspec_expires = datetime_to_string(utcparse(time.time()))
        rspec.xml.set('expires', rspec_expires)

        # lookup the sliver allocations
        geni_urn = urns[0]
        sliver_ids = [sliver['sliver_id'] for sliver in slivers]
        sliver_allocation_dict = {}
        if sliver_ids:
            # an IN predicate over an empty list cannot match anything,
            # so the query is only issued when there are slivers
            constraint = SliverAllocation.sliver_id.in_(sliver_ids)
            sliver_allocations = self.driver.api.dbsession().query(SliverAllocation).filter(constraint)
            for sliver_allocation in sliver_allocations:
                geni_urn = sliver_allocation.slice_urn
                sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation

        list_leases = options.get('list_leases')

        if list_leases != 'leases':
            # add slivers
            site_ids = []
            interface_ids = []
            tag_ids = []
            nodes_dict = {}
            for sliver in slivers:
                site_ids.append(sliver['site_id'])
                interface_ids.extend(sliver['interface_ids'])
                tag_ids.extend(sliver['node_tag_ids'])
                nodes_dict[sliver['node_id']] = sliver
            sites = self.get_sites({'site_id': site_ids})
            interfaces = self.get_interfaces({'interface_id': interface_ids})
            node_tags = self.get_node_tags({'node_tag_id': tag_ids})
            pl_initscripts = self.get_pl_initscripts()
            rspec_nodes = []
            for sliver in slivers:
                # skip nodes whose whitelist does not include this slice
                if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
                    continue
                sliver_pltags = sliver['slice-tags']
                rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, sliver_pltags,
                                                       pl_initscripts, sliver_allocation_dict)
                logger.debug('rspec of type {}'.format(rspec_node.__class__.__name__))
                # manifest node element shouldn't contain available attribute
                rspec_node.pop('available')
                rspec_nodes.append(rspec_node)
                geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict)
                geni_slivers.append(geni_sliver)
            rspec.version.add_nodes(rspec_nodes)

            # add links
            links = self.get_links(sites, nodes_dict, interfaces)
            rspec.version.add_links(links)

        if list_leases != 'resources':
            if slivers:
                leases = self.get_leases(slivers[0])
                rspec.version.add_leases(leases)

        return {'geni_urn': geni_urn,
                'geni_rspec': rspec.toxml(),
                'geni_slivers': geni_slivers}
532                 'geni_slivers': geni_slivers}