merge with changes to 2.0 branch, since it will help with a timely completion.
authorStephen Soltesz <soltesz@cs.princeton.edu>
Wed, 11 Mar 2009 21:15:14 +0000 (21:15 +0000)
committerStephen Soltesz <soltesz@cs.princeton.edu>
Wed, 11 Mar 2009 21:15:14 +0000 (21:15 +0000)
moncommands.py

index 50d31e2..25765b6 100644 (file)
@@ -1,8 +1,10 @@
 import os
+import fcntl
 
 DEBUG= 0
 
 class ExceptionTimeout(Exception): pass
+class ExceptionReadTimeout(Exception): pass
 COMMAND_TIMEOUT = 60
 ssh_options = { 'StrictHostKeyChecking':'no', 
                                'BatchMode':'yes', 
@@ -13,15 +15,47 @@ import subprocess
 import signal
 
 class Sopen(subprocess.Popen):
-       def kill(self, signal = signal.SIGTERM):
-               os.kill(self.pid, signal)
+       def kill(self, sig = signal.SIGTERM):
+               try:
+                       # NOTE: this also kills parent... so doesn't work like I want.
+                       # NOTE: adding 'exec' before the cmd removes the extra sh, and
+                       #               partially addresses this problem.
+                       #os.killpg(os.getpgid(self.pid), signal.SIGKILL)
+                       os.kill(self.pid, sig)
+               except OSError:
+                       # no such process, due to it already exiting...
+                       pass
+
+
+def read_t(stream, count=1, timeout=COMMAND_TIMEOUT*2):
+       if count == 1:
+               retstr = ""
 
-def read_t(stream, count, timeout=COMMAND_TIMEOUT*2):
-       lin, lout, lerr = select([stream], [], [], timeout)
-       if len(lin) == 0:
-               raise ExceptionTimeout("TIMEOUT Running: %s" % cmd)
+               while True:
+                       lin, lout, lerr = select([stream], [], [], timeout)
+                       if len(lin) == 0:
+                               print "timeout!"
+                               raise ExceptionReadTimeout("TIMEOUT reading from command")
 
-       return stream.read(count)
+                       try:
+                               outbytes = stream.read(count)
+                       except IOError, err:
+                               print 'no content yet.'
+                               # due to no content.
+                               # the select timeout should catch this.
+                               continue
+
+                       if not outbytes:
+                               break
+                       retstr += outbytes
+
+               return retstr
+       else:
+               lin, lout, lerr = select([stream], [], [], timeout)
+               if len(lin) == 0:
+                       raise ExceptionReadTimeout("TIMEOUT reading from command")
+
+               return stream.read(count)
 
 class CMD:
        def __init__(self):
@@ -29,15 +63,18 @@ class CMD:
 
        def run_noexcept(self, cmd, timeout=COMMAND_TIMEOUT*2):
 
-               #print "CMD.run_noexcept(%s)" % cmd
                try:
                        return CMD.run(self,cmd,timeout)
                except ExceptionTimeout:
                        import traceback; print traceback.print_exc()
-                       return ("", "SCRIPTTIMEOUT")
-               except:
+                       return ("", "ScriptTimeout")
+               except ExceptionReadTimeout:
+                       print traceback.print_exc()
+                       return ("", "RunningScriptTimeout")
+               except Exception, err:
                        from nodecommon import email_exception
                        email_exception()
+                       return ("", str(err))
                        
        def system(self, cmd, timeout=COMMAND_TIMEOUT*2):
                (o,e) = self.run(cmd, timeout)
@@ -53,12 +90,10 @@ class CMD:
                s = Sopen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
                self.s = s
                (f_in, f_out, f_err) = (s.stdin, s.stdout, s.stderr)
-               #print "calling select(%s)" % timeout
                lout, lin, lerr = select([f_out], [], [f_err], timeout)
-               #print "TIMEOUT!!!!!!!!!!!!!!!!!!!"
                if len(lin) == 0 and len(lout) == 0 and len(lerr) == 0:
                        # Reached a timeout!  Nuke process so it does not hang.
-                       #print "KILLING"
+                       print "TIMEOUT!!!!!!!!!!!!!!!!!!!"
                        s.kill(signal.SIGKILL)
                        raise ExceptionTimeout("TIMEOUT Running: %s" % cmd)
                else:
@@ -70,29 +105,29 @@ class CMD:
                e_value = ""
 
                #print "reading from f_out"
-               if len(lout) > 0: o_value = f_out.read()
+               #if len(lout) > 0: o_value = f_out.read()
                #print "reading from f_err"
-               if len(lerr) > 0: e_value = f_err.read()
+               #if len(lerr) > 0: e_value = f_err.read()
+               #o_value = f_out.read()
+               flags = fcntl.fcntl(f_out, fcntl.F_GETFL)
+               fcntl.fcntl(f_out, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+               try:
+                       o_value = read_t(f_out,1,30)
+               except ExceptionReadTimeout:
+                       s.kill(signal.SIGKILL)
+                       raise ExceptionReadTimeout("TIMEOUT: failed to read from cmd: %s" % cmd)
+                       
+               e_value = f_err.read()
 
-               #print "striping output"
                o_value = o_value.strip()
                e_value = e_value.strip()
 
-               #print "OUTPUT", o_value, e_value
-
-               #print "closing files"
                f_out.close()
                f_in.close()
                f_err.close()
-               try:
-                       #print "s.kill()"
-                       s.kill()
-                       #print "after s.kill()"
-               except OSError:
-                       # no such process, due to it already exiting...
-                       pass
+               s.kill(signal.SIGKILL)
 
-               #print o_value, e_value
                return (o_value, e_value)
 
        def runargs(self, args, timeout=COMMAND_TIMEOUT*2):
@@ -117,11 +152,7 @@ class CMD:
                f_out.close()
                f_in.close()
                f_err.close()
-               try:
-                       s.kill()
-               except OSError:
-                       # no such process, due to it already exiting...
-                       pass
+               s.kill(signal.SIGKILL)
 
                return (o_value, e_value)
 
@@ -164,17 +195,9 @@ class SSH(CMD):
                return CMD.run_noexcept(self, cmd)
 
        def run_noexcept2(self, cmd, timeout=COMMAND_TIMEOUT*2):
-               cmd = "ssh -p %s %s %s@%s %s" % (self.port, self.__options_to_str(), 
+               cmd = "exec ssh -p %s %s %s@%s %s" % (self.port, self.__options_to_str(), 
                                                                        self.user, self.host, cmd)
-               #print "SSH.run_noexcept2(%s)" % cmd
                r = CMD.run_noexcept(self, cmd, timeout)
-
-               # XXX: this may be resulting in deadlocks... not sure.
-               #if self.s.returncode is None:
-               #       #self.s.kill()
-               #       self.s.kill(signal.SIGKILL)
-               #       self.s.wait()
-               #       self.ret = self.s.returncode
                self.ret = -1
 
                return r