Replacing _backend for _platform class attribute in ResourceManager
[nepi.git] / src / nepi / resources / linux / application.py
index 0057ad1..23108e5 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
-        ResourceState, reschedule_delay
+        ResourceState
 from nepi.resources.linux.node import LinuxNode
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -63,14 +63,14 @@ class LinuxApplication(ResourceManager):
         The directory structure used by LinuxApplication RM at the Linux
         host is the following:
 
-        ${HOME}/nepi-usr --> Base directory for multi-experiment files
+        ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
                       |
         ${LIB}        |- /lib --> Base directory for libraries
         ${BIN}        |- /bin --> Base directory for binary files
         ${SRC}        |- /src --> Base directory for sources
         ${SHARE}      |- /share --> Base directory for other files
 
-        ${HOME}/nepi-exp --> Base directory for single-experiment files
+        ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
                       |
         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
                           |
@@ -81,52 +81,52 @@ class LinuxApplication(ResourceManager):
 
     """
 
-    _rtype = "LinuxApplication"
+    _rtype = "linux::Application"
     _help = "Runs an application on a Linux host with a BASH command "
-    _backend_type = "linux"
+    _platform = "linux"
 
     @classmethod
     def _register_attributes(cls):
         command = Attribute("command", "Command to execute at application start. "
                 "Note that commands will be executed in the ${RUN_HOME} directory, "
                 "make sure to take this into account when using relative paths. ", 
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         env = Attribute("env", "Environment variables string for command execution",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         sudo = Attribute("sudo", "Run with root privileges", 
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         depends = Attribute("depends", 
                 "Space-separated list of packages required to run the application",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         sources = Attribute("sources", 
-                "Space-separated list of regular files to be uploaded to ${SRC} "
+                "semi-colon separated list of regular files to be uploaded to ${SRC} "
                 "directory prior to building. Archives won't be expanded automatically. "
                 "Sources are globally available for all experiments unless "
                 "cleanHome is set to True (This will delete all sources). ",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         files = Attribute("files", 
-                "Space-separated list of regular miscellaneous files to be uploaded "
+                "semi-colon separated list of regular miscellaneous files to be uploaded "
                 "to ${SHARE} directory. "
                 "Files are globally available for all experiments unless "
                 "cleanHome is set to True (This will delete all files). ",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         libs = Attribute("libs", 
-                "Space-separated list of libraries (e.g. .so files) to be uploaded "
+                "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
                 "to ${LIB} directory. "
                 "Libraries are globally available for all experiments unless "
                 "cleanHome is set to True (This will delete all files). ",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         bins = Attribute("bins", 
-                "Space-separated list of binary files to be uploaded "
+                "semi-colon separated list of binary files to be uploaded "
                 "to ${BIN} directory. "
                 "Binaries are globally available for all experiments unless "
                 "cleanHome is set to True (This will delete all files). ",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         code = Attribute("code", 
                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         build = Attribute("build", 
                 "Build commands to execute after deploying the sources. "
                 "Sources are uploaded to the ${SRC} directory and code "
@@ -135,16 +135,16 @@ class LinuxApplication(ResourceManager):
                 "./configure && make && make clean.\n"
                 "Make sure to make the build commands return with a nonzero exit "
                 "code on error.",
-                flags = Flags.ReadOnly)
+                flags = Flags.Design)
         install = Attribute("install", 
                 "Commands to transfer built files to their final destinations. "
                 "Install commands are executed after build commands. ",
-                flags = Flags.ReadOnly)
+                flags = Flags.Design)
         stdin = Attribute("stdin", "Standard input for the 'command'", 
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         tear_down = Attribute("tearDown", "Command to be executed just before " 
                 "releasing the resource", 
-                flags = Flags.ReadOnly)
+                flags = Flags.Design)
 
         cls._register_attribute(command)
         cls._register_attribute(forward_x11)
@@ -173,7 +173,9 @@ class LinuxApplication(ResourceManager):
         super(LinuxApplication, self).__init__(ec, guid)
         self._pid = None
         self._ppid = None
+        self._node = None
         self._home = "app-%s" % self.guid
+
         # whether the command should run in foreground attached
         # to a terminal
         self._in_foreground = False
@@ -194,9 +196,16 @@ class LinuxApplication(ResourceManager):
 
     @property
     def node(self):
-        node = self.get_connected(LinuxNode.get_rtype())
-        if node: return node[0]
-        return None
+        if not self._node:
+            node = self.get_connected(LinuxNode.get_rtype())
+            if not node: 
+                msg = "Application %s guid %d NOT connected to Node" % (
+                        self._rtype, self.guid)
+                raise RuntimeError, msg
+
+            self._node = node[0]
+
+        return self._node
 
     @property
     def app_home(self):
@@ -227,10 +236,13 @@ class LinuxApplication(ResourceManager):
         """
         return self.get("forwardX11") or self._in_foreground
 
