Bugfixing LinuxNode and LinuxApplication
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import base64
21 import errno
22 import hashlib
23 import logging
24 import os
25 import os.path
26 import re
27 import select
28 import signal
29 import socket
30 import subprocess
31 import threading
32 import time
33 import tempfile
34
35 logger = logging.getLogger("sshfuncs")
36
37 def log(msg, level, out = None, err = None):
38     if out:
39         msg += " - OUT: %s " % out
40
41     if err:
42         msg += " - ERROR: %s " % err
43
44     logger.log(level, msg)
45
46
47 if hasattr(os, "devnull"):
48     DEV_NULL = os.devnull
49 else:
50     DEV_NULL = "/dev/null"
51
52 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
53
54 class STDOUT: 
55     """
56     Special value that when given to rspawn in stderr causes stderr to 
57     redirect to whatever stdout was redirected to.
58     """
59
60 class ProcStatus:
61     """
62     Codes for status of remote spawned process
63     """
64     # Process is still running
65     RUNNING = 1
66
67     # Process is finished
68     FINISHED = 2
69     
70     # Process hasn't started running yet (this should be very rare)
71     NOT_STARTED = 3
72
73 hostbyname_cache = dict()
74 hostbyname_cache_lock = threading.Lock()
75
76 def gethostbyname(host):
77     global hostbyname_cache
78     global hostbyname_cache_lock
79     
80     hostbyname = hostbyname_cache.get(host)
81     if not hostbyname:
82         with hostbyname_cache_lock:
83             hostbyname = socket.gethostbyname(host)
84             hostbyname_cache[host] = hostbyname
85
86             msg = " Added hostbyname %s - %s " % (host, hostbyname)
87             log(msg, logging.DEBUG)
88
89     return hostbyname
90
91 OPENSSH_HAS_PERSIST = None
92
93 def openssh_has_persist():
94     """ The ssh_config options ControlMaster and ControlPersist allow to
95     reuse a same network connection for multiple ssh sessions. In this 
96     way limitations on number of open ssh connections can be bypassed.
97     However, older versions of openSSH do not support this feature.
98     This function is used to determine if ssh connection persist features
99     can be used.
100     """
101     global OPENSSH_HAS_PERSIST
102     if OPENSSH_HAS_PERSIST is None:
103         proc = subprocess.Popen(["ssh","-v"],
104             stdout = subprocess.PIPE,
105             stderr = subprocess.STDOUT,
106             stdin = open("/dev/null","r") )
107         out,err = proc.communicate()
108         proc.wait()
109         
110         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
111         OPENSSH_HAS_PERSIST = bool(vre.match(out))
112     return OPENSSH_HAS_PERSIST
113
114 def make_server_key_args(server_key, host, port):
115     """ Returns a reference to a temporary known_hosts file, to which 
116     the server key has been added. 
117     
118     Make sure to hold onto the temp file reference until the process is 
119     done with it
120
121     :param server_key: the server public key
122     :type server_key: str
123
124     :param host: the hostname
125     :type host: str
126
127     :param port: the ssh port
128     :type port: str
129
130     """
131     if port is not None:
132         host = '%s:%s' % (host, str(port))
133
134     # Create a temporary server key file
135     tmp_known_hosts = tempfile.NamedTemporaryFile()
136    
137     hostbyname = gethostbyname(host) 
138
139     # Add the intended host key
140     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
141     
142     # If we're not in strict mode, add user-configured keys
143     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
144         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
145         if os.access(user_hosts_path, os.R_OK):
146             f = open(user_hosts_path, "r")
147             tmp_known_hosts.write(f.read())
148             f.close()
149         
150     tmp_known_hosts.flush()
151     
152     return tmp_known_hosts
153
154 def make_control_path(agent, forward_x11):
155     ctrl_path = "/tmp/nepi_ssh"
156
157     if agent:
158         ctrl_path +="_a"
159
160     if forward_x11:
161         ctrl_path +="_x"
162
163     ctrl_path += "-%r@%h:%p"
164
165     return ctrl_path
166
167 def shell_escape(s):
168     """ Escapes strings so that they are safe to use as command-line 
169     arguments """
170     if SHELL_SAFE.match(s):
171         # safe string - no escaping needed
172         return s
173     else:
174         # unsafe string - escape
175         def escp(c):
176             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
177                 return c
178             else:
179                 return "'$'\\x%02x''" % (ord(c),)
180         s = ''.join(map(escp,s))
181         return "'%s'" % (s,)
182
183 def eintr_retry(func):
184     """Retries a function invocation when a EINTR occurs"""
185     import functools
186     @functools.wraps(func)
187     def rv(*p, **kw):
188         retry = kw.pop("_retry", False)
189         for i in xrange(0 if retry else 4):
190             try:
191                 return func(*p, **kw)
192             except (select.error, socket.error), args:
193                 if args[0] == errno.EINTR:
194                     continue
195                 else:
196                     raise 
197             except OSError, e:
198                 if e.errno == errno.EINTR:
199                     continue
200                 else:
201                     raise
202         else:
203             return func(*p, **kw)
204     return rv
205
206 def rexec(command, host, user, 
207         port = None, 
208         agent = True,
209         sudo = False,
210         stdin = None,
211         identity = None,
212         server_key = None,
213         env = None,
214         tty = False,
215         timeout = None,
216         retry = 3,
217         err_on_timeout = True,
218         connect_timeout = 30,
219         persistent = True,
220         forward_x11 = False,
221         strict_host_checking = True):
222     """
223     Executes a remote command, returns ((stdout,stderr),process)
224     """
225     
226     tmp_known_hosts = None
227     hostip = gethostbyname(host)
228
229     args = ['ssh', '-C',
230             # Don't bother with localhost. Makes test easier
231             '-o', 'NoHostAuthenticationForLocalhost=yes',
232             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
233             '-o', 'ConnectionAttempts=3',
234             '-o', 'ServerAliveInterval=30',
235             '-o', 'TCPKeepAlive=yes',
236             '-l', user, hostip or host]
237
238     if persistent and openssh_has_persist():
239         args.extend([
240             '-o', 'ControlMaster=auto',
241             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
242             '-o', 'ControlPersist=60' ])
243
244     if not strict_host_checking:
245         # Do not check for Host key. Unsafe.
246         args.extend(['-o', 'StrictHostKeyChecking=no'])
247
248     if agent:
249         args.append('-A')
250
251     if port:
252         args.append('-p%d' % port)
253
254     if identity:
255         args.extend(('-i', identity))
256
257     if tty:
258         args.append('-t')
259         args.append('-t')
260
261     if forward_x11:
262         args.append('-X')
263
264     if server_key:
265         # Create a temporary server key file
266         tmp_known_hosts = make_server_key_args(server_key, host, port)
267         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
268
269     args.append(command)
270
271     for x in xrange(retry):
272         # connects to the remote host and starts a remote connection
273         proc = subprocess.Popen(args,
274                 env = env,
275                 stdout = subprocess.PIPE,
276                 stdin = subprocess.PIPE, 
277                 stderr = subprocess.PIPE)
278         
279         # attach tempfile object to the process, to make sure the file stays
280         # alive until the process is finished with it
281         proc._known_hosts = tmp_known_hosts
282     
283         try:
284             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
285             msg = " rexec - host %s - command %s " % (host, " ".join(args))
286             log(msg, logging.DEBUG, out, err)
287
288             if proc.poll():
289                 skip = False
290
291                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
292                     # SSH error, can safely retry
293                     skip = True 
294                 elif retry:
295                     # Probably timed out or plain failed but can retry
296                     skip = True 
297                 
298                 if skip:
299                     t = x*2
300                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
301                             t, x, host, " ".join(args))
302                     log(msg, logging.DEBUG)
303
304                     time.sleep(t)
305                     continue
306             break
307         except RuntimeError, e:
308             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
309             log(msg, logging.DEBUG, out, err)
310
311             if retry <= 0:
312                 raise
313             retry -= 1
314         
315     return ((out, err), proc)
316
317 def rcopy(source, dest,
318         port = None, 
319         agent = True, 
320         recursive = False,
321         identity = None,
322         server_key = None,
323         retry = 3,
324         strict_host_checking = True):
325     """
326     Copies from/to remote sites.
327     
328     Source and destination should have the user and host encoded
329     as per scp specs.
330     
331     If source is a file object, a special mode will be used to
332     create the remote file with the same contents.
333     
334     If dest is a file object, the remote file (source) will be
335     read and written into dest.
336     
337     In these modes, recursive cannot be True.
338     
339     Source can be a list of files to copy to a single destination,
340     in which case it is advised that the destination be a folder.
341     """
342     
343     if isinstance(source, file) and source.tell() == 0:
344         source = source.name
345     elif hasattr(source, 'read'):
346         tmp = tempfile.NamedTemporaryFile()
347         while True:
348             buf = source.read(65536)
349             if buf:
350                 tmp.write(buf)
351             else:
352                 break
353         tmp.seek(0)
354         source = tmp.name
355     
356     if isinstance(source, file) or isinstance(dest, file) \
357             or hasattr(source, 'read')  or hasattr(dest, 'write'):
358         assert not recursive
359         
360         # Parse source/destination as <user>@<server>:<path>
361         if isinstance(dest, basestring) and ':' in dest:
362             remspec, path = dest.split(':',1)
363         elif isinstance(source, basestring) and ':' in source:
364             remspec, path = source.split(':',1)
365         else:
366             raise ValueError, "Both endpoints cannot be local"
367         user,host = remspec.rsplit('@',1)
368         
369         tmp_known_hosts = None
370         hostip = gethostbyname(host)
371         
372         args = ['ssh', '-l', user, '-C',
373                 # Don't bother with localhost. Makes test easier
374                 '-o', 'NoHostAuthenticationForLocalhost=yes',
375                 '-o', 'ConnectTimeout=60',
376                 '-o', 'ConnectionAttempts=3',
377                 '-o', 'ServerAliveInterval=30',
378                 '-o', 'TCPKeepAlive=yes',
379                 hostip or host ]
380
381         if openssh_has_persist():
382             args.extend([
383                 '-o', 'ControlMaster=auto',
384                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
385                 '-o', 'ControlPersist=60' ])
386
387         if port:
388             args.append('-P%d' % port)
389
390         if identity:
391             args.extend(('-i', identity))
392
393         if server_key:
394             # Create a temporary server key file
395             tmp_known_hosts = make_server_key_args(server_key, host, port)
396             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
397         
398         if isinstance(source, file) or hasattr(source, 'read'):
399             args.append('cat > %s' % (shell_escape(path),))
400         elif isinstance(dest, file) or hasattr(dest, 'write'):
401             args.append('cat %s' % (shell_escape(path),))
402         else:
403             raise AssertionError, "Unreachable code reached! :-Q"
404         
405         # connects to the remote host and starts a remote connection
406         if isinstance(source, file):
407             proc = subprocess.Popen(args, 
408                     stdout = open('/dev/null','w'),
409                     stderr = subprocess.PIPE,
410                     stdin = source)
411             err = proc.stderr.read()
412             proc._known_hosts = tmp_known_hosts
413             eintr_retry(proc.wait)()
414             return ((None,err), proc)
415         elif isinstance(dest, file):
416             proc = subprocess.Popen(args, 
417                     stdout = open('/dev/null','w'),
418                     stderr = subprocess.PIPE,
419                     stdin = source)
420             err = proc.stderr.read()
421             proc._known_hosts = tmp_known_hosts
422             eintr_retry(proc.wait)()
423             return ((None,err), proc)
424         elif hasattr(source, 'read'):
425             # file-like (but not file) source
426             proc = subprocess.Popen(args, 
427                     stdout = open('/dev/null','w'),
428                     stderr = subprocess.PIPE,
429                     stdin = subprocess.PIPE)
430             
431             buf = None
432             err = []
433             while True:
434                 if not buf:
435                     buf = source.read(4096)
436                 if not buf:
437                     #EOF
438                     break
439                 
440                 rdrdy, wrdy, broken = select.select(
441                     [proc.stderr],
442                     [proc.stdin],
443                     [proc.stderr,proc.stdin])
444                 
445                 if proc.stderr in rdrdy:
446                     # use os.read for fully unbuffered behavior
447                     err.append(os.read(proc.stderr.fileno(), 4096))
448                 
449                 if proc.stdin in wrdy:
450                     proc.stdin.write(buf)
451                     buf = None
452                 
453                 if broken:
454                     break
455             proc.stdin.close()
456             err.append(proc.stderr.read())
457                 
458             proc._known_hosts = tmp_known_hosts
459             eintr_retry(proc.wait)()
460             return ((None,''.join(err)), proc)
461         elif hasattr(dest, 'write'):
462             # file-like (but not file) dest
463             proc = subprocess.Popen(args, 
464                     stdout = subprocess.PIPE,
465                     stderr = subprocess.PIPE,
466                     stdin = open('/dev/null','w'))
467             
468             buf = None
469             err = []
470             while True:
471                 rdrdy, wrdy, broken = select.select(
472                     [proc.stderr, proc.stdout],
473                     [],
474                     [proc.stderr, proc.stdout])
475                 
476                 if proc.stderr in rdrdy:
477                     # use os.read for fully unbuffered behavior
478                     err.append(os.read(proc.stderr.fileno(), 4096))
479                 
480                 if proc.stdout in rdrdy:
481                     # use os.read for fully unbuffered behavior
482                     buf = os.read(proc.stdout.fileno(), 4096)
483                     dest.write(buf)
484                     
485                     if not buf:
486                         #EOF
487                         break
488                 
489                 if broken:
490                     break
491             err.append(proc.stderr.read())
492                 
493             proc._known_hosts = tmp_known_hosts
494             eintr_retry(proc.wait)()
495             return ((None,''.join(err)), proc)
496         else:
497             raise AssertionError, "Unreachable code reached! :-Q"
498     else:
499         # Parse destination as <user>@<server>:<path>
500         if isinstance(dest, basestring) and ':' in dest:
501             remspec, path = dest.split(':',1)
502         elif isinstance(source, basestring) and ':' in source:
503             remspec, path = source.split(':',1)
504         else:
505             raise ValueError, "Both endpoints cannot be local"
506         user,host = remspec.rsplit('@',1)
507         
508         # plain scp
509         tmp_known_hosts = None
510
511         args = ['scp', '-q', '-p', '-C',
512                 # Speed up transfer using blowfish cypher specification which is 
513                 # faster than the default one (3des)
514                 '-c', 'blowfish',
515                 # Don't bother with localhost. Makes test easier
516                 '-o', 'NoHostAuthenticationForLocalhost=yes',
517                 '-o', 'ConnectTimeout=60',
518                 '-o', 'ConnectionAttempts=3',
519                 '-o', 'ServerAliveInterval=30',
520                 '-o', 'TCPKeepAlive=yes' ]
521                 
522         if port:
523             args.append('-P%d' % port)
524
525         if recursive:
526             args.append('-r')
527
528         if identity:
529             args.extend(('-i', identity))
530
531         if server_key:
532             # Create a temporary server key file
533             tmp_known_hosts = make_server_key_args(server_key, host, port)
534             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
535
536         if not strict_host_checking:
537             # Do not check for Host key. Unsafe.
538             args.extend(['-o', 'StrictHostKeyChecking=no'])
539
540         if isinstance(source,list):
541             args.extend(source)
542         else:
543             if openssh_has_persist():
544                 args.extend([
545                     '-o', 'ControlMaster=auto',
546                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
547                     ])
548             args.append(source)
549
550         args.append(dest)
551
552         for x in xrange(retry):
553             # connects to the remote host and starts a remote connection
554             proc = subprocess.Popen(args,
555                     stdout = subprocess.PIPE,
556                     stdin = subprocess.PIPE, 
557                     stderr = subprocess.PIPE)
558             
559             # attach tempfile object to the process, to make sure the file stays
560             # alive until the process is finished with it
561             proc._known_hosts = tmp_known_hosts
562         
563             try:
564                 (out, err) = proc.communicate()
565                 eintr_retry(proc.wait)()
566                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
567                 log(msg, logging.DEBUG, out, err)
568
569                 if proc.poll():
570                     t = x*2
571                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
572                             t, x, host, " ".join(args))
573                     log(msg, logging.DEBUG)
574
575                     time.sleep(t)
576                     continue
577
578                 break
579             except RuntimeError, e:
580                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
581                 log(msg, logging.DEBUG, out, err)
582
583                 if retry <= 0:
584                     raise
585                 retry -= 1
586             
587         return ((out, err), proc)
588
589 def rspawn(command, pidfile, 
590         stdout = '/dev/null', 
591         stderr = STDOUT, 
592         stdin = '/dev/null',
593         home = None, 
594         create_home = False, 
595         sudo = False,
596         host = None, 
597         port = None, 
598         user = None, 
599         agent = None, 
600         identity = None, 
601         server_key = None,
602         tty = False):
603     """
604     Spawn a remote command such that it will continue working asynchronously in 
605     background. 
606
607         :param command: The command to run, it should be a single line.
608         :type command: str
609
610         :param pidfile: Path to a file where to store the pid and ppid of the 
611                         spawned process
612         :type pidfile: str
613
614         :param stdout: Path to file to redirect standard output. 
615                        The default value is /dev/null
616         :type stdout: str
617
618         :param stderr: Path to file to redirect standard error.
619                        If the special STDOUT value is used, stderr will 
620                        be redirected to the same file as stdout
621         :type stderr: str
622
623         :param stdin: Path to a file with input to be piped into the command's standard input
624         :type stdin: str
625
626         :param home: Path to working directory folder. 
627                     It is assumed to exist unless the create_home flag is set.
628         :type home: str
629
630         :param create_home: Flag to force creation of the home folder before 
631                             running the command
632         :type create_home: bool
633  
634         :param sudo: Flag forcing execution with sudo user
635         :type sudo: bool
636         
637         :rtype: touple
638
639         (stdout, stderr), process
640         
641         Of the spawning process, which only captures errors at spawning time.
642         Usually only useful for diagnostics.
643     """
644     # Start process in a "daemonized" way, using nohup and heavy
645     # stdin/out redirection to avoid connection issues
646     if stderr is STDOUT:
647         stderr = '&1'
648     else:
649         stderr = ' ' + stderr
650     
651     daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
652         'command' : command,
653         'pidfile' : shell_escape(pidfile),
654         'stdout' : stdout,
655         'stderr' : stderr,
656         'stdin' : stdin,
657     }
658     
659     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
660             'command' : shell_escape(daemon_command),
661             'sudo' : 'sudo -S' if sudo else '',
662             'pidfile' : shell_escape(pidfile),
663             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
664             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
665         }
666
667     (out,err),proc = rexec(
668         cmd,
669         host = host,
670         port = port,
671         user = user,
672         agent = agent,
673         identity = identity,
674         server_key = server_key,
675         tty = tty ,
676         )
677     
678     if proc.wait():
679         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
680
681     return ((out, err), proc)
682
683 @eintr_retry
684 def rgetpid(pidfile,
685         host = None, 
686         port = None, 
687         user = None, 
688         agent = None, 
689         identity = None,
690         server_key = None):
691     """
692     Returns the pid and ppid of a process from a remote file where the 
693     information was stored.
694
695         :param home: Path to directory where the pidfile is located
696         :type home: str
697
698         :param pidfile: Name of file containing the pid information
699         :type pidfile: str
700         
701         :rtype: int
702         
703         A (pid, ppid) tuple useful for calling rstatus and rkill,
704         or None if the pidfile isn't valid yet (can happen when process is staring up)
705
706     """
707     (out,err),proc = rexec(
708         "cat %(pidfile)s" % {
709             'pidfile' : pidfile,
710         },
711         host = host,
712         port = port,
713         user = user,
714         agent = agent,
715         identity = identity,
716         server_key = server_key
717         )
718         
719     if proc.wait():
720         return None
721     
722     if out:
723         try:
724             return map(int,out.strip().split(' ',1))
725         except:
726             # Ignore, many ways to fail that don't matter that much
727             return None
728
729 @eintr_retry
730 def rstatus(pid, ppid, 
731         host = None, 
732         port = None, 
733         user = None, 
734         agent = None, 
735         identity = None,
736         server_key = None):
737     """
738     Returns a code representing the the status of a remote process
739
740         :param pid: Process id of the process
741         :type pid: int
742
743         :param ppid: Parent process id of process
744         :type ppid: int
745     
746         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
747     
748     """
749     (out,err),proc = rexec(
750         # Check only by pid. pid+ppid does not always work (especially with sudo) 
751         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
752             'ppid' : ppid,
753             'pid' : pid,
754         },
755         host = host,
756         port = port,
757         user = user,
758         agent = agent,
759         identity = identity,
760         server_key = server_key
761         )
762     
763     if proc.wait():
764         return ProcStatus.NOT_STARTED
765     
766     status = False
767     if err:
768         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
769             status = True
770     elif out:
771         status = (out.strip() == 'wait')
772     else:
773         return ProcStatus.NOT_STARTED
774     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
775
776 @eintr_retry
777 def rkill(pid, ppid,
778         host = None, 
779         port = None, 
780         user = None, 
781         agent = None, 
782         sudo = False,
783         identity = None, 
784         server_key = None, 
785         nowait = False):
786     """
787     Sends a kill signal to a remote process.
788
789     First tries a SIGTERM, and if the process does not end in 10 seconds,
790     it sends a SIGKILL.
791  
792         :param pid: Process id of process to be killed
793         :type pid: int
794
795         :param ppid: Parent process id of process to be killed
796         :type ppid: int
797
798         :param sudo: Flag indicating if sudo should be used to kill the process
799         :type sudo: bool
800         
801     """
802     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
803     cmd = """
804 SUBKILL="%(subkill)s" ;
805 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
806 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
807 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
808     sleep 0.2 
809     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
810         break
811     else
812         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
813         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
814     fi
815     sleep 1.8
816 done
817 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
818     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
819     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
820 fi
821 """
822     if nowait:
823         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
824
825     (out,err),proc = rexec(
826         cmd % {
827             'ppid' : ppid,
828             'pid' : pid,
829             'sudo' : 'sudo -S' if sudo else '',
830             'subkill' : subkill,
831         },
832         host = host,
833         port = port,
834         user = user,
835         agent = agent,
836         identity = identity,
837         server_key = server_key
838         )
839     
840     # wait, don't leave zombies around
841     proc.wait()
842
843     return (out, err), proc
844
845 # POSIX
846 def _communicate(self, input, timeout=None, err_on_timeout=True):
847     read_set = []
848     write_set = []
849     stdout = None # Return
850     stderr = None # Return
851     
852     killed = False
853     
854     if timeout is not None:
855         timelimit = time.time() + timeout
856         killtime = timelimit + 4
857         bailtime = timelimit + 4
858
859     if self.stdin:
860         # Flush stdio buffer.  This might block, if the user has
861         # been writing to .stdin in an uncontrolled fashion.
862         self.stdin.flush()
863         if input:
864             write_set.append(self.stdin)
865         else:
866             self.stdin.close()
867     if self.stdout:
868         read_set.append(self.stdout)
869         stdout = []
870     if self.stderr:
871         read_set.append(self.stderr)
872         stderr = []
873
874     input_offset = 0
875     while read_set or write_set:
876         if timeout is not None:
877             curtime = time.time()
878             if timeout is None or curtime > timelimit:
879                 if curtime > bailtime:
880                     break
881                 elif curtime > killtime:
882                     signum = signal.SIGKILL
883                 else:
884                     signum = signal.SIGTERM
885                 # Lets kill it
886                 os.kill(self.pid, signum)
887                 select_timeout = 0.5
888             else:
889                 select_timeout = timelimit - curtime + 0.1
890         else:
891             select_timeout = 1.0
892         
893         if select_timeout > 1.0:
894             select_timeout = 1.0
895             
896         try:
897             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
898         except select.error,e:
899             if e[0] != 4:
900                 raise
901             else:
902                 continue
903         
904         if not rlist and not wlist and not xlist and self.poll() is not None:
905             # timeout and process exited, say bye
906             break
907
908         if self.stdin in wlist:
909             # When select has indicated that the file is writable,
910             # we can write up to PIPE_BUF bytes without risk
911             # blocking.  POSIX defines PIPE_BUF >= 512
912             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
913             input_offset += bytes_written
914             if input_offset >= len(input):
915                 self.stdin.close()
916                 write_set.remove(self.stdin)
917
918         if self.stdout in rlist:
919             data = os.read(self.stdout.fileno(), 1024)
920             if data == "":
921                 self.stdout.close()
922                 read_set.remove(self.stdout)
923             stdout.append(data)
924
925         if self.stderr in rlist:
926             data = os.read(self.stderr.fileno(), 1024)
927             if data == "":
928                 self.stderr.close()
929                 read_set.remove(self.stderr)
930             stderr.append(data)
931     
932     # All data exchanged.  Translate lists into strings.
933     if stdout is not None:
934         stdout = ''.join(stdout)
935     if stderr is not None:
936         stderr = ''.join(stderr)
937
938     # Translate newlines, if requested.  We cannot let the file
939     # object do the translation: It is based on stdio, which is
940     # impossible to combine with select (unless forcing no
941     # buffering).
942     if self.universal_newlines and hasattr(file, 'newlines'):
943         if stdout:
944             stdout = self._translate_newlines(stdout)
945         if stderr:
946             stderr = self._translate_newlines(stderr)
947
948     if killed and err_on_timeout:
949         errcode = self.poll()
950         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
951     else:
952         if killed:
953             self.poll()
954         else:
955             self.wait()
956         return (stdout, stderr)
957