X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Ftestbeds%2Fplanetlab%2Fexecute.py;h=69abd71c7f720f0671423ad9908cc9ccd5f2c02f;hb=e303e39c78476fd140bf615b1eb631e44c332b80;hp=9cd1299fef16458f80364448de7c30999c933666;hpb=836ebc94bed105ade229ca5304a2af356a245df6;p=nepi.git diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index 9cd1299f..69abd71c 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -9,6 +9,7 @@ from nepi.util.graphtools import mst from nepi.util import ipaddr2 from nepi.util import environ from nepi.util.parallel import ParallelRun +import threading import sys import os import os.path @@ -25,6 +26,7 @@ import random import shutil import logging import metadata +import weakref class TempKeyError(Exception): pass @@ -36,10 +38,11 @@ class TestbedController(testbed_impl.TestbedController): self.slicename = None self._traces = dict() - import node, interfaces, application + import node, interfaces, application, multicast self._node = node self._interfaces = interfaces self._app = application + self._multicast = multicast self._blacklist = set() self._just_provisioned = set() @@ -47,6 +50,8 @@ class TestbedController(testbed_impl.TestbedController): self._load_blacklist() self._logger = logging.getLogger('nepi.testbeds.planetlab') + + self.recovering = False @property def home_directory(self): @@ -146,6 +151,17 @@ class TestbedController(testbed_impl.TestbedController): self.dedicatedSlice = self._attributes.\ get_attribute_value("dedicatedSlice") + if not self.slicename: + raise RuntimeError, "Slice not set" + if not self.authUser: + raise RuntimeError, "PlanetLab account username not set" + if not self.authString: + raise RuntimeError, "PlanetLab account passphrase not set" + if not self.sliceSSHKey: + raise RuntimeError, "PlanetLab account key not specified" + if not os.path.exists(self.sliceSSHKey): + raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,) + self._logger.setLevel(getattr(logging,self.logLevel)) super(TestbedController, self).do_setup() @@ -198,66 +214,95 @@ class TestbedController(testbed_impl.TestbedController): # Initial algo: # look for perfectly defined nodes # (ie: those with only one candidate) - for guid, node in self._elements.iteritems(): - if isinstance(node, self._node.Node) and node._node_id is None: - # Try existing nodes first - # If we have only one candidate, simply use it - candidates = node.find_candidates( - filter_slice_id = self.slice_id) + reserve_lock = threading.Lock() + def assignifunique(guid, node): + # Try existing nodes first + # If we have only one candidate, simply use it + candidates = node.find_candidates( + filter_slice_id = self.slice_id) + + node_id = None + reserve_lock.acquire() + try: candidates -= reserved if len(candidates) == 1: node_id = iter(candidates).next() - node.assign_node_id(node_id) reserved.add(node_id) elif not candidates: # Try again including unassigned nodes - candidates = node.find_candidates() + reserve_lock.release() + try: + candidates = node.find_candidates() + finally: + reserve_lock.acquire() candidates -= reserved if len(candidates) > 1: - continue + return if len(candidates) == 1: node_id = iter(candidates).next() - node.assign_node_id(node_id) to_provision.add(node_id) reserved.add(node_id) elif not candidates: raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid, node.make_filter_description()) + finally: + reserve_lock.release() + + if node_id is not None: + node.assign_node_id(node_id) + + runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it + runner.start() + for guid, node in self._elements.iteritems(): + if isinstance(node, self._node.Node) and node._node_id is None: + runner.put(assignifunique, guid, node) + runner.sync() # Now do the backtracking search for a suitable solution # First with existing slice nodes reqs = [] nodes = [] + def genreqs(node, filter_slice_id=None): + # Try existing nodes first + # If we have only one candidate, simply use it + candidates = node.find_candidates( + filter_slice_id = filter_slice_id) + candidates -= reserved + reqs.append(candidates) + nodes.append(node) for guid, node in self._elements.iteritems(): if isinstance(node, self._node.Node) and node._node_id is None: - # Try existing nodes first - # If we have only one candidate, simply use it - candidates = node.find_candidates( - filter_slice_id = self.slice_id) - candidates -= reserved - reqs.append(candidates) - nodes.append(node) + runner.put(genreqs, node, self.slice_id) + runner.sync() if nodes and reqs: if recover: raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,) + + def pickbest(fullset, nreq, node=nodes[0]): + if len(fullset) > nreq: + fullset = zip(node.rate_nodes(fullset),fullset) + fullset.sort(reverse=True) + del fullset[nreq:] + return set(map(operator.itemgetter(1),fullset)) + else: + return fullset try: - solution = resourcealloc.alloc(reqs) + solution = resourcealloc.alloc(reqs, sample=pickbest) except resourcealloc.ResourceAllocationError: # Failed, try again with all nodes reqs = [] for node in nodes: - candidates = node.find_candidates() - candidates -= reserved - reqs.append(candidates) - - solution = resourcealloc.alloc(reqs) + runner.put(genreqs, node) + runner.sync() + solution = resourcealloc.alloc(reqs, sample=pickbest) to_provision.update(solution) # Do assign nodes for node, node_id in zip(nodes, solution): - node.assign_node_id(node_id) + runner.put(node.assign_node_id, node_id) + runner.join() def do_provisioning(self): if self._to_provision: @@ -280,12 +325,12 @@ class TestbedController(testbed_impl.TestbedController): # Show the magic self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname) - + try: - for guid, node in self._elements.iteritems(): - if isinstance(node, self._node.Node): - self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname) - + runner = ParallelRun(maxthreads=64, maxqueue=1) + abort = [] + def waitforit(guid, node): + try: node.wait_provisioning( (20*60 if node._node_id in self._just_provisioned else 60) ) @@ -294,9 +339,21 @@ class TestbedController(testbed_impl.TestbedController): # Prepare dependency installer now node.prepare_dependencies() + except: + abort.append(None) + raise + + for guid, node in self._elements.iteritems(): + if abort: + break + if isinstance(node, self._node.Node): + self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname) + runner.put(waitforit, guid, node) + runner.join() + except self._node.UnresponsiveNodeError: # Uh... - self._logger.warn("UNRESPONSIVE Node %s", node.hostname) + self._logger.warn("UNRESPONSIVE Nodes") # Mark all dead nodes (which are unresponsive) on the blacklist # and re-raise @@ -330,6 +387,7 @@ class TestbedController(testbed_impl.TestbedController): app.node.architecture, app.node.operatingSystem, app.node.pl_distro, + app.__class__, ) depgroups = collections.defaultdict(list) @@ -412,9 +470,7 @@ class TestbedController(testbed_impl.TestbedController): suffix = ".pub") # Create secure 256-bits temporary passphrase - passphrase = ''.join(map(chr,[rng.randint(0,255) - for rng in (random.SystemRandom(),) - for i in xrange(32)] )).encode("hex") + passphrase = os.urandom(32).encode("hex") # Copy keys oprk = open(self.sliceSSHKey, "rb") @@ -469,7 +525,15 @@ class TestbedController(testbed_impl.TestbedController): # TODO: take on account schedule time for the task element = self._elements[guid] if element: - setattr(element, name, value) + try: + setattr(element, name, value) + except: + # We ignore these errors while recovering. + # Some attributes are immutable, and setting + # them is necessary (to recover the state), but + # some are not (they throw an exception). + if not self.recovering: + raise if hasattr(element, 'refresh'): # invoke attribute refresh hook @@ -526,10 +590,10 @@ class TestbedController(testbed_impl.TestbedController): self._traces.clear() def trace(self, guid, trace_id, attribute='value'): - app = self._elements[guid] + elem = self._elements[guid] if attribute == 'value': - path = app.sync_trace(self.home_directory, trace_id) + path = elem.sync_trace(self.home_directory, trace_id) if path: fd = open(path, "r") content = fd.read() @@ -537,7 +601,9 @@ class TestbedController(testbed_impl.TestbedController): else: content = None elif attribute == 'path': - content = app.remote_trace_path(trace_id) + content = elem.remote_trace_path(trace_id) + elif attribute == 'name': + content = elem.remote_trace_name(trace_id) else: content = None return content @@ -546,95 +612,110 @@ class TestbedController(testbed_impl.TestbedController): self._traces[trace_id] = trace def recover(self): - # Create and connect do not perform any real tasks against - # the nodes, it only sets up the object hierarchy, - # so we can run them normally - self.do_create() - self.do_connect_init() - self.do_connect_compl() - - # Manually recover nodes, to mark dependencies installed - # and clean up mutable attributes - self._do_in_factory_order( - lambda self, guid : self._elements[guid].recover(), - [ - metadata.NODE, - ]) - - # Assign nodes - since we're working off exeucte XML, nodes - # have specific hostnames assigned and we don't need to do - # real assignment, only find out node ids and check liveliness - self.do_resource_discovery(recover = True) - self.do_wait_nodes() - - # Pre/post configure, however, tends to set up tunnels - # Execute configuration steps only for those object - # kinds that do not have side effects - - # Do the ones without side effects, - # including nodes that need to set up home - # folders and all that - self._do_in_factory_order( - "preconfigure_function", - [ - metadata.INTERNET, - Parallel(metadata.NODE), - metadata.NODEIFACE, - ]) - - # Tunnels require a home path that is configured - # at this step. Since we cannot run the step itself, - # we need to inject this homepath ourselves - for guid, element in self._elements.iteritems(): - if isinstance(element, self._interfaces.TunIface): - element._home_path = "tun-%s" % (guid,) - - # Manually recover tunnels, applications and - # netpipes, negating the side effects - self._do_in_factory_order( - lambda self, guid : self._elements[guid].recover(), - [ - Parallel(metadata.TAPIFACE), - Parallel(metadata.TUNIFACE), - metadata.NETPIPE, - Parallel(metadata.NEPIDEPENDENCY), - Parallel(metadata.NS3DEPENDENCY), - Parallel(metadata.DEPENDENCY), - Parallel(metadata.APPLICATION), - ]) - - # Tunnels are not harmed by configuration after - # recovery, and some attributes get set this way - # like external_iface - self._do_in_factory_order( - "preconfigure_function", - [ - Parallel(metadata.TAPIFACE), - Parallel(metadata.TUNIFACE), - ]) - - # Post-do the ones without side effects - self._do_in_factory_order( - "configure_function", - [ - metadata.INTERNET, - Parallel(metadata.NODE), - metadata.NODEIFACE, - Parallel(metadata.TAPIFACE), - Parallel(metadata.TUNIFACE), - ]) - - # There are no required prestart steps - # to call upon recovery, so we're done - + try: + # An internal flag, so we know to behave differently in + # a few corner cases. + self.recovering = True + + # Create and connect do not perform any real tasks against + # the nodes, it only sets up the object hierarchy, + # so we can run them normally + self.do_create() + self.do_connect_init() + self.do_connect_compl() + + # Manually recover nodes, to mark dependencies installed + # and clean up mutable attributes + self._do_in_factory_order( + lambda self, guid : self._elements[guid].recover(), + [ + metadata.NODE, + ]) + + # Assign nodes - since we're working off exeucte XML, nodes + # have specific hostnames assigned and we don't need to do + # real assignment, only find out node ids and check liveliness + self.do_resource_discovery(recover = True) + self.do_wait_nodes() + + # Pre/post configure, however, tends to set up tunnels + # Execute configuration steps only for those object + # kinds that do not have side effects + + # Do the ones without side effects, + # including nodes that need to set up home + # folders and all that + self._do_in_factory_order( + "preconfigure_function", + [ + metadata.INTERNET, + Parallel(metadata.NODE), + metadata.NODEIFACE, + ]) + + # Tunnels require a home path that is configured + # at this step. Since we cannot run the step itself, + # we need to inject this homepath ourselves + for guid, element in self._elements.iteritems(): + if isinstance(element, self._interfaces.TunIface): + element._home_path = "tun-%s" % (guid,) + + # Manually recover tunnels, applications and + # netpipes, negating the side effects + self._do_in_factory_order( + lambda self, guid : self._elements[guid].recover(), + [ + Parallel(metadata.TAPIFACE), + Parallel(metadata.TUNIFACE), + metadata.NETPIPE, + Parallel(metadata.NEPIDEPENDENCY), + Parallel(metadata.NS3DEPENDENCY), + Parallel(metadata.DEPENDENCY), + Parallel(metadata.APPLICATION), + ]) + + # Tunnels are not harmed by configuration after + # recovery, and some attributes get set this way + # like external_iface + self._do_in_factory_order( + "preconfigure_function", + [ + Parallel(metadata.TAPIFACE), + Parallel(metadata.TUNIFACE), + ]) + + # Post-do the ones without side effects + self._do_in_factory_order( + "configure_function", + [ + metadata.INTERNET, + Parallel(metadata.NODE), + metadata.NODEIFACE, + Parallel(metadata.TAPIFACE), + Parallel(metadata.TUNIFACE), + ]) + + # There are no required prestart steps + # to call upon recovery, so we're done + finally: + self.recovering = True def _make_generic(self, parameters, kind): app = kind(self.plapi) + app.testbed = weakref.ref(self) # Note: there is 1-to-1 correspondence between attribute names # If that changes, this has to change as well for attr,val in parameters.iteritems(): - setattr(app, attr, val) + try: + setattr(app, attr, val) + except: + # We ignore these errors while recovering. + # Some attributes are immutable, and setting + # them is necessary (to recover the state), but + # some are not (they throw an exception). + if not self.recovering: + raise return app @@ -670,3 +751,22 @@ class TestbedController(testbed_impl.TestbedController): def _make_ns3_dependency(self, parameters): return self._make_generic(parameters, self._app.NS3Dependency) + def _make_tun_filter(self, parameters): + return self._make_generic(parameters, self._interfaces.TunFilter) + + def _make_class_queue_filter(self, parameters): + return self._make_generic(parameters, self._interfaces.ClassQueueFilter) + + def _make_tos_queue_filter(self, parameters): + return self._make_generic(parameters, self._interfaces.ToSQueueFilter) + + def _make_multicast_forwarder(self, parameters): + return self._make_generic(parameters, self._multicast.MulticastForwarder) + + def _make_multicast_announcer(self, parameters): + return self._make_generic(parameters, self._multicast.MulticastAnnouncer) + + def _make_multicast_router(self, parameters): + return self._make_generic(parameters, self._multicast.MulticastRouter) + +