Merge with head
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15 import logging
16 import ipaddr
17 import operator
18
19 from nepi.util import server
20 from nepi.util import parallel
21
22 import application
23
24 MAX_VROUTE_ROUTES = 5
25
26 class UnresponsiveNodeError(RuntimeError):
27     pass
28
29 def _castproperty(typ, propattr):
30     def _get(self):
31         return getattr(self, propattr)
32     def _set(self, value):
33         if value is not None or (isinstance(value, basestring) and not value):
34             value = typ(value)
35         return setattr(self, propattr, value)
36     def _del(self, value):
37         return delattr(self, propattr)
38     _get.__name__ = propattr + '_get'
39     _set.__name__ = propattr + '_set'
40     _del.__name__ = propattr + '_del'
41     return property(_get, _set, _del)
42
43 class Node(object):
44     BASEFILTERS = {
45         # Map Node attribute to plcapi filter name
46         'hostname' : 'hostname',
47     }
48     
49     TAGFILTERS = {
50         # Map Node attribute to (<tag name>, <plcapi filter expression>)
51         #   There are replacements that are applied with string formatting,
52         #   so '%' has to be escaped as '%%'.
53         'architecture' : ('arch','value'),
54         'operatingSystem' : ('fcdistro','value'),
55         'pl_distro' : ('pldistro','value'),
56         'city' : ('city','value'),
57         'country' : ('country','value'),
58         'region' : ('region','value'),
59         'minReliability' : ('reliability%(timeframe)s', ']value'),
60         'maxReliability' : ('reliability%(timeframe)s', '[value'),
61         'minBandwidth' : ('bw%(timeframe)s', ']value'),
62         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63         'minLoad' : ('load%(timeframe)s', ']value'),
64         'maxLoad' : ('load%(timeframe)s', '[value'),
65         'minCpu' : ('cpu%(timeframe)s', ']value'),
66         'maxCpu' : ('cpu%(timeframe)s', '[value'),
67     }
68     
69     RATE_FACTORS = (
70         # (<tag name>, <weight>, <default>)
71         ('bw%(timeframe)s', -0.001, 1024.0),
72         ('cpu%(timeframe)s', 0.1, 40.0),
73         ('load%(timeframe)s', -0.2, 3.0),
74         ('reliability%(timeframe)s', 1, 100.0),
75     )
76     
77     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
79     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
80     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
81     
82     minReliability = _castproperty(float, '_minReliability')
83     maxReliability = _castproperty(float, '_maxReliability')
84     minBandwidth = _castproperty(float, '_minBandwidth')
85     maxBandwidth = _castproperty(float, '_maxBandwidth')
86     minCpu = _castproperty(float, '_minCpu')
87     maxCpu = _castproperty(float, '_maxCpu')
88     minLoad = _castproperty(float, '_minLoad')
89     maxLoad = _castproperty(float, '_maxLoad')
90     
91     def __init__(self, api=None):
92         if not api:
93             api = plcapi.PLCAPI()
94         self._api = api
95         
96         # Attributes
97         self.hostname = None
98         self.architecture = None
99         self.operatingSystem = None
100         self.pl_distro = None
101         self.site = None
102         self.city = None
103         self.country = None
104         self.region = None
105         self.minReliability = None
106         self.maxReliability = None
107         self.minBandwidth = None
108         self.maxBandwidth = None
109         self.minCpu = None
110         self.maxCpu = None
111         self.minLoad = None
112         self.maxLoad = None
113         self.min_num_external_ifaces = None
114         self.max_num_external_ifaces = None
115         self.timeframe = 'm'
116         
117         # Applications and routes add requirements to connected nodes
118         self.required_packages = set()
119         self.required_vsys = set()
120         self.pythonpath = []
121         self.rpmFusion = False
122         self.env = collections.defaultdict(list)
123         
124         # Some special applications - initialized when connected
125         self.multicast_forwarder = None
126         
127         # Testbed-derived attributes
128         self.slicename = None
129         self.ident_path = None
130         self.server_key = None
131         self.home_path = None
132         self.enable_cleanup = False
133         
134         # Those are filled when an actual node is allocated
135         self._node_id = None
136         self._yum_dependencies = None
137         self._installed = False
138
139         # Logging
140         self._logger = logging.getLogger('nepi.testbeds.planetlab')
141     
142     def _nepi_testbed_environment_setup_get(self):
143         command = cStringIO.StringIO()
144         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
145             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
146         ))
147         command.write(' ; export PATH=$PATH:%s' % (
148             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
149         ))
150         if self.env:
151             for envkey, envvals in self.env.iteritems():
152                 for envval in envvals:
153                     command.write(' ; export %s=%s' % (envkey, envval))
154         return command.getvalue()
155     def _nepi_testbed_environment_setup_set(self, value):
156         pass
157     _nepi_testbed_environment_setup = property(
158         _nepi_testbed_environment_setup_get,
159         _nepi_testbed_environment_setup_set)
160     
161     def build_filters(self, target_filters, filter_map):
162         for attr, tag in filter_map.iteritems():
163             value = getattr(self, attr, None)
164             if value is not None:
165                 target_filters[tag] = value
166         return target_filters
167     
168     @property
169     def applicable_filters(self):
170         has = lambda att : getattr(self,att,None) is not None
171         return (
172             filter(has, self.BASEFILTERS.iterkeys())
173             + filter(has, self.TAGFILTERS.iterkeys())
174         )
175     
176     def find_candidates(self, filter_slice_id=None):
177         self._logger.info("Finding candidates for %s", self.make_filter_description())
178         
179         fields = ('node_id',)
180         replacements = {'timeframe':self.timeframe}
181         
182         # get initial candidates (no tag filters)
183         basefilters = self.build_filters({}, self.BASEFILTERS)
184         rootfilters = basefilters.copy()
185         if filter_slice_id:
186             basefilters['|slice_ids'] = (filter_slice_id,)
187         
188         # only pick healthy nodes
189         basefilters['run_level'] = 'boot'
190         basefilters['boot_state'] = 'boot'
191         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
192         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
193         
194         # keyword-only "pseudofilters"
195         extra = {}
196         if self.site:
197             extra['peer'] = self.site
198             
199         candidates = set(map(operator.itemgetter('node_id'), 
200             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
201         
202         # filter by tag, one tag at a time
203         applicable = self.applicable_filters
204         for tagfilter in self.TAGFILTERS.iteritems():
205             attr, (tagname, expr) = tagfilter
206             
207             # don't bother if there's no filter defined
208             if attr in applicable:
209                 tagfilter = rootfilters.copy()
210                 tagfilter['tagname'] = tagname % replacements
211                 tagfilter[expr % replacements] = getattr(self,attr)
212                 tagfilter['node_id'] = list(candidates)
213                 
214                 candidates &= set(map(operator.itemgetter('node_id'),
215                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
216         
217         # filter by vsys tags - special case since it doesn't follow
218         # the usual semantics
219         if self.required_vsys:
220             newcandidates = collections.defaultdict(set)
221             
222             vsys_tags = self._api.GetNodeTags(
223                 tagname='vsys', 
224                 node_id = list(candidates), 
225                 fields = ['node_id','value'])
226             
227             vsys_tags = map(
228                 operator.itemgetter(['node_id','value']),
229                 vsys_tags)
230             
231             required_vsys = self.required_vsys
232             for node_id, value in vsys_tags:
233                 if value in required_vsys:
234                     newcandidates[value].add(node_id)
235             
236             # take only those that have all the required vsys tags
237             newcandidates = reduce(
238                 lambda accum, new : accum & new,
239                 newcandidates.itervalues(),
240                 candidates)
241         
242         # filter by iface count
243         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
244             # fetch interfaces for all, in one go
245             filters = basefilters.copy()
246             filters['node_id'] = list(candidates)
247             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
248                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
249             
250             # filter candidates by interface count
251             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
252                 predicate = ( lambda node_id : 
253                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
254             elif self.min_num_external_ifaces is not None:
255                 predicate = ( lambda node_id : 
256                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
257             else:
258                 predicate = ( lambda node_id : 
259                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
260             
261             candidates = set(filter(predicate, candidates))
262         
263         # make sure hostnames are resolvable
264         if candidates:
265             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
266             
267             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
268                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
269             ))
270             def resolvable(node_id):
271                 try:
272                     addr = socket.gethostbyname(hostnames[node_id])
273                     return addr is not None
274                 except:
275                     return False
276             candidates = set(parallel.pfilter(resolvable, candidates,
277                 maxthreads = 16))
278
279             self._logger.info("  Found %s reachable candidates.", len(candidates))
280             
281         return candidates
282     
283     def make_filter_description(self):
284         """
285         Makes a human-readable description of filtering conditions
286         for find_candidates.
287         """
288         
289         # get initial candidates (no tag filters)
290         filters = self.build_filters({}, self.BASEFILTERS)
291         
292         # keyword-only "pseudofilters"
293         if self.site:
294             filters['peer'] = self.site
295             
296         # filter by tag, one tag at a time
297         applicable = self.applicable_filters
298         for tagfilter in self.TAGFILTERS.iteritems():
299             attr, (tagname, expr) = tagfilter
300             
301             # don't bother if there's no filter defined
302             if attr in applicable:
303                 filters[attr] = getattr(self,attr)
304         
305         # filter by vsys tags - special case since it doesn't follow
306         # the usual semantics
307         if self.required_vsys:
308             filters['vsys'] = ','.join(list(self.required_vsys))
309         
310         # filter by iface count
311         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
312             filters['num_ifaces'] = '-'.join([
313                 str(self.min_num_external_ifaces or '0'),
314                 str(self.max_num_external_ifaces or 'inf')
315             ])
316             
317         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
318
319     def assign_node_id(self, node_id):
320         self._node_id = node_id
321         self.fetch_node_info()
322     
323     def unassign_node(self):
324         self._node_id = None
325         self.__dict__.update(self.__orig_attrs)
326     
327     def rate_nodes(self, nodes):
328         rates = collections.defaultdict(int)
329         tags = collections.defaultdict(dict)
330         replacements = {'timeframe':self.timeframe}
331         tagnames = [ tagname % replacements 
332                      for tagname, weight, default in self.RATE_FACTORS ]
333         
334         taginfo = self._api.GetNodeTags(
335             node_id=list(nodes), 
336             tagname=tagnames,
337             fields=('node_id','tagname','value'))
338         
339         for node, tagname, value in taginfo:
340             tags[tagname][int(node)] = float(value)
341         
342         for tagname, weight, default in self.RATE_FACTORS:
343             taginfo = tags[tagname].get
344             for node in nodes:
345                 rates[node] += weight * taginfo(node,default)
346         
347         return map(rates.__getitem__, nodes)
348             
349     def fetch_node_info(self):
350         orig_attrs = {}
351         
352         info = self._api.GetNodes(self._node_id)[0]
353         tags = dict( (t['tagname'],t['value'])
354                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
355
356         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
357         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
358         self.min_num_external_ifaces = None
359         self.max_num_external_ifaces = None
360         self.timeframe = 'm'
361         
362         replacements = {'timeframe':self.timeframe}
363         for attr, tag in self.BASEFILTERS.iteritems():
364             if tag in info:
365                 value = info[tag]
366                 if hasattr(self, attr):
367                     orig_attrs[attr] = getattr(self, attr)
368                 setattr(self, attr, value)
369         for attr, (tag,_) in self.TAGFILTERS.iteritems():
370             tag = tag % replacements
371             if tag in tags:
372                 value = tags[tag]
373                 if hasattr(self, attr):
374                     orig_attrs[attr] = getattr(self, attr)
375                 setattr(self, attr, value)
376         
377         if 'peer_id' in info:
378             orig_attrs['site'] = self.site
379             self.site = self._api.peer_map[info['peer_id']]
380         
381         if 'interface_ids' in info:
382             self.min_num_external_ifaces = \
383             self.max_num_external_ifaces = len(info['interface_ids'])
384         
385         if 'ssh_rsa_key' in info:
386             orig_attrs['server_key'] = self.server_key
387             self.server_key = info['ssh_rsa_key']
388         
389         self.__orig_attrs = orig_attrs
390
391     def validate(self):
392         if self.home_path is None:
393             raise AssertionError, "Misconfigured node: missing home path"
394         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
395             raise AssertionError, "Misconfigured node: missing slice SSH key"
396         if self.slicename is None:
397             raise AssertionError, "Misconfigured node: unspecified slice"
398
399     def recover(self):
400         # Mark dependencies installed
401         self._installed = True
402         
403         # Clear load attributes, they impair re-discovery
404         self.minReliability = \
405         self.maxReliability = \
406         self.minBandwidth = \
407         self.maxBandwidth = \
408         self.minCpu = \
409         self.maxCpu = \
410         self.minLoad = \
411         self.maxLoad = None
412
413     def install_dependencies(self):
414         if self.required_packages and not self._installed:
415             # If we need rpmfusion, we must install the repo definition and the gpg keys
416             if self.rpmFusion:
417                 if self.operatingSystem == 'f12':
418                     # Fedora 12 requires a different rpmfusion package
419                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
420                 else:
421                     # This one works for f13+
422                     RPM_FUSION_URL = self.RPM_FUSION_URL
423                     
424                 rpmFusion = (
425                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
426                 ) % {
427                     'RPM_FUSION_URL' : RPM_FUSION_URL
428                 }
429             else:
430                 rpmFusion = ''
431             
432             if rpmFusion:
433                 (out,err),proc = server.popen_ssh_command(
434                     rpmFusion,
435                     host = self.hostname,
436                     port = None,
437                     user = self.slicename,
438                     agent = None,
439                     ident_key = self.ident_path,
440                     server_key = self.server_key,
441                     timeout = 600,
442                     )
443                 
444                 if proc.wait():
445                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
446             
447             # Launch p2p yum dependency installer
448             self._yum_dependencies.async_setup()
449     
450     def wait_provisioning(self, timeout = 20*60):
451         # Wait for the p2p installer
452         sleeptime = 1.0
453         totaltime = 0.0
454         while not self.is_alive():
455             time.sleep(sleeptime)
456             totaltime += sleeptime
457             sleeptime = min(30.0, sleeptime*1.5)
458             
459             if totaltime > timeout:
460                 # PlanetLab has a 15' delay on configuration propagation
461                 # If we're above that delay, the unresponsiveness is not due
462                 # to this delay.
463                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
464         
465         # Ensure the node is clean (no apps running that could interfere with operations)
466         if self.enable_cleanup:
467             self.do_cleanup()
468     
469     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
470         # Wait for the p2p installer
471         if self._yum_dependencies and not self._installed:
472             self._yum_dependencies.async_setup_wait()
473             self._installed = True
474         
475     def is_alive(self):
476         # Make sure all the paths are created where 
477         # they have to be created for deployment
478         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
479             "echo 'ALIVE'",
480             host = self.hostname,
481             port = None,
482             user = self.slicename,
483             agent = None,
484             ident_key = self.ident_path,
485             server_key = self.server_key,
486             timeout = 60,
487             err_on_timeout = False
488             )
489         
490         if proc.wait():
491             return False
492         elif not err and out.strip() == 'ALIVE':
493             return True
494         else:
495             return False
496     
497     def destroy(self):
498         if self.enable_cleanup:
499             self.do_cleanup()
500     
501     def do_cleanup(self):
502         if self.testbed().recovering:
503             # WOW - not now
504             return
505             
506         self._logger.info("Cleaning up %s", self.hostname)
507         
508         cmds = [
509             "sudo -S killall python tcpdump || /bin/true ; "
510             "sudo -S killall python tcpdump || /bin/true ; "
511             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
512             "sudo -S killall -u %(slicename)s || /bin/true ",
513             "sudo -S killall -u root || /bin/true ",
514             "sudo -S killall -u %(slicename)s || /bin/true ",
515             "sudo -S killall -u root || /bin/true ",
516         ]
517
518         for cmd in cmds:
519             (out,err),proc = server.popen_ssh_command(
520                 # Some apps need two kills
521                 cmd % {
522                     'slicename' : self.slicename ,
523                 },
524                 host = self.hostname,
525                 port = None,
526                 user = self.slicename,
527                 agent = None,
528                 ident_key = self.ident_path,
529                 server_key = self.server_key,
530                 tty = True, # so that ps -N -T works as advertised...
531                 timeout = 60,
532                 retry = 3
533                 )
534             proc.wait()
535     
536     def prepare_dependencies(self):
537         # Configure p2p yum dependency installer
538         if self.required_packages and not self._installed:
539             self._yum_dependencies = application.YumDependency(self._api)
540             self._yum_dependencies.node = self
541             self._yum_dependencies.home_path = "nepi-yumdep"
542             self._yum_dependencies.depends = ' '.join(self.required_packages)
543
544     def routing_method(self, routes, vsys_vnet):
545         """
546         There are two methods, vroute and sliceip.
547         
548         vroute:
549             Modifies the node's routing table directly, validating that the IP
550             range lies within the network given by the slice's vsys_vnet tag.
551             This method is the most scalable for very small routing tables
552             that need not modify other routes (including the default)
553         
554         sliceip:
555             Uses policy routing and iptables filters to create per-sliver
556             routing tables. It's the most flexible way, but it doesn't scale
557             as well since only 155 routing tables can be created this way.
558         
559         This method will return the most appropriate routing method, which will
560         prefer vroute for small routing tables.
561         """
562         
563         # For now, sliceip results in kernel panics
564         # so we HAVE to use vroute
565         return 'vroute'
566         
567         # We should not make the routing table grow too big
568         if len(routes) > MAX_VROUTE_ROUTES:
569             return 'sliceip'
570         
571         vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
572         for route in routes:
573             dest, prefix, nexthop, metric = route
574             dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
575             nexthop = ipaddr.IPAddress(nexthop)
576             if dest not in vsys_vnet or nexthop not in vsys_vnet:
577                 return 'sliceip'
578         
579         return 'vroute'
580     
581     def format_route(self, route, dev, method, action):
582         dest, prefix, nexthop, metric = route
583         if method == 'vroute':
584             return (
585                 "%s %s%s gw %s %s" % (
586                     action,
587                     dest,
588                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
589                     nexthop,
590                     dev,
591                 )
592             )
593         elif method == 'sliceip':
594             return (
595                 "route %s to %s%s via %s metric %s dev %s" % (
596                     action,
597                     dest,
598                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
599                     nexthop,
600                     metric or 1,
601                     dev,
602                 )
603             )
604         else:
605             raise AssertionError, "Unknown method"
606     
607     def _annotate_routes_with_devs(self, routes, devs, method):
608         dev_routes = []
609         for route in routes:
610             for dev in devs:
611                 if dev.routes_here(route):
612                     dev_routes.append(tuple(route) + (dev.if_name,))
613                     
614                     # Stop checking
615                     break
616             else:
617                 if method == 'sliceip':
618                     dev_routes.append(tuple(route) + ('eth0',))
619                 else:
620                     raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
621                         "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
622         return dev_routes
623     
624     def configure_routes(self, routes, devs, vsys_vnet):
625         """
626         Add the specified routes to the node's routing table
627         """
628         rules = []
629         method = self.routing_method(routes, vsys_vnet)
630         tdevs = set()
631         
632         # annotate routes with devices
633         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
634         for route in dev_routes:
635             route, dev = route[:-1], route[-1]
636             
637             # Schedule rule
638             tdevs.add(dev)
639             rules.append(self.format_route(route, dev, method, 'add'))
640         
641         if method == 'sliceip':
642             rules = map('enable '.__add__, tdevs) + rules
643         
644         self._logger.info("Setting up routes for %s using %s", self.hostname, method)
645         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
646         
647         self.apply_route_rules(rules, method)
648         
649         self._configured_routes = set(routes)
650         self._configured_devs = tdevs
651         self._configured_method = method
652     
653     def reconfigure_routes(self, routes, devs, vsys_vnet):
654         """
655         Updates the routes in the node's routing table to match
656         the given route list
657         """
658         method = self._configured_method
659         
660         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
661
662         current = self._configured_routes
663         current_devs = self._configured_devs
664         
665         new = set(dev_routes)
666         new_devs = set(map(operator.itemgetter(-1), dev_routes))
667         
668         deletions = current - new
669         insertions = new - current
670         
671         dev_deletions = current_devs - new_devs
672         dev_insertions = new_devs - current_devs
673         
674         # Generate rules
675         rules = []
676         
677         # Rule deletions first
678         for route in deletions:
679             route, dev = route[:-1], route[-1]
680             rules.append(self.format_route(route, dev, method, 'del'))
681         
682         if method == 'sliceip':
683             # Dev deletions now
684             rules.extend(map('disable '.__add__, dev_deletions))
685
686             # Dev insertions now
687             rules.extend(map('enable '.__add__, dev_insertions))
688
689         # Rule insertions now
690         for route in insertions:
691             route, dev = route[:-1], dev[-1]
692             rules.append(self.format_route(route, dev, method, 'add'))
693         
694         # Apply
695         self.apply_route_rules(rules, method)
696         
697         self._configured_routes = dev_routes
698         self._configured_devs = new_devs
699         
700     def apply_route_rules(self, rules, method):
701         (out,err),proc = server.popen_ssh_command(
702             "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
703                 home = server.shell_escape(self.home_path),
704                 method = method),
705             host = self.hostname,
706             port = None,
707             user = self.slicename,
708             agent = None,
709             ident_key = self.ident_path,
710             server_key = self.server_key,
711             stdin = '\n'.join(rules),
712             timeout = 300
713             )
714         
715         if proc.wait() or err:
716             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
717         elif out or err:
718             logger.debug("%s said: %s%s", method, out, err)
719