Modification to sshfuncs to support multi hop commands
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None,
209         gwuser = None,
210         gw = None, 
211         agent = True,
212         sudo = False,
213         stdin = None,
214         identity = None,
215         server_key = None,
216         env = None,
217         tty = False,
218         timeout = None,
219         retry = 3,
220         err_on_timeout = True,
221         connect_timeout = 30,
222         persistent = True,
223         forward_x11 = False,
224         blocking = True,
225         strict_host_checking = True):
226     """
227     Executes a remote command, returns ((stdout,stderr),process)
228     """
229     
230     tmp_known_hosts = None
231     if not gw:
232         hostip = gethostbyname(host)
233     else: hostip = None
234
235     args = ['ssh', '-C',
236             # Don't bother with localhost. Makes test easier
237             '-o', 'NoHostAuthenticationForLocalhost=yes',
238             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
239             '-o', 'ConnectionAttempts=3',
240             '-o', 'ServerAliveInterval=30',
241             '-o', 'TCPKeepAlive=yes',
242             '-l', user, hostip or host]
243
244     if persistent and openssh_has_persist():
245         args.extend([
246             '-o', 'ControlMaster=auto',
247             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
248             '-o', 'ControlPersist=60' ])
249
250     if not strict_host_checking:
251         # Do not check for Host key. Unsafe.
252         args.extend(['-o', 'StrictHostKeyChecking=no'])
253
254     if gw:
255         if gwuser:
256             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
257         else:
258             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
259         args.extend(['-o', proxycommand])
260
261     if agent:
262         args.append('-A')
263
264     if port:
265         args.append('-p%d' % port)
266
267     if identity:
268         args.extend(('-i', identity))
269
270     if tty:
271         args.append('-t')
272         args.append('-t')
273
274     if forward_x11:
275         args.append('-X')
276
277     if server_key:
278         # Create a temporary server key file
279         tmp_known_hosts = make_server_key_args(server_key, host, port)
280         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
281
282     if sudo:
283         command = "sudo " + command
284
285     args.append(command)
286
287     for x in xrange(retry):
288         # connects to the remote host and starts a remote connection
289         proc = subprocess.Popen(args,
290                 env = env,
291                 stdout = subprocess.PIPE,
292                 stdin = subprocess.PIPE, 
293                 stderr = subprocess.PIPE)
294         
295         # attach tempfile object to the process, to make sure the file stays
296         # alive until the process is finished with it
297         proc._known_hosts = tmp_known_hosts
298     
299         # by default, rexec calls _communicate which will block 
300         # until the process has exit. The argument block == False 
301         # forces to rexec to return immediately, without blocking 
302         try:
303             if blocking:
304                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
305             else:
306                 err = proc.stderr.read()
307                 out = proc.stdout.read()
308
309             msg = " rexec - host %s - command %s " % (host, " ".join(args))
310             log(msg, logging.DEBUG, out, err)
311
312             if proc.poll():
313                 skip = False
314
315                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
316                     # SSH error, can safely retry
317                     skip = True 
318                 elif retry:
319                     # Probably timed out or plain failed but can retry
320                     skip = True 
321                 
322                 if skip:
323                     t = x*2
324                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
325                             t, x, host, " ".join(args))
326                     log(msg, logging.DEBUG)
327
328                     time.sleep(t)
329                     continue
330             break
331         except RuntimeError, e:
332             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
333             log(msg, logging.DEBUG, out, err)
334
335             if retry <= 0:
336                 raise
337             retry -= 1
338         
339     return ((out, err), proc)
340
341 def rcopy(source, dest,
342         port = None,
343         gwuser = None,
344         gw = None,
345         agent = True, 
346         recursive = False,
347         identity = None,
348         server_key = None,
349         retry = 3,
350         strict_host_checking = True):
351     """
352     Copies from/to remote sites.
353     
354     Source and destination should have the user and host encoded
355     as per scp specs.
356     
357     If source is a file object, a special mode will be used to
358     create the remote file with the same contents.
359     
360     If dest is a file object, the remote file (source) will be
361     read and written into dest.
362     
363     In these modes, recursive cannot be True.
364     
365     Source can be a list of files to copy to a single destination,
366     in which case it is advised that the destination be a folder.
367     """
368     
369     if isinstance(source, file) and source.tell() == 0:
370         source = source.name
371     elif hasattr(source, 'read'):
372         tmp = tempfile.NamedTemporaryFile()
373         while True:
374             buf = source.read(65536)
375             if buf:
376                 tmp.write(buf)
377             else:
378                 break
379         tmp.seek(0)
380         source = tmp.name
381     
382     if isinstance(source, file) or isinstance(dest, file) \
383             or hasattr(source, 'read')  or hasattr(dest, 'write'):
384         assert not recursive
385         
386         # Parse source/destination as <user>@<server>:<path>
387         if isinstance(dest, basestring) and ':' in dest:
388             remspec, path = dest.split(':',1)
389         elif isinstance(source, basestring) and ':' in source:
390             remspec, path = source.split(':',1)
391         else:
392             raise ValueError, "Both endpoints cannot be local"
393         user,host = remspec.rsplit('@',1)
394         
395         tmp_known_hosts = None
396         if not gw:
397             hostip = gethostbyname(host)
398         else: hostip = None
399         
400         args = ['ssh', '-l', user, '-C',
401                 # Don't bother with localhost. Makes test easier
402                 '-o', 'NoHostAuthenticationForLocalhost=yes',
403                 '-o', 'ConnectTimeout=60',
404                 '-o', 'ConnectionAttempts=3',
405                 '-o', 'ServerAliveInterval=30',
406                 '-o', 'TCPKeepAlive=yes',
407                 hostip or host ]
408
409         if openssh_has_persist():
410             args.extend([
411                 '-o', 'ControlMaster=auto',
412                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
413                 '-o', 'ControlPersist=60' ])
414
415         if gw:
416             if gwuser:
417                 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
418             else:
419                 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
420             args.extend(['-o', proxycommand])
421
422         if port:
423             args.append('-P%d' % port)
424
425         if identity:
426             args.extend(('-i', identity))
427
428         if server_key:
429             # Create a temporary server key file
430             tmp_known_hosts = make_server_key_args(server_key, host, port)
431             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
432         
433         if isinstance(source, file) or hasattr(source, 'read'):
434             args.append('cat > %s' % (shell_escape(path),))
435         elif isinstance(dest, file) or hasattr(dest, 'write'):
436             args.append('cat %s' % (shell_escape(path),))
437         else:
438             raise AssertionError, "Unreachable code reached! :-Q"
439         
440         # connects to the remote host and starts a remote connection
441         if isinstance(source, file):
442             proc = subprocess.Popen(args, 
443                     stdout = open('/dev/null','w'),
444                     stderr = subprocess.PIPE,
445                     stdin = source)
446             err = proc.stderr.read()
447             proc._known_hosts = tmp_known_hosts
448             eintr_retry(proc.wait)()
449             return ((None,err), proc)
450         elif isinstance(dest, file):
451             proc = subprocess.Popen(args, 
452                     stdout = open('/dev/null','w'),
453                     stderr = subprocess.PIPE,
454                     stdin = source)
455             err = proc.stderr.read()
456             proc._known_hosts = tmp_known_hosts
457             eintr_retry(proc.wait)()
458             return ((None,err), proc)
459         elif hasattr(source, 'read'):
460             # file-like (but not file) source
461             proc = subprocess.Popen(args, 
462                     stdout = open('/dev/null','w'),
463                     stderr = subprocess.PIPE,
464                     stdin = subprocess.PIPE)
465             
466             buf = None
467             err = []
468             while True:
469                 if not buf:
470                     buf = source.read(4096)
471                 if not buf:
472                     #EOF
473                     break
474                 
475                 rdrdy, wrdy, broken = select.select(
476                     [proc.stderr],
477                     [proc.stdin],
478                     [proc.stderr,proc.stdin])
479                 
480                 if proc.stderr in rdrdy:
481                     # use os.read for fully unbuffered behavior
482                     err.append(os.read(proc.stderr.fileno(), 4096))
483                 
484                 if proc.stdin in wrdy:
485                     proc.stdin.write(buf)
486                     buf = None
487                 
488                 if broken:
489                     break
490             proc.stdin.close()
491             err.append(proc.stderr.read())
492                 
493             proc._known_hosts = tmp_known_hosts
494             eintr_retry(proc.wait)()
495             return ((None,''.join(err)), proc)
496         elif hasattr(dest, 'write'):
497             # file-like (but not file) dest
498             proc = subprocess.Popen(args, 
499                     stdout = subprocess.PIPE,
500                     stderr = subprocess.PIPE,
501                     stdin = open('/dev/null','w'))
502             
503             buf = None
504             err = []
505             while True:
506                 rdrdy, wrdy, broken = select.select(
507                     [proc.stderr, proc.stdout],
508                     [],
509                     [proc.stderr, proc.stdout])
510                 
511                 if proc.stderr in rdrdy:
512                     # use os.read for fully unbuffered behavior
513                     err.append(os.read(proc.stderr.fileno(), 4096))
514                 
515                 if proc.stdout in rdrdy:
516                     # use os.read for fully unbuffered behavior
517                     buf = os.read(proc.stdout.fileno(), 4096)
518                     dest.write(buf)
519                     
520                     if not buf:
521                         #EOF
522                         break
523                 
524                 if broken:
525                     break
526             err.append(proc.stderr.read())
527                 
528             proc._known_hosts = tmp_known_hosts
529             eintr_retry(proc.wait)()
530             return ((None,''.join(err)), proc)
531         else:
532             raise AssertionError, "Unreachable code reached! :-Q"
533     else:
534         # Parse destination as <user>@<server>:<path>
535         if isinstance(dest, basestring) and ':' in dest:
536             remspec, path = dest.split(':',1)
537         elif isinstance(source, basestring) and ':' in source:
538             remspec, path = source.split(':',1)
539         else:
540             raise ValueError, "Both endpoints cannot be local"
541         user,host = remspec.rsplit('@',1)
542         
543         # plain scp
544         tmp_known_hosts = None
545
546         args = ['scp', '-q', '-p', '-C',
547                 # Speed up transfer using blowfish cypher specification which is 
548                 # faster than the default one (3des)
549                 '-c', 'blowfish',
550                 # Don't bother with localhost. Makes test easier
551                 '-o', 'NoHostAuthenticationForLocalhost=yes',
552                 '-o', 'ConnectTimeout=60',
553                 '-o', 'ConnectionAttempts=3',
554                 '-o', 'ServerAliveInterval=30',
555                 '-o', 'TCPKeepAlive=yes' ]
556                 
557         if port:
558             args.append('-P%d' % port)
559
560         if gw:
561             if gwuser:
562                 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
563             else:
564                 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
565             args.extend(['-o', proxycommand])
566
567         if recursive:
568             args.append('-r')
569
570         if identity:
571             args.extend(('-i', identity))
572
573         if server_key:
574             # Create a temporary server key file
575             tmp_known_hosts = make_server_key_args(server_key, host, port)
576             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
577
578         if not strict_host_checking:
579             # Do not check for Host key. Unsafe.
580             args.extend(['-o', 'StrictHostKeyChecking=no'])
581
582         if isinstance(source,list):
583             args.extend(source)
584         else:
585             if openssh_has_persist():
586                 args.extend([
587                     '-o', 'ControlMaster=auto',
588                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
589                     ])
590             args.append(source)
591
592         args.append(dest)
593         
594         for x in xrange(retry):
595             # connects to the remote host and starts a remote connection
596             proc = subprocess.Popen(args,
597                     stdout = subprocess.PIPE,
598                     stdin = subprocess.PIPE, 
599                     stderr = subprocess.PIPE)
600             
601             # attach tempfile object to the process, to make sure the file stays
602             # alive until the process is finished with it
603             proc._known_hosts = tmp_known_hosts
604         
605             try:
606                 (out, err) = proc.communicate()
607                 eintr_retry(proc.wait)()
608                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
609                 log(msg, logging.DEBUG, out, err)
610
611                 if proc.poll():
612                     t = x*2
613                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
614                             t, x, host, " ".join(args))
615                     log(msg, logging.DEBUG)
616
617                     time.sleep(t)
618                     continue
619
620                 break
621             except RuntimeError, e:
622                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
623                 log(msg, logging.DEBUG, out, err)
624
625                 if retry <= 0:
626                     raise
627                 retry -= 1
628             
629         return ((out, err), proc)
630
631 def rspawn(command, pidfile, 
632         stdout = '/dev/null', 
633         stderr = STDOUT, 
634         stdin = '/dev/null',
635         home = None, 
636         create_home = False, 
637         sudo = False,
638         host = None, 
639         port = None, 
640         user = None, 
641         gwuser = None,
642         gw = None,
643         agent = None, 
644         identity = None, 
645         server_key = None,
646         tty = False):
647     """
648     Spawn a remote command such that it will continue working asynchronously in 
649     background. 
650
651         :param command: The command to run, it should be a single line.
652         :type command: str
653
654         :param pidfile: Path to a file where to store the pid and ppid of the 
655                         spawned process
656         :type pidfile: str
657
658         :param stdout: Path to file to redirect standard output. 
659                        The default value is /dev/null
660         :type stdout: str
661
662         :param stderr: Path to file to redirect standard error.
663                        If the special STDOUT value is used, stderr will 
664                        be redirected to the same file as stdout
665         :type stderr: str
666
667         :param stdin: Path to a file with input to be piped into the command's standard input
668         :type stdin: str
669
670         :param home: Path to working directory folder. 
671                     It is assumed to exist unless the create_home flag is set.
672         :type home: str
673
674         :param create_home: Flag to force creation of the home folder before 
675                             running the command
676         :type create_home: bool
677  
678         :param sudo: Flag forcing execution with sudo user
679         :type sudo: bool
680         
681         :rtype: touple
682
683         (stdout, stderr), process
684         
685         Of the spawning process, which only captures errors at spawning time.
686         Usually only useful for diagnostics.
687     """
688     # Start process in a "daemonized" way, using nohup and heavy
689     # stdin/out redirection to avoid connection issues
690     if stderr is STDOUT:
691         stderr = '&1'
692     else:
693         stderr = ' ' + stderr
694     
695     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
696         'command' : command,
697         'pidfile' : shell_escape(pidfile),
698         'stdout' : stdout,
699         'stderr' : stderr,
700         'stdin' : stdin,
701     }
702     
703     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
704             'command' : shell_escape(daemon_command),
705             'sudo' : 'sudo -S' if sudo else '',
706             'pidfile' : shell_escape(pidfile),
707             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
708             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
709         }
710
711     (out,err),proc = rexec(
712         cmd,
713         host = host,
714         port = port,
715         user = user,
716         gwuser = gwuser,
717         gw = gw,
718         agent = agent,
719         identity = identity,
720         server_key = server_key,
721         tty = tty ,
722         )
723     
724     if proc.wait():
725         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
726
727     return ((out, err), proc)
728
729 @eintr_retry
730 def rgetpid(pidfile,
731         host = None, 
732         port = None, 
733         user = None, 
734         gwuser = None,
735         gw = None,
736         agent = None, 
737         identity = None,
738         server_key = None):
739     """
740     Returns the pid and ppid of a process from a remote file where the 
741     information was stored.
742
743         :param home: Path to directory where the pidfile is located
744         :type home: str
745
746         :param pidfile: Name of file containing the pid information
747         :type pidfile: str
748         
749         :rtype: int
750         
751         A (pid, ppid) tuple useful for calling rstatus and rkill,
752         or None if the pidfile isn't valid yet (can happen when process is staring up)
753
754     """
755     (out,err),proc = rexec(
756         "cat %(pidfile)s" % {
757             'pidfile' : pidfile,
758         },
759         host = host,
760         port = port,
761         user = user,
762         gwuser = gwuser,
763         gw = gw,
764         agent = agent,
765         identity = identity,
766         server_key = server_key
767         )
768         
769     if proc.wait():
770         return None
771     
772     if out:
773         try:
774             return map(int,out.strip().split(' ',1))
775         except:
776             # Ignore, many ways to fail that don't matter that much
777             return None
778
779 @eintr_retry
780 def rstatus(pid, ppid, 
781         host = None, 
782         port = None, 
783         user = None, 
784         gwuser = None,
785         gw = None,
786         agent = None, 
787         identity = None,
788         server_key = None):
789     """
790     Returns a code representing the the status of a remote process
791
792         :param pid: Process id of the process
793         :type pid: int
794
795         :param ppid: Parent process id of process
796         :type ppid: int
797     
798         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
799     
800     """
801     (out,err),proc = rexec(
802         # Check only by pid. pid+ppid does not always work (especially with sudo) 
803         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
804             'ppid' : ppid,
805             'pid' : pid,
806         },
807         host = host,
808         port = port,
809         user = user,
810         gwuser = gwuser,
811         gw = gw,
812         agent = agent,
813         identity = identity,
814         server_key = server_key
815         )
816     
817     if proc.wait():
818         return ProcStatus.NOT_STARTED
819     
820     status = False
821     if err:
822         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
823             status = True
824     elif out:
825         status = (out.strip() == 'wait')
826     else:
827         return ProcStatus.NOT_STARTED
828     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
829
830 @eintr_retry
831 def rkill(pid, ppid,
832         host = None, 
833         port = None, 
834         user = None, 
835         gwuser = None,
836         gw = None,
837         agent = None, 
838         sudo = False,
839         identity = None, 
840         server_key = None, 
841         nowait = False):
842     """
843     Sends a kill signal to a remote process.
844
845     First tries a SIGTERM, and if the process does not end in 10 seconds,
846     it sends a SIGKILL.
847  
848         :param pid: Process id of process to be killed
849         :type pid: int
850
851         :param ppid: Parent process id of process to be killed
852         :type ppid: int
853
854         :param sudo: Flag indicating if sudo should be used to kill the process
855         :type sudo: bool
856         
857     """
858     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
859     cmd = """
860 SUBKILL="%(subkill)s" ;
861 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
862 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
863 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
864     sleep 0.2 
865     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
866         break
867     else
868         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
869         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
870     fi
871     sleep 1.8
872 done
873 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
874     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
875     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
876 fi
877 """
878     if nowait:
879         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
880
881     (out,err),proc = rexec(
882         cmd % {
883             'ppid' : ppid,
884             'pid' : pid,
885             'sudo' : 'sudo -S' if sudo else '',
886             'subkill' : subkill,
887         },
888         host = host,
889         port = port,
890         user = user,
891         gwuser = gwuser,
892         gw = gw,
893         agent = agent,
894         identity = identity,
895         server_key = server_key
896         )
897     
898     # wait, don't leave zombies around
899     proc.wait()
900
901     return (out, err), proc
902
903 # POSIX
904 def _communicate(proc, input, timeout=None, err_on_timeout=True):
905     read_set = []
906     write_set = []
907     stdout = None # Return
908     stderr = None # Return
909     
910     killed = False
911     
912     if timeout is not None:
913         timelimit = time.time() + timeout
914         killtime = timelimit + 4
915         bailtime = timelimit + 4
916
917     if proc.stdin:
918         # Flush stdio buffer.  This might block, if the user has
919         # been writing to .stdin in an uncontrolled fashion.
920         proc.stdin.flush()
921         if input:
922             write_set.append(proc.stdin)
923         else:
924             proc.stdin.close()
925
926     if proc.stdout:
927         read_set.append(proc.stdout)
928         stdout = []
929
930     if proc.stderr:
931         read_set.append(proc.stderr)
932         stderr = []
933
934     input_offset = 0
935     while read_set or write_set:
936         if timeout is not None:
937             curtime = time.time()
938             if timeout is None or curtime > timelimit:
939                 if curtime > bailtime:
940                     break
941                 elif curtime > killtime:
942                     signum = signal.SIGKILL
943                 else:
944                     signum = signal.SIGTERM
945                 # Lets kill it
946                 os.kill(proc.pid, signum)
947                 select_timeout = 0.5
948             else:
949                 select_timeout = timelimit - curtime + 0.1
950         else:
951             select_timeout = 1.0
952         
953         if select_timeout > 1.0:
954             select_timeout = 1.0
955             
956         try:
957             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
958         except select.error,e:
959             if e[0] != 4:
960                 raise
961             else:
962                 continue
963         
964         if not rlist and not wlist and not xlist and proc.poll() is not None:
965             # timeout and process exited, say bye
966             break
967
968         if proc.stdin in wlist:
969             # When select has indicated that the file is writable,
970             # we can write up to PIPE_BUF bytes without risk
971             # blocking.  POSIX defines PIPE_BUF >= 512
972             bytes_written = os.write(proc.stdin.fileno(),
973                     buffer(input, input_offset, 512))
974             input_offset += bytes_written
975
976             if input_offset >= len(input):
977                 proc.stdin.close()
978                 write_set.remove(proc.stdin)
979
980         if proc.stdout in rlist:
981             data = os.read(proc.stdout.fileno(), 1024)
982             if data == "":
983                 proc.stdout.close()
984                 read_set.remove(proc.stdout)
985             stdout.append(data)
986
987         if proc.stderr in rlist:
988             data = os.read(proc.stderr.fileno(), 1024)
989             if data == "":
990                 proc.stderr.close()
991                 read_set.remove(proc.stderr)
992             stderr.append(data)
993     
994     # All data exchanged.  Translate lists into strings.
995     if stdout is not None:
996         stdout = ''.join(stdout)
997     if stderr is not None:
998         stderr = ''.join(stderr)
999
1000     # Translate newlines, if requested.  We cannot let the file
1001     # object do the translation: It is based on stdio, which is
1002     # impossible to combine with select (unless forcing no
1003     # buffering).
1004     if proc.universal_newlines and hasattr(file, 'newlines'):
1005         if stdout:
1006             stdout = proc._translate_newlines(stdout)
1007         if stderr:
1008             stderr = proc._translate_newlines(stderr)
1009
1010     if killed and err_on_timeout:
1011         errcode = proc.poll()
1012         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1013     else:
1014         if killed:
1015             proc.poll()
1016         else:
1017             proc.wait()
1018         return (stdout, stderr)
1019