Merge with HEAD, close aly's branch.
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
index 8813ae4..27ed23d 100644 (file)
@@ -1,11 +1,15 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
-from constants import TESTBED_ID
+from constants import TESTBED_ID, TESTBED_VERSION
 from nepi.core import testbed_impl
+from nepi.core.metadata import Parallel
 from nepi.util.constants import TIME_NOW
 from nepi.util.graphtools import mst
 from nepi.util import ipaddr2
+from nepi.util import environ
+from nepi.util.parallel import ParallelRun
+import sys
 import os
 import os.path
 import time
@@ -19,23 +23,34 @@ import tempfile
 import subprocess
 import random
 import shutil
-
-from nepi.util.constants import TESTBED_STATUS_CONFIGURED
+import logging
+import metadata
+import weakref
 
 class TempKeyError(Exception):
+    # NOTE(review): no raise site is visible in this diff; judging by the
+    # name this presumably signals a problem with a temporary key file -
+    # confirm against the rest of the module.
     pass
 
 class TestbedController(testbed_impl.TestbedController):
-    def __init__(self, testbed_version):
-        super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
+    def __init__(self):
+        """Initialise controller state and load the persisted blacklist.
+
+        The testbed version is no longer a constructor argument; it is
+        taken from the TESTBED_VERSION constant.
+        """
+        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
         self._home_directory = None
         self.slicename = None
         self._traces = dict()
 
-        import node, interfaces, application
+        # Sibling modules are imported here instead of at module level
+        # (presumably to avoid an import cycle - TODO confirm).
+        import node, interfaces, application, multicast
         self._node = node
         self._interfaces = interfaces
         self._app = application
+        self._multicast = multicast
+        
+        # Node ids that resource discovery must never pick
+        self._blacklist = set()
+        # Node ids added to the slice by the last discovery pass;
+        # do_wait_nodes() grants them a longer provisioning timeout
+        self._just_provisioned = set()
+        
+        # Replaces the empty set above with the ids stored on disk, if any
+        self._load_blacklist()
+        
+        self._logger = logging.getLogger('nepi.testbeds.planetlab')
+        
+        # Set by recover(); while true, failures to set element
+        # attributes are ignored instead of propagated
+        self.recovering = False
 
     @property
     def home_directory(self):
@@ -68,7 +83,48 @@ class TestbedController(testbed_impl.TestbedController):
                 # If it wasn't found, don't remember this failure, keep trying
                 return None
         return self._slice_id
-
+    
+    @property
+    def vsys_vnet(self):
+        """Value of the slice's 'vsys_vnet' tag, or None if it has none.
+
+        The tag is fetched with plapi.GetSliceTags on first access and
+        cached on the instance. A lookup that finds no tag is not cached,
+        so every later access queries again.
+        """
+        if hasattr(self, '_vsys_vnet'):
+            return self._vsys_vnet
+
+        tags = self.plapi.GetSliceTags(
+            name = self.slicename,
+            tagname = 'vsys_vnet',
+            fields=('value',))
+        if not tags:
+            # Deliberately nothing cached: keep trying on later accesses
+            return None
+
+        self._vsys_vnet = tags[0]['value']
+        return self._vsys_vnet
+    
+    def _load_blacklist(self):
+        """Load the persisted node blacklist into self._blacklist.
+
+        The file is the one named by environ.homepath('plblacklist') and
+        holds one integer node id per line (the format _save_blacklist
+        writes). If the file cannot be opened the blacklist is left empty.
+        Blank lines are skipped, so a stray empty line in a hand-edited
+        file does not abort controller construction.
+
+        Raises ValueError if a non-blank line is not an integer.
+        """
+        blpath = environ.homepath('plblacklist')
+
+        try:
+            bl = open(blpath, "r")
+        except Exception:
+            # No usable blacklist file: start with an empty one.
+            # (Exception rather than a bare except, so Ctrl-C and
+            # SystemExit are not swallowed here.)
+            self._blacklist = set()
+            return
+
+        try:
+            # int() tolerates the surrounding whitespace and newline
+            self._blacklist = set(
+                int(line)
+                for line in bl
+                if line.strip()
+            )
+        finally:
+            bl.close()
+    
+    def _save_blacklist(self):
+        """Write self._blacklist to the blacklist file, one id per line.
+
+        The target is environ.homepath('plblacklist'); the file is
+        overwritten. Errors from opening or writing propagate to the
+        caller.
+        """
+        bl = open(environ.homepath('plblacklist'), "w")
+        try:
+            lines = [ '%s\n' % node_id for node_id in self._blacklist ]
+            bl.writelines(lines)
+        finally:
+            bl.close()
+    
     def do_setup(self):
         self._home_directory = self._attributes.\
             get_attribute_value("homeDirectory")
