2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
21 ## TODO: This code needs reviewing !!!
# Module-level logger named "sshfuncs"; log() below sends every record through it.
38 logger = logging.getLogger("sshfuncs")
# log(msg, level, out=None, err=None): emit msg through the module logger at
# the given logging level. The visible statements append " - OUT: <out> " and
# " - ERROR: <err> " to msg before it is logged.
# NOTE(review): the lines between the signature and each append are not
# visible in this view; presumably each append is guarded by a check on
# out / err -- confirm against the full file.
40 def log(msg, level, out = None, err = None):
42 msg += " - OUT: %s " % out
45 msg += " - ERROR: %s " % err
47 logger.log(level, msg)
# Choose the path of the null device. Only the hasattr(os, "devnull") test and
# the literal "/dev/null" assignment are visible here.
# NOTE(review): presumably os.devnull is used when it exists and "/dev/null"
# is the fallback branch -- confirm against the full file.
49 if hasattr(os, "devnull"):
52 DEV_NULL = "/dev/null"
# Pattern for strings made only of ASCII letters, digits and the characters
# - _ = + : . , /   The empty string matches too (the class is repeated with *).
# NOTE(review): in Python's re module "$" also matches just before a trailing
# newline, so a string such as "abc\n" passes .match() with this pattern.
54 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
58 Special value that when given to rspawn in stderr causes stderr to
59 redirect to whatever stdout was redirected to.
64 Codes for status of remote spawned process
66 # Process is still running
72 # Process hasn't started running yet (this should be very rare)
# Process-wide cache mapping a host name to the value returned by
# socket.gethostbyname, plus the lock held while the cache is filled.
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
# gethostbyname(host): resolve host with socket.gethostbyname and remember the
# answer in hostbyname_cache.
# NOTE(review): the test that decides whether the cached value is reused and
# the final return statement are not visible in this view -- confirm against
# the full file.
78 def gethostbyname(host):
79 global hostbyname_cache
80 global hostbyname_cache_lock
# The cache is read with dict.get, so a host that was never resolved yields None.
82 hostbyname = hostbyname_cache.get(host)
# Resolution and the cache write both happen while holding the lock.
84 with hostbyname_cache_lock:
85 hostbyname = socket.gethostbyname(host)
86 hostbyname_cache[host] = hostbyname
# Each new cache entry is reported at DEBUG level.
88 msg = " Added hostbyname %s - %s " % (host, hostbyname)
89 log(msg, logging.DEBUG)
# Cached result of openssh_has_persist(); None means the local ssh client has
# not been probed yet.
93 OPENSSH_HAS_PERSIST = None
# openssh_has_persist(): return True/False depending on whether the output of
# the local ssh client matches the version pattern below. The probe runs only
# while OPENSSH_HAS_PERSIST is None; later calls return the stored value.
95 def openssh_has_persist():
96 """ The ssh_config options ControlMaster and ControlPersist allow to
97 reuse a same network connection for multiple ssh sessions. In this
98 way limitations on number of open ssh connections can be bypassed.
99 However, older versions of openSSH do not support this feature.
100 This function is used to determine if ssh connection persist features
103 global OPENSSH_HAS_PERSIST
104 if OPENSSH_HAS_PERSIST is None:
# Run "ssh -v" with stderr merged into stdout and capture everything it prints.
# NOTE(review): "-v" is ssh's verbose flag, while "-V" is the flag documented
# to print the version. Confirm that "ssh -v" with no destination starts its
# output with the "OpenSSH_" banner; otherwise match() below never succeeds.
# NOTE(review): the file object opened on "/dev/null" is not closed in the
# visible lines, and the literal path is used instead of DEV_NULL.
105 proc = subprocess.Popen(["ssh","-v"],
106 stdout = subprocess.PIPE,
107 stderr = subprocess.STDOUT,
108 stdin = open("/dev/null","r") )
109 out,err = proc.communicate()
# The pattern is anchored at the start of the output (re.match) and is
# case-insensitive. It accepts "OpenSSH_" followed by a digit 6-9, by 5.8 or
# 5.9, by 5.10-5.99, or by a two-digit number 10-99.
112 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
113 OPENSSH_HAS_PERSIST = bool(vre.match(out))
114 return OPENSSH_HAS_PERSIST
# make_server_key_args(server_key, host, port): write a known_hosts entry for
# the given server key into a NamedTemporaryFile and return the open file
# object; callers pass its .name to ssh/scp through UserKnownHostsFile.
116 def make_server_key_args(server_key, host, port):
117 """ Returns a reference to a temporary known_hosts file, to which
118 the server key has been added.
120 Make sure to hold onto the temp file reference until the process is
123 :param server_key: the server public key
124 :type server_key: str
126 :param host: the hostname
129 :param port: the ssh port
134 host = '%s:%s' % (host, str(port))
# NOTE(review): the condition guarding the rewrite of host to "host:port" is
# not visible here. When it applies, that same "host:port" string is what gets
# passed to gethostbyname() below and written into the known_hosts entry --
# confirm that resolving a name with a ":port" suffix is intended.
136 # Create a temporary server key file
137 tmp_known_hosts = tempfile.NamedTemporaryFile()
139 hostbyname = gethostbyname(host)
# Entry format written: "<host>,<resolved address> <server_key>".
141 # Add the intended host key
142 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
# Strict mode is on when NEPI_STRICT_AUTH_MODE is "1", "true" or "on"
# (compared in lower case). Otherwise the user's $HOME/.ssh/known_hosts is
# appended if it is readable.
144 # If we're not in strict mode, add user-configured keys
145 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
146 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
147 if os.access(user_hosts_path, os.R_OK):
# NOTE(review): no close of f is visible in this view; a with-block would
# guarantee it -- confirm against the full file.
148 f = open(user_hosts_path, "r")
149 tmp_known_hosts.write(f.read())
# Flush so that a process opening the file by name sees the complete content.
152 tmp_known_hosts.flush()
154 return tmp_known_hosts
# make_control_path(agent, forward_x11): build the value used for ssh's
# ControlPath option. The visible lines start from "/tmp/nepi_ssh" and append
# "-%r@%h:%p", where %r, %h and %p are tokens that ssh itself expands.
# NOTE(review): the lines that use agent and forward_x11, and the return
# statement, are not visible in this view -- confirm against the full file.
156 def make_control_path(agent, forward_x11):
157 ctrl_path = "/tmp/nepi_ssh"
165 ctrl_path += "-%r@%h:%p"
170 """ Escapes strings so that they are safe to use as command-line
172 if SHELL_SAFE.match(s):
173 # safe string - no escaping needed
176 # unsafe string - escape
178 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
181 return "'$'\\x%02x''" % (ord(c),)
182 s = ''.join(map(escp,s))
# eintr_retry(func): decorator factory. The wrapper it builds (its def line is
# not visible here) calls func again when a call fails with errno EINTR.
185 def eintr_retry(func):
186 """Retries a function invocation when a EINTR occurs"""
188 @functools.wraps(func)
# The wrapper consumes an optional "_retry" keyword argument; it is removed
# from kw before func is called.
190 retry = kw.pop("_retry", False)
# With a truthy _retry the range is empty and the loop body never runs;
# otherwise up to four attempts are made inside the loop.
191 for i in xrange(0 if retry else 4):
193 return func(*p, **kw)
# Python 2 except syntax. select.error / socket.error carry the errno as
# their first argument.
194 except (select.error, socket.error), args:
195 if args[0] == errno.EINTR:
# A second handler (its except line is not visible) checks e.errno instead.
200 if e.errno == errno.EINTR:
# NOTE(review): presumably this is the final attempt made after the loop, so
# any exception it raises propagates to the caller -- indentation is not
# visible here, confirm against the full file.
205 return func(*p, **kw)
# rexec(command, host, user, ...): assemble an ssh command line for the remote
# host and hand it to _retry_rexec, whose result is returned.
# Only part of the signature is visible (connect_timeout defaults to 30,
# strict_host_checking to True). The body also reads persistent, agent,
# forward_x11, gw, gwuser, port, identity and server_key.
# NOTE(review): presumably those are the remaining keyword parameters --
# confirm against the full file.
208 def rexec(command, host, user,
218 connect_timeout = 30,
223 strict_host_checking = True):
225 Executes a remote command, returns ((stdout,stderr),process)
228 tmp_known_hosts = None
# The resolved address is preferred as ssh destination ("hostip or host" below).
230 hostip = gethostbyname(host)
234 # Don't bother with localhost. Makes test easier
235 '-o', 'NoHostAuthenticationForLocalhost=yes',
236 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
237 '-o', 'ConnectionAttempts=3',
238 '-o', 'ServerAliveInterval=30',
239 '-o', 'TCPKeepAlive=yes',
240 '-o', 'Batchmode=yes',
241 '-l', user, hostip or host]
# Connection sharing options are added only when the caller asked for a
# persistent connection and the local ssh client is detected as supporting it.
243 if persistent and openssh_has_persist():
245 '-o', 'ControlMaster=auto',
246 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
247 '-o', 'ControlPersist=60' ])
249 if not strict_host_checking:
250 # Do not check for Host key. Unsafe.
251 args.extend(['-o', 'StrictHostKeyChecking=no'])
# Two ProxyCommand variants for going through the gateway gw: one with an
# explicit gateway user, one using ssh's %r token. The doubled %% in the
# literals become single % after Python formatting, so ssh receives
# %h, %p and %r.
255 proxycommand = 'ProxyCommand=ssh -q %s@%s -W %%h:%%p' % (gwuser, gw)
257 proxycommand = 'ProxyCommand=ssh -q %%r@%s -W %%h:%%p' % gw
258 args.extend(['-o', proxycommand])
# ssh takes the port as lower-case -p (rcopy uses scp's upper-case -P).
264 args.append('-p%d' % port)
# "~" in the identity file path is expanded locally before passing it to ssh.
267 identity = os.path.expanduser(identity)
268 args.extend(('-i', identity))
278 # Create a temporary server key file
279 tmp_known_hosts = make_server_key_args(server_key, host, port)
280 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# The remote command is prefixed with "sudo " (the guarding condition is not
# visible in this view).
283 command = "sudo " + command
287 log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
# Two alternatives for the child's stdio: all three streams piped, or all
# three inherited (None). The condition choosing between them is not visible.
289 stdout = stderr = stdin = subprocess.PIPE
291 stdout = stderr = stdin = None
# The temporary known_hosts object is passed along so it can be attached to
# the spawned process and outlive this function.
293 return _retry_rexec(args, log_msg,
299 tmp_known_hosts = tmp_known_hosts,
# rcopy(source, dest, ...): assemble an scp command line and run it through
# _retry_rexec, whose result is returned. Only part of the signature is
# visible (strict_host_checking defaults to True). The body also reads port,
# gw, gwuser, identity, server_key and retry.
# NOTE(review): presumably those are keyword parameters -- confirm against
# the full file.
302 def rcopy(source, dest,
310 strict_host_checking = True):
312 Copies from/to remote sites.
314 Source and destination should have the user and host encoded
317 Source can be a list of files to copy to a single destination,
318 (in which case it is advised that the destination be a folder),
319 or a single file in a string.
322 # Parse destination as <user>@<server>:<path>
# The remote spec is taken from dest first, then from source. Only str values
# containing ":" are considered, so a list-valued endpoint never supplies it.
# The text before the first ":" is the remote spec; the rest is the path.
323 if isinstance(dest, str) and ':' in dest:
324 remspec, path = dest.split(':',1)
325 elif isinstance(source, str) and ':' in source:
326 remspec, path = source.split(':',1)
# Python 2 raise syntax. Raised when neither endpoint yields a remote spec.
328 raise ValueError, "Both endpoints cannot be local"
# rsplit on the last "@" so that a user name containing "@" stays intact.
# A remote spec without "@" makes this unpacking fail with ValueError.
329 user,host = remspec.rsplit('@',1)
332 tmp_known_hosts = None
334 args = ['scp', '-q', '-p', '-C',
335 # Speed up transfer using blowfish cypher specification which is
336 # faster than the default one (3des)
338 # Don't bother with localhost. Makes test easier
339 '-o', 'NoHostAuthenticationForLocalhost=yes',
340 '-o', 'ConnectTimeout=60',
341 '-o', 'ConnectionAttempts=3',
342 '-o', 'ServerAliveInterval=30',
343 '-o', 'TCPKeepAlive=yes' ]
# scp takes the port as upper-case -P.
346 args.append('-P%d' % port)
# Same two gateway ProxyCommand variants as in rexec: an explicit gateway
# user, or ssh's %r token.
350 proxycommand = 'ProxyCommand=ssh -q %s@%s -W %%h:%%p' % (gwuser, gw)
352 proxycommand = 'ProxyCommand=ssh -q %%r@%s -W %%h:%%p' % gw
353 args.extend(['-o', proxycommand])
359 identity = os.path.expanduser(identity)
360 args.extend(('-i', identity))
363 # Create a temporary server key file
364 tmp_known_hosts = make_server_key_args(server_key, host, port)
365 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
367 if not strict_host_checking:
368 # Do not check for Host key. Unsafe.
369 args.extend(['-o', 'StrictHostKeyChecking=no'])
# source and dest may each be a list; the statements that add them to args
# are not visible in this view.
371 if isinstance(source, list):
# Connection sharing uses the control path built with agent=False and
# forward_x11=False.
374 if openssh_has_persist():
376 '-o', 'ControlMaster=auto',
377 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
381 if isinstance(dest, list):
386 log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
388 return _retry_rexec(args, log_msg, env = None, retry = retry,
389 tmp_known_hosts = tmp_known_hosts,
# rspawn(command, pidfile, ...): build a shell command that starts `command`
# in the background on the remote host under nohup, records its pid in
# pidfile, and run that shell command through rexec. Returns the
# ((out, err), proc) of the spawning ssh call.
392 def rspawn(command, pidfile,
393 stdout = '/dev/null',
408 strict_host_checking = True):
410 Spawn a remote command such that it will continue working asynchronously in
413 :param command: The command to run, it should be a single line.
416 :param pidfile: Path to a file where to store the pid and ppid of the
420 :param stdout: Path to file to redirect standard output.
421 The default value is /dev/null
424 :param stderr: Path to file to redirect standard error.
425 If the special STDOUT value is used, stderr will
426 be redirected to the same file as stdout
429 :param stdin: Path to a file with input to be piped into the command's standard input
432 :param home: Path to working directory folder.
433 It is assumed to exist unless the create_home flag is set.
436 :param create_home: Flag to force creation of the home folder before
438 :type create_home: bool
440 :param sudo: Flag forcing execution with sudo user
445 (stdout, stderr), process
447 Of the spawning process, which only captures errors at spawning time.
448 Usually only useful for diagnostics.
450 # Start process in a "daemonized" way, using nohup and heavy
451 # stdin/out redirection to avoid connection issues
# A leading space is put in front of the stderr path before it is substituted
# right after "2>" in the template below. The branch handling the special
# STDOUT value is not visible in this view.
455 stderr = ' ' + stderr
# The inner script runs the command in the background with all three streams
# redirected, then writes "$! 1" to the pid file. That is the pid of the
# background job followed by the literal 1, so the "ppid" field stored in the
# file is always 1.
457 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
459 'pidfile' : shell_escape(pidfile),
# The outer command optionally creates home (mkdir -p) and changes into it,
# removes any stale pid file, then runs the inner script with
# "nohup bash -c", prefixed by "sudo -S" when sudo is set. The inner script
# is passed through shell_escape so it reaches bash -c as one argument.
465 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
466 'command' : shell_escape(daemon_command),
467 'sudo' : 'sudo -S' if sudo else '',
468 'pidfile' : shell_escape(pidfile),
469 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
470 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
473 (out,err),proc = rexec(
482 server_key = server_key,
484 strict_host_checking = strict_host_checking ,
# Python 2 raise syntax. The condition that triggers this error is not visible
# in this view; presumably it is a failed spawn -- confirm.
488 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
490 return ((out, err), proc)
502 strict_host_checking = True):
504 Returns the pid and ppid of a process from a remote file where the
505 information was stored.
507 :param home: Path to directory where the pidfile is located
510 :param pidfile: Name of file containing the pid information
515 A (pid, ppid) tuple useful for calling rstatus and rkill,
516 or None if the pidfile isn't valid yet (can happen when process is starting up)
519 (out,err),proc = rexec(
520 "cat %(pidfile)s" % {
530 server_key = server_key,
531 strict_host_checking = strict_host_checking
539 return map(int,out.strip().split(' ',1))
541 # Ignore, many ways to fail that don't matter that much
# rstatus(pid, ppid, ...): ask the remote host through rexec whether process
# pid is still listed by ps, and map the answer to a ProcStatus code.
# The remote pipeline prints the grep match count and then 'wait' when
# "ps --pid <pid>" lists the pid, or 'done' otherwise. "tail -n 1" keeps only
# that last word.
545 def rstatus(pid, ppid,
554 strict_host_checking = True):
556 Returns a code representing the the status of a remote process
558 :param pid: Process id of the process
561 :param ppid: Parent process id of process
564 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
567 (out,err),proc = rexec(
568 # Check only by pid. pid+ppid does not always work (especially with sudo)
569 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
580 server_key = server_key,
581 strict_host_checking = strict_host_checking
# NOT_STARTED is returned from two places whose guarding conditions are not
# visible in this view.
585 return ProcStatus.NOT_STARTED
# ps prints this message when /proc is not mounted on the remote host. What
# happens in that case is not visible in this view.
589 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
# 'wait' means the pid was still listed by ps.
592 status = (out.strip() == 'wait')
594 return ProcStatus.NOT_STARTED
595 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
609 strict_host_checking = True):
611 Sends a kill signal to a remote process.
613 First tries a SIGTERM, and if the process does not end in 10 seconds,
616 :param pid: Process id of process to be killed
619 :param ppid: Parent process id of process to be killed
622 :param sudo: Flag indicating if sudo should be used to kill the process
626 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
628 SUBKILL="%(subkill)s" ;
629 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
630 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
631 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
633 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
636 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
637 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
641 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
642 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
643 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
647 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
649 (out,err),proc = rexec(
653 'sudo' : 'sudo -S' if sudo else '',
663 server_key = server_key,
664 strict_host_checking = strict_host_checking
667 # wait, don't leave zombies around
670 return (out, err), proc
# _retry_rexec(args, log_msg, ...): start the given ssh/scp command line with
# subprocess.Popen, collect its output, and repeat up to `retry` times.
# Returns ((out, err), proc) for the last process started. The three stdio
# parameters default to subprocess.PIPE. The body also reads `retry`, which
# is passed by keyword from rcopy.
672 def _retry_rexec(args,
674 stdout = subprocess.PIPE,
675 stdin = subprocess.PIPE,
676 stderr = subprocess.PIPE,
679 tmp_known_hosts = None,
# Python 2 xrange: at most `retry` attempts.
682 for x in xrange(retry):
683 # connects to the remote host and starts a remote connection
684 proc = subprocess.Popen(args,
690 # attach tempfile object to the process, to make sure the file stays
691 # alive until the process is finished with it
692 proc._known_hosts = tmp_known_hosts
694 # The argument block == False forces to rexec to return immediately,
699 #(out, err) = proc.communicate()
700 # The method communicate was re implemented for performance issues
701 # when using python subprocess communicate method the ssh commands
702 # last one minute each
703 out, err = _communicate(proc, input=None)
# Alternative path that reads stdout directly. stderr is read only when
# proc.poll() is truthy (a non-zero exit status) and a stderr stream was
# requested. The condition selecting this path is not visible in this view.
706 out = proc.stdout.read()
707 if proc.poll() and stderr:
708 err = proc.stderr.read()
710 log(log_msg, logging.DEBUG, out, err)
# Error text starting with one of these two prefixes is treated as produced
# by the ssh client itself, not by the remote command.
715 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
716 # SSH error, can safely retry
719 # Probably timed out or plain failed but can retry
# x is the zero-based attempt index. How the sleep time t is computed is not
# visible in this view.
724 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
725 t, x, " ".join(args))
726 log(msg, logging.DEBUG)
# _communicate raises RuntimeError when the operation times out; it is logged
# here (Python 2 except syntax).
731 except RuntimeError, e:
732 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
733 log(msg, logging.DEBUG, out, err)
739 return ((out, err), proc)
# _communicate(proc, input, timeout=None, err_on_timeout=True): select()-based
# replacement for Popen.communicate. It feeds `input` to the child's stdin,
# collects stdout/stderr, and returns (stdout, stderr). With a timeout, the
# child is signalled once the deadline has passed. If it was killed and
# err_on_timeout is set, RuntimeError is raised.
742 # Don't remove. The method communicate was re implemented for performance issues
743 def _communicate(proc, input, timeout=None, err_on_timeout=True):
746 stdout = None # Return
747 stderr = None # Return
# Deadlines are absolute times derived from time.time().
# NOTE(review): killtime and bailtime are both timelimit + 4. In the loop
# below "curtime > bailtime" is tested before "curtime > killtime", so the
# SIGKILL branch looks unreachable -- confirm against the full file.
751 if timeout is not None:
752 timelimit = time.time() + timeout
753 killtime = timelimit + 4
754 bailtime = timelimit + 4
757 # Flush stdio buffer. This might block, if the user has
758 # been writing to .stdin in an uncontrolled fashion.
761 write_set.append(proc.stdin)
766 read_set.append(proc.stdout)
770 read_set.append(proc.stderr)
# Loop until every registered pipe has been removed from the sets.
774 while read_set or write_set:
775 if timeout is not None:
776 curtime = time.time()
# NOTE(review): if this test is nested under the "timeout is not None" check
# above, its "timeout is None" operand can never be true -- indentation is
# not visible in this view, confirm.
777 if timeout is None or curtime > timelimit:
778 if curtime > bailtime:
780 elif curtime > killtime:
781 signum = signal.SIGKILL
783 signum = signal.SIGTERM
785 os.kill(proc.pid, signum)
# Wait no longer than the time left until the deadline plus a small margin.
# The visible comparison against 1.0 suggests the wait is capped -- the
# assignment that follows it is not visible.
788 select_timeout = timelimit - curtime + 0.1
792 if select_timeout > 1.0:
796 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
# Python 2 except syntax. The handler body is not visible in this view.
797 except select.error,e:
803 if not rlist and not wlist and not xlist and proc.poll() is not None:
804 # timeout and process exited, say bye
807 if proc.stdin in wlist:
808 # When select has indicated that the file is writable,
809 # we can write up to PIPE_BUF bytes without risk
810 # blocking. POSIX defines PIPE_BUF >= 512
# buffer() is the Python 2 built-in; it exposes at most 512 bytes of input
# starting at input_offset without copying.
811 bytes_written = os.write(proc.stdin.fileno(),
812 buffer(input, input_offset, 512))
813 input_offset += bytes_written
# Once all input has been written, stdin is no longer watched.
815 if input_offset >= len(input):
817 write_set.remove(proc.stdin)
# Each ready pipe is read in chunks of up to 1024 bytes. The test that leads
# to removing a pipe from read_set is not visible in this view.
819 if proc.stdout in rlist:
820 data = os.read(proc.stdout.fileno(), 1024)
823 read_set.remove(proc.stdout)
826 if proc.stderr in rlist:
827 data = os.read(proc.stderr.fileno(), 1024)
830 read_set.remove(proc.stderr)
833 # All data exchanged. Translate lists into strings.
834 if stdout is not None:
835 stdout = ''.join(stdout)
836 if stderr is not None:
837 stderr = ''.join(stderr)
839 # Translate newlines, if requested. We cannot let the file
840 # object do the translation: It is based on stdio, which is
841 # impossible to combine with select (unless forcing no
# `file` is the Python 2 built-in type, and _translate_newlines is a private
# Popen helper.
843 if proc.universal_newlines and hasattr(file, 'newlines'):
845 stdout = proc._translate_newlines(stdout)
847 stderr = proc._translate_newlines(stderr)
# The exception carries the message, the exit status and whatever output had
# been collected (Python 2 raise syntax).
849 if killed and err_on_timeout:
850 errcode = proc.poll()
851 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
857 return (stdout, stderr)