Adding BatchMode=yes for rexec func
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None,
209         gwuser = None,
210         gw = None, 
211         agent = True,
212         sudo = False,
213         stdin = None,
214         identity = None,
215         server_key = None,
216         env = None,
217         tty = False,
218         timeout = None,
219         retry = 3,
220         err_on_timeout = True,
221         connect_timeout = 30,
222         persistent = True,
223         forward_x11 = False,
224         blocking = True,
225         strict_host_checking = True):
226     """
227     Executes a remote command, returns ((stdout,stderr),process)
228     """
229     
230     tmp_known_hosts = None
231     if not gw:
232         hostip = gethostbyname(host)
233     else: hostip = None
234
235     args = ['ssh', '-C',
236             # Don't bother with localhost. Makes test easier
237             '-o', 'NoHostAuthenticationForLocalhost=yes',
238             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
239             '-o', 'ConnectionAttempts=3',
240             '-o', 'ServerAliveInterval=30',
241             '-o', 'TCPKeepAlive=yes',
242             '-o', 'Batchmode=yes',
243             '-l', user, hostip or host]
244
245     if persistent and openssh_has_persist():
246         args.extend([
247             '-o', 'ControlMaster=auto',
248             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
249             '-o', 'ControlPersist=60' ])
250
251     if not strict_host_checking:
252         # Do not check for Host key. Unsafe.
253         args.extend(['-o', 'StrictHostKeyChecking=no'])
254
255     if gw:
256         if gwuser:
257             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
258         else:
259             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
260         args.extend(['-o', proxycommand])
261
262     if agent:
263         args.append('-A')
264
265     if port:
266         args.append('-p%d' % port)
267
268     if identity:
269         args.extend(('-i', identity))
270
271     if tty:
272         args.append('-t')
273         args.append('-t')
274
275     if forward_x11:
276         args.append('-X')
277
278     if server_key:
279         # Create a temporary server key file
280         tmp_known_hosts = make_server_key_args(server_key, host, port)
281         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
282
283     if sudo:
284         command = "sudo " + command
285
286     args.append(command)
287
288     for x in xrange(retry):
289         # connects to the remote host and starts a remote connection
290         proc = subprocess.Popen(args,
291                 env = env,
292                 stdout = subprocess.PIPE,
293                 stdin = subprocess.PIPE, 
294                 stderr = subprocess.PIPE)
295        
296         # attach tempfile object to the process, to make sure the file stays
297         # alive until the process is finished with it
298         proc._known_hosts = tmp_known_hosts
299     
300         # by default, rexec calls _communicate which will block 
301         # until the process has exit. The argument block == False 
302         # forces to rexec to return immediately, without blocking 
303         try:
304             if blocking:
305                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
306             else:
307                 err = proc.stderr.read()
308                 out = proc.stdout.read()
309
310             msg = " rexec - host %s - command %s " % (host, " ".join(args))
311             log(msg, logging.DEBUG, out, err)
312
313             if proc.poll():
314                 skip = False
315
316                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
317                     # SSH error, can safely retry
318                     skip = True 
319                 elif retry:
320                     # Probably timed out or plain failed but can retry
321                     skip = True 
322                 
323                 if skip:
324                     t = x*2
325                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
326                             t, x, host, " ".join(args))
327                     log(msg, logging.DEBUG)
328
329                     time.sleep(t)
330                     continue
331             break
332         except RuntimeError, e:
333             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
334             log(msg, logging.DEBUG, out, err)
335
336             if retry <= 0:
337                 raise
338             retry -= 1
339         
340     return ((out, err), proc)
341
342 def rcopy(source, dest,
343         port = None,
344         gwuser = None,
345         gw = None,
346         agent = True, 
347         recursive = False,
348         identity = None,
349         server_key = None,
350         retry = 3,
351         strict_host_checking = True):
352     """
353     Copies from/to remote sites.
354     
355     Source and destination should have the user and host encoded
356     as per scp specs.
357     
358     If source is a file object, a special mode will be used to
359     create the remote file with the same contents.
360     
361     If dest is a file object, the remote file (source) will be
362     read and written into dest.
363     
364     In these modes, recursive cannot be True.
365     
366     Source can be a list of files to copy to a single destination,
367     in which case it is advised that the destination be a folder.
368     """
369     
370     if isinstance(source, file) and source.tell() == 0:
371         source = source.name
372     elif hasattr(source, 'read'):
373         tmp = tempfile.NamedTemporaryFile()
374         while True:
375             buf = source.read(65536)
376             if buf:
377                 tmp.write(buf)
378             else:
379                 break
380         tmp.seek(0)
381         source = tmp.name
382     
383     if isinstance(source, file) or isinstance(dest, file) \
384             or hasattr(source, 'read')  or hasattr(dest, 'write'):
385         assert not recursive
386         
387         # Parse source/destination as <user>@<server>:<path>
388         if isinstance(dest, basestring) and ':' in dest:
389             remspec, path = dest.split(':',1)
390         elif isinstance(source, basestring) and ':' in source:
391             remspec, path = source.split(':',1)
392         else:
393             raise ValueError, "Both endpoints cannot be local"
394         user,host = remspec.rsplit('@',1)
395         
396         tmp_known_hosts = None
397         if not gw:
398             hostip = gethostbyname(host)
399         else: hostip = None
400         
401         args = ['ssh', '-l', user, '-C',
402                 # Don't bother with localhost. Makes test easier
403                 '-o', 'NoHostAuthenticationForLocalhost=yes',
404                 '-o', 'ConnectTimeout=60',
405                 '-o', 'ConnectionAttempts=3',
406                 '-o', 'ServerAliveInterval=30',
407                 '-o', 'TCPKeepAlive=yes',
408                 hostip or host ]
409
410         if openssh_has_persist():
411             args.extend([
412                 '-o', 'ControlMaster=auto',
413                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
414                 '-o', 'ControlPersist=60' ])
415
416         if gw:
417             if gwuser:
418                 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
419             else:
420                 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
421             args.extend(['-o', proxycommand])
422
423         if port:
424             args.append('-P%d' % port)
425
426         if identity:
427             args.extend(('-i', identity))
428
429         if server_key:
430             # Create a temporary server key file
431             tmp_known_hosts = make_server_key_args(server_key, host, port)
432             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
433         
434         if isinstance(source, file) or hasattr(source, 'read'):
435             args.append('cat > %s' % (shell_escape(path),))
436         elif isinstance(dest, file) or hasattr(dest, 'write'):
437             args.append('cat %s' % (shell_escape(path),))
438         else:
439             raise AssertionError, "Unreachable code reached! :-Q"
440         
441         # connects to the remote host and starts a remote connection
442         if isinstance(source, file):
443             proc = subprocess.Popen(args, 
444                     stdout = open('/dev/null','w'),
445                     stderr = subprocess.PIPE,
446                     stdin = source)
447             err = proc.stderr.read()
448             proc._known_hosts = tmp_known_hosts
449             eintr_retry(proc.wait)()
450             return ((None,err), proc)
451         elif isinstance(dest, file):
452             proc = subprocess.Popen(args, 
453                     stdout = open('/dev/null','w'),
454                     stderr = subprocess.PIPE,
455                     stdin = source)
456             err = proc.stderr.read()
457             proc._known_hosts = tmp_known_hosts
458             eintr_retry(proc.wait)()
459             return ((None,err), proc)
460         elif hasattr(source, 'read'):
461             # file-like (but not file) source
462             proc = subprocess.Popen(args, 
463                     stdout = open('/dev/null','w'),
464                     stderr = subprocess.PIPE,
465                     stdin = subprocess.PIPE)
466             
467             buf = None
468             err = []
469             while True:
470                 if not buf:
471                     buf = source.read(4096)
472                 if not buf:
473                     #EOF
474                     break
475                 
476                 rdrdy, wrdy, broken = select.select(
477                     [proc.stderr],
478                     [proc.stdin],
479                     [proc.stderr,proc.stdin])
480                 
481                 if proc.stderr in rdrdy:
482                     # use os.read for fully unbuffered behavior
483                     err.append(os.read(proc.stderr.fileno(), 4096))
484                 
485                 if proc.stdin in wrdy:
486                     proc.stdin.write(buf)
487                     buf = None
488                 
489                 if broken:
490                     break
491             proc.stdin.close()
492             err.append(proc.stderr.read())
493                 
494             proc._known_hosts = tmp_known_hosts
495             eintr_retry(proc.wait)()
496             return ((None,''.join(err)), proc)
497         elif hasattr(dest, 'write'):
498             # file-like (but not file) dest
499             proc = subprocess.Popen(args, 
500                     stdout = subprocess.PIPE,
501                     stderr = subprocess.PIPE,
502                     stdin = open('/dev/null','w'))
503             
504             buf = None
505             err = []
506             while True:
507                 rdrdy, wrdy, broken = select.select(
508                     [proc.stderr, proc.stdout],
509                     [],
510                     [proc.stderr, proc.stdout])
511                 
512                 if proc.stderr in rdrdy:
513                     # use os.read for fully unbuffered behavior
514                     err.append(os.read(proc.stderr.fileno(), 4096))
515                 
516                 if proc.stdout in rdrdy:
517                     # use os.read for fully unbuffered behavior
518                     buf = os.read(proc.stdout.fileno(), 4096)
519                     dest.write(buf)
520                     
521                     if not buf:
522                         #EOF
523                         break
524                 
525                 if broken:
526                     break
527             err.append(proc.stderr.read())
528                 
529             proc._known_hosts = tmp_known_hosts
530             eintr_retry(proc.wait)()
531             return ((None,''.join(err)), proc)
532         else:
533             raise AssertionError, "Unreachable code reached! :-Q"
534     else:
535         # Parse destination as <user>@<server>:<path>
536         if isinstance(dest, basestring) and ':' in dest:
537             remspec, path = dest.split(':',1)
538         elif isinstance(source, basestring) and ':' in source:
539             remspec, path = source.split(':',1)
540         else:
541             raise ValueError, "Both endpoints cannot be local"
542         user,host = remspec.rsplit('@',1)
543         
544         # plain scp
545         tmp_known_hosts = None
546
547         args = ['scp', '-q', '-p', '-C',
548                 # Speed up transfer using blowfish cypher specification which is 
549                 # faster than the default one (3des)
550                 '-c', 'blowfish',
551                 # Don't bother with localhost. Makes test easier
552                 '-o', 'NoHostAuthenticationForLocalhost=yes',
553                 '-o', 'ConnectTimeout=60',
554                 '-o', 'ConnectionAttempts=3',
555                 '-o', 'ServerAliveInterval=30',
556                 '-o', 'TCPKeepAlive=yes' ]
557                 
558         if port:
559             args.append('-P%d' % port)
560
561         if gw:
562             if gwuser:
563                 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
564             else:
565                 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
566             args.extend(['-o', proxycommand])
567
568         if recursive:
569             args.append('-r')
570
571         if identity:
572             args.extend(('-i', identity))
573
574         if server_key:
575             # Create a temporary server key file
576             tmp_known_hosts = make_server_key_args(server_key, host, port)
577             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
578
579         if not strict_host_checking:
580             # Do not check for Host key. Unsafe.
581             args.extend(['-o', 'StrictHostKeyChecking=no'])
582
583         if isinstance(source,list):
584             args.extend(source)
585         else:
586             if openssh_has_persist():
587                 args.extend([
588                     '-o', 'ControlMaster=auto',
589                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
590                     ])
591             args.append(source)
592
593         args.append(dest)
594         
595         for x in xrange(retry):
596             # connects to the remote host and starts a remote connection
597             proc = subprocess.Popen(args,
598                     stdout = subprocess.PIPE,
599                     stdin = subprocess.PIPE, 
600                     stderr = subprocess.PIPE)
601             
602             # attach tempfile object to the process, to make sure the file stays
603             # alive until the process is finished with it
604             proc._known_hosts = tmp_known_hosts
605         
606             try:
607                 (out, err) = proc.communicate()
608                 eintr_retry(proc.wait)()
609                 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
610                 log(msg, logging.DEBUG, out, err)
611
612                 if proc.poll():
613                     t = x*2
614                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
615                             t, x, host, " ".join(args))
616                     log(msg, logging.DEBUG)
617
618                     time.sleep(t)
619                     continue
620
621                 break
622             except RuntimeError, e:
623                 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
624                 log(msg, logging.DEBUG, out, err)
625
626                 if retry <= 0:
627                     raise
628                 retry -= 1
629             
630         return ((out, err), proc)
631
632 def rspawn(command, pidfile, 
633         stdout = '/dev/null', 
634         stderr = STDOUT, 
635         stdin = '/dev/null',
636         home = None, 
637         create_home = False, 
638         sudo = False,
639         host = None, 
640         port = None, 
641         user = None, 
642         gwuser = None,
643         gw = None,
644         agent = None, 
645         identity = None, 
646         server_key = None,
647         tty = False):
648     """
649     Spawn a remote command such that it will continue working asynchronously in 
650     background. 
651
652         :param command: The command to run, it should be a single line.
653         :type command: str
654
655         :param pidfile: Path to a file where to store the pid and ppid of the 
656                         spawned process
657         :type pidfile: str
658
659         :param stdout: Path to file to redirect standard output. 
660                        The default value is /dev/null
661         :type stdout: str
662
663         :param stderr: Path to file to redirect standard error.
664                        If the special STDOUT value is used, stderr will 
665                        be redirected to the same file as stdout
666         :type stderr: str
667
668         :param stdin: Path to a file with input to be piped into the command's standard input
669         :type stdin: str
670
671         :param home: Path to working directory folder. 
672                     It is assumed to exist unless the create_home flag is set.
673         :type home: str
674
675         :param create_home: Flag to force creation of the home folder before 
676                             running the command
677         :type create_home: bool
678  
679         :param sudo: Flag forcing execution with sudo user
680         :type sudo: bool
681         
682         :rtype: touple
683
684         (stdout, stderr), process
685         
686         Of the spawning process, which only captures errors at spawning time.
687         Usually only useful for diagnostics.
688     """
689     # Start process in a "daemonized" way, using nohup and heavy
690     # stdin/out redirection to avoid connection issues
691     if stderr is STDOUT:
692         stderr = '&1'
693     else:
694         stderr = ' ' + stderr
695     
696     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
697         'command' : command,
698         'pidfile' : shell_escape(pidfile),
699         'stdout' : stdout,
700         'stderr' : stderr,
701         'stdin' : stdin,
702     }
703     
704     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
705             'command' : shell_escape(daemon_command),
706             'sudo' : 'sudo -S' if sudo else '',
707             'pidfile' : shell_escape(pidfile),
708             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
709             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
710         }
711
712     (out,err),proc = rexec(
713         cmd,
714         host = host,
715         port = port,
716         user = user,
717         gwuser = gwuser,
718         gw = gw,
719         agent = agent,
720         identity = identity,
721         server_key = server_key,
722         tty = tty ,
723         )
724     
725     if proc.wait():
726         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
727
728     return ((out, err), proc)
729
730 @eintr_retry
731 def rgetpid(pidfile,
732         host = None, 
733         port = None, 
734         user = None, 
735         gwuser = None,
736         gw = None,
737         agent = None, 
738         identity = None,
739         server_key = None):
740     """
741     Returns the pid and ppid of a process from a remote file where the 
742     information was stored.
743
744         :param home: Path to directory where the pidfile is located
745         :type home: str
746
747         :param pidfile: Name of file containing the pid information
748         :type pidfile: str
749         
750         :rtype: int
751         
752         A (pid, ppid) tuple useful for calling rstatus and rkill,
753         or None if the pidfile isn't valid yet (can happen when process is staring up)
754
755     """
756     (out,err),proc = rexec(
757         "cat %(pidfile)s" % {
758             'pidfile' : pidfile,
759         },
760         host = host,
761         port = port,
762         user = user,
763         gwuser = gwuser,
764         gw = gw,
765         agent = agent,
766         identity = identity,
767         server_key = server_key
768         )
769         
770     if proc.wait():
771         return None
772     
773     if out:
774         try:
775             return map(int,out.strip().split(' ',1))
776         except:
777             # Ignore, many ways to fail that don't matter that much
778             return None
779
780 @eintr_retry
781 def rstatus(pid, ppid, 
782         host = None, 
783         port = None, 
784         user = None, 
785         gwuser = None,
786         gw = None,
787         agent = None, 
788         identity = None,
789         server_key = None):
790     """
791     Returns a code representing the the status of a remote process
792
793         :param pid: Process id of the process
794         :type pid: int
795
796         :param ppid: Parent process id of process
797         :type ppid: int
798     
799         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
800     
801     """
802     (out,err),proc = rexec(
803         # Check only by pid. pid+ppid does not always work (especially with sudo) 
804         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
805             'ppid' : ppid,
806             'pid' : pid,
807         },
808         host = host,
809         port = port,
810         user = user,
811         gwuser = gwuser,
812         gw = gw,
813         agent = agent,
814         identity = identity,
815         server_key = server_key
816         )
817     
818     if proc.wait():
819         return ProcStatus.NOT_STARTED
820     
821     status = False
822     if err:
823         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
824             status = True
825     elif out:
826         status = (out.strip() == 'wait')
827     else:
828         return ProcStatus.NOT_STARTED
829     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
830
831 @eintr_retry
832 def rkill(pid, ppid,
833         host = None, 
834         port = None, 
835         user = None, 
836         gwuser = None,
837         gw = None,
838         agent = None, 
839         sudo = False,
840         identity = None, 
841         server_key = None, 
842         nowait = False):
843     """
844     Sends a kill signal to a remote process.
845
846     First tries a SIGTERM, and if the process does not end in 10 seconds,
847     it sends a SIGKILL.
848  
849         :param pid: Process id of process to be killed
850         :type pid: int
851
852         :param ppid: Parent process id of process to be killed
853         :type ppid: int
854
855         :param sudo: Flag indicating if sudo should be used to kill the process
856         :type sudo: bool
857         
858     """
859     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
860     cmd = """
861 SUBKILL="%(subkill)s" ;
862 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
863 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
864 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
865     sleep 0.2 
866     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
867         break
868     else
869         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
870         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
871     fi
872     sleep 1.8
873 done
874 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
875     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
876     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
877 fi
878 """
879     if nowait:
880         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
881
882     (out,err),proc = rexec(
883         cmd % {
884             'ppid' : ppid,
885             'pid' : pid,
886             'sudo' : 'sudo -S' if sudo else '',
887             'subkill' : subkill,
888         },
889         host = host,
890         port = port,
891         user = user,
892         gwuser = gwuser,
893         gw = gw,
894         agent = agent,
895         identity = identity,
896         server_key = server_key
897         )
898     
899     # wait, don't leave zombies around
900     proc.wait()
901
902     return (out, err), proc
903
904 # POSIX
905 def _communicate(proc, input, timeout=None, err_on_timeout=True):
906     read_set = []
907     write_set = []
908     stdout = None # Return
909     stderr = None # Return
910     
911     killed = False
912     
913     if timeout is not None:
914         timelimit = time.time() + timeout
915         killtime = timelimit + 4
916         bailtime = timelimit + 4
917
918     if proc.stdin:
919         # Flush stdio buffer.  This might block, if the user has
920         # been writing to .stdin in an uncontrolled fashion.
921         proc.stdin.flush()
922         if input:
923             write_set.append(proc.stdin)
924         else:
925             proc.stdin.close()
926
927     if proc.stdout:
928         read_set.append(proc.stdout)
929         stdout = []
930
931     if proc.stderr:
932         read_set.append(proc.stderr)
933         stderr = []
934
935     input_offset = 0
936     while read_set or write_set:
937         if timeout is not None:
938             curtime = time.time()
939             if timeout is None or curtime > timelimit:
940                 if curtime > bailtime:
941                     break
942                 elif curtime > killtime:
943                     signum = signal.SIGKILL
944                 else:
945                     signum = signal.SIGTERM
946                 # Lets kill it
947                 os.kill(proc.pid, signum)
948                 select_timeout = 0.5
949             else:
950                 select_timeout = timelimit - curtime + 0.1
951         else:
952             select_timeout = 1.0
953         
954         if select_timeout > 1.0:
955             select_timeout = 1.0
956             
957         try:
958             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
959         except select.error,e:
960             if e[0] != 4:
961                 raise
962             else:
963                 continue
964         
965         if not rlist and not wlist and not xlist and proc.poll() is not None:
966             # timeout and process exited, say bye
967             break
968
969         if proc.stdin in wlist:
970             # When select has indicated that the file is writable,
971             # we can write up to PIPE_BUF bytes without risk
972             # blocking.  POSIX defines PIPE_BUF >= 512
973             bytes_written = os.write(proc.stdin.fileno(),
974                     buffer(input, input_offset, 512))
975             input_offset += bytes_written
976
977             if input_offset >= len(input):
978                 proc.stdin.close()
979                 write_set.remove(proc.stdin)
980
981         if proc.stdout in rlist:
982             data = os.read(proc.stdout.fileno(), 1024)
983             if data == "":
984                 proc.stdout.close()
985                 read_set.remove(proc.stdout)
986             stdout.append(data)
987
988         if proc.stderr in rlist:
989             data = os.read(proc.stderr.fileno(), 1024)
990             if data == "":
991                 proc.stderr.close()
992                 read_set.remove(proc.stderr)
993             stderr.append(data)
994     
995     # All data exchanged.  Translate lists into strings.
996     if stdout is not None:
997         stdout = ''.join(stdout)
998     if stderr is not None:
999         stderr = ''.join(stderr)
1000
1001     # Translate newlines, if requested.  We cannot let the file
1002     # object do the translation: It is based on stdio, which is
1003     # impossible to combine with select (unless forcing no
1004     # buffering).
1005     if proc.universal_newlines and hasattr(file, 'newlines'):
1006         if stdout:
1007             stdout = proc._translate_newlines(stdout)
1008         if stderr:
1009             stderr = proc._translate_newlines(stderr)
1010
1011     if killed and err_on_timeout:
1012         errcode = proc.poll()
1013         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1014     else:
1015         if killed:
1016             proc.poll()
1017         else:
1018             proc.wait()
1019         return (stdout, stderr)
1020