2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Claudio Freire <claudio-daniel.freire@inria.fr>
20 ## TODO: This code needs reviewing !!!
39 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
41 logger = logging.getLogger("sshfuncs")
43 def log(msg, level = logging.DEBUG, out = None, err = None):
45 msg += " - OUT: {} ".format(out)
47 msg += " - ERROR: {} ".format(err)
48 logger.log(level, msg)
50 if hasattr(os, "devnull"):
53 DEV_NULL = "/dev/null"
# Strings made only of these characters are treated as needing no shell
# quoting.  The pattern is anchored with \Z rather than $, because $ also
# matches just before a trailing newline and would therefore accept a
# string ending in a newline character as "safe".
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')
59 Special value that when given to rspawn in stderr causes stderr to
60 redirect to whatever stdout was redirected to.
66 Codes for status of remote spawned process
68 # Process is still running
74 # Process hasn't started running yet (this should be very rare)
# Cache of resolved addresses keyed by host name, together with the lock
# that is held while a new entry is resolved and stored.
hostbyname_cache = {}
hostbyname_cache_lock = threading.Lock()
80 def resolve_hostname(host):
83 if host in ["localhost", "127.0.0.1", "::1"]:
84 extras = {} if PY2 else {'universal_newlines' : True}
88 stdout = subprocess.PIPE,
89 stderr = subprocess.PIPE,
92 stdout, stderr = p.communicate()
93 m = _re_inet.findall(stdout)
94 ip = m[0][1].split("/")[0]
96 ip = socket.gethostbyname(host)
100 def gethostbyname(host):
101 global hostbyname_cache
102 global hostbyname_cache_lock
104 hostbyname = hostbyname_cache.get(host)
106 with hostbyname_cache_lock:
107 hostbyname = resolve_hostname(host)
108 hostbyname_cache[host] = hostbyname
110 msg = " Added hostbyname {} - {} ".format(host, hostbyname)
111 log(msg, logging.DEBUG)
115 OPENSSH_HAS_PERSIST = None
117 def openssh_has_persist():
118 """ The ssh_config options ControlMaster and ControlPersist allow to
119 reuse the same network connection for multiple ssh sessions. In this
120 way, limits on the number of open ssh connections can be bypassed.
121 However, older versions of openSSH do not support this feature.
122 This function is used to determine if ssh connection persist features
125 global OPENSSH_HAS_PERSIST
126 if OPENSSH_HAS_PERSIST is None:
127 extras = {} if PY2 else {'universal_newlines' : True}
128 with open("/dev/null") as null:
129 proc = subprocess.Popen(
131 stdout = subprocess.PIPE,
132 stderr = subprocess.STDOUT,
136 out, err = proc.communicate()
139 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
140 OPENSSH_HAS_PERSIST = bool(vre.match(out))
141 return OPENSSH_HAS_PERSIST
143 def make_server_key_args(server_key, host, port):
144 """ Returns a reference to a temporary known_hosts file, to which
145 the server key has been added.
147 Make sure to hold onto the temp file reference until the process is
150 :param server_key: the server public key
151 :type server_key: str
153 :param host: the hostname
156 :param port: the ssh port
161 host = '{}:{}'.format(host, str(port))
163 # Create a temporary server key file
164 tmp_known_hosts = tempfile.NamedTemporaryFile()
166 hostbyname = gethostbyname(host)
168 # Add the intended host key
169 tmp_known_hosts.write('{},{} {}\n'.format(host, hostbyname, server_key))
171 # If we're not in strict mode, add user-configured keys
172 if os.environ.get('NEPI_STRICT_AUTH_MODE', "").lower() not in ('1', 'true', 'on'):
173 user_hosts_path = '{}/.ssh/known_hosts'.format(os.environ.get('HOME', ""))
174 if os.access(user_hosts_path, os.R_OK):
175 with open(user_hosts_path, "r") as f:
176 tmp_known_hosts.write(f.read())
178 tmp_known_hosts.flush()
180 return tmp_known_hosts
182 def make_control_path(agent, forward_x11):
183 ctrl_path = "/tmp/nepi_ssh"
191 ctrl_path += "-%r@%h:%p"
196 """ Escapes strings so that they are safe to use as command-line
198 if SHELL_SAFE.match(s):
199 # safe string - no escaping needed
202 # unsafe string - escape
204 if (32 <= ord(c) < 127 or c in ('\r', '\n', '\t')) and c not in ("'", '"'):
207 return "'$'\\x{:02x}''".format(ord(c))
208 s = ''.join(map(escape, s))
209 return "'{}'".format(s)
211 def eintr_retry(func):
212 """Retries a function invocation when a EINTR occurs"""
214 @functools.wraps(func)
216 retry = kw.pop("_retry", False)
217 for i in range(0 if retry else 4):
219 return func(*p, **kw)
220 except (select.error, socket.error) as args:
221 if args[0] == errno.EINTR:
226 if e.errno == errno.EINTR:
231 return func(*p, **kw)
234 def rexec(command, host, user,
244 connect_timeout = 30,
249 strict_host_checking = True):
251 Executes a remote command, returns ((stdout, stderr), process)
254 tmp_known_hosts = None
256 hostip = gethostbyname(host)
260 # Don't bother with localhost. Makes test easier
261 '-o', 'NoHostAuthenticationForLocalhost=yes',
262 '-o', 'ConnectTimeout={}'.format(connect_timeout),
263 '-o', 'ConnectionAttempts=3',
264 '-o', 'ServerAliveInterval=30',
265 '-o', 'TCPKeepAlive=yes',
266 '-o', 'Batchmode=yes',
267 '-l', user, hostip or host]
269 if persistent and openssh_has_persist():
271 '-o', 'ControlMaster=auto',
272 '-o', 'ControlPath={}'.format(make_control_path(agent, forward_x11)),
273 '-o', 'ControlPersist=60' ])
275 if not strict_host_checking:
276 # Do not check for Host key. Unsafe.
277 args.extend(['-o', 'StrictHostKeyChecking=no'])
280 proxycommand = _proxy_command(gw, gwuser, identity)
281 args.extend(['-o', proxycommand])
287 args.append('-p{}'.format(port))
290 identity = os.path.expanduser(identity)
291 args.extend(('-i', identity))
301 # Create a temporary server key file
302 tmp_known_hosts = make_server_key_args(server_key, host, port)
303 args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
306 command = "sudo " + command
310 log_msg = " rexec - host {} - command {} ".format(host, " ".join(map(str, args)))
312 stdout = stderr = stdin = subprocess.PIPE
314 stdout = stderr = stdin = None
316 return _retry_rexec(args, log_msg,
322 tmp_known_hosts = tmp_known_hosts,
325 def rcopy(source, dest,
333 strict_host_checking = True):
335 Copies from/to remote sites.
337 Source and destination should have the user and host encoded
340 Source can be a list of files to copy to a single destination,
341 (in which case it is advised that the destination be a folder),
342 or a single file in a string.
345 # Parse destination as <user>@<server>:<path>
346 if isinstance(dest, str) and ':' in dest:
347 remspec, path = dest.split(':', 1)
348 elif isinstance(source, str) and ':' in source:
349 remspec, path = source.split(':', 1)
351 raise ValueError("Both endpoints cannot be local")
352 user, host = remspec.rsplit('@', 1)
355 tmp_known_hosts = None
357 args = ['scp', '-q', '-p', '-C',
358 # 2015-06-01 Thierry: I am commenting off blowfish
359 # as this is not available on a plain ubuntu 15.04 install
360 # this IMHO is too fragile, should be something the user
361 # decides explicitly (so he is at least aware of that dependency)
362 # Speed up transfer using blowfish cypher specification which is
363 # faster than the default one (3des)
365 # Don't bother with localhost. Makes test easier
366 '-o', 'NoHostAuthenticationForLocalhost=yes',
367 '-o', 'ConnectTimeout=60',
368 '-o', 'ConnectionAttempts=3',
369 '-o', 'ServerAliveInterval=30',
370 '-o', 'TCPKeepAlive=yes' ]
373 args.append('-P{}'.format(port))
376 proxycommand = _proxy_command(gw, gwuser, identity)
377 args.extend(['-o', proxycommand])
383 identity = os.path.expanduser(identity)
384 args.extend(('-i', identity))
387 # Create a temporary server key file
388 tmp_known_hosts = make_server_key_args(server_key, host, port)
389 args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
391 if not strict_host_checking:
392 # Do not check for Host key. Unsafe.
393 args.extend(['-o', 'StrictHostKeyChecking=no'])
395 if isinstance(source, list):
398 if openssh_has_persist():
400 '-o', 'ControlMaster=auto',
401 '-o', 'ControlPath={}'.format(make_control_path(False, False))
405 if isinstance(dest, list):
410 log_msg = " rcopy - host {} - command {} ".format(host, " ".join(map(str, args)))
412 return _retry_rexec(args, log_msg, env = None, retry = retry,
413 tmp_known_hosts = tmp_known_hosts,
416 def rspawn(command, pidfile,
417 stdout = '/dev/null',
432 strict_host_checking = True):
434 Spawn a remote command such that it will continue working asynchronously in
437 :param command: The command to run, it should be a single line.
440 :param pidfile: Path to a file where to store the pid and ppid of the
444 :param stdout: Path to file to redirect standard output.
445 The default value is /dev/null
448 :param stderr: Path to file to redirect standard error.
449 If the special STDOUT value is used, stderr will
450 be redirected to the same file as stdout
453 :param stdin: Path to a file with input to be piped into the command's standard input
456 :param home: Path to working directory folder.
457 It is assumed to exist unless the create_home flag is set.
460 :param create_home: Flag to force creation of the home folder before
462 :type create_home: bool
464 :param sudo: Flag forcing execution with sudo user
469 (stdout, stderr), process
471 Of the spawning process, which only captures errors at spawning time.
472 Usually only useful for diagnostics.
474 # Start process in a "daemonized" way, using nohup and heavy
475 # stdin/out redirection to avoid connection issues
479 stderr = ' ' + stderr
481 daemon_command = '{{ {{ {command} > {stdout} 2>{stderr} < {stdin} & }} ; echo $! 1 > {pidfile} ; }}'\
482 .format(command = command,
483 pidfile = shell_escape(pidfile),
488 cmd = "{create}{gohome} rm -f {pidfile} ; {sudo} nohup bash -c {command} "\
489 .format(command = shell_escape(daemon_command),
490 sudo = 'sudo -S' if sudo else '',
491 pidfile = shell_escape(pidfile),
492 gohome = 'cd {} ; '.format(shell_escape(home)) if home else '',
493 create = 'mkdir -p {} ; '.format(shell_escape(home)) if create_home and home else '')
495 (out, err), proc = rexec(
504 server_key = server_key,
506 strict_host_checking = strict_host_checking ,
510 raise RuntimeError("Failed to set up application on host {}: {} {}".format(host, out, err))
512 return ((out, err), proc)
524 strict_host_checking = True):
526 Returns the pid and ppid of a process from a remote file where the
527 information was stored.
529 :param home: Path to directory where the pidfile is located
532 :param pidfile: Name of file containing the pid information
537 A (pid, ppid) tuple useful for calling rstatus and rkill,
538 or None if the pidfile isn't valid yet (can happen when process is starting up)
541 (out, err), proc = rexec(
542 "cat {}".format(pidfile),
550 server_key = server_key,
551 strict_host_checking = strict_host_checking
559 return [ int(x) for x in out.strip().split(' ', 1) ]
561 # Ignore, many ways to fail that don't matter that much
565 def rstatus(pid, ppid,
574 strict_host_checking = True):
576 Returns a code representing the status of a remote process
578 :param pid: Process id of the process
581 :param ppid: Parent process id of process
584 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
587 (out, err), proc = rexec(
588 # Check only by pid. pid+ppid does not always work (especially with sudo)
589 " (( ps --pid {pid} -o pid | grep -c {pid} && echo 'wait') || echo 'done' ) | tail -n 1"\
598 server_key = server_key,
599 strict_host_checking = strict_host_checking
603 return ProcStatus.NOT_STARTED
607 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
610 status = (out.strip() == 'wait')
612 return ProcStatus.NOT_STARTED
613 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
627 strict_host_checking = True):
629 Sends a kill signal to a remote process.
631 First tries a SIGTERM, and if the process does not end in 10 seconds,
634 :param pid: Process id of process to be killed
637 :param ppid: Parent process id of process to be killed
640 :param sudo: Flag indicating if sudo should be used to kill the process
644 subkill = "$(ps --ppid {} -o pid h)".format(pid)
646 SUBKILL="{subkill}" ;
647 {sudo} kill -- -{pid} $SUBKILL || /bin/true
648 {sudo} kill {pid} $SUBKILL || /bin/true
649 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
651 if [ `ps --pid {pid} -o pid | grep -c {pid}` == '0' ]; then
654 {sudo} kill -- -{pid} $SUBKILL || /bin/true
655 {sudo} kill {pid} $SUBKILL || /bin/true
659 if [ `ps --pid {pid} -o pid | grep -c {pid}` != '0' ]; then
660 {sudo} kill -9 -- -{pid} $SUBKILL || /bin/true
661 {sudo} kill -9 {pid} $SUBKILL || /bin/true
665 cmd_format = "( {} ) >/dev/null 2>/dev/null </dev/null &".format(cmd_format)
667 sudo = 'sudo -S' if sudo else ''
668 (out, err), proc = rexec(
669 cmd_format.format(**locals()),
677 server_key = server_key,
678 strict_host_checking = strict_host_checking
681 # wait, don't leave zombies around
684 return (out, err), proc
# wrap a shell arg in single quotes only when it contains a space
def pretty_arg(shell_arg):
    """Return shell_arg unchanged, or single-quoted if it contains a space.

    Only the space character triggers quoting; nothing inside the
    argument is escaped.
    """
    if ' ' in shell_arg:
        return "'{}'".format(shell_arg)
    return shell_arg
def pretty_args(shell_args):
    """Return the items of shell_args, each passed through pretty_arg,
    joined by single spaces."""
    return " ".join(map(pretty_arg, shell_args))
692 def _retry_rexec(args,
694 stdout = subprocess.PIPE,
695 stdin = subprocess.PIPE,
696 stderr = subprocess.PIPE,
699 tmp_known_hosts = None,
702 for x in range(retry):
703 # display command actually invoked when debug is turned on
704 message = pretty_args(args)
705 log("sshfuncs: invoking {}".format(message), logging.DEBUG)
706 extras = {} if PY2 else {'universal_newlines' : True}
707 # connects to the remote host and starts a remote connection
708 proc = subprocess.Popen(
716 # attach tempfile object to the process, to make sure the file stays
717 # alive until the process is finished with it
718 proc._known_hosts = tmp_known_hosts
720 # The argument block == False forces to rexec to return immediately,
725 #(out, err) = proc.communicate()
726 # The communicate method was reimplemented for performance reasons:
727 # when using Python's subprocess communicate method, the ssh commands
728 # last one minute each
729 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
730 out, err = _communicate(proc, input=None)
731 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
734 out = proc.stdout.read()
735 if proc.poll() and stderr:
736 err = proc.stderr.read()
738 log(log_msg, logging.DEBUG, out, err)
743 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
744 # SSH error, can safely retry
747 # Probably timed out or plain failed but can retry
752 msg = "SLEEPING {} ... ATEMPT {} - command {} "\
753 .format(t, x, " ".join(args))
754 log(msg, logging.DEBUG)
759 except RuntimeError as e:
760 msg = " rexec EXCEPTION - TIMEOUT -> {} \n {}".format(e.args, log_msg)
761 log(msg, logging.DEBUG, out, err)
767 return ((out, err), proc)
770 # Don't remove. The method communicate was re implemented for performance issues
771 def _communicate(proc, input, timeout=None, err_on_timeout=True):
774 stdout = None # Return
775 stderr = None # Return
779 if timeout is not None:
780 timelimit = time.time() + timeout
781 killtime = timelimit + 4
782 bailtime = timelimit + 4
785 # Flush stdio buffer. This might block, if the user has
786 # been writing to .stdin in an uncontrolled fashion.
789 write_set.append(proc.stdin)
794 read_set.append(proc.stdout)
798 read_set.append(proc.stderr)
802 while read_set or write_set:
803 if timeout is not None:
804 curtime = time.time()
805 if timeout is None or curtime > timelimit:
806 if curtime > bailtime:
808 elif curtime > killtime:
809 signum = signal.SIGKILL
811 signum = signal.SIGTERM
813 os.kill(proc.pid, signum)
816 select_timeout = timelimit - curtime + 0.1
820 if select_timeout > 1.0:
824 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
825 except select.error as e:
831 if not rlist and not wlist and not xlist and proc.poll() is not None:
832 # timeout and process exited, say bye
835 if proc.stdin in wlist:
836 # When select has indicated that the file is writable,
837 # we can write up to PIPE_BUF bytes without risk
838 # blocking. POSIX defines PIPE_BUF >= 512
839 bytes_written = os.write(proc.stdin.fileno(),
840 buffer(input, input_offset, 512))
841 input_offset += bytes_written
843 if input_offset >= len(input):
845 write_set.remove(proc.stdin)
847 # xxx possible distortion when upgrading to python3
848 # original py2 version used to do
849 # data = os.read(proc.stdout.fileno(), 1024)
850 # but this would return bytes, so..
851 if proc.stdout in rlist:
852 data = proc.stdout.read()
855 read_set.remove(proc.stdout)
859 if proc.stderr in rlist:
860 data = proc.stderr.read()
863 read_set.remove(proc.stderr)
866 # All data exchanged. Translate lists into strings.
867 if stdout is not None:
868 stdout = ''.join(stdout)
869 if stderr is not None:
870 stderr = ''.join(stderr)
872 # Translate newlines, if requested. We cannot let the file
873 # object do the translation: It is based on stdio, which is
874 # impossible to combine with select (unless forcing no
876 # this however seems to make no sense in the context of python3
878 if proc.universal_newlines and hasattr(file, 'newlines'):
880 stdout = proc._translate_newlines(stdout)
882 stderr = proc._translate_newlines(stderr)
884 if killed and err_on_timeout:
885 errcode = proc.poll()
886 raise RuntimeError("Operation timed out", errcode, stdout, stderr)
892 return (stdout, stderr)
894 def _proxy_command(gw, gwuser, gwidentity):
896 Constructs the SSH ProxyCommand option to add to the SSH command to connect
898 :param gw: SSH proxy hostname
901 :param gwuser: SSH proxy username
904 :param gwidentity: SSH proxy identity file
905 :type gwidentity: str
910 returns the SSH ProxyCommand option.
913 proxycommand = 'ProxyCommand=ssh -q '
915 proxycommand += '-i {} '.format(os.path.expanduser(gwidentity))
917 proxycommand += '{}'.format(gwuser)
920 proxycommand += '@{} -W %h:%p'.format(gw)