Recovery policy for testbeds, and recovery implementation in PlanetLab.
author: Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 28 Jul 2011 09:57:38 +0000 (11:57 +0200)
committer: Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 28 Jul 2011 09:57:38 +0000 (11:57 +0200)
Not fully tested yet; cross connections in particular remain untested.

14 files changed:
src/nepi/core/execute.py
src/nepi/core/metadata.py
src/nepi/core/testbed_impl.py
src/nepi/testbeds/netns/metadata.py
src/nepi/testbeds/ns3/metadata.py
src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata.py
src/nepi/testbeds/planetlab/node.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/constants.py
src/nepi/util/proxy.py
test/testbeds/planetlab/integration.py

index f59461e..44c9c24 100644 (file)
@@ -3,7 +3,7 @@
 
 from nepi.core.attributes import Attribute, AttributesMap
 from nepi.util import validation
-from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
+from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC
 from nepi.util.parser._xml import XmlExperimentParser
 import sys
 import re
@@ -146,6 +146,9 @@ class TestbedController(object):
     def stop(self):
         raise NotImplementedError
 
+    def recover(self):
+        raise NotImplementedError
+
     def set(self, guid, name, value, time = TIME_NOW):
         raise NotImplementedError
 
@@ -301,14 +304,29 @@ class ExperimentController(object):
             raise eTyp, eVal, eLoc
 
     def start(self):
+        self._start()
+
+    def _start(self, recover = False):
         parser = XmlExperimentParser()
-        data = parser.from_xml_to_data(self._experiment_design_xml)
+        
+        if recover:
+            xml = self._experiment_execute_xml
+        else:
+            xml = self._experiment_design_xml
+        data = parser.from_xml_to_data(xml)
 
         # instantiate testbed controllers
-        self._init_testbed_controllers(data)
+        to_recover, to_restart = self._init_testbed_controllers(data, recover)
+        all_restart = set(to_restart)
         
-        # persist testbed connection data, for potential recovery
-        self._persist_testbed_proxies()
+        if not recover:
+            # persist testbed connection data, for potential recovery
+            self._persist_testbed_proxies()
+        else:
+            # recover recoverable controllers
+            for guid in to_recover:
+                self._testbeds[guid].do_setup()
+                self._testbeds[guid].recover()
         
         def steps_to_configure(self, allowed_guids):
             # perform setup in parallel for all test beds,
@@ -336,7 +354,7 @@ class ExperimentController(object):
                             if guid in allowed_guids])
             self._clear_caches()
 
-        steps_to_configure(self, self._testbeds)
+        steps_to_configure(self, to_restart)
 
         if self._netreffed_testbeds:
             # initally resolve netrefs
@@ -345,13 +363,22 @@ class ExperimentController(object):
             # rinse and repeat, for netreffed testbeds
             netreffed_testbeds = set(self._netreffed_testbeds)
 
-            self._init_testbed_controllers(data)
+            to_recover, to_restart = self._init_testbed_controllers(data, recover)
+            all_restart.update(to_restart)
             
-            # persist testbed connection data, for potential recovery
-            self._persist_testbed_proxies()
+            if not recover:
+                # persist testbed connection data, for potential recovery
+                self._persist_testbed_proxies()
+            else:
+                # recover recoverable controllers
+                for guid in to_recover:
+                    self._testbeds[guid].do_setup()
+                    self._testbeds[guid].recover()
 
             # configure dependant testbeds
-            steps_to_configure(self, netreffed_testbeds)
+            steps_to_configure(self, to_restart)
+        
+        all_restart = [ self._testbeds[guid] for guid in all_restart ]
             
         # final netref step, fail if anything's left unresolved
         self.do_netrefs(data, fail_if_undefined=True)
@@ -363,7 +390,7 @@ class ExperimentController(object):
         # perform do_configure in parallel for al testbeds
         # (it's internal configuration for each)
         self._parallel([testbed.do_configure
-                        for testbed in self._testbeds.itervalues()])
+                        for testbed in all_restart])
 
         self._clear_caches()
 
