2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
# Logger shared by every helper in this module.
logger = logging.getLogger("sshfuncs")


def log(msg, level, out = None, err = None):
    """ Logs a message through the module logger, optionally followed by
    the output and error text of a command.

    :param msg: base text of the log record
    :type msg: str

    :param level: logging level passed to logger.log (e.g. logging.DEBUG)
    :type level: int

    :param out: command standard output; appended only when non-empty
    :param err: command standard error; appended only when non-empty
    """
    # Skip None and empty values so that the record does not carry
    # meaningless " - OUT: None " / " - ERROR: None " suffixes.
    if out:
        # Wrap in a tuple so that a tuple value is formatted as one item
        msg += " - OUT: %s " % (out,)

    if err:
        msg += " - ERROR: %s " % (err,)

    logger.log(level, msg)
# Path of the null device: os.devnull when the running Python provides
# it, otherwise the POSIX path.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Strings made only of these characters are passed to the shell without
# quoting. The pattern ends with \Z rather than $ because $ also matches
# just before a trailing newline, which would let "name\n" through as
# "safe".
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
63 Codes for status of remote spawned process
65 # Process is still running
71 # Process hasn't started running yet (this should be very rare)
# Resolved addresses keyed by host name, and the lock that serializes
# the resolution of names that are not cached yet.
hostbyname_cache = dict()
hostbyname_cache_lock = threading.Lock()


def gethostbyname(host):
    """ Resolves a host name with socket.gethostbyname, remembering the
    answer so that each name is resolved only once.

    :param host: the hostname to resolve
    :type host: str

    :returns: the address string returned by socket.gethostbyname
    :rtype: str

    Errors raised by socket.gethostbyname are not caught here, and
    nothing is cached for a name that failed to resolve.
    """
    hostbyname = hostbyname_cache.get(host)
    if not hostbyname:
        with hostbyname_cache_lock:
            # Another thread may have resolved this name while we were
            # waiting for the lock; look again before resolving.
            hostbyname = hostbyname_cache.get(host)
            if not hostbyname:
                hostbyname = socket.gethostbyname(host)
                hostbyname_cache[host] = hostbyname

                msg = " Added hostbyname %s - %s " % (host, hostbyname)
                log(msg, logging.DEBUG)

    return hostbyname
# Cached answer of openssh_has_persist(); None until the first call.
OPENSSH_HAS_PERSIST = None


