Working (and easy to use) multicast forwarding
[nepi.git] / src / nepi / testbeds / planetlab / node.py
index 8557a95..5e1445e 100644 (file)
@@ -10,8 +10,35 @@ import os
 import collections
 import cStringIO
 import resourcealloc
+import socket
+import sys
+import logging
+import ipaddr
+import operator
 
 from nepi.util import server
+from nepi.util import parallel
+
+import application
+
+MAX_VROUTE_ROUTES = 5
+
+class UnresponsiveNodeError(RuntimeError):
+    """Raised when a node stays unreachable past the provisioning timeout."""
+    pass
+
+def _castproperty(typ, propattr):
+    # Build a property that stores its value in the instance attribute
+    # `propattr`, casting assigned values to `typ`. None and the empty
+    # string both mean "unset" and are stored without casting.
+    def _get(self):
+        return getattr(self, propattr)
+    def _set(self, value):
+        # Cast only real values: the original `or` let '' fall through to
+        # typ(''), which raises ValueError for numeric types like float.
+        if value is not None and not (isinstance(value, basestring) and not value):
+            value = typ(value)
+        return setattr(self, propattr, value)
+    def _del(self):
+        # Property deleters receive no value argument; the extra parameter
+        # made `del obj.attr` raise TypeError.
+        return delattr(self, propattr)
+    _get.__name__ = propattr + '_get'
+    _set.__name__ = propattr + '_set'
+    _del.__name__ = propattr + '_del'
+    return property(_get, _set, _del)
 
 class Node(object):
     BASEFILTERS = {
@@ -26,14 +53,32 @@ class Node(object):
         'architecture' : ('arch','value'),
         'operatingSystem' : ('fcdistro','value'),
         'pl_distro' : ('pldistro','value'),
+        'city' : ('city','value'),
+        'country' : ('country','value'),
+        'region' : ('region','value'),
         'minReliability' : ('reliability%(timeframe)s', ']value'),
         'maxReliability' : ('reliability%(timeframe)s', '[value'),
         'minBandwidth' : ('bw%(timeframe)s', ']value'),
         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
+        'minLoad' : ('load%(timeframe)s', ']value'),
+        'maxLoad' : ('load%(timeframe)s', '[value'),
+        'minCpu' : ('cpu%(timeframe)s', ']value'),
+        'maxCpu' : ('cpu%(timeframe)s', '[value'),
     }    
     
     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
+    RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
+    RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
+    
+    minReliability = _castproperty(float, '_minReliability')
+    maxReliability = _castproperty(float, '_maxReliability')
+    minBandwidth = _castproperty(float, '_minBandwidth')
+    maxBandwidth = _castproperty(float, '_maxBandwidth')
+    minCpu = _castproperty(float, '_minCpu')
+    maxCpu = _castproperty(float, '_maxCpu')
+    minLoad = _castproperty(float, '_minLoad')
+    maxLoad = _castproperty(float, '_maxLoad')
     
     def __init__(self, api=None):
         if not api:
@@ -46,11 +91,17 @@ class Node(object):
         self.operatingSystem = None
         self.pl_distro = None
         self.site = None
-        self.emulation = None
+        self.city = None
+        self.country = None
+        self.region = None
         self.minReliability = None
         self.maxReliability = None
         self.minBandwidth = None
         self.maxBandwidth = None
+        self.minCpu = None
+        self.maxCpu = None
+        self.minLoad = None
+        self.maxLoad = None
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
         self.timeframe = 'm'
@@ -59,19 +110,28 @@ class Node(object):
         self.required_packages = set()
         self.required_vsys = set()
         self.pythonpath = []
+        self.rpmFusion = False
         self.env = collections.defaultdict(list)
         
+        # Some special applications - initialized when connected
+        self.multicast_forwarder = None
+        
         # Testbed-derived attributes
         self.slicename = None
         self.ident_path = None
         self.server_key = None
         self.home_path = None
+        self.enable_cleanup = False
         
         # Those are filled when an actual node is allocated
         self._node_id = None
+        self._yum_dependencies = None
+        self._installed = False
+
+        # Logging
+        self._logger = logging.getLogger('nepi.testbeds.planetlab')
     
-    @property
-    def _nepi_testbed_environment_setup(self):
+    def _nepi_testbed_environment_setup_get(self):
         command = cStringIO.StringIO()
         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
@@ -84,6 +144,11 @@ class Node(object):
                 for envval in envvals:
                     command.write(' ; export %s=%s' % (envkey, envval))
         return command.getvalue()
