Clean up of repo
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import plcapi
5 import operator
6 import rspawn
7 import time
8 import os
9 import collections
10 import cStringIO
11 import resourcealloc
12 import socket
13 import sys
14 import logging
15 import ipaddr
16 import operator
17 import re
18
19 from nepi.util import server
20 from nepi.util import parallel
21
22 import application
23
24 MAX_VROUTE_ROUTES = 5
25
26 class UnresponsiveNodeError(RuntimeError):
27     pass
28
29 def _castproperty(typ, propattr):
30     def _get(self):
31         return getattr(self, propattr)
32     def _set(self, value):
33         if value is not None or (isinstance(value, basestring) and not value):
34             value = typ(value)
35         return setattr(self, propattr, value)
36     def _del(self, value):
37         return delattr(self, propattr)
38     _get.__name__ = propattr + '_get'
39     _set.__name__ = propattr + '_set'
40     _del.__name__ = propattr + '_del'
41     return property(_get, _set, _del)
42
43 class Node(object):
44     BASEFILTERS = {
45         # Map Node attribute to plcapi filter name
46         'hostname' : 'hostname',
47     }
48     
49     TAGFILTERS = {
50         # Map Node attribute to (<tag name>, <plcapi filter expression>)
51         #   There are replacements that are applied with string formatting,
52         #   so '%' has to be escaped as '%%'.
53         'architecture' : ('arch','value'),
54         'operatingSystem' : ('fcdistro','value'),
55         'pl_distro' : ('pldistro','value'),
56         'city' : ('city','value'),
57         'country' : ('country','value'),
58         'region' : ('region','value'),
59         'minReliability' : ('reliability%(timeframe)s', ']value'),
60         'maxReliability' : ('reliability%(timeframe)s', '[value'),
61         'minBandwidth' : ('bw%(timeframe)s', ']value'),
62         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63         'minLoad' : ('load%(timeframe)s', ']value'),
64         'maxLoad' : ('load%(timeframe)s', '[value'),
65         'minCpu' : ('cpu%(timeframe)s', ']value'),
66         'maxCpu' : ('cpu%(timeframe)s', '[value'),
67     }
68     
69     RATE_FACTORS = (
70         # (<tag name>, <weight>, <default>)
71         ('bw%(timeframe)s', -0.001, 1024.0),
72         ('cpu%(timeframe)s', 0.1, 40.0),
73         ('load%(timeframe)s', -0.2, 3.0),
74         ('reliability%(timeframe)s', 1, 100.0),
75     )
76     
77     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
79
80     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
81     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
82     
83     minReliability = _castproperty(float, '_minReliability')
84     maxReliability = _castproperty(float, '_maxReliability')
85     minBandwidth = _castproperty(float, '_minBandwidth')
86     maxBandwidth = _castproperty(float, '_maxBandwidth')
87     minCpu = _castproperty(float, '_minCpu')
88     maxCpu = _castproperty(float, '_maxCpu')
89     minLoad = _castproperty(float, '_minLoad')
90     maxLoad = _castproperty(float, '_maxLoad')
91     
92     def __init__(self, api=None, sliceapi=None):
93         if not api:
94             api = plcapi.PLCAPI()
95         self._api = api
96         self._sliceapi = sliceapi or api
97         
98         # Attributes
99         self.hostname = None
100         self.architecture = None
101         self.operatingSystem = None
102         self.pl_distro = None
103         self.site = None
104         self.city = None
105         self.country = None
106         self.region = None
107         self.minReliability = None
108         self.maxReliability = None
109         self.minBandwidth = None
110         self.maxBandwidth = None
111         self.minCpu = None
112         self.maxCpu = None
113         self.minLoad = None
114         self.maxLoad = None
115         self.min_num_external_ifaces = None
116         self.max_num_external_ifaces = None
117         self._timeframe = 'w'
118         
119         # Applications and routes add requirements to connected nodes
120         self.required_packages = set()
121         self.required_vsys = set()
122         self.pythonpath = []
123         self.rpmFusion = False
124         self.env = collections.defaultdict(list)
125         
126         # Some special applications - initialized when connected
127         self.multicast_forwarder = None
128         
129         # Testbed-derived attributes
130         self.slicename = None
131         self.ident_path = None
132         self.server_key = None
133         self.home_path = None
134         self.enable_proc_cleanup = False
135         self.enable_home_cleanup = False
136         
137         # Those are filled when an actual node is allocated
138         self._node_id = None
139         self._yum_dependencies = None
140         self._installed = False
141
142         # Logging
143         self._logger = logging.getLogger('nepi.testbeds.planetlab')
144
145     def set_timeframe(self, timeframe):
146         if timeframe == "latest":
147             self._timeframe = ""
148         elif timeframe == "month":
149             self._timeframe = "m"
150         elif timeframe == "year":
151             self._timeframe = "y"
152         else:
153             self._timeframe = "w"
154
155     def get_timeframe(self):
156         if self._timeframe == "":
157             return "latest"
158         if self._timeframe == "m":
159             return "month"
160         if self._timeframe == "y":
161             return "year"
162         return "week"
163
164     timeframe = property(get_timeframe, set_timeframe)
165     
166     def _nepi_testbed_environment_setup_get(self):
167         command = cStringIO.StringIO()
168         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
169             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
170         ))
171         command.write(' ; export PATH=$PATH:%s' % (
172             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
173         ))
174         if self.env:
175             for envkey, envvals in self.env.iteritems():
176                 for envval in envvals:
177                     command.write(' ; export %s=%s' % (envkey, envval))
178         return command.getvalue()
179
180     def _nepi_testbed_environment_setup_set(self, value):
181         pass
182
183     _nepi_testbed_environment_setup = property(
184         _nepi_testbed_environment_setup_get,
185         _nepi_testbed_environment_setup_set)
186     
187     def build_filters(self, target_filters, filter_map):
188         for attr, tag in filter_map.iteritems():
189             value = getattr(self, attr, None)
190             if value is not None:
191                 target_filters[tag] = value
192         return target_filters
193     
194     @property
195     def applicable_filters(self):
196         has = lambda att : getattr(self,att,None) is not None
197         return (
198             filter(has, self.BASEFILTERS.iterkeys())
199             + filter(has, self.TAGFILTERS.iterkeys())
200         )
201     
202     def find_candidates(self, filter_slice_id=None):
203         self._logger.info("Finding candidates for %s", self.make_filter_description())
204         
205         fields = ('node_id',)
206         replacements = {'timeframe':self._timeframe}
207         
208         # get initial candidates (no tag filters)
209         basefilters = self.build_filters({}, self.BASEFILTERS)
210         rootfilters = basefilters.copy()
211         if filter_slice_id:
212             basefilters['|slice_ids'] = (filter_slice_id,)
213         
214         # only pick healthy nodes
215         basefilters['run_level'] = 'boot'
216         basefilters['boot_state'] = 'boot'
217         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
218         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
219         
220         # keyword-only "pseudofilters"
221         extra = {}
222         if self.site:
223             extra['peer'] = self.site
224             
225         candidates = set(map(operator.itemgetter('node_id'), 
226             self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
227
228         # filter by tag, one tag at a time
229         applicable = self.applicable_filters
230         for tagfilter in self.TAGFILTERS.iteritems():
231             attr, (tagname, expr) = tagfilter
232             
233             # don't bother if there's no filter defined
234             if attr in applicable:
235                 tagfilter = rootfilters.copy()
236                 tagfilter['tagname'] = tagname % replacements
237                 tagfilter[expr % replacements] = str(getattr(self,attr))
238                 tagfilter['node_id'] = list(candidates)
239               
240                 candidates &= set(map(operator.itemgetter('node_id'),
241                     self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
242
243         # filter by vsys tags - special case since it doesn't follow
244         # the usual semantics
245         if self.required_vsys:
246             newcandidates = collections.defaultdict(set)
247             
248             vsys_tags = self._sliceapi.GetNodeTags(
249                 tagname='vsys', 
250                 node_id = list(candidates), 
251                 fields = ['node_id','value'])
252
253             vsys_tags = map(
254                 operator.itemgetter(['node_id','value']),
255                 vsys_tags)
256             
257             required_vsys = self.required_vsys
258             for node_id, value in vsys_tags:
259                 if value in required_vsys:
260                     newcandidates[value].add(node_id)
261             
262             # take only those that have all the required vsys tags
263             newcandidates = reduce(
264                 lambda accum, new : accum & new,
265                 newcandidates.itervalues(),
266                 candidates)
267         
268         # filter by iface count
269         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
270             # fetch interfaces for all, in one go
271             filters = basefilters.copy()
272             filters['node_id'] = list(candidates)
273             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
274                 self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
275             
276             # filter candidates by interface count
277             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
278                 predicate = ( lambda node_id : 
279                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
280             elif self.min_num_external_ifaces is not None:
281                 predicate = ( lambda node_id : 
282                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
283             else:
284                 predicate = ( lambda node_id : 
285                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
286             
287             candidates = set(filter(predicate, candidates))
288        
289         # make sure hostnames are resolvable
290         hostnames = dict() 
291         if candidates:
292             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
293            
294             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
295                 self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])
296             ))
297
298             def resolvable(node_id):
299                 try:
300                     addr = server.gethostbyname(hostnames[node_id])
301                     return addr is not None
302                 except:
303                     return False
304             candidates = set(parallel.pfilter(resolvable, candidates,
305                 maxthreads = 16))
306
307             self._logger.info("  Found %s reachable candidates.", len(candidates))
308
309             for h in hostnames.keys():
310                 if h not in candidates:
311                     del hostnames[h]
312
313             hostnames = dict((v,k) for k, v in hostnames.iteritems())
314
315         return hostnames
316     
317     def make_filter_description(self):
318         """
319         Makes a human-readable description of filtering conditions
320         for find_candidates.
321         """
322         
323         # get initial candidates (no tag filters)
324         filters = self.build_filters({}, self.BASEFILTERS)
325         
326         # keyword-only "pseudofilters"
327         if self.site:
328             filters['peer'] = self.site
329             
330         # filter by tag, one tag at a time
331         applicable = self.applicable_filters
332         for tagfilter in self.TAGFILTERS.iteritems():
333             attr, (tagname, expr) = tagfilter
334             
335             # don't bother if there's no filter defined
336             if attr in applicable:
337                 filters[attr] = getattr(self,attr)
338         
339         # filter by vsys tags - special case since it doesn't follow
340         # the usual semantics
341         if self.required_vsys:
342             filters['vsys'] = ','.join(list(self.required_vsys))
343         
344         # filter by iface count
345         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
346             filters['num_ifaces'] = '-'.join([
347                 str(self.min_num_external_ifaces or '0'),
348                 str(self.max_num_external_ifaces or 'inf')
349             ])
350             
351         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
352
353     def assign_node_id(self, node_id):
354         self._node_id = node_id
355         self.fetch_node_info()
356     
357     def unassign_node(self):
358         self._node_id = None
359         self.hostip = None
360         
361         try:
362             orig_attrs = self.__orig_attrs
363         except AttributeError:
364             return
365             
366         for key, value in orig_attrs.iteritems():
367             setattr(self, key, value)
368         del self.__orig_attrs
369     
370     def rate_nodes(self, nodes):
371         rates = collections.defaultdict(int)
372         tags = collections.defaultdict(dict)
373         replacements = {'timeframe':self._timeframe}
374         tagnames = [ tagname % replacements 
375                      for tagname, weight, default in self.RATE_FACTORS ]
376        
377         taginfo = self._sliceapi.GetNodeTags(
378             node_id=list(nodes), 
379             tagname=tagnames,
380             fields=('node_id','tagname','value'))
381
382         unpack = operator.itemgetter('node_id','tagname','value')
383         for value in taginfo:
384             node, tagname, value = unpack(value)
385             if value and value.lower() != 'n/a':
386                 tags[tagname][node] = float(value)
387         
388         for tagname, weight, default in self.RATE_FACTORS:
389             taginfo = tags[tagname % replacements].get
390             for node in nodes:
391                 rates[node] += weight * taginfo(node,default)
392         
393         return map(rates.__getitem__, nodes)
394             
395     def fetch_node_info(self):
396         orig_attrs = {}
397         
398         info, tags = self._sliceapi.GetNodeInfo(self._node_id)
399         info = info[0]
400         
401         tags = dict( (t['tagname'],t['value'])
402                      for t in tags )
403
404         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
405         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
406         self.min_num_external_ifaces = None
407         self.max_num_external_ifaces = None
408         if not self._timeframe: self._timeframe = 'w'
409         
410         replacements = {'timeframe':self._timeframe}
411
412         for attr, tag in self.BASEFILTERS.iteritems():
413             if tag in info:
414                 value = info[tag]
415                 if hasattr(self, attr):
416                     orig_attrs[attr] = getattr(self, attr)
417                 setattr(self, attr, value)
418         for attr, (tag,_) in self.TAGFILTERS.iteritems():
419             tag = tag % replacements
420             if tag in tags:
421                 value = tags[tag]
422                 if hasattr(self, attr):
423                     orig_attrs[attr] = getattr(self, attr)
424                 if not value or value.lower() == 'n/a':
425                     value = None
426                 setattr(self, attr, value)
427         
428         if 'peer_id' in info:
429             orig_attrs['site'] = self.site
430             self.site = self._sliceapi.peer_map[info['peer_id']]
431         
432         if 'interface_ids' in info:
433             self.min_num_external_ifaces = \
434             self.max_num_external_ifaces = len(info['interface_ids'])
435         
436         if 'ssh_rsa_key' in info:
437             orig_attrs['server_key'] = self.server_key
438             self.server_key = info['ssh_rsa_key']
439         
440         self.hostip = server.gethostbyname(self.hostname)
441         
442         try:
443             self.__orig_attrs
444         except AttributeError:
445             self.__orig_attrs = orig_attrs
446
447     def validate(self):
448         if self.home_path is None:
449             raise AssertionError, "Misconfigured node: missing home path"
450         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
451             raise AssertionError, "Misconfigured node: missing slice SSH key"
452         if self.slicename is None:
453             raise AssertionError, "Misconfigured node: unspecified slice"
454
455     def recover(self):
456         # Mark dependencies installed
457         self._installed = True
458         
459         # Clear load attributes, they impair re-discovery
460         self.minReliability = \
461         self.maxReliability = \
462         self.minBandwidth = \
463         self.maxBandwidth = \
464         self.minCpu = \
465         self.maxCpu = \
466         self.minLoad = \
467         self.maxLoad = None
468
469     def install_dependencies(self):
470         if self.required_packages and not self._installed:
471             # If we need rpmfusion, we must install the repo definition and the gpg keys
472             if self.rpmFusion:
473                 if self.operatingSystem == 'f12':
474                     # Fedora 12 requires a different rpmfusion package
475                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
476                 else:
477                     # This one works for f13+
478                     RPM_FUSION_URL = self.RPM_FUSION_URL
479                     
480                 rpmFusion = (
481                   'rpm -q rpmfusion-free-release || sudo -S rpm -i %(RPM_FUSION_URL)s'
482                 ) % {
483                     'RPM_FUSION_URL' : RPM_FUSION_URL
484                 }
485             else:
486                 rpmFusion = ''
487            
488             if rpmFusion:
489                 (out,err),proc = server.popen_ssh_command(
490                     rpmFusion,
491                     host = self.hostip,
492                     port = None,
493                     user = self.slicename,
494                     agent = None,
495                     ident_key = self.ident_path,
496                     server_key = self.server_key,
497                     timeout = 600,
498                     )
499                 
500                 if proc.wait():
501                     if self.check_bad_host(out,err):
502                         self.blacklist()
503                     raise RuntimeError, "Failed to set up application on host %s: %s %s" % (self.hostname, out,err,)
504             
505             # Launch p2p yum dependency installer
506             self._yum_dependencies.async_setup()
507     
508     def wait_provisioning(self, timeout = 20*60):
509         # Wait for the p2p installer
510         sleeptime = 1.0
511         totaltime = 0.0
512         while not self.is_alive():
513             time.sleep(sleeptime)
514             totaltime += sleeptime
515             sleeptime = min(30.0, sleeptime*1.5)
516             
517             if totaltime > timeout:
518                 # PlanetLab has a 15' delay on configuration propagation
519                 # If we're above that delay, the unresponsiveness is not due
520                 # to this delay.
521                 if not self.is_alive(verbose=True):
522                     raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
523         
524         # Ensure the node is clean (no apps running that could interfere with operations)
525         if self.enable_proc_cleanup:
526             self.do_proc_cleanup()
527         if self.enable_home_cleanup:
528             self.do_home_cleanup()
529    
530     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
531         # Wait for the p2p installer
532         if self._yum_dependencies and not self._installed:
533             self._yum_dependencies.async_setup_wait()
534             self._installed = True
535         
536     def is_alive(self, verbose = False):
537         # Make sure all the paths are created where 
538         # they have to be created for deployment
539         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
540             "echo 'ALIVE'",
541             host = self.hostip,
542             port = None,
543             user = self.slicename,
544             agent = None,
545             ident_key = self.ident_path,
546             server_key = self.server_key,
547             timeout = 60,
548             err_on_timeout = False,
549             persistent = False
550             )
551         
552         if proc.wait():
553             if verbose:
554                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
555             return False
556         elif not err and out.strip() == 'ALIVE':
557             return True
558         else:
559             if verbose:
560                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
561             return False
562     
563     def destroy(self):
564         if self.enable_proc_cleanup:
565             self.do_proc_cleanup()
566     
567     def blacklist(self):
568         if self._node_id:
569             self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
570             import util
571             util.appendBlacklist(self.hostname)
572     
573     def do_proc_cleanup(self):
574         if self.testbed().recovering:
575             # WOW - not now
576             return
577             
578         self._logger.info("Cleaning up processes on %s", self.hostname)
579         
580         cmds = [
581             "sudo -S killall python tcpdump || /bin/true ; "
582             "sudo -S killall python tcpdump || /bin/true ; "
583             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
584             "sudo -S killall -u %(slicename)s || /bin/true ",
585             "sudo -S killall -u root || /bin/true ",
586             "sudo -S killall -u %(slicename)s || /bin/true ",
587             "sudo -S killall -u root || /bin/true ",
588         ]
589
590         for cmd in cmds:
591             (out,err),proc = server.popen_ssh_command(
592                 # Some apps need two kills
593                 cmd % {
594                     'slicename' : self.slicename ,
595                 },
596                 host = self.hostip,
597                 port = None,
598                 user = self.slicename,
599                 agent = None,
600                 ident_key = self.ident_path,
601                 server_key = self.server_key,
602                 tty = True, # so that ps -N -T works as advertised...
603                 timeout = 60,
604                 retry = 3
605                 )
606             proc.wait()
607      
608     def do_home_cleanup(self):
609         if self.testbed().recovering:
610             # WOW - not now
611             return
612             
613         self._logger.info("Cleaning up home on %s", self.hostname)
614         
615         cmds = [
616             "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
617         ]
618
619         for cmd in cmds:
620             (out,err),proc = server.popen_ssh_command(
621                 # Some apps need two kills
622                 cmd,
623                 host = self.hostip,
624                 port = None,
625                 user = self.slicename,
626                 agent = None,
627                 ident_key = self.ident_path,
628                 server_key = self.server_key,
629                 tty = True, # so that ps -N -T works as advertised...
630                 timeout = 60,
631                 retry = 3
632                 )
633             proc.wait()
634    
635     def prepare_dependencies(self):
636         # Configure p2p yum dependency installer
637         if self.required_packages and not self._installed:
638             self._yum_dependencies = application.YumDependency(self._api)
639             self._yum_dependencies.node = self
640             self._yum_dependencies.home_path = "nepi-yumdep"
641             self._yum_dependencies.depends = ' '.join(self.required_packages)
642
643     def routing_method(self, routes, vsys_vnet):
644         """
645         There are two methods, vroute and sliceip.
646         
647         vroute:
648             Modifies the node's routing table directly, validating that the IP
649             range lies within the network given by the slice's vsys_vnet tag.
650             This method is the most scalable for very small routing tables
651             that need not modify other routes (including the default)
652         
653         sliceip:
654             Uses policy routing and iptables filters to create per-sliver
655             routing tables. It's the most flexible way, but it doesn't scale
656             as well since only 155 routing tables can be created this way.
657         
658         This method will return the most appropriate routing method, which will
659         prefer vroute for small routing tables.
660         """
661         
662         # For now, sliceip results in kernel panics
663         # so we HAVE to use vroute
664         return 'vroute'
665         
666         # We should not make the routing table grow too big
667         if len(routes) > MAX_VROUTE_ROUTES:
668             return 'sliceip'
669         
670         vsys_vnet = ipaddr.IPv4Network(vsys_vnet)
671         for route in routes:
672             dest, prefix, nexthop, metric, device = route
673             dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix))
674             nexthop = ipaddr.IPAddress(nexthop)
675             if dest not in vsys_vnet or nexthop not in vsys_vnet:
676                 return 'sliceip'
677         
678         return 'vroute'
679     
680     def format_route(self, route, dev, method, action):
681         dest, prefix, nexthop, metric, device = route
682         if method == 'vroute':
683             return (
684                 "%s %s%s gw %s %s" % (
685                     action,
686                     dest,
687                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
688                     nexthop,
689                     dev,
690                 )
691             )
692         elif method == 'sliceip':
693             return (
694                 "route %s to %s%s via %s metric %s dev %s" % (
695                     action,
696                     dest,
697                     (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
698                     nexthop,
699                     metric or 1,
700                     dev,
701                 )
702             )
703         else:
704             raise AssertionError, "Unknown method"
705     
706     def _annotate_routes_with_devs(self, routes, devs, method):
707         dev_routes = []
708         for route in routes:
709             for dev in devs:
710                 if dev.routes_here(route):
711                     dev_routes.append(tuple(route) + (dev.if_name,))
712                     
713                     # Stop checking
714                     break
715             else:
716                 if method == 'sliceip':
717                     dev_routes.append(tuple(route) + ('eth0',))
718                 else:
719                     raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
720                         "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
721         return dev_routes
722     
723     def configure_routes(self, routes, devs, vsys_vnet):
724         """
725         Add the specified routes to the node's routing table
726         """
727         rules = []
728         method = self.routing_method(routes, vsys_vnet)
729         tdevs = set()
730         
731         # annotate routes with devices
732         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
733         for route in dev_routes:
734             route, dev = route[:-1], route[-1]
735             
736             # Schedule rule
737             tdevs.add(dev)
738             rules.append(self.format_route(route, dev, method, 'add'))
739         
740         if method == 'sliceip':
741             rules = map('enable '.__add__, tdevs) + rules
742         
743         self._logger.info("Setting up routes for %s using %s", self.hostname, method)
744         self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
745         
746         self.apply_route_rules(rules, method)
747         
748         self._configured_routes = set(routes)
749         self._configured_devs = tdevs
750         self._configured_method = method
751     
752     def reconfigure_routes(self, routes, devs, vsys_vnet):
753         """
754         Updates the routes in the node's routing table to match
755         the given route list
756         """
757         method = self._configured_method
758         
759         dev_routes = self._annotate_routes_with_devs(routes, devs, method)
760
761         current = self._configured_routes
762         current_devs = self._configured_devs
763         
764         new = set(dev_routes)
765         new_devs = set(map(operator.itemgetter(-1), dev_routes))
766         
767         deletions = current - new
768         insertions = new - current
769         
770         dev_deletions = current_devs - new_devs
771         dev_insertions = new_devs - current_devs
772         
773         # Generate rules
774         rules = []
775         
776         # Rule deletions first
777         for route in deletions:
778             route, dev = route[:-1], route[-1]
779             rules.append(self.format_route(route, dev, method, 'del'))
780         
781         if method == 'sliceip':
782             # Dev deletions now
783             rules.extend(map('disable '.__add__, dev_deletions))
784
785             # Dev insertions now
786             rules.extend(map('enable '.__add__, dev_insertions))
787
788         # Rule insertions now
789         for route in insertions:
790             route, dev = route[:-1], dev[-1]
791             rules.append(self.format_route(route, dev, method, 'add'))
792         
793         # Apply
794         self.apply_route_rules(rules, method)
795         
796         self._configured_routes = dev_routes
797         self._configured_devs = new_devs
798         
799     def apply_route_rules(self, rules, method):
800         (out,err),proc = server.popen_ssh_command(
801             "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
802                 home = server.shell_escape(self.home_path),
803                 method = method),
804             host = self.hostip,
805             port = None,
806             user = self.slicename,
807             agent = None,
808             ident_key = self.ident_path,
809             server_key = self.server_key,
810             stdin = '\n'.join(rules),
811             timeout = 300
812             )
813         
814         if proc.wait() or err:
815             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
816         elif out or err:
817             logger.debug("%s said: %s%s", method, out, err)
818
819     def check_bad_host(self, out, err):
820         badre = re.compile(r'(?:'
821                            #r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
822                            r'|Error: disk I/O error'
823                            r')', 
824                            re.I)
825         return badre.search(out) or badre.search(err)
826