def openssh_has_persist():
    """ Tells whether the local ssh client supports connection sharing.

    The ssh_config options ControlMaster and ControlPersist allow
    reusing the same network connection for multiple ssh sessions. In
    this way limitations on the number of open ssh connections can be
    bypassed. However, older versions of OpenSSH do not support this
    feature.

    The answer is computed once, by running "ssh -v" and checking that
    its output starts with an OpenSSH version banner of 5.8 or later,
    and is then cached in OPENSSH_HAS_PERSIST.

    :returns: True when the banner reports OpenSSH 5.8 or later
    :rtype: bool
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        try:
            # The context manager closes the null-device handle once the
            # child process has finished with it.
            with open(os.devnull, "r") as null_in:
                proc = subprocess.Popen(["ssh", "-v"],
                        stdout = subprocess.PIPE,
                        stderr = subprocess.STDOUT,
                        stdin = null_in)
                # communicate() also waits for the process to exit
                out, err = proc.communicate()
        except OSError:
            # ssh could not be started: no client, hence no persistence
            out = ""

        # Pipes yield bytes on Python 3; the pattern below is text
        if isinstance(out, bytes):
            out = out.decode("utf-8", "replace")

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port, or None for the default port

    :returns: the open temporary file; its path is in the name attribute
    """
    # Resolve the bare host name. This has to happen before the port is
    # attached, a "host:port" string is not a resolvable name.
    hostbyname = gethostbyname(host)

    # known_hosts lists a host on a non-standard port as "[host]:port";
    # hosts on the default port are listed by name alone.
    if port is None or str(port) == "22":
        names = (host, hostbyname)
    else:
        names = ("[%s]:%s" % (host, port), "[%s]:%s" % (hostbyname, port))

    # Create a temporary server key file. Text mode accepts the str
    # values written below on both Python 2 and Python 3.
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode = "w+")

    # Add the intended host key
    tmp_known_hosts.write('%s,%s %s\n' % (names[0], names[1], server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # Make the content visible to the ssh process that will read the file
    tmp_known_hosts.flush()

    return tmp_known_hosts
155 def make_control_path(agent, forward_x11):
156 ctrl_path = "/tmp/nepi_ssh"
164 ctrl_path += "-%r@%h:%p"
def shell_escape(s):
    """ Escapes strings so that they are safe to use as command-line
    arguments.

    Strings accepted by SHELL_SAFE are returned unchanged (this
    includes the empty string). Any other string is wrapped in single
    quotes; quote characters and characters outside printable ASCII,
    other than CR, LF and TAB, are emitted as $'\\xNN' byte escapes
    spliced between the quoted parts.

    :param s: the string to escape
    :type s: str

    :returns: the string, quoted when needed
    :rtype: str
    """
    # A pattern anchored with $ also accepts a trailing newline, so that
    # case is rejected explicitly and goes through quoting.
    if SHELL_SAFE.match(s) and not s.endswith("\n"):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    def escp(c):
        code = ord(c)
        if (32 <= code < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
            return c

        if code < 128 or isinstance(c, bytes):
            # ASCII character, or one byte of a byte string
            raw = (code,)
        else:
            # Non-ASCII text: \x escapes carry a single byte each, so
            # the character is escaped as its UTF-8 bytes.
            raw = bytearray(c.encode("utf-8"))

        # Close the quote, emit the escaped byte, reopen the quote
        return ''.join("'$'\\x%02x''" % (b,) for b in raw)

    return "'%s'" % (''.join(map(escp, s)),)
def eintr_retry(func):
    """Retries a function invocation when an EINTR occurs

    Returns a wrapper around func. Calls that fail with errno.EINTR
    (raised as select.error, socket.error or OSError) are repeated: up
    to four interrupted attempts are retried, then one last attempt is
    made whose errors propagate. Any other error propagates at once.

    The wrapper consumes a private keyword argument, _retry, which is
    not forwarded to func; when it is true func is called exactly once,
    without any retrying.
    """
    import functools

    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for _ in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as e:
                # Python 2's select.error has no errno attribute and
                # keeps the code as its first argument.
                code = getattr(e, "errno", None)
                if code is None and e.args:
                    code = e.args[0]
                if code != errno.EINTR:
                    raise
        # Retries exhausted (or disabled): errors now reach the caller
        return func(*p, **kw)
    return rv
207 def rexec(command, host, user,
220 err_on_timeout = True,
221 connect_timeout = 30,
225 strict_host_checking = True):
227 Executes a remote command, returns ((stdout,stderr),process)
230 tmp_known_hosts = None
232 hostip = gethostbyname(host)
236 # Don't bother with localhost. Makes test easier
237 '-o', 'NoHostAuthenticationForLocalhost=yes',
238 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
239 '-o', 'ConnectionAttempts=3',
240 '-o', 'ServerAliveInterval=30',
241 '-o', 'TCPKeepAlive=yes',
242 '-l', user, hostip or host]
244 if persistent and openssh_has_persist():
246 '-o', 'ControlMaster=auto',
247 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
248 '-o', 'ControlPersist=60' ])
250 if not strict_host_checking:
251 # Do not check for Host key. Unsafe.
252 args.extend(['-o', 'StrictHostKeyChecking=no'])
256 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
258 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
259 args.extend(['-o', proxycommand])
265 args.append('-p%d' % port)
268 args.extend(('-i', identity))
278 # Create a temporary server key file
279 tmp_known_hosts = make_server_key_args(server_key, host, port)
280 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
283 command = "sudo " + command
287 for x in xrange(retry):
288 # connects to the remote host and starts a remote connection
289 proc = subprocess.Popen(args,
291 stdout = subprocess.PIPE,
292 stdin = subprocess.PIPE,
293 stderr = subprocess.PIPE)
295 # attach tempfile object to the process, to make sure the file stays
296 # alive until the process is finished with it
297 proc._known_hosts = tmp_known_hosts
299 # by default, rexec calls _communicate which will block
300 # until the process has exited. The argument block == False
301 # forces rexec to return immediately, without blocking
304 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
306 err = proc.stderr.read()
307 out = proc.stdout.read()
309 msg = " rexec - host %s - command %s " % (host, " ".join(args))
310 log(msg, logging.DEBUG, out, err)
315 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
316 # SSH error, can safely retry
319 # Probably timed out or plain failed but can retry
324 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
325 t, x, host, " ".join(args))
326 log(msg, logging.DEBUG)
331 except RuntimeError, e:
332 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
333 log(msg, logging.DEBUG, out, err)
339 return ((out, err), proc)
341 def rcopy(source, dest,
350 strict_host_checking = True):
352 Copies from/to remote sites.
354 Source and destination should have the user and host encoded
357 If source is a file object, a special mode will be used to
358 create the remote file with the same contents.
360 If dest is a file object, the remote file (source) will be
361 read and written into dest.
363 In these modes, recursive cannot be True.
365 Source can be a list of files to copy to a single destination,
366 in which case it is advised that the destination be a folder.
369 if isinstance(source, file) and source.tell() == 0:
371 elif hasattr(source, 'read'):
372 tmp = tempfile.NamedTemporaryFile()
374 buf = source.read(65536)
382 if isinstance(source, file) or isinstance(dest, file) \
383 or hasattr(source, 'read') or hasattr(dest, 'write'):
386 # Parse source/destination as <user>@<server>:<path>
387 if isinstance(dest, basestring) and ':' in dest:
388 remspec, path = dest.split(':',1)
389 elif isinstance(source, basestring) and ':' in source:
390 remspec, path = source.split(':',1)
392 raise ValueError, "Both endpoints cannot be local"
393 user,host = remspec.rsplit('@',1)
395 tmp_known_hosts = None
397 hostip = gethostbyname(host)
400 args = ['ssh', '-l', user, '-C',
401 # Don't bother with localhost. Makes test easier
402 '-o', 'NoHostAuthenticationForLocalhost=yes',
403 '-o', 'ConnectTimeout=60',
404 '-o', 'ConnectionAttempts=3',
405 '-o', 'ServerAliveInterval=30',
406 '-o', 'TCPKeepAlive=yes',
409 if openssh_has_persist():
411 '-o', 'ControlMaster=auto',
412 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
413 '-o', 'ControlPersist=60' ])
417 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
419 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
420 args.extend(['-o', proxycommand])
423 args.append('-P%d' % port)
426 args.extend(('-i', identity))
429 # Create a temporary server key file
430 tmp_known_hosts = make_server_key_args(server_key, host, port)
431 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
433 if isinstance(source, file) or hasattr(source, 'read'):
434 args.append('cat > %s' % (shell_escape(path),))
435 elif isinstance(dest, file) or hasattr(dest, 'write'):
436 args.append('cat %s' % (shell_escape(path),))
438 raise AssertionError, "Unreachable code reached! :-Q"
440 # connects to the remote host and starts a remote connection
441 if isinstance(source, file):
442 proc = subprocess.Popen(args,
443 stdout = open('/dev/null','w'),
444 stderr = subprocess.PIPE,
446 err = proc.stderr.read()
447 proc._known_hosts = tmp_known_hosts
448 eintr_retry(proc.wait)()
449 return ((None,err), proc)
450 elif isinstance(dest, file):
451 proc = subprocess.Popen(args,
452 stdout = open('/dev/null','w'),
453 stderr = subprocess.PIPE,
455 err = proc.stderr.read()
456 proc._known_hosts = tmp_known_hosts
457 eintr_retry(proc.wait)()
458 return ((None,err), proc)
459 elif hasattr(source, 'read'):
460 # file-like (but not file) source
461 proc = subprocess.Popen(args,
462 stdout = open('/dev/null','w'),
463 stderr = subprocess.PIPE,
464 stdin = subprocess.PIPE)
470 buf = source.read(4096)
475 rdrdy, wrdy, broken = select.select(
478 [proc.stderr,proc.stdin])
480 if proc.stderr in rdrdy:
481 # use os.read for fully unbuffered behavior
482 err.append(os.read(proc.stderr.fileno(), 4096))
484 if proc.stdin in wrdy:
485 proc.stdin.write(buf)
491 err.append(proc.stderr.read())
493 proc._known_hosts = tmp_known_hosts
494 eintr_retry(proc.wait)()
495 return ((None,''.join(err)), proc)
496 elif hasattr(dest, 'write'):
497 # file-like (but not file) dest
498 proc = subprocess.Popen(args,
499 stdout = subprocess.PIPE,
500 stderr = subprocess.PIPE,
501 stdin = open('/dev/null','w'))
506 rdrdy, wrdy, broken = select.select(
507 [proc.stderr, proc.stdout],
509 [proc.stderr, proc.stdout])
511 if proc.stderr in rdrdy:
512 # use os.read for fully unbuffered behavior
513 err.append(os.read(proc.stderr.fileno(), 4096))
515 if proc.stdout in rdrdy:
516 # use os.read for fully unbuffered behavior
517 buf = os.read(proc.stdout.fileno(), 4096)
526 err.append(proc.stderr.read())
528 proc._known_hosts = tmp_known_hosts
529 eintr_retry(proc.wait)()
530 return ((None,''.join(err)), proc)
532 raise AssertionError, "Unreachable code reached! :-Q"
534 # Parse destination as <user>@<server>:<path>
535 if isinstance(dest, basestring) and ':' in dest:
536 remspec, path = dest.split(':',1)
537 elif isinstance(source, basestring) and ':' in source:
538 remspec, path = source.split(':',1)
540 raise ValueError, "Both endpoints cannot be local"
541 user,host = remspec.rsplit('@',1)
544 tmp_known_hosts = None
546 args = ['scp', '-q', '-p', '-C',
547 # Speed up transfer using the blowfish cipher specification, which is
548 # faster than the default one (3des)
550 # Don't bother with localhost. Makes test easier
551 '-o', 'NoHostAuthenticationForLocalhost=yes',
552 '-o', 'ConnectTimeout=60',
553 '-o', 'ConnectionAttempts=3',
554 '-o', 'ServerAliveInterval=30',
555 '-o', 'TCPKeepAlive=yes' ]
558 args.append('-P%d' % port)
562 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
564 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
565 args.extend(['-o', proxycommand])
571 args.extend(('-i', identity))
574 # Create a temporary server key file
575 tmp_known_hosts = make_server_key_args(server_key, host, port)
576 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
578 if not strict_host_checking:
579 # Do not check for Host key. Unsafe.
580 args.extend(['-o', 'StrictHostKeyChecking=no'])
582 if isinstance(source,list):
585 if openssh_has_persist():
587 '-o', 'ControlMaster=auto',
588 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
594 for x in xrange(retry):
595 # connects to the remote host and starts a remote connection
596 proc = subprocess.Popen(args,
597 stdout = subprocess.PIPE,
598 stdin = subprocess.PIPE,
599 stderr = subprocess.PIPE)
601 # attach tempfile object to the process, to make sure the file stays
602 # alive until the process is finished with it
603 proc._known_hosts = tmp_known_hosts
606 (out, err) = proc.communicate()
607 eintr_retry(proc.wait)()
608 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
609 log(msg, logging.DEBUG, out, err)
613 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
614 t, x, host, " ".join(args))
615 log(msg, logging.DEBUG)
621 except RuntimeError, e:
622 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
623 log(msg, logging.DEBUG, out, err)
629 return ((out, err), proc)
631 def rspawn(command, pidfile,
632 stdout = '/dev/null',
648 Spawn a remote command such that it will continue working asynchronously in
651 :param command: The command to run, it should be a single line.
654 :param pidfile: Path to a file where to store the pid and ppid of the
658 :param stdout: Path to file to redirect standard output.
659 The default value is /dev/null
662 :param stderr: Path to file to redirect standard error.
663 If the special STDOUT value is used, stderr will
664 be redirected to the same file as stdout
667 :param stdin: Path to a file with input to be piped into the command's standard input
670 :param home: Path to working directory folder.
671 It is assumed to exist unless the create_home flag is set.
674 :param create_home: Flag to force creation of the home folder before
676 :type create_home: bool
678 :param sudo: Flag forcing execution with sudo user
683 (stdout, stderr), process
685 Of the spawning process, which only captures errors at spawning time.
686 Usually only useful for diagnostics.
688 # Start process in a "daemonized" way, using nohup and heavy
689 # stdin/out redirection to avoid connection issues
693 stderr = ' ' + stderr
695 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
697 'pidfile' : shell_escape(pidfile),
703 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
704 'command' : shell_escape(daemon_command),
705 'sudo' : 'sudo -S' if sudo else '',
706 'pidfile' : shell_escape(pidfile),
707 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
708 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
711 (out,err),proc = rexec(
720 server_key = server_key,
725 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
727 return ((out, err), proc)
740 Returns the pid and ppid of a process from a remote file where the
741 information was stored.
743 :param home: Path to directory where the pidfile is located
746 :param pidfile: Name of file containing the pid information
751 A (pid, ppid) tuple useful for calling rstatus and rkill,
752 or None if the pidfile isn't valid yet (can happen when the process is starting up)
755 (out,err),proc = rexec(
756 "cat %(pidfile)s" % {
766 server_key = server_key
774 return map(int,out.strip().split(' ',1))
776 # Ignore, many ways to fail that don't matter that much
780 def rstatus(pid, ppid,
790 Returns a code representing the status of a remote process
792 :param pid: Process id of the process
795 :param ppid: Parent process id of process
798 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
801 (out,err),proc = rexec(
802 # Check only by pid. pid+ppid does not always work (especially with sudo)
803 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
814 server_key = server_key
818 return ProcStatus.NOT_STARTED
822 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
825 status = (out.strip() == 'wait')
827 return ProcStatus.NOT_STARTED
828 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
843 Sends a kill signal to a remote process.
845 First tries a SIGTERM, and if the process does not end in 10 seconds,
848 :param pid: Process id of process to be killed
851 :param ppid: Parent process id of process to be killed
854 :param sudo: Flag indicating if sudo should be used to kill the process
858 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
860 SUBKILL="%(subkill)s" ;
861 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
862 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
863 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
865 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
868 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
869 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
873 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
874 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
875 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
879 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
881 (out,err),proc = rexec(
885 'sudo' : 'sudo -S' if sudo else '',
895 server_key = server_key
898 # wait, don't leave zombies around
901 return (out, err), proc
904 def _communicate(proc, input, timeout=None, err_on_timeout=True):
907 stdout = None # Return
908 stderr = None # Return
912 if timeout is not None:
913 timelimit = time.time() + timeout
914 killtime = timelimit + 4
915 bailtime = timelimit + 4
918 # Flush stdio buffer. This might block, if the user has
919 # been writing to .stdin in an uncontrolled fashion.
922 write_set.append(proc.stdin)
927 read_set.append(proc.stdout)
931 read_set.append(proc.stderr)
935 while read_set or write_set:
936 if timeout is not None:
937 curtime = time.time()
938 if timeout is None or curtime > timelimit:
939 if curtime > bailtime:
941 elif curtime > killtime:
942 signum = signal.SIGKILL
944 signum = signal.SIGTERM
946 os.kill(proc.pid, signum)
949 select_timeout = timelimit - curtime + 0.1
953 if select_timeout > 1.0:
957 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
958 except select.error,e:
964 if not rlist and not wlist and not xlist and proc.poll() is not None:
965 # timeout and process exited, say bye
968 if proc.stdin in wlist:
969 # When select has indicated that the file is writable,
970 # we can write up to PIPE_BUF bytes without risk of
971 # blocking. POSIX defines PIPE_BUF >= 512
972 bytes_written = os.write(proc.stdin.fileno(),
973 buffer(input, input_offset, 512))
974 input_offset += bytes_written
976 if input_offset >= len(input):
978 write_set.remove(proc.stdin)
980 if proc.stdout in rlist:
981 data = os.read(proc.stdout.fileno(), 1024)
984 read_set.remove(proc.stdout)
987 if proc.stderr in rlist:
988 data = os.read(proc.stderr.fileno(), 1024)
991 read_set.remove(proc.stderr)
994 # All data exchanged. Translate lists into strings.
995 if stdout is not None:
996 stdout = ''.join(stdout)
997 if stderr is not None:
998 stderr = ''.join(stderr)
1000 # Translate newlines, if requested. We cannot let the file
1001 # object do the translation: It is based on stdio, which is
1002 # impossible to combine with select (unless forcing no
1004 if proc.universal_newlines and hasattr(file, 'newlines'):
1006 stdout = proc._translate_newlines(stdout)
1008 stderr = proc._translate_newlines(stderr)
1010 if killed and err_on_timeout:
1011 errcode = proc.poll()
1012 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1018 return (stdout, stderr)