X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fapplication.py;h=d0a8c32bee66dbecbf3229c5aedb4c6cb877e394;hb=87f44a7c2853afb7021276dd3700858cff950703;hp=6c7f82ccd2e277c48e6d9e582a60aee50500f8c0;hpb=54d2a201dca3af3dabf18601d4909bf506960627;p=nepi.git diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 6c7f82cc..d0a8c32b 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -29,16 +29,68 @@ import os import subprocess # TODO: Resolve wildcards in commands!! -# TODO: compare_hash for all files that are uploaded! +# TODO: During provisioning, everything that is not scp could be +# uploaded to a same script, http_sources download, etc... +# and like that require performing less ssh connections!!! @clsinit class LinuxApplication(ResourceManager): + """ + .. class:: Class Args : + + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + + .. note:: + + A LinuxApplication RM represents a process that can be executed in + a remote Linux host using SSH. + + The LinuxApplication RM takes care of uploading sources and any files + needed to run the experiment, to the remote host. + It also allows to provide source compilation (build) and installation + instructions, and takes care of automating the sources build and + installation tasks for the user. + + It is important to note that files uploaded to the remote host have + two possible scopes: single-experiment or multi-experiment. + Single experiment files are those that will not be re-used by other + experiments. Multi-experiment files are those that will. + Sources and shared files are always made available to all experiments. 
+ + Directory structure: + + The directory structure used by LinuxApplication RM at the Linux + host is the following: + + ${HOME}/nepi-usr --> Base directory for multi-experiment files + | + ${LIB} |- /lib --> Base directory for libraries + ${BIN} |- /bin --> Base directory for binary files + ${SRC} |- /src --> Base directory for sources + ${SHARE} |- /share --> Base directory for other files + + ${HOME}/nepi-exp --> Base directory for single-experiment files + | + ${EXP_HOME} |- / --> Base directory for experiment exp-id + | + ${APP_HOME} |- / --> Base directory for application + | specific files (e.g. command.sh, input) + | + ${RUN_HOME} |- / --> Base directory for run specific + + """ + _rtype = "LinuxApplication" @classmethod def _register_attributes(cls): - command = Attribute("command", "Command to execute", + command = Attribute("command", "Command to execute at application start. " + "Note that commands will be executed in the ${RUN_HOME} directory, " + "make sure to take this into account when using relative paths. ", flags = Flags.ExecReadOnly) forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", flags = Flags.ExecReadOnly) @@ -50,40 +102,48 @@ class LinuxApplication(ResourceManager): "Space-separated list of packages required to run the application", flags = Flags.ExecReadOnly) sources = Attribute("sources", - "Space-separated list of regular files to be deployed in the working " - "path prior to building. Archives won't be expanded automatically.", + "Space-separated list of regular files to be uploaded to ${SRC} " + "directory prior to building. Archives won't be expanded automatically. " + "Sources are globally available for all experiments unless " + "cleanHome is set to True (This will delete all sources). ", + flags = Flags.ExecReadOnly) + files = Attribute("files", + "Space-separated list of regular miscellaneous files to be uploaded " + "to ${SHARE} directory. 
" + "Files are globally available for all experiments unless " + "cleanHome is set to True (This will delete all files). ", + flags = Flags.ExecReadOnly) + libs = Attribute("libs", + "Space-separated list of libraries (e.g. .so files) to be uploaded " + "to ${LIB} directory. " + "Libraries are globally available for all experiments unless " + "cleanHome is set to True (This will delete all files). ", + flags = Flags.ExecReadOnly) + bins = Attribute("bins", + "Space-separated list of binary files to be uploaded " + "to ${BIN} directory. " + "Binaries are globally available for all experiments unless " + "cleanHome is set to True (This will delete all files). ", flags = Flags.ExecReadOnly) code = Attribute("code", - "Plain text source code to be uploaded to the server. It will be stored " - "under ${SOURCES}/code", + "Plain text source code to be uploaded to the ${APP_HOME} directory. ", flags = Flags.ExecReadOnly) build = Attribute("build", "Build commands to execute after deploying the sources. " - "Sources will be in the ${SOURCES} folder. " - "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n" - "Try to make the commands return with a nonzero exit code on error.\n" - "Also, do not install any programs here, use the 'install' attribute. This will " - "help keep the built files constrained to the build folder (which may " - "not be the home folder), and will result in faster deployment. Also, " - "make sure to clean up temporary files, to reduce bandwidth usage between " - "nodes when transferring built packages.", + "Sources are uploaded to the ${SRC} directory and code " + "is uploaded to the ${APP_HOME} directory. 
\n" + "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && " + "./configure && make && make clean.\n" + "Make sure to make the build commands return with a nonzero exit " + "code on error.", flags = Flags.ReadOnly) install = Attribute("install", "Commands to transfer built files to their final destinations. " - "Sources will be in the initial working folder, and a special " - "tag ${SOURCES} can be used to reference the experiment's " - "home folder (where the application commands will run).\n" - "ALL sources and targets needed for execution must be copied there, " - "if building has been enabled.\n" - "That is, 'slave' nodes will not automatically get any source files. " - "'slave' nodes don't get build dependencies either, so if you need " - "make and other tools to install, be sure to provide them as " - "actual dependencies instead.", + "Install commands are executed after build commands. ", flags = Flags.ReadOnly) - stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly) - stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly) - stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly) - tear_down = Attribute("tearDown", "Bash script to be executed before " + stdin = Attribute("stdin", "Standard input for the 'command'", + flags = Flags.ExecReadOnly) + tear_down = Attribute("tearDown", "Command to be executed just before " "releasing the resource", flags = Flags.ReadOnly) @@ -94,11 +154,12 @@ class LinuxApplication(ResourceManager): cls._register_attribute(depends) cls._register_attribute(sources) cls._register_attribute(code) + cls._register_attribute(files) + cls._register_attribute(bins) + cls._register_attribute(libs) cls._register_attribute(build) cls._register_attribute(install) cls._register_attribute(stdin) - cls._register_attribute(stdout) - cls._register_attribute(stderr) cls._register_attribute(tear_down) @classmethod @@ -122,7 +183,7 @@ class LinuxApplication(ResourceManager): # timestamp 
of last state check of the application self._last_state_check = tnow() - + def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, self.node.get("hostname"), msg) @@ -138,12 +199,8 @@ class LinuxApplication(ResourceManager): return os.path.join(self.node.exp_home, self._home) @property - def src_dir(self): - return os.path.join(self.app_home, 'src') - - @property - def build_dir(self): - return os.path.join(self.app_home, 'build') + def run_home(self): + return os.path.join(self.app_home, self.ec.run_id) @property def pid(self): @@ -169,7 +226,7 @@ class LinuxApplication(ResourceManager): def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): self.info("Retrieving '%s' trace %s " % (name, attr)) - path = os.path.join(self.app_home, name) + path = os.path.join(self.run_home, name) command = "(test -f %s && echo 'success') || echo 'error'" % path (out, err), proc = self.node.execute(command) @@ -183,7 +240,7 @@ class LinuxApplication(ResourceManager): return path if attr == TraceAttr.ALL: - (out, err), proc = self.node.check_output(self.app_home, name) + (out, err), proc = self.node.check_output(self.run_home, name) if err and proc.poll(): msg = " Couldn't read trace %s " % name @@ -210,26 +267,36 @@ class LinuxApplication(ResourceManager): return out def provision(self): - # create home dir for application - self.node.mkdir(self.app_home) - - # upload sources - self.upload_sources() - - # upload code - self.upload_code() - - # upload stdin - self.upload_stdin() - - # install dependencies - self.install_dependencies() - - # build - self.build() - - # Install - self.install() + # create run dir for application + self.node.mkdir(self.run_home) + + steps = [ + # upload sources + self.upload_sources, + # upload files + self.upload_files, + # upload binaries + self.upload_binaries, + # upload libraries + self.upload_libraries, + # upload code + self.upload_code, + # upload stdin + self.upload_stdin, + # install dependencies + 
self.install_dependencies, + # build + self.build, + # Install + self.install] + + # Since provisioning takes a long time, before + # each step we check that the EC has not finished + for step in steps: + if self.ec.finished: + raise RuntimeError, "EC finished" + + step() # Upload command to remote bash script # - only if command can be executed in background and detached @@ -245,8 +312,10 @@ class LinuxApplication(ResourceManager): env = self.get("env") env = env and self.replace_paths(env) - self.node.upload_command(command, self.app_home, - shfile = "app.sh", + shfile = os.path.join(self.app_home, "app.sh") + + self.node.upload_command(command, + shfile = shfile, env = env) self.info("Provisioning finished") @@ -255,34 +324,36 @@ class LinuxApplication(ResourceManager): def upload_sources(self): sources = self.get("sources") + if sources: self.info("Uploading sources ") - # create dir for sources - self.node.mkdir(self.src_dir) - sources = sources.split(' ') - http_sources = list() + # Separate sources that should be downloaded from + # the web, from sources that should be uploaded from + # the local machine + command = [] for source in list(sources): if source.startswith("http") or source.startswith("https"): - http_sources.append(source) + # remove the http source from the sources list sources.remove(source) - # Download http sources remotely - if http_sources: - command = [" wget -c --directory-prefix=${SOURCES} "] - check = [] - - for source in http_sources: - command.append(" %s " % (source)) - check.append(" ls ${SOURCES}/%s " % os.path.basename(source)) - - command = " ".join(command) - check = " ; ".join(check) - - # Append the command to check that the sources were downloaded - command += " ; %s " % check + command.append( " ( " + # Check if the source already exists + " ls ${SRC}/%(basename)s " + " || ( " + # If source doesn't exist, download it and check + # that it was downloaded ok + " wget -c --directory-prefix=${SRC} %(source)s && " + " ls 
${SRC}/%(basename)s " + " ) ) " % { + "basename": os.path.basename(source), + "source": source + }) + + if command: + command = " && ".join(command) # replace application specific paths in the command command = self.replace_paths(command) @@ -290,64 +361,78 @@ class LinuxApplication(ResourceManager): # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) - self.node.run_and_wait(command, self.app_home, - shfile = "http_sources.sh", + self.node.run_and_wait(command, self.run_home, + shfile = os.path.join(self.app_home, "http_sources.sh"), + overwrite = False, pidfile = "http_sources_pidfile", ecodefile = "http_sources_exitcode", stdout = "http_sources_stdout", stderr = "http_sources_stderr") if sources: - self.node.upload(sources, self.src_dir) + sources = ' '.join(sources) + self.node.upload(sources, self.node.src_dir, overwrite = False) + + def upload_files(self): + files = self.get("files") + + if files: + self.info("Uploading files %s " % files) + self.node.upload(files, self.node.share_dir, overwrite = False) + + def upload_libraries(self): + libs = self.get("libs") + + if libs: + self.info("Uploading libraries %s " % libs) + self.node.upload(libs, self.node.lib_dir, overwrite = False) + + def upload_binaries(self): + bins = self.get("bins") + + if bins: + self.info("Uploading binaries %s " % bins) + self.node.upload(bins, self.node.bin_dir, overwrite = False) def upload_code(self): code = self.get("code") - if code: - # create dir for sources - self.node.mkdir(self.src_dir) - self.info("Uploading code ") + if code: + self.info("Uploading code") - dst = os.path.join(self.src_dir, "code") - self.node.upload(sources, dst, text = True) + dst = os.path.join(self.app_home, "code") + self.node.upload(code, dst, overwrite = False, text = True) def upload_stdin(self): stdin = self.get("stdin") if stdin: # create dir for sources - self.info(" Uploading stdin ") + self.info("Uploading stdin") dst 
= os.path.join(self.app_home, "stdin") - - # If what we are uploading is a file, check whether - # the same file already exists (using md5sum) - if self.compare_hash(stdin, dst): - return - - self.node.upload(stdin, dst, text = True) + self.node.upload(stdin, dst, overwrite = False, text = True) def install_dependencies(self): depends = self.get("depends") if depends: self.info("Installing dependencies %s" % depends) - self.node.install_packages(depends, self.app_home) + self.node.install_packages(depends, self.app_home, self.run_home) def build(self): build = self.get("build") + if build: self.info("Building sources ") - # create dir for build - self.node.mkdir(self.build_dir) - # replace application specific paths in the command command = self.replace_paths(build) # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) - self.node.run_and_wait(command, self.app_home, - shfile = "build.sh", + self.node.run_and_wait(command, self.run_home, + shfile = os.path.join(self.app_home, "build.sh"), + overwrite = False, pidfile = "build_pidfile", ecodefile = "build_exitcode", stdout = "build_stdout", @@ -355,6 +440,7 @@ class LinuxApplication(ResourceManager): def install(self): install = self.get("install") + if install: self.info("Installing sources ") @@ -364,8 +450,9 @@ class LinuxApplication(ResourceManager): # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) - self.node.run_and_wait(command, self.app_home, - shfile = "install.sh", + self.node.run_and_wait(command, self.run_home, + shfile = os.path.join(self.app_home, "install.sh"), + overwrite = False, pidfile = "install_pidfile", ecodefile = "install_exitcode", stdout = "install_stdout", @@ -409,10 +496,13 @@ class LinuxApplication(ResourceManager): def _start_in_foreground(self): command = self.get("command") - stdin = "stdin" if self.get("stdin") else None sudo = 
self.get("sudo") or False x11 = self.get("forwardX11") + # For a command being executed in foreground, if there is stdin, + # it is expected to be a text string, not a file or pipe + stdin = self.get("stdin") or None + # Command will be launched in foreground and attached to the # terminal using the node 'execute' in non blocking mode. @@ -440,17 +530,20 @@ class LinuxApplication(ResourceManager): def _start_in_background(self): command = self.get("command") env = self.get("env") - stdin = "stdin" if self.get("stdin") else None - stdout = "stdout" if self.get("stdout") else "stdout" - stderr = "stderr" if self.get("stderr") else "stderr" sudo = self.get("sudo") or False - # Command will be as a daemon in baground and detached from any terminal. - # The real command to run was previously uploaded to a bash script - # during deployment, now launch the remote script using 'run' - # method from the node - cmd = "bash ./app.sh" - (out, err), proc = self.node.run(cmd, self.app_home, + stdout = "stdout" + stderr = "stderr" + stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \ + else None + + # Command will be run as a daemon in background and detached from any + # terminal. + # The command to run was previously uploaded to a bash script + # during deployment, now we launch the remote script using 'run' + # method from the node. 
+ cmd = "bash %s" % os.path.join(self.app_home, "app.sh") + (out, err), proc = self.node.run(cmd, self.run_home, stdin = stdin, stdout = stdout, stderr = stderr, @@ -465,14 +558,14 @@ class LinuxApplication(ResourceManager): raise RuntimeError, msg # Wait for pid file to be generated - pid, ppid = self.node.wait_pid(self.app_home) + pid, ppid = self.node.wait_pid(self.run_home) if pid: self._pid = int(pid) if ppid: self._ppid = int(ppid) # If the process is not running, check for error information # on the remote machine if not self.pid or not self.ppid: - (out, err), proc = self.node.check_errors(self.app_home, + (out, err), proc = self.node.check_errors(self.run_home, stderr = stderr) # Out is what was written in the stderr file @@ -557,7 +650,7 @@ class LinuxApplication(ResourceManager): state_check_delay = 0.5 if tdiffsec(tnow(), self._last_state_check) > state_check_delay: # check if execution errors occurred - (out, err), proc = self.node.check_errors(self.app_home) + (out, err), proc = self.node.check_errors(self.run_home) if err: msg = " Failed to execute command '%s'" % self.get("command") @@ -580,49 +673,20 @@ class LinuxApplication(ResourceManager): """ Replace all special path tags with shell-escaped actual paths. 
""" - def absolute_dir(d): - return d if d.startswith("/") else os.path.join("${HOME}", d) - return ( command - .replace("${SOURCES}", absolute_dir(self.src_dir)) - .replace("${BUILD}", absolute_dir(self.build_dir)) - .replace("${APP_HOME}", absolute_dir(self.app_home)) - .replace("${NODE_HOME}", absolute_dir(self.node.node_home)) - .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) ) + .replace("${USR}", self.node.usr_dir) + .replace("${LIB}", self.node.lib_dir) + .replace("${BIN}", self.node.bin_dir) + .replace("${SRC}", self.node.src_dir) + .replace("${SHARE}", self.node.share_dir) + .replace("${EXP}", self.node.exp_dir) + .replace("${EXP_HOME}", self.node.exp_home) + .replace("${APP_HOME}", self.app_home) + .replace("${RUN_HOME}", self.run_home) + .replace("${NODE_HOME}", self.node.node_home) + .replace("${HOME}", self.node.home_dir) ) - def compare_hash(self, local, remote): - # getting md5sum from remote file - (out, err), proc = self.node.execute("md5sum %s " % remote) - - if proc.poll() == 0: #OK - if not os.path.isfile(local): - # store to a tmp file - f = tempfile.NamedTemporaryFile() - f.write(local) - f.flush() - local = f.name - - lproc = subprocess.Popen(["md5sum", local], - stdout = subprocess.PIPE, - stderr = subprocess.PIPE) - - # getting md5sum from local file - (lout, lerr) = lproc.communicate() - - # files are the same, no need to upload - lchk = lout.strip().split(" ")[0] - rchk = out.strip().split(" ")[0] - - msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % ( - local, lchk, remote, rchk) - self.debug(msg) - - if lchk == rchk: - return True - - return False - def valid_connection(self, guid): # TODO: Validate! return True