bc2625224e71b09205072de1f39cce725a2d8174
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import plcapi
5 import operator
6 import rspawn
7 import time
8 import os
9 import collections
10 import cStringIO
11 import resourcealloc
12 import socket
13 import sys
14 import logging
15 import ipaddr
16 import operator
17 import re
18
19 from nepi.util import server
20 from nepi.util import parallel
21
22 import application
23
24 MAX_VROUTE_ROUTES = 5
25
26 class UnresponsiveNodeError(RuntimeError):
27     pass
28
29 def _castproperty(typ, propattr):
30     def _get(self):
31         return getattr(self, propattr)
32     def _set(self, value):
33         if value is not None or (isinstance(value, basestring) and not value):
34             value = typ(value)
35         return setattr(self, propattr, value)
36     def _del(self, value):
37         return delattr(self, propattr)
38     _get.__name__ = propattr + '_get'
39     _set.__name__ = propattr + '_set'
40     _del.__name__ = propattr + '_del'
41     return property(_get, _set, _del)
42
43 class Node(object):
44     BASEFILTERS = {
45         # Map Node attribute to plcapi filter name
46         'hostname' : 'hostname',
47     }
48     
49     TAGFILTERS = {
50         # Map Node attribute to (<tag name>, <plcapi filter expression>)
51         #   There are replacements that are applied with string formatting,
52         #   so '%' has to be escaped as '%%'.
53         'architecture' : ('arch','value'),
54         'operatingSystem' : ('fcdistro','value'),
55         'pl_distro' : ('pldistro','value'),
56         'city' : ('city','value'),
57         'country' : ('country','value'),
58         'region' : ('region','value'),
59         'minReliability' : ('reliability%(timeframe)s', ']value'),
60         'maxReliability' : ('reliability%(timeframe)s', '[value'),
61         'minBandwidth' : ('bw%(timeframe)s', ']value'),
62         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63         'minLoad' : ('load%(timeframe)s', ']value'),
64         'maxLoad' : ('load%(timeframe)s', '[value'),
65         'minCpu' : ('cpu%(timeframe)s', ']value'),
66         'maxCpu' : ('cpu%(timeframe)s', '[value'),
67     }
68     
69     RATE_FACTORS = (
70         # (<tag name>, <weight>, <default>)
71         ('bw%(timeframe)s', -0.001, 1024.0),
72         ('cpu%(timeframe)s', 0.1, 40.0),
73         ('load%(timeframe)s', -0.2, 3.0),
74         ('reliability%(timeframe)s', 1, 100.0),
75     )
76     
77     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
79
80     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
81     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
82     
83     minReliability = _castproperty(float, '_minReliability')
84     maxReliability = _castproperty(float, '_maxReliability')
85     minBandwidth = _castproperty(float, '_minBandwidth')
86     maxBandwidth = _castproperty(float, '_maxBandwidth')
87     minCpu = _castproperty(float, '_minCpu')
88     maxCpu = _castproperty(float, '_maxCpu')
89     minLoad = _castproperty(float, '_minLoad')
90     maxLoad = _castproperty(float, '_maxLoad')
91     
92     def __init__(self, api=None, sliceapi=None):
93         if not api:
94             api = plcapi.PLCAPI()
95         self._api = api
96         self._sliceapi = sliceapi or api
97         
98         # Attributes
99         self.hostname = None
100         self.architecture = None
101         self.operatingSystem = None
102         self.pl_distro = None
103         self.site = None
104         self.city = None
105         self.country = None
106         self.region = None
107         self.minReliability = None
108         self.maxReliability = None
109         self.minBandwidth = None
110         self.maxBandwidth = None
111         self.minCpu = None
112         self.maxCpu = None
113         self.minLoad = None
114         self.maxLoad = None
115         self.min_num_external_ifaces = None
116         self.max_num_external_ifaces = None
117         self._timeframe = 'w'
118         
119         # Applications and routes add requirements to connected nodes
120         self.required_packages = set()
121         self.required_vsys = set()
122         self.pythonpath = []
123         self.rpmFusion = False
124         self.env = collections.defaultdict(list)
125         
126         # Some special applications - initialized when connected
127         self.multicast_forwarder = None
128         
129         # Testbed-derived attributes
130         self.slicename = None
131         self.ident_path = None
132         self.server_key = None
133         self.home_path = None
134         self.enable_proc_cleanup = False
135         self.enable_home_cleanup = False
136         
137         # Those are filled when an actual node is allocated
138         self._node_id = None
139         self._yum_dependencies = None
140         self._installed = False
141
142         # Logging
143         self._logger = logging.getLogger('nepi.testbeds.planetlab')
144
145     def set_timeframe(self, timeframe):
146         if timeframe == "latest":
147             self._timeframe = ""
148         elif timeframe == "month":
149             self._timeframe = "m"
150         elif timeframe == "year":
151             self._timeframe = "y"
152         else:
153             self._timeframe = "w"
154
155     def get_timeframe(self):
156         if self._timeframe == "":
157             return "latest"
158         if self._timeframe == "m":
159             return "month"
160         if self._timeframe == "y":
161             return "year"
162         return "week"
163
164     timeframe = property(get_timeframe, set_timeframe)
165     
166     def _nepi_testbed_environment_setup_get(self):
167         command = cStringIO.StringIO()
168         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
169             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
170         ))
171         command.write(' ; export PATH=$PATH:%s' % (
172             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
173         ))
174         if self.env:
175             for envkey, envvals in self.env.iteritems():
176                 for envval in envvals:
177                     command.write(' ; export %s=%s' % (envkey, envval))
178         return command.getvalue()
179
180     def _nepi_testbed_environment_setup_set(self, value):
181         pass
182
183     _nepi_testbed_environment_setup = property(
184         _nepi_testbed_environment_setup_get,
185         _nepi_testbed_environment_setup_set)
186     
187     def build_filters(self, target_filters, filter_map):
188         for attr, tag in filter_map.iteritems():
189             value = getattr(self, attr, None)
190             if value is not None:
191                 target_filters[tag] = value
192         return target_filters
193     
194     @property
195     def applicable_filters(self):
196         has = lambda att : getattr(self,att,None) is not None
197         return (
198             filter(has, self.BASEFILTERS.iterkeys())
199             + filter(has, self.TAGFILTERS.iterkeys())
200         )
201     
202     def find_candidates(self, filter_slice_id=None):
203         self._logger.info("Finding candidates for %s", self.make_filter_description())
204         
205         fields = ('node_id',)
206         replacements = {'timeframe':self._timeframe}
207         
208         # get initial candidates (no tag filters)
209         basefilters = self.build_filters({}, self.BASEFILTERS)
210         rootfilters = basefilters.copy()
211         if filter_slice_id:
212             basefilters['|slice_ids'] = (filter_slice_id,)
213         
214         # only pick healthy nodes
215         basefilters['run_level'] = 'boot'
216         basefilters['boot_state'] = 'boot'
217         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
218         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
219         
220         # keyword-only "pseudofilters"
221         extra = {}
222         if self.site:
223             extra['peer'] = self.site
224             
225         candidates = set(map(operator.itemgetter('node_id'), 
226             self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
227
228         # filter by tag, one tag at a time
229         applicable = self.applicable_filters
230         for tagfilter in self.TAGFILTERS.iteritems():
231             attr, (tagname, expr) = tagfilter
232             
233             # don't bother if there's no filter defined
234             if attr in applicable:
235                 tagfilter = rootfilters.copy()
236                 tagfilter['tagname'] = tagname % replacements
237                 tagfilter[expr % replacements] = str(getattr(self,attr))
238                 tagfilter['node_id'] = list(candidates)
239               
240                 candidates &= set(map(operator.itemgetter('node_id'),
241                     self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
242
243         # filter by vsys tags - special case since it doesn't follow
244         # the usual semantics
245         if self.required_vsys:
246             newcandidates = collections.defaultdict(set)
247             
248             vsys_tags = self._sliceapi.GetNodeTags(
249                 tagname='vsys', 
250                 node_id = list(candidates), 
251                 fields = ['node_id','value'])
252
253             vsys_tags = map(
254                 operator.itemgetter(['node_id','value']),
255                 vsys_tags)
256             
257             required_vsys = self.required_vsys
258             for node_id, value in vsys_tags:
259                 if value in required_vsys:
260                     newcandidates[value].add(node_id)
261             
262             # take only those that have all the required vsys tags
263             newcandidates = reduce(
264                 lambda accum, new : accum & new,
265                 newcandidates.itervalues(),
266                 candidates)
267         
268         # filter by iface count
269         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
270             # fetch interfaces for all, in one go
271             filters = basefilters.copy()
272             filters['node_id'] = list(candidates)
273             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
274                 self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
275             
276             # filter candidates by interface count
277             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
278                 predicate = ( lambda node_id : 
279                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
280             elif self.min_num_external_ifaces is not None:
281                 predicate = ( lambda node_id : 
282                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
283             else:
284                 predicate = ( lambda node_id : 
285                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
286             
287             candidates = set(filter(predicate, candidates))
288        
289         # make sure hostnames are resolvable
290         hostnames = dict() 
291         if candidates:
292             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
293            
294             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
295                 self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])
296             ))
297
298             def resolvable(node_id):
299                 try:
300                     addr = server.gethostbyname(hostnames[node_id])
301                     return addr is not None
302                 except:
303                     return False
304             candidates = set(parallel.pfilter(resolvable, candidates,
305                 maxthreads = 16))
306
307             self._logger.info("  Found %s reachable candidates.", len(candidates))
308
309             for h in hostnames.keys():
310                 if h not in candidates:
311                     del hostnames[h]
312
313             hostnames = dict((v,k) for k, v in hostnames.iteritems())
314
315         return hostnames
316     
317     def make_filter_description(self):
318         """
319         Makes a human-readable description of filtering conditions
320         for find_candidates.
321         """
322         
323         # get initial candidates (no tag filters)
324         filters = self.build_filters({}, self.BASEFILTERS)
325         
326         # keyword-only "pseudofilters"
327         if self.site:
328             filters['peer'] = self.site
329             
330         # filter by tag, one tag at a time
331         applicable = self.applicable_filters
332         for tagfilter in self.TAGFILTERS.iteritems():
333             attr, (tagname, expr) = tagfilter
334             
335             # don't bother if there's no filter defined
336             if attr in applicable:
337                 filters[attr] = getattr(self,attr)
338         
339         # filter by vsys tags - special case since it doesn't follow
340         # the usual semantics
341         if self.required_vsys:
342             filters['vsys'] = ','.join(list(self.required_vsys))
343         
344         # filter by iface count
345         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
346             filters['num_ifaces'] = '-'.join([
347                 str(self.min_num_external_ifaces or '0'),
348                 str(self.max_num_external_ifaces or 'inf')
349             ])
350             
351         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
352
353     def assign_node_id(self, node_id):
354         self._node_id = node_id
355         self.fetch_node_info()
356     
357     def unassign_node(self):
358         self._node_id = None
359         self.hostip = None
360         
361         try:
362             orig_attrs = self.__orig_attrs
363         except AttributeError:
364             return
365             
366         for key, value in orig_attrs.iteritems():
367             setattr(self, key, value)
368         del self.__orig_attrs
369     
370     def rate_nodes(self, nodes):
371         rates = collections.defaultdict(int)
372         tags = collections.defaultdict(dict)
373         replacements = {'timeframe':self._timeframe}
374         tagnames = [ tagname % replacements 
375                      for tagname, weight, default in self.RATE_FACTORS ]
376        
377         taginfo = self._sliceapi.GetNodeTags(
378             node_id=list(nodes), 
379             tagname=tagnames,
380             fields=('node_id','tagname','value'))
381
382         unpack = operator.itemgetter('node_id','tagname','value')
383         for value in taginfo:
384             node, tagname, value = unpack(value)
385             if value and value.lower() != 'n/a':
386                 tags[tagname][node] = float(value)
387         
388         for tagname, weight, default in self.RATE_FACTORS:
389             taginfo = tags[tagname % replacements].get
390             for node in nodes:
391                 rates[node] += weight * taginfo(node,default)
392         
393         return map(rates.__getitem__, nodes)
394             
395     def fetch_node_info(self):
396         orig_attrs = {}
397         
398         info, tags = self._sliceapi.GetNodeInfo(self._node_id)
399         info = info[0]
400         
401         tags = dict( (t['tagname'],t['value'])
402                      for t in tags )
403
404         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
405         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
406         self.min_num_external_ifaces = None
407         self.max_num_external_ifaces = None
408         if not self._timeframe: self._timeframe = 'w'
409         
410         replacements = {'timeframe':self._timeframe}
411
412         for attr, tag in self.BASEFILTERS.iteritems():
413             if tag in info:
414                 value = info[tag]
415                 if hasattr(self, attr):
416                     orig_attrs[attr] = getattr(self, attr)
417                 setattr(self, attr, value)
418         for attr, (tag,_) in self.TAGFILTERS.iteritems():
419             tag = tag % replacements
420             if tag in tags:
421                 value = tags[tag]
422                 if hasattr(self, attr):
423                     orig_attrs[attr] = getattr(self, attr)
424                 if not value or value.lower() == 'n/a':
425                     value = None
426                 setattr(self, attr, value)
427         
428         if 'peer_id' in info:
429             orig_attrs['site'] = self.site
430             self.site = self._sliceapi.peer_map[info['peer_id']]
431         
432         if 'interface_ids' in info:
433             self.min_num_external_ifaces = \
434             self.max_num_external_ifaces = len(info['interface_ids'])
435         
436         if 'ssh_rsa_key' in info:
437             orig_attrs['server_key'] = self.server_key
438             self.server_key = info['ssh_rsa_key']
439         
440         self.hostip = server.gethostbyname(self.hostname)
441         
442         try:
443             self.__orig_attrs
444         except AttributeError:
445             self.__orig_attrs = orig_attrs
446
447     def validate(self):
448         if self.home_path is None:
449             raise AssertionError, "Misconfigured node: missing home path"
450         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
451             raise AssertionError, "Misconfigured node: missing slice SSH key"
452         if self.slicename is None:
453             raise AssertionError, "Misconfigured node: unspecified slice"
454
455     def recover(self):
456         # Mark dependencies installed
457         self._installed = True
458         
459         # Clear load attributes, they impair re-discovery
460         self.minReliability = \
461         self.maxReliability = \
462         self.minBandwidth = \
463         self.maxBandwidth = \
464         self.minCpu = \
465         self.maxCpu = \
466         self.minLoad = \
467         self.maxLoad = None
468
469     def install_dependencies(self):
470         if self.required_packages and not self._installed:
471             # If we need rpmfusion, we must install the repo definition and the gpg keys
472             if self.rpmFusion:
473                 if self.operatingSystem == 'f12':
474                     # Fedora 12 requires a different rpmfusion package
475                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
476                 else:
477                     # This one works for f13+
478                     RPM_FUSION_URL = self.RPM_FUSION_URL
479                     
480                 rpmFusion = (
481                   'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
482                 ) % {
483                     'RPM_FUSION_URL' : RPM_FUSION_URL
484                 }
485             else:
486                 rpmFusion = ''
487            
488             if rpmFusion:
489                 (out,err),proc = server.popen_ssh_command(
490                     rpmFusion,
491                     host = self.hostip,
492                     port = None,
493                     user = self.slicename,
494                     agent = None,
495                     ident_key = self.ident_path,
496                     server_key = self.server_key,
497                     timeout = 600,
498                     )
499                 
500                 if proc.wait():
501                     if self.check_bad_host(out,err):
502                         self.blacklist()
503                     raise RuntimeError, "Failed to set up application on host %s: %s %s" % (self.hostname, out,err,)
504             
505             # Launch p2p yum dependency installer
506             self._yum_dependencies.async_setup()
507     
508     def wait_provisioning(self, timeout = 20*60):
509         # Wait for the p2p installer
510         sleeptime = 1.0
511         totaltime = 0.0
512         while not self.is_alive():
513             time.sleep(sleeptime)
514             totaltime += sleeptime
515             sleeptime = min(30.0, sleeptime*1.5)
516             
517             if totaltime > timeout:
518                 # PlanetLab has a 15' delay on configuration propagation
519                 # If we're above that delay, the unresponsiveness is not due
520                 # to this delay.
521                 if not self.is_alive(verbose=True):
522                     raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
523         
524         # Ensure the node is clean (no apps running that could interfere with operations)
525         if self.enable_proc_cleanup:
526             self.do_proc_cleanup()
527         if self.enable_home_cleanup:
528             self.do_home_cleanup()
529    
530     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
531         # Wait for the p2p installer
532         if self._yum_dependencies and not self._installed:
533             self._yum_dependencies.async_setup_wait()
534             self._installed = True
535         
536     def is_alive(self, verbose = False):
537         # Make sure all the paths are created where 
538         # they have to be created for deployment
539         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
540             "echo 'ALIVE'",
541             host = self.hostip,
542             port = None,
543             user = self.slicename,
544             agent = None,
545             ident_key = self.ident_path,
546             server_key = self.server_key,
547             timeout = 60,
548             err_on_timeout = False,
549             persistent = False
550             )
551         
552         if proc.wait():
553             if verbose:
554                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
555             return False
556         elif not err and out.strip() == 'ALIVE':
557             return True
558         else:
559             if verbose:
560                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
561             return False
562     
563     def destroy(self):
564         if self.enable_proc_cleanup:
565             self.do_proc_cleanup()
566     
567     def blacklist(self):
568         if self._node_id:
569             self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
570             import util
571             util.appendBlacklist(self.hostname)
572     
573     def do_proc_cleanup(self):
574         if self.testbed().recovering:
575             # WOW - not now
576             return
577             
578         self._logger.info("Cleaning up processes on %s", self.hostname)
579         
580         cmds = [
581             "sudo -S killall python tcpdump || /bin/true ; "
582             "sudo -S killall python tcpdump || /bin/true ; "
583             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
584             "sudo -S killall -u %(slicename)s || /bin/true ",
585             "sudo -S killall -u root || /bin/true ",
586             "sudo -S killall -u %(slicename)s || /bin/true ",
587             "sudo -S killall -u root || /bin/true ",
588         ]
589
590         for cmd in cmds:
591             (out,err),proc = server.popen_ssh_command(
592                 # Some apps need two kills
593                 cmd % {
594                     'slicename' : self.slicename ,
595                 },
596                 host = self.hostip,
597                 port = None,
598                 user = self.slicename,
599                 agent = None,
600                 ident_key = self.ident_path,
601                 server_key = self.server_key,
602                 tty = True, # so that ps -N -T works as advertised...
603                 timeout = 60,
604                 retry = 3
605                 )
606             proc.wait()
607      
608     def do_home_cleanup(self):
609         if self.testbed().recovering:
610             # WOW - not now
611             return
612             
613         self._logger.info("Cleaning up home on %s", self.hostname)
614         
615         cmds = [
616             "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
617         ]
618
619         for cmd in cmds:
620             (out,err),proc = server.popen_ssh_command(
621                 # Some apps need two kills
622                 cmd % {
623                     'slicename' : self.slicename ,
624                 },
625                 host = self.hostip,
626                 port = None,
627                 user = self.slicename,
628                 agent = None,
629                 ident_key = self.ident_path,
630                 server_key = self.server_key,
631                 tty = True, # so that ps -N -T works as advertised...
632                 timeout = 60,
633                 retry = 3
634                 )
635             proc.wait()
636    
637     def prepare_dependencies(self):
638         # Configure p2p yum dependency installer
639         if self.required_packages and not self._installed:
640             self._yum_dependencies = application.YumDependency(self._api)
641             self._yum_dependencies.node = self
642             self._yum_dependencies.home_path = "nepi-yumdep"
643             self._yum_dependencies.depends = ' '.join(self.required_packages)
644
645     def routing_method(self, routes, vsys_vnet):
646         """
647         There are two methods, vroute and sliceip.
648         
649         vroute:
650             Modifies the node's routing table directly, validating that the IP
651             range lies within the network given by the slice's vsys_vnet tag.
652             This method is the most scalable for very small routing tables
653             that need not modify other routes (including the default)
654         
655         sliceip:
656             Uses policy routing and iptables filters to create per-sliver
657             routing tables. It's the most flexible way, but it doesn't scale
658             as well since only 155 routing tables can be created this way.
659         
660         This method will return the most appropriate routing method, which will
661         prefer vroute for small routing tables.
662         """
663         
664         # For now, sliceip results in kernel panics
665         # so we HAVE to use vroute
666         return 'vroute'
667         
668         # We should not make the routing table grow too big
669         if len(routes) > MAX_VROUTE_ROUTES:
670             return 'sliceip'
671         
672         vsys_vnet = ipaddr.IPv4Network(vsys_vnet)
673         for route in routes:
674             dest, prefix, nexthop, metric, device = route
675             dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix))
676             nexthop = ipaddr.IPAddress(nexthop)
677             if dest not in vsys_vnet or nexthop not in vsys_vnet:
678                 return 'sliceip'
679         
680         return 'vroute'
681     
682     def format_route(self, route, dev, method, action):
683         dest, prefix, nexthop, metric, device = route
684         if method == 'vroute':
685             return (
686                 "%s %s%s gw %s %s" % (
687                     action,
688                     dest,
689                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
690                     nexthop,
691                     dev,
692                 )
693             )
694         elif method == 'sliceip':
695             return (
696                 "route %s to %s%s via %s metric %s dev %s" % (
697                     action,
698                     dest,
699                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
700                     nexthop,
701                     metric or 1,
702                     dev,
703                 )
704             )
705         else:
706             raise AssertionError, "Unknown method"
707     
708     def _annotate_routes_with_devs(self, routes, devs, method):
709         dev_routes = []
710         for route in routes:
711             for dev in devs:
712                 if dev.routes_here(route):
713                     dev_routes.append(tuple(route) + (dev.if_name,))
714                     
715                     # Stop checking
716                     break
717             else:
718                 if method == 'sliceip':
719                     dev_routes.append(tuple(route) + ('eth0',))
720                 else:
721                     raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
722                         "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
723         return dev_routes
724     
725     def configure_routes(self, routes, devs, vsys_vnet):
726         """
727         Add the specified routes to the node's routing table
728         """
729         rules = []
730         method = self.routing_method(routes, vsys_vnet)
731         tdevs = set()
732         
733         # annotate routes with devices
734         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
735         for route in dev_routes:
736             route, dev = route[:-1], route[-1]
737             
738             # Schedule rule
739             tdevs.add(dev)
740             rules.append(self.format_route(route, dev, method, 'add'))
741         
742         if method == 'sliceip':
743             rules = map('enable '.__add__, tdevs) + rules
744         
745         self._logger.info("Setting up routes for %s using %s", self.hostname, method)
746         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
747         
748         self.apply_route_rules(rules, method)
749         
750         self._configured_routes = set(routes)
751         self._configured_devs = tdevs
752         self._configured_method = method
753     
754     def reconfigure_routes(self, routes, devs, vsys_vnet):
755         """
756         Updates the routes in the node's routing table to match
757         the given route list
758         """
759         method = self._configured_method
760         
761         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
762
763         current = self._configured_routes
764         current_devs = self._configured_devs
765         
766         new = set(dev_routes)
767         new_devs = set(map(operator.itemgetter(-1), dev_routes))
768         
769         deletions = current - new
770         insertions = new - current
771         
772         dev_deletions = current_devs - new_devs
773         dev_insertions = new_devs - current_devs
774         
775         # Generate rules
776         rules = []
777         
778         # Rule deletions first
779         for route in deletions:
780             route, dev = route[:-1], route[-1]
781             rules.append(self.format_route(route, dev, method, 'del'))
782         
783         if method == 'sliceip':
784             # Dev deletions now
785             rules.extend(map('disable '.__add__, dev_deletions))
786
787             # Dev insertions now
788             rules.extend(map('enable '.__add__, dev_insertions))
789
790         # Rule insertions now
791         for route in insertions:
792             route, dev = route[:-1], dev[-1]
793             rules.append(self.format_route(route, dev, method, 'add'))
794         
795         # Apply
796         self.apply_route_rules(rules, method)
797         
798         self._configured_routes = dev_routes
799         self._configured_devs = new_devs
800         
801     def apply_route_rules(self, rules, method):
802         (out,err),proc = server.popen_ssh_command(
803             "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
804                 home = server.shell_escape(self.home_path),
805                 method = method),
806             host = self.hostip,
807             port = None,
808             user = self.slicename,
809             agent = None,
810             ident_key = self.ident_path,
811             server_key = self.server_key,
812             stdin = '\n'.join(rules),
813             timeout = 300
814             )
815         
816         if proc.wait() or err:
817             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
818         elif out or err:
819             logger.debug("%s said: %s%s", method, out, err)
820
821     def check_bad_host(self, out, err):
822         badre = re.compile(r'(?:'
823                            r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
824                            r'|Error: disk I/O error'
825                            r')', 
826                            re.I)
827         return badre.search(out) or badre.search(err)