Cross-connection recovery among PlanetLab instances
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15 import logging
16
17 from nepi.util import server
18 from nepi.util import parallel
19
20 import application
21
22 class UnresponsiveNodeError(RuntimeError):
23     pass
24
25 def _castproperty(typ, propattr):
26     def _get(self):
27         return getattr(self, propattr)
28     def _set(self, value):
29         if value is not None or (isinstance(value, basestring) and not value):
30             value = typ(value)
31         return setattr(self, propattr, value)
32     def _del(self, value):
33         return delattr(self, propattr)
34     _get.__name__ = propattr + '_get'
35     _set.__name__ = propattr + '_set'
36     _del.__name__ = propattr + '_del'
37     return property(_get, _set, _del)
38
39 class Node(object):
40     BASEFILTERS = {
41         # Map Node attribute to plcapi filter name
42         'hostname' : 'hostname',
43     }
44     
45     TAGFILTERS = {
46         # Map Node attribute to (<tag name>, <plcapi filter expression>)
47         #   There are replacements that are applied with string formatting,
48         #   so '%' has to be escaped as '%%'.
49         'architecture' : ('arch','value'),
50         'operatingSystem' : ('fcdistro','value'),
51         'pl_distro' : ('pldistro','value'),
52         'minReliability' : ('reliability%(timeframe)s', ']value'),
53         'maxReliability' : ('reliability%(timeframe)s', '[value'),
54         'minBandwidth' : ('bw%(timeframe)s', ']value'),
55         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
56     }    
57     
58     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
59     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
60     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
61     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
62     
63     minReliability = _castproperty(float, '_minReliability')
64     maxReliability = _castproperty(float, '_maxReliability')
65     minBandwidth = _castproperty(float, '_minBandwidth')
66     maxBandwidth = _castproperty(float, '_maxBandwidth')
67     
68     def __init__(self, api=None):
69         if not api:
70             api = plcapi.PLCAPI()
71         self._api = api
72         
73         # Attributes
74         self.hostname = None
75         self.architecture = None
76         self.operatingSystem = None
77         self.pl_distro = None
78         self.site = None
79         self.minReliability = None
80         self.maxReliability = None
81         self.minBandwidth = None
82         self.maxBandwidth = None
83         self.min_num_external_ifaces = None
84         self.max_num_external_ifaces = None
85         self.timeframe = 'm'
86         
87         # Applications and routes add requirements to connected nodes
88         self.required_packages = set()
89         self.required_vsys = set()
90         self.pythonpath = []
91         self.rpmFusion = False
92         self.env = collections.defaultdict(list)
93         
94         # Testbed-derived attributes
95         self.slicename = None
96         self.ident_path = None
97         self.server_key = None
98         self.home_path = None
99         
100         # Those are filled when an actual node is allocated
101         self._node_id = None
102         self._yum_dependencies = None
103         self._installed = False
104
105         # Logging
106         self._logger = logging.getLogger('nepi.testbeds.planetlab')
107     
108     def _nepi_testbed_environment_setup_get(self):
109         command = cStringIO.StringIO()
110         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
111             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
112         ))
113         command.write(' ; export PATH=$PATH:%s' % (
114             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
115         ))
116         if self.env:
117             for envkey, envvals in self.env.iteritems():
118                 for envval in envvals:
119                     command.write(' ; export %s=%s' % (envkey, envval))
120         return command.getvalue()
121     def _nepi_testbed_environment_setup_set(self, value):
122         pass
123     _nepi_testbed_environment_setup = property(
124         _nepi_testbed_environment_setup_get,
125         _nepi_testbed_environment_setup_set)
126     
127     def build_filters(self, target_filters, filter_map):
128         for attr, tag in filter_map.iteritems():
129             value = getattr(self, attr, None)
130             if value is not None:
131                 target_filters[tag] = value
132         return target_filters
133     
134     @property
135     def applicable_filters(self):
136         has = lambda att : getattr(self,att,None) is not None
137         return (
138             filter(has, self.BASEFILTERS.iterkeys())
139             + filter(has, self.TAGFILTERS.iterkeys())
140         )
141     
142     def find_candidates(self, filter_slice_id=None):
143         self._logger.info("Finding candidates for %s", self.make_filter_description())
144         
145         fields = ('node_id',)
146         replacements = {'timeframe':self.timeframe}
147         
148         # get initial candidates (no tag filters)
149         basefilters = self.build_filters({}, self.BASEFILTERS)
150         rootfilters = basefilters.copy()
151         if filter_slice_id:
152             basefilters['|slice_ids'] = (filter_slice_id,)
153         
154         # only pick healthy nodes
155         basefilters['run_level'] = 'boot'
156         basefilters['boot_state'] = 'boot'
157         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
158         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
159         
160         # keyword-only "pseudofilters"
161         extra = {}
162         if self.site:
163             extra['peer'] = self.site
164             
165         candidates = set(map(operator.itemgetter('node_id'), 
166             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
167         
168         # filter by tag, one tag at a time
169         applicable = self.applicable_filters
170         for tagfilter in self.TAGFILTERS.iteritems():
171             attr, (tagname, expr) = tagfilter
172             
173             # don't bother if there's no filter defined
174             if attr in applicable:
175                 tagfilter = rootfilters.copy()
176                 tagfilter['tagname'] = tagname % replacements
177                 tagfilter[expr % replacements] = getattr(self,attr)
178                 tagfilter['node_id'] = list(candidates)
179                 
180                 candidates &= set(map(operator.itemgetter('node_id'),
181                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
182         
183         # filter by vsys tags - special case since it doesn't follow
184         # the usual semantics
185         if self.required_vsys:
186             newcandidates = collections.defaultdict(set)
187             
188             vsys_tags = self._api.GetNodeTags(
189                 tagname='vsys', 
190                 node_id = list(candidates), 
191                 fields = ['node_id','value'])
192             
193             vsys_tags = map(
194                 operator.itemgetter(['node_id','value']),
195                 vsys_tags)
196             
197             required_vsys = self.required_vsys
198             for node_id, value in vsys_tags:
199                 if value in required_vsys:
200                     newcandidates[value].add(node_id)
201             
202             # take only those that have all the required vsys tags
203             newcandidates = reduce(
204                 lambda accum, new : accum & new,
205                 newcandidates.itervalues(),
206                 candidates)
207         
208         # filter by iface count
209         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
210             # fetch interfaces for all, in one go
211             filters = basefilters.copy()
212             filters['node_id'] = list(candidates)
213             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
214                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
215             
216             # filter candidates by interface count
217             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
218                 predicate = ( lambda node_id : 
219                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
220             elif self.min_num_external_ifaces is not None:
221                 predicate = ( lambda node_id : 
222                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
223             else:
224                 predicate = ( lambda node_id : 
225                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
226             
227             candidates = set(filter(predicate, candidates))
228         
229         # make sure hostnames are resolvable
230         if candidates:
231             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
232             
233             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
234                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
235             ))
236             def resolvable(node_id):
237                 try:
238                     addr = socket.gethostbyname(hostnames[node_id])
239                     return addr is not None
240                 except:
241                     return False
242             candidates = set(parallel.pfilter(resolvable, candidates,
243                 maxthreads = 16))
244
245             self._logger.info("  Found %s reachable candidates.", len(candidates))
246             
247         return candidates
248     
249     def make_filter_description(self):
250         """
251         Makes a human-readable description of filtering conditions
252         for find_candidates.
253         """
254         
255         # get initial candidates (no tag filters)
256         filters = self.build_filters({}, self.BASEFILTERS)
257         
258         # keyword-only "pseudofilters"
259         if self.site:
260             filters['peer'] = self.site
261             
262         # filter by tag, one tag at a time
263         applicable = self.applicable_filters
264         for tagfilter in self.TAGFILTERS.iteritems():
265             attr, (tagname, expr) = tagfilter
266             
267             # don't bother if there's no filter defined
268             if attr in applicable:
269                 filters[attr] = getattr(self,attr)
270         
271         # filter by vsys tags - special case since it doesn't follow
272         # the usual semantics
273         if self.required_vsys:
274             filters['vsys'] = ','.join(list(self.required_vsys))
275         
276         # filter by iface count
277         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
278             filters['num_ifaces'] = '-'.join([
279                 str(self.min_num_external_ifaces or '0'),
280                 str(self.max_num_external_ifaces or 'inf')
281             ])
282             
283         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
284
285     def assign_node_id(self, node_id):
286         self._node_id = node_id
287         self.fetch_node_info()
288     
289     def unassign_node(self):
290         self._node_id = None
291         self.__dict__.update(self.__orig_attrs)
292     
293     def fetch_node_info(self):
294         orig_attrs = {}
295         
296         info = self._api.GetNodes(self._node_id)[0]
297         tags = dict( (t['tagname'],t['value'])
298                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
299
300         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
301         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
302         self.min_num_external_ifaces = None
303         self.max_num_external_ifaces = None
304         self.timeframe = 'm'
305         
306         replacements = {'timeframe':self.timeframe}
307         for attr, tag in self.BASEFILTERS.iteritems():
308             if tag in info:
309                 value = info[tag]
310                 if hasattr(self, attr):
311                     orig_attrs[attr] = getattr(self, attr)
312                 setattr(self, attr, value)
313         for attr, (tag,_) in self.TAGFILTERS.iteritems():
314             tag = tag % replacements
315             if tag in tags:
316                 value = tags[tag]
317                 if hasattr(self, attr):
318                     orig_attrs[attr] = getattr(self, attr)
319                 setattr(self, attr, value)
320         
321         if 'peer_id' in info:
322             orig_attrs['site'] = self.site
323             self.site = self._api.peer_map[info['peer_id']]
324         
325         if 'interface_ids' in info:
326             self.min_num_external_ifaces = \
327             self.max_num_external_ifaces = len(info['interface_ids'])
328         
329         if 'ssh_rsa_key' in info:
330             orig_attrs['server_key'] = self.server_key
331             self.server_key = info['ssh_rsa_key']
332         
333         self.__orig_attrs = orig_attrs
334
335     def validate(self):
336         if self.home_path is None:
337             raise AssertionError, "Misconfigured node: missing home path"
338         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
339             raise AssertionError, "Misconfigured node: missing slice SSH key"
340         if self.slicename is None:
341             raise AssertionError, "Misconfigured node: unspecified slice"
342
343     def recover(self):
344         # Mark dependencies installed
345         self._installed = True
346         
347         # Clear load attributes, they impair re-discovery
348         self.minReliability = \
349         self.maxReliability = \
350         self.minBandwidth = \
351         self.maxBandwidth = None
352
353     def install_dependencies(self):
354         if self.required_packages and not self._installed:
355             # If we need rpmfusion, we must install the repo definition and the gpg keys
356             if self.rpmFusion:
357                 if self.operatingSystem == 'f12':
358                     # Fedora 12 requires a different rpmfusion package
359                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
360                 else:
361                     # This one works for f13+
362                     RPM_FUSION_URL = self.RPM_FUSION_URL
363                     
364                 rpmFusion = (
365                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
366                 ) % {
367                     'RPM_FUSION_URL' : RPM_FUSION_URL
368                 }
369             else:
370                 rpmFusion = ''
371             
372             if rpmFusion:
373                 (out,err),proc = server.popen_ssh_command(
374                     rpmFusion,
375                     host = self.hostname,
376                     port = None,
377                     user = self.slicename,
378                     agent = None,
379                     ident_key = self.ident_path,
380                     server_key = self.server_key
381                     )
382                 
383                 if proc.wait():
384                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
385             
386             # Launch p2p yum dependency installer
387             self._yum_dependencies.async_setup()
388     
389     def wait_provisioning(self, timeout = 20*60):
390         # Wait for the p2p installer
391         sleeptime = 1.0
392         totaltime = 0.0
393         while not self.is_alive():
394             time.sleep(sleeptime)
395             totaltime += sleeptime
396             sleeptime = min(30.0, sleeptime*1.5)
397             
398             if totaltime > timeout:
399                 # PlanetLab has a 15' delay on configuration propagation
400                 # If we're above that delay, the unresponsiveness is not due
401                 # to this delay.
402                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
403     
404     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
405         # Wait for the p2p installer
406         if self._yum_dependencies and not self._installed:
407             self._yum_dependencies.async_setup_wait()
408             self._installed = True
409         
410     def is_alive(self):
411         # Make sure all the paths are created where 
412         # they have to be created for deployment
413         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
414             "echo 'ALIVE'",
415             host = self.hostname,
416             port = None,
417             user = self.slicename,
418             agent = None,
419             ident_key = self.ident_path,
420             server_key = self.server_key
421             )
422         
423         if proc.wait():
424             return False
425         elif not err and out.strip() == 'ALIVE':
426             return True
427         else:
428             return False
429     
430     def prepare_dependencies(self):
431         # Configure p2p yum dependency installer
432         if self.required_packages and not self._installed:
433             self._yum_dependencies = application.YumDependency(self._api)
434             self._yum_dependencies.node = self
435             self._yum_dependencies.home_path = "nepi-yumdep"
436             self._yum_dependencies.depends = ' '.join(self.required_packages)
437
438     def configure_routes(self, routes, devs):
439         """
440         Add the specified routes to the node's routing table
441         """
442         rules = []
443         
444         for route in routes:
445             for dev in devs:
446                 if dev.routes_here(route):
447                     # Schedule rule
448                     dest, prefix, nexthop, metric = route
449                     rules.append(
450                         "add %s%s gw %s %s" % (
451                             dest,
452                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
453                             nexthop,
454                             dev.if_name,
455                         )
456                     )
457                     
458                     # Stop checking
459                     break
460             else:
461                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
462                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
463         
464         self._logger.info("Setting up routes for %s", self.hostname)
465         
466         (out,err),proc = server.popen_ssh_command(
467             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
468                 home = server.shell_escape(self.home_path)),
469             host = self.hostname,
470             port = None,
471             user = self.slicename,
472             agent = None,
473             ident_key = self.ident_path,
474             server_key = self.server_key,
475             stdin = '\n'.join(rules)
476             )
477         
478         if proc.wait() or err:
479             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
480         
481         
482