1d4d5cae63ab4273ea1f9b55bf97b9e174ad7ab7
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15 import logging
16 import ipaddr
17 import operator
18 import re
19
20 from nepi.util import server
21 from nepi.util import parallel
22
23 import application
24
25 MAX_VROUTE_ROUTES = 5
26
27 class UnresponsiveNodeError(RuntimeError):
28     pass
29
30 def _castproperty(typ, propattr):
31     def _get(self):
32         return getattr(self, propattr)
33     def _set(self, value):
34         if value is not None or (isinstance(value, basestring) and not value):
35             value = typ(value)
36         return setattr(self, propattr, value)
37     def _del(self, value):
38         return delattr(self, propattr)
39     _get.__name__ = propattr + '_get'
40     _set.__name__ = propattr + '_set'
41     _del.__name__ = propattr + '_del'
42     return property(_get, _set, _del)
43
44 class Node(object):
45     BASEFILTERS = {
46         # Map Node attribute to plcapi filter name
47         'hostname' : 'hostname',
48     }
49     
50     TAGFILTERS = {
51         # Map Node attribute to (<tag name>, <plcapi filter expression>)
52         #   There are replacements that are applied with string formatting,
53         #   so '%' has to be escaped as '%%'.
54         'architecture' : ('arch','value'),
55         'operatingSystem' : ('fcdistro','value'),
56         'pl_distro' : ('pldistro','value'),
57         'city' : ('city','value'),
58         'country' : ('country','value'),
59         'region' : ('region','value'),
60         'minReliability' : ('reliability%(timeframe)s', ']value'),
61         'maxReliability' : ('reliability%(timeframe)s', '[value'),
62         'minBandwidth' : ('bw%(timeframe)s', ']value'),
63         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
64         'minLoad' : ('load%(timeframe)s', ']value'),
65         'maxLoad' : ('load%(timeframe)s', '[value'),
66         'minCpu' : ('cpu%(timeframe)s', ']value'),
67         'maxCpu' : ('cpu%(timeframe)s', '[value'),
68     }
69     
70     RATE_FACTORS = (
71         # (<tag name>, <weight>, <default>)
72         ('bw%(timeframe)s', -0.001, 1024.0),
73         ('cpu%(timeframe)s', 0.1, 40.0),
74         ('load%(timeframe)s', -0.2, 3.0),
75         ('reliability%(timeframe)s', 1, 100.0),
76     )
77     
78     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
79     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
80     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
81     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
82     
83     minReliability = _castproperty(float, '_minReliability')
84     maxReliability = _castproperty(float, '_maxReliability')
85     minBandwidth = _castproperty(float, '_minBandwidth')
86     maxBandwidth = _castproperty(float, '_maxBandwidth')
87     minCpu = _castproperty(float, '_minCpu')
88     maxCpu = _castproperty(float, '_maxCpu')
89     minLoad = _castproperty(float, '_minLoad')
90     maxLoad = _castproperty(float, '_maxLoad')
91     
92     def __init__(self, api=None):
93         if not api:
94             api = plcapi.PLCAPI()
95         self._api = api
96         
97         # Attributes
98         self.hostname = None
99         self.architecture = None
100         self.operatingSystem = None
101         self.pl_distro = None
102         self.site = None
103         self.city = None
104         self.country = None
105         self.region = None
106         self.minReliability = None
107         self.maxReliability = None
108         self.minBandwidth = None
109         self.maxBandwidth = None
110         self.minCpu = None
111         self.maxCpu = None
112         self.minLoad = None
113         self.maxLoad = None
114         self.min_num_external_ifaces = None
115         self.max_num_external_ifaces = None
116         self.timeframe = 'm'
117         
118         # Applications and routes add requirements to connected nodes
119         self.required_packages = set()
120         self.required_vsys = set()
121         self.pythonpath = []
122         self.rpmFusion = False
123         self.env = collections.defaultdict(list)
124         
125         # Some special applications - initialized when connected
126         self.multicast_forwarder = None
127         
128         # Testbed-derived attributes
129         self.slicename = None
130         self.ident_path = None
131         self.server_key = None
132         self.home_path = None
133         self.enable_cleanup = False
134         
135         # Those are filled when an actual node is allocated
136         self._node_id = None
137         self._yum_dependencies = None
138         self._installed = False
139
140         # Logging
141         self._logger = logging.getLogger('nepi.testbeds.planetlab')
142     
143     def _nepi_testbed_environment_setup_get(self):
144         command = cStringIO.StringIO()
145         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
146             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
147         ))
148         command.write(' ; export PATH=$PATH:%s' % (
149             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
150         ))
151         if self.env:
152             for envkey, envvals in self.env.iteritems():
153                 for envval in envvals:
154                     command.write(' ; export %s=%s' % (envkey, envval))
155         return command.getvalue()
156     def _nepi_testbed_environment_setup_set(self, value):
157         pass
158     _nepi_testbed_environment_setup = property(
159         _nepi_testbed_environment_setup_get,
160         _nepi_testbed_environment_setup_set)
161     
162     def build_filters(self, target_filters, filter_map):
163         for attr, tag in filter_map.iteritems():
164             value = getattr(self, attr, None)
165             if value is not None:
166                 target_filters[tag] = value
167         return target_filters
168     
169     @property
170     def applicable_filters(self):
171         has = lambda att : getattr(self,att,None) is not None
172         return (
173             filter(has, self.BASEFILTERS.iterkeys())
174             + filter(has, self.TAGFILTERS.iterkeys())
175         )
176     
177     def find_candidates(self, filter_slice_id=None):
178         self._logger.info("Finding candidates for %s", self.make_filter_description())
179         
180         fields = ('node_id',)
181         replacements = {'timeframe':self.timeframe}
182         
183         # get initial candidates (no tag filters)
184         basefilters = self.build_filters({}, self.BASEFILTERS)
185         rootfilters = basefilters.copy()
186         if filter_slice_id:
187             basefilters['|slice_ids'] = (filter_slice_id,)
188         
189         # only pick healthy nodes
190         basefilters['run_level'] = 'boot'
191         basefilters['boot_state'] = 'boot'
192         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
193         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
194         
195         # keyword-only "pseudofilters"
196         extra = {}
197         if self.site:
198             extra['peer'] = self.site
199             
200         candidates = set(map(operator.itemgetter('node_id'), 
201             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
202         
203         # filter by tag, one tag at a time
204         applicable = self.applicable_filters
205         for tagfilter in self.TAGFILTERS.iteritems():
206             attr, (tagname, expr) = tagfilter
207             
208             # don't bother if there's no filter defined
209             if attr in applicable:
210                 tagfilter = rootfilters.copy()
211                 tagfilter['tagname'] = tagname % replacements
212                 tagfilter[expr % replacements] = getattr(self,attr)
213                 tagfilter['node_id'] = list(candidates)
214                 
215                 candidates &= set(map(operator.itemgetter('node_id'),
216                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
217         
218         # filter by vsys tags - special case since it doesn't follow
219         # the usual semantics
220         if self.required_vsys:
221             newcandidates = collections.defaultdict(set)
222             
223             vsys_tags = self._api.GetNodeTags(
224                 tagname='vsys', 
225                 node_id = list(candidates), 
226                 fields = ['node_id','value'])
227             
228             vsys_tags = map(
229                 operator.itemgetter(['node_id','value']),
230                 vsys_tags)
231             
232             required_vsys = self.required_vsys
233             for node_id, value in vsys_tags:
234                 if value in required_vsys:
235                     newcandidates[value].add(node_id)
236             
237             # take only those that have all the required vsys tags
238             newcandidates = reduce(
239                 lambda accum, new : accum & new,
240                 newcandidates.itervalues(),
241                 candidates)
242         
243         # filter by iface count
244         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
245             # fetch interfaces for all, in one go
246             filters = basefilters.copy()
247             filters['node_id'] = list(candidates)
248             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
249                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
250             
251             # filter candidates by interface count
252             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
253                 predicate = ( lambda node_id : 
254                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
255             elif self.min_num_external_ifaces is not None:
256                 predicate = ( lambda node_id : 
257                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
258             else:
259                 predicate = ( lambda node_id : 
260                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
261             
262             candidates = set(filter(predicate, candidates))
263         
264         # make sure hostnames are resolvable
265         if candidates:
266             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
267             
268             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
269                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
270             ))
271             def resolvable(node_id):
272                 try:
273                     addr = socket.gethostbyname(hostnames[node_id])
274                     return addr is not None
275                 except:
276                     return False
277             candidates = set(parallel.pfilter(resolvable, candidates,
278                 maxthreads = 16))
279
280             self._logger.info("  Found %s reachable candidates.", len(candidates))
281             
282         return candidates
283     
284     def make_filter_description(self):
285         """
286         Makes a human-readable description of filtering conditions
287         for find_candidates.
288         """
289         
290         # get initial candidates (no tag filters)
291         filters = self.build_filters({}, self.BASEFILTERS)
292         
293         # keyword-only "pseudofilters"
294         if self.site:
295             filters['peer'] = self.site
296             
297         # filter by tag, one tag at a time
298         applicable = self.applicable_filters
299         for tagfilter in self.TAGFILTERS.iteritems():
300             attr, (tagname, expr) = tagfilter
301             
302             # don't bother if there's no filter defined
303             if attr in applicable:
304                 filters[attr] = getattr(self,attr)
305         
306         # filter by vsys tags - special case since it doesn't follow
307         # the usual semantics
308         if self.required_vsys:
309             filters['vsys'] = ','.join(list(self.required_vsys))
310         
311         # filter by iface count
312         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
313             filters['num_ifaces'] = '-'.join([
314                 str(self.min_num_external_ifaces or '0'),
315                 str(self.max_num_external_ifaces or 'inf')
316             ])
317             
318         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
319
320     def assign_node_id(self, node_id):
321         self._node_id = node_id
322         self.fetch_node_info()
323     
324     def unassign_node(self):
325         self._node_id = None
326         self.hostip = None
327         
328         try:
329             orig_attrs = self.__orig_attrs
330         except AttributeError:
331             return
332             
333         for key, value in orig_attrs.iteritems():
334             setattr(self, key, value)
335         del self.__orig_attrs
336     
337     def rate_nodes(self, nodes):
338         rates = collections.defaultdict(int)
339         tags = collections.defaultdict(dict)
340         replacements = {'timeframe':self.timeframe}
341         tagnames = [ tagname % replacements 
342                      for tagname, weight, default in self.RATE_FACTORS ]
343         
344         taginfo = self._api.GetNodeTags(
345             node_id=list(nodes), 
346             tagname=tagnames,
347             fields=('node_id','tagname','value'))
348         
349         unpack = operator.itemgetter('node_id','tagname','value')
350         for value in taginfo:
351             node, tagname, value = unpack(value)
352             if value and value.lower() != 'n/a':
353                 tags[tagname][int(node)] = float(value)
354         
355         for tagname, weight, default in self.RATE_FACTORS:
356             taginfo = tags[tagname % replacements].get
357             for node in nodes:
358                 rates[node] += weight * taginfo(node,default)
359         
360         return map(rates.__getitem__, nodes)
361             
362     def fetch_node_info(self):
363         orig_attrs = {}
364         
365         self._api.StartMulticall()
366         info = self._api.GetNodes(self._node_id)
367         tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
368         info, tags = self._api.FinishMulticall()
369         info = info[0]
370         
371         tags = dict( (t['tagname'],t['value'])
372                      for t in tags )
373
374         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
375         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
376         self.min_num_external_ifaces = None
377         self.max_num_external_ifaces = None
378         self.timeframe = 'm'
379         
380         replacements = {'timeframe':self.timeframe}
381         for attr, tag in self.BASEFILTERS.iteritems():
382             if tag in info:
383                 value = info[tag]
384                 if hasattr(self, attr):
385                     orig_attrs[attr] = getattr(self, attr)
386                 setattr(self, attr, value)
387         for attr, (tag,_) in self.TAGFILTERS.iteritems():
388             tag = tag % replacements
389             if tag in tags:
390                 value = tags[tag]
391                 if hasattr(self, attr):
392                     orig_attrs[attr] = getattr(self, attr)
393                 if not value or value.lower() == 'n/a':
394                     value = None
395                 setattr(self, attr, value)
396         
397         if 'peer_id' in info:
398             orig_attrs['site'] = self.site
399             self.site = self._api.peer_map[info['peer_id']]
400         
401         if 'interface_ids' in info:
402             self.min_num_external_ifaces = \
403             self.max_num_external_ifaces = len(info['interface_ids'])
404         
405         if 'ssh_rsa_key' in info:
406             orig_attrs['server_key'] = self.server_key
407             self.server_key = info['ssh_rsa_key']
408         
409         self.hostip = socket.gethostbyname(self.hostname)
410         
411         try:
412             self.__orig_attrs
413         except AttributeError:
414             self.__orig_attrs = orig_attrs
415
416     def validate(self):
417         if self.home_path is None:
418             raise AssertionError, "Misconfigured node: missing home path"
419         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
420             raise AssertionError, "Misconfigured node: missing slice SSH key"
421         if self.slicename is None:
422             raise AssertionError, "Misconfigured node: unspecified slice"
423
424     def recover(self):
425         # Mark dependencies installed
426         self._installed = True
427         
428         # Clear load attributes, they impair re-discovery
429         self.minReliability = \
430         self.maxReliability = \
431         self.minBandwidth = \
432         self.maxBandwidth = \
433         self.minCpu = \
434         self.maxCpu = \
435         self.minLoad = \
436         self.maxLoad = None
437
438     def install_dependencies(self):
439         if self.required_packages and not self._installed:
440             # If we need rpmfusion, we must install the repo definition and the gpg keys
441             if self.rpmFusion:
442                 if self.operatingSystem == 'f12':
443                     # Fedora 12 requires a different rpmfusion package
444                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
445                 else:
446                     # This one works for f13+
447                     RPM_FUSION_URL = self.RPM_FUSION_URL
448                     
449                 rpmFusion = (
450                   'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
451                 ) % {
452                     'RPM_FUSION_URL' : RPM_FUSION_URL
453                 }
454             else:
455                 rpmFusion = ''
456             
457             if rpmFusion:
458                 (out,err),proc = server.popen_ssh_command(
459                     rpmFusion,
460                     host = self.hostname,
461                     port = None,
462                     user = self.slicename,
463                     agent = None,
464                     ident_key = self.ident_path,
465                     server_key = self.server_key,
466                     timeout = 600,
467                     )
468                 
469                 if proc.wait():
470                     if self.check_bad_host(out,err):
471                         self.blacklist()
472                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
473             
474             # Launch p2p yum dependency installer
475             self._yum_dependencies.async_setup()
476     
477     def wait_provisioning(self, timeout = 20*60):
478         # Wait for the p2p installer
479         sleeptime = 1.0
480         totaltime = 0.0
481         while not self.is_alive():
482             time.sleep(sleeptime)
483             totaltime += sleeptime
484             sleeptime = min(30.0, sleeptime*1.5)
485             
486             if totaltime > timeout:
487                 # PlanetLab has a 15' delay on configuration propagation
488                 # If we're above that delay, the unresponsiveness is not due
489                 # to this delay.
490                 if not self.is_alive(verbose=True):
491                     raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
492         
493         # Ensure the node is clean (no apps running that could interfere with operations)
494         if self.enable_cleanup:
495             self.do_cleanup()
496     
497     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
498         # Wait for the p2p installer
499         if self._yum_dependencies and not self._installed:
500             self._yum_dependencies.async_setup_wait()
501             self._installed = True
502         
503     def is_alive(self, verbose = False):
504         # Make sure all the paths are created where 
505         # they have to be created for deployment
506         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
507             "echo 'ALIVE'",
508             host = self.hostname,
509             port = None,
510             user = self.slicename,
511             agent = None,
512             ident_key = self.ident_path,
513             server_key = self.server_key,
514             timeout = 60,
515             err_on_timeout = False,
516             persistent = False
517             )
518         
519         if proc.wait():
520             if verbose:
521                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
522             return False
523         elif not err and out.strip() == 'ALIVE':
524             return True
525         else:
526             if verbose:
527                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
528             return False
529     
530     def destroy(self):
531         if self.enable_cleanup:
532             self.do_cleanup()
533     
534     def blacklist(self):
535         if self._node_id:
536             self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
537             import util
538             util.appendBlacklist(self._node_id)
539     
540     def do_cleanup(self):
541         if self.testbed().recovering:
542             # WOW - not now
543             return
544             
545         self._logger.info("Cleaning up %s", self.hostname)
546         
547         cmds = [
548             "sudo -S killall python tcpdump || /bin/true ; "
549             "sudo -S killall python tcpdump || /bin/true ; "
550             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
551             "sudo -S killall -u %(slicename)s || /bin/true ",
552             "sudo -S killall -u root || /bin/true ",
553             "sudo -S killall -u %(slicename)s || /bin/true ",
554             "sudo -S killall -u root || /bin/true ",
555         ]
556
557         for cmd in cmds:
558             (out,err),proc = server.popen_ssh_command(
559                 # Some apps need two kills
560                 cmd % {
561                     'slicename' : self.slicename ,
562                 },
563                 host = self.hostname,
564                 port = None,
565                 user = self.slicename,
566                 agent = None,
567                 ident_key = self.ident_path,
568                 server_key = self.server_key,
569                 tty = True, # so that ps -N -T works as advertised...
570                 timeout = 60,
571                 retry = 3
572                 )
573             proc.wait()
574     
575     def prepare_dependencies(self):
576         # Configure p2p yum dependency installer
577         if self.required_packages and not self._installed:
578             self._yum_dependencies = application.YumDependency(self._api)
579             self._yum_dependencies.node = self
580             self._yum_dependencies.home_path = "nepi-yumdep"
581             self._yum_dependencies.depends = ' '.join(self.required_packages)
582
583     def routing_method(self, routes, vsys_vnet):
584         """
585         There are two methods, vroute and sliceip.
586         
587         vroute:
588             Modifies the node's routing table directly, validating that the IP
589             range lies within the network given by the slice's vsys_vnet tag.
590             This method is the most scalable for very small routing tables
591             that need not modify other routes (including the default)
592         
593         sliceip:
594             Uses policy routing and iptables filters to create per-sliver
595             routing tables. It's the most flexible way, but it doesn't scale
596             as well since only 155 routing tables can be created this way.
597         
598         This method will return the most appropriate routing method, which will
599         prefer vroute for small routing tables.
600         """
601         
602         # For now, sliceip results in kernel panics
603         # so we HAVE to use vroute
604         return 'vroute'
605         
606         # We should not make the routing table grow too big
607         if len(routes) > MAX_VROUTE_ROUTES:
608             return 'sliceip'
609         
610         vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
611         for route in routes:
612             dest, prefix, nexthop, metric = route
613             dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
614             nexthop = ipaddr.IPAddress(nexthop)
615             if dest not in vsys_vnet or nexthop not in vsys_vnet:
616                 return 'sliceip'
617         
618         return 'vroute'
619     
620     def format_route(self, route, dev, method, action):
621         dest, prefix, nexthop, metric = route
622         if method == 'vroute':
623             return (
624                 "%s %s%s gw %s %s" % (
625                     action,
626                     dest,
627                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
628                     nexthop,
629                     dev,
630                 )
631             )
632         elif method == 'sliceip':
633             return (
634                 "route %s to %s%s via %s metric %s dev %s" % (
635                     action,
636                     dest,
637                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
638                     nexthop,
639                     metric or 1,
640                     dev,
641                 )
642             )
643         else:
644             raise AssertionError, "Unknown method"
645     
646     def _annotate_routes_with_devs(self, routes, devs, method):
647         dev_routes = []
648         for route in routes:
649             for dev in devs:
650                 if dev.routes_here(route):
651                     dev_routes.append(tuple(route) + (dev.if_name,))
652                     
653                     # Stop checking
654                     break
655             else:
656                 if method == 'sliceip':
657                     dev_routes.append(tuple(route) + ('eth0',))
658                 else:
659                     raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
660                         "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
661         return dev_routes
662     
663     def configure_routes(self, routes, devs, vsys_vnet):
664         """
665         Add the specified routes to the node's routing table
666         """
667         rules = []
668         method = self.routing_method(routes, vsys_vnet)
669         tdevs = set()
670         
671         # annotate routes with devices
672         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
673         for route in dev_routes:
674             route, dev = route[:-1], route[-1]
675             
676             # Schedule rule
677             tdevs.add(dev)
678             rules.append(self.format_route(route, dev, method, 'add'))
679         
680         if method == 'sliceip':
681             rules = map('enable '.__add__, tdevs) + rules
682         
683         self._logger.info("Setting up routes for %s using %s", self.hostname, method)
684         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
685         
686         self.apply_route_rules(rules, method)
687         
688         self._configured_routes = set(routes)
689         self._configured_devs = tdevs
690         self._configured_method = method
691     
692     def reconfigure_routes(self, routes, devs, vsys_vnet):
693         """
694         Updates the routes in the node's routing table to match
695         the given route list
696         """
697         method = self._configured_method
698         
699         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
700
701         current = self._configured_routes
702         current_devs = self._configured_devs
703         
704         new = set(dev_routes)
705         new_devs = set(map(operator.itemgetter(-1), dev_routes))
706         
707         deletions = current - new
708         insertions = new - current
709         
710         dev_deletions = current_devs - new_devs
711         dev_insertions = new_devs - current_devs
712         
713         # Generate rules
714         rules = []
715         
716         # Rule deletions first
717         for route in deletions:
718             route, dev = route[:-1], route[-1]
719             rules.append(self.format_route(route, dev, method, 'del'))
720         
721         if method == 'sliceip':
722             # Dev deletions now
723             rules.extend(map('disable '.__add__, dev_deletions))
724
725             # Dev insertions now
726             rules.extend(map('enable '.__add__, dev_insertions))
727
728         # Rule insertions now
729         for route in insertions:
730             route, dev = route[:-1], dev[-1]
731             rules.append(self.format_route(route, dev, method, 'add'))
732         
733         # Apply
734         self.apply_route_rules(rules, method)
735         
736         self._configured_routes = dev_routes
737         self._configured_devs = new_devs
738         
739     def apply_route_rules(self, rules, method):
740         (out,err),proc = server.popen_ssh_command(
741             "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
742                 home = server.shell_escape(self.home_path),
743                 method = method),
744             host = self.hostname,
745             port = None,
746             user = self.slicename,
747             agent = None,
748             ident_key = self.ident_path,
749             server_key = self.server_key,
750             stdin = '\n'.join(rules),
751             timeout = 300
752             )
753         
754         if proc.wait() or err:
755             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
756         elif out or err:
757             logger.debug("%s said: %s%s", method, out, err)
758
759     def check_bad_host(self, out, err):
760         badre = re.compile(r'(?:'
761                            r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
762                            r'|Error: disk I/O error'
763                            r')', 
764                            re.I)
765         return badre.search(out) or badre.search(err)