Adding authors and correcting licence information
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import base64
21 import errno
22 import hashlib
23 import logging
24 import os
25 import os.path
26 import re
27 import select
28 import signal
29 import socket
30 import subprocess
31 import threading
32 import time
33 import tempfile
34
35 logger = logging.getLogger("sshfuncs")
36
37 def log(msg, level, out = None, err = None):
38     if out:
39         msg += " - OUT: %s " % out
40
41     if err:
42         msg += " - ERROR: %s " % err
43
44     logger.log(level, msg)
45
46
47 if hasattr(os, "devnull"):
48     DEV_NULL = os.devnull
49 else:
50     DEV_NULL = "/dev/null"
51
52 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
53
54 class STDOUT: 
55     """
56     Special value that when given to rspawn in stderr causes stderr to 
57     redirect to whatever stdout was redirected to.
58     """
59
60 class RUNNING:
61     """
62     Process is still running
63     """
64
65 class FINISHED:
66     """
67     Process is finished
68     """
69
70 class NOT_STARTED:
71     """
72     Process hasn't started running yet (this should be very rare)
73     """
74
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
77
78 def gethostbyname(host):
79     global hostbyname_cache
80     global hostbyname_cache_lock
81     
82     hostbyname = hostbyname_cache.get(host)
83     if not hostbyname:
84         with hostbyname_cache_lock:
85             hostbyname = socket.gethostbyname(host)
86             hostbyname_cache[host] = hostbyname
87
88             msg = " Added hostbyname %s - %s " % (host, hostbyname)
89             log(msg, logging.DEBUG)
90
91     return hostbyname
92
93 OPENSSH_HAS_PERSIST = None
94
95 def openssh_has_persist():
96     """ The ssh_config options ControlMaster and ControlPersist allow to
97     reuse a same network connection for multiple ssh sessions. In this 
98     way limitations on number of open ssh connections can be bypassed.
99     However, older versions of openSSH do not support this feature.
100     This function is used to determine if ssh connection persist features
101     can be used.
102     """
103     global OPENSSH_HAS_PERSIST
104     if OPENSSH_HAS_PERSIST is None:
105         proc = subprocess.Popen(["ssh","-v"],
106             stdout = subprocess.PIPE,
107             stderr = subprocess.STDOUT,
108             stdin = open("/dev/null","r") )
109         out,err = proc.communicate()
110         proc.wait()
111         
112         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
113         OPENSSH_HAS_PERSIST = bool(vre.match(out))
114     return OPENSSH_HAS_PERSIST
115
116 def make_server_key_args(server_key, host, port):
117     """ Returns a reference to a temporary known_hosts file, to which 
118     the server key has been added. 
119     
120     Make sure to hold onto the temp file reference until the process is 
121     done with it
122
123     :param server_key: the server public key
124     :type server_key: str
125
126     :param host: the hostname
127     :type host: str
128
129     :param port: the ssh port
130     :type port: str
131
132     """
133     if port is not None:
134         host = '%s:%s' % (host, str(port))
135
136     # Create a temporary server key file
137     tmp_known_hosts = tempfile.NamedTemporaryFile()
138    
139     hostbyname = gethostbyname(host) 
140
141     # Add the intended host key
142     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
143     
144     # If we're not in strict mode, add user-configured keys
145     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
146         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
147         if os.access(user_hosts_path, os.R_OK):
148             f = open(user_hosts_path, "r")
149             tmp_known_hosts.write(f.read())
150             f.close()
151         
152     tmp_known_hosts.flush()
153     
154     return tmp_known_hosts
155
156 def make_control_path(agent, forward_x11):
157     ctrl_path = "/tmp/nepi_ssh"
158
159     if agent:
160         ctrl_path +="_a"
161
162     if forward_x11:
163         ctrl_path +="_x"
164
165     ctrl_path += "-%r@%h:%p"
166
167     return ctrl_path
168
169 def shell_escape(s):
170     """ Escapes strings so that they are safe to use as command-line 
171     arguments """
172     if SHELL_SAFE.match(s):
173         # safe string - no escaping needed
174         return s
175     else:
176         # unsafe string - escape
177         def escp(c):
178             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
179                 return c
180             else:
181                 return "'$'\\x%02x''" % (ord(c),)
182         s = ''.join(map(escp,s))
183         return "'%s'" % (s,)
184
185 def eintr_retry(func):
186     """Retries a function invocation when a EINTR occurs"""
187     import functools
188     @functools.wraps(func)
189     def rv(*p, **kw):
190         retry = kw.pop("_retry", False)
191         for i in xrange(0 if retry else 4):
192             try:
193                 return func(*p, **kw)
194             except (select.error, socket.error), args:
195                 if args[0] == errno.EINTR:
196                     continue
197                 else:
198                     raise 
199             except OSError, e:
200                 if e.errno == errno.EINTR:
201                     continue
202                 else:
203                     raise
204         else:
205             return func(*p, **kw)
206     return rv
207
208 def rexec(command, host, user, 
209         port = None, 
210         agent = True,
211         sudo = False,
212         stdin = None,
213         identity = None,
214         server_key = None,
215         env = None,
216         tty = False,
217         timeout = None,
218         retry = 3,
219         err_on_timeout = True,
220         connect_timeout = 30,
221         persistent = True,
222         forward_x11 = False,
223         strict_host_checking = True):
224     """
225     Executes a remote command, returns ((stdout,stderr),process)
226     """
227     
228     tmp_known_hosts = None
229     hostip = gethostbyname(host)
230
231     args = ['ssh', '-C',
232             # Don't bother with localhost. Makes test easier
233             '-o', 'NoHostAuthenticationForLocalhost=yes',
234             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235             '-o', 'ConnectionAttempts=3',
236             '-o', 'ServerAliveInterval=30',
237             '-o', 'TCPKeepAlive=yes',
238             '-l', user, hostip or host]
239
240     if persistent and openssh_has_persist():
241         args.extend([
242             '-o', 'ControlMaster=auto',
243             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244             '-o', 'ControlPersist=60' ])
245
246     if not strict_host_checking:
247         # Do not check for Host key. Unsafe.
248         args.extend(['-o', 'StrictHostKeyChecking=no'])
249
250     if agent:
251         args.append('-A')
252
253     if port:
254         args.append('-p%d' % port)
255
256     if identity:
257         args.extend(('-i', identity))
258
259     if tty:
260         args.append('-t')
261         args.append('-t')
262
263     if forward_x11:
264         args.append('-X')
265
266     if server_key:
267         # Create a temporary server key file
268         tmp_known_hosts = make_server_key_args(server_key, host, port)
269         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
270
271     args.append(command)
272
273     for x in xrange(retry):
274         # connects to the remote host and starts a remote connection
275         proc = subprocess.Popen(args,
276                 env = env,
277                 stdout = subprocess.PIPE,
278                 stdin = subprocess.PIPE, 
279                 stderr = subprocess.PIPE)
280         
281         # attach tempfile object to the process, to make sure the file stays
282         # alive until the process is finished with it
283         proc._known_hosts = tmp_known_hosts
284     
285         try:
286             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
287             msg = " rexec - host %s - command %s " % (host, " ".join(args))
288             log(msg, logging.DEBUG, out, err)
289
290             if proc.poll():
291                 skip = False
292
293                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
294                     # SSH error, can safely retry
295                     skip = True 
296                 elif retry:
297                     # Probably timed out or plain failed but can retry
298                     skip = True 
299                 
300                 if skip:
301                     t = x*2
302                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
303                             t, x, host, " ".join(args))
304                     log(msg, logging.DEBUG)
305
306                     time.sleep(t)
307                     continue
308             break
309         except RuntimeError, e:
310             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
311             log(msg, logging.DEBUG, out, err)
312
313             if retry <= 0:
314                 raise
315             retry -= 1
316         
317     return ((out, err), proc)
318
319 def rcopy(source, dest,
320         port = None, 
321         agent = True, 
322         recursive = False,
323         identity = None,
324         server_key = None,
325         retry = 3,
326         strict_host_checking = True):
327     """
328     Copies from/to remote sites.
329     
330     Source and destination should have the user and host encoded
331     as per scp specs.
332     
333     If source is a file object, a special mode will be used to
334     create the remote file with the same contents.
335     
336     If dest is a file object, the remote file (source) will be
337     read and written into dest.
338     
339     In these modes, recursive cannot be True.
340     
341     Source can be a list of files to copy to a single destination,
342     in which case it is advised that the destination be a folder.
343     """
344     
345     if isinstance(source, file) and source.tell() == 0:
346         source = source.name
347     elif hasattr(source, 'read'):
348         tmp = tempfile.NamedTemporaryFile()
349         while True:
350             buf = source.read(65536)
351             if buf:
352                 tmp.write(buf)
353             else:
354                 break
355         tmp.seek(0)
356         source = tmp.name
357     
358     if isinstance(source, file) or isinstance(dest, file) \
359             or hasattr(source, 'read')  or hasattr(dest, 'write'):
360         assert not recursive
361         
362         # Parse source/destination as <user>@<server>:<path>
363         if isinstance(dest, basestring) and ':' in dest:
364             remspec, path = dest.split(':',1)
365         elif isinstance(source, basestring) and ':' in source:
366             remspec, path = source.split(':',1)
367         else:
368             raise ValueError, "Both endpoints cannot be local"
369         user,host = remspec.rsplit('@',1)
370         
371         tmp_known_hosts = None
372         hostip = gethostbyname(host)
373         
374         args = ['ssh', '-l', user, '-C',
375                 # Don't bother with localhost. Makes test easier
376                 '-o', 'NoHostAuthenticationForLocalhost=yes',
377                 '-o', 'ConnectTimeout=60',
378                 '-o', 'ConnectionAttempts=3',
379                 '-o', 'ServerAliveInterval=30',
380                 '-o', 'TCPKeepAlive=yes',
381                 hostip or host ]
382
383         if openssh_has_persist():
384             args.extend([
385                 '-o', 'ControlMaster=auto',
386                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
387                 '-o', 'ControlPersist=60' ])
388
389         if port:
390             args.append('-P%d' % port)
391
392         if identity:
393             args.extend(('-i', identity))
394
395         if server_key:
396             # Create a temporary server key file
397             tmp_known_hosts = make_server_key_args(server_key, host, port)
398             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
399         
400         if isinstance(source, file) or hasattr(source, 'read'):
401             args.append('cat > %s' % (shell_escape(path),))
402         elif isinstance(dest, file) or hasattr(dest, 'write'):
403             args.append('cat %s' % (shell_escape(path),))
404         else:
405             raise AssertionError, "Unreachable code reached! :-Q"
406         
407         # connects to the remote host and starts a remote connection
408         if isinstance(source, file):
409             proc = subprocess.Popen(args, 
410                     stdout = open('/dev/null','w'),
411                     stderr = subprocess.PIPE,
412                     stdin = source)
413             err = proc.stderr.read()
414             proc._known_hosts = tmp_known_hosts
415             eintr_retry(proc.wait)()
416             return ((None,err), proc)
417         elif isinstance(dest, file):
418             proc = subprocess.Popen(args, 
419                     stdout = open('/dev/null','w'),
420                     stderr = subprocess.PIPE,
421                     stdin = source)
422             err = proc.stderr.read()
423             proc._known_hosts = tmp_known_hosts
424             eintr_retry(proc.wait)()
425             return ((None,err), proc)
426         elif hasattr(source, 'read'):
427             # file-like (but not file) source
428             proc = subprocess.Popen(args, 
429                     stdout = open('/dev/null','w'),
430                     stderr = subprocess.PIPE,
431                     stdin = subprocess.PIPE)
432             
433             buf = None
434             err = []
435             while True:
436                 if not buf:
437                     buf = source.read(4096)
438                 if not buf:
439                     #EOF
440                     break
441                 
442                 rdrdy, wrdy, broken = select.select(
443                     [proc.stderr],
444                     [proc.stdin],
445                     [proc.stderr,proc.stdin])
446                 
447                 if proc.stderr in rdrdy:
448                     # use os.read for fully unbuffered behavior
449                     err.append(os.read(proc.stderr.fileno(), 4096))
450                 
451                 if proc.stdin in wrdy:
452                     proc.stdin.write(buf)
453                     buf = None
454                 
455                 if broken:
456                     break
457             proc.stdin.close()
458             err.append(proc.stderr.read())
459                 
460             proc._known_hosts = tmp_known_hosts
461             eintr_retry(proc.wait)()
462             return ((None,''.join(err)), proc)
463         elif hasattr(dest, 'write'):
464             # file-like (but not file) dest
465             proc = subprocess.Popen(args, 
466                     stdout = subprocess.PIPE,
467                     stderr = subprocess.PIPE,
468                     stdin = open('/dev/null','w'))
469             
470             buf = None
471             err = []
472             while True:
473                 rdrdy, wrdy, broken = select.select(
474                     [proc.stderr, proc.stdout],
475                     [],
476                     [proc.stderr, proc.stdout])
477                 
478                 if proc.stderr in rdrdy:
479                     # use os.read for fully unbuffered behavior
480                     err.append(os.read(proc.stderr.fileno(), 4096))
481                 
482                 if proc.stdout in rdrdy:
483                     # use os.read for fully unbuffered behavior
484                     buf = os.read(proc.stdout.fileno(), 4096)
485                     dest.write(buf)
486                     
487                     if not buf:
488                         #EOF
489                         break
490                 
491                 if broken:
492                     break
493             err.append(proc.stderr.read())
494                 
495             proc._known_hosts = tmp_known_hosts
496             eintr_retry(proc.wait)()
497             return ((None,''.join(err)), proc)
498         else:
499             raise AssertionError, "Unreachable code reached! :-Q"
500     else:
501         # Parse destination as <user>@<server>:<path>
502         if isinstance(dest, basestring) and ':' in dest:
503             remspec, path = dest.split(':',1)
504         elif isinstance(source, basestring) and ':' in source:
505             remspec, path = source.split(':',1)
506         else:
507             raise ValueError, "Both endpoints cannot be local"
508         user,host = remspec.rsplit('@',1)
509         
510         # plain scp
511         tmp_known_hosts = None
512
513         args = ['scp', '-q', '-p', '-C',
514                 # Don't bother with localhost. Makes test easier
515                 '-o', 'NoHostAuthenticationForLocalhost=yes',
516                 '-o', 'ConnectTimeout=60',
517                 '-o', 'ConnectionAttempts=3',
518                 '-o', 'ServerAliveInterval=30',
519                 '-o', 'TCPKeepAlive=yes' ]
520                 
521         if port:
522             args.append('-P%d' % port)
523
524         if recursive:
525             args.append('-r')
526
527         if identity:
528             args.extend(('-i', identity))
529
530         if server_key:
531             # Create a temporary server key file
532             tmp_known_hosts = make_server_key_args(server_key, host, port)
533             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
534
535         if not strict_host_checking:
536             # Do not check for Host key. Unsafe.
537             args.extend(['-o', 'StrictHostKeyChecking=no'])
538
539         if isinstance(source,list):
540             args.extend(source)
541         else:
542             if openssh_has_persist():
543                 args.extend([
544                     '-o', 'ControlMaster=auto',
545                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
546                     ])
547             args.append(source)
548
549         args.append(dest)
550
551         for x in xrange(retry):
552             # connects to the remote host and starts a remote connection
553             proc = subprocess.Popen(args,
554                     stdout = subprocess.PIPE,
555                     stdin = subprocess.PIPE, 
556                     stderr = subprocess.PIPE)
557             
558             # attach tempfile object to the process, to make sure the file stays
559             # alive until the process is finished with it
560             proc._known_hosts = tmp_known_hosts
561         
562             try:
563                 (out, err) = proc.communicate()
564                 eintr_retry(proc.wait)()
565                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
566                 log(msg, logging.DEBUG, out, err)
567
568                 if proc.poll():
569                     t = x*2
570                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
571                             t, x, host, " ".join(args))
572                     log(msg, logging.DEBUG)
573
574                     time.sleep(t)
575                     continue
576
577                 break
578             except RuntimeError, e:
579                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
580                 log(msg, logging.DEBUG, out, err)
581
582                 if retry <= 0:
583                     raise
584                 retry -= 1
585             
586         return ((out, err), proc)
587
588 def rspawn(command, pidfile, 
589         stdout = '/dev/null', 
590         stderr = STDOUT, 
591         stdin = '/dev/null', 
592         home = None, 
593         create_home = False, 
594         sudo = False,
595         host = None, 
596         port = None, 
597         user = None, 
598         agent = None, 
599         identity = None, 
600         server_key = None,
601         tty = False):
602     """
603     Spawn a remote command such that it will continue working asynchronously.
604     
605     Parameters:
606         command: the command to run - it should be a single line.
607         
608         pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
609         
610         stdout: path of a file to redirect standard output to - must be a string.
611             Defaults to /dev/null
612         stderr: path of a file to redirect standard error to - string or the special STDOUT value
613             to redirect to the same file stdout was redirected to. Defaults to STDOUT.
614         stdin: path of a file with input to be piped into the command's standard input
615         
616         home: path of a folder to use as working directory - should exist, unless you specify create_home
617         
618         create_home: if True, the home folder will be created first with mkdir -p
619         
620         sudo: whether the command needs to be executed as root
621         
622         host/port/user/agent/identity: see rexec
623     
624     Returns:
625         (stdout, stderr), process
626         
627         Of the spawning process, which only captures errors at spawning time.
628         Usually only useful for diagnostics.
629     """
630     # Start process in a "daemonized" way, using nohup and heavy
631     # stdin/out redirection to avoid connection issues
632     if stderr is STDOUT:
633         stderr = '&1'
634     else:
635         stderr = ' ' + stderr
636     
637     daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
638         'command' : command,
639         'pidfile' : shell_escape(pidfile),
640         'stdout' : stdout,
641         'stderr' : stderr,
642         'stdin' : stdin,
643     }
644     
645     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
646             'command' : shell_escape(daemon_command),
647             'sudo' : 'sudo -S' if sudo else '',
648             'pidfile' : shell_escape(pidfile),
649             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
650             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
651         }
652
653     (out,err),proc = rexec(
654         cmd,
655         host = host,
656         port = port,
657         user = user,
658         agent = agent,
659         identity = identity,
660         server_key = server_key,
661         tty = tty ,
662         )
663     
664     if proc.wait():
665         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
666
667     return ((out, err), proc)
668
669 @eintr_retry
670 def rcheckpid(pidfile,
671         host = None, 
672         port = None, 
673         user = None, 
674         agent = None, 
675         identity = None,
676         server_key = None):
677     """
678     Check the pidfile of a process spawned with remote_spawn.
679     
680     Parameters:
681         pidfile: the pidfile passed to remote_span
682         
683         host/port/user/agent/identity: see rexec
684     
685     Returns:
686         
687         A (pid, ppid) tuple useful for calling remote_status and remote_kill,
688         or None if the pidfile isn't valid yet (maybe the process is still starting).
689     """
690
691     (out,err),proc = rexec(
692         "cat %(pidfile)s" % {
693             'pidfile' : pidfile,
694         },
695         host = host,
696         port = port,
697         user = user,
698         agent = agent,
699         identity = identity,
700         server_key = server_key
701         )
702         
703     if proc.wait():
704         return None
705     
706     if out:
707         try:
708             return map(int,out.strip().split(' ',1))
709         except:
710             # Ignore, many ways to fail that don't matter that much
711             return None
712
713 @eintr_retry
714 def rstatus(pid, ppid, 
715         host = None, 
716         port = None, 
717         user = None, 
718         agent = None, 
719         identity = None,
720         server_key = None):
721     """
722     Check the status of a process spawned with remote_spawn.
723     
724     Parameters:
725         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
726         
727         host/port/user/agent/identity: see rexec
728     
729     Returns:
730         
731         One of NOT_STARTED, RUNNING, FINISHED
732     """
733
734     (out,err),proc = rexec(
735         # Check only by pid. pid+ppid does not always work (especially with sudo) 
736         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
737             'ppid' : ppid,
738             'pid' : pid,
739         },
740         host = host,
741         port = port,
742         user = user,
743         agent = agent,
744         identity = identity,
745         server_key = server_key
746         )
747     
748     if proc.wait():
749         return NOT_STARTED
750     
751     status = False
752     if err:
753         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
754             status = True
755     elif out:
756         status = (out.strip() == 'wait')
757     else:
758         return NOT_STARTED
759     return RUNNING if status else FINISHED
760
761 @eintr_retry
762 def rkill(pid, ppid,
763         host = None, 
764         port = None, 
765         user = None, 
766         agent = None, 
767         sudo = False,
768         identity = None, 
769         server_key = None, 
770         nowait = False):
771     """
772     Kill a process spawned with remote_spawn.
773     
774     First tries a SIGTERM, and if the process does not end in 10 seconds,
775     it sends a SIGKILL.
776     
777     Parameters:
778         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
779         
780         sudo: whether the command was run with sudo - careful killing like this.
781         
782         host/port/user/agent/identity: see rexec
783     
784     Returns:
785         
786         Nothing, should have killed the process
787     """
788     
789     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
790     cmd = """
791 SUBKILL="%(subkill)s" ;
792 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
793 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
794 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
795     sleep 0.2 
796     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
797         break
798     else
799         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
800         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
801     fi
802     sleep 1.8
803 done
804 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
805     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
806     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
807 fi
808 """
809     if nowait:
810         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
811
812     (out,err),proc = rexec(
813         cmd % {
814             'ppid' : ppid,
815             'pid' : pid,
816             'sudo' : 'sudo -S' if sudo else '',
817             'subkill' : subkill,
818         },
819         host = host,
820         port = port,
821         user = user,
822         agent = agent,
823         identity = identity,
824         server_key = server_key
825         )
826     
827     # wait, don't leave zombies around
828     proc.wait()
829
830     return (out, err), proc
831
832 # POSIX
833 def _communicate(self, input, timeout=None, err_on_timeout=True):
834     read_set = []
835     write_set = []
836     stdout = None # Return
837     stderr = None # Return
838     
839     killed = False
840     
841     if timeout is not None:
842         timelimit = time.time() + timeout
843         killtime = timelimit + 4
844         bailtime = timelimit + 4
845
846     if self.stdin:
847         # Flush stdio buffer.  This might block, if the user has
848         # been writing to .stdin in an uncontrolled fashion.
849         self.stdin.flush()
850         if input:
851             write_set.append(self.stdin)
852         else:
853             self.stdin.close()
854     if self.stdout:
855         read_set.append(self.stdout)
856         stdout = []
857     if self.stderr:
858         read_set.append(self.stderr)
859         stderr = []
860
861     input_offset = 0
862     while read_set or write_set:
863         if timeout is not None:
864             curtime = time.time()
865             if timeout is None or curtime > timelimit:
866                 if curtime > bailtime:
867                     break
868                 elif curtime > killtime:
869                     signum = signal.SIGKILL
870                 else:
871                     signum = signal.SIGTERM
872                 # Lets kill it
873                 os.kill(self.pid, signum)
874                 select_timeout = 0.5
875             else:
876                 select_timeout = timelimit - curtime + 0.1
877         else:
878             select_timeout = 1.0
879         
880         if select_timeout > 1.0:
881             select_timeout = 1.0
882             
883         try:
884             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
885         except select.error,e:
886             if e[0] != 4:
887                 raise
888             else:
889                 continue
890         
891         if not rlist and not wlist and not xlist and self.poll() is not None:
892             # timeout and process exited, say bye
893             break
894
895         if self.stdin in wlist:
896             # When select has indicated that the file is writable,
897             # we can write up to PIPE_BUF bytes without risk
898             # blocking.  POSIX defines PIPE_BUF >= 512
899             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
900             input_offset += bytes_written
901             if input_offset >= len(input):
902                 self.stdin.close()
903                 write_set.remove(self.stdin)
904
905         if self.stdout in rlist:
906             data = os.read(self.stdout.fileno(), 1024)
907             if data == "":
908                 self.stdout.close()
909                 read_set.remove(self.stdout)
910             stdout.append(data)
911
912         if self.stderr in rlist:
913             data = os.read(self.stderr.fileno(), 1024)
914             if data == "":
915                 self.stderr.close()
916                 read_set.remove(self.stderr)
917             stderr.append(data)
918     
919     # All data exchanged.  Translate lists into strings.
920     if stdout is not None:
921         stdout = ''.join(stdout)
922     if stderr is not None:
923         stderr = ''.join(stderr)
924
925     # Translate newlines, if requested.  We cannot let the file
926     # object do the translation: It is based on stdio, which is
927     # impossible to combine with select (unless forcing no
928     # buffering).
929     if self.universal_newlines and hasattr(file, 'newlines'):
930         if stdout:
931             stdout = self._translate_newlines(stdout)
932         if stderr:
933             stderr = self._translate_newlines(stderr)
934
935     if killed and err_on_timeout:
936         errcode = self.poll()
937         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
938     else:
939         if killed:
940             self.poll()
941         else:
942             self.wait()
943         return (stdout, stderr)
944