Ticket #58: remove the emulation flag, instead, pick correctly configured nodes only
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15
16 from nepi.util import server
17 from nepi.util import parallel
18
19 class UnresponsiveNodeError(RuntimeError):
20     pass
21
22 class Node(object):
23     BASEFILTERS = {
24         # Map Node attribute to plcapi filter name
25         'hostname' : 'hostname',
26     }
27     
28     TAGFILTERS = {
29         # Map Node attribute to (<tag name>, <plcapi filter expression>)
30         #   There are replacements that are applied with string formatting,
31         #   so '%' has to be escaped as '%%'.
32         'architecture' : ('arch','value'),
33         'operatingSystem' : ('fcdistro','value'),
34         'pl_distro' : ('pldistro','value'),
35         'minReliability' : ('reliability%(timeframe)s', ']value'),
36         'maxReliability' : ('reliability%(timeframe)s', '[value'),
37         'minBandwidth' : ('bw%(timeframe)s', ']value'),
38         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
39     }    
40     
41     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
42     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
43     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
44     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
45     
46     def __init__(self, api=None):
47         if not api:
48             api = plcapi.PLCAPI()
49         self._api = api
50         
51         # Attributes
52         self.hostname = None
53         self.architecture = None
54         self.operatingSystem = None
55         self.pl_distro = None
56         self.site = None
57         self.minReliability = None
58         self.maxReliability = None
59         self.minBandwidth = None
60         self.maxBandwidth = None
61         self.min_num_external_ifaces = None
62         self.max_num_external_ifaces = None
63         self.timeframe = 'm'
64         
65         # Applications and routes add requirements to connected nodes
66         self.required_packages = set()
67         self.required_vsys = set()
68         self.pythonpath = []
69         self.rpmFusion = False
70         self.env = collections.defaultdict(list)
71         
72         # Testbed-derived attributes
73         self.slicename = None
74         self.ident_path = None
75         self.server_key = None
76         self.home_path = None
77         
78         # Those are filled when an actual node is allocated
79         self._node_id = None
80     
81     @property
82     def _nepi_testbed_environment_setup(self):
83         command = cStringIO.StringIO()
84         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
85             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
86         ))
87         command.write(' ; export PATH=$PATH:%s' % (
88             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
89         ))
90         if self.env:
91             for envkey, envvals in self.env.iteritems():
92                 for envval in envvals:
93                     command.write(' ; export %s=%s' % (envkey, envval))
94         return command.getvalue()
95     
96     def build_filters(self, target_filters, filter_map):
97         for attr, tag in filter_map.iteritems():
98             value = getattr(self, attr, None)
99             if value is not None:
100                 target_filters[tag] = value
101         return target_filters
102     
103     @property
104     def applicable_filters(self):
105         has = lambda att : getattr(self,att,None) is not None
106         return (
107             filter(has, self.BASEFILTERS.iterkeys())
108             + filter(has, self.TAGFILTERS.iterkeys())
109         )
110     
111     def find_candidates(self, filter_slice_id=None):
112         print >>sys.stderr, "Finding candidates for", self.make_filter_description()
113         
114         fields = ('node_id',)
115         replacements = {'timeframe':self.timeframe}
116         
117         # get initial candidates (no tag filters)
118         basefilters = self.build_filters({}, self.BASEFILTERS)
119         rootfilters = basefilters.copy()
120         if filter_slice_id:
121             basefilters['|slice_ids'] = (filter_slice_id,)
122         
123         # only pick healthy nodes
124         basefilters['run_level'] = 'boot'
125         basefilters['boot_state'] = 'boot'
126         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
127         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
128         
129         # keyword-only "pseudofilters"
130         extra = {}
131         if self.site:
132             extra['peer'] = self.site
133             
134         candidates = set(map(operator.itemgetter('node_id'), 
135             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
136         
137         # filter by tag, one tag at a time
138         applicable = self.applicable_filters
139         for tagfilter in self.TAGFILTERS.iteritems():
140             attr, (tagname, expr) = tagfilter
141             
142             # don't bother if there's no filter defined
143             if attr in applicable:
144                 tagfilter = rootfilters.copy()
145                 tagfilter['tagname'] = tagname % replacements
146                 tagfilter[expr % replacements] = getattr(self,attr)
147                 tagfilter['node_id'] = list(candidates)
148                 
149                 candidates &= set(map(operator.itemgetter('node_id'),
150                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
151         
152         # filter by vsys tags - special case since it doesn't follow
153         # the usual semantics
154         if self.required_vsys:
155             newcandidates = collections.defaultdict(set)
156             
157             vsys_tags = self._api.GetNodeTags(
158                 tagname='vsys', 
159                 node_id = list(candidates), 
160                 fields = ['node_id','value'])
161             
162             vsys_tags = map(
163                 operator.itemgetter(['node_id','value']),
164                 vsys_tags)
165             
166             required_vsys = self.required_vsys
167             for node_id, value in vsys_tags:
168                 if value in required_vsys:
169                     newcandidates[value].add(node_id)
170             
171             # take only those that have all the required vsys tags
172             newcandidates = reduce(
173                 lambda accum, new : accum & new,
174                 newcandidates.itervalues(),
175                 candidates)
176         
177         # filter by iface count
178         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
179             # fetch interfaces for all, in one go
180             filters = basefilters.copy()
181             filters['node_id'] = list(candidates)
182             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
183                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
184             
185             # filter candidates by interface count
186             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
187                 predicate = ( lambda node_id : 
188                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
189             elif self.min_num_external_ifaces is not None:
190                 predicate = ( lambda node_id : 
191                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
192             else:
193                 predicate = ( lambda node_id : 
194                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
195             
196             candidates = set(filter(predicate, candidates))
197         
198         # make sure hostnames are resolvable
199         if candidates:
200             print >>sys.stderr, "  Found", len(candidates), "candidates. Checking for reachability..."
201             
202             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
203                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
204             ))
205             def resolvable(node_id):
206                 try:
207                     addr = socket.gethostbyname(hostnames[node_id])
208                     return addr is not None
209                 except:
210                     return False
211             candidates = set(parallel.pfilter(resolvable, candidates,
212                 maxthreads = 16))
213
214             print >>sys.stderr, "  Found", len(candidates), "reachable candidates."
215             
216         return candidates
217     
218     def make_filter_description(self):
219         """
220         Makes a human-readable description of filtering conditions
221         for find_candidates.
222         """
223         
224         # get initial candidates (no tag filters)
225         filters = self.build_filters({}, self.BASEFILTERS)
226         
227         # keyword-only "pseudofilters"
228         if self.site:
229             filters['peer'] = self.site
230             
231         # filter by tag, one tag at a time
232         applicable = self.applicable_filters
233         for tagfilter in self.TAGFILTERS.iteritems():
234             attr, (tagname, expr) = tagfilter
235             
236             # don't bother if there's no filter defined
237             if attr in applicable:
238                 filters[attr] = getattr(self,attr)
239         
240         # filter by vsys tags - special case since it doesn't follow
241         # the usual semantics
242         if self.required_vsys:
243             filters['vsys'] = ','.join(list(self.required_vsys))
244         
245         # filter by iface count
246         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
247             filters['num_ifaces'] = '-'.join([
248                 str(self.min_num_external_ifaces or '0'),
249                 str(self.max_num_external_ifaces or 'inf')
250             ])
251             
252         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
253
254     def assign_node_id(self, node_id):
255         self._node_id = node_id
256         self.fetch_node_info()
257     
258     def unassign_node(self):
259         self._node_id = None
260         self.__dict__.update(self.__orig_attrs)
261     
262     def fetch_node_info(self):
263         orig_attrs = {}
264         
265         info = self._api.GetNodes(self._node_id)[0]
266         tags = dict( (t['tagname'],t['value'])
267                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
268
269         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
270         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
271         self.min_num_external_ifaces = None
272         self.max_num_external_ifaces = None
273         self.timeframe = 'm'
274         
275         replacements = {'timeframe':self.timeframe}
276         for attr, tag in self.BASEFILTERS.iteritems():
277             if tag in info:
278                 value = info[tag]
279                 if hasattr(self, attr):
280                     orig_attrs[attr] = getattr(self, attr)
281                 setattr(self, attr, value)
282         for attr, (tag,_) in self.TAGFILTERS.iteritems():
283             tag = tag % replacements
284             if tag in tags:
285                 value = tags[tag]
286                 if hasattr(self, attr):
287                     orig_attrs[attr] = getattr(self, attr)
288                 setattr(self, attr, value)
289         
290         if 'peer_id' in info:
291             orig_attrs['site'] = self.site
292             self.site = self._api.peer_map[info['peer_id']]
293         
294         if 'interface_ids' in info:
295             self.min_num_external_ifaces = \
296             self.max_num_external_ifaces = len(info['interface_ids'])
297         
298         if 'ssh_rsa_key' in info:
299             orig_attrs['server_key'] = self.server_key
300             self.server_key = info['ssh_rsa_key']
301         
302         self.__orig_attrs = orig_attrs
303
304     def validate(self):
305         if self.home_path is None:
306             raise AssertionError, "Misconfigured node: missing home path"
307         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
308             raise AssertionError, "Misconfigured node: missing slice SSH key"
309         if self.slicename is None:
310             raise AssertionError, "Misconfigured node: unspecified slice"
311
312     def install_dependencies(self):
313         if self.required_packages:
314             # TODO: make dependant on the experiment somehow...
315             pidfile = self.DEPENDS_PIDFILE
316             logfile = self.DEPENDS_LOGFILE
317             
318             # If we need rpmfusion, we must install the repo definition and the gpg keys
319             if self.rpmFusion:
320                 if self.operatingSystem == 'f12':
321                     # Fedora 12 requires a different rpmfusion package
322                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
323                 else:
324                     # This one works for f13+
325                     RPM_FUSION_URL = self.RPM_FUSION_URL
326                     
327                 rpmFusion = (
328                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
329                 ) % {
330                     'RPM_FUSION_URL' : RPM_FUSION_URL
331                 }
332             else:
333                 rpmFusion = ''
334             
335             # Start process in a "daemonized" way, using nohup and heavy
336             # stdin/out redirection to avoid connection issues
337             (out,err),proc = rspawn.remote_spawn(
338                 "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
339                     'packages' : ' '.join(self.required_packages),
340                     'rpmfusion' : rpmFusion,
341                 },
342                 pidfile = pidfile,
343                 stdout = logfile,
344                 stderr = rspawn.STDOUT,
345                 
346                 host = self.hostname,
347                 port = None,
348                 user = self.slicename,
349                 agent = None,
350                 ident_key = self.ident_path,
351                 server_key = self.server_key,
352                 sudo = True
353                 )
354             
355             if proc.wait():
356                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
357     
358     def wait_provisioning(self, timeout = 20*60):
359         # recently provisioned nodes may not be up yet
360         sleeptime = 1.0
361         totaltime = 0.0
362         while not self.is_alive():
363             time.sleep(sleeptime)
364             totaltime += sleeptime
365             sleeptime = min(30.0, sleeptime*1.5)
366             
367             if totaltime > timeout:
368                 # PlanetLab has a 15' delay on configuration propagation
369                 # If we're above that delay, the unresponsiveness is not due
370                 # to this delay.
371                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
372     
373     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
374         if self.required_packages:
375             pidfile = self.DEPENDS_PIDFILE
376             
377             # get PID
378             pid = ppid = None
379             for probenum in xrange(pidmax):
380                 pidtuple = rspawn.remote_check_pid(
381                     pidfile = pidfile,
382                     host = self.hostname,
383                     port = None,
384                     user = self.slicename,
385                     agent = None,
386                     ident_key = self.ident_path,
387                     server_key = self.server_key
388                     )
389                 if pidtuple:
390                     pid, ppid = pidtuple
391                     break
392                 else:
393                     time.sleep(pidprobe)
394             else:
395                 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
396         
397             # wait for it to finish
398             while rspawn.RUNNING is rspawn.remote_status(
399                     pid, ppid,
400                     host = self.hostname,
401                     port = None,
402                     user = self.slicename,
403                     agent = None,
404                     ident_key = self.ident_path,
405                     server_key = self.server_key
406                     ):
407                 time.sleep(probe)
408                 probe = min(probemax, 1.5*probe)
409             
410             # check results
411             logfile = self.DEPENDS_LOGFILE
412             
413             (out,err),proc = server.popen_ssh_command(
414                 "cat %s" % (server.shell_escape(logfile),),
415                 host = self.hostname,
416                 port = None,
417                 user = self.slicename,
418                 agent = None,
419                 ident_key = self.ident_path,
420                 server_key = self.server_key
421                 )
422             
423             if proc.wait():
424                 raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
425             
426             success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
427             if not success:
428                 raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
429         
430     def is_alive(self):
431         # Make sure all the paths are created where 
432         # they have to be created for deployment
433         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
434             "echo 'ALIVE'",
435             host = self.hostname,
436             port = None,
437             user = self.slicename,
438             agent = None,
439             ident_key = self.ident_path,
440             server_key = self.server_key
441             )
442         
443         if proc.wait():
444             return False
445         elif not err and out.strip() == 'ALIVE':
446             return True
447         else:
448             return False
449     
450
451     def configure_routes(self, routes, devs):
452         """
453         Add the specified routes to the node's routing table
454         """
455         rules = []
456         
457         for route in routes:
458             for dev in devs:
459                 if dev.routes_here(route):
460                     # Schedule rule
461                     dest, prefix, nexthop, metric = route
462                     rules.append(
463                         "add %s%s gw %s %s" % (
464                             dest,
465                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
466                             nexthop,
467                             dev.if_name,
468                         )
469                     )
470                     
471                     # Stop checking
472                     break
473             else:
474                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
475                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
476         
477         print >>sys.stderr, "Setting up routes for", self.hostname
478         
479         (out,err),proc = server.popen_ssh_command(
480             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
481                 home = server.shell_escape(self.home_path)),
482             host = self.hostname,
483             port = None,
484             user = self.slicename,
485             agent = None,
486             ident_key = self.ident_path,
487             server_key = self.server_key,
488             stdin = '\n'.join(rules)
489             )
490         
491         if proc.wait() or err:
492             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
493         
494         
495