2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
logger = logging.getLogger("sshfuncs")

def log(msg, level, out = None, err = None):
    """ Emits a record on the module logger, optionally decorated with the
    output captured from a process.

    :param msg: base text of the log record
    :type msg: str

    :param level: logging level of the record (e.g. logging.DEBUG)
    :type level: int

    :param out: captured standard output; appended only when non-empty
    :type out: str

    :param err: captured standard error; appended only when non-empty
    :type err: str
    """
    # Empty or missing streams are skipped so records do not end up
    # littered with " - OUT: None" / " - ERROR: None" suffixes.
    if out:
        msg += " - OUT: %s " % out

    if err:
        msg += " - ERROR: %s " % err

    logger.log(level, msg)
# Path of the null device. os.devnull is used when the running interpreter
# provides it; otherwise the POSIX path is assumed.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Matches strings made only of characters that need no shell quoting
# (the empty string matches too).
SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
63 Codes for status of remote spawned process
65 # Process is still running
71 # Process hasn't started running yet (this should be very rare)
# Process-wide cache of resolved names (hostname -> address string) and the
# lock that serializes the resolutions filling it.
hostbyname_cache = dict()
hostbyname_cache_lock = threading.Lock()

def gethostbyname(host):
    """ Resolves a hostname through socket.gethostbyname, remembering the
    answer so later calls for the same host skip the resolver.

    :param host: the hostname to resolve
    :type host: str

    :rtype: str
    :return: the address returned by socket.gethostbyname for the host

    Errors raised by socket.gethostbyname are propagated to the caller and
    nothing is cached in that case.
    """
    hostbyname = hostbyname_cache.get(host)
    if hostbyname:
        return hostbyname

    with hostbyname_cache_lock:
        # Another thread may have resolved this host while we were waiting
        # for the lock; look again before asking the resolver.
        hostbyname = hostbyname_cache.get(host)
        if hostbyname:
            return hostbyname

        hostbyname = socket.gethostbyname(host)
        hostbyname_cache[host] = hostbyname

    # Logged after releasing the lock so other resolutions are not held up
    # by log handlers.
    msg = " Added hostbyname %s - %s " % (host, hostbyname)
    log(msg, logging.DEBUG)

    return hostbyname
# Cached answer of openssh_has_persist(); None until the first call.
OPENSSH_HAS_PERSIST = None

def openssh_has_persist():
    """ Tells whether the local ssh client is recent enough to use
    connection sharing.

    The ssh_config options ControlMaster and ControlPersist allow several
    ssh sessions to reuse one network connection, which works around limits
    on the number of open ssh connections. Older OpenSSH versions do not
    support this feature.

    The version banner printed by "ssh -v" is inspected once and the result
    is cached in OPENSSH_HAS_PERSIST for all later calls. The banner is
    accepted when it reports OpenSSH 5.8 or later (5.8-5.9, 5.10-5.99,
    6-9, or any two-digit major version).

    :rtype: bool
    :return: True if the banner matches an accepted version
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # stdin is bound to the null device so ssh can never wait for
        # input; the handle is closed once the child has finished.
        with open(os.devnull, "r") as devnull:
            proc = subprocess.Popen(["ssh","-v"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull,
                # ask for text output so the str pattern below applies
                universal_newlines = True)
            # communicate() also waits for the child to exit
            out, _ = proc.communicate()

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the returned temp file reference for as long as
    the ssh process needs the file: the file is removed when the reference
    is closed or garbage collected.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port
    :type port: str

    :rtype: tempfile.NamedTemporaryFile
    :return: the open temporary known_hosts file (its path is in .name)
    """
    # Resolve the bare hostname BEFORE the port is appended: the resolver
    # cannot look up a "host:port" string.
    hostbyname = gethostbyname(host)

    # NOTE(review): guard reconstructed as "is not None" - confirm.
    # NOTE(review): OpenSSH documents "[host]:port" for non-default ports;
    # the "host:port" form is kept here unchanged - confirm.
    if port is not None:
        host = '%s:%s' % (host, str(port))

    # Create a temporary server key file (text mode: str lines are written)
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode = "w+")

    # Add the intended host key
    tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # Make the contents visible to the ssh process that will read the file
    tmp_known_hosts.flush()

    return tmp_known_hosts
