the hardware_types in a node rspec is now built by
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.model import SliverAllocation
25
26
27 import time
28
29
class PlAggregate:
    """Turn PlanetLab records into RSpec elements and GENI sliver records.

    Every PLCAPI call goes through ``driver.shell``; ``driver.hrn`` is the
    authority used when HRNs/URNs are built, and ``driver.api.dbsession()``
    is queried for SliverAllocation rows in describe().
    """

    def __init__(self, driver):
        """Keep a reference to the driver used by all other methods."""
        self.driver = driver

    def get_nodes(self, options=None):
        """Return the list of local nodes as reported by GetNodes.

        Only nodes with ``peer_id`` None are requested.  When
        ``options['geni_available']`` equals True the request is further
        restricted to nodes whose ``boot_state`` is 'boot'.
        """
        if options is None:
            options = {}
        node_filter = {'peer_id': None}
        # equality (not identity) on purpose: an integer 1 is accepted too
        if options.get('geni_available') == True:
            node_filter['boot_state'] = 'boot'
        return self.driver.shell.GetNodes(node_filter)

    def get_sites(self, filter=None):
        """Return the sites matching *filter*, keyed by ``site_id``."""
        if filter is None:
            filter = {}
        return dict((site['site_id'], site)
                    for site in self.driver.shell.GetSites(filter))

    def get_interfaces(self, filter=None):
        """Return the interfaces matching *filter*, keyed by ``interface_id``.

        A truthy ``bwlimit`` is divided by 1000 and stored back as a string;
        a falsy one is left untouched.
        """
        if filter is None:
            filter = {}
        interfaces = {}
        for interface in self.driver.shell.GetInterfaces(filter):
            if interface['bwlimit']:
                # floor division: stays an integer string on Python 3 as well
                interface['bwlimit'] = str(int(interface['bwlimit']) // 1000)
            interfaces[interface['interface_id']] = interface
        return interfaces

    def get_links(self, sites, nodes, interfaces):
        """Build one Link per node pair of every site pair in the Topology.

        :param sites: dict of site records keyed by site_id
        :param nodes: dict of node records keyed by node_id
        :param interfaces: dict of interface records keyed by interface_id
        :returns: list of Link elements

        Site pairs or nodes that are not present in *sites* / *nodes* are
        skipped, and so are nodes without a usable first interface.
        """
        links = []
        for (site_id1, site_id2) in Topology():
            site_id1 = int(site_id1)
            site_id2 = int(site_id2)
            if site_id1 not in sites or site_id2 not in sites:
                continue
            site1 = sites[site_id1]
            site2 = sites[site_id2]

            for s1_node_id in site1['node_ids']:
                for s2_node_id in site2['node_ids']:
                    if s1_node_id not in nodes or s2_node_id not in nodes:
                        continue
                    node1 = nodes[s1_node_id]
                    node2 = nodes[s2_node_id]
                    # only the first interface of each node is considered;
                    # a node without one cannot be an endpoint of a link
                    if_ids1 = node1['interface_ids']
                    if_ids2 = node2['interface_ids']
                    if not if_ids1 or not if_ids2:
                        continue
                    if if_ids1[0] not in interfaces or if_ids2[0] not in interfaces:
                        continue
                    if1_xrn = PlXrn(auth=self.driver.hrn,
                                    interface='node%s:eth0' % (node1['node_id']))
                    if2_xrn = PlXrn(auth=self.driver.hrn,
                                    interface='node%s:eth0' % (node2['node_id']))
                    if1 = Interface({'component_id': if1_xrn.urn,
                                     'ipv4': interfaces[if_ids1[0]]['ip']})
                    if2 = Interface({'component_id': if2_xrn.urn,
                                     'ipv4': interfaces[if_ids2[0]]['ip']})

                    link = Link({'capacity': '1000000', 'latency': '0',
                                 'packet_loss': '0', 'type': 'ipv4'})
                    link['interface1'] = if1
                    link['interface2'] = if2
                    link['component_name'] = "%s:%s" % (
                        site1['login_base'], site2['login_base'])
                    link['component_id'] = PlXrn(
                        auth=self.driver.hrn,
                        interface=link['component_name']).get_urn()
                    link['component_manager_id'] = hrn_to_urn(
                        self.driver.hrn, 'authority+am')
                    links.append(link)

        return links

    def get_node_tags(self, filter=None):
        """Return the node tags matching *filter*, keyed by ``node_tag_id``.

        Only the tagname, value, node_id and node_tag_id fields are fetched.
        """
        if filter is None:
            filter = {}
        fields = ['tagname', 'value', 'node_id', 'node_tag_id']
        return dict((tag['node_tag_id'], tag)
                    for tag in self.driver.shell.GetNodeTags(filter, fields))

    def get_pl_initscripts(self, filter=None):
        """Return the enabled initscripts matching *filter*, keyed by id.

        ``enabled: True`` is always added to the query; the caller's dict is
        copied first so that it is not modified.
        """
        initscript_filter = dict(filter or {})
        initscript_filter['enabled'] = True
        return dict((script['initscript_id'], script)
                    for script in self.driver.shell.GetInitScripts(initscript_filter))

    def get_slivers(self, urns, options=None):
        """Return one node record per sliver designated by *urns*.

        Sliver URNs contribute a slice_id and a node_id; any other URN is
        taken as the slice and contributes its hrn.  The first matching slice
        is merged into each of its node records, which are also given the
        keys 'slice-tags', 'sliver_id', 'urn' and 'services_user'.

        :returns: list of node dicts, empty when no slice matches
        """
        if options is None:
            options = {}
        slice_ids = set()
        node_ids = []
        slice_hrn = None
        for urn in urns:
            xrn = PlXrn(xrn=urn)
            if xrn.type != 'sliver':
                slice_hrn = xrn.get_hrn()
                continue
            # id: slice_id-node_id ; ids that are not integers are ignored
            try:
                sliver_id_parts = xrn.get_sliver_id_parts()
                slice_id = int(sliver_id_parts[0])
                node_id = int(sliver_id_parts[1])
            except ValueError:
                continue
            slice_ids.add(slice_id)
            node_ids.append(node_id)

        slice_filter = {'peer_id': None}
        if slice_ids:
            slice_filter['slice_id'] = list(slice_ids)
        fields = ['slice_id', 'name', 'hrn', 'person_ids',
                  'node_ids', 'slice_tag_ids', 'expires']
        all_slices = self.driver.shell.GetSlices(slice_filter, fields)
        if slice_hrn:
            slices = [rec for rec in all_slices if rec['hrn'] == slice_hrn]
        else:
            slices = all_slices

        if not slices:
            if slice_hrn:
                logger.error(
                    "PlAggregate.get_slivers : no slice found with hrn {}".format(slice_hrn))
            else:
                logger.error(
                    "PlAggregate.get_slivers : no sliver found with urns {}".format(urns))
            return []
        # everything below describes this one record; its 'hrn' is kept as
        # returned by GetSlices (it already equals slice_hrn when one was given)
        slice_record = slices[0]

        # get sliver users (collected over all matching slices)
        persons = []
        person_ids = []
        for rec in slices:
            person_ids.extend(rec['person_ids'])
        if person_ids:
            persons = self.driver.shell.GetPersons(person_ids)

        # get user keys
        keys = {}
        key_ids = []
        for person in persons:
            key_ids.extend(person['key_ids'])
        if key_ids:
            for key in self.driver.shell.GetKeys(key_ids):
                keys[key['key_id']] = key

        # construct user key info
        users = []
        for person in persons:
            person_urn = hrn_to_urn(self.driver.shell.GetPersonHrn(
                int(person['person_id'])), 'user')
            users.append({
                'login': slice_record['name'],
                'user_urn': person_urn,
                'keys': [keys[k_id]['key']
                         for k_id in person['key_ids'] if k_id in keys],
            })

        if node_ids:
            # restrict to the requested slivers that really are in the slice
            slice_record['node_ids'] = [
                node_id for node_id in node_ids
                if node_id in slice_record['node_ids']]
        pltags_dict = self.get_pltags_by_node_id(slice_record)
        nodes_dict = self.get_slice_nodes(slice_record, options)
        slivers = []
        for node in nodes_dict.values():
            node.update(slice_record)
            # slice-global tags followed by this node's sliver tags; a new
            # list is built so the shared slice-global list is never extended
            # xxx nodegroup slice tags are not added here
            node['slice-tags'] = (pltags_dict['slice-global'] +
                                  pltags_dict[node['node_id']])
            sliver_hrn = '%s.%s-%s' % (self.driver.hrn,
                                       slice_record['slice_id'], node['node_id'])
            node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
            node['urn'] = node['sliver_id']
            node['services_user'] = users
            slivers.append(node)
        if not slivers:
            logger.warning(
                "PlAggregate.get_slivers : slice(s) found but with no sliver {}".format(urns))
        return slivers

    def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=None, grain=None, options=None):
        """Convert a node record into a NodeElement.

        :param node: node record
        :param sites: dict of site records keyed by site_id
        :param interfaces: dict of interface records keyed by interface_id
        :param node_tags: dict of node tag records keyed by node_tag_id
        :param pl_initscripts: dict of initscripts; its values are advertised
        :param grain: value stored in the node's Granularity element
        :param options: accepted for interface compatibility, not read here
        :returns: the populated NodeElement
        """
        if pl_initscripts is None:
            # must be a mapping: .values() is called on it below
            pl_initscripts = {}
        if options is None:
            options = {}
        rspec_node = NodeElement()
        site = sites[node['site_id']]
        rspec_node['component_id'] = hostname_to_urn(
            self.driver.hrn, site['login_base'], node['hostname'])
        rspec_node['component_name'] = node['hostname']
        rspec_node['component_manager_id'] = Xrn(
            self.driver.hrn, 'authority+cm').get_urn()
        rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(
            self.driver.hrn, site['login_base']), 'authority+sa')
        # 'available' mirrors the boot state; describe() pops it again
        # because a manifest node must not carry it
        rspec_node['boot_state'] = node['boot_state']
        if node['boot_state'] == 'boot':
            rspec_node['available'] = 'true'
        else:
            rspec_node['available'] = 'false'

        # distinguish between Shared and Reservable nodes
        if node['node_type'] == 'reservable':
            rspec_node['exclusive'] = 'true'
        else:
            rspec_node['exclusive'] = 'false'

        # expose hardware_types from the hardware_type tag if set on node,
        # otherwise fall back on the two default types
        hw_tags = self.driver.shell.GetNodeTags({
            'node_id': node['node_id'],
            'tagname': 'hardware_type',
        })
        if hw_tags:
            rspec_node['hardware_types'] = [
                HardwareType({'name': hw_tags[0]['value']}),
            ]
        else:
            rspec_node['hardware_types'] = [
                HardwareType({'name': 'plab-pc'}),
                HardwareType({'name': 'pc'}),
            ]
        # only doing this because protogeni rspec needs
        # to advertise available initscripts
        rspec_node['pl_initscripts'] = pl_initscripts.values()
        # add site/interface info to nodes.
        # assumes that sites, interfaces and tags have already been prepared.
        if site['longitude'] and site['latitude']:
            location_dict = {
                'longitude': site['longitude'],
                'latitude': site['latitude'],
            }
            for extra in ('country', 'city'):
                try:
                    site_tags = self.driver.shell.GetSiteTags({
                        'site_id': site['site_id'],
                        'tagname': extra,
                    })
                    location_dict[extra] = site_tags[0]['value']
                except Exception:
                    # best effort: a missing tag or a failed call both end
                    # up as 'unknown'
                    logger.log_exc('extra = {}'.format(extra))
                    location_dict[extra] = 'unknown'
            rspec_node['location'] = Location(location_dict)
        # Granularity
        rspec_node['granularity'] = Granularity({'grain': grain})
        rspec_node['interfaces'] = []
        for if_count, if_id in enumerate(node['interface_ids']):
            interface = Interface(interfaces[if_id])
            interface['ipv4'] = interface['ip']
            interface['component_id'] = PlXrn(
                auth=self.driver.hrn,
                interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
            # interfaces in the manifest need a client id; it is always set
            # since this method has no slice to condition it on
            interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
            rspec_node['interfaces'].append(interface)
        # this is what describes a particular node
        rspec_node['tags'] = [PLTag(node_tags[tag_id])
                              for tag_id in node['node_tag_ids']
                              if tag_id in node_tags]
        return rspec_node

    def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, sliver_pltags,
                             pl_initscripts, sliver_allocations):
        """Convert a sliver record (see get_slivers) into a manifest node.

        The node is first built by node_to_rspec_node, then its interfaces
        are emptied and the expiration, Sliver element, allocation ids and
        ssh login service are added.

        :param sliver_allocations: dict of allocation objects keyed by sliver
            urn; ``client_id`` and ``component_id`` are read from them
        :returns: the populated NodeElement
        """
        # get the granularity in second for the reservation system
        grain = self.driver.shell.GetLeaseGranularity()
        rspec_node = self.node_to_rspec_node(
            sliver, sites, interfaces, node_tags, pl_initscripts, grain)
        for pltag in sliver_pltags:
            logger.debug("Need to expose {}".format(pltag))
        rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
        # remove interfaces from manifest
        rspec_node['interfaces'] = []
        # add sliver info
        sliver_urn = sliver['urn']
        rspec_sliver = Sliver({'sliver_id': sliver_urn,
                               'name': sliver['name'],
                               'type': 'plab-vserver',
                               'tags': sliver_pltags,
                               })
        rspec_node['sliver_id'] = rspec_sliver['sliver_id']
        if sliver_urn in sliver_allocations:
            allocation = sliver_allocations[sliver_urn]
            rspec_node['client_id'] = allocation.client_id
            # a recorded component_id overrides the computed one
            if allocation.component_id:
                rspec_node['component_id'] = allocation.component_id
        rspec_node['slivers'] = [rspec_sliver]

        # slivers always provide the ssh service
        login = Login({'authentication': 'ssh-keys',
                       'hostname': sliver['hostname'],
                       'port': '22',
                       'username': sliver['name'],
                       'login': sliver['name']
                       })
        service = ServicesElement({'login': login,
                                   'services_user': sliver['services_user']})
        rspec_node['services'] = [service]
        return rspec_node

    def get_pltags_by_node_id(self, slice):
        """Return the slice's tags as PLTag lists grouped by scope.

        The result is a ``defaultdict(list)`` whose keys are a node_id for
        tags attached to a node (scope 'sliver'), 'nodegroup' for tags
        restricted to a nodegroup and 'slice-global' for all other tags.
        """
        slice_tag_ids = list(slice['slice_tag_ids'])
        tags = self.driver.shell.GetSliceTags(
            {'slice_tag_id': slice_tag_ids},
            ['tagname', 'value', 'node_id', 'nodegroup_id'])
        pltags_dict = defaultdict(list)
        for tag in tags:
            if tag['node_id']:
                # specific to a node
                tag['scope'] = 'sliver'
                key = tag['node_id']
            elif tag['nodegroup_id']:
                # restricted to a nodegroup
                # xxx the nodegroup name should be exposed as well
                tag['scope'] = 'nodegroup'
                key = 'nodegroup'
            else:
                # this tag is global to the slice
                tag['scope'] = 'slice'
                key = 'slice-global'
            pltags_dict[key].append(PLTag(tag))
        return pltags_dict

    def get_slice_nodes(self, slice, options=None):
        """Return the local nodes of *slice*, keyed by ``node_id``.

        An empty dict is returned, without any call to the shell, when the
        slice is falsy or has no node_ids.  ``options['geni_available']``
        restricts the result to nodes in 'boot' state, as in get_nodes.
        """
        if options is None:
            options = {}
        if not slice or not slice.get('node_ids'):
            # there are no nodes to look up
            return {}
        node_filter = {'peer_id': None, 'node_id': slice['node_ids']}
        if options.get('geni_available') == True:
            node_filter['boot_state'] = 'boot'
        return dict((node['node_id'], node)
                    for node in self.driver.shell.GetNodes(node_filter))

    def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations=None):
        """Build the geni sliver dict for a manifest node.

        :param rspec_node: mapping with 'sliver_id' and 'expires'; its
            'boot_state' is read when the allocation is 'geni_provisioned'
        :param sliver_allocations: dict of allocation objects keyed by
            sliver id; ``allocation_state`` is read from them
        :returns: dict with geni_sliver_urn, geni_expires,
            geni_allocation_status, geni_operational_status and geni_error

        A sliver with no (or a falsy) allocation entry is reported as
        'geni_unallocated' / 'geni_failed'.
        """
        if sliver_allocations is None:
            sliver_allocations = {}
        # defaults cover both "no entry" and "entry present but empty"
        allocation_status = 'geni_unallocated'
        op_status = 'geni_failed'
        sliver_allocation = None
        if rspec_node['sliver_id'] in sliver_allocations:
            sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
        if sliver_allocation:
            allocation_status = sliver_allocation.allocation_state
            if allocation_status == 'geni_allocated':
                op_status = 'geni_pending_allocation'
            elif allocation_status == 'geni_provisioned':
                if rspec_node['boot_state'] == 'boot':
                    op_status = 'geni_ready'
                else:
                    op_status = 'geni_failed'
            else:
                op_status = 'geni_unknown'
        # required fields
        return {'geni_sliver_urn': rspec_node['sliver_id'],
                'geni_expires': rspec_node['expires'],
                'geni_allocation_status': allocation_status,
                'geni_operational_status': op_status,
                'geni_error': '',
                }

    def get_leases(self, slice=None, options=None):
        """Return Lease elements for leases clipped at the current time.

        :param slice: optional slice record; when given only the leases with
            that slice's name are requested
        :param options: accepted for interface compatibility, not read here
        :returns: list of Lease elements whose 'duration' is expressed in
            units of the lease granularity
        """
        if options is None:
            options = {}

        lease_filter = {'clip': int(time.time())}
        if slice:
            lease_filter['name'] = slice['name']
        leases = self.driver.shell.GetLeases(lease_filter)
        grain = self.driver.shell.GetLeaseGranularity()

        rspec_leases = []
        for lease in leases:
            rspec_lease = Lease()
            rspec_lease['component_id'] = hrn_to_urn(
                self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
            slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
            rspec_lease['slice_id'] = hrn_to_urn(slice_hrn, 'slice')
            rspec_lease['start_time'] = lease['t_from']
            # floor division: a whole number of grains on Python 3 as well
            rspec_lease['duration'] = (
                lease['t_until'] - lease['t_from']) // grain
            rspec_leases.append(rspec_lease)
        return rspec_leases

    def list_resources(self, version=None, options=None):
        """Return the advertisement RSpec, as XML, for this aggregate.

        ``options['list_leases']`` selects the content: 'leases' leaves out
        nodes and links, 'resources' leaves out leases, anything else
        includes both.
        """
        if options is None:
            options = {}

        version_manager = VersionManager()
        version = version_manager.get_version(version)
        rspec_version = version_manager._get_version(
            version.type, version.version, 'ad')
        rspec = RSpec(version=rspec_version, user_options=options)
        list_leases = options.get('list_leases')

        if list_leases != 'leases':
            # get nodes
            nodes = self.get_nodes(options)
            site_ids = []
            interface_ids = []
            tag_ids = []
            nodes_dict = {}
            for node in nodes:
                site_ids.append(node['site_id'])
                interface_ids.extend(node['interface_ids'])
                tag_ids.extend(node['node_tag_ids'])
                nodes_dict[node['node_id']] = node
            sites = self.get_sites({'site_id': site_ids})
            interfaces = self.get_interfaces({'interface_id': interface_ids})
            node_tags = self.get_node_tags({'node_tag_id': tag_ids})
            pl_initscripts = self.get_pl_initscripts()
            # convert nodes to rspec nodes
            grain = self.driver.shell.GetLeaseGranularity()
            rspec_nodes = [
                self.node_to_rspec_node(
                    node, sites, interfaces, node_tags, pl_initscripts, grain)
                for node in nodes]
            rspec.version.add_nodes(rspec_nodes)

            # add links
            links = self.get_links(sites, nodes_dict, interfaces)
            rspec.version.add_links(links)

        if list_leases != 'resources':
            leases = self.get_leases()
            rspec.version.add_leases(leases)

        return rspec.toxml()

    def describe(self, urns, version=None, options=None):
        """Return the manifest description of the slivers named by *urns*.

        :returns: dict with 'geni_urn', 'geni_rspec' (manifest XML) and
            'geni_slivers' (list built by rspec_node_to_geni_sliver)

        ``options['list_leases']`` is interpreted as in list_resources.
        """
        if options is None:
            options = {}
        version_manager = VersionManager()
        version = version_manager.get_version(version)
        rspec_version = version_manager._get_version(
            version.type, version.version, 'manifest')
        rspec = RSpec(version=rspec_version, user_options=options)
        list_leases = options.get('list_leases')

        # get slivers
        geni_slivers = []
        slivers = self.get_slivers(urns, options)
        if slivers:
            rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
        else:
            rspec_expires = datetime_to_string(utcparse(time.time()))
        rspec.xml.set('expires',  rspec_expires)

        # lookup the sliver allocations; geni_urn defaults to the first urn
        # and is replaced by the slice urn of the allocations found
        geni_urn = urns[0]
        sliver_ids = [sliver['sliver_id'] for sliver in slivers]
        constraint = SliverAllocation.sliver_id.in_(sliver_ids)
        sliver_allocations = self.driver.api.dbsession().query(
            SliverAllocation).filter(constraint)
        sliver_allocation_dict = {}
        for sliver_allocation in sliver_allocations:
            geni_urn = sliver_allocation.slice_urn
            sliver_allocation_dict[
                sliver_allocation.sliver_id] = sliver_allocation

        if list_leases != 'leases':
            # add slivers
            site_ids = []
            interface_ids = []
            tag_ids = []
            nodes_dict = {}
            for sliver in slivers:
                site_ids.append(sliver['site_id'])
                interface_ids.extend(sliver['interface_ids'])
                tag_ids.extend(sliver['node_tag_ids'])
                nodes_dict[sliver['node_id']] = sliver
            sites = self.get_sites({'site_id': site_ids})
            interfaces = self.get_interfaces({'interface_id': interface_ids})
            node_tags = self.get_node_tags({'node_tag_id': tag_ids})
            pl_initscripts = self.get_pl_initscripts()
            rspec_nodes = []
            for sliver in slivers:
                # skip nodes whose whitelist does not contain this slice
                if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
                    continue
                sliver_pltags = sliver['slice-tags']
                rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, sliver_pltags,
                                                       pl_initscripts, sliver_allocation_dict)
                logger.debug('rspec of type {}'.format(
                    rspec_node.__class__.__name__))
                # manifest node element shouldn't contain available attribute
                rspec_node.pop('available')
                rspec_nodes.append(rspec_node)
                geni_slivers.append(self.rspec_node_to_geni_sliver(
                    rspec_node, sliver_allocation_dict))
            rspec.version.add_nodes(rspec_nodes)

            # add links
            links = self.get_links(sites, nodes_dict, interfaces)
            rspec.version.add_links(links)

        if list_leases != 'resources':
            if slivers:
                leases = self.get_leases(slivers[0])
                rspec.version.add_leases(leases)

        return {'geni_urn': geni_urn,
                'geni_rspec': rspec.toxml(),
                'geni_slivers': geni_slivers}