X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fapplication.py;h=90345b73aad6c726c8b622f300b7cf994f6ec542;hb=7fed767f9f18ee81807950771145969cbb27b8a7;hp=a1a9f972efe073f76a58e29ff96e32d2b1bf2a06;hpb=b769dda475a0b56e7a36e1763a2610990b1c1074;p=nepi.git diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index a1a9f972..90345b73 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay + ResourceState from nepi.resources.linux.node import LinuxNode from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -63,14 +63,14 @@ class LinuxApplication(ResourceManager): The directory structure used by LinuxApplication RM at the Linux host is the following: - ${HOME}/nepi-usr --> Base directory for multi-experiment files + ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files | ${LIB} |- /lib --> Base directory for libraries ${BIN} |- /bin --> Base directory for binary files ${SRC} |- /src --> Base directory for sources ${SHARE} |- /share --> Base directory for other files - ${HOME}/nepi-exp --> Base directory for single-experiment files + ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files | ${EXP_HOME} |- / --> Base directory for experiment exp-id | @@ -81,9 +81,9 @@ class LinuxApplication(ResourceManager): """ - _rtype = "LinuxApplication" + _rtype = "linux::Application" _help = "Runs an application on a Linux host with a BASH command " - _backend_type = "linux" + _platform = "linux" @classmethod def _register_attributes(cls): @@ -101,25 +101,25 @@ class LinuxApplication(ResourceManager): "Space-separated list of packages required to run the application", flags = Flags.Design) sources = Attribute("sources", - "Space-separated list of regular files to be uploaded to ${SRC} " + "semi-colon separated list of regular files to be uploaded to ${SRC} " "directory prior to building. Archives won't be expanded automatically. " "Sources are globally available for all experiments unless " "cleanHome is set to True (This will delete all sources). ", flags = Flags.Design) files = Attribute("files", - "Space-separated list of regular miscellaneous files to be uploaded " + "semi-colon separated list of regular miscellaneous files to be uploaded " "to ${SHARE} directory. " "Files are globally available for all experiments unless " "cleanHome is set to True (This will delete all files). ", flags = Flags.Design) libs = Attribute("libs", - "Space-separated list of libraries (e.g. .so files) to be uploaded " + "semi-colon separated list of libraries (e.g. .so files) to be uploaded " "to ${LIB} directory. " "Libraries are globally available for all experiments unless " "cleanHome is set to True (This will delete all files). ", flags = Flags.Design) bins = Attribute("bins", - "Space-separated list of binary files to be uploaded " + "semi-colon separated list of binary files to be uploaded " "to ${BIN} directory. " "Binaries are globally available for all experiments unless " "cleanHome is set to True (This will delete all files). ", @@ -173,7 +173,9 @@ class LinuxApplication(ResourceManager): super(LinuxApplication, self).__init__(ec, guid) self._pid = None self._ppid = None + self._node = None self._home = "app-%s" % self.guid + # whether the command should run in foreground attached # to a terminal self._in_foreground = False @@ -187,16 +189,23 @@ class LinuxApplication(ResourceManager): # timestamp of last state check of the application self._last_state_check = tnow() - + def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, self.node.get("hostname"), msg) @property def node(self): - node = self.get_connected(LinuxNode.get_rtype()) - if node: return node[0] - return None + if not self._node: + node = self.get_connected(LinuxNode.get_rtype()) + if not node: + msg = "Application %s guid %d NOT connected to Node" % ( + self._rtype, self.guid) + raise RuntimeError, msg + + self._node = node[0] + + return self._node @property def app_home(self): @@ -274,6 +283,20 @@ class LinuxApplication(ResourceManager): return out def do_provision(self): + # take a snapshot of the system if user is root + # to ensure that cleanProcess will not kill + # pre-existent processes + if self.node.get("username") == 'root': + import pickle + procs = dict() + ps_aux = "ps aux |awk '{print $2,$11}'" + (out, err), proc = self.node.execute(ps_aux) + if len(out) != 0: + for line in out.strip().split("\n"): + parts = line.strip().split(" ") + procs[parts[0]] = parts[1] + pickle.dump(procs, open("/tmp/save.proc", "wb")) + # create run dir for application self.node.mkdir(self.run_home) @@ -332,7 +355,6 @@ class LinuxApplication(ResourceManager): # replace application specific paths in the command command = self.replace_paths(command) - # replace application specific paths in the environment env = self.get("env") env = env and self.replace_paths(env) @@ -344,22 +366,30 @@ class LinuxApplication(ResourceManager): env = env, overwrite = overwrite) - def execute_deploy_command(self, command): + def execute_deploy_command(self, command, prefix="deploy"): if command: + # replace application specific paths in the command + command = self.replace_paths(command) + + # replace application specific paths in the environment + env = self.get("env") + env = env and self.replace_paths(env) + # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) - shfile = os.path.join(self.app_home, "deploy.sh") + shfile = os.path.join(self.app_home, "%s.sh" % prefix) self.node.run_and_wait(command, self.run_home, shfile = shfile, overwrite = False, - pidfile = "deploy_pidfile", - ecodefile = "deploy_exitcode", - stdout = "deploy_stdout", - stderr = "deploy_stderr") - - def upload_sources(self, src_dir = None): - sources = self.get("sources") + pidfile = "%s_pidfile" % prefix, + ecodefile = "%s_exitcode" % prefix, + stdout = "%s_stdout" % prefix, + stderr = "%s_stderr" % prefix) + + def upload_sources(self, sources = None, src_dir = None): + if not sources: + sources = self.get("sources") command = "" @@ -369,7 +399,7 @@ class LinuxApplication(ResourceManager): if sources: self.info("Uploading sources ") - sources = sources.split(' ') + sources = map(str.strip, sources.split(";")) # Separate sources that should be downloaded from # the web, from sources that should be uploaded from @@ -400,34 +430,38 @@ class LinuxApplication(ResourceManager): command = self.replace_paths(command) if sources: - sources = ' '.join(sources) + sources = ';'.join(sources) self.node.upload(sources, src_dir, overwrite = False) return command - def upload_files(self): - files = self.get("files") + def upload_files(self, files = None): + if not files: + files = self.get("files") if files: self.info("Uploading files %s " % files) self.node.upload(files, self.node.share_dir, overwrite = False) - def upload_libraries(self): - libs = self.get("libs") + def upload_libraries(self, libs = None): + if not libs: + libs = self.get("libs") if libs: self.info("Uploading libraries %s " % libaries) self.node.upload(libs, self.node.lib_dir, overwrite = False) - def upload_binaries(self): - bins = self.get("bins") + def upload_binaries(self, bins = None): + if not bins: + bins = self.get("bins") if bins: self.info("Uploading binaries %s " % binaries) self.node.upload(bins, self.node.bin_dir, overwrite = False) - def upload_code(self): - code = self.get("code") + def upload_code(self, code = None): + if not code: + code = self.get("code") if code: self.info("Uploading code") @@ -435,15 +469,21 @@ class LinuxApplication(ResourceManager): dst = os.path.join(self.app_home, "code") self.node.upload(code, dst, overwrite = False, text = True) - def upload_stdin(self): - stdin = self.get("stdin") + def upload_stdin(self, stdin = None): + if not stdin: + stdin = self.get("stdin") + if stdin: # create dir for sources self.info("Uploading stdin") # upload stdin file to ${SHARE_DIR} directory - basename = os.path.basename(stdin) - dst = os.path.join(self.node.share_dir, basename) + if os.path.isfile(stdin): + basename = os.path.basename(stdin) + dst = os.path.join(self.node.share_dir, basename) + else: + dst = os.path.join(self.app_home, "stdin") + self.node.upload(stdin, dst, overwrite = False, text = True) # create "stdin" symlink on ${APP_HOME} directory @@ -453,14 +493,17 @@ class LinuxApplication(ResourceManager): return command - def install_dependencies(self): - depends = self.get("depends") + def install_dependencies(self, depends = None): + if not depends: + depends = self.get("depends") + if depends: self.info("Installing dependencies %s" % depends) return self.node.install_packages_command(depends) - def build(self): - build = self.get("build") + def build(self, build = None): + if not build: + build = self.get("build") if build: self.info("Building sources ") @@ -468,8 +511,9 @@ class LinuxApplication(ResourceManager): # replace application specific paths in the command return self.replace_paths(build) - def install(self): - install = self.get("install") + def install(self, install = None): + if not install: + install = self.get("install") if install: self.info("Installing sources ") @@ -481,8 +525,8 @@ class LinuxApplication(ResourceManager): # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: - self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) - self.ec.schedule(reschedule_delay, self.deploy) + self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state) + self.ec.schedule(self.reschedule_delay, self.deploy) else: command = self.get("command") or "" self.info("Deploying command '%s' " % command) @@ -599,21 +643,27 @@ class LinuxApplication(ResourceManager): (out, err), proc = self.node.kill(self.pid, self.ppid, sudo = self._sudo_kill) + """ # TODO: check if execution errors occurred if (proc and proc.poll()) or err: msg = " Failed to STOP command '%s' " % self.get("command") self.error(msg, out, err) - + """ + super(LinuxApplication, self).do_stop() def do_release(self): self.info("Releasing resource") + self.do_stop() + tear_down = self.get("tearDown") if tear_down: self.node.execute(tear_down) - self.do_stop() + hard_release = self.get("hardRelease") + if hard_release: + self.node.rmdir(self.app_home) super(LinuxApplication, self).do_release() @@ -669,38 +719,49 @@ class LinuxApplication(ResourceManager): return self._state def execute_command(self, command, - env = None, - sudo = False, - forward_x11 = False, - blocking = False): + env=None, + sudo=False, + tty=False, + forward_x11=False, + blocking=False): environ = "" if env: - environ = self.node.format_environment(env, inline = True) + environ = self.node.format_environment(env, inline=True) command = environ + command command = self.replace_paths(command) return self.node.execute(command, - sudo = sudo, - forward_x11 = forward_x11, - blocking = blocking) + sudo=sudo, + tty=tty, + forward_x11=forward_x11, + blocking=blocking) - def replace_paths(self, command): + def replace_paths(self, command, node=None, app_home=None, run_home=None): """ Replace all special path tags with shell-escaped actual paths. """ + if not node: + node=self.node + + if not app_home: + app_home=self.app_home + + if not run_home: + run_home = self.run_home + return ( command - .replace("${USR}", self.node.usr_dir) - .replace("${LIB}", self.node.lib_dir) - .replace("${BIN}", self.node.bin_dir) - .replace("${SRC}", self.node.src_dir) - .replace("${SHARE}", self.node.share_dir) - .replace("${EXP}", self.node.exp_dir) - .replace("${EXP_HOME}", self.node.exp_home) - .replace("${APP_HOME}", self.app_home) - .replace("${RUN_HOME}", self.run_home) - .replace("${NODE_HOME}", self.node.node_home) - .replace("${HOME}", self.node.home_dir) + .replace("${USR}", node.usr_dir) + .replace("${LIB}", node.lib_dir) + .replace("${BIN}", node.bin_dir) + .replace("${SRC}", node.src_dir) + .replace("${SHARE}", node.share_dir) + .replace("${EXP}", node.exp_dir) + .replace("${EXP_HOME}", node.exp_home) + .replace("${APP_HOME}", app_home) + .replace("${RUN_HOME}", run_home) + .replace("${NODE_HOME}", node.node_home) + .replace("${HOME}", node.home_dir) ) def valid_connection(self, guid):