@@ -383,18 +410,19 @@ class ExperimentController(object):
 
         # Last chance to configure (parallel on all testbeds)
         self._parallel([testbed.do_prestart
-                        for testbed in self._testbeds.itervalues()])
+                        for testbed in all_restart])
 
         self._clear_caches()
         
-        # update execution xml with execution-specific values
-        # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
-        self._update_execute_xml()
-        self.persist_execute_xml()
+        if not recover:
+            # update execution xml with execution-specific values
+            # TODO: BUG! Buggy code! Cannot serialize all attribute values (e.g. tun_key, which is non-ASCII)
+            self._update_execute_xml()
+            self.persist_execute_xml()
 
         # start experiment (parallel start on all testbeds)
         self._parallel([testbed.start
-                        for testbed in self._testbeds.itervalues()])
+                        for testbed in all_restart])
 
         self._clear_caches()
 
@@ -403,7 +431,7 @@ class ExperimentController(object):
         self._guids_in_testbed_cache = dict()
 
     def _persist_testbed_proxies(self):
-        TRANSIENT = ('Recover',)
+        TRANSIENT = (DC.RECOVER,)
         
         # persist access configuration for all testbeds, so that
         # recovery mode can reconnect to them if it becomes necessary
@@ -429,7 +457,7 @@ class ExperimentController(object):
             Attribute.INTEGER : 'getint',
         }
         
-        TRANSIENT = ('Recover',)
+        TRANSIENT = (DC.RECOVER,)
         
         # deferred import because proxy needs
         # our class definitions to define proxies
@@ -520,18 +548,8 @@ class ExperimentController(object):
     def recover(self):
         # reload perviously persisted testbed access configurations
         self._load_testbed_proxies()
-        
-        # Parse experiment xml
-        parser = XmlExperimentParser()
-        data = parser.from_xml_to_data(self._experiment_design_xml)
-        
-        # recreate testbed proxies by reconnecting only
-        self._init_testbed_controllers(data, recover = True)
-        
-        # another time, for netrefs
-        self._init_testbed_controllers(data, recover = True)
-        
-        print >>sys.stderr, "RECOVERED"
+
+        self._start(recover = True)
 
     def is_finished(self, guid):
         testbed = self._testbed_for_guid(guid)
@@ -539,6 +557,12 @@ class ExperimentController(object):
             return testbed.status(guid) == AS.STATUS_FINISHED
         raise RuntimeError("No element exists with guid %d" % guid)    
 
+    def status(self, guid):
+        testbed = self._testbed_for_guid(guid)
+        if testbed != None:
+            return testbed.status(guid)
+        raise RuntimeError("No element exists with guid %d" % guid)    
+
     def set(self, guid, name, value, time = TIME_NOW):
         testbed = self._testbed_for_guid(guid)
         if testbed != None:
@@ -691,29 +715,67 @@ class ExperimentController(object):
         element_guids = list()
         label_guids = dict()
         data_guids = data.guids
+        to_recover = set()
+        to_restart = set()
+
+        # gather label associations
+        for guid in data_guids:
+            if not data.is_testbed_data(guid):
+                (testbed_guid, factory_id) = data.get_box_data(guid)
+                label = data.get_attribute_data(guid, "label")
+                if label is not None:
+                    if label in label_guids:
+                        raise RuntimeError, "Label %r is not unique" % (label,)
+                    label_guids[label] = guid
 
         # create testbed controllers
         for guid in data_guids:
             if data.is_testbed_data(guid):
                 if guid not in self._testbeds:
-                    self._create_testbed_controller(guid, data, element_guids,
-                            recover)
-            else:
+                    try:
+                        self._create_testbed_controller(
+                            guid, data, element_guids, recover)
+                        if recover:
+                            # Already programmed
+                            blacklist_testbeds.add(guid)
+                        else:
+                            to_restart.add(guid)
+                    except:
+                        if recover:
+                            policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY)
+                            if policy == DC.POLICY_FAIL:
+                                raise
+                            elif policy == DC.POLICY_RECOVER:
+                                self._create_testbed_controller(
+                                    guid, data, element_guids, False)
+                                to_recover.add(guid)
+                            elif policy == DC.POLICY_RESTART:
+                                self._create_testbed_controller(
+                                    guid, data, element_guids, False)
+                                to_restart.add(guid)
+                            else:
+                                raise
+                        else:
+                            raise
+        
+        # queue programmable elements
+        #  - that have not been programmed already (blacklist_testbeds)
+        #  - including recovered or restarted testbeds
+        #  - but those that have no unresolved netrefs
+        for guid in data_guids:
+            if not data.is_testbed_data(guid):
                 (testbed_guid, factory_id) = data.get_box_data(guid)
                 if testbed_guid not in blacklist_testbeds:
                     element_guids.append(guid)
