From 99d8b2a4431d8fafd0385e189375106d46f1abd9 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Thu, 24 Oct 2013 15:08:59 +0200 Subject: [PATCH] Modified FailureManager to abort only when critical resources fail --- src/nepi/execution/ec.py | 112 ++++++------- src/nepi/execution/resource.py | 141 ++++++++++++---- src/nepi/resources/all/collector.py | 65 ++++---- src/nepi/resources/linux/application.py | 65 ++++---- .../resources/linux/ccn/ccnapplication.py | 23 ++- src/nepi/resources/linux/ccn/ccncontent.py | 32 ++-- src/nepi/resources/linux/ccn/ccnd.py | 62 ++++---- src/nepi/resources/linux/ccn/ccnping.py | 5 +- src/nepi/resources/linux/ccn/ccnpingserver.py | 5 +- src/nepi/resources/linux/ccn/ccnr.py | 26 ++- src/nepi/resources/linux/ccn/fibentry.py | 41 +++-- src/nepi/resources/linux/channel.py | 6 +- src/nepi/resources/linux/interface.py | 28 ++-- src/nepi/resources/linux/mtr.py | 3 +- src/nepi/resources/linux/node.py | 57 ++++--- src/nepi/resources/linux/nping.py | 3 +- src/nepi/resources/linux/ping.py | 3 +- src/nepi/resources/linux/tcpdump.py | 3 +- src/nepi/resources/linux/traceroute.py | 3 +- src/nepi/resources/linux/udptest.py | 7 +- src/nepi/resources/linux/udptunnel.py | 19 ++- src/nepi/resources/omf/application.py | 31 ++-- src/nepi/resources/omf/channel.py | 83 ++++------ src/nepi/resources/omf/interface.py | 55 ++----- src/nepi/resources/omf/node.py | 60 ++----- src/nepi/resources/omf/omf_resource.py | 6 +- src/nepi/resources/planetlab/node.py | 4 +- .../resources/planetlab/openvswitch/ovs.py | 90 +++++------ .../planetlab/openvswitch/ovsport.py | 87 ++++------ .../resources/planetlab/openvswitch/tunnel.py | 89 ++++------- src/nepi/resources/planetlab/tap.py | 32 ++-- src/nepi/util/parallel.py | 150 ++++++------------ test/execution/ec.py | 16 +- test/execution/resource.py | 73 +++++++-- test/resources/planetlab/ovs.py | 6 +- test/util/parallel.py | 90 +++++++++++ 36 files changed, 827 insertions(+), 754 deletions(-) create mode 100644 test/util/parallel.py diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 7277ae62..c5978dea 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -35,52 +35,48 @@ import random import sys import time import threading - -class FailurePolicy(object): - """ Defines how to respond to experiment failures - """ - IGNORE_RM_FAILURE = 1 - ABORT_ON_RM_FAILURE = 2 +import weakref class FailureLevel(object): - """ Describe the system failure state + """ Describes the system failure state """ OK = 1 RM_FAILURE = 2 - TASK_FAILURE = 3 - EC_FAILURE = 4 + EC_FAILURE = 3 class FailureManager(object): """ The FailureManager is responsible for handling errors, and deciding whether an experiment should be aborted """ - def __init__(self, failure_policy = None): + def __init__(self, ec): + self._ec = weakref.ref(ec) self._failure_level = FailureLevel.OK - self._failure_policy = failure_policy or \ - FailurePolicy.ABORT_ON_RM_FAILURE @property - def abort(self): - if self._failure_level == FailureLevel.EC_FAILURE: - return True - - if self._failure_level in [FailureLevel.TASK_FAILURE, - FailureLevel.RM_FAILURE] and \ - self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE: - return True - - return False + def ec(self): + """ Returns the Experiment Controller """ + return self._ec() - def set_rm_failure(self): - self._failure_level = FailureLevel.RM_FAILURE + @property + def abort(self): + if self._failure_level == FailureLevel.OK: + for guid in self.ec.resources: + state = self.ec.state(guid) + critical = self.ec.get(guid, "critical") + + if state == ResourceState.FAILED and critical: + self._failure_level = FailureLevel.RM_FAILURE + self.ec.logger.debug("RM critical failure occurred on guid %d." \ + " Setting EC FAILURE LEVEL to RM_FAILURE" % guid) + break - def set_task_failure(self): - self._failure_level = FailureLevel.TASK_FAILURE + return self._failure_level != FailureLevel.OK def set_ec_failure(self): self._failure_level = FailureLevel.EC_FAILURE + class ECState(object): """ State of the Experiment Controller @@ -175,7 +171,9 @@ class ExperimentController(object): # Resource managers self._resources = dict() - # Scheduler + # Scheduler. It a queue that holds tasks scheduled for + # execution, and yields the next task to be executed + # ordered by execution and arrival time self._scheduler = HeapScheduler() # Tasks @@ -186,22 +184,27 @@ class ExperimentController(object): # generator of globally unique id for groups self._group_id_generator = guid.GuidGenerator() - - # Event processing thread - self._cond = threading.Condition() - self._thread = threading.Thread(target = self._process) - self._thread.setDaemon(True) - self._thread.start() # Flag to stop processing thread self._stop = False # Entity in charge of managing system failures - self._fm = FailureManager() + self._fm = FailureManager(self) # EC state self._state = ECState.RUNNING + # The runner is a pool of threads used to parallelize + # execution of tasks + nthreads = int(os.environ.get("NEPI_NTHREADS", "50")) + self._runner = ParallelRun(maxthreads = nthreads) + + # Event processing thread + self._cond = threading.Condition() + self._thread = threading.Thread(target = self._process) + self._thread.setDaemon(True) + self._thread.start() + @property def logger(self): """ Return the logger of the Experiment Controller @@ -234,9 +237,6 @@ class ExperimentController(object): def abort(self): return self._fm.abort - def set_rm_failure(self): - self._fm.set_rm_failure() - def wait_finished(self, guids): """ Blocking method that wait until all RMs in the 'guid' list reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or @@ -316,17 +316,19 @@ class ExperimentController(object): # If a guid reached one of the target states, remove it from list guid = guids[0] rstate = self.state(guid) + + hrrstate = ResourceState2str.get(rstate) + hrstate = ResourceState2str.get(state) if rstate >= state: guids.remove(guid) + self.logger.debug(" guid %d DONE - state is %s, required is >= %s " % ( + guid, hrrstate, hrstate)) else: # Debug... - hrrstate = ResourceState2str.get(rstate) - hrstate = ResourceState2str.get(state) self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % ( guid, hrrstate, hrstate)) - - time.sleep(0.5) + time.sleep(0.5) def get_task(self, tid): """ Get a specific task @@ -694,8 +696,10 @@ class ExperimentController(object): guids = self.resources # Remove all pending tasks from the scheduler queue - for tis in self._scheduler.pending: - self._scheduler.remove(tis) + for tid in list(self._scheduler.pending): + self._scheduler.remove(tid) + + self._runner.empty() for guid in guids: rm = self.get_resource(guid) @@ -708,6 +712,10 @@ class ExperimentController(object): Releases all the resources and stops task processing thread """ + # If there was a major failure we can't exit gracefully + if self._state == ECState.FAILED: + raise RuntimeError("EC failure. Can not exit gracefully") + self.release() # Mark the EC state as TERMINATED @@ -788,10 +796,8 @@ class ExperimentController(object): that might have been raised by the workers. """ - nthreads = int(os.environ.get("NEPI_NTHREADS", "50")) - runner = ParallelRun(maxthreads = nthreads) - runner.start() + self._runner.start() while not self._stop: try: @@ -822,7 +828,7 @@ class ExperimentController(object): if task: # Process tasks in parallel - runner.put(self._execute, task) + self._runner.put(self._execute, task) except: import traceback err = traceback.format_exc() @@ -830,13 +836,14 @@ class ExperimentController(object): # Set the EC to FAILED state self._state = ECState.FAILED - - # Set the FailureManager failure level + + # Set the FailureManager failure level to EC failure self._fm.set_ec_failure() self.logger.debug("Exiting the task processing loop ... ") - runner.sync() - runner.destroy() + + self._runner.sync() + self._runner.destroy() def _execute(self, task): """ Executes a single task. @@ -864,9 +871,6 @@ class ExperimentController(object): self.logger.error("Error occurred while executing task: %s" % err) - # Set the FailureManager failure level - self._fm.set_task_failure() - def _notify(self): """ Awakes the processing thread in case it is blocked waiting for a new task to be scheduled. diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 18edeb07..b4994e02 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -19,6 +19,7 @@ from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat from nepi.util.logger import Logger +from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import TraceAttr import copy @@ -80,6 +81,20 @@ def clsinit_copy(cls): cls._clsinit_copy() return cls +def failtrap(func): + def wrapped(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except: + import traceback + err = traceback.format_exc() + self.error(err) + self.debug("SETTING guid %d to state FAILED" % self.guid) + self.fail() + raise + + return wrapped + # Decorator to invoke class initialization method @clsinit class ResourceManager(Logger): @@ -138,8 +153,14 @@ class ResourceManager(Logger): resource attributes """ - pass + critical = Attribute("critical", "Defines whether the resource is critical. " + " A failure on a critical resource will interrupt the experiment. ", + type = Types.Bool, + default = True, + flags = Flags.ExecReadOnly) + cls._register_attribute(critical) + @classmethod def _register_traces(cls): """ Resource subclasses will invoke this method to register @@ -309,7 +330,7 @@ class ResourceManager(Logger): @property def state(self): - """ Get the state of the current RM """ + """ Get the current state of the RM """ return self._state def log_message(self, msg): @@ -344,27 +365,42 @@ class ResourceManager(Logger): def discover(self): """ Performs resource discovery. - This method is resposible for selecting an individual resource + This method is responsible for selecting an individual resource matching user requirements. This method should be redefined when necessary in child classes. + + If overridden in child classes, make sure to use the failtrap + decorator to ensure the RM state will be set to FAILED in the event + of an exception. + """ self.set_discovered() def provision(self): """ Performs resource provisioning. - This method is resposible for provisioning one resource. + This method is responsible for provisioning one resource. After this method has been successfully invoked, the resource - should be acccesible/controllable by the RM. + should be accessible/controllable by the RM. This method should be redefined when necessary in child classes. + + If overridden in child classes, make sure to use the failtrap + decorator to ensure the RM state will be set to FAILED in the event + of an exception. + """ self.set_provisioned() def start(self): - """ Starts the resource. + """ Starts the RM. There is no generic start behavior for all resources. This method should be redefined when necessary in child classes. + + If overridden in child classes, make sure to use the failtrap + decorator to ensure the RM state will be set to FAILED in the event + of an exception. + """ if not self.state in [ResourceState.READY, ResourceState.STOPPED]: self.error("Wrong state %s for start" % self.state) @@ -373,10 +409,15 @@ class ResourceManager(Logger): self.set_started() def stop(self): - """ Stops the resource. + """ Interrupts the RM, stopping any tasks the RM was performing. There is no generic stop behavior for all resources. This method should be redefined when necessary in child classes. + + If overridden in child classes, make sure to use the failtrap + decorator to ensure the RM state will be set to FAILED in the event + of an exception. + """ if not self.state in [ResourceState.STARTED]: self.error("Wrong state %s for stop" % self.state) @@ -385,7 +426,15 @@ class ResourceManager(Logger): self.set_stopped() def deploy(self): - """ Execute all steps required for the RM to reach the state READY + """ Execute all steps required for the RM to reach the state READY. + + This method is responsible for deploying the resource (and invoking the + discover and provision methods). + This method should be redefined when necessary in child classes. + + If overridden in child classes, make sure to use the failtrap + decorator to ensure the RM state will be set to FAILED in the event + of an exception. """ if self.state > ResourceState.READY: @@ -396,14 +445,41 @@ class ResourceManager(Logger): self.set_ready() def release(self): + """ Perform actions to free resources used by the RM. + + This method is responsible for releasing resources that were + used during the experiment by the RM. + This method should be redefined when necessary in child classes. + + If overridden in child classes, this method should never + raise an error and it must ensure the RM is set to state RELEASED. + + """ self.set_released() def finish(self): + """ Sets the RM to state FINISHED. + + The FINISHED state is different from STOPPED in that it should not be + directly invoked by the user. + STOPPED indicates that the user interrupted the RM, FINISHED means + that the RM concluded normally the actions it was supposed to perform. + This method should be redefined when necessary in child classes. + + If overridden in child classes, make sure to use the failtrap + decorator to ensure the RM state will be set to FAILED in the event + of an exception. + + """ + self.set_finished() def fail(self): + """ Sets the RM to state FAILED. + + """ + self.set_failed() - self.ec.set_rm_failure() def set(self, name, value): """ Set the value of the attribute @@ -641,9 +717,11 @@ class ResourceManager(Logger): reschedule = False delay = reschedule_delay - ## evaluate if set conditions are met + ## evaluate if conditions to start are met + if self.ec.abort: + return - # only can start when RM is either STOPPED or READY + # Can only start when RM is either STOPPED or READY if self.state not in [ResourceState.STOPPED, ResourceState.READY]: reschedule = True self.debug("---- RESCHEDULING START ---- state %s " % self.state ) @@ -680,11 +758,14 @@ class ResourceManager(Logger): reschedule = False delay = reschedule_delay - ## evaluate if set conditions are met + ## evaluate if conditions to stop are met + if self.ec.abort: + return # only can stop when RM is STARTED if self.state != ResourceState.STARTED: reschedule = True + self.debug("---- RESCHEDULING STOP ---- state %s " % self.state ) else: self.debug(" ---- STOP CONDITIONS ---- %s" % self.conditions.get(ResourceAction.STOP)) @@ -710,7 +791,9 @@ class ResourceManager(Logger): reschedule = False delay = reschedule_delay - ## evaluate if set conditions are met + ## evaluate if conditions to deploy are met + if self.ec.abort: + return # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, @@ -769,43 +852,43 @@ class ResourceManager(Logger): def set_started(self): """ Mark ResourceManager as STARTED """ - self._start_time = tnow() - self._state = ResourceState.STARTED + self.set_state(ResourceState.STARTED, "_start_time") def set_stopped(self): """ Mark ResourceManager as STOPPED """ - self._stop_time = tnow() - self._state = ResourceState.STOPPED + self.set_state(ResourceState.STOPPED, "_stop_time") def set_ready(self): """ Mark ResourceManager as READY """ - self._ready_time = tnow() - self._state = ResourceState.READY + self.set_state(ResourceState.READY, "_ready_time") def set_released(self): """ Mark ResourceManager as REALEASED """ - self._release_time = tnow() - self._state = ResourceState.RELEASED + self.set_state(ResourceState.RELEASED, "_release_time") def set_finished(self): """ Mark ResourceManager as FINISHED """ - self._finish_time = tnow() - self._state = ResourceState.FINISHED + self.set_state(ResourceState.FINISHED, "_finish_time") def set_failed(self): """ Mark ResourceManager as FAILED """ - self._failed_time = tnow() - self._state = ResourceState.FAILED + self.set_state(ResourceState.FAILED, "_failed_time") def set_discovered(self): """ Mark ResourceManager as DISCOVERED """ - self._discover_time = tnow() - self._state = ResourceState.DISCOVERED + self.set_state(ResourceState.DISCOVERED, "_discover_time") def set_provisioned(self): """ Mark ResourceManager as PROVISIONED """ - self._provision_time = tnow() - self._state = ResourceState.PROVISIONED + self.set_state(ResourceState.PROVISIONED, "_provision_time") + + def set_state(self, state, state_time_attr): + # Ensure that RM state will not change after released + if self._state == ResourceState.RELEASED: + return + + setattr(self, state_time_attr, tnow()) + self._state = state class ResourceFactory(object): _resource_types = dict() diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py index 812a9392..864750e6 100644 --- a/src/nepi/resources/all/collector.py +++ b/src/nepi/resources/all/collector.py @@ -19,14 +19,14 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - ResourceAction +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, ResourceAction, failtrap from nepi.util.sshfuncs import ProcStatus import os import tempfile -@clsinit +@clsinit_copy class Collector(ResourceManager): """ The collector is reponsible of collecting traces of a same type associated to RMs into a local directory. @@ -69,7 +69,8 @@ class Collector(ResourceManager): @property def store_path(self): return self._store_path - + + @failtrap def provision(self): trace_name = self.get("traceName") if not trace_name: @@ -97,38 +98,40 @@ class Collector(ResourceManager): super(Collector, self).provision() + @failtrap def deploy(self): - try: - self.discover() - self.provision() - except: - self.fail() - raise + self.discover() + self.provision() super(Collector, self).deploy() def release(self): - trace_name = self.get("traceName") - rename = self.get("rename") or trace_name - - msg = "Collecting '%s' traces to local directory %s" % ( - trace_name, self.store_path) - self.info(msg) - - rms = self.get_connected() - for rm in rms: - result = self.ec.trace(rm.guid, trace_name) - fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, - rename)) - try: - f = open(fpath, "w") - f.write(result) - f.close() - except: - msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, - rm.guid, fpath) - self.error(msg) - continue + try: + trace_name = self.get("traceName") + rename = self.get("rename") or trace_name + + msg = "Collecting '%s' traces to local directory %s" % ( + trace_name, self.store_path) + self.info(msg) + + rms = self.get_connected() + for rm in rms: + result = self.ec.trace(rm.guid, trace_name) + fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, + rename)) + try: + f = open(fpath, "w") + f.write(result) + f.close() + except: + msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, + rm.guid, fpath) + self.error(msg) + continue + except: + import traceback + err = traceback.format_exc() + self.error(err) super(Collector, self).release() diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 763106ce..1848e9a1 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -19,8 +19,8 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.resources.linux.node import LinuxNode from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -31,7 +31,7 @@ import subprocess # TODO: Resolve wildcards in commands!! # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!! -@clsinit +@clsinit_copy class LinuxApplication(ResourceManager): """ .. class:: Class Args : @@ -85,7 +85,6 @@ class LinuxApplication(ResourceManager): _help = "Runs an application on a Linux host with a BASH command " _backend_type = "linux" - @classmethod def _register_attributes(cls): command = Attribute("command", "Command to execute at application start. " @@ -270,7 +269,8 @@ class LinuxApplication(ResourceManager): out = int(out.strip()) return out - + + @failtrap def provision(self): # create run dir for application self.node.mkdir(self.run_home) @@ -302,7 +302,8 @@ class LinuxApplication(ResourceManager): # each step we check that the EC is still for step in steps: if self.ec.abort: - raise RuntimeError, "EC finished" + self.debug("Interrupting provisioning. EC says 'ABORT") + return ret = step() if ret: @@ -470,6 +471,7 @@ class LinuxApplication(ResourceManager): # replace application specific paths in the command return self.replace_paths(install) + @failtrap def deploy(self): # Wait until node is associated and deployed node = self.node @@ -477,17 +479,14 @@ class LinuxApplication(ResourceManager): self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) self.ec.schedule(reschedule_delay, self.deploy) else: - try: - command = self.get("command") or "" - self.info("Deploying command '%s' " % command) - self.discover() - self.provision() - except: - self.fail() - return + command = self.get("command") or "" + self.info("Deploying command '%s' " % command) + self.discover() + self.provision() super(LinuxApplication, self).deploy() - + + @failtrap def start(self): command = self.get("command") @@ -498,15 +497,10 @@ class LinuxApplication(ResourceManager): # installation), then the application is directly marked as FINISHED self.set_finished() else: - - try: - if self.in_foreground: - self._run_in_foreground() - else: - self._run_in_background() - except: - self.fail() - return + if self.in_foreground: + self._run_in_foreground() + else: + self._run_in_background() super(LinuxApplication, self).start() @@ -514,6 +508,7 @@ class LinuxApplication(ResourceManager): command = self.get("command") sudo = self.get("sudo") or False x11 = self.get("forwardX11") + env = self.get("env") # For a command being executed in foreground, if there is stdin, # it is expected to be text string not a file or pipe @@ -526,7 +521,7 @@ class LinuxApplication(ResourceManager): # to be able to kill the process from the stop method. # We also set blocking = False, since we don't want the # thread to block until the execution finishes. - (out, err), self._proc = self.execute_command(self, command, + (out, err), self._proc = self.execute_command(command, env = env, sudo = sudo, stdin = stdin, @@ -582,7 +577,8 @@ class LinuxApplication(ResourceManager): msg = " Failed to start command '%s' " % command self.error(msg, out, err) raise RuntimeError, msg - + + @failtrap def stop(self): """ Stops application execution """ @@ -609,22 +605,25 @@ class LinuxApplication(ResourceManager): if proc.poll() or err: msg = " Failed to STOP command '%s' " % self.get("command") self.error(msg, out, err) - self.fail() - return super(LinuxApplication, self).stop() def release(self): self.info("Releasing resource") - tear_down = self.get("tearDown") - if tear_down: - self.node.execute(tear_down) + try: + tear_down = self.get("tearDown") + if tear_down: + self.node.execute(tear_down) - self.stop() + self.stop() + except: + import traceback + err = traceback.format_exc() + self.error(err) super(LinuxApplication, self).release() - + @property def state(self): """ Returns the state of the application diff --git a/src/nepi/resources/linux/ccn/ccnapplication.py b/src/nepi/resources/linux/ccn/ccnapplication.py index 46a3cc49..c13c0920 100644 --- a/src/nepi/resources/linux/ccn/ccnapplication.py +++ b/src/nepi/resources/linux/ccn/ccnapplication.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.resource import clsinit_copy, ResourceState, \ - reschedule_delay + reschedule_delay, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND from nepi.util.timefuncs import tnow, tdiffsec @@ -44,25 +44,22 @@ class LinuxCCNApplication(LinuxApplication): if self.ccnd: return self.ccnd.node return None + @failtrap def deploy(self): if not self.ccnd or self.ccnd.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) self.ec.schedule(reschedule_delay, self.deploy) else: - try: - command = self.get("command") or "" + command = self.get("command") or "" - self.info("Deploying command '%s' " % command) - - if not self.get("env"): - self.set("env", self._environment) + self.info("Deploying command '%s' " % command) + + if not self.get("env"): + self.set("env", self._environment) + + self.discover() + self.provision() - self.discover() - self.provision() - except: - self.fail() - return - self.debug("----- READY ---- ") self.set_ready() diff --git a/src/nepi/resources/linux/ccn/ccncontent.py b/src/nepi/resources/linux/ccn/ccncontent.py index 1fa93ccf..bb2ae460 100644 --- a/src/nepi/resources/linux/ccn/ccncontent.py +++ b/src/nepi/resources/linux/ccn/ccncontent.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ - ResourceAction, reschedule_delay + ResourceAction, reschedule_delay, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnr import LinuxCCNR from nepi.util.timefuncs import tnow @@ -72,6 +72,7 @@ class LinuxCCNContent(LinuxApplication): if self.ccnr: return self.ccnr.node return None + @failtrap def deploy(self): if not self.ccnr or self.ccnr.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) @@ -79,26 +80,22 @@ class LinuxCCNContent(LinuxApplication): # ccnr needs to wait until ccnd is deployed and running self.ec.schedule(reschedule_delay, self.deploy) else: - try: - if not self.get("command"): - self.set("command", self._start_command) + if not self.get("command"): + self.set("command", self._start_command) - if not self.get("env"): - self.set("env", self._environment) + if not self.get("env"): + self.set("env", self._environment) - # set content to stdin, so the content will be - # uploaded during provision - self.set("stdin", self.get("content")) + # set content to stdin, so the content will be + # uploaded during provision + self.set("stdin", self.get("content")) - command = self.get("command") + command = self.get("command") - self.info("Deploying command '%s' " % command) + self.info("Deploying command '%s' " % command) - self.discover() - self.provision() - except: - self.fail() - return + self.discover() + self.provision() self.debug("----- READY ---- ") self.set_ready() @@ -125,6 +122,7 @@ class LinuxCCNContent(LinuxApplication): self.error(msg, out, err) raise RuntimeError, msg + @failtrap def start(self): if self.state == ResourceState.READY: command = self.get("command") @@ -134,7 +132,7 @@ class LinuxCCNContent(LinuxApplication): else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - sef.fail() + raise RuntimeError, msg @property def _start_command(self): diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index 71eb12ac..0962936e 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay + ResourceState, reschedule_delay, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.node import OSType from nepi.util.timefuncs import tnow, tdiffsec @@ -136,6 +136,7 @@ class LinuxCCND(LinuxApplication): def path(self): return "PATH=$PATH:${BIN}/%s/" % self.version + @failtrap def deploy(self): if not self.node or self.node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) @@ -143,42 +144,38 @@ class LinuxCCND(LinuxApplication): # ccnd needs to wait until node is deployed and running self.ec.schedule(reschedule_delay, self.deploy) else: - try: - if not self.get("command"): - self.set("command", self._start_command) - - if not self.get("depends"): - self.set("depends", self._dependencies) + if not self.get("command"): + self.set("command", self._start_command) + + if not self.get("depends"): + self.set("depends", self._dependencies) - if not self.get("sources"): - self.set("sources", self._sources) + if not self.get("sources"): + self.set("sources", self._sources) - sources = self.get("sources") - source = sources.split(" ")[0] - basename = os.path.basename(source) - self._version = ( basename.strip().replace(".tar.gz", "") - .replace(".tar","") - .replace(".gz","") - .replace(".zip","") ) + sources = self.get("sources") + source = sources.split(" ")[0] + basename = os.path.basename(source) + self._version = ( basename.strip().replace(".tar.gz", "") + .replace(".tar","") + .replace(".gz","") + .replace(".zip","") ) - if not self.get("build"): - self.set("build", self._build) + if not self.get("build"): + self.set("build", self._build) - if not self.get("install"): - self.set("install", self._install) + if not self.get("install"): + self.set("install", self._install) - if not self.get("env"): - self.set("env", self._environment) + if not self.get("env"): + self.set("env", self._environment) - command = self.get("command") + command = self.get("command") - self.info("Deploying command '%s' " % command) + self.info("Deploying command '%s' " % command) - self.discover() - self.provision() - except: - self.fail() - return + self.discover() + self.provision() self.debug("----- READY ---- ") self.set_ready() @@ -202,6 +199,7 @@ class LinuxCCND(LinuxApplication): env = env, raise_on_error = True) + @failtrap def start(self): if self.state == ResourceState.READY: command = self.get("command") @@ -211,8 +209,9 @@ class LinuxCCND(LinuxApplication): else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self.fail() + raise RuntimeError, msg + @failtrap def stop(self): command = self.get('command') or '' @@ -245,7 +244,7 @@ class LinuxCCND(LinuxApplication): state_check_delay = 0.5 if self._state == ResourceState.STARTED and \ tdiffsec(tnow(), self._last_state_check) > state_check_delay: - (out, err), proc = self._ccndstatus + (out, err), proc = self._ccndstatus() retcode = proc.poll() @@ -262,7 +261,6 @@ class LinuxCCND(LinuxApplication): return self._state - @property def _ccndstatus(self): env = self.get('env') or "" environ = self.node.format_environment(env, inline = True) diff --git a/src/nepi/resources/linux/ccn/ccnping.py b/src/nepi/resources/linux/ccn/ccnping.py index 78215974..d8289504 100644 --- a/src/nepi/resources/linux/ccn/ccnping.py +++ b/src/nepi/resources/linux/ccn/ccnping.py @@ -18,8 +18,8 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.resources.linux.ccn.ccnpingserver import LinuxCCNPingServer from nepi.util.timefuncs import tnow, tdiffsec @@ -65,6 +65,7 @@ class LinuxCCNPing(LinuxCCNPingServer): if ccnpingserver: return ccnpingserver[0] return None + @failtrap def start(self): if not self.ccnpingserver or \ self.ccnpingserver.state < ResourceState.STARTED: diff --git a/src/nepi/resources/linux/ccn/ccnpingserver.py b/src/nepi/resources/linux/ccn/ccnpingserver.py index b566c78e..6e87b27b 100644 --- a/src/nepi/resources/linux/ccn/ccnpingserver.py +++ b/src/nepi/resources/linux/ccn/ccnpingserver.py @@ -18,8 +18,8 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication from nepi.util.timefuncs import tnow, tdiffsec @@ -54,6 +54,7 @@ class LinuxCCNPingServer(LinuxCCNApplication): super(LinuxCCNPingServer, self).__init__(ec, guid) self._home = "ccnping-serv-%s" % self.guid + @failtrap def deploy(self): if not self.get("command"): self.set("command", self._start_command) diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index 46c3f3b6..53199976 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import clsinit_copy, ResourceState, \ - ResourceAction, reschedule_delay + ResourceAction, reschedule_delay, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND from nepi.util.timefuncs import tnow @@ -200,6 +200,7 @@ class LinuxCCNR(LinuxApplication): if self.ccnd: return self.ccnd.node return None + @failtrap def deploy(self): if not self.ccnd or self.ccnd.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state ) @@ -207,22 +208,18 @@ class LinuxCCNR(LinuxApplication): # ccnr needs to wait until ccnd is deployed and running self.ec.schedule(reschedule_delay, self.deploy) else: - try: - if not self.get("command"): - self.set("command", self._start_command) + if not self.get("command"): + self.set("command", self._start_command) - if not self.get("env"): - self.set("env", self._environment) + if not self.get("env"): + self.set("env", self._environment) - command = self.get("command") + command = self.get("command") - self.info("Deploying command '%s' " % command) + self.info("Deploying command '%s' " % command) - self.discover() - self.provision() - except: - self.fail() - return + self.discover() + self.provision() self.debug("----- READY ---- ") self.set_ready() @@ -255,6 +252,7 @@ class LinuxCCNR(LinuxApplication): env = env, raise_on_error = True) + @failtrap def start(self): if self.state == ResourceState.READY: command = self.get("command") @@ -264,7 +262,7 @@ class LinuxCCNR(LinuxApplication): else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self.fail() + raise RuntimeError, msg @property def _start_command(self): diff --git a/src/nepi/resources/linux/ccn/fibentry.py b/src/nepi/resources/linux/ccn/fibentry.py index 48c2d6de..490d8f67 100644 --- a/src/nepi/resources/linux/ccn/fibentry.py +++ b/src/nepi/resources/linux/ccn/fibentry.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import clsinit_copy, ResourceState, \ - ResourceAction, reschedule_delay + ResourceAction, reschedule_delay, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND from nepi.util.timefuncs import tnow @@ -108,35 +108,32 @@ class LinuxFIBEntry(LinuxApplication): return self.ec.trace(self._traceroute, "stdout", attr, block, offset) return super(LinuxFIBEntry, self).trace(name, attr, block, offset) - + + @failtrap def deploy(self): # Wait until associated ccnd is provisioned if not self.ccnd or self.ccnd.state < ResourceState.READY: # ccnr needs to wait until ccnd is deployed and running self.ec.schedule(reschedule_delay, self.deploy) else: - try: - if not self.get("ip"): - host = self.get("host") - ip = socket.gethostbyname(host) - self.set("ip", ip) + if not self.get("ip"): + host = self.get("host") + ip = socket.gethostbyname(host) + self.set("ip", ip) - if not self.get("command"): - self.set("command", self._start_command) + if not self.get("command"): + self.set("command", self._start_command) - if not self.get("env"): - self.set("env", self._environment) + if not self.get("env"): + self.set("env", self._environment) - command = self.get("command") + command = self.get("command") - self.info("Deploying command '%s' " % command) + self.info("Deploying command '%s' " % command) - self.discover() - self.provision() - self.configure() - except: - self.fail() - return + self.discover() + self.provision() + self.configure() self.debug("----- READY ---- ") self.set_ready() @@ -194,6 +191,7 @@ class LinuxFIBEntry(LinuxApplication): # schedule mtr deploy self.ec.deploy(guids=[self._traceroute], group = self.deployment_group) + @failtrap def start(self): if self.state == ResourceState.READY: command = self.get("command") @@ -203,8 +201,9 @@ class LinuxFIBEntry(LinuxApplication): else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self.fail() + raise RuntimeError, msg + @failtrap def stop(self): command = self.get('command') env = self.get('env') @@ -221,7 +220,7 @@ class LinuxFIBEntry(LinuxApplication): if err: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self.fail() + raise RuntimeError, msg @property def _start_command(self): diff --git a/src/nepi/resources/linux/channel.py b/src/nepi/resources/linux/channel.py index 2ed5c014..429051ea 100644 --- a/src/nepi/resources/linux/channel.py +++ b/src/nepi/resources/linux/channel.py @@ -18,9 +18,10 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState -@clsinit +@clsinit_copy class LinuxChannel(ResourceManager): _rtype = "LinuxChannel" _help = "Represents a wireless channel on a network of Linux hosts" @@ -35,3 +36,4 @@ class LinuxChannel(ResourceManager): def valid_connection(self, guid): # TODO: Validate! return True + diff --git a/src/nepi/resources/linux/interface.py b/src/nepi/resources/linux/interface.py index a170f412..9ccdc4f6 100644 --- a/src/nepi/resources/linux/interface.py +++ b/src/nepi/resources/linux/interface.py @@ -18,8 +18,8 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Types, Flags -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.resources.linux.node import LinuxNode from nepi.resources.linux.channel import LinuxChannel @@ -33,7 +33,7 @@ import time # TODO: UP, MTU attributes! -@clsinit +@clsinit_copy class LinuxInterface(ResourceManager): _rtype = "LinuxInterface" _help = "Controls network devices on Linux hosts through the ifconfig tool" @@ -102,6 +102,7 @@ class LinuxInterface(ResourceManager): if chan: return chan[0] return None + @failtrap def discover(self): devname = self.get("deviceName") ip4 = self.get("ip4") @@ -183,6 +184,7 @@ class LinuxInterface(ResourceManager): super(LinuxInterface, self).discover() + @failtrap def provision(self): devname = self.get("deviceName") ip4 = self.get("ip4") @@ -226,6 +228,7 @@ class LinuxInterface(ResourceManager): super(LinuxInterface, self).provision() + @failtrap def deploy(self): # Wait until node is provisioned node = self.node @@ -238,19 +241,20 @@ class LinuxInterface(ResourceManager): else: # Verify if the interface exists in node. If not, configue # if yes, load existing configuration - try: - self.discover() - self.provision() - except: - self.fail() - return + self.discover() + self.provision() super(LinuxInterface, self).deploy() def release(self): - tear_down = self.get("tearDown") - if tear_down: - self.execute(tear_down) + try: + tear_down = self.get("tearDown") + if tear_down: + self.execute(tear_down) + except: + import traceback + err = traceback.format_exc() + self.error(err) super(LinuxInterface, self).release() diff --git a/src/nepi/resources/linux/mtr.py b/src/nepi/resources/linux/mtr.py index 85a9f258..60857217 100644 --- a/src/nepi/resources/linux/mtr.py +++ b/src/nepi/resources/linux/mtr.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy +from nepi.execution.resource import clsinit_copy, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -83,6 +83,7 @@ class LinuxMtr(LinuxApplication): self._home = "mtr-%s" % self.guid self._sudo_kill = True + @failtrap def deploy(self): if not self.get("command"): self.set("command", self._start_command) diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 9cab4cae..953793ac 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -17,9 +17,9 @@ # # Author: Alina Quereilhac -from nepi.execution.attribute import Attribute, Flags -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - reschedule_delay +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.resources.linux import rpmfuncs, debfuncs from nepi.util import sshfuncs, execfuncs from nepi.util.sshfuncs import ProcStatus @@ -57,7 +57,7 @@ class OSType: UBUNTU = "ubuntu" DEBIAN = "debian" -@clsinit +@clsinit_copy class LinuxNode(ResourceManager): """ .. class:: Class Args : @@ -168,14 +168,20 @@ class LinuxNode(ResourceManager): clean_home = Attribute("cleanHome", "Remove all nepi files and directories " " from node home folder before starting experiment", + type = Types.Bool, + default = False, flags = Flags.ExecReadOnly) clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " " from a previous same experiment, before the new experiment starts", + type = Types.Bool, + default = False, flags = Flags.ExecReadOnly) clean_processes = Attribute("cleanProcesses", "Kill all running processes before starting experiment", + type = Types.Bool, + default = False, flags = Flags.ExecReadOnly) tear_down = Attribute("tearDown", "Bash script to be executed before " + \ @@ -309,7 +315,6 @@ class LinuxNode(ResourceManager): time.sleep(min(30.0, retrydelay)) retrydelay *= 1.5 - @property def use_deb(self): return self.os in [OSType.DEBIAN, OSType.UBUNTU] @@ -323,10 +328,10 @@ class LinuxNode(ResourceManager): def localhost(self): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] + @failtrap def provision(self): # check if host is alive if not self.is_alive(): - msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") self.error(msg) raise RuntimeError, msg @@ -353,14 +358,12 @@ class LinuxNode(ResourceManager): super(LinuxNode, self).provision() + @failtrap def deploy(self): if self.state == ResourceState.NEW: - try: - self.discover() - self.provision() - except: - self.fail() - return + self.info("Deploying node") + self.discover() + self.provision() # Node needs to wait until all associated interfaces are # ready before it can finalize deployment @@ -374,19 +377,24 @@ class LinuxNode(ResourceManager): super(LinuxNode, self).deploy() def release(self): - # Node needs to wait until all associated RMs are released - # to be released - rms = self.get_connected() - for rm in rms: - if rm.state < ResourceState.STOPPED: - self.ec.schedule(reschedule_delay, self.release) - return - - tear_down = self.get("tearDown") - if tear_down: - self.execute(tear_down) + try: + rms = self.get_connected() + for rm in rms: + # Node needs to wait until all associated RMs are released + # before it can be released + if rm.state < ResourceState.STOPPED: + self.ec.schedule(reschedule_delay, self.release) + return + + tear_down = self.get("tearDown") + if tear_down: + self.execute(tear_down) - self.clean_processes() + self.clean_processes() + except: + import traceback + err = traceback.format_exc() + self.error(err) super(LinuxNode, self).release() @@ -627,7 +635,6 @@ class LinuxNode(ResourceManager): return (out, err), proc - def upload(self, src, dst, text = False, overwrite = True): """ Copy content to destination diff --git a/src/nepi/resources/linux/nping.py b/src/nepi/resources/linux/nping.py index e0d64525..ec874bc2 100644 --- a/src/nepi/resources/linux/nping.py +++ b/src/nepi/resources/linux/nping.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy +from nepi.execution.resource import clsinit_copy, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -133,6 +133,7 @@ class LinuxNPing(LinuxApplication): self._home = "nping-%s" % self.guid self._sudo_kill = True + @failtrap def deploy(self): if not self.get("command"): self.set("command", self._start_command) diff --git a/src/nepi/resources/linux/ping.py b/src/nepi/resources/linux/ping.py index 34476586..d085b112 100644 --- a/src/nepi/resources/linux/ping.py +++ b/src/nepi/resources/linux/ping.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy +from nepi.execution.resource import clsinit_copy, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -184,6 +184,7 @@ class LinuxPing(LinuxApplication): super(LinuxPing, self).__init__(ec, guid) self._home = "ping-%s" % self.guid + @failtrap def deploy(self): if not self.get("command"): self.set("command", self._start_command) diff --git a/src/nepi/resources/linux/tcpdump.py b/src/nepi/resources/linux/tcpdump.py index 3fa11f2b..e9955f4e 100644 --- a/src/nepi/resources/linux/tcpdump.py +++ b/src/nepi/resources/linux/tcpdump.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy +from nepi.execution.resource import clsinit_copy, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -316,6 +316,7 @@ class LinuxTcpdump(LinuxApplication): self._home = "tcpdump-%s" % self.guid self._sudo_kill = True + @failtrap def deploy(self): if not self.get("command"): self.set("command", self._start_command) diff --git a/src/nepi/resources/linux/traceroute.py b/src/nepi/resources/linux/traceroute.py index 2e03f628..99eea6b7 100644 --- a/src/nepi/resources/linux/traceroute.py +++ b/src/nepi/resources/linux/traceroute.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy +from nepi.execution.resource import clsinit_copy, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -64,6 +64,7 @@ class LinuxTraceroute(LinuxApplication): super(LinuxTraceroute, self).__init__(ec, guid) self._home = "traceroute-%s" % self.guid + @failtrap def deploy(self): if not self.get("command"): self.set("command", self._start_command) diff --git a/src/nepi/resources/linux/udptest.py b/src/nepi/resources/linux/udptest.py index 1b7ac9ca..6ad0085b 100644 --- a/src/nepi/resources/linux/udptest.py +++ b/src/nepi/resources/linux/udptest.py @@ -19,8 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ - reschedule_delay -from nepi.execution.resource import clsinit_copy + reschedule_delay, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -214,6 +213,7 @@ class LinuxUdpTest(LinuxApplication): super(LinuxUdpTest, self).__init__(ec, guid) self._home = "udptest-%s" % self.guid + @failtrap def deploy(self): if not self.get("command"): self.set("command", self._start_command) @@ -247,6 +247,7 @@ class LinuxUdpTest(LinuxApplication): # finished to continue ) self._run_in_background() + @failtrap def start(self): if self.get("s") == True: # Server is already running @@ -258,7 +259,7 @@ class LinuxUdpTest(LinuxApplication): else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self.fail() + raise RuntimeError, err else: super(LinuxUdpTest, self).start() diff --git a/src/nepi/resources/linux/udptunnel.py b/src/nepi/resources/linux/udptunnel.py index 4dae96c1..1efe2302 100644 --- a/src/nepi/resources/linux/udptunnel.py +++ b/src/nepi/resources/linux/udptunnel.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ - reschedule_delay + reschedule_delay, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -161,6 +161,7 @@ class UdpTunnel(LinuxApplication): port = self.wait_local_port(endpoint) return (port, pid, ppid) + @failtrap def provision(self): # create run dir for tunnel on each node self.endpoint1.node.mkdir(self.run_home(self.endpoint1)) @@ -190,21 +191,19 @@ class UdpTunnel(LinuxApplication): self.set_provisioned() + @failtrap def deploy(self): if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): self.ec.schedule(reschedule_delay, self.deploy) else: - try: - self.discover() - self.provision() - except: - self.fail() - return + self.discover() + self.provision() self.debug("----- READY ---- ") self.set_ready() + @failtrap def start(self): if self.state == ResourceState.READY: command = self.get("command") @@ -214,8 +213,9 @@ class UdpTunnel(LinuxApplication): else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self.fail() + raise RuntimeError, msg + @failtrap def stop(self): """ Stops application execution """ @@ -234,8 +234,7 @@ class UdpTunnel(LinuxApplication): # check if execution errors occurred msg = " Failed to STOP tunnel" self.error(msg, err1, err2) - self.fail() - return + raise RuntimeError, msg self.set_stopped() diff --git a/src/nepi/resources/omf/application.py b/src/nepi/resources/omf/application.py index f0fc5f10..54f88546 100644 --- a/src/nepi/resources/omf/application.py +++ b/src/nepi/resources/omf/application.py @@ -18,8 +18,8 @@ # Author: Alina Quereilhac # Julien Tribino -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource from nepi.resources.omf.node import OMFNode @@ -147,6 +147,7 @@ class OMFApplication(OMFResource): return True + @failtrap def deploy(self): """ Deploy the RM. It means nothing special for an application for now (later it will be upload sources, ...) @@ -166,8 +167,7 @@ class OMFApplication(OMFResource): if not self._omf_api : msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) - self.fail() - return + raise RuntimeError, msg if self.get('sources'): gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')] @@ -177,7 +177,7 @@ class OMFApplication(OMFResource): super(OMFApplication, self).deploy() - + @failtrap def start(self): """ Start the RM. It means : Send Xmpp Message Using OMF protocol to execute the application. @@ -187,8 +187,7 @@ class OMFApplication(OMFResource): if not (self.get('appid') and self.get('path')) : msg = "Application's information are not initialized" self.error(msg) - self.fail() - return + raise RuntimeError, msg if not self.get('args'): self.set('args', " ") @@ -207,11 +206,11 @@ class OMFApplication(OMFResource): except AttributeError: msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) - self.fail() raise super(OMFApplication, self).start() + @failtrap def stop(self): """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to kill the application. @@ -223,8 +222,7 @@ class OMFApplication(OMFResource): except AttributeError: msg = "Credentials were not initialzed. XMPP Connections impossible" self.error(msg) - self.fail() - return + raise super(OMFApplication, self).stop() self.set_finished() @@ -233,10 +231,15 @@ class OMFApplication(OMFResource): """ Clean the RM at the end of the experiment and release the API. """ - if self._omf_api : - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + try: + if self._omf_api : + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + except: + import traceback + err = traceback.format_exc() + self.error(err) super(OMFApplication, self).release() diff --git a/src/nepi/resources/omf/channel.py b/src/nepi/resources/omf/channel.py index b5d96a37..0e995677 100644 --- a/src/nepi/resources/omf/channel.py +++ b/src/nepi/resources/omf/channel.py @@ -18,8 +18,8 @@ # Author: Alina Quereilhac # Julien Tribino -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource @@ -121,18 +121,7 @@ class OMFChannel(OMFResource): res.append(couple) return res - def discover(self): - """ Discover the availables channels - - """ - pass - - def provision(self): - """ Provision some availables channels - - """ - pass - + @failtrap def deploy(self): """ Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the channel. @@ -147,58 +136,44 @@ class OMFChannel(OMFResource): if not self._omf_api : msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) - self.fail() - return + raise RuntimeError, msg if not self.get('channel'): msg = "Channel's value is not initialized" self.error(msg) - self.fail() - return + raise RuntimeError, msg + + self._nodes_guid = self._get_target(self._connections) - self._nodes_guid = self._get_target(self._connections) if self._nodes_guid == "reschedule" : self.ec.schedule("2s", self.deploy) - return False - - try: - for couple in self._nodes_guid: - #print "Couple node/alias : " + couple[0] + " , " + couple[1] - attrval = self.get('channel') - attrname = "net/%s/%s" % (couple[1], 'channel') - self._omf_api.configure(couple[0], attrname, attrval) - except AttributeError: - msg = "Credentials are not initialzed. XMPP Connections impossible" - self.error(msg) - self.fail() - return - - super(OMFChannel, self).deploy() - - def start(self): - """ Start the RM. It means nothing special for a channel for now - It becomes STARTED as soon as this method starts. - - """ - - super(OMFChannel, self).start() - - def stop(self): - """ Stop the RM. It means nothing special for a channel for now - It becomes STOPPED as soon as this method is called - - """ - super(OMFChannel, self).stop() - self.set_finished() + else: + try: + for couple in self._nodes_guid: + #print "Couple node/alias : " + couple[0] + " , " + couple[1] + attrval = self.get('channel') + attrname = "net/%s/%s" % (couple[1], 'channel') + self._omf_api.configure(couple[0], attrname, attrval) + except AttributeError: + msg = "Credentials are not initialzed. XMPP Connections impossible" + self.error(msg) + raise + + super(OMFChannel, self).deploy() def release(self): """ Clean the RM at the end of the experiment and release the API """ - if self._omf_api : - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + try: + if self._omf_api : + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + except: + import traceback + err = traceback.format_exc() + self.error(err) super(OMFChannel, self).release() diff --git a/src/nepi/resources/omf/interface.py b/src/nepi/resources/omf/interface.py index e61ad410..81e3b601 100644 --- a/src/nepi/resources/omf/interface.py +++ b/src/nepi/resources/omf/interface.py @@ -18,8 +18,8 @@ # Author: Alina Quereilhac # Julien Tribino -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.node import OMFNode @@ -120,7 +120,6 @@ class OMFWifiInterface(OMFResource): if rm_list: return rm_list[0] return None - def configure_iface(self): """ Configure the interface without the ip @@ -165,6 +164,7 @@ class OMFWifiInterface(OMFResource): return True + @failtrap def deploy(self): """ Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the interface. @@ -183,21 +183,18 @@ class OMFWifiInterface(OMFResource): if not self._omf_api : msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) - self.fail() - return + raise RuntimeError, msg if not (self.get('mode') and self.get('type') and self.get('essid') \ and self.get('ip')): msg = "Interface's variable are not initialized" self.error(msg) - self.fail() - return False + raise RuntimeError, msg if not self.node.get('hostname') : msg = "The channel is connected with an undefined node" self.error(msg) - self.fail() - return False + raise RuntimeError, msg # Just for information self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \ @@ -205,43 +202,25 @@ class OMFWifiInterface(OMFResource): self.get('essid') + " : " + self.get('ip')) # Check if the node is already deployed - chk1 = True if self.state < ResourceState.PROVISIONED: - chk1 = self.configure_iface() - if chk1: - chk2 = self.configure_ip() + if self.configure_iface(): + self.configure_ip() - if not (chk1 and chk2) : - return False - super(OMFWifiInterface, self).deploy() - return True - - - def start(self): - """ Start the RM. It means nothing special for a channel for now - It becomes STARTED as soon as this method starts. - - """ - - super(OMFWifiInterface, self).start() - - def stop(self): - """ Stop the RM. It means nothing special for a channel for now - It becomes STOPPED as soon as this method is called - - """ - super(OMFWifiInterface, self).stop() - self.set_finished() def release(self): """ Clean the RM at the end of the experiment and release the API """ - if self._omf_api : - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + try: + if self._omf_api : + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + except: + import traceback + err = traceback.format_exc() + self.error(err) super(OMFWifiInterface, self).release() diff --git a/src/nepi/resources/omf/node.py b/src/nepi/resources/omf/node.py index 99bedf67..4da85d39 100644 --- a/src/nepi/resources/omf/node.py +++ b/src/nepi/resources/omf/node.py @@ -19,15 +19,14 @@ # Julien Tribino -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource from nepi.resources.omf.omf_api import OMFAPIFactory import time - @clsinit_copy class OMFNode(OMFResource): """ @@ -98,6 +97,7 @@ class OMFNode(OMFResource): return False + @failtrap def deploy(self): """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol to enroll the node into the experiment. @@ -112,63 +112,37 @@ class OMFNode(OMFResource): if not self._omf_api : msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) - self.fail() - return + raise RuntimeError, msg if not self.get('hostname') : msg = "Hostname's value is not initialized" self.error(msg) - self.fail() - return False + raise RuntimeError, msg try: self._omf_api.enroll_host(self.get('hostname')) except AttributeError: msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) - self.fail() - #raise AttributeError, msg + raise super(OMFNode, self).deploy() - def discover(self): - """ Discover the availables nodes - - """ - pass - - def provision(self): - """ Provision some availables nodes - - """ - pass - - def start(self): - """Start the RM. It means nothing special for an interface for now - It becomes STARTED as soon as this method starts. - - """ - - super(OMFNode, self).start() - - def stop(self): - """Stop the RM. It means nothing special for an interface for now - It becomes STOPPED as soon as this method stops - - """ - super(OMFNode, self).stop() - self.set_finished() - def release(self): """Clean the RM at the end of the experiment """ - if self._omf_api : - self._omf_api.release(self.get('hostname')) - - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + try: + if self._omf_api : + self._omf_api.release(self.get('hostname')) + + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + except: + import traceback + err = traceback.format_exc() + self.error(err) super(OMFNode, self).release() diff --git a/src/nepi/resources/omf/omf_resource.py b/src/nepi/resources/omf/omf_resource.py index ed2de655..632d201e 100644 --- a/src/nepi/resources/omf/omf_resource.py +++ b/src/nepi/resources/omf/omf_resource.py @@ -19,8 +19,8 @@ # Lucia Guevgeozian from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay class ResourceGateway: """ @@ -38,7 +38,7 @@ class ResourceGateway: "nicta" : "??.??.??", }) -@clsinit +@clsinit_copy class OMFResource(ResourceManager): """ Generic resource gathering XMPP credential information and common methods diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index b26a4f35..57196b35 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -19,8 +19,8 @@ # Lucia Guevgeozian from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay, failtrap from nepi.resources.linux.node import LinuxNode from nepi.resources.planetlab.plcapi import PLCAPIFactory from nepi.util.execfuncs import lexec diff --git a/src/nepi/resources/planetlab/openvswitch/ovs.py b/src/nepi/resources/planetlab/openvswitch/ovs.py index bffc61ef..51628fda 100644 --- a/src/nepi/resources/planetlab/openvswitch/ovs.py +++ b/src/nepi/resources/planetlab/openvswitch/ovs.py @@ -19,7 +19,8 @@ # Alexandros Kouvakas -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, failtrap from nepi.execution.attribute import Attribute, Flags from nepi.resources.planetlab.node import PlanetlabNode from nepi.resources.linux.application import LinuxApplication @@ -114,6 +115,7 @@ class OVSWitch(LinuxApplication): # TODO: Validate! return True + @failtrap def provision(self): # create home dir for ovs self.node.mkdir(self.ovs_home) @@ -136,13 +138,16 @@ class OVSWitch(LinuxApplication): stderr = "check_cmd_stderr") (out, err), proc = self.node.check_output(self.ovs_checks, 'check_cmd_exitcode') + if out != "0\n": msg = "Command sliver-ovs does not exist on the VM" self.debug(msg) raise RuntimeError, msg + msg = "Command sliver-ovs exists" self.debug(msg) + @failtrap def deploy(self): """ Wait until node is associated and deployed """ @@ -152,19 +157,15 @@ class OVSWitch(LinuxApplication): self.ec.schedule(reschedule_delay, self.deploy) else: - try: - self.discover() - self.provision() - self.check_sliver_ovs() - self.servers_on() - self.create_bridge() - self.assign_contr() - self.ovs_status() - except: - self._state = ResourceState.FAILED - raise - - self._state = ResourceState.READY + self.discover() + self.provision() + self.check_sliver_ovs() + self.servers_on() + self.create_bridge() + self.assign_contr() + self.ovs_status() + + super(OVSWitch, self).deploy() def servers_on(self): """ Start the openvswitch servers and also checking @@ -201,9 +202,11 @@ class OVSWitch(LinuxApplication): # Check if the servers are running or not (out, err), proc = self.node.check_output(self.ovs_checks, 'status_srv_exitcode') + if out != "0\n": self.debug("Servers are not running") raise RuntimeError, msg + self.info("Servers started") def del_old_br(self): @@ -279,44 +282,37 @@ class OVSWitch(LinuxApplication): (out, err), proc = self.node.check_output(self.ovs_home, 'show_stdout') self.info(out) - def start(self): - """ Start the RM. It means nothing special for - ovswitch for now. - """ - pass - - def stop(self): - """ Stop the RM.It means nothing - for ovswitch for now. - """ - pass - def release(self): """ Delete the bridge and close the servers """ # Node needs to wait until all associated RMs are released # to be released - from nepi.resources.planetlab.openvswitch.ovsport import OVSPort - rm = self.get_connected(OVSPort.rtype()) + try: + from nepi.resources.planetlab.openvswitch.ovsport import OVSPort + rm = self.get_connected(OVSPort.rtype()) - if rm[0].state < ResourceState.FINISHED: - self.ec.schedule(reschedule_delay, self.release) - return + if rm[0].state < ResourceState.FINISHED: + self.ec.schedule(reschedule_delay, self.release) + return + + msg = "Deleting the bridge %s" % self.get('bridge_name') + self.info(msg) + cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name') + (out, err), proc = self.node.run(cmd, self.ovs_checks, + sudo = True) + cmd = "sliver-ovs stop" + (out, err), proc = self.node.run(cmd, self.ovs_checks, + sudo = True) - msg = "Deleting the bridge %s" % self.get('bridge_name') - self.info(msg) - cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name') - (out, err), proc = self.node.run(cmd, self.ovs_checks, - sudo = True) - cmd = "sliver-ovs stop" - (out, err), proc = self.node.run(cmd, self.ovs_checks, - sudo = True) - - if proc.poll(): - self.fail() - self.error(msg, out, err) - raise RuntimeError, msg - - self._state = ResourceState.RELEASED - + if proc.poll(): + self.fail() + self.error(msg, out, err) + raise RuntimeError, msg + except: + import traceback + err = traceback.format_exc() + self.error(err) + + super(OVSWitch, self).release() + diff --git a/src/nepi/resources/planetlab/openvswitch/ovsport.py b/src/nepi/resources/planetlab/openvswitch/ovsport.py index d3e9d799..a7155fb0 100644 --- a/src/nepi/resources/planetlab/openvswitch/ovsport.py +++ b/src/nepi/resources/planetlab/openvswitch/ovsport.py @@ -19,7 +19,8 @@ # Alexandros Kouvakas from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, failtrap from nepi.resources.planetlab.openvswitch.ovs import OVSWitch from nepi.resources.planetlab.node import PlanetlabNode from nepi.resources.linux.application import LinuxApplication @@ -178,16 +179,7 @@ class OVSPort(LinuxApplication): command = self.replace_paths(command) return command - def provision(self): - """ Provision the ports.No meaning. - """ - pass - - def discover(self): - """ Discover the ports.No meaning - """ - pass - + @failtrap def deploy(self): """ Wait until ovswitch is started """ @@ -197,50 +189,39 @@ class OVSPort(LinuxApplication): self.ec.schedule(reschedule_delay, self.deploy) else: - try: - self.discover() - self.provision() - self.get_host_ip() - self.create_port() - self.get_local_end() - self.ovswitch.ovs_status() - self._state = ResourceState.READY - except: - self._state = ResourceState.FAILED - raise - - def start(self): - """ Start the RM. It means nothing special for - ovsport for now. - """ - pass - - def stop(self): - """ Stop the RM. It means nothing special for - ovsport for now. - """ - pass - + self.discover() + self.provision() + self.get_host_ip() + self.create_port() + self.get_local_end() + self.ovswitch.ovs_status() + super(OVSPort, self).deploy() + def release(self): """ Release the port RM means delete the ports """ # OVS needs to wait until all associated RMs are released # to be released - from nepi.resources.planetlab.openvswitch.tunnel import Tunnel - rm = self.get_connected(Tunnel.rtype()) - if rm and rm[0].state < ResourceState.FINISHED: - self.ec.schedule(reschedule_delay, self.release) - return - - msg = "Deleting the port %s" % self.get('port_name') - self.info(msg) - cmd = "sliver-ovs del_port %s" % self.get('port_name') - (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks, - sudo = True) - - if proc.poll(): - self.fail() - self.error(msg, out, err) - raise RuntimeError, msg - - self._state = ResourceState.RELEASED + try: + from nepi.resources.planetlab.openvswitch.tunnel import Tunnel + rm = self.get_connected(Tunnel.rtype()) + if rm and rm[0].state < ResourceState.FINISHED: + self.ec.schedule(reschedule_delay, self.release) + return + + msg = "Deleting the port %s" % self.get('port_name') + self.info(msg) + cmd = "sliver-ovs del_port %s" % self.get('port_name') + (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks, + sudo = True) + + if proc.poll(): + self.fail() + self.error(msg, out, err) + raise RuntimeError, msg + except: + import traceback + err = traceback.format_exc() + self.error(err) + + super(OVSPort, self).release() diff --git a/src/nepi/resources/planetlab/openvswitch/tunnel.py b/src/nepi/resources/planetlab/openvswitch/tunnel.py index 72c6728b..c1f81fe9 100644 --- a/src/nepi/resources/planetlab/openvswitch/tunnel.py +++ b/src/nepi/resources/planetlab/openvswitch/tunnel.py @@ -19,7 +19,8 @@ # Alexandros Kouvakas from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.resources.planetlab.node import PlanetlabNode from nepi.resources.planetlab.openvswitch.ovs import OVSWitch @@ -29,11 +30,10 @@ import os import time import socket - reschedule_delay = "0.5s" @clsinit_copy -class Tunnel(LinuxApplication): +class OVSTunnel(LinuxApplication): """ .. class:: Class Args : @@ -46,7 +46,7 @@ class Tunnel(LinuxApplication): """ - _rtype = "Tunnel" + _rtype = "OVSTunnel" _authorized_connections = ["OVSPort", "PlanetlabTap"] @classmethod @@ -93,7 +93,7 @@ class Tunnel(LinuxApplication): :type guid: int """ - super(Tunnel, self).__init__(ec, guid) + super(OVSTunnel, self).__init__(ec, guid) self._home = "tunnel-%s" % self.guid self.port_info_tunl = [] self._nodes = [] @@ -246,6 +246,7 @@ class Tunnel(LinuxApplication): self.fail() self.error(msg, out, err) raise RuntimeError, msg + msg = "Connection on host %s configured" \ % self.node.get("hostname") self.info(msg) @@ -344,6 +345,7 @@ class Tunnel(LinuxApplication): self.info(msg) return + @failtrap def provision(self): """ Provision the tunnel """ @@ -363,68 +365,41 @@ class Tunnel(LinuxApplication): (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1) switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2) - self.debug("------- READY -------") - self._provision_time = tnow() - self._state = ResourceState.PROVISIONED - - def discover(self): - """ Discover the tunnel - - """ - pass + super(OVSTunnel, self).provision() + @failtrap def deploy(self): if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): self.ec.schedule(reschedule_delay, self.deploy) else: - try: - self.discover() - self.provision() - except: - self.fail() - raise - - self.debug("----- READY ---- ") - self._ready_time = tnow() - self._state = ResourceState.READY - - def start(self): - """ Start the RM. It means nothing special for - ovsport for now. - """ - pass - - - def stop(self): - """ Stop the RM. It means nothing special for - ovsport for now. - """ - pass + self.discover() + self.provision() + super(OVSTunnel, self).deploy() + def release(self): """ Release the udp_tunnel on endpoint2. On endpoint1 means nothing special. """ - if not self.check_endpoints(): - # Kill the TAP devices - # TODO: Make more generic Release method of PLTAP - if self._pid and self._ppid: - self._nodes = self.get_node(self.endpoint2) - (out, err), proc = self.node.kill(self._pid, - self._ppid, sudo = True) - if err or proc.poll(): - # check if execution errors occurred - msg = " Failed to delete TAP device" - self.error(msg, err, err) - self.fail() - - self._state = ResourceState.RELEASED - - - - - - + try: + if not self.check_endpoints(): + # Kill the TAP devices + # TODO: Make more generic Release method of PLTAP + if self._pid and self._ppid: + self._nodes = self.get_node(self.endpoint2) + (out, err), proc = self.node.kill(self._pid, + self._ppid, sudo = True) + if err or proc.poll(): + # check if execution errors occurred + msg = " Failed to delete TAP device" + self.error(msg, err, err) + self.fail() + except: + import traceback + err = traceback.format_exc() + self.error(err) + + super(OVSTunnel, self).release() diff --git a/src/nepi/resources/planetlab/tap.py b/src/nepi/resources/planetlab/tap.py index 991fa99b..d9cf17c3 100644 --- a/src/nepi/resources/planetlab/tap.py +++ b/src/nepi/resources/planetlab/tap.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ - reschedule_delay + reschedule_delay, failtrap from nepi.resources.linux.application import LinuxApplication from nepi.resources.planetlab.node import PlanetlabNode from nepi.util.timefuncs import tnow, tdiffsec @@ -147,6 +147,7 @@ class PlanetlabTap(LinuxApplication): if_name = self.wait_if_name() self.set("deviceName", if_name) + @failtrap def deploy(self): if not self.node or self.node.state < ResourceState.PROVISIONED: self.ec.schedule(reschedule_delay, self.deploy) @@ -160,16 +161,13 @@ class PlanetlabTap(LinuxApplication): if not self.get("install"): self.set("install", self._install) - try: - self.discover() - self.provision() - except: - self.fail() - return + self.discover() + self.provision() self.debug("----- READY ---- ") self.set_ready() + @failtrap def start(self): if self.state == ResourceState.READY: command = self.get("command") @@ -179,8 +177,9 @@ class PlanetlabTap(LinuxApplication): else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self.fail() + raise RuntimeError, msg + @failtrap def stop(self): command = self.get('command') or '' @@ -214,12 +213,17 @@ class PlanetlabTap(LinuxApplication): def release(self): # Node needs to wait until all associated RMs are released # to be released - from nepi.resources.linux.udptunnel import UdpTunnel - rms = self.get_connected(UdpTunnel.rtype()) - for rm in rms: - if rm.state < ResourceState.STOPPED: - self.ec.schedule(reschedule_delay, self.release) - return + try: + from nepi.resources.linux.udptunnel import UdpTunnel + rms = self.get_connected(UdpTunnel.rtype()) + for rm in rms: + if rm.state < ResourceState.STOPPED: + self.ec.schedule(reschedule_delay, self.release) + return + except: + import traceback + err = traceback.format_exc() + self.error(err) super(PlanetlabTap, self).release() diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py index f5d39d78..b7caeac3 100644 --- a/src/nepi/util/parallel.py +++ b/src/nepi/util/parallel.py @@ -30,25 +30,15 @@ N_PROCS = None class WorkerThread(threading.Thread): class QUIT: pass - class REASSIGNED: - pass - + def run(self): while True: task = self.queue.get() - if task is None: - self.done = True - self.queue.task_done() - continue - elif task is self.QUIT: - self.done = True + + if task is self.QUIT: self.queue.task_done() break - elif task is self.REASSIGNED: - continue - else: - self.done = False - + try: try: callable, args, kwargs = task @@ -61,41 +51,35 @@ class WorkerThread(threading.Thread): except: traceback.print_exc(file = sys.stderr) self.delayed_exceptions.append(sys.exc_info()) - - def waitdone(self): - while not self.queue.empty() and not self.done: - self.queue.join() - + def attach(self, queue, rvqueue, delayed_exceptions): - if self.isAlive(): - self.waitdone() - oldqueue = self.queue self.queue = queue self.rvqueue = rvqueue self.delayed_exceptions = delayed_exceptions - if self.isAlive(): - oldqueue.put(self.REASSIGNED) - - def detach(self): - if self.isAlive(): - self.waitdone() - self.oldqueue = self.queue - self.queue = Queue.Queue() - self.rvqueue = None - self.delayed_exceptions = [] - - def detach_signal(self): - if self.isAlive(): - self.oldqueue.put(self.REASSIGNED) - del self.oldqueue - + def quit(self): self.queue.put(self.QUIT) - self.join() -class ParallelMap(object): +class ParallelRun(object): def __init__(self, maxthreads = None, maxqueue = None, results = True): + self.maxqueue = maxqueue + self.maxthreads = maxthreads + + self.queue = Queue.Queue(self.maxqueue or 0) + + self.delayed_exceptions = [] + + if results: + self.rvqueue = Queue.Queue() + else: + self.rvqueue = None + + self.initialize_workers() + + def initialize_workers(self): global N_PROCS + + maxthreads = self.maxthreads # Compute maximum number of threads allowed by the system if maxthreads is None: @@ -112,42 +96,30 @@ class ParallelMap(object): if maxthreads is None: maxthreads = 4 - - self.queue = Queue.Queue(maxqueue or 0) - - self.delayed_exceptions = [] - - if results: - self.rvqueue = Queue.Queue() - else: - self.rvqueue = None - + self.workers = [] # initialize workers for x in xrange(maxthreads): - t = None - if t is None: - t = WorkerThread() - t.setDaemon(True) - else: - t.waitdone() - - t.attach(self.queue, self.rvqueue, self.delayed_exceptions) - self.workers.append(t) + worker = WorkerThread() + worker.attach(self.queue, self.rvqueue, self.delayed_exceptions) + worker.setDaemon(True) + + self.workers.append(worker) def __del__(self): self.destroy() - + + def empty(self): + while True: + try: + self.queue.get(block = False) + self.queue.task_done() + except Queue.Empty: + break + def destroy(self): - for worker in self.workers: - worker.waitdone() - for worker in self.workers: - worker.detach() - for worker in self.workers: - worker.detach_signal() - for worker in self.workers: - worker.quit() + self.join() del self.workers[:] @@ -158,28 +130,21 @@ class ParallelMap(object): self.queue.put_nowait((callable, args, kwargs)) def start(self): - for thread in self.workers: - if not thread.isAlive(): - thread.start() + for worker in self.workers: + if not worker.isAlive(): + worker.start() def join(self): - for thread in self.workers: - # That's the sync signal - self.queue.put(None) - + # Wait until all queued tasks have been processed self.queue.join() - for thread in self.workers: - thread.waitdone() - - if self.delayed_exceptions: - typ,val,loc = self.delayed_exceptions[0] - del self.delayed_exceptions[:] - raise typ,val,loc - - self.destroy() + + for worker in self.workers: + worker.quit() + + for worker in self.workers: + worker.join() def sync(self): - self.queue.join() if self.delayed_exceptions: typ,val,loc = self.delayed_exceptions[0] del self.delayed_exceptions[:] @@ -197,18 +162,3 @@ class ParallelMap(object): except Queue.Empty: raise StopIteration -class ParallelRun(ParallelMap): - def __run(self, x): - fn, args, kwargs = x - return fn(*args, **kwargs) - - def __init__(self, maxthreads = None, maxqueue = None): - super(ParallelRun, self).__init__(maxthreads, maxqueue, True) - - def put(self, what, *args, **kwargs): - super(ParallelRun, self).put(self.__run, (what, args, kwargs)) - - def put_nowait(self, what, *args, **kwargs): - super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs)) - - diff --git a/test/execution/ec.py b/test/execution/ec.py index 46021483..03040146 100755 --- a/test/execution/ec.py +++ b/test/execution/ec.py @@ -72,17 +72,23 @@ class ExecuteControllersTestCase(unittest.TestCase): def test_schedule_exception(self): def raise_error(): + # When this task is executed and the error raise, + # the FailureManager should set its failure level to + # TASK_FAILURE raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!" ec = ExperimentController() - ec.schedule("2s", raise_error) - while ec.ecstate not in [ECState.FAILED, ECState.TERMINATED]: - time.sleep(1) + tid = ec.schedule("2s", raise_error, track = True) - self.assertEquals(ec.ecstate, ECState.FAILED) - ec.shutdown() + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break + + time.sleep(1) + self.assertEquals(task.status, TaskStatus.ERROR) if __name__ == '__main__': unittest.main() diff --git a/test/execution/resource.py b/test/execution/resource.py index 091c43e1..e594f268 100755 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -20,15 +20,15 @@ from nepi.execution.attribute import Attribute -from nepi.execution.ec import ExperimentController -from nepi.execution.resource import ResourceManager, ResourceState, clsinit, \ - ResourceAction +from nepi.execution.ec import ExperimentController, FailureLevel +from nepi.execution.resource import ResourceManager, ResourceState, \ + clsinit_copy, ResourceAction, failtrap import random import time import unittest -@clsinit +@clsinit_copy class MyResource(ResourceManager): _rtype = "MyResource" @@ -40,14 +40,13 @@ class MyResource(ResourceManager): def __init__(self, ec, guid): super(MyResource, self).__init__(ec, guid) -@clsinit +@clsinit_copy class AnotherResource(ResourceManager): _rtype = "AnotherResource" def __init__(self, ec, guid): super(AnotherResource, self).__init__(ec, guid) - class Channel(ResourceManager): _rtype = "Channel" @@ -89,7 +88,7 @@ class Node(ResourceManager): self.discover() self.provision() self.logger.debug(" -------- PROVISIONED ------- ") - self.ec.schedule("3s", self.deploy) + self.ec.schedule("1s", self.deploy) elif self.state == ResourceState.PROVISIONED: ifaces = self.get_connected(Interface.rtype()) for rm in ifaces: @@ -111,15 +110,29 @@ class Application(ResourceManager): if node.state < ResourceState.READY: self.ec.schedule("0.5s", self.deploy) else: - time.sleep(random.random() * 5) + time.sleep(random.random() * 2) super(Application, self).deploy() self.logger.debug(" -------- DEPLOYED ------- ") def start(self): super(Application, self).start() - time.sleep(random.random() * 5) + time.sleep(random.random() * 3) self._state = ResourceState.FINISHED - + +class ErrorApplication(ResourceManager): + _rtype = "ErrorApplication" + + def __init__(self, ec, guid): + super(ErrorApplication, self).__init__(ec, guid) + + @failtrap + def deploy(self): + node = self.get_connected(Node.rtype())[0] + if node.state < ResourceState.READY: + self.ec.schedule("0.5s", self.deploy) + else: + time.sleep(random.random() * 2) + raise RuntimeError, "NOT A REAL ERROR. JUST TESTING" class ResourceFactoryTestCase(unittest.TestCase): def test_add_resource_factory(self): @@ -130,13 +143,13 @@ class ResourceFactoryTestCase(unittest.TestCase): ResourceFactory.register_type(AnotherResource) self.assertEquals(MyResource.rtype(), "MyResource") - self.assertEquals(len(MyResource._attributes), 1) + self.assertEquals(len(MyResource._attributes), 2) self.assertEquals(ResourceManager.rtype(), "Resource") - self.assertEquals(len(ResourceManager._attributes), 0) + self.assertEquals(len(ResourceManager._attributes), 1) self.assertEquals(AnotherResource.rtype(), "AnotherResource") - self.assertEquals(len(AnotherResource._attributes), 0) + self.assertEquals(len(AnotherResource._attributes), 1) self.assertEquals(len(ResourceFactory.resource_types()), 2) @@ -274,15 +287,43 @@ class ResourceManagerTestCase(unittest.TestCase): ec.shutdown() - def test_start_with_condition(self): + def test_exception(self): + from nepi.execution.resource import ResourceFactory + + ResourceFactory.register_type(ErrorApplication) + ResourceFactory.register_type(Node) + ResourceFactory.register_type(Interface) + ResourceFactory.register_type(Channel) + + ec = ExperimentController() + + node = ec.register_resource("Node") + + apps = list() + for i in xrange(10): + app = ec.register_resource("ErrorApplication") + ec.register_connection(app, node) + apps.append(app) + + + ec.deploy() + + ec.wait_finished(apps) + + ec.shutdown() + + self.assertTrue(ec._fm._failure_level == FailureLevel.RM_FAILURE) + + + def ztest_start_with_condition(self): # TODO!!! pass - def test_stop_with_condition(self): + def ztest_stop_with_condition(self): # TODO!!! pass - def test_set_with_condition(self): + def ztest_set_with_condition(self): # TODO!!! pass diff --git a/test/resources/planetlab/ovs.py b/test/resources/planetlab/ovs.py index 0cb6c20b..d0720740 100644 --- a/test/resources/planetlab/ovs.py +++ b/test/resources/planetlab/ovs.py @@ -112,15 +112,15 @@ class OvsTestCase(unittest.TestCase): ec.set(tap2, "prefix4", 24) ec.register_connection(tap2, node4) - ovstun1 = ec.register_resource("Tunnel") + ovstun1 = ec.register_resource("OVSTunnel") ec.register_connection(port1, ovstun1) ec.register_connection(tap1, ovstun1) - ovstun2 = ec.register_resource("Tunnel") + ovstun2 = ec.register_resource("OVSTunnel") ec.register_connection(port3, ovstun2) ec.register_connection(tap2, ovstun2) - ovstun3 = ec.register_resource("Tunnel") + ovstun3 = ec.register_resource("OVSTunnel") ec.register_connection(port2, ovstun3) ec.register_connection(port4, ovstun3) diff --git a/test/util/parallel.py b/test/util/parallel.py new file mode 100644 index 00000000..7f8f5a07 --- /dev/null +++ b/test/util/parallel.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + + +from nepi.util.parallel import ParallelRun + +import datetime +import unittest + +class ParallelRunTestCase(unittest.TestCase): + def test_run_simple(self): + runner = ParallelRun(maxthreads = 4) + runner.start() + + count = [0] + + def inc(count): + count[0] += 1 + + for x in xrange(10): + runner.put(inc, count) + + runner.destroy() + + self.assertEquals(count[0], 10) + + def test_run_interrupt(self): + + def sleep(): + import time + time.sleep(5) + + startt = datetime.datetime.now() + + runner = ParallelRun(maxthreads = 4) + runner.start() + + for x in xrange(100): + runner.put(sleep) + + runner.empty() + runner.destroy() + + endt = datetime.datetime.now() + time_elapsed = (endt - startt).seconds + self.assertTrue( time_elapsed < 500) + + def test_run_error(self): + count = [0] + + def inc(count): + count[0] += 1 + + def error(): + raise RuntimeError() + + runner = ParallelRun(maxthreads = 4) + runner.start() + + for x in xrange(4): + runner.put(inc, count) + + runner.put(error) + + runner.destroy() + + self.assertEquals(count[0], 4) + + self.assertRaises(RuntimeError, runner.sync) + +if __name__ == '__main__': + unittest.main() + -- 2.43.0