-"""
- NEPI, a framework to manage network experiments
- Copyright (C) 2013 INRIA
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.resources.linux.node import LinuxNode
-from nepi.util import sshfuncs
+from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import strfnow, strfdiff
import os
-reschedule_delay = "0.5s"
-state_check_delay = 1
+# TODO: Resolve wildcards in commands!!
-# TODO: Resolve wildcards in commands!!
@clsinit
class LinuxApplication(ResourceManager):
def _register_traces(cls):
stdout = Trace("stdout", "Standard output stream")
stderr = Trace("stderr", "Standard error stream")
- buildlog = Trace("buildlog", "Output of the build process")
cls._register_trace(stdout)
cls._register_trace(stderr)
- cls._register_trace(buildlog)
def __init__(self, ec, guid):
super(LinuxApplication, self).__init__(ec, guid)
# Install
self.install()
+ # Upload command
command = self.get("command")
x11 = self.get("forwardX11")
- if not x11 and command:
+ env = self.get("env")
+
+ if command and not x11:
self.info("Uploading command '%s'" % command)
- # Export environment
- environ = ""
- env = self.get("env") or ""
- for var in env.split(" "):
- environ += 'export %s\n' % var
-
- command = environ + command
-
- # If the command runs asynchronous, pre upload the command
- # to the app.sh file in the remote host
- dst = os.path.join(self.app_home, "app.sh")
+ # replace application specific paths in the command
command = self.replace_paths(command)
- self.node.upload(command, dst, text = True)
+ env = env and self.replace_paths(env)
+ self.node.upload_command(command, self.app_home,
+ shfile = "app.sh",
+ env = env)
+
super(LinuxApplication, self).provision()
def upload_sources(self):
http_sources.append(source)
sources.remove(source)
- # Download http sources
+ # Download http sources remotely
if http_sources:
- cmd = " wget -c --directory-prefix=${SOURCES} "
- verif = ""
+ command = [" wget -c --directory-prefix=${SOURCES} "]
+ check = []
for source in http_sources:
- cmd += " %s " % (source)
- verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+ command.append(" %s " % (source))
+ check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
- # Wget output goes to stderr :S
- cmd += " 2> /dev/null ; "
+ command = " ".join(command)
+ check = " ; ".join(check)
- # Add verification
- cmd += " %s " % verif
+ # Append the command to check that the sources were downloaded
+ command += " ; %s " % check
+ # replace application specific paths in the command
+ command = self.replace_paths(command)
+
# Upload the command to a file, and execute asynchronously
- self.upload_and_run(cmd,
- "http_sources.sh", "http_sources_pid",
- "http_sources_out", "http_sources_err")
+ self.node.run_and_wait(command, self.app_home,
+ shfile = "http_sources.sh",
+ pidfile = "http_sources_pidfile",
+ ecodefile = "http_sources_exitcode",
+ stdout = "http_sources_stdout",
+ stderr = "http_sources_stderr")
+
if sources:
self.node.upload(sources, self.src_dir)
depends = self.get("depends")
if depends:
self.info(" Installing dependencies %s" % depends)
- self.node.install_packages(depends, home = self.app_home)
+ self.node.install_packages(depends, self.app_home)
def build(self):
build = self.get("build")
# create dir for build
self.node.mkdir(self.build_dir)
+ # replace application specific paths in the command
+ command = self.replace_paths(build)
+
# Upload the command to a file, and execute asynchronously
- self.upload_and_run(build,
- "build.sh", "build_pid",
- "build_out", "build_err")
+ self.node.run_and_wait(command, self.app_home,
+ shfile = "build.sh",
+ pidfile = "build_pidfile",
+ ecodefile = "build_exitcode",
+ stdout = "build_stdout",
+ stderr = "build_stderr")
def install(self):
install = self.get("install")
if install:
self.info(" Installing sources ")
+ # replace application specific paths in the command
+ command = self.replace_paths(install)
+
# Upload the command to a file, and execute asynchronously
- self.upload_and_run(install,
- "install.sh", "install_pid",
- "install_out", "install_err")
+ self.node.run_and_wait(command, self.app_home,
+ shfile = "install.sh",
+ pidfile = "install_pidfile",
+ ecodefile = "install_exitcode",
+ stdout = "install_stdout",
+ stderr = "install_stderr")
def deploy(self):
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+
+ reschedule_delay = "0.5s"
self.ec.schedule(reschedule_delay, self.deploy)
else:
try:
x11 = self.get('forwardX11') or False
failed = False
- super(LinuxApplication, self).start()
-
if not command:
- self.info("No command to start ")
+ # If no command was given, then the application
+ # is directly marked as FINISHED
self._state = ResourceState.FINISHED
- return
+ else:
+ super(LinuxApplication, self).start()
self.info("Starting command '%s'" % command)
if x11:
+ # If X11 forwarding was specified, then the application
+ # can not run detached, so instead of invoking asynchronous
+ # 'run' we invoke synchronous 'execute'.
+ if not command:
+ msg = "No command is defined but X11 forwarding has been set"
+ self.error(msg)
+ self._state = ResourceState.FAILED
+ raise RuntimeError, msg
+
if env:
# Export environment
environ = ""
for var in env.split(" "):
environ += ' %s ' % var
- command = "(" + environ + " ; " + command + ")"
+ command = "{" + environ + " ; " + command + " ; }"
command = self.replace_paths(command)
# If the command requires X11 forwarding, we
stderr = stderr,
sudo = sudo)
+ # check if execution errors occurred
+ msg = " Failed to start command '%s' " % command
+
if proc.poll() and err:
- failed = True
+ self.error(msg, out, err)
+ raise RuntimeError, msg
- if not failed:
- pid, ppid = self.node.wait_pid(home = self.app_home)
- if pid: self._pid = int(pid)
- if ppid: self._ppid = int(ppid)
+ # Check status of process running in background
+ pid, ppid = self.node.wait_pid(self.app_home)
+ if pid: self._pid = int(pid)
+ if ppid: self._ppid = int(ppid)
+ # If the process is not running, check for error information
+ # on the remote machine
if not self.pid or not self.ppid:
- failed = True
-
- (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
-
- if failed or out or chkerr:
- # check if execution errors occurred
- msg = " Failed to start command '%s' " % command
- out = out
- if err:
- err = err
- elif chkerr:
- err = chkerr
-
+ (out, err), proc = self.node.check_output(self.app_home, 'stderr')
self.error(msg, out, err)
msg2 = " Setting state to Failed"
if self._state == ResourceState.STARTED:
# To avoid overwhelming the remote hosts and the local processor
# with too many ssh queries, the state is only requested
- # every 'state_check_delay' .
+ # every 'state_check_delay' seconds.
+ state_check_delay = 0.5
if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
# check if execution errors occurred
- (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+ (out, err), proc = self.node.check_errors(self.app_home)
if out or err:
if err.find("No such file or directory") >= 0 :
elif self.pid and self.ppid:
status = self.node.status(self.pid, self.ppid)
- if status == sshfuncs.FINISHED:
+ if status == ProcStatus.FINISHED:
self._state = ResourceState.FINISHED
return self._state
- def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
- dst = os.path.join(self.app_home, fname)
- cmd = self.replace_paths(cmd)
- self.node.upload(cmd, dst, text = True)
-
- cmd = "bash ./%s" % fname
- (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
- pidfile = pidfile,
- stdout = outfile,
- stderr = errfile,
- raise_on_error = True)
-
def replace_paths(self, command):
"""
Replace all special path tags with shell-escaped actual paths.
def valid_connection(self, guid):
# TODO: Validate!
return True
- # XXX: What if it is connected to more than one node?
- resources = self.find_resources(exact_tags = [tags.NODE])
- self._node = resources[0] if len(resources) == 1 else None
- return self._node