2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Claudio Freire <claudio-daniel.freire@inria.fr>
20 ## TODO: This code needs reviewing !!!
37 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
logger = logging.getLogger("sshfuncs")

def log(msg, level = logging.DEBUG, out = None, err = None):
    """Log *msg* on the module logger at *level*.

    When the captured stdout/stderr of a finished command are supplied
    and non-empty they are appended to the message; empty/None streams
    are skipped so the log line is not cluttered with "None".
    """
    if out:
        msg += " - OUT: %s " % out
    if err:
        msg += " - ERROR: %s " % err
    logger.log(level, msg)
48 if hasattr(os, "devnull"):
51 DEV_NULL = "/dev/null"
# Command-line arguments matching this pattern need no shell quoting.
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
64 Codes for status of remote spawned process
66 # Process is still running
72 # Process hasn't started running yet (this should be very rare)
# host -> resolved-IP cache used by gethostbyname(); the companion lock
# guards concurrent insertions.
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
def resolve_hostname(host):
    """Return an IP address (as a string) for *host*.

    Local aliases (localhost / 127.0.0.1 / ::1) are mapped to the first
    globally-scoped address reported by ``ip -o addr list``; any other
    name goes through DNS resolution.

    :param host: hostname or address literal to resolve
    :type host: str
    """
    if host in ["localhost", "127.0.0.1", "::1"]:
        # Ask iproute2 for the configured addresses; passing an argument
        # list avoids going through a shell.
        p = subprocess.Popen(["ip", "-o", "addr", "list"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines = True)
        stdout, stderr = p.communicate()
        m = _re_inet.findall(stdout)
        if not m:
            # No global-scope address configured on this machine: fall
            # back to loopback instead of crashing on m[0].
            return "127.0.0.1"
        ip = m[0][1].split("/")[0]
    else:
        ip = socket.gethostbyname(host)
    return ip
def gethostbyname(host):
    """Thread-safe, cached front-end to resolve_hostname().

    :param host: hostname to resolve
    :type host: str
    """
    global hostbyname_cache
    global hostbyname_cache_lock

    hostbyname = hostbyname_cache.get(host)
    if not hostbyname:
        with hostbyname_cache_lock:
            # Re-check inside the lock: another thread may have resolved
            # and cached this host while we were waiting, so we avoid a
            # duplicate (and possibly slow) resolution.
            hostbyname = hostbyname_cache.get(host)
            if not hostbyname:
                hostbyname = resolve_hostname(host)
                hostbyname_cache[host] = hostbyname

                msg = " Added hostbyname %s - %s " % (host, hostbyname)
                log(msg, logging.DEBUG)

    return hostbyname
OPENSSH_HAS_PERSIST = None

def openssh_has_persist():
    """ The ssh_config options ControlMaster and ControlPersist allow to
    reuse a same network connection for multiple ssh sessions. In this
    way limitations on number of open ssh connections can be bypassed.
    However, older versions of openSSH do not support this feature.
    This function is used to determine if ssh connection persist features
    can be used.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        try:
            # 'ssh -V' prints the version banner (on stderr, merged into
            # stdout here). A plain '-v' without a destination would only
            # print the usage text, which the regex below never matches.
            proc = subprocess.Popen(["ssh", "-V"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = subprocess.DEVNULL,
                universal_newlines = True)
            out, err = proc.communicate()
            proc.wait()
        except OSError:
            # No ssh client installed at all: no persist support.
            OPENSSH_HAS_PERSIST = False
            return OPENSSH_HAS_PERSIST

        # ControlPersist is supported from OpenSSH 5.8 onwards.
        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done using it: the file is deleted as soon as the reference is
    garbage collected.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port
    :type port: str or int
    """
    if port and str(port) != "22":
        # NOTE(review): known_hosts normally encodes non-default ports as
        # [host]:port; this module historically used host:port — kept
        # as-is to avoid changing key-matching behavior. Confirm.
        host = '%s:%s' % (host, str(port))

    # Create a temporary server key file.
    # mode="w" is required: the writes below pass str data, and the
    # default binary mode would raise TypeError under Python 3.
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode="w")

    hostbyname = gethostbyname(host)

    # Add the intended host key
    tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    tmp_known_hosts.flush()

    return tmp_known_hosts
