9e56c32fe4730eec1f841ab47d0a7e18df2fa271
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15
16 from nepi.util import server
17 from nepi.util import parallel
18
19 class UnresponsiveNodeError(RuntimeError):
20     pass
21
22 class Node(object):
23     BASEFILTERS = {
24         # Map Node attribute to plcapi filter name
25         'hostname' : 'hostname',
26     }
27     
28     TAGFILTERS = {
29         # Map Node attribute to (<tag name>, <plcapi filter expression>)
30         #   There are replacements that are applied with string formatting,
31         #   so '%' has to be escaped as '%%'.
32         'architecture' : ('arch','value'),
33         'operatingSystem' : ('fcdistro','value'),
34         'pl_distro' : ('pldistro','value'),
35         'minReliability' : ('reliability%(timeframe)s', ']value'),
36         'maxReliability' : ('reliability%(timeframe)s', '[value'),
37         'minBandwidth' : ('bw%(timeframe)s', ']value'),
38         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
39     }    
40     
41     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
42     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
43     
44     def __init__(self, api=None):
45         if not api:
46             api = plcapi.PLCAPI()
47         self._api = api
48         
49         # Attributes
50         self.hostname = None
51         self.architecture = None
52         self.operatingSystem = None
53         self.pl_distro = None
54         self.site = None
55         self.emulation = None
56         self.minReliability = None
57         self.maxReliability = None
58         self.minBandwidth = None
59         self.maxBandwidth = None
60         self.min_num_external_ifaces = None
61         self.max_num_external_ifaces = None
62         self.timeframe = 'm'
63         
64         # Applications and routes add requirements to connected nodes
65         self.required_packages = set()
66         self.required_vsys = set()
67         self.pythonpath = []
68         self.env = collections.defaultdict(list)
69         
70         # Testbed-derived attributes
71         self.slicename = None
72         self.ident_path = None
73         self.server_key = None
74         self.home_path = None
75         
76         # Those are filled when an actual node is allocated
77         self._node_id = None
78     
79     @property
80     def _nepi_testbed_environment_setup(self):
81         command = cStringIO.StringIO()
82         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
83             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
84         ))
85         command.write(' ; export PATH=$PATH:%s' % (
86             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
87         ))
88         if self.env:
89             for envkey, envvals in self.env.iteritems():
90                 for envval in envvals:
91                     command.write(' ; export %s=%s' % (envkey, envval))
92         return command.getvalue()
93     
94     def build_filters(self, target_filters, filter_map):
95         for attr, tag in filter_map.iteritems():
96             value = getattr(self, attr, None)
97             if value is not None:
98                 target_filters[tag] = value
99         return target_filters
100     
101     @property
102     def applicable_filters(self):
103         has = lambda att : getattr(self,att,None) is not None
104         return (
105             filter(has, self.BASEFILTERS.iterkeys())
106             + filter(has, self.TAGFILTERS.iterkeys())
107         )
108     
109     def find_candidates(self, filter_slice_id=None):
110         print >>sys.stderr, "Finding candidates for", self.make_filter_description()
111         
112         fields = ('node_id',)
113         replacements = {'timeframe':self.timeframe}
114         
115         # get initial candidates (no tag filters)
116         basefilters = self.build_filters({}, self.BASEFILTERS)
117         rootfilters = basefilters.copy()
118         if filter_slice_id:
119             basefilters['|slice_ids'] = (filter_slice_id,)
120         
121         # only pick healthy nodes
122         basefilters['run_level'] = 'boot'
123         basefilters['boot_state'] = 'boot'
124         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
125         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
126         
127         # keyword-only "pseudofilters"
128         extra = {}
129         if self.site:
130             extra['peer'] = self.site
131             
132         candidates = set(map(operator.itemgetter('node_id'), 
133             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
134         
135         # filter by tag, one tag at a time
136         applicable = self.applicable_filters
137         for tagfilter in self.TAGFILTERS.iteritems():
138             attr, (tagname, expr) = tagfilter
139             
140             # don't bother if there's no filter defined
141             if attr in applicable:
142                 tagfilter = rootfilters.copy()
143                 tagfilter['tagname'] = tagname % replacements
144                 tagfilter[expr % replacements] = getattr(self,attr)
145                 tagfilter['node_id'] = list(candidates)
146                 
147                 candidates &= set(map(operator.itemgetter('node_id'),
148                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
149         
150         # filter by vsys tags - special case since it doesn't follow
151         # the usual semantics
152         if self.required_vsys:
153             newcandidates = collections.defaultdict(set)
154             
155             vsys_tags = self._api.GetNodeTags(
156                 tagname='vsys', 
157                 node_id = list(candidates), 
158                 fields = ['node_id','value'])
159             
160             vsys_tags = map(
161                 operator.itemgetter(['node_id','value']),
162                 vsys_tags)
163             
164             required_vsys = self.required_vsys
165             for node_id, value in vsys_tags:
166                 if value in required_vsys:
167                     newcandidates[value].add(node_id)
168             
169             # take only those that have all the required vsys tags
170             newcandidates = reduce(
171                 lambda accum, new : accum & new,
172                 newcandidates.itervalues(),
173                 candidates)
174         
175         # filter by iface count
176         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
177             # fetch interfaces for all, in one go
178             filters = basefilters.copy()
179             filters['node_id'] = list(candidates)
180             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
181                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
182             
183             # filter candidates by interface count
184             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
185                 predicate = ( lambda node_id : 
186                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
187             elif self.min_num_external_ifaces is not None:
188                 predicate = ( lambda node_id : 
189                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
190             else:
191                 predicate = ( lambda node_id : 
192                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
193             
194             candidates = set(filter(predicate, candidates))
195         
196         # make sure hostnames are resolvable
197         if candidates:
198             print >>sys.stderr, "  Found", len(candidates), "candidates. Checking for reachability..."
199             
200             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
201                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
202             ))
203             def resolvable(node_id):
204                 try:
205                     addr = socket.gethostbyname(hostnames[node_id])
206                     return addr is not None
207                 except:
208                     return False
209             candidates = set(parallel.pfilter(resolvable, candidates,
210                 maxthreads = 16))
211
212             print >>sys.stderr, "  Found", len(candidates), "reachable candidates."
213             
214         return candidates
215     
216     def make_filter_description(self):
217         """
218         Makes a human-readable description of filtering conditions
219         for find_candidates.
220         """
221         
222         # get initial candidates (no tag filters)
223         filters = self.build_filters({}, self.BASEFILTERS)
224         
225         # keyword-only "pseudofilters"
226         if self.site:
227             filters['peer'] = self.site
228             
229         # filter by tag, one tag at a time
230         applicable = self.applicable_filters
231         for tagfilter in self.TAGFILTERS.iteritems():
232             attr, (tagname, expr) = tagfilter
233             
234             # don't bother if there's no filter defined
235             if attr in applicable:
236                 filters[attr] = getattr(self,attr)
237         
238         # filter by vsys tags - special case since it doesn't follow
239         # the usual semantics
240         if self.required_vsys:
241             filters['vsys'] = ','.join(list(self.required_vsys))
242         
243         # filter by iface count
244         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
245             filters['num_ifaces'] = '-'.join([
246                 str(self.min_num_external_ifaces or '0'),
247                 str(self.max_num_external_ifaces or 'inf')
248             ])
249             
250         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
251
252     def assign_node_id(self, node_id):
253         self._node_id = node_id
254         self.fetch_node_info()
255     
256     def unassign_node(self):
257         self._node_id = None
258         self.__dict__.update(self.__orig_attrs)
259     
260     def fetch_node_info(self):
261         orig_attrs = {}
262         
263         info = self._api.GetNodes(self._node_id)[0]
264         tags = dict( (t['tagname'],t['value'])
265                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
266
267         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
268         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
269         self.min_num_external_ifaces = None
270         self.max_num_external_ifaces = None
271         self.timeframe = 'm'
272         
273         replacements = {'timeframe':self.timeframe}
274         for attr, tag in self.BASEFILTERS.iteritems():
275             if tag in info:
276                 value = info[tag]
277                 if hasattr(self, attr):
278                     orig_attrs[attr] = getattr(self, attr)
279                 setattr(self, attr, value)
280         for attr, (tag,_) in self.TAGFILTERS.iteritems():
281             tag = tag % replacements
282             if tag in tags:
283                 value = tags[tag]
284                 if hasattr(self, attr):
285                     orig_attrs[attr] = getattr(self, attr)
286                 setattr(self, attr, value)
287         
288         if 'peer_id' in info:
289             orig_attrs['site'] = self.site
290             self.site = self._api.peer_map[info['peer_id']]
291         
292         if 'interface_ids' in info:
293             self.min_num_external_ifaces = \
294             self.max_num_external_ifaces = len(info['interface_ids'])
295         
296         if 'ssh_rsa_key' in info:
297             orig_attrs['server_key'] = self.server_key
298             self.server_key = info['ssh_rsa_key']
299         
300         self.__orig_attrs = orig_attrs
301
302     def validate(self):
303         if self.home_path is None:
304             raise AssertionError, "Misconfigured node: missing home path"
305         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
306             raise AssertionError, "Misconfigured node: missing slice SSH key"
307         if self.slicename is None:
308             raise AssertionError, "Misconfigured node: unspecified slice"
309
310     def install_dependencies(self):
311         if self.required_packages:
312             # TODO: make dependant on the experiment somehow...
313             pidfile = self.DEPENDS_PIDFILE
314             logfile = self.DEPENDS_LOGFILE
315             
316             # Start process in a "daemonized" way, using nohup and heavy
317             # stdin/out redirection to avoid connection issues
318             (out,err),proc = rspawn.remote_spawn(
319                 "( yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
320                     'packages' : ' '.join(self.required_packages),
321                 },
322                 pidfile = pidfile,
323                 stdout = logfile,
324                 stderr = rspawn.STDOUT,
325                 
326                 host = self.hostname,
327                 port = None,
328                 user = self.slicename,
329                 agent = None,
330                 ident_key = self.ident_path,
331                 server_key = self.server_key,
332                 sudo = True
333                 )
334             
335             if proc.wait():
336                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
337     
338     def wait_provisioning(self):
339         # recently provisioned nodes may not be up yet
340         sleeptime = 1.0
341         totaltime = 0.0
342         while not self.is_alive():
343             time.sleep(sleeptime)
344             totaltime += sleeptime
345             sleeptime = min(30.0, sleeptime*1.5)
346             
347             if totaltime > 20*60:
348                 # PlanetLab has a 15' delay on configuration propagation
349                 # If we're above that delay, the unresponsiveness is not due
350                 # to this delay.
351                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
352     
353     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
354         if self.required_packages:
355             pidfile = self.DEPENDS_PIDFILE
356             
357             # get PID
358             pid = ppid = None
359             for probenum in xrange(pidmax):
360                 pidtuple = rspawn.remote_check_pid(
361                     pidfile = pidfile,
362                     host = self.hostname,
363                     port = None,
364                     user = self.slicename,
365                     agent = None,
366                     ident_key = self.ident_path,
367                     server_key = self.server_key
368                     )
369                 if pidtuple:
370                     pid, ppid = pidtuple
371                     break
372                 else:
373                     time.sleep(pidprobe)
374             else:
375                 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
376         
377             # wait for it to finish
378             while rspawn.RUNNING is rspawn.remote_status(
379                     pid, ppid,
380                     host = self.hostname,
381                     port = None,
382                     user = self.slicename,
383                     agent = None,
384                     ident_key = self.ident_path,
385                     server_key = self.server_key
386                     ):
387                 time.sleep(probe)
388                 probe = min(probemax, 1.5*probe)
389             
390             # check results
391             logfile = self.DEPENDS_LOGFILE
392             
393             (out,err),proc = server.popen_ssh_command(
394                 "cat %s" % (server.shell_escape(logfile),),
395                 host = self.hostname,
396                 port = None,
397                 user = self.slicename,
398                 agent = None,
399                 ident_key = self.ident_path,
400                 server_key = self.server_key
401                 )
402             
403             if proc.wait():
404                 raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
405             
406             success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
407             if not success:
408                 raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
409         
410     def is_alive(self):
411         # Make sure all the paths are created where 
412         # they have to be created for deployment
413         (out,err),proc = server.popen_ssh_command(
414             "echo 'ALIVE'",
415             host = self.hostname,
416             port = None,
417             user = self.slicename,
418             agent = None,
419             ident_key = self.ident_path,
420             server_key = self.server_key
421             )
422         
423         if proc.wait():
424             return False
425         elif not err and out.strip() == 'ALIVE':
426             return True
427         else:
428             return False
429     
430
431     def configure_routes(self, routes, devs):
432         """
433         Add the specified routes to the node's routing table
434         """
435         rules = []
436         
437         for route in routes:
438             for dev in devs:
439                 if dev.routes_here(route):
440                     # Schedule rule
441                     dest, prefix, nexthop = route
442                     rules.append(
443                         "add %s%s gw %s %s" % (
444                             dest,
445                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
446                             nexthop,
447                             dev.if_name,
448                         )
449                     )
450                     
451                     # Stop checking
452                     break
453             else:
454                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
455                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
456         
457         (out,err),proc = server.popen_ssh_command(
458             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
459                 home = server.shell_escape(self.home_path)),
460             host = self.hostname,
461             port = None,
462             user = self.slicename,
463             agent = None,
464             ident_key = self.ident_path,
465             server_key = self.server_key,
466             stdin = '\n'.join(rules)
467             )
468         
469         if proc.wait() or err:
470             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
471         
472         
473