@@ -85,6 +141,28 @@ class TestbedController(testbed_impl.TestbedController):
             get_attribute_value("plcHost")
         self.plcUrl = self._attributes.\
             get_attribute_value("plcUrl")
+        self.logLevel = self._attributes.\
+            get_attribute_value("plLogLevel")
+        self.tapPortBase = self._attributes.\
+            get_attribute_value("tapPortBase")
+        self.p2pDeployment = self._attributes.\
+            get_attribute_value("p2pDeployment")
+        self.dedicatedSlice = self._attributes.\
+            get_attribute_value("dedicatedSlice")
+        
+        if not self.slicename:
+            raise RuntimeError, "Slice not set"
+        if not self.authUser:
+            raise RuntimeError, "PlanetLab account username not set"
+        if not self.authString:
+            raise RuntimeError, "PlanetLab account passphrase not set"
+        if not self.sliceSSHKey:
+            raise RuntimeError, "PlanetLab account key not specified"
+        if not os.path.exists(self.sliceSSHKey):
+            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
+        
+        self._logger.setLevel(getattr(logging,self.logLevel))
+        
         super(TestbedController, self).do_setup()
 
     def do_post_asynclaunch(self, guid):
@@ -99,22 +177,39 @@ class TestbedController(testbed_impl.TestbedController):
     do_poststep_configure = staticmethod(do_post_asynclaunch)
 
     def do_preconfigure(self):
-        # Perform resource discovery if we don't have
-        # specific resources assigned yet
-        self.do_resource_discovery()
+        """Assign and provision nodes, wait for them, then preconfigure.
+
+        Discovery and provisioning are repeated for as long as
+        do_wait_nodes() raises UnresponsiveNodeError. The spanning
+        deployment plan is only computed when p2pDeployment is set.
+        """
+        while True:
+            # Perform resource discovery if we don't have
+            # specific resources assigned yet
+            self.do_resource_discovery()
 
-        # Create PlanetLab slivers
-        self.do_provisioning()
+            # Create PlanetLab slivers
+            self.do_provisioning()
+            
+            try:
+                # Wait for provisioning
+                self.do_wait_nodes()
+                
+                # Every node answered: stop retrying
+                break
+            except self._node.UnresponsiveNodeError:
+                # do_wait_nodes() has already blacklisted and unassigned
+                # the dead nodes; loop again to pick replacements
+                pass
         
-        # Plan application deployment
-        self.do_spanning_deployment_plan()
+        if self.p2pDeployment:
+            # Plan application deployment
+            self.do_spanning_deployment_plan()
 
         # Configure elements per XML data
         super(TestbedController, self).do_preconfigure()
 
-    def do_resource_discovery(self):
+    def do_resource_discovery(self, recover = False):
         to_provision = self._to_provision = set()
         
+        reserved = set(self._blacklist)
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node) and node._node_id is not None:
+                reserved.add(node._node_id)
+        
         # Initial algo:
         #   look for perfectly defined nodes
         #   (ie: those with only one candidate)
@@ -124,17 +219,22 @@ class TestbedController(testbed_impl.TestbedController):
                 # If we have only one candidate, simply use it
                 candidates = node.find_candidates(
                     filter_slice_id = self.slice_id)
+                candidates -= reserved
                 if len(candidates) == 1:
-                    node.assign_node_id(iter(candidates).next())
-                else:
+                    node_id = iter(candidates).next()
+                    node.assign_node_id(node_id)
+                    reserved.add(node_id)
+                elif not candidates:
                     # Try again including unassigned nodes
                     candidates = node.find_candidates()
+                    candidates -= reserved
                     if len(candidates) > 1:
                         continue
                     if len(candidates) == 1:
                         node_id = iter(candidates).next()
                         node.assign_node_id(node_id)
                         to_provision.add(node_id)
+                        reserved.add(node_id)
                     elif not candidates:
                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
                             node.make_filter_description())
@@ -149,10 +249,14 @@ class TestbedController(testbed_impl.TestbedController):
                 # If we have only one candidate, simply use it
                 candidates = node.find_candidates(
                     filter_slice_id = self.slice_id)
+                candidates -= reserved
                 reqs.append(candidates)
                 nodes.append(node)
         
         if nodes and reqs:
+            if recover:
+                raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
+            
             try:
                 solution = resourcealloc.alloc(reqs)
             except resourcealloc.ResourceAllocationError:
@@ -160,6 +264,7 @@ class TestbedController(testbed_impl.TestbedController):
                 reqs = []
                 for node in nodes:
                     candidates = node.find_candidates()
+                    candidates -= reserved
                     reqs.append(candidates)
                 
                 solution = resourcealloc.alloc(reqs)
@@ -177,8 +282,55 @@ class TestbedController(testbed_impl.TestbedController):
             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
 
         # cleanup
+        self._just_provisioned = self._to_provision
         del self._to_provision
     
+    def do_wait_nodes(self):
+        """Wait until every Node element is provisioned and responsive.
+
+        Each Node first gets its home path, SSH identity file and slice
+        name injected. The nodes are then waited for one by one; a node
+        that the last discovery pass added to the slice gets a timeout of
+        20*60, any other node 60 (presumably seconds - confirm against
+        Node.wait_provisioning). Dependency preparation is started on each
+        node as soon as it is ready.
+
+        Raises self._node.UnresponsiveNodeError if a node does not
+        respond. Before that error is re-raised, every node that is not
+        alive is added to the blacklist and unassigned, and the blacklist
+        is saved on a best-effort basis.
+        """
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node):
+                # Just inject configuration stuff
+                node.home_path = "nepi-node-%s" % (guid,)
+                node.ident_path = self.sliceSSHKey
+                node.slicename = self.slicename
+
+                # Show the magic
+                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
+
+        try:
+            for guid, node in self._elements.iteritems():
+                if isinstance(node, self._node.Node):
+                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
+
+                    node.wait_provisioning(
+                        (20*60 if node._node_id in self._just_provisioned else 60)
+                    )
+
+                    self._logger.info("READY Node %s at %s", guid, node.hostname)
+
+                    # Prepare dependency installer now
+                    node.prepare_dependencies()
+        except self._node.UnresponsiveNodeError:
+            # Remember the original error explicitly. In Python 2 a bare
+            # "raise" re-raises the *most recently handled* exception, so
+            # a failure caught while saving the blacklist below would
+            # otherwise replace the UnresponsiveNodeError that callers
+            # (do_preconfigure's retry loop) are waiting for.
+            failure = sys.exc_info()
+            try:
+                # "node" is still the one whose wait failed
+                self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
+
+                # Mark all dead nodes (which are unresponsive) on the
+                # blacklist and re-raise
+                for guid, node in self._elements.iteritems():
+                    if isinstance(node, self._node.Node):
+                        if not node.is_alive():
+                            self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
+                            self._blacklist.add(node._node_id)
+                            node.unassign_node()
+
+                try:
+                    self._save_blacklist()
+                except Exception:
+                    # Not important: the in-memory blacklist is still
+                    # updated, only persistence failed
+                    import traceback
+                    traceback.print_exc()
+
+                raise failure[0], failure[1], failure[2]
+            finally:
+                # Drop the traceback reference to avoid a frame cycle
+                del failure
+    
     def do_spanning_deployment_plan(self):
         # Create application groups by collecting all applications
         # based on their hash - the hash should contain everything that
@@ -200,6 +352,10 @@ class TestbedController(testbed_impl.TestbedController):
         for element in self._elements.itervalues():
             if isinstance(element, self._app.Dependency):
                 depgroups[dephash(element)].append(element)
+            elif isinstance(element, self._node.Node):
+                deps = element._yum_dependencies
+                if deps:
+                    depgroups[dephash(deps)].append(deps)
         
         # Set up spanning deployment for those applications that
         # have been deployed in several nodes.
@@ -328,7 +484,15 @@ class TestbedController(testbed_impl.TestbedController):
         # TODO: take on account schedule time for the task
         element = self._elements[guid]
         if element:
-            setattr(element, name, value)
+            try:
+                setattr(element, name, value)
+            except:
+                # We ignore these errors while recovering.
+                # Some attributes are immutable, and setting
+                # them is necessary (to recover the state), but
+                # some are not (they throw an exception).
+                if not self.recovering:
+                    raise
 
             if hasattr(element, 'refresh'):
                 # invoke attribute refresh hook
@@ -339,12 +503,10 @@ class TestbedController(testbed_impl.TestbedController):
         # TODO: take on account schedule time for the task
         factory_id = self._create[guid]
         factory = self._factories[factory_id]
-        if factory.box_attributes.is_attribute_design_only(name):
-            return value
         element = self._elements.get(guid)
         try:
             return getattr(element, name)