-                    label = data.get_attribute_data(guid, "label")
-                    if label is not None:
-                        if label in label_guids:
-                            raise RuntimeError, "Label %r is not unique" % (label,)
-                        label_guids[label] = guid
 
         # replace references to elements labels for its guid
         self._resolve_labels(data, data_guids, label_guids)
     
         # program testbed controllers
-        if not recover:
+        if element_guids:
             self._program_testbed_controllers(element_guids, data)
+        
+        return to_recover, to_restart
 
     def _resolve_labels(self, data, data_guids, label_guids):
         netrefs = self._netrefs
index 523484e..1a99512 100644 (file)
@@ -356,6 +356,24 @@ class Metadata(object):
             "validation_function" : validation.is_enum,
             "category" : AC.CATEGORY_DEPLOYMENT,
             }),
+        DC.RECOVERY_POLICY : dict({
+            "name" : DC.RECOVERY_POLICY,
+            "help" : "Specifies what action to take in the event of a failure.", 
+            "type" : Attribute.ENUM,
+            "value" : DC.POLICY_FAIL,
+            "allowed" : [
+                    DC.POLICY_FAIL,
+                    DC.POLICY_RECOVER,
+                    DC.POLICY_RESTART,
+                ],
+            "flags" : Attribute.ExecReadOnly |\
+                    Attribute.ExecImmutable |\
+                    Attribute.Metadata,
+            "validation_function" : validation.is_enum,
+            "category" : AC.CATEGORY_DEPLOYMENT,
+            }),
+        })
+    PROXY_ATTRIBUTES = dict({
         DC.RECOVER : dict({
             "name" : DC.RECOVER,
             "help" : "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.", 
@@ -368,6 +386,7 @@ class Metadata(object):
             "category" : AC.CATEGORY_DEPLOYMENT,
             }),
         })
+    PROXY_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES)
   
     # These attributes could appear in the boxes attribute list
     STANDARD_BOX_ATTRIBUTE_DEFINITIONS = dict({
@@ -462,6 +481,10 @@ class Metadata(object):
     @property
     def testbed_id(self):
         return self._testbed_id
+    
+    @property
+    def supported_recovery_policies(self):
+        return self._metadata.supported_recovery_policies
 
     def testbed_attributes(self):
         attributes = AttributesMap()
index 84cab89..88217b0 100644 (file)
@@ -238,10 +238,13 @@ class TestbedController(execute.TestbedController):
             
             # configure action
             factory = self._factories[factory_id]
-            if not getattr(factory, action):
+            if isinstance(action, basestring) and not getattr(factory, action):
                 continue
             def perform_action(guid):
-                getattr(factory, action)(self, guid)
+                if isinstance(action, basestring):
+                    getattr(factory, action)(self, guid)
+                else:
+                    action(self, guid)
                 if postaction:
                     postaction(self, guid)
 
index 7c46f7a..86359f4 100644 (file)
@@ -6,7 +6,7 @@ from nepi.core import metadata
 from nepi.core.attributes import Attribute
 from nepi.util import tags, validation
 from nepi.util.constants import ApplicationStatus as AS, \
-        FactoryCategories as FC
+        FactoryCategories as FC, DeploymentConfiguration as DC
 
 from nepi.util.tunchannel_impl import \
     preconfigure_tunchannel, postconfigure_tunchannel, \
@@ -550,6 +550,10 @@ testbed_attributes = dict({
             }),
     })
 
