adding vlc broadcasting over Planetlab example
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import plcapi
5 import operator
6 import rspawn
7 import time
8 import os
9 import collections
10 import cStringIO
11 import resourcealloc
12 import socket
13 import sys
14 import logging
15 import ipaddr
16 import operator
17 import re
18
19 from nepi.util import server
20 from nepi.util import parallel
21
22 import application
23
24 MAX_VROUTE_ROUTES = 5
25
26 class UnresponsiveNodeError(RuntimeError):
27     pass
28
29 def _castproperty(typ, propattr):
30     def _get(self):
31         return getattr(self, propattr)
32     def _set(self, value):
33         if value is not None or (isinstance(value, basestring) and not value):
34             value = typ(value)
35         return setattr(self, propattr, value)
36     def _del(self, value):
37         return delattr(self, propattr)
38     _get.__name__ = propattr + '_get'
39     _set.__name__ = propattr + '_set'
40     _del.__name__ = propattr + '_del'
41     return property(_get, _set, _del)
42
43 class Node(object):
44     BASEFILTERS = {
45         # Map Node attribute to plcapi filter name
46         'hostname' : 'hostname',
47     }
48     
49     TAGFILTERS = {
50         # Map Node attribute to (<tag name>, <plcapi filter expression>)
51         #   There are replacements that are applied with string formatting,
52         #   so '%' has to be escaped as '%%'.
53         'architecture' : ('arch','value'),
54         'operatingSystem' : ('fcdistro','value'),
55         'pl_distro' : ('pldistro','value'),
56         'city' : ('city','value'),
57         'country' : ('country','value'),
58         'region' : ('region','value'),
59         'minReliability' : ('reliability%(timeframe)s', ']value'),
60         'maxReliability' : ('reliability%(timeframe)s', '[value'),
61         'minBandwidth' : ('bw%(timeframe)s', ']value'),
62         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63         'minLoad' : ('load%(timeframe)s', ']value'),
64         'maxLoad' : ('load%(timeframe)s', '[value'),
65         'minCpu' : ('cpu%(timeframe)s', ']value'),
66         'maxCpu' : ('cpu%(timeframe)s', '[value'),
67     }
68     
69     RATE_FACTORS = (
70         # (<tag name>, <weight>, <default>)
71         ('bw%(timeframe)s', -0.001, 1024.0),
72         ('cpu%(timeframe)s', 0.1, 40.0),
73         ('load%(timeframe)s', -0.2, 3.0),
74         ('reliability%(timeframe)s', 1, 100.0),
75     )
76     
77     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
79     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
80     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
81     
82     minReliability = _castproperty(float, '_minReliability')
83     maxReliability = _castproperty(float, '_maxReliability')
84     minBandwidth = _castproperty(float, '_minBandwidth')
85     maxBandwidth = _castproperty(float, '_maxBandwidth')
86     minCpu = _castproperty(float, '_minCpu')
87     maxCpu = _castproperty(float, '_maxCpu')
88     minLoad = _castproperty(float, '_minLoad')
89     maxLoad = _castproperty(float, '_maxLoad')
90     
91     def __init__(self, api=None, sliceapi=None):
92         if not api:
93             api = plcapi.PLCAPI()
94         self._api = api
95         self._sliceapi = sliceapi or api
96         
97         # Attributes
98         self.hostname = None
99         self.architecture = None
100         self.operatingSystem = None
101         self.pl_distro = None
102         self.site = None
103         self.city = None
104         self.country = None
105         self.region = None
106         self.minReliability = None
107         self.maxReliability = None
108         self.minBandwidth = None
109         self.maxBandwidth = None
110         self.minCpu = None
111         self.maxCpu = None
112         self.minLoad = None
113         self.maxLoad = None
114         self.min_num_external_ifaces = None
115         self.max_num_external_ifaces = None
116         self._timeframe = 'w'
117         
118         # Applications and routes add requirements to connected nodes
119         self.required_packages = set()
120         self.required_vsys = set()
121         self.pythonpath = []
122         self.rpmFusion = False
123         self.env = collections.defaultdict(list)
124         
125         # Some special applications - initialized when connected
126         self.multicast_forwarder = None
127         
128         # Testbed-derived attributes
129         self.slicename = None
130         self.ident_path = None
131         self.server_key = None
132         self.home_path = None
133         self.enable_proc_cleanup = False
134         self.enable_home_cleanup = False
135         
136         # Those are filled when an actual node is allocated
137         self._node_id = None
138         self._yum_dependencies = None
139         self._installed = False
140
141         # Logging
142         self._logger = logging.getLogger('nepi.testbeds.planetlab')
143
144     def set_timeframe(self, timeframe):
145         if timeframe == "latest":
146             self._timeframe = ""
147         elif timeframe == "month":
148             self._timeframe = "m"
149         elif timeframe == "year":
150             self._timeframe = "y"
151         else:
152             self._timeframe = "w"
153
154     def get_timeframe(self):
155         if self._timeframe == "":
156             return "latest"
157         if self._timeframe == "m":
158             return "month"
159         if self._timeframe == "y":
160             return "year"
161         return "week"
162
163     timeframe = property(get_timeframe, set_timeframe)
164     
165     def _nepi_testbed_environment_setup_get(self):
166         command = cStringIO.StringIO()
167         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
168             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
169         ))
170         command.write(' ; export PATH=$PATH:%s' % (
171             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
172         ))
173         if self.env:
174             for envkey, envvals in self.env.iteritems():
175                 for envval in envvals:
176                     command.write(' ; export %s=%s' % (envkey, envval))
177         return command.getvalue()
178
179     def _nepi_testbed_environment_setup_set(self, value):
180         pass
181
182     _nepi_testbed_environment_setup = property(
183         _nepi_testbed_environment_setup_get,
184         _nepi_testbed_environment_setup_set)
185     
186     def build_filters(self, target_filters, filter_map):
187         for attr, tag in filter_map.iteritems():
188             value = getattr(self, attr, None)
189             if value is not None:
190                 target_filters[tag] = value
191         return target_filters
192     
193     @property
194     def applicable_filters(self):
195         has = lambda att : getattr(self,att,None) is not None
196         return (
197             filter(has, self.BASEFILTERS.iterkeys())
198             + filter(has, self.TAGFILTERS.iterkeys())
199         )
200     
201     def find_candidates(self, filter_slice_id=None):
202         self._logger.info("Finding candidates for %s", self.make_filter_description())
203         
204         fields = ('node_id',)
205         replacements = {'timeframe':self._timeframe}
206         
207         # get initial candidates (no tag filters)
208         basefilters = self.build_filters({}, self.BASEFILTERS)
209         rootfilters = basefilters.copy()
210         if filter_slice_id:
211             basefilters['|slice_ids'] = (filter_slice_id,)
212         
213         # only pick healthy nodes
214         basefilters['run_level'] = 'boot'
215         basefilters['boot_state'] = 'boot'
216         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
217         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
218         
219         # keyword-only "pseudofilters"
220         extra = {}
221         if self.site:
222             extra['peer'] = self.site
223             
224         candidates = set(map(operator.itemgetter('node_id'), 
225             self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
226
227         # filter by tag, one tag at a time
228         applicable = self.applicable_filters
229         for tagfilter in self.TAGFILTERS.iteritems():
230             attr, (tagname, expr) = tagfilter
231             
232             # don't bother if there's no filter defined
233             if attr in applicable:
234                 tagfilter = rootfilters.copy()
235                 tagfilter['tagname'] = tagname % replacements
236                 tagfilter[expr % replacements] = str(getattr(self,attr))
237                 tagfilter['node_id'] = list(candidates)
238               
239                 candidates &= set(map(operator.itemgetter('node_id'),
240                     self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
241
242         # filter by vsys tags - special case since it doesn't follow
243         # the usual semantics
244         if self.required_vsys:
245             newcandidates = collections.defaultdict(set)
246             
247             vsys_tags = self._sliceapi.GetNodeTags(
248                 tagname='vsys', 
249                 node_id = list(candidates), 
250                 fields = ['node_id','value'])
251
252             vsys_tags = map(
253                 operator.itemgetter(['node_id','value']),
254                 vsys_tags)
255             
256             required_vsys = self.required_vsys
257             for node_id, value in vsys_tags:
258                 if value in required_vsys:
259                     newcandidates[value].add(node_id)
260             
261             # take only those that have all the required vsys tags
262             newcandidates = reduce(
263                 lambda accum, new : accum & new,
264                 newcandidates.itervalues(),
265                 candidates)
266         
267         # filter by iface count
268         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
269             # fetch interfaces for all, in one go
270             filters = basefilters.copy()
271             filters['node_id'] = list(candidates)
272             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
273                 self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
274             
275             # filter candidates by interface count
276             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
277                 predicate = ( lambda node_id : 
278                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
279             elif self.min_num_external_ifaces is not None:
280                 predicate = ( lambda node_id : 
281                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
282             else:
283                 predicate = ( lambda node_id : 
284                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
285             
286             candidates = set(filter(predicate, candidates))
287        
288         # make sure hostnames are resolvable
289         hostnames = dict() 
290         if candidates:
291             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
292            
293             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
294                 self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])
295             ))
296
297             def resolvable(node_id):
298                 try:
299                     addr = socket.gethostbyname(hostnames[node_id])
300                     return addr is not None
301                 except:
302                     return False
303             candidates = set(parallel.pfilter(resolvable, candidates,
304                 maxthreads = 16))
305
306             self._logger.info("  Found %s reachable candidates.", len(candidates))
307
308             for h in hostnames.keys():
309                 if h not in candidates:
310                     del hostnames[h]
311
312             hostnames = dict((v,k) for k, v in hostnames.iteritems())
313
314         return hostnames
315     
316     def make_filter_description(self):
317         """
318         Makes a human-readable description of filtering conditions
319         for find_candidates.
320         """
321         
322         # get initial candidates (no tag filters)
323         filters = self.build_filters({}, self.BASEFILTERS)
324         
325         # keyword-only "pseudofilters"
326         if self.site:
327             filters['peer'] = self.site
328             
329         # filter by tag, one tag at a time
330         applicable = self.applicable_filters
331         for tagfilter in self.TAGFILTERS.iteritems():
332             attr, (tagname, expr) = tagfilter
333             
334             # don't bother if there's no filter defined
335             if attr in applicable:
336                 filters[attr] = getattr(self,attr)
337         
338         # filter by vsys tags - special case since it doesn't follow
339         # the usual semantics
340         if self.required_vsys:
341             filters['vsys'] = ','.join(list(self.required_vsys))
342         
343         # filter by iface count
344         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
345             filters['num_ifaces'] = '-'.join([
346                 str(self.min_num_external_ifaces or '0'),
347                 str(self.max_num_external_ifaces or 'inf')
348             ])
349             
350         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
351
352     def assign_node_id(self, node_id):
353         self._node_id = node_id
354         self.fetch_node_info()
355     
356     def unassign_node(self):
357         self._node_id = None
358         self.hostip = None
359         
360         try:
361             orig_attrs = self.__orig_attrs
362         except AttributeError:
363             return
364             
365         for key, value in orig_attrs.iteritems():
366             setattr(self, key, value)
367         del self.__orig_attrs
368     
369     def rate_nodes(self, nodes):
370         rates = collections.defaultdict(int)
371         tags = collections.defaultdict(dict)
372         replacements = {'timeframe':self._timeframe}
373         tagnames = [ tagname % replacements 
374                      for tagname, weight, default in self.RATE_FACTORS ]
375        
376         taginfo = self._sliceapi.GetNodeTags(
377             node_id=list(nodes), 
378             tagname=tagnames,
379             fields=('node_id','tagname','value'))
380
381         unpack = operator.itemgetter('node_id','tagname','value')
382         for value in taginfo:
383             node, tagname, value = unpack(value)
384             if value and value.lower() != 'n/a':
385                 tags[tagname][node] = float(value)
386         
387         for tagname, weight, default in self.RATE_FACTORS:
388             taginfo = tags[tagname % replacements].get
389             for node in nodes:
390                 rates[node] += weight * taginfo(node,default)
391         
392         return map(rates.__getitem__, nodes)
393             
394     def fetch_node_info(self):
395         orig_attrs = {}
396         
397         info, tags = self._sliceapi.GetNodeInfo(self._node_id)
398         info = info[0]
399         
400         tags = dict( (t['tagname'],t['value'])
401                      for t in tags )
402
403         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
404         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
405         self.min_num_external_ifaces = None
406         self.max_num_external_ifaces = None
407         if not self._timeframe: self._timeframe = 'w'
408         
409         replacements = {'timeframe':self._timeframe}
410
411         for attr, tag in self.BASEFILTERS.iteritems():
412             if tag in info:
413                 value = info[tag]
414                 if hasattr(self, attr):
415                     orig_attrs[attr] = getattr(self, attr)
416                 setattr(self, attr, value)
417         for attr, (tag,_) in self.TAGFILTERS.iteritems():
418             tag = tag % replacements
419             if tag in tags:
420                 value = tags[tag]
421                 if hasattr(self, attr):
422                     orig_attrs[attr] = getattr(self, attr)
423                 if not value or value.lower() == 'n/a':
424                     value = None
425                 setattr(self, attr, value)
426         
427         if 'peer_id' in info:
428             orig_attrs['site'] = self.site
429             self.site = self._sliceapi.peer_map[info['peer_id']]
430         
431         if 'interface_ids' in info:
432             self.min_num_external_ifaces = \
433             self.max_num_external_ifaces = len(info['interface_ids'])
434         
435         if 'ssh_rsa_key' in info:
436             orig_attrs['server_key'] = self.server_key
437             self.server_key = info['ssh_rsa_key']
438         
439         self.hostip = socket.gethostbyname(self.hostname)
440         
441         try:
442             self.__orig_attrs
443         except AttributeError:
444             self.__orig_attrs = orig_attrs
445
446     def validate(self):
447         if self.home_path is None:
448             raise AssertionError, "Misconfigured node: missing home path"
449         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
450             raise AssertionError, "Misconfigured node: missing slice SSH key"
451         if self.slicename is None:
452             raise AssertionError, "Misconfigured node: unspecified slice"
453
454     def recover(self):
455         # Mark dependencies installed
456         self._installed = True
457         
458         # Clear load attributes, they impair re-discovery
459         self.minReliability = \
460         self.maxReliability = \
461         self.minBandwidth = \
462         self.maxBandwidth = \
463         self.minCpu = \
464         self.maxCpu = \
465         self.minLoad = \
466         self.maxLoad = None
467
468     def install_dependencies(self):
469         if self.required_packages and not self._installed:
470             # If we need rpmfusion, we must install the repo definition and the gpg keys
471             if self.rpmFusion:
472                 if self.operatingSystem == 'f12':
473                     # Fedora 12 requires a different rpmfusion package
474                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
475                 else:
476                     # This one works for f13+
477                     RPM_FUSION_URL = self.RPM_FUSION_URL
478                     
479                 rpmFusion = (
480                   'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
481                 ) % {
482                     'RPM_FUSION_URL' : RPM_FUSION_URL
483                 }
484             else:
485                 rpmFusion = ''
486             
487             if rpmFusion:
488                 (out,err),proc = server.popen_ssh_command(
489                     rpmFusion,
490                     host = self.hostname,
491                     port = None,
492                     user = self.slicename,
493                     agent = None,
494                     ident_key = self.ident_path,
495                     server_key = self.server_key,
496                     timeout = 600,
497                     )
498                 
499                 if proc.wait():
500                     if self.check_bad_host(out,err):
501                         self.blacklist()
502                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
503             
504             # Launch p2p yum dependency installer
505             self._yum_dependencies.async_setup()
506     
507     def wait_provisioning(self, timeout = 20*60):
508         # Wait for the p2p installer
509         sleeptime = 1.0
510         totaltime = 0.0
511         while not self.is_alive():
512             time.sleep(sleeptime)
513             totaltime += sleeptime
514             sleeptime = min(30.0, sleeptime*1.5)
515             
516             if totaltime > timeout:
517                 # PlanetLab has a 15' delay on configuration propagation
518                 # If we're above that delay, the unresponsiveness is not due
519                 # to this delay.
520                 if not self.is_alive(verbose=True):
521                     raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
522         
523         # Ensure the node is clean (no apps running that could interfere with operations)
524         if self.enable_proc_cleanup:
525             self.do_proc_cleanup()
526         if self.enable_home_cleanup:
527             self.do_home_cleanup()
528    
529     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
530         # Wait for the p2p installer
531         if self._yum_dependencies and not self._installed:
532             self._yum_dependencies.async_setup_wait()
533             self._installed = True
534         
535     def is_alive(self, verbose = False):
536         # Make sure all the paths are created where 
537         # they have to be created for deployment
538         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
539             "echo 'ALIVE'",
540             host = self.hostname,
541             port = None,
542             user = self.slicename,
543             agent = None,
544             ident_key = self.ident_path,
545             server_key = self.server_key,
546             timeout = 60,
547             err_on_timeout = False,
548             persistent = False
549             )
550         
551         if proc.wait():
552             if verbose:
553                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
554             return False
555         elif not err and out.strip() == 'ALIVE':
556             return True
557         else:
558             if verbose:
559                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
560             return False
561     
562     def destroy(self):
563         if self.enable_proc_cleanup:
564             self.do_proc_cleanup()
565     
566     def blacklist(self):
567         if self._node_id:
568             self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
569             import util
570             util.appendBlacklist(self.hostname)
571     
572     def do_proc_cleanup(self):
573         if self.testbed().recovering:
574             # WOW - not now
575             return
576             
577         self._logger.info("Cleaning up processes on %s", self.hostname)
578         
579         cmds = [
580             "sudo -S killall python tcpdump || /bin/true ; "
581             "sudo -S killall python tcpdump || /bin/true ; "
582             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
583             "sudo -S killall -u %(slicename)s || /bin/true ",
584             "sudo -S killall -u root || /bin/true ",
585             "sudo -S killall -u %(slicename)s || /bin/true ",
586             "sudo -S killall -u root || /bin/true ",
587         ]
588
589         for cmd in cmds:
590             (out,err),proc = server.popen_ssh_command(
591                 # Some apps need two kills
592                 cmd % {
593                     'slicename' : self.slicename ,
594                 },
595                 host = self.hostname,
596                 port = None,
597                 user = self.slicename,
598                 agent = None,
599                 ident_key = self.ident_path,
600                 server_key = self.server_key,
601                 tty = True, # so that ps -N -T works as advertised...
602                 timeout = 60,
603                 retry = 3
604                 )
605             proc.wait()
606      
607     def do_home_cleanup(self):
608         if self.testbed().recovering:
609             # WOW - not now
610             return
611             
612         self._logger.info("Cleaning up home on %s", self.hostname)
613         
614         cmds = [
615             "find . -maxdepth 1 ! -name '.bash*' ! -name '.' -execdir rm -rf {} + "
616         ]
617
618         for cmd in cmds:
619             (out,err),proc = server.popen_ssh_command(
620                 # Some apps need two kills
621                 cmd % {
622                     'slicename' : self.slicename ,
623                 },
624                 host = self.hostname,
625                 port = None,
626                 user = self.slicename,
627                 agent = None,
628                 ident_key = self.ident_path,
629                 server_key = self.server_key,
630                 tty = True, # so that ps -N -T works as advertised...
631                 timeout = 60,
632                 retry = 3
633                 )
634             proc.wait()
635    
636     def prepare_dependencies(self):
637         # Configure p2p yum dependency installer
638         if self.required_packages and not self._installed:
639             self._yum_dependencies = application.YumDependency(self._api)
640             self._yum_dependencies.node = self
641             self._yum_dependencies.home_path = "nepi-yumdep"
642             self._yum_dependencies.depends = ' '.join(self.required_packages)
643
644     def routing_method(self, routes, vsys_vnet):
645         """
646         There are two methods, vroute and sliceip.
647         
648         vroute:
649             Modifies the node's routing table directly, validating that the IP
650             range lies within the network given by the slice's vsys_vnet tag.
651             This method is the most scalable for very small routing tables
652             that need not modify other routes (including the default)
653         
654         sliceip:
655             Uses policy routing and iptables filters to create per-sliver
656             routing tables. It's the most flexible way, but it doesn't scale
657             as well since only 155 routing tables can be created this way.
658         
659         This method will return the most appropriate routing method, which will
660         prefer vroute for small routing tables.
661         """
662         
663         # For now, sliceip results in kernel panics
664         # so we HAVE to use vroute
665         return 'vroute'
666         
667         # We should not make the routing table grow too big
668         if len(routes) > MAX_VROUTE_ROUTES:
669             return 'sliceip'
670         
671         vsys_vnet = ipaddr.IPv4Network(vsys_vnet)
672         for route in routes:
673             dest, prefix, nexthop, metric, device = route
674             dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix))
675             nexthop = ipaddr.IPAddress(nexthop)
676             if dest not in vsys_vnet or nexthop not in vsys_vnet:
677                 return 'sliceip'
678         
679         return 'vroute'
680     
681     def format_route(self, route, dev, method, action):
682         dest, prefix, nexthop, metric, device = route
683         if method == 'vroute':
684             return (
685                 "%s %s%s gw %s %s" % (
686                     action,
687                     dest,
688                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
689                     nexthop,
690                     dev,
691                 )
692             )
693         elif method == 'sliceip':
694             return (
695                 "route %s to %s%s via %s metric %s dev %s" % (
696                     action,
697                     dest,
698                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
699                     nexthop,
700                     metric or 1,
701                     dev,
702                 )
703             )
704         else:
705             raise AssertionError, "Unknown method"
706     
707     def _annotate_routes_with_devs(self, routes, devs, method):
708         dev_routes = []
709         for route in routes:
710             for dev in devs:
711                 if dev.routes_here(route):
712                     dev_routes.append(tuple(route) + (dev.if_name,))
713                     
714                     # Stop checking
715                     break
716             else:
717                 if method == 'sliceip':
718                     dev_routes.append(tuple(route) + ('eth0',))
719                 else:
720                     raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
721                         "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
722         return dev_routes
723     
724     def configure_routes(self, routes, devs, vsys_vnet):
725         """
726         Add the specified routes to the node's routing table
727         """
728         rules = []
729         method = self.routing_method(routes, vsys_vnet)
730         tdevs = set()
731         
732         # annotate routes with devices
733         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
734         for route in dev_routes:
735             route, dev = route[:-1], route[-1]
736             
737             # Schedule rule
738             tdevs.add(dev)
739             rules.append(self.format_route(route, dev, method, 'add'))
740         
741         if method == 'sliceip':
742             rules = map('enable '.__add__, tdevs) + rules
743         
744         self._logger.info("Setting up routes for %s using %s", self.hostname, method)
745         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
746         
747         self.apply_route_rules(rules, method)
748         
749         self._configured_routes = set(routes)
750         self._configured_devs = tdevs
751         self._configured_method = method
752     
753     def reconfigure_routes(self, routes, devs, vsys_vnet):
754         """
755         Updates the routes in the node's routing table to match
756         the given route list
757         """
758         method = self._configured_method
759         
760         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
761
762         current = self._configured_routes
763         current_devs = self._configured_devs
764         
765         new = set(dev_routes)
766         new_devs = set(map(operator.itemgetter(-1), dev_routes))
767         
768         deletions = current - new
769         insertions = new - current
770         
771         dev_deletions = current_devs - new_devs
772         dev_insertions = new_devs - current_devs
773         
774         # Generate rules
775         rules = []
776         
777         # Rule deletions first
778         for route in deletions:
779             route, dev = route[:-1], route[-1]
780             rules.append(self.format_route(route, dev, method, 'del'))
781         
782         if method == 'sliceip':
783             # Dev deletions now
784             rules.extend(map('disable '.__add__, dev_deletions))
785
786             # Dev insertions now
787             rules.extend(map('enable '.__add__, dev_insertions))
788
789         # Rule insertions now
790         for route in insertions:
791             route, dev = route[:-1], dev[-1]
792             rules.append(self.format_route(route, dev, method, 'add'))
793         
794         # Apply
795         self.apply_route_rules(rules, method)
796         
797         self._configured_routes = dev_routes
798         self._configured_devs = new_devs
799         
800     def apply_route_rules(self, rules, method):
801         (out,err),proc = server.popen_ssh_command(
802             "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
803                 home = server.shell_escape(self.home_path),
804                 method = method),
805             host = self.hostname,
806             port = None,
807             user = self.slicename,
808             agent = None,
809             ident_key = self.ident_path,
810             server_key = self.server_key,
811             stdin = '\n'.join(rules),
812             timeout = 300
813             )
814         
815         if proc.wait() or err:
816             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
817         elif out or err:
818             logger.debug("%s said: %s%s", method, out, err)
819
820     def check_bad_host(self, out, err):
821         badre = re.compile(r'(?:'
822                            r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
823                            r'|Error: disk I/O error'
824                            r')', 
825                            re.I)
826         return badre.search(out) or badre.search(err)