DeploymentConfiguration as DC, \
AttributeCategories as AC
+class Parallel(object):
+ def __init__(self, factory, maxthreads = 16):
+ self.factory = factory
+ self.maxthreads = maxthreads
+
class MetadataInfo(object):
@property
def connector_types(self):
@property
def create_order(self):
""" list of factory ids that indicates the order in which the elements
- should be instantiated.
+ should be instantiated. If wrapped within a Parallel instance, they
+ will be instantiated in parallel.
"""
raise NotImplementedError
@property
def configure_order(self):
""" list of factory ids that indicates the order in which the elements
- should be configured.
+ should be configured. If wrapped within a Parallel instance, they
+ will be configured in parallel.
"""
raise NotImplementedError
@property
def preconfigure_order(self):
""" list of factory ids that indicates the order in which the elements
- should be preconfigured.
+ should be preconfigured. If wrapped within a Parallel instance, they
+ will be configured in parallel.
Default: same as configure_order
"""
@property
def prestart_order(self):
""" list of factory ids that indicates the order in which the elements
- should be prestart-configured.
+ should be prestart-configured. If wrapped within a Parallel instance, they
+ will be configured in parallel.
Default: same as configure_order
"""
@property
def start_order(self):
""" list of factory ids that indicates the order in which the elements
- should be started.
+ should be started. If wrapped within a Parallel instance, they
+ will be started in parallel.
Default: same as configure_order
"""
# -*- coding: utf-8 -*-
from nepi.core import execute
-from nepi.core.metadata import Metadata
+from nepi.core.metadata import Metadata, Parallel
from nepi.util import validation
from nepi.util.constants import TIME_NOW, \
ApplicationStatus as AS, \
TestbedStatus as TS, \
CONNECTION_DELAY
+from nepi.util.parallel import ParallelRun
import collections
import copy
# order guids (elements) according to factory_id
for guid, factory_id in self._create.iteritems():
guids[factory_id].append(guid)
+
# configure elements following the factory_id order
for factory_id in order:
+ # Create a parallel runner if we're given a Parallel() wrapper
+ runner = None
+ if isinstance(factory_id, Parallel):
+ runner = ParallelRun(factory_id.maxthreads)
+ factory_id = factory_id.factory
+
# omit the factories that have no element to create
if factory_id not in guids:
continue
+
+ # configure action
factory = self._factories[factory_id]
if not getattr(factory, action):
continue
- for guid in guids[factory_id]:
+ def perform_action(guid):
getattr(factory, action)(self, guid)
if postaction:
postaction(self, guid)
+
+ # perform the action on all elements, in parallel if so requested
+ if runner:
+ runner.start()
+ for guid in guids[factory_id]:
+ if runner:
+ runner.put(perform_action, guid)
+ else:
+ perform_action(guid)
+ if runner:
+ runner.join()
+
+ # post hook
if poststep:
for guid in guids[factory_id]:
poststep(self, guid)
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
+from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
def shutdown(self):
for trace in self._traces.itervalues():
trace.close()
+
+ runner = ParallelRun(16)
+ runner.start()
for element in self._elements.itervalues():
# invoke cleanup hooks
if hasattr(element, 'cleanup'):
- element.cleanup()
+ runner.put(element.cleanup)
+ runner.join()
+
+ runner = ParallelRun(16)
+ runner.start()
for element in self._elements.itervalues():
# invoke destroy hooks
if hasattr(element, 'destroy'):
- element.destroy()
+ runner.put(element.destroy)
+ runner.join()
+
self._elements.clear()
self._traces.clear()
from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import metadata
+from nepi.core.metadata import Parallel
from nepi.core.attributes import Attribute
from nepi.util import tags, validation
from nepi.util.constants import ApplicationStatus as AS, \
create_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
-configure_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
# Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes
-start_order = [ INTERNET, NODEIFACE, TAPIFACE, TUNIFACE, NODE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+start_order = [ INTERNET, NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NODE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
factories_info = dict({
NODE: dict({
self.workers = [ threading.Thread(target = self.worker)
for x in xrange(maxthreads) ]
+ self.delayed_exceptions = []
+
if results:
self.rvqueue = Queue.Queue()
else:
for thread in self.workers:
thread.join()
+ if self.delayed_exceptions:
+ typ,val,loc = self.delayed_exceptions[0]
+ raise typ,val,loc
+
def worker(self):
while True:
task = self.queue.get()
self.queue.task_done()
except:
traceback.print_exc(file = sys.stderr)
+ self.delayed_exceptions.apped(sys.exc_info())
def __iter__(self):
if self.rvqueue is not None:
if rv is not self._FILTERED:
yield rv
+class ParallelRun(ParallelMap):
+ def __run(self, x):
+ fn, args, kwargs = x
+ return fn(*args, **kwargs)
+
+ def __init__(self, maxthreads = None, maxqueue = None):
+ super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
+
+ def put(self, what, *args, **kwargs):
+ super(ParallelRun, self).put(self.__run, (what, args, kwargs))
+
+ def put_nowait(self, what, *args, **kwargs):
+ super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
+
def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
mapper = ParallelMap(