2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Claudio Freire <claudio-daniel.freire@inria.fr>
20 ## TODO: This code needs reviewing !!!
# Module-level setup.
# NOTE(review): this file is a sampled dump -- original line numbers are
# embedded in each line and many lines are missing.  Code below is kept
# byte-identical; only comments are added.
# Regex that extracts an interface name and its global-scope inet/inet6
# address (e.g. from `ip addr` output); used by resolve_hostname below.
39 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
# Shared module logger, used by the log() helper.
41 logger = logging.getLogger("sshfuncs")
# Emit *msg* through the module logger at *level*, appending the captured
# remote stdout/stderr when provided.
# NOTE(review): the guards that normally precede the two `msg +=` lines
# (presumably `if out:` / `if err:`) are missing from this sampled view --
# confirm against the full source.
43 def log(msg, level = logging.DEBUG, out = None, err = None):
45 msg += " - OUT: %s " % out
47 msg += " - ERROR: %s " % err
48 logger.log(level, msg)
# Pick the platform null-device path (the alternate branch for platforms
# without os.devnull is missing from this sampled view).
50 if hasattr(os, "devnull"):
53 DEV_NULL = "/dev/null"
# Strings fully matched by this pattern are safe to pass to a shell
# without quoting (see shell_escape below).
55 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
# NOTE(review): the next lines are fragments of two class bodies (a STDOUT
# sentinel and a ProcStatus status-code holder, judging by the docstrings);
# their `class` header lines are missing from this sampled view.
59 Special value that when given to rspawn in stderr causes stderr to
60 redirect to whatever stdout was redirected to.
66 Codes for status of remote spawned process
68 # Process is still running
74 # Process hasn't started running yet (this should be very rare)
# Cache of host -> resolved address, and the lock that serializes cache
# population in gethostbyname().
77 hostbyname_cache = dict()
78 hostbyname_cache_lock = threading.Lock()
# Resolve *host* to an IP address string.  Local aliases are resolved by
# running a subprocess (argv missing from this view; the _re_inet regex
# suggests `ip addr`-style output) and taking the first global-scope
# address; other hosts fall back to socket.gethostbyname.
# NOTE(review): the Popen command line, the `else:` branch and the final
# `return ip` are missing from this sampled view -- confirm upstream.
80 def resolve_hostname(host):
83 if host in ["localhost", "127.0.0.1", "::1"]:
# Text-mode pipes on Python 3; no-op on Python 2.
84 extras = {} if PY2 else {'universal_newlines' : True}
88 stdout=subprocess.PIPE,
89 stderr=subprocess.PIPE,
92 stdout, stderr = p.communicate()
# findall returns tuples of the regex groups; group 2 is the inet
# address, stripped of its /prefix length.
93 m = _re_inet.findall(stdout)
94 ip = m[0][1].split("/")[0]
96 ip = socket.gethostbyname(host)
# Memoizing wrapper around resolve_hostname(): results are stored in the
# module-level hostbyname_cache, with hostbyname_cache_lock serializing
# first-time resolution.
# NOTE(review): the cache-miss guard (presumably `if not hostbyname:`) and
# the final `return hostbyname` are missing from this sampled view.
100 def gethostbyname(host):
101 global hostbyname_cache
102 global hostbyname_cache_lock
104 hostbyname = hostbyname_cache.get(host)
106 with hostbyname_cache_lock:
107 hostbyname = resolve_hostname(host)
108 hostbyname_cache[host] = hostbyname
110 msg = " Added hostbyname %s - %s " % (host, hostbyname)
111 log(msg, logging.DEBUG)
# Lazily computed module-level flag: None until first queried, then a bool.
115 OPENSSH_HAS_PERSIST = None
117 def openssh_has_persist():
118 """ The ssh_config options ControlMaster and ControlPersist allow to
119 reuse a same network connection for multiple ssh sessions. In this
120 way limitations on number of open ssh connections can be bypassed.
121 However, older versions of openSSH do not support this feature.
122 This function is used to determine if ssh connection persist features
125 global OPENSSH_HAS_PERSIST
126 if OPENSSH_HAS_PERSIST is None:
# Text-mode pipes on Python 3; the ssh argv itself is missing from
# this sampled view (presumably `ssh -v` or similar).
127 extras = {} if PY2 else {'universal_newlines' : True}
128 with open("/dev/null") as null:
129 proc = subprocess.Popen(
131 stdout = subprocess.PIPE,
132 stderr = subprocess.STDOUT,
136 out,err = proc.communicate()
# Version gate: matches OpenSSH 5.8+, 5.10+, 6-9, or two-digit
# majors -- the versions known to support ControlPersist.
139 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
140 OPENSSH_HAS_PERSIST = bool(vre.match(out))
141 return OPENSSH_HAS_PERSIST
143 def make_server_key_args(server_key, host, port):
144 """ Returns a reference to a temporary known_hosts file, to which
145 the server key has been added.
147 Make sure to hold onto the temp file reference until the process is
150 :param server_key: the server public key
151 :type server_key: str
153 :param host: the hostname
156 :param port: the ssh port
# Known-hosts entries for non-default ports use the host:port form.
# NOTE(review): the guard that presumably limits this to port != 22 is
# missing from this sampled view.
161 host = '%s:%s' % (host, str(port))
163 # Create a temporary server key file
# The caller must keep this NamedTemporaryFile alive; closing it
# deletes the file (see docstring above).
164 tmp_known_hosts = tempfile.NamedTemporaryFile()
166 hostbyname = gethostbyname(host)
168 # Add the intended host key
# NOTE(review): writing a str to a default-mode NamedTemporaryFile is
# binary-unsafe on Python 3 -- verify against the full source whether
# mode='w' (or an encode) exists in the elided lines.
169 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
171 # If we're not in strict mode, add user-configured keys
172 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
173 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
174 if os.access(user_hosts_path, os.R_OK):
175 with open(user_hosts_path, "r") as f:
176 tmp_known_hosts.write(f.read())
# Flush so ssh can read the file while we still hold it open.
178 tmp_known_hosts.flush()
180 return tmp_known_hosts
# Build the ssh ControlPath socket path used for connection multiplexing.
# %r/%h/%p are expanded by ssh itself (remote user, host, port).
# NOTE(review): the lines that presumably append agent/forward_x11
# discriminators and the final `return ctrl_path` are missing from this
# sampled view.
182 def make_control_path(agent, forward_x11):
183 ctrl_path = "/tmp/nepi_ssh"
191 ctrl_path += "-%r@%h:%p"
# NOTE(review): fragment of shell_escape(s); its `def` line, the early
# return for safe strings, the inner `escape(c)` def header, and the final
# return are all missing from this sampled view.  Visible logic: strings
# matching SHELL_SAFE pass through unescaped; other characters are mapped
# to a quoted $'\xNN' form and joined.
196 """ Escapes strings so that they are safe to use as command-line
198 if SHELL_SAFE.match(s):
199 # safe string - no escaping needed
202 # unsafe string - escape
# Printable ASCII (minus quotes) and common whitespace pass through;
# everything else becomes a hex escape.
204 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
207 return "'$'\\x%02x''" % (ord(c),)
208 s = ''.join(map(escape, s))
211 def eintr_retry(func):
212 """Retries a function invocation when a EINTR occurs"""
214 @functools.wraps(func)
# Private `_retry` kwarg: when truthy the retry loop is SKIPPED
# (range(0)) and the single final call below runs instead.
# NOTE(review): the name suggests the opposite; confirm intent upstream.
216 retry = kw.pop("_retry", False)
217 for i in range(0 if retry else 4):
219 return func(*p, **kw)
# py2-style: select.error/socket.error carry errno as args[0].
220 except (select.error, socket.error) as args:
221 if args[0] == errno.EINTR:
226 if e.errno == errno.EINTR:
# Last attempt, outside the retry loop: exceptions propagate.
231 return func(*p, **kw)
# Run *command* on *host* as *user* over ssh.
# Returns ((stdout, stderr), process) via _retry_rexec.
# NOTE(review): many parameter lines (port, agent, sudo, identity,
# server_key, gw/gwuser, blocking, ...) and several guards are missing
# from this sampled view.
234 def rexec(command, host, user,
244 connect_timeout = 30,
249 strict_host_checking = True):
251 Executes a remote command, returns ((stdout,stderr),process)
254 tmp_known_hosts = None
# Prefer the cached/resolved IP; fall back to the name below.
256 hostip = gethostbyname(host)
260 # Don't bother with localhost. Makes test easier
261 '-o', 'NoHostAuthenticationForLocalhost=yes',
262 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
263 '-o', 'ConnectionAttempts=3',
264 '-o', 'ServerAliveInterval=30',
265 '-o', 'TCPKeepAlive=yes',
266 '-o', 'Batchmode=yes',
267 '-l', user, hostip or host]
# Reuse one master connection per (user,host,port) when supported.
269 if persistent and openssh_has_persist():
271 '-o', 'ControlMaster=auto',
272 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
273 '-o', 'ControlPersist=60' ])
275 if not strict_host_checking:
276 # Do not check for Host key. Unsafe.
277 args.extend(['-o', 'StrictHostKeyChecking=no'])
# Tunnel through the gateway host when one is configured.
280 proxycommand = _proxy_command(gw, gwuser, identity)
281 args.extend(['-o', proxycommand])
287 args.append('-p%d' % port)
290 identity = os.path.expanduser(identity)
291 args.extend(('-i', identity))
301 # Create a temporary server key file
# Keep tmp_known_hosts alive until the process finishes (see
# make_server_key_args docstring).
302 tmp_known_hosts = make_server_key_args(server_key, host, port)
303 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
306 command = "sudo " + command
310 log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
# Blocking mode pipes all stdio; non-blocking leaves it attached.
312 stdout = stderr = stdin = subprocess.PIPE
314 stdout = stderr = stdin = None
316 return _retry_rexec(args, log_msg,
322 tmp_known_hosts = tmp_known_hosts,
# Copy files to/from a remote host with scp.
# Exactly one endpoint must be remote (user@host:path form).
# NOTE(review): several parameter lines and branch bodies are missing from
# this sampled view.
325 def rcopy(source, dest,
333 strict_host_checking = True):
335 Copies from/to remote sites.
337 Source and destination should have the user and host encoded
340 Source can be a list of files to copy to a single destination,
341 (in which case it is advised that the destination be a folder),
342 or a single file in a string.
345 # Parse destination as <user>@<server>:<path>
346 if isinstance(dest, str) and ':' in dest:
347 remspec, path = dest.split(':',1)
348 elif isinstance(source, str) and ':' in source:
349 remspec, path = source.split(':',1)
351 raise ValueError("Both endpoints cannot be local")
352 user,host = remspec.rsplit('@',1)
355 tmp_known_hosts = None
357 args = ['scp', '-q', '-p', '-C',
358 # 2015-06-01 Thierry: I am commenting off blowfish
359 # as this is not available on a plain ubuntu 15.04 install
360 # this IMHO is too fragile, shoud be something the user
361 # decides explicitly (so he is at least aware of that dependency)
362 # Speed up transfer using blowfish cypher specification which is
363 # faster than the default one (3des)
365 # Don't bother with localhost. Makes test easier
366 '-o', 'NoHostAuthenticationForLocalhost=yes',
367 '-o', 'ConnectTimeout=60',
368 '-o', 'ConnectionAttempts=3',
369 '-o', 'ServerAliveInterval=30',
370 '-o', 'TCPKeepAlive=yes' ]
# scp uses -P (capital), unlike ssh's -p.
373 args.append('-P%d' % port)
376 proxycommand = _proxy_command(gw, gwuser, identity)
377 args.extend(['-o', proxycommand])
383 identity = os.path.expanduser(identity)
384 args.extend(('-i', identity))
387 # Create a temporary server key file
388 tmp_known_hosts = make_server_key_args(server_key, host, port)
389 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
391 if not strict_host_checking:
392 # Do not check for Host key. Unsafe.
393 args.extend(['-o', 'StrictHostKeyChecking=no'])
# Multi-source copy: reuse a persisted master connection if available.
395 if isinstance(source, list):
398 if openssh_has_persist():
400 '-o', 'ControlMaster=auto',
401 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
405 if isinstance(dest, list):
410 log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
412 return _retry_rexec(args, log_msg, env = None, retry = retry,
413 tmp_known_hosts = tmp_known_hosts,
416 def rspawn(command, pidfile,
417 stdout = '/dev/null',
432 strict_host_checking = True):
434 Spawn a remote command such that it will continue working asynchronously in
437 :param command: The command to run, it should be a single line.
440 :param pidfile: Path to a file where to store the pid and ppid of the
444 :param stdout: Path to file to redirect standard output.
445 The default value is /dev/null
448 :param stderr: Path to file to redirect standard error.
449 If the special STDOUT value is used, stderr will
450 be redirected to the same file as stdout
453 :param stdin: Path to a file with input to be piped into the command's standard input
456 :param home: Path to working directory folder.
457 It is assumed to exist unless the create_home flag is set.
460 :param create_home: Flag to force creation of the home folder before
462 :type create_home: bool
464 :param sudo: Flag forcing execution with sudo user
469 (stdout, stderr), process
471 Of the spawning process, which only captures errors at spawning time.
472 Usually only useful for diagnostics.
474 # Start process in a "daemonized" way, using nohup and heavy
475 # stdin/out redirection to avoid connection issues
# Leading space so "2>%(stderr)s" becomes "2> file" in the branch
# where stderr is a real path (the STDOUT-sentinel branch is missing
# from this sampled view).
479 stderr = ' ' + stderr
# Background the command, then record its pid in pidfile.
# NOTE(review): `echo $! 1 >` writes "$! 1"; presumably pid + a
# placeholder ppid -- confirm against the full source and rgetpid.
481 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
483 'pidfile' : shell_escape(pidfile),
# Outer wrapper: optional mkdir/cd, clear stale pidfile, nohup bash.
489 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
490 'command' : shell_escape(daemon_command),
491 'sudo' : 'sudo -S' if sudo else '',
492 'pidfile' : shell_escape(pidfile),
493 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
494 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
497 (out,err),proc = rexec(
506 server_key = server_key,
508 strict_host_checking = strict_host_checking ,
# Non-zero exit of the spawning ssh means setup failed remotely.
512 raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
514 return ((out, err), proc)
# NOTE(review): fragment of the pidfile reader (rgetpid by convention);
# its `def` line and several parameters are missing from this sampled
# view.  It cats the remote pidfile and parses "pid ppid".
526 strict_host_checking = True):
528 Returns the pid and ppid of a process from a remote file where the
529 information was stored.
531 :param home: Path to directory where the pidfile is located
534 :param pidfile: Name of file containing the pid information
539 A (pid, ppid) tuple useful for calling rstatus and rkill,
540 or None if the pidfile isn't valid yet (can happen when process is staring up)
543 (out,err),proc = rexec(
544 "cat %(pidfile)s" % {
554 server_key = server_key,
555 strict_host_checking = strict_host_checking
# Parse at most two whitespace-separated integers: pid and ppid.
563 return [ int(x) for x in out.strip().split(' ', 1) ]
565 # Ignore, many ways to fail that don't matter that much
569 def rstatus(pid, ppid,
578 strict_host_checking = True):
580 Returns a code representing the the status of a remote process
582 :param pid: Process id of the process
585 :param ppid: Parent process id of process
588 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
591 (out,err),proc = rexec(
592 # Check only by pid. pid+ppid does not always work (especially with sudo)
# Prints 'wait' while the pid is alive, 'done' once gone.
593 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
604 server_key = server_key,
605 strict_host_checking = strict_host_checking
# Spawning-ssh failure: we cannot tell, report NOT_STARTED.
609 return ProcStatus.NOT_STARTED
# Unmounted /proc on the remote (e.g. containers) breaks ps.
613 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
616 status = (out.strip() == 'wait')
618 return ProcStatus.NOT_STARTED
619 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
# NOTE(review): fragment of the remote kill routine (rkill by convention);
# its `def` line and several parameters are missing from this sampled
# view.  Strategy: SIGTERM the pid, its process group and children, poll
# for exit, then escalate to SIGKILL.
633 strict_host_checking = True):
635 Sends a kill signal to a remote process.
637 First tries a SIGTERM, and if the process does not end in 10 seconds,
640 :param pid: Process id of process to be killed
643 :param ppid: Parent process id of process to be killed
646 :param sudo: Flag indicating if sudo should be used to kill the process
# Collect direct children of the target pid.
650 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
652 SUBKILL="%(subkill)s" ;
# kill -- -pid targets the whole process group; `|| /bin/true`
# keeps the script going when the target is already gone.
653 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
654 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
# Poll loop (30 iterations) waiting for the pid to disappear.
655 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
657 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
660 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
661 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
# Still alive after the polling window: escalate to SIGKILL.
665 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
666 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
667 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
# Detach the whole script so the ssh session is not held open.
671 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
673 (out,err),proc = rexec(
677 'sudo' : 'sudo -S' if sudo else '',
687 server_key = server_key,
688 strict_host_checking = strict_host_checking
691 # wait, don't leave zombies around
694 return (out, err), proc
# Run the prepared ssh/scp argv, retrying transient SSH failures.
# Returns ((out, err), proc).
# NOTE(review): several parameters (retry, env, blocking, ...) and
# branch/loop frames are missing from this sampled view.
696 def _retry_rexec(args,
698 stdout = subprocess.PIPE,
699 stdin = subprocess.PIPE,
700 stderr = subprocess.PIPE,
703 tmp_known_hosts = None,
706 for x in range(retry):
707 # display command actually invoked when debug is turned on
708 message = " ".join( [ "'{}'".format(arg) for arg in args ] )
709 log("sshfuncs: invoking {}".format(message), logging.DEBUG)
710 extras = {} if PY2 else {'universal_newlines' : True}
711 # connects to the remote host and starts a remote connection
712 proc = subprocess.Popen(
720 # attach tempfile object to the process, to make sure the file stays
721 # alive until the process is finished with it
722 proc._known_hosts = tmp_known_hosts
724 # The argument block == False forces to rexec to return immediately,
729 #(out, err) = proc.communicate()
730 # The method communicate was re implemented for performance issues
731 # when using python subprocess communicate method the ssh commands
732 # last one minute each
733 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
734 out, err = _communicate(proc, input=None)
735 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
# Non-blocking path: drain whatever is already available.
738 out = proc.stdout.read()
739 if proc.poll() and stderr:
740 err = proc.stderr.read()
742 log(log_msg, logging.DEBUG, out, err)
# Transport-level failures are retryable; remote-command failures
# are not distinguished here.
747 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
748 # SSH error, can safely retry
751 # Probably timed out or plain failed but can retry
# Back off before the next attempt.
756 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
757 t, x, " ".join(args))
758 log(msg, logging.DEBUG)
763 except RuntimeError as e:
764 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
765 log(msg, logging.DEBUG, out, err)
771 return ((out, err), proc)
774 # Don't remove. The method communicate was re implemented for performance issues
# select()-based replacement for Popen.communicate() with optional
# timeout; on timeout it escalates SIGTERM -> SIGKILL and, when
# err_on_timeout is set, raises RuntimeError.
# NOTE(review): several frames (try/except bodies, the killed/bailed
# bookkeeping, list initialization for read_set/write_set/stdout/stderr
# accumulators) are missing from this sampled view.
775 def _communicate(proc, input, timeout=None, err_on_timeout=True):
778 stdout = None # Return
779 stderr = None # Return
783 if timeout is not None:
784 timelimit = time.time() + timeout
# Grace windows past the soft deadline before SIGKILL / giving up.
785 killtime = timelimit + 4
786 bailtime = timelimit + 4
789 # Flush stdio buffer. This might block, if the user has
790 # been writing to .stdin in an uncontrolled fashion.
793 write_set.append(proc.stdin)
798 read_set.append(proc.stdout)
802 read_set.append(proc.stderr)
# Main multiplexing loop: run until both directions are drained.
806 while read_set or write_set:
807 if timeout is not None:
808 curtime = time.time()
809 if timeout is None or curtime > timelimit:
810 if curtime > bailtime:
812 elif curtime > killtime:
813 signum = signal.SIGKILL
815 signum = signal.SIGTERM
817 os.kill(proc.pid, signum)
820 select_timeout = timelimit - curtime + 0.1
824 if select_timeout > 1.0:
828 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
829 except select.error as e:
# select timed out AND the child already exited: nothing more
# will arrive, stop waiting.
835 if not rlist and not wlist and not xlist and proc.poll() is not None:
836 # timeout and process exited, say bye
839 if proc.stdin in wlist:
840 # When select has indicated that the file is writable,
841 # we can write up to PIPE_BUF bytes without risk
842 # blocking. POSIX defines PIPE_BUF >= 512
# NOTE(review): buffer() is Python-2-only; under py3 this line
# would NameError -- confirm how the full source handles it.
843 bytes_written = os.write(proc.stdin.fileno(),
844 buffer(input, input_offset, 512))
845 input_offset += bytes_written
847 if input_offset >= len(input):
849 write_set.remove(proc.stdin)
851 # xxx possible distortion when upgrading to python3
852 # original py2 version used to do
853 # data = os.read(proc.stdout.fileno(), 1024)
854 # but this would return bytes, so..
855 if proc.stdout in rlist:
856 data = proc.stdout.read()
# Empty read == EOF on that pipe; stop selecting on it.
859 read_set.remove(proc.stdout)
863 if proc.stderr in rlist:
864 data = proc.stderr.read()
867 read_set.remove(proc.stderr)
870 # All data exchanged. Translate lists into strings.
871 if stdout is not None:
872 stdout = ''.join(stdout)
873 if stderr is not None:
874 stderr = ''.join(stderr)
876 # Translate newlines, if requested. We cannot let the file
877 # object do the translation: It is based on stdio, which is
878 # impossible to combine with select (unless forcing no
880 # this however seems to make no sense in the context of python3
# NOTE(review): `file` is a py2 builtin; hasattr(file, 'newlines') is
# a NameError on py3 -- the guard above it is missing from this view.
882 if proc.universal_newlines and hasattr(file, 'newlines'):
884 stdout = proc._translate_newlines(stdout)
886 stderr = proc._translate_newlines(stderr)
888 if killed and err_on_timeout:
889 errcode = proc.poll()
890 raise RuntimeError("Operation timed out", errcode, stdout, stderr)
896 return (stdout, stderr)
# NOTE(review): the tail of this function (presumably `return
# proxycommand` and the guards around each `+=`) is missing from this
# sampled view.
898 def _proxy_command(gw, gwuser, gwidentity):
900 Constructs the SSH ProxyCommand option to add to the SSH command to connect
902 :param gw: SSH proxy hostname
905 :param gwuser: SSH proxy username
908 :param gwidentity: SSH proxy identity file
909 :type gwidentity: str
914 returns the SSH ProxyCommand option.
# Built piecewise: base command, optional -i identity, optional
# user@, then the gateway with -W %h:%p stdio forwarding (%h/%p are
# expanded by the outer ssh).
917 proxycommand = 'ProxyCommand=ssh -q '
919 proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
921 proxycommand += '%s' % gwuser
924 proxycommand += '@%s -W %%h:%%p' % gw