Minor typos
[nepi.git] / src / neco / resources / linux / node.py
index 56eb6e2..3f30633 100644 (file)
@@ -13,6 +13,12 @@ import time
 import threading
 
 # TODO: Verify files and dirs exists already
+# TODO: Blacklist nodes!
+# TODO: Unify delays!!
+# TODO: Validate outcome of uploads!! 
+
+reschedule_delay = "0.5s"
+
 
 @clsinit
 class LinuxNode(ResourceManager):
@@ -20,32 +26,35 @@ class LinuxNode(ResourceManager):
 
     @classmethod
     def _register_attributes(cls):
-        hostname = Attribute("hostname", "Hostname of the machine")
+        hostname = Attribute("hostname", "Hostname of the machine",
+                flags = Flags.ExecReadOnly)
 
         username = Attribute("username", "Local account username", 
                 flags = Flags.Credential)
 
-        port = Attribute("port", "SSH port", flags = Flags.Credential)
+        port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
         
-        home = Attribute("home", 
-                "Experiment home directory to store all experiment related files")
+        home = Attribute("home",
+                "Experiment home directory to store all experiment related files",
+                flags = Flags.ExecReadOnly)
         
         identity = Attribute("identity", "SSH identity file",
                 flags = Flags.Credential)
         
         server_key = Attribute("serverKey", "Server public key", 
-                flags = Flags.Credential)
+                flags = Flags.ExecReadOnly)
         
         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
                 " from home folder before starting experiment", 
-                flags = Flags.ReadOnly)
+                flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
-                "Kill all running processes before starting experiment", 
-                flags = Flags.ReadOnly)
+                "Kill all running processes before starting experiment",
+                flags = Flags.ExecReadOnly)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
-                "releasing the resource", flags = Flags.ReadOnly)
+                "releasing the resource",
+                flags = Flags.ExecReadOnly)
 
         cls._register_attribute(hostname)
         cls._register_attribute(username)
@@ -60,19 +69,28 @@ class LinuxNode(ResourceManager):
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
-        self._home = "nepi-exp-%s" % os.urandom(8).encode('hex')
         
         # lock to avoid concurrency issues on methods used by applications 
         self._lock = threading.Lock()
 
-        self._logger = logging.getLogger("neco.linux.Node.%d " % self.guid)
+        self._logger = logging.getLogger("LinuxNode")
+    
+    def log_message(self, msg):
+        return " guid %d - host %s - %s " % (self.guid, 
+                self.get("hostname"), msg)
 
     @property
     def home(self):
-        home = self.get("home")
-        if home and not home.startswith("nepi-"):
-            home = "nepi-" + home
-        return home or self._home
+        return self.get("home") or ""
+
+    @property
+    def exp_home(self):
+        return os.path.join(self.home, self.ec.exp_id)
+
+    @property
+    def node_home(self):
+        node_home = "node-%d" % self.guid
+        return os.path.join(self.exp_home, node_home)
 
     @property
     def os(self):
@@ -80,16 +98,16 @@ class LinuxNode(ResourceManager):
             return self._os
 
         if (not self.get("hostname") or not self.get("username")):
-            msg = "Can't resolve OS for guid %d. Insufficient data." % self.guid
-            self.logger.error(msg)
+            msg = "Can't resolve OS, insufficient data "
+            self.error(msg)
             raise RuntimeError, msg
 