-        except KeyError, AttributeError:
+        except (KeyError, AttributeError):
             return value
 
     def get_address(self, guid, index, attribute='Address'):
@@ -369,22 +531,28 @@ class TestbedController(testbed_impl.TestbedController):
     def shutdown(self):
+        """Close followed traces, then clean up and destroy all elements.
+
+        The 'cleanup' hooks of all elements run first, then the 'destroy'
+        hooks, each pass following metadata.shutdown_order. Elements
+        lacking a hook are skipped. Finally the element and trace
+        registries are emptied.
+        """
         for trace in self._traces.itervalues():
             trace.close()
-        for element in self._elements.itervalues():
-            # invoke cleanup hooks
-            if hasattr(element, 'cleanup'):
-                element.cleanup()
-        for element in self._elements.itervalues():
-            # invoke destroy hooks
-            if hasattr(element, 'destroy'):
-                element.destroy()
+        
+        # Call element.<action>() if the element defines it. The
+        # (testbed, guid) tail matches how _do_in_factory_order invokes
+        # callables elsewhere in this class; "testbed" itself is unused.
+        def invokeif(action, testbed, guid):
+            element = self._elements[guid]
+            if hasattr(element, action):
+                getattr(element, action)()
+        
+        # NOTE(review): functools is used here but its import is not
+        # visible in this diff - confirm the module imports it.
+        self._do_in_factory_order(
+            functools.partial(invokeif, 'cleanup'),
+            metadata.shutdown_order)
+
+        self._do_in_factory_order(
+            functools.partial(invokeif, 'destroy'),
+            metadata.shutdown_order)
+            
         self._elements.clear()
         self._traces.clear()
 
     def trace(self, guid, trace_id, attribute='value'):
-        app = self._elements[guid]
+        elem = self._elements[guid]
 
         if attribute == 'value':
-            path = app.sync_trace(self.home_directory, trace_id)
+            path = elem.sync_trace(self.home_directory, trace_id)
             if path:
                 fd = open(path, "r")
                 content = fd.read()
@@ -392,36 +560,127 @@ class TestbedController(testbed_impl.TestbedController):
             else:
                 content = None
         elif attribute == 'path':
-            content = app.remote_trace_path(trace_id)
-        elif attribute == 'size':
-            # TODO
-            raise NotImplementedError
+            content = elem.remote_trace_path(trace_id)
+        elif attribute == 'name':
+            content = elem.remote_trace_name(trace_id)
         else:
             content = None
         return content
 
     def follow_trace(self, trace_id, trace):
+        # Register the trace under trace_id (replacing any previous entry);
+        # shutdown() calls close() on every registered trace.
         self._traces[trace_id] = trace
+
+    def recover(self):
+        """Rebuild controller state for an experiment that already runs.
+
+        The object hierarchy is created and connected as usual, then each
+        element kind is either re-configured (when that has no side
+        effects on the testbed) or asked to recover() itself. While this
+        runs self.recovering is True, which makes failures to set element
+        attributes non-fatal; the flag is cleared again on exit, whether
+        recovery succeeded or raised.
+        """
+        try:
+            # An internal flag, so we know to behave differently in
+            # a few corner cases.
+            self.recovering = True
+
+            # Create and connect do not perform any real tasks against
+            # the nodes, it only sets up the object hierarchy,
+            # so we can run them normally
+            self.do_create()
+            self.do_connect_init()
+            self.do_connect_compl()
+
+            # Manually recover nodes, to mark dependencies installed
+            # and clean up mutable attributes
+            self._do_in_factory_order(
+                lambda self, guid : self._elements[guid].recover(), 
+                [
+                    metadata.NODE,
+                ])
+
+            # Assign nodes - since we're working off execute XML, nodes
+            # have specific hostnames assigned and we don't need to do
+            # real assignment, only find out node ids and check liveliness
+            self.do_resource_discovery(recover = True)
+            self.do_wait_nodes()
+
+            # Pre/post configure, however, tends to set up tunnels
+            # Execute configuration steps only for those object
+            # kinds that do not have side effects
+
+            # Do the ones without side effects,
+            # including nodes that need to set up home 
+            # folders and all that
+            self._do_in_factory_order(
+                "preconfigure_function", 
+                [
+                    metadata.INTERNET,
+                    Parallel(metadata.NODE),
+                    metadata.NODEIFACE,
+                ])
+
+            # Tunnels require a home path that is configured
+            # at this step. Since we cannot run the step itself,
+            # we need to inject this homepath ourselves
+            for guid, element in self._elements.iteritems():
+                if isinstance(element, self._interfaces.TunIface):
+                    element._home_path = "tun-%s" % (guid,)
+
+            # Manually recover tunnels, applications and
+            # netpipes, negating the side effects
+            self._do_in_factory_order(
+                lambda self, guid : self._elements[guid].recover(), 
+                [
+                    Parallel(metadata.TAPIFACE),
+                    Parallel(metadata.TUNIFACE),
+                    metadata.NETPIPE,
+                    Parallel(metadata.NEPIDEPENDENCY),
+                    Parallel(metadata.NS3DEPENDENCY),
+                    Parallel(metadata.DEPENDENCY),
+                    Parallel(metadata.APPLICATION),
+                ])
+
+            # Tunnels are not harmed by configuration after
+            # recovery, and some attributes get set this way
+            # like external_iface
+            self._do_in_factory_order(
+                "preconfigure_function", 
+                [
+                    Parallel(metadata.TAPIFACE),
+                    Parallel(metadata.TUNIFACE),
+                ])
+
+            # Post-do the ones without side effects
+            self._do_in_factory_order(
+                "configure_function", 
+                [
+                    metadata.INTERNET,
+                    Parallel(metadata.NODE),
+                    metadata.NODEIFACE,
+                    Parallel(metadata.TAPIFACE),
+                    Parallel(metadata.TUNIFACE),
+                ])
+
+            # There are no required prestart steps
+            # to call upon recovery, so we're done
+        finally:
+            # Leave recovery mode. This used to assign True again, which
+            # left attribute-setting errors silenced forever afterwards.
+            self.recovering = False
     
     def _make_generic(self, parameters, kind):
         app = kind(self.plapi)
