From: Claudio-Daniel Freire Date: Tue, 19 Jul 2011 14:46:35 +0000 (+0200) Subject: Ticket #71: inner parallelization of setup phases X-Git-Tag: nepi-3.0.0~361 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=7de95b1721fedbafccd70624cfbaa71755c5369c;p=nepi.git Ticket #71: inner parallelization of setup phases --- diff --git a/src/nepi/core/metadata.py b/src/nepi/core/metadata.py index 83b408e6..d84ccbd9 100644 --- a/src/nepi/core/metadata.py +++ b/src/nepi/core/metadata.py @@ -11,6 +11,11 @@ from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \ DeploymentConfiguration as DC, \ AttributeCategories as AC +class Parallel(object): + def __init__(self, factory, maxthreads = 16): + self.factory = factory + self.maxthreads = maxthreads + class MetadataInfo(object): @property def connector_types(self): @@ -69,21 +74,24 @@ class MetadataInfo(object): @property def create_order(self): """ list of factory ids that indicates the order in which the elements - should be instantiated. + should be instantiated. If wrapped within a Parallel instance, they + will be instantiated in parallel. """ raise NotImplementedError @property def configure_order(self): """ list of factory ids that indicates the order in which the elements - should be configured. + should be configured. If wrapped within a Parallel instance, they + will be configured in parallel. """ raise NotImplementedError @property def preconfigure_order(self): """ list of factory ids that indicates the order in which the elements - should be preconfigured. + should be preconfigured. If wrapped within a Parallel instance, they + will be configured in parallel. Default: same as configure_order """ @@ -92,7 +100,8 @@ class MetadataInfo(object): @property def prestart_order(self): """ list of factory ids that indicates the order in which the elements - should be prestart-configured. + should be prestart-configured. If wrapped within a Parallel instance, they + will be configured in parallel. Default: same as configure_order """ @@ -101,7 +110,8 @@ class MetadataInfo(object): @property def start_order(self): """ list of factory ids that indicates the order in which the elements - should be started. + should be started. If wrapped within a Parallel instance, they + will be started in parallel. Default: same as configure_order """ diff --git a/src/nepi/core/testbed_impl.py b/src/nepi/core/testbed_impl.py index 8b49c8e5..f496f4a6 100644 --- a/src/nepi/core/testbed_impl.py +++ b/src/nepi/core/testbed_impl.py @@ -2,12 +2,13 @@ # -*- coding: utf-8 -*- from nepi.core import execute -from nepi.core.metadata import Metadata +from nepi.core.metadata import Metadata, Parallel from nepi.util import validation from nepi.util.constants import TIME_NOW, \ ApplicationStatus as AS, \ TestbedStatus as TS, \ CONNECTION_DELAY +from nepi.util.parallel import ParallelRun import collections import copy @@ -216,18 +217,40 @@ class TestbedController(execute.TestbedController): # order guids (elements) according to factory_id for guid, factory_id in self._create.iteritems(): guids[factory_id].append(guid) + # configure elements following the factory_id order for factory_id in order: + # Create a parallel runner if we're given a Parallel() wrapper + runner = None + if isinstance(factory_id, Parallel): + runner = ParallelRun(factory_id.maxthreads) + factory_id = factory_id.factory + # omit the factories that have no element to create if factory_id not in guids: continue + + # configure action factory = self._factories[factory_id] if not getattr(factory, action): continue - for guid in guids[factory_id]: + def perform_action(guid): getattr(factory, action)(self, guid) if postaction: postaction(self, guid) + + # perform the action on all elements, in parallel if so requested + if runner: + runner.start() + for guid in guids[factory_id]: + if runner: + runner.put(perform_action, guid) + else: + perform_action(guid) + if runner: + runner.join() + + # post hook if poststep: for guid in guids[factory_id]: poststep(self, guid) diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index 1420601e..6714f2be 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -7,6 +7,7 @@ from nepi.util.constants import TIME_NOW from nepi.util.graphtools import mst from nepi.util import ipaddr2 from nepi.util import environ +from nepi.util.parallel import ParallelRun import sys import os import os.path @@ -467,14 +468,23 @@ class TestbedController(testbed_impl.TestbedController): def shutdown(self): for trace in self._traces.itervalues(): trace.close() + + runner = ParallelRun(16) + runner.start() for element in self._elements.itervalues(): # invoke cleanup hooks if hasattr(element, 'cleanup'): - element.cleanup() + runner.put(element.cleanup) + runner.join() + + runner = ParallelRun(16) + runner.start() for element in self._elements.itervalues(): # invoke destroy hooks if hasattr(element, 'destroy'): - element.destroy() + runner.put(element.destroy) + runner.join() + self._elements.clear() self._traces.clear() diff --git a/src/nepi/testbeds/planetlab/metadata.py b/src/nepi/testbeds/planetlab/metadata.py index f2672e8f..16646a5a 100644 --- a/src/nepi/testbeds/planetlab/metadata.py +++ b/src/nepi/testbeds/planetlab/metadata.py @@ -5,6 +5,7 @@ import time from constants import TESTBED_ID, TESTBED_VERSION from nepi.core import metadata +from nepi.core.metadata import Parallel from nepi.core.attributes import Attribute from nepi.util import tags, validation from nepi.util.constants import ApplicationStatus as AS, \ @@ -937,10 +938,10 @@ traces = dict({ create_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] -configure_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] +configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ] # Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes -start_order = [ INTERNET, NODEIFACE, TAPIFACE, TUNIFACE, NODE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] +start_order = [ INTERNET, NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NODE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ] factories_info = dict({ NODE: dict({ diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py index abf22021..53ae8f1a 100644 --- a/src/nepi/util/parallel.py +++ b/src/nepi/util/parallel.py @@ -32,6 +32,8 @@ class ParallelMap(object): self.workers = [ threading.Thread(target = self.worker) for x in xrange(maxthreads) ] + self.delayed_exceptions = [] + if results: self.rvqueue = Queue.Queue() else: @@ -56,6 +58,10 @@ class ParallelMap(object): for thread in self.workers: thread.join() + if self.delayed_exceptions: + typ,val,loc = self.delayed_exceptions[0] + raise typ,val,loc + def worker(self): while True: task = self.queue.get() @@ -74,6 +80,7 @@ class ParallelMap(object): self.queue.task_done() except: traceback.print_exc(file = sys.stderr) + self.delayed_exceptions.apped(sys.exc_info()) def __iter__(self): if self.rvqueue is not None: @@ -113,6 +120,20 @@ class ParallelFilter(ParallelMap): if rv is not self._FILTERED: yield rv +class ParallelRun(ParallelMap): + def __run(self, x): + fn, args, kwargs = x + return fn(*args, **kwargs) + + def __init__(self, maxthreads = None, maxqueue = None): + super(ParallelRun, self).__init__(maxthreads, maxqueue, True) + + def put(self, what, *args, **kwargs): + super(ParallelRun, self).put(self.__run, (what, args, kwargs)) + + def put_nowait(self, what, *args, **kwargs): + super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs)) + def pmap(mapping, iterable, maxthreads = None, maxqueue = None): mapper = ParallelMap(