From 87f44a7c2853afb7021276dd3700858cff950703 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Thu, 4 Jul 2013 12:28:55 -0700 Subject: [PATCH] LinuxApplication: Changed directory structure to store experiment files in the Linux host --- examples/linux/ccn/ccncat_2_nodes.py | 20 +- .../linux/ccn/ccncat_extended_ring_topo.py | 2 +- examples/linux/scalability.py | 12 +- src/nepi/execution/ec.py | 90 ++- src/nepi/resources/all/collector.py | 4 +- src/nepi/resources/linux/application.py | 378 +++++++----- .../resources/linux/ccn/ccnapplication.py | 9 +- src/nepi/resources/linux/ccn/ccncontent.py | 8 +- src/nepi/resources/linux/ccn/ccnd.py | 18 +- src/nepi/resources/linux/ccn/ccnr.py | 2 +- src/nepi/resources/linux/ccn/fibentry.py | 14 +- src/nepi/resources/linux/node.py | 565 +++++++++++------- src/nepi/util/sshfuncs.py | 64 +- test/resources/linux/application.py | 57 +- test/resources/linux/node.py | 53 +- 15 files changed, 817 insertions(+), 479 deletions(-) diff --git a/examples/linux/ccn/ccncat_2_nodes.py b/examples/linux/ccn/ccncat_2_nodes.py index 55133dab..60d4362a 100755 --- a/examples/linux/ccn/ccncat_2_nodes.py +++ b/examples/linux/ccn/ccncat_2_nodes.py @@ -71,14 +71,14 @@ def add_ccnd(ec, os_type, peers): build = ( # Evaluate if ccnx binaries are already installed " ( " - " test -f ${EXP_HOME}/ccnx/bin/ccnd" + " test -f ${BIN}/ccnx-0.7.1/bin/ccnd" " ) || ( " # If not, untar and build " ( " - " mkdir -p ${SOURCES}/ccnx && " - " tar xf ${SOURCES}/ccnx-0.7.1.tar.gz --strip-components=1 -C ${SOURCES}/ccnx " + " mkdir -p ${SRC}/ccnx-0.7.1 && " + " tar xf ${SRC}/ccnx-0.7.1.tar.gz --strip-components=1 -C ${SRC}/ccnx-0.7.1 " " ) && " - "cd ${SOURCES}/ccnx && " + "cd ${SRC}/ccnx-0.7.1 && " # Just execute and silence warnings... "( ./configure && make ) " " )") @@ -86,14 +86,14 @@ def add_ccnd(ec, os_type, peers): install = ( # Evaluate if ccnx binaries are already installed " ( " - " test -f ${EXP_HOME}/ccnx/bin/ccnd" + " test -f ${BIN}/ccnx-0.7.1/bin/ccnd" " ) || ( " - " mkdir -p ${EXP_HOME}/ccnx/bin && " - " cp -r ${SOURCES}/ccnx ${EXP_HOME}" + " mkdir -p ${BIN}/ccnx-0.7.1/bin && " + " cp -r ${SRC}/ccnx-0.7.1/bin ${BIN}/ccnx-0.7.1" " )" ) - env = "PATH=$PATH:${EXP_HOME}/ccnx/bin" + env = "PATH=$PATH:${BIN}/ccnx-0.7.1/bin" # BASH command -> ' ccndstart ; ccndc add ccnx:/ udp host ; ccnr ' command = "ccndstart && " @@ -112,7 +112,7 @@ def add_ccnd(ec, os_type, peers): return app def add_publish(ec, movie): - env = "PATH=$PATH:${EXP_HOME}/ccnx/bin" + env = "PATH=$PATH:${BIN}/ccnx-0.7.1/bin" command = "ccnseqwriter -r ccnx:/VIDEO" app = ec.register_resource("LinuxApplication") @@ -123,7 +123,7 @@ def add_publish(ec, movie): return app def add_stream(ec): - env = "PATH=$PATH:${EXP_HOME}/ccnx/bin" + env = "PATH=$PATH:${BIN}/ccnx-0.7.1/bin" command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) " app = ec.register_resource("LinuxApplication") diff --git a/examples/linux/ccn/ccncat_extended_ring_topo.py b/examples/linux/ccn/ccncat_extended_ring_topo.py index 08625eaf..e91351a9 100755 --- a/examples/linux/ccn/ccncat_extended_ring_topo.py +++ b/examples/linux/ccn/ccncat_extended_ring_topo.py @@ -109,7 +109,7 @@ def get_options(): default_key = default_key if os.path.exists(default_key) else None pl_ssh_key = os.environ.get("PL_SSHKEY", default_key) - usage = "usage: %prog -s -m -e -i -r Base directory for multi-experiment files + | + ${LIB} |- /lib --> Base directory for libraries + ${BIN} |- /bin --> Base directory for binary files + ${SRC} |- /src --> Base directory for sources + ${SHARE} |- /share --> Base directory for other files + + ${HOME}/nepi-exp --> Base directory for single-experiment files + | + ${EXP_HOME} |- / --> Base directory for experiment exp-id + | + ${APP_HOME} |- / --> Base directory for application + | specific files (e.g. command.sh, input) + | + ${RUN_HOME} |- / --> Base directory for run specific + + """ + _rtype = "LinuxApplication" @classmethod def _register_attributes(cls): - command = Attribute("command", "Command to execute", + command = Attribute("command", "Command to execute at application start. " + "Note that commands will be executed in the ${RUN_HOME} directory, " + "make sure to take this into account when using relative paths. ", flags = Flags.ExecReadOnly) forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", flags = Flags.ExecReadOnly) @@ -50,40 +102,48 @@ class LinuxApplication(ResourceManager): "Space-separated list of packages required to run the application", flags = Flags.ExecReadOnly) sources = Attribute("sources", - "Space-separated list of regular files to be deployed in the working " - "path prior to building. Archives won't be expanded automatically.", + "Space-separated list of regular files to be uploaded to ${SRC} " + "directory prior to building. Archives won't be expanded automatically. " + "Sources are globally available for all experiments unless " + "cleanHome is set to True (This will delete all sources). ", + flags = Flags.ExecReadOnly) + files = Attribute("files", + "Space-separated list of regular miscellaneous files to be uploaded " + "to ${SHARE} directory. " + "Files are globally available for all experiments unless " + "cleanHome is set to True (This will delete all files). ", + flags = Flags.ExecReadOnly) + libs = Attribute("libs", + "Space-separated list of libraries (e.g. .so files) to be uploaded " + "to ${LIB} directory. " + "Libraries are globally available for all experiments unless " + "cleanHome is set to True (This will delete all files). ", + flags = Flags.ExecReadOnly) + bins = Attribute("bins", + "Space-separated list of binary files to be uploaded " + "to ${BIN} directory. " + "Binaries are globally available for all experiments unless " + "cleanHome is set to True (This will delete all files). ", flags = Flags.ExecReadOnly) code = Attribute("code", - "Plain text source code to be uploaded to the server. It will be stored " - "under ${SOURCES}/code", + "Plain text source code to be uploaded to the ${APP_HOME} directory. ", flags = Flags.ExecReadOnly) build = Attribute("build", "Build commands to execute after deploying the sources. " - "Sources will be in the ${SOURCES} folder. " - "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n" - "Try to make the commands return with a nonzero exit code on error.\n" - "Also, do not install any programs here, use the 'install' attribute. This will " - "help keep the built files constrained to the build folder (which may " - "not be the home folder), and will result in faster deployment. Also, " - "make sure to clean up temporary files, to reduce bandwidth usage between " - "nodes when transferring built packages.", + "Sources are uploaded to the ${SRC} directory and code " + "is uploaded to the ${APP_HOME} directory. \n" + "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && " + "./configure && make && make clean.\n" + "Make sure to make the build commands return with a nonzero exit " + "code on error.", flags = Flags.ReadOnly) install = Attribute("install", "Commands to transfer built files to their final destinations. " - "Sources will be in the initial working folder, and a special " - "tag ${SOURCES} can be used to reference the experiment's " - "home folder (where the application commands will run).\n" - "ALL sources and targets needed for execution must be copied there, " - "if building has been enabled.\n" - "That is, 'slave' nodes will not automatically get any source files. " - "'slave' nodes don't get build dependencies either, so if you need " - "make and other tools to install, be sure to provide them as " - "actual dependencies instead.", + "Install commands are executed after build commands. ", flags = Flags.ReadOnly) - stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly) - stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly) - stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly) - tear_down = Attribute("tearDown", "Bash script to be executed before " + stdin = Attribute("stdin", "Standard input for the 'command'", + flags = Flags.ExecReadOnly) + tear_down = Attribute("tearDown", "Command to be executed just before " "releasing the resource", flags = Flags.ReadOnly) @@ -94,11 +154,12 @@ class LinuxApplication(ResourceManager): cls._register_attribute(depends) cls._register_attribute(sources) cls._register_attribute(code) + cls._register_attribute(files) + cls._register_attribute(bins) + cls._register_attribute(libs) cls._register_attribute(build) cls._register_attribute(install) cls._register_attribute(stdin) - cls._register_attribute(stdout) - cls._register_attribute(stderr) cls._register_attribute(tear_down) @classmethod @@ -122,7 +183,7 @@ class LinuxApplication(ResourceManager): # timestamp of last state check of the application self._last_state_check = tnow() - + def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, self.node.get("hostname"), msg) @@ -138,12 +199,8 @@ class LinuxApplication(ResourceManager): return os.path.join(self.node.exp_home, self._home) @property - def src_dir(self): - return os.path.join(self.app_home, 'src') - - @property - def build_dir(self): - return os.path.join(self.app_home, 'build') + def run_home(self): + return os.path.join(self.app_home, self.ec.run_id) @property def pid(self): @@ -169,7 +226,7 @@ class LinuxApplication(ResourceManager): def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): self.info("Retrieving '%s' trace %s " % (name, attr)) - path = os.path.join(self.app_home, name) + path = os.path.join(self.run_home, name) command = "(test -f %s && echo 'success') || echo 'error'" % path (out, err), proc = self.node.execute(command) @@ -183,7 +240,7 @@ class LinuxApplication(ResourceManager): return path if attr == TraceAttr.ALL: - (out, err), proc = self.node.check_output(self.app_home, name) + (out, err), proc = self.node.check_output(self.run_home, name) if err and proc.poll(): msg = " Couldn't read trace %s " % name @@ -210,26 +267,36 @@ class LinuxApplication(ResourceManager): return out def provision(self): - # create home dir for application - self.node.mkdir(self.app_home) - - # upload sources - self.upload_sources() - - # upload code - self.upload_code() - - # upload stdin - self.upload_stdin() - - # install dependencies - self.install_dependencies() - - # build - self.build() - - # Install - self.install() + # create run dir for application + self.node.mkdir(self.run_home) + + steps = [ + # upload sources + self.upload_sources, + # upload files + self.upload_files, + # upload binaries + self.upload_binaries, + # upload libraries + self.upload_libraries, + # upload code + self.upload_code, + # upload stdin + self.upload_stdin, + # install dependencies + self.install_dependencies, + # build + self.build, + # Install + self.install] + + # Since provisioning takes a long time, before + # each step we check that the EC is still + for step in steps: + if self.ec.finished: + raise RuntimeError, "EC finished" + + step() # Upload command to remote bash script # - only if command can be executed in background and detached @@ -245,8 +312,10 @@ class LinuxApplication(ResourceManager): env = self.get("env") env = env and self.replace_paths(env) - self.node.upload_command(command, self.app_home, - shfile = "app.sh", + shfile = os.path.join(self.app_home, "app.sh") + + self.node.upload_command(command, + shfile = shfile, env = env) self.info("Provisioning finished") @@ -255,34 +324,36 @@ class LinuxApplication(ResourceManager): def upload_sources(self): sources = self.get("sources") + if sources: self.info("Uploading sources ") - # create dir for sources - self.node.mkdir(self.src_dir) - sources = sources.split(' ') - http_sources = list() + # Separate sources that should be downloaded from + # the web, from sources that should be uploaded from + # the local machine + command = [] for source in list(sources): if source.startswith("http") or source.startswith("https"): - http_sources.append(source) + # remove the hhtp source from the sources list sources.remove(source) - # Download http sources remotely - if http_sources: - command = [" wget -c --directory-prefix=${SOURCES} "] - check = [] - - for source in http_sources: - command.append(" %s " % (source)) - check.append(" ls ${SOURCES}/%s " % os.path.basename(source)) - - command = " ".join(command) - check = " ; ".join(check) - - # Append the command to check that the sources were downloaded - command += " ; %s " % check + command.append( " ( " + # Check if the source already exists + " ls ${SRC}/%(basename)s " + " || ( " + # If source doesn't exist, download it and check + # that it it downloaded ok + " wget -c --directory-prefix=${SRC} %(source)s && " + " ls ${SRC}/%(basename)s " + " ) ) " % { + "basename": os.path.basename(source), + "source": source + }) + + if command: + command = " && ".join(command) # replace application specific paths in the command command = self.replace_paths(command) @@ -290,64 +361,78 @@ class LinuxApplication(ResourceManager): # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) - self.node.run_and_wait(command, self.app_home, - shfile = "http_sources.sh", + self.node.run_and_wait(command, self.run_home, + shfile = os.path.join(self.app_home, "http_sources.sh"), + overwrite = False, pidfile = "http_sources_pidfile", ecodefile = "http_sources_exitcode", stdout = "http_sources_stdout", stderr = "http_sources_stderr") if sources: - self.node.upload(sources, self.src_dir) + sources = ' '.join(sources) + self.node.upload(sources, self.node.src_dir, overwrite = False) + + def upload_files(self): + files = self.get("files") + + if files: + self.info("Uploading files %s " % files) + self.node.upload(files, self.node.share_dir, overwrite = False) + + def upload_libraries(self): + libs = self.get("libs") + + if libs: + self.info("Uploading libraries %s " % libaries) + self.node.upload(libs, self.node.lib_dir, overwrite = False) + + def upload_binaries(self): + bins = self.get("bins") + + if bins: + self.info("Uploading binaries %s " % binaries) + self.node.upload(bins, self.node.bin_dir, overwrite = False) def upload_code(self): code = self.get("code") - if code: - # create dir for sources - self.node.mkdir(self.src_dir) - self.info("Uploading code ") + if code: + self.info("Uploading code") - dst = os.path.join(self.src_dir, "code") - self.node.upload(sources, dst, text = True) + dst = os.path.join(self.app_home, "code") + self.node.upload(code, dst, overwrite = False, text = True) def upload_stdin(self): stdin = self.get("stdin") if stdin: # create dir for sources - self.info(" Uploading stdin ") + self.info("Uploading stdin") dst = os.path.join(self.app_home, "stdin") - - # If what we are uploading is a file, check whether - # the same file already exists (using md5sum) - if self.compare_hash(stdin, dst): - return - - self.node.upload(stdin, dst, text = True) + self.node.upload(stdin, dst, overwrite = False, text = True) def install_dependencies(self): depends = self.get("depends") if depends: self.info("Installing dependencies %s" % depends) - self.node.install_packages(depends, self.app_home) + self.node.install_packages(depends, self.app_home, self.run_home) def build(self): build = self.get("build") + if build: self.info("Building sources ") - # create dir for build - self.node.mkdir(self.build_dir) - # replace application specific paths in the command command = self.replace_paths(build) # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) - self.node.run_and_wait(command, self.app_home, - shfile = "build.sh", + self.node.run_and_wait(command, self.run_home, + shfile = os.path.join(self.app_home, "build.sh"), + overwrite = False, pidfile = "build_pidfile", ecodefile = "build_exitcode", stdout = "build_stdout", @@ -355,6 +440,7 @@ class LinuxApplication(ResourceManager): def install(self): install = self.get("install") + if install: self.info("Installing sources ") @@ -364,8 +450,9 @@ class LinuxApplication(ResourceManager): # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) - self.node.run_and_wait(command, self.app_home, - shfile = "install.sh", + self.node.run_and_wait(command, self.run_home, + shfile = os.path.join(self.app_home, "install.sh"), + overwrite = False, pidfile = "install_pidfile", ecodefile = "install_exitcode", stdout = "install_stdout", @@ -409,10 +496,13 @@ class LinuxApplication(ResourceManager): def _start_in_foreground(self): command = self.get("command") - stdin = "stdin" if self.get("stdin") else None sudo = self.get("sudo") or False x11 = self.get("forwardX11") + # For a command being executed in foreground, if there is stdin, + # it is expected to be text string not a file or pipe + stdin = self.get("stdin") or None + # Command will be launched in foreground and attached to the # terminal using the node 'execute' in non blocking mode. @@ -440,17 +530,20 @@ class LinuxApplication(ResourceManager): def _start_in_background(self): command = self.get("command") env = self.get("env") - stdin = "stdin" if self.get("stdin") else None - stdout = "stdout" if self.get("stdout") else "stdout" - stderr = "stderr" if self.get("stderr") else "stderr" sudo = self.get("sudo") or False - # Command will be as a daemon in baground and detached from any terminal. - # The real command to run was previously uploaded to a bash script - # during deployment, now launch the remote script using 'run' - # method from the node - cmd = "bash ./app.sh" - (out, err), proc = self.node.run(cmd, self.app_home, + stdout = "stdout" + stderr = "stderr" + stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \ + else None + + # Command will be run as a daemon in baground and detached from any + # terminal. + # The command to run was previously uploaded to a bash script + # during deployment, now we launch the remote script using 'run' + # method from the node. + cmd = "bash %s" % os.path.join(self.app_home, "app.sh") + (out, err), proc = self.node.run(cmd, self.run_home, stdin = stdin, stdout = stdout, stderr = stderr, @@ -465,14 +558,14 @@ class LinuxApplication(ResourceManager): raise RuntimeError, msg # Wait for pid file to be generated - pid, ppid = self.node.wait_pid(self.app_home) + pid, ppid = self.node.wait_pid(self.run_home) if pid: self._pid = int(pid) if ppid: self._ppid = int(ppid) # If the process is not running, check for error information # on the remote machine if not self.pid or not self.ppid: - (out, err), proc = self.node.check_errors(self.app_home, + (out, err), proc = self.node.check_errors(self.run_home, stderr = stderr) # Out is what was written in the stderr file @@ -557,7 +650,7 @@ class LinuxApplication(ResourceManager): state_check_delay = 0.5 if tdiffsec(tnow(), self._last_state_check) > state_check_delay: # check if execution errors occurred - (out, err), proc = self.node.check_errors(self.app_home) + (out, err), proc = self.node.check_errors(self.run_home) if err: msg = " Failed to execute command '%s'" % self.get("command") @@ -580,49 +673,20 @@ class LinuxApplication(ResourceManager): """ Replace all special path tags with shell-escaped actual paths. """ - def absolute_dir(d): - return d if d.startswith("/") else os.path.join("${HOME}", d) - return ( command - .replace("${SOURCES}", absolute_dir(self.src_dir)) - .replace("${BUILD}", absolute_dir(self.build_dir)) - .replace("${APP_HOME}", absolute_dir(self.app_home)) - .replace("${NODE_HOME}", absolute_dir(self.node.node_home)) - .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) ) + .replace("${USR}", self.node.usr_dir) + .replace("${LIB}", self.node.lib_dir) + .replace("${BIN}", self.node.bin_dir) + .replace("${SRC}", self.node.src_dir) + .replace("${SHARE}", self.node.share_dir) + .replace("${EXP}", self.node.exp_dir) + .replace("${EXP_HOME}", self.node.exp_home) + .replace("${APP_HOME}", self.app_home) + .replace("${RUN_HOME}", self.run_home) + .replace("${NODE_HOME}", self.node.node_home) + .replace("${HOME}", self.node.home_dir) ) - def compare_hash(self, local, remote): - # getting md5sum from remote file - (out, err), proc = self.node.execute("md5sum %s " % remote) - - if proc.poll() == 0: #OK - if not os.path.isfile(local): - # store to a tmp file - f = tempfile.NamedTemporaryFile() - f.write(local) - f.flush() - local = f.name - - lproc = subprocess.Popen(["md5sum", local], - stdout = subprocess.PIPE, - stderr = subprocess.PIPE) - - # getting md5sum from local file - (lout, lerr) = lproc.communicate() - - # files are the same, no need to upload - lchk = lout.strip().split(" ")[0] - rchk = out.strip().split(" ")[0] - - msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % ( - local, lchk, remote, rchk) - self.debug(msg) - - if lchk == rchk: - return True - - return False - def valid_connection(self, guid): # TODO: Validate! return True diff --git a/src/nepi/resources/linux/ccn/ccnapplication.py b/src/nepi/resources/linux/ccn/ccnapplication.py index e5ae00e0..39ac2a92 100644 --- a/src/nepi/resources/linux/ccn/ccnapplication.py +++ b/src/nepi/resources/linux/ccn/ccnapplication.py @@ -51,7 +51,7 @@ class LinuxCCNApplication(LinuxApplication): @property def _environment(self): - env = "PATH=$PATH:${EXP_HOME}/ccnx/bin " + env = "PATH=$PATH:${STORE}/ccnx/bin " return env def execute_command(self, command, env): @@ -59,12 +59,7 @@ class LinuxCCNApplication(LinuxApplication): command = environ + command command = self.replace_paths(command) - (out, err), proc = self.node.execute(command) - - if proc.poll(): - self._state = ResourceState.FAILED - self.error(msg, out, err) - raise RuntimeError, msg + return self.node.execute(command) def valid_connection(self, guid): # TODO: Validate! diff --git a/src/nepi/resources/linux/ccn/ccncontent.py b/src/nepi/resources/linux/ccn/ccncontent.py index e5216d2e..8b799743 100644 --- a/src/nepi/resources/linux/ccn/ccncontent.py +++ b/src/nepi/resources/linux/ccn/ccncontent.py @@ -87,7 +87,13 @@ class LinuxCCNContent(LinuxCCNApplication): # Run the command as a bash script in the background, # in the host ( but wait until the command has # finished to continue ) - self.execute_command(command, env) + (out, err), proc = self.execute_command(command, env) + + if proc.poll(): + self._state = ResourceState.FAILED + msg = "Failed to execute command" + self.error(msg, out, err) + raise RuntimeError, msg self.debug("----- READY ---- ") self._ready_time = tnow() diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index 44c1afb9..7c150eaf 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -126,6 +126,8 @@ class LinuxCCND(LinuxApplication): def __init__(self, ec, guid): super(LinuxCCND, self).__init__(ec, guid) self._home = "ccnd-%s" % self.guid + self._version = None + self._environment = None def deploy(self): if not self.node or self.node.state < ResourceState.READY: @@ -294,15 +296,15 @@ class LinuxCCND(LinuxApplication): return ( # Evaluate if ccnx binaries are already installed " ( " - " test -f ${EXP_HOME}/ccnx/bin/ccnd && " + " test -f ${STORE}/ccnx/bin/ccnd && " " echo 'sources found, nothing to do' " " ) || ( " # If not, untar and build " ( " - " mkdir -p ${SOURCES}/ccnx && " - " tar xf ${SOURCES}/%(sources)s --strip-components=1 -C ${SOURCES}/ccnx " + " mkdir -p ${STORE}/ccnx && " + " tar xf ${STORE}/%(sources)s --strip-components=1 -C ${STORE}/ccnx " " ) && " - "cd ${SOURCES}/ccnx && " + "cd ${STORE}/ccnx && " # Just execute and silence warnings... " ( ./configure && make ) " " )") % ({ 'sources': sources }) @@ -312,12 +314,12 @@ class LinuxCCND(LinuxApplication): return ( # Evaluate if ccnx binaries are already installed " ( " - " test -f ${EXP_HOME}/ccnx/bin/ccnd && " + " test -f ${SOURCES}/ccnx/bin/ccnd && " " echo 'sources found, nothing to do' " " ) || ( " # If not, install - " mkdir -p ${EXP_HOME}/ccnx/bin && " - " cp -r ${SOURCES}/ccnx ${EXP_HOME}" + " mkdir -p ${SOURCES}/ccnx/bin && " + " cp -r ${}/ccnx ${STORE}" " )" ) @@ -339,7 +341,7 @@ class LinuxCCND(LinuxApplication): "prefix" : "CCND_PREFIX", }) - env = "PATH=$PATH:${EXP_HOME}/ccnx/bin " + env = "PATH=$PATH:${SOURCES}/ccnx/bin " env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), str(self.get(k))) \ if self.get(k) else "", envs.keys())) diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index cdf48e37..920b3f97 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -268,7 +268,7 @@ class LinuxCCNR(LinuxCCNApplication): "ccnsSyncScope": "CCNS_SYNC_SCOPE", }) - env = "PATH=$PATH:${EXP_HOME}/ccnx/bin " + env = "PATH=$PATH:${STORE}/ccnx/bin " env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \ if self.get(k) else "", envs.keys())) diff --git a/src/nepi/resources/linux/ccn/fibentry.py b/src/nepi/resources/linux/ccn/fibentry.py index c6398feb..9f4d1d35 100644 --- a/src/nepi/resources/linux/ccn/fibentry.py +++ b/src/nepi/resources/linux/ccn/fibentry.py @@ -82,7 +82,14 @@ class LinuxFIBEntry(LinuxCCNApplication): self.info("Deploying command '%s' " % command) self.node.mkdir(self.app_home) - self.execute_command(command, env) + (out, err), proc = self.execute_command(command, env) + + if proc.poll(): + self._state = ResourceState.FAILED + msg = "Failed to execute command" + self.error(msg, out, err) + raise RuntimeError, msg + self.debug("----- READY ---- ") self._ready_time = tnow() @@ -109,7 +116,10 @@ class LinuxFIBEntry(LinuxCCNApplication): self.info("Stopping command '%s'" % command) command = self._stop_command - self.execute_command(command, env) + (out, err), proc = self.execute_command(command, env) + + if proc.poll(): + pass self._stop_time = tnow() self._state = ResourceState.STOPPED diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 0d2597fd..5080a6c4 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -31,13 +31,11 @@ import re import tempfile import time import threading +import traceback -# TODO: Verify files and dirs exists already -# TODO: Blacklist nodes! # TODO: Unify delays!! # TODO: Validate outcome of uploads!! - class ExitCode: """ Error codes that the rexitcode function can return if unable to @@ -165,8 +163,12 @@ class LinuxNode(ResourceManager): server_key = Attribute("serverKey", "Server public key", flags = Flags.ExecReadOnly) - clean_home = Attribute("cleanHome", "Remove all files and directories " + \ - " from home folder before starting experiment", + clean_home = Attribute("cleanHome", "Remove all nepi files and directories " + " from node home folder before starting experiment", + flags = Flags.ExecReadOnly) + + clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " + " from a previous same experiment, before the new experiment starts", flags = Flags.ExecReadOnly) clean_processes = Attribute("cleanProcesses", @@ -184,12 +186,15 @@ class LinuxNode(ResourceManager): cls._register_attribute(identity) cls._register_attribute(server_key) cls._register_attribute(clean_home) + cls._register_attribute(clean_experiment) cls._register_attribute(clean_processes) cls._register_attribute(tear_down) def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self._os = None + # home directory at Linux host + self._home_dir = "" # lock to avoid concurrency issues on methods used by applications self._lock = threading.Lock() @@ -199,17 +204,47 @@ class LinuxNode(ResourceManager): self.get("hostname"), msg) @property - def home(self): - return self.get("home") or "" + def home_dir(self): + home = self.get("home") or "" + if not home.startswith("/"): + home = os.path.join(self._home_dir, home) + return home + + @property + def usr_dir(self): + return os.path.join(self.home_dir, "nepi-usr") + + @property + def lib_dir(self): + return os.path.join(self.usr_dir, "lib") + + @property + def bin_dir(self): + return os.path.join(self.usr_dir, "bin") + + @property + def src_dir(self): + return os.path.join(self.usr_dir, "src") + + @property + def share_dir(self): + return os.path.join(self.usr_dir, "share") + + @property + def exp_dir(self): + return os.path.join(self.home_dir, "nepi-exp") @property def exp_home(self): - return os.path.join(self.home, self.ec.exp_id) + return os.path.join(self.exp_dir, self.ec.exp_id) @property def node_home(self): - node_home = "node-%d" % self.guid - return os.path.join(self.exp_home, node_home) + return os.path.join(self.exp_home, "node-%d" % self.guid) + + @property + def run_home(self): + return os.path.join(self.node_home, self.ec.run_id) @property def os(self): @@ -259,18 +294,32 @@ class LinuxNode(ResourceManager): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] def provision(self): + # check if host is alive if not self.is_alive(): - self._state = ResourceState.FAILED + self.fail() + msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") self.error(msg) raise RuntimeError, msg + self.find_home() + if self.get("cleanProcesses"): self.clean_processes() if self.get("cleanHome"): self.clean_home() - + + if self.get("cleanExperiment"): + self.clean_experiment() + + # Create shared directory structure + self.mkdir(self.lib_dir) + self.mkdir(self.bin_dir) + self.mkdir(self.src_dir) + self.mkdir(self.share_dir) + + # Create experiment node home directory self.mkdir(self.node_home) super(LinuxNode, self).provision() @@ -300,6 +349,8 @@ class LinuxNode(ResourceManager): if tear_down: self.execute(tear_down) + self.clean_processes() + super(LinuxNode, self).release() def valid_connection(self, guid): @@ -327,21 +378,220 @@ class LinuxNode(ResourceManager): (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) def clean_home(self): + """ Cleans all NEPI related folders in the Linux host + """ self.info("Cleaning up home") - cmd = ( - # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" + - "find . -maxdepth 1 -name 'nepi-*' " + - " -execdir rm -rf {} + " - ) + cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % ( + self.home_dir ) + + return self.execute(cmd, with_lock = True) + + def clean_experiment(self): + """ Cleans all experiment related files in the Linux host. + It preserves NEPI files and folders that have a multi experiment + scope. + """ + self.info("Cleaning up experiment files") + + cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % ( + self.exp_dir, + self.ec.exp_id ) - if self.home: - cmd = "cd %s ; " % self.home + cmd + return self.execute(cmd, with_lock = True) + + def execute(self, command, + sudo = False, + stdin = None, + env = None, + tty = False, + forward_x11 = False, + timeout = None, + retry = 3, + err_on_timeout = True, + connect_timeout = 30, + strict_host_checking = False, + persistent = True, + blocking = True, + with_lock = False + ): + """ Notice that this invocation will block until the + execution finishes. If this is not the desired behavior, + use 'run' instead.""" + if self.localhost: + (out, err), proc = execfuncs.lexec(command, + user = user, + sudo = sudo, + stdin = stdin, + env = env) + else: + if with_lock: + with self._lock: + (out, err), proc = sshfuncs.rexec( + command, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + stdin = stdin, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + forward_x11 = forward_x11, + timeout = timeout, + retry = retry, + err_on_timeout = err_on_timeout, + connect_timeout = connect_timeout, + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking + ) + else: + (out, err), proc = sshfuncs.rexec( + command, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + stdin = stdin, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + forward_x11 = forward_x11, + timeout = timeout, + retry = retry, + err_on_timeout = err_on_timeout, + connect_timeout = connect_timeout, + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking + ) + + return (out, err), proc + + def run(self, command, home, + create_home = False, + pidfile = 'pidfile', + stdin = None, + stdout = 'stdout', + stderr = 'stderr', + sudo = False, + tty = False): + + self.debug("Running command '%s'" % command) + + if self.localhost: + (out, err), proc = execfuncs.lspawn(command, pidfile, + stdout = stdout, + stderr = stderr, + stdin = stdin, + home = home, + create_home = create_home, + sudo = sudo, + user = user) + else: + with self._lock: + (out, err), proc = sshfuncs.rspawn( + command, + pidfile = pidfile, + home = home, + create_home = create_home, + stdin = stdin if stdin is not None else '/dev/null', + stdout = stdout if stdout else '/dev/null', + stderr = stderr if stderr else '/dev/null', + sudo = sudo, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey"), + tty = tty + ) + + return (out, err), proc + + def getpid(self, home, pidfile = "pidfile"): + if self.localhost: + pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) + else: + with self._lock: + pidtuple = sshfuncs.rgetpid( + os.path.join(home, pidfile), + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) + + return pidtuple + + def status(self, pid, ppid): + if self.localhost: + status = execfuncs.lstatus(pid, ppid) + else: + with self._lock: + status = sshfuncs.rstatus( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) + + return status + + def kill(self, pid, ppid, sudo = False): out = err = "" - (out, err), proc = self.execute(cmd, with_lock = True) + proc = None + status = self.status(pid, ppid) + + if status == sshfuncs.ProcStatus.RUNNING: + if self.localhost: + (out, err), proc = execfuncs.lkill(pid, ppid, sudo) + else: + with self._lock: + (out, err), proc = sshfuncs.rkill( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) + + return (out, err), proc - def upload(self, src, dst, text = False): + def copy(self, src, dst): + if self.localhost: + (out, err), proc = execfuncs.lcopy(source, dest, + recursive = True, + strict_host_checking = False) + else: + with self._lock: + (out, err), proc = sshfuncs.rcopy( + src, dst, + port = self.get("port"), + identity = self.get("identity"), + server_key = self.get("serverKey"), + recursive = True, + strict_host_checking = False) + + return (out, err), proc + + + def upload(self, src, dst, text = False, overwrite = True): """ Copy content to destination src content to copy. Can be a local file, directory or a list of files @@ -360,9 +610,17 @@ class LinuxNode(ResourceManager): f.close() src = f.name + # If dst files should not be overwritten, check that the files do not + # exits already + if overwrite == False: + src = self.filter_existing_files(src, dst) + if not src: + return ("", ""), None + if not self.localhost: # Build destination as @: dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) + result = self.copy(src, dst) # clean up temp file @@ -377,7 +635,12 @@ class LinuxNode(ResourceManager): src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) return self.copy(src, dst) - def install_packages(self, packages, home): + def install_packages(self, packages, home, run_home = None): + """ Install packages in the Linux host. + + 'home' is the directory to upload the package installation script. + 'run_home' is the directory from where to execute the script. + """ command = "" if self.use_rpm: command = rpmfuncs.install_packages_command(self.os, packages) @@ -388,19 +651,25 @@ class LinuxNode(ResourceManager): self.error(msg, self.os) raise RuntimeError, msg - out = err = "" - (out, err), proc = self.run_and_wait(command, home, - shfile = "instpkg.sh", + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "instpkg.sh"), pidfile = "instpkg_pidfile", ecodefile = "instpkg_exitcode", stdout = "instpkg_stdout", stderr = "instpkg_stderr", + overwrite = False, raise_on_error = True) return (out, err), proc - def remove_packages(self, packages, home): - command = "" + def remove_packages(self, packages, home, run_home = None): + """ Uninstall packages from the Linux host. + + 'home' is the directory to upload the package un-installation script. + 'run_home' is the directory from where to execute the script. + """ if self.use_rpm: command = rpmfuncs.remove_packages_command(self.os, packages) elif self.use_deb: @@ -410,13 +679,15 @@ class LinuxNode(ResourceManager): self.error(msg) raise RuntimeError, msg - out = err = "" - (out, err), proc = self.run_and_wait(command, home, - shfile = "rmpkg.sh", + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "rmpkg.sh"), pidfile = "rmpkg_pidfile", ecodefile = "rmpkg_exitcode", stdout = "rmpkg_stdout", stderr = "rmpkg_stderr", + overwrite = False, raise_on_error = True) return (out, err), proc @@ -433,6 +704,7 @@ class LinuxNode(ResourceManager): def run_and_wait(self, command, home, shfile = "cmd.sh", env = None, + overwrite = True, pidfile = "pidfile", ecodefile = "exitcode", stdin = None, @@ -446,12 +718,17 @@ class LinuxNode(ResourceManager): Then runs the script detached in background in the host, and busy-waites until the script finishes executing. """ - self.upload_command(command, home, + + if not shfile.startswith("/"): + shfile = os.path.join(home, shfile) + + self.upload_command(command, shfile = shfile, ecodefile = ecodefile, - env = env) + env = env, + overwrite = overwrite) - command = "bash ./%s" % shfile + command = "bash %s" % shfile # run command in background in remote host (out, err), proc = self.run(command, home, pidfile = pidfile, @@ -514,9 +791,10 @@ class LinuxNode(ResourceManager): # Other error from 'cat' return ExitCode.ERROR - def upload_command(self, command, home, + def upload_command(self, command, shfile = "cmd.sh", ecodefile = "exitcode", + overwrite = True, env = None): """ Saves the command as a bash script file in the remote host, and forces to save the exit code of the command execution to the ecodefile @@ -537,8 +815,7 @@ class LinuxNode(ResourceManager): # Add environ to command command = environ + command - dst = os.path.join(home, shfile) - return self.upload(command, dst, text = True) + return self.upload(command, shfile, text = True, overwrite = overwrite) def format_environment(self, env, inline = False): """Format environmental variables for command to be executed either @@ -639,215 +916,63 @@ class LinuxNode(ResourceManager): return (out, err), proc def is_alive(self): + """ Checks if host is responsive + """ if self.localhost: return True out = err = "" try: - # TODO: FIX NOT ALIVE!!!! - (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, + (out, err), proc = self.execute("echo 'ALIVE'", + retry = 5, with_lock = True) except: - import traceback trace = traceback.format_exc() msg = "Unresponsive host %s " % err self.error(msg, out, trace) return False - if out.strip().startswith('ALIVE'): + if out.strip() == "ALIVE": return True else: msg = "Unresponsive host " self.error(msg, out, err) return False - def copy(self, src, dst): - if self.localhost: - (out, err), proc = execfuncs.lcopy(source, dest, - recursive = True, - strict_host_checking = False) - else: - with self._lock: - (out, err), proc = sshfuncs.rcopy( - src, dst, - port = self.get("port"), - identity = self.get("identity"), - server_key = self.get("serverKey"), - recursive = True, - strict_host_checking = False) - - return (out, err), proc - - def execute(self, command, - sudo = False, - stdin = None, - env = None, - tty = False, - forward_x11 = False, - timeout = None, - retry = 3, - err_on_timeout = True, - connect_timeout = 30, - strict_host_checking = False, - persistent = True, - blocking = True, - with_lock = False - ): - """ Notice that this invocation will block until the - execution finishes. If this is not the desired behavior, - use 'run' instead.""" + def find_home(self): + """ Retrieves host home directory + """ + (out, err), proc = self.execute("echo ${HOME}", retry = 5, + with_lock = True) - if self.localhost: - (out, err), proc = execfuncs.lexec(command, - user = user, - sudo = sudo, - stdin = stdin, - env = env) - else: - if with_lock: - with self._lock: - (out, err), proc = sshfuncs.rexec( - command, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - stdin = stdin, - identity = self.get("identity"), - server_key = self.get("serverKey"), - env = env, - tty = tty, - forward_x11 = forward_x11, - timeout = timeout, - retry = retry, - err_on_timeout = err_on_timeout, - connect_timeout = connect_timeout, - persistent = persistent, - blocking = blocking, - strict_host_checking = strict_host_checking - ) - else: - (out, err), proc = sshfuncs.rexec( - command, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - stdin = stdin, - identity = self.get("identity"), - server_key = self.get("serverKey"), - env = env, - tty = tty, - forward_x11 = forward_x11, - timeout = timeout, - retry = retry, - err_on_timeout = err_on_timeout, - connect_timeout = connect_timeout, - persistent = persistent, - blocking = blocking, - strict_host_checking = strict_host_checking - ) + if proc.poll(): + msg = "Imposible to retrieve HOME directory" + self.error(msg, out, err) + raise RuntimeError, msg - return (out, err), proc + self._home_dir = out.strip() - def run(self, command, home, - create_home = False, - pidfile = 'pidfile', - stdin = None, - stdout = 'stdout', - stderr = 'stderr', - sudo = False, - tty = False): - - self.debug("Running command '%s'" % command) - - if self.localhost: - (out, err), proc = execfuncs.lspawn(command, pidfile, - stdout = stdout, - stderr = stderr, - stdin = stdin, - home = home, - create_home = create_home, - sudo = sudo, - user = user) - else: - with self._lock: - (out, err), proc = sshfuncs.rspawn( - command, - pidfile = pidfile, - home = home, - create_home = create_home, - stdin = stdin if stdin is not None else '/dev/null', - stdout = stdout if stdout else '/dev/null', - stderr = stderr if stderr else '/dev/null', - sudo = sudo, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey"), - tty = tty - ) + def filter_existing_files(self, src, dst): + """ Removes files that already exist in the Linux host from src list + """ + # construct a dictionary with { dst: src } + dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ), + src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src}) - return (out, err), proc + command = [] + for d in dests.keys(): + command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} ) - def getpid(self, home, pidfile = "pidfile"): - if self.localhost: - pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) - else: - with self._lock: - pidtuple = sshfuncs.rgetpid( - os.path.join(home, pidfile), - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) - - return pidtuple + command = ";".join(command) - def status(self, pid, ppid): - if self.localhost: - status = execfuncs.lstatus(pid, ppid) - else: - with self._lock: - status = sshfuncs.rstatus( - pid, ppid, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) - - return status + (out, err), proc = self.execute(command, retry = 1, with_lock = True) - def kill(self, pid, ppid, sudo = False): - out = err = "" - proc = None - status = self.status(pid, ppid) + for d in dests.keys(): + if out.find(d) > -1: + del dests[d] - if status == sshfuncs.ProcStatus.RUNNING: - if self.localhost: - (out, err), proc = execfuncs.lkill(pid, ppid, sudo) - else: - with self._lock: - (out, err), proc = sshfuncs.rkill( - pid, ppid, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) + if not dests: + return "" - return (out, err), proc + return " ".join(dests.values()) diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index f5c64005..cc303486 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -294,7 +294,7 @@ def rexec(command, host, user, else: out = err = "" if proc.poll(): - err = self._proc.stderr.read() + err = proc.stderr.read() msg = " rexec - host %s - command %s " % (host, " ".join(args)) log(msg, logging.DEBUG, out, err) @@ -857,7 +857,7 @@ fi return (out, err), proc # POSIX -def _communicate(self, input, timeout=None, err_on_timeout=True): +def _communicate(proc, input, timeout=None, err_on_timeout=True): read_set = [] write_set = [] stdout = None # Return @@ -870,19 +870,21 @@ def _communicate(self, input, timeout=None, err_on_timeout=True): killtime = timelimit + 4 bailtime = timelimit + 4 - if self.stdin: + if proc.stdin: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. - self.stdin.flush() + proc.stdin.flush() if input: - write_set.append(self.stdin) + write_set.append(proc.stdin) else: - self.stdin.close() - if self.stdout: - read_set.append(self.stdout) + proc.stdin.close() + + if proc.stdout: + read_set.append(proc.stdout) stdout = [] - if self.stderr: - read_set.append(self.stderr) + + if proc.stderr: + read_set.append(proc.stderr) stderr = [] input_offset = 0 @@ -897,7 +899,7 @@ def _communicate(self, input, timeout=None, err_on_timeout=True): else: signum = signal.SIGTERM # Lets kill it - os.kill(self.pid, signum) + os.kill(proc.pid, signum) select_timeout = 0.5 else: select_timeout = timelimit - curtime + 0.1 @@ -915,32 +917,34 @@ def _communicate(self, input, timeout=None, err_on_timeout=True): else: continue - if not rlist and not wlist and not xlist and self.poll() is not None: + if not rlist and not wlist and not xlist and proc.poll() is not None: # timeout and process exited, say bye break - if self.stdin in wlist: + if proc.stdin in wlist: # When select has indicated that the file is writable, # we can write up to PIPE_BUF bytes without risk # blocking. POSIX defines PIPE_BUF >= 512 - bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512)) + bytes_written = os.write(proc.stdin.fileno(), + buffer(input, input_offset, 512)) input_offset += bytes_written + if input_offset >= len(input): - self.stdin.close() - write_set.remove(self.stdin) + proc.stdin.close() + write_set.remove(proc.stdin) - if self.stdout in rlist: - data = os.read(self.stdout.fileno(), 1024) + if proc.stdout in rlist: + data = os.read(proc.stdout.fileno(), 1024) if data == "": - self.stdout.close() - read_set.remove(self.stdout) + proc.stdout.close() + read_set.remove(proc.stdout) stdout.append(data) - if self.stderr in rlist: - data = os.read(self.stderr.fileno(), 1024) + if proc.stderr in rlist: + data = os.read(proc.stderr.fileno(), 1024) if data == "": - self.stderr.close() - read_set.remove(self.stderr) + proc.stderr.close() + read_set.remove(proc.stderr) stderr.append(data) # All data exchanged. Translate lists into strings. @@ -953,19 +957,19 @@ def _communicate(self, input, timeout=None, err_on_timeout=True): # object do the translation: It is based on stdio, which is # impossible to combine with select (unless forcing no # buffering). - if self.universal_newlines and hasattr(file, 'newlines'): + if proc.universal_newlines and hasattr(file, 'newlines'): if stdout: - stdout = self._translate_newlines(stdout) + stdout = proc._translate_newlines(stdout) if stderr: - stderr = self._translate_newlines(stderr) + stderr = proc._translate_newlines(stderr) if killed and err_on_timeout: - errcode = self.poll() + errcode = proc.poll() raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) else: if killed: - self.poll() + proc.poll() else: - self.wait() + proc.wait() return (stdout, stderr) diff --git a/test/resources/linux/application.py b/test/resources/linux/application.py index b1e489ce..eb278ffb 100755 --- a/test/resources/linux/application.py +++ b/test/resources/linux/application.py @@ -64,7 +64,7 @@ class LinuxApplicationTestCase(unittest.TestCase): ec.deploy() - ec.wait_finished([app]) + ec.wait_finished(app) self.assertTrue(ec.state(node) == ResourceState.STARTED) self.assertTrue(ec.state(app) == ResourceState.FINISHED) @@ -97,7 +97,7 @@ class LinuxApplicationTestCase(unittest.TestCase): ec.deploy() - ec.wait_finished([app]) + ec.wait_finished(app) self.assertTrue(ec.state(node) == ResourceState.STARTED) self.assertTrue(ec.state(app) == ResourceState.FINISHED) @@ -111,11 +111,54 @@ class LinuxApplicationTestCase(unittest.TestCase): path = ec.trace(app, "stdout", attr = TraceAttr.PATH) rm = ec.get_resource(app) - p = os.path.join(rm.app_home, "stdout") + p = os.path.join(rm.run_home, "stdout") self.assertEquals(path, p) ec.shutdown() + @skipIfNotAlive + def t_code(self, host, user): + from nepi.execution.resource import ResourceFactory + + ResourceFactory.register_type(LinuxNode) + ResourceFactory.register_type(LinuxApplication) + + ec = ExperimentController() + + node = ec.register_resource("LinuxNode") + ec.set(node, "hostname", host) + ec.set(node, "username", user) + ec.set(node, "cleanHome", True) + ec.set(node, "cleanProcesses", True) + + prog = """#include + +int +main (void) +{ + printf ("Hello, world!\\n"); + return 0; +} +""" + cmd = "${RUN_HOME}/hello" + build = "gcc -Wall -x c ${APP_HOME}/code -o hello" + + app = ec.register_resource("LinuxApplication") + ec.set(app, "command", cmd) + ec.set(app, "code", prog) + ec.set(app, "depends", "gcc") + ec.set(app, "build", build) + ec.register_connection(app, node) + + ec.deploy() + + ec.wait_finished(app) + + out = ec.trace(app, 'stdout') + self.assertEquals(out, "Hello, world!\n") + + ec.shutdown() + @skipIfNotAlive def t_concurrency(self, host, user): from nepi.execution.resource import ResourceFactory @@ -159,7 +202,7 @@ class LinuxApplicationTestCase(unittest.TestCase): path = ec.trace(app, 'stdout', attr = TraceAttr.PATH) rm = ec.get_resource(app) - p = os.path.join(rm.app_home, 'stdout') + p = os.path.join(rm.run_home, 'stdout') self.assertEquals(path, p) ec.shutdown() @@ -306,6 +349,12 @@ class LinuxApplicationTestCase(unittest.TestCase): def test_http_sources_ubuntu(self): self.t_http_sources(self.ubuntu_host, self.ubuntu_user) + def test_code_fedora(self): + self.t_code(self.fedora_host, self.fedora_user) + + def test_code_ubuntu(self): + self.t_code(self.ubuntu_host, self.ubuntu_user) + @skipInteractive def test_xterm_ubuntu(self): """ Interactive test. Should not run automatically """ diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index ef3d8df9..e7d44e1f 100755 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -55,6 +55,7 @@ class LinuxNodeTestCase(unittest.TestCase): def t_run(self, host, user): node, ec = create_node(host, user) + node.find_home() app_home = os.path.join(node.exp_home, "my-app") node.mkdir(app_home, clean = True) @@ -83,6 +84,7 @@ class LinuxNodeTestCase(unittest.TestCase): node, ec = create_node(host, user) + node.find_home() app_home = os.path.join(node.exp_home, "my-app") node.mkdir(app_home, clean = True) @@ -102,13 +104,15 @@ class LinuxNodeTestCase(unittest.TestCase): def t_exitcode_kill(self, host, user): node, ec = create_node(host, user) + node.find_home() app_home = os.path.join(node.exp_home, "my-app") node.mkdir(app_home, clean = True) # Upload command that will not finish command = "ping localhost" - (out, err), proc = node.upload_command(command, app_home, - shfile = "cmd.sh", + shfile = os.path.join(app_home, "cmd.sh") + (out, err), proc = node.upload_command(command, + shfile = shfile, ecodefile = "exitcode") (out, err), proc = node.run(command, app_home, @@ -140,6 +144,7 @@ class LinuxNodeTestCase(unittest.TestCase): node, ec = create_node(host, user) + node.find_home() app_home = os.path.join(node.exp_home, "my-app") node.mkdir(app_home, clean = True) @@ -159,12 +164,13 @@ class LinuxNodeTestCase(unittest.TestCase): (out, err), proc = node.check_errors(app_home) - self.assertEquals(err.strip(), "./cmd.sh: line 1: unexistent-command: command not found") + self.assertTrue(err.find("cmd.sh: line 1: unexistent-command: command not found") > -1) @skipIfNotAlive def t_install(self, host, user): node, ec = create_node(host, user) + node.find_home() (out, err), proc = node.mkdir(node.node_home, clean = True) self.assertEquals(err, "") @@ -177,10 +183,42 @@ class LinuxNodeTestCase(unittest.TestCase): (out, err), proc = node.rmdir(node.exp_home) self.assertEquals(err, "") + @skipIfNotAlive + def t_clean(self, host, user): + node, ec = create_node(host, user) + + node.find_home() + node.mkdir(node.lib_dir) + node.mkdir(node.node_home) + + command1 = " [ -d %s ] && echo 'Found'" % node.lib_dir + (out, err), proc = node.execute(command1) + + self.assertEquals(out.strip(), "Found") + + command2 = " [ -d %s ] && echo 'Found'" % node.node_home + (out, err), proc = node.execute(command2) + + self.assertEquals(out.strip(), "Found") + + node.clean_experiment() + + (out, err), proc = node.execute(command2) + + self.assertEquals(out.strip(), "") + + node.clean_home() + + (out, err), proc = node.execute(command1) + + self.assertEquals(out.strip(), "") + + @skipIfNotAlive def t_xterm(self, host, user): node, ec = create_node(host, user) + node.find_home() (out, err), proc = node.mkdir(node.node_home, clean = True) self.assertEquals(err, "") @@ -197,6 +235,7 @@ class LinuxNodeTestCase(unittest.TestCase): def t_compile(self, host, user): node, ec = create_node(host, user) + node.find_home() app_home = os.path.join(node.exp_home, "my-app") node.mkdir(app_home, clean = True) @@ -288,7 +327,13 @@ main (void) def test_exitcode_error_ubuntu(self): self.t_exitcode_error(self.ubuntu_host, self.ubuntu_user) - + + def test_clean_fedora(self): + self.t_clean(self.fedora_host, self.fedora_user) + + def test_clean_ubuntu(self): + self.t_clean(self.ubuntu_host, self.ubuntu_user) + @skipInteractive def test_xterm_ubuntu(self): """ Interactive test. Should not run automatically """ -- 2.43.0