2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
# Logger shared by the helpers in this module (gethostbyname, rexec and
# rcopy report through log() below).
36 logger = logging.getLogger("sshfuncs")
# log(msg, level, out=None, err=None)
# Emit `msg` on the "sshfuncs" logger at `level`, first appending
# " - OUT: <out> " and " - ERROR: <err> " so that a command's captured
# stdout/stderr are recorded alongside the message.
# NOTE(review): any guards that skip these suffixes for None/empty values are
# not visible in this view — confirm against the full source.
38 def log(msg, level, out = None, err = None):
40 msg += " - OUT: %s " % out
43 msg += " - ERROR: %s " % err
45 logger.log(level, msg)
# DEV_NULL: path of the null device. When the os module has a `devnull`
# attribute the value presumably comes from it (that assignment is not
# visible here); the fallback is the literal "/dev/null".
48 if hasattr(os, "devnull"):
51 DEV_NULL = "/dev/null"
# Matches strings made only of characters that need no shell quoting
# (letters, digits and - _ = + : . , /). shell_escape skips escaping for
# such strings. The empty string also matches.
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
# STDOUT is a sentinel accepted by rspawn's `stderr` argument; ProcStatus
# holds the codes returned by rstatus (NOT_STARTED, RUNNING, FINISHED).
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
63 Codes for status of remote spawned process
65 # Process is still running
71 # Process hasn't started running yet (this should be very rare)
# Process-wide cache mapping hostname -> resolved address, plus the lock held
# while resolving and storing a new entry.
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
# gethostbyname(host)
# Resolve `host` with socket.gethostbyname (an IPv4 address string) and
# memoise the answer in hostbyname_cache, logging each new resolution at
# DEBUG level. Errors from socket.gethostbyname (e.g. socket.gaierror for
# unresolvable names) propagate to the caller.
# NOTE(review): callers (make_server_key_args, rexec) use the return value,
# but neither the cache-miss guard nor a `return hostbyname` is visible in
# this view — confirm both exist.
77 def gethostbyname(host):
78 global hostbyname_cache
79 global hostbyname_cache_lock
81 hostbyname = hostbyname_cache.get(host)
# The cache read above is done without the lock; only the resolution and the
# insertion below are serialised.
83 with hostbyname_cache_lock:
84 hostbyname = socket.gethostbyname(host)
85 hostbyname_cache[host] = hostbyname
87 msg = " Added hostbyname %s - %s " % (host, hostbyname)
88 log(msg, logging.DEBUG)
# Memoised result of openssh_has_persist(); None until first computed.
92 OPENSSH_HAS_PERSIST = None
# openssh_has_persist() -> bool
# Runs `ssh -v` once, matches its version banner against a regex and caches
# the verdict in OPENSSH_HAS_PERSIST, so later calls do not spawn ssh again.
94 def openssh_has_persist():
95 """ The ssh_config options ControlMaster and ControlPersist allow
96 reusing the same network connection for multiple ssh sessions. In this
97 way limitations on number of open ssh connections can be bypassed.
98 However, older versions of openSSH do not support this feature.
99 This function is used to determine if ssh connection persist features
102 global OPENSSH_HAS_PERSIST
103 if OPENSSH_HAS_PERSIST is None:
# stderr is merged into stdout (presumably because ssh prints its version
# banner on stderr), so `err` returned by communicate() below is None.
# NOTE(review): the file object opened for stdin is never closed in the
# visible lines; DEV_NULL could also replace the literal path.
104 proc = subprocess.Popen(["ssh","-v"],
105 stdout = subprocess.PIPE,
106 stderr = subprocess.STDOUT,
107 stdin = open("/dev/null","r") )
108 out,err = proc.communicate()
# Accepts OpenSSH 5.8/5.9, 5.10-5.99, 6.x-9.x and any two-digit major
# version, case-insensitively; vre.match anchors at the start of the output.
# NOTE(review): ControlPersist first appeared in OpenSSH 5.6, so 5.6/5.7 are
# treated as unsupported — presumably deliberate conservatism; confirm.
111 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112 OPENSSH_HAS_PERSIST = bool(vre.match(out))
113 return OPENSSH_HAS_PERSIST
# make_server_key_args(server_key, host, port) -> tempfile.NamedTemporaryFile
# Builds a throw-away known_hosts file for ssh/scp's UserKnownHostsFile
# option. NamedTemporaryFile deletes the file when it is closed or
# garbage-collected, which is why rexec/rcopy attach the returned object to
# the ssh process (proc._known_hosts).
115 def make_server_key_args(server_key, host, port):
116 """ Returns a reference to a temporary known_hosts file, to which
117 the server key has been added.
119 Make sure to hold onto the temp file reference until the process is
122 :param server_key: the server public key
123 :type server_key: str
125 :param host: the hostname
128 :param port: the ssh port
133 host = '%s:%s' % (host, str(port))
# NOTE(review): OpenSSH known_hosts entries for non-default ports use the
# "[host]:port" form, and gethostbyname() below cannot resolve a
# "host:port" string. Whether this rewrite is guarded (e.g. only when a
# port is given) is not visible here — confirm.
135 # Create a temporary server key file
136 tmp_known_hosts = tempfile.NamedTemporaryFile()
138 hostbyname = gethostbyname(host)
140 # Add the intended host key
141 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
143 # If we're not in strict mode, add user-configured keys
# Strict mode means NEPI_STRICT_AUTH_MODE is 1/true/on (case-insensitive);
# otherwise $HOME/.ssh/known_hosts is appended when it is readable.
144 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146 if os.access(user_hosts_path, os.R_OK):
# NOTE(review): `f` is not closed in the visible lines; a `with` block would
# guarantee it.
147 f = open(user_hosts_path, "r")
148 tmp_known_hosts.write(f.read())
# Flush so that ssh, which opens the file by name, sees everything written.
151 tmp_known_hosts.flush()
153 return tmp_known_hosts
# make_control_path(agent, forward_x11) -> str
# Builds the ssh ControlPath template used for connection multiplexing,
# rooted at /tmp/nepi_ssh. Presumably `agent` and `forward_x11` select
# distinct socket names so sessions with different forwarding settings are
# not shared (that code is not visible here — confirm).
155 def make_control_path(agent, forward_x11):
156 ctrl_path = "/tmp/nepi_ssh"
# %r, %h and %p are ssh_config ControlPath tokens that ssh expands to the
# remote user, host and port, giving one control socket per destination.
164 ctrl_path += "-%r@%h:%p"
169 """ Escapes strings so that they are safe to use as command-line
171 if SHELL_SAFE.match(s):
172 # safe string - no escaping needed
175 # unsafe string - escape
# Per character: printable ASCII (plus \r, \n, \t) other than ' and " is
# presumably kept as-is; anything else becomes '$'\xNN'' — that closes the
# surrounding single quote, emits a $'...' ANSI-C hex escape and reopens the
# quote.
# NOTE(review): this presumes the result is wrapped in single quotes and run
# by bash (the $'...' form is a bash/ksh extension) — confirm.
177 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
180 return "'$'\\x%02x''" % (ord(c),)
181 s = ''.join(map(escp,s))
# eintr_retry(func) -> wrapped callable
# Decorator that calls `func`, retrying when it fails with EINTR (a system
# call interrupted by a signal). It checks select.error/socket.error whose
# first arg is EINTR, and errors whose .errno is EINTR (that exception type
# is not visible here). rcopy uses it as eintr_retry(proc.wait)().
184 def eintr_retry(func):
185 """Retries a function invocation when an EINTR occurs"""
187 @functools.wraps(func)
189 retry = kw.pop("_retry", False)
# The private keyword _retry is removed before `func` is invoked.
# NOTE(review): a truthy _retry makes this loop run zero times, so the call
# falls straight through to the single final invocation — the name reads
# inverted relative to that behaviour; confirm intent.
190 for i in xrange(0 if retry else 4):
192 return func(*p, **kw)
193 except (select.error, socket.error), args:
194 if args[0] == errno.EINTR:
199 if e.errno == errno.EINTR:
# Final attempt after the loop (presumably without EINTR handling).
204 return func(*p, **kw)
# rexec(command, host, user, ...) -> ((stdout, stderr), process)
# Runs `command` on `host` over ssh. Builds the ssh argument list from the
# keyword options (not all visible here: port, gateway, identity,
# server_key, persistent connections, strict host checking, sudo, ...),
# then launches ssh up to `retry` times, logging between attempts.
# In the default blocking mode, output is collected by _communicate, which
# honours `timeout` and raises RuntimeError on timeout when err_on_timeout
# is true; that exception is caught and logged here.
207 def rexec(command, host, user,
220 err_on_timeout = True,
221 connect_timeout = 30,
225 strict_host_checking = True):
227 Executes a remote command, returns ((stdout,stderr),process)
230 tmp_known_hosts = None
# Resolve the target once (cached by gethostbyname); ssh is given the
# address, or the original hostname when the lookup returns nothing.
# BatchMode=yes makes ssh fail instead of prompting for passwords or host-key
# confirmation.
232 hostip = gethostbyname(host)
236 # Don't bother with localhost. Makes test easier
237 '-o', 'NoHostAuthenticationForLocalhost=yes',
238 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
239 '-o', 'ConnectionAttempts=3',
240 '-o', 'ServerAliveInterval=30',
241 '-o', 'TCPKeepAlive=yes',
242 '-o', 'Batchmode=yes',
243 '-l', user, hostip or host]
# Connection multiplexing is only requested when the caller asks for it and
# the local OpenSSH is recent enough (see openssh_has_persist). The master
# connection lingers 60 s after the last session.
245 if persistent and openssh_has_persist():
247 '-o', 'ControlMaster=auto',
248 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
249 '-o', 'ControlPersist=60' ])
251 if not strict_host_checking:
252 # Do not check for Host key. Unsafe.
253 args.extend(['-o', 'StrictHostKeyChecking=no'])
# Gateway hop: `ssh -W %h:%p` on the gateway forwards stdio to the target's
# host and port. The %%r variant (presumably used when no gateway user is
# given) lets ssh substitute the target's remote user.
257 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
259 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
260 args.extend(['-o', proxycommand])
266 args.append('-p%d' % port)
269 identity = os.path.expanduser(identity)
270 args.extend(('-i', identity))
280 # Create a temporary server key file
281 tmp_known_hosts = make_server_key_args(server_key, host, port)
282 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
285 command = "sudo " + command
289 for x in xrange(retry):
290 # connects to the remote host and starts a remote connection
291 proc = subprocess.Popen(args,
293 stdout = subprocess.PIPE,
294 stdin = subprocess.PIPE,
295 stderr = subprocess.PIPE)
297 # attach tempfile object to the process, to make sure the file stays
298 # alive until the process is finished with it
299 proc._known_hosts = tmp_known_hosts
301 # by default, rexec calls _communicate which will block
302 # until the process has exited. The argument block == False
303 # forces rexec to return immediately, without blocking
306 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
# Non-blocking path. NOTE(review): it still reads stderr and then stdout to
# EOF, so it waits for ssh to close both pipes. Draining them one after the
# other can deadlock if the child fills the stdout pipe buffer while stderr
# is being read.
308 err = proc.stderr.read()
309 out = proc.stdout.read()
311 msg = " rexec - host %s - command %s " % (host, " ".join(args))
312 log(msg, logging.DEBUG, out, err)
# stderr starting with 'ssh: ' or 'mux_client_hello_exchange: ' is treated
# as an ssh/multiplexing failure rather than an error of the remote command.
317 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
318 # SSH error, can safely retry
321 # Probably timed out or plain failed but can retry
326 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
327 t, x, host, " ".join(args))
328 log(msg, logging.DEBUG)
# _communicate raises RuntimeError("Operation timed out", ...) on timeout.
# NOTE(review): if the first attempt raises, `out`/`err` are only bound if
# they are initialised on lines not visible here — confirm, otherwise the
# log call below raises NameError.
333 except RuntimeError, e:
334 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
335 log(msg, logging.DEBUG, out, err)
341 return ((out, err), proc)
# rcopy(source, dest, ...) -> ((stdout, stderr), process)
# Copies files with scp. The remote endpoint "user@host:path" is taken from
# `dest` when it is a str containing ':', else from `source`; otherwise
# ValueError is raised. Builds the scp options (port, gateway, identity,
# server key, host-key checking, multiplexing), then runs scp up to `retry`
# times.
343 def rcopy(source, dest,
352 strict_host_checking = True):
354 Copies from/to remote sites.
356 Source and destination should have the user and host encoded
359 Source can be a list of files to copy to a single destination,
360 (in which case it is advised that the destination be a folder),
361 a single file in a string or a semi-colon separated list of files
365 # Parse destination as <user>@<server>:<path>
366 if isinstance(dest, str) and ':' in dest:
367 remspec, path = dest.split(':',1)
368 elif isinstance(source, str) and ':' in source:
369 remspec, path = source.split(':',1)
371 raise ValueError, "Both endpoints cannot be local"
# NOTE(review): a list `source` (allowed by the docstring) is never inspected
# for a remote spec, so a remote-list -> local copy appears to be rejected.
# rsplit on the last '@' means a spec without '@' raises ValueError here.
372 user,host = remspec.rsplit('@',1)
375 tmp_known_hosts = None
# scp flags: -q quiet, -p preserve times/modes, -C compression. The connect
# timeout is fixed at 60 s here (rexec takes connect_timeout instead).
377 args = ['scp', '-q', '-p', '-C',
378 # Speed up transfer using blowfish cypher specification which is
379 # faster than the default one (3des)
381 # Don't bother with localhost. Makes test easier
382 '-o', 'NoHostAuthenticationForLocalhost=yes',
383 '-o', 'ConnectTimeout=60',
384 '-o', 'ConnectionAttempts=3',
385 '-o', 'ServerAliveInterval=30',
386 '-o', 'TCPKeepAlive=yes' ]
# NOTE(review): the cipher option the blowfish comment above refers to is not
# visible here; current OpenSSH releases no longer offer blowfish — confirm.
# scp takes the port via -P (uppercase), unlike ssh's -p.
389 args.append('-P%d' % port)
393 proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
395 proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
396 args.extend(['-o', proxycommand])
402 identity = os.path.expanduser(identity)
403 args.extend(('-i', identity))
406 # Create a temporary server key file
407 tmp_known_hosts = make_server_key_args(server_key, host, port)
408 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
410 if not strict_host_checking:
411 # Do not check for Host key. Unsafe.
412 args.extend(['-o', 'StrictHostKeyChecking=no'])
# NOTE(review): unlike rexec, this check does not consult a `persistent`
# flag, and no ControlPersist option is visible.
414 if openssh_has_persist():
416 '-o', 'ControlMaster=auto',
417 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
# str endpoints are split on ';' into lists of stripped paths.
420 if isinstance(dest, str):
421 dest = map(str.strip, dest.split(";"))
423 if isinstance(source, str):
424 source = map(str.strip, source.split(";"))
430 for x in xrange(retry):
431 # connects to the remote host and starts a remote connection
432 proc = subprocess.Popen(args,
433 stdout = subprocess.PIPE,
434 stdin = subprocess.PIPE,
435 stderr = subprocess.PIPE)
437 # attach tempfile object to the process, to make sure the file stays
438 # alive until the process is finished with it
439 proc._known_hosts = tmp_known_hosts
# Unlike rexec, no timeout applies: communicate() blocks until scp exits.
442 (out, err) = proc.communicate()
443 eintr_retry(proc.wait)()
444 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
445 log(msg, logging.DEBUG, out, err)
449 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
450 t, x, host, " ".join(args))
451 log(msg, logging.DEBUG)
# NOTE(review): nothing visible in this loop raises RuntimeError
# (proc.communicate() has no timeout here) — confirm this handler is
# reachable.
457 except RuntimeError, e:
458 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
459 log(msg, logging.DEBUG, out, err)
465 return ((out, err), proc)
# rspawn(command, pidfile, stdout='/dev/null', ...) -> ((out, err), proc)
# Starts `command` on the remote host detached from the ssh session (nohup,
# background job, redirected stdio) and records its pid in `pidfile`.
# Raises RuntimeError when the launch is judged to have failed.
467 def rspawn(command, pidfile,
468 stdout = '/dev/null',
484 Spawn a remote command such that it will continue working asynchronously in
487 :param command: The command to run, it should be a single line.
490 :param pidfile: Path to a file where to store the pid and ppid of the
494 :param stdout: Path to file to redirect standard output.
495 The default value is /dev/null
498 :param stderr: Path to file to redirect standard error.
499 If the special STDOUT value is used, stderr will
500 be redirected to the same file as stdout
503 :param stdin: Path to a file with input to be piped into the command's standard input
506 :param home: Path to working directory folder.
507 It is assumed to exist unless the create_home flag is set.
510 :param create_home: Flag to force creation of the home folder before
512 :type create_home: bool
514 :param sudo: Flag forcing execution with sudo user
519 (stdout, stderr), process
521 Of the spawning process, which only captures errors at spawning time.
522 Usually only useful for diagnostics.
524 # Start process in a "daemonized" way, using nohup and heavy
525 # stdin/out redirection to avoid connection issues
529 stderr = ' ' + stderr
# The leading space makes the redirection read "2> <path>" (the handling of
# the STDOUT sentinel is not visible here).
# The pidfile receives "<pid> 1": $! is the pid of the backgrounded command,
# and the literal 1 presumably stands in for the ppid. This matches the
# two-integer "pid ppid" split done when the pidfile is read back.
531 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
533 'pidfile' : shell_escape(pidfile),
# Outer command: optionally `mkdir -p` and `cd` into `home`, remove any stale
# pidfile, then run the wrapper above via `nohup bash -c`, prefixed with
# `sudo -S` (password read from stdin) when sudo is requested.
539 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
540 'command' : shell_escape(daemon_command),
541 'sudo' : 'sudo -S' if sudo else '',
542 'pidfile' : shell_escape(pidfile),
543 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
544 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
547 (out,err),proc = rexec(
556 server_key = server_key,
# Failure path; the exact condition is not visible here.
561 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
563 return ((out, err), proc)
576 Returns the pid and ppid of a process from a remote file where the
577 information was stored.
579 :param home: Path to directory where the pidfile is located
582 :param pidfile: Name of file containing the pid information
587 A (pid, ppid) tuple useful for calling rstatus and rkill,
588 or None if the pidfile isn't valid yet (can happen when process is starting up)
591 (out,err),proc = rexec(
592 "cat %(pidfile)s" % {
602 server_key = server_key
# The pidfile is expected to hold two space-separated integers "pid ppid"
# (rspawn writes "<pid> 1"). Malformed content makes int() raise; the
# surrounding handler (not fully visible) appears to swallow such errors.
# NOTE(review): under Python 2 map() returns a list, not the tuple the
# docstring promises; callers that unpack it are unaffected.
610 return map(int,out.strip().split(' ',1))
612 # Ignore, many ways to fail that don't matter that much
# rstatus(pid, ppid, ...) -> ProcStatus code
# Asks the remote host, over ssh, whether `pid` is still listed by `ps`.
# `ppid` is accepted but only `pid` is checked (see the comment below).
616 def rstatus(pid, ppid,
626 Returns a code representing the status of a remote process
628 :param pid: Process id of the process
631 :param ppid: Parent process id of process
634 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
637 (out,err),proc = rexec(
638 # Check only by pid. pid+ppid does not always work (especially with sudo)
# Prints 'wait' when ps lists the pid (grep -c finds a match) and 'done'
# otherwise; tail -n 1 keeps only that final word.
639 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
650 server_key = server_key
# Early NOT_STARTED result (its condition is not visible here).
654 return ProcStatus.NOT_STARTED
# Special-cases an error message saying /proc must be mounted (branch body not
# visible here).
658 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
661 status = (out.strip() == 'wait')
663 return ProcStatus.NOT_STARTED
664 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
679 Sends a kill signal to a remote process.
681 First tries a SIGTERM, and if the process does not end in 10 seconds,
684 :param pid: Process id of process to be killed
687 :param ppid: Parent process id of process to be killed
690 :param sudo: Flag indicating if sudo should be used to kill the process
694 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
# `subkill` is a remote command substitution listing the direct children of
# pid (no header). It is spliced into the kill script as $SUBKILL.
696 SUBKILL="%(subkill)s" ;
697 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
698 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
699 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
701 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
704 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
705 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
709 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
710 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
711 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
715 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
# The script above does the following:
#   - signals the process group (-pid), pid itself and its children;
#   - checks up to 30 times whether pid has gone, re-sending the signals
#     (any delay between checks is not visible here);
#   - finally falls back to kill -9.
# `|| /bin/true` makes each kill report success even if the target is gone.
# The line above wraps the script in a backgrounded subshell with all stdio
# sent to /dev/null, so the ssh session need not wait for the loop. The
# %(...)s/%(...)d placeholders appear to be filled by the mapping passed in
# the rexec call below (e.g. 'sudo' -> 'sudo -S').
717 (out,err),proc = rexec(
721 'sudo' : 'sudo -S' if sudo else '',
731 server_key = server_key
734 # wait, don't leave zombies around
737 return (out, err), proc
# _communicate(proc, input, timeout=None, err_on_timeout=True)
#     -> (stdout, stderr)
# select()-based stand-in for Popen.communicate() that enforces a timeout:
#   - writes `input` to proc.stdin in chunks of at most 512 bytes;
#   - reads stdout/stderr in 1024-byte chunks;
#   - once `timeout` seconds have elapsed, signals the child
#     (SIGTERM, then SIGKILL).
# Returns the captured output joined into strings (left as None when not
# collected). Raises RuntimeError("Operation timed out", returncode, stdout,
# stderr) when `killed` is set (presumably once a signal was sent) and
# err_on_timeout is true.
740 def _communicate(proc, input, timeout=None, err_on_timeout=True):
743 stdout = None # Return
744 stderr = None # Return
# Absolute deadlines: past `timelimit` the child is signalled, past
# `killtime` the signal escalates to SIGKILL, and past `bailtime` the loop
# presumably gives up.
# NOTE(review): killtime and bailtime are both timelimit + 4. Whenever
# curtime > killtime, the earlier `curtime > bailtime` test has already
# matched, so the SIGKILL branch can never be taken. A later bailtime
# (e.g. killtime + a few seconds) looks intended — confirm.
748 if timeout is not None:
749 timelimit = time.time() + timeout
750 killtime = timelimit + 4
751 bailtime = timelimit + 4
754 # Flush stdio buffer. This might block, if the user has
755 # been writing to .stdin in an uncontrolled fashion.
758 write_set.append(proc.stdin)
763 read_set.append(proc.stdout)
767 read_set.append(proc.stderr)
# Main loop: runs while any pipe is still in read_set or write_set.
771 while read_set or write_set:
772 if timeout is not None:
773 curtime = time.time()
774 if timeout is None or curtime > timelimit:
775 if curtime > bailtime:
777 elif curtime > killtime:
778 signum = signal.SIGKILL
780 signum = signal.SIGTERM
782 os.kill(proc.pid, signum)
785 select_timeout = timelimit - curtime + 0.1
# select() waits at most about a second (the capped value is not visible
# here), so the deadlines are re-checked regularly.
789 if select_timeout > 1.0:
793 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
794 except select.error,e:
800 if not rlist and not wlist and not xlist and proc.poll() is not None:
801 # timeout and process exited, say bye
804 if proc.stdin in wlist:
805 # When select has indicated that the file is writable,
806 # we can write up to PIPE_BUF bytes without risk of
807 # blocking. POSIX defines PIPE_BUF >= 512
808 bytes_written = os.write(proc.stdin.fileno(),
809 buffer(input, input_offset, 512))
810 input_offset += bytes_written
812 if input_offset >= len(input):
814 write_set.remove(proc.stdin)
816 if proc.stdout in rlist:
817 data = os.read(proc.stdout.fileno(), 1024)
820 read_set.remove(proc.stdout)
823 if proc.stderr in rlist:
824 data = os.read(proc.stderr.fileno(), 1024)
827 read_set.remove(proc.stderr)
830 # All data exchanged. Translate lists into strings.
831 if stdout is not None:
832 stdout = ''.join(stdout)
833 if stderr is not None:
834 stderr = ''.join(stderr)
836 # Translate newlines, if requested. We cannot let the file
837 # object do the translation: It is based on stdio, which is
838 # impossible to combine with select (unless forcing no
840 if proc.universal_newlines and hasattr(file, 'newlines'):
842 stdout = proc._translate_newlines(stdout)
844 stderr = proc._translate_newlines(stderr)
# Only a timeout that led to the child being signalled is reported as an
# error; otherwise whatever was captured is returned.
846 if killed and err_on_timeout:
847 errcode = proc.poll()
848 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
854 return (stdout, stderr)