Merge with head
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15 import logging
16
17 from nepi.util import server
18 from nepi.util import parallel
19
20 class UnresponsiveNodeError(RuntimeError):
21     pass
22
23 class Node(object):
24     BASEFILTERS = {
25         # Map Node attribute to plcapi filter name
26         'hostname' : 'hostname',
27     }
28     
29     TAGFILTERS = {
30         # Map Node attribute to (<tag name>, <plcapi filter expression>)
31         #   There are replacements that are applied with string formatting,
32         #   so '%' has to be escaped as '%%'.
33         'architecture' : ('arch','value'),
34         'operatingSystem' : ('fcdistro','value'),
35         'pl_distro' : ('pldistro','value'),
36         'minReliability' : ('reliability%(timeframe)s', ']value'),
37         'maxReliability' : ('reliability%(timeframe)s', '[value'),
38         'minBandwidth' : ('bw%(timeframe)s', ']value'),
39         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
40     }    
41     
42     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
43     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
44     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
45     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
46     
47     def __init__(self, api=None):
48         if not api:
49             api = plcapi.PLCAPI()
50         self._api = api
51         
52         # Attributes
53         self.hostname = None
54         self.architecture = None
55         self.operatingSystem = None
56         self.pl_distro = None
57         self.site = None
58         self.minReliability = None
59         self.maxReliability = None
60         self.minBandwidth = None
61         self.maxBandwidth = None
62         self.min_num_external_ifaces = None
63         self.max_num_external_ifaces = None
64         self.timeframe = 'm'
65         
66         # Applications and routes add requirements to connected nodes
67         self.required_packages = set()
68         self.required_vsys = set()
69         self.pythonpath = []
70         self.rpmFusion = False
71         self.env = collections.defaultdict(list)
72         
73         # Testbed-derived attributes
74         self.slicename = None
75         self.ident_path = None
76         self.server_key = None
77         self.home_path = None
78         
79         # Those are filled when an actual node is allocated
80         self._node_id = None
81
82         # Logging
83         self._logger = logging.getLogger('nepi.testbeds.planetlab')
84     
85     @property
86     def _nepi_testbed_environment_setup(self):
87         command = cStringIO.StringIO()
88         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
89             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
90         ))
91         command.write(' ; export PATH=$PATH:%s' % (
92             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
93         ))
94         if self.env:
95             for envkey, envvals in self.env.iteritems():
96                 for envval in envvals:
97                     command.write(' ; export %s=%s' % (envkey, envval))
98         return command.getvalue()
99     
100     def build_filters(self, target_filters, filter_map):
101         for attr, tag in filter_map.iteritems():
102             value = getattr(self, attr, None)
103             if value is not None:
104                 target_filters[tag] = value
105         return target_filters
106     
107     @property
108     def applicable_filters(self):
109         has = lambda att : getattr(self,att,None) is not None
110         return (
111             filter(has, self.BASEFILTERS.iterkeys())
112             + filter(has, self.TAGFILTERS.iterkeys())
113         )
114     
115     def find_candidates(self, filter_slice_id=None):
116         self._logger.info("Finding candidates for %s", self.make_filter_description())
117         
118         fields = ('node_id',)
119         replacements = {'timeframe':self.timeframe}
120         
121         # get initial candidates (no tag filters)
122         basefilters = self.build_filters({}, self.BASEFILTERS)
123         rootfilters = basefilters.copy()
124         if filter_slice_id:
125             basefilters['|slice_ids'] = (filter_slice_id,)
126         
127         # only pick healthy nodes
128         basefilters['run_level'] = 'boot'
129         basefilters['boot_state'] = 'boot'
130         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
131         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
132         
133         # keyword-only "pseudofilters"
134         extra = {}
135         if self.site:
136             extra['peer'] = self.site
137             
138         candidates = set(map(operator.itemgetter('node_id'), 
139             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
140         
141         # filter by tag, one tag at a time
142         applicable = self.applicable_filters
143         for tagfilter in self.TAGFILTERS.iteritems():
144             attr, (tagname, expr) = tagfilter
145             
146             # don't bother if there's no filter defined
147             if attr in applicable:
148                 tagfilter = rootfilters.copy()
149                 tagfilter['tagname'] = tagname % replacements
150                 tagfilter[expr % replacements] = getattr(self,attr)
151                 tagfilter['node_id'] = list(candidates)
152                 
153                 candidates &= set(map(operator.itemgetter('node_id'),
154                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
155         
156         # filter by vsys tags - special case since it doesn't follow
157         # the usual semantics
158         if self.required_vsys:
159             newcandidates = collections.defaultdict(set)
160             
161             vsys_tags = self._api.GetNodeTags(
162                 tagname='vsys', 
163                 node_id = list(candidates), 
164                 fields = ['node_id','value'])
165             
166             vsys_tags = map(
167                 operator.itemgetter(['node_id','value']),
168                 vsys_tags)
169             
170             required_vsys = self.required_vsys
171             for node_id, value in vsys_tags:
172                 if value in required_vsys:
173                     newcandidates[value].add(node_id)
174             
175             # take only those that have all the required vsys tags
176             newcandidates = reduce(
177                 lambda accum, new : accum & new,
178                 newcandidates.itervalues(),
179                 candidates)
180         
181         # filter by iface count
182         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
183             # fetch interfaces for all, in one go
184             filters = basefilters.copy()
185             filters['node_id'] = list(candidates)
186             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
187                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
188             
189             # filter candidates by interface count
190             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
191                 predicate = ( lambda node_id : 
192                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
193             elif self.min_num_external_ifaces is not None:
194                 predicate = ( lambda node_id : 
195                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
196             else:
197                 predicate = ( lambda node_id : 
198                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
199             
200             candidates = set(filter(predicate, candidates))
201         
202         # make sure hostnames are resolvable
203         if candidates:
204             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
205             
206             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
207                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
208             ))
209             def resolvable(node_id):
210                 try:
211                     addr = socket.gethostbyname(hostnames[node_id])
212                     return addr is not None
213                 except:
214                     return False
215             candidates = set(parallel.pfilter(resolvable, candidates,
216                 maxthreads = 16))
217
218             self._logger.info("  Found %s reachable candidates.", len(candidates))
219             
220         return candidates
221     
222     def make_filter_description(self):
223         """
224         Makes a human-readable description of filtering conditions
225         for find_candidates.
226         """
227         
228         # get initial candidates (no tag filters)
229         filters = self.build_filters({}, self.BASEFILTERS)
230         
231         # keyword-only "pseudofilters"
232         if self.site:
233             filters['peer'] = self.site
234             
235         # filter by tag, one tag at a time
236         applicable = self.applicable_filters
237         for tagfilter in self.TAGFILTERS.iteritems():
238             attr, (tagname, expr) = tagfilter
239             
240             # don't bother if there's no filter defined
241             if attr in applicable:
242                 filters[attr] = getattr(self,attr)
243         
244         # filter by vsys tags - special case since it doesn't follow
245         # the usual semantics
246         if self.required_vsys:
247             filters['vsys'] = ','.join(list(self.required_vsys))
248         
249         # filter by iface count
250         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
251             filters['num_ifaces'] = '-'.join([
252                 str(self.min_num_external_ifaces or '0'),
253                 str(self.max_num_external_ifaces or 'inf')
254             ])
255             
256         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
257
258     def assign_node_id(self, node_id):
259         self._node_id = node_id
260         self.fetch_node_info()
261     
262     def unassign_node(self):
263         self._node_id = None
264         self.__dict__.update(self.__orig_attrs)
265     
266     def fetch_node_info(self):
267         orig_attrs = {}
268         
269         info = self._api.GetNodes(self._node_id)[0]
270         tags = dict( (t['tagname'],t['value'])
271                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
272
273         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
274         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
275         self.min_num_external_ifaces = None
276         self.max_num_external_ifaces = None
277         self.timeframe = 'm'
278         
279         replacements = {'timeframe':self.timeframe}
280         for attr, tag in self.BASEFILTERS.iteritems():
281             if tag in info:
282                 value = info[tag]
283                 if hasattr(self, attr):
284                     orig_attrs[attr] = getattr(self, attr)
285                 setattr(self, attr, value)
286         for attr, (tag,_) in self.TAGFILTERS.iteritems():
287             tag = tag % replacements
288             if tag in tags:
289                 value = tags[tag]
290                 if hasattr(self, attr):
291                     orig_attrs[attr] = getattr(self, attr)
292                 setattr(self, attr, value)
293         
294         if 'peer_id' in info:
295             orig_attrs['site'] = self.site
296             self.site = self._api.peer_map[info['peer_id']]
297         
298         if 'interface_ids' in info:
299             self.min_num_external_ifaces = \
300             self.max_num_external_ifaces = len(info['interface_ids'])
301         
302         if 'ssh_rsa_key' in info:
303             orig_attrs['server_key'] = self.server_key
304             self.server_key = info['ssh_rsa_key']
305         
306         self.__orig_attrs = orig_attrs
307
308     def validate(self):
309         if self.home_path is None:
310             raise AssertionError, "Misconfigured node: missing home path"
311         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
312             raise AssertionError, "Misconfigured node: missing slice SSH key"
313         if self.slicename is None:
314             raise AssertionError, "Misconfigured node: unspecified slice"
315
316     def install_dependencies(self):
317         if self.required_packages:
318             # TODO: make dependant on the experiment somehow...
319             pidfile = self.DEPENDS_PIDFILE
320             logfile = self.DEPENDS_LOGFILE
321             
322             # If we need rpmfusion, we must install the repo definition and the gpg keys
323             if self.rpmFusion:
324                 if self.operatingSystem == 'f12':
325                     # Fedora 12 requires a different rpmfusion package
326                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
327                 else:
328                     # This one works for f13+
329                     RPM_FUSION_URL = self.RPM_FUSION_URL
330                     
331                 rpmFusion = (
332                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
333                 ) % {
334                     'RPM_FUSION_URL' : RPM_FUSION_URL
335                 }
336             else:
337                 rpmFusion = ''
338             
339             # Start process in a "daemonized" way, using nohup and heavy
340             # stdin/out redirection to avoid connection issues
341             (out,err),proc = rspawn.remote_spawn(
342                 "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
343                     'packages' : ' '.join(self.required_packages),
344                     'rpmfusion' : rpmFusion,
345                 },
346                 pidfile = pidfile,
347                 stdout = logfile,
348                 stderr = rspawn.STDOUT,
349                 
350                 host = self.hostname,
351                 port = None,
352                 user = self.slicename,
353                 agent = None,
354                 ident_key = self.ident_path,
355                 server_key = self.server_key,
356                 sudo = True
357                 )
358             
359             if proc.wait():
360                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
361     
362     def wait_provisioning(self, timeout = 20*60):
363         # recently provisioned nodes may not be up yet
364         sleeptime = 1.0
365         totaltime = 0.0
366         while not self.is_alive():
367             time.sleep(sleeptime)
368             totaltime += sleeptime
369             sleeptime = min(30.0, sleeptime*1.5)
370             
371             if totaltime > timeout:
372                 # PlanetLab has a 15' delay on configuration propagation
373                 # If we're above that delay, the unresponsiveness is not due
374                 # to this delay.
375                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
376     
377     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
378         if self.required_packages:
379             pidfile = self.DEPENDS_PIDFILE
380             
381             # get PID
382             pid = ppid = None
383             for probenum in xrange(pidmax):
384                 pidtuple = rspawn.remote_check_pid(
385                     pidfile = pidfile,
386                     host = self.hostname,
387                     port = None,
388                     user = self.slicename,
389                     agent = None,
390                     ident_key = self.ident_path,
391                     server_key = self.server_key
392                     )
393                 if pidtuple:
394                     pid, ppid = pidtuple
395                     break
396                 else:
397                     time.sleep(pidprobe)
398             else:
399                 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
400         
401             # wait for it to finish
402             while rspawn.RUNNING is rspawn.remote_status(
403                     pid, ppid,
404                     host = self.hostname,
405                     port = None,
406                     user = self.slicename,
407                     agent = None,
408                     ident_key = self.ident_path,
409                     server_key = self.server_key
410                     ):
411                 time.sleep(probe)
412                 probe = min(probemax, 1.5*probe)
413             
414             # check results
415             logfile = self.DEPENDS_LOGFILE
416             
417             (out,err),proc = server.popen_ssh_command(
418                 "cat %s" % (server.shell_escape(logfile),),
419                 host = self.hostname,
420                 port = None,
421                 user = self.slicename,
422                 agent = None,
423                 ident_key = self.ident_path,
424                 server_key = self.server_key
425                 )
426             
427             if proc.wait():
428                 raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
429             
430             success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
431             if not success:
432                 raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
433         
434     def is_alive(self):
435         # Make sure all the paths are created where 
436         # they have to be created for deployment
437         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
438             "echo 'ALIVE'",
439             host = self.hostname,
440             port = None,
441             user = self.slicename,
442             agent = None,
443             ident_key = self.ident_path,
444             server_key = self.server_key
445             )
446         
447         if proc.wait():
448             return False
449         elif not err and out.strip() == 'ALIVE':
450             return True
451         else:
452             return False
453     
454
455     def configure_routes(self, routes, devs):
456         """
457         Add the specified routes to the node's routing table
458         """
459         rules = []
460         
461         for route in routes:
462             for dev in devs:
463                 if dev.routes_here(route):
464                     # Schedule rule
465                     dest, prefix, nexthop, metric = route
466                     rules.append(
467                         "add %s%s gw %s %s" % (
468                             dest,
469                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
470                             nexthop,
471                             dev.if_name,
472                         )
473                     )
474                     
475                     # Stop checking
476                     break
477             else:
478                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
479                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
480         
481         self._logger.info("Setting up routes for %s", self.hostname)
482         
483         (out,err),proc = server.popen_ssh_command(
484             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
485                 home = server.shell_escape(self.home_path)),
486             host = self.hostname,
487             port = None,
488             user = self.slicename,
489             agent = None,
490             ident_key = self.ident_path,
491             server_key = self.server_key,
492             stdin = '\n'.join(rules)
493             )
494         
495         if proc.wait() or err:
496             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
497         
498         
499