Working (and easy to use) multicast forwarding
[nepi.git] / src / nepi / testbeds / planetlab / node.py
index 1b9b15e..5e1445e 100644 (file)
@@ -13,12 +13,16 @@ import resourcealloc
 import socket
 import sys
 import logging
+import ipaddr
+import operator
 
 from nepi.util import server
 from nepi.util import parallel
 
 import application
 
+MAX_VROUTE_ROUTES = 5
+
 class UnresponsiveNodeError(RuntimeError):
     pass
 
@@ -49,10 +53,17 @@ class Node(object):
         'architecture' : ('arch','value'),
         'operatingSystem' : ('fcdistro','value'),
         'pl_distro' : ('pldistro','value'),
+        'city' : ('city','value'),
+        'country' : ('country','value'),
+        'region' : ('region','value'),
         'minReliability' : ('reliability%(timeframe)s', ']value'),
         'maxReliability' : ('reliability%(timeframe)s', '[value'),
         'minBandwidth' : ('bw%(timeframe)s', ']value'),
         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
+        'minLoad' : ('load%(timeframe)s', ']value'),
+        'maxLoad' : ('load%(timeframe)s', '[value'),
+        'minCpu' : ('cpu%(timeframe)s', ']value'),
+        'maxCpu' : ('cpu%(timeframe)s', '[value'),
     }    
     
     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
@@ -64,6 +75,10 @@ class Node(object):
     maxReliability = _castproperty(float, '_maxReliability')
     minBandwidth = _castproperty(float, '_minBandwidth')
     maxBandwidth = _castproperty(float, '_maxBandwidth')
+    minCpu = _castproperty(float, '_minCpu')
+    maxCpu = _castproperty(float, '_maxCpu')
+    minLoad = _castproperty(float, '_minLoad')
+    maxLoad = _castproperty(float, '_maxLoad')
     
     def __init__(self, api=None):
         if not api:
@@ -76,10 +91,17 @@ class Node(object):
         self.operatingSystem = None
         self.pl_distro = None
         self.site = None
+        self.city = None
+        self.country = None
+        self.region = None
         self.minReliability = None
         self.maxReliability = None
         self.minBandwidth = None
         self.maxBandwidth = None
+        self.minCpu = None
+        self.maxCpu = None
+        self.minLoad = None
+        self.maxLoad = None
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
         self.timeframe = 'm'
@@ -91,11 +113,15 @@ class Node(object):
         self.rpmFusion = False
         self.env = collections.defaultdict(list)
         
+        # Some special applications - initialized when connected
+        self.multicast_forwarder = None
+        
         # Testbed-derived attributes
         self.slicename = None
         self.ident_path = None
         self.server_key = None
         self.home_path = None
+        self.enable_cleanup = False
         
         # Those are filled when an actual node is allocated
         self._node_id = None
@@ -348,7 +374,11 @@ class Node(object):
         self.minReliability = \
         self.maxReliability = \
         self.minBandwidth = \
-        self.maxBandwidth = None
+        self.maxBandwidth = \
+        self.minCpu = \
+        self.maxCpu = \
+        self.minLoad = \
+        self.maxLoad = None
 
     def install_dependencies(self):
         if self.required_packages and not self._installed:
@@ -377,7 +407,8 @@ class Node(object):
                     user = self.slicename,
                     agent = None,
                     ident_key = self.ident_path,
-                    server_key = self.server_key
+                    server_key = self.server_key,
+                    timeout = 600,
                     )
                 
                 if proc.wait():
@@ -400,6 +431,10 @@ class Node(object):
                 # If we're above that delay, the unresponsiveness is not due
                 # to this delay.
                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+        
+        # Ensure the node is clean (no apps running that could interfere with operations)
+        if self.enable_cleanup:
+            self.do_cleanup()
     
     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
         # Wait for the p2p installer
@@ -417,7 +452,9 @@ class Node(object):
             user = self.slicename,
             agent = None,
             ident_key = self.ident_path,
-            server_key = self.server_key
+            server_key = self.server_key,
+            timeout = 60,
+            err_on_timeout = False
             )
         
         if proc.wait():
@@ -427,6 +464,45 @@ class Node(object):
         else:
             return False
     
+    def destroy(self):
+        if self.enable_cleanup:
+            self.do_cleanup()
+    
+    def do_cleanup(self):
+        if self.testbed().recovering:
+            # WOW - not now
+            return
+            
+        self._logger.info("Cleaning up %s", self.hostname)
+        
+        cmds = [
+            "sudo -S killall python tcpdump || /bin/true ; "
+            "sudo -S killall python tcpdump || /bin/true ; "
+            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
+            "sudo -S killall -u %(slicename)s || /bin/true ",
+            "sudo -S killall -u root || /bin/true ",
+            "sudo -S killall -u %(slicename)s || /bin/true ",
+            "sudo -S killall -u root || /bin/true ",
+        ]
+
+        for cmd in cmds:
+            (out,err),proc = server.popen_ssh_command(
+                # Some apps need two kills
+                cmd % {
+                    'slicename' : self.slicename ,
+                },
+                host = self.hostname,
+                port = None,
+                user = self.slicename,
+                agent = None,
+                ident_key = self.ident_path,
+                server_key = self.server_key,
+                tty = True, # so that ps -N -T works as advertised...
+                timeout = 60,
+                retry = 3
+                )
+            proc.wait()
+    
     def prepare_dependencies(self):
         # Configure p2p yum dependency installer
         if self.required_packages and not self._installed:
@@ -435,48 +511,179 @@ class Node(object):
             self._yum_dependencies.home_path = "nepi-yumdep"
             self._yum_dependencies.depends = ' '.join(self.required_packages)
 