+supported_recovery_policies = [
+        DC.POLICY_FAIL,
+    ]
+
 class MetadataInfo(metadata.MetadataInfo):
     @property
     def connector_types(self):
@@ -590,4 +594,8 @@ class MetadataInfo(metadata.MetadataInfo):
     @property
     def testbed_version(self):
         return TESTBED_VERSION
+    
+    @property
+    def supported_recovery_policies(self):
+        return supported_recovery_policies
 
index 14c4ab7..b1a4d7a 100644 (file)
@@ -3,6 +3,12 @@
 
 from constants import TESTBED_ID, TESTBED_VERSION
 from nepi.core import metadata
+from nepi.util.constants import DeploymentConfiguration as DC
+
+supported_recovery_policies = [
+        DC.POLICY_FAIL,
+        DC.POLICY_RESTART,
+    ]
 
 class MetadataInfo(metadata.MetadataInfo):
     @property
@@ -53,3 +59,8 @@ class MetadataInfo(metadata.MetadataInfo):
     def testbed_version(self):
         return TESTBED_VERSION
 
+    @property
+    def supported_recovery_policies(self):
+        return supported_recovery_policies
+
+
index 66f6a5d..7354436 100644 (file)
@@ -140,6 +140,11 @@ class Dependency(object):
                     % (e.args[0], e.args[1],)
         
         return local_path
+    
+    def recover(self):
+        # We assume a correct deployment, so recovery only
+        # means we mark this dependency as deployed
+        self._setup = True
 
     def setup(self):
         self._logger.info("Setting up %s", self)
@@ -660,6 +665,13 @@ class Application(Dependency):
             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
 
         self._started = True
+    
+    def recover(self):
+        # Assuming the application is running on PlanetLab,
+        # proper pidfiles should be present at the app's home path.
+        # So we mark this application as started, and check the pidfiles
+        self._started = True
+        self.checkpid()
 
     def checkpid(self):            
         # Get PID/PPID
index 7b98ed9..2cbdaff 100644 (file)
@@ -3,6 +3,7 @@
 
 from constants import TESTBED_ID, TESTBED_VERSION
 from nepi.core import testbed_impl
+from nepi.core.metadata import Parallel
 from nepi.util.constants import TIME_NOW
 from nepi.util.graphtools import mst
 from nepi.util import ipaddr2
@@ -23,6 +24,7 @@ import subprocess
 import random
 import shutil
 import logging
+import metadata
 
 class TempKeyError(Exception):
     pass
@@ -169,7 +171,7 @@ class TestbedController(testbed_impl.TestbedController):
         # Configure elements per XML data
         super(TestbedController, self).do_preconfigure()
 
-    def do_resource_discovery(self):
+    def do_resource_discovery(self, recover = False):
         to_provision = self._to_provision = set()
         
         reserved = set(self._blacklist)
@@ -221,6 +223,9 @@ class TestbedController(testbed_impl.TestbedController):
                 nodes.append(node)
         
         if nodes and reqs:
+            if recover:
+                raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
+            
             try:
                 solution = resourcealloc.alloc(reqs)
             except resourcealloc.ResourceAllocationError:
@@ -526,6 +531,88 @@ class TestbedController(testbed_impl.TestbedController):
 
     def follow_trace(self, trace_id, trace):
         self._traces[trace_id] = trace
