logger = logging.getLogger("sshfuncs")


def log(msg, level, out = None, err = None):
    """Emit ``msg`` on the module logger, appending captured command output.

    :param msg: base message text
    :param level: numeric logging level, e.g. ``logging.DEBUG``
    :param out: captured standard output; appended to the message when truthy
    :param err: captured standard error; appended to the message when truthy
    """
    if out:
        # Wrap in a 1-tuple so that a tuple-valued ``out`` is rendered as a
        # whole instead of being unpacked as several format arguments.
        msg += " - OUT: %s " % (out,)

    if err:
        msg += " - ERROR: %s " % (err,)

    logger.log(level, msg)
# Path of the null device. Falls back to the POSIX path when the running
# interpreter's ``os`` module does not expose ``devnull``.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Matches strings made only of characters that need no shell quoting.
# ``\Z`` (rather than ``$``) anchors at the true end of the string, so a value
# that ends in a newline is not mistaken for a safe one.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')
37 Special value that when given to rspawn in stderr causes stderr to
38 redirect to whatever stdout was redirected to.
43 Process is still running
53 Process hasn't started running yet (this should be very rare)
# Process-wide memo of hostname -> address, filled in by gethostbyname().
hostbyname_cache = dict()


def gethostbyname(host):
    """Resolve ``host`` through ``socket.gethostbyname`` and memoize the result.

    A name already present in ``hostbyname_cache`` is answered from the cache
    without a new lookup. Resolution errors raised by ``socket.gethostbyname``
    propagate to the caller and are not cached.

    :param host: hostname (or address literal) to resolve
    :returns: the value returned by ``socket.gethostbyname`` for ``host``
    """
    hostbyname = hostbyname_cache.get(host)
    if not hostbyname:
        hostbyname = socket.gethostbyname(host)
        hostbyname_cache[host] = hostbyname
    return hostbyname
# Tri-state cache for openssh_has_persist(): None means "not probed yet".
OPENSSH_HAS_PERSIST = None


def openssh_has_persist():
    """Tell whether the local ssh client supports connection persistence.

    The ssh_config options ControlMaster and ControlPersist allow several
    ssh sessions to share one network connection, which works around limits
    on the number of open ssh connections. Older OpenSSH releases do not
    support this, so the client version is probed once and the answer is
    cached in ``OPENSSH_HAS_PERSIST``.

    :returns: True when the reported version is OpenSSH 5.8 or newer,
        False otherwise (including when no ``ssh`` binary can be run)
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        devnull = open(os.devnull, "r")
        try:
            # -V is the documented "print version and exit" flag; the banner
            # goes to stderr, which is folded into stdout here.
            proc = subprocess.Popen(["ssh", "-V"],
                    stdout = subprocess.PIPE,
                    stderr = subprocess.STDOUT,
                    stdin = devnull)
            out, err = proc.communicate()
        except OSError:
            # ssh is missing or not executable: nothing to persist with.
            out = ""
        finally:
            devnull.close()

        if not isinstance(out, str):
            # Pipes yield bytes on interpreters where str is text.
            out = out.decode("utf-8", "replace")

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it: the file is removed as soon as the object is closed or
    garbage collected.

    :param server_key: the server public key

    :param host: the hostname

    :param port: the ssh port, or None to leave the port out of the entry

    :returns: an open ``tempfile.NamedTemporaryFile``; pass its ``name`` to
        ssh as ``UserKnownHostsFile``
    """
    # Resolve the bare hostname *before* the port is appended: a
    # "host:port" string is not a resolvable name.
    hostbyname = gethostbyname(host)

    if port is not None:
        host = '%s:%s' % (host, str(port))

    # Create a temporary server key file
    tmp_known_hosts = tempfile.NamedTemporaryFile()

    # Add the intended host key
    tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    tmp_known_hosts.flush()

    return tmp_known_hosts
128 def make_control_path(agent, forward_x11):
129 ctrl_path = "/tmp/nepi_ssh"
137 ctrl_path += "-%r@%h:%p"
def shell_escape(s):
    """ Escapes strings so that they are safe to use as command-line
    arguments.

    Strings accepted by ``SHELL_SAFE`` are returned untouched. Anything else
    is wrapped in single quotes; quote characters and bytes outside the
    printable ASCII range (other than CR, LF and TAB) are spliced in as
    ``$'\\xNN'`` sequences between quoted segments.

    :param s: the argument text
    :returns: a string that the shell reads back as exactly one word
    """
    if s == "":
        # An empty argument must still reach the shell as a word of its own.
        return "''"

    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    def escp(c):
        if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
            return c
        # Close the quoted segment, emit the byte as $'\xNN', reopen it.
        return "'$'\\x%02x''" % (ord(c),)

    s = ''.join(map(escp,s))
    return "'%s'" % (s,)