-        (out, err), proc = self.execute("cat /etc/issue")
+        (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
 
         if err and proc.poll():
-            msg = "Error detecting OS for host %s. err: %s " % (self.get("hostname"), err)
-            self.logger.error(msg)
-            raise RuntimeError, msg
+            msg = "Error detecting OS "
+            self.error(msg, out, err)
+            raise RuntimeError, "%s - %s - %s" %( msg, out, err )
 
         if out.find("Fedora release 12") == 0:
             self._os = "f12"
@@ -100,9 +118,9 @@ class LinuxNode(ResourceManager):
         elif out.find("Ubuntu") ==0:
             self._os = "ubuntu"
         else:
-            msg = "Unsupported OS %s for host %s" % (out, self.get("hostname"))
-            self.logger.error(msg)
-            raise RuntimeError, msg
+            msg = "Unsupported OS"
+            self.error(msg, out)
+            raise RuntimeError, "%s - %s " %( msg, out )
 
         return self._os
 
@@ -113,20 +131,37 @@ class LinuxNode(ResourceManager):
     def provision(self, filters = None):
         if not self.is_alive():
             self._state = ResourceState.FAILED
-            self.logger.error("Deploy failed. Unresponsive node")
-            return
-
-    def deploy(self):
-        self.provision()
+            msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
+            self.error(msg)
+            raise RuntimeError, msg
 
         if self.get("cleanProcesses"):
             self.clean_processes()
 
         if self.get("cleanHome"):
-            # self.clean_home() -> this is dangerous
-            pass
+            self.clean_home()
+       
+        self.mkdir(self.node_home)
 
-        self.mkdir(self.home)
+        super(LinuxNode, self).provision()
+
+    def deploy(self):
+        if self.state == ResourceState.NEW:
+            try:
+               self.discover()
+               self.provision()
+            except:
+                self._state = ResourceState.FAILED
+                raise
+
+        # Node needs to wait until all associated interfaces are 
+        # ready before it can finalize deployment
+        from neco.resources.linux.interface import LinuxInterface
+        ifaces = self.get_connected(LinuxInterface.rtype())
+        for iface in ifaces:
+            if iface.state < ResourceState.READY:
+                self.ec.schedule(reschedule_delay, self.deploy)
+                return 
 
         super(LinuxNode, self).deploy()
 
@@ -137,53 +172,57 @@ class LinuxNode(ResourceManager):
 
         super(LinuxNode, self).release()
 
-    def validate_connection(self, guid):
+    def valid_connection(self, guid):
         # TODO: Validate!
         return True
 
-    def clean_processes(self):
-        self.logger.info("Cleaning up processes")
+    def clean_processes(self, killer = False):
+        self.info("Cleaning up processes")
         
-        cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
-            "sudo -S killall python tcpdump || /bin/true ; " +
-            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
-            "sudo -S killall -u root || /bin/true ; " +
-            "sudo -S killall -u root || /bin/true ; ")
+        if killer:
+            # Hardcore kill
+            cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
+                "sudo -S killall python tcpdump || /bin/true ; " +
+                "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
+                "sudo -S killall -u root || /bin/true ; " +
+                "sudo -S killall -u root || /bin/true ; ")
+        else:
+            # Be gentler...
+            cmd = ("sudo -S killall tcpdump || /bin/true ; " +
+                "sudo -S killall tcpdump || /bin/true ; " +
+                "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
+                "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
 
         out = err = ""
-        with self._lock:
-           (out, err), proc = self.run_and_wait(cmd, self.home, 
-                pidfile = "cppid",
-                stdout = "cplog", 
-                stderr = "cperr", 
-                raise_on_error = True)
-
-        return (out, err)   
+        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
             
     def clean_home(self):
-        self.logger.info("Cleaning up home")
-
-        cmd = "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
+        self.info("Cleaning up home")
+        
+        cmd = (
+            # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
+            "find . -maxdepth 1 -name 'nepi-*' " +
+            " -execdir rm -rf {} + "
+            )
+            
+        if self.home:
+            cmd = "cd %s ; " % self.home + cmd
 
         out = err = ""
-        with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, self.home,
-                pidfile = "chpid",
-                stdout = "chlog", 
-                stderr = "cherr", 
-                raise_on_error = True)
-        
-        return (out, err)   
+        (out, err), proc = self.execute(cmd, with_lock = True)
 
-    def upload(self, src, dst):
+    def upload(self, src, dst, text = False):
         """ Copy content to destination
 
-           src  content to copy. Can be a local file, directory or text input
+           src  content to copy. Can be a local file, directory or a list of files
 
            dst  destination path on the remote host (remote is always self.host)
+
+           text set to True when src is text content rather than a path; the text is stored in a temp file before uploading
         """
         # If source is a string input 
-        if not os.path.isfile(src):
+        f = None
+        if text and not os.path.isfile(src):
             # src is text input that should be uploaded as file
             # create a temporal file with the content to upload
             f = tempfile.NamedTemporaryFile(delete=False)
@@ -195,7 +234,13 @@ class LinuxNode(ResourceManager):
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
 
-        return self.copy(src, dst)
+        result = self.copy(src, dst)
+
+        # clean up temp file
+        if f:
+            os.remove(f.name)
+
+        return result
 
     def download(self, src, dst):
         if not self.localhost:
@@ -203,47 +248,47 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages):
+    def install_packages(self, packages, home = None):
+        home = home or self.node_home
+
         cmd = ""
         if self.os in ["f12", "f14"]:
             cmd = rpmfuncs.install_packages_command(self.os, packages)
         elif self.os in ["debian", "ubuntu"]:
             cmd = debfuncs.install_packages_command(self.os, packages)
         else:
-            msg = "Error installing packages. OS not known for host %s " % (
-                    self.get("hostname"))
-            self.logger.error(msg)
+            msg = "Error installing packages ( OS not known ) "
+            self.error(msg, self.os)
             raise RuntimeError, msg
 
         out = err = ""
-        with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, self.home, 
-                pidfile = "instpkgpid",
-                stdout = "instpkglog", 
-                stderr = "instpkgerr", 
-                raise_on_error = True)
+        (out, err), proc = self.run_and_wait(cmd, home, 
+            pidfile = "instpkg_pid",
+            stdout = "instpkg_out", 
+            stderr = "instpkg_err",
+            raise_on_error = True)
 
         return (out, err), proc 
 
-    def remove_packages(self, packages):
+    def remove_packages(self, packages, home = None):
+        home = home or self.node_home
+
         cmd = ""
         if self.os in ["f12", "f14"]:
             cmd = rpmfuncs.remove_packages_command(self.os, packages)
         elif self.os in ["debian", "ubuntu"]:
             cmd = debfuncs.remove_packages_command(self.os, packages)
         else:
-            msg = "Error removing packages. OS not known for host %s " % (
-                    self.get("hostname"))
-            self.logger.error(msg)
+            msg = "Error removing packages ( OS not known ) "
+            self.error(msg)
             raise RuntimeError, msg
 
         out = err = ""
-        with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, self.home, 
-                pidfile = "rmpkgpid",
-                stdout = "rmpkglog", 
-                stderr = "rmpkgerr", 
-                raise_on_error = True)
+        (out, err), proc = self.run_and_wait(cmd, home, 
+            pidfile = "rmpkg_pid",
+            stdout = "rmpkg_out", 
+            stderr = "rmpkg_err",
+            raise_on_error = True)
          
         return (out, err), proc 
 
@@ -251,10 +296,10 @@ class LinuxNode(ResourceManager):
         if clean:
             self.rmdir(path)
 
-        return self.execute("mkdir -p %s" % path)
+        return self.execute("mkdir -p %s" % path, with_lock = True)
 
     def rmdir(self, path):
-        return self.execute("rm -rf %s" % path)
+        return self.execute("rm -rf %s" % path, with_lock = True)
 
     def run_and_wait(self, command, 
             home = ".", 
@@ -263,44 +308,54 @@ class LinuxNode(ResourceManager):
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
+            tty = False,
             raise_on_error = False):
-
+        """ runs a command in background on the remote host, but waits
+            until the command finishes execution.
+            This is more robust than doing a simple synchronized 'execute',
+            since in the remote host the command can continue to run detached
+            even if network disconnections occur
+        """
+        # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
                 stdin = stdin, 
                 stdout = stdout, 
                 stderr = stderr, 
-                sudo = sudo)
+                sudo = sudo,
+                tty = tty)
 
+        # check no errors occurred
         if proc.poll() and err:
-            msg = " Failed to run command %s on host %s" % (
-                    command, self.get("hostname"))
-            self.logger.error(msg)
+            msg = " Failed to run command '%s' " % command
+            self.error(msg, out, err)
             if raise_on_error:
                 raise RuntimeError, msg
-        
+
+        # Wait for pid file to be generated
         pid, ppid = self.wait_pid(
                 home = home, 
                 pidfile = pidfile, 
                 raise_on_error = raise_on_error)
 
+        # wait until the command finishes executing
         self.wait_run(pid, ppid)
-        
-        (out, err), proc = self.check_run_error(home, stderr)
+       
+        # check if execution errors occurred
+        (out, err), proc = self.check_output(home, stderr)
 
         if err or out:
-            msg = "Error while running command %s on host %s. error output: %s" % (
-                    command, self.get("hostname"), out)
-            if err:
-                msg += " . err: %s" % err
+            msg = " Failed to run command '%s' " % command
+            self.error(msg, out, err)
 
-            self.logger.error(msg)
             if raise_on_error:
                 raise RuntimeError, msg
         
         return (out, err), proc
  
     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+        """ Waits until the pid file for the command is generated, 
+            and returns the pid and ppid of the process """
         pid = ppid = None
         delay = 1.0
         for i in xrange(5):
@@ -313,15 +368,17 @@ class LinuxNode(ResourceManager):
                 time.sleep(delay)
                 delay = min(30,delay*1.2)
         else:
