import os
import pkgutil
import sys
+import threading
import weakref
reschedule_delay = "1s"
def clsinit(cls):
""" Initializes template information (i.e. attributes and traces)
- for the ResourceManager class
- """
+ on classes derived from the ResourceManager class.
+
+ It is used as a decorator in the class declaration as follows:
+
+ @clsinit
+ class MyResourceManager(ResourceManager):
+
+ ...
+
+ """
+
cls._clsinit()
return cls
def clsinit_copy(cls):
""" Initializes template information (i.e. attributes and traces)
- for the ResourceManager class, inheriting attributes and traces
- from the parent class
+ on classes direved from the ResourceManager class.
+ It differs from the clsinit method in that it forces inheritance
+ of attributes and traces from the parent class.
+
+ It is used as a decorator in the class declaration as follows:
+
+ @clsinit
+ class MyResourceManager(ResourceManager):
+
+ ...
+
+
+ clsinit_copy should be prefered to clsinit when creating new
+ ResourceManager child classes.
+
"""
+
cls._clsinit_copy()
return cls
def failtrap(func):
+ """ Decorator function for instance methods that should set the
+ RM state to FAILED when an error is raised. The methods that must be
+ decorated are: discover, provision, deploy, start, stop and finish.
+
+ """
def wrapped(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
return wrapped
-# Decorator to invoke class initialization method
@clsinit
class ResourceManager(Logger):
""" Base clase for all ResourceManagers.
resource attribute
"""
+
cls._attributes[attr.name] = attr
@classmethod
resource attribute
"""
+
del cls._attributes[name]
@classmethod
resource trace
"""
+
cls._traces[trace.name] = trace
@classmethod
resource trace
"""
+
del cls._traces[name]
@classmethod
def _register_attributes(cls):
""" Resource subclasses will invoke this method to register
- resource attributes
+ resource attributes.
+
+ This method should be overriden in the RMs that define
+ attributes.
"""
- critical = Attribute("critical", "Defines whether the resource is critical. "
- " A failure on a critical resource will interrupt the experiment. ",
+
+ critical = Attribute("critical",
+ "Defines whether the resource is critical. "
+ "A failure on a critical resource will interrupt "
+ "the experiment. ",
type = Types.Bool,
default = True,
flags = Flags.ExecReadOnly)
""" Resource subclasses will invoke this method to register
resource traces
+ This method should be overriden in the RMs that define traces.
+
"""
+
pass
@classmethod
def _clsinit(cls):
- """ ResourceManager child classes have different attributes and traces.
- Since the templates that hold the information of attributes and traces
- are 'class attribute' dictionaries, initially they all point to the
- parent class ResourceManager instances of those dictionaries.
- In order to make these templates independent from the parent's one,
- it is necessary re-initialize the corresponding dictionaries.
- This is the objective of the _clsinit method
+ """ ResourceManager classes have different attributes and traces.
+ Attribute and traces are stored in 'class attribute' dictionaries.
+ When a new ResourceManager class is created, the _clsinit method is
+ called to create a new instance of those dictionaries and initialize
+ them.
+
+ The _clsinit method is called by the clsinit decorator method.
+
"""
+
# static template for resource attributes
cls._attributes = dict()
cls._register_attributes()
@classmethod
def _clsinit_copy(cls):
- """ Same as _clsinit, except that it also inherits all attributes and traces
- from the parent class.
+ """ Same as _clsinit, except that after creating new instances of the
+ dictionaries it copies all the attributes and traces from the parent
+ class.
+
+ The _clsinit_copy method is called by the clsinit_copy decorator method.
+
"""
# static template for resource attributes
cls._attributes = copy.deepcopy(cls._attributes)
self._state = ResourceState.NEW
+ # instance lock to synchronize exclusive state change methods (such
+ # as deploy and release methods), in order to prevent them from being
+ # executed at the same time
+ self._release_lock = threading.Lock()
+
@property
def guid(self):
""" Returns the global unique identifier of the RM """
@property
def ec(self):
- """ Returns the Experiment Controller """
+ """ Returns the Experiment Controller of the RM """
return self._ec()
@property
def connections(self):
- """ Returns the set of guids of connected RMs"""
+ """ Returns the set of guids of connected RMs """
return self._connections
@property
def conditions(self):
""" Returns the conditions to which the RM is subjected to.
- The object returned by this method is a dictionary indexed by
- ResourceAction."""
+ This method returns a dictionary of conditions lists indexed by
+ a ResourceAction.
+
+ """
return self._conditions
@property
def start_time(self):
- """ Returns the start time of the RM as a timestamp"""
+ """ Returns the start time of the RM as a timestamp """
return self._start_time
@property
def stop_time(self):
- """ Returns the stop time of the RM as a timestamp"""
+ """ Returns the stop time of the RM as a timestamp """
return self._stop_time
@property
def discover_time(self):
- """ Returns the time discovering was finished for the RM as a timestamp"""
+ """ Returns the discover time of the RM as a timestamp """
return self._discover_time
@property
def provision_time(self):
- """ Returns the time provisioning was finished for the RM as a timestamp"""
+ """ Returns the provision time of the RM as a timestamp """
return self._provision_time
@property
def ready_time(self):
- """ Returns the time deployment was finished for the RM as a timestamp"""
+ """ Returns the deployment time of the RM as a timestamp """
return self._ready_time
@property
def release_time(self):
- """ Returns the release time of the RM as a timestamp"""
+ """ Returns the release time of the RM as a timestamp """
return self._release_time
@property
def finish_time(self):
- """ Returns the finalization time of the RM as a timestamp"""
+ """ Returns the finalization time of the RM as a timestamp """
return self._finish_time
@property
def failed_time(self):
- """ Returns the time failure occured for the RM as a timestamp"""
+ """ Returns the time failure occured for the RM as a timestamp """
return self._failed_time
@property
:param msg: text message
:type msg: str
:rtype: str
+
"""
return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
def register_connection(self, guid):
""" Registers a connection to the RM identified by guid
+ This method should not be overriden. Specific functionality
+ should be added in the do_connect method.
+
:param guid: Global unique identified of the RM to connect to
:type guid: int
+
"""
if self.valid_connection(guid):
- self.connect(guid)
+ self.do_connect(guid)
self._connections.add(guid)
def unregister_connection(self, guid):
""" Removes a registered connection to the RM identified by guid
+
+ This method should not be overriden. Specific functionality
+ should be added in the do_disconnect method.
:param guid: Global unique identified of the RM to connect to
:type guid: int
+
"""
if guid in self._connections:
- self.disconnect(guid)
+ self.do_disconnect(guid)
self._connections.remove(guid)
+ @failtrap
def discover(self):
""" Performs resource discovery.
-
+
This method is responsible for selecting an individual resource
matching user requirements.
- This method should be redefined when necessary in child classes.
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_discover method.
"""
- self.set_discovered()
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_discover()
+ @failtrap
def provision(self):
""" Performs resource provisioning.
This method is responsible for provisioning one resource.
After this method has been successfully invoked, the resource
should be accessible/controllable by the RM.
- This method should be redefined when necessary in child classes.
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_provision method.
"""
- self.set_provisioned()
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_provision()
+ @failtrap
def start(self):
- """ Starts the RM.
-
- There is no generic start behavior for all resources.
- This method should be redefined when necessary in child classes.
+ """ Starts the RM (e.g. launch remote process).
+
+ There is no standard start behavior. Some RMs will not need to perform
+ any actions upon start.
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_start method.
"""
if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
self.error("Wrong state %s for start" % self.state)
return
- self.set_started()
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_start()
+ @failtrap
def stop(self):
""" Interrupts the RM, stopping any tasks the RM was performing.
-
- There is no generic stop behavior for all resources.
- This method should be redefined when necessary in child classes.
-
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
-
+
+ There is no standard stop behavior. Some RMs will not need to perform
+ any actions upon stop.
+
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_stop method.
+
"""
if not self.state in [ResourceState.STARTED]:
self.error("Wrong state %s for stop" % self.state)
return
- self.set_stopped()
+ with self._release_lock:
+ self.do_stop()
+ @failtrap
def deploy(self):
""" Execute all steps required for the RM to reach the state READY.
- This method is responsible for deploying the resource (and invoking the
- discover and provision methods).
- This method should be redefined when necessary in child classes.
-
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
-
+ This method is responsible for deploying the resource (and invoking
+ the discover and provision methods).
+
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_deploy method.
+
"""
if self.state > ResourceState.READY:
self.error("Wrong state %s for deploy" % self.state)
return
- self.debug("----- READY ---- ")
- self.set_ready()
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_deploy()
+ self.debug("----- READY ---- ")
def release(self):
""" Perform actions to free resources used by the RM.
-
+
This method is responsible for releasing resources that were
used during the experiment by the RM.
- This method should be redefined when necessary in child classes.
-
- If overridden in child classes, this method should never
- raise an error and it must ensure the RM is set to state RELEASED.
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_release method.
+
"""
- self.set_released()
-
+ with self._release_lock:
+ try:
+ self.do_release()
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
+
+ self.set_released()
+ self.debug("----- RELEASED ---- ")
+
+ @failtrap
def finish(self):
""" Sets the RM to state FINISHED.
-
- The FINISHED state is different from STOPPED in that it should not be
- directly invoked by the user.
+
+ The FINISHED state is different from STOPPED state in that it
+ should not be directly invoked by the user.
STOPPED indicates that the user interrupted the RM, FINISHED means
that the RM concluded normally the actions it was supposed to perform.
- This method should be redefined when necessary in child classes.
+
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_finish method.
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
-
"""
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_finish()
- self.set_finished()
-
def fail(self):
""" Sets the RM to state FAILED.
- """
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_fail method.
- self.set_failed()
+ """
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_fail()
def set(self, name, value):
""" Set the value of the attribute
self.debug("----- STARTING ---- ")
self.deploy()
- def connect(self, guid):
+ def do_connect(self, guid):
""" Performs actions that need to be taken upon associating RMs.
This method should be redefined when necessary in child classes.
"""
pass
- def disconnect(self, guid):
+ def do_disconnect(self, guid):
""" Performs actions that need to be taken upon disassociating RMs.
This method should be redefined when necessary in child classes.
"""
"""
# TODO: Validate!
return True
-
+
+ def do_discover(self):
+ self.set_discovered()
+
+ def do_provision(self):
+ self.set_provisioned()
+
+ def do_start(self):
+ self.set_started()
+
+ def do_stop(self):
+ self.set_stopped()
+
+ def do_deploy(self):
+ self.set_ready()
+
+ def do_release(self):
+ pass
+
+ def do_finish(self):
+ self.set_finished()
+
+ def do_fail(self):
+ self.set_failed()
+
def set_started(self):
""" Mark ResourceManager as STARTED """
self.set_state(ResourceState.STARTED, "_start_time")
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, ResourceAction, failtrap
+ ResourceState, ResourceAction
from nepi.util.sshfuncs import ProcStatus
import os
def store_path(self):
return self._store_path
- @failtrap
- def provision(self):
+ def do_provision(self):
trace_name = self.get("traceName")
if not trace_name:
self.fail()
except OSError:
pass
- super(Collector, self).provision()
+ super(Collector, self).do_provision()
- @failtrap
- def deploy(self):
- self.discover()
- self.provision()
+ def do_deploy(self):
+ self.do_discover()
+ self.do_provision()
- super(Collector, self).deploy()
+ super(Collector, self).do_deploy()
- def release(self):
- try:
- trace_name = self.get("traceName")
- rename = self.get("rename") or trace_name
-
- msg = "Collecting '%s' traces to local directory %s" % (
- trace_name, self.store_path)
- self.info(msg)
-
- rms = self.get_connected()
- for rm in rms:
- result = self.ec.trace(rm.guid, trace_name)
- fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
- rename))
- try:
- f = open(fpath, "w")
- f.write(result)
- f.close()
- except:
- msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name,
- rm.guid, fpath)
- self.error(msg)
- continue
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(Collector, self).release()
+ def do_release(self):
+ trace_name = self.get("traceName")
+ rename = self.get("rename") or trace_name
+
+ msg = "Collecting '%s' traces to local directory %s" % (
+ trace_name, self.store_path)
+ self.info(msg)
+
+ rms = self.get_connected()
+ for rm in rms:
+ result = self.ec.trace(rm.guid, trace_name)
+ fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
+ rename))
+ try:
+ f = open(fpath, "w")
+ f.write(result)
+ f.close()
+ except:
+ msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name,
+ rm.guid, fpath)
+ self.error(msg)
+ continue
+
+ super(Collector, self).do_release()
def valid_connection(self, guid):
# TODO: Validate!
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
return out
- @failtrap
- def provision(self):
+ def do_provision(self):
# create run dir for application
self.node.mkdir(self.run_home)
self.info("Provisioning finished")
- super(LinuxApplication, self).provision()
+ super(LinuxApplication, self).do_provision()
def upload_start_command(self):
# Upload command to remote bash script
# replace application specific paths in the command
return self.replace_paths(install)
- @failtrap
- def deploy(self):
+ def do_deploy(self):
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
else:
command = self.get("command") or ""
self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
- super(LinuxApplication, self).deploy()
+ super(LinuxApplication, self).do_deploy()
- @failtrap
- def start(self):
+ def do_start(self):
command = self.get("command")
self.info("Starting command '%s'" % command)
if not command:
# If no command was given (i.e. Application was used for dependency
# installation), then the application is directly marked as FINISHED
- self.set_finished()
+ super(LinuxApplication, self).do_finished()
else:
if self.in_foreground:
self._run_in_foreground()
else:
self._run_in_background()
- super(LinuxApplication, self).start()
+ super(LinuxApplication, self).do_start()
def _run_in_foreground(self):
command = self.get("command")
self.error(msg, out, err)
raise RuntimeError, msg
- @failtrap
- def stop(self):
+ def do_stop(self):
""" Stops application execution
"""
command = self.get('command') or ''
msg = " Failed to STOP command '%s' " % self.get("command")
self.error(msg, out, err)
- super(LinuxApplication, self).stop()
+ super(LinuxApplication, self).do_stop()
- def release(self):
+ def do_release(self):
self.info("Releasing resource")
- try:
- tear_down = self.get("tearDown")
- if tear_down:
- self.node.execute(tear_down)
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.node.execute(tear_down)
- self.stop()
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
+ self.do_stop()
- super(LinuxApplication, self).release()
+ super(LinuxApplication, self).do_release()
@property
def state(self):
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay, failtrap
+ reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow, tdiffsec
if self.ccnd: return self.ccnd.node
return None
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.ccnd or self.ccnd.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
self.ec.schedule(reschedule_delay, self.deploy)
if not self.get("env"):
self.set("env", self._environment)
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.debug("----- READY ---- ")
self.set_ready()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction, reschedule_delay, failtrap
+ ResourceAction, reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnr import LinuxCCNR
from nepi.util.timefuncs import tnow
if self.ccnr: return self.ccnr.node
return None
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.ccnr or self.ccnr.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.debug("----- READY ---- ")
self.set_ready()
self.error(msg, out, err)
raise RuntimeError, msg
- @failtrap
- def start(self):
+ def do_start(self):
if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.node import OSType
from nepi.util.timefuncs import tnow, tdiffsec
def path(self):
return "PATH=$PATH:${BIN}/%s/" % self.version
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.node or self.node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.debug("----- READY ---- ")
self.set_ready()
env = env,
raise_on_error = True)
- @failtrap
- def start(self):
+ def do_start(self):
if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
self.error(msg, out, err)
raise RuntimeError, msg
- @failtrap
- def stop(self):
+ def do_stop(self):
command = self.get('command') or ''
if self.state == ResourceState.STARTED:
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.resources.linux.ccn.ccnpingserver import LinuxCCNPingServer
from nepi.util.timefuncs import tnow, tdiffsec
if ccnpingserver: return ccnpingserver[0]
return None
- @failtrap
- def start(self):
+ def do_start(self):
if not self.ccnpingserver or \
self.ccnpingserver.state < ResourceState.STARTED:
self.debug("---- RESCHEDULING START---- ccnpingserver state %s " % \
self.ccnpingserver.state )
self.ec.schedule(reschedule_delay, self.start)
else:
- super(LinuxCCNPing, self).start()
+ super(LinuxCCNPing, self).do_start()
@property
def _start_command(self):
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.util.timefuncs import tnow, tdiffsec
super(LinuxCCNPingServer, self).__init__(ec, guid)
self._home = "ccnping-serv-%s" % self.guid
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
if not self.get("install"):
self.set("install", self._install)
- super(LinuxCCNPingServer, self).deploy()
+ super(LinuxCCNPingServer, self).do_deploy()
@property
def _start_command(self):
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction, reschedule_delay, failtrap
+ ResourceAction, reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow
if self.ccnd: return self.ccnd.node
return None
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.ccnd or self.ccnd.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.debug("----- READY ---- ")
self.set_ready()
env = env,
raise_on_error = True)
- @failtrap
- def start(self):
+ def do_start(self):
if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction, reschedule_delay, failtrap
+ ResourceAction, reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow
return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
- @failtrap
- def deploy(self):
+ def do_deploy(self):
# Wait until associated ccnd is provisioned
if not self.ccnd or self.ccnd.state < ResourceState.READY:
# ccnr needs to wait until ccnd is deployed and running
self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.configure()
self.debug("----- READY ---- ")
# schedule mtr deploy
self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
- @failtrap
- def start(self):
+ def do_start(self):
if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
self.error(msg, out, err)
raise RuntimeError, msg
- @failtrap
- def stop(self):
+ def do_stop(self):
command = self.get('command')
env = self.get('env')
from nepi.execution.attribute import Attribute, Types, Flags
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.resources.linux.channel import LinuxChannel
if chan: return chan[0]
return None
- @failtrap
- def discover(self):
+ def do_discover(self):
devname = self.get("deviceName")
ip4 = self.get("ip4")
ip6 = self.get("ip4")
self.error(msg)
raise RuntimeError, msg
- super(LinuxInterface, self).discover()
+ super(LinuxInterface, self).do_discover()
- @failtrap
- def provision(self):
+ def do_provision(self):
devname = self.get("deviceName")
ip4 = self.get("ip4")
ip6 = self.get("ip4")
self.error(msg, out, err)
raise RuntimeError, "%s - %s - %s" % (msg, out, err)
- super(LinuxInterface, self).provision()
+ super(LinuxInterface, self).do_provision()
- @failtrap
- def deploy(self):
+ def do_deploy(self):
# Wait until node is provisioned
node = self.node
chan = self.channel
else:
# Verify if the interface exists in node. If not, configue
# if yes, load existing configuration
- self.discover()
- self.provision()
-
- super(LinuxInterface, self).deploy()
-
- def release(self):
- try:
- tear_down = self.get("tearDown")
- if tear_down:
- self.execute(tear_down)
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(LinuxInterface, self).release()
+ self.do_discover()
+ self.do_provision()
+
+ super(LinuxInterface, self).do_deploy()
+
+ def do_release(self):
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.execute(tear_down)
+
+ super(LinuxInterface, self).do_release()
def valid_connection(self, guid):
# TODO: Validate!
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy, failtrap
+from nepi.execution.resource import clsinit_copy
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
self._home = "mtr-%s" % self.guid
self._sudo_kill = True
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
if not self.get("depends"):
self.set("depends", "mtr")
- super(LinuxMtr, self).deploy()
+ super(LinuxMtr, self).do_deploy()
@property
def _start_command(self):
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.resources.linux import rpmfuncs, debfuncs
from nepi.util import sshfuncs, execfuncs
from nepi.util.sshfuncs import ProcStatus
def localhost(self):
return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
- @failtrap
- def provision(self):
+ def do_provision(self):
# check if host is alive
if not self.is_alive():
msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
# Create experiment node home directory
self.mkdir(self.node_home)
- super(LinuxNode, self).provision()
+ super(LinuxNode, self).do_provision()
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if self.state == ResourceState.NEW:
self.info("Deploying node")
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
# Node needs to wait until all associated interfaces are
# ready before it can finalize deployment
self.ec.schedule(reschedule_delay, self.deploy)
return
- super(LinuxNode, self).deploy()
+ super(LinuxNode, self).do_deploy()
- def release(self):
- try:
- rms = self.get_connected()
- for rm in rms:
- # Node needs to wait until all associated RMs are released
- # before it can be released
- if rm.state < ResourceState.STOPPED:
- self.ec.schedule(reschedule_delay, self.release)
- return
+ def do_release(self):
+ rms = self.get_connected()
+ for rm in rms:
+ # Node needs to wait until all associated RMs are released
+ # before it can be released
+ if rm.state != ResourceState.RELEASED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
- tear_down = self.get("tearDown")
- if tear_down:
- self.execute(tear_down)
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.execute(tear_down)
- self.clean_processes()
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
+ self.clean_processes()
- super(LinuxNode, self).release()
+ super(LinuxNode, self).do_release()
def valid_connection(self, guid):
# TODO: Validate!
"sudo -S killall -u %s || /bin/true ; " % self.get("username"))
out = err = ""
- (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
-
+ (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
+
def clean_home(self):
""" Cleans all NEPI related folders in the Linux host
"""
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy, failtrap
+from nepi.execution.resource import clsinit_copy
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
self._home = "nping-%s" % self.guid
self._sudo_kill = True
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
if not self.get("depends"):
self.set("depends", "nmap")
- super(LinuxNPing, self).deploy()
+ super(LinuxNPing, self).do_deploy()
@property
def _start_command(self):
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy, failtrap
+from nepi.execution.resource import clsinit_copy
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
super(LinuxPing, self).__init__(ec, guid)
self._home = "ping-%s" % self.guid
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
- super(LinuxPing, self).deploy()
+ super(LinuxPing, self).do_deploy()
@property
def _start_command(self):
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy, failtrap
+from nepi.execution.resource import clsinit_copy
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
self._home = "tcpdump-%s" % self.guid
self._sudo_kill = True
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
if not self.get("depends"):
self.set("depends", "tcpdump")
- super(LinuxTcpdump, self).deploy()
+ super(LinuxTcpdump, self).do_deploy()
@property
def _start_command(self):
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay, failtrap
+ reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
super(LinuxUdpTest, self).__init__(ec, guid)
self._home = "udptest-%s" % self.guid
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
if not self.get("depends"):
self.set("depends", self._depends)
- super(LinuxUdpTest, self).deploy()
+ super(LinuxUdpTest, self).do_deploy()
def upload_start_command(self):
-
super(LinuxUdpTest, self).upload_start_command()
if self.get("s") == True:
# finished to continue )
self._run_in_background()
- @failtrap
- def start(self):
+ def do_start(self):
if self.get("s") == True:
# Server is already running
if self.state == ResourceState.READY:
self.error(msg, out, err)
raise RuntimeError, err
else:
- super(LinuxUdpTest, self).start()
+ super(LinuxUdpTest, self).do_start()
@property
def _start_command(self):
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay, failtrap
+ reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
port = self.wait_local_port(endpoint)
return (port, pid, ppid)
- @failtrap
- def provision(self):
+ def do_provision(self):
# create run dir for tunnel on each node
self.endpoint1.node.mkdir(self.run_home(self.endpoint1))
self.endpoint2.node.mkdir(self.run_home(self.endpoint2))
self.set_provisioned()
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
(not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
self.ec.schedule(reschedule_delay, self.deploy)
else:
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.debug("----- READY ---- ")
self.set_ready()
- @failtrap
- def start(self):
+ def do_start(self):
if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
self.error(msg, out, err)
raise RuntimeError, msg
- @failtrap
- def stop(self):
+ def do_stop(self):
""" Stops application execution
"""
if self.state == ResourceState.STARTED:
# Julien Tribino <julien.tribino@inria.fr>
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
from nepi.resources.omf.node import OMFNode
return True
- @failtrap
- def deploy(self):
+ def do_deploy(self):
""" Deploy the RM. It means nothing special for an application
for now (later it will be upload sources, ...)
It becomes DEPLOYED after getting the xmpp client.
self.error(msg)
raise RuntimeError, msg
- super(OMFApplication, self).deploy()
+ super(OMFApplication, self).do_deploy()
- @failtrap
- def start(self):
+ def do_start(self):
""" Start the RM. It means : Send Xmpp Message Using OMF protocol
to execute the application.
It becomes STARTED before the messages are sent (for coordination)
self.error(msg)
raise
- super(OMFApplication, self).start()
+ super(OMFApplication, self).do_start()
- @failtrap
- def stop(self):
+ def do_stop(self):
""" Stop the RM. It means : Send Xmpp Message Using OMF protocol to
kill the application.
State is set to STOPPED after the message is sent.
self.error(msg)
raise
- super(OMFApplication, self).stop()
- self.set_finished()
+ super(OMFApplication, self).do_stop()
- def release(self):
+ def do_release(self):
""" Clean the RM at the end of the experiment and release the API.
"""
- try:
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(OMFApplication, self).release()
+ if self._omf_api:
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+
+ super(OMFApplication, self).do_release()
# Julien Tribino <julien.tribino@inria.fr>
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
res.append(couple)
return res
- @failtrap
- def deploy(self):
+ def do_deploy(self):
""" Deploy the RM. It means : Get the xmpp client and send messages
using OMF 5.4 protocol to configure the channel.
It becomes DEPLOYED after sending messages to configure the channel
self.error(msg)
raise
- super(OMFChannel, self).deploy()
+ super(OMFChannel, self).do_deploy()
- def release(self):
+ def do_release(self):
""" Clean the RM at the end of the experiment and release the API
"""
- try:
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(OMFChannel, self).release()
+ if self._omf_api :
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+
+ super(OMFChannel, self).do_release()
# Julien Tribino <julien.tribino@inria.fr>
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.node import OMFNode
return True
- @failtrap
- def deploy(self):
+ def do_deploy(self):
""" Deploy the RM. It means : Get the xmpp client and send messages
using OMF 5.4 protocol to configure the interface.
It becomes DEPLOYED after sending messages to configure the interface
if self.configure_iface():
self.configure_ip()
- super(OMFWifiInterface, self).deploy()
+ super(OMFWifiInterface, self).do_deploy()
- def release(self):
+ def do_release(self):
""" Clean the RM at the end of the experiment and release the API
"""
- try:
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(OMFWifiInterface, self).release()
+ if self._omf_api:
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+
+ super(OMFWifiInterface, self).do_release()
# Julien Tribino <julien.tribino@inria.fr>
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
from nepi.resources.omf.omf_api import OMFAPIFactory
return False
- @failtrap
- def deploy(self):
+ def do_deploy(self):
""" Deploy the RM. It means : Send Xmpp Message Using OMF protocol
to enroll the node into the experiment.
It becomes DEPLOYED after sending messages to enroll the node
self.error(msg)
raise
- super(OMFNode, self).deploy()
+ super(OMFNode, self).do_deploy()
- def release(self):
- """Clean the RM at the end of the experiment
+ def do_release(self):
+ """ Clean the RM at the end of the experiment
"""
- try:
- if self._omf_api :
- self._omf_api.release(self.get('hostname'))
-
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(OMFNode, self).release()
+ if self._omf_api:
+ self._omf_api.release(self.get('hostname'))
+
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+
+ super(OMFNode, self).do_release()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.resources.planetlab.plcapi import PLCAPIFactory
from nepi.util.execfuncs import lexec
"associated to a PlanetLab user account"
_backend = "planetlab"
+ ## XXX A.Q. This lock could use a more descriptive name and
+ # an explanatory comment
lock = threading.Lock()
@classmethod
return self._plapi
- def discover(self):
+ def do_discover(self):
"""
Based on the attributes defined by the user, discover the suitable nodes
"""
else:
self._put_node_in_provision(node_id)
self._node_to_provision = node_id
- super(PlanetlabNode, self).discover()
+ super(PlanetlabNode, self).do_discover()
else:
self.fail_node_not_available(hostname)
if node_id:
self._node_to_provision = node_id
- super(PlanetlabNode, self).discover()
+ super(PlanetlabNode, self).do_discover()
else:
self.fail_not_enough_nodes()
- def provision(self):
+ def do_provision(self):
"""
Add node to user's slice after verifing that the node is functioning
correctly
except:
with PlanetlabNode.lock:
self._blacklist_node(node)
- self.discover()
+ self.do_discover()
continue
self._add_node_to_slice(node)
self._blacklist_node(node)
self._delete_node_from_slice(node)
self.set('hostname', None)
- self.discover()
+ self.do_discover()
continue
# check /proc directory is mounted (ssh_ok = True)
self._blacklist_node(node)
self._delete_node_from_slice(node)
self.set('hostname', None)
- self.discover()
+ self.do_discover()
continue
else:
ip = self._get_ip(node)
self.set("ip", ip)
- super(PlanetlabNode, self).provision()
+ super(PlanetlabNode, self).do_provision()
def _filter_based_on_attributes(self):
"""
Retrive the list of nodes ids that match user's constraints
"""
+
# Map user's defined attributes with tagnames of PlanetLab
timeframe = self.get("timeframe")[0]
attr_to_tags = {
return nodes_id
def _choose_random_node(self, nodes):
- """
+ """
From the possible nodes for provision, choose randomly to decrese the
probability of different RMs choosing the same node for provision
"""
return ip
def fail_discovery(self):
- self.fail()
msg = "Discovery failed. No candidates found for node"
self.error(msg)
raise RuntimeError, msg
def fail_node_not_alive(self, hostname=None):
- self.fail()
msg = "Node %s not alive" % hostname
raise RuntimeError, msg
def fail_node_not_available(self, hostname):
- self.fail()
msg = "Node %s not available for provisioning" % hostname
raise RuntimeError, msg
def fail_not_enough_nodes(self):
- self.fail()
msg = "Not enough nodes available for provisioning"
raise RuntimeError, msg
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, failtrap
+ ResourceState
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.linux.application import LinuxApplication
# TODO: Validate!
return True
- @failtrap
- def provision(self):
+ def do_provision(self):
# create home dir for ovs
self.node.mkdir(self.ovs_home)
# create dir for ovs checks
self.node.mkdir(self.ovs_checks)
+
+ super(OVSWitch, self).do_provision()
def check_sliver_ovs(self):
""" Check if sliver-ovs exists. If it does not exist, we interrupt
msg = "Command sliver-ovs exists"
self.debug(msg)
- @failtrap
- def deploy(self):
+ def do_deploy(self):
""" Wait until node is associated and deployed
"""
node = self.node
self.ec.schedule(reschedule_delay, self.deploy)
else:
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.check_sliver_ovs()
self.servers_on()
self.create_bridge()
self.assign_contr()
self.ovs_status()
- super(OVSWitch, self).deploy()
+ super(OVSWitch, self).do_deploy()
def servers_on(self):
""" Start the openvswitch servers and also checking
(out, err), proc = self.node.check_output(self.ovs_home, 'show_stdout')
self.info(out)
- def release(self):
+ def do_release(self):
""" Delete the bridge and
close the servers
"""
# Node needs to wait until all associated RMs are released
# to be released
- try:
- from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
- rm = self.get_connected(OVSPort.rtype())
-
- if rm[0].state < ResourceState.FINISHED:
- self.ec.schedule(reschedule_delay, self.release)
- return
-
- msg = "Deleting the bridge %s" % self.get('bridge_name')
- self.info(msg)
- cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
- (out, err), proc = self.node.run(cmd, self.ovs_checks,
- sudo = True)
- cmd = "sliver-ovs stop"
- (out, err), proc = self.node.run(cmd, self.ovs_checks,
- sudo = True)
+ from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
+ rm = self.get_connected(OVSPort.rtype())
+
+ if rm[0].state < ResourceState.FINISHED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
- if proc.poll():
- self.fail()
- self.error(msg, out, err)
- raise RuntimeError, msg
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
+ msg = "Deleting the bridge %s" % self.get('bridge_name')
+ self.info(msg)
+ cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
+ (out, err), proc = self.node.run(cmd, self.ovs_checks,
+ sudo = True)
+ cmd = "sliver-ovs stop"
+ (out, err), proc = self.node.run(cmd, self.ovs_checks,
+ sudo = True)
+
+ if proc.poll():
+ self.error(msg, out, err)
+ raise RuntimeError, msg
- super(OVSWitch, self).release()
+ super(OVSWitch, self).do_release()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, failtrap
+ ResourceState
from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.linux.application import LinuxApplication
command = self.replace_paths(command)
return command
- @failtrap
- def deploy(self):
+ def do_deploy(self):
""" Wait until ovswitch is started
"""
ovswitch = self.ovswitch
self.ec.schedule(reschedule_delay, self.deploy)
else:
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.get_host_ip()
self.create_port()
self.get_local_end()
self.ovswitch.ovs_status()
- super(OVSPort, self).deploy()
- def release(self):
+ super(OVSPort, self).do_deploy()
+
+ def do_release(self):
""" Release the port RM means delete the ports
"""
# OVS needs to wait until all associated RMs are released
# to be released
- try:
- from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
- rm = self.get_connected(Tunnel.rtype())
- if rm and rm[0].state < ResourceState.FINISHED:
- self.ec.schedule(reschedule_delay, self.release)
- return
-
- msg = "Deleting the port %s" % self.get('port_name')
- self.info(msg)
- cmd = "sliver-ovs del_port %s" % self.get('port_name')
- (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
- sudo = True)
-
- if proc.poll():
- self.fail()
- self.error(msg, out, err)
- raise RuntimeError, msg
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(OVSPort, self).release()
+ from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
+ rm = self.get_connected(Tunnel.rtype())
+ if rm and rm[0].state < ResourceState.FINISHED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+
+ msg = "Deleting the port %s" % self.get('port_name')
+ self.info(msg)
+ cmd = "sliver-ovs del_port %s" % self.get('port_name')
+ (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
+ sudo = True)
+
+ if proc.poll():
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
+ super(OVSPort, self).do_release()
+
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, failtrap
+ ResourceState
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
msg = "Failed to connect endpoints"
if proc.poll():
- self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
(out, err), proc = self.node.check_errors(self.run_home(self.node))
# Out is what was written in the stderr file
if err:
- self.fail()
msg = " Failed to start command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
msg = "Failed to connect endpoints"
if proc.poll():
- self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
else:
msg = "Failed to connect endpoints"
if proc.poll():
- self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
else:
self.info(msg)
return
- @failtrap
- def provision(self):
+ def do_provision(self):
""" Provision the tunnel
"""
# Create folders
(self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
- super(OVSTunnel, self).provision()
+ super(OVSTunnel, self).do_provision()
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
(not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
self.ec.schedule(reschedule_delay, self.deploy)
else:
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
- super(OVSTunnel, self).deploy()
+ super(OVSTunnel, self).do_deploy()
- def release(self):
+ def do_release(self):
""" Release the udp_tunnel on endpoint2.
On endpoint1 means nothing special.
"""
- try:
- if not self.check_endpoints():
- # Kill the TAP devices
- # TODO: Make more generic Release method of PLTAP
- if self._pid and self._ppid:
- self._nodes = self.get_node(self.endpoint2)
- (out, err), proc = self.node.kill(self._pid,
- self._ppid, sudo = True)
- if err or proc.poll():
- # check if execution errors occurred
- msg = " Failed to delete TAP device"
- self.error(msg, err, err)
- self.fail()
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(OVSTunnel, self).release()
-
+ if not self.check_endpoints():
+ # Kill the TAP devices
+ # TODO: Make more generic Release method of PLTAP
+ if self._pid and self._ppid:
+ self._nodes = self.get_node(self.endpoint2)
+ (out, err), proc = self.node.kill(self._pid,
+ self._ppid, sudo = True)
+ if err or proc.poll():
+ # check if execution errors occurred
+ msg = " Failed to delete TAP device"
+ self.error(msg, err, err)
+
+ super(OVSTunnel, self).do_release()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay, failtrap
+ reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.util.timefuncs import tnow, tdiffsec
if_name = self.wait_if_name()
self.set("deviceName", if_name)
- @failtrap
- def deploy(self):
+ def do_deploy(self):
if not self.node or self.node.state < ResourceState.PROVISIONED:
self.ec.schedule(reschedule_delay, self.deploy)
else:
if not self.get("install"):
self.set("install", self._install)
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.debug("----- READY ---- ")
self.set_ready()
- @failtrap
- def start(self):
+ def do_start(self):
if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
self.error(msg, out, err)
raise RuntimeError, msg
- @failtrap
- def stop(self):
+ def do_stop(self):
command = self.get('command') or ''
if self.state == ResourceState.STARTED:
return self._state
- def release(self):
+ def do_release(self):
# Node needs to wait until all associated RMs are released
# to be released
- try:
- from nepi.resources.linux.udptunnel import UdpTunnel
- rms = self.get_connected(UdpTunnel.rtype())
- for rm in rms:
- if rm.state < ResourceState.STOPPED:
- self.ec.schedule(reschedule_delay, self.release)
- return
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(PlanetlabTap, self).release()
+ from nepi.resources.linux.udptunnel import UdpTunnel
+ rms = self.get_connected(UdpTunnel.rtype())
+ for rm in rms:
+ if rm.state < ResourceState.STOPPED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+
+ super(PlanetlabTap, self).do_release()
def wait_if_name(self):
""" Waits until the if_name file for the command is generated,
from nepi.execution.attribute import Attribute
from nepi.execution.ec import ExperimentController, FailureLevel
from nepi.execution.resource import ResourceManager, ResourceState, \
- clsinit_copy, ResourceAction, failtrap
+ clsinit_copy, ResourceAction
import random
import time
def __init__(self, ec, guid):
super(Channel, self).__init__(ec, guid)
- def deploy(self):
+ def do_deploy(self):
time.sleep(1)
- super(Channel, self).deploy()
+ super(Channel, self).do_deploy()
self.logger.debug(" -------- DEPLOYED ------- ")
class Interface(ResourceManager):
def __init__(self, ec, guid):
super(Interface, self).__init__(ec, guid)
- def deploy(self):
+ def do_deploy(self):
node = self.get_connected(Node.rtype())[0]
chan = self.get_connected(Channel.rtype())[0]
self.ec.schedule("0.5s", self.deploy)
else:
time.sleep(2)
- super(Interface, self).deploy()
+ super(Interface, self).do_deploy()
self.logger.debug(" -------- DEPLOYED ------- ")
class Node(ResourceManager):
def __init__(self, ec, guid):
super(Node, self).__init__(ec, guid)
- def deploy(self):
+ def do_deploy(self):
if self.state == ResourceState.NEW:
- self.discover()
- self.provision()
+ self.do_discover()
+ self.do_provision()
self.logger.debug(" -------- PROVISIONED ------- ")
self.ec.schedule("1s", self.deploy)
elif self.state == ResourceState.PROVISIONED:
self.ec.schedule("0.5s", self.deploy)
return
- super(Node, self).deploy()
+ super(Node, self).do_deploy()
self.logger.debug(" -------- DEPLOYED ------- ")
class Application(ResourceManager):
def __init__(self, ec, guid):
super(Application, self).__init__(ec, guid)
- def deploy(self):
+ def do_deploy(self):
node = self.get_connected(Node.rtype())[0]
if node.state < ResourceState.READY:
self.ec.schedule("0.5s", self.deploy)
else:
time.sleep(random.random() * 2)
- super(Application, self).deploy()
+ super(Application, self).do_deploy()
self.logger.debug(" -------- DEPLOYED ------- ")
- def start(self):
- super(Application, self).start()
+ def do_start(self):
+ super(Application, self).do_start()
time.sleep(random.random() * 3)
- self._state = ResourceState.FINISHED
+ self.ec.schedule("0.5s", self.finish)
class ErrorApplication(ResourceManager):
_rtype = "ErrorApplication"
def __init__(self, ec, guid):
super(ErrorApplication, self).__init__(ec, guid)
- @failtrap
- def deploy(self):
+ def do_deploy(self):
node = self.get_connected(Node.rtype())[0]
if node.state < ResourceState.READY:
self.ec.schedule("0.5s", self.deploy)
self.assertTrue(rmnode1.ready_time < rmapp1.ready_time)
self.assertTrue(rmnode2.ready_time < rmapp2.ready_time)
- # - Node needs to wait until Interface is ready to be ready
+ # - Node needs to wait until Interface is ready to be ready
self.assertTrue(rmnode1.ready_time > rmiface1.ready_time)
self.assertTrue(rmnode2.ready_time > rmiface2.ready_time)
- # - Interface needs to wait until Node is provisioned to be ready
+ # - Interface needs to wait until Node is provisioned to be ready
self.assertTrue(rmnode1.provision_time < rmiface1.ready_time)
self.assertTrue(rmnode2.provision_time < rmiface2.ready_time)
- # - Interface needs to wait until Channel is ready to be ready
+ # - Interface needs to wait until Channel is ready to be ready
self.assertTrue(rmchan.ready_time < rmiface1.ready_time)
self.assertTrue(rmchan.ready_time < rmiface2.ready_time)
app = ec.register_resource("ErrorApplication")
ec.register_connection(app, node)
- apps.append(app)
ec.deploy()
import unittest
import multiprocessing
-class DummyEC(ExperimentController):
- pass
-
-def create_node(ec, username, pl_user, pl_password, hostname=None, country=None,
- operatingSystem=None, minBandwidth=None, minCpu=None):
+def create_node(ec, username, pl_user, pl_password, hostname = None,
+ country = None, operatingSystem = None, minBandwidth = None,
+ minCpu = None):
node = ec.register_resource("PlanetlabNode")
return node
class PLNodeFactoryTestCase(unittest.TestCase):
-
def test_creation_phase(self):
self.assertEquals(PlanetlabNode.rtype(), "PlanetlabNode")
self.assertEquals(len(PlanetlabNode._attributes), 29)
"""
def setUp(self):
- self.ec = DummyEC()
+ self.ec = ExperimentController()
self.username = "inria_sfatest"
self.pl_user = os.environ.get("PL_USER")
self.pl_password = os.environ.get("PL_PASS")