+        app.testbed = weakref.ref(self)
 
         # Note: there is 1-to-1 correspondence between attribute names
         #   If that changes, this has to change as well
         for attr,val in parameters.iteritems():
-            setattr(app, attr, val)
+            try:
+                setattr(app, attr, val)
+            except:
+                # We ignore these errors while recovering.
+                # Some attributes are immutable, and setting
+                # them is necessary (to recover the state), but
+                # some are not (they throw an exception).
+                if not self.recovering:
+                    raise
 
         return app
 
     def _make_node(self, parameters):
+        """Create a Node element from `parameters`.
+
+        The node's enable_cleanup flag mirrors the dedicatedSlice testbed
+        attribute read in do_setup.
+        """
         node = self._make_generic(parameters, self._node.Node)
-
-        # If emulation is enabled, we automatically need
-        # some vsys interfaces and packages
-        if node.emulation:
-            node.required_vsys.add('ipfw-be')
-            node.required_packages.add('ipfwslice')
-
+        # NOTE(review): presumably cleanup is only safe when nobody else
+        # shares the slice - confirm against Node.
+        node.enable_cleanup = self.dedicatedSlice
         return node
 
     def _make_node_iface(self, parameters):
@@ -451,3 +710,22 @@ class TestbedController(testbed_impl.TestbedController):
     def _make_ns3_dependency(self, parameters):
+        # Thin factory: builds an application.NS3Dependency via _make_generic
         return self._make_generic(parameters, self._app.NS3Dependency)
 
+    def _make_tun_filter(self, parameters):
+        """Create an interfaces.TunFilter element from `parameters`."""
+        element_class = self._interfaces.TunFilter
+        return self._make_generic(parameters, element_class)
+
+    def _make_class_queue_filter(self, parameters):
+        """Create an interfaces.ClassQueueFilter element from `parameters`."""
+        element_class = self._interfaces.ClassQueueFilter
+        return self._make_generic(parameters, element_class)
+
+    def _make_tos_queue_filter(self, parameters):
+        """Create an interfaces.ToSQueueFilter element from `parameters`."""
+        element_class = self._interfaces.ToSQueueFilter
+        return self._make_generic(parameters, element_class)
+
+    def _make_multicast_forwarder(self, parameters):
+        """Create a multicast.MulticastForwarder element from `parameters`."""
+        element_class = self._multicast.MulticastForwarder
+        return self._make_generic(parameters, element_class)
+
+    def _make_multicast_announcer(self, parameters):
+        """Create a multicast.MulticastAnnouncer element from `parameters`."""
+        element_class = self._multicast.MulticastAnnouncer
+        return self._make_generic(parameters, element_class)
+
+    def _make_multicast_router(self, parameters):
+        """Create a multicast.MulticastRouter element from `parameters`."""
+        element_class = self._multicast.MulticastRouter
+        return self._make_generic(parameters, element_class)
+
+