From bf43c83ced9389c8fa9468d7c23f67d35af963da Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Sun, 4 Aug 2013 11:48:40 -0700 Subject: [PATCH] Code cleanup. Setting resource state through specific functions --- src/nepi/execution/ec.py | 58 ++++----- src/nepi/execution/resource.py | 119 +++++++++++------- src/nepi/resources/linux/application.py | 27 ++-- .../resources/linux/ccn/ccnapplication.py | 5 +- src/nepi/resources/linux/ccn/ccncontent.py | 14 +-- src/nepi/resources/linux/ccn/ccnd.py | 24 ++-- src/nepi/resources/linux/ccn/ccnr.py | 10 +- src/nepi/resources/linux/ccn/fibentry.py | 19 +-- src/nepi/resources/linux/node.py | 24 ++-- src/nepi/resources/linux/traceroute.py | 17 ++- src/nepi/resources/linux/udptest.py | 9 +- src/nepi/resources/linux/udptunnel.py | 37 +++--- src/nepi/resources/omf/application.py | 68 +++++----- src/nepi/resources/omf/channel.py | 81 ++++++------ src/nepi/resources/omf/interface.py | 62 ++++----- src/nepi/resources/omf/node.py | 38 +++--- src/nepi/resources/omf/omf_api.py | 33 +++-- src/nepi/resources/planetlab/node.py | 35 +++++- src/nepi/resources/planetlab/tap.py | 19 ++- 19 files changed, 382 insertions(+), 317 deletions(-) diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index de013d1c..6beb3d76 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -17,14 +17,6 @@ # # Author: Alina Quereilhac -import functools -import logging -import os -import random -import sys -import time -import threading - from nepi.util import guid from nepi.util.parallel import ParallelRun from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat @@ -34,9 +26,16 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus from nepi.execution.trace import TraceAttr # TODO: use multiprocessing instead of threading -# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!! # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode) +import functools +import logging +import os +import random +import sys +import time +import threading + class ECState(object): """ State of the Experiment Controller @@ -182,7 +181,7 @@ class ExperimentController(object): def wait_finished(self, guids): """ Blocking method that wait until all the RM from the 'guid' list - reached the state FINISHED + reached the state FINISHED ( or STOPPED, FAILED or RELEASED ) :param guids: List of guids :type guids: list @@ -191,31 +190,34 @@ class ExperimentController(object): def wait_started(self, guids): """ Blocking method that wait until all the RM from the 'guid' list - reached the state STARTED + reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED) :param guids: List of guids :type guids: list """ - return self.wait(guids, states = [ResourceState.STARTED, - ResourceState.STOPPED, - ResourceState.FAILED, - ResourceState.FINISHED]) + return self.wait(guids, state = ResourceState.STARTED) def wait_released(self, guids): """ Blocking method that wait until all the RM from the 'guid' list - reached the state RELEASED + reached the state RELEASED (or FAILED) + + :param guids: List of guids + :type guids: list + """ + # TODO: solve state concurrency BUG and !!!! + # correct waited release state to state = ResourceState.FAILED) + return self.wait(guids, state = ResourceState.FINISHED) + + def wait_deployed(self, guids): + """ Blocking method that wait until all the RM from the 'guid' list + reached the state READY (or any higher state) :param guids: List of guids :type guids: list """ - return self.wait(guids, states = [ResourceState.RELEASED, - ResourceState.STOPPED, - ResourceState.FAILED, - ResourceState.FINISHED]) + return self.wait(guids, state = ResourceState.READY) - def wait(self, guids, states = [ResourceState.FINISHED, - ResourceState.FAILED, - ResourceState.STOPPED]): + def wait(self, guids, state = ResourceState.STOPPED): """ Blocking method that waits until all the RM from the 'guid' list reached state 'state' or until a failure occurs @@ -237,14 +239,14 @@ class ExperimentController(object): # If a guid reached one of the target states, remove it from list guid = guids[0] - state = self.state(guid) + rstate = self.state(guid) - if state in states: + if rstate >= state: guids.remove(guid) else: # Debug... - self.logger.debug(" WAITING FOR %g - state %s " % (guid, - self.state(guid, hr = True))) + self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid, + self.state(guid, hr = True), state)) # Take the opportunity to 'refresh' the states of the RMs. # Query only the first up to N guids (not to overwhelm @@ -262,7 +264,7 @@ class ExperimentController(object): # If the guid is not in one of the target states, wait and # continue quering. We keep the sleep big to decrease the # number of RM state queries - time.sleep(2) + time.sleep(4) def get_task(self, tid): """ Get a specific task diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 978f0eeb..2d738c5d 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -200,8 +200,8 @@ class ResourceManager(Logger): # the resource instance gets a copy of all traces self._trcs = copy.deepcopy(self._traces) - self._state = ResourceState.NEW - + # Each resource is placed on a deployment group by the EC + # during deployment self.deployment_group = None self._start_time = None @@ -213,6 +213,8 @@ class ResourceManager(Logger): self._finish_time = None self._failed_time = None + self._state = ResourceState.NEW + @property def guid(self): """ Returns the global unique identifier of the RM """ @@ -316,9 +318,8 @@ class ResourceManager(Logger): This method is resposible for selecting an individual resource matching user requirements. This method should be redefined when necessary in child classes. - """ - self._discover_time = tnow() - self._state = ResourceState.DISCOVERED + """ + self.set_discovered() def provision(self): """ Performs resource provisioning. @@ -327,9 +328,8 @@ class ResourceManager(Logger): After this method has been successfully invoked, the resource should be acccesible/controllable by the RM. This method should be redefined when necessary in child classes. - """ - self._provision_time = tnow() - self._state = ResourceState.PROVISIONED + """ + self.set_provisioned() def start(self): """ Starts the resource. @@ -337,12 +337,11 @@ class ResourceManager(Logger): There is no generic start behavior for all resources. This method should be redefined when necessary in child classes. """ - if not self._state in [ResourceState.READY, ResourceState.STOPPED]: + if not self.state in [ResourceState.READY, ResourceState.STOPPED]: self.error("Wrong state %s for start" % self.state) return - self._start_time = tnow() - self._state = ResourceState.STARTED + self.set_started() def stop(self): """ Stops the resource. @@ -350,12 +349,31 @@ class ResourceManager(Logger): There is no generic stop behavior for all resources. This method should be redefined when necessary in child classes. """ - if not self._state in [ResourceState.STARTED]: + if not self.state in [ResourceState.STARTED]: self.error("Wrong state %s for stop" % self.state) return + + self.set_stopped() - self._stop_time = tnow() - self._state = ResourceState.STOPPED + def deploy(self): + """ Execute all steps required for the RM to reach the state READY + + """ + if self.state > ResourceState.READY: + self.error("Wrong state %s for deploy" % self.state) + return + + self.debug("----- READY ---- ") + self.set_ready() + + def release(self): + self.set_released() + + def finish(self): + self.set_finished() + + def fail(self): + self.set_failed() def set(self, name, value): """ Set the value of the attribute @@ -655,39 +673,6 @@ class ResourceManager(Logger): self.debug(" ----- STOPPING ---- ") self.stop() - def deploy(self): - """ Execute all steps required for the RM to reach the state READY - - """ - if self._state > ResourceState.READY: - self.error("Wrong state %s for deploy" % self.state) - return - - self.debug("----- READY ---- ") - self._ready_time = tnow() - self._state = ResourceState.READY - - def release(self): - """Release any resources used by this RM - - """ - self._release_time = tnow() - self._state = ResourceState.RELEASED - - def finish(self): - """ Mark ResourceManager as FINISHED - - """ - self._finish_time = tnow() - self._state = ResourceState.FINISHED - - def fail(self): - """ Mark ResourceManager as FAILED - - """ - self._failed_time = tnow() - self._state = ResourceState.FAILED - def connect(self, guid): """ Performs actions that need to be taken upon associating RMs. This method should be redefined when necessary in child classes. @@ -712,6 +697,46 @@ class ResourceManager(Logger): """ # TODO: Validate! return True + + def set_started(self): + """ Mark ResourceManager as STARTED """ + self._start_time = tnow() + self._state = ResourceState.STARTED + + def set_stopped(self): + """ Mark ResourceManager as STOPPED """ + self._stop_time = tnow() + self._state = ResourceState.STOPPED + + def set_ready(self): + """ Mark ResourceManager as READY """ + self._ready_time = tnow() + self._state = ResourceState.READY + + def set_released(self): + """ Mark ResourceManager as REALEASED """ + self._release_time = tnow() + self._state = ResourceState.RELEASED + + def set_finished(self): + """ Mark ResourceManager as FINISHED """ + self._finish_time = tnow() + self._state = ResourceState.FINISHED + + def set_failed(self): + """ Mark ResourceManager as FAILED """ + self._failed_time = tnow() + self._state = ResourceState.FAILED + + def set_discovered(self): + """ Mark ResourceManager as DISCOVERED """ + self._discover_time = tnow() + self._state = ResourceState.DISCOVERED + + def set_provisioned(self): + """ Mark ResourceManager as PROVISIONED """ + self._provision_time = tnow() + self._state = ResourceState.PROVISIONED class ResourceFactory(object): _resource_types = dict() diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 3519138b..f9c46c82 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - reschedule_delay + reschedule_delay from nepi.resources.linux.node import LinuxNode from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -29,6 +29,7 @@ import os import subprocess # TODO: Resolve wildcards in commands!! +# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!! @clsinit class LinuxApplication(ResourceManager): @@ -483,7 +484,7 @@ class LinuxApplication(ResourceManager): raise super(LinuxApplication, self).deploy() - + def start(self): command = self.get("command") @@ -492,7 +493,7 @@ class LinuxApplication(ResourceManager): if not command: # If no command was given (i.e. Application was used for dependency # installation), then the application is directly marked as FINISHED - self._state = ResourceState.FINISHED + self.set_finished() else: if self.in_foreground: @@ -585,14 +586,12 @@ class LinuxApplication(ResourceManager): if self.state == ResourceState.STARTED: - self.info("Stopping command '%s'" % command) + self.info("Stopping command '%s' " % command) # If the command is running in foreground (it was launched using # the node 'execute' method), then we use the handler to the Popen # process to kill it. Else we send a kill signal using the pid and ppid # retrieved after running the command with the node 'run' method - stopped = True - if self._proc: self._proc.kill() else: @@ -602,12 +601,12 @@ class LinuxApplication(ResourceManager): (out, err), proc = self.node.kill(self.pid, self.ppid, sudo = self._sudo_kill) + # TODO: check if execution errors occurred if proc.poll() or err: - # check if execution errors occurred msg = " Failed to STOP command '%s' " % self.get("command") self.error(msg, out, err) self.fail() - + if self.state == ResourceState.STARTED: super(LinuxApplication, self).stop() @@ -620,11 +619,11 @@ class LinuxApplication(ResourceManager): self.stop() - if self.state == ResourceState.STOPPED: + if self.state != ResourceState.FAILED: self.info("Resource released") super(LinuxApplication, self).release() - + @property def state(self): """ Returns the state of the application @@ -644,9 +643,9 @@ class LinuxApplication(ResourceManager): err = self._proc.stderr.read() self.error(msg, out, err) self.fail() - elif retcode == 0: - self._state = ResourceState.FINISHED + elif retcode == 0: + self.finish() else: # We need to query the status of the command we launched in # background. In order to avoid overwhelming the remote host and @@ -665,12 +664,12 @@ class LinuxApplication(ResourceManager): self.run_home) if err: - msg = " Failed to execute command '%s'" % \ + msg = "Failed to execute command '%s'" % \ self.get("command") self.error(msg, out, err) self.fail() else: - self._state = ResourceState.FINISHED + self.finish() self._last_state_check = tnow() diff --git a/src/nepi/resources/linux/ccn/ccnapplication.py b/src/nepi/resources/linux/ccn/ccnapplication.py index 8a091224..3d8943ab 100644 --- a/src/nepi/resources/linux/ccn/ccnapplication.py +++ b/src/nepi/resources/linux/ccn/ccnapplication.py @@ -17,7 +17,7 @@ # # Author: Alina Quereilhac -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ +from nepi.execution.resource import clsinit_copy, ResourceState, \ reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND @@ -64,8 +64,7 @@ class LinuxCCNApplication(LinuxApplication): raise self.debug("----- READY ---- ") - self._ready_time = tnow() - self._state = ResourceState.READY + self.set_ready() @property def _environment(self): diff --git a/src/nepi/resources/linux/ccn/ccncontent.py b/src/nepi/resources/linux/ccn/ccncontent.py index 5b9d9252..cae55b58 100644 --- a/src/nepi/resources/linux/ccn/ccncontent.py +++ b/src/nepi/resources/linux/ccn/ccncontent.py @@ -101,8 +101,7 @@ class LinuxCCNContent(LinuxApplication): raise self.debug("----- READY ---- ") - self._ready_time = tnow() - self._state = ResourceState.READY + self.set_ready() def upload_start_command(self): command = self.get("command") @@ -128,22 +127,17 @@ class LinuxCCNContent(LinuxApplication): raise RuntimeError, msg def start(self): - if self._state == ResourceState.READY: + if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) - self._start_time = tnow() - self._state = ResourceState.STARTED + self.set_started() else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self._state = ResourceState.FAILED + sef.fail() raise RuntimeError, msg - @property - def state(self): - return self._state - @property def _start_command(self): command = ["ccnseqwriter"] diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index 20c04fc1..4fdb0ce6 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -19,8 +19,8 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.node import OSType from nepi.util.timefuncs import tnow, tdiffsec @@ -181,8 +181,7 @@ class LinuxCCND(LinuxApplication): raise self.debug("----- READY ---- ") - self._ready_time = tnow() - self._state = ResourceState.READY + self.set_ready() def upload_start_command(self): command = self.get("command") @@ -204,23 +203,21 @@ class LinuxCCND(LinuxApplication): raise_on_error = True) def start(self): - if self._state == ResourceState.READY: + if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) - self._start_time = tnow() - self._state = ResourceState.STARTED + self.set_started() else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self._state = ResourceState.FAILED + self.set_failed() raise RuntimeError, msg def stop(self): command = self.get('command') or '' - state = self.state - if state == ResourceState.STARTED: + if self.state == ResourceState.STARTED: self.info("Stopping command '%s'" % command) command = "ccndstop" @@ -241,8 +238,7 @@ class LinuxCCND(LinuxApplication): stdout = "ccndstop_stdout", stderr = "ccndstop_stderr") - self._stop_time = tnow() - self._state = ResourceState.STOPPED + self.set_stopped() @property def state(self): @@ -256,12 +252,12 @@ class LinuxCCND(LinuxApplication): if retcode == 1 and err.find("No such file or directory") > -1: # ccnd is not running (socket not found) - self._state = ResourceState.FINISHED + self.set_finished() elif retcode: # other errors ... msg = " Failed to execute command '%s'" % self.get("command") self.error(msg, out, err) - self._state = ResourceState.FAILED + self.fail() self._last_state_check = tnow() diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index ac10840f..378f93c6 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -225,8 +225,7 @@ class LinuxCCNR(LinuxApplication): raise self.debug("----- READY ---- ") - self._ready_time = tnow() - self._state = ResourceState.READY + self.set_ready() def upload_start_command(self): command = self.get("command") @@ -257,16 +256,15 @@ class LinuxCCNR(LinuxApplication): raise_on_error = True) def start(self): - if self._state == ResourceState.READY: + if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) - self._start_time = tnow() - self._state = ResourceState.STARTED + self.set_started() else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self._state = ResourceState.FAILED + self.fail() raise RuntimeError, msg @property diff --git a/src/nepi/resources/linux/ccn/fibentry.py b/src/nepi/resources/linux/ccn/fibentry.py index 62d9049d..9d8e8c25 100644 --- a/src/nepi/resources/linux/ccn/fibentry.py +++ b/src/nepi/resources/linux/ccn/fibentry.py @@ -139,8 +139,7 @@ class LinuxFIBEntry(LinuxApplication): raise self.debug("----- READY ---- ") - self._ready_time = tnow() - self._state = ResourceState.READY + self.set_ready() def upload_start_command(self): command = self.get("command") @@ -160,9 +159,9 @@ class LinuxFIBEntry(LinuxApplication): env, blocking = True) if proc.poll(): - self._state = ResourceState.FAILED msg = "Failed to execute command" self.error(msg, out, err) + self.fail() raise RuntimeError, msg def configure(self): @@ -197,16 +196,15 @@ class LinuxFIBEntry(LinuxApplication): self.ec.deploy(guids=[self._traceroute], group = self.deployment_group) def start(self): - if self._state in [ResourceState.READY, ResourceState.STARTED]: + if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) - self._start_time = tnow() - self._state = ResourceState.STARTED + self.set_started() else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self._state = ResourceState.FAILED + self.fail() raise RuntimeError, msg def stop(self): @@ -222,12 +220,7 @@ class LinuxFIBEntry(LinuxApplication): if proc.poll(): pass - self._stop_time = tnow() - self._state = ResourceState.STOPPED - - @property - def state(self): - return self._state + self.set_stopped() @property def _start_command(self): diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 9188022f..d71d21f7 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -196,8 +196,14 @@ class LinuxNode(ResourceManager): # home directory at Linux host self._home_dir = "" - # lock to avoid concurrency issues on methods used by applications - self._lock = threading.Lock() + # lock to prevent concurrent applications on the same node, + # to execute commands at the same time. There are potential + # concurrency issues when using SSH to a same host from + # multiple threads. There are also possible operational + # issues, e.g. an application querying the existence + # of a file or folder prior to its creation, and another + # application creating the same file or folder in between. + self._node_lock = threading.Lock() def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, @@ -351,7 +357,7 @@ class LinuxNode(ResourceManager): self.discover() self.provision() except: - self._state = ResourceState.FAILED + self.fail() raise # Node needs to wait until all associated interfaces are @@ -456,7 +462,7 @@ class LinuxNode(ResourceManager): env = env) else: if with_lock: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rexec( command, host = self.get("hostname"), @@ -524,7 +530,7 @@ class LinuxNode(ResourceManager): sudo = sudo, user = user) else: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rspawn( command, pidfile = pidfile, @@ -549,7 +555,7 @@ class LinuxNode(ResourceManager): if self.localhost: pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) else: - with self._lock: + with self._node_lock: pidtuple = sshfuncs.rgetpid( os.path.join(home, pidfile), host = self.get("hostname"), @@ -566,7 +572,7 @@ class LinuxNode(ResourceManager): if self.localhost: status = execfuncs.lstatus(pid, ppid) else: - with self._lock: + with self._node_lock: status = sshfuncs.rstatus( pid, ppid, host = self.get("hostname"), @@ -588,7 +594,7 @@ class LinuxNode(ResourceManager): if self.localhost: (out, err), proc = execfuncs.lkill(pid, ppid, sudo) else: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rkill( pid, ppid, host = self.get("hostname"), @@ -608,7 +614,7 @@ class LinuxNode(ResourceManager): recursive = True, strict_host_checking = False) else: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rcopy( src, dst, port = self.get("port"), diff --git a/src/nepi/resources/linux/traceroute.py b/src/nepi/resources/linux/traceroute.py index 4d55eb10..2e03f628 100644 --- a/src/nepi/resources/linux/traceroute.py +++ b/src/nepi/resources/linux/traceroute.py @@ -23,6 +23,7 @@ from nepi.resources.linux.application import LinuxApplication from nepi.util.timefuncs import tnow import os +import socket @clsinit_copy class LinuxTraceroute(LinuxApplication): @@ -42,12 +43,21 @@ class LinuxTraceroute(LinuxApplication): default = False, flags = Flags.ExecReadOnly) + use_ip = Attribute("useIP", + "Use the IP address instead of the host domain name. " + "Useful for environments were dns resolution problems occur " + "frequently", + type = Types.Bool, + default = False, + flags = Flags.ExecReadOnly) + target = Attribute("target", "Traceroute target host (host that will be pinged)", flags = Flags.ExecReadOnly) cls._register_attribute(countinuous) cls._register_attribute(print_timestamp) + cls._register_attribute(use_ip) cls._register_attribute(target) def __init__(self, ec, guid): @@ -71,7 +81,12 @@ class LinuxTraceroute(LinuxApplication): if self.get("printTimestamp") == True: args.append("""echo "`date +'%Y%m%d%H%M%S'`";""") args.append("traceroute") - args.append(self.get("target")) + + target = self.get("target") + if self.get("useIP") == True: + target = socket.gethostbyname(target) + args.append(target) + if self.get("continuous") == True: args.append("; sleep 2 ; done ") diff --git a/src/nepi/resources/linux/udptest.py b/src/nepi/resources/linux/udptest.py index dbb08137..779b8c1e 100644 --- a/src/nepi/resources/linux/udptest.py +++ b/src/nepi/resources/linux/udptest.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ +from nepi.execution.resource import clsinit_copy, ResourceState, \ reschedule_delay from nepi.execution.resource import clsinit_copy from nepi.resources.linux.application import LinuxApplication @@ -250,16 +250,15 @@ class LinuxUdpTest(LinuxApplication): def start(self): if self.get("s") == True: # Server is already running - if self._state == ResourceState.READY: + if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) - self._start_time = tnow() - self._state = ResourceState.STARTED + self.set_started() else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self._state = ResourceState.FAILED + self.fail() raise RuntimeError, msg else: super(LinuxUdpTest, self).start() diff --git a/src/nepi/resources/linux/udptunnel.py b/src/nepi/resources/linux/udptunnel.py index 349ddc27..1d23736c 100644 --- a/src/nepi/resources/linux/udptunnel.py +++ b/src/nepi/resources/linux/udptunnel.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ +from nepi.execution.resource import clsinit_copy, ResourceState, \ reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.util.sshfuncs import ProcStatus @@ -187,9 +187,7 @@ class UdpTunnel(LinuxApplication): self.info("Provisioning finished") - self.debug("----- READY ---- ") - self._provision_time = tnow() - self._state = ResourceState.PROVISIONED + self.set_provisioned() def deploy(self): if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ @@ -204,29 +202,24 @@ class UdpTunnel(LinuxApplication): raise self.debug("----- READY ---- ") - self._ready_time = tnow() - self._state = ResourceState.READY + self.set_ready() def start(self): - if self._state == ResourceState.READY: + if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) - - self._start_time = tnow() - self._state = ResourceState.STARTED + + self.set_started() else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self._state = ResourceState.FAILED + self.fail() raise RuntimeError, msg - # XXX: Leaves process unkilled!! - # Implement another mechanism to kill the tunnel! def stop(self): """ Stops application execution """ if self.state == ResourceState.STARTED: - stopped = True self.info("Stopping tunnel") # Only try to kill the process if the pid and ppid @@ -242,11 +235,9 @@ class UdpTunnel(LinuxApplication): msg = " Failed to STOP tunnel" self.error(msg, err1, err2) self.fail() - stopped = False - if stopped: - self._stop_time = tnow() - self._state = ResourceState.STOPPED + if self.state == ResourceState.STARTED: + self.set_stopped() @property def state(self): @@ -280,7 +271,7 @@ class UdpTunnel(LinuxApplication): self.error(msg, err1, err2) self.fail() else: - self._state = ResourceState.FINISHED + self.set_finished() self._last_state_check = tnow() @@ -288,11 +279,15 @@ class UdpTunnel(LinuxApplication): def wait_local_port(self, endpoint): """ Waits until the local_port file for the endpoint is generated, - and returns the port number """ + and returns the port number + + """ return self.wait_file(endpoint, "local_port") def wait_result(self, endpoint): - """ Waits until the return code file for the endpoint is generated """ + """ Waits until the return code file for the endpoint is generated + + """ return self.wait_file(endpoint, "ret_file") def wait_file(self, endpoint, filename): diff --git a/src/nepi/resources/omf/application.py b/src/nepi/resources/omf/application.py index 3b80df16..673f8100 100644 --- a/src/nepi/resources/omf/application.py +++ b/src/nepi/resources/omf/application.py @@ -39,7 +39,8 @@ class OMFApplication(ResourceManager): .. note:: - This class is used only by the Experiment Controller through the Resource Factory + This class is used only by the Experiment Controller through the + Resource Factory """ _rtype = "OMFApplication" @@ -47,7 +48,8 @@ class OMFApplication(ResourceManager): @classmethod def _register_attributes(cls): - """Register the attributes of an OMF application + """ Register the attributes of an OMF application + """ appid = Attribute("appid", "Name of the application") @@ -78,7 +80,6 @@ class OMFApplication(ResourceManager): :type creds: dict """ - super(OMFApplication, self).__init__(ec, guid) self.set('appid', "") @@ -90,12 +91,6 @@ class OMFApplication(ResourceManager): self._omf_api = None - @property - def exp_id(self): - if self.ec.exp_id.startswith('exp-'): - return None - return self.ec.exp_id - @property def node(self): rm_list = self.get_connected(OMFNode.rtype()) @@ -103,7 +98,8 @@ class OMFApplication(ResourceManager): return None def valid_connection(self, guid): - """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed. + """ Check if the connection with the guid in parameter is possible. + Only meaningful connections are allowed. :param guid: Guid of RM it will be connected :type guid: int @@ -112,47 +108,57 @@ class OMFApplication(ResourceManager): """ rm = self.ec.get_resource(guid) if rm.rtype() not in self._authorized_connections: - msg = "Connection between %s %s and %s %s refused : An Application can be connected only to a Node" %\ + msg = ("Connection between %s %s and %s %s refused: " + "An Application can be connected only to a Node" ) % \ (self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) + return False + elif len(self.connections) != 0 : - msg = "Connection between %s %s and %s %s refused : This Application is already connected" % \ + msg = ("Connection between %s %s and %s %s refused: " + "This Application is already connected" ) % \ (self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) + return False + else : - msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid) + msg = "Connection between %s %s and %s %s accepted" % ( + self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) - return True - + return True def deploy(self): - """Deploy the RM. It means nothing special for an application for now (later it will be upload sources, ...) - It becomes DEPLOYED after getting the xmpp client. + """ Deploy the RM. It means nothing special for an application + for now (later it will be upload sources, ...) + It becomes DEPLOYED after getting the xmpp client. + """ if not self._omf_api : self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id) + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.ec.exp_id) if not self._omf_api : - self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) + self.fail() return super(OMFApplication, self).deploy() def start(self): - """Start the RM. It means : Send Xmpp Message Using OMF protocol to execute the application - It becomes STARTED before the messages are sent (for coordination) + """ Start the RM. It means : Send Xmpp Message Using OMF protocol + to execute the application. + It becomes STARTED before the messages are sent (for coordination) """ if not (self.get('appid') and self.get('path')) : - self._state = ResourceState.FAILED msg = "Application's information are not initialized" self.error(msg) + self.fail() return if not self.get('args'): @@ -170,39 +176,37 @@ class OMFApplication(ResourceManager): self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \ self.get('args'), self.get('path'), self.get('env')) except AttributeError: - self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) + self.fail() raise - super(OMFApplication, self).start() - def stop(self): - """Stop the RM. It means : Send Xmpp Message Using OMF protocol to kill the application - It becomes STOPPED after the message is sent. + """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to + kill the application. + State is set to STOPPED after the message is sent. """ try: self._omf_api.exit(self.node.get('hostname'),self.get('appid')) except AttributeError: - self._state = ResourceState.FAILED msg = "Credentials were not initialzed. XMPP Connections impossible" self.error(msg) + self.fail() #raise super(OMFApplication, self).stop() - self._state = ResourceState.FINISHED - def release(self): - """Clean the RM at the end of the experiment and release the API. + """ Clean the RM at the end of the experiment and release the API. """ if self._omf_api : OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id) + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.ec.exp_id) super(OMFApplication, self).release() diff --git a/src/nepi/resources/omf/channel.py b/src/nepi/resources/omf/channel.py index bc4f9b55..b51cd895 100644 --- a/src/nepi/resources/omf/channel.py +++ b/src/nepi/resources/omf/channel.py @@ -1,21 +1,22 @@ -""" - NEPI, a framework to manage network experiments - Copyright (C) 2013 INRIA - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . - -""" +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Julien Tribino from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ reschedule_delay @@ -44,10 +45,10 @@ class OMFChannel(ResourceManager): _rtype = "OMFChannel" _authorized_connections = ["OMFWifiInterface", "OMFNode"] - @classmethod def _register_attributes(cls): """Register the attributes of an OMF channel + """ channel = Attribute("channel", "Name of the application") xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential) @@ -83,7 +84,8 @@ class OMFChannel(ResourceManager): return self.ec.exp_id def valid_connection(self, guid): - """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed. + """ Check if the connection with the guid in parameter is possible. + Only meaningful connections are allowed. :param guid: Guid of the current RM :type guid: int @@ -91,12 +93,17 @@ class OMFChannel(ResourceManager): """ rm = self.ec.get_resource(guid) + if rm.rtype() in self._authorized_connections: - msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid) + msg = "Connection between %s %s and %s %s accepted" % ( + self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) return True - msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid) + + msg = "Connection between %s %s and %s %s refused" % ( + self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) + return False def _get_target(self, conn_set): @@ -115,7 +122,8 @@ class OMFChannel(ResourceManager): for conn in rm_iface.connections: rm_node = self.ec.get_resource(conn) if rm_node.rtype() == "OMFNode" and rm_node.get('hostname'): - if rm_iface.state < ResourceState.PROVISIONED or rm_node.state < ResourceState.READY: + if rm_iface.state < ResourceState.PROVISIONED or \ + rm_node.state < ResourceState.READY: return "reschedule" couple = [rm_node.get('hostname'), rm_iface.get('alias')] #print couple @@ -135,24 +143,26 @@ class OMFChannel(ResourceManager): pass def deploy(self): - """Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the channel - It becomes DEPLOYED after sending messages to configure the channel + """ Deploy the RM. It means : Get the xmpp client and send messages + using OMF 5.4 protocol to configure the channel. + It becomes DEPLOYED after sending messages to configure the channel """ if not self._omf_api : self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id) + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) if not self._omf_api : - self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) + self.fail() return if not self.get('channel'): - self._state = ResourceState.FAILED msg = "Channel's value is not initialized" self.error(msg) + self.fail() raise self._nodes_guid = self._get_target(self._connections) @@ -167,35 +177,36 @@ class OMFChannel(ResourceManager): attrname = "net/%s/%s" % (couple[1], 'channel') self._omf_api.configure(couple[0], attrname, attrval) except AttributeError: - self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) + self.fail() raise super(OMFChannel, self).deploy() def start(self): - """Start the RM. It means nothing special for a channel for now - It becomes STARTED as soon as this method starts. + """ Start the RM. It means nothing special for a channel for now + It becomes STARTED as soon as this method starts. """ super(OMFChannel, self).start() def stop(self): - """Stop the RM. It means nothing special for a channel for now - It becomes STOPPED as soon as this method is called + """ Stop the RM. It means nothing special for a channel for now + It becomes STOPPED as soon as this method is called """ super(OMFChannel, self).stop() def release(self): - """Clean the RM at the end of the experiment and release the API + """ Clean the RM at the end of the experiment and release the API """ if self._omf_api : OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id) + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) super(OMFChannel, self).release() diff --git a/src/nepi/resources/omf/interface.py b/src/nepi/resources/omf/interface.py index c2be38eb..a3b9c3a3 100644 --- a/src/nepi/resources/omf/interface.py +++ b/src/nepi/resources/omf/interface.py @@ -26,7 +26,6 @@ from nepi.resources.omf.node import OMFNode from nepi.resources.omf.channel import OMFChannel from nepi.resources.omf.omf_api import OMFAPIFactory - @clsinit class OMFWifiInterface(ResourceManager): """ @@ -41,7 +40,8 @@ class OMFWifiInterface(ResourceManager): .. note:: - This class is used only by the Experiment Controller through the Resource Factory + This class is used only by the Experiment Controller through the Resource + Factory """ _rtype = "OMFWifiInterface" @@ -88,14 +88,9 @@ class OMFWifiInterface(ResourceManager): self._omf_api = None self._alias = self.get('alias') - @property - def exp_id(self): - if self.ec.exp_id.startswith('exp-'): - return None - return self.ec.exp_id - def valid_connection(self, guid): - """ Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed. + """ Check if the connection with the guid in parameter is possible. + Only meaningful connections are allowed. :param guid: Guid of the current RM :type guid: int @@ -107,10 +102,13 @@ class OMFWifiInterface(ResourceManager): msg = "Connection between %s %s and %s %s accepted" % \ (self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) + return True + msg = "Connection between %s %s and %s %s refused" % \ (self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) + return False @property @@ -138,7 +136,8 @@ class OMFWifiInterface(ResourceManager): for attrname in ["mode", "type", "essid"]: attrval = self.get(attrname) attrname = "net/%s/%s" % (self._alias, attrname) - self._omf_api.configure(self.node.get('hostname'), attrname, attrval) + self._omf_api.configure(self.node.get('hostname'), attrname, + attrval) except AttributeError: self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" @@ -152,7 +151,6 @@ class OMFWifiInterface(ResourceManager): """ Configure the ip of the interface """ - if self.channel.state < ResourceState.READY: self.ec.schedule(reschedule_delay, self.deploy) return False @@ -160,39 +158,43 @@ class OMFWifiInterface(ResourceManager): try : attrval = self.get("ip") attrname = "net/%s/%s" % (self._alias, "ip") - self._omf_api.configure(self.node.get('hostname'), attrname, attrval) + self._omf_api.configure(self.node.get('hostname'), attrname, + attrval) except AttributeError: - self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" self.debug(msg) + self.fail() #raise return True - def deploy(self): - """Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the interface - It becomes DEPLOYED after sending messages to configure the interface + """ Deploy the RM. It means : Get the xmpp client and send messages + using OMF 5.4 protocol to configure the interface. + It becomes DEPLOYED after sending messages to configure the interface """ if not self._omf_api : self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id) + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.ec.exp_id) if not self._omf_api : - self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) + self.fail() return - if not (self.get('mode') and self.get('type') and self.get('essid') and self.get('ip')): - self._state = ResourceState.FAILED + if not (self.get('mode') and self.get('type') and self.get('essid') \ + and self.get('ip')): msg = "Interface's variable are not initialized" self.error(msg) + self.fail() return False if not self.node.get('hostname') : msg = "The channel is connected with an undefined node" self.error(msg) + self.fail() return False # Just for information @@ -213,28 +215,14 @@ class OMFWifiInterface(ResourceManager): super(OMFWifiInterface, self).deploy() return True - def start(self): - """Start the RM. It means nothing special for an interface for now - It becomes STARTED as soon as this method starts. - - """ - - super(OMFWifiInterface, self).start() - - def stop(self): - """Stop the RM. It means nothing special for an interface for now - It becomes STOPPED as soon as this method stops - - """ - super(OMFWifiInterface, self).stop() - def release(self): - """Clean the RM at the end of the experiment and release the API + """ Clean the RM at the end of the experiment and release the API """ if self._omf_api : OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id) + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.ec.exp_id) super(OMFWifiInterface, self).release() diff --git a/src/nepi/resources/omf/node.py b/src/nepi/resources/omf/node.py index 612fa389..1421078d 100644 --- a/src/nepi/resources/omf/node.py +++ b/src/nepi/resources/omf/node.py @@ -101,14 +101,9 @@ class OMFNode(ResourceManager): self._omf_api = None - @property - def exp_id(self): - if self.ec.exp_id.startswith('exp-'): - return None - return self.ec.exp_id - def valid_connection(self, guid): - """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed. + """ Check if the connection with the guid in parameter is possible. + Only meaningful connections are allowed. :param guid: Guid of the current RM :type guid: int @@ -117,40 +112,47 @@ class OMFNode(ResourceManager): """ rm = self.ec.get_resource(guid) if rm.rtype() in self._authorized_connections: - msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid) + msg = "Connection between %s %s and %s %s accepted" % ( + self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) + return True - msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid) + + msg = "Connection between %s %s and %s %s refused" % ( + self.rtype(), self._guid, rm.rtype(), guid) self.debug(msg) + return False def deploy(self): - """Deploy the RM. It means : Send Xmpp Message Using OMF protocol to enroll the node into the experiment - It becomes DEPLOYED after sending messages to enroll the node + """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol + to enroll the node into the experiment. + It becomes DEPLOYED after sending messages to enroll the node """ if not self._omf_api : self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id) + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.ec.exp_id) if not self._omf_api : - self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" self.error(msg) + self.fail() return if not self.get('hostname') : - self._state = ResourceState.FAILED msg = "Hostname's value is not initialized" self.error(msg) + self.fail() return False try: self._omf_api.enroll_host(self.get('hostname')) except AttributeError: - self._state = ResourceState.FAILED msg = "Credentials are not initialzed. XMPP Connections impossible" - self.debug(msg) + self.error(msg) + self.fail() #raise AttributeError, msg super(OMFNode, self).deploy() @@ -188,8 +190,10 @@ class OMFNode(ResourceManager): """ if self._omf_api : self._omf_api.release(self.get('hostname')) + OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id) + self.get('xmppHost'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.ec.exp_id) super(OMFNode, self).release() diff --git a/src/nepi/resources/omf/omf_api.py b/src/nepi/resources/omf/omf_api.py index b4b01c51..dc03b937 100644 --- a/src/nepi/resources/omf/omf_api.py +++ b/src/nepi/resources/omf/omf_api.py @@ -48,10 +48,13 @@ class OMFAPI(Logger): .. note:: - This class is the implementation of an OMF 5.4 API. Since the version 5.4.1, the Topic Architecture start with OMF_5.4 instead of OMF used for OMF5.3 + This class is the implementation of an OMF 5.4 API. + Since the version 5.4.1, the Topic Architecture start with OMF_5.4 + instead of OMF used for OMF5.3 """ - def __init__(self, slice, host, port, password, xmpp_root = None, exp_id = None): + def __init__(self, slice, host, port, password, xmpp_root = None, + exp_id = None): """ :param slice: Xmpp Slice @@ -81,6 +84,7 @@ class OMFAPI(Logger): # OMF xmpp client self._client = None + # message handler self._message = None @@ -133,7 +137,8 @@ class OMFAPI(Logger): """ Publish New Experiment Message """ - address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user) + address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, + self._user) #print address payload = self._message.newexp_function(self._user, address) slice_sid = "/%s/%s" % (self._xmpp_root, self._slice) @@ -160,7 +165,8 @@ class OMFAPI(Logger): :type hostname: str """ - return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname) + return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, + hostname) def _host_resource_id(self, hostname): """ Return the Topic Name as /xmpp_root/slice/resources/hostname @@ -201,7 +207,8 @@ class OMFAPI(Logger): self._client.delete(xmpp_node) def enroll_host(self, hostname): - """ Create and Subscribe to the session topic and the resources corresponding to the hostname + """ Create and Subscribe to the session topic and the resources + corresponding to the hostname :param hostname: Full hrn of the node :type hostname: str @@ -227,7 +234,8 @@ class OMFAPI(Logger): :param hostname: Full hrn of the node :type hostname: str - :param attribute: Attribute that need to be configured (often written as /net/wX/attribute, with X the interface number) + :param attribute: Attribute that need to be configured ( + often written as /net/wX/attribute, with X the interface number) :type attribute: str :param value: Value of the attribute :type value: str @@ -242,7 +250,8 @@ class OMFAPI(Logger): :param hostname: Full hrn of the node :type hostname: str - :param app_id: Application Id (Any id that represents in a unique way the application) + :param app_id: Application Id (Any id that represents in a unique + way the application) :type app_id: str :param arguments: Arguments of the application :type arguments: str @@ -252,7 +261,8 @@ class OMFAPI(Logger): :type env: str """ - payload = self._message.execute_function(hostname, app_id, arguments, path, env) + payload = self._message.execute_function(hostname, app_id, arguments, + path, env) xmpp_node = self._host_session_id(hostname) self._client.publish(payload, xmpp_node) @@ -295,11 +305,12 @@ class OMFAPIFactory(object): """ .. note:: - It allows the different RM to use the same xmpp client if they use the same credentials. - For the moment, it is focused on Xmpp. + It allows the different RM to use the same xmpp client if they use + the same credentials. For the moment, it is focused on XMPP. """ - # use lock to avoid concurrent access to the Api list at the same times by 2 different threads + # use lock to avoid concurrent access to the Api list at the same times by 2 + # different threads lock = threading.Lock() _apis = dict() diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index f85c3d50..f71bc5f4 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -23,8 +23,18 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState from nepi.resources.linux.node import LinuxNode from nepi.resources.planetlab.plcapi import PLCAPIFactory from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat -import threading + import subprocess +import threading + +# A.Q. GENERAL COMMENTS: This module needs major cleaning up +# - Lines should be 80 characters +# - Most methods have too many lines and there are no comments or spaces +# - There should be only two line breaks between two methods +# - Code is too compressed. Hard to read. Add spaces when needed +# - In general the code needs to be more subdivided. Use more methods +# with clear names to divide operations (even if you don't reuse the +# methods else where, this will make the code more readable) @clsinit_copy class PlanetlabNode(LinuxNode): @@ -43,6 +53,7 @@ class PlanetlabNode(LinuxNode): """ return cls._blacklist + ### A.Q. COMMENT: Why did you wrapped the locks inside methods ? @classmethod def in_provision(cls): """ Returns the nodes that anohter RM is trying to provision @@ -221,6 +232,8 @@ class PlanetlabNode(LinuxNode): return self._plapi def discoverl(self): + #### A.Q. COMMENT: no need to have methods for the locks and + ## other attributes. Please remove. bl = PlanetlabNode.blacklist() inpro = PlanetlabNode.in_provision() lockbl = PlanetlabNode.lock_bl() @@ -231,6 +244,12 @@ class PlanetlabNode(LinuxNode): if node_id not in bl and node_id not in inpro: try_other = self.do_ping(node_id) if try_other: + # A.Q. COMMENT: Here you could do + # + # with self._lockbl: + # ... + # + # Class attributes can still be accesed with 'self' lockbl.acquire() bl.append(node_id) lockbl.release() @@ -306,6 +325,7 @@ class PlanetlabNode(LinuxNode): def provisionl(self): + # A.Q. COMMENT: you can import time on the top import time bl = PlanetlabNode.blacklist() lockbl = PlanetlabNode.lock_bl() @@ -325,6 +345,9 @@ class PlanetlabNode(LinuxNode): t = 0 while t < timeout and not ssh_ok: # check ssh connection + + # A.Q. COMMENT IMPORTANT! Instead of issuing SSH commands directly use the + # "execute" method inherithed from LinuxNode with blocking = True command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'echo \'GOOD NODE\''" % (slicename, ip) p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) stdout, stderr = p.communicate() @@ -341,6 +364,9 @@ class PlanetlabNode(LinuxNode): with lockbl: bl.append(node) print bl + # A.Q. COMMENT: Make method "delete_slice_node" and there + # put this code. Repeat this for all calls to plapi. + # This will make the code cleaner. self.plapi.delete_slice_node(slicename, [node]) self.discover() continue @@ -351,6 +377,8 @@ class PlanetlabNode(LinuxNode): p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) stdout, stderr = p.communicate() if stdout.find("/proc type proc") < 0: + # A.Q. COMMENT: lines 382-384 should go to a method + # "blacklist_node()" lockbl.acquire() bl.append(node) lockbl.release() @@ -370,7 +398,6 @@ class PlanetlabNode(LinuxNode): # call provision de linux node? super(PlanetlabNode, self).provision() - def filter_based_on_attributes(self): # Map attributes with tagnames of PL timeframe = self.get("timeframe")[0] @@ -485,6 +512,8 @@ class PlanetlabNode(LinuxNode): return nodes_inslice def do_ping(self, node_id): + # A.Q. COMMENT: the execfuncs module in utils will do the local ping for you + # code reuse is good... ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip']) ip = ip[0]['ip'] result = subprocess.call(["ping","-c","2",ip],stdout=subprocess.PIPE,stderr=subprocess.PIPE) @@ -493,7 +522,7 @@ class PlanetlabNode(LinuxNode): elif result == 1 or result == 2: return True - + # A.Q. Unclear name for method "fail2" def fail2(self): self.fail() msg = "Discovery failed. No candidates found for node" diff --git a/src/nepi/resources/planetlab/tap.py b/src/nepi/resources/planetlab/tap.py index 01e7b144..411eb514 100644 --- a/src/nepi/resources/planetlab/tap.py +++ b/src/nepi/resources/planetlab/tap.py @@ -18,7 +18,7 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ +from nepi.execution.resource import clsinit_copy, ResourceState, \ reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.planetlab.node import PlanetlabNode @@ -166,35 +166,31 @@ class PlanetlabTap(LinuxApplication): raise self.debug("----- READY ---- ") - self._ready_time = tnow() - self._state = ResourceState.READY + self.set_ready() def start(self): - if self._state == ResourceState.READY: + if self.state == ResourceState.READY: command = self.get("command") self.info("Starting command '%s'" % command) - self._start_time = tnow() - self._state = ResourceState.STARTED + self.set_started() else: msg = " Failed to execute command '%s'" % command self.error(msg, out, err) - self._state = ResourceState.FAILED + self.fail() raise RuntimeError, msg def stop(self): command = self.get('command') or '' - state = self.state - if state == ResourceState.STARTED: + if self.state == ResourceState.STARTED: self.info("Stopping command '%s'" % command) command = "bash %s" % os.path.join(self.app_home, "stop.sh") (out, err), proc = self.execute_command(command, blocking = True) - self._stop_time = tnow() - self._state = ResourceState.STOPPED + self.set_stopped() @property def state(self): @@ -208,6 +204,7 @@ class PlanetlabTap(LinuxApplication): if out.strip().find(self.get("deviceName")) == -1: # tap is not running is not running (socket not found) + self._finish_time = tnow() self._state = ResourceState.FINISHED self._last_state_check = tnow() -- 2.43.0