34f6f797ee5f7937b370b642136e4e5a4ede0666
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import plcapi
5 import operator
6 import rspawn
7 import time
8 import os
9 import collections
10 import cStringIO
11 import resourcealloc
12 import socket
13 import sys
14 import logging
15 import ipaddr
16 import operator
17 import re
18
19 from nepi.util import server
20 from nepi.util import parallel
21
22 import application
23
24 MAX_VROUTE_ROUTES = 5
25
26 class UnresponsiveNodeError(RuntimeError):
27     pass
28
29 def _castproperty(typ, propattr):
30     def _get(self):
31         return getattr(self, propattr)
32     def _set(self, value):
33         if value is not None or (isinstance(value, basestring) and not value):
34             value = typ(value)
35         return setattr(self, propattr, value)
36     def _del(self, value):
37         return delattr(self, propattr)
38     _get.__name__ = propattr + '_get'
39     _set.__name__ = propattr + '_set'
40     _del.__name__ = propattr + '_del'
41     return property(_get, _set, _del)
42
43 class Node(object):
44     BASEFILTERS = {
45         # Map Node attribute to plcapi filter name
46         'hostname' : 'hostname',
47     }
48     
49     TAGFILTERS = {
50         # Map Node attribute to (<tag name>, <plcapi filter expression>)
51         #   There are replacements that are applied with string formatting,
52         #   so '%' has to be escaped as '%%'.
53         'architecture' : ('arch','value'),
54         'operatingSystem' : ('fcdistro','value'),
55         'pl_distro' : ('pldistro','value'),
56         'city' : ('city','value'),
57         'country' : ('country','value'),
58         'region' : ('region','value'),
59         'minReliability' : ('reliability%(timeframe)s', ']value'),
60         'maxReliability' : ('reliability%(timeframe)s', '[value'),
61         'minBandwidth' : ('bw%(timeframe)s', ']value'),
62         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63         'minLoad' : ('load%(timeframe)s', ']value'),
64         'maxLoad' : ('load%(timeframe)s', '[value'),
65         'minCpu' : ('cpu%(timeframe)s', ']value'),
66         'maxCpu' : ('cpu%(timeframe)s', '[value'),
67     }
68     
69     RATE_FACTORS = (
70         # (<tag name>, <weight>, <default>)
71         ('bw%(timeframe)s', -0.001, 1024.0),
72         ('cpu%(timeframe)s', 0.1, 40.0),
73         ('load%(timeframe)s', -0.2, 3.0),
74         ('reliability%(timeframe)s', 1, 100.0),
75     )
76     
77     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
79     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
80     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
81     
82     minReliability = _castproperty(float, '_minReliability')
83     maxReliability = _castproperty(float, '_maxReliability')
84     minBandwidth = _castproperty(float, '_minBandwidth')
85     maxBandwidth = _castproperty(float, '_maxBandwidth')
86     minCpu = _castproperty(float, '_minCpu')
87     maxCpu = _castproperty(float, '_maxCpu')
88     minLoad = _castproperty(float, '_minLoad')
89     maxLoad = _castproperty(float, '_maxLoad')
90     
91     def __init__(self, api=None, sliceapi=None):
92         if not api:
93             api = plcapi.PLCAPI()
94         self._api = api
95         self._sliceapi = sliceapi or api
96         
97         # Attributes
98         self.hostname = None
99         self.architecture = None
100         self.operatingSystem = None
101         self.pl_distro = None
102         self.site = None
103         self.city = None
104         self.country = None
105         self.region = None
106         self.minReliability = None
107         self.maxReliability = None
108         self.minBandwidth = None
109         self.maxBandwidth = None
110         self.minCpu = None
111         self.maxCpu = None
112         self.minLoad = None
113         self.maxLoad = None
114         self.min_num_external_ifaces = None
115         self.max_num_external_ifaces = None
116         self._timeframe = 'w'
117         
118         # Applications and routes add requirements to connected nodes
119         self.required_packages = set()
120         self.required_vsys = set()
121         self.pythonpath = []
122         self.rpmFusion = False
123         self.env = collections.defaultdict(list)
124         
125         # Some special applications - initialized when connected
126         self.multicast_forwarder = None
127         
128         # Testbed-derived attributes
129         self.slicename = None
130         self.ident_path = None
131         self.server_key = None
132         self.home_path = None
133         self.enable_proc_cleanup = False
134         self.enable_home_cleanup = False
135         
136         # Those are filled when an actual node is allocated
137         self._node_id = None
138         self._yum_dependencies = None
139         self._installed = False
140
141         # Logging
142         self._logger = logging.getLogger('nepi.testbeds.planetlab')
143
144     def set_timeframe(self, timeframe):
145         if timeframe == "latest":
146             self._timeframe = ""
147         elif timeframe == "month":
148             self._timeframe = "m"
149         elif timeframe == "year":
150             self._timeframe = "y"
151         else:
152             self._timeframe = "w"
153
154     def get_timeframe(self):
155         if self._timeframe == "":
156             return "latest"
157         if self._timeframe == "m":
158             return "month"
159         if self._timeframe == "y":
160             return "year"
161         return "week"
162
163     timeframe = property(get_timeframe, set_timeframe)
164     
165     def _nepi_testbed_environment_setup_get(self):
166         command = cStringIO.StringIO()
167         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
168             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
169         ))
170         command.write(' ; export PATH=$PATH:%s' % (
171             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
172         ))
173         if self.env:
174             for envkey, envvals in self.env.iteritems():
175                 for envval in envvals:
176                     command.write(' ; export %s=%s' % (envkey, envval))
177         return command.getvalue()
178
179     def _nepi_testbed_environment_setup_set(self, value):
180         pass
181
182     _nepi_testbed_environment_setup = property(
183         _nepi_testbed_environment_setup_get,
184         _nepi_testbed_environment_setup_set)
185     
186     def build_filters(self, target_filters, filter_map):
187         for attr, tag in filter_map.iteritems():
188             value = getattr(self, attr, None)
189             if value is not None:
190                 target_filters[tag] = value
191         return target_filters
192     
193     @property
194     def applicable_filters(self):
195         has = lambda att : getattr(self,att,None) is not None
196         return (
197             filter(has, self.BASEFILTERS.iterkeys())
198             + filter(has, self.TAGFILTERS.iterkeys())
199         )
200     
201     def find_candidates(self, filter_slice_id=None):
202         self._logger.info("Finding candidates for %s", self.make_filter_description())
203         
204         fields = ('node_id',)
205         replacements = {'timeframe':self._timeframe}
206         
207         # get initial candidates (no tag filters)
208         basefilters = self.build_filters({}, self.BASEFILTERS)
209         rootfilters = basefilters.copy()
210         if filter_slice_id:
211             basefilters['|slice_ids'] = (filter_slice_id,)
212         
213         # only pick healthy nodes
214         basefilters['run_level'] = 'boot'
215         basefilters['boot_state'] = 'boot'
216         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
217         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
218         
219         # keyword-only "pseudofilters"
220         extra = {}
221         if self.site:
222             extra['peer'] = self.site
223             
224         candidates = set(map(operator.itemgetter('node_id'), 
225             self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
226
227         # filter by tag, one tag at a time
228         applicable = self.applicable_filters
229         for tagfilter in self.TAGFILTERS.iteritems():
230             attr, (tagname, expr) = tagfilter
231             
232             # don't bother if there's no filter defined
233             if attr in applicable:
234                 tagfilter = rootfilters.copy()
235                 tagfilter['tagname'] = tagname % replacements
236                 tagfilter[expr % replacements] = str(getattr(self,attr))
237                 tagfilter['node_id'] = list(candidates)
238               
239                 candidates &= set(map(operator.itemgetter('node_id'),
240                     self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
241
242         # filter by vsys tags - special case since it doesn't follow
243         # the usual semantics
244         if self.required_vsys:
245             newcandidates = collections.defaultdict(set)
246             
247             vsys_tags = self._sliceapi.GetNodeTags(
248                 tagname='vsys', 
249                 node_id = list(candidates), 
250                 fields = ['node_id','value'])
251
252             vsys_tags = map(
253                 operator.itemgetter(['node_id','value']),
254                 vsys_tags)
255             
256             required_vsys = self.required_vsys
257             for node_id, value in vsys_tags:
258                 if value in required_vsys:
259                     newcandidates[value].add(node_id)
260             
261             # take only those that have all the required vsys tags
262             newcandidates = reduce(
263                 lambda accum, new : accum & new,
264                 newcandidates.itervalues(),
265                 candidates)
266         
267         # filter by iface count
268         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
269             # fetch interfaces for all, in one go
270             filters = basefilters.copy()
271             filters['node_id'] = list(candidates)
272             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
273                 self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
274             
275             # filter candidates by interface count
276             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
277                 predicate = ( lambda node_id : 
278                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
279             elif self.min_num_external_ifaces is not None:
280                 predicate = ( lambda node_id : 
281                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
282             else:
283                 predicate = ( lambda node_id : 
284                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
285             
286             candidates = set(filter(predicate, candidates))
287        
288         # make sure hostnames are resolvable
289         hostnames = dict() 
290         if candidates:
291             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
292            
293             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
294                 self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])
295             ))
296
297             def resolvable(node_id):
298                 try:
299                     addr = socket.gethostbyname(hostnames[node_id])
300                     return addr is not None
301                 except:
302                     return False
303             candidates = set(parallel.pfilter(resolvable, candidates,
304                 maxthreads = 16))
305
306             self._logger.info("  Found %s reachable candidates.", len(candidates))
307
308             for h in hostnames.keys():
309                 if h not in candidates:
310                     del hostnames[h]
311
312             hostnames = dict((v,k) for k, v in hostnames.iteritems())
313
314         return hostnames
315     
316     def make_filter_description(self):
317         """
318         Makes a human-readable description of filtering conditions
319         for find_candidates.
320         """
321         
322         # get initial candidates (no tag filters)
323         filters = self.build_filters({}, self.BASEFILTERS)
324         
325         # keyword-only "pseudofilters"
326         if self.site:
327             filters['peer'] = self.site
328             
329         # filter by tag, one tag at a time
330         applicable = self.applicable_filters
331         for tagfilter in self.TAGFILTERS.iteritems():
332             attr, (tagname, expr) = tagfilter
333             
334             # don't bother if there's no filter defined
335             if attr in applicable:
336                 filters[attr] = getattr(self,attr)
337         
338         # filter by vsys tags - special case since it doesn't follow
339         # the usual semantics
340         if self.required_vsys:
341             filters['vsys'] = ','.join(list(self.required_vsys))
342         
343         # filter by iface count
344         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
345             filters['num_ifaces'] = '-'.join([
346                 str(self.min_num_external_ifaces or '0'),
347                 str(self.max_num_external_ifaces or 'inf')
348             ])
349             
350         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
351
352     def assign_node_id(self, node_id):
353         self._node_id = node_id
354         self.fetch_node_info()
355     
356     def unassign_node(self):
357         self._node_id = None
358         self.hostip = None
359         
360         try:
361             orig_attrs = self.__orig_attrs
362         except AttributeError:
363             return
364             
365         for key, value in orig_attrs.iteritems():
366             setattr(self, key, value)
367         del self.__orig_attrs
368     
369     def rate_nodes(self, nodes):
370         rates = collections.defaultdict(int)
371         tags = collections.defaultdict(dict)
372         replacements = {'timeframe':self._timeframe}
373         tagnames = [ tagname % replacements 
374                      for tagname, weight, default in self.RATE_FACTORS ]
375        
376         taginfo = self._sliceapi.GetNodeTags(
377             node_id=list(nodes), 
378             tagname=tagnames,
379             fields=('node_id','tagname','value'))
380
381         unpack = operator.itemgetter('node_id','tagname','value')
382         for value in taginfo:
383             node, tagname, value = unpack(value)
384             if value and value.lower() != 'n/a':
385                 tags[tagname][node] = float(value)
386         
387         for tagname, weight, default in self.RATE_FACTORS:
388             taginfo = tags[tagname % replacements].get
389             for node in nodes:
390                 rates[node] += weight * taginfo(node,default)
391         
392         return map(rates.__getitem__, nodes)
393             
394     def fetch_node_info(self):
395         orig_attrs = {}
396         
397         info, tags = self._sliceapi.GetNodeInfo(self._node_id)
398         info = info[0]
399         
400         tags = dict( (t['tagname'],t['value'])
401                      for t in tags )
402
403         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
404         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
405         self.min_num_external_ifaces = None
406         self.max_num_external_ifaces = None
407         if not self._timeframe: self._timeframe = 'w'
408         
409         replacements = {'timeframe':self._timeframe}
410
411         for attr, tag in self.BASEFILTERS.iteritems():
412             if tag in info:
413                 value = info[tag]
414                 if hasattr(self, attr):
415                     orig_attrs[attr] = getattr(self, attr)
416                 setattr(self, attr, value)
417         for attr, (tag,_) in self.TAGFILTERS.iteritems():
418             tag = tag % replacements
419             if tag in tags:
420                 value = tags[tag]
421                 if hasattr(self, attr):
422                     orig_attrs[attr] = getattr(self, attr)
423                 if not value or value.lower() == 'n/a':
424                     value = None
425                 setattr(self, attr, value)
426         
427         if 'peer_id' in info:
428             orig_attrs['site'] = self.site
429             self.site = self._sliceapi.peer_map[info['peer_id']]
430         
431         if 'interface_ids' in info:
432             self.min_num_external_ifaces = \
433             self.max_num_external_ifaces = len(info['interface_ids'])
434         
435         if 'ssh_rsa_key' in info:
436             orig_attrs['server_key'] = self.server_key
437             self.server_key = info['ssh_rsa_key']
438         
439         self.hostip = socket.gethostbyname(self.hostname)
440         
441         try:
442             self.__orig_attrs
443         except AttributeError:
444             self.__orig_attrs = orig_attrs
445
446     def validate(self):
447         if self.home_path is None:
448             raise AssertionError, "Misconfigured node: missing home path"
449         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
450             raise AssertionError, "Misconfigured node: missing slice SSH key"
451         if self.slicename is None:
452             raise AssertionError, "Misconfigured node: unspecified slice"
453
454     def recover(self):
455         # Mark dependencies installed
456         self._installed = True
457         
458         # Clear load attributes, they impair re-discovery
459         self.minReliability = \
460         self.maxReliability = \
461         self.minBandwidth = \
462         self.maxBandwidth = \
463         self.minCpu = \
464         self.maxCpu = \
465         self.minLoad = \
466         self.maxLoad = None
467
468     def install_dependencies(self):
469         if self.required_packages and not self._installed:
470             # If we need rpmfusion, we must install the repo definition and the gpg keys
471             if self.rpmFusion:
472                 if self.operatingSystem == 'f12':
473                     # Fedora 12 requires a different rpmfusion package
474                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
475                 else:
476                     # This one works for f13+
477                     RPM_FUSION_URL = self.RPM_FUSION_URL
478                     
479                 rpmFusion = (
480                   'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
481                 ) % {
482                     'RPM_FUSION_URL' : RPM_FUSION_URL
483                 }
484             else:
485                 rpmFusion = ''
486             
487             if rpmFusion:
488                 (out,err),proc = server.popen_ssh_command(
489                     rpmFusion,
490                     host = self.hostname,
491                     port = None,
492                     user = self.slicename,
493                     agent = None,
494                     ident_key = self.ident_path,
495                     server_key = self.server_key,
496                     timeout = 600,
497                     )
498                 
499                 if proc.wait():
500                     if self.check_bad_host(out,err):
501                         self.blacklist()
502                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
503             
504             # Launch p2p yum dependency installer
505             self._yum_dependencies.async_setup()
506     
507     def wait_provisioning(self, timeout = 20*60):
508         # Wait for the p2p installer
509         sleeptime = 1.0
510         totaltime = 0.0
511         while not self.is_alive():
512             time.sleep(sleeptime)
513             totaltime += sleeptime
514             sleeptime = min(30.0, sleeptime*1.5)
515             
516             if totaltime > timeout:
517                 # PlanetLab has a 15' delay on configuration propagation
518                 # If we're above that delay, the unresponsiveness is not due
519                 # to this delay.
520                 if not self.is_alive(verbose=True):
521                     raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
522         
523         # Ensure the node is clean (no apps running that could interfere with operations)
524         if self.enable_proc_cleanup:
525             self.do_proc_cleanup()
526         if self.enable_home_cleanup:
527             self.do_home_cleanup()
528    
529     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
530         # Wait for the p2p installer
531         if self._yum_dependencies and not self._installed:
532             self._yum_dependencies.async_setup_wait()
533             self._installed = True
534         
535     def is_alive(self, verbose = False):
536         # Make sure all the paths are created where 
537         # they have to be created for deployment
538         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
539             "echo 'ALIVE'",
540             host = self.hostname,
541             port = None,
542             user = self.slicename,
543             agent = None,
544             ident_key = self.ident_path,
545             server_key = self.server_key,
546             timeout = 60,
547             err_on_timeout = False,
548             persistent = False
549             )
550         
551         if proc.wait():
552             if verbose:
553                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
554             return False
555         elif not err and out.strip() == 'ALIVE':
556             return True
557         else:
558             if verbose:
559                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
560             return False
561     
562     def destroy(self):
563         if self.enable_proc_cleanup:
564             self.do_proc_cleanup()
565         if self.enable_home_cleanup:
566             self.do_home_cleanup()
567     
568     def blacklist(self):
569         if self._node_id:
570             self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
571             import util
572             util.appendBlacklist(self.hostname)
573     
574     def do_proc_cleanup(self):
575         if self.testbed().recovering:
576             # WOW - not now
577             return
578             
579         self._logger.info("Cleaning up processes on %s", self.hostname)
580         
581         cmds = [
582             "sudo -S killall python tcpdump || /bin/true ; "
583             "sudo -S killall python tcpdump || /bin/true ; "
584             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
585             "sudo -S killall -u %(slicename)s || /bin/true ",
586             "sudo -S killall -u root || /bin/true ",
587             "sudo -S killall -u %(slicename)s || /bin/true ",
588             "sudo -S killall -u root || /bin/true ",
589         ]
590
591         for cmd in cmds:
592             (out,err),proc = server.popen_ssh_command(
593                 # Some apps need two kills
594                 cmd % {
595                     'slicename' : self.slicename ,
596                 },
597                 host = self.hostname,
598                 port = None,
599                 user = self.slicename,
600                 agent = None,
601                 ident_key = self.ident_path,
602                 server_key = self.server_key,
603                 tty = True, # so that ps -N -T works as advertised...
604                 timeout = 60,
605                 retry = 3
606                 )
607             proc.wait()
608      
609     def do_home_cleanup(self):
610         if self.testbed().recovering:
611             # WOW - not now
612             return
613             
614         self._logger.info("Cleaning up home on %s", self.hostname)
615         
616         cmds = [
617             "find . -maxdepth 1 ! -name '.bash*' ! -name '.' -execdir rm -rf {} + "
618         ]
619
620         for cmd in cmds:
621             (out,err),proc = server.popen_ssh_command(
622                 # Some apps need two kills
623                 cmd % {
624                     'slicename' : self.slicename ,
625                 },
626                 host = self.hostname,
627                 port = None,
628                 user = self.slicename,
629                 agent = None,
630                 ident_key = self.ident_path,
631                 server_key = self.server_key,
632                 tty = True, # so that ps -N -T works as advertised...
633                 timeout = 60,
634                 retry = 3
635                 )
636             proc.wait()
637    
638     def prepare_dependencies(self):
639         # Configure p2p yum dependency installer
640         if self.required_packages and not self._installed:
641             self._yum_dependencies = application.YumDependency(self._api)
642             self._yum_dependencies.node = self
643             self._yum_dependencies.home_path = "nepi-yumdep"
644             self._yum_dependencies.depends = ' '.join(self.required_packages)
645
646     def routing_method(self, routes, vsys_vnet):
647         """
648         There are two methods, vroute and sliceip.
649         
650         vroute:
651             Modifies the node's routing table directly, validating that the IP
652             range lies within the network given by the slice's vsys_vnet tag.
653             This method is the most scalable for very small routing tables
654             that need not modify other routes (including the default)
655         
656         sliceip:
657             Uses policy routing and iptables filters to create per-sliver
658             routing tables. It's the most flexible way, but it doesn't scale
659             as well since only 155 routing tables can be created this way.
660         
661         This method will return the most appropriate routing method, which will
662         prefer vroute for small routing tables.
663         """
664         
665         # For now, sliceip results in kernel panics
666         # so we HAVE to use vroute
667         return 'vroute'
668         
669         # We should not make the routing table grow too big
670         if len(routes) > MAX_VROUTE_ROUTES:
671             return 'sliceip'
672         
673         vsys_vnet = ipaddr.IPv4Network(vsys_vnet)
674         for route in routes:
675             dest, prefix, nexthop, metric, device = route
676             dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix))
677             nexthop = ipaddr.IPAddress(nexthop)
678             if dest not in vsys_vnet or nexthop not in vsys_vnet:
679                 return 'sliceip'
680         
681         return 'vroute'
682     
683     def format_route(self, route, dev, method, action):
684         dest, prefix, nexthop, metric, device = route
685         if method == 'vroute':
686             return (
687                 "%s %s%s gw %s %s" % (
688                     action,
689                     dest,
690                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
691                     nexthop,
692                     dev,
693                 )
694             )
695         elif method == 'sliceip':
696             return (
697                 "route %s to %s%s via %s metric %s dev %s" % (
698                     action,
699                     dest,
700                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
701                     nexthop,
702                     metric or 1,
703                     dev,
704                 )
705             )
706         else:
707             raise AssertionError, "Unknown method"
708     
709     def _annotate_routes_with_devs(self, routes, devs, method):
710         dev_routes = []
711         for route in routes:
712             for dev in devs:
713                 if dev.routes_here(route):
714                     dev_routes.append(tuple(route) + (dev.if_name,))
715                     
716                     # Stop checking
717                     break
718             else:
719                 if method == 'sliceip':
720                     dev_routes.append(tuple(route) + ('eth0',))
721                 else:
722                     raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
723                         "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
724         return dev_routes
725     
726     def configure_routes(self, routes, devs, vsys_vnet):
727         """
728         Add the specified routes to the node's routing table
729         """
730         rules = []
731         method = self.routing_method(routes, vsys_vnet)
732         tdevs = set()
733         
734         # annotate routes with devices
735         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
736         for route in dev_routes:
737             route, dev = route[:-1], route[-1]
738             
739             # Schedule rule
740             tdevs.add(dev)
741             rules.append(self.format_route(route, dev, method, 'add'))
742         
743         if method == 'sliceip':
744             rules = map('enable '.__add__, tdevs) + rules
745         
746         self._logger.info("Setting up routes for %s using %s", self.hostname, method)
747         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
748         
749         self.apply_route_rules(rules, method)
750         
751         self._configured_routes = set(routes)
752         self._configured_devs = tdevs
753         self._configured_method = method
754     
755     def reconfigure_routes(self, routes, devs, vsys_vnet):
756         """
757         Updates the routes in the node's routing table to match
758         the given route list
759         """
760         method = self._configured_method
761         
762         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
763
764         current = self._configured_routes
765         current_devs = self._configured_devs
766         
767         new = set(dev_routes)
768         new_devs = set(map(operator.itemgetter(-1), dev_routes))
769         
770         deletions = current - new
771         insertions = new - current
772         
773         dev_deletions = current_devs - new_devs
774         dev_insertions = new_devs - current_devs
775         
776         # Generate rules
777         rules = []
778         
779         # Rule deletions first
780         for route in deletions:
781             route, dev = route[:-1], route[-1]
782             rules.append(self.format_route(route, dev, method, 'del'))
783         
784         if method == 'sliceip':
785             # Dev deletions now
786             rules.extend(map('disable '.__add__, dev_deletions))
787
788             # Dev insertions now
789             rules.extend(map('enable '.__add__, dev_insertions))
790
791         # Rule insertions now
792         for route in insertions:
793             route, dev = route[:-1], dev[-1]
794             rules.append(self.format_route(route, dev, method, 'add'))
795         
796         # Apply
797         self.apply_route_rules(rules, method)
798         
799         self._configured_routes = dev_routes
800         self._configured_devs = new_devs
801         
802     def apply_route_rules(self, rules, method):
803         (out,err),proc = server.popen_ssh_command(
804             "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
805                 home = server.shell_escape(self.home_path),
806                 method = method),
807             host = self.hostname,
808             port = None,
809             user = self.slicename,
810             agent = None,
811             ident_key = self.ident_path,
812             server_key = self.server_key,
813             stdin = '\n'.join(rules),
814             timeout = 300
815             )
816         
817         if proc.wait() or err:
818             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
819         elif out or err:
820             logger.debug("%s said: %s%s", method, out, err)
821
822     def check_bad_host(self, out, err):
823         badre = re.compile(r'(?:'
824                            r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
825                            r'|Error: disk I/O error'
826                            r')', 
827                            re.I)
828         return badre.search(out) or badre.search(err)