Merge with head
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15 import logging
16
17 from nepi.util import server
18 from nepi.util import parallel
19
20 import application
21
22 class UnresponsiveNodeError(RuntimeError):
23     pass
24
25 def _castproperty(typ, propattr):
26     def _get(self):
27         return getattr(self, propattr)
28     def _set(self, value):
29         if value is not None or (isinstance(value, basestring) and not value):
30             value = typ(value)
31         return setattr(self, propattr, value)
32     def _del(self, value):
33         return delattr(self, propattr)
34     _get.__name__ = propattr + '_get'
35     _set.__name__ = propattr + '_set'
36     _del.__name__ = propattr + '_del'
37     return property(_get, _set, _del)
38
39 class Node(object):
40     BASEFILTERS = {
41         # Map Node attribute to plcapi filter name
42         'hostname' : 'hostname',
43     }
44     
45     TAGFILTERS = {
46         # Map Node attribute to (<tag name>, <plcapi filter expression>)
47         #   There are replacements that are applied with string formatting,
48         #   so '%' has to be escaped as '%%'.
49         'architecture' : ('arch','value'),
50         'operatingSystem' : ('fcdistro','value'),
51         'pl_distro' : ('pldistro','value'),
52         'city' : ('city','value'),
53         'country' : ('country','value'),
54         'region' : ('region','value'),
55         'minReliability' : ('reliability%(timeframe)s', ']value'),
56         'maxReliability' : ('reliability%(timeframe)s', '[value'),
57         'minBandwidth' : ('bw%(timeframe)s', ']value'),
58         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
59         'minLoad' : ('load%(timeframe)s', ']value'),
60         'maxLoad' : ('load%(timeframe)s', '[value'),
61         'minCpu' : ('cpu%(timeframe)s', ']value'),
62         'maxCpu' : ('cpu%(timeframe)s', '[value'),
63     }    
64     
65     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
66     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
67     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
68     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
69     
70     minReliability = _castproperty(float, '_minReliability')
71     maxReliability = _castproperty(float, '_maxReliability')
72     minBandwidth = _castproperty(float, '_minBandwidth')
73     maxBandwidth = _castproperty(float, '_maxBandwidth')
74     minCpu = _castproperty(float, '_minCpu')
75     maxCpu = _castproperty(float, '_maxCpu')
76     minLoad = _castproperty(float, '_minLoad')
77     maxLoad = _castproperty(float, '_maxLoad')
78     
79     def __init__(self, api=None):
80         if not api:
81             api = plcapi.PLCAPI()
82         self._api = api
83         
84         # Attributes
85         self.hostname = None
86         self.architecture = None
87         self.operatingSystem = None
88         self.pl_distro = None
89         self.site = None
90         self.city = None
91         self.country = None
92         self.region = None
93         self.minReliability = None
94         self.maxReliability = None
95         self.minBandwidth = None
96         self.maxBandwidth = None
97         self.minCpu = None
98         self.maxCpu = None
99         self.minLoad = None
100         self.maxLoad = None
101         self.min_num_external_ifaces = None
102         self.max_num_external_ifaces = None
103         self.timeframe = 'm'
104         
105         # Applications and routes add requirements to connected nodes
106         self.required_packages = set()
107         self.required_vsys = set()
108         self.pythonpath = []
109         self.rpmFusion = False
110         self.env = collections.defaultdict(list)
111         
112         # Testbed-derived attributes
113         self.slicename = None
114         self.ident_path = None
115         self.server_key = None
116         self.home_path = None
117         self.enable_cleanup = False
118         
119         # Those are filled when an actual node is allocated
120         self._node_id = None
121         self._yum_dependencies = None
122         self._installed = False
123
124         # Logging
125         self._logger = logging.getLogger('nepi.testbeds.planetlab')
126     
127     def _nepi_testbed_environment_setup_get(self):
128         command = cStringIO.StringIO()
129         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
130             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
131         ))
132         command.write(' ; export PATH=$PATH:%s' % (
133             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
134         ))
135         if self.env:
136             for envkey, envvals in self.env.iteritems():
137                 for envval in envvals:
138                     command.write(' ; export %s=%s' % (envkey, envval))
139         return command.getvalue()
140     def _nepi_testbed_environment_setup_set(self, value):
141         pass
142     _nepi_testbed_environment_setup = property(
143         _nepi_testbed_environment_setup_get,
144         _nepi_testbed_environment_setup_set)
145     
146     def build_filters(self, target_filters, filter_map):
147         for attr, tag in filter_map.iteritems():
148             value = getattr(self, attr, None)
149             if value is not None:
150                 target_filters[tag] = value
151         return target_filters
152     
153     @property
154     def applicable_filters(self):
155         has = lambda att : getattr(self,att,None) is not None
156         return (
157             filter(has, self.BASEFILTERS.iterkeys())
158             + filter(has, self.TAGFILTERS.iterkeys())
159         )
160     
161     def find_candidates(self, filter_slice_id=None):
162         self._logger.info("Finding candidates for %s", self.make_filter_description())
163         
164         fields = ('node_id',)
165         replacements = {'timeframe':self.timeframe}
166         
167         # get initial candidates (no tag filters)
168         basefilters = self.build_filters({}, self.BASEFILTERS)
169         rootfilters = basefilters.copy()
170         if filter_slice_id:
171             basefilters['|slice_ids'] = (filter_slice_id,)
172         
173         # only pick healthy nodes
174         basefilters['run_level'] = 'boot'
175         basefilters['boot_state'] = 'boot'
176         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
177         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
178         
179         # keyword-only "pseudofilters"
180         extra = {}
181         if self.site:
182             extra['peer'] = self.site
183             
184         candidates = set(map(operator.itemgetter('node_id'), 
185             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
186         
187         # filter by tag, one tag at a time
188         applicable = self.applicable_filters
189         for tagfilter in self.TAGFILTERS.iteritems():
190             attr, (tagname, expr) = tagfilter
191             
192             # don't bother if there's no filter defined
193             if attr in applicable:
194                 tagfilter = rootfilters.copy()
195                 tagfilter['tagname'] = tagname % replacements
196                 tagfilter[expr % replacements] = getattr(self,attr)
197                 tagfilter['node_id'] = list(candidates)
198                 
199                 candidates &= set(map(operator.itemgetter('node_id'),
200                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
201         
202         # filter by vsys tags - special case since it doesn't follow
203         # the usual semantics
204         if self.required_vsys:
205             newcandidates = collections.defaultdict(set)
206             
207             vsys_tags = self._api.GetNodeTags(
208                 tagname='vsys', 
209                 node_id = list(candidates), 
210                 fields = ['node_id','value'])
211             
212             vsys_tags = map(
213                 operator.itemgetter(['node_id','value']),
214                 vsys_tags)
215             
216             required_vsys = self.required_vsys
217             for node_id, value in vsys_tags:
218                 if value in required_vsys:
219                     newcandidates[value].add(node_id)
220             
221             # take only those that have all the required vsys tags
222             newcandidates = reduce(
223                 lambda accum, new : accum & new,
224                 newcandidates.itervalues(),
225                 candidates)
226         
227         # filter by iface count
228         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
229             # fetch interfaces for all, in one go
230             filters = basefilters.copy()
231             filters['node_id'] = list(candidates)
232             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
233                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
234             
235             # filter candidates by interface count
236             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
237                 predicate = ( lambda node_id : 
238                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
239             elif self.min_num_external_ifaces is not None:
240                 predicate = ( lambda node_id : 
241                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
242             else:
243                 predicate = ( lambda node_id : 
244                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
245             
246             candidates = set(filter(predicate, candidates))
247         
248         # make sure hostnames are resolvable
249         if candidates:
250             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
251             
252             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
253                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
254             ))
255             def resolvable(node_id):
256                 try:
257                     addr = socket.gethostbyname(hostnames[node_id])
258                     return addr is not None
259                 except:
260                     return False
261             candidates = set(parallel.pfilter(resolvable, candidates,
262                 maxthreads = 16))
263
264             self._logger.info("  Found %s reachable candidates.", len(candidates))
265             
266         return candidates
267     
268     def make_filter_description(self):
269         """
270         Makes a human-readable description of filtering conditions
271         for find_candidates.
272         """
273         
274         # get initial candidates (no tag filters)
275         filters = self.build_filters({}, self.BASEFILTERS)
276         
277         # keyword-only "pseudofilters"
278         if self.site:
279             filters['peer'] = self.site
280             
281         # filter by tag, one tag at a time
282         applicable = self.applicable_filters
283         for tagfilter in self.TAGFILTERS.iteritems():
284             attr, (tagname, expr) = tagfilter
285             
286             # don't bother if there's no filter defined
287             if attr in applicable:
288                 filters[attr] = getattr(self,attr)
289         
290         # filter by vsys tags - special case since it doesn't follow
291         # the usual semantics
292         if self.required_vsys:
293             filters['vsys'] = ','.join(list(self.required_vsys))
294         
295         # filter by iface count
296         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
297             filters['num_ifaces'] = '-'.join([
298                 str(self.min_num_external_ifaces or '0'),
299                 str(self.max_num_external_ifaces or 'inf')
300             ])
301             
302         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
303
304     def assign_node_id(self, node_id):
305         self._node_id = node_id
306         self.fetch_node_info()
307     
308     def unassign_node(self):
309         self._node_id = None
310         self.__dict__.update(self.__orig_attrs)
311     
312     def fetch_node_info(self):
313         orig_attrs = {}
314         
315         info = self._api.GetNodes(self._node_id)[0]
316         tags = dict( (t['tagname'],t['value'])
317                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
318
319         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
320         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
321         self.min_num_external_ifaces = None
322         self.max_num_external_ifaces = None
323         self.timeframe = 'm'
324         
325         replacements = {'timeframe':self.timeframe}
326         for attr, tag in self.BASEFILTERS.iteritems():
327             if tag in info:
328                 value = info[tag]
329                 if hasattr(self, attr):
330                     orig_attrs[attr] = getattr(self, attr)
331                 setattr(self, attr, value)
332         for attr, (tag,_) in self.TAGFILTERS.iteritems():
333             tag = tag % replacements
334             if tag in tags:
335                 value = tags[tag]
336                 if hasattr(self, attr):
337                     orig_attrs[attr] = getattr(self, attr)
338                 setattr(self, attr, value)
339         
340         if 'peer_id' in info:
341             orig_attrs['site'] = self.site
342             self.site = self._api.peer_map[info['peer_id']]
343         
344         if 'interface_ids' in info:
345             self.min_num_external_ifaces = \
346             self.max_num_external_ifaces = len(info['interface_ids'])
347         
348         if 'ssh_rsa_key' in info:
349             orig_attrs['server_key'] = self.server_key
350             self.server_key = info['ssh_rsa_key']
351         
352         self.__orig_attrs = orig_attrs
353
354     def validate(self):
355         if self.home_path is None:
356             raise AssertionError, "Misconfigured node: missing home path"
357         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
358             raise AssertionError, "Misconfigured node: missing slice SSH key"
359         if self.slicename is None:
360             raise AssertionError, "Misconfigured node: unspecified slice"
361
362     def recover(self):
363         # Mark dependencies installed
364         self._installed = True
365         
366         # Clear load attributes, they impair re-discovery
367         self.minReliability = \
368         self.maxReliability = \
369         self.minBandwidth = \
370         self.maxBandwidth = \
371         self.minCpu = \
372         self.maxCpu = \
373         self.minLoad = \
374         self.maxLoad = None
375
376     def install_dependencies(self):
377         if self.required_packages and not self._installed:
378             # If we need rpmfusion, we must install the repo definition and the gpg keys
379             if self.rpmFusion:
380                 if self.operatingSystem == 'f12':
381                     # Fedora 12 requires a different rpmfusion package
382                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
383                 else:
384                     # This one works for f13+
385                     RPM_FUSION_URL = self.RPM_FUSION_URL
386                     
387                 rpmFusion = (
388                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
389                 ) % {
390                     'RPM_FUSION_URL' : RPM_FUSION_URL
391                 }
392             else:
393                 rpmFusion = ''
394             
395             if rpmFusion:
396                 (out,err),proc = server.popen_ssh_command(
397                     rpmFusion,
398                     host = self.hostname,
399                     port = None,
400                     user = self.slicename,
401                     agent = None,
402                     ident_key = self.ident_path,
403                     server_key = self.server_key
404                     )
405                 
406                 if proc.wait():
407                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
408             
409             # Launch p2p yum dependency installer
410             self._yum_dependencies.async_setup()
411     
412     def wait_provisioning(self, timeout = 20*60):
413         # Wait for the p2p installer
414         sleeptime = 1.0
415         totaltime = 0.0
416         while not self.is_alive():
417             time.sleep(sleeptime)
418             totaltime += sleeptime
419             sleeptime = min(30.0, sleeptime*1.5)
420             
421             if totaltime > timeout:
422                 # PlanetLab has a 15' delay on configuration propagation
423                 # If we're above that delay, the unresponsiveness is not due
424                 # to this delay.
425                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
426         
427         # Ensure the node is clean (no apps running that could interfere with operations)
428         if self.enable_cleanup:
429             self.do_cleanup()
430     
431     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
432         # Wait for the p2p installer
433         if self._yum_dependencies and not self._installed:
434             self._yum_dependencies.async_setup_wait()
435             self._installed = True
436         
437     def is_alive(self):
438         # Make sure all the paths are created where 
439         # they have to be created for deployment
440         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
441             "echo 'ALIVE'",
442             host = self.hostname,
443             port = None,
444             user = self.slicename,
445             agent = None,
446             ident_key = self.ident_path,
447             server_key = self.server_key
448             )
449         
450         if proc.wait():
451             return False
452         elif not err and out.strip() == 'ALIVE':
453             return True
454         else:
455             return False
456     
457     def destroy(self):
458         if self.enable_cleanup:
459             self.do_cleanup()
460     
461     def do_cleanup(self):
462         self._logger.info("Cleaning up %s", self.hostname)
463
464         (out,err),proc = server.popen_ssh_command(
465             # Some apps need two kills
466             "sudo -S killall -u %(slicename)s ; sudo -S killall -u root ; "
467             "sudo -S killall -u %(slicename)s ; sudo -S killall -u root" % {
468                 'slicename' : self.slicename ,
469             },
470             host = self.hostname,
471             port = None,
472             user = self.slicename,
473             agent = None,
474             ident_key = self.ident_path,
475             server_key = self.server_key
476             )
477         proc.wait()
478     
479     def prepare_dependencies(self):
480         # Configure p2p yum dependency installer
481         if self.required_packages and not self._installed:
482             self._yum_dependencies = application.YumDependency(self._api)
483             self._yum_dependencies.node = self
484             self._yum_dependencies.home_path = "nepi-yumdep"
485             self._yum_dependencies.depends = ' '.join(self.required_packages)
486
487     def configure_routes(self, routes, devs):
488         """
489         Add the specified routes to the node's routing table
490         """
491         rules = []
492         
493         for route in routes:
494             for dev in devs:
495                 if dev.routes_here(route):
496                     # Schedule rule
497                     dest, prefix, nexthop, metric = route
498                     rules.append(
499                         "add %s%s gw %s %s" % (
500                             dest,
501                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
502                             nexthop,
503                             dev.if_name,
504                         )
505                     )
506                     
507                     # Stop checking
508                     break
509             else:
510                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
511                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
512         
513         self._logger.info("Setting up routes for %s", self.hostname)
514         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
515         
516         (out,err),proc = server.popen_ssh_command(
517             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.5" % dict(
518                 home = server.shell_escape(self.home_path)),
519             host = self.hostname,
520             port = None,
521             user = self.slicename,
522             agent = None,
523             ident_key = self.ident_path,
524             server_key = self.server_key,
525             stdin = '\n'.join(rules)
526             )
527         
528         if proc.wait() or err:
529             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
530         elif out or err:
531             logger.debug("Routes said: %s%s", out, err)
532         
533         
534