# Module-wide logger; the channel name is kept as-is so existing logging
# configuration keeps matching.
logger = logging.getLogger("neco.execution.utils.sshfuncs")

# Path of the null device. Use the value the platform reports and fall back
# to the POSIX path only when ``os`` does not expose one.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Matches strings made exclusively of characters that need no shell quoting.
# Anchored with \Z instead of $: in Python, $ also matches right before a
# trailing newline, which would let "foo\n" be classified as safe.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')
26 Special value that when given to rspawn in stderr causes stderr to
27 redirect to whatever stdout was redirected to.
32 Process is still running
42 Process hasn't started running yet (this should be very rare)
# Process-wide memo of hostname -> resolved address, filled by gethostbyname().
hostbyname_cache = dict()

def gethostbyname(host):
    """ Resolves a hostname to an address, remembering previous answers.

    :param host: hostname (or address) to resolve
    :type host: str

    :return: the address reported by ``socket.gethostbyname``
    :raises socket.error: when the name cannot be resolved (nothing is
        cached in that case, so a later call retries the lookup)
    """
    hostbyname = hostbyname_cache.get(host)
    if not hostbyname:
        # Only hit the resolver on a cache miss
        hostbyname = socket.gethostbyname(host)
        hostbyname_cache[host] = hostbyname
    return hostbyname
# Tri-state cache for openssh_has_persist(): None means "not probed yet".
OPENSSH_HAS_PERSIST = None

def openssh_has_persist():
    """ The ssh_config options ControlMaster and ControlPersist allow
    reusing the same network connection for multiple ssh sessions. In this
    way limitations on the number of open ssh connections can be bypassed.
    However, older versions of OpenSSH do not support this feature.
    This function is used to determine if ssh connection persist features
    are supported.

    The local ``ssh`` client is probed once; the answer is stored in the
    module-level OPENSSH_HAS_PERSIST and reused by later calls.

    :return: True if the version banner matches the accepted versions
    :rtype: bool
    :raises OSError: if the ``ssh`` executable cannot be started
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # Keep the null-device handle open only while the child runs,
        # instead of leaking it.
        with open("/dev/null", "r") as devnull:
            # -V prints the version banner and exits; it does not depend on
            # the debug output that -v produces.
            proc = subprocess.Popen(["ssh", "-V"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull,
                # text output, so the str pattern below applies
                universal_newlines = True)
            out, err = proc.communicate()

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it: the file is removed as soon as the object is closed or
    garbage collected.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port, or None for the default port
    :type port: str

    :return: an open ``tempfile.NamedTemporaryFile`` (use its ``.name``)
    """
    # Resolve while ``host`` is still a bare hostname; a "host:port" string
    # is not something the resolver can look up.
    hostbyname = gethostbyname(host)

    if port and int(port) != 22:
        # known_hosts spells entries for non-default ports as "[host]:port"
        host = '[%s]:%s' % (host, port)
        hostbyname = '[%s]:%s' % (hostbyname, port)

    # Create a temporary server key file (text mode: only str is written)
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode = "w+")

    # Add the intended host key, under both the name and the address
    tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # Make the content visible to the ssh process that reads the file by name
    tmp_known_hosts.flush()

    return tmp_known_hosts
def make_control_path(agent, forward_x11):
    """ Returns the ControlPath template handed to ssh/scp for connection
    sharing.

    Sessions opened with different ``agent`` / ``forward_x11`` settings get
    different socket names, so a shared master connection is only reused by
    sessions requesting the same combination.

    :param agent: truthy when the caller's session uses the agent option
        (presumably agent forwarding; see the callers)
    :param forward_x11: truthy when the caller's session forwards X11

    :return: the control socket path template
    :rtype: str
    """
    ctrl_path = "/tmp/nepi_ssh"

    if agent:
        ctrl_path += "_a"

    if forward_x11:
        ctrl_path += "_x"

    # %r, %h and %p are left for ssh to expand (remote user, host, port)
    ctrl_path += "-%r@%h:%p"

    return ctrl_path
