renaming to src/neco to src/nepi
[nepi.git] / src / nepi / util / sshfuncs.py
1 import base64
2 import errno
3 import hashlib
4 import logging
5 import os
6 import os.path
7 import re
8 import select
9 import signal
10 import socket
11 import subprocess
12 import threading
13 import time
14 import tempfile
15
16 logger = logging.getLogger("sshfuncs")
17
18 def log(msg, level, out = None, err = None):
19     if out:
20         msg += " - OUT: %s " % out
21
22     if err:
23         msg += " - ERROR: %s " % err
24
25     logger.log(level, msg)
26
27
28 if hasattr(os, "devnull"):
29     DEV_NULL = os.devnull
30 else:
31     DEV_NULL = "/dev/null"
32
33 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
34
35 class STDOUT: 
36     """
37     Special value that when given to rspawn in stderr causes stderr to 
38     redirect to whatever stdout was redirected to.
39     """
40
41 class RUNNING:
42     """
43     Process is still running
44     """
45
46 class FINISHED:
47     """
48     Process is finished
49     """
50
51 class NOT_STARTED:
52     """
53     Process hasn't started running yet (this should be very rare)
54     """
55
56 hostbyname_cache = dict()
57 hostbyname_cache_lock = threading.Lock()
58
59 def gethostbyname(host):
60     global hostbyname_cache
61     global hostbyname_cache_lock
62     
63     hostbyname = hostbyname_cache.get(host)
64     if not hostbyname:
65         with hostbyname_cache_lock:
66             hostbyname = socket.gethostbyname(host)
67             hostbyname_cache[host] = hostbyname
68
69             msg = " Added hostbyname %s - %s " % (host, hostbyname)
70             log(msg, logging.DEBUG)
71
72     return hostbyname
73
74 OPENSSH_HAS_PERSIST = None
75
76 def openssh_has_persist():
77     """ The ssh_config options ControlMaster and ControlPersist allow to
78     reuse a same network connection for multiple ssh sessions. In this 
79     way limitations on number of open ssh connections can be bypassed.
80     However, older versions of openSSH do not support this feature.
81     This function is used to determine if ssh connection persist features
82     can be used.
83     """
84     global OPENSSH_HAS_PERSIST
85     if OPENSSH_HAS_PERSIST is None:
86         proc = subprocess.Popen(["ssh","-v"],
87             stdout = subprocess.PIPE,
88             stderr = subprocess.STDOUT,
89             stdin = open("/dev/null","r") )
90         out,err = proc.communicate()
91         proc.wait()
92         
93         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
94         OPENSSH_HAS_PERSIST = bool(vre.match(out))
95     return OPENSSH_HAS_PERSIST
96
97 def make_server_key_args(server_key, host, port):
98     """ Returns a reference to a temporary known_hosts file, to which 
99     the server key has been added. 
100     
101     Make sure to hold onto the temp file reference until the process is 
102     done with it
103
104     :param server_key: the server public key
105     :type server_key: str
106
107     :param host: the hostname
108     :type host: str
109
110     :param port: the ssh port
111     :type port: str
112
113     """
114     if port is not None:
115         host = '%s:%s' % (host, str(port))
116
117     # Create a temporary server key file
118     tmp_known_hosts = tempfile.NamedTemporaryFile()
119    
120     hostbyname = gethostbyname(host) 
121
122     # Add the intended host key
123     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
124     
125     # If we're not in strict mode, add user-configured keys
126     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
127         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
128         if os.access(user_hosts_path, os.R_OK):
129             f = open(user_hosts_path, "r")
130             tmp_known_hosts.write(f.read())
131             f.close()
132         
133     tmp_known_hosts.flush()
134     
135     return tmp_known_hosts
136
137 def make_control_path(agent, forward_x11):
138     ctrl_path = "/tmp/nepi_ssh"
139
140     if agent:
141         ctrl_path +="_a"
142
143     if forward_x11:
144         ctrl_path +="_x"
145
146     ctrl_path += "-%r@%h:%p"
147
148     return ctrl_path
149
150 def shell_escape(s):
151     """ Escapes strings so that they are safe to use as command-line 
152     arguments """
153     if SHELL_SAFE.match(s):
154         # safe string - no escaping needed
155         return s
156     else:
157         # unsafe string - escape
158         def escp(c):
159             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
160                 return c
161             else:
162                 return "'$'\\x%02x''" % (ord(c),)
163         s = ''.join(map(escp,s))
164         return "'%s'" % (s,)
165
166 def eintr_retry(func):
167     """Retries a function invocation when a EINTR occurs"""
168     import functools
169     @functools.wraps(func)
170     def rv(*p, **kw):
171         retry = kw.pop("_retry", False)
172         for i in xrange(0 if retry else 4):
173             try:
174                 return func(*p, **kw)
175             except (select.error, socket.error), args:
176                 if args[0] == errno.EINTR:
177                     continue
178                 else:
179                     raise 
180             except OSError, e:
181                 if e.errno == errno.EINTR:
182                     continue
183                 else:
184                     raise
185         else:
186             return func(*p, **kw)
187     return rv
188
189 def rexec(command, host, user, 
190         port = None, 
191         agent = True,
192         sudo = False,
193         stdin = None,
194         identity = None,
195         server_key = None,
196         env = None,
197         tty = False,
198         timeout = None,
199         retry = 3,
200         err_on_timeout = True,
201         connect_timeout = 30,
202         persistent = True,
203         forward_x11 = False,
204         strict_host_checking = True):
205     """
206     Executes a remote command, returns ((stdout,stderr),process)
207     """
208     
209     tmp_known_hosts = None
210     hostip = gethostbyname(host)
211
212     args = ['ssh', '-C',
213             # Don't bother with localhost. Makes test easier
214             '-o', 'NoHostAuthenticationForLocalhost=yes',
215             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
216             '-o', 'ConnectionAttempts=3',
217             '-o', 'ServerAliveInterval=30',
218             '-o', 'TCPKeepAlive=yes',
219             '-l', user, hostip or host]
220
221     if persistent and openssh_has_persist():
222         args.extend([
223             '-o', 'ControlMaster=auto',
224             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
225             '-o', 'ControlPersist=60' ])
226
227     if not strict_host_checking:
228         # Do not check for Host key. Unsafe.
229         args.extend(['-o', 'StrictHostKeyChecking=no'])
230
231     if agent:
232         args.append('-A')
233
234     if port:
235         args.append('-p%d' % port)
236
237     if identity:
238         args.extend(('-i', identity))
239
240     if tty:
241         args.append('-t')
242         args.append('-t')
243
244     if forward_x11:
245         args.append('-X')
246
247     if server_key:
248         # Create a temporary server key file
249         tmp_known_hosts = make_server_key_args(server_key, host, port)
250         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
251
252     args.append(command)
253
254     for x in xrange(retry):
255         # connects to the remote host and starts a remote connection
256         proc = subprocess.Popen(args,
257                 env = env,
258                 stdout = subprocess.PIPE,
259                 stdin = subprocess.PIPE, 
260                 stderr = subprocess.PIPE)
261         
262         # attach tempfile object to the process, to make sure the file stays
263         # alive until the process is finished with it
264         proc._known_hosts = tmp_known_hosts
265     
266         try:
267             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
268             msg = " rexec - host %s - command %s " % (host, " ".join(args))
269             log(msg, logging.DEBUG, out, err)
270
271             if proc.poll():
272                 skip = False
273
274                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
275                     # SSH error, can safely retry
276                     skip = True 
277                 elif retry:
278                     # Probably timed out or plain failed but can retry
279                     skip = True 
280                 
281                 if skip:
282                     t = x*2
283                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
284                             t, x, host, " ".join(args))
285                     log(msg, logging.DEBUG)
286
287                     time.sleep(t)
288                     continue
289             break
290         except RuntimeError, e:
291             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
292             log(msg, logging.DEBUG, out, err)
293
294             if retry <= 0:
295                 raise
296             retry -= 1
297         
298     return ((out, err), proc)
299
300 def rcopy(source, dest,
301         port = None, 
302         agent = True, 
303         recursive = False,
304         identity = None,
305         server_key = None,
306         retry = 3,
307         strict_host_checking = True):
308     """
309     Copies from/to remote sites.
310     
311     Source and destination should have the user and host encoded
312     as per scp specs.
313     
314     If source is a file object, a special mode will be used to
315     create the remote file with the same contents.
316     
317     If dest is a file object, the remote file (source) will be
318     read and written into dest.
319     
320     In these modes, recursive cannot be True.
321     
322     Source can be a list of files to copy to a single destination,
323     in which case it is advised that the destination be a folder.
324     """
325     
326     if isinstance(source, file) and source.tell() == 0:
327         source = source.name
328     elif hasattr(source, 'read'):
329         tmp = tempfile.NamedTemporaryFile()
330         while True:
331             buf = source.read(65536)
332             if buf:
333                 tmp.write(buf)
334             else:
335                 break
336         tmp.seek(0)
337         source = tmp.name
338     
339     if isinstance(source, file) or isinstance(dest, file) \
340             or hasattr(source, 'read')  or hasattr(dest, 'write'):
341         assert not recursive
342         
343         # Parse source/destination as <user>@<server>:<path>
344         if isinstance(dest, basestring) and ':' in dest:
345             remspec, path = dest.split(':',1)
346         elif isinstance(source, basestring) and ':' in source:
347             remspec, path = source.split(':',1)
348         else:
349             raise ValueError, "Both endpoints cannot be local"
350         user,host = remspec.rsplit('@',1)
351         
352         tmp_known_hosts = None
353         hostip = gethostbyname(host)
354         
355         args = ['ssh', '-l', user, '-C',
356                 # Don't bother with localhost. Makes test easier
357                 '-o', 'NoHostAuthenticationForLocalhost=yes',
358                 '-o', 'ConnectTimeout=60',
359                 '-o', 'ConnectionAttempts=3',
360                 '-o', 'ServerAliveInterval=30',
361                 '-o', 'TCPKeepAlive=yes',
362                 hostip or host ]
363
364         if openssh_has_persist():
365             args.extend([
366                 '-o', 'ControlMaster=auto',
367                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
368                 '-o', 'ControlPersist=60' ])
369
370         if port:
371             args.append('-P%d' % port)
372
373         if identity:
374             args.extend(('-i', identity))
375
376         if server_key:
377             # Create a temporary server key file
378             tmp_known_hosts = make_server_key_args(server_key, host, port)
379             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
380         
381         if isinstance(source, file) or hasattr(source, 'read'):
382             args.append('cat > %s' % (shell_escape(path),))
383         elif isinstance(dest, file) or hasattr(dest, 'write'):
384             args.append('cat %s' % (shell_escape(path),))
385         else:
386             raise AssertionError, "Unreachable code reached! :-Q"
387         
388         # connects to the remote host and starts a remote connection
389         if isinstance(source, file):
390             proc = subprocess.Popen(args, 
391                     stdout = open('/dev/null','w'),
392                     stderr = subprocess.PIPE,
393                     stdin = source)
394             err = proc.stderr.read()
395             proc._known_hosts = tmp_known_hosts
396             eintr_retry(proc.wait)()
397             return ((None,err), proc)
398         elif isinstance(dest, file):
399             proc = subprocess.Popen(args, 
400                     stdout = open('/dev/null','w'),
401                     stderr = subprocess.PIPE,
402                     stdin = source)
403             err = proc.stderr.read()
404             proc._known_hosts = tmp_known_hosts
405             eintr_retry(proc.wait)()
406             return ((None,err), proc)
407         elif hasattr(source, 'read'):
408             # file-like (but not file) source
409             proc = subprocess.Popen(args, 
410                     stdout = open('/dev/null','w'),
411                     stderr = subprocess.PIPE,
412                     stdin = subprocess.PIPE)
413             
414             buf = None
415             err = []
416             while True:
417                 if not buf:
418                     buf = source.read(4096)
419                 if not buf:
420                     #EOF
421                     break
422                 
423                 rdrdy, wrdy, broken = select.select(
424                     [proc.stderr],
425                     [proc.stdin],
426                     [proc.stderr,proc.stdin])
427                 
428                 if proc.stderr in rdrdy:
429                     # use os.read for fully unbuffered behavior
430                     err.append(os.read(proc.stderr.fileno(), 4096))
431                 
432                 if proc.stdin in wrdy:
433                     proc.stdin.write(buf)
434                     buf = None
435                 
436                 if broken:
437                     break
438             proc.stdin.close()
439             err.append(proc.stderr.read())
440                 
441             proc._known_hosts = tmp_known_hosts
442             eintr_retry(proc.wait)()
443             return ((None,''.join(err)), proc)
444         elif hasattr(dest, 'write'):
445             # file-like (but not file) dest
446             proc = subprocess.Popen(args, 
447                     stdout = subprocess.PIPE,
448                     stderr = subprocess.PIPE,
449                     stdin = open('/dev/null','w'))
450             
451             buf = None
452             err = []
453             while True:
454                 rdrdy, wrdy, broken = select.select(
455                     [proc.stderr, proc.stdout],
456                     [],
457                     [proc.stderr, proc.stdout])
458                 
459                 if proc.stderr in rdrdy:
460                     # use os.read for fully unbuffered behavior
461                     err.append(os.read(proc.stderr.fileno(), 4096))
462                 
463                 if proc.stdout in rdrdy:
464                     # use os.read for fully unbuffered behavior
465                     buf = os.read(proc.stdout.fileno(), 4096)
466                     dest.write(buf)
467                     
468                     if not buf:
469                         #EOF
470                         break
471                 
472                 if broken:
473                     break
474             err.append(proc.stderr.read())
475                 
476             proc._known_hosts = tmp_known_hosts
477             eintr_retry(proc.wait)()
478             return ((None,''.join(err)), proc)
479         else:
480             raise AssertionError, "Unreachable code reached! :-Q"
481     else:
482         # Parse destination as <user>@<server>:<path>
483         if isinstance(dest, basestring) and ':' in dest:
484             remspec, path = dest.split(':',1)
485         elif isinstance(source, basestring) and ':' in source:
486             remspec, path = source.split(':',1)
487         else:
488             raise ValueError, "Both endpoints cannot be local"
489         user,host = remspec.rsplit('@',1)
490         
491         # plain scp
492         tmp_known_hosts = None
493
494         args = ['scp', '-q', '-p', '-C',
495                 # Don't bother with localhost. Makes test easier
496                 '-o', 'NoHostAuthenticationForLocalhost=yes',
497                 '-o', 'ConnectTimeout=60',
498                 '-o', 'ConnectionAttempts=3',
499                 '-o', 'ServerAliveInterval=30',
500                 '-o', 'TCPKeepAlive=yes' ]
501                 
502         if port:
503             args.append('-P%d' % port)
504
505         if recursive:
506             args.append('-r')
507
508         if identity:
509             args.extend(('-i', identity))
510
511         if server_key:
512             # Create a temporary server key file
513             tmp_known_hosts = make_server_key_args(server_key, host, port)
514             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
515
516         if not strict_host_checking:
517             # Do not check for Host key. Unsafe.
518             args.extend(['-o', 'StrictHostKeyChecking=no'])
519
520         if isinstance(source,list):
521             args.extend(source)
522         else:
523             if openssh_has_persist():
524                 args.extend([
525                     '-o', 'ControlMaster=auto',
526                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
527                     ])
528             args.append(source)
529
530         args.append(dest)
531
532         for x in xrange(retry):
533             # connects to the remote host and starts a remote connection
534             proc = subprocess.Popen(args,
535                     stdout = subprocess.PIPE,
536                     stdin = subprocess.PIPE, 
537                     stderr = subprocess.PIPE)
538             
539             # attach tempfile object to the process, to make sure the file stays
540             # alive until the process is finished with it
541             proc._known_hosts = tmp_known_hosts
542         
543             try:
544                 (out, err) = proc.communicate()
545                 eintr_retry(proc.wait)()
546                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
547                 log(msg, logging.DEBUG, out, err)
548
549                 if proc.poll():
550                     t = x*2
551                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
552                             t, x, host, " ".join(args))
553                     log(msg, logging.DEBUG)
554
555                     time.sleep(t)
556                     continue
557
558                 break
559             except RuntimeError, e:
560                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
561                 log(msg, logging.DEBUG, out, err)
562
563                 if retry <= 0:
564                     raise
565                 retry -= 1
566             
567         return ((out, err), proc)
568
569 def rspawn(command, pidfile, 
570         stdout = '/dev/null', 
571         stderr = STDOUT, 
572         stdin = '/dev/null', 
573         home = None, 
574         create_home = False, 
575         sudo = False,
576         host = None, 
577         port = None, 
578         user = None, 
579         agent = None, 
580         identity = None, 
581         server_key = None,
582         tty = False):
583     """
584     Spawn a remote command such that it will continue working asynchronously.
585     
586     Parameters:
587         command: the command to run - it should be a single line.
588         
589         pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
590         
591         stdout: path of a file to redirect standard output to - must be a string.
592             Defaults to /dev/null
593         stderr: path of a file to redirect standard error to - string or the special STDOUT value
594             to redirect to the same file stdout was redirected to. Defaults to STDOUT.
595         stdin: path of a file with input to be piped into the command's standard input
596         
597         home: path of a folder to use as working directory - should exist, unless you specify create_home
598         
599         create_home: if True, the home folder will be created first with mkdir -p
600         
601         sudo: whether the command needs to be executed as root
602         
603         host/port/user/agent/identity: see rexec
604     
605     Returns:
606         (stdout, stderr), process
607         
608         Of the spawning process, which only captures errors at spawning time.
609         Usually only useful for diagnostics.
610     """
611     # Start process in a "daemonized" way, using nohup and heavy
612     # stdin/out redirection to avoid connection issues
613     if stderr is STDOUT:
614         stderr = '&1'
615     else:
616         stderr = ' ' + stderr
617     
618     daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
619         'command' : command,
620         'pidfile' : shell_escape(pidfile),
621         'stdout' : stdout,
622         'stderr' : stderr,
623         'stdin' : stdin,
624     }
625     
626     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
627             'command' : shell_escape(daemon_command),
628             'sudo' : 'sudo -S' if sudo else '',
629             'pidfile' : shell_escape(pidfile),
630             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
631             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
632         }
633
634     (out,err),proc = rexec(
635         cmd,
636         host = host,
637         port = port,
638         user = user,
639         agent = agent,
640         identity = identity,
641         server_key = server_key,
642         tty = tty ,
643         )
644     
645     if proc.wait():
646         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
647
648     return ((out, err), proc)
649
650 @eintr_retry
651 def rcheckpid(pidfile,
652         host = None, 
653         port = None, 
654         user = None, 
655         agent = None, 
656         identity = None,
657         server_key = None):
658     """
659     Check the pidfile of a process spawned with remote_spawn.
660     
661     Parameters:
662         pidfile: the pidfile passed to remote_span
663         
664         host/port/user/agent/identity: see rexec
665     
666     Returns:
667         
668         A (pid, ppid) tuple useful for calling remote_status and remote_kill,
669         or None if the pidfile isn't valid yet (maybe the process is still starting).
670     """
671
672     (out,err),proc = rexec(
673         "cat %(pidfile)s" % {
674             'pidfile' : pidfile,
675         },
676         host = host,
677         port = port,
678         user = user,
679         agent = agent,
680         identity = identity,
681         server_key = server_key
682         )
683         
684     if proc.wait():
685         return None
686     
687     if out:
688         try:
689             return map(int,out.strip().split(' ',1))
690         except:
691             # Ignore, many ways to fail that don't matter that much
692             return None
693
694 @eintr_retry
695 def rstatus(pid, ppid, 
696         host = None, 
697         port = None, 
698         user = None, 
699         agent = None, 
700         identity = None,
701         server_key = None):
702     """
703     Check the status of a process spawned with remote_spawn.
704     
705     Parameters:
706         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
707         
708         host/port/user/agent/identity: see rexec
709     
710     Returns:
711         
712         One of NOT_STARTED, RUNNING, FINISHED
713     """
714
715     (out,err),proc = rexec(
716         # Check only by pid. pid+ppid does not always work (especially with sudo) 
717         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
718             'ppid' : ppid,
719             'pid' : pid,
720         },
721         host = host,
722         port = port,
723         user = user,
724         agent = agent,
725         identity = identity,
726         server_key = server_key
727         )
728     
729     if proc.wait():
730         return NOT_STARTED
731     
732     status = False
733     if err:
734         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
735             status = True
736     elif out:
737         status = (out.strip() == 'wait')
738     else:
739         return NOT_STARTED
740     return RUNNING if status else FINISHED
741
742 @eintr_retry
743 def rkill(pid, ppid,
744         host = None, 
745         port = None, 
746         user = None, 
747         agent = None, 
748         sudo = False,
749         identity = None, 
750         server_key = None, 
751         nowait = False):
752     """
753     Kill a process spawned with remote_spawn.
754     
755     First tries a SIGTERM, and if the process does not end in 10 seconds,
756     it sends a SIGKILL.
757     
758     Parameters:
759         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
760         
761         sudo: whether the command was run with sudo - careful killing like this.
762         
763         host/port/user/agent/identity: see rexec
764     
765     Returns:
766         
767         Nothing, should have killed the process
768     """
769     
770     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
771     cmd = """
772 SUBKILL="%(subkill)s" ;
773 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
774 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
775 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
776     sleep 0.2 
777     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
778         break
779     else
780         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
781         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
782     fi
783     sleep 1.8
784 done
785 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
786     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
787     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
788 fi
789 """
790     if nowait:
791         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
792
793     (out,err),proc = rexec(
794         cmd % {
795             'ppid' : ppid,
796             'pid' : pid,
797             'sudo' : 'sudo -S' if sudo else '',
798             'subkill' : subkill,
799         },
800         host = host,
801         port = port,
802         user = user,
803         agent = agent,
804         identity = identity,
805         server_key = server_key
806         )
807     
808     # wait, don't leave zombies around
809     proc.wait()
810
811     return (out, err), proc
812
813 # POSIX
814 def _communicate(self, input, timeout=None, err_on_timeout=True):
815     read_set = []
816     write_set = []
817     stdout = None # Return
818     stderr = None # Return
819     
820     killed = False
821     
822     if timeout is not None:
823         timelimit = time.time() + timeout
824         killtime = timelimit + 4
825         bailtime = timelimit + 4
826
827     if self.stdin:
828         # Flush stdio buffer.  This might block, if the user has
829         # been writing to .stdin in an uncontrolled fashion.
830         self.stdin.flush()
831         if input:
832             write_set.append(self.stdin)
833         else:
834             self.stdin.close()
835     if self.stdout:
836         read_set.append(self.stdout)
837         stdout = []
838     if self.stderr:
839         read_set.append(self.stderr)
840         stderr = []
841
842     input_offset = 0
843     while read_set or write_set:
844         if timeout is not None:
845             curtime = time.time()
846             if timeout is None or curtime > timelimit:
847                 if curtime > bailtime:
848                     break
849                 elif curtime > killtime:
850                     signum = signal.SIGKILL
851                 else:
852                     signum = signal.SIGTERM
853                 # Lets kill it
854                 os.kill(self.pid, signum)
855                 select_timeout = 0.5
856             else:
857                 select_timeout = timelimit - curtime + 0.1
858         else:
859             select_timeout = 1.0
860         
861         if select_timeout > 1.0:
862             select_timeout = 1.0
863             
864         try:
865             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
866         except select.error,e:
867             if e[0] != 4:
868                 raise
869             else:
870                 continue
871         
872         if not rlist and not wlist and not xlist and self.poll() is not None:
873             # timeout and process exited, say bye
874             break
875
876         if self.stdin in wlist:
877             # When select has indicated that the file is writable,
878             # we can write up to PIPE_BUF bytes without risk
879             # blocking.  POSIX defines PIPE_BUF >= 512
880             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
881             input_offset += bytes_written
882             if input_offset >= len(input):
883                 self.stdin.close()
884                 write_set.remove(self.stdin)
885
886         if self.stdout in rlist:
887             data = os.read(self.stdout.fileno(), 1024)
888             if data == "":
889                 self.stdout.close()
890                 read_set.remove(self.stdout)
891             stdout.append(data)
892
893         if self.stderr in rlist:
894             data = os.read(self.stderr.fileno(), 1024)
895             if data == "":
896                 self.stderr.close()
897                 read_set.remove(self.stderr)
898             stderr.append(data)
899     
900     # All data exchanged.  Translate lists into strings.
901     if stdout is not None:
902         stdout = ''.join(stdout)
903     if stderr is not None:
904         stderr = ''.join(stderr)
905
906     # Translate newlines, if requested.  We cannot let the file
907     # object do the translation: It is based on stdio, which is
908     # impossible to combine with select (unless forcing no
909     # buffering).
910     if self.universal_newlines and hasattr(file, 'newlines'):
911         if stdout:
912             stdout = self._translate_newlines(stdout)
913         if stderr:
914             stderr = self._translate_newlines(stderr)
915
916     if killed and err_on_timeout:
917         errcode = self.poll()
918         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
919     else:
920         if killed:
921             self.poll()
922         else:
923             self.wait()
924         return (stdout, stderr)
925