class Attribute(object):
"""
.. class:: Class Args :
+
+ An Attribute reflects a configuration parameter for
+ a particular resource. Attributes might be read only or
+ not.
:param name: Name of the attribute
:type name: str
- :param help: Help about the attribute
+
+ :param help: Attribute description
:type help: str
- :param type: type of the attribute
+
+ :param type: The type expected for the attribute value.
+ Should be one of Attribute.Types .
:type type: str
- :param flags: Help about the attribute
- :type flags: str
+
+ :param flags: Defines attribute behavior (i.e. whether it is read-only,
+ read and write, etc). This parameter should take its values from
+ Attribute.Flags. Flag values can be combined using bitwise OR.
+ :type flags: hex
+
:param default: Default value of the attribute
- :type default: str
- :param allowed: Allowed value for this attribute
- :type allowed: str
- :param range: Range of the attribute
- :type range: str
- :param set_hook: hook that is related with this attribute
- :type set_hook: str
+ :type default: depends on the type of attribute
+
+ :param allowed: List of values that the attribute can take.
+ This parameter is only meaningful for Enumerate type attributes.
+ :type allowed: list
+
+ :param range: (max, min) tuple with range of possible values for
+ attributes.
+ This parameter is only meaningful for Integer or Double type
+ attributes.
+ :type range: (int, int) or (float, float)
+
+ :param set_hook: Function that will be executed whenever a new
+ value is set for the attribute.
+ :type set_hook: function
"""
def __init__(self, name, help, type = Types.String,
"""
rm = self.get_resource(guid)
+ state = rm.state
+
if hr:
- return ResourceState2str.get(rm.state)
+ return ResourceState2str.get(state)
- return rm.state
+ return state
def stop(self, guid):
""" Stop a specific RM defined by its 'guid'
# same conditions (e.g. LinuxApplications running on a same
# node share a single lock, so they will tend to be serialized).
# If we disorder the group list, this problem can be mitigated.
- #random.shuffle(group)
+ random.shuffle(group)
def wait_all_and_start(group):
reschedule = False
# schedule a stop. Otherwise the RM will stop immediately
self.schedule("2s", rm.stop_with_conditions)
-
def release(self, group = None):
""" Release the elements of the list 'group' or
all the resources if no group is specified
if track:
self._tasks[task.id] = task
-
+
# Notify condition to wake up the processing thread
self._notify()
def _process(self):
""" Process scheduled tasks.
+ .. note::
+
The _process method is executed in an independent thread held by the
ExperimentController for as long as the experiment is running.
try:
while not self.finished:
self._cond.acquire()
+
task = self._scheduler.next()
- self._cond.release()
if not task:
- # It there are not tasks in the tasks queue we need to
- # wait until a call to schedule wakes us up
- self._cond.acquire()
+ # No task to execute. Wait for a new task to be scheduled.
self._cond.wait()
- self._cond.release()
- else:
- # If the task timestamp is in the future the thread needs to wait
- # until time elapse or until another task is scheduled
+ else:
+ # The task timestamp is in the future. Wait for timeout
+ # or until another task is scheduled.
now = strfnow()
if now < task.timestamp:
- # Calculate time difference in seconds
+ # Calculate timeout in seconds
timeout = strfdiff(task.timestamp, now)
+
# Re-schedule task with the same timestamp
self._scheduler.schedule(task)
- # Sleep until timeout or until a new task awakes the condition
- self._cond.acquire()
+
+ task = None
+
+ # Wait timeout or until a new task awakes the condition
self._cond.wait(timeout)
- self._cond.release()
- else:
- # Process tasks in parallel
- runner.put(self._execute, task)
+
+ self._cond.release()
+
+ if task:
+ # Process tasks in parallel
+ runner.put(self._execute, task)
except:
import traceback
err = traceback.format_exc()
- self._logger.error("Error while processing tasks in the EC: %s" % err)
+ self.logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
finally:
- self._logger.info("Exiting the task processing loop ... ")
+ self.logger.debug("Exiting the task processing loop ... ")
runner.sync()
def _execute(self, task):
""" Executes a single task.
- If the invokation of the task callback raises an
- exception, the processing thread of the ExperimentController
- will be stopped and the experiment will be aborted.
-
:param task: Object containing the callback to execute
:type task: Task
+ .. note::
+
+ If the invocation of the task callback raises an
+ exception, the processing thread of the ExperimentController
+ will be stopped and the experiment will be aborted.
+
"""
# Invoke callback
task.status = TaskStatus.DONE
task.result = err
task.status = TaskStatus.ERROR
- self._logger.error("Error occurred while executing task: %s" % err)
+ self.logger.error("Error occurred while executing task: %s" % err)
# Set the EC to FAILED state (this will force to exit the task
# processing thread)
"""
if task.id == None:
task.id = self._idgen.next()
+
entry = (task.timestamp, task.id, task)
self._valid.add(task.id)
heapq.heappush(self._queue, entry)
@property
def in_foreground(self):
- """ Returns True is the command needs to be executed in foreground.
+ """ Returns True if the command needs to be executed in foreground.
This means that command will be executed using 'execute' instead of
- 'run'.
+ 'run' ('run' executes a command in background and detached from the
+ terminal)
When using X11 forwarding option, the command can not run in background
- and detached from a terminal in the remote host, since we need to keep
- the SSH connection to receive graphical data
+ and detached from a terminal, since we need to keep the terminal attached
+ to interact with it.
"""
return self.get("forwardX11") or False
self.info("Starting command '%s'" % command)
if self.in_foreground:
- # If command should be ran in foreground, we invoke
- # the node 'execute' method
+ # If command should run in foreground, we invoke 'execute' method
+ # of the node
if not command:
msg = "No command is defined but X11 forwarding has been set"
self.error(msg)
raise RuntimeError, msg
# Export environment
- environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
- if env else ""
+ environ = self.node.format_environment(env, inline = True)
command = environ + command
command = self.replace_paths(command)
x11 = self.get("forwardX11")
# We save the reference to the process in self._proc
- # to be able to kill the process from the stop method
+ # to be able to kill the process from the stop method.
+ # We also set blocking = False, since we don't want the
+ # thread to block until the execution finishes.
(out, err), self._proc = self.node.execute(command,
sudo = sudo,
stdin = stdin,
super(LinuxApplication, self).start()
elif command:
- # If command is set (i.e. application not used only for dependency
- # installation), and it does not need to run in foreground, we use
- # the 'run' method of the node to launch the application as a daemon
+ # If command is set (i.e. application is not used only for dependency
+ # installation), and it does not need to run in foreground, then we
+ # invoke the 'run' method of the node to launch the application as a
+ # daemon in background
# The real command to execute was previously uploaded to a remote bash
- # script during deployment, now run the remote script using 'run' method
- # from the node
+ # script during deployment, now launch the remote script using 'run'
+ # method from the node
cmd = "bash ./app.sh"
(out, err), proc = self.node.run(cmd, self.app_home,
stdin = stdin,
@property
def state(self):
+ """ Returns the state of the application
+ """
if self._state == ResourceState.STARTED:
if self.in_foreground:
+ # Check if the process we used to execute the command
+ # is still running ...
retcode = self._proc.poll()
# retcode == None -> running
# retcode == 0 -> finished
if retcode:
out = ""
+ msg = " Failed to execute command '%s'" % self.get("command")
err = self._proc.stderr.read()
- self._state = ResourceState.FAILED
self.error(msg, out, err)
+ self._state = ResourceState.FAILED
elif retcode == 0:
self._state = ResourceState.FINISHED
else:
- # To avoid overwhelming the remote hosts and the local processor
- # with too many ssh queries, the state is only requested
- # every 'state_check_delay' seconds.
+ # We need to query the status of the command we launched in
# background. In order to avoid overwhelming the remote host and
+ # the local processor with too many ssh queries, the state is only
+ # requested every 'state_check_delay' seconds.
state_check_delay = 0.5
if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
# check if execution errors occurred
(out, err), proc = self.node.check_errors(self.app_home)
- if out or err:
- if err.find("No such file or directory") >= 0 :
- # The resource is marked as started, but the
- # command was not yet executed
- return ResourceState.READY
-
+ if err:
msg = " Failed to execute command '%s'" % self.get("command")
self.error(msg, out, err)
self._state = ResourceState.FAILED
elif self.pid and self.ppid:
+ # No execution errors occurred. Make sure the background
+ # process with the recorded pid is still running.
status = self.node.status(self.pid, self.ppid)
if status == ProcStatus.FINISHED:
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.node import OSType
+from nepi.util.sshfuncs import ProcStatus
+from nepi.util.timefuncs import strfnow, strfdiff
import os
@clsinit_copy
super(LinuxCCND, self).deploy()
def start(self):
- command = self.get("command")
- env = self.get("env")
- stdin = "stdin" if self.get("stdin") else None
- stdout = "stdout" if self.get("stdout") else "stdout"
- stderr = "stderr" if self.get("stderr") else "stderr"
- sudo = self.get("sudo") or False
- x11 = self.get("forwardX11") or False
- failed = False
-
- if not command:
- # If no command was given, then the application
- # is directly marked as FINISHED
- self._state = ResourceState.FINISHED
-
- self.info("Starting command '%s'" % command)
-
- if x11:
- # If X11 forwarding was specified, then the application
- # can not run detached, so instead of invoking asynchronous
- # 'run' we invoke synchronous 'execute'.
- if not command:
- msg = "No command is defined but X11 forwarding has been set"
- self.error(msg)
- self._state = ResourceState.FAILED
- raise RuntimeError, msg
-
- # Export environment
- environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
- if env else ""
-
- command = environ + command
- command = self.replace_paths(command)
-
- # Mark application as started before executing the command
- # since after the thread will be blocked by the execution
- # until it finished
- super(LinuxApplication, self).start()
-
- # If the command requires X11 forwarding, we
- # can't run it asynchronously
- (out, err), proc = self.node.execute(command,
- sudo = sudo,
- stdin = stdin,
- forward_x11 = x11)
-
- self._state = ResourceState.FINISHED
-
- if proc.poll() and err:
- failed = True
- else:
- # Command was previously uploaded, now run the remote
- # bash file asynchronously
- cmd = "bash ./app.sh"
- (out, err), proc = self.node.run(cmd, self.app_home,
- stdin = stdin,
- stdout = stdout,
- stderr = stderr,
- sudo = sudo)
-
- # check if execution errors occurred
- msg = " Failed to start command '%s' " % command
-
- if proc.poll() and err:
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- # Check status of process running in background
- pid, ppid = self.node.wait_pid(self.app_home)
- if pid: self._pid = int(pid)
- if ppid: self._ppid = int(ppid)
-
- # If the process is not running, check for error information
- # on the remote machine
- if not self.pid or not self.ppid:
- (out, err), proc = self.node.check_output(self.app_home, 'stderr')
- self.error(msg, out, err)
-
- msg2 = " Setting state to Failed"
- self.debug(msg2)
- self._state = ResourceState.FAILED
-
- raise RuntimeError, msg
-
- super(LinuxApplication, self).start()
+ super(LinuxCCND, self).start()
def stop(self):
command = self.get('command') or ''
@property
def state(self):
if self._state == ResourceState.STARTED:
- # To avoid overwhelming the remote hosts and the local processor
- # with too many ssh queries, the state is only requested
- # every 'state_check_delay' seconds.
+ # we executed the ccndstart command. This should have started
# a remote ccnd daemon. The way we can query whether ccnd is
+ # still running is by executing the ccndstatus command.
state_check_delay = 0.5
if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
- # check if execution errors occurred
- (out, err), proc = self.node.check_errors(self.app_home)
+ env = self.get('env') or ""
+ environ = self.node.format_environment(env, inline = True)
+ command = environ + "; ccndstatus"
+ command = self.replace_paths(command)
+
+ (out, err), proc = self.node.execute(command)
- if out or err:
- if err.find("No such file or directory") >= 0 :
- # The resource is marked as started, but the
- # command was not yet executed
- return ResourceState.READY
+ retcode = proc.poll()
- msg = " Failed to execute command '%s'" % self.get("command")
+ if retcode == 1 and err.find("No such file or directory") > -1:
+ # ccnd is not running (socket not found)
+ self._state = ResourceState.FINISHED
+ elif retcode:
+ # other error
+ msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
self._state = ResourceState.FAILED
- elif self.pid and self.ppid:
- status = self.node.status(self.pid, self.ppid)
-
- if status == ProcStatus.FINISHED:
- self._state = ResourceState.FINISHED
-
-
self._last_state_check = strfnow()
return self._state
return (
# Evaluate if ccnx binaries are already installed
" ( "
- " test -f ${EXP_HOME}/ccnx/bin/ccnd"
+ " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+ " echo 'sources found, nothing to do' "
" ) || ( "
# If not, untar and build
" ( "
return (
# Evaluate if ccnx binaries are already installed
" ( "
- " test -f ${EXP_HOME}/ccnx/bin/ccnd"
+ " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+ " echo 'sources found, nothing to do' "
" ) || ( "
+ # If not, install
" mkdir -p ${EXP_HOME}/ccnx/bin && "
" cp -r ${SOURCES}/ccnx ${EXP_HOME}"
" )"
(out, err), proc = self.check_errors(home, ecodefile, stderr)
# Out is what was written in the stderr file
- if out or err:
+ if err:
msg = " Failed to run command '%s' " % command
self.error(msg, out, err)
}
# Export environment
- environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) + "\n" \
- if env else ""
+ environ = self.format_environment(env)
# Add environ to command
command = environ + command
dst = os.path.join(home, shfile)
return self.upload(command, dst, text = True)
+ def format_environment(self, env, inline = False):
+ """Format environmental variables for command to be executed either
+ as an inline command (i.e. PYTHONPATH=src/.. python script.py) or
+ as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
+ """
+ sep = " " if inline else "\n"
+ export = " " if inline else "export"
+ return sep.join(map(lambda e: "%s %s" % (export, e), env.split(" "))) \
+ + sep if env else ""
+
def check_errors(self, home,
ecodefile = "exitcode",
stderr = "stderr"):
Checks whether errors occurred while running a command.
It first checks the exit code for the command, and only if the
exit code is an error one it returns the error output.
+
"""
out = err = ""
proc = None
- # get Exit code
+ # get exit code saved in the 'exitcode' file
ecode = self.exitcode(home, ecodefile)
if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
# The process returned an error code or didn't exist.
# Check standard error.
(out, err), proc = self.check_output(home, stderr)
-
- # If the stderr file was not found, assume nothing happened.
- # We just ignore the error.
+
+ # If the stderr file was not found, assume nothing bad happened,
+ # and just ignore the error.
# (cat returns 1 for error "No such file or directory")
if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
out = err = ""
+ else:
+ # The actual error (read from the stderr file) is in 'out'.
+ # We swap the variables to avoid confusion. It is more
+ # intuitive to find the 'error' in err variable.
+ err = out
+ out = ""
return (out, err), proc
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+# Martin Ferrari <martin.ferrari@inria.fr>
+
+
import ctypes
import imp
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+# Author: Claudio Freire <claudio-daniel.freire@inria.fr>
+#
import threading
import Queue
# get the pid of the process
ecode = node.exitcode(app_home)
+
# bash error 127 - command not found
self.assertEquals(ecode, 127)
(out, err), proc = node.check_errors(app_home)
- self.assertNotEquals(out, "")
+
+ self.assertEquals(err.strip(), "./cmd.sh: line 1: unexistent-command: command not found")
@skipIfNotAlive
def t_install(self, host, user):