def shell_escape(s):
    """ Escapes strings so that they are safe to use as command-line
    arguments.

    :param s: the raw argument
    :type s: str

    :return: ``s`` unchanged when it only contains shell-safe characters,
        otherwise a single-quoted form in which quotes and non-printable
        characters are written as ``$'\\xNN'`` sequences
    :rtype: str
    """
    if s == '':
        # An empty string must still show up as one (empty) argument
        return "''"

    # The trailing-newline test is explicit because a pattern anchored
    # with $ also accepts a string that ends in "\n".
    if SHELL_SAFE.match(s) and not s.endswith('\n'):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    def escp(c):
        if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
            return c
        # close the quote, emit the character in $'..' form, reopen the quote
        return "'$'\\x%02x''" % (ord(c),)

    return "'%s'" % (''.join(map(escp, s)),)
def eintr_retry(func):
    """Retries a function invocation when an EINTR occurs.

    The returned wrapper calls ``func`` up to four times, swallowing
    interrupted-system-call errors, and then makes one final unguarded call
    whose exception (if any) reaches the caller. Passing ``_retry=True`` to
    the wrapper skips the guarded attempts; the keyword is consumed and not
    forwarded to ``func``.

    :param func: the callable to protect
    :return: a wrapper with the same signature and metadata as ``func``
    """
    import functools

    def _errno_of(exc):
        # select.error on Python 2 has no .errno; its code is args[0]
        code = getattr(exc, "errno", None)
        if code is None and exc.args:
            code = exc.args[0]
        return code

    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for _ in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as e:
                if _errno_of(e) != errno.EINTR:
                    raise
                # interrupted: go around and try again
        # Guarded attempts exhausted (or skipped): last call, errors propagate
        return func(*p, **kw)
    return rv
