From: Claudio-Daniel Freire Date: Tue, 27 Sep 2011 02:02:34 +0000 (+0200) Subject: Parallelize node liveliness tests X-Git-Tag: nepi-3.0.0~213 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=9f9b0684ec1f75bfb477bec6b583433cf89ec2b0;p=nepi.git Parallelize node liveliness tests --- diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index dcb5023d..309dca09 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -9,6 +9,7 @@ from nepi.util.graphtools import mst from nepi.util import ipaddr2 from nepi.util import environ from nepi.util.parallel import ParallelRun +import threading import sys import os import os.path @@ -213,45 +214,66 @@ class TestbedController(testbed_impl.TestbedController): # Initial algo: # look for perfectly defined nodes # (ie: those with only one candidate) - for guid, node in self._elements.iteritems(): - if isinstance(node, self._node.Node) and node._node_id is None: - # Try existing nodes first - # If we have only one candidate, simply use it - candidates = node.find_candidates( - filter_slice_id = self.slice_id) + reserve_lock = threading.Lock() + def assignifunique(guid, node): + # Try existing nodes first + # If we have only one candidate, simply use it + candidates = node.find_candidates( + filter_slice_id = self.slice_id) + + node_id = None + reserve_lock.acquire() + try: candidates -= reserved if len(candidates) == 1: node_id = iter(candidates).next() - node.assign_node_id(node_id) reserved.add(node_id) elif not candidates: # Try again including unassigned nodes - candidates = node.find_candidates() + reserve_lock.release() + try: + candidates = node.find_candidates() + finally: + reserve_lock.acquire() candidates -= reserved if len(candidates) > 1: - continue + return if len(candidates) == 1: node_id = iter(candidates).next() - node.assign_node_id(node_id) to_provision.add(node_id) reserved.add(node_id) elif not candidates: raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid, node.make_filter_description()) + finally: + reserve_lock.release() + + if node_id is not None: + node.assign_node_id(node_id) + + runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it + runner.start() + for guid, node in self._elements.iteritems(): + if isinstance(node, self._node.Node) and node._node_id is None: + runner.put(assignifunique, guid, node) + runner.sync() # Now do the backtracking search for a suitable solution # First with existing slice nodes reqs = [] nodes = [] + def genreqs(node, filter_slice_id=None): + # Try existing nodes first + # If we have only one candidate, simply use it + candidates = node.find_candidates( + filter_slice_id = filter_slice_id) + candidates -= reserved + reqs.append(candidates) + nodes.append(node) for guid, node in self._elements.iteritems(): if isinstance(node, self._node.Node) and node._node_id is None: - # Try existing nodes first - # If we have only one candidate, simply use it - candidates = node.find_candidates( - filter_slice_id = self.slice_id) - candidates -= reserved - reqs.append(candidates) - nodes.append(node) + runner.put(genreqs, node, self.slice_id) + runner.sync() if nodes and reqs: if recover: @@ -272,16 +294,15 @@ class TestbedController(testbed_impl.TestbedController): # Failed, try again with all nodes reqs = [] for node in nodes: - candidates = node.find_candidates() - candidates -= reserved - reqs.append(candidates) - + runner.put(genreqs, node) + runner.sync() solution = resourcealloc.alloc(reqs, sample=pickbest) to_provision.update(solution) # Do assign nodes for node, node_id in zip(nodes, solution): - node.assign_node_id(node_id) + runner.put(node.assign_node_id, node_id) + runner.join() def do_provisioning(self): if self._to_provision: @@ -304,12 +325,12 @@ class TestbedController(testbed_impl.TestbedController): # Show the magic self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname) - + try: - for guid, node in self._elements.iteritems(): - if isinstance(node, self._node.Node): - self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname) - + runner = ParallelRun(maxqueue=1) + abort = [] + def waitforit(guid, node): + try: node.wait_provisioning( (20*60 if node._node_id in self._just_provisioned else 60) ) @@ -318,9 +339,21 @@ class TestbedController(testbed_impl.TestbedController): # Prepare dependency installer now node.prepare_dependencies() + except: + abort.append(None) + raise + + for guid, node in self._elements.iteritems(): + if abort: + break + if isinstance(node, self._node.Node): + self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname) + runner.put(waitforit, guid, node) + runner.join() + except self._node.UnresponsiveNodeError: # Uh... - self._logger.warn("UNRESPONSIVE Node %s", node.hostname) + self._logger.warn("UNRESPONSIVE Nodes") # Mark all dead nodes (which are unresponsive) on the blacklist # and re-raise