Proxying and marshalling fixes
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15
16 from nepi.util import server
17 from nepi.util import parallel
18
19 class UnresponsiveNodeError(RuntimeError):
20     pass
21
22 class Node(object):
23     BASEFILTERS = {
24         # Map Node attribute to plcapi filter name
25         'hostname' : 'hostname',
26     }
27     
28     TAGFILTERS = {
29         # Map Node attribute to (<tag name>, <plcapi filter expression>)
30         #   There are replacements that are applied with string formatting,
31         #   so '%' has to be escaped as '%%'.
32         'architecture' : ('arch','value'),
33         'operatingSystem' : ('fcdistro','value'),
34         'pl_distro' : ('pldistro','value'),
35         'minReliability' : ('reliability%(timeframe)s', ']value'),
36         'maxReliability' : ('reliability%(timeframe)s', '[value'),
37         'minBandwidth' : ('bw%(timeframe)s', ']value'),
38         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
39     }    
40     
41     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
42     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
43     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
44     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
45     
46     def __init__(self, api=None):
47         if not api:
48             api = plcapi.PLCAPI()
49         self._api = api
50         
51         # Attributes
52         self.hostname = None
53         self.architecture = None
54         self.operatingSystem = None
55         self.pl_distro = None
56         self.site = None
57         self.emulation = None
58         self.minReliability = None
59         self.maxReliability = None
60         self.minBandwidth = None
61         self.maxBandwidth = None
62         self.min_num_external_ifaces = None
63         self.max_num_external_ifaces = None
64         self.timeframe = 'm'
65         
66         # Applications and routes add requirements to connected nodes
67         self.required_packages = set()
68         self.required_vsys = set()
69         self.pythonpath = []
70         self.rpmFusion = False
71         self.env = collections.defaultdict(list)
72         
73         # Testbed-derived attributes
74         self.slicename = None
75         self.ident_path = None
76         self.server_key = None
77         self.home_path = None
78         
79         # Those are filled when an actual node is allocated
80         self._node_id = None
81     
82     @property
83     def _nepi_testbed_environment_setup(self):
84         command = cStringIO.StringIO()
85         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
86             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
87         ))
88         command.write(' ; export PATH=$PATH:%s' % (
89             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
90         ))
91         if self.env:
92             for envkey, envvals in self.env.iteritems():
93                 for envval in envvals:
94                     command.write(' ; export %s=%s' % (envkey, envval))
95         return command.getvalue()
96     
97     def build_filters(self, target_filters, filter_map):
98         for attr, tag in filter_map.iteritems():
99             value = getattr(self, attr, None)
100             if value is not None:
101                 target_filters[tag] = value
102         return target_filters
103     
104     @property
105     def applicable_filters(self):
106         has = lambda att : getattr(self,att,None) is not None
107         return (
108             filter(has, self.BASEFILTERS.iterkeys())
109             + filter(has, self.TAGFILTERS.iterkeys())
110         )
111     
112     def find_candidates(self, filter_slice_id=None):
113         print >>sys.stderr, "Finding candidates for", self.make_filter_description()
114         
115         fields = ('node_id',)
116         replacements = {'timeframe':self.timeframe}
117         
118         # get initial candidates (no tag filters)
119         basefilters = self.build_filters({}, self.BASEFILTERS)
120         rootfilters = basefilters.copy()
121         if filter_slice_id:
122             basefilters['|slice_ids'] = (filter_slice_id,)
123         
124         # only pick healthy nodes
125         basefilters['run_level'] = 'boot'
126         basefilters['boot_state'] = 'boot'
127         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
128         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
129         
130         # keyword-only "pseudofilters"
131         extra = {}
132         if self.site:
133             extra['peer'] = self.site
134             
135         candidates = set(map(operator.itemgetter('node_id'), 
136             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
137         
138         # filter by tag, one tag at a time
139         applicable = self.applicable_filters
140         for tagfilter in self.TAGFILTERS.iteritems():
141             attr, (tagname, expr) = tagfilter
142             
143             # don't bother if there's no filter defined
144             if attr in applicable:
145                 tagfilter = rootfilters.copy()
146                 tagfilter['tagname'] = tagname % replacements
147                 tagfilter[expr % replacements] = getattr(self,attr)
148                 tagfilter['node_id'] = list(candidates)
149                 
150                 candidates &= set(map(operator.itemgetter('node_id'),
151                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
152         
153         # filter by vsys tags - special case since it doesn't follow
154         # the usual semantics
155         if self.required_vsys:
156             newcandidates = collections.defaultdict(set)
157             
158             vsys_tags = self._api.GetNodeTags(
159                 tagname='vsys', 
160                 node_id = list(candidates), 
161                 fields = ['node_id','value'])
162             
163             vsys_tags = map(
164                 operator.itemgetter(['node_id','value']),
165                 vsys_tags)
166             
167             required_vsys = self.required_vsys
168             for node_id, value in vsys_tags:
169                 if value in required_vsys:
170                     newcandidates[value].add(node_id)
171             
172             # take only those that have all the required vsys tags
173             newcandidates = reduce(
174                 lambda accum, new : accum & new,
175                 newcandidates.itervalues(),
176                 candidates)
177         
178         # filter by iface count
179         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
180             # fetch interfaces for all, in one go
181             filters = basefilters.copy()
182             filters['node_id'] = list(candidates)
183             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
184                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
185             
186             # filter candidates by interface count
187             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
188                 predicate = ( lambda node_id : 
189                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
190             elif self.min_num_external_ifaces is not None:
191                 predicate = ( lambda node_id : 
192                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
193             else:
194                 predicate = ( lambda node_id : 
195                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
196             
197             candidates = set(filter(predicate, candidates))
198         
199         # make sure hostnames are resolvable
200         if candidates:
201             print >>sys.stderr, "  Found", len(candidates), "candidates. Checking for reachability..."
202             
203             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
204                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
205             ))
206             def resolvable(node_id):
207                 try:
208                     addr = socket.gethostbyname(hostnames[node_id])
209                     return addr is not None
210                 except:
211                     return False
212             candidates = set(parallel.pfilter(resolvable, candidates,
213                 maxthreads = 16))
214
215             print >>sys.stderr, "  Found", len(candidates), "reachable candidates."
216             
217         return candidates
218     
219     def make_filter_description(self):
220         """
221         Makes a human-readable description of filtering conditions
222         for find_candidates.
223         """
224         
225         # get initial candidates (no tag filters)
226         filters = self.build_filters({}, self.BASEFILTERS)
227         
228         # keyword-only "pseudofilters"
229         if self.site:
230             filters['peer'] = self.site
231             
232         # filter by tag, one tag at a time
233         applicable = self.applicable_filters
234         for tagfilter in self.TAGFILTERS.iteritems():
235             attr, (tagname, expr) = tagfilter
236             
237             # don't bother if there's no filter defined
238             if attr in applicable:
239                 filters[attr] = getattr(self,attr)
240         
241         # filter by vsys tags - special case since it doesn't follow
242         # the usual semantics
243         if self.required_vsys:
244             filters['vsys'] = ','.join(list(self.required_vsys))
245         
246         # filter by iface count
247         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
248             filters['num_ifaces'] = '-'.join([
249                 str(self.min_num_external_ifaces or '0'),
250                 str(self.max_num_external_ifaces or 'inf')
251             ])
252             
253         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
254
255     def assign_node_id(self, node_id):
256         self._node_id = node_id
257         self.fetch_node_info()
258     
259     def unassign_node(self):
260         self._node_id = None
261         self.__dict__.update(self.__orig_attrs)
262     
263     def fetch_node_info(self):
264         orig_attrs = {}
265         
266         info = self._api.GetNodes(self._node_id)[0]
267         tags = dict( (t['tagname'],t['value'])
268                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
269
270         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
271         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
272         self.min_num_external_ifaces = None
273         self.max_num_external_ifaces = None
274         self.timeframe = 'm'
275         
276         replacements = {'timeframe':self.timeframe}
277         for attr, tag in self.BASEFILTERS.iteritems():
278             if tag in info:
279                 value = info[tag]
280                 if hasattr(self, attr):
281                     orig_attrs[attr] = getattr(self, attr)
282                 setattr(self, attr, value)
283         for attr, (tag,_) in self.TAGFILTERS.iteritems():
284             tag = tag % replacements
285             if tag in tags:
286                 value = tags[tag]
287                 if hasattr(self, attr):
288                     orig_attrs[attr] = getattr(self, attr)
289                 setattr(self, attr, value)
290         
291         if 'peer_id' in info:
292             orig_attrs['site'] = self.site
293             self.site = self._api.peer_map[info['peer_id']]
294         
295         if 'interface_ids' in info:
296             self.min_num_external_ifaces = \
297             self.max_num_external_ifaces = len(info['interface_ids'])
298         
299         if 'ssh_rsa_key' in info:
300             orig_attrs['server_key'] = self.server_key
301             self.server_key = info['ssh_rsa_key']
302         
303         self.__orig_attrs = orig_attrs
304
305     def validate(self):
306         if self.home_path is None:
307             raise AssertionError, "Misconfigured node: missing home path"
308         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
309             raise AssertionError, "Misconfigured node: missing slice SSH key"
310         if self.slicename is None:
311             raise AssertionError, "Misconfigured node: unspecified slice"
312
313     def install_dependencies(self):
314         if self.required_packages:
315             # TODO: make dependant on the experiment somehow...
316             pidfile = self.DEPENDS_PIDFILE
317             logfile = self.DEPENDS_LOGFILE
318             
319             # If we need rpmfusion, we must install the repo definition and the gpg keys
320             if self.rpmFusion:
321                 if self.operatingSystem == 'f12':
322                     # Fedora 12 requires a different rpmfusion package
323                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
324                 else:
325                     # This one works for f13+
326                     RPM_FUSION_URL = self.RPM_FUSION_URL
327                     
328                 rpmFusion = (
329                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
330                 ) % {
331                     'RPM_FUSION_URL' : RPM_FUSION_URL
332                 }
333             else:
334                 rpmFusion = ''
335             
336             # Start process in a "daemonized" way, using nohup and heavy
337             # stdin/out redirection to avoid connection issues
338             (out,err),proc = rspawn.remote_spawn(
339                 "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
340                     'packages' : ' '.join(self.required_packages),
341                     'rpmfusion' : rpmFusion,
342                 },
343                 pidfile = pidfile,
344                 stdout = logfile,
345                 stderr = rspawn.STDOUT,
346                 
347                 host = self.hostname,
348                 port = None,
349                 user = self.slicename,
350                 agent = None,
351                 ident_key = self.ident_path,
352                 server_key = self.server_key,
353                 sudo = True
354                 )
355             
356             if proc.wait():
357                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
358     
359     def wait_provisioning(self, timeout = 20*60):
360         # recently provisioned nodes may not be up yet
361         sleeptime = 1.0
362         totaltime = 0.0
363         while not self.is_alive():
364             time.sleep(sleeptime)
365             totaltime += sleeptime
366             sleeptime = min(30.0, sleeptime*1.5)
367             
368             if totaltime > timeout:
369                 # PlanetLab has a 15' delay on configuration propagation
370                 # If we're above that delay, the unresponsiveness is not due
371                 # to this delay.
372                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
373     
374     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
375         if self.required_packages:
376             pidfile = self.DEPENDS_PIDFILE
377             
378             # get PID
379             pid = ppid = None
380             for probenum in xrange(pidmax):
381                 pidtuple = rspawn.remote_check_pid(
382                     pidfile = pidfile,
383                     host = self.hostname,
384                     port = None,
385                     user = self.slicename,
386                     agent = None,
387                     ident_key = self.ident_path,
388                     server_key = self.server_key
389                     )
390                 if pidtuple:
391                     pid, ppid = pidtuple
392                     break
393                 else:
394                     time.sleep(pidprobe)
395             else:
396                 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
397         
398             # wait for it to finish
399             while rspawn.RUNNING is rspawn.remote_status(
400                     pid, ppid,
401                     host = self.hostname,
402                     port = None,
403                     user = self.slicename,
404                     agent = None,
405                     ident_key = self.ident_path,
406                     server_key = self.server_key
407                     ):
408                 time.sleep(probe)
409                 probe = min(probemax, 1.5*probe)
410             
411             # check results
412             logfile = self.DEPENDS_LOGFILE
413             
414             (out,err),proc = server.popen_ssh_command(
415                 "cat %s" % (server.shell_escape(logfile),),
416                 host = self.hostname,
417                 port = None,
418                 user = self.slicename,
419                 agent = None,
420                 ident_key = self.ident_path,
421                 server_key = self.server_key
422                 )
423             
424             if proc.wait():
425                 raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
426             
427             success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
428             if not success:
429                 raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
430         
431     def is_alive(self):
432         # Make sure all the paths are created where 
433         # they have to be created for deployment
434         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
435             "echo 'ALIVE'",
436             host = self.hostname,
437             port = None,
438             user = self.slicename,
439             agent = None,
440             ident_key = self.ident_path,
441             server_key = self.server_key
442             )
443         
444         if proc.wait():
445             return False
446         elif not err and out.strip() == 'ALIVE':
447             return True
448         else:
449             return False
450     
451
452     def configure_routes(self, routes, devs):
453         """
454         Add the specified routes to the node's routing table
455         """
456         rules = []
457         
458         for route in routes:
459             for dev in devs:
460                 if dev.routes_here(route):
461                     # Schedule rule
462                     dest, prefix, nexthop, metric = route
463                     rules.append(
464                         "add %s%s gw %s %s" % (
465                             dest,
466                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
467                             nexthop,
468                             dev.if_name,
469                         )
470                     )
471                     
472                     # Stop checking
473                     break
474             else:
475                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
476                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
477         
478         print >>sys.stderr, "Setting up routes for", self.hostname
479         
480         (out,err),proc = server.popen_ssh_command(
481             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
482                 home = server.shell_escape(self.home_path)),
483             host = self.hostname,
484             port = None,
485             user = self.slicename,
486             agent = None,
487             ident_key = self.ident_path,
488             server_key = self.server_key,
489             stdin = '\n'.join(rules)
490             )
491         
492         if proc.wait() or err:
493             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
494         
495         
496