2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
36 logger = logging.getLogger("sshfuncs")
38 def log(msg, level, out = None, err = None):
40 msg += " - OUT: %s " % out
43 msg += " - ERROR: %s " % err
45 logger.log(level, msg)
48 if hasattr(os, "devnull"):
51 DEV_NULL = "/dev/null"
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
63 Codes for status of remote spawned process
65 # Process is still running
71 # Process hasn't started running yet (this should be very rare)
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
77 def gethostbyname(host):
78 global hostbyname_cache
79 global hostbyname_cache_lock
81 hostbyname = hostbyname_cache.get(host)
83 with hostbyname_cache_lock:
84 hostbyname = socket.gethostbyname(host)
85 hostbyname_cache[host] = hostbyname
87 msg = " Added hostbyname %s - %s " % (host, hostbyname)
88 log(msg, logging.DEBUG)
92 OPENSSH_HAS_PERSIST = None
94 def openssh_has_persist():
95 """ The ssh_config options ControlMaster and ControlPersist allow
96 reusing the same network connection for multiple ssh sessions. In this
97 way, limits on the number of open ssh connections can be bypassed.
98 However, older versions of openSSH do not support this feature.
99 This function is used to determine if ssh connection persist features
102 global OPENSSH_HAS_PERSIST
103 if OPENSSH_HAS_PERSIST is None:
104 proc = subprocess.Popen(["ssh","-v"],
105 stdout = subprocess.PIPE,
106 stderr = subprocess.STDOUT,
107 stdin = open("/dev/null","r") )
108 out,err = proc.communicate()
111 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112 OPENSSH_HAS_PERSIST = bool(vre.match(out))
113 return OPENSSH_HAS_PERSIST
115 def make_server_key_args(server_key, host, port):
116 """ Returns a reference to a temporary known_hosts file, to which
117 the server key has been added.
119 Make sure to hold onto the temp file reference until the process is
122 :param server_key: the server public key
123 :type server_key: str
125 :param host: the hostname
128 :param port: the ssh port
133 host = '%s:%s' % (host, str(port))
135 # Create a temporary server key file
136 tmp_known_hosts = tempfile.NamedTemporaryFile()
138 hostbyname = gethostbyname(host)
140 # Add the intended host key
141 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
143 # If we're not in strict mode, add user-configured keys
144 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146 if os.access(user_hosts_path, os.R_OK):
147 f = open(user_hosts_path, "r")
148 tmp_known_hosts.write(f.read())
151 tmp_known_hosts.flush()
153 return tmp_known_hosts
155 def make_control_path(agent, forward_x11):
156 ctrl_path = "/tmp/nepi_ssh"
164 ctrl_path += "-%r@%h:%p"
169 """ Escapes strings so that they are safe to use as command-line
171 if SHELL_SAFE.match(s):
172 # safe string - no escaping needed
175 # unsafe string - escape
177 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
180 return "'$'\\x%02x''" % (ord(c),)
181 s = ''.join(map(escp,s))
184 def eintr_retry(func):
185 """Retries a function invocation when an EINTR occurs"""
187 @functools.wraps(func)
189 retry = kw.pop("_retry", False)
190 for i in xrange(0 if retry else 4):
192 return func(*p, **kw)
193 except (select.error, socket.error), args:
194 if args[0] == errno.EINTR:
199 if e.errno == errno.EINTR:
204 return func(*p, **kw)
207 def rexec(command, host, user,
220 err_on_timeout = True,
221 connect_timeout = 30,
225 strict_host_checking = True):
227 Executes a remote command, returns ((stdout,stderr),process)
230 tmp_known_hosts = None
232 hostip = gethostbyname(host)
236 # Don't bother with localhost. Makes test easier
237 '-o', 'NoHostAuthenticationForLocalhost=yes',
238 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
239 '-o', 'ConnectionAttempts=3',
240 '-o', 'ServerAliveInterval=30',
241 '-o', 'TCPKeepAlive=yes',
242 '-o', 'Batchmode=yes',
243 '-l', user, hostip or host]
245 if persistent and openssh_has_persist():
247 '-o', 'ControlMaster=auto',
248 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
249 '-o', 'ControlPersist=60' ])
251 if not strict_host_checking:
252 # Do not check for Host key. Unsafe.
253 args.extend(['-o', 'StrictHostKeyChecking=no'])
257 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
259 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
260 args.extend(['-o', proxycommand])
266 args.append('-p%d' % port)
269 args.extend(('-i', identity))
279 # Create a temporary server key file
280 tmp_known_hosts = make_server_key_args(server_key, host, port)
281 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
284 command = "sudo " + command
288 for x in xrange(retry):
289 # connects to the remote host and starts a remote connection
290 proc = subprocess.Popen(args,
292 stdout = subprocess.PIPE,
293 stdin = subprocess.PIPE,
294 stderr = subprocess.PIPE)
296 # attach tempfile object to the process, to make sure the file stays
297 # alive until the process is finished with it
298 proc._known_hosts = tmp_known_hosts
300 # by default, rexec calls _communicate which will block
301 # until the process has exited. The argument block == False
302 # forces rexec to return immediately, without blocking
305 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
307 err = proc.stderr.read()
308 out = proc.stdout.read()
310 msg = " rexec - host %s - command %s " % (host, " ".join(args))
311 log(msg, logging.DEBUG, out, err)
316 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
317 # SSH error, can safely retry
320 # Probably timed out or plain failed but can retry
325 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
326 t, x, host, " ".join(args))
327 log(msg, logging.DEBUG)
332 except RuntimeError, e:
333 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
334 log(msg, logging.DEBUG, out, err)
340 return ((out, err), proc)
342 def rcopy(source, dest,
351 strict_host_checking = True):
353 Copies from/to remote sites.
355 Source and destination should have the user and host encoded
358 If source is a file object, a special mode will be used to
359 create the remote file with the same contents.
361 If dest is a file object, the remote file (source) will be
362 read and written into dest.
364 In these modes, recursive cannot be True.
366 Source can be a list of files to copy to a single destination,
367 in which case it is advised that the destination be a folder.
370 if isinstance(source, file) and source.tell() == 0:
372 elif hasattr(source, 'read'):
373 tmp = tempfile.NamedTemporaryFile()
375 buf = source.read(65536)
383 if isinstance(source, file) or isinstance(dest, file) \
384 or hasattr(source, 'read') or hasattr(dest, 'write'):
387 # Parse source/destination as <user>@<server>:<path>
388 if isinstance(dest, basestring) and ':' in dest:
389 remspec, path = dest.split(':',1)
390 elif isinstance(source, basestring) and ':' in source:
391 remspec, path = source.split(':',1)
393 raise ValueError, "Both endpoints cannot be local"
394 user,host = remspec.rsplit('@',1)
396 tmp_known_hosts = None
398 hostip = gethostbyname(host)
401 args = ['ssh', '-l', user, '-C',
402 # Don't bother with localhost. Makes test easier
403 '-o', 'NoHostAuthenticationForLocalhost=yes',
404 '-o', 'ConnectTimeout=60',
405 '-o', 'ConnectionAttempts=3',
406 '-o', 'ServerAliveInterval=30',
407 '-o', 'TCPKeepAlive=yes',
410 if openssh_has_persist():
412 '-o', 'ControlMaster=auto',
413 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
414 '-o', 'ControlPersist=60' ])
418 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
420 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
421 args.extend(['-o', proxycommand])
424 args.append('-P%d' % port)
427 args.extend(('-i', identity))
430 # Create a temporary server key file
431 tmp_known_hosts = make_server_key_args(server_key, host, port)
432 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
434 if isinstance(source, file) or hasattr(source, 'read'):
435 args.append('cat > %s' % (shell_escape(path),))
436 elif isinstance(dest, file) or hasattr(dest, 'write'):
437 args.append('cat %s' % (shell_escape(path),))
439 raise AssertionError, "Unreachable code reached! :-Q"
441 # connects to the remote host and starts a remote connection
442 if isinstance(source, file):
443 proc = subprocess.Popen(args,
444 stdout = open('/dev/null','w'),
445 stderr = subprocess.PIPE,
447 err = proc.stderr.read()
448 proc._known_hosts = tmp_known_hosts
449 eintr_retry(proc.wait)()
450 return ((None,err), proc)
451 elif isinstance(dest, file):
452 proc = subprocess.Popen(args,
453 stdout = open('/dev/null','w'),
454 stderr = subprocess.PIPE,
456 err = proc.stderr.read()
457 proc._known_hosts = tmp_known_hosts
458 eintr_retry(proc.wait)()
459 return ((None,err), proc)
460 elif hasattr(source, 'read'):
461 # file-like (but not file) source
462 proc = subprocess.Popen(args,
463 stdout = open('/dev/null','w'),
464 stderr = subprocess.PIPE,
465 stdin = subprocess.PIPE)
471 buf = source.read(4096)
476 rdrdy, wrdy, broken = select.select(
479 [proc.stderr,proc.stdin])
481 if proc.stderr in rdrdy:
482 # use os.read for fully unbuffered behavior
483 err.append(os.read(proc.stderr.fileno(), 4096))
485 if proc.stdin in wrdy:
486 proc.stdin.write(buf)
492 err.append(proc.stderr.read())
494 proc._known_hosts = tmp_known_hosts
495 eintr_retry(proc.wait)()
496 return ((None,''.join(err)), proc)
497 elif hasattr(dest, 'write'):
498 # file-like (but not file) dest
499 proc = subprocess.Popen(args,
500 stdout = subprocess.PIPE,
501 stderr = subprocess.PIPE,
502 stdin = open('/dev/null','w'))
507 rdrdy, wrdy, broken = select.select(
508 [proc.stderr, proc.stdout],
510 [proc.stderr, proc.stdout])
512 if proc.stderr in rdrdy:
513 # use os.read for fully unbuffered behavior
514 err.append(os.read(proc.stderr.fileno(), 4096))
516 if proc.stdout in rdrdy:
517 # use os.read for fully unbuffered behavior
518 buf = os.read(proc.stdout.fileno(), 4096)
527 err.append(proc.stderr.read())
529 proc._known_hosts = tmp_known_hosts
530 eintr_retry(proc.wait)()
531 return ((None,''.join(err)), proc)
533 raise AssertionError, "Unreachable code reached! :-Q"
535 # Parse destination as <user>@<server>:<path>
536 if isinstance(dest, basestring) and ':' in dest:
537 remspec, path = dest.split(':',1)
538 elif isinstance(source, basestring) and ':' in source:
539 remspec, path = source.split(':',1)
541 raise ValueError, "Both endpoints cannot be local"
542 user,host = remspec.rsplit('@',1)
545 tmp_known_hosts = None
547 args = ['scp', '-q', '-p', '-C',
548 # Speed up transfer using blowfish cypher specification which is
549 # faster than the default one (3des)
551 # Don't bother with localhost. Makes test easier
552 '-o', 'NoHostAuthenticationForLocalhost=yes',
553 '-o', 'ConnectTimeout=60',
554 '-o', 'ConnectionAttempts=3',
555 '-o', 'ServerAliveInterval=30',
556 '-o', 'TCPKeepAlive=yes' ]
559 args.append('-P%d' % port)
563 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
565 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
566 args.extend(['-o', proxycommand])
572 args.extend(('-i', identity))
575 # Create a temporary server key file
576 tmp_known_hosts = make_server_key_args(server_key, host, port)
577 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
579 if not strict_host_checking:
580 # Do not check for Host key. Unsafe.
581 args.extend(['-o', 'StrictHostKeyChecking=no'])
584 source = source.split(' ')
586 if isinstance(source,list):
589 if openssh_has_persist():
591 '-o', 'ControlMaster=auto',
592 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
598 for x in xrange(retry):
599 # connects to the remote host and starts a remote connection
600 proc = subprocess.Popen(args,
601 stdout = subprocess.PIPE,
602 stdin = subprocess.PIPE,
603 stderr = subprocess.PIPE)
605 # attach tempfile object to the process, to make sure the file stays
606 # alive until the process is finished with it
607 proc._known_hosts = tmp_known_hosts
610 (out, err) = proc.communicate()
611 eintr_retry(proc.wait)()
612 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
613 log(msg, logging.DEBUG, out, err)
617 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
618 t, x, host, " ".join(args))
619 log(msg, logging.DEBUG)
625 except RuntimeError, e:
626 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
627 log(msg, logging.DEBUG, out, err)
633 return ((out, err), proc)
635 def rspawn(command, pidfile,
636 stdout = '/dev/null',
652 Spawn a remote command such that it will continue working asynchronously in
655 :param command: The command to run, it should be a single line.
658 :param pidfile: Path to a file where to store the pid and ppid of the
662 :param stdout: Path to file to redirect standard output.
663 The default value is /dev/null
666 :param stderr: Path to file to redirect standard error.
667 If the special STDOUT value is used, stderr will
668 be redirected to the same file as stdout
671 :param stdin: Path to a file with input to be piped into the command's standard input
674 :param home: Path to working directory folder.
675 It is assumed to exist unless the create_home flag is set.
678 :param create_home: Flag to force creation of the home folder before
680 :type create_home: bool
682 :param sudo: Flag forcing execution with sudo user
687 (stdout, stderr), process
689 Of the spawning process, which only captures errors at spawning time.
690 Usually only useful for diagnostics.
692 # Start process in a "daemonized" way, using nohup and heavy
693 # stdin/out redirection to avoid connection issues
697 stderr = ' ' + stderr
699 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
701 'pidfile' : shell_escape(pidfile),
707 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
708 'command' : shell_escape(daemon_command),
709 'sudo' : 'sudo -S' if sudo else '',
710 'pidfile' : shell_escape(pidfile),
711 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
712 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
715 (out,err),proc = rexec(
724 server_key = server_key,
729 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
731 return ((out, err), proc)
744 Returns the pid and ppid of a process from a remote file where the
745 information was stored.
747 :param home: Path to directory where the pidfile is located
750 :param pidfile: Name of file containing the pid information
755 A (pid, ppid) tuple useful for calling rstatus and rkill,
756 or None if the pidfile isn't valid yet (can happen when process is starting up)
759 (out,err),proc = rexec(
760 "cat %(pidfile)s" % {
770 server_key = server_key
778 return map(int,out.strip().split(' ',1))
780 # Ignore, many ways to fail that don't matter that much
784 def rstatus(pid, ppid,
794 Returns a code representing the status of a remote process
796 :param pid: Process id of the process
799 :param ppid: Parent process id of process
802 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
805 (out,err),proc = rexec(
806 # Check only by pid. pid+ppid does not always work (especially with sudo)
807 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
818 server_key = server_key
822 return ProcStatus.NOT_STARTED
826 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
829 status = (out.strip() == 'wait')
831 return ProcStatus.NOT_STARTED
832 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
847 Sends a kill signal to a remote process.
849 First tries a SIGTERM, and if the process does not end in 10 seconds,
852 :param pid: Process id of process to be killed
855 :param ppid: Parent process id of process to be killed
858 :param sudo: Flag indicating if sudo should be used to kill the process
862 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
864 SUBKILL="%(subkill)s" ;
865 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
866 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
867 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
869 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
872 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
873 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
877 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
878 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
879 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
883 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
885 (out,err),proc = rexec(
889 'sudo' : 'sudo -S' if sudo else '',
899 server_key = server_key
902 # wait, don't leave zombies around
905 return (out, err), proc
908 def _communicate(proc, input, timeout=None, err_on_timeout=True):
911 stdout = None # Return
912 stderr = None # Return
916 if timeout is not None:
917 timelimit = time.time() + timeout
918 killtime = timelimit + 4
919 bailtime = timelimit + 4
922 # Flush stdio buffer. This might block, if the user has
923 # been writing to .stdin in an uncontrolled fashion.
926 write_set.append(proc.stdin)
931 read_set.append(proc.stdout)
935 read_set.append(proc.stderr)
939 while read_set or write_set:
940 if timeout is not None:
941 curtime = time.time()
942 if timeout is None or curtime > timelimit:
943 if curtime > bailtime:
945 elif curtime > killtime:
946 signum = signal.SIGKILL
948 signum = signal.SIGTERM
950 os.kill(proc.pid, signum)
953 select_timeout = timelimit - curtime + 0.1
957 if select_timeout > 1.0:
961 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
962 except select.error,e:
968 if not rlist and not wlist and not xlist and proc.poll() is not None:
969 # timeout and process exited, say bye
972 if proc.stdin in wlist:
973 # When select has indicated that the file is writable,
974 # we can write up to PIPE_BUF bytes without risk of
975 # blocking. POSIX defines PIPE_BUF >= 512
976 bytes_written = os.write(proc.stdin.fileno(),
977 buffer(input, input_offset, 512))
978 input_offset += bytes_written
980 if input_offset >= len(input):
982 write_set.remove(proc.stdin)
984 if proc.stdout in rlist:
985 data = os.read(proc.stdout.fileno(), 1024)
988 read_set.remove(proc.stdout)
991 if proc.stderr in rlist:
992 data = os.read(proc.stderr.fileno(), 1024)
995 read_set.remove(proc.stderr)
998 # All data exchanged. Translate lists into strings.
999 if stdout is not None:
1000 stdout = ''.join(stdout)
1001 if stderr is not None:
1002 stderr = ''.join(stderr)
1004 # Translate newlines, if requested. We cannot let the file
1005 # object do the translation: It is based on stdio, which is
1006 # impossible to combine with select (unless forcing no
1008 if proc.universal_newlines and hasattr(file, 'newlines'):
1010 stdout = proc._translate_newlines(stdout)
1012 stderr = proc._translate_newlines(stderr)
1014 if killed and err_on_timeout:
1015 errcode = proc.poll()
1016 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1022 return (stdout, stderr)