import os
import time
-def add_node(ec, host, user):
+def add_node(ec, host, user, ssh_key = None):
node = ec.register_resource("LinuxNode")
ec.set(node, "hostname", host)
ec.set(node, "username", user)
- #ec.set(node, "cleanHome", True)
+ ec.set(node, "identity", ssh_key)
+ ec.set(node, "cleanHome", True)
ec.set(node, "cleanProcesses", True)
return node
if os_type == "f12":
depends = ( " autoconf openssl-devel expat-devel libpcap-devel "
" ecryptfs-utils-devel libxml2-devel automake gawk "
- " gcc gcc-c++ git pcre-devel ")
+ " gcc gcc-c++ git pcre-devel make ")
elif os_type == "ubuntu":
depends = ( " autoconf libssl-dev libexpat-dev libpcap-dev "
" libecryptfs0 libxml2-utils automake gawk gcc g++ "
- " git-core pkg-config libpcre3-dev ")
+ " git-core pkg-config libpcre3-dev make ")
sources = "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
build = (
# Evaluate if ccnx binaries are already installed
" ( "
- " test -d ${EXP_HOME}/ccnx/bin"
+ " test -f ${EXP_HOME}/ccnx/bin/ccnd"
" ) || ( "
# If not, untar and build
" ( "
install = (
# Evaluate if ccnx binaries are already installed
" ( "
- " test -d ${EXP_HOME}/ccnx/bin "
+ " test -f ${EXP_HOME}/ccnx/bin/ccnd"
" ) || ( "
" mkdir -p ${EXP_HOME}/ccnx/bin && "
" cp -r ${SOURCES}/ccnx ${EXP_HOME}"
env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
- # BASH command -> ' ccndstart 2>&1 ; ccndc add ccnx:/ udp host ; ccnr 2>&1 '
- command = "ccndstart 2>&1 ; "
+ # BASH command -> ' ccndstart ; ccndc add ccnx:/ udp host ; ccnr '
+ command = "ccndstart ; "
peers = map(lambda peer: "ccndc add ccnx:/ udp %s" % peer, peers)
command += " ; ".join(peers) + " ; "
- command += " ccnr 2>&1 "
+ command += " ccnr "
app = ec.register_resource("LinuxApplication")
ec.set(app, "depends", depends)
def add_stream(ec):
env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
- command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) 2>&1"
+ command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) "
app = ec.register_resource("LinuxApplication")
ec.set(app, "depends", "vlc")
def get_options():
slicename = os.environ.get("PL_SLICE")
- usage = "usage: %prog -s <pl-slice> -u <user-2> -m <movie> -l <exp-id>"
+ # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the
+ # id_rsa_planetlab exists
+ default_key = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'])
+ default_key = default_key if os.path.exists(default_key) else None
+ pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
+
+ usage = "usage: %prog -s <pl-slice> -u <username> -m <movie> -l <exp-id> -i <ssh_key>"
parser = OptionParser(usage=usage)
parser.add_option("-s", "--pl-slice", dest="pl_slice",
- help="PlanetLab slicename", default=slicename, type="str")
- parser.add_option("-u", "--user-2", dest="user2",
- help="User for non PlanetLab machine", type="str")
+ help="PlanetLab slicename", default = slicename, type="str")
+ parser.add_option("-u", "--username", dest="username",
+ help="User for extra host (non PlanetLab)", type="str")
parser.add_option("-m", "--movie", dest="movie",
help="Stream movie", type="str")
parser.add_option("-l", "--exp-id", dest="exp_id",
help="Label to identify experiment", type="str")
+ parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key",
+ help="Path to private SSH key to be used for connection",
+ default = pl_ssh_key, type="str")
(options, args) = parser.parse_args()
if not options.movie:
parser.error("movie is a required argument")
- return (options.pl_slice, options.user2, options.movie, options.exp_id)
+ return (options.pl_slice, options.username, options.movie, options.exp_id,
+ options.pl_ssh_key)
if __name__ == '__main__':
- ( pl_slice, user2, movie, exp_id ) = get_options()
+ ( pl_slice, username, movie, exp_id, pl_ssh_key ) = get_options()
# Search for available RMs
populate_factory()
+ # PlanetLab node
host1 = 'planetlab2.u-strasbg.fr'
+
+ # Another node
+ # IMPORTANT NOTE: you must replace this host for another one
+ # you have access to. You must set up your SSH keys so
+ # the host can be accessed through SSH without prompting
+ # for a password. The host must allow X forwarding using SSH.
host2 = 'roseval.pl.sophia.inria.fr'
+ # Create the ExperimentController instance
ec = ExperimentController(exp_id = exp_id)
- node1 = add_node(ec, host1, pl_slice)
+ # Register a ResourceManager (RM) for the PlanetLab node
+ node1 = add_node(ec, host1, pl_slice, pl_ssh_key)
peers = [host2]
ccnd1 = add_ccnd(ec, "f12", peers)
ec.register_condition(pub, ResourceAction.START,
ccnd1, ResourceState.STARTED)
- node2 = add_node(ec, host2, user2)
+ node2 = add_node(ec, host2, username)
peers = [host1]
ccnd2 = add_ccnd(ec, "ubuntu", peers)
ec.register_connection(ccnd2, node2)
ec.register_condition(stream, ResourceAction.START,
pub, ResourceState.STARTED)
+ # Deploy all ResourceManagers
ec.deploy()
+ # Wait until the applications are finished
apps = [ccnd1, pub, ccnd2, stream]
ec.wait_finished(apps)
+ # Shutdown the experiment controller
ec.shutdown()
@property
def name(self):
- """ Returns the name of the attribute """
+ """ Returns the name of the attribute """
return self._name
@property
def default(self):
- """ Returns the default value of the attribute """
+ """ Returns the default value of the attribute """
return self._default
@property
def type(self):
- """ Returns the type of the attribute """
+ """ Returns the type of the attribute """
return self._type
@property
def help(self):
- """ Returns the help of the attribute """
+ """ Returns the help of the attribute """
return self._help
@property
def flags(self):
- """ Returns the flags of the attribute """
+ """ Returns the flags of the attribute """
return self._flags
@property
def allowed(self):
- """ Returns the allowed value for this attribute """
+ """ Returns the allowed value for this attribute """
return self._allowed
@property
def range(self):
- """ Returns the range of the attribute """
+ """ Returns the range of the attribute """
return self._range
def has_flag(self, flag):
- """ Returns true if the attribute has the flag 'flag'
+ """ Returns true if the attribute has the flag 'flag'
:param flag: Flag that need to be ckecked
:type flag: Flags
- """
+ """
return (self._flags & flag) == flag
def get_value(self):
- """ Returns the value of the attribute """
+ """ Returns the value of the attribute """
return self._value
def set_value(self, value):
- """ Change the value of the attribute after checking the type """
+ """ Change the value of the attribute after checking the type """
valid = True
if self.type == Types.Enumerate:
if not group:
group = self.resources
+ if isinstance(group, int):
+ group = [group]
+
# Before starting deployment we disorder the group list with the
# purpose of speeding up the whole deployment process.
# It is likely that the user inserted in the 'group' list closely
# same conditions (e.g. LinuxApplications running on a same
# node share a single lock, so they will tend to be serialized).
# If we disorder the group list, this problem can be mitigated.
- random.shuffle(group)
+ #random.shuffle(group)
def wait_all_and_start(group):
reschedule = False
# If all resources are read, we schedule the start
for guid in group:
rm = self.get_resource(guid)
- self.schedule("0.01s", rm.start_with_conditions)
+ self.schedule("0s", rm.start_with_conditions)
if wait_all_ready:
# Schedule the function that will check all resources are
for guid in group:
rm = self.get_resource(guid)
- self.schedule("0.001s", rm.deploy)
+ self.schedule("0s", rm.deploy)
if not wait_all_ready:
self.schedule("1s", rm.start_with_conditions)
def shutdown(self):
""" Shutdown the Experiment Controller.
- It means : Release all the resources and stop the scheduler
+ Releases all the resources and stops task processing thread
"""
self.release()
- self._stop_scheduler()
+ # Mark the EC state as TERMINATED
+ self._state = ECState.TERMINATED
+
+ # Notify condition to wake up the processing thread
+ self._notify()
if self._thread.is_alive():
self._thread.join()
self._tasks[task.id] = task
# Notify condition to wake up the processing thread
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
+ self._notify()
return task.id
def _process(self):
- """ Process at executing the task that are in the scheduler.
+ """ Process scheduled tasks.
+
+ The _process method is executed in an independent thread held by the
+ ExperimentController for as long as the experiment is running.
+
+ Tasks are scheduled by invoking the schedule method with a target callback.
+ The schedule method is given an execution time which controls the
+ order in which tasks are processed.
+
+ Tasks are processed in parallel using multithreading.
+ The environmental variable NEPI_NTHREADS can be used to control
+ the number of threads used to process tasks. The default value is 50.
"""
+ nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
- runner = ParallelRun(maxthreads = 50)
+ runner = ParallelRun(maxthreads = nthreads)
runner.start()
try:
self._logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
-
- # Mark EC state as terminated
- if self.ecstate == ECState.RUNNING:
- # Synchronize to get errors if occurred
+ finally:
runner.sync()
- self._state = ECState.TERMINATED
def _execute(self, task):
- """ Invoke the callback of the task 'task'
+ """ Executes a single task.
+
+ If the invocation of the task callback raises an
+ exception, the processing thread of the ExperimentController
+ will be stopped and the experiment will be aborted.
- :param task: Id of the task
- :type task: int
+ :param task: Object containing the callback to execute
+ :type task: Task
"""
# Invoke callback
self._logger.error("Error occurred while executing task: %s" % err)
- self._stop_scheduler()
+ # Set the EC to FAILED state (this will force to exit the task
+ # processing thread)
+ self._state = ECState.FAILED
+
+ # Notify condition to wake up the processing thread
+ self._notify()
# Propage error to the ParallelRunner
raise
- def _stop_scheduler(self):
- """ Stop the scheduler and put the EC into a FAILED State.
-
+ def _notify(self):
+ """ Awakes the processing thread in case it is blocked waiting
+ for a new task to be scheduled.
"""
-
- # Mark the EC as failed
- self._state = ECState.FAILED
-
- # Wake up the EC in case it was sleeping
self._cond.acquire()
self._cond.notify()
self._cond.release()
-
return rclass(ec, guid)
def populate_factory():
- """Register all the possible RM that exists in the current version of Nepi.
-
- """
+ """Register all the possible RMs that exist in the current version of Nepi.
+ """
for rclass in find_types():
ResourceFactory.register_type(rclass)
def find_types():
- """Look into the different folders to find all the
- availables Resources Managers
+ """Look into the different folders to find all the
+ available Resource Managers
- """
+ """
search_path = os.environ.get("NEPI_SEARCH_PATH", "")
search_path = set(search_path.split(" "))
@property
def name(self):
- """ Returns the name of the trace """
+ """ Returns the name of the trace """
return self._name
@property
def help(self):
- """ Returns the help of the trace """
+ """ Returns the help of the trace """
return self._help
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.resources.linux.node import LinuxNode
-from nepi.util import sshfuncs
+from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import strfnow, strfdiff
import os
-reschedule_delay = "0.5s"
-state_check_delay = 1
-
# TODO: Resolve wildcards in commands!!
-# TODO: If command is not set give a warning but do not generate an error!
+
@clsinit
class LinuxApplication(ResourceManager):
# Install
self.install()
+ # Upload command
command = self.get("command")
x11 = self.get("forwardX11")
- if not x11 and command:
+ env = self.get("env")
+
+ if command and not x11:
self.info("Uploading command '%s'" % command)
- # Export environment
- environ = ""
- if self.get("env"):
- for var in self.get("env").split(" "):
- environ += 'export %s\n' % var
-
- command = environ + command
-
- # If the command runs asynchronous, pre upload the command
- # to the app.sh file in the remote host
- dst = os.path.join(self.app_home, "app.sh")
+ # replace application specific paths in the command
command = self.replace_paths(command)
- self.node.upload(command, dst, text = True)
+ self.node.upload_command(command, self.app_home,
+ shfile = "app.sh",
+ env = env)
+
super(LinuxApplication, self).provision()
def upload_sources(self):
http_sources.append(source)
sources.remove(source)
- # Download http sources
+ # Download http sources remotely
if http_sources:
- cmd = " wget -c --directory-prefix=${SOURCES} "
- verif = ""
+ command = " wget -c --directory-prefix=${SOURCES} "
+ check = ""
for source in http_sources:
- cmd += " %s " % (source)
- verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+ command += " %s " % (source)
+ check += " ls ${SOURCES}/%s ;" % os.path.basename(source)
- # Wget output goes to stderr :S
- cmd += " 2> /dev/null ; "
-
- # Add verification
- cmd += " %s " % verif
+ # Append the command to check that the sources were downloaded
+ command += " ; %s " % check
+ # replace application specific paths in the command
+ command = self.replace_paths(command)
+
# Upload the command to a file, and execute asynchronously
- self.upload_and_run(cmd,
- "http_sources.sh", "http_sources_pid",
- "http_sources_out", "http_sources_err")
+ self.node.run_and_wait(command, self.app_home,
+ shfile = "http_sources.sh",
+ pidfile = "http_sources_pidfile",
+ ecodefile = "http_sources_exitcode",
+ stdout = "http_sources_stdout",
+ stderr = "http_sources_stderr")
+
if sources:
self.node.upload(sources, self.src_dir)
depends = self.get("depends")
if depends:
self.info(" Installing dependencies %s" % depends)
- self.node.install_packages(depends, home = self.app_home)
+ self.node.install_packages(depends, self.app_home)
def build(self):
build = self.get("build")
# create dir for build
self.node.mkdir(self.build_dir)
+ # replace application specific paths in the command
+ command = self.replace_paths(command)
+
# Upload the command to a file, and execute asynchronously
- self.upload_and_run(build,
- "build.sh", "build_pid",
- "build_out", "build_err")
+ self.node.run_and_wait(command, self.app_home,
+ shfile = "build.sh",
+ pidfile = "build_pidfile",
+ ecodefile = "build_exitcode",
+ stdout = "build_stdout",
+ stderr = "build_stderr")
def install(self):
install = self.get("install")
if install:
self.info(" Installing sources ")
+ # replace application specific paths in the command
+ command = self.replace_paths(command)
+
# Upload the command to a file, and execute asynchronously
- self.upload_and_run(install,
- "install.sh", "install_pid",
- "install_out", "install_err")
+ self.node.run_and_wait(command, self.app_home,
+ shfile = "install.sh",
+ pidfile = "install_pidfile",
+ ecodefile = "install_exitcode",
+ stdout = "install_stdout",
+ stderr = "install_stderr")
def deploy(self):
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+
+ reschedule_delay = "0.5s"
self.ec.schedule(reschedule_delay, self.deploy)
else:
try:
x11 = self.get('forwardX11') or False
failed = False
- super(LinuxApplication, self).start()
-
if not command:
- self.info("No command to start ")
+ # If no command was given, then the application
+ # is directly marked as FINISHED
self._state = ResourceState.FINISHED
- return
+ else:
+ super(LinuxApplication, self).start()
self.info("Starting command '%s'" % command)
if x11:
+ # If X11 forwarding was specified, then the application
+ # can not run detached, so instead of invoking asynchronous
+ # 'run' we invoke synchronous 'execute'.
+ if not command:
+ msg = "No command is defined but X11 forwarding has been set"
+ self.error(msg)
+ self._state = ResourceState.FAILED
+ raise RuntimeError, msg
+
if env:
# Export environment
environ = ""
stderr = stderr,
sudo = sudo)
+ # check if execution errors occurred
+ msg = " Failed to start command '%s' " % command
+
if proc.poll() and err:
- failed = True
+ self.error(msg, out, err)
+ raise RuntimeError, msg
- if not failed:
- pid, ppid = self.node.wait_pid(home = self.app_home)
- if pid: self._pid = int(pid)
- if ppid: self._ppid = int(ppid)
+ # Check status of process running in background
+ pid, ppid = self.node.wait_pid(self.app_home)
+ if pid: self._pid = int(pid)
+ if ppid: self._ppid = int(ppid)
+ # If the process is not running, check for error information
+ # on the remote machine
if not self.pid or not self.ppid:
- failed = True
-
- (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
-
- if failed or out or chkerr:
- # check if execution errors occurred
- msg = " Failed to start command '%s' " % command
- out = out
- if err:
- err = err
- elif chkerr:
- err = chkerr
-
+ (out, err), proc = self.node.check_output(self.app_home, 'stderr')
self.error(msg, out, err)
msg2 = " Setting state to Failed"
if self._state == ResourceState.STARTED:
# To avoid overwhelming the remote hosts and the local processor
# with too many ssh queries, the state is only requested
- # every 'state_check_delay' .
+ # every 'state_check_delay' seconds.
+ state_check_delay = 0.5
if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
# check if execution errors occurred
- (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+ (out, err), proc = self.node.check_errors(self.app_home)
if out or err:
if err.find("No such file or directory") >= 0 :
elif self.pid and self.ppid:
status = self.node.status(self.pid, self.ppid)
- if status == sshfuncs.FINISHED:
+ if status == ProcStatus.FINISHED:
self._state = ResourceState.FINISHED
return self._state
- def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
- dst = os.path.join(self.app_home, fname)
- cmd = self.replace_paths(cmd)
- self.node.upload(cmd, dst, text = True)
-
- cmd = "bash ./%s" % fname
- (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
- pidfile = pidfile,
- stdout = outfile,
- stderr = errfile,
- raise_on_error = True)
-
def replace_paths(self, command):
"""
Replace all special path tags with shell-escaped actual paths.
from nepi.execution.attribute import Attribute, Flags
from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.resources.linux import rpmfuncs, debfuncs
-from nepi.util import sshfuncs, execfuncs
+from nepi.util import sshfuncs, execfuncs
+from nepi.util.sshfuncs import ProcStatus
import collections
import os
reschedule_delay = "0.5s"
+class ExitCode:
+ """
+ Error codes that the rexitcode function can return if unable to
+ check the exit code of a spawned process
+ """
+ FILENOTFOUND = -1
+ CORRUPTFILE = -2
+ ERROR = -3
+ OK = 0
@clsinit
class LinuxNode(ResourceManager):
src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
return self.copy(src, dst)
- def install_packages(self, packages, home = None):
- home = home or self.node_home
-
- cmd = ""
+ def install_packages(self, packages, home):
+ command = ""
if self.os in ["f12", "f14"]:
- cmd = rpmfuncs.install_packages_command(self.os, packages)
+ command = rpmfuncs.install_packages_command(self.os, packages)
elif self.os in ["debian", "ubuntu"]:
- cmd = debfuncs.install_packages_command(self.os, packages)
+ command = debfuncs.install_packages_command(self.os, packages)
else:
msg = "Error installing packages ( OS not known ) "
self.error(msg, self.os)
raise RuntimeError, msg
out = err = ""
- (out, err), proc = self.run_and_wait(cmd, home,
- pidfile = "instpkg_pid",
- stdout = "instpkg_out",
- stderr = "instpkg_err",
+ (out, err), proc = self.run_and_wait(command, home,
+ shfile = "instpkg.sh",
+ pidfile = "instpkg_pidfile",
+ ecodefile = "instpkg_exitcode",
+ stdout = "instpkg_stdout",
+ stderr = "instpkg_stderr",
raise_on_error = True)
return (out, err), proc
- def remove_packages(self, packages, home = None):
- home = home or self.node_home
-
- cmd = ""
+ def remove_packages(self, packages, home):
+ command = ""
if self.os in ["f12", "f14"]:
- cmd = rpmfuncs.remove_packages_command(self.os, packages)
+ command = rpmfuncs.remove_packages_command(self.os, packages)
elif self.os in ["debian", "ubuntu"]:
- cmd = debfuncs.remove_packages_command(self.os, packages)
+ command = debfuncs.remove_packages_command(self.os, packages)
else:
msg = "Error removing packages ( OS not known ) "
self.error(msg)
raise RuntimeError, msg
out = err = ""
- (out, err), proc = self.run_and_wait(cmd, home,
- pidfile = "rmpkg_pid",
- stdout = "rmpkg_out",
- stderr = "rmpkg_err",
+ (out, err), proc = self.run_and_wait(command, home,
+ shfile = "rmpkg.sh",
+ pidfile = "rmpkg_pidfile",
+ ecodefile = "rmpkg_exitcode",
+ stdout = "rmpkg_stdout",
+ stderr = "rmpkg_stderr",
raise_on_error = True)
return (out, err), proc
def rmdir(self, path):
return self.execute("rm -rf %s" % path, with_lock = True)
-
- def run_and_wait(self, command,
- home = ".",
- pidfile = "pid",
+
+ def run_and_wait(self, command, home,
+ shfile = "cmd.sh",
+ pidfile = "pidfile",
+ ecodefile = "exitcode",
stdin = None,
- stdout = 'stdout',
- stderr = 'stderr',
+ stdout = "stdout",
+ stderr = "stderr",
sudo = False,
tty = False,
raise_on_error = False):
- """ runs a command in background on the remote host, but waits
- until the command finishes execution.
- This is more robust than doing a simple synchronized 'execute',
- since in the remote host the command can continue to run detached
- even if network disconnections occur
+ """
+ runs a command in background on the remote host, busy-waiting
+ until the command finishes execution.
+ This is more robust than doing a simple synchronized 'execute',
+ since in the remote host the command can continue to run detached
+ even if network disconnections occur
"""
+ self.upload_command(command, home, shfile, ecodefile)
+
+ command = "bash ./%s" % shfile
# run command in background in remote host
(out, err), proc = self.run(command, home,
pidfile = pidfile,
# wait until command finishes to execute
self.wait_run(pid, ppid)
-
- # check if execution errors occurred
- (out, err), proc = self.check_output(home, stderr)
+
+ (out, err), proc = self.check_errors(home, ecodefile, stderr)
- if err or out:
+ # Out is what was written in the stderr file
+ if out or err:
msg = " Failed to run command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
return (out, err), proc
+
+ def exitcode(self, home, ecodefile = "exitcode"):
+ """
+ Get the exit code of an application.
+ Returns an integer value with the exit code
+ """
+ (out, err), proc = self.check_output(home, ecodefile)
+
+ # Succeeded to open file, return exit code in the file
+ if proc.wait() == 0:
+ try:
+ return int(out.strip())
+ except:
+ # Error in the content of the file!
+ return ExitCode.CORRUPTFILE
+
+ # No such file or directory
+ if proc.returncode == 1:
+ return ExitCode.FILENOTFOUND
+
+ # Other error from 'cat'
+ return ExitCode.ERROR
+
+ def upload_command(self, command, home,
+ shfile = "cmd.sh",
+ ecodefile = "exitcode",
+ env = None):
+
+ command = "{ ( %(command)s ) ; } ; echo $? > %(ecodefile)s " % {
+ 'command': command,
+ 'ecodefile': ecodefile,
+ }
+
+ # Export environment
+ environ = ""
+ if env:
+ for var in env.split(" "):
+ environ += 'export %s\n' % var
+
+ command = environ + command
+
+ dst = os.path.join(home, shfile)
+ return self.upload(command, dst, text = True)
+
+ def check_errors(self, home,
+ ecodefile = "exitcode",
+ stderr = "stderr"):
+ """
+ Checks whether errors occurred while running a command.
+ It first checks the exit code for the command, and only if the
+ exit code is an error one it returns the error output.
+ """
+ out = err = ""
+ proc = None
+
+ # get Exit code
+ ecode = self.exitcode(home, ecodefile)
+
+ if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
+ err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
+ elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
+ # The process returned an error code or didn't exist.
+ # Check standard error.
+ (out, err), proc = self.check_output(home, stderr)
+
+ # If the stderr file was not found, assume nothing happened.
+ # We just ignore the error.
+ if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: # cat - No such file or directory
+ err = ""
+
+ return (out, err), proc
- def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+ def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
""" Waits until the pid file for the command is generated,
and returns the pid and ppid of the process """
pid = ppid = None
delay = 1.0
- for i in xrange(5):
- pidtuple = self.checkpid(home = home, pidfile = pidfile)
+
+ for i in xrange(4):
+ pidtuple = self.getpid(home = home, pidfile = pidfile)
if pidtuple:
pid, ppid = pidtuple
break
else:
time.sleep(delay)
- delay = min(30,delay*1.2)
+ delay = delay * 1.5
else:
msg = " Failed to get pid for pidfile %s/%s " % (
home, pidfile )
def wait_run(self, pid, ppid, trial = 0):
""" wait for a remote process to finish execution """
- delay = 1.0
- first = True
- bustspin = 0
+ start_delay = 1.0
while True:
status = self.status(pid, ppid)
- if status is sshfuncs.FINISHED:
+ if status is ProcStatus.FINISHED:
break
- elif status is not sshfuncs.RUNNING:
- bustspin += 1
- time.sleep(delay*(5.5+random.random()))
- if bustspin > 12:
+ elif status is not ProcStatus.RUNNING:
+ delay = delay * 1.5
+ time.sleep(delay)
+ # If it takes more than 20 seconds to start, then
+ # assume something went wrong
+ if delay > 20:
break
else:
- if first:
- first = False
-
- time.sleep(delay*(0.5+random.random()))
- delay = min(30,delay*1.2)
- bustspin = 0
+ # The app is running, just wait...
+ time.sleep(0.5)
def check_output(self, home, filename):
- """ checks file content """
+ """ Retrieves the content of a file """
(out, err), proc = self.execute("cat %s" %
os.path.join(home, filename), retry = 1, with_lock = True)
return (out, err), proc
def copy(self, src, dst):
if self.localhost:
- (out, err), proc = execfuncs.lcopy(source, dest,
+ (out, err), proc = execfuncs.lcopy(source, dest,
recursive = True,
strict_host_checking = False)
else:
return (out, err), proc
- def run(self, command,
- home = None,
+ def run(self, command, home,
create_home = False,
- pidfile = "pid",
+ pidfile = 'pidfile',
stdin = None,
stdout = 'stdout',
stderr = 'stderr',
sudo = False,
tty = False):
-
+
self.debug("Running command '%s'" % command)
if self.localhost:
sudo = sudo,
user = user)
else:
- # Start process in a "daemonized" way, using nohup and heavy
- # stdin/out redirection to avoid connection issues
with self._lock:
- (out,err), proc = sshfuncs.rspawn(
+ (out, err), proc = sshfuncs.rspawn(
command,
pidfile = pidfile,
home = home,
return (out, err), proc
- def checkpid(self, home = ".", pidfile = "pid"):
+ def getpid(self, home, pidfile = "pidfile"):
if self.localhost:
- pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
+ pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
else:
with self._lock:
- pidtuple = sshfuncs.rcheckpid(
+ pidtuple = sshfuncs.rgetpid(
os.path.join(home, pidfile),
host = self.get("hostname"),
user = self.get("username"),
)
return pidtuple
-
+
def status(self, pid, ppid):
if self.localhost:
status = execfuncs.lstatus(pid, ppid)
proc = None
status = self.status(pid, ppid)
- if status == sshfuncs.RUNNING:
+ if status == sshfuncs.ProcStatus.RUNNING:
if self.localhost:
(out, err), proc = execfuncs.lkill(pid, ppid, sudo)
else:
identity = self.get("identity"),
server_key = self.get("serverKey")
)
- return (out, err), proc
- def check_bad_host(self, out, err):
- badre = re.compile(r'(?:'
- r'|Error: disk I/O error'
- r')',
- re.I)
- return badre.search(out) or badre.search(err)
+ return (out, err), proc
cmd = "( %s )" % install_rpmfusion_command(os)
for p in packages:
cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % {
- 'package': p}
+ 'package': p}
#cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
return " ( %s )" % cmd
for p in packages:
cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % {
'package': p}
-
+
#cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
return cmd
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT
+from nepi.util.sshfuncs import ProcStatus, STDOUT
import subprocess
return (out,err),proc
-def lcheckpid(pidfile):
+def lgetpid(pidfile):
"""
Check the pidfile of a process spawned with remote_spawn.
})
if proc.wait():
- return NOT_STARTED
+ return ProcStatus.NOT_STARTED
status = False
if out:
status = (out.strip() == 'wait')
else:
- return NOT_STARTED
- return RUNNING if status else FINISHED
+ return ProcStatus.NOT_STARTED
+ return ProcStatus.RUNNING if status else ProcStatus.FINISHED
def lkill(pid, ppid, sudo = False):
redirect to whatever stdout was redirected to.
"""
-class RUNNING:
+class ProcStatus:
"""
- Process is still running
+ Codes for status of remote spawned process
"""
+ # Process is still running
+ RUNNING = 1
-class FINISHED:
- """
- Process is finished
- """
-
-class NOT_STARTED:
- """
- Process hasn't started running yet (this should be very rare)
- """
+ # Process is finished
+ FINISHED = 2
+
+ # Process hasn't started running yet (this should be very rare)
+ NOT_STARTED = 3
hostbyname_cache = dict()
hostbyname_cache_lock = threading.Lock()
tmp_known_hosts = None
args = ['scp', '-q', '-p', '-C',
+ # Speed up transfer using blowfish cypher specification which is
+ # faster than the default one (3des)
+ '-c', 'blowfish',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
'-o', 'ConnectTimeout=60',
def rspawn(command, pidfile,
stdout = '/dev/null',
stderr = STDOUT,
- stdin = '/dev/null',
+ stdin = '/dev/null',
home = None,
create_home = False,
sudo = False,
server_key = None,
tty = False):
"""
- Spawn a remote command such that it will continue working asynchronously.
-
- Parameters:
- command: the command to run - it should be a single line.
-
- pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
-
- stdout: path of a file to redirect standard output to - must be a string.
- Defaults to /dev/null
- stderr: path of a file to redirect standard error to - string or the special STDOUT value
- to redirect to the same file stdout was redirected to. Defaults to STDOUT.
- stdin: path of a file with input to be piped into the command's standard input
+ Spawn a remote command such that it will continue working asynchronously in
+ background.
+
+ :param command: The command to run, it should be a single line.
+ :type command: str
+
+ :param pidfile: Path to a file where to store the pid and ppid of the
+ spawned process
+ :type pidfile: str
+
+ :param stdout: Path to file to redirect standard output.
+ The default value is /dev/null
+ :type stdout: str
+
+ :param stderr: Path to file to redirect standard error.
+ If the special STDOUT value is used, stderr will
+ be redirected to the same file as stdout
+ :type stderr: str
+
+ :param stdin: Path to a file with input to be piped into the command's standard input
+ :type stdin: str
+
+ :param home: Path to working directory folder.
+ It is assumed to exist unless the create_home flag is set.
+ :type home: str
+
+ :param create_home: Flag to force creation of the home folder before
+ running the command
+ :type create_home: bool
+
+ :param sudo: Flag forcing execution with sudo user
+ :type sudo: bool
- home: path of a folder to use as working directory - should exist, unless you specify create_home
-
- create_home: if True, the home folder will be created first with mkdir -p
-
- sudo: whether the command needs to be executed as root
-
- host/port/user/agent/identity: see rexec
-
- Returns:
+ :rtype: tuple
+
(stdout, stderr), process
Of the spawning process, which only captures errors at spawning time.
return ((out, err), proc)
@eintr_retry
-def rcheckpid(pidfile,
+def rgetpid(pidfile,
host = None,
port = None,
user = None,
identity = None,
server_key = None):
"""
- Check the pidfile of a process spawned with remote_spawn.
-
- Parameters:
- pidfile: the pidfile passed to remote_span
+ Returns the pid and ppid of a process from a remote file where the
+ information was stored.
+
+ :param pidfile: Path to the remote file containing the pid information
+ :type pidfile: str
- host/port/user/agent/identity: see rexec
-
- Returns:
+ :rtype: tuple
- A (pid, ppid) tuple useful for calling remote_status and remote_kill,
- or None if the pidfile isn't valid yet (maybe the process is still starting).
- """
+ A (pid, ppid) tuple useful for calling rstatus and rkill,
+ or None if the pidfile isn't valid yet (can happen when process is starting up)
+ """
(out,err),proc = rexec(
"cat %(pidfile)s" % {
'pidfile' : pidfile,
identity = None,
server_key = None):
"""
- Check the status of a process spawned with remote_spawn.
+ Returns a code representing the status of a remote process
+
+ :param pid: Process id of the process
+ :type pid: int
+
+ :param ppid: Parent process id of process
+ :type ppid: int
- Parameters:
- pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-
- host/port/user/agent/identity: see rexec
+ :rtype: int (One of ProcStatus.NOT_STARTED, ProcStatus.RUNNING, ProcStatus.FINISHED)
- Returns:
-
- One of NOT_STARTED, RUNNING, FINISHED
"""
-
(out,err),proc = rexec(
# Check only by pid. pid+ppid does not always work (especially with sudo)
" (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
)
if proc.wait():
- return NOT_STARTED
+ return ProcStatus.NOT_STARTED
status = False
if err:
elif out:
status = (out.strip() == 'wait')
else:
- return NOT_STARTED
- return RUNNING if status else FINISHED
+ return ProcStatus.NOT_STARTED
+ return ProcStatus.RUNNING if status else ProcStatus.FINISHED
@eintr_retry
def rkill(pid, ppid,
server_key = None,
nowait = False):
"""
- Kill a process spawned with remote_spawn.
-
+ Sends a kill signal to a remote process.
+
First tries a SIGTERM, and if the process does not end in 10 seconds,
it sends a SIGKILL.
-
- Parameters:
- pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-
- sudo: whether the command was run with sudo - careful killing like this.
-
- host/port/user/agent/identity: see rexec
-
- Returns:
+
+ :param pid: Process id of process to be killed
+ :type pid: int
+
+ :param ppid: Parent process id of process to be killed
+ :type ppid: int
+
+ :param sudo: Flag indicating if sudo should be used to kill the process
+ :type sudo: bool
- Nothing, should have killed the process
"""
-
subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
cmd = """
SUBKILL="%(subkill)s" ;
def test_schedule_exception(self):
def raise_error():
- raise RuntimeError, "the error"
+ raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!"
ec = ExperimentController()
ec.schedule("2s", raise_error)
class LinuxApplicationTestCase(unittest.TestCase):
def setUp(self):
- self.fedora_host = 'nepi2.pl.sophia.inria.fr'
- self.fedora_user = 'inria_nepi'
+ self.fedora_host = "nepi2.pl.sophia.inria.fr"
+ self.fedora_user = "inria_nepi"
- self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
- self.ubuntu_user = 'alina'
+ self.ubuntu_host = "roseval.pl.sophia.inria.fr"
+ self.ubuntu_user = "alina"
- self.target = 'nepi5.pl.sophia.inria.fr'
+ self.target = "nepi5.pl.sophia.inria.fr"
@skipIfNotAlive
def t_stdout(self, host, user):
self.assertTrue(ec.state(node) == ResourceState.STARTED)
self.assertTrue(ec.state(app) == ResourceState.FINISHED)
- stdout = ec.trace(app, 'stdout')
+ stdout = ec.trace(app, "stdout")
self.assertTrue(stdout.strip() == "HOLA")
ec.shutdown()
self.assertTrue(ec.state(node) == ResourceState.STARTED)
self.assertTrue(ec.state(app) == ResourceState.FINISHED)
- stdout = ec.trace(app, 'stdout')
- size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
+ stdout = ec.trace(app, "stdout")
+ size = ec.trace(app, "stdout", attr = TraceAttr.SIZE)
self.assertEquals(len(stdout), size)
- block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
+ block = ec.trace(app, "stdout", attr = TraceAttr.STREAM, block = 5, offset = 1)
self.assertEquals(block, stdout[5:10])
- path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
+ path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
rm = ec.get_resource(app)
- p = os.path.join(rm.app_home, 'stdout')
+ p = os.path.join(rm.app_home, "stdout")
self.assertEquals(path, p)
ec.shutdown()
self.assertTrue(ec.state(server) == ResourceState.FINISHED)
self.assertTrue(ec.state(client) == ResourceState.FINISHED)
- stdout = ec.trace(client, 'stdout')
+ stdout = ec.trace(client, "stdout")
self.assertTrue(stdout.strip() == "HOLA")
ec.shutdown()
ec.set(node, "cleanHome", True)
ec.set(node, "cleanProcesses", True)
- sources = "http://nepi.inria.fr/attachment/wiki/WikiStart/pybindgen-r794.tar.gz " \
- "http://nepi.inria.fr/attachment/wiki/WikiStart/nepi_integration_framework.pdf"
+ sources = "http://nepi.inria.fr/code/nef/archive/tip.tar.gz " \
+ " http://nepi.inria.fr/code/nef/raw-file/8ace577d4079/src/nef/images/menu/connect.png"
app = ec.register_resource("LinuxApplication")
ec.set(app, "sources", sources)
self.assertTrue(ec.state(node) == ResourceState.STARTED)
self.assertTrue(ec.state(app) == ResourceState.FINISHED)
- err = ec.trace(app, 'http_sources_err')
- self.assertTrue(err == "")
+ exitcode = ec.trace(app, "http_sources_exitcode")
+ self.assertTrue(exitcode.strip() == "0")
- out = ec.trace(app, 'http_sources_out')
- self.assertTrue(out.find("pybindgen-r794.tar.gz") > -1)
- self.assertTrue(out.find("nepi_integration_framework.pdf") > -1)
+ out = ec.trace(app, "http_sources_stdout")
+ self.assertTrue(out.find("tip.tar.gz") > -1)
+ self.assertTrue(out.find("connect.png") > -1)
ec.shutdown()
self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
- # TODO: test compilation, sources, dependencies, etc!!!
-
if __name__ == '__main__':
unittest.main()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.resources.linux.node import LinuxNode
-from nepi.util.sshfuncs import RUNNING, FINISHED
+from nepi.resources.linux.node import LinuxNode, ExitCode
+from nepi.util.sshfuncs import ProcStatus
from test_utils import skipIfNotAlive, skipInteractive, create_node
class LinuxNodeTestCase(unittest.TestCase):
def setUp(self):
- self.fedora_host = 'nepi2.pl.sophia.inria.fr'
- self.fedora_user = 'inria_nepi'
+ self.fedora_host = "nepi2.pl.sophia.inria.fr"
+ self.fedora_user = "inria_nepi"
- self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
- self.ubuntu_user = 'alina'
+ self.ubuntu_host = "roseval.pl.sophia.inria.fr"
+ self.ubuntu_user = "alina"
- self.target = 'nepi5.pl.sophia.inria.fr'
-
- @skipIfNotAlive
- def t_xterm(self, host, user):
- node, ec = create_node(host, user)
-
- node.install_packages('xterm')
-
- (out, err), proc = node.execute('xterm', forward_x11 = True)
-
- self.assertEquals(out, "")
-
- (out, err), proc = node.remove_packages('xterm')
-
- self.assertEquals(out, "")
+ self.target = "nepi5.pl.sophia.inria.fr"
@skipIfNotAlive
def t_execute(self, host, user):
command = "ping %s" % self.target
node.run(command, app_home)
- pid, ppid = node.checkpid(app_home)
+ pid, ppid = node.getpid(app_home)
status = node.status(pid, ppid)
- self.assertTrue(status, RUNNING)
+ self.assertTrue(status, ProcStatus.RUNNING)
node.kill(pid, ppid)
status = node.status(pid, ppid)
- self.assertTrue(status, FINISHED)
+ self.assertTrue(status, ProcStatus.FINISHED)
(out, err), proc = node.check_output(app_home, "stdout")
node.rmdir(app_home)
+ @skipIfNotAlive
+ def t_exitcode_ok(self, host, user):
+ command = "echo 'OK!'"
+
+ node, ec = create_node(host, user)
+
+ app_home = os.path.join(node.exp_home, "my-app")
+ node.mkdir(app_home, clean = True)
+
+ (out, err), proc = node.run_and_wait(command, app_home,
+ shfile = "cmd.sh",
+ pidfile = "pid",
+ ecodefile = "exitcode",
+ stdout = "stdout",
+ stderr = "stderr",
+ raise_on_error = True)
+
+ # get the exit code of the process
+ ecode = node.exitcode(app_home)
+ self.assertEquals(ecode, ExitCode.OK)
+
+ @skipIfNotAlive
+ def t_exitcode_kill(self, host, user):
+ node, ec = create_node(host, user)
+
+ app_home = os.path.join(node.exp_home, "my-app")
+ node.mkdir(app_home, clean = True)
+
+ # Upload command that will not finish
+ command = "ping localhost"
+ (out, err), proc = node.upload_command(command, app_home,
+ shfile = "cmd.sh",
+ ecodefile = "exitcode")
+
+ (out, err), proc = node.run(command, app_home,
+ pidfile = "pidfile",
+ stdout = "stdout",
+ stderr = "stderr")
+
+ # Just wait to make sure the ping started
+ time.sleep(5)
+
+ # The process is still running, so no exitcode file has been created yet
+ ecode = node.exitcode(app_home)
+ self.assertEquals(ecode, ExitCode.FILENOTFOUND)
+
+ (out, err), proc = node.check_errors(app_home)
+ self.assertEquals(err, "")
+
+ # Now kill the app
+ pid, ppid = node.getpid(app_home)
+ node.kill(pid, ppid)
+
+ (out, err), proc = node.check_errors(app_home)
+ self.assertEquals(err, "")
+
+ @skipIfNotAlive
+ def t_exitcode_error(self, host, user):
+ # Try to execute a command that doesn't exist
+ command = "unexistent-command"
+
+ node, ec = create_node(host, user)
+
+ app_home = os.path.join(node.exp_home, "my-app")
+ node.mkdir(app_home, clean = True)
+
+ (out, err), proc = node.run_and_wait(command, app_home,
+ shfile = "cmd.sh",
+ pidfile = "pid",
+ ecodefile = "exitcode",
+ stdout = "stdout",
+ stderr = "stderr",
+ raise_on_error = False)
+
+ # get the exit code of the process
+ ecode = node.exitcode(app_home)
+ # bash error 127 - command not found
+ self.assertEquals(ecode, 127)
+
+ (out, err), proc = node.check_errors(app_home)
+ self.assertNotEquals(out, "")
+
@skipIfNotAlive
def t_install(self, host, user):
node, ec = create_node(host, user)
- (out, err), proc = node.mkdir(node.node_home, clean=True)
+ (out, err), proc = node.mkdir(node.node_home, clean = True)
self.assertEquals(out, "")
- (out, err), proc = node.install_packages('gcc')
+ (out, err), proc = node.install_packages("gcc", node.node_home)
self.assertEquals(out, "")
- (out, err), proc = node.remove_packages('gcc')
+ (out, err), proc = node.remove_packages("gcc", node.node_home)
self.assertEquals(out, "")
(out, err), proc = node.rmdir(node.exp_home)
self.assertEquals(out, "")
+ @skipIfNotAlive
+ def t_xterm(self, host, user):
+ node, ec = create_node(host, user)
+
+ (out, err), proc = node.mkdir(node.node_home, clean = True)
+ self.assertEquals(out, "")
+
+ node.install_packages("xterm", node.node_home)
+ self.assertEquals(out, "")
+
+ (out, err), proc = node.execute("xterm", forward_x11 = True)
+ self.assertEquals(out, "")
+
+ (out, err), proc = node.remove_packages("xterm", node.node_home)
+ self.assertEquals(out, "")
+
@skipIfNotAlive
def t_compile(self, host, user):
node, ec = create_node(host, user)
node.upload(prog, dst, text = True)
# install gcc
- node.install_packages('gcc')
+ node.install_packages('gcc', app_home)
# compile the program using gcc
command = "cd %s; gcc -Wall hello.c -o hello" % app_home
# retrieve the output file
src = os.path.join(app_home, "hello.out")
- f = tempfile.NamedTemporaryFile(delete=False)
+ f = tempfile.NamedTemporaryFile(delete = False)
dst = f.name
node.download(src, dst)
f.close()
- node.remove_packages('gcc')
+ node.remove_packages("gcc", app_home)
node.rmdir(app_home)
f = open(dst, "r")
def test_compile_ubuntu(self):
self.t_compile(self.ubuntu_host, self.ubuntu_user)
+
+ def test_exitcode_ok_fedora(self):
+ self.t_exitcode_ok(self.fedora_host, self.fedora_user)
+
+ def test_exitcode_ok_ubuntu(self):
+ self.t_exitcode_ok(self.ubuntu_host, self.ubuntu_user)
+
+ def test_exitcode_kill_fedora(self):
+ self.t_exitcode_kill(self.fedora_host, self.fedora_user)
+
+ def test_exitcode_kill_ubuntu(self):
+ self.t_exitcode_kill(self.ubuntu_host, self.ubuntu_user)
+
+ def test_exitcode_error_fedora(self):
+ self.t_exitcode_error(self.fedora_host, self.fedora_user)
+
+ def test_exitcode_error_ubuntu(self):
+ self.t_exitcode_error(self.ubuntu_host, self.ubuntu_user)
@skipInteractive
def test_xterm_ubuntu(self):
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.util.sshfuncs import rexec, rcopy, rspawn, rcheckpid, rstatus, rkill,\
- RUNNING, FINISHED
+from nepi.util.sshfuncs import rexec, rcopy, rspawn, rgetpid, rstatus, rkill,\
+ ProcStatus
import getpass
import unittest
time.sleep(2)
- (pid, ppid) = rcheckpid(pidfile,
+ (pid, ppid) = rgetpid(pidfile,
host = host,
user = user,
port = env.port,
port = env.port,
agent = True)
- self.assertEquals(status, RUNNING)
+ self.assertEquals(status, ProcStatus.RUNNING)
rkill(pid, ppid,
host = host,
port = env.port,
agent = True)
- self.assertEquals(status, FINISHED)
+ self.assertEquals(status, ProcStatus.FINISHED)
if __name__ == '__main__':