15 # TODO: Add retries to rcopy! Unlike rexec, rcopy is not retried on failure.
17 logger = logging.getLogger("sshfuncs")
19 def log(msg, level, out = None, err = None):
21 msg += " - OUT: %s " % out
24 msg += " - ERROR: %s " % err
26 logger.log(level, msg)
29 if hasattr(os, "devnull"):
32 DEV_NULL = "/dev/null"
34 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
38 Special value that, when passed to rspawn as stderr, causes stderr to
39 be redirected to wherever stdout was redirected.
44 Process is still running
54 Process hasn't started running yet (this should be very rare)
57 hostbyname_cache = dict()
59 def gethostbyname(host):
60 global hostbyname_cache
62 hostbyname = hostbyname_cache.get(host)
64 hostbyname = socket.gethostbyname(host)
65 hostbyname_cache[host] = hostbyname
68 OPENSSH_HAS_PERSIST = None
70 def openssh_has_persist():
71 """ The ssh_config options ControlMaster and ControlPersist allow
72 the same network connection to be reused for multiple ssh sessions. In
73 this way, limits on the number of open ssh connections can be bypassed.
74 However, older versions of OpenSSH do not support this feature.
75 This function is used to determine if ssh connection persist features
78 global OPENSSH_HAS_PERSIST
79 if OPENSSH_HAS_PERSIST is None:
80 proc = subprocess.Popen(["ssh","-v"],
81 stdout = subprocess.PIPE,
82 stderr = subprocess.STDOUT,
83 stdin = open("/dev/null","r") )
84 out,err = proc.communicate()
87 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
88 OPENSSH_HAS_PERSIST = bool(vre.match(out))
89 return OPENSSH_HAS_PERSIST
91 def make_server_key_args(server_key, host, port):
92 """ Returns a reference to a temporary known_hosts file, to which
93 the server key has been added.
95 Make sure to hold onto the temp file reference until the process is
98 :param server_key: the server public key
101 :param host: the hostname
104 :param port: the ssh port
109 host = '%s:%s' % (host, str(port))
111 # Create a temporary server key file
112 tmp_known_hosts = tempfile.NamedTemporaryFile()
114 hostbyname = gethostbyname(host)
116 # Add the intended host key
117 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
119 # If we're not in strict mode, add user-configured keys
120 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
121 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
122 if os.access(user_hosts_path, os.R_OK):
123 f = open(user_hosts_path, "r")
124 tmp_known_hosts.write(f.read())
127 tmp_known_hosts.flush()
129 return tmp_known_hosts
131 def make_control_path(agent, forward_x11):
132 ctrl_path = "/tmp/nepi_ssh"
140 ctrl_path += "-%r@%h:%p"
145 """ Escapes strings so that they are safe to use as command-line
147 if SHELL_SAFE.match(s):
148 # safe string - no escaping needed
151 # unsafe string - escape
153 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
156 return "'$'\\x%02x''" % (ord(c),)
157 s = ''.join(map(escp,s))
160 def eintr_retry(func):
161 """Retries a function invocation when an EINTR occurs"""
163 @functools.wraps(func)
165 retry = kw.pop("_retry", False)
166 for i in xrange(0 if retry else 4):
168 return func(*p, **kw)
169 except (select.error, socket.error), args:
170 if args[0] == errno.EINTR:
175 if e.errno == errno.EINTR:
180 return func(*p, **kw)
183 def rexec(command, host, user,
194 err_on_timeout = True,
195 connect_timeout = 30,
198 strict_host_checking = True):
200 Executes a remote command, returns ((stdout,stderr),process)
203 tmp_known_hosts = None
204 hostip = gethostbyname(host)
207 # Don't bother with localhost. Makes test easier
208 '-o', 'NoHostAuthenticationForLocalhost=yes',
209 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
210 '-o', 'ConnectionAttempts=3',
211 '-o', 'ServerAliveInterval=30',
212 '-o', 'TCPKeepAlive=yes',
213 '-l', user, hostip or host]
215 if persistent and openssh_has_persist():
217 '-o', 'ControlMaster=auto',
218 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
219 '-o', 'ControlPersist=60' ])
221 if not strict_host_checking:
222 # Do not check for Host key. Unsafe.
223 args.extend(['-o', 'StrictHostKeyChecking=no'])
229 args.append('-p%d' % port)
232 args.extend(('-i', identity))
242 # Create a temporary server key file
243 tmp_known_hosts = make_server_key_args(server_key, host, port)
244 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
248 for x in xrange(retry):
249 # connects to the remote host and starts a remote connection
250 proc = subprocess.Popen(args,
252 stdout = subprocess.PIPE,
253 stdin = subprocess.PIPE,
254 stderr = subprocess.PIPE)
256 # attach tempfile object to the process, to make sure the file stays
257 # alive until the process is finished with it
258 proc._known_hosts = tmp_known_hosts
261 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
262 msg = " rexec - host %s - command %s " % (host, " ".join(args))
263 log(msg, logging.DEBUG, out, err)
268 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
269 # SSH error, can safely retry
272 # Probably timed out or plain failed but can retry
277 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
278 t, x, host, " ".join(args))
279 log(msg, logging.DEBUG)
284 except RuntimeError, e:
285 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
286 log(msg, logging.DEBUG, out, err)
292 return ((out, err), proc)
294 def rcopy(source, dest,
300 strict_host_checking = True):
302 Copies from/to remote sites.
304 Source and destination should have the user and host encoded
307 If source is a file object, a special mode will be used to
308 create the remote file with the same contents.
310 If dest is a file object, the remote file (source) will be
311 read and written into dest.
313 In these modes, recursive cannot be True.
315 Source can be a list of files to copy to a single destination,
316 in which case it is advised that the destination be a folder.
319 msg = " rcopy - scp %s %s " % (source, dest)
320 log(msg, logging.DEBUG)
322 if isinstance(source, file) and source.tell() == 0:
324 elif hasattr(source, 'read'):
325 tmp = tempfile.NamedTemporaryFile()
327 buf = source.read(65536)
335 if isinstance(source, file) or isinstance(dest, file) \
336 or hasattr(source, 'read') or hasattr(dest, 'write'):
339 # Parse source/destination as <user>@<server>:<path>
340 if isinstance(dest, basestring) and ':' in dest:
341 remspec, path = dest.split(':',1)
342 elif isinstance(source, basestring) and ':' in source:
343 remspec, path = source.split(':',1)
345 raise ValueError, "Both endpoints cannot be local"
346 user,host = remspec.rsplit('@',1)
348 tmp_known_hosts = None
349 hostip = gethostbyname(host)
351 args = ['ssh', '-l', user, '-C',
352 # Don't bother with localhost. Makes test easier
353 '-o', 'NoHostAuthenticationForLocalhost=yes',
354 '-o', 'ConnectTimeout=60',
355 '-o', 'ConnectionAttempts=3',
356 '-o', 'ServerAliveInterval=30',
357 '-o', 'TCPKeepAlive=yes',
360 if openssh_has_persist():
362 '-o', 'ControlMaster=auto',
363 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
364 '-o', 'ControlPersist=60' ])
367 args.append('-P%d' % port)
370 args.extend(('-i', identity))
373 # Create a temporary server key file
374 tmp_known_hosts = make_server_key_args(server_key, host, port)
375 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
377 if isinstance(source, file) or hasattr(source, 'read'):
378 args.append('cat > %s' % (shell_escape(path),))
379 elif isinstance(dest, file) or hasattr(dest, 'write'):
380 args.append('cat %s' % (shell_escape(path),))
382 raise AssertionError, "Unreachable code reached! :-Q"
384 # connects to the remote host and starts a remote connection
385 if isinstance(source, file):
386 proc = subprocess.Popen(args,
387 stdout = open('/dev/null','w'),
388 stderr = subprocess.PIPE,
390 err = proc.stderr.read()
391 proc._known_hosts = tmp_known_hosts
392 eintr_retry(proc.wait)()
393 return ((None,err), proc)
394 elif isinstance(dest, file):
395 proc = subprocess.Popen(args,
396 stdout = open('/dev/null','w'),
397 stderr = subprocess.PIPE,
399 err = proc.stderr.read()
400 proc._known_hosts = tmp_known_hosts
401 eintr_retry(proc.wait)()
402 return ((None,err), proc)
403 elif hasattr(source, 'read'):
404 # file-like (but not file) source
405 proc = subprocess.Popen(args,
406 stdout = open('/dev/null','w'),
407 stderr = subprocess.PIPE,
408 stdin = subprocess.PIPE)
414 buf = source.read(4096)
419 rdrdy, wrdy, broken = select.select(
422 [proc.stderr,proc.stdin])
424 if proc.stderr in rdrdy:
425 # use os.read for fully unbuffered behavior
426 err.append(os.read(proc.stderr.fileno(), 4096))
428 if proc.stdin in wrdy:
429 proc.stdin.write(buf)
435 err.append(proc.stderr.read())
437 proc._known_hosts = tmp_known_hosts
438 eintr_retry(proc.wait)()
439 return ((None,''.join(err)), proc)
440 elif hasattr(dest, 'write'):
441 # file-like (but not file) dest
442 proc = subprocess.Popen(args,
443 stdout = subprocess.PIPE,
444 stderr = subprocess.PIPE,
445 stdin = open('/dev/null','w'))
450 rdrdy, wrdy, broken = select.select(
451 [proc.stderr, proc.stdout],
453 [proc.stderr, proc.stdout])
455 if proc.stderr in rdrdy:
456 # use os.read for fully unbuffered behavior
457 err.append(os.read(proc.stderr.fileno(), 4096))
459 if proc.stdout in rdrdy:
460 # use os.read for fully unbuffered behavior
461 buf = os.read(proc.stdout.fileno(), 4096)
470 err.append(proc.stderr.read())
472 proc._known_hosts = tmp_known_hosts
473 eintr_retry(proc.wait)()
474 return ((None,''.join(err)), proc)
476 raise AssertionError, "Unreachable code reached! :-Q"
478 # Parse source/destination as <user>@<server>:<path>
479 if isinstance(dest, basestring) and ':' in dest:
480 remspec, path = dest.split(':',1)
481 elif isinstance(source, basestring) and ':' in source:
482 remspec, path = source.split(':',1)
484 raise ValueError, "Both endpoints cannot be local"
485 user,host = remspec.rsplit('@',1)
488 tmp_known_hosts = None
490 args = ['scp', '-q', '-p', '-C',
491 # Don't bother with localhost. Makes test easier
492 '-o', 'NoHostAuthenticationForLocalhost=yes',
493 '-o', 'ConnectTimeout=60',
494 '-o', 'ConnectionAttempts=3',
495 '-o', 'ServerAliveInterval=30',
496 '-o', 'TCPKeepAlive=yes' ]
499 args.append('-P%d' % port)
505 args.extend(('-i', identity))
508 # Create a temporary server key file
509 tmp_known_hosts = make_server_key_args(server_key, host, port)
510 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
512 if not strict_host_checking:
513 # Do not check for Host key. Unsafe.
514 args.extend(['-o', 'StrictHostKeyChecking=no'])
516 if isinstance(source,list):
519 if openssh_has_persist():
521 '-o', 'ControlMaster=auto',
522 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
528 # connects to the remote host and starts a remote connection
529 proc = subprocess.Popen(args,
530 stdout = subprocess.PIPE,
531 stdin = subprocess.PIPE,
532 stderr = subprocess.PIPE)
533 proc._known_hosts = tmp_known_hosts
535 (out, err) = proc.communicate()
536 eintr_retry(proc.wait)()
537 return ((out, err), proc)
539 def rspawn(command, pidfile,
540 stdout = '/dev/null',
554 Spawn a remote command such that it will continue working asynchronously.
557 command: the command to run - it should be a single line.
559 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
561 stdout: path of a file to redirect standard output to - must be a string.
562 Defaults to /dev/null
563 stderr: path of a file to redirect standard error to - string or the special STDOUT value
564 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
565 stdin: path of a file with input to be piped into the command's standard input
567 home: path of a folder to use as working directory - should exist, unless you specify create_home
569 create_home: if True, the home folder will be created first with mkdir -p
571 sudo: whether the command needs to be executed as root
573 host/port/user/agent/identity: see rexec
576 (stdout, stderr), process
578 Of the spawning process, which only captures errors at spawning time.
579 Usually only useful for diagnostics.
581 # Start process in a "daemonized" way, using nohup and heavy
582 # stdin/out redirection to avoid connection issues
586 stderr = ' ' + stderr
588 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
590 'pidfile' : shell_escape(pidfile),
596 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
597 'command' : shell_escape(daemon_command),
598 'sudo' : 'sudo -S' if sudo else '',
599 'pidfile' : shell_escape(pidfile),
600 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
601 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
604 (out,err),proc = rexec(
611 server_key = server_key,
616 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
618 return ((out, err), proc)
621 def rcheckpid(pidfile,
629 Check the pidfile of a process spawned with remote_spawn.
632 pidfile: the pidfile passed to remote_spawn
634 host/port/user/agent/identity: see rexec
638 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
639 or None if the pidfile isn't valid yet (maybe the process is still starting).
642 (out,err),proc = rexec(
643 "cat %(pidfile)s" % {
651 server_key = server_key
659 return map(int,out.strip().split(' ',1))
661 # Ignore, many ways to fail that don't matter that much
665 def rstatus(pid, ppid,
673 Check the status of a process spawned with remote_spawn.
676 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
678 host/port/user/agent/identity: see rexec
682 One of NOT_STARTED, RUNNING, FINISHED
685 (out,err),proc = rexec(
686 # Check only by pid. pid+ppid does not always work (especially with sudo)
687 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
696 server_key = server_key
704 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
707 status = (out.strip() == 'wait')
710 return RUNNING if status else FINISHED
723 Kill a process spawned with remote_spawn.
725 First tries a SIGTERM, and if the process does not end in 10 seconds,
729 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
731 sudo: whether the command was run with sudo - careful killing like this.
733 host/port/user/agent/identity: see rexec
737 Nothing, should have killed the process
740 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
742 SUBKILL="%(subkill)s" ;
743 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
744 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
745 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
747 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
750 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
751 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
755 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
756 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
757 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
761 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
763 (out,err),proc = rexec(
767 'sudo' : 'sudo -S' if sudo else '',
775 server_key = server_key
778 # wait, don't leave zombies around
781 return (out, err), proc
784 def _communicate(self, input, timeout=None, err_on_timeout=True):
787 stdout = None # Return
788 stderr = None # Return
792 if timeout is not None:
793 timelimit = time.time() + timeout
794 killtime = timelimit + 4
795 bailtime = timelimit + 4
798 # Flush stdio buffer. This might block, if the user has
799 # been writing to .stdin in an uncontrolled fashion.
802 write_set.append(self.stdin)
806 read_set.append(self.stdout)
809 read_set.append(self.stderr)
813 while read_set or write_set:
814 if timeout is not None:
815 curtime = time.time()
816 if timeout is None or curtime > timelimit:
817 if curtime > bailtime:
819 elif curtime > killtime:
820 signum = signal.SIGKILL
822 signum = signal.SIGTERM
824 os.kill(self.pid, signum)
827 select_timeout = timelimit - curtime + 0.1
831 if select_timeout > 1.0:
835 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
836 except select.error,e:
842 if not rlist and not wlist and not xlist and self.poll() is not None:
843 # timeout and process exited, say bye
846 if self.stdin in wlist:
847 # When select has indicated that the file is writable,
848 # we can write up to PIPE_BUF bytes without risk of
849 # blocking. POSIX defines PIPE_BUF >= 512
850 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
851 input_offset += bytes_written
852 if input_offset >= len(input):
854 write_set.remove(self.stdin)
856 if self.stdout in rlist:
857 data = os.read(self.stdout.fileno(), 1024)
860 read_set.remove(self.stdout)
863 if self.stderr in rlist:
864 data = os.read(self.stderr.fileno(), 1024)
867 read_set.remove(self.stderr)
870 # All data exchanged. Translate lists into strings.
871 if stdout is not None:
872 stdout = ''.join(stdout)
873 if stderr is not None:
874 stderr = ''.join(stderr)
876 # Translate newlines, if requested. We cannot let the file
877 # object do the translation: It is based on stdio, which is
878 # impossible to combine with select (unless forcing no
880 if self.universal_newlines and hasattr(file, 'newlines'):
882 stdout = self._translate_newlines(stdout)
884 stderr = self._translate_newlines(stderr)
886 if killed and err_on_timeout:
887 errcode = self.poll()
888 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
894 return (stdout, stderr)