+    def trace_filepath(self, filename):
+        return os.path.join(self.run_home, filename)
+
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         self.info("Retrieving '%s' trace %s " % (name, attr))
 
-        path = os.path.join(self.run_home, name)
+        path = self.trace_filepath(name)
         
         command = "(test -f %s && echo 'success') || echo 'error'" % path
         (out, err), proc = self.node.execute(command)
@@ -272,16 +284,18 @@ class LinuxApplication(ResourceManager):
 
     def do_provision(self):
         # take a snapshot of the system if user is root
-        # to assure cleanProcess kill every nepi process
+        # to ensure that cleanProcess will not kill
+        # pre-existent processes
         if self.node.get("username") == 'root':
             import pickle
             procs = dict()
             ps_aux = "ps aux |awk '{print $2,$11}'"
             (out, err), proc = self.node.execute(ps_aux)
-            for line in out.strip().split("\n"):
-                parts = line.strip().split(" ")
-                procs[parts[0]] = parts[1]
-            pickle.dump(procs, open("/tmp/save.proc", "wb"))
+            if len(out) != 0:
+                for line in out.strip().split("\n"):
+                    parts = line.strip().split(" ")
+                    procs[parts[0]] = parts[1]
+                pickle.dump(procs, open("/tmp/save.proc", "wb"))
             
         # create run dir for application
         self.node.mkdir(self.run_home)
@@ -353,29 +367,40 @@ class LinuxApplication(ResourceManager):
                     env = env,
                     overwrite = overwrite)
 
-    def execute_deploy_command(self, command):
+    def execute_deploy_command(self, command, prefix="deploy"):
         if command:
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+            
+            # replace application specific paths in the environment
+            env = self.get("env")
+            env = env and self.replace_paths(env)
+
             # Upload the command to a bash script and run it
             # in background ( but wait until the command has
             # finished to continue )
-            shfile = os.path.join(self.app_home, "deploy.sh")
+            shfile = os.path.join(self.app_home, "%s.sh" % prefix)
             self.node.run_and_wait(command, self.run_home,
                     shfile = shfile, 
                     overwrite = False,
-                    pidfile = "deploy_pidfile", 
-                    ecodefile = "deploy_exitcode", 
-                    stdout = "deploy_stdout", 
-                    stderr = "deploy_stderr")
-
-    def upload_sources(self):
-        sources = self.get("sources")
+                    pidfile = "%s_pidfile" % prefix, 
+                    ecodefile = "%s_exitcode" % prefix, 
+                    stdout = "%s_stdout" % prefix, 
+                    stderr = "%s_stderr" % prefix)
+
+    def upload_sources(self, sources = None, src_dir = None):
+        if not sources:
+            sources = self.get("sources")
    
         command = ""
 
+        if not src_dir:
+            src_dir = self.node.src_dir
+
         if sources:
             self.info("Uploading sources ")
 
-            sources = sources.split(' ')
+            sources = map(str.strip, sources.split(";"))
 
             # Separate sources that should be downloaded from 
             # the web, from sources that should be uploaded from
@@ -388,15 +413,16 @@ class LinuxApplication(ResourceManager):
 
                     command.append( " ( " 
                             # Check if the source already exists
-                            " ls ${SRC}/%(basename)s "
+                            " ls %(src_dir)s/%(basename)s "
                             " || ( "
                             # If source doesn't exist, download it and check
                             # that it it downloaded ok
-                            "   wget -c --directory-prefix=${SRC} %(source)s && "
-                            "   ls ${SRC}/%(basename)s "
+                            "   wget -c --directory-prefix=%(src_dir)s %(source)s && "
+                            "   ls %(src_dir)s/%(basename)s "
                             " ) ) " % {
                                 "basename": os.path.basename(source),
-                                "source": source
+                                "source": source,
+                                "src_dir": src_dir
                                 })
 
             command = " && ".join(command)
@@ -405,34 +431,38 @@ class LinuxApplication(ResourceManager):
             command = self.replace_paths(command)
        
             if sources:
-                sources = ' '.join(sources)
-                self.node.upload(sources, self.node.src_dir, overwrite = False)
+                sources = ';'.join(sources)
+                self.node.upload(sources, src_dir, overwrite = False)
 
         return command
 
-    def upload_files(self):
-        files = self.get("files")
+    def upload_files(self, files = None):
+        if not files:
+            files = self.get("files")
 
         if files:
             self.info("Uploading files %s " % files)
             self.node.upload(files, self.node.share_dir, overwrite = False)
 
-    def upload_libraries(self):
-        libs = self.get("libs")
+    def upload_libraries(self, libs = None):
+        if not libs:
+            libs = self.get("libs")
 
         if libs:
             self.info("Uploading libraries %s " % libaries)
             self.node.upload(libs, self.node.lib_dir, overwrite = False)
 