+    def _nepi_testbed_environment_setup_set(self, value):
+        pass
+    _nepi_testbed_environment_setup = property(
+        _nepi_testbed_environment_setup_get,
+        _nepi_testbed_environment_setup_set)
     
     def build_filters(self, target_filters, filter_map):
         for attr, tag in filter_map.iteritems():
@@ -101,6 +166,8 @@ class Node(object):
         )
     
     def find_candidates(self, filter_slice_id=None):
+        self._logger.info("Finding candidates for %s", self.make_filter_description())
+        
         fields = ('node_id',)
         replacements = {'timeframe':self.timeframe}
         
@@ -113,6 +180,8 @@ class Node(object):
         # only pick healthy nodes
         basefilters['run_level'] = 'boot'
         basefilters['boot_state'] = 'boot'
+        basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
+        basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
         
         # keyword-only "pseudofilters"
         extra = {}
@@ -182,6 +251,24 @@ class Node(object):
                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
             
             candidates = set(filter(predicate, candidates))
+        
+        # make sure hostnames are resolvable
+        if candidates:
+            self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
+            
+            hostnames = dict(map(operator.itemgetter('node_id','hostname'),
+                self._api.GetNodes(list(candidates), ['node_id','hostname'])
+            ))
+            def resolvable(node_id):
+                try:
+                    addr = socket.gethostbyname(hostnames[node_id])
+                    return addr is not None
+                except:
+                    return False
+            candidates = set(parallel.pfilter(resolvable, candidates,
+                maxthreads = 16))
+
+            self._logger.info("  Found %s reachable candidates.", len(candidates))
             
         return candidates
     
@@ -225,11 +312,19 @@ class Node(object):
         self._node_id = node_id
         self.fetch_node_info()
     
+    def unassign_node(self):
+        self._node_id = None
+        self.__dict__.update(self.__orig_attrs)
+    
     def fetch_node_info(self):
+        orig_attrs = {}
+        
         info = self._api.GetNodes(self._node_id)[0]
         tags = dict( (t['tagname'],t['value'])
                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
 
+        orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
+        orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
         self.timeframe = 'm'
@@ -238,14 +333,19 @@ class Node(object):
         for attr, tag in self.BASEFILTERS.iteritems():
             if tag in info:
                 value = info[tag]
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
                 setattr(self, attr, value)
         for attr, (tag,_) in self.TAGFILTERS.iteritems():
             tag = tag % replacements
             if tag in tags:
                 value = tags[tag]
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
                 setattr(self, attr, value)
         
         if 'peer_id' in info:
+            orig_attrs['site'] = self.site
             self.site = self._api.peer_map[info['peer_id']]
         
         if 'interface_ids' in info:
@@ -253,7 +353,10 @@ class Node(object):
             self.max_num_external_ifaces = len(info['interface_ids'])
         
         if 'ssh_rsa_key' in info:
+            orig_attrs['server_key'] = self.server_key
             self.server_key = info['ssh_rsa_key']
+        
+        self.__orig_attrs = orig_attrs
 
     def validate(self):
         if self.home_path is None:
@@ -263,102 +366,95 @@ class Node(object):
         if self.slicename is None:
             raise AssertionError, "Misconfigured node: unspecified slice"
 
+    def recover(self):
+        """Restore internal state after a testbed recovery pass."""
+        # Mark dependencies installed
+        self._installed = True
+        
+        # Clear load attributes, they impair re-discovery
+        # (chained assignment: every attribute below goes through its
+        # _castproperty setter and ends up None)
+        self.minReliability = \
+        self.maxReliability = \
+        self.minBandwidth = \
+        self.maxBandwidth = \
+        self.minCpu = \
+        self.maxCpu = \
+        self.minLoad = \
+        self.maxLoad = None
+
     def install_dependencies(self):
-        if self.required_packages:
-            # TODO: make dependant on the experiment somehow...
-            pidfile = self.DEPENDS_PIDFILE
-            logfile = self.DEPENDS_LOGFILE
-            
-            # Start process in a "daemonized" way, using nohup and heavy
-            # stdin/out redirection to avoid connection issues
-            (out,err),proc = rspawn.remote_spawn(
-                "( yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
-                    'packages' : ' '.join(self.required_packages),
-                },
-                pidfile = pidfile,
-                stdout = logfile,
-                stderr = rspawn.STDOUT,
-                
-                host = self.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.server_key,
-                sudo = True
-                )
-            
-            if proc.wait():
-                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
-    
-    def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
-        if self.required_packages:
-            pidfile = self.DEPENDS_PIDFILE
-            
-            # get PID
-            pid = ppid = None
-            for probenum in xrange(pidmax):
-                pidtuple = rspawn.remote_check_pid(
-                    pidfile = pidfile,
-                    host = self.hostname,
-                    port = None,
-                    user = self.slicename,
-                    agent = None,
-                    ident_key = self.ident_path,
-                    server_key = self.server_key
-                    )
-                if pidtuple:
-                    pid, ppid = pidtuple
-                    break
+        if self.required_packages and not self._installed:
+            # If we need rpmfusion, we must install the repo definition and the gpg keys
+            if self.rpmFusion:
+                if self.operatingSystem == 'f12':
+                    # Fedora 12 requires a different rpmfusion package
+                    RPM_FUSION_URL = self.RPM_FUSION_URL_F12
                 else:
