2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Claudio Freire <claudio-daniel.freire@inria.fr>
20 ## TODO: This code needs reviewing !!!
39 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
42 # logging.getLogger('sshfuncs').setLevel(logging.DEBUG)
43 logger = logging.getLogger("sshfuncs")
45 def log(msg, level = logging.DEBUG, out = None, err = None):
47 msg += " - OUT is {} long".format(len(out))
49 msg += " - ERR is {} long".format(len(err))
50 logger.log(level, msg)
52 if hasattr(os, "devnull"):
55 DEV_NULL = "/dev/null"
# Strings that fully match this character class are considered safe to hand
# to a shell unquoted; the escaping helper returns such strings unchanged.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*$')
61 Special value that when given to rspawn in stderr causes stderr to
62 redirect to whatever stdout was redirected to.
68 Codes for status of remote spawned process
70 # Process is still running
76 # Process hasn't started running yet (this should be very rare)
# Memo of host -> resolved address filled in by gethostbyname(); new entries
# are resolved and stored while holding hostbyname_cache_lock.
hostbyname_cache = {}
hostbyname_cache_lock = threading.Lock()
82 def resolve_hostname(host):
85 if host in ["localhost", "127.0.0.1", "::1"]:
86 extras = {} if PY2 else {'universal_newlines' : True}
90 stdout = subprocess.PIPE,
91 stderr = subprocess.PIPE,
94 stdout, stderr = p.communicate()
95 m = _re_inet.findall(stdout)
96 ip = m[0][1].split("/")[0]
98 ip = socket.gethostbyname(host)
102 def gethostbyname(host):
103 global hostbyname_cache
104 global hostbyname_cache_lock
106 hostbyname = hostbyname_cache.get(host)
108 with hostbyname_cache_lock:
109 hostbyname = resolve_hostname(host)
110 hostbyname_cache[host] = hostbyname
112 msg = " Added hostbyname {} - {} ".format(host, hostbyname)
113 log(msg, logging.DEBUG)
# Memoized answer of openssh_has_persist(); None means the local ssh client
# has not been probed yet.
OPENSSH_HAS_PERSIST = None
119 def openssh_has_persist():
120 """ The ssh_config options ControlMaster and ControlPersist allow to
121 reuse a same network connection for multiple ssh sessions. In this
122 way limitations on number of open ssh connections can be bypassed.
123 However, older versions of openSSH do not support this feature.
124 This function is used to determine if ssh connection persist features
127 global OPENSSH_HAS_PERSIST
128 if OPENSSH_HAS_PERSIST is None:
129 extras = {} if PY2 else {'universal_newlines' : True}
130 with open("/dev/null") as null:
131 proc = subprocess.Popen(
133 stdout = subprocess.PIPE,
134 stderr = subprocess.STDOUT,
138 out, err = proc.communicate()
141 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
142 OPENSSH_HAS_PERSIST = bool(vre.match(out))
143 return OPENSSH_HAS_PERSIST
145 def make_server_key_args(server_key, host, port):
146 """ Returns a reference to a temporary known_hosts file, to which
147 the server key has been added.
149 Make sure to hold onto the temp file reference until the process is
152 :param server_key: the server public key
153 :type server_key: str
155 :param host: the hostname
158 :param port: the ssh port
163 host = '{}:{}'.format(host, str(port))
165 # Create a temporary server key file
166 tmp_known_hosts = tempfile.NamedTemporaryFile()
168 hostbyname = gethostbyname(host)
170 # Add the intended host key
171 tmp_known_hosts.write('{},{} {}\n'.format(host, hostbyname, server_key))
173 # If we're not in strict mode, add user-configured keys
174 if os.environ.get('NEPI_STRICT_AUTH_MODE', "").lower() not in ('1', 'true', 'on'):
175 user_hosts_path = '{}/.ssh/known_hosts'.format(os.environ.get('HOME', ""))
176 if os.access(user_hosts_path, os.R_OK):
177 with open(user_hosts_path, "r") as f:
178 tmp_known_hosts.write(f.read())
180 tmp_known_hosts.flush()
182 return tmp_known_hosts
184 def make_control_path(agent, forward_x11):
185 ctrl_path = "/tmp/nepi_ssh"
193 ctrl_path += "-%r@%h:%p"
198 """ Escapes strings so that they are safe to use as command-line
200 if SHELL_SAFE.match(s):
201 # safe string - no escaping needed
204 # unsafe string - escape
206 if (32 <= ord(c) < 127 or c in ('\r', '\n', '\t')) and c not in ("'", '"'):
209 return "'$'\\x{:02x}''".format(ord(c))
210 s = ''.join(map(escape, s))
211 return "'{}'".format(s)
213 def eintr_retry(func):
214 """Retries a function invocation when a EINTR occurs"""
216 @functools.wraps(func)
218 retry = kw.pop("_retry", False)
219 for i in range(0 if retry else 4):
221 return func(*p, **kw)
222 except (select.error, socket.error) as args:
223 if args[0] == errno.EINTR:
228 if e.errno == errno.EINTR:
233 return func(*p, **kw)
236 def rexec(command, host, user,
246 connect_timeout = 30,
251 strict_host_checking = True):
253 Executes a remote command, returns ((stdout, stderr), process)
256 tmp_known_hosts = None
258 hostip = gethostbyname(host)
262 # Don't bother with localhost. Makes test easier
263 '-o', 'NoHostAuthenticationForLocalhost=yes',
264 '-o', 'ConnectTimeout={}'.format(connect_timeout),
265 '-o', 'ConnectionAttempts=3',
266 '-o', 'ServerAliveInterval=30',
267 '-o', 'TCPKeepAlive=yes',
268 '-o', 'Batchmode=yes',
269 '-l', user, hostip or host]
271 if persistent and openssh_has_persist():
273 '-o', 'ControlMaster=auto',
274 '-o', 'ControlPath={}'.format(make_control_path(agent, forward_x11)),
275 '-o', 'ControlPersist=60' ])
277 if not strict_host_checking:
278 # Do not check for Host key. Unsafe.
279 args.extend(['-o', 'StrictHostKeyChecking=no'])
282 proxycommand = _proxy_command(gw, gwuser, identity)
283 args.extend(['-o', proxycommand])
289 args.append('-p{}'.format(port))
292 identity = os.path.expanduser(identity)
293 args.extend(('-i', identity))
303 # Create a temporary server key file
304 tmp_known_hosts = make_server_key_args(server_key, host, port)
305 args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
308 command = "sudo " + command
312 log_msg = " rexec - host {} - command {} ".format(host, pretty_args(args))
314 stdout = stderr = stdin = subprocess.PIPE
316 stdout = stderr = stdin = None
318 return _retry_rexec(args, log_msg,
319 stdin = stdin, stdout = stdout, stderr = stderr,
322 tmp_known_hosts = tmp_known_hosts,
325 def rcopy(source, dest,
333 strict_host_checking = True):
335 Copies from/to remote sites.
337 Source and destination should have the user and host encoded
340 Source can be a list of files to copy to a single destination,
341 (in which case it is advised that the destination be a folder),
342 or a single file in a string.
345 # Parse destination as <user>@<server>:<path>
346 if isinstance(dest, str) and ':' in dest:
347 remspec, path = dest.split(':', 1)
348 elif isinstance(source, str) and ':' in source:
349 remspec, path = source.split(':', 1)
351 raise ValueError("Both endpoints cannot be local")
352 user, host = remspec.rsplit('@', 1)
355 tmp_known_hosts = None
357 args = ['scp', '-q', '-p', '-C',
358 # 2015-06-01 Thierry: I am commenting off blowfish
359 # as this is not available on a plain ubuntu 15.04 install
360 # this IMHO is too fragile, shoud be something the user
361 # decides explicitly (so he is at least aware of that dependency)
362 # Speed up transfer using blowfish cypher specification which is
363 # faster than the default one (3des)
365 # Don't bother with localhost. Makes test easier
366 '-o', 'NoHostAuthenticationForLocalhost=yes',
367 '-o', 'ConnectTimeout=60',
368 '-o', 'ConnectionAttempts=3',
369 '-o', 'ServerAliveInterval=30',
370 '-o', 'TCPKeepAlive=yes' ]
373 args.append('-P{}'.format(port))
376 proxycommand = _proxy_command(gw, gwuser, identity)
377 args.extend(['-o', proxycommand])
383 identity = os.path.expanduser(identity)
384 args.extend(('-i', identity))
387 # Create a temporary server key file
388 tmp_known_hosts = make_server_key_args(server_key, host, port)
389 args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
391 if not strict_host_checking:
392 # Do not check for Host key. Unsafe.
393 args.extend(['-o', 'StrictHostKeyChecking=no'])
395 if isinstance(source, list):
398 if openssh_has_persist():
400 '-o', 'ControlMaster=auto',
401 '-o', 'ControlPath={}'.format(make_control_path(False, False))
405 if isinstance(dest, list):
410 log_msg = " rcopy - host {} - command {} ".format(host, pretty_args(args))
412 return _retry_rexec(args, log_msg, env = None, retry = retry,
413 tmp_known_hosts = tmp_known_hosts,
416 def rspawn(command, pidfile,
417 stdout = '/dev/null',
432 strict_host_checking = True):
434 Spawn a remote command such that it will continue working asynchronously in
437 :param command: The command to run, it should be a single line.
440 :param pidfile: Path to a file where to store the pid and ppid of the
444 :param stdout: Path to file to redirect standard output.
445 The default value is /dev/null
448 :param stderr: Path to file to redirect standard error.
449 If the special STDOUT value is used, stderr will
450 be redirected to the same file as stdout
453 :param stdin: Path to a file with input to be piped into the command's standard input
456 :param home: Path to working directory folder.
457 It is assumed to exist unless the create_home flag is set.
460 :param create_home: Flag to force creation of the home folder before
462 :type create_home: bool
464 :param sudo: Flag forcing execution with sudo user
469 (stdout, stderr), process
471 Of the spawning process, which only captures errors at spawning time.
472 Usually only useful for diagnostics.
474 # Start process in a "daemonized" way, using nohup and heavy
475 # stdin/out redirection to avoid connection issues
479 stderr = ' ' + stderr
481 daemon_command = '{{ {{ {command} > {stdout} 2>{stderr} < {stdin} & }} ; echo $! 1 > {pidfile} ; }}'\
482 .format(command = command,
483 pidfile = shell_escape(pidfile),
488 cmd = "{create}{gohome} rm -f {pidfile} ; {sudo} nohup bash -c {command} "\
489 .format(command = shell_escape(daemon_command),
490 sudo = 'sudo -S' if sudo else '',
491 pidfile = shell_escape(pidfile),
492 gohome = 'cd {} ; '.format(shell_escape(home)) if home else '',
493 create = 'mkdir -p {} ; '.format(shell_escape(home)) if create_home and home else '')
495 (out, err), proc = rexec(
504 server_key = server_key,
506 strict_host_checking = strict_host_checking ,
510 raise RuntimeError("Failed to set up application on host {}: {} {}".format(host, out, err))
512 return ((out, err), proc)
524 strict_host_checking = True):
526 Returns the pid and ppid of a process from a remote file where the
527 information was stored.
529 :param home: Path to directory where the pidfile is located
532 :param pidfile: Name of file containing the pid information
537 A (pid, ppid) tuple useful for calling rstatus and rkill,
538 or None if the pidfile isn't valid yet (can happen when process is staring up)
541 (out, err), proc = rexec(
542 "cat {}".format(pidfile),
550 server_key = server_key,
551 strict_host_checking = strict_host_checking
559 return [ int(x) for x in out.strip().split(' ', 1) ]
561 # Ignore, many ways to fail that don't matter that much
565 def rstatus(pid, ppid,
574 strict_host_checking = True):
576 Returns a code representing the the status of a remote process
578 :param pid: Process id of the process
581 :param ppid: Parent process id of process
584 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
587 (out, err), proc = rexec(
588 # Check only by pid. pid+ppid does not always work (especially with sudo)
589 " (( ps --pid {pid} -o pid | grep -c {pid} && echo 'wait') || echo 'done' ) | tail -n 1"\
598 server_key = server_key,
599 strict_host_checking = strict_host_checking
603 return ProcStatus.NOT_STARTED
607 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
610 status = (out.strip() == 'wait')
612 return ProcStatus.NOT_STARTED
613 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
627 strict_host_checking = True):
629 Sends a kill signal to a remote process.
631 First tries a SIGTERM, and if the process does not end in 10 seconds,
634 :param pid: Process id of process to be killed
637 :param ppid: Parent process id of process to be killed
640 :param sudo: Flag indicating if sudo should be used to kill the process
644 subkill = "$(ps --ppid {} -o pid h)".format(pid)
646 SUBKILL="{subkill}" ;
647 {sudo} kill -- -{pid} $SUBKILL || /bin/true
648 {sudo} kill {pid} $SUBKILL || /bin/true
649 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
651 if [ `ps --pid {pid} -o pid | grep -c {pid}` == '0' ]; then
654 {sudo} kill -- -{pid} $SUBKILL || /bin/true
655 {sudo} kill {pid} $SUBKILL || /bin/true
659 if [ `ps --pid {pid} -o pid | grep -c {pid}` != '0' ]; then
660 {sudo} kill -9 -- -{pid} $SUBKILL || /bin/true
661 {sudo} kill -9 {pid} $SUBKILL || /bin/true
665 cmd_format = "( {} ) >/dev/null 2>/dev/null </dev/null &".format(cmd_format)
667 sudo = 'sudo -S' if sudo else ''
668 (out, err), proc = rexec(
669 cmd_format.format(**locals()),
677 server_key = server_key,
678 strict_host_checking = strict_host_checking
681 # wait, don't leave zombies around
684 return (out, err), proc
def pretty_arg(shell_arg):
    """
    Formats a single command-line argument for display in a logged command.

    Arguments that are empty, or that contain whitespace or quote characters,
    are wrapped in single quotes; embedded single quotes are written as
    '"'"' so the logged command can be pasted into a POSIX shell. Any other
    argument is returned unchanged.

    :param shell_arg: the argument to format
    :type shell_arg: str

    :rtype: str
    """
    # Plain arguments are shown as-is to keep the log readable.
    if shell_arg and not any(c.isspace() or c in "'\"" for c in shell_arg):
        return shell_arg
    # Close the quote, emit a double-quoted ', and reopen the quote.
    return "'{}'".format(shell_arg.replace("'", "'\"'\"'"))


def pretty_args(shell_args):
    """
    Joins a list of command-line arguments into one display string,
    quoting each argument with pretty_arg().

    :param shell_args: the arguments (e.g. an ssh/scp argv list)
    :type shell_args: list of str

    :rtype: str
    """
    return " ".join(pretty_arg(shell_arg) for shell_arg in shell_args)
692 def _retry_rexec(args,
694 stdout = subprocess.PIPE,
695 stdin = subprocess.PIPE,
696 stderr = subprocess.PIPE,
699 tmp_known_hosts = None,
702 for x in range(retry):
703 # display command actually invoked when debug is turned on
704 message = pretty_args(args)
705 log("sshfuncs: invoking {}".format(message), logging.DEBUG)
706 extras = {} if PY2 else {'universal_newlines' : True}
707 # connects to the remote host and starts a remote connection
708 proc = subprocess.Popen(
716 # attach tempfile object to the process, to make sure the file stays
717 # alive until the process is finished with it
718 proc._known_hosts = tmp_known_hosts
720 # The argument block == False forces to rexec to return immediately,
725 #(out, err) = proc.communicate()
726 # The method communicate was re implemented for performance issues
727 # when using python subprocess communicate method the ssh commands
728 # last one minute each
729 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
730 out, err = _communicate(proc, input=None)
731 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
734 out = proc.stdout.read()
735 if proc.poll() and stderr:
736 err = proc.stderr.read()
738 log(log_msg, logging.DEBUG, out, err)
743 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
744 # SSH error, can safely retry
747 # Probably timed out or plain failed but can retry
752 msg = "SLEEPING {} ... ATEMPT {} - command {} "\
753 .format(t, x, " ".join(args))
754 log(msg, logging.DEBUG)
759 except RuntimeError as e:
760 msg = " rexec EXCEPTION - TIMEOUT -> {} \n {}".format(e.args, log_msg)
761 log(msg, logging.DEBUG, out, err)
767 return ((out, err), proc)
770 # Don't remove. The method communicate was re implemented for performance issues
771 def _communicate(proc, input, timeout=None, err_on_timeout=True):
774 stdout = None # Return
775 stderr = None # Return
779 if timeout is not None:
780 timelimit = time.time() + timeout
781 killtime = timelimit + 4
782 bailtime = timelimit + 4
785 # Flush stdio buffer. This might block, if the user has
786 # been writing to .stdin in an uncontrolled fashion.
789 write_set.append(proc.stdin)
794 read_set.append(proc.stdout)
798 read_set.append(proc.stderr)
802 while read_set or write_set:
803 if timeout is not None:
804 curtime = time.time()
805 if timeout is None or curtime > timelimit:
806 if curtime > bailtime:
808 elif curtime > killtime:
809 signum = signal.SIGKILL
811 signum = signal.SIGTERM
813 os.kill(proc.pid, signum)
816 select_timeout = timelimit - curtime + 0.1
820 if select_timeout > 1.0:
824 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
825 except select.error as e:
831 if not rlist and not wlist and not xlist and proc.poll() is not None:
832 # timeout and process exited, say bye
835 if proc.stdin in wlist:
836 # When select has indicated that the file is writable,
837 # we can write up to PIPE_BUF bytes without risk
838 # blocking. POSIX defines PIPE_BUF >= 512
839 bytes_written = os.write(proc.stdin.fileno(),
840 buffer(input, input_offset, 512))
841 input_offset += bytes_written
843 if input_offset >= len(input):
845 write_set.remove(proc.stdin)
847 # xxx possible distortion when upgrading to python3
848 # original py2 version used to do
849 # data = os.read(proc.stdout.fileno(), 1024)
850 # but this would return bytes, so..
851 if proc.stdout in rlist:
852 data = proc.stdout.read()
855 read_set.remove(proc.stdout)
857 log("have read {} bytes from stdout".format(len(stdout)))
860 if proc.stderr in rlist:
861 data = proc.stderr.read()
864 read_set.remove(proc.stderr)
866 log("have read {} bytes from stderr".format(len(stdout)))
868 # All data exchanged. Translate lists into strings.
869 if stdout is not None:
870 stdout = ''.join(stdout)
871 if stderr is not None:
872 stderr = ''.join(stderr)
874 # Translate newlines, if requested. We cannot let the file
875 # object do the translation: It is based on stdio, which is
876 # impossible to combine with select (unless forcing no
878 # this however seems to make no sense in the context of python3
880 if proc.universal_newlines and hasattr(file, 'newlines'):
882 stdout = proc._translate_newlines(stdout)
884 stderr = proc._translate_newlines(stderr)
886 if killed and err_on_timeout:
887 errcode = proc.poll()
888 raise RuntimeError("Operation timed out", errcode, stdout, stderr)
894 return (stdout, stderr)
896 def _proxy_command(gw, gwuser, gwidentity):
898 Constructs the SSH ProxyCommand option to add to the SSH command to connect
900 :param gw: SSH proxy hostname
903 :param gwuser: SSH proxy username
906 :param gwidentity: SSH proxy identity file
907 :type gwidentity: str
912 returns the SSH ProxyCommand option.
915 proxycommand = 'ProxyCommand=ssh -q '
917 proxycommand += '-i {} '.format(os.path.expanduser(gwidentity))
919 proxycommand += '{}'.format(gwuser)
922 proxycommand += '@{} -W %h:%p'.format(gw)