merge
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15 import logging
16 import ipaddr
17 import operator
18
19 from nepi.util import server
20 from nepi.util import parallel
21
22 import application
23
24 MAX_VROUTE_ROUTES = 5
25
26 class UnresponsiveNodeError(RuntimeError):
27     pass
28
29 def _castproperty(typ, propattr):
30     def _get(self):
31         return getattr(self, propattr)
32     def _set(self, value):
33         if value is not None or (isinstance(value, basestring) and not value):
34             value = typ(value)
35         return setattr(self, propattr, value)
36     def _del(self, value):
37         return delattr(self, propattr)
38     _get.__name__ = propattr + '_get'
39     _set.__name__ = propattr + '_set'
40     _del.__name__ = propattr + '_del'
41     return property(_get, _set, _del)
42
43 class Node(object):
44     BASEFILTERS = {
45         # Map Node attribute to plcapi filter name
46         'hostname' : 'hostname',
47     }
48     
49     TAGFILTERS = {
50         # Map Node attribute to (<tag name>, <plcapi filter expression>)
51         #   There are replacements that are applied with string formatting,
52         #   so '%' has to be escaped as '%%'.
53         'architecture' : ('arch','value'),
54         'operatingSystem' : ('fcdistro','value'),
55         'pl_distro' : ('pldistro','value'),
56         'city' : ('city','value'),
57         'country' : ('country','value'),
58         'region' : ('region','value'),
59         'minReliability' : ('reliability%(timeframe)s', ']value'),
60         'maxReliability' : ('reliability%(timeframe)s', '[value'),
61         'minBandwidth' : ('bw%(timeframe)s', ']value'),
62         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63         'minLoad' : ('load%(timeframe)s', ']value'),
64         'maxLoad' : ('load%(timeframe)s', '[value'),
65         'minCpu' : ('cpu%(timeframe)s', ']value'),
66         'maxCpu' : ('cpu%(timeframe)s', '[value'),
67     }
68     
69     RATE_FACTORS = (
70         # (<tag name>, <weight>, <default>)
71         ('bw%(timeframe)s', -0.001, 1024.0),
72         ('cpu%(timeframe)s', 0.1, 40.0),
73         ('load%(timeframe)s', -0.2, 3.0),
74         ('reliability%(timeframe)s', 1, 100.0),
75     )
76     
77     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
79     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
80     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
81     
82     minReliability = _castproperty(float, '_minReliability')
83     maxReliability = _castproperty(float, '_maxReliability')
84     minBandwidth = _castproperty(float, '_minBandwidth')
85     maxBandwidth = _castproperty(float, '_maxBandwidth')
86     minCpu = _castproperty(float, '_minCpu')
87     maxCpu = _castproperty(float, '_maxCpu')
88     minLoad = _castproperty(float, '_minLoad')
89     maxLoad = _castproperty(float, '_maxLoad')
90     
91     def __init__(self, api=None):
92         if not api:
93             api = plcapi.PLCAPI()
94         self._api = api
95         
96         # Attributes
97         self.hostname = None
98         self.architecture = None
99         self.operatingSystem = None
100         self.pl_distro = None
101         self.site = None
102         self.city = None
103         self.country = None
104         self.region = None
105         self.minReliability = None
106         self.maxReliability = None
107         self.minBandwidth = None
108         self.maxBandwidth = None
109         self.minCpu = None
110         self.maxCpu = None
111         self.minLoad = None
112         self.maxLoad = None
113         self.min_num_external_ifaces = None
114         self.max_num_external_ifaces = None
115         self.timeframe = 'm'
116         
117         # Applications and routes add requirements to connected nodes
118         self.required_packages = set()
119         self.required_vsys = set()
120         self.pythonpath = []
121         self.rpmFusion = False
122         self.env = collections.defaultdict(list)
123         
124         # Some special applications - initialized when connected
125         self.multicast_forwarder = None
126         
127         # Testbed-derived attributes
128         self.slicename = None
129         self.ident_path = None
130         self.server_key = None
131         self.home_path = None
132         self.enable_cleanup = False
133         
134         # Those are filled when an actual node is allocated
135         self._node_id = None
136         self._yum_dependencies = None
137         self._installed = False
138
139         # Logging
140         self._logger = logging.getLogger('nepi.testbeds.planetlab')
141     
142     def _nepi_testbed_environment_setup_get(self):
143         command = cStringIO.StringIO()
144         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
145             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
146         ))
147         command.write(' ; export PATH=$PATH:%s' % (
148             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
149         ))
150         if self.env:
151             for envkey, envvals in self.env.iteritems():
152                 for envval in envvals:
153                     command.write(' ; export %s=%s' % (envkey, envval))
154         return command.getvalue()
155     def _nepi_testbed_environment_setup_set(self, value):
156         pass
157     _nepi_testbed_environment_setup = property(
158         _nepi_testbed_environment_setup_get,
159         _nepi_testbed_environment_setup_set)
160     
161     def build_filters(self, target_filters, filter_map):
162         for attr, tag in filter_map.iteritems():
163             value = getattr(self, attr, None)
164             if value is not None:
165                 target_filters[tag] = value
166         return target_filters
167     
168     @property
169     def applicable_filters(self):
170         has = lambda att : getattr(self,att,None) is not None
171         return (
172             filter(has, self.BASEFILTERS.iterkeys())
173             + filter(has, self.TAGFILTERS.iterkeys())
174         )
175     
176     def find_candidates(self, filter_slice_id=None):
177         self._logger.info("Finding candidates for %s", self.make_filter_description())
178         
179         fields = ('node_id',)
180         replacements = {'timeframe':self.timeframe}
181         
182         # get initial candidates (no tag filters)
183         basefilters = self.build_filters({}, self.BASEFILTERS)
184         rootfilters = basefilters.copy()
185         if filter_slice_id:
186             basefilters['|slice_ids'] = (filter_slice_id,)
187         
188         # only pick healthy nodes
189         basefilters['run_level'] = 'boot'
190         basefilters['boot_state'] = 'boot'
191         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
192         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
193         
194         # keyword-only "pseudofilters"
195         extra = {}
196         if self.site:
197             extra['peer'] = self.site
198             
199         candidates = set(map(operator.itemgetter('node_id'), 
200             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
201         
202         # filter by tag, one tag at a time
203         applicable = self.applicable_filters
204         for tagfilter in self.TAGFILTERS.iteritems():
205             attr, (tagname, expr) = tagfilter
206             
207             # don't bother if there's no filter defined
208             if attr in applicable:
209                 tagfilter = rootfilters.copy()
210                 tagfilter['tagname'] = tagname % replacements
211                 tagfilter[expr % replacements] = getattr(self,attr)
212                 tagfilter['node_id'] = list(candidates)
213                 
214                 candidates &= set(map(operator.itemgetter('node_id'),
215                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
216         
217         # filter by vsys tags - special case since it doesn't follow
218         # the usual semantics
219         if self.required_vsys:
220             newcandidates = collections.defaultdict(set)
221             
222             vsys_tags = self._api.GetNodeTags(
223                 tagname='vsys', 
224                 node_id = list(candidates), 
225                 fields = ['node_id','value'])
226             
227             vsys_tags = map(
228                 operator.itemgetter(['node_id','value']),
229                 vsys_tags)
230             
231             required_vsys = self.required_vsys
232             for node_id, value in vsys_tags:
233                 if value in required_vsys:
234                     newcandidates[value].add(node_id)
235             
236             # take only those that have all the required vsys tags
237             newcandidates = reduce(
238                 lambda accum, new : accum & new,
239                 newcandidates.itervalues(),
240                 candidates)
241         
242         # filter by iface count
243         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
244             # fetch interfaces for all, in one go
245             filters = basefilters.copy()
246             filters['node_id'] = list(candidates)
247             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
248                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
249             
250             # filter candidates by interface count
251             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
252                 predicate = ( lambda node_id : 
253                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
254             elif self.min_num_external_ifaces is not None:
255                 predicate = ( lambda node_id : 
256                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
257             else:
258                 predicate = ( lambda node_id : 
259                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
260             
261             candidates = set(filter(predicate, candidates))
262         
263         # make sure hostnames are resolvable
264         if candidates:
265             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
266             
267             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
268                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
269             ))
270             def resolvable(node_id):
271                 try:
272                     addr = socket.gethostbyname(hostnames[node_id])
273                     return addr is not None
274                 except:
275                     return False
276             candidates = set(parallel.pfilter(resolvable, candidates,
277                 maxthreads = 16))
278
279             self._logger.info("  Found %s reachable candidates.", len(candidates))
280             
281         return candidates
282     
283     def make_filter_description(self):
284         """
285         Makes a human-readable description of filtering conditions
286         for find_candidates.
287         """
288         
289         # get initial candidates (no tag filters)
290         filters = self.build_filters({}, self.BASEFILTERS)
291         
292         # keyword-only "pseudofilters"
293         if self.site:
294             filters['peer'] = self.site
295             
296         # filter by tag, one tag at a time
297         applicable = self.applicable_filters
298         for tagfilter in self.TAGFILTERS.iteritems():
299             attr, (tagname, expr) = tagfilter
300             
301             # don't bother if there's no filter defined
302             if attr in applicable:
303                 filters[attr] = getattr(self,attr)
304         
305         # filter by vsys tags - special case since it doesn't follow
306         # the usual semantics
307         if self.required_vsys:
308             filters['vsys'] = ','.join(list(self.required_vsys))
309         
310         # filter by iface count
311         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
312             filters['num_ifaces'] = '-'.join([
313                 str(self.min_num_external_ifaces or '0'),
314                 str(self.max_num_external_ifaces or 'inf')
315             ])
316             
317         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
318
319     def assign_node_id(self, node_id):
320         self._node_id = node_id
321         self.fetch_node_info()
322     
323     def unassign_node(self):
324         self._node_id = None
325         self.__dict__.update(self.__orig_attrs)
326     
327     def rate_nodes(self, nodes):
328         rates = collections.defaultdict(int)
329         tags = collections.defaultdict(dict)
330         replacements = {'timeframe':self.timeframe}
331         tagnames = [ tagname % replacements 
332                      for tagname, weight, default in self.RATE_FACTORS ]
333         
334         taginfo = self._api.GetNodeTags(
335             node_id=list(nodes), 
336             tagname=tagnames,
337             fields=('node_id','tagname','value'))
338         
339         unpack = operator.itemgetter('node_id','tagname','value')
340         for value in taginfo:
341             node, tagname, value = unpack(value)
342             if value and value.lower() != 'n/a':
343                 tags[tagname][int(node)] = float(value)
344         
345         for tagname, weight, default in self.RATE_FACTORS:
346             taginfo = tags[tagname % replacements].get
347             for node in nodes:
348                 rates[node] += weight * taginfo(node,default)
349         
350         return map(rates.__getitem__, nodes)
351             
352     def fetch_node_info(self):
353         orig_attrs = {}
354         
355         self._api.StartMulticall()
356         info = self._api.GetNodes(self._node_id)
357         tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
358         info, tags = self._api.FinishMulticall()
359         info = info[0]
360         
361         tags = dict( (t['tagname'],t['value'])
362                      for t in tags )
363
364         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
365         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
366         self.min_num_external_ifaces = None
367         self.max_num_external_ifaces = None
368         self.timeframe = 'm'
369         
370         replacements = {'timeframe':self.timeframe}
371         for attr, tag in self.BASEFILTERS.iteritems():
372             if tag in info:
373                 value = info[tag]
374                 if hasattr(self, attr):
375                     orig_attrs[attr] = getattr(self, attr)
376                 setattr(self, attr, value)
377         for attr, (tag,_) in self.TAGFILTERS.iteritems():
378             tag = tag % replacements
379             if tag in tags:
380                 value = tags[tag]
381                 if hasattr(self, attr):
382                     orig_attrs[attr] = getattr(self, attr)
383                 if not value or value.lower() == 'n/a':
384                     value = None
385                 setattr(self, attr, value)
386         
387         if 'peer_id' in info:
388             orig_attrs['site'] = self.site
389             self.site = self._api.peer_map[info['peer_id']]
390         
391         if 'interface_ids' in info:
392             self.min_num_external_ifaces = \
393             self.max_num_external_ifaces = len(info['interface_ids'])
394         
395         if 'ssh_rsa_key' in info:
396             orig_attrs['server_key'] = self.server_key
397             self.server_key = info['ssh_rsa_key']
398         
399         self.__orig_attrs = orig_attrs
400
401     def validate(self):
402         if self.home_path is None:
403             raise AssertionError, "Misconfigured node: missing home path"
404         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
405             raise AssertionError, "Misconfigured node: missing slice SSH key"
406         if self.slicename is None:
407             raise AssertionError, "Misconfigured node: unspecified slice"
408
409     def recover(self):
410         # Mark dependencies installed
411         self._installed = True
412         
413         # Clear load attributes, they impair re-discovery
414         self.minReliability = \
415         self.maxReliability = \
416         self.minBandwidth = \
417         self.maxBandwidth = \
418         self.minCpu = \
419         self.maxCpu = \
420         self.minLoad = \
421         self.maxLoad = None
422
423     def install_dependencies(self):
424         if self.required_packages and not self._installed:
425             # If we need rpmfusion, we must install the repo definition and the gpg keys
426             if self.rpmFusion:
427                 if self.operatingSystem == 'f12':
428                     # Fedora 12 requires a different rpmfusion package
429                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
430                 else:
431                     # This one works for f13+
432                     RPM_FUSION_URL = self.RPM_FUSION_URL
433                     
434                 rpmFusion = (
435                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
436                 ) % {
437                     'RPM_FUSION_URL' : RPM_FUSION_URL
438                 }
439             else:
440                 rpmFusion = ''
441             
442             if rpmFusion:
443                 (out,err),proc = server.popen_ssh_command(
444                     rpmFusion,
445                     host = self.hostname,
446                     port = None,
447                     user = self.slicename,
448                     agent = None,
449                     ident_key = self.ident_path,
450                     server_key = self.server_key,
451                     timeout = 600,
452                     )
453                 
454                 if proc.wait():
455                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
456             
457             # Launch p2p yum dependency installer
458             self._yum_dependencies.async_setup()
459     
460     def wait_provisioning(self, timeout = 20*60):
461         # Wait for the p2p installer
462         sleeptime = 1.0
463         totaltime = 0.0
464         while not self.is_alive():
465             time.sleep(sleeptime)
466             totaltime += sleeptime
467             sleeptime = min(30.0, sleeptime*1.5)
468             
469             if totaltime > timeout:
470                 # PlanetLab has a 15' delay on configuration propagation
471                 # If we're above that delay, the unresponsiveness is not due
472                 # to this delay.
473                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
474         
475         # Ensure the node is clean (no apps running that could interfere with operations)
476         if self.enable_cleanup:
477             self.do_cleanup()
478     
479     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
480         # Wait for the p2p installer
481         if self._yum_dependencies and not self._installed:
482             self._yum_dependencies.async_setup_wait()
483             self._installed = True
484         
485     def is_alive(self):
486         # Make sure all the paths are created where 
487         # they have to be created for deployment
488         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
489             "echo 'ALIVE'",
490             host = self.hostname,
491             port = None,
492             user = self.slicename,
493             agent = None,
494             ident_key = self.ident_path,
495             server_key = self.server_key,
496             timeout = 60,
497             err_on_timeout = False
498             )
499         
500         if proc.wait():
501             return False
502         elif not err and out.strip() == 'ALIVE':
503             return True
504         else:
505             return False
506     
507     def destroy(self):
508         if self.enable_cleanup:
509             self.do_cleanup()
510     
511     def do_cleanup(self):
512         if self.testbed().recovering:
513             # WOW - not now
514             return
515             
516         self._logger.info("Cleaning up %s", self.hostname)
517         
518         cmds = [
519             "sudo -S killall python tcpdump || /bin/true ; "
520             "sudo -S killall python tcpdump || /bin/true ; "
521             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
522             "sudo -S killall -u %(slicename)s || /bin/true ",
523             "sudo -S killall -u root || /bin/true ",
524             "sudo -S killall -u %(slicename)s || /bin/true ",
525             "sudo -S killall -u root || /bin/true ",
526         ]
527
528         for cmd in cmds:
529             (out,err),proc = server.popen_ssh_command(
530                 # Some apps need two kills
531                 cmd % {
532                     'slicename' : self.slicename ,
533                 },
534                 host = self.hostname,
535                 port = None,
536                 user = self.slicename,
537                 agent = None,
538                 ident_key = self.ident_path,
539                 server_key = self.server_key,
540                 tty = True, # so that ps -N -T works as advertised...
541                 timeout = 60,
542                 retry = 3
543                 )
544             proc.wait()
545     
546     def prepare_dependencies(self):
547         # Configure p2p yum dependency installer
548         if self.required_packages and not self._installed:
549             self._yum_dependencies = application.YumDependency(self._api)
550             self._yum_dependencies.node = self
551             self._yum_dependencies.home_path = "nepi-yumdep"
552             self._yum_dependencies.depends = ' '.join(self.required_packages)
553
554     def routing_method(self, routes, vsys_vnet):
555         """
556         There are two methods, vroute and sliceip.
557         
558         vroute:
559             Modifies the node's routing table directly, validating that the IP
560             range lies within the network given by the slice's vsys_vnet tag.
561             This method is the most scalable for very small routing tables
562             that need not modify other routes (including the default)
563         
564         sliceip:
565             Uses policy routing and iptables filters to create per-sliver
566             routing tables. It's the most flexible way, but it doesn't scale
567             as well since only 155 routing tables can be created this way.
568         
569         This method will return the most appropriate routing method, which will
570         prefer vroute for small routing tables.
571         """
572         
573         # For now, sliceip results in kernel panics
574         # so we HAVE to use vroute
575         return 'vroute'
576         
577         # We should not make the routing table grow too big
578         if len(routes) > MAX_VROUTE_ROUTES:
579             return 'sliceip'
580         
581         vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
582         for route in routes:
583             dest, prefix, nexthop, metric = route
584             dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
585             nexthop = ipaddr.IPAddress(nexthop)
586             if dest not in vsys_vnet or nexthop not in vsys_vnet:
587                 return 'sliceip'
588         
589         return 'vroute'
590     
591     def format_route(self, route, dev, method, action):
592         dest, prefix, nexthop, metric = route
593         if method == 'vroute':
594             return (
595                 "%s %s%s gw %s %s" % (
596                     action,
597                     dest,
598                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
599                     nexthop,
600                     dev,
601                 )
602             )
603         elif method == 'sliceip':
604             return (
605                 "route %s to %s%s via %s metric %s dev %s" % (
606                     action,
607                     dest,
608                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
609                     nexthop,
610                     metric or 1,
611                     dev,
612                 )
613             )
614         else:
615             raise AssertionError, "Unknown method"
616     
617     def _annotate_routes_with_devs(self, routes, devs, method):
618         dev_routes = []
619         for route in routes:
620             for dev in devs:
621                 if dev.routes_here(route):
622                     dev_routes.append(tuple(route) + (dev.if_name,))
623                     
624                     # Stop checking
625                     break
626             else:
627                 if method == 'sliceip':
628                     dev_routes.append(tuple(route) + ('eth0',))
629                 else:
630                     raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
631                         "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
632         return dev_routes
633     
634     def configure_routes(self, routes, devs, vsys_vnet):
635         """
636         Add the specified routes to the node's routing table
637         """
638         rules = []
639         method = self.routing_method(routes, vsys_vnet)
640         tdevs = set()
641         
642         # annotate routes with devices
643         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
644         for route in dev_routes:
645             route, dev = route[:-1], route[-1]
646             
647             # Schedule rule
648             tdevs.add(dev)
649             rules.append(self.format_route(route, dev, method, 'add'))
650         
651         if method == 'sliceip':
652             rules = map('enable '.__add__, tdevs) + rules
653         
654         self._logger.info("Setting up routes for %s using %s", self.hostname, method)
655         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
656         
657         self.apply_route_rules(rules, method)
658         
659         self._configured_routes = set(routes)
660         self._configured_devs = tdevs
661         self._configured_method = method
662     
663     def reconfigure_routes(self, routes, devs, vsys_vnet):
664         """
665         Updates the routes in the node's routing table to match
666         the given route list
667         """
668         method = self._configured_method
669         
670         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
671
672         current = self._configured_routes
673         current_devs = self._configured_devs
674         
675         new = set(dev_routes)
676         new_devs = set(map(operator.itemgetter(-1), dev_routes))
677         
678         deletions = current - new
679         insertions = new - current
680         
681         dev_deletions = current_devs - new_devs
682         dev_insertions = new_devs - current_devs
683         
684         # Generate rules
685         rules = []
686         
687         # Rule deletions first
688         for route in deletions:
689             route, dev = route[:-1], route[-1]
690             rules.append(self.format_route(route, dev, method, 'del'))
691         
692         if method == 'sliceip':
693             # Dev deletions now
694             rules.extend(map('disable '.__add__, dev_deletions))
695
696             # Dev insertions now
697             rules.extend(map('enable '.__add__, dev_insertions))
698
699         # Rule insertions now
700         for route in insertions:
701             route, dev = route[:-1], dev[-1]
702             rules.append(self.format_route(route, dev, method, 'add'))
703         
704         # Apply
705         self.apply_route_rules(rules, method)
706         
707         self._configured_routes = dev_routes
708         self._configured_devs = new_devs
709         
710     def apply_route_rules(self, rules, method):
711         (out,err),proc = server.popen_ssh_command(
712             "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
713                 home = server.shell_escape(self.home_path),
714                 method = method),
715             host = self.hostname,
716             port = None,
717             user = self.slicename,
718             agent = None,
719             ident_key = self.ident_path,
720             server_key = self.server_key,
721             stdin = '\n'.join(rules),
722             timeout = 300
723             )
724         
725         if proc.wait() or err:
726             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
727         elif out or err:
728             logger.debug("%s said: %s%s", method, out, err)
729