# Module-level logger, registered under the name "sshfuncs".
16 logger = logging.getLogger("sshfuncs")
# Emit `msg` through the module logger at `level`. The optional `out` and
# `err` texts are appended to the message, tagged " - OUT: " and " - ERROR: ".
18 def log(msg, level, out = None, err = None):
20 msg += " - OUT: %s " % out
23 msg += " - ERROR: %s " % err
25 logger.log(level, msg)
# DEV_NULL names the null device; the conditional keys on whether the os
# module provides a "devnull" attribute.
28 if hasattr(os, "devnull"):
31 DEV_NULL = "/dev/null"
# Strings made up only of these characters are treated as shell-safe: the
# SHELL_SAFE.match check in the escaping helper lets them through unescaped.
33 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
37 Special value that when given to rspawn in stderr causes stderr to
38 redirect to whatever stdout was redirected to.
43 Process is still running
53 Process hasn't started running yet (this should be very rare)
# Cache of resolved addresses keyed by host name, and the lock that
# gethostbyname holds while resolving and storing a new entry.
56 hostbyname_cache = dict()
57 hostbyname_cache_lock = threading.Lock()
# Resolve `host` with socket.gethostbyname, memoising the answer in
# hostbyname_cache. The cache is consulted first; resolution and the cache
# write happen while holding hostbyname_cache_lock, and every newly added
# entry is logged at DEBUG level.
59 def gethostbyname(host):
60 global hostbyname_cache
61 global hostbyname_cache_lock
63 hostbyname = hostbyname_cache.get(host)
65 with hostbyname_cache_lock:
66 hostbyname = socket.gethostbyname(host)
67 hostbyname_cache[host] = hostbyname
69 msg = " Added hostbyname %s - %s " % (host, hostbyname)
70 log(msg, logging.DEBUG)
# Memoised result of openssh_has_persist(); stays None until the first probe.
74 OPENSSH_HAS_PERSIST = None
def openssh_has_persist():
    """ Tell whether the local ssh client supports connection persistence.

    The ssh_config options ControlMaster and ControlPersist allow reusing
    the same network connection for multiple ssh sessions. In this way
    limitations on the number of open ssh connections can be bypassed.
    However, older versions of OpenSSH do not support this feature, so the
    client version is probed.

    The probe runs ``ssh -v`` once (stderr merged into stdout, stdin fed
    from /dev/null) and matches the beginning of its output against an
    ``OpenSSH_<version>`` pattern that accepts 5.8 and later. The answer is
    cached in the module-level OPENSSH_HAS_PERSIST, so ssh is spawned at
    most once per process.

    :returns: True when the persist features can be used
    :rtype: bool
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # Close the /dev/null handle as soon as ssh has exited instead of
        # leaving it to the garbage collector.
        with open("/dev/null","r") as devnull:
            proc = subprocess.Popen(["ssh","-v"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull)
            # communicate() also waits for the process to terminate
            out,err = proc.communicate()

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it: the file disappears when the object is closed or
    garbage collected.

    Unless the NEPI_STRICT_AUTH_MODE environment variable is set to
    '1', 'true' or 'on' (case-insensitive), the content of the user's
    readable $HOME/.ssh/known_hosts is appended after the server key.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port; None leaves the host entry without a port

    :returns: the open, flushed temporary known_hosts file
    """
    # Resolve the bare host name before ':<port>' is appended: the combined
    # string is not a resolvable name. Doing it first also means no
    # temporary file is created when resolution fails.
    hostbyname = gethostbyname(host)

    if port is not None:
        host = '%s:%s' % (host, str(port))

    # Create a temporary server key file
    tmp_known_hosts = tempfile.NamedTemporaryFile()

    # Add the intended host key
    tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            # the with-block closes the file even if the copy fails midway
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # Make the content visible to processes that open the file by name
    tmp_known_hosts.flush()

    return tmp_known_hosts
