Not fully tested yet; cross connections in particular are still untested.
from nepi.core.attributes import Attribute, AttributesMap
from nepi.util import validation
-from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
+from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC
from nepi.util.parser._xml import XmlExperimentParser
import sys
import re
def stop(self):
raise NotImplementedError
+ def recover(self):
+ raise NotImplementedError
+
def set(self, guid, name, value, time = TIME_NOW):
raise NotImplementedError
raise eTyp, eVal, eLoc
def start(self):
+ self._start()
+
+ def _start(self, recover = False):
parser = XmlExperimentParser()
- data = parser.from_xml_to_data(self._experiment_design_xml)
+
+ if recover:
+ xml = self._experiment_execute_xml
+ else:
+ xml = self._experiment_design_xml
+ data = parser.from_xml_to_data(xml)
# instantiate testbed controllers
- self._init_testbed_controllers(data)
+ to_recover, to_restart = self._init_testbed_controllers(data, recover)
+ all_restart = set(to_restart)
- # persist testbed connection data, for potential recovery
- self._persist_testbed_proxies()
+ if not recover:
+ # persist testbed connection data, for potential recovery
+ self._persist_testbed_proxies()
+ else:
+ # recover recoverable controllers
+ for guid in to_recover:
+ self._testbeds[guid].do_setup()
+ self._testbeds[guid].recover()
def steps_to_configure(self, allowed_guids):
# perform setup in parallel for all test beds,
if guid in allowed_guids])
self._clear_caches()
- steps_to_configure(self, self._testbeds)
+ steps_to_configure(self, to_restart)
if self._netreffed_testbeds:
# initally resolve netrefs
# rinse and repeat, for netreffed testbeds
netreffed_testbeds = set(self._netreffed_testbeds)
- self._init_testbed_controllers(data)
+ to_recover, to_restart = self._init_testbed_controllers(data, recover)
+ all_restart.update(to_restart)
- # persist testbed connection data, for potential recovery
- self._persist_testbed_proxies()
+ if not recover:
+ # persist testbed connection data, for potential recovery
+ self._persist_testbed_proxies()
+ else:
+ # recover recoverable controllers
+ for guid in to_recover:
+ self._testbeds[guid].do_setup()
+ self._testbeds[guid].recover()
# configure dependant testbeds
- steps_to_configure(self, netreffed_testbeds)
+ steps_to_configure(self, to_restart)
+
+ all_restart = [ self._testbeds[guid] for guid in all_restart ]
# final netref step, fail if anything's left unresolved
self.do_netrefs(data, fail_if_undefined=True)
# perform do_configure in parallel for al testbeds
# (it's internal configuration for each)
self._parallel([testbed.do_configure
- for testbed in self._testbeds.itervalues()])
+ for testbed in all_restart])
self._clear_caches()
# Last chance to configure (parallel on all testbeds)
self._parallel([testbed.do_prestart
- for testbed in self._testbeds.itervalues()])
+ for testbed in all_restart])
self._clear_caches()
- # update execution xml with execution-specific values
- # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
- self._update_execute_xml()
- self.persist_execute_xml()
+ if not recover:
+ # update execution xml with execution-specific values
+            # TODO: BUG! Buggy code! can't handle serializing all attribute values (e.g. tun_key, which is non-ascii)
+ self._update_execute_xml()
+ self.persist_execute_xml()
# start experiment (parallel start on all testbeds)
self._parallel([testbed.start
- for testbed in self._testbeds.itervalues()])
+ for testbed in all_restart])
self._clear_caches()
self._guids_in_testbed_cache = dict()
def _persist_testbed_proxies(self):
- TRANSIENT = ('Recover',)
+ TRANSIENT = (DC.RECOVER,)
# persist access configuration for all testbeds, so that
# recovery mode can reconnect to them if it becomes necessary
Attribute.INTEGER : 'getint',
}
- TRANSIENT = ('Recover',)
+ TRANSIENT = (DC.RECOVER,)
# deferred import because proxy needs
# our class definitions to define proxies
def recover(self):
# reload perviously persisted testbed access configurations
self._load_testbed_proxies()
-
- # Parse experiment xml
- parser = XmlExperimentParser()
- data = parser.from_xml_to_data(self._experiment_design_xml)
-
- # recreate testbed proxies by reconnecting only
- self._init_testbed_controllers(data, recover = True)
-
- # another time, for netrefs
- self._init_testbed_controllers(data, recover = True)
-
- print >>sys.stderr, "RECOVERED"
+
+ self._start(recover = True)
def is_finished(self, guid):
testbed = self._testbed_for_guid(guid)
return testbed.status(guid) == AS.STATUS_FINISHED
raise RuntimeError("No element exists with guid %d" % guid)
+ def status(self, guid):
+ testbed = self._testbed_for_guid(guid)
+ if testbed != None:
+ return testbed.status(guid)
+ raise RuntimeError("No element exists with guid %d" % guid)
+
def set(self, guid, name, value, time = TIME_NOW):
testbed = self._testbed_for_guid(guid)
if testbed != None:
element_guids = list()
label_guids = dict()
data_guids = data.guids
+ to_recover = set()
+ to_restart = set()
+
+ # gather label associations
+ for guid in data_guids:
+ if not data.is_testbed_data(guid):
+ (testbed_guid, factory_id) = data.get_box_data(guid)
+ label = data.get_attribute_data(guid, "label")
+ if label is not None:
+ if label in label_guids:
+ raise RuntimeError, "Label %r is not unique" % (label,)
+ label_guids[label] = guid
# create testbed controllers
for guid in data_guids:
if data.is_testbed_data(guid):
if guid not in self._testbeds:
- self._create_testbed_controller(guid, data, element_guids,
- recover)
- else:
+ try:
+ self._create_testbed_controller(
+ guid, data, element_guids, recover)
+ if recover:
+ # Already programmed
+ blacklist_testbeds.add(guid)
+ else:
+ to_restart.add(guid)
+ except:
+ if recover:
+ policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY)
+ if policy == DC.POLICY_FAIL:
+ raise
+ elif policy == DC.POLICY_RECOVER:
+ self._create_testbed_controller(
+ guid, data, element_guids, False)
+ to_recover.add(guid)
+ elif policy == DC.POLICY_RESTART:
+ self._create_testbed_controller(
+ guid, data, element_guids, False)
+ to_restart.add(guid)
+ else:
+ raise
+ else:
+ raise
+
+ # queue programmable elements
+ # - that have not been programmed already (blacklist_testbeds)
+ # - including recovered or restarted testbeds
+ # - but those that have no unresolved netrefs
+ for guid in data_guids:
+ if not data.is_testbed_data(guid):
(testbed_guid, factory_id) = data.get_box_data(guid)
if testbed_guid not in blacklist_testbeds:
element_guids.append(guid)
- label = data.get_attribute_data(guid, "label")
- if label is not None:
- if label in label_guids:
- raise RuntimeError, "Label %r is not unique" % (label,)
- label_guids[label] = guid
# replace references to elements labels for its guid
self._resolve_labels(data, data_guids, label_guids)
# program testbed controllers
- if not recover:
+ if element_guids:
self._program_testbed_controllers(element_guids, data)
+
+ return to_recover, to_restart
def _resolve_labels(self, data, data_guids, label_guids):
netrefs = self._netrefs
"validation_function" : validation.is_enum,
"category" : AC.CATEGORY_DEPLOYMENT,
}),
+ DC.RECOVERY_POLICY : dict({
+ "name" : DC.RECOVERY_POLICY,
+ "help" : "Specifies what action to take in the event of a failure.",
+ "type" : Attribute.ENUM,
+ "value" : DC.POLICY_FAIL,
+ "allowed" : [
+ DC.POLICY_FAIL,
+ DC.POLICY_RECOVER,
+ DC.POLICY_RESTART,
+ ],
+ "flags" : Attribute.ExecReadOnly |\
+ Attribute.ExecImmutable |\
+ Attribute.Metadata,
+ "validation_function" : validation.is_enum,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ })
+ PROXY_ATTRIBUTES = dict({
DC.RECOVER : dict({
"name" : DC.RECOVER,
"help" : "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
"category" : AC.CATEGORY_DEPLOYMENT,
}),
})
+ PROXY_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES)
# These attributes could appear in the boxes attribute list
STANDARD_BOX_ATTRIBUTE_DEFINITIONS = dict({
@property
def testbed_id(self):
return self._testbed_id
+
+ @property
+ def supported_recovery_policies(self):
+ return self._metadata.supported_recovery_policies
def testbed_attributes(self):
attributes = AttributesMap()
# configure action
factory = self._factories[factory_id]
- if not getattr(factory, action):
+ if isinstance(action, basestring) and not getattr(factory, action):
continue
def perform_action(guid):
- getattr(factory, action)(self, guid)
+ if isinstance(action, basestring):
+ getattr(factory, action)(self, guid)
+ else:
+ action(self, guid)
if postaction:
postaction(self, guid)
from nepi.core.attributes import Attribute
from nepi.util import tags, validation
from nepi.util.constants import ApplicationStatus as AS, \
- FactoryCategories as FC
+ FactoryCategories as FC, DeploymentConfiguration as DC
from nepi.util.tunchannel_impl import \
preconfigure_tunchannel, postconfigure_tunchannel, \
}),
})
+supported_recovery_policies = [
+ DC.POLICY_FAIL,
+ ]
+
class MetadataInfo(metadata.MetadataInfo):
@property
def connector_types(self):
@property
def testbed_version(self):
return TESTBED_VERSION
+
+    @property
+    def supported_recovery_policies(self):
+        return supported_recovery_policies
from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import metadata
+from nepi.util.constants import DeploymentConfiguration as DC
+
+supported_recovery_policies = [
+ DC.POLICY_FAIL,
+ DC.POLICY_RESTART,
+ ]
class MetadataInfo(metadata.MetadataInfo):
@property
def testbed_version(self):
return TESTBED_VERSION
+ @property
+ def supported_recovery_policies(self):
+ return supported_recovery_policies
+
+
% (e.args[0], e.args[1],)
return local_path
+
+ def recover(self):
+ # We assume a correct deployment, so recovery only
+ # means we mark this dependency as deployed
+ self._setup = True
def setup(self):
self._logger.info("Setting up %s", self)
raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
self._started = True
+
+ def recover(self):
+ # Assuming the application is running on PlanetLab,
+ # proper pidfiles should be present at the app's home path.
+ # So we mark this application as started, and check the pidfiles
+ self._started = True
+ self.checkpid()
def checkpid(self):
# Get PID/PPID
from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
+from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
import random
import shutil
import logging
+import metadata
class TempKeyError(Exception):
pass
# Configure elements per XML data
super(TestbedController, self).do_preconfigure()
- def do_resource_discovery(self):
+ def do_resource_discovery(self, recover = False):
to_provision = self._to_provision = set()
reserved = set(self._blacklist)
nodes.append(node)
if nodes and reqs:
+ if recover:
+ raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
+
try:
solution = resourcealloc.alloc(reqs)
except resourcealloc.ResourceAllocationError:
def follow_trace(self, trace_id, trace):
self._traces[trace_id] = trace
+
+ def recover(self):
+ # Create and connect do not perform any real tasks against
+ # the nodes, it only sets up the object hierarchy,
+ # so we can run them normally
+ self.do_create()
+ self.do_connect_init()
+ self.do_connect_compl()
+
+        # Assign nodes - since we're working off execute XML, nodes
+ # have specific hostnames assigned and we don't need to do
+ # real assignment, only find out node ids and check liveliness
+ self.do_resource_discovery(recover = True)
+ self.do_wait_nodes()
+
+ # Pre/post configure, however, tends to set up tunnels
+ # Execute configuration steps only for those object
+ # kinds that do not have side effects
+
+ # Manually recover nodes, to mark dependencies installed
+ self._do_in_factory_order(
+ lambda self, guid : self._elements[guid].recover(),
+ [
+ metadata.NODE,
+ ])
+
+ # Do the ones without side effects,
+ # including nodes that need to set up home
+ # folders and all that
+ self._do_in_factory_order(
+ "preconfigure_function",
+ [
+ metadata.INTERNET,
+ Parallel(metadata.NODE),
+ metadata.NODEIFACE,
+ ])
+
+ # Tunnels require a home path that is configured
+ # at this step. Since we cannot run the step itself,
+ # we need to inject this homepath ourselves
+ for guid, element in self._elements.iteritems():
+ if isinstance(element, self._interfaces.TunIface):
+ element._home_path = "tun-%s" % (guid,)
+
+ # Manually recover tunnels, applications and
+ # netpipes, negating the side effects
+ self._do_in_factory_order(
+ lambda self, guid : self._elements[guid].recover(),
+ [
+ Parallel(metadata.TAPIFACE),
+ Parallel(metadata.TUNIFACE),
+ metadata.NETPIPE,
+ Parallel(metadata.NEPIDEPENDENCY),
+ Parallel(metadata.NS3DEPENDENCY),
+ Parallel(metadata.DEPENDENCY),
+ Parallel(metadata.APPLICATION),
+ ])
+
+ # Tunnels are not harmed by configuration after
+ # recovery, and some attributes get set this way
+ # like external_iface
+ self._do_in_factory_order(
+ "preconfigure_function",
+ [
+ Parallel(metadata.TAPIFACE),
+ Parallel(metadata.TUNIFACE),
+ ])
+
+ # Post-do the ones without side effects
+ self._do_in_factory_order(
+ "configure_function",
+ [
+ metadata.INTERNET,
+ Parallel(metadata.NODE),
+ metadata.NODEIFACE,
+ Parallel(metadata.TAPIFACE),
+ Parallel(metadata.TUNIFACE),
+ ])
+
+ # There are no required prestart steps
+ # to call upon recovery, so we're done
+
def _make_generic(self, parameters, kind):
app = kind(self.plapi)
impl.port = self.tun_port
return impl
+ def recover(self):
+ self.peer_proto_impl = self._impl_instance(
+ self._home_path,
+ False) # no way to know, no need to know
+ self.peer_proto_impl.recover()
+
def prepare(self, home_path, listening):
if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))):
# Ad-hoc peer_iface
options = ' '.join(options)
return (scope,options)
+
+ def recover(self):
+ # Rules are safe on their nodes
+ self.configured = True
def configure(self):
# set up rule
from nepi.util import tags, validation
from nepi.util.constants import ApplicationStatus as AS, \
FactoryCategories as FC, \
- ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP
+ ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
+ DeploymentConfiguration as DC
import functools
import os
}),
})
+supported_recovery_policies = [
+ DC.POLICY_FAIL,
+ DC.POLICY_RESTART,
+ DC.POLICY_RECOVER,
+ ]
+
class MetadataInfo(metadata.MetadataInfo):
@property
def connector_types(self):
def testbed_version(self):
return TESTBED_VERSION
+ @property
+ def supported_recovery_policies(self):
+ return supported_recovery_policies
+
+
# Those are filled when an actual node is allocated
self._node_id = None
self._yum_dependencies = None
+ self._installed = False
# Logging
self._logger = logging.getLogger('nepi.testbeds.planetlab')
- @property
- def _nepi_testbed_environment_setup(self):
+ def _nepi_testbed_environment_setup_get(self):
command = cStringIO.StringIO()
command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
for envval in envvals:
command.write(' ; export %s=%s' % (envkey, envval))
return command.getvalue()
+ def _nepi_testbed_environment_setup_set(self, value):
+ pass
+ _nepi_testbed_environment_setup = property(
+ _nepi_testbed_environment_setup_get,
+ _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
for attr, tag in filter_map.iteritems():
if self.slicename is None:
raise AssertionError, "Misconfigured node: unspecified slice"
+ def recover(self):
+ # Just mark dependencies installed
+ self._installed = True
+
def install_dependencies(self):
- if self.required_packages:
+ if self.required_packages and not self._installed:
# If we need rpmfusion, we must install the repo definition and the gpg keys
if self.rpmFusion:
if self.operatingSystem == 'f12':
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
# Wait for the p2p installer
- if self._yum_dependencies:
+ if self._yum_dependencies and not self._installed:
self._yum_dependencies.async_setup_wait()
+ self._installed = True
def is_alive(self):
# Make sure all the paths are created where
def prepare_dependencies(self):
# Configure p2p yum dependency installer
- if self.required_packages:
+ if self.required_packages and not self._installed:
self._yum_dependencies = application.YumDependency(self._api)
self._yum_dependencies.node = self
self._yum_dependencies.home_path = "nepi-yumdep"
self._started = True
+ def recover(self):
+ # Tunnel should be still running in its node
+ # Just check its pidfile and we're done
+ self._started = True
+ self.checkpid()
+
def _launch_and_wait(self, *p, **kw):
try:
self.__launch_and_wait(*p, **kw)
return self._if_name
def async_launch(self, check_proto, listen, extra_args=[]):
- if not self._launcher:
+ if not self._started and not self._launcher:
self._launcher = threading.Thread(
target = self._launch_and_wait,
args = (check_proto, listen, extra_args))
USE_AGENT = "useAgent"
LOG_LEVEL = "logLevel"
RECOVER = "recover"
- RECOVER_POLICY = "recoverPolicy"
+ RECOVERY_POLICY = "recoveryPolicy"
from nepi.core.metadata import Metadata
- for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
+ for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
self.add_attribute(**attr_info)
-
+
if params:
for attr_name, attr_value in params.iteritems():
parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
def get_factory_id(self, guid):
return self._testbed.get_factory_id(guid)
+ @Marshalling.handles(RECOVER)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def recover(self):
+ self._testbed.recover()
+
+
class ExperimentControllerServer(BaseServer):
def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
super(ExperimentControllerServer, self).__init__(root_dir, log_level,
def is_finished(self, guid):
return self._experiment.is_finished(guid)
+    @Marshalling.handles(STATUS)
+    @Marshalling.args(int)
+    @Marshalling.retval(int)
+    def status(self, guid):
+        return self._experiment.status(guid)
+
@Marshalling.handles(GET)
@Marshalling.args(int, Marshalling.base64_data, str)
@Marshalling.retval( Marshalling.pickled_data )
daemonize_testbed = False,
controller_access_configuration = access_config,
environ = environ)
+
+
+ def _test_recover(self, daemonize_testbed, controller_access_configuration, environ = None):
+ pl, exp = self.make_experiment_desc()
+
+ pl.set_attribute_value(DC.RECOVERY_POLICY, DC.POLICY_RECOVER)
+
+ node1 = pl.create("Node")
+ node2 = pl.create("Node")
+ node1.set_attribute_value("hostname", self.host1)
+ node2.set_attribute_value("hostname", self.host2)
+
+ iface1 = pl.create("NodeInterface")
+ iface2 = pl.create("NodeInterface")
+ inet = pl.create("Internet")
+ node1.connector("devs").connect(iface1.connector("node"))
+ node2.connector("devs").connect(iface2.connector("node"))
+ iface1.connector("inet").connect(inet.connector("devs"))
+ iface2.connector("inet").connect(inet.connector("devs"))
+
+ tap1 = pl.create("TapInterface")
+ tap2 = pl.create("TapInterface")
+ node1.connector("devs").connect(tap1.connector("node"))
+ node2.connector("devs").connect(tap2.connector("node"))
+ tap1.connector("udp").connect(tap2.connector("udp"))
+
+ tap1ip = tap1.add_address()
+ tap1ip.set_attribute_value("Address", "192.168.2.2")
+ tap1ip.set_attribute_value("NetPrefix", 24)
+ tap1ip.set_attribute_value("Broadcast", False)
+
+ tap2ip = tap2.add_address()
+ tap2ip.set_attribute_value("Address", "192.168.2.3")
+ tap2ip.set_attribute_value("NetPrefix", 24)
+ tap2ip.set_attribute_value("Broadcast", False)
+
+ app = pl.create("Application")
+ app.set_attribute_value("command", "ping -qc10 192.168.2.3")
+ app.enable_trace("stdout")
+ app.connector("node").connect(node1.connector("apps"))
+
+ if daemonize_testbed:
+ pl.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ pl.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+ pl.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
+ if environ:
+ pl.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environ)
+
+ xml = exp.to_xml()
+
+ if controller_access_configuration:
+ controller = proxy.create_experiment_controller(xml,
+ controller_access_configuration)
+ else:
+ controller = ExperimentController(xml, self.root_dir)
+
+ try:
+ controller.start()
+
+ # purposedly break connection
+ controller = None
+
+ # recover
+ if controller_access_configuration:
+ controller_access_configuration.set_attribute_value(
+ DC.RECOVER, True)
+ controller = proxy.create_experiment_controller(None,
+ controller_access_configuration)
+ else:
+ controller = ExperimentController(None, self.root_dir)
+ controller.recover()
+
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ ping_result = controller.trace(app.guid, "stdout")
+ comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
+
+--- .* ping statistics ---
+10 packets transmitted, 10 received, 0% packet loss, time \d*ms.*
+"""
+ self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
+ "Unexpected trace:\n" + ping_result)
+
+ finally:
+ if controller is not None:
+ try:
+ controller.stop()
+ controller.shutdown()
+ except:
+ import traceback
+ traceback.print_exc()
+
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_recover(self):
+ self._test_recover(
+ daemonize_testbed = False,
+ controller_access_configuration = None)
+
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_recover_daemonized(self):
+ access_config = proxy.AccessConfiguration({
+ DC.DEPLOYMENT_MODE : DC.MODE_DAEMON,
+ DC.ROOT_DIRECTORY : self.root_dir,
+ })
+
+ self._test_recover(
+ daemonize_testbed = False,
+ controller_access_configuration = access_config)
if __name__ == '__main__':