-            msg = " Failed to get pid for pidfile %s/%s on host %s" % (
-                    home, pidfile, self.get("hostname"))
-            self.logger.error(msg)
+            msg = " Failed to get pid for pidfile %s/%s " % (
+                    home, pidfile )
+            self.error(msg)
+            
             if raise_on_error:
                 raise RuntimeError, msg
 
         return pid, ppid
 
     def wait_run(self, pid, ppid, trial = 0):
+        """ wait for a remote process to finish execution """
         delay = 1.0
         first = True
         bustspin = 0
@@ -344,14 +401,10 @@ class LinuxNode(ResourceManager):
                 delay = min(30,delay*1.2)
                 bustspin = 0
 
-    def check_run_error(self, home, stderr = 'stderr'):
+    def check_output(self, home, filename):
+        """ checks file content """
         (out, err), proc = self.execute("cat %s" % 
-                os.path.join(home, stderr))
-        return (out, err), proc
-
-    def check_run_output(self, home, stdout = 'stdout'):
-        (out, err), proc = self.execute("cat %s" % 
-                os.path.join(home, stdout))
+            os.path.join(home, filename), retry = 1, with_lock = True)
         return (out, err), proc
 
     def is_alive(self):
@@ -360,36 +413,37 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         try:
-            (out, err), proc = self.execute("echo 'ALIVE'")
+            # TODO: FIX NOT ALIVE!!!!
+            (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
+                    with_lock = True)
         except:
             import traceback
             trace = traceback.format_exc()
-            self.logger.warn("Unresponsive host %s. got:\n out: %s err: %s\n traceback: %s", 
-                    self.get("hostname"), out, err, trace)
+            msg = "Unresponsive host  %s " % err
+            self.error(msg, out, trace)
             return False
 
         if out.strip().startswith('ALIVE'):
             return True
         else:
-            self.logger.warn("Unresponsive host %s. got:\n%s%s", 
-                    self.get("hostname"), out, err)
+            msg = "Unresponsive host "
+            self.error(msg, out, err)
             return False
 
-            # TODO!
-            #if self.check_bad_host(out,err):
-            #    self.blacklist()
-
     def copy(self, src, dst):
         if self.localhost:
             (out, err), proc =  execfuncs.lcopy(source, dest, 
-                    recursive = True)
+                    recursive = True,
+                    strict_host_checking = False)
         else:
-            (out, err), proc = self.safe_retry(sshfuncs.rcopy)(
-                src, dst, 
-                port = self.get("port"),
-                identity = self.get("identity"),
-                server_key = self.get("serverKey"),
-                recursive = True)
+            with self._lock:
+                (out, err), proc = sshfuncs.rcopy(
+                    src, dst, 
+                    port = self.get("port"),
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    recursive = True,
+                    strict_host_checking = False)
 
         return (out, err), proc
 
@@ -400,10 +454,12 @@ class LinuxNode(ResourceManager):
             tty = False,
             forward_x11 = False,
             timeout = None,
-            retry = 0,
+            retry = 3,
             err_on_timeout = True,
             connect_timeout = 30,