-    def upload_binaries(self):
-        bins = self.get("bins")
+    def upload_binaries(self, bins = None):
+        if not bins:
+            bins = self.get("bins")
 
         if bins:
             self.info("Uploading binaries %s " % binaries)
             self.node.upload(bins, self.node.bin_dir, overwrite = False)
 
-    def upload_code(self):
-        code = self.get("code")
+    def upload_code(self, code = None):
+        if not code:
+            code = self.get("code")
 
         if code:
             self.info("Uploading code")
@@ -440,15 +470,21 @@ class LinuxApplication(ResourceManager):
             dst = os.path.join(self.app_home, "code")
             self.node.upload(code, dst, overwrite = False, text = True)
 
-    def upload_stdin(self):
-        stdin = self.get("stdin")
+    def upload_stdin(self, stdin = None):
+        if not stdin:
+           stdin = self.get("stdin")
+
         if stdin:
             # create dir for sources
             self.info("Uploading stdin")
             
             # upload stdin file to ${SHARE_DIR} directory
-            basename = os.path.basename(stdin)
-            dst = os.path.join(self.node.share_dir, basename)
+            if os.path.isfile(stdin):
+                basename = os.path.basename(stdin)
+                dst = os.path.join(self.node.share_dir, basename)
+            else:
+                dst = os.path.join(self.app_home, "stdin")
+
             self.node.upload(stdin, dst, overwrite = False, text = True)
 
             # create "stdin" symlink on ${APP_HOME} directory
@@ -458,14 +494,17 @@ class LinuxApplication(ResourceManager):
 
             return command
 
-    def install_dependencies(self):
-        depends = self.get("depends")
+    def install_dependencies(self, depends = None):
+        if not depends:
+            depends = self.get("depends")
+
         if depends:
             self.info("Installing dependencies %s" % depends)
             return self.node.install_packages_command(depends)
 
-    def build(self):
-        build = self.get("build")
+    def build(self, build = None):
+        if not build:
+            build = self.get("build")
 
         if build:
             self.info("Building sources ")
@@ -473,8 +512,9 @@ class LinuxApplication(ResourceManager):
             # replace application specific paths in the command
             return self.replace_paths(build)
 
-    def install(self):
-        install = self.get("install")
+    def install(self, install = None):
+        if not install:
+            install = self.get("install")
 
         if install:
             self.info("Installing sources ")
@@ -487,7 +527,7 @@ class LinuxApplication(ResourceManager):
         node = self.node
         if not node or node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
-            self.ec.schedule(reschedule_delay, self.deploy)
+            self.ec.schedule(self.reschedule_delay, self.deploy)
         else:
             command = self.get("command") or ""
             self.info("Deploying command '%s' " % command)
@@ -519,10 +559,6 @@ class LinuxApplication(ResourceManager):
         x11 = self.get("forwardX11")
         env = self.get("env")
 
-        # For a command being executed in foreground, if there is stdin,
-        # it is expected to be text string not a file or pipe
-        stdin = self.get("stdin") or None
-
         # Command will be launched in foreground and attached to the
         # terminal using the node 'execute' in non blocking mode.
 
@@ -533,7 +569,6 @@ class LinuxApplication(ResourceManager):
         (out, err), self._proc = self.execute_command(command, 
                 env = env,
                 sudo = sudo,
-                stdin = stdin,
                 forward_x11 = x11,
                 blocking = False)
 
@@ -609,21 +644,27 @@ class LinuxApplication(ResourceManager):
                     (out, err), proc = self.node.kill(self.pid, self.ppid,
                             sudo = self._sudo_kill)
 
+                    """
                     # TODO: check if execution errors occurred
-                    if proc.poll() or err:
+                    if (proc and proc.poll()) or err:
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
-        
+                    """
+
             super(LinuxApplication, self).do_stop()
 
     def do_release(self):
         self.info("Releasing resource")
 
+        self.do_stop()
+        
         tear_down = self.get("tearDown")
         if tear_down:
             self.node.execute(tear_down)
 
-        self.do_stop()
+        hard_release = self.get("hardRelease")
+        if hard_release:
+            self.node.rmdir(self.app_home)
 
         super(LinuxApplication, self).do_release()
         
@@ -681,7 +722,7 @@ class LinuxApplication(ResourceManager):
     def execute_command(self, command, 
             env = None,
             sudo = False,
-            stdin = None,
+            tty = False,
             forward_x11 = False,
             blocking = False):
 
@@ -693,7 +734,7 @@ class LinuxApplication(ResourceManager):
 
         return self.node.execute(command,
                 sudo = sudo,
-                stdin = stdin,
+                tty = tty,
                 forward_x11 = forward_x11,
                 blocking = blocking)