X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Ftestbeds%2Fplanetlab%2Fexecute.py;h=69abd71c7f720f0671423ad9908cc9ccd5f2c02f;hb=e303e39c78476fd140bf615b1eb631e44c332b80;hp=1420601eb8412c245ebc39c7e5e3997ad4433a7f;hpb=77989fabbd3186885cbd47c6cde9eb7493227f27;p=nepi.git diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index 1420601e..69abd71c 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -3,10 +3,13 @@ from constants import TESTBED_ID, TESTBED_VERSION from nepi.core import testbed_impl +from nepi.core.metadata import Parallel from nepi.util.constants import TIME_NOW from nepi.util.graphtools import mst from nepi.util import ipaddr2 from nepi.util import environ +from nepi.util.parallel import ParallelRun +import threading import sys import os import os.path @@ -21,6 +24,9 @@ import tempfile import subprocess import random import shutil +import logging +import metadata +import weakref class TempKeyError(Exception): pass @@ -32,15 +38,20 @@ class TestbedController(testbed_impl.TestbedController): self.slicename = None self._traces = dict() - import node, interfaces, application + import node, interfaces, application, multicast self._node = node self._interfaces = interfaces self._app = application + self._multicast = multicast self._blacklist = set() self._just_provisioned = set() self._load_blacklist() + + self._logger = logging.getLogger('nepi.testbeds.planetlab') + + self.recovering = False @property def home_directory(self): @@ -74,6 +85,20 @@ class TestbedController(testbed_impl.TestbedController): return None return self._slice_id + @property + def vsys_vnet(self): + if not hasattr(self, '_vsys_vnet'): + slicetags = self.plapi.GetSliceTags( + name = self.slicename, + tagname = 'vsys_vnet', + fields=('value',)) + if slicetags: + self._vsys_vnet = slicetags[0]['value'] + else: + # If it wasn't found, don't remember this failure, keep trying + return None + return self._vsys_vnet + def _load_blacklist(self): blpath = environ.homepath('plblacklist') @@ -117,6 +142,28 @@ class TestbedController(testbed_impl.TestbedController): get_attribute_value("plcHost") self.plcUrl = self._attributes.\ get_attribute_value("plcUrl") + self.logLevel = self._attributes.\ + get_attribute_value("plLogLevel") + self.tapPortBase = self._attributes.\ + get_attribute_value("tapPortBase") + self.p2pDeployment = self._attributes.\ + get_attribute_value("p2pDeployment") + self.dedicatedSlice = self._attributes.\ + get_attribute_value("dedicatedSlice") + + if not self.slicename: + raise RuntimeError, "Slice not set" + if not self.authUser: + raise RuntimeError, "PlanetLab account username not set" + if not self.authString: + raise RuntimeError, "PlanetLab account passphrase not set" + if not self.sliceSSHKey: + raise RuntimeError, "PlanetLab account key not specified" + if not os.path.exists(self.sliceSSHKey): + raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,) + + self._logger.setLevel(getattr(logging,self.logLevel)) + super(TestbedController, self).do_setup() def do_post_asynclaunch(self, guid): @@ -149,13 +196,14 @@ class TestbedController(testbed_impl.TestbedController): # Oh... retry... 
pass - # Plan application deployment - self.do_spanning_deployment_plan() + if self.p2pDeployment: + # Plan application deployment + self.do_spanning_deployment_plan() # Configure elements per XML data super(TestbedController, self).do_preconfigure() - def do_resource_discovery(self): + def do_resource_discovery(self, recover = False): to_provision = self._to_provision = set() reserved = set(self._blacklist) @@ -166,63 +214,95 @@ class TestbedController(testbed_impl.TestbedController): # Initial algo: # look for perfectly defined nodes # (ie: those with only one candidate) - for guid, node in self._elements.iteritems(): - if isinstance(node, self._node.Node) and node._node_id is None: - # Try existing nodes first - # If we have only one candidate, simply use it - candidates = node.find_candidates( - filter_slice_id = self.slice_id) + reserve_lock = threading.Lock() + def assignifunique(guid, node): + # Try existing nodes first + # If we have only one candidate, simply use it + candidates = node.find_candidates( + filter_slice_id = self.slice_id) + + node_id = None + reserve_lock.acquire() + try: candidates -= reserved if len(candidates) == 1: node_id = iter(candidates).next() - node.assign_node_id(node_id) reserved.add(node_id) elif not candidates: # Try again including unassigned nodes - candidates = node.find_candidates() + reserve_lock.release() + try: + candidates = node.find_candidates() + finally: + reserve_lock.acquire() candidates -= reserved if len(candidates) > 1: - continue + return if len(candidates) == 1: node_id = iter(candidates).next() - node.assign_node_id(node_id) to_provision.add(node_id) reserved.add(node_id) elif not candidates: raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid, node.make_filter_description()) + finally: + reserve_lock.release() + + if node_id is not None: + node.assign_node_id(node_id) + + runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it + runner.start() + for guid, node in self._elements.iteritems(): + if isinstance(node, self._node.Node) and node._node_id is None: + runner.put(assignifunique, guid, node) + runner.sync() # Now do the backtracking search for a suitable solution # First with existing slice nodes reqs = [] nodes = [] + def genreqs(node, filter_slice_id=None): + # Try existing nodes first + # If we have only one candidate, simply use it + candidates = node.find_candidates( + filter_slice_id = filter_slice_id) + candidates -= reserved + reqs.append(candidates) + nodes.append(node) for guid, node in self._elements.iteritems(): if isinstance(node, self._node.Node) and node._node_id is None: - # Try existing nodes first - # If we have only one candidate, simply use it - candidates = node.find_candidates( - filter_slice_id = self.slice_id) - candidates -= reserved - reqs.append(candidates) - nodes.append(node) + runner.put(genreqs, node, self.slice_id) + runner.sync() if nodes and reqs: + if recover: + raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,) + + def pickbest(fullset, nreq, node=nodes[0]): + if len(fullset) > nreq: + fullset = zip(node.rate_nodes(fullset),fullset) + fullset.sort(reverse=True) + del fullset[nreq:] + return set(map(operator.itemgetter(1),fullset)) + else: + return fullset + try: - solution = resourcealloc.alloc(reqs) + solution = resourcealloc.alloc(reqs, sample=pickbest) except resourcealloc.ResourceAllocationError: # Failed, try again with all nodes reqs = [] for node in nodes: - 
candidates = node.find_candidates() - candidates -= reserved - reqs.append(candidates) - - solution = resourcealloc.alloc(reqs) + runner.put(genreqs, node) + runner.sync() + solution = resourcealloc.alloc(reqs, sample=pickbest) to_provision.update(solution) # Do assign nodes for node, node_id in zip(nodes, solution): - node.assign_node_id(node_id) + runner.put(node.assign_node_id, node_id) + runner.join() def do_provisioning(self): if self._to_provision: @@ -244,29 +324,43 @@ class TestbedController(testbed_impl.TestbedController): node.slicename = self.slicename # Show the magic - print >>sys.stderr, "PlanetLab Node", guid, "configured at", node.hostname - + self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname) + try: - for guid, node in self._elements.iteritems(): - if isinstance(node, self._node.Node): - print >>sys.stderr, "Waiting for Node", guid, "configured at", node.hostname, - sys.stdout.flush() - + runner = ParallelRun(maxthreads=64, maxqueue=1) + abort = [] + def waitforit(guid, node): + try: node.wait_provisioning( (20*60 if node._node_id in self._just_provisioned else 60) ) - print >>sys.stderr, "READY" + self._logger.info("READY Node %s at %s", guid, node.hostname) + + # Prepare dependency installer now + node.prepare_dependencies() + except: + abort.append(None) + raise + + for guid, node in self._elements.iteritems(): + if abort: + break + if isinstance(node, self._node.Node): + self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname) + runner.put(waitforit, guid, node) + runner.join() + except self._node.UnresponsiveNodeError: # Uh... - print >>sys.stderr, "UNRESPONSIVE" + self._logger.warn("UNRESPONSIVE Nodes") # Mark all dead nodes (which are unresponsive) on the blacklist # and re-raise for guid, node in self._elements.iteritems(): if isinstance(node, self._node.Node): if not node.is_alive(): - print >>sys.stderr, "Blacklisting", node.hostname, "for unresponsiveness" + self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname) self._blacklist.add(node._node_id) node.unassign_node() @@ -293,6 +387,7 @@ class TestbedController(testbed_impl.TestbedController): app.node.architecture, app.node.operatingSystem, app.node.pl_distro, + app.__class__, ) depgroups = collections.defaultdict(list) @@ -300,6 +395,10 @@ class TestbedController(testbed_impl.TestbedController): for element in self._elements.itervalues(): if isinstance(element, self._app.Dependency): depgroups[dephash(element)].append(element) + elif isinstance(element, self._node.Node): + deps = element._yum_dependencies + if deps: + depgroups[dephash(deps)].append(deps) # Set up spanning deployment for those applications that # have been deployed in several nodes. @@ -371,9 +470,7 @@ class TestbedController(testbed_impl.TestbedController): suffix = ".pub") # Create secure 256-bits temporary passphrase - passphrase = ''.join(map(chr,[rng.randint(0,255) - for rng in (random.SystemRandom(),) - for i in xrange(32)] )).encode("hex") + passphrase = os.urandom(32).encode("hex") # Copy keys oprk = open(self.sliceSSHKey, "rb") @@ -428,7 +525,15 @@ class TestbedController(testbed_impl.TestbedController): # TODO: take on account schedule time for the task element = self._elements[guid] if element: - setattr(element, name, value) + try: + setattr(element, name, value) + except: + # We ignore these errors while recovering. + # Some attributes are immutable, and setting + # them is necessary (to recover the state), but + # some are not (they throw an exception). 
+ if not self.recovering: + raise if hasattr(element, 'refresh'): # invoke attribute refresh hook @@ -467,22 +572,28 @@ class TestbedController(testbed_impl.TestbedController): def shutdown(self): for trace in self._traces.itervalues(): trace.close() - for element in self._elements.itervalues(): - # invoke cleanup hooks - if hasattr(element, 'cleanup'): - element.cleanup() - for element in self._elements.itervalues(): - # invoke destroy hooks - if hasattr(element, 'destroy'): - element.destroy() + + def invokeif(action, testbed, guid): + element = self._elements[guid] + if hasattr(element, action): + getattr(element, action)() + + self._do_in_factory_order( + functools.partial(invokeif, 'cleanup'), + metadata.shutdown_order) + + self._do_in_factory_order( + functools.partial(invokeif, 'destroy'), + metadata.shutdown_order) + self._elements.clear() self._traces.clear() def trace(self, guid, trace_id, attribute='value'): - app = self._elements[guid] + elem = self._elements[guid] if attribute == 'value': - path = app.sync_trace(self.home_directory, trace_id) + path = elem.sync_trace(self.home_directory, trace_id) if path: fd = open(path, "r") content = fd.read() @@ -490,26 +601,128 @@ class TestbedController(testbed_impl.TestbedController): else: content = None elif attribute == 'path': - content = app.remote_trace_path(trace_id) + content = elem.remote_trace_path(trace_id) + elif attribute == 'name': + content = elem.remote_trace_name(trace_id) else: content = None return content def follow_trace(self, trace_id, trace): self._traces[trace_id] = trace + + def recover(self): + try: + # An internal flag, so we know to behave differently in + # a few corner cases. + self.recovering = True + + # Create and connect do not perform any real tasks against + # the nodes, it only sets up the object hierarchy, + # so we can run them normally + self.do_create() + self.do_connect_init() + self.do_connect_compl() + + # Manually recover nodes, to mark dependencies installed + # and clean up mutable attributes + self._do_in_factory_order( + lambda self, guid : self._elements[guid].recover(), + [ + metadata.NODE, + ]) + + # Assign nodes - since we're working off exeucte XML, nodes + # have specific hostnames assigned and we don't need to do + # real assignment, only find out node ids and check liveliness + self.do_resource_discovery(recover = True) + self.do_wait_nodes() + + # Pre/post configure, however, tends to set up tunnels + # Execute configuration steps only for those object + # kinds that do not have side effects + + # Do the ones without side effects, + # including nodes that need to set up home + # folders and all that + self._do_in_factory_order( + "preconfigure_function", + [ + metadata.INTERNET, + Parallel(metadata.NODE), + metadata.NODEIFACE, + ]) + + # Tunnels require a home path that is configured + # at this step. 
Since we cannot run the step itself, + # we need to inject this homepath ourselves + for guid, element in self._elements.iteritems(): + if isinstance(element, self._interfaces.TunIface): + element._home_path = "tun-%s" % (guid,) + + # Manually recover tunnels, applications and + # netpipes, negating the side effects + self._do_in_factory_order( + lambda self, guid : self._elements[guid].recover(), + [ + Parallel(metadata.TAPIFACE), + Parallel(metadata.TUNIFACE), + metadata.NETPIPE, + Parallel(metadata.NEPIDEPENDENCY), + Parallel(metadata.NS3DEPENDENCY), + Parallel(metadata.DEPENDENCY), + Parallel(metadata.APPLICATION), + ]) + + # Tunnels are not harmed by configuration after + # recovery, and some attributes get set this way + # like external_iface + self._do_in_factory_order( + "preconfigure_function", + [ + Parallel(metadata.TAPIFACE), + Parallel(metadata.TUNIFACE), + ]) + + # Post-do the ones without side effects + self._do_in_factory_order( + "configure_function", + [ + metadata.INTERNET, + Parallel(metadata.NODE), + metadata.NODEIFACE, + Parallel(metadata.TAPIFACE), + Parallel(metadata.TUNIFACE), + ]) + + # There are no required prestart steps + # to call upon recovery, so we're done + finally: + self.recovering = True def _make_generic(self, parameters, kind): app = kind(self.plapi) + app.testbed = weakref.ref(self) # Note: there is 1-to-1 correspondence between attribute names # If that changes, this has to change as well for attr,val in parameters.iteritems(): - setattr(app, attr, val) + try: + setattr(app, attr, val) + except: + # We ignore these errors while recovering. + # Some attributes are immutable, and setting + # them is necessary (to recover the state), but + # some are not (they throw an exception). + if not self.recovering: + raise return app def _make_node(self, parameters): - return self._make_generic(parameters, self._node.Node) + node = self._make_generic(parameters, self._node.Node) + node.enable_cleanup = self.dedicatedSlice + return node def _make_node_iface(self, parameters): return self._make_generic(parameters, self._interfaces.NodeIface) @@ -538,3 +751,22 @@ class TestbedController(testbed_impl.TestbedController): def _make_ns3_dependency(self, parameters): return self._make_generic(parameters, self._app.NS3Dependency) + def _make_tun_filter(self, parameters): + return self._make_generic(parameters, self._interfaces.TunFilter) + + def _make_class_queue_filter(self, parameters): + return self._make_generic(parameters, self._interfaces.ClassQueueFilter) + + def _make_tos_queue_filter(self, parameters): + return self._make_generic(parameters, self._interfaces.ToSQueueFilter) + + def _make_multicast_forwarder(self, parameters): + return self._make_generic(parameters, self._multicast.MulticastForwarder) + + def _make_multicast_announcer(self, parameters): + return self._make_generic(parameters, self._multicast.MulticastAnnouncer) + + def _make_multicast_router(self, parameters): + return self._make_generic(parameters, self._multicast.MulticastRouter) + +
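
Note on the pattern this patch introduces in do_resource_discovery(): the serial
candidate search is replaced by jobs submitted to nepi.util.parallel.ParallelRun
(capped at maxthreads=4 so the PLC API is not overloaded), while a reserve_lock
serializes only the bookkeeping on the shared `reserved` set and the slow
find_candidates() round-trips run unlocked. Below is a minimal, self-contained
sketch of that pattern using only the standard library; run_parallel,
find_candidates and the guid/node shapes are simplified stand-ins for
ParallelRun and Node.find_candidates(filter_slice_id=...), not the real NEPI API.

    import threading
    try:
        import queue            # Python 3
    except ImportError:
        import Queue as queue   # Python 2, as used by this codebase

    def run_parallel(jobs, maxthreads=4):
        """Tiny stand-in for ParallelRun: drain (func, args) jobs with a few threads."""
        pending = queue.Queue()
        for job in jobs:
            pending.put(job)

        def worker():
            while True:
                try:
                    func, args = pending.get_nowait()
                except queue.Empty:
                    return      # queue was fully loaded up front, so empty means done
                func(*args)

        threads = [threading.Thread(target=worker) for _ in range(maxthreads)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def assign_unique_candidates(nodes, find_candidates, maxthreads=4):
        """Reserve a host for every node that has exactly one acceptable candidate.

        `nodes` maps guid -> node object and `find_candidates(node)` returns a set
        of candidate host ids; both are hypothetical simplifications of the
        Node.find_candidates() calls made in the patch.
        """
        reserved = set()        # host ids already claimed by some node
        assignments = {}        # guid -> chosen host id
        reserve_lock = threading.Lock()

        def assign_if_unique(guid, node):
            candidates = find_candidates(node)   # slow API call, done outside the lock
            with reserve_lock:                   # only shared bookkeeping is serialized
                free = candidates - reserved
                if len(free) == 1:
                    host = free.pop()
                    reserved.add(host)
                    assignments[guid] = host
                # ambiguous or empty sets are left to the backtracking allocator

        run_parallel([(assign_if_unique, (guid, node))
                      for guid, node in nodes.items()], maxthreads)
        return assignments, reserved

The design point mirrored here is the same one the patch makes in assignifunique:
hold the lock only around set updates (releasing it even around the second
find_candidates() retry) so that the few worker threads genuinely overlap their
PLC API latencies instead of serializing on the reservation state.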