import sys
import time
import threading
-
-class FailurePolicy(object):
- """ Defines how to respond to experiment failures
- """
- IGNORE_RM_FAILURE = 1
- ABORT_ON_RM_FAILURE = 2
+import weakref
class FailureLevel(object):
- """ Describe the system failure state
+ """ Describes the system failure state
"""
OK = 1
RM_FAILURE = 2
- TASK_FAILURE = 3
- EC_FAILURE = 4
+ EC_FAILURE = 3
class FailureManager(object):
""" The FailureManager is responsible for handling errors,
and deciding whether an experiment should be aborted
"""
- def __init__(self, failure_policy = None):
+ def __init__(self, ec):
+ self._ec = weakref.ref(ec)
self._failure_level = FailureLevel.OK
- self._failure_policy = failure_policy or \
- FailurePolicy.ABORT_ON_RM_FAILURE
@property
- def abort(self):
- if self._failure_level == FailureLevel.EC_FAILURE:
- return True
-
- if self._failure_level in [FailureLevel.TASK_FAILURE,
- FailureLevel.RM_FAILURE] and \
- self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
- return True
-
- return False
+ def ec(self):
+ """ Returns the Experiment Controller """
+ return self._ec()
- def set_rm_failure(self):
- self._failure_level = FailureLevel.RM_FAILURE
+ @property
+ def abort(self):
+ if self._failure_level == FailureLevel.OK:
+ for guid in self.ec.resources:
+ state = self.ec.state(guid)
+ critical = self.ec.get(guid, "critical")
+
+ if state == ResourceState.FAILED and critical:
+ self._failure_level = FailureLevel.RM_FAILURE
+ self.ec.logger.debug("RM critical failure occurred on guid %d." \
+ " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
+ break
- def set_task_failure(self):
- self._failure_level = FailureLevel.TASK_FAILURE
+ return self._failure_level != FailureLevel.OK
def set_ec_failure(self):
self._failure_level = FailureLevel.EC_FAILURE
+
class ECState(object):
""" State of the Experiment Controller
# Resource managers
self._resources = dict()
- # Scheduler
+ # Scheduler. It is a queue that holds tasks scheduled for
+ # execution, and yields the next task to be executed
+ # ordered by execution and arrival time
self._scheduler = HeapScheduler()
# Tasks
# generator of globally unique id for groups
self._group_id_generator = guid.GuidGenerator()
-
- # Event processing thread
- self._cond = threading.Condition()
- self._thread = threading.Thread(target = self._process)
- self._thread.setDaemon(True)
- self._thread.start()
# Flag to stop processing thread
self._stop = False
# Entity in charge of managing system failures
- self._fm = FailureManager()
+ self._fm = FailureManager(self)
# EC state
self._state = ECState.RUNNING
+ # The runner is a pool of threads used to parallelize
+ # execution of tasks
+ nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+ self._runner = ParallelRun(maxthreads = nthreads)
+
+ # Event processing thread
+ self._cond = threading.Condition()
+ self._thread = threading.Thread(target = self._process)
+ self._thread.setDaemon(True)
+ self._thread.start()
+
@property
def logger(self):
""" Return the logger of the Experiment Controller
def abort(self):
return self._fm.abort
- def set_rm_failure(self):
- self._fm.set_rm_failure()
-
def wait_finished(self, guids):
""" Blocking method that wait until all RMs in the 'guid' list
reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or
# If a guid reached one of the target states, remove it from list
guid = guids[0]
rstate = self.state(guid)
+
+ hrrstate = ResourceState2str.get(rstate)
+ hrstate = ResourceState2str.get(state)
if rstate >= state:
guids.remove(guid)
+ self.logger.debug(" guid %d DONE - state is %s, required is >= %s " % (
+ guid, hrrstate, hrstate))
else:
# Debug...
- hrrstate = ResourceState2str.get(rstate)
- hrstate = ResourceState2str.get(state)
self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
guid, hrrstate, hrstate))
-
- time.sleep(0.5)
+ time.sleep(0.5)
def get_task(self, tid):
""" Get a specific task
guids = self.resources
# Remove all pending tasks from the scheduler queue
- for tis in self._scheduler.pending:
- self._scheduler.remove(tis)
+ for tid in list(self._scheduler.pending):
+ self._scheduler.remove(tid)
+
+ self._runner.empty()
for guid in guids:
rm = self.get_resource(guid)
Releases all the resources and stops task processing thread
"""
+ # If there was a major failure we can't exit gracefully
+ if self._state == ECState.FAILED:
+ raise RuntimeError("EC failure. Can not exit gracefully")
+
self.release()
# Mark the EC state as TERMINATED
that might have been raised by the workers.
"""
- nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
- runner = ParallelRun(maxthreads = nthreads)
- runner.start()
+ self._runner.start()
while not self._stop:
try:
if task:
# Process tasks in parallel
- runner.put(self._execute, task)
+ self._runner.put(self._execute, task)
except:
import traceback
err = traceback.format_exc()
# Set the EC to FAILED state
self._state = ECState.FAILED
-
- # Set the FailureManager failure level
+
+ # Set the FailureManager failure level to EC failure
self._fm.set_ec_failure()
self.logger.debug("Exiting the task processing loop ... ")
- runner.sync()
- runner.destroy()
+
+ self._runner.sync()
+ self._runner.destroy()
def _execute(self, task):
""" Executes a single task.
self.logger.error("Error occurred while executing task: %s" % err)
- # Set the FailureManager failure level
- self._fm.set_task_failure()
-
def _notify(self):
""" Awakes the processing thread in case it is blocked waiting
for a new task to be scheduled.
from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
from nepi.util.logger import Logger
+from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import TraceAttr
import copy
cls._clsinit_copy()
return cls
+# Decorator for RM methods: runs the wrapped method and, if it raises,
+# logs the formatted traceback through self.error, logs a debug message
+# carrying self.guid, calls self.fail() and re-raises the exception.
+# NOTE(review): the wrapper does not copy func's name/docstring
+# (no functools.wraps is applied).
+def failtrap(func):
+ def wrapped(self, *args, **kwargs):
+ try:
+ return func(self, *args, **kwargs)
+ # Bare except: every exception type is trapped here, not only
+ # subclasses of Exception
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
+ self.debug("SETTING guid %d to state FAILED" % self.guid)
+ self.fail()
+ # Propagate the original error after flagging the failure
+ raise
+
+ return wrapped
+
# Decorator to invoke class initialization method
@clsinit
class ResourceManager(Logger):
resource attributes
"""
- pass
+ critical = Attribute("critical", "Defines whether the resource is critical. "
+ " A failure on a critical resource will interrupt the experiment. ",
+ type = Types.Bool,
+ default = True,
+ flags = Flags.ExecReadOnly)
+ cls._register_attribute(critical)
+
@classmethod
def _register_traces(cls):
""" Resource subclasses will invoke this method to register
@property
def state(self):
- """ Get the state of the current RM """
+ """ Get the current state of the RM """
return self._state
def log_message(self, msg):
def discover(self):
""" Performs resource discovery.
- This method is resposible for selecting an individual resource
+ This method is responsible for selecting an individual resource
matching user requirements.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
self.set_discovered()
def provision(self):
""" Performs resource provisioning.
- This method is resposible for provisioning one resource.
+ This method is responsible for provisioning one resource.
After this method has been successfully invoked, the resource
- should be acccesible/controllable by the RM.
+ should be accessible/controllable by the RM.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
self.set_provisioned()
def start(self):
- """ Starts the resource.
+ """ Starts the RM.
There is no generic start behavior for all resources.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
self.error("Wrong state %s for start" % self.state)
self.set_started()
def stop(self):
- """ Stops the resource.
+ """ Interrupts the RM, stopping any tasks the RM was performing.
There is no generic stop behavior for all resources.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
if not self.state in [ResourceState.STARTED]:
self.error("Wrong state %s for stop" % self.state)
self.set_stopped()
def deploy(self):
- """ Execute all steps required for the RM to reach the state READY
+ """ Execute all steps required for the RM to reach the state READY.
+
+ This method is responsible for deploying the resource (and invoking the
+ discover and provision methods).
+ This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
"""
if self.state > ResourceState.READY:
self.set_ready()
def release(self):
+ """ Perform actions to free resources used by the RM.
+
+ This method is responsible for releasing resources that were
+ used during the experiment by the RM.
+ This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, this method should never
+ raise an error and it must ensure the RM is set to state RELEASED.
+
+ """
self.set_released()
def finish(self):
+ """ Sets the RM to state FINISHED.
+
+ The FINISHED state is different from STOPPED in that it should not be
+ directly invoked by the user.
+ STOPPED indicates that the user interrupted the RM, FINISHED means
+ that the RM concluded normally the actions it was supposed to perform.
+ This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
+ """
+
self.set_finished()
def fail(self):
+ """ Sets the RM to state FAILED.
+
+ """
+
self.set_failed()
- self.ec.set_rm_failure()
def set(self, name, value):
""" Set the value of the attribute
reschedule = False
delay = reschedule_delay
- ## evaluate if set conditions are met
+ ## evaluate if conditions to start are met
+ if self.ec.abort:
+ return
- # only can start when RM is either STOPPED or READY
+ # Can only start when RM is either STOPPED or READY
if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
reschedule = True
self.debug("---- RESCHEDULING START ---- state %s " % self.state )
reschedule = False
delay = reschedule_delay
- ## evaluate if set conditions are met
+ ## evaluate if conditions to stop are met
+ if self.ec.abort:
+ return
# only can stop when RM is STARTED
if self.state != ResourceState.STARTED:
reschedule = True
+ self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
else:
self.debug(" ---- STOP CONDITIONS ---- %s" %
self.conditions.get(ResourceAction.STOP))
reschedule = False
delay = reschedule_delay
- ## evaluate if set conditions are met
+ ## evaluate if conditions to deploy are met
+ if self.ec.abort:
+ return
# only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
def set_started(self):
""" Mark ResourceManager as STARTED """
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_state(ResourceState.STARTED, "_start_time")
def set_stopped(self):
""" Mark ResourceManager as STOPPED """
- self._stop_time = tnow()
- self._state = ResourceState.STOPPED
+ self.set_state(ResourceState.STOPPED, "_stop_time")
def set_ready(self):
""" Mark ResourceManager as READY """
- self._ready_time = tnow()
- self._state = ResourceState.READY
+ self.set_state(ResourceState.READY, "_ready_time")
def set_released(self):
""" Mark ResourceManager as REALEASED """
- self._release_time = tnow()
- self._state = ResourceState.RELEASED
+ self.set_state(ResourceState.RELEASED, "_release_time")
def set_finished(self):
""" Mark ResourceManager as FINISHED """
- self._finish_time = tnow()
- self._state = ResourceState.FINISHED
+ self.set_state(ResourceState.FINISHED, "_finish_time")
def set_failed(self):
""" Mark ResourceManager as FAILED """
- self._failed_time = tnow()
- self._state = ResourceState.FAILED
+ self.set_state(ResourceState.FAILED, "_failed_time")
def set_discovered(self):
""" Mark ResourceManager as DISCOVERED """
- self._discover_time = tnow()
- self._state = ResourceState.DISCOVERED
+ self.set_state(ResourceState.DISCOVERED, "_discover_time")
def set_provisioned(self):
""" Mark ResourceManager as PROVISIONED """
- self._provision_time = tnow()
- self._state = ResourceState.PROVISIONED
+ self.set_state(ResourceState.PROVISIONED, "_provision_time")
+
+ def set_state(self, state, state_time_attr):
+ # Moves the RM to 'state' and stores the transition time (tnow())
+ # in the instance attribute whose name is given by 'state_time_attr'
+ # (e.g. "_start_time").
+ # Ensure that RM state will not change after released
+ if self._state == ResourceState.RELEASED:
+ return
+
+ # Timestamp is written first, then the state itself
+ setattr(self, state_time_attr, tnow())
+ self._state = state
class ResourceFactory(object):
_resource_types = dict()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- ResourceAction
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, ResourceAction, failtrap
from nepi.util.sshfuncs import ProcStatus
import os
import tempfile
-@clsinit
+@clsinit_copy
class Collector(ResourceManager):
""" The collector is reponsible of collecting traces
of a same type associated to RMs into a local directory.
@property
def store_path(self):
return self._store_path
-
+
+ @failtrap
def provision(self):
trace_name = self.get("traceName")
if not trace_name:
super(Collector, self).provision()
+ @failtrap
def deploy(self):
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
+ self.discover()
+ self.provision()
super(Collector, self).deploy()
def release(self):
- trace_name = self.get("traceName")
- rename = self.get("rename") or trace_name
-
- msg = "Collecting '%s' traces to local directory %s" % (
- trace_name, self.store_path)
- self.info(msg)
-
- rms = self.get_connected()
- for rm in rms:
- result = self.ec.trace(rm.guid, trace_name)
- fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
- rename))
- try:
- f = open(fpath, "w")
- f.write(result)
- f.close()
- except:
- msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name,
- rm.guid, fpath)
- self.error(msg)
- continue
+ try:
+ trace_name = self.get("traceName")
+ rename = self.get("rename") or trace_name
+
+ msg = "Collecting '%s' traces to local directory %s" % (
+ trace_name, self.store_path)
+ self.info(msg)
+
+ rms = self.get_connected()
+ for rm in rms:
+ result = self.ec.trace(rm.guid, trace_name)
+ fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
+ rename))
+ try:
+ f = open(fpath, "w")
+ f.write(result)
+ f.close()
+ except:
+ msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name,
+ rm.guid, fpath)
+ self.error(msg)
+ continue
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(Collector, self).release()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
# TODO: Resolve wildcards in commands!!
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
-@clsinit
+@clsinit_copy
class LinuxApplication(ResourceManager):
"""
.. class:: Class Args :
_help = "Runs an application on a Linux host with a BASH command "
_backend_type = "linux"
-
@classmethod
def _register_attributes(cls):
command = Attribute("command", "Command to execute at application start. "
out = int(out.strip())
return out
-
+
+ @failtrap
def provision(self):
# create run dir for application
self.node.mkdir(self.run_home)
# each step we check that the EC is still
for step in steps:
if self.ec.abort:
- raise RuntimeError, "EC finished"
+ self.debug("Interrupting provisioning. EC says 'ABORT")
+ return
ret = step()
if ret:
# replace application specific paths in the command
return self.replace_paths(install)
+ @failtrap
def deploy(self):
# Wait until node is associated and deployed
node = self.node
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- command = self.get("command") or ""
- self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
- except:
- self.fail()
- return
+ command = self.get("command") or ""
+ self.info("Deploying command '%s' " % command)
+ self.discover()
+ self.provision()
super(LinuxApplication, self).deploy()
-
+
+ @failtrap
def start(self):
command = self.get("command")
# installation), then the application is directly marked as FINISHED
self.set_finished()
else:
-
- try:
- if self.in_foreground:
- self._run_in_foreground()
- else:
- self._run_in_background()
- except:
- self.fail()
- return
+ if self.in_foreground:
+ self._run_in_foreground()
+ else:
+ self._run_in_background()
super(LinuxApplication, self).start()
command = self.get("command")
sudo = self.get("sudo") or False
x11 = self.get("forwardX11")
+ env = self.get("env")
# For a command being executed in foreground, if there is stdin,
# it is expected to be text string not a file or pipe
# to be able to kill the process from the stop method.
# We also set blocking = False, since we don't want the
# thread to block until the execution finishes.
- (out, err), self._proc = self.execute_command(self, command,
+ (out, err), self._proc = self.execute_command(command,
env = env,
sudo = sudo,
stdin = stdin,
msg = " Failed to start command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
-
+
+ @failtrap
def stop(self):
""" Stops application execution
"""
if proc.poll() or err:
msg = " Failed to STOP command '%s' " % self.get("command")
self.error(msg, out, err)
- self.fail()
- return
super(LinuxApplication, self).stop()
def release(self):
self.info("Releasing resource")
- tear_down = self.get("tearDown")
- if tear_down:
- self.node.execute(tear_down)
+ try:
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.node.execute(tear_down)
- self.stop()
+ self.stop()
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(LinuxApplication, self).release()
-
+
@property
def state(self):
""" Returns the state of the application
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay
+ reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow, tdiffsec
if self.ccnd: return self.ccnd.node
return None
+ @failtrap
def deploy(self):
if not self.ccnd or self.ccnd.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- command = self.get("command") or ""
+ command = self.get("command") or ""
- self.info("Deploying command '%s' " % command)
-
- if not self.get("env"):
- self.set("env", self._environment)
+ self.info("Deploying command '%s' " % command)
+
+ if not self.get("env"):
+ self.set("env", self._environment)
+
+ self.discover()
+ self.provision()
- self.discover()
- self.provision()
- except:
- self.fail()
- return
-
self.debug("----- READY ---- ")
self.set_ready()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction, reschedule_delay
+ ResourceAction, reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnr import LinuxCCNR
from nepi.util.timefuncs import tnow
if self.ccnr: return self.ccnr.node
return None
+ @failtrap
def deploy(self):
if not self.ccnr or self.ccnr.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
# ccnr needs to wait until ccnd is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- if not self.get("command"):
- self.set("command", self._start_command)
+ if not self.get("command"):
+ self.set("command", self._start_command)
- if not self.get("env"):
- self.set("env", self._environment)
+ if not self.get("env"):
+ self.set("env", self._environment)
- # set content to stdin, so the content will be
- # uploaded during provision
- self.set("stdin", self.get("content"))
+ # set content to stdin, so the content will be
+ # uploaded during provision
+ self.set("stdin", self.get("content"))
- command = self.get("command")
+ command = self.get("command")
- self.info("Deploying command '%s' " % command)
+ self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
- except:
- self.fail()
- return
+ self.discover()
+ self.provision()
self.debug("----- READY ---- ")
self.set_ready()
self.error(msg, out, err)
raise RuntimeError, msg
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- sef.fail()
+ raise RuntimeError, msg
@property
def _start_command(self):
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.node import OSType
from nepi.util.timefuncs import tnow, tdiffsec
def path(self):
return "PATH=$PATH:${BIN}/%s/" % self.version
+ @failtrap
def deploy(self):
if not self.node or self.node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
# ccnd needs to wait until node is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- if not self.get("command"):
- self.set("command", self._start_command)
-
- if not self.get("depends"):
- self.set("depends", self._dependencies)
+ if not self.get("command"):
+ self.set("command", self._start_command)
+
+ if not self.get("depends"):
+ self.set("depends", self._dependencies)
- if not self.get("sources"):
- self.set("sources", self._sources)
+ if not self.get("sources"):
+ self.set("sources", self._sources)
- sources = self.get("sources")
- source = sources.split(" ")[0]
- basename = os.path.basename(source)
- self._version = ( basename.strip().replace(".tar.gz", "")
- .replace(".tar","")
- .replace(".gz","")
- .replace(".zip","") )
+ sources = self.get("sources")
+ source = sources.split(" ")[0]
+ basename = os.path.basename(source)
+ self._version = ( basename.strip().replace(".tar.gz", "")
+ .replace(".tar","")
+ .replace(".gz","")
+ .replace(".zip","") )
- if not self.get("build"):
- self.set("build", self._build)
+ if not self.get("build"):
+ self.set("build", self._build)
- if not self.get("install"):
- self.set("install", self._install)
+ if not self.get("install"):
+ self.set("install", self._install)
- if not self.get("env"):
- self.set("env", self._environment)
+ if not self.get("env"):
+ self.set("env", self._environment)
- command = self.get("command")
+ command = self.get("command")
- self.info("Deploying command '%s' " % command)
+ self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
- except:
- self.fail()
- return
+ self.discover()
+ self.provision()
self.debug("----- READY ---- ")
self.set_ready()
env = env,
raise_on_error = True)
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
+ raise RuntimeError, msg
+ @failtrap
def stop(self):
command = self.get('command') or ''
state_check_delay = 0.5
if self._state == ResourceState.STARTED and \
tdiffsec(tnow(), self._last_state_check) > state_check_delay:
- (out, err), proc = self._ccndstatus
+ (out, err), proc = self._ccndstatus()
retcode = proc.poll()
return self._state
- @property
def _ccndstatus(self):
env = self.get('env') or ""
environ = self.node.format_environment(env, inline = True)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.ccn.ccnpingserver import LinuxCCNPingServer
from nepi.util.timefuncs import tnow, tdiffsec
if ccnpingserver: return ccnpingserver[0]
return None
+ @failtrap
def start(self):
if not self.ccnpingserver or \
self.ccnpingserver.state < ResourceState.STARTED:
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.util.timefuncs import tnow, tdiffsec
super(LinuxCCNPingServer, self).__init__(ec, guid)
self._home = "ccnping-serv-%s" % self.guid
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction, reschedule_delay
+ ResourceAction, reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow
if self.ccnd: return self.ccnd.node
return None
+ @failtrap
def deploy(self):
if not self.ccnd or self.ccnd.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
# ccnr needs to wait until ccnd is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- if not self.get("command"):
- self.set("command", self._start_command)
+ if not self.get("command"):
+ self.set("command", self._start_command)
- if not self.get("env"):
- self.set("env", self._environment)
+ if not self.get("env"):
+ self.set("env", self._environment)
- command = self.get("command")
+ command = self.get("command")
- self.info("Deploying command '%s' " % command)
+ self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
- except:
- self.fail()
- return
+ self.discover()
+ self.provision()
self.debug("----- READY ---- ")
self.set_ready()
env = env,
raise_on_error = True)
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
+ raise RuntimeError, msg
@property
def _start_command(self):
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction, reschedule_delay
+ ResourceAction, reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow
return self.ec.trace(self._traceroute, "stdout", attr, block, offset)
return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
-
+
+ @failtrap
def deploy(self):
# Wait until associated ccnd is provisioned
if not self.ccnd or self.ccnd.state < ResourceState.READY:
# ccnr needs to wait until ccnd is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- if not self.get("ip"):
- host = self.get("host")
- ip = socket.gethostbyname(host)
- self.set("ip", ip)
+ if not self.get("ip"):
+ host = self.get("host")
+ ip = socket.gethostbyname(host)
+ self.set("ip", ip)
- if not self.get("command"):
- self.set("command", self._start_command)
+ if not self.get("command"):
+ self.set("command", self._start_command)
- if not self.get("env"):
- self.set("env", self._environment)
+ if not self.get("env"):
+ self.set("env", self._environment)
- command = self.get("command")
+ command = self.get("command")
- self.info("Deploying command '%s' " % command)
+ self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
- self.configure()
- except:
- self.fail()
- return
+ self.discover()
+ self.provision()
+ self.configure()
self.debug("----- READY ---- ")
self.set_ready()
# schedule mtr deploy
self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
+ raise RuntimeError, msg
+ @failtrap
def stop(self):
command = self.get('command')
env = self.get('env')
if err:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
+ raise RuntimeError, msg
@property
def _start_command(self):
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState
-@clsinit
+@clsinit_copy
class LinuxChannel(ResourceManager):
_rtype = "LinuxChannel"
_help = "Represents a wireless channel on a network of Linux hosts"
def valid_connection(self, guid):
# TODO: Validate!
return True
+
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Types, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.node import LinuxNode
from nepi.resources.linux.channel import LinuxChannel
# TODO: UP, MTU attributes!
-@clsinit
+@clsinit_copy
class LinuxInterface(ResourceManager):
_rtype = "LinuxInterface"
_help = "Controls network devices on Linux hosts through the ifconfig tool"
if chan: return chan[0]
return None
+ @failtrap
def discover(self):
devname = self.get("deviceName")
ip4 = self.get("ip4")
super(LinuxInterface, self).discover()
+ @failtrap
def provision(self):
devname = self.get("deviceName")
ip4 = self.get("ip4")
super(LinuxInterface, self).provision()
+ @failtrap
def deploy(self):
# Wait until node is provisioned
node = self.node
else:
# Verify if the interface exists in node. If not, configue
# if yes, load existing configuration
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- return
+ self.discover()
+ self.provision()
super(LinuxInterface, self).deploy()
def release(self):
- tear_down = self.get("tearDown")
- if tear_down:
- self.execute(tear_down)
+ try:
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.execute(tear_down)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(LinuxInterface, self).release()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
self._home = "mtr-%s" % self.guid
self._sudo_kill = True
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux import rpmfuncs, debfuncs
from nepi.util import sshfuncs, execfuncs
from nepi.util.sshfuncs import ProcStatus
UBUNTU = "ubuntu"
DEBIAN = "debian"
-@clsinit
+@clsinit_copy
class LinuxNode(ResourceManager):
"""
.. class:: Class Args :
clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
" from node home folder before starting experiment",
+ type = Types.Bool,
+ default = False,
flags = Flags.ExecReadOnly)
clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
" from a previous same experiment, before the new experiment starts",
+ type = Types.Bool,
+ default = False,
flags = Flags.ExecReadOnly)
clean_processes = Attribute("cleanProcesses",
"Kill all running processes before starting experiment",
+ type = Types.Bool,
+ default = False,
flags = Flags.ExecReadOnly)
tear_down = Attribute("tearDown", "Bash script to be executed before " + \
time.sleep(min(30.0, retrydelay))
retrydelay *= 1.5
-
@property
def use_deb(self):
    """ Tells whether self.os is OSType.DEBIAN or OSType.UBUNTU """
    deb_flavours = [OSType.DEBIAN, OSType.UBUNTU]
    return self.os in deb_flavours
def localhost(self):
    """ Tells whether the 'hostname' attribute designates the local machine

    Returns True when self.get("hostname") is 'localhost', the IPv4
    loopback address '127.0.0.1' or the IPv6 loopback address '::1',
    and False for any other value.
    """
    return self.get("hostname") in ('localhost', '127.0.0.1', '::1')
+ @failtrap
def provision(self):
# check if host is alive
if not self.is_alive():
-
msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
self.error(msg)
raise RuntimeError, msg
super(LinuxNode, self).provision()
+ @failtrap
def deploy(self):
if self.state == ResourceState.NEW:
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- return
+ self.info("Deploying node")
+ self.discover()
+ self.provision()
# Node needs to wait until all associated interfaces are
# ready before it can finalize deployment
super(LinuxNode, self).deploy()
def release(self):
- # Node needs to wait until all associated RMs are released
- # to be released
- rms = self.get_connected()
- for rm in rms:
- if rm.state < ResourceState.STOPPED:
- self.ec.schedule(reschedule_delay, self.release)
- return
-
- tear_down = self.get("tearDown")
- if tear_down:
- self.execute(tear_down)
+ try:
+ rms = self.get_connected()
+ for rm in rms:
+ # Node needs to wait until all associated RMs are released
+ # before it can be released
+ if rm.state < ResourceState.STOPPED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.execute(tear_down)
- self.clean_processes()
+ self.clean_processes()
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(LinuxNode, self).release()
return (out, err), proc
-
def upload(self, src, dst, text = False, overwrite = True):
""" Copy content to destination
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
self._home = "nping-%s" % self.guid
self._sudo_kill = True
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
super(LinuxPing, self).__init__(ec, guid)
self._home = "ping-%s" % self.guid
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
self._home = "tcpdump-%s" % self.guid
self._sudo_kill = True
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
super(LinuxTraceroute, self).__init__(ec, guid)
self._home = "traceroute-%s" % self.guid
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay
-from nepi.execution.resource import clsinit_copy
+ reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
super(LinuxUdpTest, self).__init__(ec, guid)
self._home = "udptest-%s" % self.guid
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
# finished to continue )
self._run_in_background()
+ @failtrap
def start(self):
if self.get("s") == True:
# Server is already running
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
+ raise RuntimeError, err
else:
super(LinuxUdpTest, self).start()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay
+ reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
port = self.wait_local_port(endpoint)
return (port, pid, ppid)
+ @failtrap
def provision(self):
# create run dir for tunnel on each node
self.endpoint1.node.mkdir(self.run_home(self.endpoint1))
self.set_provisioned()
+ @failtrap
def deploy(self):
if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
(not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- return
+ self.discover()
+ self.provision()
self.debug("----- READY ---- ")
self.set_ready()
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
+ raise RuntimeError, msg
+ @failtrap
def stop(self):
""" Stops application execution
"""
# check if execution errors occurred
msg = " Failed to STOP tunnel"
self.error(msg, err1, err2)
- self.fail()
- return
+ raise RuntimeError, msg
self.set_stopped()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
from nepi.resources.omf.node import OMFNode
return True
+ @failtrap
def deploy(self):
""" Deploy the RM. It means nothing special for an application
for now (later it will be upload sources, ...)
if not self._omf_api :
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
if self.get('sources'):
gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
super(OMFApplication, self).deploy()
-
+ @failtrap
def start(self):
""" Start the RM. It means : Send Xmpp Message Using OMF protocol
to execute the application.
if not (self.get('appid') and self.get('path')) :
msg = "Application's information are not initialized"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
if not self.get('args'):
self.set('args', " ")
except AttributeError:
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
raise
super(OMFApplication, self).start()
+ @failtrap
def stop(self):
""" Stop the RM. It means : Send Xmpp Message Using OMF protocol to
kill the application.
except AttributeError:
msg = "Credentials were not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- return
+ raise
super(OMFApplication, self).stop()
self.set_finished()
""" Clean the RM at the end of the experiment and release the API.
"""
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
+ try:
+ if self._omf_api :
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(OMFApplication, self).release()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
res.append(couple)
return res
- def discover(self):
- """ Discover the availables channels
-
- """
- pass
-
- def provision(self):
- """ Provision some availables channels
-
- """
- pass
-
+ @failtrap
def deploy(self):
""" Deploy the RM. It means : Get the xmpp client and send messages
using OMF 5.4 protocol to configure the channel.
if not self._omf_api :
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
if not self.get('channel'):
msg = "Channel's value is not initialized"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
+
+ self._nodes_guid = self._get_target(self._connections)
- self._nodes_guid = self._get_target(self._connections)
if self._nodes_guid == "reschedule" :
self.ec.schedule("2s", self.deploy)
- return False
-
- try:
- for couple in self._nodes_guid:
- #print "Couple node/alias : " + couple[0] + " , " + couple[1]
- attrval = self.get('channel')
- attrname = "net/%s/%s" % (couple[1], 'channel')
- self._omf_api.configure(couple[0], attrname, attrval)
- except AttributeError:
- msg = "Credentials are not initialzed. XMPP Connections impossible"
- self.error(msg)
- self.fail()
- return
-
- super(OMFChannel, self).deploy()
-
- def start(self):
- """ Start the RM. It means nothing special for a channel for now
- It becomes STARTED as soon as this method starts.
-
- """
-
- super(OMFChannel, self).start()
-
- def stop(self):
- """ Stop the RM. It means nothing special for a channel for now
- It becomes STOPPED as soon as this method is called
-
- """
- super(OMFChannel, self).stop()
- self.set_finished()
+ else:
+ try:
+ for couple in self._nodes_guid:
+ #print "Couple node/alias : " + couple[0] + " , " + couple[1]
+ attrval = self.get('channel')
+ attrname = "net/%s/%s" % (couple[1], 'channel')
+ self._omf_api.configure(couple[0], attrname, attrval)
+ except AttributeError:
+ msg = "Credentials are not initialzed. XMPP Connections impossible"
+ self.error(msg)
+ raise
+
+ super(OMFChannel, self).deploy()
def release(self):
""" Clean the RM at the end of the experiment and release the API
"""
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
+ try:
+ if self._omf_api :
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(OMFChannel, self).release()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.node import OMFNode
if rm_list: return rm_list[0]
return None
-
def configure_iface(self):
""" Configure the interface without the ip
return True
+ @failtrap
def deploy(self):
""" Deploy the RM. It means : Get the xmpp client and send messages
using OMF 5.4 protocol to configure the interface.
if not self._omf_api :
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
if not (self.get('mode') and self.get('type') and self.get('essid') \
and self.get('ip')):
msg = "Interface's variable are not initialized"
self.error(msg)
- self.fail()
- return False
+ raise RuntimeError, msg
if not self.node.get('hostname') :
msg = "The channel is connected with an undefined node"
self.error(msg)
- self.fail()
- return False
+ raise RuntimeError, msg
# Just for information
self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
self.get('essid') + " : " + self.get('ip'))
# Check if the node is already deployed
- chk1 = True
if self.state < ResourceState.PROVISIONED:
- chk1 = self.configure_iface()
- if chk1:
- chk2 = self.configure_ip()
+ if self.configure_iface():
+ self.configure_ip()
- if not (chk1 and chk2) :
- return False
-
super(OMFWifiInterface, self).deploy()
- return True
-
-
- def start(self):
- """ Start the RM. It means nothing special for a channel for now
- It becomes STARTED as soon as this method starts.
-
- """
-
- super(OMFWifiInterface, self).start()
-
- def stop(self):
- """ Stop the RM. It means nothing special for a channel for now
- It becomes STOPPED as soon as this method is called
-
- """
- super(OMFWifiInterface, self).stop()
- self.set_finished()
def release(self):
""" Clean the RM at the end of the experiment and release the API
"""
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
+ try:
+ if self._omf_api :
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(OMFWifiInterface, self).release()
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
from nepi.resources.omf.omf_api import OMFAPIFactory
import time
-
@clsinit_copy
class OMFNode(OMFResource):
"""
return False
+ @failtrap
def deploy(self):
""" Deploy the RM. It means : Send Xmpp Message Using OMF protocol
to enroll the node into the experiment.
if not self._omf_api :
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
if not self.get('hostname') :
msg = "Hostname's value is not initialized"
self.error(msg)
- self.fail()
- return False
+ raise RuntimeError, msg
try:
self._omf_api.enroll_host(self.get('hostname'))
except AttributeError:
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- #raise AttributeError, msg
+ raise
super(OMFNode, self).deploy()
- def discover(self):
- """ Discover the availables nodes
-
- """
- pass
-
- def provision(self):
- """ Provision some availables nodes
-
- """
- pass
-
- def start(self):
- """Start the RM. It means nothing special for an interface for now
- It becomes STARTED as soon as this method starts.
-
- """
-
- super(OMFNode, self).start()
-
- def stop(self):
- """Stop the RM. It means nothing special for an interface for now
- It becomes STOPPED as soon as this method stops
-
- """
- super(OMFNode, self).stop()
- self.set_finished()
-
def release(self):
"""Clean the RM at the end of the experiment
"""
- if self._omf_api :
- self._omf_api.release(self.get('hostname'))
-
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
+ try:
+ if self._omf_api :
+ self._omf_api.release(self.get('hostname'))
+
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(OMFNode, self).release()
# Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay
class ResourceGateway:
"""
"nicta" : "??.??.??",
})
-@clsinit
+@clsinit_copy
class OMFResource(ResourceManager):
"""
Generic resource gathering XMPP credential information and common methods
# Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.node import LinuxNode
from nepi.resources.planetlab.plcapi import PLCAPIFactory
from nepi.util.execfuncs import lexec
# Alexandros Kouvakas <alexandros.kouvakas@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.linux.application import LinuxApplication
# TODO: Validate!
return True
+ @failtrap
def provision(self):
# create home dir for ovs
self.node.mkdir(self.ovs_home)
stderr = "check_cmd_stderr")
(out, err), proc = self.node.check_output(self.ovs_checks, 'check_cmd_exitcode')
+
if out != "0\n":
msg = "Command sliver-ovs does not exist on the VM"
self.debug(msg)
raise RuntimeError, msg
+
msg = "Command sliver-ovs exists"
self.debug(msg)
+ @failtrap
def deploy(self):
""" Wait until node is associated and deployed
"""
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- self.discover()
- self.provision()
- self.check_sliver_ovs()
- self.servers_on()
- self.create_bridge()
- self.assign_contr()
- self.ovs_status()
- except:
- self._state = ResourceState.FAILED
- raise
-
- self._state = ResourceState.READY
+ self.discover()
+ self.provision()
+ self.check_sliver_ovs()
+ self.servers_on()
+ self.create_bridge()
+ self.assign_contr()
+ self.ovs_status()
+
+ super(OVSWitch, self).deploy()
def servers_on(self):
""" Start the openvswitch servers and also checking
# Check if the servers are running or not
(out, err), proc = self.node.check_output(self.ovs_checks, 'status_srv_exitcode')
+
if out != "0\n":
self.debug("Servers are not running")
raise RuntimeError, msg
+
self.info("Servers started")
def del_old_br(self):
(out, err), proc = self.node.check_output(self.ovs_home, 'show_stdout')
self.info(out)
- def start(self):
- """ Start the RM. It means nothing special for
- ovswitch for now.
- """
- pass
-
- def stop(self):
- """ Stop the RM.It means nothing
- for ovswitch for now.
- """
- pass
-
def release(self):
""" Delete the bridge and
close the servers
"""
# Node needs to wait until all associated RMs are released
# to be released
- from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
- rm = self.get_connected(OVSPort.rtype())
+ try:
+ from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
+ rm = self.get_connected(OVSPort.rtype())
- if rm[0].state < ResourceState.FINISHED:
- self.ec.schedule(reschedule_delay, self.release)
- return
+ if rm[0].state < ResourceState.FINISHED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+
+ msg = "Deleting the bridge %s" % self.get('bridge_name')
+ self.info(msg)
+ cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
+ (out, err), proc = self.node.run(cmd, self.ovs_checks,
+ sudo = True)
+ cmd = "sliver-ovs stop"
+ (out, err), proc = self.node.run(cmd, self.ovs_checks,
+ sudo = True)
- msg = "Deleting the bridge %s" % self.get('bridge_name')
- self.info(msg)
- cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
- (out, err), proc = self.node.run(cmd, self.ovs_checks,
- sudo = True)
- cmd = "sliver-ovs stop"
- (out, err), proc = self.node.run(cmd, self.ovs_checks,
- sudo = True)
-
- if proc.poll():
- self.fail()
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- self._state = ResourceState.RELEASED
-
+ if proc.poll():
+ self.fail()
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
+
+ super(OVSWitch, self).release()
+
# Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, failtrap
from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.linux.application import LinuxApplication
command = self.replace_paths(command)
return command
- def provision(self):
- """ Provision the ports.No meaning.
- """
- pass
-
- def discover(self):
- """ Discover the ports.No meaning
- """
- pass
-
+ @failtrap
def deploy(self):
""" Wait until ovswitch is started
"""
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- self.discover()
- self.provision()
- self.get_host_ip()
- self.create_port()
- self.get_local_end()
- self.ovswitch.ovs_status()
- self._state = ResourceState.READY
- except:
- self._state = ResourceState.FAILED
- raise
-
- def start(self):
- """ Start the RM. It means nothing special for
- ovsport for now.
- """
- pass
-
- def stop(self):
- """ Stop the RM. It means nothing special for
- ovsport for now.
- """
- pass
-
+ self.discover()
+ self.provision()
+ self.get_host_ip()
+ self.create_port()
+ self.get_local_end()
+ self.ovswitch.ovs_status()
+ super(OVSPort, self).deploy()
+
def release(self):
""" Release the port RM means delete the ports
"""
# OVS needs to wait until all associated RMs are released
# to be released
- from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
- rm = self.get_connected(Tunnel.rtype())
- if rm and rm[0].state < ResourceState.FINISHED:
- self.ec.schedule(reschedule_delay, self.release)
- return
-
- msg = "Deleting the port %s" % self.get('port_name')
- self.info(msg)
- cmd = "sliver-ovs del_port %s" % self.get('port_name')
- (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
- sudo = True)
-
- if proc.poll():
- self.fail()
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- self._state = ResourceState.RELEASED
+ try:
+ from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
+ rm = self.get_connected(Tunnel.rtype())
+ if rm and rm[0].state < ResourceState.FINISHED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+
+ msg = "Deleting the port %s" % self.get('port_name')
+ self.info(msg)
+ cmd = "sliver-ovs del_port %s" % self.get('port_name')
+ (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
+ sudo = True)
+
+ if proc.poll():
+ self.fail()
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
+
+ super(OVSPort, self).release()
# Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
import time
import socket
-
reschedule_delay = "0.5s"
@clsinit_copy
-class Tunnel(LinuxApplication):
+class OVSTunnel(LinuxApplication):
"""
.. class:: Class Args :
"""
- _rtype = "Tunnel"
+ _rtype = "OVSTunnel"
_authorized_connections = ["OVSPort", "PlanetlabTap"]
@classmethod
:type guid: int
"""
- super(Tunnel, self).__init__(ec, guid)
+ super(OVSTunnel, self).__init__(ec, guid)
self._home = "tunnel-%s" % self.guid
self.port_info_tunl = []
self._nodes = []
self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
+
msg = "Connection on host %s configured" \
% self.node.get("hostname")
self.info(msg)
self.info(msg)
return
+ @failtrap
def provision(self):
""" Provision the tunnel
"""
(self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
- self.debug("------- READY -------")
- self._provision_time = tnow()
- self._state = ResourceState.PROVISIONED
-
- def discover(self):
- """ Discover the tunnel
-
- """
- pass
+ super(OVSTunnel, self).provision()
+ @failtrap
def deploy(self):
if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
(not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
-
- self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
-
- def start(self):
- """ Start the RM. It means nothing special for
- ovsport for now.
- """
- pass
-
-
- def stop(self):
- """ Stop the RM. It means nothing special for
- ovsport for now.
- """
- pass
+ self.discover()
+ self.provision()
+ super(OVSTunnel, self).deploy()
+
def release(self):
""" Release the udp_tunnel on endpoint2.
On endpoint1 means nothing special.
"""
- if not self.check_endpoints():
- # Kill the TAP devices
- # TODO: Make more generic Release method of PLTAP
- if self._pid and self._ppid:
- self._nodes = self.get_node(self.endpoint2)
- (out, err), proc = self.node.kill(self._pid,
- self._ppid, sudo = True)
- if err or proc.poll():
- # check if execution errors occurred
- msg = " Failed to delete TAP device"
- self.error(msg, err, err)
- self.fail()
-
- self._state = ResourceState.RELEASED
-
-
-
-
-
-
+ try:
+ if not self.check_endpoints():
+ # Kill the TAP devices
+ # TODO: Make more generic Release method of PLTAP
+ if self._pid and self._ppid:
+ self._nodes = self.get_node(self.endpoint2)
+ (out, err), proc = self.node.kill(self._pid,
+ self._ppid, sudo = True)
+ if err or proc.poll():
+ # check if execution errors occurred
+ msg = " Failed to delete TAP device"
+ self.error(msg, err, err)
+ self.fail()
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
+
+ super(OVSTunnel, self).release()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay
+ reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.util.timefuncs import tnow, tdiffsec
if_name = self.wait_if_name()
self.set("deviceName", if_name)
+ @failtrap
def deploy(self):
if not self.node or self.node.state < ResourceState.PROVISIONED:
self.ec.schedule(reschedule_delay, self.deploy)
if not self.get("install"):
self.set("install", self._install)
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- return
+ self.discover()
+ self.provision()
self.debug("----- READY ---- ")
self.set_ready()
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
+ raise RuntimeError, msg
+ @failtrap
def stop(self):
command = self.get('command') or ''
def release(self):
# Node needs to wait until all associated RMs are released
# to be released
- from nepi.resources.linux.udptunnel import UdpTunnel
- rms = self.get_connected(UdpTunnel.rtype())
- for rm in rms:
- if rm.state < ResourceState.STOPPED:
- self.ec.schedule(reschedule_delay, self.release)
- return
+ try:
+ from nepi.resources.linux.udptunnel import UdpTunnel
+ rms = self.get_connected(UdpTunnel.rtype())
+ for rm in rms:
+ if rm.state < ResourceState.STOPPED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(PlanetlabTap, self).release()
class WorkerThread(threading.Thread):
class QUIT:
pass
- class REASSIGNED:
- pass
-
+
def run(self):
while True:
task = self.queue.get()
- if task is None:
- self.done = True
- self.queue.task_done()
- continue
- elif task is self.QUIT:
- self.done = True
+
+ if task is self.QUIT:
self.queue.task_done()
break
- elif task is self.REASSIGNED:
- continue
- else:
- self.done = False
-
+
try:
try:
callable, args, kwargs = task
except:
traceback.print_exc(file = sys.stderr)
self.delayed_exceptions.append(sys.exc_info())
-
- def waitdone(self):
- while not self.queue.empty() and not self.done:
- self.queue.join()
-
+
def attach(self, queue, rvqueue, delayed_exceptions):
- if self.isAlive():
- self.waitdone()
- oldqueue = self.queue
self.queue = queue
self.rvqueue = rvqueue
self.delayed_exceptions = delayed_exceptions
- if self.isAlive():
- oldqueue.put(self.REASSIGNED)
-
- def detach(self):
- if self.isAlive():
- self.waitdone()
- self.oldqueue = self.queue
- self.queue = Queue.Queue()
- self.rvqueue = None
- self.delayed_exceptions = []
-
- def detach_signal(self):
- if self.isAlive():
- self.oldqueue.put(self.REASSIGNED)
- del self.oldqueue
-
+
def quit(self):
self.queue.put(self.QUIT)
- self.join()
-class ParallelMap(object):
+class ParallelRun(object):
def __init__(self, maxthreads = None, maxqueue = None, results = True):
+ self.maxqueue = maxqueue
+ self.maxthreads = maxthreads
+
+ self.queue = Queue.Queue(self.maxqueue or 0)
+
+ self.delayed_exceptions = []
+
+ if results:
+ self.rvqueue = Queue.Queue()
+ else:
+ self.rvqueue = None
+
+ self.initialize_workers()
+
+ def initialize_workers(self):
global N_PROCS
+
+ maxthreads = self.maxthreads
# Compute maximum number of threads allowed by the system
if maxthreads is None:
if maxthreads is None:
maxthreads = 4
-
- self.queue = Queue.Queue(maxqueue or 0)
-
- self.delayed_exceptions = []
-
- if results:
- self.rvqueue = Queue.Queue()
- else:
- self.rvqueue = None
-
+
self.workers = []
# initialize workers
for x in xrange(maxthreads):
- t = None
- if t is None:
- t = WorkerThread()
- t.setDaemon(True)
- else:
- t.waitdone()
-
- t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
- self.workers.append(t)
+ worker = WorkerThread()
+ worker.attach(self.queue, self.rvqueue, self.delayed_exceptions)
+ worker.setDaemon(True)
+
+ self.workers.append(worker)
def __del__(self):
self.destroy()
-
+
+ def empty(self):
+ while True:
+ try:
+ self.queue.get(block = False)
+ self.queue.task_done()
+ except Queue.Empty:
+ break
+
def destroy(self):
- for worker in self.workers:
- worker.waitdone()
- for worker in self.workers:
- worker.detach()
- for worker in self.workers:
- worker.detach_signal()
- for worker in self.workers:
- worker.quit()
+ self.join()
del self.workers[:]
self.queue.put_nowait((callable, args, kwargs))
def start(self):
- for thread in self.workers:
- if not thread.isAlive():
- thread.start()
+ for worker in self.workers:
+ if not worker.isAlive():
+ worker.start()
def join(self):
- for thread in self.workers:
- # That's the sync signal
- self.queue.put(None)
-
+ # Wait until all queued tasks have been processed
self.queue.join()
- for thread in self.workers:
- thread.waitdone()
-
- if self.delayed_exceptions:
- typ,val,loc = self.delayed_exceptions[0]
- del self.delayed_exceptions[:]
- raise typ,val,loc
-
- self.destroy()
+
+ for worker in self.workers:
+ worker.quit()
+
+ for worker in self.workers:
+ worker.join()
def sync(self):
- self.queue.join()
if self.delayed_exceptions:
typ,val,loc = self.delayed_exceptions[0]
del self.delayed_exceptions[:]
except Queue.Empty:
raise StopIteration
-class ParallelRun(ParallelMap):
- def __run(self, x):
- fn, args, kwargs = x
- return fn(*args, **kwargs)
-
- def __init__(self, maxthreads = None, maxqueue = None):
- super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
-
- def put(self, what, *args, **kwargs):
- super(ParallelRun, self).put(self.__run, (what, args, kwargs))
-
- def put_nowait(self, what, *args, **kwargs):
- super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
-
-
def test_schedule_exception(self):
def raise_error():
+ # When this task is executed and the error is raised,
+ # the task status should be set to TaskStatus.ERROR
+ # (this is what the assertion below checks)
raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!"
ec = ExperimentController()
- ec.schedule("2s", raise_error)
- while ec.ecstate not in [ECState.FAILED, ECState.TERMINATED]:
- time.sleep(1)
+ tid = ec.schedule("2s", raise_error, track = True)
- self.assertEquals(ec.ecstate, ECState.FAILED)
- ec.shutdown()
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
+
+ time.sleep(1)
+ self.assertEquals(task.status, TaskStatus.ERROR)
if __name__ == '__main__':
unittest.main()
from nepi.execution.attribute import Attribute
-from nepi.execution.ec import ExperimentController
-from nepi.execution.resource import ResourceManager, ResourceState, clsinit, \
- ResourceAction
+from nepi.execution.ec import ExperimentController, FailureLevel
+from nepi.execution.resource import ResourceManager, ResourceState, \
+ clsinit_copy, ResourceAction, failtrap
import random
import time
import unittest
-@clsinit
+@clsinit_copy
class MyResource(ResourceManager):
_rtype = "MyResource"
def __init__(self, ec, guid):
super(MyResource, self).__init__(ec, guid)
-@clsinit
+@clsinit_copy
class AnotherResource(ResourceManager):
_rtype = "AnotherResource"
def __init__(self, ec, guid):
super(AnotherResource, self).__init__(ec, guid)
-
class Channel(ResourceManager):
_rtype = "Channel"
self.discover()
self.provision()
self.logger.debug(" -------- PROVISIONED ------- ")
- self.ec.schedule("3s", self.deploy)
+ self.ec.schedule("1s", self.deploy)
elif self.state == ResourceState.PROVISIONED:
ifaces = self.get_connected(Interface.rtype())
for rm in ifaces:
if node.state < ResourceState.READY:
self.ec.schedule("0.5s", self.deploy)
else:
- time.sleep(random.random() * 5)
+ time.sleep(random.random() * 2)
super(Application, self).deploy()
self.logger.debug(" -------- DEPLOYED ------- ")
def start(self):
super(Application, self).start()
- time.sleep(random.random() * 5)
+ time.sleep(random.random() * 3)
self._state = ResourceState.FINISHED
-
+
+class ErrorApplication(ResourceManager):
+ _rtype = "ErrorApplication"
+
+ def __init__(self, ec, guid):
+ super(ErrorApplication, self).__init__(ec, guid)
+
+ @failtrap
+ def deploy(self):
+ node = self.get_connected(Node.rtype())[0]
+ if node.state < ResourceState.READY:
+ self.ec.schedule("0.5s", self.deploy)
+ else:
+ time.sleep(random.random() * 2)
+ raise RuntimeError, "NOT A REAL ERROR. JUST TESTING"
class ResourceFactoryTestCase(unittest.TestCase):
def test_add_resource_factory(self):
ResourceFactory.register_type(AnotherResource)
self.assertEquals(MyResource.rtype(), "MyResource")
- self.assertEquals(len(MyResource._attributes), 1)
+ self.assertEquals(len(MyResource._attributes), 2)
self.assertEquals(ResourceManager.rtype(), "Resource")
- self.assertEquals(len(ResourceManager._attributes), 0)
+ self.assertEquals(len(ResourceManager._attributes), 1)
self.assertEquals(AnotherResource.rtype(), "AnotherResource")
- self.assertEquals(len(AnotherResource._attributes), 0)
+ self.assertEquals(len(AnotherResource._attributes), 1)
self.assertEquals(len(ResourceFactory.resource_types()), 2)
ec.shutdown()
- def test_start_with_condition(self):
+ def test_exception(self):
+ from nepi.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(ErrorApplication)
+ ResourceFactory.register_type(Node)
+ ResourceFactory.register_type(Interface)
+ ResourceFactory.register_type(Channel)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("Node")
+
+ apps = list()
+ for i in xrange(10):
+ app = ec.register_resource("ErrorApplication")
+ ec.register_connection(app, node)
+ apps.append(app)
+
+
+ ec.deploy()
+
+ ec.wait_finished(apps)
+
+ ec.shutdown()
+
+ self.assertTrue(ec._fm._failure_level == FailureLevel.RM_FAILURE)
+
+
+ def ztest_start_with_condition(self):
# TODO!!!
pass
- def test_stop_with_condition(self):
+ def ztest_stop_with_condition(self):
# TODO!!!
pass
- def test_set_with_condition(self):
+ def ztest_set_with_condition(self):
# TODO!!!
pass
ec.set(tap2, "prefix4", 24)
ec.register_connection(tap2, node4)
- ovstun1 = ec.register_resource("Tunnel")
+ ovstun1 = ec.register_resource("OVSTunnel")
ec.register_connection(port1, ovstun1)
ec.register_connection(tap1, ovstun1)
- ovstun2 = ec.register_resource("Tunnel")
+ ovstun2 = ec.register_resource("OVSTunnel")
ec.register_connection(port3, ovstun2)
ec.register_connection(tap2, ovstun2)
- ovstun3 = ec.register_resource("Tunnel")
+ ovstun3 = ec.register_resource("OVSTunnel")
ec.register_connection(port2, ovstun3)
ec.register_connection(port4, ovstun3)
--- /dev/null
+#!/usr/bin/env python
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+
+from nepi.util.parallel import ParallelRun
+
+import datetime
+import unittest
+
+class ParallelRunTestCase(unittest.TestCase):
+ def test_run_simple(self):
+ runner = ParallelRun(maxthreads = 4)
+ runner.start()
+
+ count = [0]
+
+ def inc(count):
+ count[0] += 1
+
+ for x in xrange(10):
+ runner.put(inc, count)
+
+ runner.destroy()
+
+ self.assertEquals(count[0], 10)
+
+ def test_run_interrupt(self):
+
+ def sleep():
+ import time
+ time.sleep(5)
+
+ startt = datetime.datetime.now()
+
+ runner = ParallelRun(maxthreads = 4)
+ runner.start()
+
+ for x in xrange(100):
+ runner.put(sleep)
+
+ runner.empty()
+ runner.destroy()
+
+ endt = datetime.datetime.now()
+ time_elapsed = (endt - startt).seconds
+ self.assertTrue( time_elapsed < 500)
+
+ def test_run_error(self):
+ count = [0]
+
+ def inc(count):
+ count[0] += 1
+
+ def error():
+ raise RuntimeError()
+
+ runner = ParallelRun(maxthreads = 4)
+ runner.start()
+
+ for x in xrange(4):
+ runner.put(inc, count)
+
+ runner.put(error)
+
+ runner.destroy()
+
+ self.assertEquals(count[0], 4)
+
+ self.assertRaises(RuntimeError, runner.sync)
+
+if __name__ == '__main__':
+ unittest.main()
+