-                    time.sleep(pidprobe)
+                    # This one works for f13+
+                    RPM_FUSION_URL = self.RPM_FUSION_URL
+                    
+                rpmFusion = (
+                  '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
+                ) % {
+                    'RPM_FUSION_URL' : RPM_FUSION_URL
+                }
             else:
-                raise RuntimeError, "Failed to obtain pidfile for dependency installer"
-        
-            # wait for it to finish
-            while rspawn.RUNNING is rspawn.remote_status(
-                    pid, ppid,
+                rpmFusion = ''
+            
+            if rpmFusion:
+                (out,err),proc = server.popen_ssh_command(
+                    rpmFusion,
                     host = self.hostname,
                     port = None,
                     user = self.slicename,
                     agent = None,
                     ident_key = self.ident_path,
-                    server_key = self.server_key
-                    ):
-                time.sleep(probe)
-                probe = min(probemax, 1.5*probe)
-            
-            # check results
-            logfile = self.DEPENDS_LOGFILE
-            
-            (out,err),proc = server.popen_ssh_command(
-                "cat %s" % (server.shell_escape(logfile),),
-                host = self.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.server_key
-                )
+                    server_key = self.server_key,
+                    timeout = 600,
+                    )
+                
+                if proc.wait():
+                    raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
             
-            if proc.wait():
-                raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
+            # Launch p2p yum dependency installer
+            self._yum_dependencies.async_setup()
+    
+    def wait_provisioning(self, timeout = 20*60):
+        """
+        Poll the node with exponential backoff until it answers SSH.
+        
+        timeout: seconds to wait before giving up (default 20 minutes).
+        Raises UnresponsiveNodeError if the node never comes up.
+        """
+        # Wait for the p2p installer
+        sleeptime = 1.0
+        totaltime = 0.0
+        while not self.is_alive():
+            time.sleep(sleeptime)
+            totaltime += sleeptime
+            # back off up to 30s between probes
+            sleeptime = min(30.0, sleeptime*1.5)
             
+            if totaltime > timeout:
+                # PlanetLab has a 15' delay on configuration propagation
+                # If we're above that delay, the unresponsiveness is not due
+                # to this delay.
+                raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+        
+        # Ensure the node is clean (no apps running that could interfere with operations)
+        if self.enable_cleanup:
+            self.do_cleanup()
+    
+    def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
+        """Block until the p2p yum dependency installer finishes."""
+        # NOTE: pidprobe/probe/pidmax/probemax are kept for interface
+        # compatibility with the old pidfile-polling implementation; the
+        # p2p installer path below does not use them.
+        # Wait for the p2p installer
+        if self._yum_dependencies and not self._installed:
+            self._yum_dependencies.async_setup_wait()
+            self._installed = True
         
     def is_alive(self):
         # Make sure all the paths are created where 
         # they have to be created for deployment