def make_control_path(agent, forward_x11):
    """Build the ssh ControlPath template for a session.

    The path is made distinct per (agent, forward_x11) combination so
    that sessions with different options never share a multiplexed
    control socket; %r/%h/%p are expanded by ssh itself.
    """
    ctrl_path = "/tmp/nepi_ssh"

    if agent:
        ctrl_path += "_a"

    if forward_x11:
        ctrl_path += "_x"

    ctrl_path += "-%r@%h:%p"

    return ctrl_path
191 """ Escapes strings so that they are safe to use as command-line
193 if SHELL_SAFE.match(s):
194 # safe string - no escaping needed
197 # unsafe string - escape
199 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
202 return "'$'\\x%02x''" % (ord(c),)
203 s = ''.join(map(escape, s))
def eintr_retry(func):
    """Decorator: retry *func* when it fails with EINTR.

    By default the call is attempted up to 4 times inside the loop plus
    one final unguarded attempt. Passing ``_retry=True`` in the call's
    keyword arguments skips the retry loop (the keyword is consumed and
    not forwarded to *func*).
    """
    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for i in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as e:
                # select.error and socket.error are OSError aliases on
                # Python 3, where indexing the exception (args[0]) no
                # longer yields the errno — use the .errno attribute.
                if getattr(e, "errno", None) == errno.EINTR:
                    continue
                else:
                    raise
        # Final attempt: let any exception propagate to the caller.
        return func(*p, **kw)
    return rv
# Run `command` on `host` as `user` through the ssh client and hand the
# built argument vector to _retry_rexec; returns ((stdout, stderr), process).
# NOTE(review): several signature defaults and conditionals are not visible
# in this excerpt; the comments below describe only what is shown.
229 def rexec(command, host, user,
239 connect_timeout = 30,
244 strict_host_checking = True):
246 Executes a remote command, returns ((stdout,stderr),process)
249 tmp_known_hosts = None
# Resolve the host once; the IP (when resolution succeeds) is preferred
# as the ssh target below.
251 hostip = gethostbyname(host)
255 # Don't bother with localhost. Makes test easier
256 '-o', 'NoHostAuthenticationForLocalhost=yes',
257 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
258 '-o', 'ConnectionAttempts=3',
259 '-o', 'ServerAliveInterval=30',
260 '-o', 'TCPKeepAlive=yes',
261 '-o', 'Batchmode=yes',
262 '-l', user, hostip or host]
# Reuse one multiplexed master connection across sessions when the
# installed OpenSSH supports ControlPersist.
264 if persistent and openssh_has_persist():
266 '-o', 'ControlMaster=auto',
267 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
268 '-o', 'ControlPersist=60' ])
270 if not strict_host_checking:
271 # Do not check for Host key. Unsafe.
272 args.extend(['-o', 'StrictHostKeyChecking=no'])
# Tunnel through an ssh gateway when one is configured.
275 proxycommand = _proxy_command(gw, gwuser, identity)
276 args.extend(['-o', proxycommand])
282 args.append('-p%d' % port)
285 identity = os.path.expanduser(identity)
286 args.extend(('-i', identity))
296 # Create a temporary server key file
297 tmp_known_hosts = make_server_key_args(server_key, host, port)
298 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
301 command = "sudo " + command
305 log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
# Pipe the child's stdio vs. inherit it — presumably gated on the
# (not visible) blocking flag; confirm against the full source.
307 stdout = stderr = stdin = subprocess.PIPE
309 stdout = stderr = stdin = None
# The known_hosts tempfile is passed along so it outlives the child.
311 return _retry_rexec(args, log_msg,
317 tmp_known_hosts = tmp_known_hosts,
# Copy files to/from a remote site with scp; the remote endpoint is
# whichever of source/dest carries a "user@host:path" spec.
320 def rcopy(source, dest,
328 strict_host_checking = True):
330 Copies from/to remote sites.
332 Source and destination should have the user and host encoded
335 Source can be a list of files to copy to a single destination,
336 (in which case it is advised that the destination be a folder),
337 or a single file in a string.
340 # Parse destination as <user>@<server>:<path>
341 if isinstance(dest, str) and ':' in dest:
342 remspec, path = dest.split(':',1)
343 elif isinstance(source, str) and ':' in source:
344 remspec, path = source.split(':',1)
346 raise ValueError("Both endpoints cannot be local")
347 user,host = remspec.rsplit('@',1)
350 tmp_known_hosts = None
352 args = ['scp', '-q', '-p', '-C',
353 # 2015-06-01 Thierry: I am commenting off blowfish
354 # as this is not available on a plain ubuntu 15.04 install
355 # this IMHO is too fragile, should be something the user
356 # decides explicitly (so he is at least aware of that dependency)
357 # Speed up transfer using blowfish cipher specification which is
358 # faster than the default one (3des)
360 # Don't bother with localhost. Makes test easier
361 '-o', 'NoHostAuthenticationForLocalhost=yes',
362 '-o', 'ConnectTimeout=60',
363 '-o', 'ConnectionAttempts=3',
364 '-o', 'ServerAliveInterval=30',
365 '-o', 'TCPKeepAlive=yes' ]
368 args.append('-P%d' % port)
371 proxycommand = _proxy_command(gw, gwuser, identity)
372 args.extend(['-o', proxycommand])
378 identity = os.path.expanduser(identity)
379 args.extend(('-i', identity))
382 # Create a temporary server key file
383 tmp_known_hosts = make_server_key_args(server_key, host, port)
384 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
386 if not strict_host_checking:
387 # Do not check for Host key. Unsafe.
388 args.extend(['-o', 'StrictHostKeyChecking=no'])
# Multi-file copies reuse the persistent control connection when available.
390 if isinstance(source, list):
393 if openssh_has_persist():
395 '-o', 'ControlMaster=auto',
396 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
400 if isinstance(dest, list):
405 log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
# The known_hosts tempfile must travel with the process so it stays alive.
407 return _retry_rexec(args, log_msg, env = None, retry = retry,
408 tmp_known_hosts = tmp_known_hosts,
411 def rspawn(command, pidfile,
412 stdout = '/dev/null',
427 strict_host_checking = True):
429 Spawn a remote command such that it will continue working asynchronously in
432 :param command: The command to run, it should be a single line.
435 :param pidfile: Path to a file where to store the pid and ppid of the
439 :param stdout: Path to file to redirect standard output.
440 The default value is /dev/null
443 :param stderr: Path to file to redirect standard error.
444 If the special STDOUT value is used, stderr will
445 be redirected to the same file as stdout
448 :param stdin: Path to a file with input to be piped into the command's standard input
451 :param home: Path to working directory folder.
452 It is assumed to exist unless the create_home flag is set.
455 :param create_home: Flag to force creation of the home folder before
457 :type create_home: bool
459 :param sudo: Flag forcing execution with sudo user
464 (stdout, stderr), process
466 Of the spawning process, which only captures errors at spawning time.
467 Usually only useful for diagnostics.
469 # Start process in a "daemonized" way, using nohup and heavy
470 # stdin/out redirection to avoid connection issues
474 stderr = ' ' + stderr
# Inner shell command: background the payload and record "<pid> 1" in
# the pidfile ($! is the backgrounded job's pid; 1 stands in for ppid).
476 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
478 'pidfile' : shell_escape(pidfile),
# Outer command: optionally create/enter home, remove a stale pidfile,
# then nohup the daemonized payload (with sudo when requested).
484 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
485 'command' : shell_escape(daemon_command),
486 'sudo' : 'sudo -S' if sudo else '',
487 'pidfile' : shell_escape(pidfile),
488 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
489 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
492 (out,err),proc = rexec(
501 server_key = server_key,
503 strict_host_checking = strict_host_checking ,
# A failed spawning ssh is fatal: nothing was started on the remote side.
507 raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
509 return ((out, err), proc)
521 strict_host_checking = True):
523 Returns the pid and ppid of a process from a remote file where the
524 information was stored.
526 :param home: Path to directory where the pidfile is located
529 :param pidfile: Name of file containing the pid information
534 A (pid, ppid) tuple useful for calling rstatus and rkill,
535 or None if the pidfile isn't valid yet (can happen when process is staring up)
538 (out,err),proc = rexec(
539 "cat %(pidfile)s" % {
549 server_key = server_key,
550 strict_host_checking = strict_host_checking
558 return [ int(x) for x in out.strip().split(' ',1)) ]
560 # Ignore, many ways to fail that don't matter that much
564 def rstatus(pid, ppid,
573 strict_host_checking = True):
575 Returns a code representing the status of a remote process
577 :param pid: Process id of the process
580 :param ppid: Parent process id of process
583 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
586 (out,err),proc = rexec(
587 # Check only by pid. pid+ppid does not always work (especially with sudo)
588 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
599 server_key = server_key,
600 strict_host_checking = strict_host_checking
# Presumably gated on the ssh invocation failing (gating line not in
# view): unable to inspect the remote node, so report NOT_STARTED.
604 return ProcStatus.NOT_STARTED
# Remote node without /proc mounted: ps cannot work there either.
608 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
# 'wait' is echoed by the remote command above when ps still lists pid.
611 status = (out.strip() == 'wait')
613 return ProcStatus.NOT_STARTED
614 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
628 strict_host_checking = True):
630 Sends a kill signal to a remote process.
632 First tries a SIGTERM, and if the process does not end in 10 seconds,
635 :param pid: Process id of process to be killed
638 :param ppid: Parent process id of process to be killed
641 :param sudo: Flag indicating if sudo should be used to kill the process
645 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
647 SUBKILL="%(subkill)s" ;
648 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
649 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
650 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
652 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
655 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
656 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
660 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
661 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
662 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
666 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
668 (out,err),proc = rexec(
672 'sudo' : 'sudo -S' if sudo else '',
682 server_key = server_key,
683 strict_host_checking = strict_host_checking
686 # wait, don't leave zombies around
689 return (out, err), proc
# Low-level runner: execute `args` via subprocess, retrying transient
# client-side ssh failures up to `retry` times; returns ((out, err), proc).
691 def _retry_rexec(args,
693 stdout = subprocess.PIPE,
694 stdin = subprocess.PIPE,
695 stderr = subprocess.PIPE,
698 tmp_known_hosts = None,
701 for x in range(retry):
702 # display command actually invoked when debug is turned on
703 message = " ".join( [ "'{}'".format(arg) for arg in args ] )
704 log("sshfuncs: invoking {}".format(message), logging.DEBUG)
705 # connects to the remote host and starts a remote connection
706 proc = subprocess.Popen(
712 universal_newlines = True,
714 # attach tempfile object to the process, to make sure the file stays
715 # alive until the process is finished with it
716 proc._known_hosts = tmp_known_hosts
718 # The argument block == False forces to rexec to return immediately,
723 #(out, err) = proc.communicate()
724 # The method communicate was re implemented for performance issues
725 # when using python subprocess communicate method the ssh commands
726 # last one minute each
727 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
728 out, err = _communicate(proc, input=None)
729 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
# Non-blocking path: drain whatever output is already available.
732 out = proc.stdout.read()
733 if proc.poll() and stderr:
734 err = proc.stderr.read()
736 log(log_msg, logging.DEBUG, out, err)
# Retry only on errors that look transient (ssh client / mux failures).
741 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
742 # SSH error, can safely retry
745 # Probably timed out or plain failed but can retry
# Back off before the next attempt (t is computed on lines not in view).
750 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
751 t, x, " ".join(args))
752 log(msg, logging.DEBUG)
757 except RuntimeError as e:
758 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
759 log(msg, logging.DEBUG, out, err)
765 return ((out, err), proc)
768 # Don't remove. The method communicate was re implemented for performance issues
# Select-driven replacement for Popen.communicate(): feeds `input` to the
# child's stdin and drains stdout/stderr, with an optional timeout that
# escalates SIGTERM -> SIGKILL against the child.
769 def _communicate(proc, input, timeout=None, err_on_timeout=True):
772 stdout = None # Return
773 stderr = None # Return
777 if timeout is not None:
778 timelimit = time.time() + timeout
# NOTE(review): killtime and bailtime are identical (+4s); presumably
# bailtime was meant to be later so SIGKILL fires before bailing — confirm.
779 killtime = timelimit + 4
780 bailtime = timelimit + 4
783 # Flush stdio buffer. This might block, if the user has
784 # been writing to .stdin in an uncontrolled fashion.
787 write_set.append(proc.stdin)
792 read_set.append(proc.stdout)
796 read_set.append(proc.stderr)
800 while read_set or write_set:
801 if timeout is not None:
802 curtime = time.time()
803 if timeout is None or curtime > timelimit:
804 if curtime > bailtime:
806 elif curtime > killtime:
807 signum = signal.SIGKILL
809 signum = signal.SIGTERM
# Past the deadline: signal the child with the chosen severity.
811 os.kill(proc.pid, signum)
814 select_timeout = timelimit - curtime + 0.1
818 if select_timeout > 1.0:
822 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
823 except select.error as e:
829 if not rlist and not wlist and not xlist and proc.poll() is not None:
830 # timeout and process exited, say bye
833 if proc.stdin in wlist:
834 # When select has indicated that the file is writable,
835 # we can write up to PIPE_BUF bytes without risk
836 # blocking. POSIX defines PIPE_BUF >= 512
# NOTE(review): buffer() exists only on Python 2; on Python 3 this line
# raises NameError (a memoryview slice is the equivalent) — confirm the
# intended interpreter for this path.
837 bytes_written = os.write(proc.stdin.fileno(),
838 buffer(input, input_offset, 512))
839 input_offset += bytes_written
841 if input_offset >= len(input):
843 write_set.remove(proc.stdin)
845 if proc.stdout in rlist:
846 # python2 version used to do this
847 # data = os.read(proc.stdout.fileno(), 1024)
848 # however this always returned bytes...
849 data = proc.stdout.read()
850 log('we have read {}'.format(data))
851 # data should be str and not bytes because we use
852 # universal_newlines = True, but to be clean
853 # instead of saying data != ""
855 log('closing stdout')
857 read_set.remove(proc.stdout)
860 if proc.stderr in rlist:
861 # likewise (see above)
862 # data = os.read(proc.stderr.fileno(), 1024)
863 data = proc.stderr.read()
866 read_set.remove(proc.stderr)
869 # All data exchanged. Translate lists into strings.
870 if stdout is not None:
871 stdout = ''.join(stdout)
872 if stderr is not None:
873 stderr = ''.join(stderr)
875 # # Translate newlines, if requested. We cannot let the file
876 # # object do the translation: It is based on stdio, which is
877 # # impossible to combine with select (unless forcing no
879 # if proc.universal_newlines and hasattr(file, 'newlines'):
881 # stdout = proc._translate_newlines(stdout)
883 # stderr = proc._translate_newlines(stderr)
885 if killed and err_on_timeout:
886 errcode = proc.poll()
887 raise RuntimeError("Operation timed out", errcode, stdout, stderr)
893 return (stdout, stderr)
895 def _proxy_command(gw, gwuser, gwidentity):
897 Constructs the SSH ProxyCommand option to add to the SSH command to connect
899 :param gw: SSH proxy hostname
902 :param gwuser: SSH proxy username
905 :param gwidentity: SSH proxy identity file
906 :type gwidentity: str
911 returns the SSH ProxyCommand option.
914 proxycommand = 'ProxyCommand=ssh -q '
916 proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
918 proxycommand += '%s' % gwuser
921 proxycommand += '@%s -W %%h:%%p' % gw