From: Claudio-Daniel Freire Date: Thu, 28 Jul 2011 09:57:38 +0000 (+0200) Subject: Recovery policy for testbeds, and recovery implementation in PlanetLab. X-Git-Tag: nepi-3.0.0~340 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=e8fadabbad74aef9ba5d95ebebd6f9d30c0f4fb6;p=nepi.git Recovery policy for testbeds, and recovery implementation in PlanetLab. Not fully tested yet, especially untested are cross connections. --- diff --git a/src/nepi/core/execute.py b/src/nepi/core/execute.py index f59461e3..44c9c240 100644 --- a/src/nepi/core/execute.py +++ b/src/nepi/core/execute.py @@ -3,7 +3,7 @@ from nepi.core.attributes import Attribute, AttributesMap from nepi.util import validation -from nepi.util.constants import ApplicationStatus as AS, TIME_NOW +from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC from nepi.util.parser._xml import XmlExperimentParser import sys import re @@ -146,6 +146,9 @@ class TestbedController(object): def stop(self): raise NotImplementedError + def recover(self): + raise NotImplementedError + def set(self, guid, name, value, time = TIME_NOW): raise NotImplementedError @@ -301,14 +304,29 @@ class ExperimentController(object): raise eTyp, eVal, eLoc def start(self): + self._start() + + def _start(self, recover = False): parser = XmlExperimentParser() - data = parser.from_xml_to_data(self._experiment_design_xml) + + if recover: + xml = self._experiment_execute_xml + else: + xml = self._experiment_design_xml + data = parser.from_xml_to_data(xml) # instantiate testbed controllers - self._init_testbed_controllers(data) + to_recover, to_restart = self._init_testbed_controllers(data, recover) + all_restart = set(to_restart) - # persist testbed connection data, for potential recovery - self._persist_testbed_proxies() + if not recover: + # persist testbed connection data, for potential recovery + self._persist_testbed_proxies() + else: + # recover recoverable controllers + for guid in to_recover: + self._testbeds[guid].do_setup() + self._testbeds[guid].recover() def steps_to_configure(self, allowed_guids): # perform setup in parallel for all test beds, @@ -336,7 +354,7 @@ class ExperimentController(object): if guid in allowed_guids]) self._clear_caches() - steps_to_configure(self, self._testbeds) + steps_to_configure(self, to_restart) if self._netreffed_testbeds: # initally resolve netrefs @@ -345,13 +363,22 @@ class ExperimentController(object): # rinse and repeat, for netreffed testbeds netreffed_testbeds = set(self._netreffed_testbeds) - self._init_testbed_controllers(data) + to_recover, to_restart = self._init_testbed_controllers(data, recover) + all_restart.update(to_restart) - # persist testbed connection data, for potential recovery - self._persist_testbed_proxies() + if not recover: + # persist testbed connection data, for potential recovery + self._persist_testbed_proxies() + else: + # recover recoverable controllers + for guid in to_recover: + self._testbeds[guid].do_setup() + self._testbeds[guid].recover() # configure dependant testbeds - steps_to_configure(self, netreffed_testbeds) + steps_to_configure(self, to_restart) + + all_restart = [ self._testbeds[guid] for guid in all_restart ] # final netref step, fail if anything's left unresolved self.do_netrefs(data, fail_if_undefined=True) @@ -363,7 +390,7 @@ class ExperimentController(object): # perform do_configure in parallel for al testbeds # (it's internal configuration for each) self._parallel([testbed.do_configure - for testbed in self._testbeds.itervalues()]) + 
for testbed in all_restart]) self._clear_caches() @@ -383,18 +410,19 @@ class ExperimentController(object): # Last chance to configure (parallel on all testbeds) self._parallel([testbed.do_prestart - for testbed in self._testbeds.itervalues()]) + for testbed in all_restart]) self._clear_caches() - # update execution xml with execution-specific values - # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)" - self._update_execute_xml() - self.persist_execute_xml() + if not recover: + # update execution xml with execution-specific values + # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)" + self._update_execute_xml() + self.persist_execute_xml() # start experiment (parallel start on all testbeds) self._parallel([testbed.start - for testbed in self._testbeds.itervalues()]) + for testbed in all_restart]) self._clear_caches() @@ -403,7 +431,7 @@ class ExperimentController(object): self._guids_in_testbed_cache = dict() def _persist_testbed_proxies(self): - TRANSIENT = ('Recover',) + TRANSIENT = (DC.RECOVER,) # persist access configuration for all testbeds, so that # recovery mode can reconnect to them if it becomes necessary @@ -429,7 +457,7 @@ class ExperimentController(object): Attribute.INTEGER : 'getint', } - TRANSIENT = ('Recover',) + TRANSIENT = (DC.RECOVER,) # deferred import because proxy needs # our class definitions to define proxies @@ -520,18 +548,8 @@ class ExperimentController(object): def recover(self): # reload perviously persisted testbed access configurations self._load_testbed_proxies() - - # Parse experiment xml - parser = XmlExperimentParser() - data = parser.from_xml_to_data(self._experiment_design_xml) - - # recreate testbed proxies by reconnecting only - self._init_testbed_controllers(data, recover = True) - - # another time, for netrefs - self._init_testbed_controllers(data, recover = True) - - print >>sys.stderr, "RECOVERED" + + self._start(recover = True) def is_finished(self, guid): testbed = self._testbed_for_guid(guid) @@ -539,6 +557,12 @@ class ExperimentController(object): return testbed.status(guid) == AS.STATUS_FINISHED raise RuntimeError("No element exists with guid %d" % guid) + def status(self, guid): + testbed = self._testbed_for_guid(guid) + if testbed != None: + return testbed.status(guid) + raise RuntimeError("No element exists with guid %d" % guid) + def set(self, guid, name, value, time = TIME_NOW): testbed = self._testbed_for_guid(guid) if testbed != None: @@ -691,29 +715,67 @@ class ExperimentController(object): element_guids = list() label_guids = dict() data_guids = data.guids + to_recover = set() + to_restart = set() + + # gather label associations + for guid in data_guids: + if not data.is_testbed_data(guid): + (testbed_guid, factory_id) = data.get_box_data(guid) + label = data.get_attribute_data(guid, "label") + if label is not None: + if label in label_guids: + raise RuntimeError, "Label %r is not unique" % (label,) + label_guids[label] = guid # create testbed controllers for guid in data_guids: if data.is_testbed_data(guid): if guid not in self._testbeds: - self._create_testbed_controller(guid, data, element_guids, - recover) - else: + try: + self._create_testbed_controller( + guid, data, element_guids, recover) + if recover: + # Already programmed + blacklist_testbeds.add(guid) + else: + to_restart.add(guid) + except: + if recover: + policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY) + if policy == DC.POLICY_FAIL: + raise + elif 
policy == DC.POLICY_RECOVER: + self._create_testbed_controller( + guid, data, element_guids, False) + to_recover.add(guid) + elif policy == DC.POLICY_RESTART: + self._create_testbed_controller( + guid, data, element_guids, False) + to_restart.add(guid) + else: + raise + else: + raise + + # queue programmable elements + # - that have not been programmed already (blacklist_testbeds) + # - including recovered or restarted testbeds + # - but those that have no unresolved netrefs + for guid in data_guids: + if not data.is_testbed_data(guid): (testbed_guid, factory_id) = data.get_box_data(guid) if testbed_guid not in blacklist_testbeds: element_guids.append(guid) - label = data.get_attribute_data(guid, "label") - if label is not None: - if label in label_guids: - raise RuntimeError, "Label %r is not unique" % (label,) - label_guids[label] = guid # replace references to elements labels for its guid self._resolve_labels(data, data_guids, label_guids) # program testbed controllers - if not recover: + if element_guids: self._program_testbed_controllers(element_guids, data) + + return to_recover, to_restart def _resolve_labels(self, data, data_guids, label_guids): netrefs = self._netrefs diff --git a/src/nepi/core/metadata.py b/src/nepi/core/metadata.py index 523484ec..1a995123 100644 --- a/src/nepi/core/metadata.py +++ b/src/nepi/core/metadata.py @@ -356,6 +356,24 @@ class Metadata(object): "validation_function" : validation.is_enum, "category" : AC.CATEGORY_DEPLOYMENT, }), + DC.RECOVERY_POLICY : dict({ + "name" : DC.RECOVERY_POLICY, + "help" : "Specifies what action to take in the event of a failure.", + "type" : Attribute.ENUM, + "value" : DC.POLICY_FAIL, + "allowed" : [ + DC.POLICY_FAIL, + DC.POLICY_RECOVER, + DC.POLICY_RESTART, + ], + "flags" : Attribute.ExecReadOnly |\ + Attribute.ExecImmutable |\ + Attribute.Metadata, + "validation_function" : validation.is_enum, + "category" : AC.CATEGORY_DEPLOYMENT, + }), + }) + PROXY_ATTRIBUTES = dict({ DC.RECOVER : dict({ "name" : DC.RECOVER, "help" : "Do not intantiate testbeds, rather, reconnect to already-running instances. 
Used to recover from a dead controller.", @@ -368,6 +386,7 @@ class Metadata(object): "category" : AC.CATEGORY_DEPLOYMENT, }), }) + PROXY_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES) # These attributes could appear in the boxes attribute list STANDARD_BOX_ATTRIBUTE_DEFINITIONS = dict({ @@ -462,6 +481,10 @@ class Metadata(object): @property def testbed_id(self): return self._testbed_id + + @property + def supported_recovery_policies(self): + return self._metadata.supported_recovery_policies def testbed_attributes(self): attributes = AttributesMap() diff --git a/src/nepi/core/testbed_impl.py b/src/nepi/core/testbed_impl.py index 84cab891..88217b0b 100644 --- a/src/nepi/core/testbed_impl.py +++ b/src/nepi/core/testbed_impl.py @@ -238,10 +238,13 @@ class TestbedController(execute.TestbedController): # configure action factory = self._factories[factory_id] - if not getattr(factory, action): + if isinstance(action, basestring) and not getattr(factory, action): continue def perform_action(guid): - getattr(factory, action)(self, guid) + if isinstance(action, basestring): + getattr(factory, action)(self, guid) + else: + action(self, guid) if postaction: postaction(self, guid) diff --git a/src/nepi/testbeds/netns/metadata.py b/src/nepi/testbeds/netns/metadata.py index 7c46f7a1..86359f40 100644 --- a/src/nepi/testbeds/netns/metadata.py +++ b/src/nepi/testbeds/netns/metadata.py @@ -6,7 +6,7 @@ from nepi.core import metadata from nepi.core.attributes import Attribute from nepi.util import tags, validation from nepi.util.constants import ApplicationStatus as AS, \ - FactoryCategories as FC + FactoryCategories as FC, DeploymentConfiguration as DC from nepi.util.tunchannel_impl import \ preconfigure_tunchannel, postconfigure_tunchannel, \ @@ -550,6 +550,10 @@ testbed_attributes = dict({ }), }) +supported_recovery_policies = [ + DC.POLICY_FAIL, + ] + class MetadataInfo(metadata.MetadataInfo): @property def connector_types(self): @@ -590,4 +594,8 @@ class MetadataInfo(metadata.MetadataInfo): @property def testbed_version(self): return TESTBED_VERSION + + @property + def supported_recovery_policies(self): + return supported_recovery_policies diff --git a/src/nepi/testbeds/ns3/metadata.py b/src/nepi/testbeds/ns3/metadata.py index 14c4ab7c..b1a4d7aa 100644 --- a/src/nepi/testbeds/ns3/metadata.py +++ b/src/nepi/testbeds/ns3/metadata.py @@ -3,6 +3,12 @@ from constants import TESTBED_ID, TESTBED_VERSION from nepi.core import metadata +from nepi.util.constants import DeploymentConfiguration as DC + +supported_recovery_policies = [ + DC.POLICY_FAIL, + DC.POLICY_RESTART, + ] class MetadataInfo(metadata.MetadataInfo): @property @@ -53,3 +59,8 @@ class MetadataInfo(metadata.MetadataInfo): def testbed_version(self): return TESTBED_VERSION + @property + def supported_recovery_policies(self): + return supported_recovery_policies + + diff --git a/src/nepi/testbeds/planetlab/application.py b/src/nepi/testbeds/planetlab/application.py index 66f6a5d6..73544366 100644 --- a/src/nepi/testbeds/planetlab/application.py +++ b/src/nepi/testbeds/planetlab/application.py @@ -140,6 +140,11 @@ class Dependency(object): % (e.args[0], e.args[1],) return local_path + + def recover(self): + # We assume a correct deployment, so recovery only + # means we mark this dependency as deployed + self._setup = True def setup(self): self._logger.info("Setting up %s", self) @@ -660,6 +665,13 @@ class Application(Dependency): raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) self._started = True + + def recover(self): + # Assuming the 
application is running on PlanetLab, + # proper pidfiles should be present at the app's home path. + # So we mark this application as started, and check the pidfiles + self._started = True + self.checkpid() def checkpid(self): # Get PID/PPID diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index 7b98ed9e..2cbdaff2 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -3,6 +3,7 @@ from constants import TESTBED_ID, TESTBED_VERSION from nepi.core import testbed_impl +from nepi.core.metadata import Parallel from nepi.util.constants import TIME_NOW from nepi.util.graphtools import mst from nepi.util import ipaddr2 @@ -23,6 +24,7 @@ import subprocess import random import shutil import logging +import metadata class TempKeyError(Exception): pass @@ -169,7 +171,7 @@ class TestbedController(testbed_impl.TestbedController): # Configure elements per XML data super(TestbedController, self).do_preconfigure() - def do_resource_discovery(self): + def do_resource_discovery(self, recover = False): to_provision = self._to_provision = set() reserved = set(self._blacklist) @@ -221,6 +223,9 @@ class TestbedController(testbed_impl.TestbedController): nodes.append(node) if nodes and reqs: + if recover: + raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,) + try: solution = resourcealloc.alloc(reqs) except resourcealloc.ResourceAllocationError: @@ -526,6 +531,88 @@ class TestbedController(testbed_impl.TestbedController): def follow_trace(self, trace_id, trace): self._traces[trace_id] = trace + + def recover(self): + # Create and connect do not perform any real tasks against + # the nodes, it only sets up the object hierarchy, + # so we can run them normally + self.do_create() + self.do_connect_init() + self.do_connect_compl() + + # Assign nodes - since we're working off exeucte XML, nodes + # have specific hostnames assigned and we don't need to do + # real assignment, only find out node ids and check liveliness + self.do_resource_discovery(recover = True) + self.do_wait_nodes() + + # Pre/post configure, however, tends to set up tunnels + # Execute configuration steps only for those object + # kinds that do not have side effects + + # Manually recover nodes, to mark dependencies installed + self._do_in_factory_order( + lambda self, guid : self._elements[guid].recover(), + [ + metadata.NODE, + ]) + + # Do the ones without side effects, + # including nodes that need to set up home + # folders and all that + self._do_in_factory_order( + "preconfigure_function", + [ + metadata.INTERNET, + Parallel(metadata.NODE), + metadata.NODEIFACE, + ]) + + # Tunnels require a home path that is configured + # at this step. 
Since we cannot run the step itself, + # we need to inject this homepath ourselves + for guid, element in self._elements.iteritems(): + if isinstance(element, self._interfaces.TunIface): + element._home_path = "tun-%s" % (guid,) + + # Manually recover tunnels, applications and + # netpipes, negating the side effects + self._do_in_factory_order( + lambda self, guid : self._elements[guid].recover(), + [ + Parallel(metadata.TAPIFACE), + Parallel(metadata.TUNIFACE), + metadata.NETPIPE, + Parallel(metadata.NEPIDEPENDENCY), + Parallel(metadata.NS3DEPENDENCY), + Parallel(metadata.DEPENDENCY), + Parallel(metadata.APPLICATION), + ]) + + # Tunnels are not harmed by configuration after + # recovery, and some attributes get set this way + # like external_iface + self._do_in_factory_order( + "preconfigure_function", + [ + Parallel(metadata.TAPIFACE), + Parallel(metadata.TUNIFACE), + ]) + + # Post-do the ones without side effects + self._do_in_factory_order( + "configure_function", + [ + metadata.INTERNET, + Parallel(metadata.NODE), + metadata.NODEIFACE, + Parallel(metadata.TAPIFACE), + Parallel(metadata.TUNIFACE), + ]) + + # There are no required prestart steps + # to call upon recovery, so we're done + def _make_generic(self, parameters, kind): app = kind(self.plapi) diff --git a/src/nepi/testbeds/planetlab/interfaces.py b/src/nepi/testbeds/planetlab/interfaces.py index d23e73d8..0a05bab0 100644 --- a/src/nepi/testbeds/planetlab/interfaces.py +++ b/src/nepi/testbeds/planetlab/interfaces.py @@ -231,6 +231,12 @@ class TunIface(object): impl.port = self.tun_port return impl + def recover(self): + self.peer_proto_impl = self._impl_instance( + self._home_path, + False) # no way to know, no need to know + self.peer_proto_impl.recover() + def prepare(self, home_path, listening): if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))): # Ad-hoc peer_iface @@ -343,6 +349,10 @@ class NetPipe(object): options = ' '.join(options) return (scope,options) + + def recover(self): + # Rules are safe on their nodes + self.configured = True def configure(self): # set up rule diff --git a/src/nepi/testbeds/planetlab/metadata.py b/src/nepi/testbeds/planetlab/metadata.py index fff9b8b6..d3970d43 100644 --- a/src/nepi/testbeds/planetlab/metadata.py +++ b/src/nepi/testbeds/planetlab/metadata.py @@ -10,7 +10,8 @@ from nepi.core.attributes import Attribute from nepi.util import tags, validation from nepi.util.constants import ApplicationStatus as AS, \ FactoryCategories as FC, \ - ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP + ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \ + DeploymentConfiguration as DC import functools import os @@ -1160,6 +1161,12 @@ testbed_attributes = dict({ }), }) +supported_recovery_policies = [ + DC.POLICY_FAIL, + DC.POLICY_RESTART, + DC.POLICY_RECOVER, + ] + class MetadataInfo(metadata.MetadataInfo): @property def connector_types(self): @@ -1209,3 +1216,8 @@ class MetadataInfo(metadata.MetadataInfo): def testbed_version(self): return TESTBED_VERSION + @property + def supported_recovery_policies(self): + return supported_recovery_policies + + diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py index 734d998b..dec4d825 100644 --- a/src/nepi/testbeds/planetlab/node.py +++ b/src/nepi/testbeds/planetlab/node.py @@ -81,12 +81,12 @@ class Node(object): # Those are filled when an actual node is allocated self._node_id = None self._yum_dependencies = None + self._installed = False # Logging self._logger = logging.getLogger('nepi.testbeds.planetlab') 
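# --- Illustration only, not part of the patch (hypothetical class and names) ---
# The recover() methods added in application.py, interfaces.py and node.py all
# follow the same convention: assume the deployment already present on the node
# is intact and only flip the local "already done" flag (_setup, configured,
# _installed), so that the later setup steps become no-ops. Schematically:
class SomeElement(object):              # hypothetical element, for illustration
    def __init__(self):
        self._configured = False
    def configure(self):
        if self._configured:            # skipped after recover()
            return
        # ... side effects on the PlanetLab node would happen here ...
        self._configured = True
    def recover(self):
        # Deployment assumed correct: only mark the local state as done.
        self._configured = True
# -------------------------------------------------------------------------------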
- @property - def _nepi_testbed_environment_setup(self): + def _nepi_testbed_environment_setup_get(self): command = cStringIO.StringIO() command.write('export PYTHONPATH=$PYTHONPATH:%s' % ( ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath]) @@ -99,6 +99,11 @@ class Node(object): for envval in envvals: command.write(' ; export %s=%s' % (envkey, envval)) return command.getvalue() + def _nepi_testbed_environment_setup_set(self, value): + pass + _nepi_testbed_environment_setup = property( + _nepi_testbed_environment_setup_get, + _nepi_testbed_environment_setup_set) def build_filters(self, target_filters, filter_map): for attr, tag in filter_map.iteritems(): @@ -316,8 +321,12 @@ class Node(object): if self.slicename is None: raise AssertionError, "Misconfigured node: unspecified slice" + def recover(self): + # Just mark dependencies installed + self._installed = True + def install_dependencies(self): - if self.required_packages: + if self.required_packages and not self._installed: # If we need rpmfusion, we must install the repo definition and the gpg keys if self.rpmFusion: if self.operatingSystem == 'f12': @@ -369,8 +378,9 @@ class Node(object): def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10): # Wait for the p2p installer - if self._yum_dependencies: + if self._yum_dependencies and not self._installed: self._yum_dependencies.async_setup_wait() + self._installed = True def is_alive(self): # Make sure all the paths are created where @@ -394,7 +404,7 @@ class Node(object): def prepare_dependencies(self): # Configure p2p yum dependency installer - if self.required_packages: + if self.required_packages and not self._installed: self._yum_dependencies = application.YumDependency(self._api) self._yum_dependencies.node = self self._yum_dependencies.home_path = "nepi-yumdep" diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index d066a8d2..08a33ed7 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -237,6 +237,12 @@ class TunProtoBase(object): self._started = True + def recover(self): + # Tunnel should be still running in its node + # Just check its pidfile and we're done + self._started = True + self.checkpid() + def _launch_and_wait(self, *p, **kw): try: self.__launch_and_wait(*p, **kw) @@ -312,7 +318,7 @@ class TunProtoBase(object): return self._if_name def async_launch(self, check_proto, listen, extra_args=[]): - if not self._launcher: + if not self._started and not self._launcher: self._launcher = threading.Thread( target = self._launch_and_wait, args = (check_proto, listen, extra_args)) diff --git a/src/nepi/util/constants.py b/src/nepi/util/constants.py index 03b354a5..741edd8b 100644 --- a/src/nepi/util/constants.py +++ b/src/nepi/util/constants.py @@ -74,6 +74,6 @@ class DeploymentConfiguration: USE_AGENT = "useAgent" LOG_LEVEL = "logLevel" RECOVER = "recover" - RECOVER_POLICY = "recoverPolicy" + RECOVERY_POLICY = "recoveryPolicy" diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index 14f928e5..a3236884 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -165,9 +165,9 @@ class AccessConfiguration(AttributesMap): from nepi.core.metadata import Metadata - for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems(): + for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems(): self.add_attribute(**attr_info) - + if params: for attr_name, attr_value in params.iteritems(): parser = 
Attribute.type_parsers[self.get_attribute_type(attr_name)] @@ -686,6 +686,13 @@ class TestbedControllerServer(BaseServer): def get_factory_id(self, guid): return self._testbed.get_factory_id(guid) + @Marshalling.handles(RECOVER) + @Marshalling.args() + @Marshalling.retvoid + def recover(self): + self._testbed.recover() + + class ExperimentControllerServer(BaseServer): def __init__(self, root_dir, log_level, experiment_xml, environment_setup): super(ExperimentControllerServer, self).__init__(root_dir, log_level, @@ -734,6 +741,12 @@ class ExperimentControllerServer(BaseServer): def is_finished(self, guid): return self._experiment.is_finished(guid) + @Marshalling.handles(STATUS) + @Marshalling.args(int) + @Marshalling.retval(int) + def status(self, guid): + return self._experiment.status(guid) + @Marshalling.handles(GET) @Marshalling.args(int, Marshalling.base64_data, str) @Marshalling.retval( Marshalling.pickled_data ) diff --git a/test/testbeds/planetlab/integration.py b/test/testbeds/planetlab/integration.py index 2d24e3e2..3c890861 100755 --- a/test/testbeds/planetlab/integration.py +++ b/test/testbeds/planetlab/integration.py @@ -226,6 +226,117 @@ FIONREAD = 0x[0-9a-fA-F]{8}.* daemonize_testbed = False, controller_access_configuration = access_config, environ = environ) + + + def _test_recover(self, daemonize_testbed, controller_access_configuration, environ = None): + pl, exp = self.make_experiment_desc() + + pl.set_attribute_value(DC.RECOVERY_POLICY, DC.POLICY_RECOVER) + + node1 = pl.create("Node") + node2 = pl.create("Node") + node1.set_attribute_value("hostname", self.host1) + node2.set_attribute_value("hostname", self.host2) + + iface1 = pl.create("NodeInterface") + iface2 = pl.create("NodeInterface") + inet = pl.create("Internet") + node1.connector("devs").connect(iface1.connector("node")) + node2.connector("devs").connect(iface2.connector("node")) + iface1.connector("inet").connect(inet.connector("devs")) + iface2.connector("inet").connect(inet.connector("devs")) + + tap1 = pl.create("TapInterface") + tap2 = pl.create("TapInterface") + node1.connector("devs").connect(tap1.connector("node")) + node2.connector("devs").connect(tap2.connector("node")) + tap1.connector("udp").connect(tap2.connector("udp")) + + tap1ip = tap1.add_address() + tap1ip.set_attribute_value("Address", "192.168.2.2") + tap1ip.set_attribute_value("NetPrefix", 24) + tap1ip.set_attribute_value("Broadcast", False) + + tap2ip = tap2.add_address() + tap2ip.set_attribute_value("Address", "192.168.2.3") + tap2ip.set_attribute_value("NetPrefix", 24) + tap2ip.set_attribute_value("Broadcast", False) + + app = pl.create("Application") + app.set_attribute_value("command", "ping -qc10 192.168.2.3") + app.enable_trace("stdout") + app.connector("node").connect(node1.connector("apps")) + + if daemonize_testbed: + pl.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + pl.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) + pl.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + + if environ: + pl.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environ) + + xml = exp.to_xml() + + if controller_access_configuration: + controller = proxy.create_experiment_controller(xml, + controller_access_configuration) + else: + controller = ExperimentController(xml, self.root_dir) + + try: + controller.start() + + # purposely break the connection + controller = None + + # recover + if controller_access_configuration: 
controller_access_configuration.set_attribute_value( + DC.RECOVER, True) + controller = proxy.create_experiment_controller(None, + controller_access_configuration) + else: + controller = ExperimentController(None, self.root_dir) + controller.recover() + + while not controller.is_finished(app.guid): + time.sleep(0.5) + ping_result = controller.trace(app.guid, "stdout") + comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data. + +--- .* ping statistics --- +10 packets transmitted, 10 received, 0% packet loss, time \d*ms.* +""" + self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE), + "Unexpected trace:\n" + ping_result) + + finally: + if controller is not None: + try: + controller.stop() + controller.shutdown() + except: + import traceback + traceback.print_exc() + + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_recover(self): + self._test_recover( + daemonize_testbed = False, + controller_access_configuration = None) + + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_recover_daemonized(self): + access_config = proxy.AccessConfiguration({ + DC.DEPLOYMENT_MODE : DC.MODE_DAEMON, + DC.ROOT_DIRECTORY : self.root_dir, + }) + + self._test_recover( + daemonize_testbed = False, + controller_access_configuration = access_config) if __name__ == '__main__':
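
Usage note (illustrative, not part of the patch): the recovery flow exercised by _test_recover above, reduced to a minimal sketch. It assumes an experiment description exp containing a PlanetLab testbed description pl, built the way make_experiment_desc() builds one in the test, and a writable root directory; the path below is a placeholder.

from nepi.core.execute import ExperimentController
from nepi.util.constants import DeploymentConfiguration as DC

root_dir = "/tmp/nepi-recovery-demo"    # placeholder directory

# `pl` and `exp` are assumed to come from an experiment description as in the
# test above. Opt the PlanetLab testbed in to recovery; the default policy is
# DC.POLICY_FAIL.
pl.set_attribute_value(DC.RECOVERY_POLICY, DC.POLICY_RECOVER)

controller = ExperimentController(exp.to_xml(), root_dir)
controller.start()

# ... the controller process is lost here (crash, reboot, dropped connection) ...

# A new controller over the same root_dir reloads the persisted testbed access
# configuration and the execute XML, reconnects to recoverable testbeds and
# restarts the rest, instead of instantiating everything again.
controller = ExperimentController(None, root_dir)
controller.recover()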