build = (
# Evaluate if ccnx binaries are already installed
" ( "
- " test -f ${EXP_HOME}/ccnx/bin/ccnd"
+ " test -f ${BIN}/ccnx-0.7.1/bin/ccnd"
" ) || ( "
# If not, untar and build
" ( "
- " mkdir -p ${SOURCES}/ccnx && "
- " tar xf ${SOURCES}/ccnx-0.7.1.tar.gz --strip-components=1 -C ${SOURCES}/ccnx "
+ " mkdir -p ${SRC}/ccnx-0.7.1 && "
+ " tar xf ${SRC}/ccnx-0.7.1.tar.gz --strip-components=1 -C ${SRC}/ccnx-0.7.1 "
" ) && "
- "cd ${SOURCES}/ccnx && "
+ "cd ${SRC}/ccnx-0.7.1 && "
# Just execute and silence warnings...
"( ./configure && make ) "
" )")
install = (
# Evaluate if ccnx binaries are already installed
" ( "
- " test -f ${EXP_HOME}/ccnx/bin/ccnd"
+ " test -f ${BIN}/ccnx-0.7.1/bin/ccnd"
" ) || ( "
- " mkdir -p ${EXP_HOME}/ccnx/bin && "
- " cp -r ${SOURCES}/ccnx ${EXP_HOME}"
+ " mkdir -p ${BIN}/ccnx-0.7.1/bin && "
+ " cp -r ${SRC}/ccnx-0.7.1/bin ${BIN}/ccnx-0.7.1"
" )"
)
- env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+ env = "PATH=$PATH:${BIN}/ccnx-0.7.1/bin"
# BASH command -> ' ccndstart ; ccndc add ccnx:/ udp host ; ccnr '
command = "ccndstart && "
return app
def add_publish(ec, movie):
- env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+ env = "PATH=$PATH:${BIN}/ccnx-0.7.1/bin"
command = "ccnseqwriter -r ccnx:/VIDEO"
app = ec.register_resource("LinuxApplication")
return app
def add_stream(ec):
- env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+ env = "PATH=$PATH:${BIN}/ccnx-0.7.1/bin"
command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) "
app = ec.register_resource("LinuxApplication")
default_key = default_key if os.path.exists(default_key) else None
pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
- usage = "usage: %prog -s <pl-user> -m <movie> -e <exp-id> -i <ssh_key> -r <results"
+ usage = "usage: %prog -s <pl-user> -m <movie> -e <exp-id> -i <ssh_key> -r <results>"
parser = OptionParser(usage=usage)
parser.add_option("-s", "--pl-user", dest="pl_user",
#"planetlab1.uc3m.es",
#"planetlab2.um.es",
"planet1.servers.ua.pt",
- "planetlab2.fct.ualg.pt",
+ #"planetlab2.fct.ualg.pt",
"planetlab-1.tagus.ist.utl.pt",
"planetlab-2.tagus.ist.utl.pt",
"planetlab-um00.di.uminho.pt",
"planetlab1.pjwstk.edu.pl",
"ple2.tu.koszalin.pl",
"planetlab2.ci.pwr.wroc.pl",
- "planetlab2.cyfronet.pl",
+ #"planetlab2.cyfronet.pl",
"plab2.ple.silweb.pl",
- "planetlab1.cyfronet.pl",
+ #"planetlab1.cyfronet.pl",
"plab4.ple.silweb.pl",
"ple2.dmcs.p.lodz.pl",
"planetlab2.pjwstk.edu.pl",
"planetlab-1.ing.unimo.it",
"gschembra4.diit.unict.it",
"iraplab1.iralab.uni-karlsruhe.de",
- "planetlab-1.fokus.fraunhofer.de",
+ #"planetlab-1.fokus.fraunhofer.de",
"iraplab2.iralab.uni-karlsruhe.de",
"planet2.zib.de",
#"pl2.uni-rostock.de",
"onelab-1.fhi-fokus.de",
"planet2.l3s.uni-hannover.de",
"planetlab1.exp-math.uni-essen.de",
- "planetlab-2.fokus.fraunhofer.de",
+ #"planetlab-2.fokus.fraunhofer.de",
"planetlab02.tkn.tu-berlin.de",
"planetlab1.informatik.uni-goettingen.de",
"planetlab1.informatik.uni-erlangen.de",
"orval.infonet.fundp.ac.be",
"rochefort.infonet.fundp.ac.be",
]
-
+
ec = ExperimentController(exp_id = exp_id)
for host in hostnames:
from nepi.util import guid
from nepi.util.parallel import ParallelRun
-from nepi.util.timefuncs import tnow, tdiffsec, stabsformat
+from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
-# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
+# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
class ECState(object):
""" State of the Experiment Controller
"""
.. class:: Class Args :
- :param exp_id: Human readable identifier for the experiment.
- It will be used in the name of the directory
+ :param exp_id: Human readable identifier for the experiment scenario.
+ It will be used in the name of the directory
where experiment related information is stored
- :type exp_id: int
-
- :param root_dir: Root directory where experiment specific folder
- will be created to store experiment information
- :type root_dir: str
+ :type exp_id: str
.. note::
+
+ An experiment, or scenario, is defined by a concrete use, behavior,
+ configuration and interconnection of resources that describe a single
+ experiment case (We call this the experiment description).
+ A same experiment (scenario) can be run many times.
+
The ExperimentController (EC), is the entity responsible for
- managing a single experiment.
+ managing an experiment instance (run). The same scenario can be
+ recreated (and re-run) by instantiating an EC and recreating
+ the same experiment description.
+
+ In NEPI, an experiment is represented as a graph of interconnected
+ resources. A resource is a generic concept in the sense that any
+    component taking part of an experiment, whether physical or
+    virtual, is considered a resource. A resource could be a host,
+    a virtual machine, an application, a simulator, an IP address.
+
+ A ResourceManager (RM), is the entity responsible for managing a
+ single resource. ResourceManagers are specific to a resource
+ type (i.e. An RM to control a Linux application will not be
+ the same as the RM used to control a ns-3 simulation).
+ In order for a new type of resource to be supported in NEPI
+ a new RM must be implemented. NEPI already provides different
+ RMs to control basic resources, and new can be extended from
+ the existing ones.
+
Through the EC interface the user can create ResourceManagers (RMs),
- configure them and interconnect them, in order to describe the experiment.
-
- Only when the 'deploy()' method is invoked, the EC will take actions
- to transform the 'described' experiment into a 'running' experiment.
+ configure them and interconnect them, in order to describe an experiment.
+ Describing an experiment through the EC does not run the experiment.
+ Only when the 'deploy()' method is invoked on the EC, will the EC take
+ actions to transform the 'described' experiment into a 'running' experiment.
While the experiment is running, it is possible to continue to
create/configure/connect RMs, and to deploy them to involve new
- resources in the experiment.
-
+ resources in the experiment (this is known as 'interactive' deployment).
+
+    An experiment in NEPI is identified by a string id,
+ which is either given by the user, or automatically generated by NEPI.
+ The purpose of this identifier is to separate files and results that
+ belong to different experiment scenarios.
+ However, since a same 'experiment' can be run many times, the experiment
+ id is not enough to identify an experiment instance (run).
+    For this reason, the ExperimentController has two identifiers, the
+    exp_id, which can be re-used by different ExperimentController instances,
+    and the run_id, which is unique to an ExperimentController instance, and
+ is automatically generated by NEPI.
+
"""
- def __init__(self, exp_id = None, root_dir = "/tmp"):
+ def __init__(self, exp_id = None):
super(ExperimentController, self).__init__()
# root directory to store files
- self._root_dir = root_dir
- # experiment identifier given by the user
- self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+ # Run identifier. It identifies a concrete instance (run) of an experiment.
+ # Since a same experiment (same configuration) can be run many times,
+        # this id permits identifying a concrete experiment run
+ self._run_id = tsformat()
+
+ # Experiment identifier. Usually assigned by the user
+ self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
@property
def exp_id(self):
- """ Return the experiment ID
+ """ Return the experiment id assigned by the user
+
+ """
+ return self._exp_id
+
+ @property
+ def run_id(self):
+ """ Return the experiment instance (run) identifier
"""
- exp_id = self._exp_id
- if not exp_id.startswith("nepi-"):
- exp_id = "nepi-" + exp_id
- return exp_id
+ return self._run_id
@property
def finished(self):
from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
ResourceAction
from nepi.util.sshfuncs import ProcStatus
-from nepi.util.timefuncs import tsformat
import os
import tempfile
raise RuntimeError, msg
store_dir = self.get("storeDir")
- timestamp = tsformat()
- self._store_path = os.path.join(store_dir, self.ec.exp_id, timestamp)
+ self._store_path = os.path.join(store_dir, self.ec.exp_id, self.ec.run_id)
msg = "Creating local directory at %s to store %s traces " % (
store_dir, trace_name)
import subprocess
# TODO: Resolve wildcards in commands!!
-# TODO: compare_hash for all files that are uploaded!
+# TODO: During provisioning, everything that is not scp could be
+# uploaded to a same script, http_sources download, etc...
+# and like that require performing less ssh connections!!!
@clsinit
class LinuxApplication(ResourceManager):
+ """
+ .. class:: Class Args :
+
+ :param ec: The Experiment controller
+ :type ec: ExperimentController
+ :param guid: guid of the RM
+ :type guid: int
+
+ .. note::
+
+ A LinuxApplication RM represents a process that can be executed in
+ a remote Linux host using SSH.
+
+        The LinuxApplication RM takes care of uploading sources and any files
+ needed to run the experiment, to the remote host.
+ It also allows to provide source compilation (build) and installation
+ instructions, and takes care of automating the sources build and
+ installation tasks for the user.
+
+ It is important to note that files uploaded to the remote host have
+ two possible scopes: single-experiment or multi-experiment.
+ Single experiment files are those that will not be re-used by other
+ experiments. Multi-experiment files are those that will.
+ Sources and shared files are always made available to all experiments.
+
+ Directory structure:
+
+ The directory structure used by LinuxApplication RM at the Linux
+ host is the following:
+
+ ${HOME}/nepi-usr --> Base directory for multi-experiment files
+ |
+ ${LIB} |- /lib --> Base directory for libraries
+ ${BIN} |- /bin --> Base directory for binary files
+ ${SRC} |- /src --> Base directory for sources
+ ${SHARE} |- /share --> Base directory for other files
+
+ ${HOME}/nepi-exp --> Base directory for single-experiment files
+ |
+ ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
+ |
+ ${APP_HOME} |- /<app-guid> --> Base directory for application
+ | specific files (e.g. command.sh, input)
+ |
+       ${RUN_HOME}  |- /<run-id> --> Base directory for run specific files
+
+ """
+
_rtype = "LinuxApplication"
@classmethod
def _register_attributes(cls):
- command = Attribute("command", "Command to execute",
+ command = Attribute("command", "Command to execute at application start. "
+ "Note that commands will be executed in the ${RUN_HOME} directory, "
+ "make sure to take this into account when using relative paths. ",
flags = Flags.ExecReadOnly)
forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
flags = Flags.ExecReadOnly)
"Space-separated list of packages required to run the application",
flags = Flags.ExecReadOnly)
sources = Attribute("sources",
- "Space-separated list of regular files to be deployed in the working "
- "path prior to building. Archives won't be expanded automatically.",
+ "Space-separated list of regular files to be uploaded to ${SRC} "
+ "directory prior to building. Archives won't be expanded automatically. "
+ "Sources are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all sources). ",
+ flags = Flags.ExecReadOnly)
+ files = Attribute("files",
+ "Space-separated list of regular miscellaneous files to be uploaded "
+ "to ${SHARE} directory. "
+ "Files are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all files). ",
+ flags = Flags.ExecReadOnly)
+ libs = Attribute("libs",
+ "Space-separated list of libraries (e.g. .so files) to be uploaded "
+ "to ${LIB} directory. "
+ "Libraries are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all files). ",
+ flags = Flags.ExecReadOnly)
+ bins = Attribute("bins",
+ "Space-separated list of binary files to be uploaded "
+ "to ${BIN} directory. "
+ "Binaries are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all files). ",
flags = Flags.ExecReadOnly)
code = Attribute("code",
- "Plain text source code to be uploaded to the server. It will be stored "
- "under ${SOURCES}/code",
+ "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
flags = Flags.ExecReadOnly)
build = Attribute("build",
"Build commands to execute after deploying the sources. "
- "Sources will be in the ${SOURCES} folder. "
- "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
- "Try to make the commands return with a nonzero exit code on error.\n"
- "Also, do not install any programs here, use the 'install' attribute. This will "
- "help keep the built files constrained to the build folder (which may "
- "not be the home folder), and will result in faster deployment. Also, "
- "make sure to clean up temporary files, to reduce bandwidth usage between "
- "nodes when transferring built packages.",
+ "Sources are uploaded to the ${SRC} directory and code "
+ "is uploaded to the ${APP_HOME} directory. \n"
+ "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
+ "./configure && make && make clean.\n"
+ "Make sure to make the build commands return with a nonzero exit "
+ "code on error.",
flags = Flags.ReadOnly)
install = Attribute("install",
"Commands to transfer built files to their final destinations. "
- "Sources will be in the initial working folder, and a special "
- "tag ${SOURCES} can be used to reference the experiment's "
- "home folder (where the application commands will run).\n"
- "ALL sources and targets needed for execution must be copied there, "
- "if building has been enabled.\n"
- "That is, 'slave' nodes will not automatically get any source files. "
- "'slave' nodes don't get build dependencies either, so if you need "
- "make and other tools to install, be sure to provide them as "
- "actual dependencies instead.",
+ "Install commands are executed after build commands. ",
flags = Flags.ReadOnly)
- stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
- stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
- stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
- tear_down = Attribute("tearDown", "Bash script to be executed before "
+ stdin = Attribute("stdin", "Standard input for the 'command'",
+ flags = Flags.ExecReadOnly)
+ tear_down = Attribute("tearDown", "Command to be executed just before "
"releasing the resource",
flags = Flags.ReadOnly)
cls._register_attribute(depends)
cls._register_attribute(sources)
cls._register_attribute(code)
+ cls._register_attribute(files)
+ cls._register_attribute(bins)
+ cls._register_attribute(libs)
cls._register_attribute(build)
cls._register_attribute(install)
cls._register_attribute(stdin)
- cls._register_attribute(stdout)
- cls._register_attribute(stderr)
cls._register_attribute(tear_down)
@classmethod
# timestamp of last state check of the application
self._last_state_check = tnow()
-
+
def log_message(self, msg):
return " guid %d - host %s - %s " % (self.guid,
self.node.get("hostname"), msg)
return os.path.join(self.node.exp_home, self._home)
@property
- def src_dir(self):
- return os.path.join(self.app_home, 'src')
-
- @property
- def build_dir(self):
- return os.path.join(self.app_home, 'build')
+ def run_home(self):
+ return os.path.join(self.app_home, self.ec.run_id)
@property
def pid(self):
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
self.info("Retrieving '%s' trace %s " % (name, attr))
- path = os.path.join(self.app_home, name)
+ path = os.path.join(self.run_home, name)
command = "(test -f %s && echo 'success') || echo 'error'" % path
(out, err), proc = self.node.execute(command)
return path
if attr == TraceAttr.ALL:
- (out, err), proc = self.node.check_output(self.app_home, name)
+ (out, err), proc = self.node.check_output(self.run_home, name)
if err and proc.poll():
msg = " Couldn't read trace %s " % name
return out
def provision(self):
- # create home dir for application
- self.node.mkdir(self.app_home)
-
- # upload sources
- self.upload_sources()
-
- # upload code
- self.upload_code()
-
- # upload stdin
- self.upload_stdin()
-
- # install dependencies
- self.install_dependencies()
-
- # build
- self.build()
-
- # Install
- self.install()
+ # create run dir for application
+ self.node.mkdir(self.run_home)
+
+ steps = [
+ # upload sources
+ self.upload_sources,
+ # upload files
+ self.upload_files,
+ # upload binaries
+ self.upload_binaries,
+ # upload libraries
+ self.upload_libraries,
+ # upload code
+ self.upload_code,
+ # upload stdin
+ self.upload_stdin,
+ # install dependencies
+ self.install_dependencies,
+ # build
+ self.build,
+ # Install
+ self.install]
+
+ # Since provisioning takes a long time, before
+ # each step we check that the EC is still
+ for step in steps:
+ if self.ec.finished:
+ raise RuntimeError, "EC finished"
+
+ step()
# Upload command to remote bash script
# - only if command can be executed in background and detached
env = self.get("env")
env = env and self.replace_paths(env)
- self.node.upload_command(command, self.app_home,
- shfile = "app.sh",
+ shfile = os.path.join(self.app_home, "app.sh")
+
+ self.node.upload_command(command,
+ shfile = shfile,
env = env)
self.info("Provisioning finished")
def upload_sources(self):
sources = self.get("sources")
+
if sources:
self.info("Uploading sources ")
- # create dir for sources
- self.node.mkdir(self.src_dir)
-
sources = sources.split(' ')
- http_sources = list()
+ # Separate sources that should be downloaded from
+ # the web, from sources that should be uploaded from
+ # the local machine
+ command = []
for source in list(sources):
if source.startswith("http") or source.startswith("https"):
- http_sources.append(source)
+                    # remove the http source from the sources list
sources.remove(source)
- # Download http sources remotely
- if http_sources:
- command = [" wget -c --directory-prefix=${SOURCES} "]
- check = []
-
- for source in http_sources:
- command.append(" %s " % (source))
- check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
-
- command = " ".join(command)
- check = " ; ".join(check)
-
- # Append the command to check that the sources were downloaded
- command += " ; %s " % check
+ command.append( " ( "
+ # Check if the source already exists
+ " ls ${SRC}/%(basename)s "
+ " || ( "
+ # If source doesn't exist, download it and check
+                    # that it downloaded ok
+ " wget -c --directory-prefix=${SRC} %(source)s && "
+ " ls ${SRC}/%(basename)s "
+ " ) ) " % {
+ "basename": os.path.basename(source),
+ "source": source
+ })
+
+ if command:
+ command = " && ".join(command)
# replace application specific paths in the command
command = self.replace_paths(command)
# Upload the command to a bash script and run it
# in background ( but wait until the command has
# finished to continue )
- self.node.run_and_wait(command, self.app_home,
- shfile = "http_sources.sh",
+ self.node.run_and_wait(command, self.run_home,
+ shfile = os.path.join(self.app_home, "http_sources.sh"),
+ overwrite = False,
pidfile = "http_sources_pidfile",
ecodefile = "http_sources_exitcode",
stdout = "http_sources_stdout",
stderr = "http_sources_stderr")
if sources:
- self.node.upload(sources, self.src_dir)
+ sources = ' '.join(sources)
+ self.node.upload(sources, self.node.src_dir, overwrite = False)
+
+ def upload_files(self):
+ files = self.get("files")
+
+ if files:
+ self.info("Uploading files %s " % files)
+ self.node.upload(files, self.node.share_dir, overwrite = False)
+
+    def upload_libraries(self):
+        """ Uploads files listed in the 'libs' attribute to the node's
+        ${LIB} directory. Libraries are shared across experiments.
+        """
+        libs = self.get("libs")
+
+        if libs:
+            # fix: log the actual attribute value ('libaries' was undefined)
+            self.info("Uploading libraries %s " % libs)
+            self.node.upload(libs, self.node.lib_dir, overwrite = False)
+
+    def upload_binaries(self):
+        """ Uploads files listed in the 'bins' attribute to the node's
+        ${BIN} directory. Binaries are shared across experiments.
+        """
+        bins = self.get("bins")
+
+        if bins:
+            # fix: log the actual attribute value ('binaries' was undefined)
+            self.info("Uploading binaries %s " % bins)
+            self.node.upload(bins, self.node.bin_dir, overwrite = False)
def upload_code(self):
code = self.get("code")
- if code:
- # create dir for sources
- self.node.mkdir(self.src_dir)
- self.info("Uploading code ")
+ if code:
+ self.info("Uploading code")
- dst = os.path.join(self.src_dir, "code")
- self.node.upload(sources, dst, text = True)
+ dst = os.path.join(self.app_home, "code")
+ self.node.upload(code, dst, overwrite = False, text = True)
def upload_stdin(self):
stdin = self.get("stdin")
if stdin:
# create dir for sources
- self.info(" Uploading stdin ")
+ self.info("Uploading stdin")
dst = os.path.join(self.app_home, "stdin")
-
- # If what we are uploading is a file, check whether
- # the same file already exists (using md5sum)
- if self.compare_hash(stdin, dst):
- return
-
- self.node.upload(stdin, dst, text = True)
+ self.node.upload(stdin, dst, overwrite = False, text = True)
def install_dependencies(self):
depends = self.get("depends")
if depends:
self.info("Installing dependencies %s" % depends)
- self.node.install_packages(depends, self.app_home)
+ self.node.install_packages(depends, self.app_home, self.run_home)
def build(self):
build = self.get("build")
+
if build:
self.info("Building sources ")
- # create dir for build
- self.node.mkdir(self.build_dir)
-
# replace application specific paths in the command
command = self.replace_paths(build)
# Upload the command to a bash script and run it
# in background ( but wait until the command has
# finished to continue )
- self.node.run_and_wait(command, self.app_home,
- shfile = "build.sh",
+ self.node.run_and_wait(command, self.run_home,
+ shfile = os.path.join(self.app_home, "build.sh"),
+ overwrite = False,
pidfile = "build_pidfile",
ecodefile = "build_exitcode",
stdout = "build_stdout",
def install(self):
install = self.get("install")
+
if install:
self.info("Installing sources ")
# Upload the command to a bash script and run it
# in background ( but wait until the command has
# finished to continue )
- self.node.run_and_wait(command, self.app_home,
- shfile = "install.sh",
+ self.node.run_and_wait(command, self.run_home,
+ shfile = os.path.join(self.app_home, "install.sh"),
+ overwrite = False,
pidfile = "install_pidfile",
ecodefile = "install_exitcode",
stdout = "install_stdout",
def _start_in_foreground(self):
command = self.get("command")
- stdin = "stdin" if self.get("stdin") else None
sudo = self.get("sudo") or False
x11 = self.get("forwardX11")
+ # For a command being executed in foreground, if there is stdin,
+ # it is expected to be text string not a file or pipe
+ stdin = self.get("stdin") or None
+
# Command will be launched in foreground and attached to the
# terminal using the node 'execute' in non blocking mode.
def _start_in_background(self):
command = self.get("command")
env = self.get("env")
- stdin = "stdin" if self.get("stdin") else None
- stdout = "stdout" if self.get("stdout") else "stdout"
- stderr = "stderr" if self.get("stderr") else "stderr"
sudo = self.get("sudo") or False
- # Command will be as a daemon in baground and detached from any terminal.
- # The real command to run was previously uploaded to a bash script
- # during deployment, now launch the remote script using 'run'
- # method from the node
- cmd = "bash ./app.sh"
- (out, err), proc = self.node.run(cmd, self.app_home,
+ stdout = "stdout"
+ stderr = "stderr"
+ stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
+ else None
+
+        # Command will be run as a daemon in background and detached from any
+ # terminal.
+ # The command to run was previously uploaded to a bash script
+ # during deployment, now we launch the remote script using 'run'
+ # method from the node.
+ cmd = "bash %s" % os.path.join(self.app_home, "app.sh")
+ (out, err), proc = self.node.run(cmd, self.run_home,
stdin = stdin,
stdout = stdout,
stderr = stderr,
raise RuntimeError, msg
# Wait for pid file to be generated
- pid, ppid = self.node.wait_pid(self.app_home)
+ pid, ppid = self.node.wait_pid(self.run_home)
if pid: self._pid = int(pid)
if ppid: self._ppid = int(ppid)
# If the process is not running, check for error information
# on the remote machine
if not self.pid or not self.ppid:
- (out, err), proc = self.node.check_errors(self.app_home,
+ (out, err), proc = self.node.check_errors(self.run_home,
stderr = stderr)
# Out is what was written in the stderr file
state_check_delay = 0.5
if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
# check if execution errors occurred
- (out, err), proc = self.node.check_errors(self.app_home)
+ (out, err), proc = self.node.check_errors(self.run_home)
if err:
msg = " Failed to execute command '%s'" % self.get("command")
"""
Replace all special path tags with shell-escaped actual paths.
"""
- def absolute_dir(d):
- return d if d.startswith("/") else os.path.join("${HOME}", d)
-
return ( command
- .replace("${SOURCES}", absolute_dir(self.src_dir))
- .replace("${BUILD}", absolute_dir(self.build_dir))
- .replace("${APP_HOME}", absolute_dir(self.app_home))
- .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
- .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
+ .replace("${USR}", self.node.usr_dir)
+ .replace("${LIB}", self.node.lib_dir)
+ .replace("${BIN}", self.node.bin_dir)
+ .replace("${SRC}", self.node.src_dir)
+ .replace("${SHARE}", self.node.share_dir)
+ .replace("${EXP}", self.node.exp_dir)
+ .replace("${EXP_HOME}", self.node.exp_home)
+ .replace("${APP_HOME}", self.app_home)
+ .replace("${RUN_HOME}", self.run_home)
+ .replace("${NODE_HOME}", self.node.node_home)
+ .replace("${HOME}", self.node.home_dir)
)
- def compare_hash(self, local, remote):
- # getting md5sum from remote file
- (out, err), proc = self.node.execute("md5sum %s " % remote)
-
- if proc.poll() == 0: #OK
- if not os.path.isfile(local):
- # store to a tmp file
- f = tempfile.NamedTemporaryFile()
- f.write(local)
- f.flush()
- local = f.name
-
- lproc = subprocess.Popen(["md5sum", local],
- stdout = subprocess.PIPE,
- stderr = subprocess.PIPE)
-
- # getting md5sum from local file
- (lout, lerr) = lproc.communicate()
-
- # files are the same, no need to upload
- lchk = lout.strip().split(" ")[0]
- rchk = out.strip().split(" ")[0]
-
- msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
- local, lchk, remote, rchk)
- self.debug(msg)
-
- if lchk == rchk:
- return True
-
- return False
-
def valid_connection(self, guid):
# TODO: Validate!
return True
@property
def _environment(self):
- env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
+ env = "PATH=$PATH:${STORE}/ccnx/bin "
return env
def execute_command(self, command, env):
command = environ + command
command = self.replace_paths(command)
- (out, err), proc = self.node.execute(command)
-
- if proc.poll():
- self._state = ResourceState.FAILED
- self.error(msg, out, err)
- raise RuntimeError, msg
+ return self.node.execute(command)
def valid_connection(self, guid):
# TODO: Validate!
# Run the command as a bash script in the background,
# in the host ( but wait until the command has
# finished to continue )
- self.execute_command(command, env)
+ (out, err), proc = self.execute_command(command, env)
+
+ if proc.poll():
+ self._state = ResourceState.FAILED
+ msg = "Failed to execute command"
+ self.error(msg, out, err)
+ raise RuntimeError, msg
self.debug("----- READY ---- ")
self._ready_time = tnow()
def __init__(self, ec, guid):
super(LinuxCCND, self).__init__(ec, guid)
self._home = "ccnd-%s" % self.guid
+ self._version = None
+ self._environment = None
def deploy(self):
if not self.node or self.node.state < ResourceState.READY:
return (
# Evaluate if ccnx binaries are already installed
" ( "
- " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+ " test -f ${STORE}/ccnx/bin/ccnd && "
" echo 'sources found, nothing to do' "
" ) || ( "
# If not, untar and build
" ( "
- " mkdir -p ${SOURCES}/ccnx && "
- " tar xf ${SOURCES}/%(sources)s --strip-components=1 -C ${SOURCES}/ccnx "
+ " mkdir -p ${STORE}/ccnx && "
+ " tar xf ${STORE}/%(sources)s --strip-components=1 -C ${STORE}/ccnx "
" ) && "
- "cd ${SOURCES}/ccnx && "
+ "cd ${STORE}/ccnx && "
# Just execute and silence warnings...
" ( ./configure && make ) "
" )") % ({ 'sources': sources })
return (
# Evaluate if ccnx binaries are already installed
" ( "
- " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+ " test -f ${SOURCES}/ccnx/bin/ccnd && "
" echo 'sources found, nothing to do' "
" ) || ( "
# If not, install
- " mkdir -p ${EXP_HOME}/ccnx/bin && "
- " cp -r ${SOURCES}/ccnx ${EXP_HOME}"
+ " mkdir -p ${SOURCES}/ccnx/bin && "
+        " cp -r ${STORE}/ccnx ${SOURCES}"
" )"
)
"prefix" : "CCND_PREFIX",
})
- env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
+ env = "PATH=$PATH:${SOURCES}/ccnx/bin "
env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), str(self.get(k))) \
if self.get(k) else "", envs.keys()))
"ccnsSyncScope": "CCNS_SYNC_SCOPE",
})
- env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
+ env = "PATH=$PATH:${STORE}/ccnx/bin "
env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \
if self.get(k) else "", envs.keys()))
self.info("Deploying command '%s' " % command)
self.node.mkdir(self.app_home)
- self.execute_command(command, env)
+ (out, err), proc = self.execute_command(command, env)
+
+ if proc.poll():
+ self._state = ResourceState.FAILED
+ msg = "Failed to execute command"
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
self.debug("----- READY ---- ")
self._ready_time = tnow()
self.info("Stopping command '%s'" % command)
command = self._stop_command
- self.execute_command(command, env)
+ (out, err), proc = self.execute_command(command, env)
+
+ if proc.poll():
+ pass
self._stop_time = tnow()
self._state = ResourceState.STOPPED
import tempfile
import time
import threading
+import traceback
-# TODO: Verify files and dirs exists already
-# TODO: Blacklist nodes!
# TODO: Unify delays!!
# TODO: Validate outcome of uploads!!
-
class ExitCode:
"""
Error codes that the rexitcode function can return if unable to
server_key = Attribute("serverKey", "Server public key",
flags = Flags.ExecReadOnly)
- clean_home = Attribute("cleanHome", "Remove all files and directories " + \
- " from home folder before starting experiment",
+ clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
+ " from node home folder before starting experiment",
+ flags = Flags.ExecReadOnly)
+
+ clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
+ " from a previous same experiment, before the new experiment starts",
flags = Flags.ExecReadOnly)
clean_processes = Attribute("cleanProcesses",
cls._register_attribute(identity)
cls._register_attribute(server_key)
cls._register_attribute(clean_home)
+ cls._register_attribute(clean_experiment)
cls._register_attribute(clean_processes)
cls._register_attribute(tear_down)
def __init__(self, ec, guid):
super(LinuxNode, self).__init__(ec, guid)
self._os = None
+ # home directory at Linux host
+ self._home_dir = ""
# lock to avoid concurrency issues on methods used by applications
self._lock = threading.Lock()
self.get("hostname"), msg)
@property
- def home(self):
- return self.get("home") or ""
+ def home_dir(self):
+ home = self.get("home") or ""
+ if not home.startswith("/"):
+ home = os.path.join(self._home_dir, home)
+ return home
+
+ @property
+ def usr_dir(self):
+ return os.path.join(self.home_dir, "nepi-usr")
+
+ @property
+ def lib_dir(self):
+ return os.path.join(self.usr_dir, "lib")
+
+ @property
+ def bin_dir(self):
+ return os.path.join(self.usr_dir, "bin")
+
+ @property
+ def src_dir(self):
+ return os.path.join(self.usr_dir, "src")
+
+ @property
+ def share_dir(self):
+ return os.path.join(self.usr_dir, "share")
+
+ @property
+ def exp_dir(self):
+ return os.path.join(self.home_dir, "nepi-exp")
@property
def exp_home(self):
- return os.path.join(self.home, self.ec.exp_id)
+ return os.path.join(self.exp_dir, self.ec.exp_id)
@property
def node_home(self):
- node_home = "node-%d" % self.guid
- return os.path.join(self.exp_home, node_home)
+ return os.path.join(self.exp_home, "node-%d" % self.guid)
+
+ @property
+ def run_home(self):
+ return os.path.join(self.node_home, self.ec.run_id)
@property
def os(self):
return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
def provision(self):
+ # check if host is alive
if not self.is_alive():
- self._state = ResourceState.FAILED
+ self.fail()
+
msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
self.error(msg)
raise RuntimeError, msg
+ self.find_home()
+
if self.get("cleanProcesses"):
self.clean_processes()
if self.get("cleanHome"):
self.clean_home()
-
+
+ if self.get("cleanExperiment"):
+ self.clean_experiment()
+
+ # Create shared directory structure
+ self.mkdir(self.lib_dir)
+ self.mkdir(self.bin_dir)
+ self.mkdir(self.src_dir)
+ self.mkdir(self.share_dir)
+
+ # Create experiment node home directory
self.mkdir(self.node_home)
super(LinuxNode, self).provision()
if tear_down:
self.execute(tear_down)
+ self.clean_processes()
+
super(LinuxNode, self).release()
def valid_connection(self, guid):
(out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def clean_home(self):
+ """ Cleans all NEPI related folders in the Linux host
+ """
self.info("Cleaning up home")
- cmd = (
- # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
- "find . -maxdepth 1 -name 'nepi-*' " +
- " -execdir rm -rf {} + "
- )
+ cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
+ self.home_dir )
+
+ return self.execute(cmd, with_lock = True)
+
+ def clean_experiment(self):
+ """ Cleans all experiment related files in the Linux host.
+ It preserves NEPI files and folders that have a multi-experiment
+ scope.
+ """
+ self.info("Cleaning up experiment files")
+
+ cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
+ self.exp_dir,
+ self.ec.exp_id )
- if self.home:
- cmd = "cd %s ; " % self.home + cmd
+ return self.execute(cmd, with_lock = True)
+
+ def execute(self, command,
+ sudo = False,
+ stdin = None,
+ env = None,
+ tty = False,
+ forward_x11 = False,
+ timeout = None,
+ retry = 3,
+ err_on_timeout = True,
+ connect_timeout = 30,
+ strict_host_checking = False,
+ persistent = True,
+ blocking = True,
+ with_lock = False
+ ):
+ """ Notice that this invocation will block until the
+ execution finishes. If this is not the desired behavior,
+ use 'run' instead."""
+ if self.localhost:
+ (out, err), proc = execfuncs.lexec(command,
+ user = user,
+ sudo = sudo,
+ stdin = stdin,
+ env = env)
+ else:
+ if with_lock:
+ with self._lock:
+ (out, err), proc = sshfuncs.rexec(
+ command,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ sudo = sudo,
+ stdin = stdin,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey"),
+ env = env,
+ tty = tty,
+ forward_x11 = forward_x11,
+ timeout = timeout,
+ retry = retry,
+ err_on_timeout = err_on_timeout,
+ connect_timeout = connect_timeout,
+ persistent = persistent,
+ blocking = blocking,
+ strict_host_checking = strict_host_checking
+ )
+ else:
+ (out, err), proc = sshfuncs.rexec(
+ command,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ sudo = sudo,
+ stdin = stdin,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey"),
+ env = env,
+ tty = tty,
+ forward_x11 = forward_x11,
+ timeout = timeout,
+ retry = retry,
+ err_on_timeout = err_on_timeout,
+ connect_timeout = connect_timeout,
+ persistent = persistent,
+ blocking = blocking,
+ strict_host_checking = strict_host_checking
+ )
+
+ return (out, err), proc
+
+ def run(self, command, home,
+ create_home = False,
+ pidfile = 'pidfile',
+ stdin = None,
+ stdout = 'stdout',
+ stderr = 'stderr',
+ sudo = False,
+ tty = False):
+
+ self.debug("Running command '%s'" % command)
+
+ if self.localhost:
+ (out, err), proc = execfuncs.lspawn(command, pidfile,
+ stdout = stdout,
+ stderr = stderr,
+ stdin = stdin,
+ home = home,
+ create_home = create_home,
+ sudo = sudo,
+ user = user)
+ else:
+ with self._lock:
+ (out, err), proc = sshfuncs.rspawn(
+ command,
+ pidfile = pidfile,
+ home = home,
+ create_home = create_home,
+ stdin = stdin if stdin is not None else '/dev/null',
+ stdout = stdout if stdout else '/dev/null',
+ stderr = stderr if stderr else '/dev/null',
+ sudo = sudo,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey"),
+ tty = tty
+ )
+
+ return (out, err), proc
+
+ def getpid(self, home, pidfile = "pidfile"):
+ if self.localhost:
+ pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
+ else:
+ with self._lock:
+ pidtuple = sshfuncs.rgetpid(
+ os.path.join(home, pidfile),
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
+
+ return pidtuple
+
+ def status(self, pid, ppid):
+ if self.localhost:
+ status = execfuncs.lstatus(pid, ppid)
+ else:
+ with self._lock:
+ status = sshfuncs.rstatus(
+ pid, ppid,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
+
+ return status
+
+ def kill(self, pid, ppid, sudo = False):
out = err = ""
- (out, err), proc = self.execute(cmd, with_lock = True)
+ proc = None
+ status = self.status(pid, ppid)
+
+ if status == sshfuncs.ProcStatus.RUNNING:
+ if self.localhost:
+ (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
+ else:
+ with self._lock:
+ (out, err), proc = sshfuncs.rkill(
+ pid, ppid,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ sudo = sudo,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
+
+ return (out, err), proc
- def upload(self, src, dst, text = False):
+ def copy(self, src, dst):
+ if self.localhost:
+ (out, err), proc = execfuncs.lcopy(source, dest,
+ recursive = True,
+ strict_host_checking = False)
+ else:
+ with self._lock:
+ (out, err), proc = sshfuncs.rcopy(
+ src, dst,
+ port = self.get("port"),
+ identity = self.get("identity"),
+ server_key = self.get("serverKey"),
+ recursive = True,
+ strict_host_checking = False)
+
+ return (out, err), proc
+
+
+ def upload(self, src, dst, text = False, overwrite = True):
""" Copy content to destination
src content to copy. Can be a local file, directory or a list of files
f.close()
src = f.name
+ # If dst files should not be overwritten, check that the files do not
+ # exist already
+ if overwrite == False:
+ src = self.filter_existing_files(src, dst)
+ if not src:
+ return ("", ""), None
+
if not self.localhost:
# Build destination as <user>@<server>:<path>
dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
+
result = self.copy(src, dst)
# clean up temp file
src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
return self.copy(src, dst)
- def install_packages(self, packages, home):
+ def install_packages(self, packages, home, run_home = None):
+ """ Install packages in the Linux host.
+
+ 'home' is the directory to upload the package installation script.
+ 'run_home' is the directory from where to execute the script.
+ """
command = ""
if self.use_rpm:
command = rpmfuncs.install_packages_command(self.os, packages)
self.error(msg, self.os)
raise RuntimeError, msg
- out = err = ""
- (out, err), proc = self.run_and_wait(command, home,
- shfile = "instpkg.sh",
+ run_home = run_home or home
+
+ (out, err), proc = self.run_and_wait(command, run_home,
+ shfile = os.path.join(home, "instpkg.sh"),
pidfile = "instpkg_pidfile",
ecodefile = "instpkg_exitcode",
stdout = "instpkg_stdout",
stderr = "instpkg_stderr",
+ overwrite = False,
raise_on_error = True)
return (out, err), proc
- def remove_packages(self, packages, home):
- command = ""
+ def remove_packages(self, packages, home, run_home = None):
+ """ Uninstall packages from the Linux host.
+
+ 'home' is the directory to upload the package un-installation script.
+ 'run_home' is the directory from where to execute the script.
+ """
if self.use_rpm:
command = rpmfuncs.remove_packages_command(self.os, packages)
elif self.use_deb:
self.error(msg)
raise RuntimeError, msg
- out = err = ""
- (out, err), proc = self.run_and_wait(command, home,
- shfile = "rmpkg.sh",
+ run_home = run_home or home
+
+ (out, err), proc = self.run_and_wait(command, run_home,
+ shfile = os.path.join(home, "rmpkg.sh"),
pidfile = "rmpkg_pidfile",
ecodefile = "rmpkg_exitcode",
stdout = "rmpkg_stdout",
stderr = "rmpkg_stderr",
+ overwrite = False,
raise_on_error = True)
return (out, err), proc
def run_and_wait(self, command, home,
shfile = "cmd.sh",
env = None,
+ overwrite = True,
pidfile = "pidfile",
ecodefile = "exitcode",
stdin = None,
Then runs the script detached in background in the host, and
busy-waites until the script finishes executing.
"""
- self.upload_command(command, home,
+
+ if not shfile.startswith("/"):
+ shfile = os.path.join(home, shfile)
+
+ self.upload_command(command,
shfile = shfile,
ecodefile = ecodefile,
- env = env)
+ env = env,
+ overwrite = overwrite)
- command = "bash ./%s" % shfile
+ command = "bash %s" % shfile
# run command in background in remote host
(out, err), proc = self.run(command, home,
pidfile = pidfile,
# Other error from 'cat'
return ExitCode.ERROR
- def upload_command(self, command, home,
+ def upload_command(self, command,
shfile = "cmd.sh",
ecodefile = "exitcode",
+ overwrite = True,
env = None):
""" Saves the command as a bash script file in the remote host, and
forces to save the exit code of the command execution to the ecodefile
# Add environ to command
command = environ + command
- dst = os.path.join(home, shfile)
- return self.upload(command, dst, text = True)
+ return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
"""Format environmental variables for command to be executed either
return (out, err), proc
def is_alive(self):
+ """ Checks if host is responsive
+ """
if self.localhost:
return True
out = err = ""
try:
- # TODO: FIX NOT ALIVE!!!!
- (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,
+ (out, err), proc = self.execute("echo 'ALIVE'",
+ retry = 5,
with_lock = True)
except:
- import traceback
trace = traceback.format_exc()
msg = "Unresponsive host %s " % err
self.error(msg, out, trace)
return False
- if out.strip().startswith('ALIVE'):
+ if out.strip() == "ALIVE":
return True
else:
msg = "Unresponsive host "
self.error(msg, out, err)
return False
- def copy(self, src, dst):
- if self.localhost:
- (out, err), proc = execfuncs.lcopy(source, dest,
- recursive = True,
- strict_host_checking = False)
- else:
- with self._lock:
- (out, err), proc = sshfuncs.rcopy(
- src, dst,
- port = self.get("port"),
- identity = self.get("identity"),
- server_key = self.get("serverKey"),
- recursive = True,
- strict_host_checking = False)
-
- return (out, err), proc
-
- def execute(self, command,
- sudo = False,
- stdin = None,
- env = None,
- tty = False,
- forward_x11 = False,
- timeout = None,
- retry = 3,
- err_on_timeout = True,
- connect_timeout = 30,
- strict_host_checking = False,
- persistent = True,
- blocking = True,
- with_lock = False
- ):
- """ Notice that this invocation will block until the
- execution finishes. If this is not the desired behavior,
- use 'run' instead."""
+ def find_home(self):
+ """ Retrieves host home directory
+ """
+ (out, err), proc = self.execute("echo ${HOME}", retry = 5,
+ with_lock = True)
- if self.localhost:
- (out, err), proc = execfuncs.lexec(command,
- user = user,
- sudo = sudo,
- stdin = stdin,
- env = env)
- else:
- if with_lock:
- with self._lock:
- (out, err), proc = sshfuncs.rexec(
- command,
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- sudo = sudo,
- stdin = stdin,
- identity = self.get("identity"),
- server_key = self.get("serverKey"),
- env = env,
- tty = tty,
- forward_x11 = forward_x11,
- timeout = timeout,
- retry = retry,
- err_on_timeout = err_on_timeout,
- connect_timeout = connect_timeout,
- persistent = persistent,
- blocking = blocking,
- strict_host_checking = strict_host_checking
- )
- else:
- (out, err), proc = sshfuncs.rexec(
- command,
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- sudo = sudo,
- stdin = stdin,
- identity = self.get("identity"),
- server_key = self.get("serverKey"),
- env = env,
- tty = tty,
- forward_x11 = forward_x11,
- timeout = timeout,
- retry = retry,
- err_on_timeout = err_on_timeout,
- connect_timeout = connect_timeout,
- persistent = persistent,
- blocking = blocking,
- strict_host_checking = strict_host_checking
- )
+ if proc.poll():
+ msg = "Imposible to retrieve HOME directory"
+ self.error(msg, out, err)
+ raise RuntimeError, msg
- return (out, err), proc
+ self._home_dir = out.strip()
- def run(self, command, home,
- create_home = False,
- pidfile = 'pidfile',
- stdin = None,
- stdout = 'stdout',
- stderr = 'stderr',
- sudo = False,
- tty = False):
-
- self.debug("Running command '%s'" % command)
-
- if self.localhost:
- (out, err), proc = execfuncs.lspawn(command, pidfile,
- stdout = stdout,
- stderr = stderr,
- stdin = stdin,
- home = home,
- create_home = create_home,
- sudo = sudo,
- user = user)
- else:
- with self._lock:
- (out, err), proc = sshfuncs.rspawn(
- command,
- pidfile = pidfile,
- home = home,
- create_home = create_home,
- stdin = stdin if stdin is not None else '/dev/null',
- stdout = stdout if stdout else '/dev/null',
- stderr = stderr if stderr else '/dev/null',
- sudo = sudo,
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- identity = self.get("identity"),
- server_key = self.get("serverKey"),
- tty = tty
- )
+ def filter_existing_files(self, src, dst):
+ """ Removes files that already exist in the Linux host from src list
+ """
+ # construct a dictionary with { dst: src }
+ dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
+ src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
- return (out, err), proc
+ command = []
+ for d in dests.keys():
+ command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
- def getpid(self, home, pidfile = "pidfile"):
- if self.localhost:
- pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
- else:
- with self._lock:
- pidtuple = sshfuncs.rgetpid(
- os.path.join(home, pidfile),
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- identity = self.get("identity"),
- server_key = self.get("serverKey")
- )
-
- return pidtuple
+ command = ";".join(command)
- def status(self, pid, ppid):
- if self.localhost:
- status = execfuncs.lstatus(pid, ppid)
- else:
- with self._lock:
- status = sshfuncs.rstatus(
- pid, ppid,
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- identity = self.get("identity"),
- server_key = self.get("serverKey")
- )
-
- return status
+ (out, err), proc = self.execute(command, retry = 1, with_lock = True)
- def kill(self, pid, ppid, sudo = False):
- out = err = ""
- proc = None
- status = self.status(pid, ppid)
+ for d in dests.keys():
+ if out.find(d) > -1:
+ del dests[d]
- if status == sshfuncs.ProcStatus.RUNNING:
- if self.localhost:
- (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
- else:
- with self._lock:
- (out, err), proc = sshfuncs.rkill(
- pid, ppid,
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- sudo = sudo,
- identity = self.get("identity"),
- server_key = self.get("serverKey")
- )
+ if not dests:
+ return ""
- return (out, err), proc
+ return " ".join(dests.values())
else:
out = err = ""
if proc.poll():
- err = self._proc.stderr.read()
+ err = proc.stderr.read()
msg = " rexec - host %s - command %s " % (host, " ".join(args))
log(msg, logging.DEBUG, out, err)
return (out, err), proc
# POSIX
-def _communicate(self, input, timeout=None, err_on_timeout=True):
+def _communicate(proc, input, timeout=None, err_on_timeout=True):
read_set = []
write_set = []
stdout = None # Return
killtime = timelimit + 4
bailtime = timelimit + 4
- if self.stdin:
+ if proc.stdin:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
- self.stdin.flush()
+ proc.stdin.flush()
if input:
- write_set.append(self.stdin)
+ write_set.append(proc.stdin)
else:
- self.stdin.close()
- if self.stdout:
- read_set.append(self.stdout)
+ proc.stdin.close()
+
+ if proc.stdout:
+ read_set.append(proc.stdout)
stdout = []
- if self.stderr:
- read_set.append(self.stderr)
+
+ if proc.stderr:
+ read_set.append(proc.stderr)
stderr = []
input_offset = 0
else:
signum = signal.SIGTERM
# Lets kill it
- os.kill(self.pid, signum)
+ os.kill(proc.pid, signum)
select_timeout = 0.5
else:
select_timeout = timelimit - curtime + 0.1
else:
continue
- if not rlist and not wlist and not xlist and self.poll() is not None:
+ if not rlist and not wlist and not xlist and proc.poll() is not None:
# timeout and process exited, say bye
break
- if self.stdin in wlist:
+ if proc.stdin in wlist:
# When select has indicated that the file is writable,
# we can write up to PIPE_BUF bytes without risk
# blocking. POSIX defines PIPE_BUF >= 512
- bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
+ bytes_written = os.write(proc.stdin.fileno(),
+ buffer(input, input_offset, 512))
input_offset += bytes_written
+
if input_offset >= len(input):
- self.stdin.close()
- write_set.remove(self.stdin)
+ proc.stdin.close()
+ write_set.remove(proc.stdin)
- if self.stdout in rlist:
- data = os.read(self.stdout.fileno(), 1024)
+ if proc.stdout in rlist:
+ data = os.read(proc.stdout.fileno(), 1024)
if data == "":
- self.stdout.close()
- read_set.remove(self.stdout)
+ proc.stdout.close()
+ read_set.remove(proc.stdout)
stdout.append(data)
- if self.stderr in rlist:
- data = os.read(self.stderr.fileno(), 1024)
+ if proc.stderr in rlist:
+ data = os.read(proc.stderr.fileno(), 1024)
if data == "":
- self.stderr.close()
- read_set.remove(self.stderr)
+ proc.stderr.close()
+ read_set.remove(proc.stderr)
stderr.append(data)
# All data exchanged. Translate lists into strings.
# object do the translation: It is based on stdio, which is
# impossible to combine with select (unless forcing no
# buffering).
- if self.universal_newlines and hasattr(file, 'newlines'):
+ if proc.universal_newlines and hasattr(file, 'newlines'):
if stdout:
- stdout = self._translate_newlines(stdout)
+ stdout = proc._translate_newlines(stdout)
if stderr:
- stderr = self._translate_newlines(stderr)
+ stderr = proc._translate_newlines(stderr)
if killed and err_on_timeout:
- errcode = self.poll()
+ errcode = proc.poll()
raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
else:
if killed:
- self.poll()
+ proc.poll()
else:
- self.wait()
+ proc.wait()
return (stdout, stderr)
ec.deploy()
- ec.wait_finished([app])
+ ec.wait_finished(app)
self.assertTrue(ec.state(node) == ResourceState.STARTED)
self.assertTrue(ec.state(app) == ResourceState.FINISHED)
ec.deploy()
- ec.wait_finished([app])
+ ec.wait_finished(app)
self.assertTrue(ec.state(node) == ResourceState.STARTED)
self.assertTrue(ec.state(app) == ResourceState.FINISHED)
path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
rm = ec.get_resource(app)
- p = os.path.join(rm.app_home, "stdout")
+ p = os.path.join(rm.run_home, "stdout")
self.assertEquals(path, p)
ec.shutdown()
+ @skipIfNotAlive
+ def t_code(self, host, user):
+ from nepi.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(LinuxNode)
+ ResourceFactory.register_type(LinuxApplication)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ prog = """#include <stdio.h>
+
+int
+main (void)
+{
+ printf ("Hello, world!\\n");
+ return 0;
+}
+"""
+ cmd = "${RUN_HOME}/hello"
+ build = "gcc -Wall -x c ${APP_HOME}/code -o hello"
+
+ app = ec.register_resource("LinuxApplication")
+ ec.set(app, "command", cmd)
+ ec.set(app, "code", prog)
+ ec.set(app, "depends", "gcc")
+ ec.set(app, "build", build)
+ ec.register_connection(app, node)
+
+ ec.deploy()
+
+ ec.wait_finished(app)
+
+ out = ec.trace(app, 'stdout')
+ self.assertEquals(out, "Hello, world!\n")
+
+ ec.shutdown()
+
@skipIfNotAlive
def t_concurrency(self, host, user):
from nepi.execution.resource import ResourceFactory
path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
rm = ec.get_resource(app)
- p = os.path.join(rm.app_home, 'stdout')
+ p = os.path.join(rm.run_home, 'stdout')
self.assertEquals(path, p)
ec.shutdown()
def test_http_sources_ubuntu(self):
self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
+ def test_code_fedora(self):
+ self.t_code(self.fedora_host, self.fedora_user)
+
+ def test_code_ubuntu(self):
+ self.t_code(self.ubuntu_host, self.ubuntu_user)
+
@skipInteractive
def test_xterm_ubuntu(self):
""" Interactive test. Should not run automatically """
def t_run(self, host, user):
node, ec = create_node(host, user)
+ node.find_home()
app_home = os.path.join(node.exp_home, "my-app")
node.mkdir(app_home, clean = True)
node, ec = create_node(host, user)
+ node.find_home()
app_home = os.path.join(node.exp_home, "my-app")
node.mkdir(app_home, clean = True)
def t_exitcode_kill(self, host, user):
node, ec = create_node(host, user)
+ node.find_home()
app_home = os.path.join(node.exp_home, "my-app")
node.mkdir(app_home, clean = True)
# Upload command that will not finish
command = "ping localhost"
- (out, err), proc = node.upload_command(command, app_home,
- shfile = "cmd.sh",
+ shfile = os.path.join(app_home, "cmd.sh")
+ (out, err), proc = node.upload_command(command,
+ shfile = shfile,
ecodefile = "exitcode")
(out, err), proc = node.run(command, app_home,
node, ec = create_node(host, user)
+ node.find_home()
app_home = os.path.join(node.exp_home, "my-app")
node.mkdir(app_home, clean = True)
(out, err), proc = node.check_errors(app_home)
- self.assertEquals(err.strip(), "./cmd.sh: line 1: unexistent-command: command not found")
+ self.assertTrue(err.find("cmd.sh: line 1: unexistent-command: command not found") > -1)
@skipIfNotAlive
def t_install(self, host, user):
node, ec = create_node(host, user)
+ node.find_home()
(out, err), proc = node.mkdir(node.node_home, clean = True)
self.assertEquals(err, "")
(out, err), proc = node.rmdir(node.exp_home)
self.assertEquals(err, "")
+ @skipIfNotAlive
+ def t_clean(self, host, user):
+ node, ec = create_node(host, user)
+
+ node.find_home()
+ node.mkdir(node.lib_dir)
+ node.mkdir(node.node_home)
+
+ command1 = " [ -d %s ] && echo 'Found'" % node.lib_dir
+ (out, err), proc = node.execute(command1)
+
+ self.assertEquals(out.strip(), "Found")
+
+ command2 = " [ -d %s ] && echo 'Found'" % node.node_home
+ (out, err), proc = node.execute(command2)
+
+ self.assertEquals(out.strip(), "Found")
+
+ node.clean_experiment()
+
+ (out, err), proc = node.execute(command2)
+
+ self.assertEquals(out.strip(), "")
+
+ node.clean_home()
+
+ (out, err), proc = node.execute(command1)
+
+ self.assertEquals(out.strip(), "")
+
+
@skipIfNotAlive
def t_xterm(self, host, user):
node, ec = create_node(host, user)
+ node.find_home()
(out, err), proc = node.mkdir(node.node_home, clean = True)
self.assertEquals(err, "")
def t_compile(self, host, user):
node, ec = create_node(host, user)
+ node.find_home()
app_home = os.path.join(node.exp_home, "my-app")
node.mkdir(app_home, clean = True)
def test_exitcode_error_ubuntu(self):
self.t_exitcode_error(self.ubuntu_host, self.ubuntu_user)
-
+
+ def test_clean_fedora(self):
+ self.t_clean(self.fedora_host, self.fedora_user)
+
+ def test_clean_ubuntu(self):
+ self.t_clean(self.ubuntu_host, self.ubuntu_user)
+
@skipInteractive
def test_xterm_ubuntu(self):
""" Interactive test. Should not run automatically """