def wait_all_and_start(group):
reschedule = False
for guid in group:
- rm = self.get_resource(guid)
- if rm.state < ResourceState.READY:
+ if self.state(guid) < ResourceState.READY:
reschedule = True
break
self._ppid = None
self._home = "app-%s" % self.guid
+ # keep a reference to the running process handler when
+ # the command is not executed as remote daemon in background
+ self._proc = None
+
# timestamp of last state check of the application
self._last_state_check = strfnow()
def ppid(self):
return self._ppid
+ @property
+ def in_foreground(self):
+ """ Returns True if the command needs to be executed in foreground.
+ This means that the command will be executed using 'execute' instead of
+ 'run'.
+
+ When using the X11 forwarding option, the command cannot run in background,
+ detached from a terminal on the remote host, since we need to keep
+ the SSH connection open to receive graphical data
+ """
+ return self.get("forwardX11") or False
+
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
self.info("Retrieving '%s' trace %s " % (name, attr))
# Install
self.install()
- # Upload command
+ # Upload command to remote bash script
+ # - only if command can be executed in background and detached
command = self.get("command")
- x11 = self.get("forwardX11")
- env = self.get("env")
-
- if command and not x11:
+
+ if command and not self.in_foreground:
self.info("Uploading command '%s'" % command)
# replace application specific paths in the command
command = self.replace_paths(command)
+
+ # replace application specific paths in the environment
+ env = self.get("env")
env = env and self.replace_paths(env)
self.node.upload_command(command, self.app_home,
shfile = "app.sh",
env = env)
+ self.info("Provisioning finished")
+
super(LinuxApplication, self).provision()
def upload_sources(self):
- # TODO: check if sources need to be uploaded and upload them
sources = self.get("sources")
if sources:
- self.info(" Uploading sources ")
+ self.info("Uploading sources ")
# create dir for sources
self.node.mkdir(self.src_dir)
# replace application specific paths in the command
command = self.replace_paths(command)
- # Upload the command to a file, and execute asynchronously
+ # Upload the command to a bash script and run it
+ # in background ( but wait until the command has
+ # finished to continue )
self.node.run_and_wait(command, self.app_home,
shfile = "http_sources.sh",
pidfile = "http_sources_pidfile",
# create dir for sources
self.node.mkdir(self.src_dir)
- self.info(" Uploading code ")
+ self.info("Uploading code ")
dst = os.path.join(self.src_dir, "code")
self.node.upload(sources, dst, text = True)
def install_dependencies(self):
depends = self.get("depends")
if depends:
- self.info(" Installing dependencies %s" % depends)
+ self.info("Installing dependencies %s" % depends)
self.node.install_packages(depends, self.app_home)
def build(self):
build = self.get("build")
if build:
- self.info(" Building sources ")
+ self.info("Building sources ")
# create dir for build
self.node.mkdir(self.build_dir)
# replace application specific paths in the command
command = self.replace_paths(build)
- # Upload the command to a file, and execute asynchronously
+ # Upload the command to a bash script and run it
+ # in background ( but wait until the command has
+ # finished to continue )
self.node.run_and_wait(command, self.app_home,
shfile = "build.sh",
pidfile = "build_pidfile",
def install(self):
install = self.get("install")
if install:
- self.info(" Installing sources ")
+ self.info("Installing sources ")
# replace application specific paths in the command
command = self.replace_paths(install)
- # Upload the command to a file, and execute asynchronously
+ # Upload the command to a bash script and run it
+ # in background ( but wait until the command has
+ # finished to continue )
self.node.run_and_wait(command, self.app_home,
shfile = "install.sh",
pidfile = "install_pidfile",
super(LinuxApplication, self).deploy()
def start(self):
- command = self.get('command')
- env = self.get('env')
- stdin = 'stdin' if self.get('stdin') else None
- stdout = 'stdout' if self.get('stdout') else 'stdout'
- stderr = 'stderr' if self.get('stderr') else 'stderr'
- sudo = self.get('sudo') or False
- x11 = self.get('forwardX11') or False
+ command = self.get("command")
+ env = self.get("env")
+ stdin = "stdin" if self.get("stdin") else None
+ stdout = "stdout" if self.get("stdout") else "stdout"
+ stderr = "stderr" if self.get("stderr") else "stderr"
+ sudo = self.get("sudo") or False
failed = False
- if not command:
- # If no command was given, then the application
- # is directly marked as FINISHED
- self._state = ResourceState.FINISHED
- else:
- super(LinuxApplication, self).start()
-
self.info("Starting command '%s'" % command)
- if x11:
- # If X11 forwarding was specified, then the application
- # can not run detached, so instead of invoking asynchronous
- # 'run' we invoke synchronous 'execute'.
+ if self.in_foreground:
+ # If the command should be run in foreground, we invoke
+ # the node 'execute' method
if not command:
msg = "No command is defined but X11 forwarding has been set"
self.error(msg)
self._state = ResourceState.FAILED
raise RuntimeError, msg
- if env:
- # Export environment
- environ = ""
- for var in env.split(" "):
- environ += ' %s ' % var
+ # Export environment
+ environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
+ if env else ""
- command = "{" + environ + " ; " + command + " ; }"
- command = self.replace_paths(command)
+ command = environ + command
+ command = self.replace_paths(command)
+
+ x11 = self.get("forwardX11")
- # If the command requires X11 forwarding, we
- # can't run it asynchronously
- (out, err), proc = self.node.execute(command,
+ # We save the reference to the process in self._proc
+ # to be able to kill the process from the stop method
+ (out, err), self._proc = self.node.execute(command,
sudo = sudo,
stdin = stdin,
- forward_x11 = x11)
+ forward_x11 = x11,
+ blocking = False)
- self._state = ResourceState.FINISHED
+ if self._proc.poll():
+ out = ""
+ err = self._proc.stderr.read()
+ self._state = ResourceState.FAILED
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
+ super(LinuxApplication, self).start()
- if proc.poll() and err:
- failed = True
- else:
- # Command was previously uploaded, now run the remote
- # bash file asynchronously
+ elif command:
+ # If command is set (i.e. application not used only for dependency
+ # installation), and it does not need to run in foreground, we use
+ # the 'run' method of the node to launch the application as a daemon
+
+ # The real command to execute was previously uploaded to a remote bash
+ # script during deployment, now run the remote script using 'run' method
+ # from the node
cmd = "bash ./app.sh"
(out, err), proc = self.node.run(cmd, self.app_home,
stdin = stdin,
self._state = ResourceState.FAILED
raise RuntimeError, msg
+
+ super(LinuxApplication, self).start()
+ else:
+ # If no command was given (i.e. Application was used for dependency
+ # installation), then the application is directly marked as FINISHED
+ self._state = ResourceState.FINISHED
+
def stop(self):
+ """ Stops application execution
+ """
command = self.get('command') or ''
state = self.state
-
+
if state == ResourceState.STARTED:
- self.info("Stopping command '%s'" % command)
+ stopped = True
- (out, err), proc = self.node.kill(self.pid, self.ppid)
+ self.info("Stopping command '%s'" % command)
+
+ # If the command is running in foreground (it was launched using
+ # the node 'execute' method), then we use the handler to the Popen
+ # process to kill it. Else we send a kill signal using the pid and ppid
+ # retrieved after running the command with the node 'run' method
- if out or err:
- # check if execution errors occurred
- msg = " Failed to STOP command '%s' " % self.get("command")
- self.error(msg, out, err)
- self._state = ResourceState.FAILED
- stopped = False
+ if self._proc:
+ self._proc.kill()
else:
+ (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+ if out or err:
+ # check if execution errors occurred
+ msg = " Failed to STOP command '%s' " % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+ stopped = False
+
+ if stopped:
super(LinuxApplication, self).stop()
def release(self):
@property
def state(self):
if self._state == ResourceState.STARTED:
- # To avoid overwhelming the remote hosts and the local processor
- # with too many ssh queries, the state is only requested
- # every 'state_check_delay' seconds.
- state_check_delay = 0.5
- if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
- # check if execution errors occurred
- (out, err), proc = self.node.check_errors(self.app_home)
-
- if out or err:
- if err.find("No such file or directory") >= 0 :
- # The resource is marked as started, but the
- # command was not yet executed
- return ResourceState.READY
-
- msg = " Failed to execute command '%s'" % self.get("command")
- self.error(msg, out, err)
+ if self.in_foreground:
+ retcode = self._proc.poll()
+
+ # retcode == None -> running
+ # retcode != 0 -> error (any non-zero value takes the branch below)
+ # retcode == 0 -> finished
+ if retcode:
+ out = ""
+ err = self._proc.stderr.read()
self._state = ResourceState.FAILED
+ self.error(msg, out, err)
+ elif retcode == 0:
+ self._state = ResourceState.FINISHED
- elif self.pid and self.ppid:
- status = self.node.status(self.pid, self.ppid)
-
- if status == ProcStatus.FINISHED:
- self._state = ResourceState.FINISHED
-
-
- self._last_state_check = strfnow()
+ else:
+ # To avoid overwhelming the remote hosts and the local processor
+ # with too many ssh queries, the state is only requested
+ # every 'state_check_delay' seconds.
+ state_check_delay = 0.5
+ if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+ # check if execution errors occurred
+ (out, err), proc = self.node.check_errors(self.app_home)
+
+ if out or err:
+ if err.find("No such file or directory") >= 0 :
+ # The resource is marked as started, but the
+ # command was not yet executed
+ return ResourceState.READY
+
+ msg = " Failed to execute command '%s'" % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+
+ elif self.pid and self.ppid:
+ status = self.node.status(self.pid, self.ppid)
+
+ if status == ProcStatus.FINISHED:
+ self._state = ResourceState.FINISHED
+
+ self._last_state_check = strfnow()
return self._state
super(LinuxCCND, self).deploy()
+ def start(self):
+ command = self.get("command")
+ env = self.get("env")
+ stdin = "stdin" if self.get("stdin") else None
+ stdout = "stdout" if self.get("stdout") else "stdout"
+ stderr = "stderr" if self.get("stderr") else "stderr"
+ sudo = self.get("sudo") or False
+ x11 = self.get("forwardX11") or False
+ failed = False
+
+ if not command:
+ # If no command was given, then the application
+ # is directly marked as FINISHED
+ self._state = ResourceState.FINISHED
+
+ self.info("Starting command '%s'" % command)
+
+ if x11:
+ # If X11 forwarding was specified, then the application
+ # can not run detached, so instead of invoking asynchronous
+ # 'run' we invoke synchronous 'execute'.
+ if not command:
+ msg = "No command is defined but X11 forwarding has been set"
+ self.error(msg)
+ self._state = ResourceState.FAILED
+ raise RuntimeError, msg
+
+ # Export environment
+ environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
+ if env else ""
+
+ command = environ + command
+ command = self.replace_paths(command)
+
+ # Mark the application as started before executing the command,
+ # since afterwards the thread will be blocked by the execution
+ # until it finishes
+ super(LinuxApplication, self).start()
+
+ # If the command requires X11 forwarding, we
+ # can't run it asynchronously
+ (out, err), proc = self.node.execute(command,
+ sudo = sudo,
+ stdin = stdin,
+ forward_x11 = x11)
+
+ self._state = ResourceState.FINISHED
+
+ if proc.poll() and err:
+ failed = True
+ else:
+ # Command was previously uploaded, now run the remote
+ # bash file asynchronously
+ cmd = "bash ./app.sh"
+ (out, err), proc = self.node.run(cmd, self.app_home,
+ stdin = stdin,
+ stdout = stdout,
+ stderr = stderr,
+ sudo = sudo)
+
+ # check if execution errors occurred
+ msg = " Failed to start command '%s' " % command
+
+ if proc.poll() and err:
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
+ # Check status of process running in background
+ pid, ppid = self.node.wait_pid(self.app_home)
+ if pid: self._pid = int(pid)
+ if ppid: self._ppid = int(ppid)
+
+ # If the process is not running, check for error information
+ # on the remote machine
+ if not self.pid or not self.ppid:
+ (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+ self.error(msg, out, err)
+
+ msg2 = " Setting state to Failed"
+ self.debug(msg2)
+ self._state = ResourceState.FAILED
+
+ raise RuntimeError, msg
+
+ super(LinuxApplication, self).start()
+
def stop(self):
command = self.get('command') or ''
state = self.state
if state == ResourceState.STARTED:
self.info("Stopping command '%s'" % command)
- (out, err), proc = self.node.kill(self.pid, self.ppid)
+ command = "ccndstop"
+ env = self.get("env")
+
+ # replace application specific paths in the command
+ command = self.replace_paths(command)
+ env = env and self.replace_paths(env)
+
+ # Upload the command to a bash script, run it and wait until it has finished
+ self.node.run_and_wait(command, self.app_home,
+ shfile = "ccndstop.sh",
+ env = env,
+ pidfile = "ccndstop_pidfile",
+ ecodefile = "ccndstop_exitcode",
+ stdout = "ccndstop_stdout",
+ stderr = "ccndstop_stderr")
+
- if out or err:
+ super(LinuxCCND, self).stop()
+
+ @property
+ def state(self):
+ if self._state == ResourceState.STARTED:
+ # To avoid overwhelming the remote hosts and the local processor
+ # with too many ssh queries, the state is only requested
+ # every 'state_check_delay' seconds.
+ state_check_delay = 0.5
+ if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
# check if execution errors occurred
- msg = " Failed to STOP command '%s' " % self.get("command")
- self.error(msg, out, err)
- self._state = ResourceState.FAILED
- stopped = False
- else:
- super(LinuxApplication, self).stop()
+ (out, err), proc = self.node.check_errors(self.app_home)
+
+ if out or err:
+ if err.find("No such file or directory") >= 0 :
+ # The resource is marked as started, but the
+ # command was not yet executed
+ return ResourceState.READY
+
+ msg = " Failed to execute command '%s'" % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+
+ elif self.pid and self.ppid:
+ status = self.node.status(self.pid, self.ppid)
+
+ if status == ProcStatus.FINISHED:
+ self._state = ResourceState.FINISHED
+
+
+ self._last_state_check = strfnow()
+ return self._state
@property
def _default_command(self):
if not isinstance(packages, list):
packages = [packages]
- cmd = ""
- for p in packages:
- cmd += " ( dpkg -s %(package)s || sudo -S apt-get -y install %(package)s ) ; " % {
- 'package': p}
-
- #cmd = (dpkg -s vim || sudo dpkg -s install vim) ; (...)
+ cmd = " && ".join(map(lambda p:
+ " { dpkg -s %(package)s || sudo -S apt-get -y install %(package)s ; } " % {
+ 'package': p}, packages))
+
+ #cmd = { dpkg -s vim || sudo -S apt-get -y install vim ; } && ..
return cmd
def remove_packages_command(os, packages):
if not isinstance(packages, list):
packages = [packages]
- cmd = ""
- for p in packages:
- cmd += " ( dpkg -s %(package)s && sudo -S apt-get -y purge %(package)s ) ; " % {
- 'package': p}
-
- #cmd = (dpkg -s vim || sudo apt-get -y purge vim) ; (...)
+ cmd = " && ".join(map(lambda p:
+ " { dpkg -s %(package)s && sudo -S apt-get -y purge %(package)s ; } " % {
+ 'package': p}, packages))
+
+ #cmd = { dpkg -s vim && sudo -S apt-get -y purge vim ; } && ..
return cmd
def run_and_wait(self, command, home,
shfile = "cmd.sh",
+ env = None,
pidfile = "pidfile",
ecodefile = "exitcode",
stdin = None,
since in the remote host the command can continue to run detached
even if network disconnections occur
"""
- self.upload_command(command, home, shfile, ecodefile)
+ self.upload_command(command, home,
+ shfile = shfile,
+ ecodefile = ecodefile,
+ env = env)
command = "bash ./%s" % shfile
# run command in background in remote host
shfile = "cmd.sh",
ecodefile = "exitcode",
env = None):
+ """ Saves the command as a bash script file on the remote host, and
+ forces the exit code of the command execution to be saved to the ecodefile
+ """
+
+ # Prepare command to be executed as a bash script file
+ # Make sure command ends in ';' so the curly brackets syntax is correct
+ if not command.strip()[-1] == ';':
+ command += " ; "
- command = " ( %(command)s ) ; echo $? > %(ecodefile)s " % {
+ # The exit code of the command will be stored in ecodefile
+ command = " { { %(command)s } ; echo $? > %(ecodefile)s ; } " % {
'command': command,
'ecodefile': ecodefile,
}
# Export environment
- environ = ""
- if env:
- for var in env.split(" "):
- environ += 'export %s\n' % var
+ environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) \
+ if env else ""
+ # Add environ to command
command = environ + command
dst = os.path.join(home, shfile)
connect_timeout = 30,
strict_host_checking = False,
persistent = True,
+ blocking = True,
with_lock = False
):
""" Notice that this invocation will block until the
err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
persistent = persistent,
+ blocking = blocking,
strict_host_checking = strict_host_checking
)
else:
retry = retry,
err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
- persistent = persistent
+ persistent = persistent,
+ blocking = blocking,
+ strict_host_checking = strict_host_checking
)
return (out, err), proc
if not isinstance(packages, list):
packages = [packages]
- cmd = "( %s )" % install_rpmfusion_command(os)
- for p in packages:
- cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % {
- 'package': p}
+ cmd = install_rpmfusion_command(os) + " && "
+ cmd += " && ".join(map(lambda p:
+ " { rpm -q %(package)s || sudo -S yum -y install %(package)s ; } " % {
+ 'package': p}, packages))
- #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
- return " ( %s )" % cmd
+ #cmd = { rpm -q rpmfusion-free-release || sudo -S rpm -i ... ; } && { rpm -q vim || sudo -S yum -y install vim ; } && ..
+ return cmd
def remove_packages_command(os, packages):
if not isinstance(packages, list):
packages = [packages]
- cmd = ""
- for p in packages:
- cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % {
- 'package': p}
-
- #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
+ cmd = " && ".join(map(lambda p:
+ " { rpm -q %(package)s && sudo -S yum -y remove %(package)s ; } " % {
+ 'package': p}, packages))
+
+ #cmd = { rpm -q vim && sudo -S yum -y remove vim ; } && ..
return cmd
def install_rpmfusion_command(os):
from nepi.resources.linux.node import OSType
- cmd = "rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s"
+ cmd = " { rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s ; } "
if os in [OSType.FEDORA, OSType.FEDORA_12]:
cmd = cmd % {'package': RPM_FUSION_URL_F12}
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+# Claudio Freire <claudio-daniel.freire@inria.fr>
import base64
import errno
connect_timeout = 30,
persistent = True,
forward_x11 = False,
+ blocking = True,
strict_host_checking = True):
"""
Executes a remote command, returns ((stdout,stderr),process)
# alive until the process is finished with it
proc._known_hosts = tmp_known_hosts
+ # By default, rexec calls _communicate, which will block
+ # until the process has exited. The argument blocking == False
+ # forces rexec to return immediately, without blocking
+ if not blocking:
+ return (("", ""), proc)
+
try:
out, err = _communicate(proc, stdin, timeout, err_on_timeout)
msg = " rexec - host %s - command %s " % (host, " ".join(args))
else:
stderr = ' ' + stderr
- daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+ daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
'command' : command,
'pidfile' : shell_escape(pidfile),
'stdout' : stdout,
node = ec.register_resource("Node")
apps = list()
- for i in xrange(5000):
+ for i in xrange(1000):
app = ec.register_resource("Application")
ec.register_connection(app, node)
apps.append(app)
from nepi.resources.linux.node import LinuxNode
from nepi.resources.linux.application import LinuxApplication
-from test_utils import skipIfNotAlive
+from test_utils import skipIfNotAlive, skipInteractive
import os
import time
class LinuxApplicationTestCase(unittest.TestCase):
def setUp(self):
- self.fedora_host = "nepi5.pl.sophia.inria.fr"
+ self.fedora_host = "nepi2.pl.sophia.inria.fr"
self.fedora_user = "inria_nepi"
self.ubuntu_host = "roseval.pl.sophia.inria.fr"
self.ubuntu_user = "alina"
- self.target = "nepi3.pl.sophia.inria.fr"
+ self.target = "nepi5.pl.sophia.inria.fr"
@skipIfNotAlive
def t_stdout(self, host, user):
ec.shutdown()
+ @skipIfNotAlive
+ def t_xterm(self, host, user):
+ from nepi.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(LinuxNode)
+ ResourceFactory.register_type(LinuxApplication)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ app = ec.register_resource("LinuxApplication")
+ ec.set(app, "command", "xterm")
+ ec.set(app, "depends", "xterm")
+ ec.set(app, "forwardX11", True)
+
+ ec.register_connection(app, node)
+
+ ec.deploy()
+
+ ec.wait_finished([app])
+
+ self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+ ec.shutdown()
+
def test_stdout_fedora(self):
self.t_stdout(self.fedora_host, self.fedora_user)
def test_ping_ubuntu(self):
self.t_ping(self.ubuntu_host, self.ubuntu_user)
- def ztest_concurrency_fedora(self):
+ def test_concurrency_fedora(self):
self.t_concurrency(self.fedora_host, self.fedora_user)
- def ztest_concurrency_ubuntu(self):
+ def test_concurrency_ubuntu(self):
self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
def test_condition_fedora(self):
def test_http_sources_ubuntu(self):
self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
+ @skipInteractive
+ def test_xterm_ubuntu(self):
+ """ Interactive test. Should not run automatically """
+ self.t_xterm(self.ubuntu_host, self.ubuntu_user)
+
+
if __name__ == '__main__':
unittest.main()
def skipInteractive(func):
name = func.__name__
def wrapped(*args, **kwargs):
- mode = os.environ.get("NEPI_INTERACTIVE", False)
+ mode = os.environ.get("NEPI_INTERACTIVE_TEST", False)
mode = mode and mode.lower() in ['true', 'yes']
if not mode:
print "*** WARNING: Skipping test %s: Interactive mode off \n" % name