-    def configure_routes(self, routes, devs):
+    def routing_method(self, routes, vsys_vnet):
         """
-        Add the specified routes to the node's routing table
+        There are two methods, vroute and sliceip.
+        
+        vroute:
+            Modifies the node's routing table directly, validating that the IP
+            range lies within the network given by the slice's vsys_vnet tag.
+            This method is the most scalable for very small routing tables
+            that need not modify other routes (including the default)
+        
+        sliceip:
+            Uses policy routing and iptables filters to create per-sliver
+            routing tables. It's the most flexible way, but it doesn't scale
+            as well since only 155 routing tables can be created this way.
+        
+        This method will return the most appropriate routing method, which will
+        prefer vroute for small routing tables.
         """
-        rules = []
         
+        # For now, sliceip results in kernel panics
+        # so we HAVE to use vroute
+        return 'vroute'
+        
+        # We should not make the routing table grow too big
+        if len(routes) > MAX_VROUTE_ROUTES:
+            return 'sliceip'
+        
+        vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
+        for route in routes:
+            dest, prefix, nexthop, metric = route
+            dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
+            nexthop = ipaddr.IPAddress(nexthop)
+            if dest not in vsys_vnet or nexthop not in vsys_vnet:
+                return 'sliceip'
+        
+        return 'vroute'
+    
+    def format_route(self, route, dev, method, action):
+        dest, prefix, nexthop, metric = route
+        if method == 'vroute':
+            return (
+                "%s %s%s gw %s %s" % (
+                    action,
+                    dest,
+                    (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
+                    nexthop,
+                    dev,
+                )
+            )
+        elif method == 'sliceip':
+            return (
+                "route %s to %s%s via %s metric %s dev %s" % (
+                    action,
+                    dest,
+                    (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
+                    nexthop,
+                    metric or 1,
+                    dev,
+                )
+            )
+        else:
+            raise AssertionError, "Unknown method"
+    
+    def _annotate_routes_with_devs(self, routes, devs, method):
+        dev_routes = []
         for route in routes:
             for dev in devs:
                 if dev.routes_here(route):
-                    # Schedule rule
-                    dest, prefix, nexthop, metric = route
-                    rules.append(
-                        "add %s%s gw %s %s" % (
-                            dest,
-                            (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
-                            nexthop,
-                            dev.if_name,
-                        )
-                    )
+                    dev_routes.append(tuple(route) + (dev.if_name,))
                     
                     # Stop checking
                     break
             else:
-                raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
-                    "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
+                if method == 'sliceip':
+                    dev_routes.append(tuple(route) + ('eth0',))
+                else:
+                    raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
+                        "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
+        return dev_routes
+    
+    def configure_routes(self, routes, devs, vsys_vnet):
+        """
+        Add the specified routes to the node's routing table
+        """
+        rules = []
+        method = self.routing_method(routes, vsys_vnet)
+        tdevs = set()
+        
+        # annotate routes with devices
+        dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+        for route in dev_routes:
+            route, dev = route[:-1], route[-1]
+            
+            # Schedule rule
+            tdevs.add(dev)
+            rules.append(self.format_route(route, dev, method, 'add'))
+        
+        if method == 'sliceip':
+            rules = map('enable '.__add__, tdevs) + rules
         
-        self._logger.info("Setting up routes for %s", self.hostname)
+        self._logger.info("Setting up routes for %s using %s", self.hostname, method)
+        self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
+        
+        self.apply_route_rules(rules, method)
+        
+        self._configured_routes = set(routes)
+        self._configured_devs = tdevs
+        self._configured_method = method
+    
+    def reconfigure_routes(self, routes, devs, vsys_vnet):
+        """
+        Updates the routes in the node's routing table to match
+        the given route list
+        """
+        method = self._configured_method
+        
+        dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+
+        current = self._configured_routes
+        current_devs = self._configured_devs
         
+        new = set(dev_routes)
+        new_devs = set(map(operator.itemgetter(-1), dev_routes))
+        
+        deletions = current - new
+        insertions = new - current
+        
+        dev_deletions = current_devs - new_devs
+        dev_insertions = new_devs - current_devs
+        
+        # Generate rules
+        rules = []
+        
+        # Rule deletions first
+        for route in deletions:
+            route, dev = route[:-1], route[-1]
+            rules.append(self.format_route(route, dev, method, 'del'))
+        
+        if method == 'sliceip':
+            # Dev deletions now
+            rules.extend(map('disable '.__add__, dev_deletions))
+
+            # Dev insertions now
+            rules.extend(map('enable '.__add__, dev_insertions))
+
+        # Rule insertions now
+        for route in insertions:
+            route, dev = route[:-1], dev[-1]
+            rules.append(self.format_route(route, dev, method, 'add'))
+        
+        # Apply
+        self.apply_route_rules(rules, method)
+        
+        self._configured_routes = dev_routes
+        self._configured_devs = new_devs
+        
+    def apply_route_rules(self, rules, method):
         (out,err),proc = server.popen_ssh_command(
-            "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
-                home = server.shell_escape(self.home_path)),
+            "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
+                home = server.shell_escape(self.home_path),
+                method = method),
             host = self.hostname,
             port = None,
             user = self.slicename,
             agent = None,
             ident_key = self.ident_path,
             server_key = self.server_key,
-            stdin = '\n'.join(rules)
+            stdin = '\n'.join(rules),
+            timeout = 300
             )
         
         if proc.wait() or err:
             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
-        
-        
+        elif out or err:
+            logger.debug("%s said: %s%s", method, out, err)