Fixing bug for uploading a list of sources
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None,
209         gwuser = None,
210         gw = None, 
211         agent = True,
212         sudo = False,
213         stdin = None,
214         identity = None,
215         server_key = None,
216         env = None,
217         tty = False,
218         timeout = None,
219         retry = 3,
220         err_on_timeout = True,
221         connect_timeout = 30,
222         persistent = True,
223         forward_x11 = False,
224         blocking = True,
225         strict_host_checking = True):
226     """
227     Executes a remote command, returns ((stdout,stderr),process)
228     """
229     
230     tmp_known_hosts = None
231     if not gw:
232         hostip = gethostbyname(host)
233     else: hostip = None
234
235     args = ['ssh', '-C',
236             # Don't bother with localhost. Makes test easier
237             '-o', 'NoHostAuthenticationForLocalhost=yes',
238             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
239             '-o', 'ConnectionAttempts=3',
240             '-o', 'ServerAliveInterval=30',
241             '-o', 'TCPKeepAlive=yes',
242             '-o', 'Batchmode=yes',
243             '-l', user, hostip or host]
244
245     if persistent and openssh_has_persist():
246         args.extend([
247             '-o', 'ControlMaster=auto',
248             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
249             '-o', 'ControlPersist=60' ])
250
251     if not strict_host_checking:
252         # Do not check for Host key. Unsafe.
253         args.extend(['-o', 'StrictHostKeyChecking=no'])
254
255     if gw:
256         if gwuser:
257             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
258         else:
259             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
260         args.extend(['-o', proxycommand])
261
262     if agent:
263         args.append('-A')
264
265     if port:
266         args.append('-p%d' % port)
267
268     if identity:
269         args.extend(('-i', identity))
270
271     if tty:
272         args.append('-t')
273         args.append('-t')
274
275     if forward_x11:
276         args.append('-X')
277
278     if server_key:
279         # Create a temporary server key file
280         tmp_known_hosts = make_server_key_args(server_key, host, port)
281         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
282
283     if sudo:
284         command = "sudo " + command
285
286     args.append(command)
287
288     for x in xrange(retry):
289         # connects to the remote host and starts a remote connection
290         proc = subprocess.Popen(args,
291                 env = env,
292                 stdout = subprocess.PIPE,
293                 stdin = subprocess.PIPE, 
294                 stderr = subprocess.PIPE)
295        
296         # attach tempfile object to the process, to make sure the file stays
297         # alive until the process is finished with it
298         proc._known_hosts = tmp_known_hosts
299     
300         # by default, rexec calls _communicate which will block 
301         # until the process has exit. The argument block == False 
302         # forces to rexec to return immediately, without blocking 
303         try:
304             if blocking:
305                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
306             else:
307                 err = proc.stderr.read()
308                 out = proc.stdout.read()
309
310             msg = " rexec - host %s - command %s " % (host, " ".join(args))
311             log(msg, logging.DEBUG, out, err)
312
313             if proc.poll():
314                 skip = False
315
316                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
317                     # SSH error, can safely retry
318                     skip = True 
319                 elif retry:
320                     # Probably timed out or plain failed but can retry
321                     skip = True 
322                 
323                 if skip:
324                     t = x*2
325                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
326                             t, x, host, " ".join(args))
327                     log(msg, logging.DEBUG)
328
329                     time.sleep(t)
330                     continue
331             break
332         except RuntimeError, e:
333             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
334             log(msg, logging.DEBUG, out, err)
335
336             if retry <= 0:
337                 raise
338             retry -= 1
339         
340     return ((out, err), proc)
341
342 def rcopy(source, dest,
343         port = None,
344         gwuser = None,
345         gw = None,
346         agent = True, 
347         recursive = False,
348         identity = None,
349         server_key = None,
350         retry = 3,
351         strict_host_checking = True):
352     """
353     Copies from/to remote sites.
354     
355     Source and destination should have the user and host encoded
356     as per scp specs.
357     
358     If source is a file object, a special mode will be used to
359     create the remote file with the same contents.
360     
361     If dest is a file object, the remote file (source) will be
362     read and written into dest.
363     
364     In these modes, recursive cannot be True.
365     
366     Source can be a list of files to copy to a single destination,
367     in which case it is advised that the destination be a folder.
368     """
369     
370     if isinstance(source, file) and source.tell() == 0:
371         source = source.name
372     elif hasattr(source, 'read'):
373         tmp = tempfile.NamedTemporaryFile()
374         while True:
375             buf = source.read(65536)
376             if buf:
377                 tmp.write(buf)
378             else:
379                 break
380         tmp.seek(0)
381         source = tmp.name
382     
383     if isinstance(source, file) or isinstance(dest, file) \
384             or hasattr(source, 'read')  or hasattr(dest, 'write'):
385         assert not recursive
386         
387         # Parse source/destination as <user>@<server>:<path>
388         if isinstance(dest, basestring) and ':' in dest:
389             remspec, path = dest.split(':',1)
390         elif isinstance(source, basestring) and ':' in source:
391             remspec, path = source.split(':',1)
392         else:
393             raise ValueError, "Both endpoints cannot be local"
394         user,host = remspec.rsplit('@',1)
395         
396         tmp_known_hosts = None
397         if not gw:
398             hostip = gethostbyname(host)
399         else: hostip = None
400         
401         args = ['ssh', '-l', user, '-C',
402                 # Don't bother with localhost. Makes test easier
403                 '-o', 'NoHostAuthenticationForLocalhost=yes',
404                 '-o', 'ConnectTimeout=60',
405                 '-o', 'ConnectionAttempts=3',
406                 '-o', 'ServerAliveInterval=30',
407                 '-o', 'TCPKeepAlive=yes',
408                 hostip or host ]
409
410         if openssh_has_persist():
411             args.extend([
412                 '-o', 'ControlMaster=auto',
413                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
414                 '-o', 'ControlPersist=60' ])
415
416         if gw:
417             if gwuser:
418                 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
419             else:
420                 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
421             args.extend(['-o', proxycommand])
422
423         if port:
424             args.append('-P%d' % port)
425
426         if identity:
427             args.extend(('-i', identity))
428
429         if server_key:
430             # Create a temporary server key file
431             tmp_known_hosts = make_server_key_args(server_key, host, port)
432             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
433         
434         if isinstance(source, file) or hasattr(source, 'read'):
435             args.append('cat > %s' % (shell_escape(path),))
436         elif isinstance(dest, file) or hasattr(dest, 'write'):
437             args.append('cat %s' % (shell_escape(path),))
438         else:
439             raise AssertionError, "Unreachable code reached! :-Q"
440         
441         # connects to the remote host and starts a remote connection
442         if isinstance(source, file):
443             proc = subprocess.Popen(args, 
444                     stdout = open('/dev/null','w'),
445                     stderr = subprocess.PIPE,
446                     stdin = source)
447             err = proc.stderr.read()
448             proc._known_hosts = tmp_known_hosts
449             eintr_retry(proc.wait)()
450             return ((None,err), proc)
451         elif isinstance(dest, file):
452             proc = subprocess.Popen(args, 
453                     stdout = open('/dev/null','w'),
454                     stderr = subprocess.PIPE,
455                     stdin = source)
456             err = proc.stderr.read()
457             proc._known_hosts = tmp_known_hosts
458             eintr_retry(proc.wait)()
459             return ((None,err), proc)
460         elif hasattr(source, 'read'):
461             # file-like (but not file) source
462             proc = subprocess.Popen(args, 
463                     stdout = open('/dev/null','w'),
464                     stderr = subprocess.PIPE,
465                     stdin = subprocess.PIPE)
466             
467             buf = None
468             err = []
469             while True:
470                 if not buf:
471                     buf = source.read(4096)
472                 if not buf:
473                     #EOF
474                     break
475                 
476                 rdrdy, wrdy, broken = select.select(
477                     [proc.stderr],
478                     [proc.stdin],
479                     [proc.stderr,proc.stdin])
480                 
481                 if proc.stderr in rdrdy:
482                     # use os.read for fully unbuffered behavior
483                     err.append(os.read(proc.stderr.fileno(), 4096))
484                 
485                 if proc.stdin in wrdy:
486                     proc.stdin.write(buf)
487                     buf = None
488                 
489                 if broken:
490                     break
491             proc.stdin.close()
492             err.append(proc.stderr.read())
493                 
494             proc._known_hosts = tmp_known_hosts
495             eintr_retry(proc.wait)()
496             return ((None,''.join(err)), proc)
497         elif hasattr(dest, 'write'):
498             # file-like (but not file) dest
499             proc = subprocess.Popen(args, 
500                     stdout = subprocess.PIPE,
501                     stderr = subprocess.PIPE,
502                     stdin = open('/dev/null','w'))
503             
504             buf = None
505             err = []
506             while True:
507                 rdrdy, wrdy, broken = select.select(
508                     [proc.stderr, proc.stdout],
509                     [],
510                     [proc.stderr, proc.stdout])
511                 
512                 if proc.stderr in rdrdy:
513                     # use os.read for fully unbuffered behavior
514                     err.append(os.read(proc.stderr.fileno(), 4096))
515                 
516                 if proc.stdout in rdrdy:
517                     # use os.read for fully unbuffered behavior
518                     buf = os.read(proc.stdout.fileno(), 4096)
519                     dest.write(buf)
520                     
521                     if not buf:
522                         #EOF
523                         break
524                 
525                 if broken:
526                     break
527             err.append(proc.stderr.read())
528                 
529             proc._known_hosts = tmp_known_hosts
530             eintr_retry(proc.wait)()
531             return ((None,''.join(err)), proc)
532         else:
533             raise AssertionError, "Unreachable code reached! :-Q"
534     else:
535         # Parse destination as <user>@<server>:<path>
536         if isinstance(dest, basestring) and ':' in dest:
537             remspec, path = dest.split(':',1)
538         elif isinstance(source, basestring) and ':' in source:
539             remspec, path = source.split(':',1)
540         else:
541             raise ValueError, "Both endpoints cannot be local"
542         user,host = remspec.rsplit('@',1)
543         
544         # plain scp
545         tmp_known_hosts = None
546
547         args = ['scp', '-q', '-p', '-C',
548                 # Speed up transfer using blowfish cypher specification which is 
549                 # faster than the default one (3des)
550                 '-c', 'blowfish',
551                 # Don't bother with localhost. Makes test easier
552                 '-o', 'NoHostAuthenticationForLocalhost=yes',
553                 '-o', 'ConnectTimeout=60',
554                 '-o', 'ConnectionAttempts=3',
555                 '-o', 'ServerAliveInterval=30',
556                 '-o', 'TCPKeepAlive=yes' ]
557                 
558         if port:
559             args.append('-P%d' % port)
560
561         if gw:
562             if gwuser:
563                 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
564             else:
565                 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
566             args.extend(['-o', proxycommand])
567
568         if recursive:
569             args.append('-r')
570
571         if identity:
572             args.extend(('-i', identity))
573
574         if server_key:
575             # Create a temporary server key file
576             tmp_known_hosts = make_server_key_args(server_key, host, port)
577             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
578
579         if not strict_host_checking:
580             # Do not check for Host key. Unsafe.
581             args.extend(['-o', 'StrictHostKeyChecking=no'])
582
583         if ' ' in source:
584             source = source.split(' ')
585
586         if isinstance(source,list):
587             args.extend(source)
588         else:
589             if openssh_has_persist():
590                 args.extend([
591                     '-o', 'ControlMaster=auto',
592                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
593                     ])
594             args.append(source)
595
596         args.append(dest)
597
598         for x in xrange(retry):
599             # connects to the remote host and starts a remote connection
600             proc = subprocess.Popen(args,
601                     stdout = subprocess.PIPE,
602                     stdin = subprocess.PIPE, 
603                     stderr = subprocess.PIPE)
604             
605             # attach tempfile object to the process, to make sure the file stays
606             # alive until the process is finished with it
607             proc._known_hosts = tmp_known_hosts
608         
609             try:
610                 (out, err) = proc.communicate()
611                 eintr_retry(proc.wait)()
612                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
613                 log(msg, logging.DEBUG, out, err)
614
615                 if proc.poll():
616                     t = x*2
617                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
618                             t, x, host, " ".join(args))
619                     log(msg, logging.DEBUG)
620
621                     time.sleep(t)
622                     continue
623
624                 break
625             except RuntimeError, e:
626                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
627                 log(msg, logging.DEBUG, out, err)
628
629                 if retry <= 0:
630                     raise
631                 retry -= 1
632             
633         return ((out, err), proc)
634
635 def rspawn(command, pidfile, 
636         stdout = '/dev/null', 
637         stderr = STDOUT, 
638         stdin = '/dev/null',
639         home = None, 
640         create_home = False, 
641         sudo = False,
642         host = None, 
643         port = None, 
644         user = None, 
645         gwuser = None,
646         gw = None,
647         agent = None, 
648         identity = None, 
649         server_key = None,
650         tty = False):
651     """
652     Spawn a remote command such that it will continue working asynchronously in 
653     background. 
654
655         :param command: The command to run, it should be a single line.
656         :type command: str
657
658         :param pidfile: Path to a file where to store the pid and ppid of the 
659                         spawned process
660         :type pidfile: str
661
662         :param stdout: Path to file to redirect standard output. 
663                        The default value is /dev/null
664         :type stdout: str
665
666         :param stderr: Path to file to redirect standard error.
667                        If the special STDOUT value is used, stderr will 
668                        be redirected to the same file as stdout
669         :type stderr: str
670
671         :param stdin: Path to a file with input to be piped into the command's standard input
672         :type stdin: str
673
674         :param home: Path to working directory folder. 
675                     It is assumed to exist unless the create_home flag is set.
676         :type home: str
677
678         :param create_home: Flag to force creation of the home folder before 
679                             running the command
680         :type create_home: bool
681  
682         :param sudo: Flag forcing execution with sudo user
683         :type sudo: bool
684         
685         :rtype: touple
686
687         (stdout, stderr), process
688         
689         Of the spawning process, which only captures errors at spawning time.
690         Usually only useful for diagnostics.
691     """
692     # Start process in a "daemonized" way, using nohup and heavy
693     # stdin/out redirection to avoid connection issues
694     if stderr is STDOUT:
695         stderr = '&1'
696     else:
697         stderr = ' ' + stderr
698     
699     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
700         'command' : command,
701         'pidfile' : shell_escape(pidfile),
702         'stdout' : stdout,
703         'stderr' : stderr,
704         'stdin' : stdin,
705     }
706     
707     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
708             'command' : shell_escape(daemon_command),
709             'sudo' : 'sudo -S' if sudo else '',
710             'pidfile' : shell_escape(pidfile),
711             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
712             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
713         }
714
715     (out,err),proc = rexec(
716         cmd,
717         host = host,
718         port = port,
719         user = user,
720         gwuser = gwuser,
721         gw = gw,
722         agent = agent,
723         identity = identity,
724         server_key = server_key,
725         tty = tty ,
726         )
727     
728     if proc.wait():
729         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
730
731     return ((out, err), proc)
732
733 @eintr_retry
734 def rgetpid(pidfile,
735         host = None, 
736         port = None, 
737         user = None, 
738         gwuser = None,
739         gw = None,
740         agent = None, 
741         identity = None,
742         server_key = None):
743     """
744     Returns the pid and ppid of a process from a remote file where the 
745     information was stored.
746
747         :param home: Path to directory where the pidfile is located
748         :type home: str
749
750         :param pidfile: Name of file containing the pid information
751         :type pidfile: str
752         
753         :rtype: int
754         
755         A (pid, ppid) tuple useful for calling rstatus and rkill,
756         or None if the pidfile isn't valid yet (can happen when process is staring up)
757
758     """
759     (out,err),proc = rexec(
760         "cat %(pidfile)s" % {
761             'pidfile' : pidfile,
762         },
763         host = host,
764         port = port,
765         user = user,
766         gwuser = gwuser,
767         gw = gw,
768         agent = agent,
769         identity = identity,
770         server_key = server_key
771         )
772         
773     if proc.wait():
774         return None
775     
776     if out:
777         try:
778             return map(int,out.strip().split(' ',1))
779         except:
780             # Ignore, many ways to fail that don't matter that much
781             return None
782
783 @eintr_retry
784 def rstatus(pid, ppid, 
785         host = None, 
786         port = None, 
787         user = None, 
788         gwuser = None,
789         gw = None,
790         agent = None, 
791         identity = None,
792         server_key = None):
793     """
794     Returns a code representing the the status of a remote process
795
796         :param pid: Process id of the process
797         :type pid: int
798
799         :param ppid: Parent process id of process
800         :type ppid: int
801     
802         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
803     
804     """
805     (out,err),proc = rexec(
806         # Check only by pid. pid+ppid does not always work (especially with sudo) 
807         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
808             'ppid' : ppid,
809             'pid' : pid,
810         },
811         host = host,
812         port = port,
813         user = user,
814         gwuser = gwuser,
815         gw = gw,
816         agent = agent,
817         identity = identity,
818         server_key = server_key
819         )
820     
821     if proc.wait():
822         return ProcStatus.NOT_STARTED
823     
824     status = False
825     if err:
826         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
827             status = True
828     elif out:
829         status = (out.strip() == 'wait')
830     else:
831         return ProcStatus.NOT_STARTED
832     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
833
834 @eintr_retry
835 def rkill(pid, ppid,
836         host = None, 
837         port = None, 
838         user = None, 
839         gwuser = None,
840         gw = None,
841         agent = None, 
842         sudo = False,
843         identity = None, 
844         server_key = None, 
845         nowait = False):
846     """
847     Sends a kill signal to a remote process.
848
849     First tries a SIGTERM, and if the process does not end in 10 seconds,
850     it sends a SIGKILL.
851  
852         :param pid: Process id of process to be killed
853         :type pid: int
854
855         :param ppid: Parent process id of process to be killed
856         :type ppid: int
857
858         :param sudo: Flag indicating if sudo should be used to kill the process
859         :type sudo: bool
860         
861     """
862     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
863     cmd = """
864 SUBKILL="%(subkill)s" ;
865 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
866 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
867 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
868     sleep 0.2 
869     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
870         break
871     else
872         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
873         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
874     fi
875     sleep 1.8
876 done
877 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
878     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
879     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
880 fi
881 """
882     if nowait:
883         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
884
885     (out,err),proc = rexec(
886         cmd % {
887             'ppid' : ppid,
888             'pid' : pid,
889             'sudo' : 'sudo -S' if sudo else '',
890             'subkill' : subkill,
891         },
892         host = host,
893         port = port,
894         user = user,
895         gwuser = gwuser,
896         gw = gw,
897         agent = agent,
898         identity = identity,
899         server_key = server_key
900         )
901     
902     # wait, don't leave zombies around
903     proc.wait()
904
905     return (out, err), proc
906
907 # POSIX
908 def _communicate(proc, input, timeout=None, err_on_timeout=True):
909     read_set = []
910     write_set = []
911     stdout = None # Return
912     stderr = None # Return
913     
914     killed = False
915     
916     if timeout is not None:
917         timelimit = time.time() + timeout
918         killtime = timelimit + 4
919         bailtime = timelimit + 4
920
921     if proc.stdin:
922         # Flush stdio buffer.  This might block, if the user has
923         # been writing to .stdin in an uncontrolled fashion.
924         proc.stdin.flush()
925         if input:
926             write_set.append(proc.stdin)
927         else:
928             proc.stdin.close()
929
930     if proc.stdout:
931         read_set.append(proc.stdout)
932         stdout = []
933
934     if proc.stderr:
935         read_set.append(proc.stderr)
936         stderr = []
937
938     input_offset = 0
939     while read_set or write_set:
940         if timeout is not None:
941             curtime = time.time()
942             if timeout is None or curtime > timelimit:
943                 if curtime > bailtime:
944                     break
945                 elif curtime > killtime:
946                     signum = signal.SIGKILL
947                 else:
948                     signum = signal.SIGTERM
949                 # Lets kill it
950                 os.kill(proc.pid, signum)
951                 select_timeout = 0.5
952             else:
953                 select_timeout = timelimit - curtime + 0.1
954         else:
955             select_timeout = 1.0
956         
957         if select_timeout > 1.0:
958             select_timeout = 1.0
959             
960         try:
961             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
962         except select.error,e:
963             if e[0] != 4:
964                 raise
965             else:
966                 continue
967         
968         if not rlist and not wlist and not xlist and proc.poll() is not None:
969             # timeout and process exited, say bye
970             break
971
972         if proc.stdin in wlist:
973             # When select has indicated that the file is writable,
974             # we can write up to PIPE_BUF bytes without risk
975             # blocking.  POSIX defines PIPE_BUF >= 512
976             bytes_written = os.write(proc.stdin.fileno(),
977                     buffer(input, input_offset, 512))
978             input_offset += bytes_written
979
980             if input_offset >= len(input):
981                 proc.stdin.close()
982                 write_set.remove(proc.stdin)
983
984         if proc.stdout in rlist:
985             data = os.read(proc.stdout.fileno(), 1024)
986             if data == "":
987                 proc.stdout.close()
988                 read_set.remove(proc.stdout)
989             stdout.append(data)
990
991         if proc.stderr in rlist:
992             data = os.read(proc.stderr.fileno(), 1024)
993             if data == "":
994                 proc.stderr.close()
995                 read_set.remove(proc.stderr)
996             stderr.append(data)
997     
998     # All data exchanged.  Translate lists into strings.
999     if stdout is not None:
1000         stdout = ''.join(stdout)
1001     if stderr is not None:
1002         stderr = ''.join(stderr)
1003
1004     # Translate newlines, if requested.  We cannot let the file
1005     # object do the translation: It is based on stdio, which is
1006     # impossible to combine with select (unless forcing no
1007     # buffering).
1008     if proc.universal_newlines and hasattr(file, 'newlines'):
1009         if stdout:
1010             stdout = proc._translate_newlines(stdout)
1011         if stderr:
1012             stderr = proc._translate_newlines(stderr)
1013
1014     if killed and err_on_timeout:
1015         errcode = proc.poll()
1016         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1017     else:
1018         if killed:
1019             proc.poll()
1020         else:
1021             proc.wait()
1022         return (stdout, stderr)
1023