Making CCN FIBEntry issue ccndc with hostname resolved to IP, since PlanetLab name...
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None, 
209         agent = True,
210         sudo = False,
211         stdin = None,
212         identity = None,
213         server_key = None,
214         env = None,
215         tty = False,
216         timeout = None,
217         retry = 3,
218         err_on_timeout = True,
219         connect_timeout = 30,
220         persistent = True,
221         forward_x11 = False,
222         blocking = True,
223         strict_host_checking = True):
224     """
225     Executes a remote command, returns ((stdout,stderr),process)
226     """
227     
228     tmp_known_hosts = None
229     hostip = gethostbyname(host)
230
231     args = ['ssh', '-C',
232             # Don't bother with localhost. Makes test easier
233             '-o', 'NoHostAuthenticationForLocalhost=yes',
234             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235             '-o', 'ConnectionAttempts=3',
236             '-o', 'ServerAliveInterval=30',
237             '-o', 'TCPKeepAlive=yes',
238             '-l', user, hostip or host]
239
240     if persistent and openssh_has_persist():
241         args.extend([
242             '-o', 'ControlMaster=auto',
243             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244             '-o', 'ControlPersist=60' ])
245
246     if not strict_host_checking:
247         # Do not check for Host key. Unsafe.
248         args.extend(['-o', 'StrictHostKeyChecking=no'])
249
250     if agent:
251         args.append('-A')
252
253     if port:
254         args.append('-p%d' % port)
255
256     if identity:
257         args.extend(('-i', identity))
258
259     if tty:
260         args.append('-t')
261         args.append('-t')
262
263     if forward_x11:
264         args.append('-X')
265
266     if server_key:
267         # Create a temporary server key file
268         tmp_known_hosts = make_server_key_args(server_key, host, port)
269         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
270
271     if sudo:
272         command = "sudo " + command
273
274     args.append(command)
275
276     for x in xrange(retry):
277         # connects to the remote host and starts a remote connection
278         proc = subprocess.Popen(args,
279                 env = env,
280                 stdout = subprocess.PIPE,
281                 stdin = subprocess.PIPE, 
282                 stderr = subprocess.PIPE)
283         
284         # attach tempfile object to the process, to make sure the file stays
285         # alive until the process is finished with it
286         proc._known_hosts = tmp_known_hosts
287     
288         # by default, rexec calls _communicate which will block 
289         # until the process has exit. The argument block == False 
290         # forces to rexec to return immediately, without blocking 
291         try:
292             if blocking:
293                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
294             else:
295                 err = proc.stderr.read()
296                 out = proc.stdout.read()
297
298             msg = " rexec - host %s - command %s " % (host, " ".join(args))
299             log(msg, logging.DEBUG, out, err)
300
301             if proc.poll():
302                 skip = False
303
304                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
305                     # SSH error, can safely retry
306                     skip = True 
307                 elif retry:
308                     # Probably timed out or plain failed but can retry
309                     skip = True 
310                 
311                 if skip:
312                     t = x*2
313                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
314                             t, x, host, " ".join(args))
315                     log(msg, logging.DEBUG)
316
317                     time.sleep(t)
318                     continue
319             break
320         except RuntimeError, e:
321             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
322             log(msg, logging.DEBUG, out, err)
323
324             if retry <= 0:
325                 raise
326             retry -= 1
327         
328     return ((out, err), proc)
329
330 def rcopy(source, dest,
331         port = None, 
332         agent = True, 
333         recursive = False,
334         identity = None,
335         server_key = None,
336         retry = 3,
337         strict_host_checking = True):
338     """
339     Copies from/to remote sites.
340     
341     Source and destination should have the user and host encoded
342     as per scp specs.
343     
344     If source is a file object, a special mode will be used to
345     create the remote file with the same contents.
346     
347     If dest is a file object, the remote file (source) will be
348     read and written into dest.
349     
350     In these modes, recursive cannot be True.
351     
352     Source can be a list of files to copy to a single destination,
353     in which case it is advised that the destination be a folder.
354     """
355     
356     if isinstance(source, file) and source.tell() == 0:
357         source = source.name
358     elif hasattr(source, 'read'):
359         tmp = tempfile.NamedTemporaryFile()
360         while True:
361             buf = source.read(65536)
362             if buf:
363                 tmp.write(buf)
364             else:
365                 break
366         tmp.seek(0)
367         source = tmp.name
368     
369     if isinstance(source, file) or isinstance(dest, file) \
370             or hasattr(source, 'read')  or hasattr(dest, 'write'):
371         assert not recursive
372         
373         # Parse source/destination as <user>@<server>:<path>
374         if isinstance(dest, basestring) and ':' in dest:
375             remspec, path = dest.split(':',1)
376         elif isinstance(source, basestring) and ':' in source:
377             remspec, path = source.split(':',1)
378         else:
379             raise ValueError, "Both endpoints cannot be local"
380         user,host = remspec.rsplit('@',1)
381         
382         tmp_known_hosts = None
383         hostip = gethostbyname(host)
384         
385         args = ['ssh', '-l', user, '-C',
386                 # Don't bother with localhost. Makes test easier
387                 '-o', 'NoHostAuthenticationForLocalhost=yes',
388                 '-o', 'ConnectTimeout=60',
389                 '-o', 'ConnectionAttempts=3',
390                 '-o', 'ServerAliveInterval=30',
391                 '-o', 'TCPKeepAlive=yes',
392                 hostip or host ]
393
394         if openssh_has_persist():
395             args.extend([
396                 '-o', 'ControlMaster=auto',
397                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
398                 '-o', 'ControlPersist=60' ])
399
400         if port:
401             args.append('-P%d' % port)
402
403         if identity:
404             args.extend(('-i', identity))
405
406         if server_key:
407             # Create a temporary server key file
408             tmp_known_hosts = make_server_key_args(server_key, host, port)
409             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
410         
411         if isinstance(source, file) or hasattr(source, 'read'):
412             args.append('cat > %s' % (shell_escape(path),))
413         elif isinstance(dest, file) or hasattr(dest, 'write'):
414             args.append('cat %s' % (shell_escape(path),))
415         else:
416             raise AssertionError, "Unreachable code reached! :-Q"
417         
418         # connects to the remote host and starts a remote connection
419         if isinstance(source, file):
420             proc = subprocess.Popen(args, 
421                     stdout = open('/dev/null','w'),
422                     stderr = subprocess.PIPE,
423                     stdin = source)
424             err = proc.stderr.read()
425             proc._known_hosts = tmp_known_hosts
426             eintr_retry(proc.wait)()
427             return ((None,err), proc)
428         elif isinstance(dest, file):
429             proc = subprocess.Popen(args, 
430                     stdout = open('/dev/null','w'),
431                     stderr = subprocess.PIPE,
432                     stdin = source)
433             err = proc.stderr.read()
434             proc._known_hosts = tmp_known_hosts
435             eintr_retry(proc.wait)()
436             return ((None,err), proc)
437         elif hasattr(source, 'read'):
438             # file-like (but not file) source
439             proc = subprocess.Popen(args, 
440                     stdout = open('/dev/null','w'),
441                     stderr = subprocess.PIPE,
442                     stdin = subprocess.PIPE)
443             
444             buf = None
445             err = []
446             while True:
447                 if not buf:
448                     buf = source.read(4096)
449                 if not buf:
450                     #EOF
451                     break
452                 
453                 rdrdy, wrdy, broken = select.select(
454                     [proc.stderr],
455                     [proc.stdin],
456                     [proc.stderr,proc.stdin])
457                 
458                 if proc.stderr in rdrdy:
459                     # use os.read for fully unbuffered behavior
460                     err.append(os.read(proc.stderr.fileno(), 4096))
461                 
462                 if proc.stdin in wrdy:
463                     proc.stdin.write(buf)
464                     buf = None
465                 
466                 if broken:
467                     break
468             proc.stdin.close()
469             err.append(proc.stderr.read())
470                 
471             proc._known_hosts = tmp_known_hosts
472             eintr_retry(proc.wait)()
473             return ((None,''.join(err)), proc)
474         elif hasattr(dest, 'write'):
475             # file-like (but not file) dest
476             proc = subprocess.Popen(args, 
477                     stdout = subprocess.PIPE,
478                     stderr = subprocess.PIPE,
479                     stdin = open('/dev/null','w'))
480             
481             buf = None
482             err = []
483             while True:
484                 rdrdy, wrdy, broken = select.select(
485                     [proc.stderr, proc.stdout],
486                     [],
487                     [proc.stderr, proc.stdout])
488                 
489                 if proc.stderr in rdrdy:
490                     # use os.read for fully unbuffered behavior
491                     err.append(os.read(proc.stderr.fileno(), 4096))
492                 
493                 if proc.stdout in rdrdy:
494                     # use os.read for fully unbuffered behavior
495                     buf = os.read(proc.stdout.fileno(), 4096)
496                     dest.write(buf)
497                     
498                     if not buf:
499                         #EOF
500                         break
501                 
502                 if broken:
503                     break
504             err.append(proc.stderr.read())
505                 
506             proc._known_hosts = tmp_known_hosts
507             eintr_retry(proc.wait)()
508             return ((None,''.join(err)), proc)
509         else:
510             raise AssertionError, "Unreachable code reached! :-Q"
511     else:
512         # Parse destination as <user>@<server>:<path>
513         if isinstance(dest, basestring) and ':' in dest:
514             remspec, path = dest.split(':',1)
515         elif isinstance(source, basestring) and ':' in source:
516             remspec, path = source.split(':',1)
517         else:
518             raise ValueError, "Both endpoints cannot be local"
519         user,host = remspec.rsplit('@',1)
520         
521         # plain scp
522         tmp_known_hosts = None
523
524         args = ['scp', '-q', '-p', '-C',
525                 # Speed up transfer using blowfish cypher specification which is 
526                 # faster than the default one (3des)
527                 '-c', 'blowfish',
528                 # Don't bother with localhost. Makes test easier
529                 '-o', 'NoHostAuthenticationForLocalhost=yes',
530                 '-o', 'ConnectTimeout=60',
531                 '-o', 'ConnectionAttempts=3',
532                 '-o', 'ServerAliveInterval=30',
533                 '-o', 'TCPKeepAlive=yes' ]
534                 
535         if port:
536             args.append('-P%d' % port)
537
538         if recursive:
539             args.append('-r')
540
541         if identity:
542             args.extend(('-i', identity))
543
544         if server_key:
545             # Create a temporary server key file
546             tmp_known_hosts = make_server_key_args(server_key, host, port)
547             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
548
549         if not strict_host_checking:
550             # Do not check for Host key. Unsafe.
551             args.extend(['-o', 'StrictHostKeyChecking=no'])
552
553         if isinstance(source,list):
554             args.extend(source)
555         else:
556             if openssh_has_persist():
557                 args.extend([
558                     '-o', 'ControlMaster=auto',
559                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
560                     ])
561             args.append(source)
562
563         args.append(dest)
564
565         for x in xrange(retry):
566             # connects to the remote host and starts a remote connection
567             proc = subprocess.Popen(args,
568                     stdout = subprocess.PIPE,
569                     stdin = subprocess.PIPE, 
570                     stderr = subprocess.PIPE)
571             
572             # attach tempfile object to the process, to make sure the file stays
573             # alive until the process is finished with it
574             proc._known_hosts = tmp_known_hosts
575         
576             try:
577                 (out, err) = proc.communicate()
578                 eintr_retry(proc.wait)()
579                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
580                 log(msg, logging.DEBUG, out, err)
581
582                 if proc.poll():
583                     t = x*2
584                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
585                             t, x, host, " ".join(args))
586                     log(msg, logging.DEBUG)
587
588                     time.sleep(t)
589                     continue
590
591                 break
592             except RuntimeError, e:
593                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
594                 log(msg, logging.DEBUG, out, err)
595
596                 if retry <= 0:
597                     raise
598                 retry -= 1
599             
600         return ((out, err), proc)
601
602 def rspawn(command, pidfile, 
603         stdout = '/dev/null', 
604         stderr = STDOUT, 
605         stdin = '/dev/null',
606         home = None, 
607         create_home = False, 
608         sudo = False,
609         host = None, 
610         port = None, 
611         user = None, 
612         agent = None, 
613         identity = None, 
614         server_key = None,
615         tty = False):
616     """
617     Spawn a remote command such that it will continue working asynchronously in 
618     background. 
619
620         :param command: The command to run, it should be a single line.
621         :type command: str
622
623         :param pidfile: Path to a file where to store the pid and ppid of the 
624                         spawned process
625         :type pidfile: str
626
627         :param stdout: Path to file to redirect standard output. 
628                        The default value is /dev/null
629         :type stdout: str
630
631         :param stderr: Path to file to redirect standard error.
632                        If the special STDOUT value is used, stderr will 
633                        be redirected to the same file as stdout
634         :type stderr: str
635
636         :param stdin: Path to a file with input to be piped into the command's standard input
637         :type stdin: str
638
639         :param home: Path to working directory folder. 
640                     It is assumed to exist unless the create_home flag is set.
641         :type home: str
642
643         :param create_home: Flag to force creation of the home folder before 
644                             running the command
645         :type create_home: bool
646  
647         :param sudo: Flag forcing execution with sudo user
648         :type sudo: bool
649         
650         :rtype: touple
651
652         (stdout, stderr), process
653         
654         Of the spawning process, which only captures errors at spawning time.
655         Usually only useful for diagnostics.
656     """
657     # Start process in a "daemonized" way, using nohup and heavy
658     # stdin/out redirection to avoid connection issues
659     if stderr is STDOUT:
660         stderr = '&1'
661     else:
662         stderr = ' ' + stderr
663     
664     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
665         'command' : command,
666         'pidfile' : shell_escape(pidfile),
667         'stdout' : stdout,
668         'stderr' : stderr,
669         'stdin' : stdin,
670     }
671     
672     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
673             'command' : shell_escape(daemon_command),
674             'sudo' : 'sudo -S' if sudo else '',
675             'pidfile' : shell_escape(pidfile),
676             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
677             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
678         }
679
680     (out,err),proc = rexec(
681         cmd,
682         host = host,
683         port = port,
684         user = user,
685         agent = agent,
686         identity = identity,
687         server_key = server_key,
688         tty = tty ,
689         )
690     
691     if proc.wait():
692         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
693
694     return ((out, err), proc)
695
696 @eintr_retry
697 def rgetpid(pidfile,
698         host = None, 
699         port = None, 
700         user = None, 
701         agent = None, 
702         identity = None,
703         server_key = None):
704     """
705     Returns the pid and ppid of a process from a remote file where the 
706     information was stored.
707
708         :param home: Path to directory where the pidfile is located
709         :type home: str
710
711         :param pidfile: Name of file containing the pid information
712         :type pidfile: str
713         
714         :rtype: int
715         
716         A (pid, ppid) tuple useful for calling rstatus and rkill,
717         or None if the pidfile isn't valid yet (can happen when process is staring up)
718
719     """
720     (out,err),proc = rexec(
721         "cat %(pidfile)s" % {
722             'pidfile' : pidfile,
723         },
724         host = host,
725         port = port,
726         user = user,
727         agent = agent,
728         identity = identity,
729         server_key = server_key
730         )
731         
732     if proc.wait():
733         return None
734     
735     if out:
736         try:
737             return map(int,out.strip().split(' ',1))
738         except:
739             # Ignore, many ways to fail that don't matter that much
740             return None
741
742 @eintr_retry
743 def rstatus(pid, ppid, 
744         host = None, 
745         port = None, 
746         user = None, 
747         agent = None, 
748         identity = None,
749         server_key = None):
750     """
751     Returns a code representing the the status of a remote process
752
753         :param pid: Process id of the process
754         :type pid: int
755
756         :param ppid: Parent process id of process
757         :type ppid: int
758     
759         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
760     
761     """
762     (out,err),proc = rexec(
763         # Check only by pid. pid+ppid does not always work (especially with sudo) 
764         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
765             'ppid' : ppid,
766             'pid' : pid,
767         },
768         host = host,
769         port = port,
770         user = user,
771         agent = agent,
772         identity = identity,
773         server_key = server_key
774         )
775     
776     if proc.wait():
777         return ProcStatus.NOT_STARTED
778     
779     status = False
780     if err:
781         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
782             status = True
783     elif out:
784         status = (out.strip() == 'wait')
785     else:
786         return ProcStatus.NOT_STARTED
787     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
788
789 @eintr_retry
790 def rkill(pid, ppid,
791         host = None, 
792         port = None, 
793         user = None, 
794         agent = None, 
795         sudo = False,
796         identity = None, 
797         server_key = None, 
798         nowait = False):
799     """
800     Sends a kill signal to a remote process.
801
802     First tries a SIGTERM, and if the process does not end in 10 seconds,
803     it sends a SIGKILL.
804  
805         :param pid: Process id of process to be killed
806         :type pid: int
807
808         :param ppid: Parent process id of process to be killed
809         :type ppid: int
810
811         :param sudo: Flag indicating if sudo should be used to kill the process
812         :type sudo: bool
813         
814     """
815     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
816     cmd = """
817 SUBKILL="%(subkill)s" ;
818 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
819 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
820 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
821     sleep 0.2 
822     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
823         break
824     else
825         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
826         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
827     fi
828     sleep 1.8
829 done
830 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
831     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
832     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
833 fi
834 """
835     if nowait:
836         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
837
838     (out,err),proc = rexec(
839         cmd % {
840             'ppid' : ppid,
841             'pid' : pid,
842             'sudo' : 'sudo -S' if sudo else '',
843             'subkill' : subkill,
844         },
845         host = host,
846         port = port,
847         user = user,
848         agent = agent,
849         identity = identity,
850         server_key = server_key
851         )
852     
853     # wait, don't leave zombies around
854     proc.wait()
855
856     return (out, err), proc
857
858 # POSIX
859 def _communicate(proc, input, timeout=None, err_on_timeout=True):
860     read_set = []
861     write_set = []
862     stdout = None # Return
863     stderr = None # Return
864     
865     killed = False
866     
867     if timeout is not None:
868         timelimit = time.time() + timeout
869         killtime = timelimit + 4
870         bailtime = timelimit + 4
871
872     if proc.stdin:
873         # Flush stdio buffer.  This might block, if the user has
874         # been writing to .stdin in an uncontrolled fashion.
875         proc.stdin.flush()
876         if input:
877             write_set.append(proc.stdin)
878         else:
879             proc.stdin.close()
880
881     if proc.stdout:
882         read_set.append(proc.stdout)
883         stdout = []
884
885     if proc.stderr:
886         read_set.append(proc.stderr)
887         stderr = []
888
889     input_offset = 0
890     while read_set or write_set:
891         if timeout is not None:
892             curtime = time.time()
893             if timeout is None or curtime > timelimit:
894                 if curtime > bailtime:
895                     break
896                 elif curtime > killtime:
897                     signum = signal.SIGKILL
898                 else:
899                     signum = signal.SIGTERM
900                 # Lets kill it
901                 os.kill(proc.pid, signum)
902                 select_timeout = 0.5
903             else:
904                 select_timeout = timelimit - curtime + 0.1
905         else:
906             select_timeout = 1.0
907         
908         if select_timeout > 1.0:
909             select_timeout = 1.0
910             
911         try:
912             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
913         except select.error,e:
914             if e[0] != 4:
915                 raise
916             else:
917                 continue
918         
919         if not rlist and not wlist and not xlist and proc.poll() is not None:
920             # timeout and process exited, say bye
921             break
922
923         if proc.stdin in wlist:
924             # When select has indicated that the file is writable,
925             # we can write up to PIPE_BUF bytes without risk
926             # blocking.  POSIX defines PIPE_BUF >= 512
927             bytes_written = os.write(proc.stdin.fileno(),
928                     buffer(input, input_offset, 512))
929             input_offset += bytes_written
930
931             if input_offset >= len(input):
932                 proc.stdin.close()
933                 write_set.remove(proc.stdin)
934
935         if proc.stdout in rlist:
936             data = os.read(proc.stdout.fileno(), 1024)
937             if data == "":
938                 proc.stdout.close()
939                 read_set.remove(proc.stdout)
940             stdout.append(data)
941
942         if proc.stderr in rlist:
943             data = os.read(proc.stderr.fileno(), 1024)
944             if data == "":
945                 proc.stderr.close()
946                 read_set.remove(proc.stderr)
947             stderr.append(data)
948     
949     # All data exchanged.  Translate lists into strings.
950     if stdout is not None:
951         stdout = ''.join(stdout)
952     if stderr is not None:
953         stderr = ''.join(stderr)
954
955     # Translate newlines, if requested.  We cannot let the file
956     # object do the translation: It is based on stdio, which is
957     # impossible to combine with select (unless forcing no
958     # buffering).
959     if proc.universal_newlines and hasattr(file, 'newlines'):
960         if stdout:
961             stdout = proc._translate_newlines(stdout)
962         if stderr:
963             stderr = proc._translate_newlines(stderr)
964
965     if killed and err_on_timeout:
966         errcode = proc.poll()
967         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
968     else:
969         if killed:
970             proc.poll()
971         else:
972             proc.wait()
973         return (stdout, stderr)
974