Improved LinuxApplication behavior
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None, 
209         agent = True,
210         sudo = False,
211         stdin = None,
212         identity = None,
213         server_key = None,
214         env = None,
215         tty = False,
216         timeout = None,
217         retry = 3,
218         err_on_timeout = True,
219         connect_timeout = 30,
220         persistent = True,
221         forward_x11 = False,
222         blocking = True,
223         strict_host_checking = True):
224     """
225     Executes a remote command, returns ((stdout,stderr),process)
226     """
227     
228     tmp_known_hosts = None
229     hostip = gethostbyname(host)
230
231     args = ['ssh', '-C',
232             # Don't bother with localhost. Makes test easier
233             '-o', 'NoHostAuthenticationForLocalhost=yes',
234             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235             '-o', 'ConnectionAttempts=3',
236             '-o', 'ServerAliveInterval=30',
237             '-o', 'TCPKeepAlive=yes',
238             '-l', user, hostip or host]
239
240     if persistent and openssh_has_persist():
241         args.extend([
242             '-o', 'ControlMaster=auto',
243             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244             '-o', 'ControlPersist=60' ])
245
246     if not strict_host_checking:
247         # Do not check for Host key. Unsafe.
248         args.extend(['-o', 'StrictHostKeyChecking=no'])
249
250     if agent:
251         args.append('-A')
252
253     if port:
254         args.append('-p%d' % port)
255
256     if identity:
257         args.extend(('-i', identity))
258
259     if tty:
260         args.append('-t')
261         args.append('-t')
262
263     if forward_x11:
264         args.append('-X')
265
266     if server_key:
267         # Create a temporary server key file
268         tmp_known_hosts = make_server_key_args(server_key, host, port)
269         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
270
271     args.append(command)
272
273     for x in xrange(retry):
274         # connects to the remote host and starts a remote connection
275         proc = subprocess.Popen(args,
276                 env = env,
277                 stdout = subprocess.PIPE,
278                 stdin = subprocess.PIPE, 
279                 stderr = subprocess.PIPE)
280         
281         # attach tempfile object to the process, to make sure the file stays
282         # alive until the process is finished with it
283         proc._known_hosts = tmp_known_hosts
284     
285         # by default, rexec calls _communicate which will block 
286         # until the process has exit. The argument block == False 
287         # forces to rexec to return immediately, without blocking 
288         if not blocking:
289            return (("", ""), proc)
290
291         try:
292             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
293             msg = " rexec - host %s - command %s " % (host, " ".join(args))
294             log(msg, logging.DEBUG, out, err)
295
296             if proc.poll():
297                 skip = False
298
299                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
300                     # SSH error, can safely retry
301                     skip = True 
302                 elif retry:
303                     # Probably timed out or plain failed but can retry
304                     skip = True 
305                 
306                 if skip:
307                     t = x*2
308                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
309                             t, x, host, " ".join(args))
310                     log(msg, logging.DEBUG)
311
312                     time.sleep(t)
313                     continue
314             break
315         except RuntimeError, e:
316             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
317             log(msg, logging.DEBUG, out, err)
318
319             if retry <= 0:
320                 raise
321             retry -= 1
322         
323     return ((out, err), proc)
324
325 def rcopy(source, dest,
326         port = None, 
327         agent = True, 
328         recursive = False,
329         identity = None,
330         server_key = None,
331         retry = 3,
332         strict_host_checking = True):
333     """
334     Copies from/to remote sites.
335     
336     Source and destination should have the user and host encoded
337     as per scp specs.
338     
339     If source is a file object, a special mode will be used to
340     create the remote file with the same contents.
341     
342     If dest is a file object, the remote file (source) will be
343     read and written into dest.
344     
345     In these modes, recursive cannot be True.
346     
347     Source can be a list of files to copy to a single destination,
348     in which case it is advised that the destination be a folder.
349     """
350     
351     if isinstance(source, file) and source.tell() == 0:
352         source = source.name
353     elif hasattr(source, 'read'):
354         tmp = tempfile.NamedTemporaryFile()
355         while True:
356             buf = source.read(65536)
357             if buf:
358                 tmp.write(buf)
359             else:
360                 break
361         tmp.seek(0)
362         source = tmp.name
363     
364     if isinstance(source, file) or isinstance(dest, file) \
365             or hasattr(source, 'read')  or hasattr(dest, 'write'):
366         assert not recursive
367         
368         # Parse source/destination as <user>@<server>:<path>
369         if isinstance(dest, basestring) and ':' in dest:
370             remspec, path = dest.split(':',1)
371         elif isinstance(source, basestring) and ':' in source:
372             remspec, path = source.split(':',1)
373         else:
374             raise ValueError, "Both endpoints cannot be local"
375         user,host = remspec.rsplit('@',1)
376         
377         tmp_known_hosts = None
378         hostip = gethostbyname(host)
379         
380         args = ['ssh', '-l', user, '-C',
381                 # Don't bother with localhost. Makes test easier
382                 '-o', 'NoHostAuthenticationForLocalhost=yes',
383                 '-o', 'ConnectTimeout=60',
384                 '-o', 'ConnectionAttempts=3',
385                 '-o', 'ServerAliveInterval=30',
386                 '-o', 'TCPKeepAlive=yes',
387                 hostip or host ]
388
389         if openssh_has_persist():
390             args.extend([
391                 '-o', 'ControlMaster=auto',
392                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
393                 '-o', 'ControlPersist=60' ])
394
395         if port:
396             args.append('-P%d' % port)
397
398         if identity:
399             args.extend(('-i', identity))
400
401         if server_key:
402             # Create a temporary server key file
403             tmp_known_hosts = make_server_key_args(server_key, host, port)
404             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
405         
406         if isinstance(source, file) or hasattr(source, 'read'):
407             args.append('cat > %s' % (shell_escape(path),))
408         elif isinstance(dest, file) or hasattr(dest, 'write'):
409             args.append('cat %s' % (shell_escape(path),))
410         else:
411             raise AssertionError, "Unreachable code reached! :-Q"
412         
413         # connects to the remote host and starts a remote connection
414         if isinstance(source, file):
415             proc = subprocess.Popen(args, 
416                     stdout = open('/dev/null','w'),
417                     stderr = subprocess.PIPE,
418                     stdin = source)
419             err = proc.stderr.read()
420             proc._known_hosts = tmp_known_hosts
421             eintr_retry(proc.wait)()
422             return ((None,err), proc)
423         elif isinstance(dest, file):
424             proc = subprocess.Popen(args, 
425                     stdout = open('/dev/null','w'),
426                     stderr = subprocess.PIPE,
427                     stdin = source)
428             err = proc.stderr.read()
429             proc._known_hosts = tmp_known_hosts
430             eintr_retry(proc.wait)()
431             return ((None,err), proc)
432         elif hasattr(source, 'read'):
433             # file-like (but not file) source
434             proc = subprocess.Popen(args, 
435                     stdout = open('/dev/null','w'),
436                     stderr = subprocess.PIPE,
437                     stdin = subprocess.PIPE)
438             
439             buf = None
440             err = []
441             while True:
442                 if not buf:
443                     buf = source.read(4096)
444                 if not buf:
445                     #EOF
446                     break
447                 
448                 rdrdy, wrdy, broken = select.select(
449                     [proc.stderr],
450                     [proc.stdin],
451                     [proc.stderr,proc.stdin])
452                 
453                 if proc.stderr in rdrdy:
454                     # use os.read for fully unbuffered behavior
455                     err.append(os.read(proc.stderr.fileno(), 4096))
456                 
457                 if proc.stdin in wrdy:
458                     proc.stdin.write(buf)
459                     buf = None
460                 
461                 if broken:
462                     break
463             proc.stdin.close()
464             err.append(proc.stderr.read())
465                 
466             proc._known_hosts = tmp_known_hosts
467             eintr_retry(proc.wait)()
468             return ((None,''.join(err)), proc)
469         elif hasattr(dest, 'write'):
470             # file-like (but not file) dest
471             proc = subprocess.Popen(args, 
472                     stdout = subprocess.PIPE,
473                     stderr = subprocess.PIPE,
474                     stdin = open('/dev/null','w'))
475             
476             buf = None
477             err = []
478             while True:
479                 rdrdy, wrdy, broken = select.select(
480                     [proc.stderr, proc.stdout],
481                     [],
482                     [proc.stderr, proc.stdout])
483                 
484                 if proc.stderr in rdrdy:
485                     # use os.read for fully unbuffered behavior
486                     err.append(os.read(proc.stderr.fileno(), 4096))
487                 
488                 if proc.stdout in rdrdy:
489                     # use os.read for fully unbuffered behavior
490                     buf = os.read(proc.stdout.fileno(), 4096)
491                     dest.write(buf)
492                     
493                     if not buf:
494                         #EOF
495                         break
496                 
497                 if broken:
498                     break
499             err.append(proc.stderr.read())
500                 
501             proc._known_hosts = tmp_known_hosts
502             eintr_retry(proc.wait)()
503             return ((None,''.join(err)), proc)
504         else:
505             raise AssertionError, "Unreachable code reached! :-Q"
506     else:
507         # Parse destination as <user>@<server>:<path>
508         if isinstance(dest, basestring) and ':' in dest:
509             remspec, path = dest.split(':',1)
510         elif isinstance(source, basestring) and ':' in source:
511             remspec, path = source.split(':',1)
512         else:
513             raise ValueError, "Both endpoints cannot be local"
514         user,host = remspec.rsplit('@',1)
515         
516         # plain scp
517         tmp_known_hosts = None
518
519         args = ['scp', '-q', '-p', '-C',
520                 # Speed up transfer using blowfish cypher specification which is 
521                 # faster than the default one (3des)
522                 '-c', 'blowfish',
523                 # Don't bother with localhost. Makes test easier
524                 '-o', 'NoHostAuthenticationForLocalhost=yes',
525                 '-o', 'ConnectTimeout=60',
526                 '-o', 'ConnectionAttempts=3',
527                 '-o', 'ServerAliveInterval=30',
528                 '-o', 'TCPKeepAlive=yes' ]
529                 
530         if port:
531             args.append('-P%d' % port)
532
533         if recursive:
534             args.append('-r')
535
536         if identity:
537             args.extend(('-i', identity))
538
539         if server_key:
540             # Create a temporary server key file
541             tmp_known_hosts = make_server_key_args(server_key, host, port)
542             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
543
544         if not strict_host_checking:
545             # Do not check for Host key. Unsafe.
546             args.extend(['-o', 'StrictHostKeyChecking=no'])
547
548         if isinstance(source,list):
549             args.extend(source)
550         else:
551             if openssh_has_persist():
552                 args.extend([
553                     '-o', 'ControlMaster=auto',
554                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
555                     ])
556             args.append(source)
557
558         args.append(dest)
559
560         for x in xrange(retry):
561             # connects to the remote host and starts a remote connection
562             proc = subprocess.Popen(args,
563                     stdout = subprocess.PIPE,
564                     stdin = subprocess.PIPE, 
565                     stderr = subprocess.PIPE)
566             
567             # attach tempfile object to the process, to make sure the file stays
568             # alive until the process is finished with it
569             proc._known_hosts = tmp_known_hosts
570         
571             try:
572                 (out, err) = proc.communicate()
573                 eintr_retry(proc.wait)()
574                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
575                 log(msg, logging.DEBUG, out, err)
576
577                 if proc.poll():
578                     t = x*2
579                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
580                             t, x, host, " ".join(args))
581                     log(msg, logging.DEBUG)
582
583                     time.sleep(t)
584                     continue
585
586                 break
587             except RuntimeError, e:
588                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
589                 log(msg, logging.DEBUG, out, err)
590
591                 if retry <= 0:
592                     raise
593                 retry -= 1
594             
595         return ((out, err), proc)
596
597 def rspawn(command, pidfile, 
598         stdout = '/dev/null', 
599         stderr = STDOUT, 
600         stdin = '/dev/null',
601         home = None, 
602         create_home = False, 
603         sudo = False,
604         host = None, 
605         port = None, 
606         user = None, 
607         agent = None, 
608         identity = None, 
609         server_key = None,
610         tty = False):
611     """
612     Spawn a remote command such that it will continue working asynchronously in 
613     background. 
614
615         :param command: The command to run, it should be a single line.
616         :type command: str
617
618         :param pidfile: Path to a file where to store the pid and ppid of the 
619                         spawned process
620         :type pidfile: str
621
622         :param stdout: Path to file to redirect standard output. 
623                        The default value is /dev/null
624         :type stdout: str
625
626         :param stderr: Path to file to redirect standard error.
627                        If the special STDOUT value is used, stderr will 
628                        be redirected to the same file as stdout
629         :type stderr: str
630
631         :param stdin: Path to a file with input to be piped into the command's standard input
632         :type stdin: str
633
634         :param home: Path to working directory folder. 
635                     It is assumed to exist unless the create_home flag is set.
636         :type home: str
637
638         :param create_home: Flag to force creation of the home folder before 
639                             running the command
640         :type create_home: bool
641  
642         :param sudo: Flag forcing execution with sudo user
643         :type sudo: bool
644         
645         :rtype: touple
646
647         (stdout, stderr), process
648         
649         Of the spawning process, which only captures errors at spawning time.
650         Usually only useful for diagnostics.
651     """
652     # Start process in a "daemonized" way, using nohup and heavy
653     # stdin/out redirection to avoid connection issues
654     if stderr is STDOUT:
655         stderr = '&1'
656     else:
657         stderr = ' ' + stderr
658     
659     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
660         'command' : command,
661         'pidfile' : shell_escape(pidfile),
662         'stdout' : stdout,
663         'stderr' : stderr,
664         'stdin' : stdin,
665     }
666     
667     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
668             'command' : shell_escape(daemon_command),
669             'sudo' : 'sudo -S' if sudo else '',
670             'pidfile' : shell_escape(pidfile),
671             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
672             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
673         }
674
675     (out,err),proc = rexec(
676         cmd,
677         host = host,
678         port = port,
679         user = user,
680         agent = agent,
681         identity = identity,
682         server_key = server_key,
683         tty = tty ,
684         )
685     
686     if proc.wait():
687         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
688
689     return ((out, err), proc)
690
691 @eintr_retry
692 def rgetpid(pidfile,
693         host = None, 
694         port = None, 
695         user = None, 
696         agent = None, 
697         identity = None,
698         server_key = None):
699     """
700     Returns the pid and ppid of a process from a remote file where the 
701     information was stored.
702
703         :param home: Path to directory where the pidfile is located
704         :type home: str
705
706         :param pidfile: Name of file containing the pid information
707         :type pidfile: str
708         
709         :rtype: int
710         
711         A (pid, ppid) tuple useful for calling rstatus and rkill,
712         or None if the pidfile isn't valid yet (can happen when process is staring up)
713
714     """
715     (out,err),proc = rexec(
716         "cat %(pidfile)s" % {
717             'pidfile' : pidfile,
718         },
719         host = host,
720         port = port,
721         user = user,
722         agent = agent,
723         identity = identity,
724         server_key = server_key
725         )
726         
727     if proc.wait():
728         return None
729     
730     if out:
731         try:
732             return map(int,out.strip().split(' ',1))
733         except:
734             # Ignore, many ways to fail that don't matter that much
735             return None
736
737 @eintr_retry
738 def rstatus(pid, ppid, 
739         host = None, 
740         port = None, 
741         user = None, 
742         agent = None, 
743         identity = None,
744         server_key = None):
745     """
746     Returns a code representing the the status of a remote process
747
748         :param pid: Process id of the process
749         :type pid: int
750
751         :param ppid: Parent process id of process
752         :type ppid: int
753     
754         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
755     
756     """
757     (out,err),proc = rexec(
758         # Check only by pid. pid+ppid does not always work (especially with sudo) 
759         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
760             'ppid' : ppid,
761             'pid' : pid,
762         },
763         host = host,
764         port = port,
765         user = user,
766         agent = agent,
767         identity = identity,
768         server_key = server_key
769         )
770     
771     if proc.wait():
772         return ProcStatus.NOT_STARTED
773     
774     status = False
775     if err:
776         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
777             status = True
778     elif out:
779         status = (out.strip() == 'wait')
780     else:
781         return ProcStatus.NOT_STARTED
782     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
783
784 @eintr_retry
785 def rkill(pid, ppid,
786         host = None, 
787         port = None, 
788         user = None, 
789         agent = None, 
790         sudo = False,
791         identity = None, 
792         server_key = None, 
793         nowait = False):
794     """
795     Sends a kill signal to a remote process.
796
797     First tries a SIGTERM, and if the process does not end in 10 seconds,
798     it sends a SIGKILL.
799  
800         :param pid: Process id of process to be killed
801         :type pid: int
802
803         :param ppid: Parent process id of process to be killed
804         :type ppid: int
805
806         :param sudo: Flag indicating if sudo should be used to kill the process
807         :type sudo: bool
808         
809     """
810     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
811     cmd = """
812 SUBKILL="%(subkill)s" ;
813 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
814 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
815 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
816     sleep 0.2 
817     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
818         break
819     else
820         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
821         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
822     fi
823     sleep 1.8
824 done
825 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
826     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
827     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
828 fi
829 """
830     if nowait:
831         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
832
833     (out,err),proc = rexec(
834         cmd % {
835             'ppid' : ppid,
836             'pid' : pid,
837             'sudo' : 'sudo -S' if sudo else '',
838             'subkill' : subkill,
839         },
840         host = host,
841         port = port,
842         user = user,
843         agent = agent,
844         identity = identity,
845         server_key = server_key
846         )
847     
848     # wait, don't leave zombies around
849     proc.wait()
850
851     return (out, err), proc
852
853 # POSIX
854 def _communicate(self, input, timeout=None, err_on_timeout=True):
855     read_set = []
856     write_set = []
857     stdout = None # Return
858     stderr = None # Return
859     
860     killed = False
861     
862     if timeout is not None:
863         timelimit = time.time() + timeout
864         killtime = timelimit + 4
865         bailtime = timelimit + 4
866
867     if self.stdin:
868         # Flush stdio buffer.  This might block, if the user has
869         # been writing to .stdin in an uncontrolled fashion.
870         self.stdin.flush()
871         if input:
872             write_set.append(self.stdin)
873         else:
874             self.stdin.close()
875     if self.stdout:
876         read_set.append(self.stdout)
877         stdout = []
878     if self.stderr:
879         read_set.append(self.stderr)
880         stderr = []
881
882     input_offset = 0
883     while read_set or write_set:
884         if timeout is not None:
885             curtime = time.time()
886             if timeout is None or curtime > timelimit:
887                 if curtime > bailtime:
888                     break
889                 elif curtime > killtime:
890                     signum = signal.SIGKILL
891                 else:
892                     signum = signal.SIGTERM
893                 # Lets kill it
894                 os.kill(self.pid, signum)
895                 select_timeout = 0.5
896             else:
897                 select_timeout = timelimit - curtime + 0.1
898         else:
899             select_timeout = 1.0
900         
901         if select_timeout > 1.0:
902             select_timeout = 1.0
903             
904         try:
905             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
906         except select.error,e:
907             if e[0] != 4:
908                 raise
909             else:
910                 continue
911         
912         if not rlist and not wlist and not xlist and self.poll() is not None:
913             # timeout and process exited, say bye
914             break
915
916         if self.stdin in wlist:
917             # When select has indicated that the file is writable,
918             # we can write up to PIPE_BUF bytes without risk
919             # blocking.  POSIX defines PIPE_BUF >= 512
920             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
921             input_offset += bytes_written
922             if input_offset >= len(input):
923                 self.stdin.close()
924                 write_set.remove(self.stdin)
925
926         if self.stdout in rlist:
927             data = os.read(self.stdout.fileno(), 1024)
928             if data == "":
929                 self.stdout.close()
930                 read_set.remove(self.stdout)
931             stdout.append(data)
932
933         if self.stderr in rlist:
934             data = os.read(self.stderr.fileno(), 1024)
935             if data == "":
936                 self.stderr.close()
937                 read_set.remove(self.stderr)
938             stderr.append(data)
939     
940     # All data exchanged.  Translate lists into strings.
941     if stdout is not None:
942         stdout = ''.join(stdout)
943     if stderr is not None:
944         stderr = ''.join(stderr)
945
946     # Translate newlines, if requested.  We cannot let the file
947     # object do the translation: It is based on stdio, which is
948     # impossible to combine with select (unless forcing no
949     # buffering).
950     if self.universal_newlines and hasattr(file, 'newlines'):
951         if stdout:
952             stdout = self._translate_newlines(stdout)
953         if stderr:
954             stderr = self._translate_newlines(stderr)
955
956     if killed and err_on_timeout:
957         errcode = self.poll()
958         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
959     else:
960         if killed:
961             self.poll()
962         else:
963             self.wait()
964         return (stdout, stderr)
965