-            persistent = True
+            strict_host_checking = False,
+            persistent = True,
+            with_lock = False
             ):
         """ Notice that this invocation will block until the
         execution finishes. If this is not the desired behavior,
@@ -416,7 +472,30 @@ class LinuxNode(ResourceManager):
                     stdin = stdin,
                     env = env)
         else:
-            (out, err), proc = self.safe_retry(sshfuncs.rexec)(
+            if with_lock:
+                with self._lock:
+                    (out, err), proc = sshfuncs.rexec(
+                        command, 
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        stdin = stdin,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey"),
+                        env = env,
+                        tty = tty,
+                        forward_x11 = forward_x11,
+                        timeout = timeout,
+                        retry = retry,
+                        err_on_timeout = err_on_timeout,
+                        connect_timeout = connect_timeout,
+                        persistent = persistent,
+                        strict_host_checking = strict_host_checking
+                        )
+            else:
+                (out, err), proc = sshfuncs.rexec(
                     command, 
                     host = self.get("hostname"),
                     user = self.get("username"),
@@ -440,14 +519,15 @@ class LinuxNode(ResourceManager):
 
     def run(self, command, 
             home = None,
-            create_home = True,
+            create_home = False,
             pidfile = "pid",
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
-            sudo = False):
+            sudo = False,
+            tty = False):
 
-        self.logger.info("Running %s", command)
+        self.debug("Running command '%s'" % command)
         
         if self.localhost:
             (out, err), proc = execfuncs.lspawn(command, pidfile, 
@@ -461,22 +541,24 @@ class LinuxNode(ResourceManager):
         else:
             # Start process in a "daemonized" way, using nohup and heavy
             # stdin/out redirection to avoid connection issues
-            (out,err), proc = self.safe_retry(sshfuncs.rspawn)(
-                command,
-                pidfile = pidfile,
-                home = home,
-                create_home = create_home,
-                stdin = stdin if stdin is not None else '/dev/null',
-                stdout = stdout if stdout else '/dev/null',
-                stderr = stderr if stderr else '/dev/null',
-                sudo = sudo,
-                host = self.get("hostname"),
-                user = self.get("username"),
-                port = self.get("port"),
-                agent = True,
-                identity = self.get("identity"),
-                server_key = self.get("serverKey")
-                )
+            with self._lock:
+                (out,err), proc = sshfuncs.rspawn(
+                    command,
+                    pidfile = pidfile,
+                    home = home,
+                    create_home = create_home,
+                    stdin = stdin if stdin is not None else '/dev/null',
+                    stdout = stdout if stdout else '/dev/null',
+                    stderr = stderr if stderr else '/dev/null',
+                    sudo = sudo,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    tty = tty
+                    )
 
         return (out, err), proc
 
@@ -484,24 +566,9 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
         else:
-            pidtuple = sshfuncs.rcheckpid(
-                os.path.join(home, pidfile),
-                host = self.get("hostname"),
-                user = self.get("username"),
-                port = self.get("port"),
-                agent = True,
-                identity = self.get("identity"),
-                server_key = self.get("serverKey")
-                )
-        
-        return pidtuple
-    
-    def status(self, pid, ppid):
-        if self.localhost:
-            status = execfuncs.lstatus(pid, ppid)
-        else:
-            status = sshfuncs.rstatus(
-                    pid, ppid,
+            with self._lock:
+                pidtuple = sshfuncs.rcheckpid(
+                    os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
@@ -509,6 +576,23 @@ class LinuxNode(ResourceManager):
                     identity = self.get("identity"),
                     server_key = self.get("serverKey")
                     )
+        
+        return pidtuple
+    
+    def status(self, pid, ppid):
+        if self.localhost:
+            status = execfuncs.lstatus(pid, ppid)
+        else:
+            with self._lock:
+                status = sshfuncs.rstatus(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
            
         return status
     
@@ -521,16 +605,17 @@ class LinuxNode(ResourceManager):
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
-                (out, err), proc = self.safe_retry(sshfuncs.rkill)(
-                    pid, ppid,
-                    host = self.get("hostname"),
-                    user = self.get("username"),
-                    port = self.get("port"),
-                    agent = True,
-                    sudo = sudo,
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey")
-                    )
+                with self._lock:
+                    (out, err), proc = sshfuncs.rkill(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
         return (out, err), proc
 
     def check_bad_host(self, out, err):
@@ -542,41 +627,7 @@ class LinuxNode(ResourceManager):
 
     def blacklist(self):
         # TODO!!!!
-        self.logger.warn("Blacklisting malfunctioning node %s", self.hostname)
+        self.warn(" Blacklisting malfunctioning node ")
         #import util
         #util.appendBlacklist(self.hostname)
 
-    def safe_retry(self, func):
-        """Retries a function invocation using a lock"""
-        import functools
-        @functools.wraps(func)
-        def rv(*p, **kw):
-            fail_msg = " Failed to execute function %s(%s, %s) at host %s" % (
-                func.__name__, p, kw, self.get("hostname"))
-            retry = kw.pop("_retry", False)
-            wlock = kw.pop("_with_lock", False)
-
-            out = err = ""
-            proc = None
-            for i in xrange(0 if retry else 4):
-                try:
-                    if wlock:
-                        with self._lock:
-                            (out, err), proc = func(*p, **kw)
-                    else:
-                        (out, err), proc = func(*p, **kw)
-                        
-                    if proc.poll():
-                        if retry:
-                            time.sleep(i*15)
-                            continue
-                        else:
-                            self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
-                    break
-                except RuntimeError, e:
-                    if x >= 3:
-                        self.logger.error("%s. error: %s", fail_msg, e.args)
-            return (out, err), proc
-
-        return rv
-