+
+    def recover(self):
+        # Create and connect do not perform any real tasks against
+        # the nodes, it only sets up the object hierarchy,
+        # so we can run them normally
+        self.do_create()
+        self.do_connect_init()
+        self.do_connect_compl()
+        
+        # Assign nodes - since we're working off execute XML, nodes
+        # have specific hostnames assigned and we don't need to do
+        # real assignment, only find out node ids and check liveliness
+        self.do_resource_discovery(recover = True)
+        self.do_wait_nodes()
+        
+        # Pre/post configure, however, tends to set up tunnels
+        # Execute configuration steps only for those object
+        # kinds that do not have side effects
+        
+        # Manually recover nodes, to mark dependencies installed
+        self._do_in_factory_order(
+            lambda self, guid : self._elements[guid].recover(), 
+            [
+                metadata.NODE,
+            ])
+        
+        # Do the ones without side effects,
+        # including nodes that need to set up home 
+        # folders and all that
+        self._do_in_factory_order(
+            "preconfigure_function", 
+            [
+                metadata.INTERNET,
+                Parallel(metadata.NODE),
+                metadata.NODEIFACE,
+            ])
+        
+        # Tunnels require a home path that is configured
+        # at this step. Since we cannot run the step itself,
+        # we need to inject this homepath ourselves
+        for guid, element in self._elements.iteritems():
+            if isinstance(element, self._interfaces.TunIface):
+                element._home_path = "tun-%s" % (guid,)
+        
+        # Manually recover tunnels, applications and
+        # netpipes, negating the side effects
+        self._do_in_factory_order(
+            lambda self, guid : self._elements[guid].recover(), 
+            [
+                Parallel(metadata.TAPIFACE),
+                Parallel(metadata.TUNIFACE),
+                metadata.NETPIPE,
+                Parallel(metadata.NEPIDEPENDENCY),
+                Parallel(metadata.NS3DEPENDENCY),
+                Parallel(metadata.DEPENDENCY),
+                Parallel(metadata.APPLICATION),
+            ])
+
+        # Tunnels are not harmed by configuration after
+        # recovery, and some attributes get set this way
+        # like external_iface
+        self._do_in_factory_order(
+            "preconfigure_function", 
+            [
+                Parallel(metadata.TAPIFACE),
+                Parallel(metadata.TUNIFACE),
+            ])
+
+        # Post-do the ones without side effects
+        self._do_in_factory_order(
+            "configure_function", 
+            [
+                metadata.INTERNET,
+                Parallel(metadata.NODE),
+                metadata.NODEIFACE,
+                Parallel(metadata.TAPIFACE),
+                Parallel(metadata.TUNIFACE),
+            ])
+        
+        # There are no required prestart steps
+        # to call upon recovery, so we're done
+        
     
     def _make_generic(self, parameters, kind):
         app = kind(self.plapi)
index d23e73d..0a05bab 100644 (file)
@@ -231,6 +231,12 @@ class TunIface(object):
         impl.port = self.tun_port
         return impl
     
+    def recover(self):
+        self.peer_proto_impl = self._impl_instance(
+            self._home_path,
+            False) # no way to know, no need to know
+        self.peer_proto_impl.recover()
+    
     def prepare(self, home_path, listening):
         if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))):
             # Ad-hoc peer_iface
@@ -343,6 +349,10 @@ class NetPipe(object):
         options = ' '.join(options)
         
         return (scope,options)
+    
+    def recover(self):
+        # Rules are safe on their nodes
+        self.configured = True
 
     def configure(self):
         # set up rule
index fff9b8b..d3970d4 100644 (file)
@@ -10,7 +10,8 @@ from nepi.core.attributes import Attribute
 from nepi.util import tags, validation
 from nepi.util.constants import ApplicationStatus as AS, \
         FactoryCategories as FC, \
-        ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP
+        ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
+        DeploymentConfiguration as DC
 
 import functools
 import os
@@ -1160,6 +1161,12 @@ testbed_attributes = dict({
         }),
     })
 
+supported_recovery_policies = [
+        DC.POLICY_FAIL,
+        DC.POLICY_RESTART,
+        DC.POLICY_RECOVER,
+    ]
+
 class MetadataInfo(metadata.MetadataInfo):
     @property
     def connector_types(self):
@@ -1209,3 +1216,8 @@ class MetadataInfo(metadata.MetadataInfo):
     def testbed_version(self):
         return TESTBED_VERSION
 
+    @property
+    def supported_recovery_policies(self):
+        return supported_recovery_policies
+
+
index 734d998..dec4d82 100644 (file)
@@ -81,12 +81,12 @@ class Node(object):
         # Those are filled when an actual node is allocated
         self._node_id = None
         self._yum_dependencies = None
+        self._installed = False
 
         # Logging
         self._logger = logging.getLogger('nepi.testbeds.planetlab')
     
-    @property
-    def _nepi_testbed_environment_setup(self):
+    def _nepi_testbed_environment_setup_get(self):
         command = cStringIO.StringIO()
         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
