8095291dd9569ffda0d61ba18080732303c18104
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.model import SliverAllocation
25
26
27 import time
28
29
30 class PlAggregate:
31
32     def __init__(self, driver):
33         self.driver = driver
34
35     def get_nodes(self, options=None):
36         if options is None:
37             options = {}
38         filter = {'peer_id': None}
39         geni_available = options.get('geni_available')
40         if geni_available == True:
41             filter['boot_state'] = 'boot'
42         nodes = self.driver.shell.GetNodes(filter)
43
44         return nodes
45
46     def get_sites(self, filter=None):
47         if filter is None:
48             filter = {}
49         sites = {}
50         for site in self.driver.shell.GetSites(filter):
51             sites[site['site_id']] = site
52         return sites
53
54     def get_interfaces(self, filter=None):
55         if filter is None:
56             filter = {}
57         interfaces = {}
58         for interface in self.driver.shell.GetInterfaces(filter):
59             iface = Interface()
60             if interface['bwlimit']:
61                 interface['bwlimit'] = str(int(interface['bwlimit']) / 1000)
62             interfaces[interface['interface_id']] = interface
63         return interfaces
64
65     def get_links(self, sites, nodes, interfaces):
66
67         topology = Topology()
68         links = []
69         for (site_id1, site_id2) in topology:
70             site_id1 = int(site_id1)
71             site_id2 = int(site_id2)
72             link = Link()
73             if not site_id1 in sites or site_id2 not in sites:
74                 continue
75             site1 = sites[site_id1]
76             site2 = sites[site_id2]
77             # get hrns
78             site1_hrn = self.driver.hrn + '.' + site1['login_base']
79             site2_hrn = self.driver.hrn + '.' + site2['login_base']
80
81             for s1_node_id in site1['node_ids']:
82                 for s2_node_id in site2['node_ids']:
83                     if s1_node_id not in nodes or s2_node_id not in nodes:
84                         continue
85                     node1 = nodes[s1_node_id]
86                     node2 = nodes[s2_node_id]
87                     # set interfaces
88                     # just get first interface of the first node
89                     if1_xrn = PlXrn(auth=self.driver.hrn,
90                                     interface='node%s:eth0' % (node1['node_id']))
91                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
92                     if2_xrn = PlXrn(auth=self.driver.hrn,
93                                     interface='node%s:eth0' % (node2['node_id']))
94                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
95
96                     if1 = Interface(
97                         {'component_id': if1_xrn.urn, 'ipv4': if1_ipv4})
98                     if2 = Interface(
99                         {'component_id': if2_xrn.urn, 'ipv4': if2_ipv4})
100
101                     # set link
102                     link = Link({'capacity': '1000000', 'latency': '0',
103                                  'packet_loss': '0', 'type': 'ipv4'})
104                     link['interface1'] = if1
105                     link['interface2'] = if2
106                     link['component_name'] = "%s:%s" % (
107                         site1['login_base'], site2['login_base'])
108                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link[
109                                                  'component_name']).get_urn()
110                     link['component_manager_id'] = hrn_to_urn(
111                         self.driver.hrn, 'authority+am')
112                     links.append(link)
113
114         return links
115
116     def get_node_tags(self, filter=None):
117         if filter is None:
118             filter = {}
119         node_tags = {}
120         for node_tag in self.driver.shell.GetNodeTags(filter, ['tagname', 'value', 'node_id', 'node_tag_id']):
121             node_tags[node_tag['node_tag_id']] = node_tag
122         return node_tags
123
124     def get_pl_initscripts(self, filter=None):
125         if filter is None:
126             filter = {}
127         pl_initscripts = {}
128         filter.update({'enabled': True})
129         for initscript in self.driver.shell.GetInitScripts(filter):
130             pl_initscripts[initscript['initscript_id']] = initscript
131         return pl_initscripts
132
133     def get_slivers(self, urns, options=None):
134         if options is None:
135             options = {}
136         names = set()
137         slice_ids = set()
138         node_ids = []
139         slice_hrn = None
140         for urn in urns:
141             xrn = PlXrn(xrn=urn)
142             if xrn.type == 'sliver':
143                  # id: slice_id-node_id
144                 try:
145                     sliver_id_parts = xrn.get_sliver_id_parts()
146                     slice_id = int(sliver_id_parts[0])
147                     node_id = int(sliver_id_parts[1])
148                     slice_ids.add(slice_id)
149                     node_ids.append(node_id)
150                 except ValueError:
151                     pass
152             else:
153                 slice_hrn = xrn.get_hrn()
154
155         filter = {}
156         filter['peer_id'] = None
157         if slice_ids:
158             filter['slice_id'] = list(slice_ids)
159         # get all slices
160         fields = ['slice_id', 'name', 'hrn', 'person_ids',
161                   'node_ids', 'slice_tag_ids', 'expires']
162         all_slices = self.driver.shell.GetSlices(filter, fields)
163         if slice_hrn:
164             slices = [slice for slice in all_slices if slice[
165                 'hrn'] == slice_hrn]
166         else:
167             slices = all_slices
168
169         if not slices:
170             if slice_hrn:
171                 logger.error(
172                     "PlAggregate.get_slivers : no slice found with hrn {}".format(slice_hrn))
173             else:
174                 logger.error(
175                     "PlAggregate.get_slivers : no sliver found with urns {}".format(urns))
176             return []
177         slice = slices[0]
178         slice['hrn'] = slice_hrn
179
180         # get sliver users
181         persons = []
182         person_ids = []
183         for slice in slices:
184             person_ids.extend(slice['person_ids'])
185         if person_ids:
186             persons = self.driver.shell.GetPersons(person_ids)
187
188         # get user keys
189         keys = {}
190         key_ids = []
191         for person in persons:
192             key_ids.extend(person['key_ids'])
193
194         if key_ids:
195             key_list = self.driver.shell.GetKeys(key_ids)
196             for key in key_list:
197                 keys[key['key_id']] = key
198
199         # construct user key info
200         users = []
201         for person in persons:
202             person_urn = hrn_to_urn(self.driver.shell.GetPersonHrn(
203                 int(person['person_id'])), 'user')
204             user = {
205                 'login': slice['name'],
206                 'user_urn': person_urn,
207                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
208             }
209             users.append(user)
210
211         if node_ids:
212             node_ids = [
213                 node_id for node_id in node_ids if node_id in slice['node_ids']]
214             slice['node_ids'] = node_ids
215         pltags_dict = self.get_pltags_by_node_id(slice)
216         nodes_dict = self.get_slice_nodes(slice, options)
217         slivers = []
218         for node in nodes_dict.values():
219             node.update(slice)
220             # slice-global tags
221             node['slice-tags'] = pltags_dict['slice-global']
222             # xxx
223             # this is where we chould maybe add the nodegroup slice tags,
224             # but it's tedious...
225             # xxx
226             # sliver tags
227             node['slice-tags'] += pltags_dict[node['node_id']]
228             sliver_hrn = '%s.%s-%s' % (self.driver.hrn,
229                                        slice['slice_id'], node['node_id'])
230             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
231             node['urn'] = node['sliver_id']
232             node['services_user'] = users
233             slivers.append(node)
234         if not slivers:
235             logger.warning(
236                 "PlAggregate.get_slivers : slice(s) found but with no sliver {}".format(urns))
237         return slivers
238
239     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=None, grain=None, options=None):
240         if pl_initscripts is None:
241             pl_initscripts = []
242         if options is None:
243             options = {}
244         rspec_node = NodeElement()
245         # xxx how to retrieve site['login_base']
246         site = sites[node['site_id']]
247         rspec_node['component_id'] = hostname_to_urn(
248             self.driver.hrn, site['login_base'], node['hostname'])
249         rspec_node['component_name'] = node['hostname']
250         rspec_node['component_manager_id'] = Xrn(
251             self.driver.hrn, 'authority+cm').get_urn()
252         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(
253             self.driver.hrn, site['login_base']), 'authority+sa')
254         # do not include boot state (<available> element) in the manifest rspec
255         rspec_node['boot_state'] = node['boot_state']
256         if node['boot_state'] == 'boot':
257             rspec_node['available'] = 'true'
258         else:
259             rspec_node['available'] = 'false'
260
261         # distinguish between Shared and Reservable nodes
262         if node['node_type'] == 'reservable':
263             rspec_node['exclusive'] = 'true'
264         else:
265             rspec_node['exclusive'] = 'false'
266
267         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
268                                         HardwareType({'name': 'pc'})]
269         # only doing this because protogeni rspec needs
270         # to advertise available initscripts
271         rspec_node['pl_initscripts'] = pl_initscripts.values()
272         # add site/interface info to nodes.
273         # assumes that sites, interfaces and tags have already been prepared.
274         if site['longitude'] and site['latitude']:
275             location = Location({'longitude': site['longitude'], 'latitude': site[
276                                 'latitude'], 'country': 'unknown'})
277             rspec_node['location'] = location
278         # Granularity
279         granularity = Granularity({'grain': grain})
280         rspec_node['granularity'] = granularity
281         rspec_node['interfaces'] = []
282         if_count = 0
283         for if_id in node['interface_ids']:
284             interface = Interface(interfaces[if_id])
285             interface['ipv4'] = interface['ip']
286             interface['component_id'] = PlXrn(auth=self.driver.hrn,
287                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
288             # interfaces in the manifest need a client id
289             if slice:
290                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
291             rspec_node['interfaces'].append(interface)
292             if_count += 1
293         # this is what describes a particular node
294         node_level_tags = [PLTag(node_tags[tag_id]) for tag_id in node[
295             'node_tag_ids'] if tag_id in node_tags]
296         rspec_node['tags'] = node_level_tags
297         return rspec_node
298
299     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, sliver_pltags,
300                              pl_initscripts, sliver_allocations):
301         # get the granularity in second for the reservation system
302         grain = self.driver.shell.GetLeaseGranularity()
303         rspec_node = self.node_to_rspec_node(
304             sliver, sites, interfaces, node_tags, pl_initscripts, grain)
305         for pltag in sliver_pltags:
306             logger.debug("Need to expose {}".format(pltag))
307         # xxx how to retrieve site['login_base']
308         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
309         # remove interfaces from manifest
310         rspec_node['interfaces'] = []
311         # add sliver info
312         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
313                                'name': sliver['name'],
314                                'type': 'plab-vserver',
315                                'tags': sliver_pltags,
316                                })
317         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
318         if sliver['urn'] in sliver_allocations:
319             rspec_node['client_id'] = sliver_allocations[
320                 sliver['urn']].client_id
321             if sliver_allocations[sliver['urn']].component_id:
322                 rspec_node['component_id'] = sliver_allocations[
323                     sliver['urn']].component_id
324         rspec_node['slivers'] = [rspec_sliver]
325
326         # slivers always provide the ssh service
327         login = Login({'authentication': 'ssh-keys',
328                        'hostname': sliver['hostname'],
329                        'port': '22',
330                        'username': sliver['name'],
331                        'login': sliver['name']
332                        })
333         service = ServicesElement({'login': login,
334                                    'services_user': sliver['services_user']})
335         rspec_node['services'] = [service]
336         return rspec_node
337
338     def get_pltags_by_node_id(self, slice):
339         slice_tag_ids = []
340         slice_tag_ids.extend(slice['slice_tag_ids'])
341         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids},
342                                               ['tagname', 'value', 'node_id', 'nodegroup_id'])
343         # sorted by node_id
344         pltags_dict = defaultdict(list)
345         for tag in tags:
346             # specific to a node
347             if tag['node_id']:
348                 tag['scope'] = 'sliver'
349                 pltags_dict[tag['node_id']].append(PLTag(tag))
350             # restricted to a nodegroup
351             # for now such tags are not exposed to describe
352             # xxx we should also expose the nodegroup name in this case to be
353             # complete..
354             elif tag['nodegroup_id']:
355                 tag['scope'] = 'nodegroup'
356                 pltags_dict['nodegroup'].append(PLTag(tag))
357             # this tag is global to the slice
358             else:
359                 tag['scope'] = 'slice'
360                 pltags_dict['slice-global'].append(PLTag(tag))
361         return pltags_dict
362
363     def get_slice_nodes(self, slice, options=None):
364         if options is None:
365             options = {}
366         nodes_dict = {}
367         filter = {'peer_id': None}
368         tags_filter = {}
369         if slice and slice.get('node_ids'):
370             filter['node_id'] = slice['node_ids']
371         else:
372             # there are no nodes to look up
373             return nodes_dict
374         tags_filter = filter.copy()
375         geni_available = options.get('geni_available')
376         if geni_available == True:
377             filter['boot_state'] = 'boot'
378         nodes = self.driver.shell.GetNodes(filter)
379         for node in nodes:
380             nodes_dict[node['node_id']] = node
381         return nodes_dict
382
383     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations=None):
384         if sliver_allocations is None:
385             sliver_allocations = {}
386         if rspec_node['sliver_id'] in sliver_allocations:
387             # set sliver allocation and operational status
388             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
389             if sliver_allocation:
390                 allocation_status = sliver_allocation.allocation_state
391                 if allocation_status == 'geni_allocated':
392                     op_status = 'geni_pending_allocation'
393                 elif allocation_status == 'geni_provisioned':
394                     if rspec_node['boot_state'] == 'boot':
395                         op_status = 'geni_ready'
396                     else:
397                         op_status = 'geni_failed'
398                 else:
399                     op_status = 'geni_unknown'
400             else:
401                 allocation_status = 'geni_unallocated'
402         else:
403             allocation_status = 'geni_unallocated'
404             op_status = 'geni_failed'
405         # required fields
406         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
407                        'geni_expires': rspec_node['expires'],
408                        'geni_allocation_status': allocation_status,
409                        'geni_operational_status': op_status,
410                        'geni_error': '',
411                        }
412         return geni_sliver
413
414     def get_leases(self, slice=None, options=None):
415         if options is None:
416             options = {}
417
418         now = int(time.time())
419         filter = {}
420         filter.update({'clip': now})
421         if slice:
422             filter.update({'name': slice['name']})
423         return_fields = ['lease_id', 'hostname',
424                          'site_id', 'name', 't_from', 't_until']
425         leases = self.driver.shell.GetLeases(filter)
426         grain = self.driver.shell.GetLeaseGranularity()
427
428         site_ids = []
429         for lease in leases:
430             site_ids.append(lease['site_id'])
431
432         # get sites
433         sites_dict = self.get_sites({'site_id': site_ids})
434
435         rspec_leases = []
436         for lease in leases:
437
438             rspec_lease = Lease()
439
440             # xxx how to retrieve site['login_base']
441             site_id = lease['site_id']
442             site = sites_dict[site_id]
443
444             rspec_lease['component_id'] = hrn_to_urn(
445                 self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
446             slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
447             slice_urn = hrn_to_urn(slice_hrn, 'slice')
448             rspec_lease['slice_id'] = slice_urn
449             rspec_lease['start_time'] = lease['t_from']
450             rspec_lease['duration'] = (
451                 lease['t_until'] - lease['t_from']) / grain
452             rspec_leases.append(rspec_lease)
453         return rspec_leases
454
455     def list_resources(self, version=None, options=None):
456         if options is None:
457             options = {}
458
459         version_manager = VersionManager()
460         version = version_manager.get_version(version)
461         rspec_version = version_manager._get_version(
462             version.type, version.version, 'ad')
463         rspec = RSpec(version=rspec_version, user_options=options)
464
465         if not options.get('list_leases') or options['list_leases'] != 'leases':
466             # get nodes
467             nodes = self.get_nodes(options)
468             site_ids = []
469             interface_ids = []
470             tag_ids = []
471             nodes_dict = {}
472             for node in nodes:
473                 site_ids.append(node['site_id'])
474                 interface_ids.extend(node['interface_ids'])
475                 tag_ids.extend(node['node_tag_ids'])
476                 nodes_dict[node['node_id']] = node
477             sites = self.get_sites({'site_id': site_ids})
478             interfaces = self.get_interfaces({'interface_id': interface_ids})
479             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
480             pl_initscripts = self.get_pl_initscripts()
481             # convert nodes to rspec nodes
482             rspec_nodes = []
483             for node in nodes:
484                 rspec_node = self.node_to_rspec_node(
485                     node, sites, interfaces, node_tags, pl_initscripts)
486                 rspec_nodes.append(rspec_node)
487             rspec.version.add_nodes(rspec_nodes)
488
489             # add links
490             links = self.get_links(sites, nodes_dict, interfaces)
491             rspec.version.add_links(links)
492
493         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
494             leases = self.get_leases()
495             rspec.version.add_leases(leases)
496
497         return rspec.toxml()
498
499     def describe(self, urns, version=None, options=None):
500         if options is None:
501             options = {}
502         version_manager = VersionManager()
503         version = version_manager.get_version(version)
504         rspec_version = version_manager._get_version(
505             version.type, version.version, 'manifest')
506         rspec = RSpec(version=rspec_version, user_options=options)
507
508         # get slivers
509         geni_slivers = []
510         slivers = self.get_slivers(urns, options)
511         if slivers:
512             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
513         else:
514             rspec_expires = datetime_to_string(utcparse(time.time()))
515         rspec.xml.set('expires',  rspec_expires)
516
517         # lookup the sliver allocations
518         geni_urn = urns[0]
519         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
520         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
521         sliver_allocations = self.driver.api.dbsession().query(
522             SliverAllocation).filter(constraint)
523         sliver_allocation_dict = {}
524         for sliver_allocation in sliver_allocations:
525             geni_urn = sliver_allocation.slice_urn
526             sliver_allocation_dict[
527                 sliver_allocation.sliver_id] = sliver_allocation
528
529         if not options.get('list_leases') or options['list_leases'] != 'leases':
530             # add slivers
531             site_ids = []
532             interface_ids = []
533             tag_ids = []
534             nodes_dict = {}
535             for sliver in slivers:
536                 site_ids.append(sliver['site_id'])
537                 interface_ids.extend(sliver['interface_ids'])
538                 tag_ids.extend(sliver['node_tag_ids'])
539                 nodes_dict[sliver['node_id']] = sliver
540             sites = self.get_sites({'site_id': site_ids})
541             interfaces = self.get_interfaces({'interface_id': interface_ids})
542             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
543             pl_initscripts = self.get_pl_initscripts()
544             rspec_nodes = []
545             for sliver in slivers:
546                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
547                     continue
548                 sliver_pltags = sliver['slice-tags']
549                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, sliver_pltags,
550                                                        pl_initscripts, sliver_allocation_dict)
551                 logger.debug('rspec of type {}'.format(
552                     rspec_node.__class__.__name__))
553                 # manifest node element shouldn't contain available attribute
554                 rspec_node.pop('available')
555                 rspec_nodes.append(rspec_node)
556                 geni_sliver = self.rspec_node_to_geni_sliver(
557                     rspec_node, sliver_allocation_dict)
558                 geni_slivers.append(geni_sliver)
559             rspec.version.add_nodes(rspec_nodes)
560
561             # add sliver defaults
562             #default_sliver = slivers.get(None, [])
563             # if default_sliver:
564             #    default_sliver_attribs = default_sliver.get('tags', [])
565             #    for attrib in default_sliver_attribs:
566             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
567
568             # add links
569             links = self.get_links(sites, nodes_dict, interfaces)
570             rspec.version.add_links(links)
571
572         if not options.get('list_leases') or options['list_leases'] != 'resources':
573             if slivers:
574                 leases = self.get_leases(slivers[0])
575                 rspec.version.add_leases(leases)
576
577         return {'geni_urn': geni_urn,
578                 'geni_rspec': rspec.toxml(),
579                 'geni_slivers': geni_slivers}