# rexec: run `command` on a remote host through a spawned client process and
# return ((stdout, stderr), process).
#
# Visible behaviour:
#   - the host is resolved once with gethostbyname(); the resolved address is
#     preferred as the connection target, the name is the fallback
#   - connection sharing options are added only when `persistent` is set and
#     openssh_has_persist() reports support
#   - when a server key is supplied, a temporary known_hosts file is created
#     and attached to the process object so it outlives the call
#   - the spawn/communicate step is attempted `retry or 3` times
# NOTE(review): the remaining parameters, the head of the argument list and
# the retry bookkeeping are outside this excerpt; confirm against the full file.
169 def rexec(command, host, user,
180 err_on_timeout = True,
181 connect_timeout = 30,
183 forward_x11 = False):
185 Executes a remote command, returns ((stdout,stderr),process)
188 tmp_known_hosts = None
# Resolved once up front; used below as the preferred target
189 hostip = gethostbyname(host)
192 # Don't bother with localhost. Makes test easier
193 '-o', 'NoHostAuthenticationForLocalhost=yes',
194 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
195 '-o', 'ConnectionAttempts=3',
196 '-o', 'ServerAliveInterval=30',
197 '-o', 'TCPKeepAlive=yes',
198 '-l', user, hostip or host]
# Connection multiplexing is opt-in and depends on the local client version
200 if persistent and openssh_has_persist():
202 '-o', 'ControlMaster=auto',
203 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
204 '-o', 'ControlPersist=60' ])
210 args.append('-p%d' % port)
213 args.extend(('-i', identity))
223 # Create a temporary server key file
224 tmp_known_hosts = make_server_key_args(server_key, host, port)
225 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# A falsy `retry` means three attempts
229 for x in xrange(retry or 3):
230 # connects to the remote host and starts a remote connection
231 proc = subprocess.Popen(args,
232 stdout = subprocess.PIPE,
233 stdin = subprocess.PIPE,
234 stderr = subprocess.PIPE)
236 # attach tempfile object to the process, to make sure the file stays
237 # alive until the process is finished with it
238 proc._known_hosts = tmp_known_hosts
241 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
242 logger.debug("COMMAND host %s, command %s, out %s, error %s" % (
243 host, " ".join(args), out, err))
# Errors reported by the client itself (not by the remote command) are
# recognised by the prefix of stderr
246 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
247 # SSH error, can safely retry
250 # Probably timed out or plain failed but can retry
# _communicate raises RuntimeError when the operation times out
253 except RuntimeError, e:
254 logger.debug("EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT -> %s" % (
255 host, " ".join(args), out, err, e.args))
261 return ((out, err), proc)
# rcopy: copy data from/to a remote site; every visible return has the
# shape ((out, err), process).
#
# Two transports are visible:
#   1. When either endpoint is a file object or file-like object, the data is
#      streamed through an `ssh` process running `cat > path` (upload) or
#      `cat path` (download) on the remote side.
#   2. Otherwise both endpoints are strings and an `scp` process is used.
# In both cases the remote endpoint is the one containing ':' and is parsed
# as <user>@<host>:<path>; ValueError is raised when neither endpoint has one.
# NOTE(review): the parameter list and several branch bodies are outside this
# excerpt; the notes below only describe the visible lines.
263 def rcopy(source, dest,
270 Copies from/to remote sites.
272 Source and destination should have the user and host encoded
275 If source is a file object, a special mode will be used to
276 create the remote file with the same contents.
278 If dest is a file object, the remote file (source) will be
279 read and written into dest.
281 In these modes, recursive cannot be True.
283 Source can be a list of files to copy to a single destination,
284 in which case it is advised that the destination be a folder.
287 logger.debug("SCP %s %s" % (source, dest))
# A real file positioned at offset 0 is treated differently from other
# readable objects, which are read in 64 KiB chunks next to a temporary file
# (presumably spooled into it; the loop body is not visible)
289 if isinstance(source, file) and source.tell() == 0:
291 elif hasattr(source, 'read'):
292 tmp = tempfile.NamedTemporaryFile()
294 buf = source.read(65536)
# Streaming mode: at least one endpoint is a file or file-like object
302 if isinstance(source, file) or isinstance(dest, file) \
303 or hasattr(source, 'read') or hasattr(dest, 'write'):
306 # Parse source/destination as <user>@<server>:<path>
307 if isinstance(dest, basestring) and ':' in dest:
308 remspec, path = dest.split(':',1)
309 elif isinstance(source, basestring) and ':' in source:
310 remspec, path = source.split(':',1)
312 raise ValueError, "Both endpoints cannot be local"
313 user,host = remspec.rsplit('@',1)
315 tmp_known_hosts = None
316 hostip = gethostbyname(host)
318 args = ['ssh', '-l', user, '-C',
319 # Don't bother with localhost. Makes test easier
320 '-o', 'NoHostAuthenticationForLocalhost=yes',
321 '-o', 'ConnectTimeout=60',
322 '-o', 'ConnectionAttempts=3',
323 '-o', 'ServerAliveInterval=30',
324 '-o', 'TCPKeepAlive=yes',
327 if openssh_has_persist():
329 '-o', 'ControlMaster=auto',
330 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
331 '-o', 'ControlPersist=60' ])
# NOTE(review): this command line starts with 'ssh', whose port option is
# lowercase -p (rexec uses '-p%d'); uppercase -P is the scp spelling. Confirm.
334 args.append('-P%d' % port)
337 args.extend(('-i', identity))
340 # Create a temporary server key file
341 tmp_known_hosts = make_server_key_args(server_key, host, port)
342 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# The remote side of the stream is a plain `cat`, with the path shell-escaped
344 if isinstance(source, file) or hasattr(source, 'read'):
345 args.append('cat > %s' % (shell_escape(path),))
346 elif isinstance(dest, file) or hasattr(dest, 'write'):
347 args.append('cat %s' % (shell_escape(path),))
349 raise AssertionError, "Unreachable code reached! :-Q"
351 # connects to the remote host and starts a remote connection
# Case 1: source is a real file
352 if isinstance(source, file):
353 proc = subprocess.Popen(args,
354 stdout = open('/dev/null','w'),
355 stderr = subprocess.PIPE,
357 err = proc.stderr.read()
358 proc._known_hosts = tmp_known_hosts
359 eintr_retry(proc.wait)()
360 return ((None,err), proc)
# Case 2: dest is a real file
# NOTE(review): stdout is sent to /dev/null in this branch as well, although
# the remote `cat path` output is what should end up in dest. Confirm against
# the hidden Popen argument before relying on this mode.
361 elif isinstance(dest, file):
362 proc = subprocess.Popen(args,
363 stdout = open('/dev/null','w'),
364 stderr = subprocess.PIPE,
366 err = proc.stderr.read()
367 proc._known_hosts = tmp_known_hosts
368 eintr_retry(proc.wait)()
369 return ((None,err), proc)
# Case 3: source only offers read(); chunks of 4096 bytes are pumped into the
# child's stdin under select(), while stderr is drained alongside
370 elif hasattr(source, 'read'):
371 # file-like (but not file) source
372 proc = subprocess.Popen(args,
373 stdout = open('/dev/null','w'),
374 stderr = subprocess.PIPE,
375 stdin = subprocess.PIPE)
381 buf = source.read(4096)
386 rdrdy, wrdy, broken = select.select(
389 [proc.stderr,proc.stdin])
391 if proc.stderr in rdrdy:
392 # use os.read for fully unbuffered behavior
393 err.append(os.read(proc.stderr.fileno(), 4096))
395 if proc.stdin in wrdy:
396 proc.stdin.write(buf)
402 err.append(proc.stderr.read())
404 proc._known_hosts = tmp_known_hosts
405 eintr_retry(proc.wait)()
406 return ((None,''.join(err)), proc)
# Case 4: dest only offers write(); stdout and stderr of the child are read
# under select()
# NOTE(review): the child's stdin is /dev/null opened in 'w' mode; a handle
# used as stdin would normally be opened for reading.
407 elif hasattr(dest, 'write'):
408 # file-like (but not file) dest
409 proc = subprocess.Popen(args,
410 stdout = subprocess.PIPE,
411 stderr = subprocess.PIPE,
412 stdin = open('/dev/null','w'))
417 rdrdy, wrdy, broken = select.select(
418 [proc.stderr, proc.stdout],
420 [proc.stderr, proc.stdout])
422 if proc.stderr in rdrdy:
423 # use os.read for fully unbuffered behavior
424 err.append(os.read(proc.stderr.fileno(), 4096))
426 if proc.stdout in rdrdy:
427 # use os.read for fully unbuffered behavior
428 buf = os.read(proc.stdout.fileno(), 4096)
437 err.append(proc.stderr.read())
439 proc._known_hosts = tmp_known_hosts
440 eintr_retry(proc.wait)()
441 return ((None,''.join(err)), proc)
443 raise AssertionError, "Unreachable code reached! :-Q"
# Path mode: both endpoints are strings, handled by scp
445 # Parse destination as <user>@<server>:<path>
446 if isinstance(dest, basestring) and ':' in dest:
447 remspec, path = dest.split(':',1)
448 elif isinstance(source, basestring) and ':' in source:
449 remspec, path = source.split(':',1)
451 raise ValueError, "Both endpoints cannot be local"
452 user,host = remspec.rsplit('@',1)
455 tmp_known_hosts = None
457 args = ['scp', '-q', '-p', '-C',
458 # Don't bother with localhost. Makes test easier
459 '-o', 'NoHostAuthenticationForLocalhost=yes',
460 '-o', 'ConnectTimeout=60',
461 '-o', 'ConnectionAttempts=3',
462 '-o', 'ServerAliveInterval=30',
463 '-o', 'TCPKeepAlive=yes' ]
466 args.append('-P%d' % port)
472 args.extend(('-i', identity))
475 # Create a temporary server key file
476 tmp_known_hosts = make_server_key_args(server_key, host, port)
477 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
479 if isinstance(source,list):
482 if openssh_has_persist():
484 '-o', 'ControlMaster=auto',
485 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
491 # connects to the remote host and starts a remote connection
492 proc = subprocess.Popen(args,
493 stdout = subprocess.PIPE,
494 stdin = subprocess.PIPE,
495 stderr = subprocess.PIPE)
496 proc._known_hosts = tmp_known_hosts
# communicate() already waits for the child; the wait below then only
# collects the exit status
498 (out, err) = proc.communicate()
499 eintr_retry(proc.wait)()
500 return ((out, err), proc)
# rspawn: start `command` on the remote host so that it keeps running after
# the ssh session ends, and record its pid in `pidfile`.
#
# Visible behaviour:
#   - the command is wrapped in a backgrounded shell group with stdout, stderr
#     and stdin redirected, followed by `echo $! 1 > pidfile`, i.e. the
#     pidfile holds "<pid> 1"
#   - the wrapper is shell-escaped and run with `nohup bash -c`, optionally
#     under `sudo -S`, after an optional `mkdir -p` / `cd` into `home`
#   - the resulting line is executed through rexec()
# Raises RuntimeError("Failed to set up application ...") on spawn failure
# (the triggering condition is outside this excerpt).
# Returns ((out, err), proc) of the spawning ssh process.
502 def rspawn(command, pidfile,
503 stdout = '/dev/null',
517 Spawn a remote command such that it will continue working asynchronously.
520 command: the command to run - it should be a single line.
522 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
524 stdout: path of a file to redirect standard output to - must be a string.
525 Defaults to /dev/null
526 stderr: path of a file to redirect standard error to - string or the special STDOUT value
527 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
528 stdin: path of a file with input to be piped into the command's standard input
530 home: path of a folder to use as working directory - should exist, unless you specify create_home
532 create_home: if True, the home folder will be created first with mkdir -p
534 sudo: whether the command needs to be executed as root
536 host/port/user/agent/identity: see rexec
539 (stdout, stderr), process
541 Of the spawning process, which only captures errors at spawning time.
542 Usually only useful for diagnostics.
544 # Start process in a "daemonized" way, using nohup and heavy
545 # stdin/out redirection to avoid connection issues
549 stderr = ' ' + stderr
# Inner script: run in background, then write "<pid of background job> 1"
551 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
553 'pidfile' : shell_escape(pidfile),
# Outer line: prepare the directory, drop any stale pidfile, then launch the
# inner script detached from the session
559 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
560 'command' : shell_escape(daemon_command),
561 'sudo' : 'sudo -S' if sudo else '',
562 'pidfile' : shell_escape(pidfile),
563 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
564 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
567 (out,err),proc = rexec(
574 server_key = server_key,
579 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
581 return ((out, err), proc)
# rcheckpid: read the pidfile written by rspawn on the remote host.
#
# The file is fetched with a remote `cat` through rexec(); its content is
# expected to be two integers separated by a space and is returned as
# [pid, ppid] (a list, as produced by Python 2 `map`).
# NOTE(review): the docstring below refers to "remote_spawn", "remote_status"
# and "remote_kill"; the functions in this file are rspawn, rstatus and rkill.
584 def rcheckpid(pidfile,
592 Check the pidfile of a process spawned with remote_spawn.
595 pidfile: the pidfile passed to remote_span
597 host/port/user/agent/identity: see rexec
601 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
602 or None if the pidfile isn't valid yet (maybe the process is still starting).
605 (out,err),proc = rexec(
606 "cat %(pidfile)s" % {
614 server_key = server_key
# Split on the first space only, then convert both fields to int
622 return map(int,out.strip().split(' ',1))
624 # Ignore, many ways to fail that don't matter that much
# rstatus: report whether a process started by rspawn is still alive.
#
# A remote `ps --pid <pid>` pipeline prints 'wait' when the pid is listed and
# 'done' otherwise; only the last output line is kept. The function returns
# RUNNING when that output is 'wait' and FINISHED otherwise.
# NOTE(review): the docstring also lists NOT_STARTED; the path that returns
# it, and what happens on the /proc error check, are outside this excerpt.
628 def rstatus(pid, ppid,
636 Check the status of a process spawned with remote_spawn.
639 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
641 host/port/user/agent/identity: see rexec
645 One of NOT_STARTED, RUNNING, FINISHED
648 (out,err),proc = rexec(
649 # Check only by pid. pid+ppid does not always work (especially with sudo)
650 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
659 server_key = server_key
# `ps` prints this message when /proc is not mounted on the remote host
667 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
670 status = (out.strip() == 'wait')
673 return RUNNING if status else FINISHED
686 Kill a process spawned with remote_spawn.
688 First tries a SIGTERM, and if the process does not end in 10 seconds,
692 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
694 sudo: whether the command was run with sudo - careful killing like this.
696 host/port/user/agent/identity: see rexec
700 Nothing, should have killed the process
# Kill routine (its `def` line is outside this excerpt). Visible behaviour:
#   - `subkill` is a shell command substitution listing the direct children
#     of `pid`, so they are signalled together with the process
#   - the remote script signals the process group (-pid) and the pid itself,
#     repeats that in a loop while `ps` still lists the pid, and finally
#     falls back to `kill -9`
#   - `|| /bin/true` keeps a failed kill from aborting the script
#   - the script can be wrapped to run detached in the background with all
#     standard streams on /dev/null
# NOTE(review): the docstring says "Nothing" is returned, but the last
# visible line returns (out, err), proc.
703 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
705 SUBKILL="%(subkill)s" ;
706 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
707 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
708 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
710 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
713 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
714 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
718 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
719 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
720 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
724 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
# The script template is filled in and executed remotely through rexec()
726 (out,err),proc = rexec(
730 'sudo' : 'sudo -S' if sudo else '',
738 server_key = server_key
741 # wait, don't leave zombies around
744 return (out, err), proc
# _communicate: select()-based replacement for Popen.communicate() with an
# optional timeout. `self` is the subprocess.Popen object (rexec calls this
# as a plain function with the process as first argument).
#
# Parameters:
#   input          - data to feed to the child's stdin
#   timeout        - seconds to wait before the child is signalled; None
#                    disables the deadline
#   err_on_timeout - when true, a timed-out run raises instead of returning
# Returns (stdout, stderr); each is None unless it was collected, otherwise
# the joined output.
# Raises RuntimeError("Operation timed out", exit code, stdout, stderr) when
# the child had to be killed and err_on_timeout is set.
# Python 2 only as written: uses `buffer`, `file` and `except X, e`.
747 def _communicate(self, input, timeout=None, err_on_timeout=True):
750 stdout = None # Return
751 stderr = None # Return
# Deadlines are absolute timestamps derived from the start time.
# NOTE(review): killtime and bailtime are the same instant, and bailtime is
# tested first in the loop below, so the SIGKILL branch looks unreachable.
755 if timeout is not None:
756 timelimit = time.time() + timeout
757 killtime = timelimit + 4
758 bailtime = timelimit + 4
761 # Flush stdio buffer. This might block, if the user has
762 # been writing to .stdin in an uncontrolled fashion.
765 write_set.append(self.stdin)
769 read_set.append(self.stdout)
772 read_set.append(self.stderr)
# Main pump: runs until every registered pipe has been removed from the sets
776 while read_set or write_set:
777 if timeout is not None:
778 curtime = time.time()
# NOTE(review): `curtime` is only assigned when a timeout is set; whether the
# `timeout is None` half of this test can ever be reached depends on nesting
# that is not visible in this excerpt.
779 if timeout is None or curtime > timelimit:
780 if curtime > bailtime:
782 elif curtime > killtime:
783 signum = signal.SIGKILL
785 signum = signal.SIGTERM
787 os.kill(self.pid, signum)
# Before the deadline: sleep in select() at most until just past it
790 select_timeout = timelimit - curtime + 0.1
794 if select_timeout > 1.0:
798 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
799 except select.error,e:
805 if not rlist and not wlist and not xlist and self.poll() is not None:
806 # timeout and process exited, say bye
809 if self.stdin in wlist:
810 # When select has indicated that the file is writable,
811 # we can write up to PIPE_BUF bytes without risk
812 # blocking. POSIX defines PIPE_BUF >= 512
813 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
814 input_offset += bytes_written
# Everything sent: stop watching stdin
815 if input_offset >= len(input):
817 write_set.remove(self.stdin)
# Output is read straight from the descriptors, up to 1024 bytes per round
819 if self.stdout in rlist:
820 data = os.read(self.stdout.fileno(), 1024)
823 read_set.remove(self.stdout)
826 if self.stderr in rlist:
827 data = os.read(self.stderr.fileno(), 1024)
830 read_set.remove(self.stderr)
833 # All data exchanged. Translate lists into strings.
834 if stdout is not None:
835 stdout = ''.join(stdout)
836 if stderr is not None:
837 stderr = ''.join(stderr)
839 # Translate newlines, if requested. We cannot let the file
840 # object do the translation: It is based on stdio, which is
841 # impossible to combine with select (unless forcing no
843 if self.universal_newlines and hasattr(file, 'newlines'):
845 stdout = self._translate_newlines(stdout)
847 stderr = self._translate_newlines(stderr)
# A kill caused by the deadline is reported as an error only on request
849 if killed and err_on_timeout:
850 errcode = self.poll()
851 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
857 return (stdout, stderr)