@@ -99,6 +99,11 @@ class Node(object):
                 for envval in envvals:
                     command.write(' ; export %s=%s' % (envkey, envval))
         return command.getvalue()
+    def _nepi_testbed_environment_setup_set(self, value):
+        pass
+    _nepi_testbed_environment_setup = property(
+        _nepi_testbed_environment_setup_get,
+        _nepi_testbed_environment_setup_set)
     
     def build_filters(self, target_filters, filter_map):
         for attr, tag in filter_map.iteritems():
@@ -316,8 +321,12 @@ class Node(object):
         if self.slicename is None:
             raise AssertionError, "Misconfigured node: unspecified slice"
 
+    def recover(self):
+        # Just mark dependencies installed
+        self._installed = True
+
     def install_dependencies(self):
-        if self.required_packages:
+        if self.required_packages and not self._installed:
             # If we need rpmfusion, we must install the repo definition and the gpg keys
             if self.rpmFusion:
                 if self.operatingSystem == 'f12':
@@ -369,8 +378,9 @@ class Node(object):
     
     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
         # Wait for the p2p installer
-        if self._yum_dependencies:
+        if self._yum_dependencies and not self._installed:
             self._yum_dependencies.async_setup_wait()
+            self._installed = True
         
     def is_alive(self):
         # Make sure all the paths are created where 
@@ -394,7 +404,7 @@ class Node(object):
     
     def prepare_dependencies(self):
         # Configure p2p yum dependency installer
-        if self.required_packages:
+        if self.required_packages and not self._installed:
             self._yum_dependencies = application.YumDependency(self._api)
             self._yum_dependencies.node = self
             self._yum_dependencies.home_path = "nepi-yumdep"
index d066a8d..08a33ed 100644 (file)
@@ -237,6 +237,12 @@ class TunProtoBase(object):
         
         self._started = True
     
+    def recover(self):
+        # Tunnel should be still running in its node
+        # Just check its pidfile and we're done
+        self._started = True
+        self.checkpid()
+    
     def _launch_and_wait(self, *p, **kw):
         try:
             self.__launch_and_wait(*p, **kw)
@@ -312,7 +318,7 @@ class TunProtoBase(object):
         return self._if_name
     
     def async_launch(self, check_proto, listen, extra_args=[]):
-        if not self._launcher:
+        if not self._started and not self._launcher:
             self._launcher = threading.Thread(
                 target = self._launch_and_wait,
                 args = (check_proto, listen, extra_args))
index 03b354a..741edd8 100644 (file)
@@ -74,6 +74,6 @@ class DeploymentConfiguration:
     USE_AGENT = "useAgent"
     LOG_LEVEL = "logLevel"
     RECOVER = "recover"
-    RECOVER_POLICY = "recoverPolicy"
+    RECOVERY_POLICY = "recoveryPolicy"
 
 
index 14f928e..a323688 100644 (file)
@@ -165,9 +165,9 @@ class AccessConfiguration(AttributesMap):
         
         from nepi.core.metadata import Metadata
         
-        for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
+        for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
             self.add_attribute(**attr_info)
-        
+
         if params:
             for attr_name, attr_value in params.iteritems():
                 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
@@ -686,6 +686,13 @@ class TestbedControllerServer(BaseServer):
     def get_factory_id(self, guid):
         return self._testbed.get_factory_id(guid)
 
+    @Marshalling.handles(RECOVER)
+    @Marshalling.args()
+    @Marshalling.retvoid
+    def recover(self):
+        self._testbed.recover()
+
+
 class ExperimentControllerServer(BaseServer):
     def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
         super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
@@ -734,6 +741,12 @@ class ExperimentControllerServer(BaseServer):
     def is_finished(self, guid):
         return self._experiment.is_finished(guid)
 
+    @Marshalling.handles(STATUS)
+    @Marshalling.args(int)
+    @Marshalling.retval(int)
+    def status(self, guid):
+        return self._experiment.status(guid)
+
     @Marshalling.handles(GET)
     @Marshalling.args(int, Marshalling.base64_data, str)
     @Marshalling.retval( Marshalling.pickled_data )
