2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Claudio Freire <claudio-daniel.freire@inria.fr>
20 ## TODO: This code needs reviewing !!!
# Matches one interface-address line of the form
#   "<index>: <name> inet[6] <address>/<prefix> [brd <address>] ... scope global ..."
# (presumably one line of `ip -o addr` output -- confirm against the caller).
# Group 1 ("name") is the interface name, group 2 ("inet") the address
# together with its prefix length, group 3 the optional broadcast clause.
# The pattern is a raw string: "\d" and "\s" are regex escapes, not Python
# string escapes, and non-raw use triggers invalid-escape warnings.
_re_inet = re.compile(r"\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")

# Logger shared by every helper in this module.
logger = logging.getLogger("sshfuncs")
def log(msg, level = logging.DEBUG, out = None, err = None):
    """ Sends ``msg`` to the module logger, followed by the captured output.

    :param msg: the message to log
    :type msg: str

    :param level: logging level to emit the record at
    :type level: int

    :param out: captured standard output, appended to the message when not empty
    :param err: captured standard error, appended to the message when not empty
    """
    extra = []
    if out:
        extra.append(" - OUT: {} ".format(out))
    if err:
        extra.append(" - ERROR: {} ".format(err))
    logger.log(level, msg + "".join(extra))
# Path of the null device: whatever the os module advertises, or the POSIX
# path when this Python has no ``os.devnull`` attribute.
DEV_NULL = getattr(os, "devnull", "/dev/null")
# Strings made only of these characters can be handed to a shell unquoted.
# "+" rather than "*": an empty string must be quoted, otherwise the argument
# silently disappears from the command line.
# "\Z" rather than "$": "$" also matches just before a trailing newline, which
# would let "name\n" through as a safe string.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]+\Z')
59 Special value that when given to rspawn in stderr causes stderr to
60 redirect to whatever stdout was redirected to.
66 Codes for status of remote spawned process
68 # Process is still running
74 # Process hasn't started running yet (this should be very rare)
# Resolved addresses, keyed by the host string handed to gethostbyname()
hostbyname_cache = {}
# Held by gethostbyname() while it resolves a host and stores the answer
hostbyname_cache_lock = threading.Lock()
80 def resolve_hostname(host):
83 if host in ["localhost", "127.0.0.1", "::1"]:
84 extras = {} if PY2 else {'universal_newlines' : True}
88 stdout = subprocess.PIPE,
89 stderr = subprocess.PIPE,
92 stdout, stderr = p.communicate()
93 m = _re_inet.findall(stdout)
94 ip = m[0][1].split("/")[0]
96 ip = socket.gethostbyname(host)
def gethostbyname(host):
    """ Resolves ``host`` through resolve_hostname(), caching the answer.

    Answers are stored in ``hostbyname_cache`` under the exact string given
    in ``host``. A cached value that evaluates to false is treated as
    missing and resolved again.

    :param host: the hostname (or address) to resolve
    :type host: str

    :returns: whatever resolve_hostname() returned for ``host``
    """
    # Fast path: no locking needed for a plain dictionary read.
    hostbyname = hostbyname_cache.get(host)
    if hostbyname:
        return hostbyname

    with hostbyname_cache_lock:
        # Another thread may have resolved this host while we were waiting
        # for the lock; look again so the lookup happens only once.
        hostbyname = hostbyname_cache.get(host)
        if not hostbyname:
            hostbyname = resolve_hostname(host)
            hostbyname_cache[host] = hostbyname

            msg = " Added hostbyname {} - {} ".format(host, hostbyname)
            log(msg, logging.DEBUG)

    return hostbyname