# Build the value used for ssh's ControlPath option: it starts from
# "/tmp/nepi_ssh" and ends with the "-%r@%h:%p" suffix (ssh_config tokens for
# remote user, host and port).
# NOTE(review): `agent` and `forward_x11` presumably add distinguishing
# markers between the prefix and the suffix - confirm.
137 def make_control_path(agent, forward_x11):
138 ctrl_path = "/tmp/nepi_ssh"
146 ctrl_path += "-%r@%h:%p"
# Shell-quoting helper. A string fully matching SHELL_SAFE takes the
# "no escaping needed" path. Otherwise every character goes through escp:
# printable ASCII (codes 32..126) and \r, \n, \t - quote characters excluded -
# satisfy the first test, while the remaining characters are rendered as a
# '$'\xNN'' sequence; the pieces are then joined back into `s`.
# NOTE(review): the '$'\xNN'' form closes and reopens a single-quoted string,
# so the result is presumably wrapped in single quotes - confirm.
151 """ Escapes strings so that they are safe to use as command-line
153 if SHELL_SAFE.match(s):
154 # safe string - no escaping needed
157 # unsafe string - escape
159 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
162 return "'$'\\x%02x''" % (ord(c),)
163 s = ''.join(map(escp,s))
def eintr_retry(func):
    """Retries a function invocation when an EINTR occurs.

    Returns a wrapper around ``func``. The wrapper calls ``func`` and, when
    the call fails with ``select.error``, ``socket.error`` or ``OSError``
    whose error code is ``errno.EINTR``, calls it again. Up to four
    interrupted attempts are absorbed; a final attempt is then made whose
    outcome (result or exception) is handed to the caller unchanged. Any
    other error propagates immediately.

    The wrapper consumes an optional ``_retry`` keyword argument (it is not
    forwarded to ``func``): a true value skips the retry loop, so ``func``
    is invoked exactly once.
    """
    import functools

    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for _ in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as exc:
                # OSError-style exceptions expose the code as .errno, while
                # a plain select.error carries it only as args[0].
                code = getattr(exc, "errno", None)
                if code is None and exc.args:
                    code = exc.args[0]
                if code != errno.EINTR:
                    raise
        # Retries exhausted (or disabled): last attempt, errors propagate.
        return func(*p, **kw)
    return rv
# Run `command` on a remote host through the ssh client and return
# ((stdout, stderr), process). The ssh argument list is built from fixed -o
# options plus optional connection-sharing, host-key, port and identity
# settings; the client is then launched inside a loop of up to `retry`
# iterations, with its pipes driven by _communicate.
189 def rexec(command, host, user,
200 err_on_timeout = True,
201 connect_timeout = 30,
204 strict_host_checking = True):
206 Executes a remote command, returns ((stdout,stderr),process)
209 tmp_known_hosts = None
# Resolved address of `host`; it is preferred over the name as the ssh target
# (see `hostip or host` below).
210 hostip = gethostbyname(host)
213 # Don't bother with localhost. Makes test easier
214 '-o', 'NoHostAuthenticationForLocalhost=yes',
215 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
216 '-o', 'ConnectionAttempts=3',
217 '-o', 'ServerAliveInterval=30',
218 '-o', 'TCPKeepAlive=yes',
219 '-l', user, hostip or host]
# Share one master connection across sessions when the caller asks for it and
# the local ssh client supports it.
221 if persistent and openssh_has_persist():
223 '-o', 'ControlMaster=auto',
224 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
225 '-o', 'ControlPersist=60' ])
227 if not strict_host_checking:
228 # Do not check for Host key. Unsafe.
229 args.extend(['-o', 'StrictHostKeyChecking=no'])
235 args.append('-p%d' % port)
238 args.extend(('-i', identity))
248 # Create a temporary server key file
249 tmp_known_hosts = make_server_key_args(server_key, host, port)
250 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
254 for x in xrange(retry):
255 # connects to the remote host and starts a remote connection
256 proc = subprocess.Popen(args,
258 stdout = subprocess.PIPE,
259 stdin = subprocess.PIPE,
260 stderr = subprocess.PIPE)
262 # attach tempfile object to the process, to make sure the file stays
263 # alive until the process is finished with it
264 proc._known_hosts = tmp_known_hosts
# _communicate feeds `stdin` to the process and applies `timeout`; the
# RuntimeError it can raise on timeout is handled further down.
267 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
268 msg = " rexec - host %s - command %s " % (host, " ".join(args))
269 log(msg, logging.DEBUG, out, err)
# An stderr starting with one of these prefixes is classified as a failure of
# ssh itself rather than of the remote command.
274 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
275 # SSH error, can safely retry
278 # Probably timed out or plain failed but can retry
283 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
284 t, x, host, " ".join(args))
285 log(msg, logging.DEBUG)
290 except RuntimeError, e:
291 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
292 log(msg, logging.DEBUG, out, err)
298 return ((out, err), proc)
# Copy data between the local machine and a remote site (the docstring text
# below lists the accepted source/dest forms). Two strategies are visible:
#   * when either endpoint is a file or file-like object, an ssh process runs
#     a remote `cat` ("cat > path" to upload, "cat path" to download);
#   * otherwise the transfer is delegated to `scp`, launched in a loop of up
#     to `retry` iterations.
# Returns a ((stdout, stderr), process) tuple; the streaming branches return
# None in place of stdout.
300 def rcopy(source, dest,
307 strict_host_checking = True):
309 Copies from/to remote sites.
311 Source and destination should have the user and host encoded
314 If source is a file object, a special mode will be used to
315 create the remote file with the same contents.
317 If dest is a file object, the remote file (source) will be
318 read and written into dest.
320 In these modes, recursive cannot be True.
322 Source can be a list of files to copy to a single destination,
323 in which case it is advised that the destination be a folder.
326 if isinstance(source, file) and source.tell() == 0:
# Readable sources that are not plain files positioned at offset 0 get a
# NamedTemporaryFile and are read in 64 KiB pieces (presumably spooled into
# that temporary file - confirm).
328 elif hasattr(source, 'read'):
329 tmp = tempfile.NamedTemporaryFile()
331 buf = source.read(65536)
# Streaming mode: at least one endpoint is a file or exposes read/write.
339 if isinstance(source, file) or isinstance(dest, file) \
340 or hasattr(source, 'read') or hasattr(dest, 'write'):
343 # Parse source/destination as <user>@<server>:<path>
344 if isinstance(dest, basestring) and ':' in dest:
345 remspec, path = dest.split(':',1)
346 elif isinstance(source, basestring) and ':' in source:
347 remspec, path = source.split(':',1)
349 raise ValueError, "Both endpoints cannot be local"
350 user,host = remspec.rsplit('@',1)
352 tmp_known_hosts = None
353 hostip = gethostbyname(host)
355 args = ['ssh', '-l', user, '-C',
356 # Don't bother with localhost. Makes test easier
357 '-o', 'NoHostAuthenticationForLocalhost=yes',
358 '-o', 'ConnectTimeout=60',
359 '-o', 'ConnectionAttempts=3',
360 '-o', 'ServerAliveInterval=30',
361 '-o', 'TCPKeepAlive=yes',
364 if openssh_has_persist():
366 '-o', 'ControlMaster=auto',
367 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
368 '-o', 'ControlPersist=60' ])
# NOTE(review): this argument list starts with 'ssh', whose port option is
# lowercase -p (as used in rexec); uppercase -P is the scp spelling - confirm.
371 args.append('-P%d' % port)
374 args.extend(('-i', identity))
377 # Create a temporary server key file
378 tmp_known_hosts = make_server_key_args(server_key, host, port)
379 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# The remote side is a plain `cat`, with the remote path shell-escaped.
381 if isinstance(source, file) or hasattr(source, 'read'):
382 args.append('cat > %s' % (shell_escape(path),))
383 elif isinstance(dest, file) or hasattr(dest, 'write'):
384 args.append('cat %s' % (shell_escape(path),))
386 raise AssertionError, "Unreachable code reached! :-Q"
388 # connects to the remote host and starts a remote connection
389 if isinstance(source, file):
390 proc = subprocess.Popen(args,
391 stdout = open('/dev/null','w'),
392 stderr = subprocess.PIPE,
394 err = proc.stderr.read()
395 proc._known_hosts = tmp_known_hosts
396 eintr_retry(proc.wait)()
397 return ((None,err), proc)
# NOTE(review): in this download branch the remote `cat` output is sent to
# /dev/null via stdout - confirm that the data actually reaches `dest`.
398 elif isinstance(dest, file):
399 proc = subprocess.Popen(args,
400 stdout = open('/dev/null','w'),
401 stderr = subprocess.PIPE,
403 err = proc.stderr.read()
404 proc._known_hosts = tmp_known_hosts
405 eintr_retry(proc.wait)()
406 return ((None,err), proc)
407 elif hasattr(source, 'read'):
408 # file-like (but not file) source
409 proc = subprocess.Popen(args,
410 stdout = open('/dev/null','w'),
411 stderr = subprocess.PIPE,
412 stdin = subprocess.PIPE)
# Pump the source into ssh's stdin in 4 KiB pieces, collecting whatever the
# process writes to stderr in the meantime.
418 buf = source.read(4096)
423 rdrdy, wrdy, broken = select.select(
426 [proc.stderr,proc.stdin])
428 if proc.stderr in rdrdy:
429 # use os.read for fully unbuffered behavior
430 err.append(os.read(proc.stderr.fileno(), 4096))
432 if proc.stdin in wrdy:
433 proc.stdin.write(buf)
439 err.append(proc.stderr.read())
441 proc._known_hosts = tmp_known_hosts
442 eintr_retry(proc.wait)()
443 return ((None,''.join(err)), proc)
444 elif hasattr(dest, 'write'):
445 # file-like (but not file) dest
446 proc = subprocess.Popen(args,
447 stdout = subprocess.PIPE,
448 stderr = subprocess.PIPE,
449 stdin = open('/dev/null','w'))
454 rdrdy, wrdy, broken = select.select(
455 [proc.stderr, proc.stdout],
457 [proc.stderr, proc.stdout])
459 if proc.stderr in rdrdy:
460 # use os.read for fully unbuffered behavior
461 err.append(os.read(proc.stderr.fileno(), 4096))
463 if proc.stdout in rdrdy:
464 # use os.read for fully unbuffered behavior
465 buf = os.read(proc.stdout.fileno(), 4096)
474 err.append(proc.stderr.read())
476 proc._known_hosts = tmp_known_hosts
477 eintr_retry(proc.wait)()
478 return ((None,''.join(err)), proc)
480 raise AssertionError, "Unreachable code reached! :-Q"
482 # Parse destination as <user>@<server>:<path>
483 if isinstance(dest, basestring) and ':' in dest:
484 remspec, path = dest.split(':',1)
485 elif isinstance(source, basestring) and ':' in source:
486 remspec, path = source.split(':',1)
488 raise ValueError, "Both endpoints cannot be local"
489 user,host = remspec.rsplit('@',1)
492 tmp_known_hosts = None
# Path-to-path copy: build the scp command line (quiet, preserving times and
# modes, with compression).
494 args = ['scp', '-q', '-p', '-C',
495 # Don't bother with localhost. Makes test easier
496 '-o', 'NoHostAuthenticationForLocalhost=yes',
497 '-o', 'ConnectTimeout=60',
498 '-o', 'ConnectionAttempts=3',
499 '-o', 'ServerAliveInterval=30',
500 '-o', 'TCPKeepAlive=yes' ]
503 args.append('-P%d' % port)
509 args.extend(('-i', identity))
512 # Create a temporary server key file
513 tmp_known_hosts = make_server_key_args(server_key, host, port)
514 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
516 if not strict_host_checking:
517 # Do not check for Host key. Unsafe.
518 args.extend(['-o', 'StrictHostKeyChecking=no'])
520 if isinstance(source,list):
523 if openssh_has_persist():
525 '-o', 'ControlMaster=auto',
526 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
532 for x in xrange(retry):
533 # connects to the remote host and starts a remote connection
534 proc = subprocess.Popen(args,
535 stdout = subprocess.PIPE,
536 stdin = subprocess.PIPE,
537 stderr = subprocess.PIPE)
539 # attach tempfile object to the process, to make sure the file stays
540 # alive until the process is finished with it
541 proc._known_hosts = tmp_known_hosts
544 (out, err) = proc.communicate()
545 eintr_retry(proc.wait)()
546 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
547 log(msg, logging.DEBUG, out, err)
551 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
552 t, x, host, " ".join(args))
553 log(msg, logging.DEBUG)
559 except RuntimeError, e:
560 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
561 log(msg, logging.DEBUG, out, err)
567 return ((out, err), proc)
# Launch `command` on the remote host so that it keeps running after the ssh
# session ends, recording its pid in `pidfile`. The work is done by composing
# a shell command line and handing it to rexec; the value returned is the
# ((stdout, stderr), process) of that spawning call.
569 def rspawn(command, pidfile,
570 stdout = '/dev/null',
584 Spawn a remote command such that it will continue working asynchronously.
587 command: the command to run - it should be a single line.
589 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
591 stdout: path of a file to redirect standard output to - must be a string.
592 Defaults to /dev/null
593 stderr: path of a file to redirect standard error to - string or the special STDOUT value
594 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
595 stdin: path of a file with input to be piped into the command's standard input
597 home: path of a folder to use as working directory - should exist, unless you specify create_home
599 create_home: if True, the home folder will be created first with mkdir -p
601 sudo: whether the command needs to be executed as root
603 host/port/user/agent/identity: see rexec
606 (stdout, stderr), process
608 Of the spawning process, which only captures errors at spawning time.
609 Usually only useful for diagnostics.
611 # Start process in a "daemonized" way, using nohup and heavy
612 # stdin/out redirection to avoid connection issues
616 stderr = ' ' + stderr
# Run the command in the background with its three streams redirected, then
# write "<pid of the background job> 1" to the pidfile; rcheckpid later parses
# these two space-separated fields.
618 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
620 'pidfile' : shell_escape(pidfile),
# Outer command line: optionally create and enter `home`, remove any stale
# pidfile, then run the daemon command under nohup bash -c (through sudo -S
# when `sudo` is set).
626 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
627 'command' : shell_escape(daemon_command),
628 'sudo' : 'sudo -S' if sudo else '',
629 'pidfile' : shell_escape(pidfile),
630 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
631 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
634 (out,err),proc = rexec(
641 server_key = server_key,
# A failed spawn is reported as RuntimeError carrying the captured output.
646 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
648 return ((out, err), proc)
# Fetch the content of the remote pidfile with `cat` (run through rexec) and
# parse it as space-separated integers - the pid and ppid described in the
# docstring text below.
651 def rcheckpid(pidfile,
659 Check the pidfile of a process spawned with remote_spawn.
662 pidfile: the pidfile passed to remote_span
664 host/port/user/agent/identity: see rexec
668 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
669 or None if the pidfile isn't valid yet (maybe the process is still starting).
672 (out,err),proc = rexec(
673 "cat %(pidfile)s" % {
681 server_key = server_key
# At most two fields are split off ("<pid> <ppid>"), each converted to int.
689 return map(int,out.strip().split(' ',1))
691 # Ignore, many ways to fail that don't matter that much
# Ask the remote host (through rexec) whether process `pid` is still alive
# and translate the answer into RUNNING or FINISHED.
695 def rstatus(pid, ppid,
703 Check the status of a process spawned with remote_spawn.
706 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
708 host/port/user/agent/identity: see rexec
712 One of NOT_STARTED, RUNNING, FINISHED
715 (out,err),proc = rexec(
# The pipeline prints "wait" when ps lists the pid and "done" otherwise;
# tail -n 1 keeps only that final word.
716 # Check only by pid. pid+ppid does not always work (especially with sudo)
717 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
726 server_key = server_key
# This ps message on stderr indicates that /proc is not mounted remotely.
734 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
# Any output other than "wait" is reported as FINISHED.
737 status = (out.strip() == 'wait')
740 return RUNNING if status else FINISHED
753 Kill a process spawned with remote_spawn.
755 First tries a SIGTERM, and if the process does not end in 10 seconds,
759 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
761 sudo: whether the command was run with sudo - careful killing like this.
763 host/port/user/agent/identity: see rexec
767 Nothing, should have killed the process
770 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
772 SUBKILL="%(subkill)s" ;
773 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
774 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
775 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
777 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
780 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
781 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
785 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
786 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
787 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
# The script above signals the process group of `pid`, the pid itself and the
# children listed by `subkill` (ps --ppid), polls with ps over a 30-step loop,
# and falls back to kill -9 if the pid is still listed. Here it is wrapped so
# that it runs in the background with all three streams on /dev/null.
791 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
# Ship the wrapped script to the remote host through rexec.
793 (out,err),proc = rexec(
797 'sudo' : 'sudo -S' if sudo else '',
805 server_key = server_key
808 # wait, don't leave zombies around
811 return (out, err), proc
# select()-driven exchange of data with a child process under a deadline.
# `self` is the process object (rexec passes a subprocess.Popen): `input` is
# written to its stdin in chunks of at most 512 bytes while stdout and stderr
# are drained in 1024-byte reads. Once `timeout` seconds have elapsed the
# child is signalled with os.kill. When the `killed` flag is set and
# `err_on_timeout` is true, RuntimeError("Operation timed out", exit code,
# stdout, stderr) is raised; otherwise (stdout, stderr) is returned.
814 def _communicate(self, input, timeout=None, err_on_timeout=True):
817 stdout = None # Return
818 stderr = None # Return
822 if timeout is not None:
823 timelimit = time.time() + timeout
# NOTE(review): killtime and bailtime are the same instant and the bailtime
# test comes first in the loop below, so the SIGKILL branch looks
# unreachable - confirm the intended escalation order.
824 killtime = timelimit + 4
825 bailtime = timelimit + 4
828 # Flush stdio buffer. This might block, if the user has
829 # been writing to .stdin in an uncontrolled fashion.
832 write_set.append(self.stdin)
836 read_set.append(self.stdout)
839 read_set.append(self.stderr)
# Loop until every pipe has been removed from the read and write sets.
843 while read_set or write_set:
844 if timeout is not None:
845 curtime = time.time()
846 if timeout is None or curtime > timelimit:
847 if curtime > bailtime:
849 elif curtime > killtime:
850 signum = signal.SIGKILL
852 signum = signal.SIGTERM
854 os.kill(self.pid, signum)
# Wait slightly past the deadline so the next pass sees it as expired.
857 select_timeout = timelimit - curtime + 0.1
861 if select_timeout > 1.0:
865 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
866 except select.error,e:
872 if not rlist and not wlist and not xlist and self.poll() is not None:
873 # timeout and process exited, say bye
876 if self.stdin in wlist:
877 # When select has indicated that the file is writable,
878 # we can write up to PIPE_BUF bytes without risk
879 # blocking. POSIX defines PIPE_BUF >= 512
880 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
881 input_offset += bytes_written
# Everything has been sent: stop watching stdin.
882 if input_offset >= len(input):
884 write_set.remove(self.stdin)
886 if self.stdout in rlist:
887 data = os.read(self.stdout.fileno(), 1024)
890 read_set.remove(self.stdout)
893 if self.stderr in rlist:
894 data = os.read(self.stderr.fileno(), 1024)
897 read_set.remove(self.stderr)
900 # All data exchanged. Translate lists into strings.
901 if stdout is not None:
902 stdout = ''.join(stdout)
903 if stderr is not None:
904 stderr = ''.join(stderr)
906 # Translate newlines, if requested. We cannot let the file
907 # object do the translation: It is based on stdio, which is
908 # impossible to combine with select (unless forcing no
910 if self.universal_newlines and hasattr(file, 'newlines'):
912 stdout = self._translate_newlines(stdout)
914 stderr = self._translate_newlines(stderr)
# Report the timeout to the caller together with whatever output was captured.
916 if killed and err_on_timeout:
917 errcode = self.poll()
918 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
924 return (stdout, stderr)