2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Claudio Freire <claudio-daniel.freire@inria.fr>
20 ## TODO: This code needs reviewing !!!
# Matches a line of the form
#   "<index>: <ifname> inet|inet6 <address> [brd <addr>] ... scope global ..."
# and captures the interface name and the address (possibly with a /prefix).
# Written as a raw string so that \d and \s reach the regex engine untouched
# instead of being parsed as (invalid) string escape sequences.
_re_inet = re.compile(r"\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
logger = logging.getLogger("sshfuncs")


def log(msg, level = logging.DEBUG, out = None, err = None):
    """Emit ``msg`` on the module logger, appending captured output if any.

    :param msg: base message to log
    :type msg: str

    :param level: logging level passed to the logger (DEBUG by default)
    :type level: int

    :param out: captured standard output; appended to the message when
        it is not empty
    :param err: captured standard error; appended to the message when
        it is not empty
    """
    # Only mention OUT / ERROR when there is something to show, so that
    # messages without captured output are not cluttered with "None".
    if out:
        msg += " - OUT: %s " % out
    if err:
        msg += " - ERROR: %s " % err
    logger.log(level, msg)
# Path of the null device. Uses os.devnull when the os module provides it and
# falls back to the literal "/dev/null" otherwise.
DEV_NULL = getattr(os, "devnull", "/dev/null")
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
64 Codes for status of remote spawned process
66 # Process is still running
72 # Process hasn't started running yet (this should be very rare)
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
78 def resolve_hostname(host):
81 if host in ["localhost", "127.0.0.1", "::1"]:
85 stdout=subprocess.PIPE,
86 stderr=subprocess.PIPE,
87 universal_newlines = True,
89 stdout, stderr = p.communicate()
90 m = _re_inet.findall(stdout)
91 ip = m[0][1].split("/")[0]
93 ip = socket.gethostbyname(host)
def gethostbyname(host):
    """Return the address of ``host``, remembering results in a module cache.

    The first lookup of a host goes through resolve_hostname(); the result
    is stored in hostbyname_cache and returned directly on later calls.

    :param host: hostname to resolve
    :type host: str

    :returns: whatever resolve_hostname() returned for ``host``
    """
    hostbyname = hostbyname_cache.get(host)
    if not hostbyname:
        with hostbyname_cache_lock:
            # Look again now that we hold the lock: another thread may have
            # resolved and stored this host while we were waiting, in which
            # case a second resolution would be wasted work.
            hostbyname = hostbyname_cache.get(host)
            if not hostbyname:
                hostbyname = resolve_hostname(host)
                hostbyname_cache[host] = hostbyname

                msg = " Added hostbyname %s - %s " % (host, hostbyname)
                log(msg, logging.DEBUG)

    return hostbyname
112 OPENSSH_HAS_PERSIST = None
114 def openssh_has_persist():
115 """ The ssh_config options ControlMaster and ControlPersist allow
116 the same network connection to be reused for multiple ssh sessions. This
117 way, limits on the number of open ssh connections can be bypassed.
118 However, older versions of OpenSSH do not support this feature.
119 This function is used to determine if ssh connection persist features
122 global OPENSSH_HAS_PERSIST
123 if OPENSSH_HAS_PERSIST is None:
124 proc = subprocess.Popen(
126 stdout = subprocess.PIPE,
127 stderr = subprocess.STDOUT,
128 stdin = subprocess.DEVNULL,
129 universal_newlines = True,
131 out,err = proc.communicate()
134 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
135 OPENSSH_HAS_PERSIST = bool(vre.match(out))
136 return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it: the file is removed as soon as the returned object is
    closed or garbage collected.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port, or None when no port should be recorded

    :returns: an open NamedTemporaryFile; its ``name`` attribute is the
        path to hand to ssh/scp
    """
    # Label under which the key is recorded. Kept separate from ``host`` so
    # that name resolution below is done on the bare hostname and not on
    # "host:port", which is not resolvable.
    # NOTE(review): OpenSSH documents "[host]:port" as the known_hosts syntax
    # for non-standard ports -- confirm whether the plain "host:port" form
    # written here is really what is wanted.
    known_host_label = host
    if port is not None:
        known_host_label = '%s:%s' % (host, str(port))

    # Create a temporary server key file. Text mode is required because
    # str objects are written to it (the default mode is binary).
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode = 'w+')

    hostbyname = gethostbyname(host)

    # Add the intended host key
    tmp_known_hosts.write('%s,%s %s\n' % (known_host_label, hostbyname, server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # Make sure everything is on disk before ssh reads the file by name
    tmp_known_hosts.flush()

    return tmp_known_hosts
178 def make_control_path(agent, forward_x11):
179 ctrl_path = "/tmp/nepi_ssh"
187 ctrl_path += "-%r@%h:%p"
192 """ Escapes strings so that they are safe to use as command-line
194 if SHELL_SAFE.match(s):
195 # safe string - no escaping needed
198 # unsafe string - escape
200 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
203 return "'$'\\x%02x''" % (ord(c),)
204 s = ''.join(map(escape, s))
def eintr_retry(func):
    """Retries a function invocation when a EINTR occurs

    Decorator. The wrapped function is called up to four times with any
    OSError whose errno is EINTR swallowed; if all of those attempts are
    interrupted, one last call is made whose exceptions propagate
    unchanged. Any other exception is re-raised immediately.

    The wrapper accepts an extra keyword argument ``_retry`` which is not
    forwarded to ``func``: when it is true the guarded attempts are
    skipped and ``func`` is simply called once.

    :param func: callable to wrap
    :returns: the wrapping callable (metadata copied from ``func``)
    """
    import functools

    @functools.wraps(func)
    def wrapper(*p, **kw):
        retry = kw.pop("_retry", False)
        for _ in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except OSError as e:
                # select.error and socket.error are aliases of OSError on
                # Python 3, so this single handler covers all three. The
                # error code must be read from .errno: exception objects
                # cannot be indexed.
                if e.errno != errno.EINTR:
                    raise
        # Guarded attempts exhausted (or disabled): final, unguarded call.
        return func(*p, **kw)

    return wrapper
230 def rexec(command, host, user,
240 connect_timeout = 30,
245 strict_host_checking = True):
247 Executes a remote command, returns ((stdout,stderr),process)
250 tmp_known_hosts = None
252 hostip = gethostbyname(host)
256 # Don't bother with localhost. Makes test easier
257 '-o', 'NoHostAuthenticationForLocalhost=yes',
258 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
259 '-o', 'ConnectionAttempts=3',
260 '-o', 'ServerAliveInterval=30',
261 '-o', 'TCPKeepAlive=yes',
262 '-o', 'Batchmode=yes',
263 '-l', user, hostip or host]
265 if persistent and openssh_has_persist():
267 '-o', 'ControlMaster=auto',
268 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
269 '-o', 'ControlPersist=60' ])
271 if not strict_host_checking:
272 # Do not check for Host key. Unsafe.
273 args.extend(['-o', 'StrictHostKeyChecking=no'])
276 proxycommand = _proxy_command(gw, gwuser, identity)
277 args.extend(['-o', proxycommand])
283 args.append('-p%d' % port)
286 identity = os.path.expanduser(identity)
287 args.extend(('-i', identity))
297 # Create a temporary server key file
298 tmp_known_hosts = make_server_key_args(server_key, host, port)
299 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
302 command = "sudo " + command
306 log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
308 stdout = stderr = stdin = subprocess.PIPE
310 stdout = stderr = stdin = None
312 return _retry_rexec(args, log_msg,
318 tmp_known_hosts = tmp_known_hosts,
321 def rcopy(source, dest,
329 strict_host_checking = True):
331 Copies from/to remote sites.
333 Source and destination should have the user and host encoded
336 Source can be a list of files to copy to a single destination,
337 (in which case it is advised that the destination be a folder),
338 or a single file in a string.
341 # Parse destination as <user>@<server>:<path>
342 if isinstance(dest, str) and ':' in dest:
343 remspec, path = dest.split(':',1)
344 elif isinstance(source, str) and ':' in source:
345 remspec, path = source.split(':',1)
347 raise ValueError("Both endpoints cannot be local")
348 user,host = remspec.rsplit('@',1)
351 tmp_known_hosts = None
353 args = ['scp', '-q', '-p', '-C',
354 # 2015-06-01 Thierry: I am commenting out blowfish
355 # as this is not available on a plain ubuntu 15.04 install
356 # this IMHO is too fragile, should be something the user
357 # decides explicitly (so he is at least aware of that dependency)
358 # Speed up transfer using blowfish cipher specification which is
359 # faster than the default one (3des)
361 # Don't bother with localhost. Makes test easier
362 '-o', 'NoHostAuthenticationForLocalhost=yes',
363 '-o', 'ConnectTimeout=60',
364 '-o', 'ConnectionAttempts=3',
365 '-o', 'ServerAliveInterval=30',
366 '-o', 'TCPKeepAlive=yes' ]
369 args.append('-P%d' % port)
372 proxycommand = _proxy_command(gw, gwuser, identity)
373 args.extend(['-o', proxycommand])
379 identity = os.path.expanduser(identity)
380 args.extend(('-i', identity))
383 # Create a temporary server key file
384 tmp_known_hosts = make_server_key_args(server_key, host, port)
385 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
387 if not strict_host_checking:
388 # Do not check for Host key. Unsafe.
389 args.extend(['-o', 'StrictHostKeyChecking=no'])
391 if isinstance(source, list):
394 if openssh_has_persist():
396 '-o', 'ControlMaster=auto',
397 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
401 if isinstance(dest, list):
406 log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
408 return _retry_rexec(args, log_msg, env = None, retry = retry,
409 tmp_known_hosts = tmp_known_hosts,
412 def rspawn(command, pidfile,
413 stdout = '/dev/null',
428 strict_host_checking = True):
430 Spawn a remote command such that it will continue working asynchronously in
433 :param command: The command to run, it should be a single line.
436 :param pidfile: Path to a file where to store the pid and ppid of the
440 :param stdout: Path to file to redirect standard output.
441 The default value is /dev/null
444 :param stderr: Path to file to redirect standard error.
445 If the special STDOUT value is used, stderr will
446 be redirected to the same file as stdout
449 :param stdin: Path to a file with input to be piped into the command's standard input
452 :param home: Path to working directory folder.
453 It is assumed to exist unless the create_home flag is set.
456 :param create_home: Flag to force creation of the home folder before
458 :type create_home: bool
460 :param sudo: Flag forcing execution with sudo user
465 (stdout, stderr), process
467 Of the spawning process, which only captures errors at spawning time.
468 Usually only useful for diagnostics.
470 # Start process in a "daemonized" way, using nohup and heavy
471 # stdin/out redirection to avoid connection issues
475 stderr = ' ' + stderr
477 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
479 'pidfile' : shell_escape(pidfile),
485 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
486 'command' : shell_escape(daemon_command),
487 'sudo' : 'sudo -S' if sudo else '',
488 'pidfile' : shell_escape(pidfile),
489 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
490 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
493 (out,err),proc = rexec(
502 server_key = server_key,
504 strict_host_checking = strict_host_checking ,
508 raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
510 return ((out, err), proc)
522 strict_host_checking = True):
524 Returns the pid and ppid of a process from a remote file where the
525 information was stored.
527 :param home: Path to directory where the pidfile is located
530 :param pidfile: Name of file containing the pid information
535 A (pid, ppid) tuple useful for calling rstatus and rkill,
536 or None if the pidfile isn't valid yet (can happen when the process is starting up)
539 (out,err),proc = rexec(
540 "cat %(pidfile)s" % {
550 server_key = server_key,
551 strict_host_checking = strict_host_checking
559 return list(map(int,out.strip().split(' ',1)))
561 # Ignore, many ways to fail that don't matter that much
565 def rstatus(pid, ppid,
574 strict_host_checking = True):
576 Returns a code representing the status of a remote process
578 :param pid: Process id of the process
581 :param ppid: Parent process id of process
584 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
587 (out,err),proc = rexec(
588 # Check only by pid. pid+ppid does not always work (especially with sudo)
589 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
600 server_key = server_key,
601 strict_host_checking = strict_host_checking
605 return ProcStatus.NOT_STARTED
609 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
612 status = (out.strip() == 'wait')
614 return ProcStatus.NOT_STARTED
615 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
629 strict_host_checking = True):
631 Sends a kill signal to a remote process.
633 First tries a SIGTERM, and if the process does not end in 10 seconds,
636 :param pid: Process id of process to be killed
639 :param ppid: Parent process id of process to be killed
642 :param sudo: Flag indicating if sudo should be used to kill the process
646 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
648 SUBKILL="%(subkill)s" ;
649 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
650 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
651 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
653 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
656 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
657 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
661 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
662 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
663 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
667 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
669 (out,err),proc = rexec(
673 'sudo' : 'sudo -S' if sudo else '',
683 server_key = server_key,
684 strict_host_checking = strict_host_checking
687 # wait, don't leave zombies around
690 return (out, err), proc
692 def _retry_rexec(args,
694 stdout = subprocess.PIPE,
695 stdin = subprocess.PIPE,
696 stderr = subprocess.PIPE,
699 tmp_known_hosts = None,
702 for x in range(retry):
703 # display command actually invoked when debug is turned on
704 message = " ".join( [ "'{}'".format(arg) for arg in args ] )
705 log("sshfuncs: invoking {}".format(message), logging.DEBUG)
706 # connects to the remote host and starts a remote connection
707 proc = subprocess.Popen(
713 universal_newlines = True,
715 # attach tempfile object to the process, to make sure the file stays
716 # alive until the process is finished with it
717 proc._known_hosts = tmp_known_hosts
719 # The argument block == False forces rexec to return immediately,
724 #(out, err) = proc.communicate()
725 # The communicate method was reimplemented for performance reasons:
726 # when using Python's subprocess communicate method, the ssh commands
727 # last one minute each
728 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
729 out, err = _communicate(proc, input=None)
730 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
733 out = proc.stdout.read()
734 if proc.poll() and stderr:
735 err = proc.stderr.read()
737 log(log_msg, logging.DEBUG, out, err)
742 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
743 # SSH error, can safely retry
746 # Probably timed out or plain failed but can retry
751 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
752 t, x, " ".join(args))
753 log(msg, logging.DEBUG)
758 except RuntimeError as e:
759 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
760 log(msg, logging.DEBUG, out, err)
766 return ((out, err), proc)
769 # Don't remove. The communicate method was reimplemented for performance reasons
770 def _communicate(proc, input, timeout=None, err_on_timeout=True):
773 stdout = None # Return
774 stderr = None # Return
778 if timeout is not None:
779 timelimit = time.time() + timeout
780 killtime = timelimit + 4
781 bailtime = timelimit + 4
784 # Flush stdio buffer. This might block, if the user has
785 # been writing to .stdin in an uncontrolled fashion.
788 write_set.append(proc.stdin)
793 read_set.append(proc.stdout)
797 read_set.append(proc.stderr)
801 while read_set or write_set:
802 if timeout is not None:
803 curtime = time.time()
804 if timeout is None or curtime > timelimit:
805 if curtime > bailtime:
807 elif curtime > killtime:
808 signum = signal.SIGKILL
810 signum = signal.SIGTERM
812 os.kill(proc.pid, signum)
815 select_timeout = timelimit - curtime + 0.1
819 if select_timeout > 1.0:
823 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
824 except select.error as e:
830 if not rlist and not wlist and not xlist and proc.poll() is not None:
831 # timeout and process exited, say bye
834 if proc.stdin in wlist:
835 # When select has indicated that the file is writable,
836 # we can write up to PIPE_BUF bytes without risk of
837 # blocking. POSIX defines PIPE_BUF >= 512
838 bytes_written = os.write(proc.stdin.fileno(),
839 buffer(input, input_offset, 512))
840 input_offset += bytes_written
842 if input_offset >= len(input):
844 write_set.remove(proc.stdin)
846 if proc.stdout in rlist:
847 # python2 version used to do this
848 # data = os.read(proc.stdout.fileno(), 1024)
849 # however this always returned bytes...
850 data = proc.stdout.read()
851 log('we have read {}'.format(data))
852 # data should be str and not bytes because we use
853 # universal_newlines = True, but to be clean
854 # instead of saying data != ""
856 log('closing stdout')
858 read_set.remove(proc.stdout)
861 if proc.stderr in rlist:
862 # likewise (see above)
863 # data = os.read(proc.stderr.fileno(), 1024)
864 data = proc.stderr.read()
867 read_set.remove(proc.stderr)
870 # All data exchanged. Translate lists into strings.
871 if stdout is not None:
872 stdout = ''.join(stdout)
873 if stderr is not None:
874 stderr = ''.join(stderr)
876 # # Translate newlines, if requested. We cannot let the file
877 # # object do the translation: It is based on stdio, which is
878 # # impossible to combine with select (unless forcing no
880 # if proc.universal_newlines and hasattr(file, 'newlines'):
882 # stdout = proc._translate_newlines(stdout)
884 # stderr = proc._translate_newlines(stderr)
886 if killed and err_on_timeout:
887 errcode = proc.poll()
888 raise RuntimeError("Operation timed out", errcode, stdout, stderr)
894 return (stdout, stderr)
896 def _proxy_command(gw, gwuser, gwidentity):
898 Constructs the SSH ProxyCommand option to add to the SSH command to connect
900 :param gw: SSH proxy hostname
903 :param gwuser: SSH proxy username
906 :param gwidentity: SSH proxy identity file
907 :type gwidentity: str
912 returns the SSH ProxyCommand option.
915 proxycommand = 'ProxyCommand=ssh -q '
917 proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
919 proxycommand += '%s' % gwuser
922 proxycommand += '@%s -W %%h:%%p' % gw