From 386498468dfb01f71b0efbbe0c208819f18f82ec Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Mon, 7 Oct 2013 08:59:47 +0200 Subject: [PATCH] ec_shutdown --- src/nepi/execution/ec.py | 262 +++++++++++------- src/nepi/execution/resource.py | 2 +- src/nepi/resources/linux/application.py | 26 +- .../resources/linux/ccn/ccnapplication.py | 2 +- src/nepi/resources/linux/ccn/ccncontent.py | 6 +- src/nepi/resources/linux/ccn/ccnd.py | 7 +- src/nepi/resources/linux/ccn/ccnr.py | 5 +- src/nepi/resources/linux/ccn/fibentry.py | 7 +- src/nepi/resources/linux/interface.py | 2 +- src/nepi/resources/linux/node.py | 3 +- src/nepi/resources/linux/udptest.py | 1 - src/nepi/resources/linux/udptunnel.py | 8 +- src/nepi/resources/omf/application.py | 2 +- src/nepi/resources/omf/channel.py | 4 +- src/nepi/resources/omf/interface.py | 40 ++- src/nepi/resources/omf/node.py | 13 +- src/nepi/resources/planetlab/node.py | 1 - src/nepi/resources/planetlab/tap.py | 9 +- src/nepi/util/parallel.py | 81 +----- 19 files changed, 218 insertions(+), 263 deletions(-) diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index e59e626f..30dd7a40 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -36,6 +36,51 @@ import sys import time import threading +class FailurePolicy(object): + """ Defines how to respond to experiment failures + """ + IGNORE_RM_FAILURE = 1 + ABORT_ON_RM_FAILURE = 2 + +class FailureLevel(object): + """ Describe the system failure state + """ + OK = 1 + RM_FAILURE = 2 + TASK_FAILURE = 3 + EC_FAILURE = 4 + +class FailureManager(object): + """ The FailureManager is responsible for handling errors, + and deciding whether an experiment should be aborted + """ + + def __init__(self, failure_policy = None): + self._failure_level = FailureLevel.OK + self._failure_policy = failure_policy or \ + FailurePolicy.ABORT_ON_RM_FAILURE + + @property + def abort(self): + if self._failure_level == FailureLevel.EC_FAILURE: + return True + + if self._failure_level in [FailureLevel.TASK_FAILURE, + FailureLevel.RM_FAILURE] and \ + self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE: + return True + + return False + + def set_rm_failure(self): + self._failure_level = FailureLevel.RM_FAILURE + + def set_task_failure(self): + self._failure_level = FailureLevel.TASK_FAILURE + + def set_ec_failure(self): + self._failure_level = FailureLevel.EC_FAILURE + class ECState(object): """ State of the Experiment Controller @@ -49,19 +94,23 @@ class ExperimentController(object): .. class:: Class Args : :param exp_id: Human readable identifier for the experiment scenario. - It will be used in the name of the directory - where experiment related information is stored :type exp_id: str .. note:: - An experiment, or scenario, is defined by a concrete use, behavior, - configuration and interconnection of resources that describe a single - experiment case (We call this the experiment description). - A same experiment (scenario) can be run many times. + An experiment, or scenario, is defined by a concrete set of resources, + behavior, configuration and interconnection of those resources. + The Experiment Description (ED) is a detailed representation of a + single experiment. It contains all the necessary information to + allow repeating the experiment. NEPI allows to describe + experiments by registering components (resources), configuring them + and interconnecting them. + + A same experiment (scenario) can be executed many times, generating + different results. We call an experiment execution (instance) a 'run'. - The ExperimentController (EC), is the entity responsible for - managing an experiment instance (run). The same scenario can be + The ExperimentController (EC), is the entity responsible of + managing an experiment run. The same scenario can be recreated (and re-run) by instantiating an EC and recreating the same experiment description. @@ -75,15 +124,15 @@ class ExperimentController(object): single resource. ResourceManagers are specific to a resource type (i.e. An RM to control a Linux application will not be the same as the RM used to control a ns-3 simulation). - In order for a new type of resource to be supported in NEPI - a new RM must be implemented. NEPI already provides different + To support a new type of resource in NEPI, a new RM must be + implemented. NEPI already provides a variety of RMs to control basic resources, and new can be extended from the existing ones. Through the EC interface the user can create ResourceManagers (RMs), - configure them and interconnect them, in order to describe an experiment. + configure them and interconnect them, to describe an experiment. Describing an experiment through the EC does not run the experiment. - Only when the 'deploy()' method is invoked on the EC, will the EC take + Only when the 'deploy()' method is invoked on the EC, the EC will take actions to transform the 'described' experiment into a 'running' experiment. While the experiment is running, it is possible to continue to @@ -97,8 +146,8 @@ class ExperimentController(object): However, since a same 'experiment' can be run many times, the experiment id is not enough to identify an experiment instance (run). For this reason, the ExperimentController has two identifier, the - exp_id, which can be re-used by different ExperimentController instances, - and the run_id, which unique to a ExperimentController instance, and + exp_id, which can be re-used in different ExperimentController, + and the run_id, which is unique to one ExperimentController instance, and is automatically generated by NEPI. """ @@ -108,12 +157,16 @@ class ExperimentController(object): # Logging self._logger = logging.getLogger("ExperimentController") - # Run identifier. It identifies a concrete instance (run) of an experiment. - # Since a same experiment (same configuration) can be run many times, - # this id permits to identify concrete exoeriment run + # Run identifier. It identifies a concrete execution instance (run) + # of an experiment. + # Since a same experiment (same configuration) can be executed many + # times, this run_id permits to separate result files generated on + # different experiment executions self._run_id = tsformat() # Experiment identifier. Usually assigned by the user + # Identifies the experiment scenario (i.e. configuration, + # resources used, etc) self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex') # generator of globally unique ids @@ -128,7 +181,7 @@ class ExperimentController(object): # Tasks self._tasks = dict() - # RM groups + # RM groups (for deployment) self._groups = dict() # generator of globally unique id for groups @@ -140,6 +193,12 @@ class ExperimentController(object): self._thread.setDaemon(True) self._thread.start() + # Flag to stop processing thread + self._stop = False + + # Entity in charge of managing system failures + self._fm = FailureManager() + # EC state self._state = ECState.RUNNING @@ -172,69 +231,86 @@ class ExperimentController(object): return self._run_id @property - def finished(self): - """ Put the state of the Experiment Controller into a final state : - Either TERMINATED or FAILED + def abort(self): + return self._fm.abort - """ - return self.ecstate in [ECState.FAILED, ECState.TERMINATED] + def set_rm_failure(self): + self._fm.set_rm_failure() def wait_finished(self, guids): - """ Blocking method that wait until all the RM from the 'guid' list - reached the state FINISHED ( or STOPPED, FAILED or RELEASED ) + """ Blocking method that wait until all RMs in the 'guid' list + reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or + RELEASED ) or until a System Failure occurs (e.g. Task Failure) :param guids: List of guids :type guids: list + """ - return self.wait(guids) + + def quit(): + return self.abort + + return self.wait(guids, state = ResourceState.STOPPED, + quit = quit) def wait_started(self, guids): - """ Blocking method that wait until all the RM from the 'guid' list - reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED) + """ Blocking method that wait until all RMs in the 'guid' list + reach a state >= STARTED or until a System Failure occurs + (e.g. Task Failure) :param guids: List of guids :type guids: list """ - return self.wait(guids, state = ResourceState.STARTED) + + def quit(): + return self.abort + + return self.wait(guids, state = ResourceState.STARTED, + quit = quit) def wait_released(self, guids): - """ Blocking method that wait until all the RM from the 'guid' list - reached the state RELEASED (or FAILED) + """ Blocking method that wait until all RMs in the 'guid' list + reach a state = RELEASED or until the EC fails :param guids: List of guids :type guids: list """ - # TODO: solve state concurrency BUG and !!!! - # correct waited release state to state = ResourceState.FAILED) - return self.wait(guids, state = ResourceState.FINISHED) + + def quit(): + return self._state == ECState.FAILED + + return self.wait(guids, state = ResourceState.RELEASED, + quit = quit) def wait_deployed(self, guids): - """ Blocking method that wait until all the RM from the 'guid' list - reached the state READY (or any higher state) + """ Blocking method that wait until all RMs in the 'guid' list + reach a state >= READY or until a System Failure occurs + (e.g. Task Failure) :param guids: List of guids :type guids: list """ - return self.wait(guids, state = ResourceState.READY) - def wait(self, guids, state = ResourceState.STOPPED): - """ Blocking method that waits until all the RM from the 'guid' list - reached state 'state' or until a failure occurs - + def quit(): + return self.abort + + return self.wait(guids, state = ResourceState.READY, + quit = quit) + + def wait(self, guids, state, quit): + """ Blocking method that wait until all RMs in the 'guid' list + reach a state >= 'state' or until quit yileds True + :param guids: List of guids :type guids: list """ if isinstance(guids, int): guids = [guids] - # we randomly alter the order of the guids to avoid ordering - # dependencies (e.g. LinuxApplication RMs runing on the same - # linux host will be synchronized by the LinuxNode SSH lock) - random.shuffle(guids) - while True: - # If no more guids to wait for or an error occured, then exit - if len(guids) == 0 or self.finished: + # If there are no more guids to wait for + # or the quit function returns True, exit the loop + if len(guids) == 0 or quit(): break # If a guid reached one of the target states, remove it from list @@ -245,26 +321,11 @@ class ExperimentController(object): guids.remove(guid) else: # Debug... - self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid, - self.state(guid, hr = True), state)) - - # Take the opportunity to 'refresh' the states of the RMs. - # Query only the first up to N guids (not to overwhelm - # the local machine) - n = 100 - lim = n if len(guids) > n else ( len(guids) -1 ) - nguids = guids[0: lim] - - # schedule state request for all guids (take advantage of - # scheduler multi threading). - for guid in nguids: - callback = functools.partial(self.state, guid) - self.schedule("0s", callback) - - # If the guid is not in one of the target states, wait and - # continue quering. We keep the sleep big to decrease the - # number of RM state queries - time.sleep(4) + hrstate = ResourceState2str.get(rstate) + self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % ( + guid, rstate, state)) + + time.sleep(0.5) def get_task(self, tid): """ Get a specific task @@ -574,7 +635,7 @@ class ExperimentController(object): self.logger.debug(" ------- DEPLOY START ------ ") if not guids: - # If no guids list was indicated, all 'NEW' RMs will be deployed + # If no guids list was passed, all 'NEW' RMs will be deployed guids = [] for guid in self.resources: if self.state(guid) == ResourceState.NEW: @@ -584,6 +645,7 @@ class ExperimentController(object): guids = [guids] # Create deployment group + # New guids can be added to a same deployment group later on new_group = False if not group: new_group = True @@ -594,20 +656,9 @@ class ExperimentController(object): self._groups[group].extend(guids) - # Before starting deployment we disorder the guids list with the - # purpose of speeding up the whole deployment process. - # It is likely that the user inserted in the 'guids' list closely - # resources one after another (e.g. all applications - # connected to the same node can likely appear one after another). - # This can originate a slow down in the deployment since the N - # threads the parallel runner uses to processes tasks may all - # be taken up by the same family of resources waiting for the - # same conditions (e.g. LinuxApplications running on a same - # node share a single lock, so they will tend to be serialized). - # If we disorder the guids list, this problem can be mitigated. - random.shuffle(guids) - def wait_all_and_start(group): + # Function that checks if all resources are READY + # before scheduling a start_with_conditions for each RM reschedule = False # Get all guids in group @@ -630,9 +681,9 @@ class ExperimentController(object): if wait_all_ready and new_group: # Schedule a function to check that all resources are # READY, and only then schedule the start. - # This aimes at reducing the number of tasks looping in the + # This aims at reducing the number of tasks looping in the # scheduler. - # Intead of having N start tasks, we will have only one for + # Instead of having many start tasks, we will have only one for # the whole group. callback = functools.partial(wait_all_and_start, group) self.schedule("1s", callback) @@ -672,11 +723,17 @@ class ExperimentController(object): Releases all the resources and stops task processing thread """ + # TODO: Clean the parallel runner!! STOP all ongoing tasks + #### + self.release() # Mark the EC state as TERMINATED self._state = ECState.TERMINATED + # Stop processing thread + self._stop = True + # Notify condition to wake up the processing thread self._notify() @@ -754,8 +811,8 @@ class ExperimentController(object): runner = ParallelRun(maxthreads = nthreads) runner.start() - try: - while not self.finished: + while not self._stop: + try: self._cond.acquire() task = self._scheduler.next() @@ -784,16 +841,20 @@ class ExperimentController(object): if task: # Process tasks in parallel runner.put(self._execute, task) - except: - import traceback - err = traceback.format_exc() - self.logger.error("Error while processing tasks in the EC: %s" % err) + except: + import traceback + err = traceback.format_exc() + self.logger.error("Error while processing tasks in the EC: %s" % err) - self._state = ECState.FAILED - finally: - self.logger.debug("Exiting the task processing loop ... ") - runner.sync() - runner.destroy() + # Set the EC to FAILED state + self._state = ECState.FAILED + + # Set the FailureManager failure level + self._fm.set_ec_failure() + + self.logger.debug("Exiting the task processing loop ... ") + runner.sync() + runner.destroy() def _execute(self, task): """ Executes a single task. @@ -821,15 +882,8 @@ class ExperimentController(object): self.logger.error("Error occurred while executing task: %s" % err) - # Set the EC to FAILED state (this will force to exit the task - # processing thread) - self._state = ECState.FAILED - - # Notify condition to wake up the processing thread - self._notify() - - # Propage error to the ParallelRunner - raise + # Set the FailureManager failure level + self._fm.set_task_failure() def _notify(self): """ Awakes the processing thread in case it is blocked waiting diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index a9087aec..18edeb07 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -403,6 +403,7 @@ class ResourceManager(Logger): def fail(self): self.set_failed() + self.ec.set_rm_failure() def set(self, name, value): """ Set the value of the attribute @@ -741,7 +742,6 @@ class ResourceManager(Logger): self.debug("----- STARTING ---- ") self.deploy() - def connect(self, guid): """ Performs actions that need to be taken upon associating RMs. This method should be redefined when necessary in child classes. diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 4f4b64ea..763106ce 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -301,7 +301,7 @@ class LinuxApplication(ResourceManager): # Since provisioning takes a long time, before # each step we check that the EC is still for step in steps: - if self.ec.finished: + if self.ec.abort: raise RuntimeError, "EC finished" ret = step() @@ -484,7 +484,7 @@ class LinuxApplication(ResourceManager): self.provision() except: self.fail() - raise + return super(LinuxApplication, self).deploy() @@ -499,10 +499,14 @@ class LinuxApplication(ResourceManager): self.set_finished() else: - if self.in_foreground: - self._run_in_foreground() - else: - self._run_in_background() + try: + if self.in_foreground: + self._run_in_foreground() + else: + self._run_in_background() + except: + self.fail() + return super(LinuxApplication, self).start() @@ -530,7 +534,6 @@ class LinuxApplication(ResourceManager): blocking = False) if self._proc.poll(): - self.fail() self.error(msg, out, err) raise RuntimeError, msg @@ -560,7 +563,6 @@ class LinuxApplication(ResourceManager): msg = " Failed to start command '%s' " % command if proc.poll(): - self.fail() self.error(msg, out, err) raise RuntimeError, msg @@ -577,7 +579,6 @@ class LinuxApplication(ResourceManager): # Out is what was written in the stderr file if err: - self.fail() msg = " Failed to start command '%s' " % command self.error(msg, out, err) raise RuntimeError, msg @@ -609,8 +610,8 @@ class LinuxApplication(ResourceManager): msg = " Failed to STOP command '%s' " % self.get("command") self.error(msg, out, err) self.fail() + return - if self.state == ResourceState.STARTED: super(LinuxApplication, self).stop() def release(self): @@ -622,10 +623,7 @@ class LinuxApplication(ResourceManager): self.stop() - if self.state != ResourceState.FAILED: - self.info("Resource released") - - super(LinuxApplication, self).release() + super(LinuxApplication, self).release() @property def state(self): diff --git a/src/nepi/resources/linux/ccn/ccnapplication.py b/src/nepi/resources/linux/ccn/ccnapplication.py index 3d8943ab..46a3cc49 100644 --- a/src/nepi/resources/linux/ccn/ccnapplication.py +++ b/src/nepi/resources/linux/ccn/ccnapplication.py @@ -61,7 +61,7 @@ class LinuxCCNApplication(LinuxApplication): self.provision() except: self.fail() - raise + return self.debug("----- READY ---- ") self.set_ready() diff --git a/src/nepi/resources/linux/ccn/ccncontent.py b/src/nepi/resources/linux/ccn/ccncontent.py index cae55b58..1fa93ccf 100644 --- a/src/nepi/resources/linux/ccn/ccncontent.py +++ b/src/nepi/resources/linux/ccn/ccncontent.py @@ -98,8 +98,8 @@ class LinuxCCNContent(LinuxApplication): self.provision() except: self.fail() - raise - + return + self.debug("----- READY ---- ") self.set_ready() @@ -121,7 +121,6 @@ class LinuxCCNContent(LinuxApplication): env, blocking = True) if proc.poll(): - self.fail() msg = "Failed to execute command" self.error(msg, out, err) raise RuntimeError, msg @@ -136,7 +135,6 @@ class LinuxCCNContent(LinuxApplication): msg = " Failed to execute command '%s'" % command self.error(msg, out, err) sef.fail() - raise RuntimeError, msg @property def _start_command(self): diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index 4fdb0ce6..71eb12ac 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -178,8 +178,8 @@ class LinuxCCND(LinuxApplication): self.provision() except: self.fail() - raise - + return + self.debug("----- READY ---- ") self.set_ready() @@ -211,8 +211,7 @@ class LinuxCCND(LinuxApplication): else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self.set_failed() - raise RuntimeError, msg + self.fail() def stop(self): command = self.get('command') or '' diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index 378f93c6..46c3f3b6 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -222,8 +222,8 @@ class LinuxCCNR(LinuxApplication): self.provision() except: self.fail() - raise - + return + self.debug("----- READY ---- ") self.set_ready() @@ -265,7 +265,6 @@ class LinuxCCNR(LinuxApplication): msg = " Failed to execute command '%s'" % command self.error(msg, out, err) self.fail() - raise RuntimeError, msg @property def _start_command(self): diff --git a/src/nepi/resources/linux/ccn/fibentry.py b/src/nepi/resources/linux/ccn/fibentry.py index aad03caf..48c2d6de 100644 --- a/src/nepi/resources/linux/ccn/fibentry.py +++ b/src/nepi/resources/linux/ccn/fibentry.py @@ -136,8 +136,8 @@ class LinuxFIBEntry(LinuxApplication): self.configure() except: self.fail() - raise - + return + self.debug("----- READY ---- ") self.set_ready() @@ -161,7 +161,6 @@ class LinuxFIBEntry(LinuxApplication): if proc.poll(): msg = "Failed to execute command" self.error(msg, out, err) - self.fail() raise RuntimeError, msg def configure(self): @@ -205,7 +204,6 @@ class LinuxFIBEntry(LinuxApplication): msg = " Failed to execute command '%s'" % command self.error(msg, out, err) self.fail() - raise RuntimeError, msg def stop(self): command = self.get('command') @@ -224,7 +222,6 @@ class LinuxFIBEntry(LinuxApplication): msg = " Failed to execute command '%s'" % command self.error(msg, out, err) self.fail() - raise RuntimeError, msg @property def _start_command(self): diff --git a/src/nepi/resources/linux/interface.py b/src/nepi/resources/linux/interface.py index 26a1549e..a170f412 100644 --- a/src/nepi/resources/linux/interface.py +++ b/src/nepi/resources/linux/interface.py @@ -243,7 +243,7 @@ class LinuxInterface(ResourceManager): self.provision() except: self.fail() - raise + return super(LinuxInterface, self).deploy() diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index b50d7fad..9cab4cae 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -326,7 +326,6 @@ class LinuxNode(ResourceManager): def provision(self): # check if host is alive if not self.is_alive(): - self.fail() msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") self.error(msg) @@ -361,7 +360,7 @@ class LinuxNode(ResourceManager): self.provision() except: self.fail() - raise + return # Node needs to wait until all associated interfaces are # ready before it can finalize deployment diff --git a/src/nepi/resources/linux/udptest.py b/src/nepi/resources/linux/udptest.py index 779b8c1e..1b7ac9ca 100644 --- a/src/nepi/resources/linux/udptest.py +++ b/src/nepi/resources/linux/udptest.py @@ -259,7 +259,6 @@ class LinuxUdpTest(LinuxApplication): msg = " Failed to execute command '%s'" % command self.error(msg, out, err) self.fail() - raise RuntimeError, msg else: super(LinuxUdpTest, self).start() diff --git a/src/nepi/resources/linux/udptunnel.py b/src/nepi/resources/linux/udptunnel.py index 82f01393..4dae96c1 100644 --- a/src/nepi/resources/linux/udptunnel.py +++ b/src/nepi/resources/linux/udptunnel.py @@ -141,7 +141,6 @@ class UdpTunnel(LinuxApplication): msg = " Failed to connect endpoints " if proc.poll(): - self.fail() self.error(msg, out, err) raise RuntimeError, msg @@ -154,7 +153,6 @@ class UdpTunnel(LinuxApplication): (out, err), proc = endpoint.node.check_errors(self.run_home(endpoint)) # Out is what was written in the stderr file if err: - self.fail() msg = " Failed to start command '%s' " % command self.error(msg, out, err) raise RuntimeError, msg @@ -202,7 +200,7 @@ class UdpTunnel(LinuxApplication): self.provision() except: self.fail() - raise + return self.debug("----- READY ---- ") self.set_ready() @@ -217,7 +215,6 @@ class UdpTunnel(LinuxApplication): msg = " Failed to execute command '%s'" % command self.error(msg, out, err) self.fail() - raise RuntimeError, msg def stop(self): """ Stops application execution @@ -238,8 +235,8 @@ class UdpTunnel(LinuxApplication): msg = " Failed to STOP tunnel" self.error(msg, err1, err2) self.fail() + return - if self.state == ResourceState.STARTED: self.set_stopped() @property @@ -311,7 +308,6 @@ class UdpTunnel(LinuxApplication): else: msg = "Couldn't retrieve %s" % filename self.error(msg, out, err) - self.fail() raise RuntimeError, msg return result diff --git a/src/nepi/resources/omf/application.py b/src/nepi/resources/omf/application.py index 673f8100..0bc0c622 100644 --- a/src/nepi/resources/omf/application.py +++ b/src/nepi/resources/omf/application.py @@ -195,7 +195,7 @@ class OMFApplication(ResourceManager): msg = "Credentials were not initialzed. XMPP Connections impossible" self.error(msg) self.fail() - #raise + return super(OMFApplication, self).stop() diff --git a/src/nepi/resources/omf/channel.py b/src/nepi/resources/omf/channel.py index b51cd895..ccd67be0 100644 --- a/src/nepi/resources/omf/channel.py +++ b/src/nepi/resources/omf/channel.py @@ -163,7 +163,7 @@ class OMFChannel(ResourceManager): msg = "Channel's value is not initialized" self.error(msg) self.fail() - raise + return self._nodes_guid = self._get_target(self._connections) if self._nodes_guid == "reschedule" : @@ -180,7 +180,7 @@ class OMFChannel(ResourceManager): msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) self.fail() - raise + return super(OMFChannel, self).deploy() diff --git a/src/nepi/resources/omf/interface.py b/src/nepi/resources/omf/interface.py index a3b9c3a3..1fb17626 100644 --- a/src/nepi/resources/omf/interface.py +++ b/src/nepi/resources/omf/interface.py @@ -102,7 +102,6 @@ class OMFWifiInterface(ResourceManager): msg = "Connection between %s %s and %s %s accepted" % \ (self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) - return True msg = "Connection between %s %s and %s %s refused" % \ @@ -123,7 +122,6 @@ class OMFWifiInterface(ResourceManager): if rm_list: return rm_list[0] return None - def configure_iface(self): """ Configure the interface without the ip @@ -139,13 +137,11 @@ class OMFWifiInterface(ResourceManager): self._omf_api.configure(self.node.get('hostname'), attrname, attrval) except AttributeError: - self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" - self.debug(msg) - #raise + self.error(msg) + raise super(OMFWifiInterface, self).provision() - return True def configure_ip(self): """ Configure the ip of the interface @@ -162,23 +158,21 @@ class OMFWifiInterface(ResourceManager): attrval) except AttributeError: msg = "Credentials are not initialzed. XMPP Connections impossible" - self.debug(msg) - self.fail() - #raise + self.error(msg) + raise - return True def deploy(self): """ Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the interface. It becomes DEPLOYED after sending messages to configure the interface """ - if not self._omf_api : + if not self._omf_api: self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.ec.exp_id) - if not self._omf_api : + if not self._omf_api: msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) self.fail() @@ -189,13 +183,13 @@ class OMFWifiInterface(ResourceManager): msg = "Interface's variable are not initialized" self.error(msg) self.fail() - return False + return if not self.node.get('hostname') : msg = "The channel is connected with an undefined node" self.error(msg) self.fail() - return False + return # Just for information self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \ @@ -203,23 +197,21 @@ class OMFWifiInterface(ResourceManager): self.get('essid') + " : " + self.get('ip')) # Check if the node is already deployed - chk1 = True - if self.state < ResourceState.PROVISIONED: - chk1 = self.configure_iface() - if chk1: - chk2 = self.configure_ip() + try: + if self.state < ResourceState.PROVISIONED: + if self.configure_iface(): + self.configure_ip() + except: + self.fail() + return - if not (chk1 and chk2) : - return False - super(OMFWifiInterface, self).deploy() - return True def release(self): """ Clean the RM at the end of the experiment and release the API """ - if self._omf_api : + if self._omf_api: OMFAPIFactory.release_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.ec.exp_id) diff --git a/src/nepi/resources/omf/node.py b/src/nepi/resources/omf/node.py index 1421078d..570afd07 100644 --- a/src/nepi/resources/omf/node.py +++ b/src/nepi/resources/omf/node.py @@ -130,22 +130,22 @@ class OMFNode(ResourceManager): It becomes DEPLOYED after sending messages to enroll the node """ - if not self._omf_api : + if not self._omf_api: self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.ec.exp_id) - if not self._omf_api : + if not self._omf_api: msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) self.fail() return - if not self.get('hostname') : + if not self.get('hostname'): msg = "Hostname's value is not initialized" self.error(msg) self.fail() - return False + return try: self._omf_api.enroll_host(self.get('hostname')) @@ -153,7 +153,7 @@ class OMFNode(ResourceManager): msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) self.fail() - #raise AttributeError, msg + return super(OMFNode, self).deploy() @@ -174,7 +174,6 @@ class OMFNode(ResourceManager): It becomes STARTED as soon as this method starts. """ - super(OMFNode, self).start() def stop(self): @@ -188,7 +187,7 @@ class OMFNode(ResourceManager): """Clean the RM at the end of the experiment """ - if self._omf_api : + if self._omf_api: self._omf_api.release(self.get('hostname')) OMFAPIFactory.release_api(self.get('xmppSlice'), diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index 3cd5b548..b26a4f35 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -193,7 +193,6 @@ class PlanetlabNode(LinuxNode): cls._register_attribute(min_cpu) cls._register_attribute(max_cpu) cls._register_attribute(timeframe) - def __init__(self, ec, guid): super(PlanetlabNode, self).__init__(ec, guid) diff --git a/src/nepi/resources/planetlab/tap.py b/src/nepi/resources/planetlab/tap.py index c4867b2d..991fa99b 100644 --- a/src/nepi/resources/planetlab/tap.py +++ b/src/nepi/resources/planetlab/tap.py @@ -165,8 +165,8 @@ class PlanetlabTap(LinuxApplication): self.provision() except: self.fail() - raise - + return + self.debug("----- READY ---- ") self.set_ready() @@ -180,7 +180,6 @@ class PlanetlabTap(LinuxApplication): msg = " Failed to execute command '%s'" % command self.error(msg, out, err) self.fail() - raise RuntimeError, msg def stop(self): command = self.get('command') or '' @@ -206,8 +205,7 @@ class PlanetlabTap(LinuxApplication): if out.strip().find(self.get("deviceName")) == -1: # tap is not running is not running (socket not found) - self._finish_time = tnow() - self._state = ResourceState.FINISHED + self.finish() self._last_state_check = tnow() @@ -243,7 +241,6 @@ class PlanetlabTap(LinuxApplication): else: msg = "Couldn't retrieve if_name" self.error(msg, out, err) - self.fail() raise RuntimeError, msg return if_name diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py index 3ca20cd5..6868c4aa 100644 --- a/src/nepi/util/parallel.py +++ b/src/nepi/util/parallel.py @@ -28,9 +28,6 @@ import os N_PROCS = None -#THREADCACHE = [] -#THREADCACHEPID = None - class WorkerThread(threading.Thread): class QUIT: pass @@ -100,9 +97,8 @@ class WorkerThread(threading.Thread): class ParallelMap(object): def __init__(self, maxthreads = None, maxqueue = None, results = True): global N_PROCS - #global THREADCACHE - #global THREADCACHEPID - + + # Compute maximum number of threads allowed by the system if maxthreads is None: if N_PROCS is None: try: @@ -126,25 +122,18 @@ class ParallelMap(object): self.rvqueue = Queue.Queue() else: self.rvqueue = None - - # Check threadcache - #if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): - # del THREADCACHE[:] - # THREADCACHEPID = os.getpid() self.workers = [] + + # initialize workers for x in xrange(maxthreads): t = None - #if THREADCACHE: - # try: - # t = THREADCACHE.pop() - # except: - # pass if t is None: t = WorkerThread() t.setDaemon(True) else: t.waitdone() + t.attach(self.queue, self.rvqueue, self.delayed_exceptions) self.workers.append(t) @@ -152,13 +141,6 @@ class ParallelMap(object): self.destroy() def destroy(self): - # Check threadcache - #global THREADCACHE - #global THREADCACHEPID - #if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): - # del THREADCACHE[:] - # THREADCACHEPID = os.getpid() - for worker in self.workers: worker.waitdone() for worker in self.workers: @@ -168,9 +150,6 @@ class ParallelMap(object): for worker in self.workers: worker.quit() - # TO FIX: - # THREADCACHE.extend(self.workers) - del self.workers[:] def put(self, callable, *args, **kwargs): @@ -219,32 +198,6 @@ class ParallelMap(object): except Queue.Empty: raise StopIteration - -class ParallelFilter(ParallelMap): - class _FILTERED: - pass - - def __filter(self, x): - if self.filter_condition(x): - return x - else: - return self._FILTERED - - def __init__(self, filter_condition, maxthreads = None, maxqueue = None): - super(ParallelFilter, self).__init__(maxthreads, maxqueue, True) - self.filter_condition = filter_condition - - def put(self, what): - super(ParallelFilter, self).put(self.__filter, what) - - def put_nowait(self, what): - super(ParallelFilter, self).put_nowait(self.__filter, what) - - def __iter__(self): - for rv in super(ParallelFilter, self).__iter__(): - if rv is not self._FILTERED: - yield rv - class ParallelRun(ParallelMap): def __run(self, x): fn, args, kwargs = x @@ -260,27 +213,3 @@ class ParallelRun(ParallelMap): super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs)) -def pmap(mapping, iterable, maxthreads = None, maxqueue = None): - mapper = ParallelMap( - maxthreads = maxthreads, - maxqueue = maxqueue, - results = True) - mapper.start() - for elem in iterable: - mapper.put(elem) - rv = list(mapper) - mapper.join() - return rv - -def pfilter(condition, iterable, maxthreads = None, maxqueue = None): - filtrer = ParallelFilter( - condition, - maxthreads = maxthreads, - maxqueue = maxqueue) - filtrer.start() - for elem in iterable: - filtrer.put(elem) - rv = list(filtrer) - filtrer.join() - return rv - -- 2.43.0