From: Alina Quereilhac Date: Tue, 18 Jun 2013 21:10:06 +0000 (-0700) Subject: Fixed nasty concurrency bug in EC X-Git-Tag: nepi-3.0.0~100 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=4c5d308e0d13c0dc4b54556f149bc2a9cd585592;p=nepi.git Fixed nasty concurrency bug in EC --- diff --git a/src/nepi/execution/attribute.py b/src/nepi/execution/attribute.py index c6b973bc..e9f4c54a 100644 --- a/src/nepi/execution/attribute.py +++ b/src/nepi/execution/attribute.py @@ -44,23 +44,42 @@ class Flags: class Attribute(object): """ .. class:: Class Args : + + An Attribute reflects a configuration parameter for + a particular resource. Attributes might be read only or + not. :param name: Name of the attribute :type name: str - :param help: Help about the attribute + + :param help: Attribute description :type help: str - :param type: type of the attribute + + :param type: The type expected for the attribute value. + Should be one of Attribute.Types . :type type: str - :param flags: Help about the attribute - :type flags: str + + :param flags: Defines attribute behavior (i.e. whether it is read-only, + read and write, etc). This parameter should take its values from + Attribute.Flags. Flags values can be bitwised. + :type flags: hex + :param default: Default value of the attribute - :type default: str - :param allowed: Allowed value for this attribute - :type allowed: str - :param range: Range of the attribute - :type range: str - :param set_hook: hook that is related with this attribute - :type set_hook: str + :type default: depends on the type of attribute + + :param allowed: List of values that the attribute can take. + This parameter is only meaningful for Enumerate type attributes. + :type allowed: list + + :param range: (max, min) tuple with range of possible values for + attributes. + This parameter is only meaningful for Integer or Double type + attributes. + :type range: (int, int) or (float, float) + + :param set_hook: Function that will be executed when ever a new + value is set for the attribute. + :type set_hook: function """ def __init__(self, name, help, type = Types.String, diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 262aecf6..b5663c0f 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -370,10 +370,12 @@ class ExperimentController(object): """ rm = self.get_resource(guid) + state = rm.state + if hr: - return ResourceState2str.get(rm.state) + return ResourceState2str.get(state) - return rm.state + return state def stop(self, guid): """ Stop a specific RM defined by its 'guid' @@ -487,7 +489,7 @@ class ExperimentController(object): # same conditions (e.g. LinuxApplications running on a same # node share a single lock, so they will tend to be serialized). # If we disorder the group list, this problem can be mitigated. - #random.shuffle(group) + random.shuffle(group) def wait_all_and_start(group): reschedule = False @@ -525,7 +527,6 @@ class ExperimentController(object): # schedule a stop. Otherwise the RM will stop immediately self.schedule("2s", rm.stop_with_conditions) - def release(self, group = None): """ Release the elements of the list 'group' or all the resources if any group is specified @@ -592,7 +593,7 @@ class ExperimentController(object): if track: self._tasks[task.id] = task - + # Notify condition to wake up the processing thread self._notify() @@ -601,6 +602,8 @@ class ExperimentController(object): def _process(self): """ Process scheduled tasks. + .. note:: + The _process method is executed in an independent thread held by the ExperimentController for as long as the experiment is running. @@ -641,51 +644,55 @@ class ExperimentController(object): try: while not self.finished: self._cond.acquire() + task = self._scheduler.next() - self._cond.release() if not task: - # It there are not tasks in the tasks queue we need to - # wait until a call to schedule wakes us up - self._cond.acquire() + # No task to execute. Wait for a new task to be scheduled. self._cond.wait() - self._cond.release() - else: - # If the task timestamp is in the future the thread needs to wait - # until time elapse or until another task is scheduled + else: + # The task timestamp is in the future. Wait for timeout + # or until another task is scheduled. now = strfnow() if now < task.timestamp: - # Calculate time difference in seconds + # Calculate timeout in seconds timeout = strfdiff(task.timestamp, now) + # Re-schedule task with the same timestamp self._scheduler.schedule(task) - # Sleep until timeout or until a new task awakes the condition - self._cond.acquire() + + task = None + + # Wait timeout or until a new task awakes the condition self._cond.wait(timeout) - self._cond.release() - else: - # Process tasks in parallel - runner.put(self._execute, task) + + self._cond.release() + + if task: + # Process tasks in parallel + runner.put(self._execute, task) except: import traceback err = traceback.format_exc() - self._logger.error("Error while processing tasks in the EC: %s" % err) + self.logger.error("Error while processing tasks in the EC: %s" % err) self._state = ECState.FAILED finally: - self._logger.info("Exiting the task processing loop ... ") + self.logger.debug("Exiting the task processing loop ... ") runner.sync() def _execute(self, task): """ Executes a single task. - If the invokation of the task callback raises an - exception, the processing thread of the ExperimentController - will be stopped and the experiment will be aborted. - :param task: Object containing the callback to execute :type task: Task + .. note:: + + If the invokation of the task callback raises an + exception, the processing thread of the ExperimentController + will be stopped and the experiment will be aborted. + """ # Invoke callback task.status = TaskStatus.DONE @@ -698,7 +705,7 @@ class ExperimentController(object): task.result = err task.status = TaskStatus.ERROR - self._logger.error("Error occurred while executing task: %s" % err) + self.logger.error("Error occurred while executing task: %s" % err) # Set the EC to FAILED state (this will force to exit the task # processing thread) diff --git a/src/nepi/execution/scheduler.py b/src/nepi/execution/scheduler.py index 8d70fbb7..53a7530c 100644 --- a/src/nepi/execution/scheduler.py +++ b/src/nepi/execution/scheduler.py @@ -62,6 +62,7 @@ class HeapScheduler(object): """ if task.id == None: task.id = self._idgen.next() + entry = (task.timestamp, task.id, task) self._valid.add(task.id) heapq.heappush(self._queue, entry) diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 57c5304b..13a8a9b9 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -151,13 +151,14 @@ class LinuxApplication(ResourceManager): @property def in_foreground(self): - """ Returns True is the command needs to be executed in foreground. + """ Returns True if the command needs to be executed in foreground. This means that command will be executed using 'execute' instead of - 'run'. + 'run' ('run' executes a command in background and detached from the + terminal) When using X11 forwarding option, the command can not run in background - and detached from a terminal in the remote host, since we need to keep - the SSH connection to receive graphical data + and detached from a terminal, since we need to keep the terminal attached + to interact with it. """ return self.get("forwardX11") or False @@ -392,8 +393,8 @@ class LinuxApplication(ResourceManager): self.info("Starting command '%s'" % command) if self.in_foreground: - # If command should be ran in foreground, we invoke - # the node 'execute' method + # If command should run in foreground, we invoke 'execute' method + # of the node if not command: msg = "No command is defined but X11 forwarding has been set" self.error(msg) @@ -401,8 +402,7 @@ class LinuxApplication(ResourceManager): raise RuntimeError, msg # Export environment - environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\ - if env else "" + environ = self.node.format_environment(env, inline = True) command = environ + command command = self.replace_paths(command) @@ -410,7 +410,9 @@ class LinuxApplication(ResourceManager): x11 = self.get("forwardX11") # We save the reference to the process in self._proc - # to be able to kill the process from the stop method + # to be able to kill the process from the stop method. + # We also set blocking = False, since we don't want the + # thread to block until the execution finishes. (out, err), self._proc = self.node.execute(command, sudo = sudo, stdin = stdin, @@ -427,13 +429,14 @@ class LinuxApplication(ResourceManager): super(LinuxApplication, self).start() elif command: - # If command is set (i.e. application not used only for dependency - # installation), and it does not need to run in foreground, we use - # the 'run' method of the node to launch the application as a daemon + # If command is set (i.e. application is not used only for dependency + # installation), and it does not need to run in foreground, then we + # invoke the 'run' method of the node to launch the application as a + # daemon in background # The real command to execute was previously uploaded to a remote bash - # script during deployment, now run the remote script using 'run' method - # from the node + # script during deployment, now launch the remote script using 'run' + # method from the node cmd = "bash ./app.sh" (out, err), proc = self.node.run(cmd, self.app_home, stdin = stdin, @@ -516,8 +519,12 @@ class LinuxApplication(ResourceManager): @property def state(self): + """ Returns the state of the application + """ if self._state == ResourceState.STARTED: if self.in_foreground: + # Check if the process we used to execute the command + # is still running ... retcode = self._proc.poll() # retcode == None -> running @@ -525,32 +532,31 @@ class LinuxApplication(ResourceManager): # retcode == 0 -> finished if retcode: out = "" + msg = " Failed to execute command '%s'" % self.get("command") err = self._proc.stderr.read() - self._state = ResourceState.FAILED self.error(msg, out, err) + self._state = ResourceState.FAILED elif retcode == 0: self._state = ResourceState.FINISHED else: - # To avoid overwhelming the remote hosts and the local processor - # with too many ssh queries, the state is only requested - # every 'state_check_delay' seconds. + # We need to query the status of the command we launched in + # background. In oredr to avoid overwhelming the remote host and + # the local processor with too many ssh queries, the state is only + # requested every 'state_check_delay' seconds. state_check_delay = 0.5 if strfdiff(strfnow(), self._last_state_check) > state_check_delay: # check if execution errors occurred (out, err), proc = self.node.check_errors(self.app_home) - if out or err: - if err.find("No such file or directory") >= 0 : - # The resource is marked as started, but the - # command was not yet executed - return ResourceState.READY - + if err: msg = " Failed to execute command '%s'" % self.get("command") self.error(msg, out, err) self._state = ResourceState.FAILED elif self.pid and self.ppid: + # No execution errors occurred. Make sure the background + # process with the recorded pid is still running. status = self.node.status(self.pid, self.ppid) if status == ProcStatus.FINISHED: diff --git a/src/nepi/resources/linux/ccnd.py b/src/nepi/resources/linux/ccnd.py index 655bc62f..44b5f164 100644 --- a/src/nepi/resources/linux/ccnd.py +++ b/src/nepi/resources/linux/ccnd.py @@ -23,6 +23,8 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.node import OSType +from nepi.util.sshfuncs import ProcStatus +from nepi.util.timefuncs import strfnow, strfdiff import os @clsinit_copy @@ -185,90 +187,7 @@ class LinuxCCND(LinuxApplication): super(LinuxCCND, self).deploy() def start(self): - command = self.get("command") - env = self.get("env") - stdin = "stdin" if self.get("stdin") else None - stdout = "stdout" if self.get("stdout") else "stdout" - stderr = "stderr" if self.get("stderr") else "stderr" - sudo = self.get("sudo") or False - x11 = self.get("forwardX11") or False - failed = False - - if not command: - # If no command was given, then the application - # is directly marked as FINISHED - self._state = ResourceState.FINISHED - - self.info("Starting command '%s'" % command) - - if x11: - # If X11 forwarding was specified, then the application - # can not run detached, so instead of invoking asynchronous - # 'run' we invoke synchronous 'execute'. - if not command: - msg = "No command is defined but X11 forwarding has been set" - self.error(msg) - self._state = ResourceState.FAILED - raise RuntimeError, msg - - # Export environment - environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\ - if env else "" - - command = environ + command - command = self.replace_paths(command) - - # Mark application as started before executing the command - # since after the thread will be blocked by the execution - # until it finished - super(LinuxApplication, self).start() - - # If the command requires X11 forwarding, we - # can't run it asynchronously - (out, err), proc = self.node.execute(command, - sudo = sudo, - stdin = stdin, - forward_x11 = x11) - - self._state = ResourceState.FINISHED - - if proc.poll() and err: - failed = True - else: - # Command was previously uploaded, now run the remote - # bash file asynchronously - cmd = "bash ./app.sh" - (out, err), proc = self.node.run(cmd, self.app_home, - stdin = stdin, - stdout = stdout, - stderr = stderr, - sudo = sudo) - - # check if execution errors occurred - msg = " Failed to start command '%s' " % command - - if proc.poll() and err: - self.error(msg, out, err) - raise RuntimeError, msg - - # Check status of process running in background - pid, ppid = self.node.wait_pid(self.app_home) - if pid: self._pid = int(pid) - if ppid: self._ppid = int(ppid) - - # If the process is not running, check for error information - # on the remote machine - if not self.pid or not self.ppid: - (out, err), proc = self.node.check_output(self.app_home, 'stderr') - self.error(msg, out, err) - - msg2 = " Setting state to Failed" - self.debug(msg2) - self._state = ResourceState.FAILED - - raise RuntimeError, msg - - super(LinuxApplication, self).start() + super(LinuxCCND, self).start() def stop(self): command = self.get('command') or '' @@ -299,31 +218,29 @@ class LinuxCCND(LinuxApplication): @property def state(self): if self._state == ResourceState.STARTED: - # To avoid overwhelming the remote hosts and the local processor - # with too many ssh queries, the state is only requested - # every 'state_check_delay' seconds. + # we executed the ccndstart command. This should have started + # a remote ccnd daemon. The way we can query wheather ccnd is + # still running is by executing the ccndstatus command. state_check_delay = 0.5 if strfdiff(strfnow(), self._last_state_check) > state_check_delay: - # check if execution errors occurred - (out, err), proc = self.node.check_errors(self.app_home) + env = self.get('env') or "" + environ = self.node.format_environment(env, inline = True) + command = environ + "; ccndstatus" + command = self.replace_paths(command) + + (out, err), proc = self.node.execute(command) - if out or err: - if err.find("No such file or directory") >= 0 : - # The resource is marked as started, but the - # command was not yet executed - return ResourceState.READY + retcode = proc.poll() - msg = " Failed to execute command '%s'" % self.get("command") + if retcode == 1 and err.find("No such file or directory") > -1: + # ccnd is not running (socket not found) + self._state = ResourceState.FINISHED + elif retcode: + # other error + msg = " Failed to execute command '%s'" % command self.error(msg, out, err) self._state = ResourceState.FAILED - elif self.pid and self.ppid: - status = self.node.status(self.pid, self.ppid) - - if status == ProcStatus.FINISHED: - self._state = ResourceState.FINISHED - - self._last_state_check = strfnow() return self._state @@ -356,7 +273,8 @@ class LinuxCCND(LinuxApplication): return ( # Evaluate if ccnx binaries are already installed " ( " - " test -f ${EXP_HOME}/ccnx/bin/ccnd" + " test -f ${EXP_HOME}/ccnx/bin/ccnd && " + " echo 'sources found, nothing to do' " " ) || ( " # If not, untar and build " ( " @@ -373,8 +291,10 @@ class LinuxCCND(LinuxApplication): return ( # Evaluate if ccnx binaries are already installed " ( " - " test -f ${EXP_HOME}/ccnx/bin/ccnd" + " test -f ${EXP_HOME}/ccnx/bin/ccnd && " + " echo 'sources found, nothing to do' " " ) || ( " + # If not, install " mkdir -p ${EXP_HOME}/ccnx/bin && " " cp -r ${SOURCES}/ccnx ${EXP_HOME}" " )" diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 7ac2d12c..c2c825e8 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -388,7 +388,7 @@ class LinuxNode(ResourceManager): (out, err), proc = self.check_errors(home, ecodefile, stderr) # Out is what was written in the stderr file - if out or err: + if err: msg = " Failed to run command '%s' " % command self.error(msg, out, err) @@ -434,8 +434,7 @@ class LinuxNode(ResourceManager): } # Export environment - environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) + "\n" \ - if env else "" + environ = self.format_environment(env) # Add environ to command command = environ + command @@ -443,6 +442,16 @@ class LinuxNode(ResourceManager): dst = os.path.join(home, shfile) return self.upload(command, dst, text = True) + def format_environment(self, env, inline = False): + """Format environmental variables for command to be executed either + as an inline command (i.e. PYTHONPATH=src/.. python script.py) or + as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n) + """ + sep = " " if inline else "\n" + export = " " if inline else "export" + return sep.join(map(lambda e: "%s %s" % (export, e), env.split(" "))) \ + + sep if env else "" + def check_errors(self, home, ecodefile = "exitcode", stderr = "stderr"): @@ -450,11 +459,12 @@ class LinuxNode(ResourceManager): Checks whether errors occurred while running a command. It first checks the exit code for the command, and only if the exit code is an error one it returns the error output. + """ out = err = "" proc = None - # get Exit code + # get exit code saved in the 'exitcode' file ecode = self.exitcode(home, ecodefile) if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]: @@ -463,12 +473,18 @@ class LinuxNode(ResourceManager): # The process returned an error code or didn't exist. # Check standard error. (out, err), proc = self.check_output(home, stderr) - - # If the stderr file was not found, assume nothing happened. - # We just ignore the error. + + # If the stderr file was not found, assume nothing bad happened, + # and just ignore the error. # (cat returns 1 for error "No such file or directory") if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: out = err = "" + else: + # The actual error (read from the stderr file) is in 'out'. + # We swap the variables to avoid confusion. It is more + # intuitive to find the 'error' in err variable. + err = out + out = "" return (out, err), proc diff --git a/src/nepi/util/environ.py b/src/nepi/util/environ.py index 218370ce..b4035468 100644 --- a/src/nepi/util/environ.py +++ b/src/nepi/util/environ.py @@ -15,6 +15,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . # +# Author: Alina Quereilhac +# Martin Ferrari + + import ctypes import imp diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py index e354b429..fffdea5f 100644 --- a/src/nepi/util/parallel.py +++ b/src/nepi/util/parallel.py @@ -15,6 +15,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . # +# Author: Claudio Freire +# import threading import Queue diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index 259f5783..0c3f86c1 100755 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -153,11 +153,13 @@ class LinuxNodeTestCase(unittest.TestCase): # get the pid of the process ecode = node.exitcode(app_home) + # bash erro 127 - command not found self.assertEquals(ecode, 127) (out, err), proc = node.check_errors(app_home) - self.assertNotEquals(out, "") + + self.assertEquals(err.strip(), "./cmd.sh: line 1: unexistent-command: command not found") @skipIfNotAlive def t_install(self, host, user):