16 logger = logging.getLogger("sshfuncs")
18 def log(msg, level, out = None, err = None):
20 msg += " - OUT: %s " % out
23 msg += " - ERROR: %s " % err
25 logger.log(level, msg)
28 if hasattr(os, "devnull"):
31 DEV_NULL = "/dev/null"
33 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
37 Special value that, when given to rspawn as stderr, causes stderr to
38 be redirected to wherever stdout was redirected.
43 Process is still running
53 Process hasn't started running yet (this should be very rare)
56 hostbyname_cache = dict()
58 def gethostbyname(host):
59 hostbyname = hostbyname_cache.get(host)
61 hostbyname = socket.gethostbyname(host)
62 hostbyname_cache[host] = hostbyname
65 OPENSSH_HAS_PERSIST = None
67 def openssh_has_persist():
68 """ The ssh_config options ControlMaster and ControlPersist allow to
69 reuse the same network connection for multiple ssh sessions. In this
70 way, limitations on the number of open ssh connections can be bypassed.
71 However, older versions of OpenSSH do not support this feature.
72 This function is used to determine if ssh connection persist features
75 global OPENSSH_HAS_PERSIST
76 if OPENSSH_HAS_PERSIST is None:
77 proc = subprocess.Popen(["ssh","-v"],
78 stdout = subprocess.PIPE,
79 stderr = subprocess.STDOUT,
80 stdin = open("/dev/null","r") )
81 out,err = proc.communicate()
84 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
85 OPENSSH_HAS_PERSIST = bool(vre.match(out))
86 return OPENSSH_HAS_PERSIST
88 def make_server_key_args(server_key, host, port):
89 """ Returns a reference to a temporary known_hosts file, to which
90 the server key has been added.
92 Make sure to hold onto the temp file reference until the process is
95 :param server_key: the server public key
98 :param host: the hostname
101 :param port: the ssh port
106 host = '%s:%s' % (host, str(port))
108 # Create a temporary server key file
109 tmp_known_hosts = tempfile.NamedTemporaryFile()
111 hostbyname = gethostbyname(host)
113 # Add the intended host key
114 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
116 # If we're not in strict mode, add user-configured keys
117 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
118 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
119 if os.access(user_hosts_path, os.R_OK):
120 f = open(user_hosts_path, "r")
121 tmp_known_hosts.write(f.read())
124 tmp_known_hosts.flush()
126 return tmp_known_hosts
128 def make_control_path(agent, forward_x11):
129 ctrl_path = "/tmp/nepi_ssh"
137 ctrl_path += "-%r@%h:%p"
142 """ Escapes strings so that they are safe to use as command-line
144 if SHELL_SAFE.match(s):
145 # safe string - no escaping needed
148 # unsafe string - escape
150 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
153 return "'$'\\x%02x''" % (ord(c),)
154 s = ''.join(map(escp,s))
157 def eintr_retry(func):
158 """Retries a function invocation when an EINTR occurs"""
160 @functools.wraps(func)
162 retry = kw.pop("_retry", False)
163 for i in xrange(0 if retry else 4):
165 return func(*p, **kw)
166 except (select.error, socket.error), args:
167 if args[0] == errno.EINTR:
172 if e.errno == errno.EINTR:
177 return func(*p, **kw)
180 def rexec(command, host, user,
191 err_on_timeout = True,
192 connect_timeout = 30,
194 forward_x11 = False):
196 Executes a remote command, returns ((stdout,stderr),process)
199 tmp_known_hosts = None
200 hostip = gethostbyname(host)
203 # Don't bother with localhost. Makes test easier
204 '-o', 'NoHostAuthenticationForLocalhost=yes',
205 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
206 '-o', 'ConnectionAttempts=3',
207 '-o', 'ServerAliveInterval=30',
208 '-o', 'TCPKeepAlive=yes',
209 '-l', user, hostip or host]
211 if persistent and openssh_has_persist():
213 '-o', 'ControlMaster=auto',
214 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
215 '-o', 'ControlPersist=60' ])
221 args.append('-p%d' % port)
224 args.extend(('-i', identity))
234 # Create a temporary server key file
235 tmp_known_hosts = make_server_key_args(server_key, host, port)
236 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
240 for x in xrange(retry):
241 # connects to the remote host and starts a remote connection
242 proc = subprocess.Popen(args,
244 stdout = subprocess.PIPE,
245 stdin = subprocess.PIPE,
246 stderr = subprocess.PIPE)
248 # attach tempfile object to the process, to make sure the file stays
249 # alive until the process is finished with it
250 proc._known_hosts = tmp_known_hosts
253 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
254 msg = " rexec - host %s - command %s " % (host, " ".join(args))
255 log(msg, logging.DEBUG, out, err)
260 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
261 # SSH error, can safely retry
264 # Probably timed out or plain failed but can retry
269 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
270 t, x, host, " ".join(args))
271 log(msg, logging.DEBUG)
276 except RuntimeError, e:
277 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
278 log(msg, logging.DEBUG, out, err)
284 return ((out, err), proc)
286 def rcopy(source, dest,
293 Copies from/to remote sites.
295 Source and destination should have the user and host encoded
298 If source is a file object, a special mode will be used to
299 create the remote file with the same contents.
301 If dest is a file object, the remote file (source) will be
302 read and written into dest.
304 In these modes, recursive cannot be True.
306 Source can be a list of files to copy to a single destination,
307 in which case it is advised that the destination be a folder.
310 msg = " rcopy - scp %s %s " % (source, dest)
311 log(msg, logging.DEBUG)
313 if isinstance(source, file) and source.tell() == 0:
315 elif hasattr(source, 'read'):
316 tmp = tempfile.NamedTemporaryFile()
318 buf = source.read(65536)
326 if isinstance(source, file) or isinstance(dest, file) \
327 or hasattr(source, 'read') or hasattr(dest, 'write'):
330 # Parse source/destination as <user>@<server>:<path>
331 if isinstance(dest, basestring) and ':' in dest:
332 remspec, path = dest.split(':',1)
333 elif isinstance(source, basestring) and ':' in source:
334 remspec, path = source.split(':',1)
336 raise ValueError, "Both endpoints cannot be local"
337 user,host = remspec.rsplit('@',1)
339 tmp_known_hosts = None
340 hostip = gethostbyname(host)
342 args = ['ssh', '-l', user, '-C',
343 # Don't bother with localhost. Makes test easier
344 '-o', 'NoHostAuthenticationForLocalhost=yes',
345 '-o', 'ConnectTimeout=60',
346 '-o', 'ConnectionAttempts=3',
347 '-o', 'ServerAliveInterval=30',
348 '-o', 'TCPKeepAlive=yes',
351 if openssh_has_persist():
353 '-o', 'ControlMaster=auto',
354 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
355 '-o', 'ControlPersist=60' ])
358 args.append('-P%d' % port)
361 args.extend(('-i', identity))
364 # Create a temporary server key file
365 tmp_known_hosts = make_server_key_args(server_key, host, port)
366 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
368 if isinstance(source, file) or hasattr(source, 'read'):
369 args.append('cat > %s' % (shell_escape(path),))
370 elif isinstance(dest, file) or hasattr(dest, 'write'):
371 args.append('cat %s' % (shell_escape(path),))
373 raise AssertionError, "Unreachable code reached! :-Q"
375 # connects to the remote host and starts a remote connection
376 if isinstance(source, file):
377 proc = subprocess.Popen(args,
378 stdout = open('/dev/null','w'),
379 stderr = subprocess.PIPE,
381 err = proc.stderr.read()
382 proc._known_hosts = tmp_known_hosts
383 eintr_retry(proc.wait)()
384 return ((None,err), proc)
385 elif isinstance(dest, file):
386 proc = subprocess.Popen(args,
387 stdout = open('/dev/null','w'),
388 stderr = subprocess.PIPE,
390 err = proc.stderr.read()
391 proc._known_hosts = tmp_known_hosts
392 eintr_retry(proc.wait)()
393 return ((None,err), proc)
394 elif hasattr(source, 'read'):
395 # file-like (but not file) source
396 proc = subprocess.Popen(args,
397 stdout = open('/dev/null','w'),
398 stderr = subprocess.PIPE,
399 stdin = subprocess.PIPE)
405 buf = source.read(4096)
410 rdrdy, wrdy, broken = select.select(
413 [proc.stderr,proc.stdin])
415 if proc.stderr in rdrdy:
416 # use os.read for fully unbuffered behavior
417 err.append(os.read(proc.stderr.fileno(), 4096))
419 if proc.stdin in wrdy:
420 proc.stdin.write(buf)
426 err.append(proc.stderr.read())
428 proc._known_hosts = tmp_known_hosts
429 eintr_retry(proc.wait)()
430 return ((None,''.join(err)), proc)
431 elif hasattr(dest, 'write'):
432 # file-like (but not file) dest
433 proc = subprocess.Popen(args,
434 stdout = subprocess.PIPE,
435 stderr = subprocess.PIPE,
436 stdin = open('/dev/null','w'))
441 rdrdy, wrdy, broken = select.select(
442 [proc.stderr, proc.stdout],
444 [proc.stderr, proc.stdout])
446 if proc.stderr in rdrdy:
447 # use os.read for fully unbuffered behavior
448 err.append(os.read(proc.stderr.fileno(), 4096))
450 if proc.stdout in rdrdy:
451 # use os.read for fully unbuffered behavior
452 buf = os.read(proc.stdout.fileno(), 4096)
461 err.append(proc.stderr.read())
463 proc._known_hosts = tmp_known_hosts
464 eintr_retry(proc.wait)()
465 return ((None,''.join(err)), proc)
467 raise AssertionError, "Unreachable code reached! :-Q"
469 # Parse destination as <user>@<server>:<path>
470 if isinstance(dest, basestring) and ':' in dest:
471 remspec, path = dest.split(':',1)
472 elif isinstance(source, basestring) and ':' in source:
473 remspec, path = source.split(':',1)
475 raise ValueError, "Both endpoints cannot be local"
476 user,host = remspec.rsplit('@',1)
479 tmp_known_hosts = None
481 args = ['scp', '-q', '-p', '-C',
482 # Don't bother with localhost. Makes test easier
483 '-o', 'NoHostAuthenticationForLocalhost=yes',
484 '-o', 'ConnectTimeout=60',
485 '-o', 'ConnectionAttempts=3',
486 '-o', 'ServerAliveInterval=30',
487 '-o', 'TCPKeepAlive=yes' ]
490 args.append('-P%d' % port)
496 args.extend(('-i', identity))
499 # Create a temporary server key file
500 tmp_known_hosts = make_server_key_args(server_key, host, port)
501 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
503 if isinstance(source,list):
506 if openssh_has_persist():
508 '-o', 'ControlMaster=auto',
509 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
515 # connects to the remote host and starts a remote connection
516 proc = subprocess.Popen(args,
517 stdout = subprocess.PIPE,
518 stdin = subprocess.PIPE,
519 stderr = subprocess.PIPE)
520 proc._known_hosts = tmp_known_hosts
522 (out, err) = proc.communicate()
523 eintr_retry(proc.wait)()
524 return ((out, err), proc)
526 def rspawn(command, pidfile,
527 stdout = '/dev/null',
541 Spawn a remote command such that it will continue working asynchronously.
544 command: the command to run - it should be a single line.
546 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
548 stdout: path of a file to redirect standard output to - must be a string.
549 Defaults to /dev/null
550 stderr: path of a file to redirect standard error to - string or the special STDOUT value
551 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
552 stdin: path of a file with input to be piped into the command's standard input
554 home: path of a folder to use as working directory - should exist, unless you specify create_home
556 create_home: if True, the home folder will be created first with mkdir -p
558 sudo: whether the command needs to be executed as root
560 host/port/user/agent/identity: see rexec
563 (stdout, stderr), process
565 Of the spawning process, which only captures errors at spawning time.
566 Usually only useful for diagnostics.
568 # Start process in a "daemonized" way, using nohup and heavy
569 # stdin/out redirection to avoid connection issues
573 stderr = ' ' + stderr
575 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
577 'pidfile' : shell_escape(pidfile),
583 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
584 'command' : shell_escape(daemon_command),
585 'sudo' : 'sudo -S' if sudo else '',
586 'pidfile' : shell_escape(pidfile),
587 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
588 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
591 (out,err),proc = rexec(
598 server_key = server_key,
603 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
605 return ((out, err), proc)
608 def rcheckpid(pidfile,
616 Check the pidfile of a process spawned with remote_spawn.
619 pidfile: the pidfile passed to remote_spawn
621 host/port/user/agent/identity: see rexec
625 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
626 or None if the pidfile isn't valid yet (maybe the process is still starting).
629 (out,err),proc = rexec(
630 "cat %(pidfile)s" % {
638 server_key = server_key
646 return map(int,out.strip().split(' ',1))
648 # Ignore, many ways to fail that don't matter that much
652 def rstatus(pid, ppid,
660 Check the status of a process spawned with remote_spawn.
663 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
665 host/port/user/agent/identity: see rexec
669 One of NOT_STARTED, RUNNING, FINISHED
672 (out,err),proc = rexec(
673 # Check only by pid. pid+ppid does not always work (especially with sudo)
674 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
683 server_key = server_key
691 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
694 status = (out.strip() == 'wait')
697 return RUNNING if status else FINISHED
710 Kill a process spawned with remote_spawn.
712 First tries a SIGTERM, and if the process does not end in 10 seconds,
716 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
718 sudo: whether the command was run with sudo - careful killing like this.
720 host/port/user/agent/identity: see rexec
724 Nothing, should have killed the process
727 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
729 SUBKILL="%(subkill)s" ;
730 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
731 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
732 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
734 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
737 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
738 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
742 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
743 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
744 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
748 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
750 (out,err),proc = rexec(
754 'sudo' : 'sudo -S' if sudo else '',
762 server_key = server_key
765 # wait, don't leave zombies around
768 return (out, err), proc
771 def _communicate(self, input, timeout=None, err_on_timeout=True):
774 stdout = None # Return
775 stderr = None # Return
779 if timeout is not None:
780 timelimit = time.time() + timeout
781 killtime = timelimit + 4
782 bailtime = timelimit + 4
785 # Flush stdio buffer. This might block, if the user has
786 # been writing to .stdin in an uncontrolled fashion.
789 write_set.append(self.stdin)
793 read_set.append(self.stdout)
796 read_set.append(self.stderr)
800 while read_set or write_set:
801 if timeout is not None:
802 curtime = time.time()
803 if timeout is None or curtime > timelimit:
804 if curtime > bailtime:
806 elif curtime > killtime:
807 signum = signal.SIGKILL
809 signum = signal.SIGTERM
811 os.kill(self.pid, signum)
814 select_timeout = timelimit - curtime + 0.1
818 if select_timeout > 1.0:
822 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
823 except select.error,e:
829 if not rlist and not wlist and not xlist and self.poll() is not None:
830 # timeout and process exited, say bye
833 if self.stdin in wlist:
834 # When select has indicated that the file is writable,
835 # we can write up to PIPE_BUF bytes without risk of
836 # blocking. POSIX defines PIPE_BUF >= 512
837 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
838 input_offset += bytes_written
839 if input_offset >= len(input):
841 write_set.remove(self.stdin)
843 if self.stdout in rlist:
844 data = os.read(self.stdout.fileno(), 1024)
847 read_set.remove(self.stdout)
850 if self.stderr in rlist:
851 data = os.read(self.stderr.fileno(), 1024)
854 read_set.remove(self.stderr)
857 # All data exchanged. Translate lists into strings.
858 if stdout is not None:
859 stdout = ''.join(stdout)
860 if stderr is not None:
861 stderr = ''.join(stderr)
863 # Translate newlines, if requested. We cannot let the file
864 # object do the translation: It is based on stdio, which is
865 # impossible to combine with select (unless forcing no
867 if self.universal_newlines and hasattr(file, 'newlines'):
869 stdout = self._translate_newlines(stdout)
871 stderr = self._translate_newlines(stderr)
873 if killed and err_on_timeout:
874 errcode = self.poll()
875 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
881 return (stdout, stderr)