From: Stephen Soltesz Date: Wed, 11 Mar 2009 21:15:14 +0000 (+0000) Subject: merge with changes to 2.0 branch, since it will help with a timely completion. X-Git-Tag: Monitor-1.0-16~6 X-Git-Url: http://git.onelab.eu/?p=monitor.git;a=commitdiff_plain;h=eee5b672bc9c5dd028dca1e102e825c90e9ab9ec Merge ... changes to 2.0 branch, since it will help with a timely completion. --- diff --git a/moncommands.py b/moncommands.py index 50d31e2..25765b6 100644 --- a/moncommands.py +++ b/moncommands.py @@ -1,8 +1,10 @@ import os +import fcntl DEBUG= 0 class ExceptionTimeout(Exception): pass +class ExceptionReadTimeout(Exception): pass COMMAND_TIMEOUT = 60 ssh_options = { 'StrictHostKeyChecking':'no', 'BatchMode':'yes', @@ -13,15 +15,47 @@ import subprocess import signal class Sopen(subprocess.Popen): - def kill(self, signal = signal.SIGTERM): - os.kill(self.pid, signal) + def kill(self, sig = signal.SIGTERM): + try: + # NOTE: this also kills parent... so doesn't work like I want. + # NOTE: adding 'exec' before the cmd removes the extra sh, and + # partially addresses this problem. + #os.killpg(os.getpgid(self.pid), signal.SIGKILL) + os.kill(self.pid, sig) + except OSError: + # no such process, due to it already exiting... + pass + + +def read_t(stream, count=1, timeout=COMMAND_TIMEOUT*2): + if count == 1: + retstr = "" -def read_t(stream, count, timeout=COMMAND_TIMEOUT*2): - lin, lout, lerr = select([stream], [], [], timeout) - if len(lin) == 0: - raise ExceptionTimeout("TIMEOUT Running: %s" % cmd) + while True: + lin, lout, lerr = select([stream], [], [], timeout) + if len(lin) == 0: + print "timeout!" + raise ExceptionReadTimeout("TIMEOUT reading from command") - return stream.read(count) + try: + outbytes = stream.read(count) + except IOError, err: + print 'no content yet.' + # due to no content. + # the select timeout should catch this. + continue + + if not outbytes: + break + retstr += outbytes + + return retstr + else: + lin, lout, lerr = select([stream], [], [], timeout) + if len(lin) == 0: + raise ExceptionReadTimeout("TIMEOUT reading from command") + + return stream.read(count) class CMD: def __init__(self): @@ -29,15 +63,18 @@ class CMD: def run_noexcept(self, cmd, timeout=COMMAND_TIMEOUT*2): - #print "CMD.run_noexcept(%s)" % cmd try: return CMD.run(self,cmd,timeout) except ExceptionTimeout: import traceback; print traceback.print_exc() - return ("", "SCRIPTTIMEOUT") - except: + return ("", "ScriptTimeout") + except ExceptionReadTimeout: + print traceback.print_exc() + return ("", "RunningScriptTimeout") + except Exception, err: from nodecommon import email_exception email_exception() + return ("", str(err)) def system(self, cmd, timeout=COMMAND_TIMEOUT*2): (o,e) = self.run(cmd, timeout) @@ -53,12 +90,10 @@ class CMD: s = Sopen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) self.s = s (f_in, f_out, f_err) = (s.stdin, s.stdout, s.stderr) - #print "calling select(%s)" % timeout lout, lin, lerr = select([f_out], [], [f_err], timeout) - #print "TIMEOUT!!!!!!!!!!!!!!!!!!!" if len(lin) == 0 and len(lout) == 0 and len(lerr) == 0: # Reached a timeout! Nuke process so it does not hang. - #print "KILLING" + print "TIMEOUT!!!!!!!!!!!!!!!!!!!" s.kill(signal.SIGKILL) raise ExceptionTimeout("TIMEOUT Running: %s" % cmd) else: @@ -70,29 +105,29 @@ class CMD: e_value = "" #print "reading from f_out" - if len(lout) > 0: o_value = f_out.read() + #if len(lout) > 0: o_value = f_out.read() #print "reading from f_err" - if len(lerr) > 0: e_value = f_err.read() + #if len(lerr) > 0: e_value = f_err.read() + #o_value = f_out.read() + flags = fcntl.fcntl(f_out, fcntl.F_GETFL) + fcntl.fcntl(f_out, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + try: + o_value = read_t(f_out,1,30) + except ExceptionReadTimeout: + s.kill(signal.SIGKILL) + raise ExceptionReadTimeout("TIMEOUT: failed to read from cmd: %s" % cmd) + + e_value = f_err.read() - #print "striping output" o_value = o_value.strip() e_value = e_value.strip() - #print "OUTPUT", o_value, e_value - - #print "closing files" f_out.close() f_in.close() f_err.close() - try: - #print "s.kill()" - s.kill() - #print "after s.kill()" - except OSError: - # no such process, due to it already exiting... - pass + s.kill(signal.SIGKILL) - #print o_value, e_value return (o_value, e_value) def runargs(self, args, timeout=COMMAND_TIMEOUT*2): @@ -117,11 +152,7 @@ class CMD: f_out.close() f_in.close() f_err.close() - try: - s.kill() - except OSError: - # no such process, due to it already exiting... - pass + s.kill(signal.SIGKILL) return (o_value, e_value) @@ -164,17 +195,9 @@ class SSH(CMD): return CMD.run_noexcept(self, cmd) def run_noexcept2(self, cmd, timeout=COMMAND_TIMEOUT*2): - cmd = "ssh -p %s %s %s@%s %s" % (self.port, self.__options_to_str(), + cmd = "exec ssh -p %s %s %s@%s %s" % (self.port, self.__options_to_str(), self.user, self.host, cmd) - #print "SSH.run_noexcept2(%s)" % cmd r = CMD.run_noexcept(self, cmd, timeout) - - # XXX: this may be resulting in deadlocks... not sure. - #if self.s.returncode is None: - # #self.s.kill() - # self.s.kill(signal.SIGKILL) - # self.s.wait() - # self.ret = self.s.returncode self.ret = -1 return r