adding back use of sudo in sshfuncs.rexec
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None, 
209         agent = True,
210         sudo = False,
211         stdin = None,
212         identity = None,
213         server_key = None,
214         env = None,
215         tty = False,
216         timeout = None,
217         retry = 3,
218         err_on_timeout = True,
219         connect_timeout = 30,
220         persistent = True,
221         forward_x11 = False,
222         blocking = True,
223         strict_host_checking = True):
224     """
225     Executes a remote command, returns ((stdout,stderr),process)
226     """
227     
228     tmp_known_hosts = None
229     hostip = gethostbyname(host)
230
231     args = ['ssh', '-C',
232             # Don't bother with localhost. Makes test easier
233             '-o', 'NoHostAuthenticationForLocalhost=yes',
234             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235             '-o', 'ConnectionAttempts=3',
236             '-o', 'ServerAliveInterval=30',
237             '-o', 'TCPKeepAlive=yes',
238             '-l', user, hostip or host]
239
240     if persistent and openssh_has_persist():
241         args.extend([
242             '-o', 'ControlMaster=auto',
243             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244             '-o', 'ControlPersist=60' ])
245
246     if not strict_host_checking:
247         # Do not check for Host key. Unsafe.
248         args.extend(['-o', 'StrictHostKeyChecking=no'])
249
250     if agent:
251         args.append('-A')
252
253     if port:
254         args.append('-p%d' % port)
255
256     if identity:
257         args.extend(('-i', identity))
258
259     if tty:
260         args.append('-t')
261         args.append('-t')
262
263     if forward_x11:
264         args.append('-X')
265
266     if server_key:
267         # Create a temporary server key file
268         tmp_known_hosts = make_server_key_args(server_key, host, port)
269         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
270
271     if sudo:
272         command = "sudo " + command
273
274     args.append(command)
275
276     for x in xrange(retry):
277         # connects to the remote host and starts a remote connection
278         proc = subprocess.Popen(args,
279                 env = env,
280                 stdout = subprocess.PIPE,
281                 stdin = subprocess.PIPE, 
282                 stderr = subprocess.PIPE)
283         
284         # attach tempfile object to the process, to make sure the file stays
285         # alive until the process is finished with it
286         proc._known_hosts = tmp_known_hosts
287     
288         # by default, rexec calls _communicate which will block 
289         # until the process has exit. The argument block == False 
290         # forces to rexec to return immediately, without blocking 
291         if not blocking:
292            return (("", ""), proc)
293
294         try:
295             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
296             msg = " rexec - host %s - command %s " % (host, " ".join(args))
297             log(msg, logging.DEBUG, out, err)
298
299             if proc.poll():
300                 skip = False
301
302                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
303                     # SSH error, can safely retry
304                     skip = True 
305                 elif retry:
306                     # Probably timed out or plain failed but can retry
307                     skip = True 
308                 
309                 if skip:
310                     t = x*2
311                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
312                             t, x, host, " ".join(args))
313                     log(msg, logging.DEBUG)
314
315                     time.sleep(t)
316                     continue
317             break
318         except RuntimeError, e:
319             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
320             log(msg, logging.DEBUG, out, err)
321
322             if retry <= 0:
323                 raise
324             retry -= 1
325         
326     return ((out, err), proc)
327
328 def rcopy(source, dest,
329         port = None, 
330         agent = True, 
331         recursive = False,
332         identity = None,
333         server_key = None,
334         retry = 3,
335         strict_host_checking = True):
336     """
337     Copies from/to remote sites.
338     
339     Source and destination should have the user and host encoded
340     as per scp specs.
341     
342     If source is a file object, a special mode will be used to
343     create the remote file with the same contents.
344     
345     If dest is a file object, the remote file (source) will be
346     read and written into dest.
347     
348     In these modes, recursive cannot be True.
349     
350     Source can be a list of files to copy to a single destination,
351     in which case it is advised that the destination be a folder.
352     """
353     
354     if isinstance(source, file) and source.tell() == 0:
355         source = source.name
356     elif hasattr(source, 'read'):
357         tmp = tempfile.NamedTemporaryFile()
358         while True:
359             buf = source.read(65536)
360             if buf:
361                 tmp.write(buf)
362             else:
363                 break
364         tmp.seek(0)
365         source = tmp.name
366     
367     if isinstance(source, file) or isinstance(dest, file) \
368             or hasattr(source, 'read')  or hasattr(dest, 'write'):
369         assert not recursive
370         
371         # Parse source/destination as <user>@<server>:<path>
372         if isinstance(dest, basestring) and ':' in dest:
373             remspec, path = dest.split(':',1)
374         elif isinstance(source, basestring) and ':' in source:
375             remspec, path = source.split(':',1)
376         else:
377             raise ValueError, "Both endpoints cannot be local"
378         user,host = remspec.rsplit('@',1)
379         
380         tmp_known_hosts = None
381         hostip = gethostbyname(host)
382         
383         args = ['ssh', '-l', user, '-C',
384                 # Don't bother with localhost. Makes test easier
385                 '-o', 'NoHostAuthenticationForLocalhost=yes',
386                 '-o', 'ConnectTimeout=60',
387                 '-o', 'ConnectionAttempts=3',
388                 '-o', 'ServerAliveInterval=30',
389                 '-o', 'TCPKeepAlive=yes',
390                 hostip or host ]
391
392         if openssh_has_persist():
393             args.extend([
394                 '-o', 'ControlMaster=auto',
395                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
396                 '-o', 'ControlPersist=60' ])
397
398         if port:
399             args.append('-P%d' % port)
400
401         if identity:
402             args.extend(('-i', identity))
403
404         if server_key:
405             # Create a temporary server key file
406             tmp_known_hosts = make_server_key_args(server_key, host, port)
407             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
408         
409         if isinstance(source, file) or hasattr(source, 'read'):
410             args.append('cat > %s' % (shell_escape(path),))
411         elif isinstance(dest, file) or hasattr(dest, 'write'):
412             args.append('cat %s' % (shell_escape(path),))
413         else:
414             raise AssertionError, "Unreachable code reached! :-Q"
415         
416         # connects to the remote host and starts a remote connection
417         if isinstance(source, file):
418             proc = subprocess.Popen(args, 
419                     stdout = open('/dev/null','w'),
420                     stderr = subprocess.PIPE,
421                     stdin = source)
422             err = proc.stderr.read()
423             proc._known_hosts = tmp_known_hosts
424             eintr_retry(proc.wait)()
425             return ((None,err), proc)
426         elif isinstance(dest, file):
427             proc = subprocess.Popen(args, 
428                     stdout = open('/dev/null','w'),
429                     stderr = subprocess.PIPE,
430                     stdin = source)
431             err = proc.stderr.read()
432             proc._known_hosts = tmp_known_hosts
433             eintr_retry(proc.wait)()
434             return ((None,err), proc)
435         elif hasattr(source, 'read'):
436             # file-like (but not file) source
437             proc = subprocess.Popen(args, 
438                     stdout = open('/dev/null','w'),
439                     stderr = subprocess.PIPE,
440                     stdin = subprocess.PIPE)
441             
442             buf = None
443             err = []
444             while True:
445                 if not buf:
446                     buf = source.read(4096)
447                 if not buf:
448                     #EOF
449                     break
450                 
451                 rdrdy, wrdy, broken = select.select(
452                     [proc.stderr],
453                     [proc.stdin],
454                     [proc.stderr,proc.stdin])
455                 
456                 if proc.stderr in rdrdy:
457                     # use os.read for fully unbuffered behavior
458                     err.append(os.read(proc.stderr.fileno(), 4096))
459                 
460                 if proc.stdin in wrdy:
461                     proc.stdin.write(buf)
462                     buf = None
463                 
464                 if broken:
465                     break
466             proc.stdin.close()
467             err.append(proc.stderr.read())
468                 
469             proc._known_hosts = tmp_known_hosts
470             eintr_retry(proc.wait)()
471             return ((None,''.join(err)), proc)
472         elif hasattr(dest, 'write'):
473             # file-like (but not file) dest
474             proc = subprocess.Popen(args, 
475                     stdout = subprocess.PIPE,
476                     stderr = subprocess.PIPE,
477                     stdin = open('/dev/null','w'))
478             
479             buf = None
480             err = []
481             while True:
482                 rdrdy, wrdy, broken = select.select(
483                     [proc.stderr, proc.stdout],
484                     [],
485                     [proc.stderr, proc.stdout])
486                 
487                 if proc.stderr in rdrdy:
488                     # use os.read for fully unbuffered behavior
489                     err.append(os.read(proc.stderr.fileno(), 4096))
490                 
491                 if proc.stdout in rdrdy:
492                     # use os.read for fully unbuffered behavior
493                     buf = os.read(proc.stdout.fileno(), 4096)
494                     dest.write(buf)
495                     
496                     if not buf:
497                         #EOF
498                         break
499                 
500                 if broken:
501                     break
502             err.append(proc.stderr.read())
503                 
504             proc._known_hosts = tmp_known_hosts
505             eintr_retry(proc.wait)()
506             return ((None,''.join(err)), proc)
507         else:
508             raise AssertionError, "Unreachable code reached! :-Q"
509     else:
510         # Parse destination as <user>@<server>:<path>
511         if isinstance(dest, basestring) and ':' in dest:
512             remspec, path = dest.split(':',1)
513         elif isinstance(source, basestring) and ':' in source:
514             remspec, path = source.split(':',1)
515         else:
516             raise ValueError, "Both endpoints cannot be local"
517         user,host = remspec.rsplit('@',1)
518         
519         # plain scp
520         tmp_known_hosts = None
521
522         args = ['scp', '-q', '-p', '-C',
523                 # Speed up transfer using blowfish cypher specification which is 
524                 # faster than the default one (3des)
525                 '-c', 'blowfish',
526                 # Don't bother with localhost. Makes test easier
527                 '-o', 'NoHostAuthenticationForLocalhost=yes',
528                 '-o', 'ConnectTimeout=60',
529                 '-o', 'ConnectionAttempts=3',
530                 '-o', 'ServerAliveInterval=30',
531                 '-o', 'TCPKeepAlive=yes' ]
532                 
533         if port:
534             args.append('-P%d' % port)
535
536         if recursive:
537             args.append('-r')
538
539         if identity:
540             args.extend(('-i', identity))
541
542         if server_key:
543             # Create a temporary server key file
544             tmp_known_hosts = make_server_key_args(server_key, host, port)
545             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
546
547         if not strict_host_checking:
548             # Do not check for Host key. Unsafe.
549             args.extend(['-o', 'StrictHostKeyChecking=no'])
550
551         if isinstance(source,list):
552             args.extend(source)
553         else:
554             if openssh_has_persist():
555                 args.extend([
556                     '-o', 'ControlMaster=auto',
557                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
558                     ])
559             args.append(source)
560
561         args.append(dest)
562
563         for x in xrange(retry):
564             # connects to the remote host and starts a remote connection
565             proc = subprocess.Popen(args,
566                     stdout = subprocess.PIPE,
567                     stdin = subprocess.PIPE, 
568                     stderr = subprocess.PIPE)
569             
570             # attach tempfile object to the process, to make sure the file stays
571             # alive until the process is finished with it
572             proc._known_hosts = tmp_known_hosts
573         
574             try:
575                 (out, err) = proc.communicate()
576                 eintr_retry(proc.wait)()
577                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
578                 log(msg, logging.DEBUG, out, err)
579
580                 if proc.poll():
581                     t = x*2
582                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
583                             t, x, host, " ".join(args))
584                     log(msg, logging.DEBUG)
585
586                     time.sleep(t)
587                     continue
588
589                 break
590             except RuntimeError, e:
591                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
592                 log(msg, logging.DEBUG, out, err)
593
594                 if retry <= 0:
595                     raise
596                 retry -= 1
597             
598         return ((out, err), proc)
599
600 def rspawn(command, pidfile, 
601         stdout = '/dev/null', 
602         stderr = STDOUT, 
603         stdin = '/dev/null',
604         home = None, 
605         create_home = False, 
606         sudo = False,
607         host = None, 
608         port = None, 
609         user = None, 
610         agent = None, 
611         identity = None, 
612         server_key = None,
613         tty = False):
614     """
615     Spawn a remote command such that it will continue working asynchronously in 
616     background. 
617
618         :param command: The command to run, it should be a single line.
619         :type command: str
620
621         :param pidfile: Path to a file where to store the pid and ppid of the 
622                         spawned process
623         :type pidfile: str
624
625         :param stdout: Path to file to redirect standard output. 
626                        The default value is /dev/null
627         :type stdout: str
628
629         :param stderr: Path to file to redirect standard error.
630                        If the special STDOUT value is used, stderr will 
631                        be redirected to the same file as stdout
632         :type stderr: str
633
634         :param stdin: Path to a file with input to be piped into the command's standard input
635         :type stdin: str
636
637         :param home: Path to working directory folder. 
638                     It is assumed to exist unless the create_home flag is set.
639         :type home: str
640
641         :param create_home: Flag to force creation of the home folder before 
642                             running the command
643         :type create_home: bool
644  
645         :param sudo: Flag forcing execution with sudo user
646         :type sudo: bool
647         
648         :rtype: touple
649
650         (stdout, stderr), process
651         
652         Of the spawning process, which only captures errors at spawning time.
653         Usually only useful for diagnostics.
654     """
655     # Start process in a "daemonized" way, using nohup and heavy
656     # stdin/out redirection to avoid connection issues
657     if stderr is STDOUT:
658         stderr = '&1'
659     else:
660         stderr = ' ' + stderr
661     
662     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
663         'command' : command,
664         'pidfile' : shell_escape(pidfile),
665         'stdout' : stdout,
666         'stderr' : stderr,
667         'stdin' : stdin,
668     }
669     
670     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
671             'command' : shell_escape(daemon_command),
672             'sudo' : 'sudo -S' if sudo else '',
673             'pidfile' : shell_escape(pidfile),
674             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
675             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
676         }
677
678     (out,err),proc = rexec(
679         cmd,
680         host = host,
681         port = port,
682         user = user,
683         agent = agent,
684         identity = identity,
685         server_key = server_key,
686         tty = tty ,
687         )
688     
689     if proc.wait():
690         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
691
692     return ((out, err), proc)
693
694 @eintr_retry
695 def rgetpid(pidfile,
696         host = None, 
697         port = None, 
698         user = None, 
699         agent = None, 
700         identity = None,
701         server_key = None):
702     """
703     Returns the pid and ppid of a process from a remote file where the 
704     information was stored.
705
706         :param home: Path to directory where the pidfile is located
707         :type home: str
708
709         :param pidfile: Name of file containing the pid information
710         :type pidfile: str
711         
712         :rtype: int
713         
714         A (pid, ppid) tuple useful for calling rstatus and rkill,
715         or None if the pidfile isn't valid yet (can happen when process is staring up)
716
717     """
718     (out,err),proc = rexec(
719         "cat %(pidfile)s" % {
720             'pidfile' : pidfile,
721         },
722         host = host,
723         port = port,
724         user = user,
725         agent = agent,
726         identity = identity,
727         server_key = server_key
728         )
729         
730     if proc.wait():
731         return None
732     
733     if out:
734         try:
735             return map(int,out.strip().split(' ',1))
736         except:
737             # Ignore, many ways to fail that don't matter that much
738             return None
739
740 @eintr_retry
741 def rstatus(pid, ppid, 
742         host = None, 
743         port = None, 
744         user = None, 
745         agent = None, 
746         identity = None,
747         server_key = None):
748     """
749     Returns a code representing the the status of a remote process
750
751         :param pid: Process id of the process
752         :type pid: int
753
754         :param ppid: Parent process id of process
755         :type ppid: int
756     
757         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
758     
759     """
760     (out,err),proc = rexec(
761         # Check only by pid. pid+ppid does not always work (especially with sudo) 
762         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
763             'ppid' : ppid,
764             'pid' : pid,
765         },
766         host = host,
767         port = port,
768         user = user,
769         agent = agent,
770         identity = identity,
771         server_key = server_key
772         )
773     
774     if proc.wait():
775         return ProcStatus.NOT_STARTED
776     
777     status = False
778     if err:
779         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
780             status = True
781     elif out:
782         status = (out.strip() == 'wait')
783     else:
784         return ProcStatus.NOT_STARTED
785     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
786
787 @eintr_retry
788 def rkill(pid, ppid,
789         host = None, 
790         port = None, 
791         user = None, 
792         agent = None, 
793         sudo = False,
794         identity = None, 
795         server_key = None, 
796         nowait = False):
797     """
798     Sends a kill signal to a remote process.
799
800     First tries a SIGTERM, and if the process does not end in 10 seconds,
801     it sends a SIGKILL.
802  
803         :param pid: Process id of process to be killed
804         :type pid: int
805
806         :param ppid: Parent process id of process to be killed
807         :type ppid: int
808
809         :param sudo: Flag indicating if sudo should be used to kill the process
810         :type sudo: bool
811         
812     """
813     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
814     cmd = """
815 SUBKILL="%(subkill)s" ;
816 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
817 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
818 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
819     sleep 0.2 
820     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
821         break
822     else
823         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
824         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
825     fi
826     sleep 1.8
827 done
828 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
829     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
830     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
831 fi
832 """
833     if nowait:
834         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
835
836     (out,err),proc = rexec(
837         cmd % {
838             'ppid' : ppid,
839             'pid' : pid,
840             'sudo' : 'sudo -S' if sudo else '',
841             'subkill' : subkill,
842         },
843         host = host,
844         port = port,
845         user = user,
846         agent = agent,
847         identity = identity,
848         server_key = server_key
849         )
850     
851     # wait, don't leave zombies around
852     proc.wait()
853
854     return (out, err), proc
855
856 # POSIX
857 def _communicate(self, input, timeout=None, err_on_timeout=True):
858     read_set = []
859     write_set = []
860     stdout = None # Return
861     stderr = None # Return
862     
863     killed = False
864     
865     if timeout is not None:
866         timelimit = time.time() + timeout
867         killtime = timelimit + 4
868         bailtime = timelimit + 4
869
870     if self.stdin:
871         # Flush stdio buffer.  This might block, if the user has
872         # been writing to .stdin in an uncontrolled fashion.
873         self.stdin.flush()
874         if input:
875             write_set.append(self.stdin)
876         else:
877             self.stdin.close()
878     if self.stdout:
879         read_set.append(self.stdout)
880         stdout = []
881     if self.stderr:
882         read_set.append(self.stderr)
883         stderr = []
884
885     input_offset = 0
886     while read_set or write_set:
887         if timeout is not None:
888             curtime = time.time()
889             if timeout is None or curtime > timelimit:
890                 if curtime > bailtime:
891                     break
892                 elif curtime > killtime:
893                     signum = signal.SIGKILL
894                 else:
895                     signum = signal.SIGTERM
896                 # Lets kill it
897                 os.kill(self.pid, signum)
898                 select_timeout = 0.5
899             else:
900                 select_timeout = timelimit - curtime + 0.1
901         else:
902             select_timeout = 1.0
903         
904         if select_timeout > 1.0:
905             select_timeout = 1.0
906             
907         try:
908             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
909         except select.error,e:
910             if e[0] != 4:
911                 raise
912             else:
913                 continue
914         
915         if not rlist and not wlist and not xlist and self.poll() is not None:
916             # timeout and process exited, say bye
917             break
918
919         if self.stdin in wlist:
920             # When select has indicated that the file is writable,
921             # we can write up to PIPE_BUF bytes without risk
922             # blocking.  POSIX defines PIPE_BUF >= 512
923             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
924             input_offset += bytes_written
925             if input_offset >= len(input):
926                 self.stdin.close()
927                 write_set.remove(self.stdin)
928
929         if self.stdout in rlist:
930             data = os.read(self.stdout.fileno(), 1024)
931             if data == "":
932                 self.stdout.close()
933                 read_set.remove(self.stdout)
934             stdout.append(data)
935
936         if self.stderr in rlist:
937             data = os.read(self.stderr.fileno(), 1024)
938             if data == "":
939                 self.stderr.close()
940                 read_set.remove(self.stderr)
941             stderr.append(data)
942     
943     # All data exchanged.  Translate lists into strings.
944     if stdout is not None:
945         stdout = ''.join(stdout)
946     if stderr is not None:
947         stderr = ''.join(stderr)
948
949     # Translate newlines, if requested.  We cannot let the file
950     # object do the translation: It is based on stdio, which is
951     # impossible to combine with select (unless forcing no
952     # buffering).
953     if self.universal_newlines and hasattr(file, 'newlines'):
954         if stdout:
955             stdout = self._translate_newlines(stdout)
956         if stderr:
957             stderr = self._translate_newlines(stderr)
958
959     if killed and err_on_timeout:
960         errcode = self.poll()
961         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
962     else:
963         if killed:
964             self.poll()
965         else:
966             self.wait()
967         return (stdout, stderr)
968