index 2d24e3e..3c89086 100755 (executable)
@@ -226,6 +226,117 @@ FIONREAD = 0x[0-9a-fA-F]{8}.*
             daemonize_testbed = False,
             controller_access_configuration = access_config,
             environ = environ)
+
+
+    def _test_recover(self, daemonize_testbed, controller_access_configuration, environ = None):
+        pl, exp = self.make_experiment_desc()
+        
+        pl.set_attribute_value(DC.RECOVERY_POLICY, DC.POLICY_RECOVER)
+        
+        node1 = pl.create("Node")
+        node2 = pl.create("Node")
+        node1.set_attribute_value("hostname", self.host1)
+        node2.set_attribute_value("hostname", self.host2)
+        
+        iface1 = pl.create("NodeInterface")
+        iface2 = pl.create("NodeInterface")
+        inet = pl.create("Internet")
+        node1.connector("devs").connect(iface1.connector("node"))
+        node2.connector("devs").connect(iface2.connector("node"))
+        iface1.connector("inet").connect(inet.connector("devs"))
+        iface2.connector("inet").connect(inet.connector("devs"))
+        
+        tap1 = pl.create("TapInterface")
+        tap2 = pl.create("TapInterface")
+        node1.connector("devs").connect(tap1.connector("node"))
+        node2.connector("devs").connect(tap2.connector("node"))
+        tap1.connector("udp").connect(tap2.connector("udp"))
+        
+        tap1ip = tap1.add_address()
+        tap1ip.set_attribute_value("Address", "192.168.2.2")
+        tap1ip.set_attribute_value("NetPrefix", 24)
+        tap1ip.set_attribute_value("Broadcast", False)
+
+        tap2ip = tap2.add_address()
+        tap2ip.set_attribute_value("Address", "192.168.2.3")
+        tap2ip.set_attribute_value("NetPrefix", 24)
+        tap2ip.set_attribute_value("Broadcast", False)
+        
+        app = pl.create("Application")
+        app.set_attribute_value("command", "ping -qc10 192.168.2.3")
+        app.enable_trace("stdout")
+        app.connector("node").connect(node1.connector("apps"))
+
+        if daemonize_testbed:
+            pl.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+            inst_root_dir = os.path.join(self.root_dir, "instance")
+            os.mkdir(inst_root_dir)
+            pl.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+            pl.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
+            if environ:
+                pl.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environ)
+
+        xml = exp.to_xml()
+
+        if controller_access_configuration:
+            controller = proxy.create_experiment_controller(xml, 
+                controller_access_configuration)
+        else:
+            controller = ExperimentController(xml, self.root_dir)
+        
+        try:
+            controller.start()
+            
+            # purposely break connection
+            controller = None
+            
+            # recover
+            if controller_access_configuration:
+                controller_access_configuration.set_attribute_value(
+                    DC.RECOVER, True)
+                controller = proxy.create_experiment_controller(None, 
+                    controller_access_configuration)
+            else:
+                controller = ExperimentController(None, self.root_dir)
+                controller.recover()
+            
+            while not controller.is_finished(app.guid):
+                time.sleep(0.5)
+            ping_result = controller.trace(app.guid, "stdout")
+            comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
+
+--- .* ping statistics ---
+10 packets transmitted, 10 received, 0% packet loss, time \d*ms.*
+"""
+            self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
+                "Unexpected trace:\n" + ping_result)
+        
+        finally:
+            if controller is not None:
+                try:
+                    controller.stop()
+                    controller.shutdown()
+                except:
+                    import traceback
+                    traceback.print_exc()
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_recover(self):
+        self._test_recover(
+            daemonize_testbed = False,
+            controller_access_configuration = None)
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_recover_daemonized(self):
+        access_config = proxy.AccessConfiguration({
+            DC.DEPLOYMENT_MODE : DC.MODE_DAEMON,
+            DC.ROOT_DIRECTORY : self.root_dir,
+        })
+
+        self._test_recover(
+            daemonize_testbed = False,
+            controller_access_configuration = access_config)
         
 
 if __name__ == '__main__':