From: Alina Quereilhac Date: Wed, 30 Oct 2013 14:42:15 +0000 (+0100) Subject: Fixing RM.DEPLOY being executed after/during RM.RELEASE by adding a release_lock... X-Git-Tag: nepi-3.0.0~24 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=09ac796bac9aa2c41c5ad830f404fe128fffb22d;p=nepi.git Fixing RM.DEPLOY being executed after/during RM.RELEASE by adding a release_lock in the ResourceManager class --- diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index b4994e02..ed16e088 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -28,6 +28,7 @@ import logging import os import pkgutil import sys +import threading import weakref reschedule_delay = "1s" @@ -68,20 +69,48 @@ ResourceState2str = dict({ def clsinit(cls): """ Initializes template information (i.e. attributes and traces) - for the ResourceManager class - """ + on classes derived from the ResourceManager class. + + It is used as a decorator in the class declaration as follows: + + @clsinit + class MyResourceManager(ResourceManager): + + ... + + """ + cls._clsinit() return cls def clsinit_copy(cls): """ Initializes template information (i.e. attributes and traces) - for the ResourceManager class, inheriting attributes and traces - from the parent class + on classes direved from the ResourceManager class. + It differs from the clsinit method in that it forces inheritance + of attributes and traces from the parent class. + + It is used as a decorator in the class declaration as follows: + + @clsinit + class MyResourceManager(ResourceManager): + + ... + + + clsinit_copy should be prefered to clsinit when creating new + ResourceManager child classes. + """ + cls._clsinit_copy() return cls def failtrap(func): + """ Decorator function for instance methods that should set the + RM state to FAILED when an error is raised. The methods that must be + decorated are: discover, provision, deploy, start, stop and finish. + + """ def wrapped(self, *args, **kwargs): try: return func(self, *args, **kwargs) @@ -95,7 +124,6 @@ def failtrap(func): return wrapped -# Decorator to invoke class initialization method @clsinit class ResourceManager(Logger): """ Base clase for all ResourceManagers. @@ -121,6 +149,7 @@ class ResourceManager(Logger): resource attribute """ + cls._attributes[attr.name] = attr @classmethod @@ -129,6 +158,7 @@ class ResourceManager(Logger): resource attribute """ + del cls._attributes[name] @classmethod @@ -137,6 +167,7 @@ class ResourceManager(Logger): resource trace """ + cls._traces[trace.name] = trace @classmethod @@ -145,16 +176,23 @@ class ResourceManager(Logger): resource trace """ + del cls._traces[name] @classmethod def _register_attributes(cls): """ Resource subclasses will invoke this method to register - resource attributes + resource attributes. + + This method should be overriden in the RMs that define + attributes. """ - critical = Attribute("critical", "Defines whether the resource is critical. " - " A failure on a critical resource will interrupt the experiment. ", + + critical = Attribute("critical", + "Defines whether the resource is critical. " + "A failure on a critical resource will interrupt " + "the experiment. ", type = Types.Bool, default = True, flags = Flags.ExecReadOnly) @@ -166,19 +204,24 @@ class ResourceManager(Logger): """ Resource subclasses will invoke this method to register resource traces + This method should be overriden in the RMs that define traces. + """ + pass @classmethod def _clsinit(cls): - """ ResourceManager child classes have different attributes and traces. - Since the templates that hold the information of attributes and traces - are 'class attribute' dictionaries, initially they all point to the - parent class ResourceManager instances of those dictionaries. - In order to make these templates independent from the parent's one, - it is necessary re-initialize the corresponding dictionaries. - This is the objective of the _clsinit method + """ ResourceManager classes have different attributes and traces. + Attribute and traces are stored in 'class attribute' dictionaries. + When a new ResourceManager class is created, the _clsinit method is + called to create a new instance of those dictionaries and initialize + them. + + The _clsinit method is called by the clsinit decorator method. + """ + # static template for resource attributes cls._attributes = dict() cls._register_attributes() @@ -189,8 +232,12 @@ class ResourceManager(Logger): @classmethod def _clsinit_copy(cls): - """ Same as _clsinit, except that it also inherits all attributes and traces - from the parent class. + """ Same as _clsinit, except that after creating new instances of the + dictionaries it copies all the attributes and traces from the parent + class. + + The _clsinit_copy method is called by the clsinit_copy decorator method. + """ # static template for resource attributes cls._attributes = copy.deepcopy(cls._attributes) @@ -265,6 +312,11 @@ class ResourceManager(Logger): self._state = ResourceState.NEW + # instance lock to synchronize exclusive state change methods (such + # as deploy and release methods), in order to prevent them from being + # executed at the same time + self._release_lock = threading.Lock() + @property def guid(self): """ Returns the global unique identifier of the RM """ @@ -272,60 +324,62 @@ class ResourceManager(Logger): @property def ec(self): - """ Returns the Experiment Controller """ + """ Returns the Experiment Controller of the RM """ return self._ec() @property def connections(self): - """ Returns the set of guids of connected RMs""" + """ Returns the set of guids of connected RMs """ return self._connections @property def conditions(self): """ Returns the conditions to which the RM is subjected to. - The object returned by this method is a dictionary indexed by - ResourceAction.""" + This method returns a dictionary of conditions lists indexed by + a ResourceAction. + + """ return self._conditions @property def start_time(self): - """ Returns the start time of the RM as a timestamp""" + """ Returns the start time of the RM as a timestamp """ return self._start_time @property def stop_time(self): - """ Returns the stop time of the RM as a timestamp""" + """ Returns the stop time of the RM as a timestamp """ return self._stop_time @property def discover_time(self): - """ Returns the time discovering was finished for the RM as a timestamp""" + """ Returns the discover time of the RM as a timestamp """ return self._discover_time @property def provision_time(self): - """ Returns the time provisioning was finished for the RM as a timestamp""" + """ Returns the provision time of the RM as a timestamp """ return self._provision_time @property def ready_time(self): - """ Returns the time deployment was finished for the RM as a timestamp""" + """ Returns the deployment time of the RM as a timestamp """ return self._ready_time @property def release_time(self): - """ Returns the release time of the RM as a timestamp""" + """ Returns the release time of the RM as a timestamp """ return self._release_time @property def finish_time(self): - """ Returns the finalization time of the RM as a timestamp""" + """ Returns the finalization time of the RM as a timestamp """ return self._finish_time @property def failed_time(self): - """ Returns the time failure occured for the RM as a timestamp""" + """ Returns the time failure occured for the RM as a timestamp """ return self._failed_time @property @@ -339,147 +393,174 @@ class ResourceManager(Logger): :param msg: text message :type msg: str :rtype: str + """ return " %s guid: %d - %s " % (self._rtype, self.guid, msg) def register_connection(self, guid): """ Registers a connection to the RM identified by guid + This method should not be overriden. Specific functionality + should be added in the do_connect method. + :param guid: Global unique identified of the RM to connect to :type guid: int + """ if self.valid_connection(guid): - self.connect(guid) + self.do_connect(guid) self._connections.add(guid) def unregister_connection(self, guid): """ Removes a registered connection to the RM identified by guid + + This method should not be overriden. Specific functionality + should be added in the do_disconnect method. :param guid: Global unique identified of the RM to connect to :type guid: int + """ if guid in self._connections: - self.disconnect(guid) + self.do_disconnect(guid) self._connections.remove(guid) + @failtrap def discover(self): """ Performs resource discovery. - + This method is responsible for selecting an individual resource matching user requirements. - This method should be redefined when necessary in child classes. - If overridden in child classes, make sure to use the failtrap - decorator to ensure the RM state will be set to FAILED in the event - of an exception. + This method should not be overriden directly. Specific functionality + should be added in the do_discover method. """ - self.set_discovered() + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_discover() + @failtrap def provision(self): """ Performs resource provisioning. This method is responsible for provisioning one resource. After this method has been successfully invoked, the resource should be accessible/controllable by the RM. - This method should be redefined when necessary in child classes. - If overridden in child classes, make sure to use the failtrap - decorator to ensure the RM state will be set to FAILED in the event - of an exception. + This method should not be overriden directly. Specific functionality + should be added in the do_provision method. """ - self.set_provisioned() + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_provision() + @failtrap def start(self): - """ Starts the RM. - - There is no generic start behavior for all resources. - This method should be redefined when necessary in child classes. + """ Starts the RM (e.g. launch remote process). + + There is no standard start behavior. Some RMs will not need to perform + any actions upon start. - If overridden in child classes, make sure to use the failtrap - decorator to ensure the RM state will be set to FAILED in the event - of an exception. + This method should not be overriden directly. Specific functionality + should be added in the do_start method. """ if not self.state in [ResourceState.READY, ResourceState.STOPPED]: self.error("Wrong state %s for start" % self.state) return - self.set_started() + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_start() + @failtrap def stop(self): """ Interrupts the RM, stopping any tasks the RM was performing. - - There is no generic stop behavior for all resources. - This method should be redefined when necessary in child classes. - - If overridden in child classes, make sure to use the failtrap - decorator to ensure the RM state will be set to FAILED in the event - of an exception. - + + There is no standard stop behavior. Some RMs will not need to perform + any actions upon stop. + + This method should not be overriden directly. Specific functionality + should be added in the do_stop method. + """ if not self.state in [ResourceState.STARTED]: self.error("Wrong state %s for stop" % self.state) return - self.set_stopped() + with self._release_lock: + self.do_stop() + @failtrap def deploy(self): """ Execute all steps required for the RM to reach the state READY. - This method is responsible for deploying the resource (and invoking the - discover and provision methods). - This method should be redefined when necessary in child classes. - - If overridden in child classes, make sure to use the failtrap - decorator to ensure the RM state will be set to FAILED in the event - of an exception. - + This method is responsible for deploying the resource (and invoking + the discover and provision methods). + + This method should not be overriden directly. Specific functionality + should be added in the do_deploy method. + """ if self.state > ResourceState.READY: self.error("Wrong state %s for deploy" % self.state) return - self.debug("----- READY ---- ") - self.set_ready() + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_deploy() + self.debug("----- READY ---- ") def release(self): """ Perform actions to free resources used by the RM. - + This method is responsible for releasing resources that were used during the experiment by the RM. - This method should be redefined when necessary in child classes. - - If overridden in child classes, this method should never - raise an error and it must ensure the RM is set to state RELEASED. + This method should not be overriden directly. Specific functionality + should be added in the do_release method. + """ - self.set_released() - + with self._release_lock: + try: + self.do_release() + except: + import traceback + err = traceback.format_exc() + self.error(err) + + self.set_released() + self.debug("----- RELEASED ---- ") + + @failtrap def finish(self): """ Sets the RM to state FINISHED. - - The FINISHED state is different from STOPPED in that it should not be - directly invoked by the user. + + The FINISHED state is different from STOPPED state in that it + should not be directly invoked by the user. STOPPED indicates that the user interrupted the RM, FINISHED means that the RM concluded normally the actions it was supposed to perform. - This method should be redefined when necessary in child classes. + + This method should not be overriden directly. Specific functionality + should be added in the do_finish method. - If overridden in child classes, make sure to use the failtrap - decorator to ensure the RM state will be set to FAILED in the event - of an exception. - """ + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_finish() - self.set_finished() - def fail(self): """ Sets the RM to state FAILED. - """ + This method should not be overriden directly. Specific functionality + should be added in the do_fail method. - self.set_failed() + """ + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_fail() def set(self, name, value): """ Set the value of the attribute @@ -825,13 +906,13 @@ class ResourceManager(Logger): self.debug("----- STARTING ---- ") self.deploy() - def connect(self, guid): + def do_connect(self, guid): """ Performs actions that need to be taken upon associating RMs. This method should be redefined when necessary in child classes. """ pass - def disconnect(self, guid): + def do_disconnect(self, guid): """ Performs actions that need to be taken upon disassociating RMs. This method should be redefined when necessary in child classes. """ @@ -849,7 +930,31 @@ class ResourceManager(Logger): """ # TODO: Validate! return True - + + def do_discover(self): + self.set_discovered() + + def do_provision(self): + self.set_provisioned() + + def do_start(self): + self.set_started() + + def do_stop(self): + self.set_stopped() + + def do_deploy(self): + self.set_ready() + + def do_release(self): + pass + + def do_finish(self): + self.set_finished() + + def do_fail(self): + self.set_failed() + def set_started(self): """ Mark ResourceManager as STARTED """ self.set_state(ResourceState.STARTED, "_start_time") diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py index 864750e6..0b6ad230 100644 --- a/src/nepi/resources/all/collector.py +++ b/src/nepi/resources/all/collector.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, ResourceAction, failtrap + ResourceState, ResourceAction from nepi.util.sshfuncs import ProcStatus import os @@ -70,8 +70,7 @@ class Collector(ResourceManager): def store_path(self): return self._store_path - @failtrap - def provision(self): + def do_provision(self): trace_name = self.get("traceName") if not trace_name: self.fail() @@ -96,44 +95,38 @@ class Collector(ResourceManager): except OSError: pass - super(Collector, self).provision() + super(Collector, self).do_provision() - @failtrap - def deploy(self): - self.discover() - self.provision() + def do_deploy(self): + self.do_discover() + self.do_provision() - super(Collector, self).deploy() + super(Collector, self).do_deploy() - def release(self): - try: - trace_name = self.get("traceName") - rename = self.get("rename") or trace_name - - msg = "Collecting '%s' traces to local directory %s" % ( - trace_name, self.store_path) - self.info(msg) - - rms = self.get_connected() - for rm in rms: - result = self.ec.trace(rm.guid, trace_name) - fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, - rename)) - try: - f = open(fpath, "w") - f.write(result) - f.close() - except: - msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, - rm.guid, fpath) - self.error(msg) - continue - except: - import traceback - err = traceback.format_exc() - self.error(err) - - super(Collector, self).release() + def do_release(self): + trace_name = self.get("traceName") + rename = self.get("rename") or trace_name + + msg = "Collecting '%s' traces to local directory %s" % ( + trace_name, self.store_path) + self.info(msg) + + rms = self.get_connected() + for rm in rms: + result = self.ec.trace(rm.guid, trace_name) + fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, + rename)) + try: + f = open(fpath, "w") + f.write(result) + f.close() + except: + msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, + rm.guid, fpath) + self.error(msg) + continue + + super(Collector, self).do_release() def valid_connection(self, guid): # TODO: Validate! diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 1848e9a1..92e49606 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.resources.linux.node import LinuxNode from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -270,8 +270,7 @@ class LinuxApplication(ResourceManager): return out - @failtrap - def provision(self): + def do_provision(self): # create run dir for application self.node.mkdir(self.run_home) @@ -318,7 +317,7 @@ class LinuxApplication(ResourceManager): self.info("Provisioning finished") - super(LinuxApplication, self).provision() + super(LinuxApplication, self).do_provision() def upload_start_command(self): # Upload command to remote bash script @@ -471,8 +470,7 @@ class LinuxApplication(ResourceManager): # replace application specific paths in the command return self.replace_paths(install) - @failtrap - def deploy(self): + def do_deploy(self): # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: @@ -481,13 +479,12 @@ class LinuxApplication(ResourceManager): else: command = self.get("command") or "" self.info("Deploying command '%s' " % command) - self.discover() - self.provision() + self.do_discover() + self.do_provision() - super(LinuxApplication, self).deploy() + super(LinuxApplication, self).do_deploy() - @failtrap - def start(self): + def do_start(self): command = self.get("command") self.info("Starting command '%s'" % command) @@ -495,14 +492,14 @@ class LinuxApplication(ResourceManager): if not command: # If no command was given (i.e. Application was used for dependency # installation), then the application is directly marked as FINISHED - self.set_finished() + super(LinuxApplication, self).do_finished() else: if self.in_foreground: self._run_in_foreground() else: self._run_in_background() - super(LinuxApplication, self).start() + super(LinuxApplication, self).do_start() def _run_in_foreground(self): command = self.get("command") @@ -578,8 +575,7 @@ class LinuxApplication(ResourceManager): self.error(msg, out, err) raise RuntimeError, msg - @failtrap - def stop(self): + def do_stop(self): """ Stops application execution """ command = self.get('command') or '' @@ -606,23 +602,18 @@ class LinuxApplication(ResourceManager): msg = " Failed to STOP command '%s' " % self.get("command") self.error(msg, out, err) - super(LinuxApplication, self).stop() + super(LinuxApplication, self).do_stop() - def release(self): + def do_release(self): self.info("Releasing resource") - try: - tear_down = self.get("tearDown") - if tear_down: - self.node.execute(tear_down) + tear_down = self.get("tearDown") + if tear_down: + self.node.execute(tear_down) - self.stop() - except: - import traceback - err = traceback.format_exc() - self.error(err) + self.do_stop() - super(LinuxApplication, self).release() + super(LinuxApplication, self).do_release() @property def state(self): diff --git a/src/nepi/resources/linux/ccn/ccnapplication.py b/src/nepi/resources/linux/ccn/ccnapplication.py index c13c0920..ec5312ec 100644 --- a/src/nepi/resources/linux/ccn/ccnapplication.py +++ b/src/nepi/resources/linux/ccn/ccnapplication.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.resource import clsinit_copy, ResourceState, \ - reschedule_delay, failtrap + reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND from nepi.util.timefuncs import tnow, tdiffsec @@ -44,8 +44,7 @@ class LinuxCCNApplication(LinuxApplication): if self.ccnd: return self.ccnd.node return None - @failtrap - def deploy(self): + def do_deploy(self): if not self.ccnd or self.ccnd.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) self.ec.schedule(reschedule_delay, self.deploy) @@ -57,8 +56,8 @@ class LinuxCCNApplication(LinuxApplication): if not self.get("env"): self.set("env", self._environment) - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.debug("----- READY ---- ") self.set_ready() diff --git a/src/nepi/resources/linux/ccn/ccncontent.py b/src/nepi/resources/linux/ccn/ccncontent.py index bb2ae460..2595edac 100644 --- a/src/nepi/resources/linux/ccn/ccncontent.py +++ b/src/nepi/resources/linux/ccn/ccncontent.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ - ResourceAction, reschedule_delay, failtrap + ResourceAction, reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnr import LinuxCCNR from nepi.util.timefuncs import tnow @@ -72,8 +72,7 @@ class LinuxCCNContent(LinuxApplication): if self.ccnr: return self.ccnr.node return None - @failtrap - def deploy(self): + def do_deploy(self): if not self.ccnr or self.ccnr.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) @@ -94,8 +93,8 @@ class LinuxCCNContent(LinuxApplication): self.info("Deploying command '%s' " % command) - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.debug("----- READY ---- ") self.set_ready() @@ -122,8 +121,7 @@ class LinuxCCNContent(LinuxApplication): self.error(msg, out, err) raise RuntimeError, msg - @failtrap - def start(self): + def do_start(self): if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index 0962936e..081435ca 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.node import OSType from nepi.util.timefuncs import tnow, tdiffsec @@ -136,8 +136,7 @@ class LinuxCCND(LinuxApplication): def path(self): return "PATH=$PATH:${BIN}/%s/" % self.version - @failtrap - def deploy(self): + def do_deploy(self): if not self.node or self.node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) @@ -174,8 +173,8 @@ class LinuxCCND(LinuxApplication): self.info("Deploying command '%s' " % command) - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.debug("----- READY ---- ") self.set_ready() @@ -199,8 +198,7 @@ class LinuxCCND(LinuxApplication): env = env, raise_on_error = True) - @failtrap - def start(self): + def do_start(self): if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) @@ -211,8 +209,7 @@ class LinuxCCND(LinuxApplication): self.error(msg, out, err) raise RuntimeError, msg - @failtrap - def stop(self): + def do_stop(self): command = self.get('command') or '' if self.state == ResourceState.STARTED: diff --git a/src/nepi/resources/linux/ccn/ccnping.py b/src/nepi/resources/linux/ccn/ccnping.py index d8289504..934db3cd 100644 --- a/src/nepi/resources/linux/ccn/ccnping.py +++ b/src/nepi/resources/linux/ccn/ccnping.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.resources.linux.ccn.ccnpingserver import LinuxCCNPingServer from nepi.util.timefuncs import tnow, tdiffsec @@ -65,15 +65,14 @@ class LinuxCCNPing(LinuxCCNPingServer): if ccnpingserver: return ccnpingserver[0] return None - @failtrap - def start(self): + def do_start(self): if not self.ccnpingserver or \ self.ccnpingserver.state < ResourceState.STARTED: self.debug("---- RESCHEDULING START---- ccnpingserver state %s " % \ self.ccnpingserver.state ) self.ec.schedule(reschedule_delay, self.start) else: - super(LinuxCCNPing, self).start() + super(LinuxCCNPing, self).do_start() @property def _start_command(self): diff --git a/src/nepi/resources/linux/ccn/ccnpingserver.py b/src/nepi/resources/linux/ccn/ccnpingserver.py index 6e87b27b..2dee4db2 100644 --- a/src/nepi/resources/linux/ccn/ccnpingserver.py +++ b/src/nepi/resources/linux/ccn/ccnpingserver.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication from nepi.util.timefuncs import tnow, tdiffsec @@ -54,8 +54,7 @@ class LinuxCCNPingServer(LinuxCCNApplication): super(LinuxCCNPingServer, self).__init__(ec, guid) self._home = "ccnping-serv-%s" % self.guid - @failtrap - def deploy(self): + def do_deploy(self): if not self.get("command"): self.set("command", self._start_command) @@ -71,7 +70,7 @@ class LinuxCCNPingServer(LinuxCCNApplication): if not self.get("install"): self.set("install", self._install) - super(LinuxCCNPingServer, self).deploy() + super(LinuxCCNPingServer, self).do_deploy() @property def _start_command(self): diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index 53199976..6213b74e 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import clsinit_copy, ResourceState, \ - ResourceAction, reschedule_delay, failtrap + ResourceAction, reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND from nepi.util.timefuncs import tnow @@ -200,8 +200,7 @@ class LinuxCCNR(LinuxApplication): if self.ccnd: return self.ccnd.node return None - @failtrap - def deploy(self): + def do_deploy(self): if not self.ccnd or self.ccnd.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state ) @@ -218,8 +217,8 @@ class LinuxCCNR(LinuxApplication): self.info("Deploying command '%s' " % command) - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.debug("----- READY ---- ") self.set_ready() @@ -252,8 +251,7 @@ class LinuxCCNR(LinuxApplication): env = env, raise_on_error = True) - @failtrap - def start(self): + def do_start(self): if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) diff --git a/src/nepi/resources/linux/ccn/fibentry.py b/src/nepi/resources/linux/ccn/fibentry.py index 490d8f67..1010f2f4 100644 --- a/src/nepi/resources/linux/ccn/fibentry.py +++ b/src/nepi/resources/linux/ccn/fibentry.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import clsinit_copy, ResourceState, \ - ResourceAction, reschedule_delay, failtrap + ResourceAction, reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND from nepi.util.timefuncs import tnow @@ -109,8 +109,7 @@ class LinuxFIBEntry(LinuxApplication): return super(LinuxFIBEntry, self).trace(name, attr, block, offset) - @failtrap - def deploy(self): + def do_deploy(self): # Wait until associated ccnd is provisioned if not self.ccnd or self.ccnd.state < ResourceState.READY: # ccnr needs to wait until ccnd is deployed and running @@ -131,8 +130,8 @@ class LinuxFIBEntry(LinuxApplication): self.info("Deploying command '%s' " % command) - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.configure() self.debug("----- READY ---- ") @@ -191,8 +190,7 @@ class LinuxFIBEntry(LinuxApplication): # schedule mtr deploy self.ec.deploy(guids=[self._traceroute], group = self.deployment_group) - @failtrap - def start(self): + def do_start(self): if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) @@ -203,8 +201,7 @@ class LinuxFIBEntry(LinuxApplication): self.error(msg, out, err) raise RuntimeError, msg - @failtrap - def stop(self): + def do_stop(self): command = self.get('command') env = self.get('env') diff --git a/src/nepi/resources/linux/interface.py b/src/nepi/resources/linux/interface.py index 9ccdc4f6..59bbe2aa 100644 --- a/src/nepi/resources/linux/interface.py +++ b/src/nepi/resources/linux/interface.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Types, Flags from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.resources.linux.node import LinuxNode from nepi.resources.linux.channel import LinuxChannel @@ -102,8 +102,7 @@ class LinuxInterface(ResourceManager): if chan: return chan[0] return None - @failtrap - def discover(self): + def do_discover(self): devname = self.get("deviceName") ip4 = self.get("ip4") ip6 = self.get("ip4") @@ -182,10 +181,9 @@ class LinuxInterface(ResourceManager): self.error(msg) raise RuntimeError, msg - super(LinuxInterface, self).discover() + super(LinuxInterface, self).do_discover() - @failtrap - def provision(self): + def do_provision(self): devname = self.get("deviceName") ip4 = self.get("ip4") ip6 = self.get("ip4") @@ -226,10 +224,9 @@ class LinuxInterface(ResourceManager): self.error(msg, out, err) raise RuntimeError, "%s - %s - %s" % (msg, out, err) - super(LinuxInterface, self).provision() + super(LinuxInterface, self).do_provision() - @failtrap - def deploy(self): + def do_deploy(self): # Wait until node is provisioned node = self.node chan = self.channel @@ -241,22 +238,17 @@ class LinuxInterface(ResourceManager): else: # Verify if the interface exists in node. If not, configue # if yes, load existing configuration - self.discover() - self.provision() - - super(LinuxInterface, self).deploy() - - def release(self): - try: - tear_down = self.get("tearDown") - if tear_down: - self.execute(tear_down) - except: - import traceback - err = traceback.format_exc() - self.error(err) - - super(LinuxInterface, self).release() + self.do_discover() + self.do_provision() + + super(LinuxInterface, self).do_deploy() + + def do_release(self): + tear_down = self.get("tearDown") + if tear_down: + self.execute(tear_down) + + super(LinuxInterface, self).do_release() def valid_connection(self, guid): # TODO: Validate! diff --git a/src/nepi/resources/linux/mtr.py b/src/nepi/resources/linux/mtr.py index 60857217..1edc6b53 100644 --- a/src/nepi/resources/linux/mtr.py +++ b/src/nepi/resources/linux/mtr.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy, failtrap +from nepi.execution.resource import clsinit_copy from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -83,8 +83,7 @@ class LinuxMtr(LinuxApplication): self._home = "mtr-%s" % self.guid self._sudo_kill = True - @failtrap - def deploy(self): + def do_deploy(self): if not self.get("command"): self.set("command", self._start_command) @@ -94,7 +93,7 @@ class LinuxMtr(LinuxApplication): if not self.get("depends"): self.set("depends", "mtr") - super(LinuxMtr, self).deploy() + super(LinuxMtr, self).do_deploy() @property def _start_command(self): diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 67fbbbcb..b547ce55 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.resources.linux import rpmfuncs, debfuncs from nepi.util import sshfuncs, execfuncs from nepi.util.sshfuncs import ProcStatus @@ -330,8 +330,7 @@ class LinuxNode(ResourceManager): def localhost(self): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] - @failtrap - def provision(self): + def do_provision(self): # check if host is alive if not self.is_alive(): msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") @@ -358,14 +357,13 @@ class LinuxNode(ResourceManager): # Create experiment node home directory self.mkdir(self.node_home) - super(LinuxNode, self).provision() + super(LinuxNode, self).do_provision() - @failtrap - def deploy(self): + def do_deploy(self): if self.state == ResourceState.NEW: self.info("Deploying node") - self.discover() - self.provision() + self.do_discover() + self.do_provision() # Node needs to wait until all associated interfaces are # ready before it can finalize deployment @@ -376,29 +374,24 @@ class LinuxNode(ResourceManager): self.ec.schedule(reschedule_delay, self.deploy) return - super(LinuxNode, self).deploy() + super(LinuxNode, self).do_deploy() - def release(self): - try: - rms = self.get_connected() - for rm in rms: - # Node needs to wait until all associated RMs are released - # before it can be released - if rm.state < ResourceState.STOPPED: - self.ec.schedule(reschedule_delay, self.release) - return + def do_release(self): + rms = self.get_connected() + for rm in rms: + # Node needs to wait until all associated RMs are released + # before it can be released + if rm.state != ResourceState.RELEASED: + self.ec.schedule(reschedule_delay, self.release) + return - tear_down = self.get("tearDown") - if tear_down: - self.execute(tear_down) + tear_down = self.get("tearDown") + if tear_down: + self.execute(tear_down) - self.clean_processes() - except: - import traceback - err = traceback.format_exc() - self.error(err) + self.clean_processes() - super(LinuxNode, self).release() + super(LinuxNode, self).do_release() def valid_connection(self, guid): # TODO: Validate! @@ -422,8 +415,8 @@ class LinuxNode(ResourceManager): "sudo -S killall -u %s || /bin/true ; " % self.get("username")) out = err = "" - (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) - + (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) + def clean_home(self): """ Cleans all NEPI related folders in the Linux host """ diff --git a/src/nepi/resources/linux/nping.py b/src/nepi/resources/linux/nping.py index ec874bc2..62bacd89 100644 --- a/src/nepi/resources/linux/nping.py +++ b/src/nepi/resources/linux/nping.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy, failtrap +from nepi.execution.resource import clsinit_copy from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -133,8 +133,7 @@ class LinuxNPing(LinuxApplication): self._home = "nping-%s" % self.guid self._sudo_kill = True - @failtrap - def deploy(self): + def do_deploy(self): if not self.get("command"): self.set("command", self._start_command) @@ -147,7 +146,7 @@ class LinuxNPing(LinuxApplication): if not self.get("depends"): self.set("depends", "nmap") - super(LinuxNPing, self).deploy() + super(LinuxNPing, self).do_deploy() @property def _start_command(self): diff --git a/src/nepi/resources/linux/ping.py b/src/nepi/resources/linux/ping.py index d085b112..6db0a8d7 100644 --- a/src/nepi/resources/linux/ping.py +++ b/src/nepi/resources/linux/ping.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy, failtrap +from nepi.execution.resource import clsinit_copy from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -184,12 +184,11 @@ class LinuxPing(LinuxApplication): super(LinuxPing, self).__init__(ec, guid) self._home = "ping-%s" % self.guid - @failtrap - def deploy(self): + def do_deploy(self): if not self.get("command"): self.set("command", self._start_command) - super(LinuxPing, self).deploy() + super(LinuxPing, self).do_deploy() @property def _start_command(self): diff --git a/src/nepi/resources/linux/tcpdump.py b/src/nepi/resources/linux/tcpdump.py index e9955f4e..12d11a28 100644 --- a/src/nepi/resources/linux/tcpdump.py +++ b/src/nepi/resources/linux/tcpdump.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy, failtrap +from nepi.execution.resource import clsinit_copy from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -316,8 +316,7 @@ class LinuxTcpdump(LinuxApplication): self._home = "tcpdump-%s" % self.guid self._sudo_kill = True - @failtrap - def deploy(self): + def do_deploy(self): if not self.get("command"): self.set("command", self._start_command) @@ -327,7 +326,7 @@ class LinuxTcpdump(LinuxApplication): if not self.get("depends"): self.set("depends", "tcpdump") - super(LinuxTcpdump, self).deploy() + super(LinuxTcpdump, self).do_deploy() @property def _start_command(self): diff --git a/src/nepi/resources/linux/udptest.py b/src/nepi/resources/linux/udptest.py index 6ad0085b..76c59140 100644 --- a/src/nepi/resources/linux/udptest.py +++ b/src/nepi/resources/linux/udptest.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ - reschedule_delay, failtrap + reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow @@ -213,8 +213,7 @@ class LinuxUdpTest(LinuxApplication): super(LinuxUdpTest, self).__init__(ec, guid) self._home = "udptest-%s" % self.guid - @failtrap - def deploy(self): + def do_deploy(self): if not self.get("command"): self.set("command", self._start_command) @@ -233,10 +232,9 @@ class LinuxUdpTest(LinuxApplication): if not self.get("depends"): self.set("depends", self._depends) - super(LinuxUdpTest, self).deploy() + super(LinuxUdpTest, self).do_deploy() def upload_start_command(self): - super(LinuxUdpTest, self).upload_start_command() if self.get("s") == True: @@ -247,8 +245,7 @@ class LinuxUdpTest(LinuxApplication): # finished to continue ) self._run_in_background() - @failtrap - def start(self): + def do_start(self): if self.get("s") == True: # Server is already running if self.state == ResourceState.READY: @@ -261,7 +258,7 @@ class LinuxUdpTest(LinuxApplication): self.error(msg, out, err) raise RuntimeError, err else: - super(LinuxUdpTest, self).start() + super(LinuxUdpTest, self).do_start() @property def _start_command(self): diff --git a/src/nepi/resources/linux/udptunnel.py b/src/nepi/resources/linux/udptunnel.py index 1efe2302..3cfefbed 100644 --- a/src/nepi/resources/linux/udptunnel.py +++ b/src/nepi/resources/linux/udptunnel.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ - reschedule_delay, failtrap + reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -161,8 +161,7 @@ class UdpTunnel(LinuxApplication): port = self.wait_local_port(endpoint) return (port, pid, ppid) - @failtrap - def provision(self): + def do_provision(self): # create run dir for tunnel on each node self.endpoint1.node.mkdir(self.run_home(self.endpoint1)) self.endpoint2.node.mkdir(self.run_home(self.endpoint2)) @@ -191,20 +190,18 @@ class UdpTunnel(LinuxApplication): self.set_provisioned() - @failtrap - def deploy(self): + def do_deploy(self): if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): self.ec.schedule(reschedule_delay, self.deploy) else: - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.debug("----- READY ---- ") self.set_ready() - @failtrap - def start(self): + def do_start(self): if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) @@ -215,8 +212,7 @@ class UdpTunnel(LinuxApplication): self.error(msg, out, err) raise RuntimeError, msg - @failtrap - def stop(self): + def do_stop(self): """ Stops application execution """ if self.state == ResourceState.STARTED: diff --git a/src/nepi/resources/omf/application.py b/src/nepi/resources/omf/application.py index 37d244cd..4a3f027f 100644 --- a/src/nepi/resources/omf/application.py +++ b/src/nepi/resources/omf/application.py @@ -19,7 +19,7 @@ # Julien Tribino from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource from nepi.resources.omf.node import OMFNode @@ -136,8 +136,7 @@ class OMFApplication(OMFResource): return True - @failtrap - def deploy(self): + def do_deploy(self): """ Deploy the RM. It means nothing special for an application for now (later it will be upload sources, ...) It becomes DEPLOYED after getting the xmpp client. @@ -153,10 +152,9 @@ class OMFApplication(OMFResource): self.error(msg) raise RuntimeError, msg - super(OMFApplication, self).deploy() + super(OMFApplication, self).do_deploy() - @failtrap - def start(self): + def do_start(self): """ Start the RM. It means : Send Xmpp Message Using OMF protocol to execute the application. It becomes STARTED before the messages are sent (for coordination) @@ -186,10 +184,9 @@ class OMFApplication(OMFResource): self.error(msg) raise - super(OMFApplication, self).start() + super(OMFApplication, self).do_start() - @failtrap - def stop(self): + def do_stop(self): """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to kill the application. State is set to STOPPED after the message is sent. @@ -202,22 +199,16 @@ class OMFApplication(OMFResource): self.error(msg) raise - super(OMFApplication, self).stop() - self.set_finished() + super(OMFApplication, self).do_stop() - def release(self): + def do_release(self): """ Clean the RM at the end of the experiment and release the API. """ - try: - if self._omf_api : - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) - except: - import traceback - err = traceback.format_exc() - self.error(err) - - super(OMFApplication, self).release() + if self._omf_api: + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + + super(OMFApplication, self).do_release() diff --git a/src/nepi/resources/omf/channel.py b/src/nepi/resources/omf/channel.py index 0e995677..4b412b79 100644 --- a/src/nepi/resources/omf/channel.py +++ b/src/nepi/resources/omf/channel.py @@ -19,7 +19,7 @@ # Julien Tribino from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource @@ -121,8 +121,7 @@ class OMFChannel(OMFResource): res.append(couple) return res - @failtrap - def deploy(self): + def do_deploy(self): """ Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the channel. It becomes DEPLOYED after sending messages to configure the channel @@ -159,21 +158,16 @@ class OMFChannel(OMFResource): self.error(msg) raise - super(OMFChannel, self).deploy() + super(OMFChannel, self).do_deploy() - def release(self): + def do_release(self): """ Clean the RM at the end of the experiment and release the API """ - try: - if self._omf_api : - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) - except: - import traceback - err = traceback.format_exc() - self.error(err) - - super(OMFChannel, self).release() + if self._omf_api : + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + + super(OMFChannel, self).do_release() diff --git a/src/nepi/resources/omf/interface.py b/src/nepi/resources/omf/interface.py index 1a253ac0..b4c1df7b 100644 --- a/src/nepi/resources/omf/interface.py +++ b/src/nepi/resources/omf/interface.py @@ -19,7 +19,7 @@ # Julien Tribino from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.node import OMFNode @@ -164,8 +164,7 @@ class OMFWifiInterface(OMFResource): return True - @failtrap - def deploy(self): + def do_deploy(self): """ Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the interface. It becomes DEPLOYED after sending messages to configure the interface @@ -201,21 +200,16 @@ class OMFWifiInterface(OMFResource): if self.configure_iface(): self.configure_ip() - super(OMFWifiInterface, self).deploy() + super(OMFWifiInterface, self).do_deploy() - def release(self): + def do_release(self): """ Clean the RM at the end of the experiment and release the API """ - try: - if self._omf_api : - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) - except: - import traceback - err = traceback.format_exc() - self.error(err) - - super(OMFWifiInterface, self).release() + if self._omf_api: + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + + super(OMFWifiInterface, self).do_release() diff --git a/src/nepi/resources/omf/node.py b/src/nepi/resources/omf/node.py index 79c04f20..4229f440 100644 --- a/src/nepi/resources/omf/node.py +++ b/src/nepi/resources/omf/node.py @@ -19,7 +19,7 @@ # Julien Tribino from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource from nepi.resources.omf.omf_api import OMFAPIFactory @@ -96,8 +96,7 @@ class OMFNode(OMFResource): return False - @failtrap - def deploy(self): + def do_deploy(self): """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol to enroll the node into the experiment. It becomes DEPLOYED after sending messages to enroll the node @@ -125,23 +124,18 @@ class OMFNode(OMFResource): self.error(msg) raise - super(OMFNode, self).deploy() + super(OMFNode, self).do_deploy() - def release(self): - """Clean the RM at the end of the experiment + def do_release(self): + """ Clean the RM at the end of the experiment """ - try: - if self._omf_api : - self._omf_api.release(self.get('hostname')) - - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) - except: - import traceback - err = traceback.format_exc() - self.error(err) - - super(OMFNode, self).release() + if self._omf_api: + self._omf_api.release(self.get('hostname')) + + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + + super(OMFNode, self).do_release() diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index eadcd0df..c8039a1c 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.resources.linux.node import LinuxNode from nepi.resources.planetlab.plcapi import PLCAPIFactory from nepi.util.execfuncs import lexec @@ -37,6 +37,8 @@ class PlanetlabNode(LinuxNode): "associated to a PlanetLab user account" _backend = "planetlab" + ## XXX A.Q. This lock could use a more descriptive name and + # an explanatory comment lock = threading.Lock() @classmethod @@ -207,7 +209,7 @@ class PlanetlabNode(LinuxNode): return self._plapi - def discover(self): + def do_discover(self): """ Based on the attributes defined by the user, discover the suitable nodes """ @@ -237,7 +239,7 @@ class PlanetlabNode(LinuxNode): else: self._put_node_in_provision(node_id) self._node_to_provision = node_id - super(PlanetlabNode, self).discover() + super(PlanetlabNode, self).do_discover() else: self.fail_node_not_available(hostname) @@ -265,11 +267,11 @@ class PlanetlabNode(LinuxNode): if node_id: self._node_to_provision = node_id - super(PlanetlabNode, self).discover() + super(PlanetlabNode, self).do_discover() else: self.fail_not_enough_nodes() - def provision(self): + def do_provision(self): """ Add node to user's slice after verifing that the node is functioning correctly @@ -288,7 +290,7 @@ class PlanetlabNode(LinuxNode): except: with PlanetlabNode.lock: self._blacklist_node(node) - self.discover() + self.do_discover() continue self._add_node_to_slice(node) @@ -315,7 +317,7 @@ class PlanetlabNode(LinuxNode): self._blacklist_node(node) self._delete_node_from_slice(node) self.set('hostname', None) - self.discover() + self.do_discover() continue # check /proc directory is mounted (ssh_ok = True) @@ -327,7 +329,7 @@ class PlanetlabNode(LinuxNode): self._blacklist_node(node) self._delete_node_from_slice(node) self.set('hostname', None) - self.discover() + self.do_discover() continue else: @@ -336,12 +338,13 @@ class PlanetlabNode(LinuxNode): ip = self._get_ip(node) self.set("ip", ip) - super(PlanetlabNode, self).provision() + super(PlanetlabNode, self).do_provision() def _filter_based_on_attributes(self): """ Retrive the list of nodes ids that match user's constraints """ + # Map user's defined attributes with tagnames of PlanetLab timeframe = self.get("timeframe")[0] attr_to_tags = { @@ -513,7 +516,7 @@ class PlanetlabNode(LinuxNode): return nodes_id def _choose_random_node(self, nodes): - """ + """ From the possible nodes for provision, choose randomly to decrese the probability of different RMs choosing the same node for provision """ @@ -632,23 +635,19 @@ class PlanetlabNode(LinuxNode): return ip def fail_discovery(self): - self.fail() msg = "Discovery failed. No candidates found for node" self.error(msg) raise RuntimeError, msg def fail_node_not_alive(self, hostname=None): - self.fail() msg = "Node %s not alive" % hostname raise RuntimeError, msg def fail_node_not_available(self, hostname): - self.fail() msg = "Node %s not available for provisioning" % hostname raise RuntimeError, msg def fail_not_enough_nodes(self): - self.fail() msg = "Not enough nodes available for provisioning" raise RuntimeError, msg diff --git a/src/nepi/resources/planetlab/openvswitch/ovs.py b/src/nepi/resources/planetlab/openvswitch/ovs.py index 51628fda..842f5c2c 100644 --- a/src/nepi/resources/planetlab/openvswitch/ovs.py +++ b/src/nepi/resources/planetlab/openvswitch/ovs.py @@ -20,7 +20,7 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, failtrap + ResourceState from nepi.execution.attribute import Attribute, Flags from nepi.resources.planetlab.node import PlanetlabNode from nepi.resources.linux.application import LinuxApplication @@ -115,12 +115,13 @@ class OVSWitch(LinuxApplication): # TODO: Validate! return True - @failtrap - def provision(self): + def do_provision(self): # create home dir for ovs self.node.mkdir(self.ovs_home) # create dir for ovs checks self.node.mkdir(self.ovs_checks) + + super(OVSWitch, self).do_provision() def check_sliver_ovs(self): """ Check if sliver-ovs exists. If it does not exist, we interrupt @@ -147,8 +148,7 @@ class OVSWitch(LinuxApplication): msg = "Command sliver-ovs exists" self.debug(msg) - @failtrap - def deploy(self): + def do_deploy(self): """ Wait until node is associated and deployed """ node = self.node @@ -157,15 +157,15 @@ class OVSWitch(LinuxApplication): self.ec.schedule(reschedule_delay, self.deploy) else: - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.check_sliver_ovs() self.servers_on() self.create_bridge() self.assign_contr() self.ovs_status() - super(OVSWitch, self).deploy() + super(OVSWitch, self).do_deploy() def servers_on(self): """ Start the openvswitch servers and also checking @@ -282,37 +282,31 @@ class OVSWitch(LinuxApplication): (out, err), proc = self.node.check_output(self.ovs_home, 'show_stdout') self.info(out) - def release(self): + def do_release(self): """ Delete the bridge and close the servers """ # Node needs to wait until all associated RMs are released # to be released - try: - from nepi.resources.planetlab.openvswitch.ovsport import OVSPort - rm = self.get_connected(OVSPort.rtype()) - - if rm[0].state < ResourceState.FINISHED: - self.ec.schedule(reschedule_delay, self.release) - return - - msg = "Deleting the bridge %s" % self.get('bridge_name') - self.info(msg) - cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name') - (out, err), proc = self.node.run(cmd, self.ovs_checks, - sudo = True) - cmd = "sliver-ovs stop" - (out, err), proc = self.node.run(cmd, self.ovs_checks, - sudo = True) + from nepi.resources.planetlab.openvswitch.ovsport import OVSPort + rm = self.get_connected(OVSPort.rtype()) + + if rm[0].state < ResourceState.FINISHED: + self.ec.schedule(reschedule_delay, self.release) + return - if proc.poll(): - self.fail() - self.error(msg, out, err) - raise RuntimeError, msg - except: - import traceback - err = traceback.format_exc() - self.error(err) + msg = "Deleting the bridge %s" % self.get('bridge_name') + self.info(msg) + cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name') + (out, err), proc = self.node.run(cmd, self.ovs_checks, + sudo = True) + cmd = "sliver-ovs stop" + (out, err), proc = self.node.run(cmd, self.ovs_checks, + sudo = True) + + if proc.poll(): + self.error(msg, out, err) + raise RuntimeError, msg - super(OVSWitch, self).release() + super(OVSWitch, self).do_release() diff --git a/src/nepi/resources/planetlab/openvswitch/ovsport.py b/src/nepi/resources/planetlab/openvswitch/ovsport.py index a7155fb0..ec5f6f1a 100644 --- a/src/nepi/resources/planetlab/openvswitch/ovsport.py +++ b/src/nepi/resources/planetlab/openvswitch/ovsport.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, failtrap + ResourceState from nepi.resources.planetlab.openvswitch.ovs import OVSWitch from nepi.resources.planetlab.node import PlanetlabNode from nepi.resources.linux.application import LinuxApplication @@ -179,8 +179,7 @@ class OVSPort(LinuxApplication): command = self.replace_paths(command) return command - @failtrap - def deploy(self): + def do_deploy(self): """ Wait until ovswitch is started """ ovswitch = self.ovswitch @@ -189,39 +188,35 @@ class OVSPort(LinuxApplication): self.ec.schedule(reschedule_delay, self.deploy) else: - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.get_host_ip() self.create_port() self.get_local_end() self.ovswitch.ovs_status() - super(OVSPort, self).deploy() - def release(self): + super(OVSPort, self).do_deploy() + + def do_release(self): """ Release the port RM means delete the ports """ # OVS needs to wait until all associated RMs are released # to be released - try: - from nepi.resources.planetlab.openvswitch.tunnel import Tunnel - rm = self.get_connected(Tunnel.rtype()) - if rm and rm[0].state < ResourceState.FINISHED: - self.ec.schedule(reschedule_delay, self.release) - return - - msg = "Deleting the port %s" % self.get('port_name') - self.info(msg) - cmd = "sliver-ovs del_port %s" % self.get('port_name') - (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks, - sudo = True) - - if proc.poll(): - self.fail() - self.error(msg, out, err) - raise RuntimeError, msg - except: - import traceback - err = traceback.format_exc() - self.error(err) - - super(OVSPort, self).release() + from nepi.resources.planetlab.openvswitch.tunnel import Tunnel + rm = self.get_connected(Tunnel.rtype()) + if rm and rm[0].state < ResourceState.FINISHED: + self.ec.schedule(reschedule_delay, self.release) + return + + msg = "Deleting the port %s" % self.get('port_name') + self.info(msg) + cmd = "sliver-ovs del_port %s" % self.get('port_name') + (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks, + sudo = True) + + if proc.poll(): + self.error(msg, out, err) + raise RuntimeError, msg + + super(OVSPort, self).do_release() + diff --git a/src/nepi/resources/planetlab/openvswitch/tunnel.py b/src/nepi/resources/planetlab/openvswitch/tunnel.py index c1f81fe9..8ae73939 100644 --- a/src/nepi/resources/planetlab/openvswitch/tunnel.py +++ b/src/nepi/resources/planetlab/openvswitch/tunnel.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, failtrap + ResourceState from nepi.resources.linux.application import LinuxApplication from nepi.resources.planetlab.node import PlanetlabNode from nepi.resources.planetlab.openvswitch.ovs import OVSWitch @@ -243,7 +243,6 @@ class OVSTunnel(LinuxApplication): msg = "Failed to connect endpoints" if proc.poll(): - self.fail() self.error(msg, out, err) raise RuntimeError, msg @@ -261,7 +260,6 @@ class OVSTunnel(LinuxApplication): (out, err), proc = self.node.check_errors(self.run_home(self.node)) # Out is what was written in the stderr file if err: - self.fail() msg = " Failed to start command '%s' " % command self.error(msg, out, err) raise RuntimeError, msg @@ -296,7 +294,6 @@ class OVSTunnel(LinuxApplication): msg = "Failed to connect endpoints" if proc.poll(): - self.fail() self.error(msg, out, err) raise RuntimeError, msg else: @@ -337,7 +334,6 @@ class OVSTunnel(LinuxApplication): msg = "Failed to connect endpoints" if proc.poll(): - self.fail() self.error(msg, out, err) raise RuntimeError, msg else: @@ -345,8 +341,7 @@ class OVSTunnel(LinuxApplication): self.info(msg) return - @failtrap - def provision(self): + def do_provision(self): """ Provision the tunnel """ # Create folders @@ -365,41 +360,33 @@ class OVSTunnel(LinuxApplication): (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1) switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2) - super(OVSTunnel, self).provision() + super(OVSTunnel, self).do_provision() - @failtrap - def deploy(self): + def do_deploy(self): if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): self.ec.schedule(reschedule_delay, self.deploy) else: - self.discover() - self.provision() + self.do_discover() + self.do_provision() - super(OVSTunnel, self).deploy() + super(OVSTunnel, self).do_deploy() - def release(self): + def do_release(self): """ Release the udp_tunnel on endpoint2. On endpoint1 means nothing special. """ - try: - if not self.check_endpoints(): - # Kill the TAP devices - # TODO: Make more generic Release method of PLTAP - if self._pid and self._ppid: - self._nodes = self.get_node(self.endpoint2) - (out, err), proc = self.node.kill(self._pid, - self._ppid, sudo = True) - if err or proc.poll(): - # check if execution errors occurred - msg = " Failed to delete TAP device" - self.error(msg, err, err) - self.fail() - except: - import traceback - err = traceback.format_exc() - self.error(err) - - super(OVSTunnel, self).release() - + if not self.check_endpoints(): + # Kill the TAP devices + # TODO: Make more generic Release method of PLTAP + if self._pid and self._ppid: + self._nodes = self.get_node(self.endpoint2) + (out, err), proc = self.node.kill(self._pid, + self._ppid, sudo = True) + if err or proc.poll(): + # check if execution errors occurred + msg = " Failed to delete TAP device" + self.error(msg, err, err) + + super(OVSTunnel, self).do_release() diff --git a/src/nepi/resources/planetlab/tap.py b/src/nepi/resources/planetlab/tap.py index d9cf17c3..736c973e 100644 --- a/src/nepi/resources/planetlab/tap.py +++ b/src/nepi/resources/planetlab/tap.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ - reschedule_delay, failtrap + reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.planetlab.node import PlanetlabNode from nepi.util.timefuncs import tnow, tdiffsec @@ -147,8 +147,7 @@ class PlanetlabTap(LinuxApplication): if_name = self.wait_if_name() self.set("deviceName", if_name) - @failtrap - def deploy(self): + def do_deploy(self): if not self.node or self.node.state < ResourceState.PROVISIONED: self.ec.schedule(reschedule_delay, self.deploy) else: @@ -161,14 +160,13 @@ class PlanetlabTap(LinuxApplication): if not self.get("install"): self.set("install", self._install) - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.debug("----- READY ---- ") self.set_ready() - @failtrap - def start(self): + def do_start(self): if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) @@ -179,8 +177,7 @@ class PlanetlabTap(LinuxApplication): self.error(msg, out, err) raise RuntimeError, msg - @failtrap - def stop(self): + def do_stop(self): command = self.get('command') or '' if self.state == ResourceState.STARTED: @@ -210,22 +207,17 @@ class PlanetlabTap(LinuxApplication): return self._state - def release(self): + def do_release(self): # Node needs to wait until all associated RMs are released # to be released - try: - from nepi.resources.linux.udptunnel import UdpTunnel - rms = self.get_connected(UdpTunnel.rtype()) - for rm in rms: - if rm.state < ResourceState.STOPPED: - self.ec.schedule(reschedule_delay, self.release) - return - except: - import traceback - err = traceback.format_exc() - self.error(err) - - super(PlanetlabTap, self).release() + from nepi.resources.linux.udptunnel import UdpTunnel + rms = self.get_connected(UdpTunnel.rtype()) + for rm in rms: + if rm.state < ResourceState.STOPPED: + self.ec.schedule(reschedule_delay, self.release) + return + + super(PlanetlabTap, self).do_release() def wait_if_name(self): """ Waits until the if_name file for the command is generated, diff --git a/test/execution/resource.py b/test/execution/resource.py index 88b4156b..2aaa4b97 100755 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -22,7 +22,7 @@ from nepi.execution.attribute import Attribute from nepi.execution.ec import ExperimentController, FailureLevel from nepi.execution.resource import ResourceManager, ResourceState, \ - clsinit_copy, ResourceAction, failtrap + clsinit_copy, ResourceAction import random import time @@ -53,9 +53,9 @@ class Channel(ResourceManager): def __init__(self, ec, guid): super(Channel, self).__init__(ec, guid) - def deploy(self): + def do_deploy(self): time.sleep(1) - super(Channel, self).deploy() + super(Channel, self).do_deploy() self.logger.debug(" -------- DEPLOYED ------- ") class Interface(ResourceManager): @@ -64,7 +64,7 @@ class Interface(ResourceManager): def __init__(self, ec, guid): super(Interface, self).__init__(ec, guid) - def deploy(self): + def do_deploy(self): node = self.get_connected(Node.rtype())[0] chan = self.get_connected(Channel.rtype())[0] @@ -74,7 +74,7 @@ class Interface(ResourceManager): self.ec.schedule("0.5s", self.deploy) else: time.sleep(2) - super(Interface, self).deploy() + super(Interface, self).do_deploy() self.logger.debug(" -------- DEPLOYED ------- ") class Node(ResourceManager): @@ -83,10 +83,10 @@ class Node(ResourceManager): def __init__(self, ec, guid): super(Node, self).__init__(ec, guid) - def deploy(self): + def do_deploy(self): if self.state == ResourceState.NEW: - self.discover() - self.provision() + self.do_discover() + self.do_provision() self.logger.debug(" -------- PROVISIONED ------- ") self.ec.schedule("1s", self.deploy) elif self.state == ResourceState.PROVISIONED: @@ -96,7 +96,7 @@ class Node(ResourceManager): self.ec.schedule("0.5s", self.deploy) return - super(Node, self).deploy() + super(Node, self).do_deploy() self.logger.debug(" -------- DEPLOYED ------- ") class Application(ResourceManager): @@ -105,19 +105,19 @@ class Application(ResourceManager): def __init__(self, ec, guid): super(Application, self).__init__(ec, guid) - def deploy(self): + def do_deploy(self): node = self.get_connected(Node.rtype())[0] if node.state < ResourceState.READY: self.ec.schedule("0.5s", self.deploy) else: time.sleep(random.random() * 2) - super(Application, self).deploy() + super(Application, self).do_deploy() self.logger.debug(" -------- DEPLOYED ------- ") - def start(self): - super(Application, self).start() + def do_start(self): + super(Application, self).do_start() time.sleep(random.random() * 3) - self._state = ResourceState.FINISHED + self.ec.schedule("0.5s", self.finish) class ErrorApplication(ResourceManager): _rtype = "ErrorApplication" @@ -125,8 +125,7 @@ class ErrorApplication(ResourceManager): def __init__(self, ec, guid): super(ErrorApplication, self).__init__(ec, guid) - @failtrap - def deploy(self): + def do_deploy(self): node = self.get_connected(Node.rtype())[0] if node.state < ResourceState.READY: self.ec.schedule("0.5s", self.deploy) @@ -245,15 +244,15 @@ class ResourceManagerTestCase(unittest.TestCase): self.assertTrue(rmnode1.ready_time < rmapp1.ready_time) self.assertTrue(rmnode2.ready_time < rmapp2.ready_time) - # - Node needs to wait until Interface is ready to be ready + # - Node needs to wait until Interface is ready to be ready self.assertTrue(rmnode1.ready_time > rmiface1.ready_time) self.assertTrue(rmnode2.ready_time > rmiface2.ready_time) - # - Interface needs to wait until Node is provisioned to be ready + # - Interface needs to wait until Node is provisioned to be ready self.assertTrue(rmnode1.provision_time < rmiface1.ready_time) self.assertTrue(rmnode2.provision_time < rmiface2.ready_time) - # - Interface needs to wait until Channel is ready to be ready + # - Interface needs to wait until Channel is ready to be ready self.assertTrue(rmchan.ready_time < rmiface1.ready_time) self.assertTrue(rmchan.ready_time < rmiface2.ready_time) @@ -301,7 +300,6 @@ class ResourceManagerTestCase(unittest.TestCase): app = ec.register_resource("ErrorApplication") ec.register_connection(app, node) - apps.append(app) ec.deploy() diff --git a/test/resources/planetlab/node.py b/test/resources/planetlab/node.py index ba69eeba..982192bb 100755 --- a/test/resources/planetlab/node.py +++ b/test/resources/planetlab/node.py @@ -28,11 +28,9 @@ import time import unittest import multiprocessing -class DummyEC(ExperimentController): - pass - -def create_node(ec, username, pl_user, pl_password, hostname=None, country=None, - operatingSystem=None, minBandwidth=None, minCpu=None): +def create_node(ec, username, pl_user, pl_password, hostname = None, + country = None, operatingSystem = None, minBandwidth = None, + minCpu = None): node = ec.register_resource("PlanetlabNode") @@ -60,7 +58,6 @@ def create_node(ec, username, pl_user, pl_password, hostname=None, country=None, return node class PLNodeFactoryTestCase(unittest.TestCase): - def test_creation_phase(self): self.assertEquals(PlanetlabNode.rtype(), "PlanetlabNode") self.assertEquals(len(PlanetlabNode._attributes), 29) @@ -73,7 +70,7 @@ class PLNodeTestCase(unittest.TestCase): """ def setUp(self): - self.ec = DummyEC() + self.ec = ExperimentController() self.username = "inria_sfatest" self.pl_user = os.environ.get("PL_USER") self.pl_password = os.environ.get("PL_PASS")