Making node.py code independent from the plcapi implementation.
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import plcapi
5 import operator
6 import rspawn
7 import time
8 import os
9 import collections
10 import cStringIO
11 import resourcealloc
12 import socket
13 import sys
14 import logging
15 import ipaddr
16 import operator
17 import re
18
19 from nepi.util import server
20 from nepi.util import parallel
21
22 import application
23
24 MAX_VROUTE_ROUTES = 5
25
26 class UnresponsiveNodeError(RuntimeError):
27     pass
28
29 def _castproperty(typ, propattr):
30     def _get(self):
31         return getattr(self, propattr)
32     def _set(self, value):
33         if value is not None or (isinstance(value, basestring) and not value):
34             value = typ(value)
35         return setattr(self, propattr, value)
36     def _del(self, value):
37         return delattr(self, propattr)
38     _get.__name__ = propattr + '_get'
39     _set.__name__ = propattr + '_set'
40     _del.__name__ = propattr + '_del'
41     return property(_get, _set, _del)
42
43 class Node(object):
44     BASEFILTERS = {
45         # Map Node attribute to plcapi filter name
46         'hostname' : 'hostname',
47     }
48     
49     TAGFILTERS = {
50         # Map Node attribute to (<tag name>, <plcapi filter expression>)
51         #   There are replacements that are applied with string formatting,
52         #   so '%' has to be escaped as '%%'.
53         'architecture' : ('arch','value'),
54         'operatingSystem' : ('fcdistro','value'),
55         'pl_distro' : ('pldistro','value'),
56         'city' : ('city','value'),
57         'country' : ('country','value'),
58         'region' : ('region','value'),
59         'minReliability' : ('reliability%(timeframe)s', ']value'),
60         'maxReliability' : ('reliability%(timeframe)s', '[value'),
61         'minBandwidth' : ('bw%(timeframe)s', ']value'),
62         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63         'minLoad' : ('load%(timeframe)s', ']value'),
64         'maxLoad' : ('load%(timeframe)s', '[value'),
65         'minCpu' : ('cpu%(timeframe)s', ']value'),
66         'maxCpu' : ('cpu%(timeframe)s', '[value'),
67     }
68     
69     RATE_FACTORS = (
70         # (<tag name>, <weight>, <default>)
71         ('bw%(timeframe)s', -0.001, 1024.0),
72         ('cpu%(timeframe)s', 0.1, 40.0),
73         ('load%(timeframe)s', -0.2, 3.0),
74         ('reliability%(timeframe)s', 1, 100.0),
75     )
76     
77     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
79     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
80     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
81     
82     minReliability = _castproperty(float, '_minReliability')
83     maxReliability = _castproperty(float, '_maxReliability')
84     minBandwidth = _castproperty(float, '_minBandwidth')
85     maxBandwidth = _castproperty(float, '_maxBandwidth')
86     minCpu = _castproperty(float, '_minCpu')
87     maxCpu = _castproperty(float, '_maxCpu')
88     minLoad = _castproperty(float, '_minLoad')
89     maxLoad = _castproperty(float, '_maxLoad')
90     
91     def __init__(self, api=None, sliceapi=None):
92         if not api:
93             api = plcapi.PLCAPI()
94         self._api = api
95         self._sliceapi = sliceapi
96         
97         # Attributes
98         self.hostname = None
99         self.architecture = None
100         self.operatingSystem = None
101         self.pl_distro = None
102         self.site = None
103         self.city = None
104         self.country = None
105         self.region = None
106         self.minReliability = None
107         self.maxReliability = None
108         self.minBandwidth = None
109         self.maxBandwidth = None
110         self.minCpu = None
111         self.maxCpu = None
112         self.minLoad = None
113         self.maxLoad = None
114         self.min_num_external_ifaces = None
115         self.max_num_external_ifaces = None
116         self.timeframe = 'm'
117         
118         # Applications and routes add requirements to connected nodes
119         self.required_packages = set()
120         self.required_vsys = set()
121         self.pythonpath = []
122         self.rpmFusion = False
123         self.env = collections.defaultdict(list)
124         
125         # Some special applications - initialized when connected
126         self.multicast_forwarder = None
127         
128         # Testbed-derived attributes
129         self.slicename = None
130         self.ident_path = None
131         self.server_key = None
132         self.home_path = None
133         self.enable_cleanup = False
134         
135         # Those are filled when an actual node is allocated
136         self._node_id = None
137         self._yum_dependencies = None
138         self._installed = False
139
140         # Logging
141         self._logger = logging.getLogger('nepi.testbeds.planetlab')
142     
143     def _nepi_testbed_environment_setup_get(self):
144         command = cStringIO.StringIO()
145         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
146             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
147         ))
148         command.write(' ; export PATH=$PATH:%s' % (
149             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
150         ))
151         if self.env:
152             for envkey, envvals in self.env.iteritems():
153                 for envval in envvals:
154                     command.write(' ; export %s=%s' % (envkey, envval))
155         return command.getvalue()
156     def _nepi_testbed_environment_setup_set(self, value):
157         pass
158     _nepi_testbed_environment_setup = property(
159         _nepi_testbed_environment_setup_get,
160         _nepi_testbed_environment_setup_set)
161     
162     def build_filters(self, target_filters, filter_map):
163         for attr, tag in filter_map.iteritems():
164             value = getattr(self, attr, None)
165             if value is not None:
166                 target_filters[tag] = value
167         return target_filters
168     
169     @property
170     def applicable_filters(self):
171         has = lambda att : getattr(self,att,None) is not None
172         return (
173             filter(has, self.BASEFILTERS.iterkeys())
174             + filter(has, self.TAGFILTERS.iterkeys())
175         )
176     
177     def find_candidates(self, filter_slice_id=None):
178         self._logger.info("Finding candidates for %s", self.make_filter_description())
179         
180         fields = ('node_id',)
181         replacements = {'timeframe':self.timeframe}
182         
183         # get initial candidates (no tag filters)
184         basefilters = self.build_filters({}, self.BASEFILTERS)
185         rootfilters = basefilters.copy()
186         if filter_slice_id:
187             basefilters['|slice_ids'] = (filter_slice_id,)
188         
189         # only pick healthy nodes
190         basefilters['run_level'] = 'boot'
191         basefilters['boot_state'] = 'boot'
192         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
193         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
194         
195         # keyword-only "pseudofilters"
196         extra = {}
197         if self.site:
198             extra['peer'] = self.site
199             
200         candidates = set(map(operator.itemgetter('node_id'), 
201             self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
202         
203         # filter by tag, one tag at a time
204         applicable = self.applicable_filters
205         for tagfilter in self.TAGFILTERS.iteritems():
206             attr, (tagname, expr) = tagfilter
207             
208             # don't bother if there's no filter defined
209             if attr in applicable:
210                 tagfilter = rootfilters.copy()
211                 tagfilter['tagname'] = tagname % replacements
212                 tagfilter[expr % replacements] = getattr(self,attr)
213                 tagfilter['node_id'] = list(candidates)
214                 
215                 candidates &= set(map(operator.itemgetter('node_id'),
216                     self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
217         
218         # filter by vsys tags - special case since it doesn't follow
219         # the usual semantics
220         if self.required_vsys:
221             newcandidates = collections.defaultdict(set)
222             
223             vsys_tags = self._sliceapi.GetNodeTags(
224                 tagname='vsys', 
225                 node_id = list(candidates), 
226                 fields = ['node_id','value'])
227
228             vsys_tags = map(
229                 operator.itemgetter(['node_id','value']),
230                 vsys_tags)
231             
232             required_vsys = self.required_vsys
233             for node_id, value in vsys_tags:
234                 if value in required_vsys:
235                     newcandidates[value].add(node_id)
236             
237             # take only those that have all the required vsys tags
238             newcandidates = reduce(
239                 lambda accum, new : accum & new,
240                 newcandidates.itervalues(),
241                 candidates)
242         
243         # filter by iface count
244         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
245             # fetch interfaces for all, in one go
246             filters = basefilters.copy()
247             filters['node_id'] = list(candidates)
248             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
249                 self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
250             
251             # filter candidates by interface count
252             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
253                 predicate = ( lambda node_id : 
254                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
255             elif self.min_num_external_ifaces is not None:
256                 predicate = ( lambda node_id : 
257                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
258             else:
259                 predicate = ( lambda node_id : 
260                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
261             
262             candidates = set(filter(predicate, candidates))
263         
264         # make sure hostnames are resolvable
265         if candidates:
266             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
267             
268             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
269                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
270             ))
271             def resolvable(node_id):
272                 try:
273                     addr = socket.gethostbyname(hostnames[node_id])
274                     return addr is not None
275                 except:
276                     return False
277             candidates = set(parallel.pfilter(resolvable, candidates,
278                 maxthreads = 16))
279
280             self._logger.info("  Found %s reachable candidates.", len(candidates))
281             
282         return candidates
283     
284     def make_filter_description(self):
285         """
286         Makes a human-readable description of filtering conditions
287         for find_candidates.
288         """
289         
290         # get initial candidates (no tag filters)
291         filters = self.build_filters({}, self.BASEFILTERS)
292         
293         # keyword-only "pseudofilters"
294         if self.site:
295             filters['peer'] = self.site
296             
297         # filter by tag, one tag at a time
298         applicable = self.applicable_filters
299         for tagfilter in self.TAGFILTERS.iteritems():
300             attr, (tagname, expr) = tagfilter
301             
302             # don't bother if there's no filter defined
303             if attr in applicable:
304                 filters[attr] = getattr(self,attr)
305         
306         # filter by vsys tags - special case since it doesn't follow
307         # the usual semantics
308         if self.required_vsys:
309             filters['vsys'] = ','.join(list(self.required_vsys))
310         
311         # filter by iface count
312         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
313             filters['num_ifaces'] = '-'.join([
314                 str(self.min_num_external_ifaces or '0'),
315                 str(self.max_num_external_ifaces or 'inf')
316             ])
317             
318         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
319
320     def assign_node_id(self, node_id):
321         self._node_id = node_id
322         self.fetch_node_info()
323     
324     def unassign_node(self):
325         self._node_id = None
326         self.hostip = None
327         
328         try:
329             orig_attrs = self.__orig_attrs
330         except AttributeError:
331             return
332             
333         for key, value in orig_attrs.iteritems():
334             setattr(self, key, value)
335         del self.__orig_attrs
336     
337     def rate_nodes(self, nodes):
338         rates = collections.defaultdict(int)
339         tags = collections.defaultdict(dict)
340         replacements = {'timeframe':self.timeframe}
341         tagnames = [ tagname % replacements 
342                      for tagname, weight, default in self.RATE_FACTORS ]
343         
344         taginfo = self._sliceapi.GetNodeTags(
345             node_id=list(nodes), 
346             tagname=tagnames,
347             fields=('node_id','tagname','value'))
348
349         unpack = operator.itemgetter('node_id','tagname','value')
350         for value in taginfo:
351             node, tagname, value = unpack(value)
352             if value and value.lower() != 'n/a':
353                 tags[tagname][int(node)] = float(value)
354         
355         for tagname, weight, default in self.RATE_FACTORS:
356             taginfo = tags[tagname % replacements].get
357             for node in nodes:
358                 rates[node] += weight * taginfo(node,default)
359         
360         return map(rates.__getitem__, nodes)
361             
362     def fetch_node_info(self):
363         orig_attrs = {}
364         
365         info, tags = self._sliceapi.GetNodeInfo(self._node_id)
366         info = info[0]
367         
368         tags = dict( (t['tagname'],t['value'])
369                      for t in tags )
370
371         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
372         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
373         self.min_num_external_ifaces = None
374         self.max_num_external_ifaces = None
375         self.timeframe = 'm'
376         
377         replacements = {'timeframe':self.timeframe}
378         for attr, tag in self.BASEFILTERS.iteritems():
379             if tag in info:
380                 value = info[tag]
381                 if hasattr(self, attr):
382                     orig_attrs[attr] = getattr(self, attr)
383                 setattr(self, attr, value)
384         for attr, (tag,_) in self.TAGFILTERS.iteritems():
385             tag = tag % replacements
386             if tag in tags:
387                 value = tags[tag]
388                 if hasattr(self, attr):
389                     orig_attrs[attr] = getattr(self, attr)
390                 if not value or value.lower() == 'n/a':
391                     value = None
392                 setattr(self, attr, value)
393         
394         if 'peer_id' in info:
395             orig_attrs['site'] = self.site
396             self.site = self._api.peer_map[info['peer_id']]
397         
398         if 'interface_ids' in info:
399             self.min_num_external_ifaces = \
400             self.max_num_external_ifaces = len(info['interface_ids'])
401         
402         if 'ssh_rsa_key' in info:
403             orig_attrs['server_key'] = self.server_key
404             self.server_key = info['ssh_rsa_key']
405         
406         self.hostip = socket.gethostbyname(self.hostname)
407         
408         try:
409             self.__orig_attrs
410         except AttributeError:
411             self.__orig_attrs = orig_attrs
412
413     def validate(self):
414         if self.home_path is None:
415             raise AssertionError, "Misconfigured node: missing home path"
416         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
417             raise AssertionError, "Misconfigured node: missing slice SSH key"
418         if self.slicename is None:
419             raise AssertionError, "Misconfigured node: unspecified slice"
420
421     def recover(self):
422         # Mark dependencies installed
423         self._installed = True
424         
425         # Clear load attributes, they impair re-discovery
426         self.minReliability = \
427         self.maxReliability = \
428         self.minBandwidth = \
429         self.maxBandwidth = \
430         self.minCpu = \
431         self.maxCpu = \
432         self.minLoad = \
433         self.maxLoad = None
434
435     def install_dependencies(self):
436         if self.required_packages and not self._installed:
437             # If we need rpmfusion, we must install the repo definition and the gpg keys
438             if self.rpmFusion:
439                 if self.operatingSystem == 'f12':
440                     # Fedora 12 requires a different rpmfusion package
441                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
442                 else:
443                     # This one works for f13+
444                     RPM_FUSION_URL = self.RPM_FUSION_URL
445                     
446                 rpmFusion = (
447                   'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
448                 ) % {
449                     'RPM_FUSION_URL' : RPM_FUSION_URL
450                 }
451             else:
452                 rpmFusion = ''
453             
454             if rpmFusion:
455                 (out,err),proc = server.popen_ssh_command(
456                     rpmFusion,
457                     host = self.hostname,
458                     port = None,
459                     user = self.slicename,
460                     agent = None,
461                     ident_key = self.ident_path,
462                     server_key = self.server_key,
463                     timeout = 600,
464                     )
465                 
466                 if proc.wait():
467                     if self.check_bad_host(out,err):
468                         self.blacklist()
469                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
470             
471             # Launch p2p yum dependency installer
472             self._yum_dependencies.async_setup()
473     
474     def wait_provisioning(self, timeout = 20*60):
475         # Wait for the p2p installer
476         sleeptime = 1.0
477         totaltime = 0.0
478         while not self.is_alive():
479             time.sleep(sleeptime)
480             totaltime += sleeptime
481             sleeptime = min(30.0, sleeptime*1.5)
482             
483             if totaltime > timeout:
484                 # PlanetLab has a 15' delay on configuration propagation
485                 # If we're above that delay, the unresponsiveness is not due
486                 # to this delay.
487                 if not self.is_alive(verbose=True):
488                     raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
489         
490         # Ensure the node is clean (no apps running that could interfere with operations)
491         if self.enable_cleanup:
492             self.do_cleanup()
493     
494     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
495         # Wait for the p2p installer
496         if self._yum_dependencies and not self._installed:
497             self._yum_dependencies.async_setup_wait()
498             self._installed = True
499         
500     def is_alive(self, verbose = False):
501         # Make sure all the paths are created where 
502         # they have to be created for deployment
503         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
504             "echo 'ALIVE'",
505             host = self.hostname,
506             port = None,
507             user = self.slicename,
508             agent = None,
509             ident_key = self.ident_path,
510             server_key = self.server_key,
511             timeout = 60,
512             err_on_timeout = False,
513             persistent = False
514             )
515         
516         if proc.wait():
517             if verbose:
518                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
519             return False
520         elif not err and out.strip() == 'ALIVE':
521             return True
522         else:
523             if verbose:
524                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
525             return False
526     
527     def destroy(self):
528         if self.enable_cleanup:
529             self.do_cleanup()
530     
531     def blacklist(self):
532         if self._node_id:
533             self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
534             import util
535             util.appendBlacklist(self._node_id)
536     
537     def do_cleanup(self):
538         if self.testbed().recovering:
539             # WOW - not now
540             return
541             
542         self._logger.info("Cleaning up %s", self.hostname)
543         
544         cmds = [
545             "sudo -S killall python tcpdump || /bin/true ; "
546             "sudo -S killall python tcpdump || /bin/true ; "
547             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
548             "sudo -S killall -u %(slicename)s || /bin/true ",
549             "sudo -S killall -u root || /bin/true ",
550             "sudo -S killall -u %(slicename)s || /bin/true ",
551             "sudo -S killall -u root || /bin/true ",
552         ]
553
554         for cmd in cmds:
555             (out,err),proc = server.popen_ssh_command(
556                 # Some apps need two kills
557                 cmd % {
558                     'slicename' : self.slicename ,
559                 },
560                 host = self.hostname,
561                 port = None,
562                 user = self.slicename,
563                 agent = None,
564                 ident_key = self.ident_path,
565                 server_key = self.server_key,
566                 tty = True, # so that ps -N -T works as advertised...
567                 timeout = 60,
568                 retry = 3
569                 )
570             proc.wait()
571     
572     def prepare_dependencies(self):
573         # Configure p2p yum dependency installer
574         if self.required_packages and not self._installed:
575             self._yum_dependencies = application.YumDependency(self._api)
576             self._yum_dependencies.node = self
577             self._yum_dependencies.home_path = "nepi-yumdep"
578             self._yum_dependencies.depends = ' '.join(self.required_packages)
579
580     def routing_method(self, routes, vsys_vnet):
581         """
582         There are two methods, vroute and sliceip.
583         
584         vroute:
585             Modifies the node's routing table directly, validating that the IP
586             range lies within the network given by the slice's vsys_vnet tag.
587             This method is the most scalable for very small routing tables
588             that need not modify other routes (including the default)
589         
590         sliceip:
591             Uses policy routing and iptables filters to create per-sliver
592             routing tables. It's the most flexible way, but it doesn't scale
593             as well since only 155 routing tables can be created this way.
594         
595         This method will return the most appropriate routing method, which will
596         prefer vroute for small routing tables.
597         """
598         
599         # For now, sliceip results in kernel panics
600         # so we HAVE to use vroute
601         return 'vroute'
602         
603         # We should not make the routing table grow too big
604         if len(routes) > MAX_VROUTE_ROUTES:
605             return 'sliceip'
606         
607         vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
608         for route in routes:
609             dest, prefix, nexthop, metric = route
610             dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
611             nexthop = ipaddr.IPAddress(nexthop)
612             if dest not in vsys_vnet or nexthop not in vsys_vnet:
613                 return 'sliceip'
614         
615         return 'vroute'
616     
617     def format_route(self, route, dev, method, action):
618         dest, prefix, nexthop, metric = route
619         if method == 'vroute':
620             return (
621                 "%s %s%s gw %s %s" % (
622                     action,
623                     dest,
624                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
625                     nexthop,
626                     dev,
627                 )
628             )
629         elif method == 'sliceip':
630             return (
631                 "route %s to %s%s via %s metric %s dev %s" % (
632                     action,
633                     dest,
634                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
635                     nexthop,
636                     metric or 1,
637                     dev,
638                 )
639             )
640         else:
641             raise AssertionError, "Unknown method"
642     
643     def _annotate_routes_with_devs(self, routes, devs, method):
644         dev_routes = []
645         for route in routes:
646             for dev in devs:
647                 if dev.routes_here(route):
648                     dev_routes.append(tuple(route) + (dev.if_name,))
649                     
650                     # Stop checking
651                     break
652             else:
653                 if method == 'sliceip':
654                     dev_routes.append(tuple(route) + ('eth0',))
655                 else:
656                     raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
657                         "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
658         return dev_routes
659     
660     def configure_routes(self, routes, devs, vsys_vnet):
661         """
662         Add the specified routes to the node's routing table
663         """
664         rules = []
665         method = self.routing_method(routes, vsys_vnet)
666         tdevs = set()
667         
668         # annotate routes with devices
669         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
670         for route in dev_routes:
671             route, dev = route[:-1], route[-1]
672             
673             # Schedule rule
674             tdevs.add(dev)
675             rules.append(self.format_route(route, dev, method, 'add'))
676         
677         if method == 'sliceip':
678             rules = map('enable '.__add__, tdevs) + rules
679         
680         self._logger.info("Setting up routes for %s using %s", self.hostname, method)
681         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
682         
683         self.apply_route_rules(rules, method)
684         
685         self._configured_routes = set(routes)
686         self._configured_devs = tdevs
687         self._configured_method = method
688     
689     def reconfigure_routes(self, routes, devs, vsys_vnet):
690         """
691         Updates the routes in the node's routing table to match
692         the given route list
693         """
694         method = self._configured_method
695         
696         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
697
698         current = self._configured_routes
699         current_devs = self._configured_devs
700         
701         new = set(dev_routes)
702         new_devs = set(map(operator.itemgetter(-1), dev_routes))
703         
704         deletions = current - new
705         insertions = new - current
706         
707         dev_deletions = current_devs - new_devs
708         dev_insertions = new_devs - current_devs
709         
710         # Generate rules
711         rules = []
712         
713         # Rule deletions first
714         for route in deletions:
715             route, dev = route[:-1], route[-1]
716             rules.append(self.format_route(route, dev, method, 'del'))
717         
718         if method == 'sliceip':
719             # Dev deletions now
720             rules.extend(map('disable '.__add__, dev_deletions))
721
722             # Dev insertions now
723             rules.extend(map('enable '.__add__, dev_insertions))
724
725         # Rule insertions now
726         for route in insertions:
727             route, dev = route[:-1], dev[-1]
728             rules.append(self.format_route(route, dev, method, 'add'))
729         
730         # Apply
731         self.apply_route_rules(rules, method)
732         
733         self._configured_routes = dev_routes
734         self._configured_devs = new_devs
735         
736     def apply_route_rules(self, rules, method):
737         (out,err),proc = server.popen_ssh_command(
738             "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
739                 home = server.shell_escape(self.home_path),
740                 method = method),
741             host = self.hostname,
742             port = None,
743             user = self.slicename,
744             agent = None,
745             ident_key = self.ident_path,
746             server_key = self.server_key,
747             stdin = '\n'.join(rules),
748             timeout = 300
749             )
750         
751         if proc.wait() or err:
752             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
753         elif out or err:
754             logger.debug("%s said: %s%s", method, out, err)
755
756     def check_bad_host(self, out, err):
757         badre = re.compile(r'(?:'
758                            r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
759                            r'|Error: disk I/O error'
760                            r')', 
761                            re.I)
762         return badre.search(out) or badre.search(err)