Recovery policy for testbeds, and recovery implementation in PlanetLab.
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15 import logging
16
17 from nepi.util import server
18 from nepi.util import parallel
19
20 import application
21
22 class UnresponsiveNodeError(RuntimeError):
23     pass
24
25 class Node(object):
26     BASEFILTERS = {
27         # Map Node attribute to plcapi filter name
28         'hostname' : 'hostname',
29     }
30     
31     TAGFILTERS = {
32         # Map Node attribute to (<tag name>, <plcapi filter expression>)
33         #   There are replacements that are applied with string formatting,
34         #   so '%' has to be escaped as '%%'.
35         'architecture' : ('arch','value'),
36         'operatingSystem' : ('fcdistro','value'),
37         'pl_distro' : ('pldistro','value'),
38         'minReliability' : ('reliability%(timeframe)s', ']value'),
39         'maxReliability' : ('reliability%(timeframe)s', '[value'),
40         'minBandwidth' : ('bw%(timeframe)s', ']value'),
41         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
42     }    
43     
44     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
45     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
46     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
47     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
48     
49     def __init__(self, api=None):
50         if not api:
51             api = plcapi.PLCAPI()
52         self._api = api
53         
54         # Attributes
55         self.hostname = None
56         self.architecture = None
57         self.operatingSystem = None
58         self.pl_distro = None
59         self.site = None
60         self.minReliability = None
61         self.maxReliability = None
62         self.minBandwidth = None
63         self.maxBandwidth = None
64         self.min_num_external_ifaces = None
65         self.max_num_external_ifaces = None
66         self.timeframe = 'm'
67         
68         # Applications and routes add requirements to connected nodes
69         self.required_packages = set()
70         self.required_vsys = set()
71         self.pythonpath = []
72         self.rpmFusion = False
73         self.env = collections.defaultdict(list)
74         
75         # Testbed-derived attributes
76         self.slicename = None
77         self.ident_path = None
78         self.server_key = None
79         self.home_path = None
80         
81         # Those are filled when an actual node is allocated
82         self._node_id = None
83         self._yum_dependencies = None
84         self._installed = False
85
86         # Logging
87         self._logger = logging.getLogger('nepi.testbeds.planetlab')
88     
89     def _nepi_testbed_environment_setup_get(self):
90         command = cStringIO.StringIO()
91         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
92             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
93         ))
94         command.write(' ; export PATH=$PATH:%s' % (
95             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
96         ))
97         if self.env:
98             for envkey, envvals in self.env.iteritems():
99                 for envval in envvals:
100                     command.write(' ; export %s=%s' % (envkey, envval))
101         return command.getvalue()
102     def _nepi_testbed_environment_setup_set(self, value):
103         pass
104     _nepi_testbed_environment_setup = property(
105         _nepi_testbed_environment_setup_get,
106         _nepi_testbed_environment_setup_set)
107     
108     def build_filters(self, target_filters, filter_map):
109         for attr, tag in filter_map.iteritems():
110             value = getattr(self, attr, None)
111             if value is not None:
112                 target_filters[tag] = value
113         return target_filters
114     
115     @property
116     def applicable_filters(self):
117         has = lambda att : getattr(self,att,None) is not None
118         return (
119             filter(has, self.BASEFILTERS.iterkeys())
120             + filter(has, self.TAGFILTERS.iterkeys())
121         )
122     
123     def find_candidates(self, filter_slice_id=None):
124         self._logger.info("Finding candidates for %s", self.make_filter_description())
125         
126         fields = ('node_id',)
127         replacements = {'timeframe':self.timeframe}
128         
129         # get initial candidates (no tag filters)
130         basefilters = self.build_filters({}, self.BASEFILTERS)
131         rootfilters = basefilters.copy()
132         if filter_slice_id:
133             basefilters['|slice_ids'] = (filter_slice_id,)
134         
135         # only pick healthy nodes
136         basefilters['run_level'] = 'boot'
137         basefilters['boot_state'] = 'boot'
138         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
139         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
140         
141         # keyword-only "pseudofilters"
142         extra = {}
143         if self.site:
144             extra['peer'] = self.site
145             
146         candidates = set(map(operator.itemgetter('node_id'), 
147             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
148         
149         # filter by tag, one tag at a time
150         applicable = self.applicable_filters
151         for tagfilter in self.TAGFILTERS.iteritems():
152             attr, (tagname, expr) = tagfilter
153             
154             # don't bother if there's no filter defined
155             if attr in applicable:
156                 tagfilter = rootfilters.copy()
157                 tagfilter['tagname'] = tagname % replacements
158                 tagfilter[expr % replacements] = getattr(self,attr)
159                 tagfilter['node_id'] = list(candidates)
160                 
161                 candidates &= set(map(operator.itemgetter('node_id'),
162                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
163         
164         # filter by vsys tags - special case since it doesn't follow
165         # the usual semantics
166         if self.required_vsys:
167             newcandidates = collections.defaultdict(set)
168             
169             vsys_tags = self._api.GetNodeTags(
170                 tagname='vsys', 
171                 node_id = list(candidates), 
172                 fields = ['node_id','value'])
173             
174             vsys_tags = map(
175                 operator.itemgetter(['node_id','value']),
176                 vsys_tags)
177             
178             required_vsys = self.required_vsys
179             for node_id, value in vsys_tags:
180                 if value in required_vsys:
181                     newcandidates[value].add(node_id)
182             
183             # take only those that have all the required vsys tags
184             newcandidates = reduce(
185                 lambda accum, new : accum & new,
186                 newcandidates.itervalues(),
187                 candidates)
188         
189         # filter by iface count
190         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
191             # fetch interfaces for all, in one go
192             filters = basefilters.copy()
193             filters['node_id'] = list(candidates)
194             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
195                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
196             
197             # filter candidates by interface count
198             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
199                 predicate = ( lambda node_id : 
200                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
201             elif self.min_num_external_ifaces is not None:
202                 predicate = ( lambda node_id : 
203                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
204             else:
205                 predicate = ( lambda node_id : 
206                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
207             
208             candidates = set(filter(predicate, candidates))
209         
210         # make sure hostnames are resolvable
211         if candidates:
212             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
213             
214             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
215                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
216             ))
217             def resolvable(node_id):
218                 try:
219                     addr = socket.gethostbyname(hostnames[node_id])
220                     return addr is not None
221                 except:
222                     return False
223             candidates = set(parallel.pfilter(resolvable, candidates,
224                 maxthreads = 16))
225
226             self._logger.info("  Found %s reachable candidates.", len(candidates))
227             
228         return candidates
229     
230     def make_filter_description(self):
231         """
232         Makes a human-readable description of filtering conditions
233         for find_candidates.
234         """
235         
236         # get initial candidates (no tag filters)
237         filters = self.build_filters({}, self.BASEFILTERS)
238         
239         # keyword-only "pseudofilters"
240         if self.site:
241             filters['peer'] = self.site
242             
243         # filter by tag, one tag at a time
244         applicable = self.applicable_filters
245         for tagfilter in self.TAGFILTERS.iteritems():
246             attr, (tagname, expr) = tagfilter
247             
248             # don't bother if there's no filter defined
249             if attr in applicable:
250                 filters[attr] = getattr(self,attr)
251         
252         # filter by vsys tags - special case since it doesn't follow
253         # the usual semantics
254         if self.required_vsys:
255             filters['vsys'] = ','.join(list(self.required_vsys))
256         
257         # filter by iface count
258         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
259             filters['num_ifaces'] = '-'.join([
260                 str(self.min_num_external_ifaces or '0'),
261                 str(self.max_num_external_ifaces or 'inf')
262             ])
263             
264         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
265
266     def assign_node_id(self, node_id):
267         self._node_id = node_id
268         self.fetch_node_info()
269     
270     def unassign_node(self):
271         self._node_id = None
272         self.__dict__.update(self.__orig_attrs)
273     
274     def fetch_node_info(self):
275         orig_attrs = {}
276         
277         info = self._api.GetNodes(self._node_id)[0]
278         tags = dict( (t['tagname'],t['value'])
279                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
280
281         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
282         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
283         self.min_num_external_ifaces = None
284         self.max_num_external_ifaces = None
285         self.timeframe = 'm'
286         
287         replacements = {'timeframe':self.timeframe}
288         for attr, tag in self.BASEFILTERS.iteritems():
289             if tag in info:
290                 value = info[tag]
291                 if hasattr(self, attr):
292                     orig_attrs[attr] = getattr(self, attr)
293                 setattr(self, attr, value)
294         for attr, (tag,_) in self.TAGFILTERS.iteritems():
295             tag = tag % replacements
296             if tag in tags:
297                 value = tags[tag]
298                 if hasattr(self, attr):
299                     orig_attrs[attr] = getattr(self, attr)
300                 setattr(self, attr, value)
301         
302         if 'peer_id' in info:
303             orig_attrs['site'] = self.site
304             self.site = self._api.peer_map[info['peer_id']]
305         
306         if 'interface_ids' in info:
307             self.min_num_external_ifaces = \
308             self.max_num_external_ifaces = len(info['interface_ids'])
309         
310         if 'ssh_rsa_key' in info:
311             orig_attrs['server_key'] = self.server_key
312             self.server_key = info['ssh_rsa_key']
313         
314         self.__orig_attrs = orig_attrs
315
316     def validate(self):
317         if self.home_path is None:
318             raise AssertionError, "Misconfigured node: missing home path"
319         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
320             raise AssertionError, "Misconfigured node: missing slice SSH key"
321         if self.slicename is None:
322             raise AssertionError, "Misconfigured node: unspecified slice"
323
324     def recover(self):
325         # Just mark dependencies installed
326         self._installed = True
327
328     def install_dependencies(self):
329         if self.required_packages and not self._installed:
330             # If we need rpmfusion, we must install the repo definition and the gpg keys
331             if self.rpmFusion:
332                 if self.operatingSystem == 'f12':
333                     # Fedora 12 requires a different rpmfusion package
334                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
335                 else:
336                     # This one works for f13+
337                     RPM_FUSION_URL = self.RPM_FUSION_URL
338                     
339                 rpmFusion = (
340                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
341                 ) % {
342                     'RPM_FUSION_URL' : RPM_FUSION_URL
343                 }
344             else:
345                 rpmFusion = ''
346             
347             if rpmFusion:
348                 (out,err),proc = server.popen_ssh_command(
349                     rpmFusion,
350                     host = self.hostname,
351                     port = None,
352                     user = self.slicename,
353                     agent = None,
354                     ident_key = self.ident_path,
355                     server_key = self.server_key
356                     )
357                 
358                 if proc.wait():
359                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
360             
361             # Launch p2p yum dependency installer
362             self._yum_dependencies.async_setup()
363     
364     def wait_provisioning(self, timeout = 20*60):
365         # Wait for the p2p installer
366         sleeptime = 1.0
367         totaltime = 0.0
368         while not self.is_alive():
369             time.sleep(sleeptime)
370             totaltime += sleeptime
371             sleeptime = min(30.0, sleeptime*1.5)
372             
373             if totaltime > timeout:
374                 # PlanetLab has a 15' delay on configuration propagation
375                 # If we're above that delay, the unresponsiveness is not due
376                 # to this delay.
377                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
378     
379     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
380         # Wait for the p2p installer
381         if self._yum_dependencies and not self._installed:
382             self._yum_dependencies.async_setup_wait()
383             self._installed = True
384         
385     def is_alive(self):
386         # Make sure all the paths are created where 
387         # they have to be created for deployment
388         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
389             "echo 'ALIVE'",
390             host = self.hostname,
391             port = None,
392             user = self.slicename,
393             agent = None,
394             ident_key = self.ident_path,
395             server_key = self.server_key
396             )
397         
398         if proc.wait():
399             return False
400         elif not err and out.strip() == 'ALIVE':
401             return True
402         else:
403             return False
404     
405     def prepare_dependencies(self):
406         # Configure p2p yum dependency installer
407         if self.required_packages and not self._installed:
408             self._yum_dependencies = application.YumDependency(self._api)
409             self._yum_dependencies.node = self
410             self._yum_dependencies.home_path = "nepi-yumdep"
411             self._yum_dependencies.depends = ' '.join(self.required_packages)
412
413     def configure_routes(self, routes, devs):
414         """
415         Add the specified routes to the node's routing table
416         """
417         rules = []
418         
419         for route in routes:
420             for dev in devs:
421                 if dev.routes_here(route):
422                     # Schedule rule
423                     dest, prefix, nexthop, metric = route
424                     rules.append(
425                         "add %s%s gw %s %s" % (
426                             dest,
427                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
428                             nexthop,
429                             dev.if_name,
430                         )
431                     )
432                     
433                     # Stop checking
434                     break
435             else:
436                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
437                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
438         
439         self._logger.info("Setting up routes for %s", self.hostname)
440         
441         (out,err),proc = server.popen_ssh_command(
442             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
443                 home = server.shell_escape(self.home_path)),
444             host = self.hostname,
445             port = None,
446             user = self.slicename,
447             agent = None,
448             ident_key = self.ident_path,
449             server_key = self.server_key,
450             stdin = '\n'.join(rules)
451             )
452         
453         if proc.wait() or err:
454             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
455         
456         
457