-        (out,err),proc = server.popen_ssh_command(
+        (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
             "echo 'ALIVE'",
             host = self.hostname,
             port = None,
             user = self.slicename,
             agent = None,
             ident_key = self.ident_path,
-            server_key = self.server_key
+            server_key = self.server_key,
+            timeout = 60,
+            err_on_timeout = False
             )
         
         if proc.wait():
@@ -368,47 +464,226 @@ class Node(object):
         else:
             return False
     
+    def destroy(self):
+        """Teardown hook: clean up remote processes if cleanup is enabled."""
+        if self.enable_cleanup:
+            self.do_cleanup()
+    
+    def do_cleanup(self):
+        """Kill stray slice processes on the node over SSH."""
+        if self.testbed().recovering:
+            # WOW - not now
+            return
+            
+        self._logger.info("Cleaning up %s", self.hostname)
+        
+        # NOTE: the repeated killall entries are deliberate - "some apps
+        # need two kills" (see comment in the loop below).
+        cmds = [
+            "sudo -S killall python tcpdump || /bin/true ; "
+            "sudo -S killall python tcpdump || /bin/true ; "
+            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
+            "sudo -S killall -u %(slicename)s || /bin/true ",
+            "sudo -S killall -u root || /bin/true ",
+            "sudo -S killall -u %(slicename)s || /bin/true ",
+            "sudo -S killall -u root || /bin/true ",
+        ]
 
-    def configure_routes(self, routes, devs):
+        for cmd in cmds:
+            (out,err),proc = server.popen_ssh_command(
+                # Some apps need two kills
+                cmd % {
+                    'slicename' : self.slicename ,
+                },
+                host = self.hostname,
+                port = None,
+                user = self.slicename,
+                agent = None,
+                ident_key = self.ident_path,
+                server_key = self.server_key,
+                tty = True, # so that ps -N -T works as advertised...
+                timeout = 60,
+                retry = 3
+                )
+            # best-effort: failures are ignored on purpose
+            proc.wait()
+    
+    def prepare_dependencies(self):
+        """Create (but do not launch) the p2p yum dependency installer."""
+        # Configure p2p yum dependency installer
+        if self.required_packages and not self._installed:
+            self._yum_dependencies = application.YumDependency(self._api)
+            self._yum_dependencies.node = self
+            self._yum_dependencies.home_path = "nepi-yumdep"
+            self._yum_dependencies.depends = ' '.join(self.required_packages)
+
+    def routing_method(self, routes, vsys_vnet):
         """
-        Add the specified routes to the node's routing table
+        There are two methods, vroute and sliceip.
+        
+        vroute:
+            Modifies the node's routing table directly, validating that the IP
+            range lies within the network given by the slice's vsys_vnet tag.
+            This method is the most scalable for very small routing tables
+            that need not modify other routes (including the default)
+        
+        sliceip:
+            Uses policy routing and iptables filters to create per-sliver
+            routing tables. It's the most flexible way, but it doesn't scale
+            as well since only 155 routing tables can be created this way.
+        
+        This method will return the most appropriate routing method, which will
+        prefer vroute for small routing tables.
         """
-        rules = []
         
+        # For now, sliceip results in kernel panics
+        # so we HAVE to use vroute
+        return 'vroute'
+        
+        # NOTE: everything below is intentionally unreachable until sliceip
+        # is fixed; it is the selection heuristic that should apply then.
+        # We should not make the routing table grow too big
+        if len(routes) > MAX_VROUTE_ROUTES:
+            return 'sliceip'
+        
+        vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
+        for route in routes:
+            dest, prefix, nexthop, metric = route
+            dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
+            nexthop = ipaddr.IPAddress(nexthop)
+            if dest not in vsys_vnet or nexthop not in vsys_vnet:
+                return 'sliceip'
+        
+        return 'vroute'
+    
+    def format_route(self, route, dev, method, action):
+        """
+        Render one route as a rule string for the given vsys method.
+        
+        route: (dest, prefix, nexthop, metric) tuple
+        dev: interface name the route binds to
+        method: 'vroute' or 'sliceip'
+        action: rule verb, e.g. 'add' or 'del'
+        """
+        dest, prefix, nexthop, metric = route
+        if method == 'vroute':
+            # vroute syntax has no metric field
+            return (
+                "%s %s%s gw %s %s" % (
+                    action,
+                    dest,
+                    (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
+                    nexthop,
+                    dev,
+                )
+            )
+        elif method == 'sliceip':
+            return (
+                "route %s to %s%s via %s metric %s dev %s" % (
+                    action,
+                    dest,
+                    (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
+                    nexthop,
+                    metric or 1,
+                    dev,
+                )
+            )
+        else:
+            raise AssertionError, "Unknown method"
+    
+    def _annotate_routes_with_devs(self, routes, devs, method):
+        # Pair each route with the first device that claims it, producing
+        # tuples of the form (dest, prefix, nexthop, metric, if_name).
+        dev_routes = []
         for route in routes:
             for dev in devs:
                 if dev.routes_here(route):
-                    # Schedule rule
-                    dest, prefix, nexthop = route
-                    rules.append(
-                        "add %s%s gw %s %s" % (
-                            dest,
-                            (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
-                            nexthop,
-                            dev.if_name,
-                        )
-                    )
+                    dev_routes.append(tuple(route) + (dev.if_name,))
                     
                     # Stop checking
                     break
             else:
+                # sliceip can route through the physical interface;
+                # vroute cannot, so an unmatched route is fatal there.
+                if method == 'sliceip':
+                    dev_routes.append(tuple(route) + ('eth0',))
+                else:
+                    raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
+                        "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
+        return dev_routes
+    
+    def configure_routes(self, routes, devs, vsys_vnet):
+        """
+        Add the specified routes to the node's routing table
+        
+        routes: iterable of (dest, prefix, nexthop, metric) tuples
+        devs: candidate virtual interfaces
+        vsys_vnet: the slice's vsys_vnet network, used to pick a method
+        """
+        rules = []
+        method = self.routing_method(routes, vsys_vnet)
+        tdevs = set()
+        
+        # annotate routes with devices
+        dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+        for route in dev_routes:
+            route, dev = route[:-1], route[-1]
+            
+            # Schedule rule
+            tdevs.add(dev)
+            rules.append(self.format_route(route, dev, method, 'add'))
+        
+        if method == 'sliceip':
+            rules = map('enable '.__add__, tdevs) + rules
+        
+        self._logger.info("Setting up routes for %s using %s", self.hostname, method)
+        self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
+        
+        self.apply_route_rules(rules, method)
+        
+        # Store the dev-annotated tuples: reconfigure_routes diffs against
+        # sets of annotated tuples, so storing the raw routes here made
+        # every later diff consider everything deleted and re-added.
+        self._configured_routes = set(dev_routes)
+        self._configured_devs = tdevs
+        self._configured_method = method
+    
+    def reconfigure_routes(self, routes, devs, vsys_vnet):
+        """
+        Updates the routes in the node's routing table to match
+        the given route list, issuing only the necessary del/add rules.
+        """
+        method = self._configured_method
         
+        dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+
+        current = self._configured_routes
+        current_devs = self._configured_devs
+        
+        new = set(dev_routes)
+        new_devs = set(map(operator.itemgetter(-1), dev_routes))
+        
+        deletions = current - new
+        insertions = new - current
+        
+        dev_deletions = current_devs - new_devs
+        dev_insertions = new_devs - current_devs
+        
+        # Generate rules
+        rules = []
+        
+        # Rule deletions first
+        for route in deletions:
+            route, dev = route[:-1], route[-1]
+            rules.append(self.format_route(route, dev, method, 'del'))
+        
+        if method == 'sliceip':
+            # Dev deletions now
+            rules.extend(map('disable '.__add__, dev_deletions))
+
+            # Dev insertions now
+            rules.extend(map('enable '.__add__, dev_insertions))
+
+        # Rule insertions now
+        for route in insertions:
+            # The device is the annotated tuple's last element, as in the
+            # deletion loop above; `dev[-1]` was a typo (`dev` is unbound
+            # at this point) and raised NameError/used a stale value.
+            route, dev = route[:-1], route[-1]
+            rules.append(self.format_route(route, dev, method, 'add'))
+        
+        # Apply
+        self.apply_route_rules(rules, method)
+        
+        # Store as a set so the next reconfigure can diff against it
+        # (a plain list here broke the set arithmetic above).
+        self._configured_routes = new
+        self._configured_devs = new_devs
+        
+    def apply_route_rules(self, rules, method):
+        # Feed the formatted rules into the node's /vsys/<method>.in fifo,
+        # echoing /vsys/<method>.out to stderr so errors come back to us.
         (out,err),proc = server.popen_ssh_command(
-            "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
-                home = server.shell_escape(self.home_path)),
+            "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
+                home = server.shell_escape(self.home_path),
+                method = method),
             host = self.hostname,
             port = None,
             user = self.slicename,
             agent = None,
             ident_key = self.ident_path,
             server_key = self.server_key,
-            stdin = '\n'.join(rules)
+            stdin = '\n'.join(rules),
+            timeout = 300
             )
         
         if proc.wait() or err:
             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
-        
-        
+        elif out:
+            # truthy `err` already raised above, so only `out` can get
+            # here; use the instance logger - a bare `logger` name is
+            # undefined in this module.
+            self._logger.debug("%s said: %s%s", method, out, err)