115 OPENSSH_HAS_PERSIST = None
117 def openssh_has_persist():
118 """ The ssh_config options ControlMaster and ControlPersist allow to
119 reuse a same network connection for multiple ssh sessions. In this
120 way limitations on number of open ssh connections can be bypassed.
121 However, older versions of openSSH do not support this feature.
122 This function is used to determine if ssh connection persist features
125 global OPENSSH_HAS_PERSIST
126 if OPENSSH_HAS_PERSIST is None:
127 extras = {} if PY2 else {'universal_newlines' : True}
128 with open("/dev/null") as null:
129 proc = subprocess.Popen(
131 stdout = subprocess.PIPE,
132 stderr = subprocess.STDOUT,
136 out, err = proc.communicate()
139 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
140 OPENSSH_HAS_PERSIST = bool(vre.match(out))
141 return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it: the file is a NamedTemporaryFile and goes away as soon
    as the returned object is closed.

    Unless the NEPI_STRICT_AUTH_MODE environment variable is set to
    '1', 'true' or 'on', the content of the user's own
    $HOME/.ssh/known_hosts is appended after the server key entry.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port; an empty value means the default port
    :type port: str

    :returns: the open temporary file; pass its ``name`` to ssh
    """
    # Resolve the bare hostname: "host:port" is not something a resolver
    # can look up.
    hostbyname = gethostbyname(host)

    names = [host, hostbyname]
    if port and str(port) != '22':
        # known_hosts spells a non-standard port as "[host]:port"; entries
        # for the default port use the plain name.
        names = ['[{}]:{}'.format(name, port) for name in names]

    # Create a temporary server key file.
    # Text mode: the default ('w+b') rejects str on Python 3.
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode = 'w+')

    # Add the intended host key
    tmp_known_hosts.write('{} {}\n'.format(','.join(names), server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE', "").lower() not in ('1', 'true', 'on'):
        user_hosts_path = '{}/.ssh/known_hosts'.format(os.environ.get('HOME', ""))
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # Make the content visible to the ssh process that opens the file by name
    tmp_known_hosts.flush()

    return tmp_known_hosts
182 def make_control_path(agent, forward_x11):
183 ctrl_path = "/tmp/nepi_ssh"
191 ctrl_path += "-%r@%h:%p"
196 """ Escapes strings so that they are safe to use as command-line
198 if SHELL_SAFE.match(s):
199 # safe string - no escaping needed
202 # unsafe string - escape
204 if (32 <= ord(c) < 127 or c in ('\r', '\n', '\t')) and c not in ("'", '"'):
207 return "'$'\\x{:02x}''".format(ord(c))
208 s = ''.join(map(escape, s))
209 return "'{}'".format(s)
def eintr_retry(func):
    """Retries a function invocation when a EINTR occurs

    The wrapped function is attempted up to four times; an attempt that
    fails with EINTR is repeated, any other error is raised at once. After
    those attempts the function is called one last time and whatever it
    raises, EINTR included, reaches the caller.

    The wrapper consumes an optional ``_retry`` keyword argument (it is not
    forwarded to ``func``): when true, the retrying attempts are skipped and
    ``func`` is called exactly once.

    :param func: the callable to wrap
    :returns: a wrapper with the same name and docstring as ``func``
    """
    import functools

    def _is_eintr(exc):
        # OSError -- and on Python 3 its aliases select.error and
        # socket.error -- carries ``errno`` and cannot be subscripted; the
        # Python 2 select.error only has ``args``.
        code = getattr(exc, "errno", None)
        if code is None and exc.args:
            code = exc.args[0]
        return code == errno.EINTR

    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for _ in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as e:
                if not _is_eintr(e):
                    raise
        # Last attempt: nothing is caught here
        return func(*p, **kw)
    return rv
234 def rexec(command, host, user,
244 connect_timeout = 30,
249 strict_host_checking = True):
251 Executes a remote command, returns ((stdout, stderr), process)
254 tmp_known_hosts = None
256 hostip = gethostbyname(host)
260 # Don't bother with localhost. Makes test easier
261 '-o', 'NoHostAuthenticationForLocalhost=yes',
262 '-o', 'ConnectTimeout={}'.format(connect_timeout),
263 '-o', 'ConnectionAttempts=3',
264 '-o', 'ServerAliveInterval=30',
265 '-o', 'TCPKeepAlive=yes',
266 '-o', 'Batchmode=yes',
267 '-l', user, hostip or host]
269 if persistent and openssh_has_persist():
271 '-o', 'ControlMaster=auto',
272 '-o', 'ControlPath={}'.format(make_control_path(agent, forward_x11)),
273 '-o', 'ControlPersist=60' ])
275 if not strict_host_checking:
276 # Do not check for Host key. Unsafe.
277 args.extend(['-o', 'StrictHostKeyChecking=no'])
280 proxycommand = _proxy_command(gw, gwuser, identity)
281 args.extend(['-o', proxycommand])
287 args.append('-p{}'.format(port))
290 identity = os.path.expanduser(identity)
291 args.extend(('-i', identity))
301 # Create a temporary server key file
302 tmp_known_hosts = make_server_key_args(server_key, host, port)
303 args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
306 command = "sudo " + command
310 log_msg = " rexec - host {} - command {} ".format(host, " ".join(map(str, args)))
312 stdout = stderr = stdin = subprocess.PIPE
314 stdout = stderr = stdin = None
316 return _retry_rexec(args, log_msg,
322 tmp_known_hosts = tmp_known_hosts,
325 def rcopy(source, dest,
333 strict_host_checking = True):
335 Copies from/to remote sites.
337 Source and destination should have the user and host encoded
340 Source can be a list of files to copy to a single destination,
341 (in which case it is advised that the destination be a folder),
342 or a single file in a string.
345 # Parse destination as <user>@<server>:<path>
346 if isinstance(dest, str) and ':' in dest:
347 remspec, path = dest.split(':', 1)
348 elif isinstance(source, str) and ':' in source:
349 remspec, path = source.split(':', 1)
351 raise ValueError("Both endpoints cannot be local")
352 user, host = remspec.rsplit('@', 1)
355 tmp_known_hosts = None
357 args = ['scp', '-q', '-p', '-C',
358 # 2015-06-01 Thierry: I am commenting off blowfish
359 # as this is not available on a plain ubuntu 15.04 install
# this IMHO is too fragile, should be something the user
361 # decides explicitly (so he is at least aware of that dependency)
362 # Speed up transfer using blowfish cypher specification which is
363 # faster than the default one (3des)
365 # Don't bother with localhost. Makes test easier
366 '-o', 'NoHostAuthenticationForLocalhost=yes',
367 '-o', 'ConnectTimeout=60',
368 '-o', 'ConnectionAttempts=3',
369 '-o', 'ServerAliveInterval=30',
370 '-o', 'TCPKeepAlive=yes' ]
373 args.append('-P{}'.format(port))
376 proxycommand = _proxy_command(gw, gwuser, identity)
377 args.extend(['-o', proxycommand])
383 identity = os.path.expanduser(identity)
384 args.extend(('-i', identity))
387 # Create a temporary server key file
388 tmp_known_hosts = make_server_key_args(server_key, host, port)
389 args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
391 if not strict_host_checking:
392 # Do not check for Host key. Unsafe.
393 args.extend(['-o', 'StrictHostKeyChecking=no'])
395 if isinstance(source, list):
398 if openssh_has_persist():
400 '-o', 'ControlMaster=auto',
401 '-o', 'ControlPath={}'.format(make_control_path(False, False))
405 if isinstance(dest, list):
410 log_msg = " rcopy - host {} - command {} ".format(host, " ".join(map(str, args)))
412 return _retry_rexec(args, log_msg, env = None, retry = retry,
413 tmp_known_hosts = tmp_known_hosts,
416 def rspawn(command, pidfile,
417 stdout = '/dev/null',
432 strict_host_checking = True):
434 Spawn a remote command such that it will continue working asynchronously in
437 :param command: The command to run, it should be a single line.
440 :param pidfile: Path to a file where to store the pid and ppid of the
444 :param stdout: Path to file to redirect standard output.
445 The default value is /dev/null
448 :param stderr: Path to file to redirect standard error.
449 If the special STDOUT value is used, stderr will
450 be redirected to the same file as stdout
453 :param stdin: Path to a file with input to be piped into the command's standard input
456 :param home: Path to working directory folder.
457 It is assumed to exist unless the create_home flag is set.
460 :param create_home: Flag to force creation of the home folder before
462 :type create_home: bool
464 :param sudo: Flag forcing execution with sudo user
469 (stdout, stderr), process
471 Of the spawning process, which only captures errors at spawning time.
472 Usually only useful for diagnostics.
474 # Start process in a "daemonized" way, using nohup and heavy
475 # stdin/out redirection to avoid connection issues
479 stderr = ' ' + stderr
481 daemon_command = '{{ {{ {command} > {stdout} 2>{stderr} < {stdin} & }} ; echo $! 1 > {pidfile} ; }}'\
482 .format(command = command,
483 pidfile = shell_escape(pidfile),
488 cmd = "{create}{gohome} rm -f {pidfile} ; {sudo} nohup bash -c {command} "\
489 .format(command = shell_escape(daemon_command),
490 sudo = 'sudo -S' if sudo else '',
491 pidfile = shell_escape(pidfile),
492 gohome = 'cd {} ; '.format(shell_escape(home)) if home else '',
493 create = 'mkdir -p {} ; '.format(shell_escape(home)) if create_home and home else '')
495 (out, err), proc = rexec(
504 server_key = server_key,
506 strict_host_checking = strict_host_checking ,
510 raise RuntimeError("Failed to set up application on host {}: {} {}".format(host, out, err))
512 return ((out, err), proc)
524 strict_host_checking = True):
526 Returns the pid and ppid of a process from a remote file where the
527 information was stored.
529 :param home: Path to directory where the pidfile is located
532 :param pidfile: Name of file containing the pid information
537 A (pid, ppid) tuple useful for calling rstatus and rkill,
or None if the pidfile isn't valid yet (can happen when process is starting up)
541 (out, err), proc = rexec(
542 "cat {}".format(pidfile),
550 server_key = server_key,
551 strict_host_checking = strict_host_checking
559 return [ int(x) for x in out.strip().split(' ', 1) ]
561 # Ignore, many ways to fail that don't matter that much
565 def rstatus(pid, ppid,
574 strict_host_checking = True):
Returns a code representing the status of a remote process
578 :param pid: Process id of the process
581 :param ppid: Parent process id of process
584 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
587 (out, err), proc = rexec(
588 # Check only by pid. pid+ppid does not always work (especially with sudo)
589 " (( ps --pid {pid} -o pid | grep -c {pid} && echo 'wait') || echo 'done' ) | tail -n 1"\
598 server_key = server_key,
599 strict_host_checking = strict_host_checking
603 return ProcStatus.NOT_STARTED
607 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
610 status = (out.strip() == 'wait')
612 return ProcStatus.NOT_STARTED
613 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
627 strict_host_checking = True):
629 Sends a kill signal to a remote process.
631 First tries a SIGTERM, and if the process does not end in 10 seconds,
634 :param pid: Process id of process to be killed
637 :param ppid: Parent process id of process to be killed
640 :param sudo: Flag indicating if sudo should be used to kill the process
644 subkill = "$(ps --ppid {} -o pid h)".format(pid)
646 SUBKILL="{subkill}" ;
647 {sudo} kill -- -{pid} $SUBKILL || /bin/true
648 {sudo} kill {pid} $SUBKILL || /bin/true
649 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
651 if [ `ps --pid {pid} -o pid | grep -c {pid}` == '0' ]; then
654 {sudo} kill -- -{pid} $SUBKILL || /bin/true
655 {sudo} kill {pid} $SUBKILL || /bin/true
659 if [ `ps --pid {pid} -o pid | grep -c {pid}` != '0' ]; then
660 {sudo} kill -9 -- -{pid} $SUBKILL || /bin/true
661 {sudo} kill -9 {pid} $SUBKILL || /bin/true
665 cmd_format = "( {} ) >/dev/null 2>/dev/null </dev/null &".format(cmd_format)
667 sudo = 'sudo -S' if sudo else ''
668 (out, err), proc = rexec(
669 cmd_format.format(**locals()),
677 server_key = server_key,
678 strict_host_checking = strict_host_checking
681 # wait, don't leave zombies around
684 return (out, err), proc
686 def _retry_rexec(args,
688 stdout = subprocess.PIPE,
689 stdin = subprocess.PIPE,
690 stderr = subprocess.PIPE,
693 tmp_known_hosts = None,
696 for x in range(retry):
697 # display command actually invoked when debug is turned on
698 message = " ".join( [ "'{}'".format(arg) for arg in args ] )
699 log("sshfuncs: invoking {}".format(message), logging.DEBUG)
700 extras = {} if PY2 else {'universal_newlines' : True}
701 # connects to the remote host and starts a remote connection
702 proc = subprocess.Popen(
710 # attach tempfile object to the process, to make sure the file stays
711 # alive until the process is finished with it
712 proc._known_hosts = tmp_known_hosts
714 # The argument block == False forces to rexec to return immediately,
719 #(out, err) = proc.communicate()
720 # The method communicate was re implemented for performance issues
721 # when using python subprocess communicate method the ssh commands
722 # last one minute each
723 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
724 out, err = _communicate(proc, input=None)
725 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
728 out = proc.stdout.read()
729 if proc.poll() and stderr:
730 err = proc.stderr.read()
732 log(log_msg, logging.DEBUG, out, err)
737 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
738 # SSH error, can safely retry
741 # Probably timed out or plain failed but can retry
746 msg = "SLEEPING {} ... ATEMPT {} - command {} "\
747 .format(t, x, " ".join(args))
748 log(msg, logging.DEBUG)
753 except RuntimeError as e:
754 msg = " rexec EXCEPTION - TIMEOUT -> {} \n {}".format(e.args, log_msg)
755 log(msg, logging.DEBUG, out, err)
761 return ((out, err), proc)
764 # Don't remove. The method communicate was re implemented for performance issues
765 def _communicate(proc, input, timeout=None, err_on_timeout=True):
768 stdout = None # Return
769 stderr = None # Return
773 if timeout is not None:
774 timelimit = time.time() + timeout
775 killtime = timelimit + 4
776 bailtime = timelimit + 4
779 # Flush stdio buffer. This might block, if the user has
780 # been writing to .stdin in an uncontrolled fashion.
783 write_set.append(proc.stdin)
788 read_set.append(proc.stdout)
792 read_set.append(proc.stderr)
796 while read_set or write_set:
797 if timeout is not None:
798 curtime = time.time()
799 if timeout is None or curtime > timelimit:
800 if curtime > bailtime:
802 elif curtime > killtime:
803 signum = signal.SIGKILL
805 signum = signal.SIGTERM
807 os.kill(proc.pid, signum)
810 select_timeout = timelimit - curtime + 0.1
814 if select_timeout > 1.0:
818 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
819 except select.error as e:
825 if not rlist and not wlist and not xlist and proc.poll() is not None:
826 # timeout and process exited, say bye
829 if proc.stdin in wlist:
830 # When select has indicated that the file is writable,
831 # we can write up to PIPE_BUF bytes without risk
832 # blocking. POSIX defines PIPE_BUF >= 512
833 bytes_written = os.write(proc.stdin.fileno(),
834 buffer(input, input_offset, 512))
835 input_offset += bytes_written
837 if input_offset >= len(input):
839 write_set.remove(proc.stdin)
841 # xxx possible distortion when upgrading to python3
842 # original py2 version used to do
843 # data = os.read(proc.stdout.fileno(), 1024)
844 # but this would return bytes, so..
845 if proc.stdout in rlist:
846 data = proc.stdout.read()
849 read_set.remove(proc.stdout)
853 if proc.stderr in rlist:
854 data = proc.stderr.read()
857 read_set.remove(proc.stderr)
860 # All data exchanged. Translate lists into strings.
861 if stdout is not None:
862 stdout = ''.join(stdout)
863 if stderr is not None:
864 stderr = ''.join(stderr)
866 # Translate newlines, if requested. We cannot let the file
867 # object do the translation: It is based on stdio, which is
868 # impossible to combine with select (unless forcing no
870 # this however seems to make no sense in the context of python3
872 if proc.universal_newlines and hasattr(file, 'newlines'):
874 stdout = proc._translate_newlines(stdout)
876 stderr = proc._translate_newlines(stderr)
878 if killed and err_on_timeout:
879 errcode = proc.poll()
880 raise RuntimeError("Operation timed out", errcode, stdout, stderr)
886 return (stdout, stderr)
888 def _proxy_command(gw, gwuser, gwidentity):
890 Constructs the SSH ProxyCommand option to add to the SSH command to connect
892 :param gw: SSH proxy hostname
895 :param gwuser: SSH proxy username
898 :param gwidentity: SSH proxy identity file
899 :type gwidentity: str
904 returns the SSH ProxyCommand option.
907 proxycommand = 'ProxyCommand=ssh -q '
909 proxycommand += '-i {} '.format(os.path.expanduser(gwidentity))
911 proxycommand += '{}'.format(gwuser)
914 proxycommand += '@{} -W %h:%p'.format(gw)