2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
21 ## TODO: This code needs reviewing !!!
# Matches one line of "ip -o addr list" output that carries a globally
# scoped address. Groups: interface name, address/prefix, optional
# "brd <addr>" part. Raw string so that \d and \s are regex escapes rather
# than (invalid) Python string escapes.
_re_inet = re.compile(r"\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")

# Logger shared by every helper in this module (see log()).
logger = logging.getLogger("sshfuncs")
def log(msg, level, out = None, err = None):
    """Send msg to the module logger at the given level.

    :param msg: base message text
    :param level: a logging level constant (e.g. logging.DEBUG)
    :param out: captured standard output; appended to msg when non-empty
    :param err: captured standard error; appended to msg when non-empty
    """
    if out:
        msg += " - OUT: %s " % out

    if err:
        msg += " - ERROR: %s " % err

    logger.log(level, msg)
# Path of the null device; falls back to the POSIX path when the os module
# does not expose devnull.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Strings made only of these characters can be passed to a shell unquoted.
# The end is anchored with \Z rather than $ because $ also matches just
# before a trailing newline, which would let "foo\n" through unescaped.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')
60 Special value that when given to rspawn in stderr causes stderr to
61 redirect to whatever stdout was redirected to.
66 Codes for status of remote spawned process
68 # Process is still running
74 # Process hasn't started running yet (this should be very rare)
# Cache of host name -> resolved address, filled in by gethostbyname().
hostbyname_cache = {}
# Held while a new entry is resolved and stored in hostbyname_cache.
hostbyname_cache_lock = threading.Lock()
def resolve_hostname(host):
    """Resolve host to an IP address string.

    For the loopback names ("localhost", "127.0.0.1", "::1") the first
    globally scoped address listed by ``ip -o addr list`` is returned, so
    that the local machine is identified by an address other hosts can
    see. If the ``ip`` tool is unavailable or reports no global address,
    or for any other host, the name is resolved with
    socket.gethostbyname().

    :param host: host name or address
    :type host: str

    :rtype: str
    :raises socket.error: if the name cannot be resolved
    """
    if host in ("localhost", "127.0.0.1", "::1"):
        try:
            # No shell needed: the command line is fixed.
            p = subprocess.Popen(["ip", "-o", "addr", "list"],
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
            stdout, stderr = p.communicate()
        except OSError:
            # "ip" is not installed; fall through to the socket lookup
            stdout = ""

        # On Python 3 the pipe yields bytes, which the str pattern rejects
        if not isinstance(stdout, str):
            stdout = stdout.decode("utf-8", "replace")

        m = _re_inet.findall(stdout)
        if m:
            # second group is "address/prefix"; keep the address only
            return m[0][1].split("/")[0]

    return socket.gethostbyname(host)
def gethostbyname(host):
    """Return the address of host, resolving it at most once.

    Results are kept in hostbyname_cache; a cache miss is resolved with
    resolve_hostname() while holding hostbyname_cache_lock.

    :param host: host name or address
    :type host: str

    :rtype: str
    """
    hostbyname = hostbyname_cache.get(host)
    if not hostbyname:
        added = False
        with hostbyname_cache_lock:
            # Look again under the lock: another thread may have resolved
            # the same host while this one was waiting.
            hostbyname = hostbyname_cache.get(host)
            if not hostbyname:
                hostbyname = resolve_hostname(host)
                hostbyname_cache[host] = hostbyname
                added = True

        if added:
            msg = " Added hostbyname %s - %s " % (host, hostbyname)
            log(msg, logging.DEBUG)

    return hostbyname
# Cached answer of openssh_has_persist(); None until the first probe.
OPENSSH_HAS_PERSIST = None

# Accepts version banners of OpenSSH 5.8 and newer.
_OPENSSH_PERSIST_VERSION = re.compile(
    r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)

def _openssh_version_has_persist(banner):
    """Tell whether an ssh version banner names a release with ControlPersist."""
    return bool(_OPENSSH_PERSIST_VERSION.match(banner))

def openssh_has_persist():
    """ The ssh_config options ControlMaster and ControlPersist allow to
    reuse a same network connection for multiple ssh sessions. In this
    way limitations on number of open ssh connections can be bypassed.
    However, older versions of openSSH do not support this feature.
    This function is used to determine if ssh connection persist features
    can be used with the locally installed ssh client.

    The client is probed once; the answer is cached in OPENSSH_HAS_PERSIST.

    :rtype: bool
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # "-V" prints the version banner and exits on every OpenSSH
        # release, whereas "-v" without a host only prints usage on
        # current ones.
        with open("/dev/null", "r") as null_in:
            proc = subprocess.Popen(["ssh", "-V"],
                    stdout = subprocess.PIPE,
                    stderr = subprocess.STDOUT,
                    stdin = null_in)
            out, err = proc.communicate()

        # On Python 3 the pipe yields bytes, which the str pattern rejects
        if not isinstance(out, str):
            out = out.decode("utf-8", "replace")

        OPENSSH_HAS_PERSIST = _openssh_version_has_persist(out)
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it: the file is removed as soon as the reference is dropped.

    Unless the NEPI_STRICT_AUTH_MODE environment variable is set to
    '1', 'true' or 'on', the contents of $HOME/.ssh/known_hosts are
    appended after the server key.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port
    :type port: str

    :rtype: tempfile.NamedTemporaryFile
    """
    # Resolve the bare hostname first; once ":port" has been appended the
    # string is no longer something a resolver accepts.
    hostbyname = gethostbyname(host)

    # NOTE(review): guard reconstructed as "port is not None" - confirm.
    # OpenSSH itself writes non-default ports as "[host]:port"; the
    # "host:port" spelling is kept as found.
    if port is not None:
        host = '%s:%s' % (host, str(port))

    # Create a temporary server key file. Text mode, because str is
    # written to it (the default binary mode rejects str on Python 3).
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode = "w+")

    # Add the intended host key
    tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # ssh reads the file by name, so the data must reach the disk now
    tmp_known_hosts.flush()

    return tmp_known_hosts
