325ca180fbfca1e7f12d93c19c15824b63164c33
[sfa.git] / sfa / planetlab / plaggregate.py
1 #!/usr/bin/python
2 from collections import defaultdict
3 from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn, get_authority, get_leaf
4 from sfa.util.sfatime import utcparse, datetime_to_string
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import SliverDoesNotExist
7 from sfa.rspecs.rspec import RSpec
8 from sfa.rspecs.elements.hardware_type import HardwareType
9 from sfa.rspecs.elements.node import NodeElement
10 from sfa.rspecs.elements.link import Link
11 from sfa.rspecs.elements.sliver import Sliver
12 from sfa.rspecs.elements.login import Login
13 from sfa.rspecs.elements.location import Location
14 from sfa.rspecs.elements.interface import Interface
15 from sfa.rspecs.elements.services import ServicesElement
16 from sfa.rspecs.elements.pltag import PLTag
17 from sfa.rspecs.elements.lease import Lease
18 from sfa.rspecs.elements.granularity import Granularity
19 from sfa.rspecs.version_manager import VersionManager
20
21 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn
22 from sfa.planetlab.vlink import get_tc_rate
23 from sfa.planetlab.topology import Topology
24 from sfa.storage.model import SliverAllocation
25
26
27 import time
28
29
30 class PlAggregate:
31
32     def __init__(self, driver):
33         self.driver = driver
34
35     def get_nodes(self, options=None):
36         if options is None:
37             options = {}
38         filter = {'peer_id': None}
39         geni_available = options.get('geni_available')
40         if geni_available == True:
41             filter['boot_state'] = 'boot'
42         nodes = self.driver.shell.GetNodes(filter)
43
44         return nodes
45
46     def get_sites(self, filter=None):
47         if filter is None:
48             filter = {}
49         sites = {}
50         for site in self.driver.shell.GetSites(filter):
51             sites[site['site_id']] = site
52         return sites
53
54     def get_interfaces(self, filter=None):
55         if filter is None:
56             filter = {}
57         interfaces = {}
58         for interface in self.driver.shell.GetInterfaces(filter):
59             iface = Interface()
60             if interface['bwlimit']:
61                 interface['bwlimit'] = str(int(interface['bwlimit']) / 1000)
62             interfaces[interface['interface_id']] = interface
63         return interfaces
64
65     def get_links(self, sites, nodes, interfaces):
66
67         topology = Topology()
68         links = []
69         for (site_id1, site_id2) in topology:
70             site_id1 = int(site_id1)
71             site_id2 = int(site_id2)
72             link = Link()
73             if not site_id1 in sites or site_id2 not in sites:
74                 continue
75             site1 = sites[site_id1]
76             site2 = sites[site_id2]
77             # get hrns
78             site1_hrn = self.driver.hrn + '.' + site1['login_base']
79             site2_hrn = self.driver.hrn + '.' + site2['login_base']
80
81             for s1_node_id in site1['node_ids']:
82                 for s2_node_id in site2['node_ids']:
83                     if s1_node_id not in nodes or s2_node_id not in nodes:
84                         continue
85                     node1 = nodes[s1_node_id]
86                     node2 = nodes[s2_node_id]
87                     # set interfaces
88                     # just get first interface of the first node
89                     if1_xrn = PlXrn(auth=self.driver.hrn,
90                                     interface='node%s:eth0' % (node1['node_id']))
91                     if1_ipv4 = interfaces[node1['interface_ids'][0]]['ip']
92                     if2_xrn = PlXrn(auth=self.driver.hrn,
93                                     interface='node%s:eth0' % (node2['node_id']))
94                     if2_ipv4 = interfaces[node2['interface_ids'][0]]['ip']
95
96                     if1 = Interface(
97                         {'component_id': if1_xrn.urn, 'ipv4': if1_ipv4})
98                     if2 = Interface(
99                         {'component_id': if2_xrn.urn, 'ipv4': if2_ipv4})
100
101                     # set link
102                     link = Link({'capacity': '1000000', 'latency': '0',
103                                  'packet_loss': '0', 'type': 'ipv4'})
104                     link['interface1'] = if1
105                     link['interface2'] = if2
106                     link['component_name'] = "%s:%s" % (
107                         site1['login_base'], site2['login_base'])
108                     link['component_id'] = PlXrn(auth=self.driver.hrn, interface=link[
109                                                  'component_name']).get_urn()
110                     link['component_manager_id'] = hrn_to_urn(
111                         self.driver.hrn, 'authority+am')
112                     links.append(link)
113
114         return links
115
116     def get_node_tags(self, filter=None):
117         if filter is None:
118             filter = {}
119         node_tags = {}
120         for node_tag in self.driver.shell.GetNodeTags(filter, ['tagname', 'value', 'node_id', 'node_tag_id']):
121             node_tags[node_tag['node_tag_id']] = node_tag
122         return node_tags
123
124     def get_pl_initscripts(self, filter=None):
125         if filter is None:
126             filter = {}
127         pl_initscripts = {}
128         filter.update({'enabled': True})
129         for initscript in self.driver.shell.GetInitScripts(filter):
130             pl_initscripts[initscript['initscript_id']] = initscript
131         return pl_initscripts
132
133     def get_slivers(self, urns, options=None):
134         if options is None:
135             options = {}
136         names = set()
137         slice_ids = set()
138         node_ids = []
139         slice_hrn = None
140         for urn in urns:
141             xrn = PlXrn(xrn=urn)
142             if xrn.type == 'sliver':
143                  # id: slice_id-node_id
144                 try:
145                     sliver_id_parts = xrn.get_sliver_id_parts()
146                     slice_id = int(sliver_id_parts[0])
147                     node_id = int(sliver_id_parts[1])
148                     slice_ids.add(slice_id)
149                     node_ids.append(node_id)
150                 except ValueError:
151                     pass
152             else:
153                 slice_hrn = xrn.get_hrn()
154
155         filter = {}
156         filter['peer_id'] = None
157         if slice_ids:
158             filter['slice_id'] = list(slice_ids)
159         # get all slices
160         fields = ['slice_id', 'name', 'hrn', 'person_ids',
161                   'node_ids', 'slice_tag_ids', 'expires']
162         all_slices = self.driver.shell.GetSlices(filter, fields)
163         if slice_hrn:
164             slices = [slice for slice in all_slices if slice[
165                 'hrn'] == slice_hrn]
166         else:
167             slices = all_slices
168
169         if not slices:
170             if slice_hrn:
171                 logger.error(
172                     "PlAggregate.get_slivers : no slice found with hrn {}".format(slice_hrn))
173             else:
174                 logger.error(
175                     "PlAggregate.get_slivers : no sliver found with urns {}".format(urns))
176             return []
177         slice = slices[0]
178         slice['hrn'] = slice_hrn
179
180         # get sliver users
181         persons = []
182         person_ids = []
183         for slice in slices:
184             person_ids.extend(slice['person_ids'])
185         if person_ids:
186             persons = self.driver.shell.GetPersons(person_ids)
187
188         # get user keys
189         keys = {}
190         key_ids = []
191         for person in persons:
192             key_ids.extend(person['key_ids'])
193
194         if key_ids:
195             key_list = self.driver.shell.GetKeys(key_ids)
196             for key in key_list:
197                 keys[key['key_id']] = key
198
199         # construct user key info
200         users = []
201         for person in persons:
202             person_urn = hrn_to_urn(self.driver.shell.GetPersonHrn(
203                 int(person['person_id'])), 'user')
204             user = {
205                 'login': slice['name'],
206                 'user_urn': person_urn,
207                 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys]
208             }
209             users.append(user)
210
211         if node_ids:
212             node_ids = [
213                 node_id for node_id in node_ids if node_id in slice['node_ids']]
214             slice['node_ids'] = node_ids
215         pltags_dict = self.get_pltags_by_node_id(slice)
216         nodes_dict = self.get_slice_nodes(slice, options)
217         slivers = []
218         for node in nodes_dict.values():
219             node.update(slice)
220             # slice-global tags
221             node['slice-tags'] = pltags_dict['slice-global']
222             # xxx
223             # this is where we chould maybe add the nodegroup slice tags,
224             # but it's tedious...
225             # xxx
226             # sliver tags
227             node['slice-tags'] += pltags_dict[node['node_id']]
228             sliver_hrn = '%s.%s-%s' % (self.driver.hrn,
229                                        slice['slice_id'], node['node_id'])
230             node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn
231             node['urn'] = node['sliver_id']
232             node['services_user'] = users
233             slivers.append(node)
234         if not slivers:
235             logger.warning(
236                 "PlAggregate.get_slivers : slice(s) found but with no sliver {}".format(urns))
237         return slivers
238
239     def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=None, grain=None, options=None):
240         if pl_initscripts is None:
241             pl_initscripts = []
242         if options is None:
243             options = {}
244         rspec_node = NodeElement()
245         # xxx how to retrieve site['login_base']
246         site = sites[node['site_id']]
247         rspec_node['component_id'] = hostname_to_urn(
248             self.driver.hrn, site['login_base'], node['hostname'])
249         rspec_node['component_name'] = node['hostname']
250         rspec_node['component_manager_id'] = Xrn(
251             self.driver.hrn, 'authority+cm').get_urn()
252         rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(
253             self.driver.hrn, site['login_base']), 'authority+sa')
254         # do not include boot state (<available> element) in the manifest rspec
255         rspec_node['boot_state'] = node['boot_state']
256         if node['boot_state'] == 'boot':
257             rspec_node['available'] = 'true'
258         else:
259             rspec_node['available'] = 'false'
260
261         # distinguish between Shared and Reservable nodes
262         if node['node_type'] == 'reservable':
263             rspec_node['exclusive'] = 'true'
264         else:
265             rspec_node['exclusive'] = 'false'
266
267         rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}),
268                                         HardwareType({'name': 'pc'})]
269         # only doing this because protogeni rspec needs
270         # to advertise available initscripts
271         rspec_node['pl_initscripts'] = pl_initscripts.values()
272         # add site/interface info to nodes.
273         # assumes that sites, interfaces and tags have already been prepared.
274         if site['longitude'] and site['latitude']:
275             location_dict = {
276                 'longitude': site['longitude'],
277                 'latitude': site['latitude'],
278             }
279             for extra in ('country', 'city'):
280                 try:
281                     tts = self.driver.shell.GetSiteTags({
282                         'site_id' : site['site_id'],
283                         'tagname' : extra,
284                     })
285                     location_dict[extra] = tts[0]['value']
286                 except:
287                     logger.log_exc('extra = {}'.format(extra))
288                     location_dict[extra] = 'unknown'
289             location = Location(location_dict)
290             rspec_node['location'] = location
291         # Granularity
292         granularity = Granularity({'grain': grain})
293         rspec_node['granularity'] = granularity
294         rspec_node['interfaces'] = []
295         if_count = 0
296         for if_id in node['interface_ids']:
297             interface = Interface(interfaces[if_id])
298             interface['ipv4'] = interface['ip']
299             interface['component_id'] = PlXrn(auth=self.driver.hrn,
300                                               interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn()
301             # interfaces in the manifest need a client id
302             if slice:
303                 interface['client_id'] = "%s:%s" % (node['node_id'], if_id)
304             rspec_node['interfaces'].append(interface)
305             if_count += 1
306         # this is what describes a particular node
307         node_level_tags = [PLTag(node_tags[tag_id]) for tag_id in node[
308             'node_tag_ids'] if tag_id in node_tags]
309         rspec_node['tags'] = node_level_tags
310         return rspec_node
311
312     def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, sliver_pltags,
313                              pl_initscripts, sliver_allocations):
314         # get the granularity in second for the reservation system
315         grain = self.driver.shell.GetLeaseGranularity()
316         rspec_node = self.node_to_rspec_node(
317             sliver, sites, interfaces, node_tags, pl_initscripts, grain)
318         for pltag in sliver_pltags:
319             logger.debug("Need to expose {}".format(pltag))
320         # xxx how to retrieve site['login_base']
321         rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires']))
322         # remove interfaces from manifest
323         rspec_node['interfaces'] = []
324         # add sliver info
325         rspec_sliver = Sliver({'sliver_id': sliver['urn'],
326                                'name': sliver['name'],
327                                'type': 'plab-vserver',
328                                'tags': sliver_pltags,
329                                })
330         rspec_node['sliver_id'] = rspec_sliver['sliver_id']
331         if sliver['urn'] in sliver_allocations:
332             rspec_node['client_id'] = sliver_allocations[
333                 sliver['urn']].client_id
334             if sliver_allocations[sliver['urn']].component_id:
335                 rspec_node['component_id'] = sliver_allocations[
336                     sliver['urn']].component_id
337         rspec_node['slivers'] = [rspec_sliver]
338
339         # slivers always provide the ssh service
340         login = Login({'authentication': 'ssh-keys',
341                        'hostname': sliver['hostname'],
342                        'port': '22',
343                        'username': sliver['name'],
344                        'login': sliver['name']
345                        })
346         service = ServicesElement({'login': login,
347                                    'services_user': sliver['services_user']})
348         rspec_node['services'] = [service]
349         return rspec_node
350
351     def get_pltags_by_node_id(self, slice):
352         slice_tag_ids = []
353         slice_tag_ids.extend(slice['slice_tag_ids'])
354         tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids},
355                                               ['tagname', 'value', 'node_id', 'nodegroup_id'])
356         # sorted by node_id
357         pltags_dict = defaultdict(list)
358         for tag in tags:
359             # specific to a node
360             if tag['node_id']:
361                 tag['scope'] = 'sliver'
362                 pltags_dict[tag['node_id']].append(PLTag(tag))
363             # restricted to a nodegroup
364             # for now such tags are not exposed to describe
365             # xxx we should also expose the nodegroup name in this case to be
366             # complete..
367             elif tag['nodegroup_id']:
368                 tag['scope'] = 'nodegroup'
369                 pltags_dict['nodegroup'].append(PLTag(tag))
370             # this tag is global to the slice
371             else:
372                 tag['scope'] = 'slice'
373                 pltags_dict['slice-global'].append(PLTag(tag))
374         return pltags_dict
375
376     def get_slice_nodes(self, slice, options=None):
377         if options is None:
378             options = {}
379         nodes_dict = {}
380         filter = {'peer_id': None}
381         tags_filter = {}
382         if slice and slice.get('node_ids'):
383             filter['node_id'] = slice['node_ids']
384         else:
385             # there are no nodes to look up
386             return nodes_dict
387         tags_filter = filter.copy()
388         geni_available = options.get('geni_available')
389         if geni_available == True:
390             filter['boot_state'] = 'boot'
391         nodes = self.driver.shell.GetNodes(filter)
392         for node in nodes:
393             nodes_dict[node['node_id']] = node
394         return nodes_dict
395
396     def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations=None):
397         if sliver_allocations is None:
398             sliver_allocations = {}
399         if rspec_node['sliver_id'] in sliver_allocations:
400             # set sliver allocation and operational status
401             sliver_allocation = sliver_allocations[rspec_node['sliver_id']]
402             if sliver_allocation:
403                 allocation_status = sliver_allocation.allocation_state
404                 if allocation_status == 'geni_allocated':
405                     op_status = 'geni_pending_allocation'
406                 elif allocation_status == 'geni_provisioned':
407                     if rspec_node['boot_state'] == 'boot':
408                         op_status = 'geni_ready'
409                     else:
410                         op_status = 'geni_failed'
411                 else:
412                     op_status = 'geni_unknown'
413             else:
414                 allocation_status = 'geni_unallocated'
415         else:
416             allocation_status = 'geni_unallocated'
417             op_status = 'geni_failed'
418         # required fields
419         geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'],
420                        'geni_expires': rspec_node['expires'],
421                        'geni_allocation_status': allocation_status,
422                        'geni_operational_status': op_status,
423                        'geni_error': '',
424                        }
425         return geni_sliver
426
427     def get_leases(self, slice=None, options=None):
428         if options is None:
429             options = {}
430
431         now = int(time.time())
432         filter = {}
433         filter.update({'clip': now})
434         if slice:
435             filter.update({'name': slice['name']})
436         return_fields = ['lease_id', 'hostname',
437                          'site_id', 'name', 't_from', 't_until']
438         leases = self.driver.shell.GetLeases(filter)
439         grain = self.driver.shell.GetLeaseGranularity()
440
441         site_ids = []
442         for lease in leases:
443             site_ids.append(lease['site_id'])
444
445         # get sites
446         sites_dict = self.get_sites({'site_id': site_ids})
447
448         rspec_leases = []
449         for lease in leases:
450
451             rspec_lease = Lease()
452
453             # xxx how to retrieve site['login_base']
454             site_id = lease['site_id']
455             site = sites_dict[site_id]
456
457             rspec_lease['component_id'] = hrn_to_urn(
458                 self.driver.shell.GetNodeHrn(lease['hostname']), 'node')
459             slice_hrn = self.driver.shell.GetSliceHrn(lease['slice_id'])
460             slice_urn = hrn_to_urn(slice_hrn, 'slice')
461             rspec_lease['slice_id'] = slice_urn
462             rspec_lease['start_time'] = lease['t_from']
463             rspec_lease['duration'] = (
464                 lease['t_until'] - lease['t_from']) / grain
465             rspec_leases.append(rspec_lease)
466         return rspec_leases
467
468     def list_resources(self, version=None, options=None):
469         if options is None:
470             options = {}
471
472         version_manager = VersionManager()
473         version = version_manager.get_version(version)
474         rspec_version = version_manager._get_version(
475             version.type, version.version, 'ad')
476         rspec = RSpec(version=rspec_version, user_options=options)
477
478         if not options.get('list_leases') or options['list_leases'] != 'leases':
479             # get nodes
480             nodes = self.get_nodes(options)
481             site_ids = []
482             interface_ids = []
483             tag_ids = []
484             nodes_dict = {}
485             for node in nodes:
486                 site_ids.append(node['site_id'])
487                 interface_ids.extend(node['interface_ids'])
488                 tag_ids.extend(node['node_tag_ids'])
489                 nodes_dict[node['node_id']] = node
490             sites = self.get_sites({'site_id': site_ids})
491             interfaces = self.get_interfaces({'interface_id': interface_ids})
492             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
493             pl_initscripts = self.get_pl_initscripts()
494             # convert nodes to rspec nodes
495             grain = self.driver.shell.GetLeaseGranularity()
496             rspec_nodes = []
497             for node in nodes:
498                 rspec_node = self.node_to_rspec_node(
499                     node, sites, interfaces, node_tags, pl_initscripts, grain)
500                 rspec_nodes.append(rspec_node)
501             rspec.version.add_nodes(rspec_nodes)
502
503             # add links
504             links = self.get_links(sites, nodes_dict, interfaces)
505             rspec.version.add_links(links)
506
507         if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources':
508             leases = self.get_leases()
509             rspec.version.add_leases(leases)
510
511         return rspec.toxml()
512
513     def describe(self, urns, version=None, options=None):
514         if options is None:
515             options = {}
516         version_manager = VersionManager()
517         version = version_manager.get_version(version)
518         rspec_version = version_manager._get_version(
519             version.type, version.version, 'manifest')
520         rspec = RSpec(version=rspec_version, user_options=options)
521
522         # get slivers
523         geni_slivers = []
524         slivers = self.get_slivers(urns, options)
525         if slivers:
526             rspec_expires = datetime_to_string(utcparse(slivers[0]['expires']))
527         else:
528             rspec_expires = datetime_to_string(utcparse(time.time()))
529         rspec.xml.set('expires',  rspec_expires)
530
531         # lookup the sliver allocations
532         geni_urn = urns[0]
533         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
534         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
535         sliver_allocations = self.driver.api.dbsession().query(
536             SliverAllocation).filter(constraint)
537         sliver_allocation_dict = {}
538         for sliver_allocation in sliver_allocations:
539             geni_urn = sliver_allocation.slice_urn
540             sliver_allocation_dict[
541                 sliver_allocation.sliver_id] = sliver_allocation
542
543         if not options.get('list_leases') or options['list_leases'] != 'leases':
544             # add slivers
545             site_ids = []
546             interface_ids = []
547             tag_ids = []
548             nodes_dict = {}
549             for sliver in slivers:
550                 site_ids.append(sliver['site_id'])
551                 interface_ids.extend(sliver['interface_ids'])
552                 tag_ids.extend(sliver['node_tag_ids'])
553                 nodes_dict[sliver['node_id']] = sliver
554             sites = self.get_sites({'site_id': site_ids})
555             interfaces = self.get_interfaces({'interface_id': interface_ids})
556             node_tags = self.get_node_tags({'node_tag_id': tag_ids})
557             pl_initscripts = self.get_pl_initscripts()
558             rspec_nodes = []
559             for sliver in slivers:
560                 if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']:
561                     continue
562                 sliver_pltags = sliver['slice-tags']
563                 rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, sliver_pltags,
564                                                        pl_initscripts, sliver_allocation_dict)
565                 logger.debug('rspec of type {}'.format(
566                     rspec_node.__class__.__name__))
567                 # manifest node element shouldn't contain available attribute
568                 rspec_node.pop('available')
569                 rspec_nodes.append(rspec_node)
570                 geni_sliver = self.rspec_node_to_geni_sliver(
571                     rspec_node, sliver_allocation_dict)
572                 geni_slivers.append(geni_sliver)
573             rspec.version.add_nodes(rspec_nodes)
574
575             # add sliver defaults
576             #default_sliver = slivers.get(None, [])
577             # if default_sliver:
578             #    default_sliver_attribs = default_sliver.get('tags', [])
579             #    for attrib in default_sliver_attribs:
580             #        rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value'])
581
582             # add links
583             links = self.get_links(sites, nodes_dict, interfaces)
584             rspec.version.add_links(links)
585
586         if not options.get('list_leases') or options['list_leases'] != 'resources':
587             if slivers:
588                 leases = self.get_leases(slivers[0])
589                 rspec.version.add_leases(leases)
590
591         return {'geni_urn': geni_urn,
592                 'geni_rspec': rspec.toxml(),
593                 'geni_slivers': geni_slivers}