LinuxApplication: Changed directory structure to store experiment files in the Linux...
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None, 
209         agent = True,
210         sudo = False,
211         stdin = None,
212         identity = None,
213         server_key = None,
214         env = None,
215         tty = False,
216         timeout = None,
217         retry = 3,
218         err_on_timeout = True,
219         connect_timeout = 30,
220         persistent = True,
221         forward_x11 = False,
222         blocking = True,
223         strict_host_checking = True):
224     """
225     Executes a remote command, returns ((stdout,stderr),process)
226     """
227     
228     tmp_known_hosts = None
229     hostip = gethostbyname(host)
230
231     args = ['ssh', '-C',
232             # Don't bother with localhost. Makes test easier
233             '-o', 'NoHostAuthenticationForLocalhost=yes',
234             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235             '-o', 'ConnectionAttempts=3',
236             '-o', 'ServerAliveInterval=30',
237             '-o', 'TCPKeepAlive=yes',
238             '-l', user, hostip or host]
239
240     if persistent and openssh_has_persist():
241         args.extend([
242             '-o', 'ControlMaster=auto',
243             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244             '-o', 'ControlPersist=60' ])
245
246     if not strict_host_checking:
247         # Do not check for Host key. Unsafe.
248         args.extend(['-o', 'StrictHostKeyChecking=no'])
249
250     if agent:
251         args.append('-A')
252
253     if port:
254         args.append('-p%d' % port)
255
256     if identity:
257         args.extend(('-i', identity))
258
259     if tty:
260         args.append('-t')
261         args.append('-t')
262
263     if forward_x11:
264         args.append('-X')
265
266     if server_key:
267         # Create a temporary server key file
268         tmp_known_hosts = make_server_key_args(server_key, host, port)
269         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
270
271     if sudo:
272         command = "sudo " + command
273
274     args.append(command)
275
276     for x in xrange(retry):
277         # connects to the remote host and starts a remote connection
278         proc = subprocess.Popen(args,
279                 env = env,
280                 stdout = subprocess.PIPE,
281                 stdin = subprocess.PIPE, 
282                 stderr = subprocess.PIPE)
283         
284         # attach tempfile object to the process, to make sure the file stays
285         # alive until the process is finished with it
286         proc._known_hosts = tmp_known_hosts
287     
288         # by default, rexec calls _communicate which will block 
289         # until the process has exit. The argument block == False 
290         # forces to rexec to return immediately, without blocking 
291         try:
292             if blocking:
293                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
294             else:
295                 out = err = ""
296                 if proc.poll():
297                     err = proc.stderr.read()
298
299             msg = " rexec - host %s - command %s " % (host, " ".join(args))
300             log(msg, logging.DEBUG, out, err)
301
302             if proc.poll():
303                 skip = False
304
305                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
306                     # SSH error, can safely retry
307                     skip = True 
308                 elif retry:
309                     # Probably timed out or plain failed but can retry
310                     skip = True 
311                 
312                 if skip:
313                     t = x*2
314                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
315                             t, x, host, " ".join(args))
316                     log(msg, logging.DEBUG)
317
318                     time.sleep(t)
319                     continue
320             break
321         except RuntimeError, e:
322             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
323             log(msg, logging.DEBUG, out, err)
324
325             if retry <= 0:
326                 raise
327             retry -= 1
328         
329     return ((out, err), proc)
330
331 def rcopy(source, dest,
332         port = None, 
333         agent = True, 
334         recursive = False,
335         identity = None,
336         server_key = None,
337         retry = 3,
338         strict_host_checking = True):
339     """
340     Copies from/to remote sites.
341     
342     Source and destination should have the user and host encoded
343     as per scp specs.
344     
345     If source is a file object, a special mode will be used to
346     create the remote file with the same contents.
347     
348     If dest is a file object, the remote file (source) will be
349     read and written into dest.
350     
351     In these modes, recursive cannot be True.
352     
353     Source can be a list of files to copy to a single destination,
354     in which case it is advised that the destination be a folder.
355     """
356     
357     if isinstance(source, file) and source.tell() == 0:
358         source = source.name
359     elif hasattr(source, 'read'):
360         tmp = tempfile.NamedTemporaryFile()
361         while True:
362             buf = source.read(65536)
363             if buf:
364                 tmp.write(buf)
365             else:
366                 break
367         tmp.seek(0)
368         source = tmp.name
369     
370     if isinstance(source, file) or isinstance(dest, file) \
371             or hasattr(source, 'read')  or hasattr(dest, 'write'):
372         assert not recursive
373         
374         # Parse source/destination as <user>@<server>:<path>
375         if isinstance(dest, basestring) and ':' in dest:
376             remspec, path = dest.split(':',1)
377         elif isinstance(source, basestring) and ':' in source:
378             remspec, path = source.split(':',1)
379         else:
380             raise ValueError, "Both endpoints cannot be local"
381         user,host = remspec.rsplit('@',1)
382         
383         tmp_known_hosts = None
384         hostip = gethostbyname(host)
385         
386         args = ['ssh', '-l', user, '-C',
387                 # Don't bother with localhost. Makes test easier
388                 '-o', 'NoHostAuthenticationForLocalhost=yes',
389                 '-o', 'ConnectTimeout=60',
390                 '-o', 'ConnectionAttempts=3',
391                 '-o', 'ServerAliveInterval=30',
392                 '-o', 'TCPKeepAlive=yes',
393                 hostip or host ]
394
395         if openssh_has_persist():
396             args.extend([
397                 '-o', 'ControlMaster=auto',
398                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
399                 '-o', 'ControlPersist=60' ])
400
401         if port:
402             args.append('-P%d' % port)
403
404         if identity:
405             args.extend(('-i', identity))
406
407         if server_key:
408             # Create a temporary server key file
409             tmp_known_hosts = make_server_key_args(server_key, host, port)
410             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
411         
412         if isinstance(source, file) or hasattr(source, 'read'):
413             args.append('cat > %s' % (shell_escape(path),))
414         elif isinstance(dest, file) or hasattr(dest, 'write'):
415             args.append('cat %s' % (shell_escape(path),))
416         else:
417             raise AssertionError, "Unreachable code reached! :-Q"
418         
419         # connects to the remote host and starts a remote connection
420         if isinstance(source, file):
421             proc = subprocess.Popen(args, 
422                     stdout = open('/dev/null','w'),
423                     stderr = subprocess.PIPE,
424                     stdin = source)
425             err = proc.stderr.read()
426             proc._known_hosts = tmp_known_hosts
427             eintr_retry(proc.wait)()
428             return ((None,err), proc)
429         elif isinstance(dest, file):
430             proc = subprocess.Popen(args, 
431                     stdout = open('/dev/null','w'),
432                     stderr = subprocess.PIPE,
433                     stdin = source)
434             err = proc.stderr.read()
435             proc._known_hosts = tmp_known_hosts
436             eintr_retry(proc.wait)()
437             return ((None,err), proc)
438         elif hasattr(source, 'read'):
439             # file-like (but not file) source
440             proc = subprocess.Popen(args, 
441                     stdout = open('/dev/null','w'),
442                     stderr = subprocess.PIPE,
443                     stdin = subprocess.PIPE)
444             
445             buf = None
446             err = []
447             while True:
448                 if not buf:
449                     buf = source.read(4096)
450                 if not buf:
451                     #EOF
452                     break
453                 
454                 rdrdy, wrdy, broken = select.select(
455                     [proc.stderr],
456                     [proc.stdin],
457                     [proc.stderr,proc.stdin])
458                 
459                 if proc.stderr in rdrdy:
460                     # use os.read for fully unbuffered behavior
461                     err.append(os.read(proc.stderr.fileno(), 4096))
462                 
463                 if proc.stdin in wrdy:
464                     proc.stdin.write(buf)
465                     buf = None
466                 
467                 if broken:
468                     break
469             proc.stdin.close()
470             err.append(proc.stderr.read())
471                 
472             proc._known_hosts = tmp_known_hosts
473             eintr_retry(proc.wait)()
474             return ((None,''.join(err)), proc)
475         elif hasattr(dest, 'write'):
476             # file-like (but not file) dest
477             proc = subprocess.Popen(args, 
478                     stdout = subprocess.PIPE,
479                     stderr = subprocess.PIPE,
480                     stdin = open('/dev/null','w'))
481             
482             buf = None
483             err = []
484             while True:
485                 rdrdy, wrdy, broken = select.select(
486                     [proc.stderr, proc.stdout],
487                     [],
488                     [proc.stderr, proc.stdout])
489                 
490                 if proc.stderr in rdrdy:
491                     # use os.read for fully unbuffered behavior
492                     err.append(os.read(proc.stderr.fileno(), 4096))
493                 
494                 if proc.stdout in rdrdy:
495                     # use os.read for fully unbuffered behavior
496                     buf = os.read(proc.stdout.fileno(), 4096)
497                     dest.write(buf)
498                     
499                     if not buf:
500                         #EOF
501                         break
502                 
503                 if broken:
504                     break
505             err.append(proc.stderr.read())
506                 
507             proc._known_hosts = tmp_known_hosts
508             eintr_retry(proc.wait)()
509             return ((None,''.join(err)), proc)
510         else:
511             raise AssertionError, "Unreachable code reached! :-Q"
512     else:
513         # Parse destination as <user>@<server>:<path>
514         if isinstance(dest, basestring) and ':' in dest:
515             remspec, path = dest.split(':',1)
516         elif isinstance(source, basestring) and ':' in source:
517             remspec, path = source.split(':',1)
518         else:
519             raise ValueError, "Both endpoints cannot be local"
520         user,host = remspec.rsplit('@',1)
521         
522         # plain scp
523         tmp_known_hosts = None
524
525         args = ['scp', '-q', '-p', '-C',
526                 # Speed up transfer using blowfish cypher specification which is 
527                 # faster than the default one (3des)
528                 '-c', 'blowfish',
529                 # Don't bother with localhost. Makes test easier
530                 '-o', 'NoHostAuthenticationForLocalhost=yes',
531                 '-o', 'ConnectTimeout=60',
532                 '-o', 'ConnectionAttempts=3',
533                 '-o', 'ServerAliveInterval=30',
534                 '-o', 'TCPKeepAlive=yes' ]
535                 
536         if port:
537             args.append('-P%d' % port)
538
539         if recursive:
540             args.append('-r')
541
542         if identity:
543             args.extend(('-i', identity))
544
545         if server_key:
546             # Create a temporary server key file
547             tmp_known_hosts = make_server_key_args(server_key, host, port)
548             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
549
550         if not strict_host_checking:
551             # Do not check for Host key. Unsafe.
552             args.extend(['-o', 'StrictHostKeyChecking=no'])
553
554         if isinstance(source,list):
555             args.extend(source)
556         else:
557             if openssh_has_persist():
558                 args.extend([
559                     '-o', 'ControlMaster=auto',
560                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
561                     ])
562             args.append(source)
563
564         args.append(dest)
565
566         for x in xrange(retry):
567             # connects to the remote host and starts a remote connection
568             proc = subprocess.Popen(args,
569                     stdout = subprocess.PIPE,
570                     stdin = subprocess.PIPE, 
571                     stderr = subprocess.PIPE)
572             
573             # attach tempfile object to the process, to make sure the file stays
574             # alive until the process is finished with it
575             proc._known_hosts = tmp_known_hosts
576         
577             try:
578                 (out, err) = proc.communicate()
579                 eintr_retry(proc.wait)()
580                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
581                 log(msg, logging.DEBUG, out, err)
582
583                 if proc.poll():
584                     t = x*2
585                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
586                             t, x, host, " ".join(args))
587                     log(msg, logging.DEBUG)
588
589                     time.sleep(t)
590                     continue
591
592                 break
593             except RuntimeError, e:
594                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
595                 log(msg, logging.DEBUG, out, err)
596
597                 if retry <= 0:
598                     raise
599                 retry -= 1
600             
601         return ((out, err), proc)
602
603 def rspawn(command, pidfile, 
604         stdout = '/dev/null', 
605         stderr = STDOUT, 
606         stdin = '/dev/null',
607         home = None, 
608         create_home = False, 
609         sudo = False,
610         host = None, 
611         port = None, 
612         user = None, 
613         agent = None, 
614         identity = None, 
615         server_key = None,
616         tty = False):
617     """
618     Spawn a remote command such that it will continue working asynchronously in 
619     background. 
620
621         :param command: The command to run, it should be a single line.
622         :type command: str
623
624         :param pidfile: Path to a file where to store the pid and ppid of the 
625                         spawned process
626         :type pidfile: str
627
628         :param stdout: Path to file to redirect standard output. 
629                        The default value is /dev/null
630         :type stdout: str
631
632         :param stderr: Path to file to redirect standard error.
633                        If the special STDOUT value is used, stderr will 
634                        be redirected to the same file as stdout
635         :type stderr: str
636
637         :param stdin: Path to a file with input to be piped into the command's standard input
638         :type stdin: str
639
640         :param home: Path to working directory folder. 
641                     It is assumed to exist unless the create_home flag is set.
642         :type home: str
643
644         :param create_home: Flag to force creation of the home folder before 
645                             running the command
646         :type create_home: bool
647  
648         :param sudo: Flag forcing execution with sudo user
649         :type sudo: bool
650         
651         :rtype: touple
652
653         (stdout, stderr), process
654         
655         Of the spawning process, which only captures errors at spawning time.
656         Usually only useful for diagnostics.
657     """
658     # Start process in a "daemonized" way, using nohup and heavy
659     # stdin/out redirection to avoid connection issues
660     if stderr is STDOUT:
661         stderr = '&1'
662     else:
663         stderr = ' ' + stderr
664     
665     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
666         'command' : command,
667         'pidfile' : shell_escape(pidfile),
668         'stdout' : stdout,
669         'stderr' : stderr,
670         'stdin' : stdin,
671     }
672     
673     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
674             'command' : shell_escape(daemon_command),
675             'sudo' : 'sudo -S' if sudo else '',
676             'pidfile' : shell_escape(pidfile),
677             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
678             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
679         }
680
681     (out,err),proc = rexec(
682         cmd,
683         host = host,
684         port = port,
685         user = user,
686         agent = agent,
687         identity = identity,
688         server_key = server_key,
689         tty = tty ,
690         )
691     
692     if proc.wait():
693         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
694
695     return ((out, err), proc)
696
697 @eintr_retry
698 def rgetpid(pidfile,
699         host = None, 
700         port = None, 
701         user = None, 
702         agent = None, 
703         identity = None,
704         server_key = None):
705     """
706     Returns the pid and ppid of a process from a remote file where the 
707     information was stored.
708
709         :param home: Path to directory where the pidfile is located
710         :type home: str
711
712         :param pidfile: Name of file containing the pid information
713         :type pidfile: str
714         
715         :rtype: int
716         
717         A (pid, ppid) tuple useful for calling rstatus and rkill,
718         or None if the pidfile isn't valid yet (can happen when process is staring up)
719
720     """
721     (out,err),proc = rexec(
722         "cat %(pidfile)s" % {
723             'pidfile' : pidfile,
724         },
725         host = host,
726         port = port,
727         user = user,
728         agent = agent,
729         identity = identity,
730         server_key = server_key
731         )
732         
733     if proc.wait():
734         return None
735     
736     if out:
737         try:
738             return map(int,out.strip().split(' ',1))
739         except:
740             # Ignore, many ways to fail that don't matter that much
741             return None
742
743 @eintr_retry
744 def rstatus(pid, ppid, 
745         host = None, 
746         port = None, 
747         user = None, 
748         agent = None, 
749         identity = None,
750         server_key = None):
751     """
752     Returns a code representing the the status of a remote process
753
754         :param pid: Process id of the process
755         :type pid: int
756
757         :param ppid: Parent process id of process
758         :type ppid: int
759     
760         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
761     
762     """
763     (out,err),proc = rexec(
764         # Check only by pid. pid+ppid does not always work (especially with sudo) 
765         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
766             'ppid' : ppid,
767             'pid' : pid,
768         },
769         host = host,
770         port = port,
771         user = user,
772         agent = agent,
773         identity = identity,
774         server_key = server_key
775         )
776     
777     if proc.wait():
778         return ProcStatus.NOT_STARTED
779     
780     status = False
781     if err:
782         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
783             status = True
784     elif out:
785         status = (out.strip() == 'wait')
786     else:
787         return ProcStatus.NOT_STARTED
788     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
789
790 @eintr_retry
791 def rkill(pid, ppid,
792         host = None, 
793         port = None, 
794         user = None, 
795         agent = None, 
796         sudo = False,
797         identity = None, 
798         server_key = None, 
799         nowait = False):
800     """
801     Sends a kill signal to a remote process.
802
803     First tries a SIGTERM, and if the process does not end in 10 seconds,
804     it sends a SIGKILL.
805  
806         :param pid: Process id of process to be killed
807         :type pid: int
808
809         :param ppid: Parent process id of process to be killed
810         :type ppid: int
811
812         :param sudo: Flag indicating if sudo should be used to kill the process
813         :type sudo: bool
814         
815     """
816     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
817     cmd = """
818 SUBKILL="%(subkill)s" ;
819 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
820 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
821 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
822     sleep 0.2 
823     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
824         break
825     else
826         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
827         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
828     fi
829     sleep 1.8
830 done
831 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
832     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
833     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
834 fi
835 """
836     if nowait:
837         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
838
839     (out,err),proc = rexec(
840         cmd % {
841             'ppid' : ppid,
842             'pid' : pid,
843             'sudo' : 'sudo -S' if sudo else '',
844             'subkill' : subkill,
845         },
846         host = host,
847         port = port,
848         user = user,
849         agent = agent,
850         identity = identity,
851         server_key = server_key
852         )
853     
854     # wait, don't leave zombies around
855     proc.wait()
856
857     return (out, err), proc
858
859 # POSIX
860 def _communicate(proc, input, timeout=None, err_on_timeout=True):
861     read_set = []
862     write_set = []
863     stdout = None # Return
864     stderr = None # Return
865     
866     killed = False
867     
868     if timeout is not None:
869         timelimit = time.time() + timeout
870         killtime = timelimit + 4
871         bailtime = timelimit + 4
872
873     if proc.stdin:
874         # Flush stdio buffer.  This might block, if the user has
875         # been writing to .stdin in an uncontrolled fashion.
876         proc.stdin.flush()
877         if input:
878             write_set.append(proc.stdin)
879         else:
880             proc.stdin.close()
881
882     if proc.stdout:
883         read_set.append(proc.stdout)
884         stdout = []
885
886     if proc.stderr:
887         read_set.append(proc.stderr)
888         stderr = []
889
890     input_offset = 0
891     while read_set or write_set:
892         if timeout is not None:
893             curtime = time.time()
894             if timeout is None or curtime > timelimit:
895                 if curtime > bailtime:
896                     break
897                 elif curtime > killtime:
898                     signum = signal.SIGKILL
899                 else:
900                     signum = signal.SIGTERM
901                 # Lets kill it
902                 os.kill(proc.pid, signum)
903                 select_timeout = 0.5
904             else:
905                 select_timeout = timelimit - curtime + 0.1
906         else:
907             select_timeout = 1.0
908         
909         if select_timeout > 1.0:
910             select_timeout = 1.0
911             
912         try:
913             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
914         except select.error,e:
915             if e[0] != 4:
916                 raise
917             else:
918                 continue
919         
920         if not rlist and not wlist and not xlist and proc.poll() is not None:
921             # timeout and process exited, say bye
922             break
923
924         if proc.stdin in wlist:
925             # When select has indicated that the file is writable,
926             # we can write up to PIPE_BUF bytes without risk
927             # blocking.  POSIX defines PIPE_BUF >= 512
928             bytes_written = os.write(proc.stdin.fileno(),
929                     buffer(input, input_offset, 512))
930             input_offset += bytes_written
931
932             if input_offset >= len(input):
933                 proc.stdin.close()
934                 write_set.remove(proc.stdin)
935
936         if proc.stdout in rlist:
937             data = os.read(proc.stdout.fileno(), 1024)
938             if data == "":
939                 proc.stdout.close()
940                 read_set.remove(proc.stdout)
941             stdout.append(data)
942
943         if proc.stderr in rlist:
944             data = os.read(proc.stderr.fileno(), 1024)
945             if data == "":
946                 proc.stderr.close()
947                 read_set.remove(proc.stderr)
948             stderr.append(data)
949     
950     # All data exchanged.  Translate lists into strings.
951     if stdout is not None:
952         stdout = ''.join(stdout)
953     if stderr is not None:
954         stderr = ''.join(stderr)
955
956     # Translate newlines, if requested.  We cannot let the file
957     # object do the translation: It is based on stdio, which is
958     # impossible to combine with select (unless forcing no
959     # buffering).
960     if proc.universal_newlines and hasattr(file, 'newlines'):
961         if stdout:
962             stdout = proc._translate_newlines(stdout)
963         if stderr:
964             stderr = proc._translate_newlines(stderr)
965
966     if killed and err_on_timeout:
967         errcode = proc.poll()
968         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
969     else:
970         if killed:
971             proc.poll()
972         else:
973             proc.wait()
974         return (stdout, stderr)
975