172 def make_control_path(agent, forward_x11):
173 ctrl_path = "/tmp/nepi_ssh"
181 ctrl_path += "-%r@%h:%p"
def shell_escape(s):
    """ Escapes strings so that they are safe to use as command-line
    arguments.

    Strings accepted by SHELL_SAFE are returned unchanged. Any other
    string is wrapped in single quotes; quote characters and anything
    outside printable ASCII (CR, LF and TAB excepted) are emitted as
    ANSI-C quoted hex escapes.

    :param s: the string to escape
    :type s: str

    :rtype: str
    """
    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    def escp(c):
        code = ord(c)
        if (32 <= code < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
            return c
        if code > 0xff:
            # A hex escape holds one byte only; spell wide characters
            # out as their UTF-8 bytes.
            return ''.join("'$'\\x%02x''" % (b,)
                    for b in bytearray(c.encode("utf-8")))
        # Close the surrounding quote, emit $'\xNN', reopen the quote
        return "'$'\\x%02x''" % (code,)

    # The quotes added here are the ones each escape closes and reopens
    return "'%s'" % (''.join(map(escp, s)),)
def eintr_retry(func):
    """Retries a function invocation when a EINTR occurs

    The wrapped function is attempted up to four times with EINTR
    failures swallowed, followed by one last attempt whose errors
    propagate. Any error other than EINTR propagates immediately.

    The wrapper consumes the keyword argument ``_retry``: when it is
    true the swallowing attempts are skipped and func is called once.
    """
    import functools

    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for i in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as e:
                # Python 2's select.error has no errno attribute; its
                # code is the first element of args.
                code = getattr(e, "errno", None)
                if code is None and e.args:
                    code = e.args[0]
                if code != errno.EINTR:
                    raise
        # Last attempt: let whatever happens reach the caller
        return func(*p, **kw)
    return rv
224 def rexec(command, host, user,
234 connect_timeout = 30,
239 strict_host_checking = True):
241 Executes a remote command, returns ((stdout,stderr),process)
244 tmp_known_hosts = None
246 hostip = gethostbyname(host)
250 # Don't bother with localhost. Makes test easier
251 '-o', 'NoHostAuthenticationForLocalhost=yes',
252 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
253 '-o', 'ConnectionAttempts=3',
254 '-o', 'ServerAliveInterval=30',
255 '-o', 'TCPKeepAlive=yes',
256 '-o', 'Batchmode=yes',
257 '-l', user, hostip or host]
259 if persistent and openssh_has_persist():
261 '-o', 'ControlMaster=auto',
262 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
263 '-o', 'ControlPersist=60' ])
265 if not strict_host_checking:
266 # Do not check for Host key. Unsafe.
267 args.extend(['-o', 'StrictHostKeyChecking=no'])
270 proxycommand = _proxy_command(gw, gwuser, identity)
271 args.extend(['-o', proxycommand])
277 args.append('-p%d' % port)
280 identity = os.path.expanduser(identity)
281 args.extend(('-i', identity))
291 # Create a temporary server key file
292 tmp_known_hosts = make_server_key_args(server_key, host, port)
293 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
296 command = "sudo " + command
300 log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
302 stdout = stderr = stdin = subprocess.PIPE
304 stdout = stderr = stdin = None
306 return _retry_rexec(args, log_msg,
312 tmp_known_hosts = tmp_known_hosts,
315 def rcopy(source, dest,
323 strict_host_checking = True):
325 Copies from/to remote sites.
327 Source and destination should have the user and host encoded
330 Source can be a list of files to copy to a single destination,
331 (in which case it is advised that the destination be a folder),
332 or a single file in a string.
335 # Parse destination as <user>@<server>:<path>
336 if isinstance(dest, str) and ':' in dest:
337 remspec, path = dest.split(':',1)
338 elif isinstance(source, str) and ':' in source:
339 remspec, path = source.split(':',1)
341 raise ValueError, "Both endpoints cannot be local"
342 user,host = remspec.rsplit('@',1)
345 tmp_known_hosts = None
347 args = ['scp', '-q', '-p', '-C',
348 # Speed up transfer using the blowfish cipher, which is
349 # faster than the default one (3des)
351 # Don't bother with localhost. Makes test easier
352 '-o', 'NoHostAuthenticationForLocalhost=yes',
353 '-o', 'ConnectTimeout=60',
354 '-o', 'ConnectionAttempts=3',
355 '-o', 'ServerAliveInterval=30',
356 '-o', 'TCPKeepAlive=yes' ]
359 args.append('-P%d' % port)
362 proxycommand = _proxy_command(gw, gwuser, identity)
363 args.extend(['-o', proxycommand])
369 identity = os.path.expanduser(identity)
370 args.extend(('-i', identity))
373 # Create a temporary server key file
374 tmp_known_hosts = make_server_key_args(server_key, host, port)
375 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
377 if not strict_host_checking:
378 # Do not check for Host key. Unsafe.
379 args.extend(['-o', 'StrictHostKeyChecking=no'])
381 if isinstance(source, list):
384 if openssh_has_persist():
386 '-o', 'ControlMaster=auto',
387 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
391 if isinstance(dest, list):
396 log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
398 return _retry_rexec(args, log_msg, env = None, retry = retry,
399 tmp_known_hosts = tmp_known_hosts,
402 def rspawn(command, pidfile,
403 stdout = '/dev/null',
418 strict_host_checking = True):
420 Spawn a remote command such that it will continue working asynchronously in
423 :param command: The command to run, it should be a single line.
426 :param pidfile: Path to a file where to store the pid and ppid of the
430 :param stdout: Path to file to redirect standard output.
431 The default value is /dev/null
434 :param stderr: Path to file to redirect standard error.
435 If the special STDOUT value is used, stderr will
436 be redirected to the same file as stdout
439 :param stdin: Path to a file with input to be piped into the command's standard input
442 :param home: Path to working directory folder.
443 It is assumed to exist unless the create_home flag is set.
446 :param create_home: Flag to force creation of the home folder before
448 :type create_home: bool
450 :param sudo: Flag forcing execution with sudo user
455 (stdout, stderr), process
457 Of the spawning process, which only captures errors at spawning time.
458 Usually only useful for diagnostics.
460 # Start process in a "daemonized" way, using nohup and heavy
461 # stdin/out redirection to avoid connection issues
465 stderr = ' ' + stderr
467 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
469 'pidfile' : shell_escape(pidfile),
475 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
476 'command' : shell_escape(daemon_command),
477 'sudo' : 'sudo -S' if sudo else '',
478 'pidfile' : shell_escape(pidfile),
479 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
480 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
483 (out,err),proc = rexec(
492 server_key = server_key,
494 strict_host_checking = strict_host_checking ,
498 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
500 return ((out, err), proc)
512 strict_host_checking = True):
514 Returns the pid and ppid of a process from a remote file where the
515 information was stored.
517 :param home: Path to directory where the pidfile is located
520 :param pidfile: Name of file containing the pid information
525 A (pid, ppid) tuple useful for calling rstatus and rkill,
526 or None if the pidfile isn't valid yet (can happen when process is starting up)
529 (out,err),proc = rexec(
530 "cat %(pidfile)s" % {
540 server_key = server_key,
541 strict_host_checking = strict_host_checking
549 return map(int,out.strip().split(' ',1))
551 # Ignore, many ways to fail that don't matter that much
555 def rstatus(pid, ppid,
564 strict_host_checking = True):
566 Returns a code representing the status of a remote process
568 :param pid: Process id of the process
571 :param ppid: Parent process id of process
574 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
577 (out,err),proc = rexec(
578 # Check only by pid. pid+ppid does not always work (especially with sudo)
579 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
590 server_key = server_key,
591 strict_host_checking = strict_host_checking
595 return ProcStatus.NOT_STARTED
599 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
602 status = (out.strip() == 'wait')
604 return ProcStatus.NOT_STARTED
605 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
619 strict_host_checking = True):
621 Sends a kill signal to a remote process.
623 First tries a SIGTERM, and if the process does not end in 10 seconds,
626 :param pid: Process id of process to be killed
629 :param ppid: Parent process id of process to be killed
632 :param sudo: Flag indicating if sudo should be used to kill the process
636 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
638 SUBKILL="%(subkill)s" ;
639 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
640 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
641 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
643 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
646 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
647 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
651 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
652 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
653 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
657 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
659 (out,err),proc = rexec(
663 'sudo' : 'sudo -S' if sudo else '',
673 server_key = server_key,
674 strict_host_checking = strict_host_checking
677 # wait, don't leave zombies around
680 return (out, err), proc
682 def _retry_rexec(args,
684 stdout = subprocess.PIPE,
685 stdin = subprocess.PIPE,
686 stderr = subprocess.PIPE,
689 tmp_known_hosts = None,
692 for x in xrange(retry):
693 # connects to the remote host and starts a remote connection
694 proc = subprocess.Popen(args,
700 # attach tempfile object to the process, to make sure the file stays
701 # alive until the process is finished with it
702 proc._known_hosts = tmp_known_hosts
704 # The argument block == False forces rexec to return immediately,
709 #(out, err) = proc.communicate()
710 # The communicate method was reimplemented for performance reasons:
711 # with Python's subprocess communicate method the ssh commands
712 # last one minute each
713 out, err = _communicate(proc, input=None)
716 out = proc.stdout.read()
717 if proc.poll() and stderr:
718 err = proc.stderr.read()
720 log(log_msg, logging.DEBUG, out, err)
725 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
726 # SSH error, can safely retry
729 # Probably timed out or plain failed but can retry
734 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
735 t, x, " ".join(args))
736 log(msg, logging.DEBUG)
741 except RuntimeError, e:
742 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
743 log(msg, logging.DEBUG, out, err)
749 return ((out, err), proc)
752 # Don't remove. The communicate method was reimplemented for performance reasons
753 def _communicate(proc, input, timeout=None, err_on_timeout=True):
756 stdout = None # Return
757 stderr = None # Return
761 if timeout is not None:
762 timelimit = time.time() + timeout
763 killtime = timelimit + 4
764 bailtime = timelimit + 4
767 # Flush stdio buffer. This might block, if the user has
768 # been writing to .stdin in an uncontrolled fashion.
771 write_set.append(proc.stdin)
776 read_set.append(proc.stdout)
780 read_set.append(proc.stderr)
784 while read_set or write_set:
785 if timeout is not None:
786 curtime = time.time()
787 if timeout is None or curtime > timelimit:
788 if curtime > bailtime:
790 elif curtime > killtime:
791 signum = signal.SIGKILL
793 signum = signal.SIGTERM
795 os.kill(proc.pid, signum)
798 select_timeout = timelimit - curtime + 0.1
802 if select_timeout > 1.0:
806 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
807 except select.error,e:
813 if not rlist and not wlist and not xlist and proc.poll() is not None:
814 # timeout and process exited, say bye
817 if proc.stdin in wlist:
818 # When select has indicated that the file is writable,
819 # we can write up to PIPE_BUF bytes without risk
820 # blocking. POSIX defines PIPE_BUF >= 512
821 bytes_written = os.write(proc.stdin.fileno(),
822 buffer(input, input_offset, 512))
823 input_offset += bytes_written
825 if input_offset >= len(input):
827 write_set.remove(proc.stdin)
829 if proc.stdout in rlist:
830 data = os.read(proc.stdout.fileno(), 1024)
833 read_set.remove(proc.stdout)
836 if proc.stderr in rlist:
837 data = os.read(proc.stderr.fileno(), 1024)
840 read_set.remove(proc.stderr)
843 # All data exchanged. Translate lists into strings.
844 if stdout is not None:
845 stdout = ''.join(stdout)
846 if stderr is not None:
847 stderr = ''.join(stderr)
849 # Translate newlines, if requested. We cannot let the file
850 # object do the translation: It is based on stdio, which is
851 # impossible to combine with select (unless forcing no
853 if proc.universal_newlines and hasattr(file, 'newlines'):
855 stdout = proc._translate_newlines(stdout)
857 stderr = proc._translate_newlines(stderr)
859 if killed and err_on_timeout:
860 errcode = proc.poll()
861 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
867 return (stdout, stderr)
def _proxy_command(gw, gwuser, gwidentity):
    """
    Constructs the SSH ProxyCommand option to add to the SSH command to connect
    through a gateway (proxy) host.

    :param gw: SSH proxy hostname
    :type gw: str

    :param gwuser: SSH proxy username
    :type gwuser: str

    :param gwidentity: SSH proxy identity file
    :type gwidentity: str

    :rtype: str

    returns the SSH ProxyCommand option.
    """
    proxycommand = 'ProxyCommand=ssh -q '

    if gwidentity:
        proxycommand += '-i %s ' % os.path.expanduser(gwidentity)

    if gwuser:
        proxycommand += '%s' % gwuser
    else:
        # NOTE(review): fallback reconstructed - %r is the ssh_config
        # token for the remote user name; confirm against upstream.
        proxycommand += '%r'

    # %%h and %%p become the literal %h / %p tokens that ssh expands to
    # the target host and port.
    proxycommand += '@%s -W %%h:%%p' % gw

    return proxycommand