sfiapi.py complete. needs testing
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import plcapi
5 import operator
6 import rspawn
7 import time
8 import os
9 import collections
10 import cStringIO
11 import resourcealloc
12 import socket
13 import sys
14 import logging
15 import ipaddr
16 import operator
17 import re
18
19 from nepi.util import server
20 from nepi.util import parallel
21
22 import application
23
24 MAX_VROUTE_ROUTES = 5
25
26 class UnresponsiveNodeError(RuntimeError):
27     pass
28
29 def _castproperty(typ, propattr):
30     def _get(self):
31         return getattr(self, propattr)
32     def _set(self, value):
33         if value is not None or (isinstance(value, basestring) and not value):
34             value = typ(value)
35         return setattr(self, propattr, value)
36     def _del(self, value):
37         return delattr(self, propattr)
38     _get.__name__ = propattr + '_get'
39     _set.__name__ = propattr + '_set'
40     _del.__name__ = propattr + '_del'
41     return property(_get, _set, _del)
42
43 class Node(object):
44     BASEFILTERS = {
45         # Map Node attribute to plcapi filter name
46         'hostname' : 'hostname',
47     }
48     
49     TAGFILTERS = {
50         # Map Node attribute to (<tag name>, <plcapi filter expression>)
51         #   There are replacements that are applied with string formatting,
52         #   so '%' has to be escaped as '%%'.
53         'architecture' : ('arch','value'),
54         'operatingSystem' : ('fcdistro','value'),
55         'pl_distro' : ('pldistro','value'),
56         'city' : ('city','value'),
57         'country' : ('country','value'),
58         'region' : ('region','value'),
59         'minReliability' : ('reliability%(timeframe)s', ']value'),
60         'maxReliability' : ('reliability%(timeframe)s', '[value'),
61         'minBandwidth' : ('bw%(timeframe)s', ']value'),
62         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63         'minLoad' : ('load%(timeframe)s', ']value'),
64         'maxLoad' : ('load%(timeframe)s', '[value'),
65         'minCpu' : ('cpu%(timeframe)s', ']value'),
66         'maxCpu' : ('cpu%(timeframe)s', '[value'),
67     }
68     
69     RATE_FACTORS = (
70         # (<tag name>, <weight>, <default>)
71         ('bw%(timeframe)s', -0.001, 1024.0),
72         ('cpu%(timeframe)s', 0.1, 40.0),
73         ('load%(timeframe)s', -0.2, 3.0),
74         ('reliability%(timeframe)s', 1, 100.0),
75     )
76     
77     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
79     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
80     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
81     
82     minReliability = _castproperty(float, '_minReliability')
83     maxReliability = _castproperty(float, '_maxReliability')
84     minBandwidth = _castproperty(float, '_minBandwidth')
85     maxBandwidth = _castproperty(float, '_maxBandwidth')
86     minCpu = _castproperty(float, '_minCpu')
87     maxCpu = _castproperty(float, '_maxCpu')
88     minLoad = _castproperty(float, '_minLoad')
89     maxLoad = _castproperty(float, '_maxLoad')
90     
91     def __init__(self, api=None, sliceapi=None):
92         if not api:
93             api = plcapi.PLCAPI()
94         self._api = api
95         self._sliceapi = sliceapi
96         
97         # Attributes
98         self.hostname = None
99         self.architecture = None
100         self.operatingSystem = None
101         self.pl_distro = None
102         self.site = None
103         self.city = None
104         self.country = None
105         self.region = None
106         self.minReliability = None
107         self.maxReliability = None
108         self.minBandwidth = None
109         self.maxBandwidth = None
110         self.minCpu = None
111         self.maxCpu = None
112         self.minLoad = None
113         self.maxLoad = None
114         self.min_num_external_ifaces = None
115         self.max_num_external_ifaces = None
116         self.timeframe = 'm'
117         
118         # Applications and routes add requirements to connected nodes
119         self.required_packages = set()
120         self.required_vsys = set()
121         self.pythonpath = []
122         self.rpmFusion = False
123         self.env = collections.defaultdict(list)
124         
125         # Some special applications - initialized when connected
126         self.multicast_forwarder = None
127         
128         # Testbed-derived attributes
129         self.slicename = None
130         self.ident_path = None
131         self.server_key = None
132         self.home_path = None
133         self.enable_cleanup = False
134         
135         # Those are filled when an actual node is allocated
136         self._node_id = None
137         self._yum_dependencies = None
138         self._installed = False
139
140         # Logging
141         self._logger = logging.getLogger('nepi.testbeds.planetlab')
142     
143     def _nepi_testbed_environment_setup_get(self):
144         command = cStringIO.StringIO()
145         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
146             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
147         ))
148         command.write(' ; export PATH=$PATH:%s' % (
149             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
150         ))
151         if self.env:
152             for envkey, envvals in self.env.iteritems():
153                 for envval in envvals:
154                     command.write(' ; export %s=%s' % (envkey, envval))
155         return command.getvalue()
156     def _nepi_testbed_environment_setup_set(self, value):
157         pass
158     _nepi_testbed_environment_setup = property(
159         _nepi_testbed_environment_setup_get,
160         _nepi_testbed_environment_setup_set)
161     
162     def build_filters(self, target_filters, filter_map):
163         for attr, tag in filter_map.iteritems():
164             value = getattr(self, attr, None)
165             if value is not None:
166                 target_filters[tag] = value
167         return target_filters
168     
169     @property
170     def applicable_filters(self):
171         has = lambda att : getattr(self,att,None) is not None
172         return (
173             filter(has, self.BASEFILTERS.iterkeys())
174             + filter(has, self.TAGFILTERS.iterkeys())
175         )
176     
177     def find_candidates(self, filter_slice_id=None):
178         self._logger.info("Finding candidates for %s", self.make_filter_description())
179         
180         fields = ('node_id',)
181         replacements = {'timeframe':self.timeframe}
182         
183         # get initial candidates (no tag filters)
184         basefilters = self.build_filters({}, self.BASEFILTERS)
185         rootfilters = basefilters.copy()
186         if filter_slice_id:
187             basefilters['|slice_ids'] = (filter_slice_id,)
188         
189         # only pick healthy nodes
190         basefilters['run_level'] = 'boot'
191         basefilters['boot_state'] = 'boot'
192         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
193         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
194         
195         # keyword-only "pseudofilters"
196         extra = {}
197         if self.site:
198             extra['peer'] = self.site
199             
200         candidates = set(map(operator.itemgetter('node_id'), 
201             self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
202
203         # filter by tag, one tag at a time
204         applicable = self.applicable_filters
205         for tagfilter in self.TAGFILTERS.iteritems():
206             attr, (tagname, expr) = tagfilter
207             
208             # don't bother if there's no filter defined
209             if attr in applicable:
210                 tagfilter = rootfilters.copy()
211                 tagfilter['tagname'] = tagname % replacements
212                 tagfilter[expr % replacements] = getattr(self,attr)
213                 tagfilter['node_id'] = list(candidates)
214                 
215                 candidates &= set(map(operator.itemgetter('node_id'),
216                     self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
217
218         # filter by vsys tags - special case since it doesn't follow
219         # the usual semantics
220         if self.required_vsys:
221             newcandidates = collections.defaultdict(set)
222             
223             vsys_tags = self._sliceapi.GetNodeTags(
224                 tagname='vsys', 
225                 node_id = list(candidates), 
226                 fields = ['node_id','value'])
227
228             vsys_tags = map(
229                 operator.itemgetter(['node_id','value']),
230                 vsys_tags)
231             
232             required_vsys = self.required_vsys
233             for node_id, value in vsys_tags:
234                 if value in required_vsys:
235                     newcandidates[value].add(node_id)
236             
237             # take only those that have all the required vsys tags
238             newcandidates = reduce(
239                 lambda accum, new : accum & new,
240                 newcandidates.itervalues(),
241                 candidates)
242         
243         # filter by iface count
244         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
245             # fetch interfaces for all, in one go
246             filters = basefilters.copy()
247             filters['node_id'] = list(candidates)
248             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
249                 self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
250             
251             # filter candidates by interface count
252             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
253                 predicate = ( lambda node_id : 
254                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
255             elif self.min_num_external_ifaces is not None:
256                 predicate = ( lambda node_id : 
257                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
258             else:
259                 predicate = ( lambda node_id : 
260                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
261             
262             candidates = set(filter(predicate, candidates))
263        
264         # make sure hostnames are resolvable
265         hostnames = None
266         if candidates:
267             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
268             
269             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
270                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
271             ))
272
273             def resolvable(node_id):
274                 try:
275                     addr = socket.gethostbyname(hostnames[node_id])
276                     return addr is not None
277                 except:
278                     return False
279             candidates = set(parallel.pfilter(resolvable, candidates,
280                 maxthreads = 16))
281
282             self._logger.info("  Found %s reachable candidates.", len(candidates))
283
284             for h in hostnames.keys():
285                 if h not in candidates:
286                     del hostnames[h]
287
288             hostnames = dict((v,k) for k, v in hostnames.iteritems())
289
290         return hostnames
291     
292     def make_filter_description(self):
293         """
294         Makes a human-readable description of filtering conditions
295         for find_candidates.
296         """
297         
298         # get initial candidates (no tag filters)
299         filters = self.build_filters({}, self.BASEFILTERS)
300         
301         # keyword-only "pseudofilters"
302         if self.site:
303             filters['peer'] = self.site
304             
305         # filter by tag, one tag at a time
306         applicable = self.applicable_filters
307         for tagfilter in self.TAGFILTERS.iteritems():
308             attr, (tagname, expr) = tagfilter
309             
310             # don't bother if there's no filter defined
311             if attr in applicable:
312                 filters[attr] = getattr(self,attr)
313         
314         # filter by vsys tags - special case since it doesn't follow
315         # the usual semantics
316         if self.required_vsys:
317             filters['vsys'] = ','.join(list(self.required_vsys))
318         
319         # filter by iface count
320         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
321             filters['num_ifaces'] = '-'.join([
322                 str(self.min_num_external_ifaces or '0'),
323                 str(self.max_num_external_ifaces or 'inf')
324             ])
325             
326         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
327
328     def assign_node_id(self, node_id):
329         self._node_id = node_id
330         self.fetch_node_info()
331     
332     def unassign_node(self):
333         self._node_id = None
334         self.hostip = None
335         
336         try:
337             orig_attrs = self.__orig_attrs
338         except AttributeError:
339             return
340             
341         for key, value in orig_attrs.iteritems():
342             setattr(self, key, value)
343         del self.__orig_attrs
344     
345     def rate_nodes(self, nodes):
346         rates = collections.defaultdict(int)
347         tags = collections.defaultdict(dict)
348         replacements = {'timeframe':self.timeframe}
349         tagnames = [ tagname % replacements 
350                      for tagname, weight, default in self.RATE_FACTORS ]
351         
352         taginfo = self._sliceapi.GetNodeTags(
353             node_id=list(nodes), 
354             tagname=tagnames,
355             fields=('node_id','tagname','value'))
356
357         unpack = operator.itemgetter('node_id','tagname','value')
358         for value in taginfo:
359             node, tagname, value = unpack(value)
360             if value and value.lower() != 'n/a':
361                 tags[tagname][int(node)] = float(value)
362         
363         for tagname, weight, default in self.RATE_FACTORS:
364             taginfo = tags[tagname % replacements].get
365             for node in nodes:
366                 rates[node] += weight * taginfo(node,default)
367         
368         return map(rates.__getitem__, nodes)
369             
370     def fetch_node_info(self):
371         orig_attrs = {}
372         
373         info, tags = self._sliceapi.GetNodeInfo(self._node_id)
374         info = info[0]
375         
376         tags = dict( (t['tagname'],t['value'])
377                      for t in tags )
378
379         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
380         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
381         self.min_num_external_ifaces = None
382         self.max_num_external_ifaces = None
383         self.timeframe = 'm'
384         
385         replacements = {'timeframe':self.timeframe}
386         for attr, tag in self.BASEFILTERS.iteritems():
387             if tag in info:
388                 value = info[tag]
389                 if hasattr(self, attr):
390                     orig_attrs[attr] = getattr(self, attr)
391                 setattr(self, attr, value)
392         for attr, (tag,_) in self.TAGFILTERS.iteritems():
393             tag = tag % replacements
394             if tag in tags:
395                 value = tags[tag]
396                 if hasattr(self, attr):
397                     orig_attrs[attr] = getattr(self, attr)
398                 if not value or value.lower() == 'n/a':
399                     value = None
400                 setattr(self, attr, value)
401         
402         if 'peer_id' in info:
403             orig_attrs['site'] = self.site
404             self.site = self._api.peer_map[info['peer_id']]
405         
406         if 'interface_ids' in info:
407             self.min_num_external_ifaces = \
408             self.max_num_external_ifaces = len(info['interface_ids'])
409         
410         if 'ssh_rsa_key' in info:
411             orig_attrs['server_key'] = self.server_key
412             self.server_key = info['ssh_rsa_key']
413         
414         self.hostip = socket.gethostbyname(self.hostname)
415         
416         try:
417             self.__orig_attrs
418         except AttributeError:
419             self.__orig_attrs = orig_attrs
420
421     def validate(self):
422         if self.home_path is None:
423             raise AssertionError, "Misconfigured node: missing home path"
424         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
425             raise AssertionError, "Misconfigured node: missing slice SSH key"
426         if self.slicename is None:
427             raise AssertionError, "Misconfigured node: unspecified slice"
428
429     def recover(self):
430         # Mark dependencies installed
431         self._installed = True
432         
433         # Clear load attributes, they impair re-discovery
434         self.minReliability = \
435         self.maxReliability = \
436         self.minBandwidth = \
437         self.maxBandwidth = \
438         self.minCpu = \
439         self.maxCpu = \
440         self.minLoad = \
441         self.maxLoad = None
442
443     def install_dependencies(self):
444         if self.required_packages and not self._installed:
445             # If we need rpmfusion, we must install the repo definition and the gpg keys
446             if self.rpmFusion:
447                 if self.operatingSystem == 'f12':
448                     # Fedora 12 requires a different rpmfusion package
449                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
450                 else:
451                     # This one works for f13+
452                     RPM_FUSION_URL = self.RPM_FUSION_URL
453                     
454                 rpmFusion = (
455                   'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
456                 ) % {
457                     'RPM_FUSION_URL' : RPM_FUSION_URL
458                 }
459             else:
460                 rpmFusion = ''
461             
462             if rpmFusion:
463                 (out,err),proc = server.popen_ssh_command(
464                     rpmFusion,
465                     host = self.hostname,
466                     port = None,
467                     user = self.slicename,
468                     agent = None,
469                     ident_key = self.ident_path,
470                     server_key = self.server_key,
471                     timeout = 600,
472                     )
473                 
474                 if proc.wait():
475                     if self.check_bad_host(out,err):
476                         self.blacklist()
477                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
478             
479             # Launch p2p yum dependency installer
480             self._yum_dependencies.async_setup()
481     
482     def wait_provisioning(self, timeout = 20*60):
483         # Wait for the p2p installer
484         sleeptime = 1.0
485         totaltime = 0.0
486         while not self.is_alive():
487             time.sleep(sleeptime)
488             totaltime += sleeptime
489             sleeptime = min(30.0, sleeptime*1.5)
490             
491             if totaltime > timeout:
492                 # PlanetLab has a 15' delay on configuration propagation
493                 # If we're above that delay, the unresponsiveness is not due
494                 # to this delay.
495                 if not self.is_alive(verbose=True):
496                     raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
497         
498         # Ensure the node is clean (no apps running that could interfere with operations)
499         if self.enable_cleanup:
500             self.do_cleanup()
501     
502     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
503         # Wait for the p2p installer
504         if self._yum_dependencies and not self._installed:
505             self._yum_dependencies.async_setup_wait()
506             self._installed = True
507         
508     def is_alive(self, verbose = False):
509         # Make sure all the paths are created where 
510         # they have to be created for deployment
511         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
512             "echo 'ALIVE'",
513             host = self.hostname,
514             port = None,
515             user = self.slicename,
516             agent = None,
517             ident_key = self.ident_path,
518             server_key = self.server_key,
519             timeout = 60,
520             err_on_timeout = False,
521             persistent = False
522             )
523         
524         if proc.wait():
525             if verbose:
526                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
527             return False
528         elif not err and out.strip() == 'ALIVE':
529             return True
530         else:
531             if verbose:
532                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
533             return False
534     
535     def destroy(self):
536         if self.enable_cleanup:
537             self.do_cleanup()
538     
539     def blacklist(self):
540         if self._node_id:
541             self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
542             import util
543             util.appendBlacklist(self.hostname)
544     
545     def do_cleanup(self):
546         if self.testbed().recovering:
547             # WOW - not now
548             return
549             
550         self._logger.info("Cleaning up %s", self.hostname)
551         
552         cmds = [
553             "sudo -S killall python tcpdump || /bin/true ; "
554             "sudo -S killall python tcpdump || /bin/true ; "
555             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
556             "sudo -S killall -u %(slicename)s || /bin/true ",
557             "sudo -S killall -u root || /bin/true ",
558             "sudo -S killall -u %(slicename)s || /bin/true ",
559             "sudo -S killall -u root || /bin/true ",
560         ]
561
562         for cmd in cmds:
563             (out,err),proc = server.popen_ssh_command(
564                 # Some apps need two kills
565                 cmd % {
566                     'slicename' : self.slicename ,
567                 },
568                 host = self.hostname,
569                 port = None,
570                 user = self.slicename,
571                 agent = None,
572                 ident_key = self.ident_path,
573                 server_key = self.server_key,
574                 tty = True, # so that ps -N -T works as advertised...
575                 timeout = 60,
576                 retry = 3
577                 )
578             proc.wait()
579     
580     def prepare_dependencies(self):
581         # Configure p2p yum dependency installer
582         if self.required_packages and not self._installed:
583             self._yum_dependencies = application.YumDependency(self._api)
584             self._yum_dependencies.node = self
585             self._yum_dependencies.home_path = "nepi-yumdep"
586             self._yum_dependencies.depends = ' '.join(self.required_packages)
587
588     def routing_method(self, routes, vsys_vnet):
589         """
590         There are two methods, vroute and sliceip.
591         
592         vroute:
593             Modifies the node's routing table directly, validating that the IP
594             range lies within the network given by the slice's vsys_vnet tag.
595             This method is the most scalable for very small routing tables
596             that need not modify other routes (including the default)
597         
598         sliceip:
599             Uses policy routing and iptables filters to create per-sliver
600             routing tables. It's the most flexible way, but it doesn't scale
601             as well since only 155 routing tables can be created this way.
602         
603         This method will return the most appropriate routing method, which will
604         prefer vroute for small routing tables.
605         """
606         
607         # For now, sliceip results in kernel panics
608         # so we HAVE to use vroute
609         return 'vroute'
610         
611         # We should not make the routing table grow too big
612         if len(routes) > MAX_VROUTE_ROUTES:
613             return 'sliceip'
614         
615         vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
616         for route in routes:
617             dest, prefix, nexthop, metric = route
618             dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
619             nexthop = ipaddr.IPAddress(nexthop)
620             if dest not in vsys_vnet or nexthop not in vsys_vnet:
621                 return 'sliceip'
622         
623         return 'vroute'
624     
625     def format_route(self, route, dev, method, action):
626         dest, prefix, nexthop, metric = route
627         if method == 'vroute':
628             return (
629                 "%s %s%s gw %s %s" % (
630                     action,
631                     dest,
632                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
633                     nexthop,
634                     dev,
635                 )
636             )
637         elif method == 'sliceip':
638             return (
639                 "route %s to %s%s via %s metric %s dev %s" % (
640                     action,
641                     dest,
642                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
643                     nexthop,
644                     metric or 1,
645                     dev,
646                 )
647             )
648         else:
649             raise AssertionError, "Unknown method"
650     
651     def _annotate_routes_with_devs(self, routes, devs, method):
652         dev_routes = []
653         for route in routes:
654             for dev in devs:
655                 if dev.routes_here(route):
656                     dev_routes.append(tuple(route) + (dev.if_name,))
657                     
658                     # Stop checking
659                     break
660             else:
661                 if method == 'sliceip':
662                     dev_routes.append(tuple(route) + ('eth0',))
663                 else:
664                     raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
665                         "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
666         return dev_routes
667     
668     def configure_routes(self, routes, devs, vsys_vnet):
669         """
670         Add the specified routes to the node's routing table
671         """
672         rules = []
673         method = self.routing_method(routes, vsys_vnet)
674         tdevs = set()
675         
676         # annotate routes with devices
677         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
678         for route in dev_routes:
679             route, dev = route[:-1], route[-1]
680             
681             # Schedule rule
682             tdevs.add(dev)
683             rules.append(self.format_route(route, dev, method, 'add'))
684         
685         if method == 'sliceip':
686             rules = map('enable '.__add__, tdevs) + rules
687         
688         self._logger.info("Setting up routes for %s using %s", self.hostname, method)
689         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
690         
691         self.apply_route_rules(rules, method)
692         
693         self._configured_routes = set(routes)
694         self._configured_devs = tdevs
695         self._configured_method = method
696     
697     def reconfigure_routes(self, routes, devs, vsys_vnet):
698         """
699         Updates the routes in the node's routing table to match
700         the given route list
701         """
702         method = self._configured_method
703         
704         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
705
706         current = self._configured_routes
707         current_devs = self._configured_devs
708         
709         new = set(dev_routes)
710         new_devs = set(map(operator.itemgetter(-1), dev_routes))
711         
712         deletions = current - new
713         insertions = new - current
714         
715         dev_deletions = current_devs - new_devs
716         dev_insertions = new_devs - current_devs
717         
718         # Generate rules
719         rules = []
720         
721         # Rule deletions first
722         for route in deletions:
723             route, dev = route[:-1], route[-1]
724             rules.append(self.format_route(route, dev, method, 'del'))
725         
726         if method == 'sliceip':
727             # Dev deletions now
728             rules.extend(map('disable '.__add__, dev_deletions))
729
730             # Dev insertions now
731             rules.extend(map('enable '.__add__, dev_insertions))
732
733         # Rule insertions now
734         for route in insertions:
735             route, dev = route[:-1], dev[-1]
736             rules.append(self.format_route(route, dev, method, 'add'))
737         
738         # Apply
739         self.apply_route_rules(rules, method)
740         
741         self._configured_routes = dev_routes
742         self._configured_devs = new_devs
743         
744     def apply_route_rules(self, rules, method):
745         (out,err),proc = server.popen_ssh_command(
746             "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
747                 home = server.shell_escape(self.home_path),
748                 method = method),
749             host = self.hostname,
750             port = None,
751             user = self.slicename,
752             agent = None,
753             ident_key = self.ident_path,
754             server_key = self.server_key,
755             stdin = '\n'.join(rules),
756             timeout = 300
757             )
758         
759         if proc.wait() or err:
760             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
761         elif out or err:
762             logger.debug("%s said: %s%s", method, out, err)
763
764     def check_bad_host(self, out, err):
765         badre = re.compile(r'(?:'
766                            r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
767                            r'|Error: disk I/O error'
768                            r')', 
769                            re.I)
770         return badre.search(out) or badre.search(err)