2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
36 logger = logging.getLogger("sshfuncs")
38 def log(msg, level, out = None, err = None):
40 msg += " - OUT: %s " % out
43 msg += " - ERROR: %s " % err
45 logger.log(level, msg)
48 if hasattr(os, "devnull"):
51 DEV_NULL = "/dev/null"
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
63 Codes for status of remote spawned process
65 # Process is still running
71 # Process hasn't started running yet (this should be very rare)
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
77 def gethostbyname(host):
78 global hostbyname_cache
79 global hostbyname_cache_lock
81 hostbyname = hostbyname_cache.get(host)
83 with hostbyname_cache_lock:
84 hostbyname = socket.gethostbyname(host)
85 hostbyname_cache[host] = hostbyname
87 msg = " Added hostbyname %s - %s " % (host, hostbyname)
88 log(msg, logging.DEBUG)
92 OPENSSH_HAS_PERSIST = None
94 def openssh_has_persist():
95 """ The ssh_config options ControlMaster and ControlPersist allow to
96 reuse the same network connection for multiple ssh sessions. In this
97 way, limits on the number of open ssh connections can be bypassed.
98 However, older versions of openSSH do not support this feature.
99 This function is used to determine if ssh connection persist features
102 global OPENSSH_HAS_PERSIST
103 if OPENSSH_HAS_PERSIST is None:
104 proc = subprocess.Popen(["ssh","-v"],
105 stdout = subprocess.PIPE,
106 stderr = subprocess.STDOUT,
107 stdin = open("/dev/null","r") )
108 out,err = proc.communicate()
111 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112 OPENSSH_HAS_PERSIST = bool(vre.match(out))
113 return OPENSSH_HAS_PERSIST
115 def make_server_key_args(server_key, host, port):
116 """ Returns a reference to a temporary known_hosts file, to which
117 the server key has been added.
119 Make sure to hold onto the temp file reference until the process is
122 :param server_key: the server public key
123 :type server_key: str
125 :param host: the hostname
128 :param port: the ssh port
133 host = '%s:%s' % (host, str(port))
135 # Create a temporary server key file
136 tmp_known_hosts = tempfile.NamedTemporaryFile()
138 hostbyname = gethostbyname(host)
140 # Add the intended host key
141 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
143 # If we're not in strict mode, add user-configured keys
144 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146 if os.access(user_hosts_path, os.R_OK):
147 f = open(user_hosts_path, "r")
148 tmp_known_hosts.write(f.read())
151 tmp_known_hosts.flush()
153 return tmp_known_hosts
155 def make_control_path(agent, forward_x11):
156 ctrl_path = "/tmp/nepi_ssh"
164 ctrl_path += "-%r@%h:%p"
169 """ Escapes strings so that they are safe to use as command-line
171 if SHELL_SAFE.match(s):
172 # safe string - no escaping needed
175 # unsafe string - escape
177 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
180 return "'$'\\x%02x''" % (ord(c),)
181 s = ''.join(map(escp,s))
184 def eintr_retry(func):
185 """Retries a function invocation when a EINTR occurs"""
187 @functools.wraps(func)
189 retry = kw.pop("_retry", False)
190 for i in xrange(0 if retry else 4):
192 return func(*p, **kw)
193 except (select.error, socket.error), args:
194 if args[0] == errno.EINTR:
199 if e.errno == errno.EINTR:
204 return func(*p, **kw)
207 def rexec(command, host, user,
218 err_on_timeout = True,
219 connect_timeout = 30,
223 strict_host_checking = True):
225 Executes a remote command, returns ((stdout,stderr),process)
228 tmp_known_hosts = None
229 hostip = gethostbyname(host)
232 # Don't bother with localhost. Makes test easier
233 '-o', 'NoHostAuthenticationForLocalhost=yes',
234 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235 '-o', 'ConnectionAttempts=3',
236 '-o', 'ServerAliveInterval=30',
237 '-o', 'TCPKeepAlive=yes',
238 '-l', user, hostip or host]
240 if persistent and openssh_has_persist():
242 '-o', 'ControlMaster=auto',
243 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244 '-o', 'ControlPersist=60' ])
246 if not strict_host_checking:
247 # Do not check for Host key. Unsafe.
248 args.extend(['-o', 'StrictHostKeyChecking=no'])
254 args.append('-p%d' % port)
257 args.extend(('-i', identity))
267 # Create a temporary server key file
268 tmp_known_hosts = make_server_key_args(server_key, host, port)
269 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
272 command = "sudo " + command
276 for x in xrange(retry):
277 # connects to the remote host and starts a remote connection
278 proc = subprocess.Popen(args,
280 stdout = subprocess.PIPE,
281 stdin = subprocess.PIPE,
282 stderr = subprocess.PIPE)
284 # attach tempfile object to the process, to make sure the file stays
285 # alive until the process is finished with it
286 proc._known_hosts = tmp_known_hosts
288 # by default, rexec calls _communicate which will block
289 # until the process has exited. The argument block == False
290 # forces rexec to return immediately, without blocking
293 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
295 err = proc.stderr.read()
296 out = proc.stdout.read()
298 msg = " rexec - host %s - command %s " % (host, " ".join(args))
299 log(msg, logging.DEBUG, out, err)
304 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
305 # SSH error, can safely retry
308 # Probably timed out or plain failed but can retry
313 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
314 t, x, host, " ".join(args))
315 log(msg, logging.DEBUG)
320 except RuntimeError, e:
321 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
322 log(msg, logging.DEBUG, out, err)
328 return ((out, err), proc)
330 def rcopy(source, dest,
337 strict_host_checking = True):
339 Copies from/to remote sites.
341 Source and destination should have the user and host encoded
344 If source is a file object, a special mode will be used to
345 create the remote file with the same contents.
347 If dest is a file object, the remote file (source) will be
348 read and written into dest.
350 In these modes, recursive cannot be True.
352 Source can be a list of files to copy to a single destination,
353 in which case it is advised that the destination be a folder.
356 if isinstance(source, file) and source.tell() == 0:
358 elif hasattr(source, 'read'):
359 tmp = tempfile.NamedTemporaryFile()
361 buf = source.read(65536)
369 if isinstance(source, file) or isinstance(dest, file) \
370 or hasattr(source, 'read') or hasattr(dest, 'write'):
373 # Parse source/destination as <user>@<server>:<path>
374 if isinstance(dest, basestring) and ':' in dest:
375 remspec, path = dest.split(':',1)
376 elif isinstance(source, basestring) and ':' in source:
377 remspec, path = source.split(':',1)
379 raise ValueError, "Both endpoints cannot be local"
380 user,host = remspec.rsplit('@',1)
382 tmp_known_hosts = None
383 hostip = gethostbyname(host)
385 args = ['ssh', '-l', user, '-C',
386 # Don't bother with localhost. Makes test easier
387 '-o', 'NoHostAuthenticationForLocalhost=yes',
388 '-o', 'ConnectTimeout=60',
389 '-o', 'ConnectionAttempts=3',
390 '-o', 'ServerAliveInterval=30',
391 '-o', 'TCPKeepAlive=yes',
394 if openssh_has_persist():
396 '-o', 'ControlMaster=auto',
397 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
398 '-o', 'ControlPersist=60' ])
401 args.append('-P%d' % port)
404 args.extend(('-i', identity))
407 # Create a temporary server key file
408 tmp_known_hosts = make_server_key_args(server_key, host, port)
409 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
411 if isinstance(source, file) or hasattr(source, 'read'):
412 args.append('cat > %s' % (shell_escape(path),))
413 elif isinstance(dest, file) or hasattr(dest, 'write'):
414 args.append('cat %s' % (shell_escape(path),))
416 raise AssertionError, "Unreachable code reached! :-Q"
418 # connects to the remote host and starts a remote connection
419 if isinstance(source, file):
420 proc = subprocess.Popen(args,
421 stdout = open('/dev/null','w'),
422 stderr = subprocess.PIPE,
424 err = proc.stderr.read()
425 proc._known_hosts = tmp_known_hosts
426 eintr_retry(proc.wait)()
427 return ((None,err), proc)
428 elif isinstance(dest, file):
429 proc = subprocess.Popen(args,
430 stdout = open('/dev/null','w'),
431 stderr = subprocess.PIPE,
433 err = proc.stderr.read()
434 proc._known_hosts = tmp_known_hosts
435 eintr_retry(proc.wait)()
436 return ((None,err), proc)
437 elif hasattr(source, 'read'):
438 # file-like (but not file) source
439 proc = subprocess.Popen(args,
440 stdout = open('/dev/null','w'),
441 stderr = subprocess.PIPE,
442 stdin = subprocess.PIPE)
448 buf = source.read(4096)
453 rdrdy, wrdy, broken = select.select(
456 [proc.stderr,proc.stdin])
458 if proc.stderr in rdrdy:
459 # use os.read for fully unbuffered behavior
460 err.append(os.read(proc.stderr.fileno(), 4096))
462 if proc.stdin in wrdy:
463 proc.stdin.write(buf)
469 err.append(proc.stderr.read())
471 proc._known_hosts = tmp_known_hosts
472 eintr_retry(proc.wait)()
473 return ((None,''.join(err)), proc)
474 elif hasattr(dest, 'write'):
475 # file-like (but not file) dest
476 proc = subprocess.Popen(args,
477 stdout = subprocess.PIPE,
478 stderr = subprocess.PIPE,
479 stdin = open('/dev/null','w'))
484 rdrdy, wrdy, broken = select.select(
485 [proc.stderr, proc.stdout],
487 [proc.stderr, proc.stdout])
489 if proc.stderr in rdrdy:
490 # use os.read for fully unbuffered behavior
491 err.append(os.read(proc.stderr.fileno(), 4096))
493 if proc.stdout in rdrdy:
494 # use os.read for fully unbuffered behavior
495 buf = os.read(proc.stdout.fileno(), 4096)
504 err.append(proc.stderr.read())
506 proc._known_hosts = tmp_known_hosts
507 eintr_retry(proc.wait)()
508 return ((None,''.join(err)), proc)
510 raise AssertionError, "Unreachable code reached! :-Q"
512 # Parse destination as <user>@<server>:<path>
513 if isinstance(dest, basestring) and ':' in dest:
514 remspec, path = dest.split(':',1)
515 elif isinstance(source, basestring) and ':' in source:
516 remspec, path = source.split(':',1)
518 raise ValueError, "Both endpoints cannot be local"
519 user,host = remspec.rsplit('@',1)
522 tmp_known_hosts = None
524 args = ['scp', '-q', '-p', '-C',
525 # Speed up transfer using blowfish cypher specification which is
526 # faster than the default one (3des)
528 # Don't bother with localhost. Makes test easier
529 '-o', 'NoHostAuthenticationForLocalhost=yes',
530 '-o', 'ConnectTimeout=60',
531 '-o', 'ConnectionAttempts=3',
532 '-o', 'ServerAliveInterval=30',
533 '-o', 'TCPKeepAlive=yes' ]
536 args.append('-P%d' % port)
542 args.extend(('-i', identity))
545 # Create a temporary server key file
546 tmp_known_hosts = make_server_key_args(server_key, host, port)
547 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
549 if not strict_host_checking:
550 # Do not check for Host key. Unsafe.
551 args.extend(['-o', 'StrictHostKeyChecking=no'])
553 if isinstance(source,list):
556 if openssh_has_persist():
558 '-o', 'ControlMaster=auto',
559 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
565 for x in xrange(retry):
566 # connects to the remote host and starts a remote connection
567 proc = subprocess.Popen(args,
568 stdout = subprocess.PIPE,
569 stdin = subprocess.PIPE,
570 stderr = subprocess.PIPE)
572 # attach tempfile object to the process, to make sure the file stays
573 # alive until the process is finished with it
574 proc._known_hosts = tmp_known_hosts
577 (out, err) = proc.communicate()
578 eintr_retry(proc.wait)()
579 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
580 log(msg, logging.DEBUG, out, err)
584 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
585 t, x, host, " ".join(args))
586 log(msg, logging.DEBUG)
592 except RuntimeError, e:
593 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
594 log(msg, logging.DEBUG, out, err)
600 return ((out, err), proc)
602 def rspawn(command, pidfile,
603 stdout = '/dev/null',
617 Spawn a remote command such that it will continue working asynchronously in
620 :param command: The command to run, it should be a single line.
623 :param pidfile: Path to a file where to store the pid and ppid of the
627 :param stdout: Path to file to redirect standard output.
628 The default value is /dev/null
631 :param stderr: Path to file to redirect standard error.
632 If the special STDOUT value is used, stderr will
633 be redirected to the same file as stdout
636 :param stdin: Path to a file with input to be piped into the command's standard input
639 :param home: Path to working directory folder.
640 It is assumed to exist unless the create_home flag is set.
643 :param create_home: Flag to force creation of the home folder before
645 :type create_home: bool
647 :param sudo: Flag forcing execution with sudo user
652 (stdout, stderr), process
654 Of the spawning process, which only captures errors at spawning time.
655 Usually only useful for diagnostics.
657 # Start process in a "daemonized" way, using nohup and heavy
658 # stdin/out redirection to avoid connection issues
662 stderr = ' ' + stderr
664 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
666 'pidfile' : shell_escape(pidfile),
672 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
673 'command' : shell_escape(daemon_command),
674 'sudo' : 'sudo -S' if sudo else '',
675 'pidfile' : shell_escape(pidfile),
676 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
677 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
680 (out,err),proc = rexec(
687 server_key = server_key,
692 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
694 return ((out, err), proc)
705 Returns the pid and ppid of a process from a remote file where the
706 information was stored.
708 :param home: Path to directory where the pidfile is located
711 :param pidfile: Name of file containing the pid information
716 A (pid, ppid) tuple useful for calling rstatus and rkill,
717 or None if the pidfile isn't valid yet (can happen when process is starting up)
720 (out,err),proc = rexec(
721 "cat %(pidfile)s" % {
729 server_key = server_key
737 return map(int,out.strip().split(' ',1))
739 # Ignore, many ways to fail that don't matter that much
743 def rstatus(pid, ppid,
751 Returns a code representing the status of a remote process
753 :param pid: Process id of the process
756 :param ppid: Parent process id of process
759 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
762 (out,err),proc = rexec(
763 # Check only by pid. pid+ppid does not always work (especially with sudo)
764 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
773 server_key = server_key
777 return ProcStatus.NOT_STARTED
781 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
784 status = (out.strip() == 'wait')
786 return ProcStatus.NOT_STARTED
787 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
800 Sends a kill signal to a remote process.
802 First tries a SIGTERM, and if the process does not end in 10 seconds,
805 :param pid: Process id of process to be killed
808 :param ppid: Parent process id of process to be killed
811 :param sudo: Flag indicating if sudo should be used to kill the process
815 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
817 SUBKILL="%(subkill)s" ;
818 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
819 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
820 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
822 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
825 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
826 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
830 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
831 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
832 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
836 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
838 (out,err),proc = rexec(
842 'sudo' : 'sudo -S' if sudo else '',
850 server_key = server_key
853 # wait, don't leave zombies around
856 return (out, err), proc
859 def _communicate(proc, input, timeout=None, err_on_timeout=True):
862 stdout = None # Return
863 stderr = None # Return
867 if timeout is not None:
868 timelimit = time.time() + timeout
869 killtime = timelimit + 4
870 bailtime = timelimit + 4
873 # Flush stdio buffer. This might block, if the user has
874 # been writing to .stdin in an uncontrolled fashion.
877 write_set.append(proc.stdin)
882 read_set.append(proc.stdout)
886 read_set.append(proc.stderr)
890 while read_set or write_set:
891 if timeout is not None:
892 curtime = time.time()
893 if timeout is None or curtime > timelimit:
894 if curtime > bailtime:
896 elif curtime > killtime:
897 signum = signal.SIGKILL
899 signum = signal.SIGTERM
901 os.kill(proc.pid, signum)
904 select_timeout = timelimit - curtime + 0.1
908 if select_timeout > 1.0:
912 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
913 except select.error,e:
919 if not rlist and not wlist and not xlist and proc.poll() is not None:
920 # timeout and process exited, say bye
923 if proc.stdin in wlist:
924 # When select has indicated that the file is writable,
925 # we can write up to PIPE_BUF bytes without risk
926 # blocking. POSIX defines PIPE_BUF >= 512
927 bytes_written = os.write(proc.stdin.fileno(),
928 buffer(input, input_offset, 512))
929 input_offset += bytes_written
931 if input_offset >= len(input):
933 write_set.remove(proc.stdin)
935 if proc.stdout in rlist:
936 data = os.read(proc.stdout.fileno(), 1024)
939 read_set.remove(proc.stdout)
942 if proc.stderr in rlist:
943 data = os.read(proc.stderr.fileno(), 1024)
946 read_set.remove(proc.stderr)
949 # All data exchanged. Translate lists into strings.
950 if stdout is not None:
951 stdout = ''.join(stdout)
952 if stderr is not None:
953 stderr = ''.join(stderr)
955 # Translate newlines, if requested. We cannot let the file
956 # object do the translation: It is based on stdio, which is
957 # impossible to combine with select (unless forcing no
959 if proc.universal_newlines and hasattr(file, 'newlines'):
961 stdout = proc._translate_newlines(stdout)
963 stderr = proc._translate_newlines(stderr)
965 if killed and err_on_timeout:
966 errcode = proc.poll()
967 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
973 return (stdout, stderr)