2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Claudio Freire <claudio-daniel.freire@inria.fr>
20 ## TODO: This code needs reviewing !!!
37 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
39 logger = logging.getLogger("sshfuncs")
41 def log(msg, level = logging.DEBUG, out = None, err = None):
43 msg += " - OUT: %s " % out
45 msg += " - ERROR: %s " % err
46 logger.log(level, msg)
48 if hasattr(os, "devnull"):
51 DEV_NULL = "/dev/null"
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
64 Codes for status of remote spawned process
66 # Process is still running
72 # Process hasn't started running yet (this should be very rare)
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
78 def resolve_hostname(host):
81 if host in ["localhost", "127.0.0.1", "::1"]:
85 stdout=subprocess.PIPE,
86 stderr=subprocess.PIPE,
88 stdout, stderr = p.communicate()
89 m = _re_inet.findall(stdout)
90 ip = m[0][1].split("/")[0]
92 ip = socket.gethostbyname(host)
96 def gethostbyname(host):
97 global hostbyname_cache
98 global hostbyname_cache_lock
100 hostbyname = hostbyname_cache.get(host)
102 with hostbyname_cache_lock:
103 hostbyname = resolve_hostname(host)
104 hostbyname_cache[host] = hostbyname
106 msg = " Added hostbyname %s - %s " % (host, hostbyname)
107 log(msg, logging.DEBUG)
111 OPENSSH_HAS_PERSIST = None
113 def openssh_has_persist():
114 """ The ssh_config options ControlMaster and ControlPersist allow to
115 reuse a same network connection for multiple ssh sessions. In this
116 way limitations on number of open ssh connections can be bypassed.
117 However, older versions of openSSH do not support this feature.
118 This function is used to determine if ssh connection persist features
121 global OPENSSH_HAS_PERSIST
122 if OPENSSH_HAS_PERSIST is None:
123 proc = subprocess.Popen(
125 stdout = subprocess.PIPE,
126 stderr = subprocess.STDOUT,
127 stdin = open("/dev/null"),
129 out,err = proc.communicate()
132 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
133 OPENSSH_HAS_PERSIST = bool(vre.match(out))
134 return OPENSSH_HAS_PERSIST
136 def make_server_key_args(server_key, host, port):
137 """ Returns a reference to a temporary known_hosts file, to which
138 the server key has been added.
140 Make sure to hold onto the temp file reference until the process is
143 :param server_key: the server public key
144 :type server_key: str
146 :param host: the hostname
149 :param port: the ssh port
154 host = '%s:%s' % (host, str(port))
156 # Create a temporary server key file
157 tmp_known_hosts = tempfile.NamedTemporaryFile()
159 hostbyname = gethostbyname(host)
161 # Add the intended host key
162 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
164 # If we're not in strict mode, add user-configured keys
165 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
166 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
167 if os.access(user_hosts_path, os.R_OK):
168 f = open(user_hosts_path, "r")
169 tmp_known_hosts.write(f.read())
172 tmp_known_hosts.flush()
174 return tmp_known_hosts
176 def make_control_path(agent, forward_x11):
177 ctrl_path = "/tmp/nepi_ssh"
185 ctrl_path += "-%r@%h:%p"
190 """ Escapes strings so that they are safe to use as command-line
192 if SHELL_SAFE.match(s):
193 # safe string - no escaping needed
196 # unsafe string - escape
198 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
201 return "'$'\\x%02x''" % (ord(c),)
202 s = ''.join(map(escape, s))
205 def eintr_retry(func):
206 """Retries a function invocation when a EINTR occurs"""
208 @functools.wraps(func)
210 retry = kw.pop("_retry", False)
211 for i in range(0 if retry else 4):
213 return func(*p, **kw)
214 except (select.error, socket.error) as args:
215 if args[0] == errno.EINTR:
220 if e.errno == errno.EINTR:
225 return func(*p, **kw)
228 def rexec(command, host, user,
238 connect_timeout = 30,
243 strict_host_checking = True):
245 Executes a remote command, returns ((stdout,stderr),process)
248 tmp_known_hosts = None
250 hostip = gethostbyname(host)
254 # Don't bother with localhost. Makes test easier
255 '-o', 'NoHostAuthenticationForLocalhost=yes',
256 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
257 '-o', 'ConnectionAttempts=3',
258 '-o', 'ServerAliveInterval=30',
259 '-o', 'TCPKeepAlive=yes',
260 '-o', 'Batchmode=yes',
261 '-l', user, hostip or host]
263 if persistent and openssh_has_persist():
265 '-o', 'ControlMaster=auto',
266 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
267 '-o', 'ControlPersist=60' ])
269 if not strict_host_checking:
270 # Do not check for Host key. Unsafe.
271 args.extend(['-o', 'StrictHostKeyChecking=no'])
274 proxycommand = _proxy_command(gw, gwuser, identity)
275 args.extend(['-o', proxycommand])
281 args.append('-p%d' % port)
284 identity = os.path.expanduser(identity)
285 args.extend(('-i', identity))
295 # Create a temporary server key file
296 tmp_known_hosts = make_server_key_args(server_key, host, port)
297 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
300 command = "sudo " + command
304 log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
306 stdout = stderr = stdin = subprocess.PIPE
308 stdout = stderr = stdin = None
310 return _retry_rexec(args, log_msg,
316 tmp_known_hosts = tmp_known_hosts,
319 def rcopy(source, dest,
327 strict_host_checking = True):
329 Copies from/to remote sites.
331 Source and destination should have the user and host encoded
334 Source can be a list of files to copy to a single destination,
335 (in which case it is advised that the destination be a folder),
336 or a single file in a string.
339 # Parse destination as <user>@<server>:<path>
340 if isinstance(dest, str) and ':' in dest:
341 remspec, path = dest.split(':',1)
342 elif isinstance(source, str) and ':' in source:
343 remspec, path = source.split(':',1)
345 raise ValueError("Both endpoints cannot be local")
346 user,host = remspec.rsplit('@',1)
349 tmp_known_hosts = None
351 args = ['scp', '-q', '-p', '-C',
352 # 2015-06-01 Thierry: I am commenting off blowfish
353 # as this is not available on a plain ubuntu 15.04 install
354 # this IMHO is too fragile, shoud be something the user
355 # decides explicitly (so he is at least aware of that dependency)
356 # Speed up transfer using blowfish cypher specification which is
357 # faster than the default one (3des)
359 # Don't bother with localhost. Makes test easier
360 '-o', 'NoHostAuthenticationForLocalhost=yes',
361 '-o', 'ConnectTimeout=60',
362 '-o', 'ConnectionAttempts=3',
363 '-o', 'ServerAliveInterval=30',
364 '-o', 'TCPKeepAlive=yes' ]
367 args.append('-P%d' % port)
370 proxycommand = _proxy_command(gw, gwuser, identity)
371 args.extend(['-o', proxycommand])
377 identity = os.path.expanduser(identity)
378 args.extend(('-i', identity))
381 # Create a temporary server key file
382 tmp_known_hosts = make_server_key_args(server_key, host, port)
383 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
385 if not strict_host_checking:
386 # Do not check for Host key. Unsafe.
387 args.extend(['-o', 'StrictHostKeyChecking=no'])
389 if isinstance(source, list):
392 if openssh_has_persist():
394 '-o', 'ControlMaster=auto',
395 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
399 if isinstance(dest, list):
404 log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
406 return _retry_rexec(args, log_msg, env = None, retry = retry,
407 tmp_known_hosts = tmp_known_hosts,
410 def rspawn(command, pidfile,
411 stdout = '/dev/null',
426 strict_host_checking = True):
428 Spawn a remote command such that it will continue working asynchronously in
431 :param command: The command to run, it should be a single line.
434 :param pidfile: Path to a file where to store the pid and ppid of the
438 :param stdout: Path to file to redirect standard output.
439 The default value is /dev/null
442 :param stderr: Path to file to redirect standard error.
443 If the special STDOUT value is used, stderr will
444 be redirected to the same file as stdout
447 :param stdin: Path to a file with input to be piped into the command's standard input
450 :param home: Path to working directory folder.
451 It is assumed to exist unless the create_home flag is set.
454 :param create_home: Flag to force creation of the home folder before
456 :type create_home: bool
458 :param sudo: Flag forcing execution with sudo user
463 (stdout, stderr), process
465 Of the spawning process, which only captures errors at spawning time.
466 Usually only useful for diagnostics.
468 # Start process in a "daemonized" way, using nohup and heavy
469 # stdin/out redirection to avoid connection issues
473 stderr = ' ' + stderr
475 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
477 'pidfile' : shell_escape(pidfile),
483 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
484 'command' : shell_escape(daemon_command),
485 'sudo' : 'sudo -S' if sudo else '',
486 'pidfile' : shell_escape(pidfile),
487 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
488 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
491 (out,err),proc = rexec(
500 server_key = server_key,
502 strict_host_checking = strict_host_checking ,
506 raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
508 return ((out, err), proc)
520 strict_host_checking = True):
522 Returns the pid and ppid of a process from a remote file where the
523 information was stored.
525 :param home: Path to directory where the pidfile is located
528 :param pidfile: Name of file containing the pid information
533 A (pid, ppid) tuple useful for calling rstatus and rkill,
534 or None if the pidfile isn't valid yet (can happen when process is staring up)
537 (out,err),proc = rexec(
538 "cat %(pidfile)s" % {
548 server_key = server_key,
549 strict_host_checking = strict_host_checking
557 return list(map(int,out.strip().split(' ',1)))
559 # Ignore, many ways to fail that don't matter that much
563 def rstatus(pid, ppid,
572 strict_host_checking = True):
574 Returns a code representing the the status of a remote process
576 :param pid: Process id of the process
579 :param ppid: Parent process id of process
582 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
585 (out,err),proc = rexec(
586 # Check only by pid. pid+ppid does not always work (especially with sudo)
587 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
598 server_key = server_key,
599 strict_host_checking = strict_host_checking
603 return ProcStatus.NOT_STARTED
607 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
610 status = (out.strip() == 'wait')
612 return ProcStatus.NOT_STARTED
613 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
627 strict_host_checking = True):
629 Sends a kill signal to a remote process.
631 First tries a SIGTERM, and if the process does not end in 10 seconds,
634 :param pid: Process id of process to be killed
637 :param ppid: Parent process id of process to be killed
640 :param sudo: Flag indicating if sudo should be used to kill the process
644 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
646 SUBKILL="%(subkill)s" ;
647 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
648 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
649 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
651 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
654 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
655 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
659 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
660 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
661 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
665 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
667 (out,err),proc = rexec(
671 'sudo' : 'sudo -S' if sudo else '',
681 server_key = server_key,
682 strict_host_checking = strict_host_checking
685 # wait, don't leave zombies around
688 return (out, err), proc
690 def _retry_rexec(args,
692 stdout = subprocess.PIPE,
693 stdin = subprocess.PIPE,
694 stderr = subprocess.PIPE,
697 tmp_known_hosts = None,
700 for x in range(retry):
701 # display command actually invoked when debug is turned on
702 message = " ".join( [ "'{}'".format(arg) for arg in args ] )
703 log("sshfuncs: invoking {}".format(message), logging.DEBUG)
704 # connects to the remote host and starts a remote connection
705 proc = subprocess.Popen(
712 # attach tempfile object to the process, to make sure the file stays
713 # alive until the process is finished with it
714 proc._known_hosts = tmp_known_hosts
716 # The argument block == False forces to rexec to return immediately,
721 #(out, err) = proc.communicate()
722 # The method communicate was re implemented for performance issues
723 # when using python subprocess communicate method the ssh commands
724 # last one minute each
725 out, err = _communicate(proc, input=None)
728 out = proc.stdout.read()
729 if proc.poll() and stderr:
730 err = proc.stderr.read()
732 log(log_msg, logging.DEBUG, out, err)
737 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
738 # SSH error, can safely retry
741 # Probably timed out or plain failed but can retry
746 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
747 t, x, " ".join(args))
748 log(msg, logging.DEBUG)
753 except RuntimeError as e:
754 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
755 log(msg, logging.DEBUG, out, err)
761 return ((out, err), proc)
764 # Don't remove. The method communicate was re implemented for performance issues
765 def _communicate(proc, input, timeout=None, err_on_timeout=True):
768 stdout = None # Return
769 stderr = None # Return
773 if timeout is not None:
774 timelimit = time.time() + timeout
775 killtime = timelimit + 4
776 bailtime = timelimit + 4
779 # Flush stdio buffer. This might block, if the user has
780 # been writing to .stdin in an uncontrolled fashion.
783 write_set.append(proc.stdin)
788 read_set.append(proc.stdout)
792 read_set.append(proc.stderr)
796 while read_set or write_set:
797 if timeout is not None:
798 curtime = time.time()
799 if timeout is None or curtime > timelimit:
800 if curtime > bailtime:
802 elif curtime > killtime:
803 signum = signal.SIGKILL
805 signum = signal.SIGTERM
807 os.kill(proc.pid, signum)
810 select_timeout = timelimit - curtime + 0.1
814 if select_timeout > 1.0:
818 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
819 except select.error as e:
825 if not rlist and not wlist and not xlist and proc.poll() is not None:
826 # timeout and process exited, say bye
829 if proc.stdin in wlist:
830 # When select has indicated that the file is writable,
831 # we can write up to PIPE_BUF bytes without risk
832 # blocking. POSIX defines PIPE_BUF >= 512
833 bytes_written = os.write(proc.stdin.fileno(),
834 buffer(input, input_offset, 512))
835 input_offset += bytes_written
837 if input_offset >= len(input):
839 write_set.remove(proc.stdin)
841 if proc.stdout in rlist:
842 data = os.read(proc.stdout.fileno(), 1024)
845 read_set.remove(proc.stdout)
848 if proc.stderr in rlist:
849 data = os.read(proc.stderr.fileno(), 1024)
852 read_set.remove(proc.stderr)
855 # All data exchanged. Translate lists into strings.
856 if stdout is not None:
857 stdout = ''.join(stdout)
858 if stderr is not None:
859 stderr = ''.join(stderr)
861 # Translate newlines, if requested. We cannot let the file
862 # object do the translation: It is based on stdio, which is
863 # impossible to combine with select (unless forcing no
865 if proc.universal_newlines and hasattr(file, 'newlines'):
867 stdout = proc._translate_newlines(stdout)
869 stderr = proc._translate_newlines(stderr)
871 if killed and err_on_timeout:
872 errcode = proc.poll()
873 raise RuntimeError("Operation timed out", errcode, stdout, stderr)
879 return (stdout, stderr)
881 def _proxy_command(gw, gwuser, gwidentity):
883 Constructs the SSH ProxyCommand option to add to the SSH command to connect
885 :param gw: SSH proxy hostname
888 :param gwuser: SSH proxy username
891 :param gwidentity: SSH proxy identity file
892 :type gwidentity: str
897 returns the SSH ProxyCommand option.
900 proxycommand = 'ProxyCommand=ssh -q '
902 proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
904 proxycommand += '%s' % gwuser
907 proxycommand += '@%s -W %%h:%%p' % gw