155 def make_control_path(agent, forward_x11):
156 ctrl_path = "/tmp/nepi_ssh"
164 ctrl_path += "-%r@%h:%p"
169 """ Escapes strings so that they are safe to use as command-line
171 if SHELL_SAFE.match(s):
172 # safe string - no escaping needed
175 # unsafe string - escape
177 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
180 return "'$'\\x%02x''" % (ord(c),)
181 s = ''.join(map(escp,s))
184 def eintr_retry(func):
185 """Retries a function invocation when a EINTR occurs"""
187 @functools.wraps(func)
189 retry = kw.pop("_retry", False)
190 for i in xrange(0 if retry else 4):
192 return func(*p, **kw)
193 except (select.error, socket.error), args:
194 if args[0] == errno.EINTR:
199 if e.errno == errno.EINTR:
204 return func(*p, **kw)
207 def rexec(command, host, user,
218 err_on_timeout = True,
219 connect_timeout = 30,
223 strict_host_checking = True):
225 Executes a remote command, returns ((stdout,stderr),process)
228 tmp_known_hosts = None
229 hostip = gethostbyname(host)
232 # Don't bother with localhost. Makes test easier
233 '-o', 'NoHostAuthenticationForLocalhost=yes',
234 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235 '-o', 'ConnectionAttempts=3',
236 '-o', 'ServerAliveInterval=30',
237 '-o', 'TCPKeepAlive=yes',
238 '-l', user, hostip or host]
240 if persistent and openssh_has_persist():
242 '-o', 'ControlMaster=auto',
243 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244 '-o', 'ControlPersist=60' ])
246 if not strict_host_checking:
247 # Do not check for Host key. Unsafe.
248 args.extend(['-o', 'StrictHostKeyChecking=no'])
254 args.append('-p%d' % port)
257 args.extend(('-i', identity))
267 # Create a temporary server key file
268 tmp_known_hosts = make_server_key_args(server_key, host, port)
269 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
272 command = "sudo " + command
276 for x in xrange(retry):
277 # connects to the remote host and starts a remote connection
278 proc = subprocess.Popen(args,
280 stdout = subprocess.PIPE,
281 stdin = subprocess.PIPE,
282 stderr = subprocess.PIPE)
284 # attach tempfile object to the process, to make sure the file stays
285 # alive until the process is finished with it
286 proc._known_hosts = tmp_known_hosts
288 # by default, rexec calls _communicate which will block
289 # until the process has exited. The argument block == False
290 # forces rexec to return immediately, without blocking
293 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
297 err = proc.stderr.read()
299 msg = " rexec - host %s - command %s " % (host, " ".join(args))
300 log(msg, logging.DEBUG, out, err)
305 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
306 # SSH error, can safely retry
309 # Probably timed out or plain failed but can retry
314 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
315 t, x, host, " ".join(args))
316 log(msg, logging.DEBUG)
321 except RuntimeError, e:
322 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
323 log(msg, logging.DEBUG, out, err)
329 return ((out, err), proc)
331 def rcopy(source, dest,
338 strict_host_checking = True):
340 Copies from/to remote sites.
342 Source and destination should have the user and host encoded
345 If source is a file object, a special mode will be used to
346 create the remote file with the same contents.
348 If dest is a file object, the remote file (source) will be
349 read and written into dest.
351 In these modes, recursive cannot be True.
353 Source can be a list of files to copy to a single destination,
354 in which case it is advised that the destination be a folder.
357 if isinstance(source, file) and source.tell() == 0:
359 elif hasattr(source, 'read'):
360 tmp = tempfile.NamedTemporaryFile()
362 buf = source.read(65536)
370 if isinstance(source, file) or isinstance(dest, file) \
371 or hasattr(source, 'read') or hasattr(dest, 'write'):
374 # Parse source/destination as <user>@<server>:<path>
375 if isinstance(dest, basestring) and ':' in dest:
376 remspec, path = dest.split(':',1)
377 elif isinstance(source, basestring) and ':' in source:
378 remspec, path = source.split(':',1)
380 raise ValueError, "Both endpoints cannot be local"
381 user,host = remspec.rsplit('@',1)
383 tmp_known_hosts = None
384 hostip = gethostbyname(host)
386 args = ['ssh', '-l', user, '-C',
387 # Don't bother with localhost. Makes test easier
388 '-o', 'NoHostAuthenticationForLocalhost=yes',
389 '-o', 'ConnectTimeout=60',
390 '-o', 'ConnectionAttempts=3',
391 '-o', 'ServerAliveInterval=30',
392 '-o', 'TCPKeepAlive=yes',
395 if openssh_has_persist():
397 '-o', 'ControlMaster=auto',
398 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
399 '-o', 'ControlPersist=60' ])
402 args.append('-P%d' % port)
405 args.extend(('-i', identity))
408 # Create a temporary server key file
409 tmp_known_hosts = make_server_key_args(server_key, host, port)
410 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
412 if isinstance(source, file) or hasattr(source, 'read'):
413 args.append('cat > %s' % (shell_escape(path),))
414 elif isinstance(dest, file) or hasattr(dest, 'write'):
415 args.append('cat %s' % (shell_escape(path),))
417 raise AssertionError, "Unreachable code reached! :-Q"
419 # connects to the remote host and starts a remote connection
420 if isinstance(source, file):
421 proc = subprocess.Popen(args,
422 stdout = open('/dev/null','w'),
423 stderr = subprocess.PIPE,
425 err = proc.stderr.read()
426 proc._known_hosts = tmp_known_hosts
427 eintr_retry(proc.wait)()
428 return ((None,err), proc)
429 elif isinstance(dest, file):
430 proc = subprocess.Popen(args,
431 stdout = open('/dev/null','w'),
432 stderr = subprocess.PIPE,
434 err = proc.stderr.read()
435 proc._known_hosts = tmp_known_hosts
436 eintr_retry(proc.wait)()
437 return ((None,err), proc)
438 elif hasattr(source, 'read'):
439 # file-like (but not file) source
440 proc = subprocess.Popen(args,
441 stdout = open('/dev/null','w'),
442 stderr = subprocess.PIPE,
443 stdin = subprocess.PIPE)
449 buf = source.read(4096)
454 rdrdy, wrdy, broken = select.select(
457 [proc.stderr,proc.stdin])
459 if proc.stderr in rdrdy:
460 # use os.read for fully unbuffered behavior
461 err.append(os.read(proc.stderr.fileno(), 4096))
463 if proc.stdin in wrdy:
464 proc.stdin.write(buf)
470 err.append(proc.stderr.read())
472 proc._known_hosts = tmp_known_hosts
473 eintr_retry(proc.wait)()
474 return ((None,''.join(err)), proc)
475 elif hasattr(dest, 'write'):
476 # file-like (but not file) dest
477 proc = subprocess.Popen(args,
478 stdout = subprocess.PIPE,
479 stderr = subprocess.PIPE,
480 stdin = open('/dev/null','w'))
485 rdrdy, wrdy, broken = select.select(
486 [proc.stderr, proc.stdout],
488 [proc.stderr, proc.stdout])
490 if proc.stderr in rdrdy:
491 # use os.read for fully unbuffered behavior
492 err.append(os.read(proc.stderr.fileno(), 4096))
494 if proc.stdout in rdrdy:
495 # use os.read for fully unbuffered behavior
496 buf = os.read(proc.stdout.fileno(), 4096)
505 err.append(proc.stderr.read())
507 proc._known_hosts = tmp_known_hosts
508 eintr_retry(proc.wait)()
509 return ((None,''.join(err)), proc)
511 raise AssertionError, "Unreachable code reached! :-Q"
513 # Parse destination as <user>@<server>:<path>
514 if isinstance(dest, basestring) and ':' in dest:
515 remspec, path = dest.split(':',1)
516 elif isinstance(source, basestring) and ':' in source:
517 remspec, path = source.split(':',1)
519 raise ValueError, "Both endpoints cannot be local"
520 user,host = remspec.rsplit('@',1)
523 tmp_known_hosts = None
525 args = ['scp', '-q', '-p', '-C',
526 # Speed up transfer using blowfish cypher specification which is
527 # faster than the default one (3des)
529 # Don't bother with localhost. Makes test easier
530 '-o', 'NoHostAuthenticationForLocalhost=yes',
531 '-o', 'ConnectTimeout=60',
532 '-o', 'ConnectionAttempts=3',
533 '-o', 'ServerAliveInterval=30',
534 '-o', 'TCPKeepAlive=yes' ]
537 args.append('-P%d' % port)
543 args.extend(('-i', identity))
546 # Create a temporary server key file
547 tmp_known_hosts = make_server_key_args(server_key, host, port)
548 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
550 if not strict_host_checking:
551 # Do not check for Host key. Unsafe.
552 args.extend(['-o', 'StrictHostKeyChecking=no'])
554 if isinstance(source,list):
557 if openssh_has_persist():
559 '-o', 'ControlMaster=auto',
560 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
566 for x in xrange(retry):
567 # connects to the remote host and starts a remote connection
568 proc = subprocess.Popen(args,
569 stdout = subprocess.PIPE,
570 stdin = subprocess.PIPE,
571 stderr = subprocess.PIPE)
573 # attach tempfile object to the process, to make sure the file stays
574 # alive until the process is finished with it
575 proc._known_hosts = tmp_known_hosts
578 (out, err) = proc.communicate()
579 eintr_retry(proc.wait)()
580 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
581 log(msg, logging.DEBUG, out, err)
585 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
586 t, x, host, " ".join(args))
587 log(msg, logging.DEBUG)
593 except RuntimeError, e:
594 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
595 log(msg, logging.DEBUG, out, err)
601 return ((out, err), proc)
603 def rspawn(command, pidfile,
604 stdout = '/dev/null',
618 Spawn a remote command such that it will continue working asynchronously in
621 :param command: The command to run, it should be a single line.
624 :param pidfile: Path to a file where to store the pid and ppid of the
628 :param stdout: Path to file to redirect standard output.
629 The default value is /dev/null
632 :param stderr: Path to file to redirect standard error.
633 If the special STDOUT value is used, stderr will
634 be redirected to the same file as stdout
637 :param stdin: Path to a file with input to be piped into the command's standard input
640 :param home: Path to working directory folder.
641 It is assumed to exist unless the create_home flag is set.
644 :param create_home: Flag to force creation of the home folder before
646 :type create_home: bool
648 :param sudo: Flag forcing execution with sudo user
653 (stdout, stderr), process
655 Of the spawning process, which only captures errors at spawning time.
656 Usually only useful for diagnostics.
658 # Start process in a "daemonized" way, using nohup and heavy
659 # stdin/out redirection to avoid connection issues
663 stderr = ' ' + stderr
665 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
667 'pidfile' : shell_escape(pidfile),
673 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
674 'command' : shell_escape(daemon_command),
675 'sudo' : 'sudo -S' if sudo else '',
676 'pidfile' : shell_escape(pidfile),
677 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
678 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
681 (out,err),proc = rexec(
688 server_key = server_key,
693 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
695 return ((out, err), proc)
706 Returns the pid and ppid of a process from a remote file where the
707 information was stored.
709 :param home: Path to directory where the pidfile is located
712 :param pidfile: Name of file containing the pid information
717 A (pid, ppid) tuple useful for calling rstatus and rkill,
718 or None if the pidfile isn't valid yet (can happen when process is starting up)
721 (out,err),proc = rexec(
722 "cat %(pidfile)s" % {
730 server_key = server_key
738 return map(int,out.strip().split(' ',1))
740 # Ignore, many ways to fail that don't matter that much
744 def rstatus(pid, ppid,
752 Returns a code representing the status of a remote process
754 :param pid: Process id of the process
757 :param ppid: Parent process id of process
760 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
763 (out,err),proc = rexec(
764 # Check only by pid. pid+ppid does not always work (especially with sudo)
765 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
774 server_key = server_key
778 return ProcStatus.NOT_STARTED
782 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
785 status = (out.strip() == 'wait')
787 return ProcStatus.NOT_STARTED
788 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
801 Sends a kill signal to a remote process.
803 First tries a SIGTERM, and if the process does not end in 10 seconds,
806 :param pid: Process id of process to be killed
809 :param ppid: Parent process id of process to be killed
812 :param sudo: Flag indicating if sudo should be used to kill the process
816 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
818 SUBKILL="%(subkill)s" ;
819 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
820 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
821 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
823 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
826 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
827 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
831 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
832 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
833 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
837 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
839 (out,err),proc = rexec(
843 'sudo' : 'sudo -S' if sudo else '',
851 server_key = server_key
854 # wait, don't leave zombies around
857 return (out, err), proc
860 def _communicate(proc, input, timeout=None, err_on_timeout=True):
863 stdout = None # Return
864 stderr = None # Return
868 if timeout is not None:
869 timelimit = time.time() + timeout
870 killtime = timelimit + 4
871 bailtime = timelimit + 4
874 # Flush stdio buffer. This might block, if the user has
875 # been writing to .stdin in an uncontrolled fashion.
878 write_set.append(proc.stdin)
883 read_set.append(proc.stdout)
887 read_set.append(proc.stderr)
891 while read_set or write_set:
892 if timeout is not None:
893 curtime = time.time()
894 if timeout is None or curtime > timelimit:
895 if curtime > bailtime:
897 elif curtime > killtime:
898 signum = signal.SIGKILL
900 signum = signal.SIGTERM
902 os.kill(proc.pid, signum)
905 select_timeout = timelimit - curtime + 0.1
909 if select_timeout > 1.0:
913 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
914 except select.error,e:
920 if not rlist and not wlist and not xlist and proc.poll() is not None:
921 # timeout and process exited, say bye
924 if proc.stdin in wlist:
925 # When select has indicated that the file is writable,
926 # we can write up to PIPE_BUF bytes without risk
927 # blocking. POSIX defines PIPE_BUF >= 512
928 bytes_written = os.write(proc.stdin.fileno(),
929 buffer(input, input_offset, 512))
930 input_offset += bytes_written
932 if input_offset >= len(input):
934 write_set.remove(proc.stdin)
936 if proc.stdout in rlist:
937 data = os.read(proc.stdout.fileno(), 1024)
940 read_set.remove(proc.stdout)
943 if proc.stderr in rlist:
944 data = os.read(proc.stderr.fileno(), 1024)
947 read_set.remove(proc.stderr)
950 # All data exchanged. Translate lists into strings.
951 if stdout is not None:
952 stdout = ''.join(stdout)
953 if stderr is not None:
954 stderr = ''.join(stderr)
956 # Translate newlines, if requested. We cannot let the file
957 # object do the translation: It is based on stdio, which is
958 # impossible to combine with select (unless forcing no
960 if proc.universal_newlines and hasattr(file, 'newlines'):
962 stdout = proc._translate_newlines(stdout)
964 stderr = proc._translate_newlines(stderr)
966 if killed and err_on_timeout:
967 errcode = proc.poll()
968 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
974 return (stdout, stderr)