def eintr_retry(func):
    """Retries a function invocation when an EINTR occurs.

    The returned wrapper calls ``func`` up to four times, retrying only when
    a ``select.error``, ``socket.error`` or ``OSError`` carrying
    ``errno.EINTR`` is raised, and then makes one final unguarded attempt.
    Any other error propagates immediately.

    The wrapper consumes an optional ``_retry`` keyword argument: when it is
    true the retry loop is skipped and ``func`` is invoked exactly once.

    :param func: the callable to protect
    :returns: a wrapper with the same signature as ``func``
    """
    import functools

    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for _ in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as exc:
                # Not every one of these exception types carries ``.errno``;
                # fall back to the first constructor argument when present.
                code = getattr(exc, "errno", None)
                if code is None and exc.args:
                    code = exc.args[0]
                if code != errno.EINTR:
                    raise
        # Retries exhausted (or disabled): let any error surface.
        return func(*p, **kw)

    return rv
180 def rexec(command, host, user,
191 err_on_timeout = True,
192 connect_timeout = 30,
194 forward_x11 = False):
196 Executes a remote command, returns ((stdout,stderr),process)
199 tmp_known_hosts = None
200 hostip = gethostbyname(host)
203 # Don't bother with localhost. Makes test easier
204 '-o', 'NoHostAuthenticationForLocalhost=yes',
205 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
206 '-o', 'ConnectionAttempts=3',
207 '-o', 'ServerAliveInterval=30',
208 '-o', 'TCPKeepAlive=yes',
209 '-l', user, hostip or host]
211 if persistent and openssh_has_persist():
213 '-o', 'ControlMaster=auto',
214 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
215 '-o', 'ControlPersist=60' ])
221 args.append('-p%d' % port)
224 args.extend(('-i', identity))
234 # Create a temporary server key file
235 tmp_known_hosts = make_server_key_args(server_key, host, port)
236 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
240 for x in xrange(retry):
241 # connects to the remote host and starts a remote connection
242 proc = subprocess.Popen(args,
243 stdout = subprocess.PIPE,
244 stdin = subprocess.PIPE,
245 stderr = subprocess.PIPE)
247 # attach tempfile object to the process, to make sure the file stays
248 # alive until the process is finished with it
249 proc._known_hosts = tmp_known_hosts
252 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
253 msg = " rexec - host %s - command %s " % (host, " ".join(args))
254 log(msg, logging.DEBUG, out, err)
259 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
260 # SSH error, can safely retry
263 # Probably timed out or plain failed but can retry
268 msg = "SLEEPING %d ... ATEMP %d - host %s - command %s " % (
269 t, x, host, " ".join(args))
270 log(msg, logging.DEBUG)
275 except RuntimeError, e:
276 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
277 log(msg, logging.DEBUG, out, err)
283 return ((out, err), proc)
285 def rcopy(source, dest,
292 Copies from/to remote sites.
294 Source and destination should have the user and host encoded
297 If source is a file object, a special mode will be used to
298 create the remote file with the same contents.
300 If dest is a file object, the remote file (source) will be
301 read and written into dest.
303 In these modes, recursive cannot be True.
305 Source can be a list of files to copy to a single destination,
306 in which case it is advised that the destination be a folder.
309 msg = " rcopy - scp %s %s " % (source, dest)
310 log(msg, logging.DEBUG)
312 if isinstance(source, file) and source.tell() == 0:
314 elif hasattr(source, 'read'):
315 tmp = tempfile.NamedTemporaryFile()
317 buf = source.read(65536)
325 if isinstance(source, file) or isinstance(dest, file) \
326 or hasattr(source, 'read') or hasattr(dest, 'write'):
329 # Parse source/destination as <user>@<server>:<path>
330 if isinstance(dest, basestring) and ':' in dest:
331 remspec, path = dest.split(':',1)
332 elif isinstance(source, basestring) and ':' in source:
333 remspec, path = source.split(':',1)
335 raise ValueError, "Both endpoints cannot be local"
336 user,host = remspec.rsplit('@',1)
338 tmp_known_hosts = None
339 hostip = gethostbyname(host)
341 args = ['ssh', '-l', user, '-C',
342 # Don't bother with localhost. Makes test easier
343 '-o', 'NoHostAuthenticationForLocalhost=yes',
344 '-o', 'ConnectTimeout=60',
345 '-o', 'ConnectionAttempts=3',
346 '-o', 'ServerAliveInterval=30',
347 '-o', 'TCPKeepAlive=yes',
350 if openssh_has_persist():
352 '-o', 'ControlMaster=auto',
353 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
354 '-o', 'ControlPersist=60' ])
357 args.append('-P%d' % port)
360 args.extend(('-i', identity))
363 # Create a temporary server key file
364 tmp_known_hosts = make_server_key_args(server_key, host, port)
365 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
367 if isinstance(source, file) or hasattr(source, 'read'):
368 args.append('cat > %s' % (shell_escape(path),))
369 elif isinstance(dest, file) or hasattr(dest, 'write'):
370 args.append('cat %s' % (shell_escape(path),))
372 raise AssertionError, "Unreachable code reached! :-Q"
374 # connects to the remote host and starts a remote connection
375 if isinstance(source, file):
376 proc = subprocess.Popen(args,
377 stdout = open('/dev/null','w'),
378 stderr = subprocess.PIPE,
380 err = proc.stderr.read()
381 proc._known_hosts = tmp_known_hosts
382 eintr_retry(proc.wait)()
383 return ((None,err), proc)
384 elif isinstance(dest, file):
385 proc = subprocess.Popen(args,
386 stdout = open('/dev/null','w'),
387 stderr = subprocess.PIPE,
389 err = proc.stderr.read()
390 proc._known_hosts = tmp_known_hosts
391 eintr_retry(proc.wait)()
392 return ((None,err), proc)
393 elif hasattr(source, 'read'):
394 # file-like (but not file) source
395 proc = subprocess.Popen(args,
396 stdout = open('/dev/null','w'),
397 stderr = subprocess.PIPE,
398 stdin = subprocess.PIPE)
404 buf = source.read(4096)
409 rdrdy, wrdy, broken = select.select(
412 [proc.stderr,proc.stdin])
414 if proc.stderr in rdrdy:
415 # use os.read for fully unbuffered behavior
416 err.append(os.read(proc.stderr.fileno(), 4096))
418 if proc.stdin in wrdy:
419 proc.stdin.write(buf)
425 err.append(proc.stderr.read())
427 proc._known_hosts = tmp_known_hosts
428 eintr_retry(proc.wait)()
429 return ((None,''.join(err)), proc)
430 elif hasattr(dest, 'write'):
431 # file-like (but not file) dest
432 proc = subprocess.Popen(args,
433 stdout = subprocess.PIPE,
434 stderr = subprocess.PIPE,
435 stdin = open('/dev/null','w'))
440 rdrdy, wrdy, broken = select.select(
441 [proc.stderr, proc.stdout],
443 [proc.stderr, proc.stdout])
445 if proc.stderr in rdrdy:
446 # use os.read for fully unbuffered behavior
447 err.append(os.read(proc.stderr.fileno(), 4096))
449 if proc.stdout in rdrdy:
450 # use os.read for fully unbuffered behavior
451 buf = os.read(proc.stdout.fileno(), 4096)
460 err.append(proc.stderr.read())
462 proc._known_hosts = tmp_known_hosts
463 eintr_retry(proc.wait)()
464 return ((None,''.join(err)), proc)
466 raise AssertionError, "Unreachable code reached! :-Q"
468 # Parse destination as <user>@<server>:<path>
469 if isinstance(dest, basestring) and ':' in dest:
470 remspec, path = dest.split(':',1)
471 elif isinstance(source, basestring) and ':' in source:
472 remspec, path = source.split(':',1)
474 raise ValueError, "Both endpoints cannot be local"
475 user,host = remspec.rsplit('@',1)
478 tmp_known_hosts = None
480 args = ['scp', '-q', '-p', '-C',
481 # Don't bother with localhost. Makes test easier
482 '-o', 'NoHostAuthenticationForLocalhost=yes',
483 '-o', 'ConnectTimeout=60',
484 '-o', 'ConnectionAttempts=3',
485 '-o', 'ServerAliveInterval=30',
486 '-o', 'TCPKeepAlive=yes' ]
489 args.append('-P%d' % port)
495 args.extend(('-i', identity))
498 # Create a temporary server key file
499 tmp_known_hosts = make_server_key_args(server_key, host, port)
500 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
502 if isinstance(source,list):
505 if openssh_has_persist():
507 '-o', 'ControlMaster=auto',
508 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
514 # connects to the remote host and starts a remote connection
515 proc = subprocess.Popen(args,
516 stdout = subprocess.PIPE,
517 stdin = subprocess.PIPE,
518 stderr = subprocess.PIPE)
519 proc._known_hosts = tmp_known_hosts
521 (out, err) = proc.communicate()
522 eintr_retry(proc.wait)()
523 return ((out, err), proc)
525 def rspawn(command, pidfile,
526 stdout = '/dev/null',
540 Spawn a remote command such that it will continue working asynchronously.
543 command: the command to run - it should be a single line.
545 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
547 stdout: path of a file to redirect standard output to - must be a string.
548 Defaults to /dev/null
549 stderr: path of a file to redirect standard error to - string or the special STDOUT value
550 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
551 stdin: path of a file with input to be piped into the command's standard input
553 home: path of a folder to use as working directory - should exist, unless you specify create_home
555 create_home: if True, the home folder will be created first with mkdir -p
557 sudo: whether the command needs to be executed as root
559 host/port/user/agent/identity: see rexec
562 (stdout, stderr), process
564 Of the spawning process, which only captures errors at spawning time.
565 Usually only useful for diagnostics.
567 # Start process in a "daemonized" way, using nohup and heavy
568 # stdin/out redirection to avoid connection issues
572 stderr = ' ' + stderr
574 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
576 'pidfile' : shell_escape(pidfile),
582 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
583 'command' : shell_escape(daemon_command),
584 'sudo' : 'sudo -S' if sudo else '',
585 'pidfile' : shell_escape(pidfile),
586 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
587 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
590 (out,err),proc = rexec(
597 server_key = server_key,
602 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
604 return ((out, err), proc)
607 def rcheckpid(pidfile,
615 Check the pidfile of a process spawned with rspawn.
618 pidfile: the pidfile passed to rspawn
620 host/port/user/agent/identity: see rexec
624 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
625 or None if the pidfile isn't valid yet (maybe the process is still starting).
628 (out,err),proc = rexec(
629 "cat %(pidfile)s" % {
637 server_key = server_key
645 return map(int,out.strip().split(' ',1))
647 # Ignore, many ways to fail that don't matter that much
651 def rstatus(pid, ppid,
659 Check the status of a process spawned with rspawn.
662 pid/ppid: pid and parent-pid of the spawned process. See rcheckpid
664 host/port/user/agent/identity: see rexec
668 One of NOT_STARTED, RUNNING, FINISHED
671 (out,err),proc = rexec(
672 # Check only by pid. pid+ppid does not always work (especially with sudo)
673 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
682 server_key = server_key
690 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
693 status = (out.strip() == 'wait')
696 return RUNNING if status else FINISHED
709 Kill a process spawned with rspawn.
711 First tries a SIGTERM, and if the process does not end in 10 seconds,
715 pid/ppid: pid and parent-pid of the spawned process. See rcheckpid
717 sudo: whether the command was run with sudo - careful killing like this.
719 host/port/user/agent/identity: see rexec
723 Nothing, should have killed the process
726 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
728 SUBKILL="%(subkill)s" ;
729 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
730 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
731 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
733 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
736 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
737 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
741 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
742 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
743 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
747 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
749 (out,err),proc = rexec(
753 'sudo' : 'sudo -S' if sudo else '',
761 server_key = server_key
764 # wait, don't leave zombies around
767 return (out, err), proc
770 def _communicate(self, input, timeout=None, err_on_timeout=True):
773 stdout = None # Return
774 stderr = None # Return
778 if timeout is not None:
779 timelimit = time.time() + timeout
780 killtime = timelimit + 4
781 bailtime = timelimit + 4
784 # Flush stdio buffer. This might block, if the user has
785 # been writing to .stdin in an uncontrolled fashion.
788 write_set.append(self.stdin)
792 read_set.append(self.stdout)
795 read_set.append(self.stderr)
799 while read_set or write_set:
800 if timeout is not None:
801 curtime = time.time()
802 if timeout is None or curtime > timelimit:
803 if curtime > bailtime:
805 elif curtime > killtime:
806 signum = signal.SIGKILL
808 signum = signal.SIGTERM
810 os.kill(self.pid, signum)
813 select_timeout = timelimit - curtime + 0.1
817 if select_timeout > 1.0:
821 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
822 except select.error,e:
828 if not rlist and not wlist and not xlist and self.poll() is not None:
829 # timeout and process exited, say bye
832 if self.stdin in wlist:
833 # When select has indicated that the file is writable,
834 # we can write up to PIPE_BUF bytes without risk
835 # blocking. POSIX defines PIPE_BUF >= 512
836 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
837 input_offset += bytes_written
838 if input_offset >= len(input):
840 write_set.remove(self.stdin)
842 if self.stdout in rlist:
843 data = os.read(self.stdout.fileno(), 1024)
846 read_set.remove(self.stdout)
849 if self.stderr in rlist:
850 data = os.read(self.stderr.fileno(), 1024)
853 read_set.remove(self.stderr)
856 # All data exchanged. Translate lists into strings.
857 if stdout is not None:
858 stdout = ''.join(stdout)
859 if stderr is not None:
860 stderr = ''.join(stderr)
862 # Translate newlines, if requested. We cannot let the file
863 # object do the translation: It is based on stdio, which is
864 # impossible to combine with select (unless forcing no
866 if self.universal_newlines and hasattr(file, 'newlines'):
868 stdout = self._translate_newlines(stdout)
870 stderr = self._translate_newlines(stderr)
872 if killed and err_on_timeout:
873 errcode = self.poll()
874 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
880 return (stdout, stderr)