2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
logger = logging.getLogger("sshfuncs")


def log(msg, level, out = None, err = None):
    """ Emits msg through the "sshfuncs" logger, optionally followed by the
    captured output of a command.

    :param msg: Text of the log record
    :type msg: str

    :param level: Logging level of the record (e.g. logging.DEBUG)
    :type level: int

    :param out: Captured standard output, appended only when not empty
    :type out: str

    :param err: Captured standard error, appended only when not empty
    :type err: str
    """
    # Both streams default to None; appending them unconditionally would
    # litter every record with "OUT: None" / "ERROR: None".
    if out:
        # 1-tuple keeps the formatting correct even if out is a tuple
        msg += " - OUT: %s " % (out,)

    if err:
        msg += " - ERROR: %s " % (err,)

    logger.log(level, msg)
# Path of the null device. Interpreters whose os module lacks "devnull"
# fall back to the POSIX path.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Strings made only of these characters can be passed to a shell unquoted.
# The pattern is anchored with \Z rather than $: "$" also matches just before
# a trailing newline, which would let "foo\n" through as a "safe" string.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')
# NOTE(review): only the docstring text of the STDOUT marker survives in this
# extract (the leading integers are leftover line numbers); the statement that
# defines the marker and the docstring delimiters are not visible.
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
# NOTE(review): fragment of the ProcStatus definition. rstatus returns
# ProcStatus.NOT_STARTED, ProcStatus.RUNNING and ProcStatus.FINISHED, but the
# lines assigning those constants are not visible in this extract, so their
# values cannot be confirmed from here.
63 Codes for status of remote spawned process
65 # Process is still running
71 # Process hasn't started running yet (this should be very rare)
# Process-wide cache of resolved host names, guarded by a lock on writes
hostbyname_cache = dict()
hostbyname_cache_lock = threading.Lock()


def gethostbyname(host):
    """ Resolves a host name to an IP address, remembering the answer.

    :param host: Host name (or address) to resolve
    :type host: str

    :rtype: str (the address returned by socket.gethostbyname)

    Errors raised by socket.gethostbyname propagate to the caller and
    nothing is cached in that case.
    """
    hostbyname = hostbyname_cache.get(host)
    if not hostbyname:
        with hostbyname_cache_lock:
            # Another thread may have filled the entry while this one was
            # waiting for the lock; look again before resolving.
            hostbyname = hostbyname_cache.get(host)
            if not hostbyname:
                hostbyname = socket.gethostbyname(host)
                hostbyname_cache[host] = hostbyname

                msg = " Added hostbyname %s - %s " % (host, hostbyname)
                log(msg, logging.DEBUG)

    return hostbyname
# Cached answer of openssh_has_persist(); None until the first call
OPENSSH_HAS_PERSIST = None


def _version_has_persist(banner):
    """ Tells whether an ssh version banner names OpenSSH 5.8 or newer """
    if isinstance(banner, bytes):
        # subprocess pipes deliver bytes; the pattern below is text
        banner = banner.decode("utf-8", "replace")
    vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
    return bool(vre.match(banner))


def openssh_has_persist():
    """ The ssh_config options ControlMaster and ControlPersist allow to
    reuse a same network connection for multiple ssh sessions. In this
    way limitations on number of open ssh connections can be bypassed.
    However, older versions of openSSH do not support this feature.
    This function is used to determine if ssh connection persist features
    can be used with the locally installed ssh client.

    The client is queried only once; the answer is kept in
    OPENSSH_HAS_PERSIST for subsequent calls.

    :rtype: bool
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # "-V" prints the version and exits on every OpenSSH release.
        # "-v" without a destination only prints the usage text on current
        # releases, which would make this check always answer False.
        with open(os.devnull, "r") as devnull:
            proc = subprocess.Popen(["ssh", "-V"],
                    stdout = subprocess.PIPE,
                    stderr = subprocess.STDOUT,
                    stdin = devnull)
            # communicate() also waits for the process to exit
            out, err = proc.communicate()

        OPENSSH_HAS_PERSIST = _version_has_persist(out)
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it: the file is removed as soon as the object is closed or
    garbage collected.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port (None when the default port is used)
    :type port: int or str

    :rtype: temporary file object; its path is in the name attribute
    """
    # Resolve the bare host name. Once a port has been attached to it the
    # string is no longer something the resolver can look up.
    hostbyname = gethostbyname(host)

    names = [host]
    if hostbyname != host:
        names.append(hostbyname)

    # known_hosts spells non-default ports as "[host]:port"; entries for the
    # default port carry the plain name.
    if port and int(port) != 22:
        names = ['[%s]:%s' % (name, port) for name in names]

    # Create a temporary server key file (text mode, so that str can be
    # written on any interpreter version)
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode = 'w+')

    # Add the intended host key
    tmp_known_hosts.write('%s %s\n' % (','.join(names), server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # Make the contents visible to the ssh process that will read the file
    tmp_known_hosts.flush()

    return tmp_known_hosts
# Builds the value passed to ssh as ControlPath (see the callers in rexec and
# rcopy).
# NOTE(review): the lines between the prefix and the token suffix are missing
# from this extract, so how `agent` and `forward_x11` affect the path, and the
# return statement, cannot be confirmed from here.
155 def make_control_path(agent, forward_x11):
156 ctrl_path = "/tmp/nepi_ssh"
# %r, %h and %p are expanded by the ssh client itself (remote user, host and
# port per ssh_config(5)).
164 ctrl_path += "-%r@%h:%p"
def shell_escape(s):
    """ Escapes strings so that they are safe to use as command-line
    arguments

    :param s: The string to escape
    :type s: str

    :rtype: str (s itself when it only contains shell-safe characters,
        otherwise a single-quoted rendering of s)
    """
    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    def escp(c):
        # Printable characters other than quotes are kept verbatim inside
        # the surrounding single quotes
        if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
            return c
        # Anything else closes the quoted run, emits the character as a
        # $'\xNN' word and re-opens the quotes
        return "'$'\\x%02x''" % (ord(c),)

    return "'%s'" % (''.join(map(escp, s)),)
def eintr_retry(func):
    """Retries a function invocation when a EINTR occurs

    :param func: The callable to protect
    :type func: callable

    :rtype: callable wrapping func. The wrapper accepts an extra keyword
        argument, _retry, which is consumed and never passed on to func;
        when it is true no retrying is done and func is called exactly once.

    Up to four interrupted calls are swallowed; a further call is then made
    whose exceptions, EINTR included, reach the caller. Any error other
    than EINTR propagates immediately.
    """
    import functools

    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for i in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as e:
                # OSError-style exceptions expose errno; the Python 2
                # select.error only carries it as its first argument
                code = getattr(e, "errno", None)
                if code is None and e.args:
                    code = e.args[0]
                if code != errno.EINTR:
                    raise
        # Last (or, with _retry, only) attempt: nothing is caught here
        return func(*p, **kw)
    return rv
# Runs a command on a remote host through the ssh client and returns
# ((stdout, stderr), process), as stated by the docstring line below.
# NOTE(review): this extract has lost its indentation and many interior lines
# (most of the parameter list, the start of the `args` list, every `if`/`try`
# header); the leading integers are leftover line numbers. The comments added
# here describe only what the visible lines establish.
207 def rexec(command, host, user,
218 err_on_timeout = True,
219 connect_timeout = 30,
223 strict_host_checking = True):
225 Executes a remote command, returns ((stdout,stderr),process)
228 tmp_known_hosts = None
# The resolved address, when there is one, is what gets handed to ssh as the
# destination ('hostip or host' at the end of the option list).
229 hostip = gethostbyname(host)
232 # Don't bother with localhost. Makes test easier
233 '-o', 'NoHostAuthenticationForLocalhost=yes',
234 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235 '-o', 'ConnectionAttempts=3',
236 '-o', 'ServerAliveInterval=30',
237 '-o', 'TCPKeepAlive=yes',
238 '-l', user, hostip or host]
# Connection sharing is requested only when the caller asked for a persistent
# connection and the local client supports ControlPersist.
240 if persistent and openssh_has_persist():
242 '-o', 'ControlMaster=auto',
243 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244 '-o', 'ControlPersist=60' ])
246 if not strict_host_checking:
247 # Do not check for Host key. Unsafe.
248 args.extend(['-o', 'StrictHostKeyChecking=no'])
254 args.append('-p%d' % port)
257 args.extend(('-i', identity))
267 # Create a temporary server key file
268 tmp_known_hosts = make_server_key_args(server_key, host, port)
269 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# NOTE(review): presumably guarded by the sudo flag; the guard is not visible.
272 command = "sudo " + command
# NOTE(review): xrange here and the "except RuntimeError, e" form further down
# are Python 2 only.
276 for x in xrange(retry):
277 # connects to the remote host and starts a remote connection
278 proc = subprocess.Popen(args,
280 stdout = subprocess.PIPE,
281 stdin = subprocess.PIPE,
282 stderr = subprocess.PIPE)
284 # attach tempfile object to the process, to make sure the file stays
285 # alive until the process is finished with it
286 proc._known_hosts = tmp_known_hosts
288 # by default, rexec calls _communicate which will block
289 # until the process has exit. The argument block == False
290 # forces to rexec to return immediately, without blocking
292 return (("", ""), proc)
# _communicate raises RuntimeError when the operation times out and
# err_on_timeout is set; that is the exception handled below.
295 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
296 msg = " rexec - host %s - command %s " % (host, " ".join(args))
297 log(msg, logging.DEBUG, out, err)
# Errors reported by the ssh client itself are recognised by their prefix on
# stderr.
302 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
303 # SSH error, can safely retry
306 # Probably timed out or plain failed but can retry
311 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
312 t, x, host, " ".join(args))
313 log(msg, logging.DEBUG)
318 except RuntimeError, e:
319 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
320 log(msg, logging.DEBUG, out, err)
326 return ((out, err), proc)
# Copies files from/to a remote host. Two modes are visible below: when either
# endpoint is a file(-like) object the data is streamed through
# "ssh ... cat"; otherwise the transfer is delegated to scp.
# NOTE(review): this extract has lost its indentation and many interior lines
# (most of the parameter list, `if`/`else`/`try` headers, several call
# arguments); the leading integers are leftover line numbers. The comments
# added here describe only what the visible lines establish.
328 def rcopy(source, dest,
335 strict_host_checking = True):
337 Copies from/to remote sites.
339 Source and destination should have the user and host encoded
342 If source is a file object, a special mode will be used to
343 create the remote file with the same contents.
345 If dest is a file object, the remote file (source) will be
346 read and written into dest.
348 In these modes, recursive cannot be True.
350 Source can be a list of files to copy to a single destination,
351 in which case it is advised that the destination be a folder.
# NOTE(review): `file` (and `basestring` further down) are Python 2 builtins.
354 if isinstance(source, file) and source.tell() == 0:
356 elif hasattr(source, 'read'):
357 tmp = tempfile.NamedTemporaryFile()
359 buf = source.read(65536)
# --- file-object mode: stream through ssh + cat ---
367 if isinstance(source, file) or isinstance(dest, file) \
368 or hasattr(source, 'read') or hasattr(dest, 'write'):
371 # Parse source/destination as <user>@<server>:<path>
372 if isinstance(dest, basestring) and ':' in dest:
373 remspec, path = dest.split(':',1)
374 elif isinstance(source, basestring) and ':' in source:
375 remspec, path = source.split(':',1)
377 raise ValueError, "Both endpoints cannot be local"
378 user,host = remspec.rsplit('@',1)
380 tmp_known_hosts = None
381 hostip = gethostbyname(host)
383 args = ['ssh', '-l', user, '-C',
384 # Don't bother with localhost. Makes test easier
385 '-o', 'NoHostAuthenticationForLocalhost=yes',
386 '-o', 'ConnectTimeout=60',
387 '-o', 'ConnectionAttempts=3',
388 '-o', 'ServerAliveInterval=30',
389 '-o', 'TCPKeepAlive=yes',
392 if openssh_has_persist():
394 '-o', 'ControlMaster=auto',
395 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
396 '-o', 'ControlPersist=60' ])
# NOTE(review): this argument list is for `ssh`, which takes the port with a
# lowercase -p (as rexec does); uppercase -P is scp's spelling. Looks like a
# bug in this branch - confirm.
399 args.append('-P%d' % port)
402 args.extend(('-i', identity))
405 # Create a temporary server key file
406 tmp_known_hosts = make_server_key_args(server_key, host, port)
407 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# Upload writes the remote file with "cat > path", download reads it with
# "cat path"; the path is shell-escaped in both cases.
409 if isinstance(source, file) or hasattr(source, 'read'):
410 args.append('cat > %s' % (shell_escape(path),))
411 elif isinstance(dest, file) or hasattr(dest, 'write'):
412 args.append('cat %s' % (shell_escape(path),))
414 raise AssertionError, "Unreachable code reached! :-Q"
416 # connects to the remote host and starts a remote connection
417 if isinstance(source, file):
418 proc = subprocess.Popen(args,
419 stdout = open('/dev/null','w'),
420 stderr = subprocess.PIPE,
422 err = proc.stderr.read()
423 proc._known_hosts = tmp_known_hosts
424 eintr_retry(proc.wait)()
425 return ((None,err), proc)
# NOTE(review): in this download branch stdout is also sent to /dev/null; one
# would expect the remote contents to be directed into `dest`. The remaining
# Popen argument is not visible in this extract - confirm against the full
# file.
426 elif isinstance(dest, file):
427 proc = subprocess.Popen(args,
428 stdout = open('/dev/null','w'),
429 stderr = subprocess.PIPE,
431 err = proc.stderr.read()
432 proc._known_hosts = tmp_known_hosts
433 eintr_retry(proc.wait)()
434 return ((None,err), proc)
435 elif hasattr(source, 'read'):
436 # file-like (but not file) source
437 proc = subprocess.Popen(args,
438 stdout = open('/dev/null','w'),
439 stderr = subprocess.PIPE,
440 stdin = subprocess.PIPE)
446 buf = source.read(4096)
451 rdrdy, wrdy, broken = select.select(
454 [proc.stderr,proc.stdin])
456 if proc.stderr in rdrdy:
457 # use os.read for fully unbuffered behavior
458 err.append(os.read(proc.stderr.fileno(), 4096))
460 if proc.stdin in wrdy:
461 proc.stdin.write(buf)
467 err.append(proc.stderr.read())
469 proc._known_hosts = tmp_known_hosts
470 eintr_retry(proc.wait)()
471 return ((None,''.join(err)), proc)
472 elif hasattr(dest, 'write'):
473 # file-like (but not file) dest
474 proc = subprocess.Popen(args,
475 stdout = subprocess.PIPE,
476 stderr = subprocess.PIPE,
477 stdin = open('/dev/null','w'))
482 rdrdy, wrdy, broken = select.select(
483 [proc.stderr, proc.stdout],
485 [proc.stderr, proc.stdout])
487 if proc.stderr in rdrdy:
488 # use os.read for fully unbuffered behavior
489 err.append(os.read(proc.stderr.fileno(), 4096))
491 if proc.stdout in rdrdy:
492 # use os.read for fully unbuffered behavior
493 buf = os.read(proc.stdout.fileno(), 4096)
502 err.append(proc.stderr.read())
504 proc._known_hosts = tmp_known_hosts
505 eintr_retry(proc.wait)()
506 return ((None,''.join(err)), proc)
508 raise AssertionError, "Unreachable code reached! :-Q"
# --- path-to-path mode: delegate to scp ---
510 # Parse destination as <user>@<server>:<path>
511 if isinstance(dest, basestring) and ':' in dest:
512 remspec, path = dest.split(':',1)
513 elif isinstance(source, basestring) and ':' in source:
514 remspec, path = source.split(':',1)
516 raise ValueError, "Both endpoints cannot be local"
517 user,host = remspec.rsplit('@',1)
520 tmp_known_hosts = None
522 args = ['scp', '-q', '-p', '-C',
523 # Speed up transfer using blowfish cypher specification which is
524 # faster than the default one (3des)
526 # Don't bother with localhost. Makes test easier
527 '-o', 'NoHostAuthenticationForLocalhost=yes',
528 '-o', 'ConnectTimeout=60',
529 '-o', 'ConnectionAttempts=3',
530 '-o', 'ServerAliveInterval=30',
531 '-o', 'TCPKeepAlive=yes' ]
534 args.append('-P%d' % port)
540 args.extend(('-i', identity))
543 # Create a temporary server key file
544 tmp_known_hosts = make_server_key_args(server_key, host, port)
545 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
547 if not strict_host_checking:
548 # Do not check for Host key. Unsafe.
549 args.extend(['-o', 'StrictHostKeyChecking=no'])
551 if isinstance(source,list):
554 if openssh_has_persist():
556 '-o', 'ControlMaster=auto',
557 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
# NOTE(review): xrange here and the "except RuntimeError, e" form further down
# are Python 2 only.
563 for x in xrange(retry):
564 # connects to the remote host and starts a remote connection
565 proc = subprocess.Popen(args,
566 stdout = subprocess.PIPE,
567 stdin = subprocess.PIPE,
568 stderr = subprocess.PIPE)
570 # attach tempfile object to the process, to make sure the file stays
571 # alive until the process is finished with it
572 proc._known_hosts = tmp_known_hosts
575 (out, err) = proc.communicate()
576 eintr_retry(proc.wait)()
577 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
578 log(msg, logging.DEBUG, out, err)
582 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
583 t, x, host, " ".join(args))
584 log(msg, logging.DEBUG)
590 except RuntimeError, e:
591 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
592 log(msg, logging.DEBUG, out, err)
598 return ((out, err), proc)
# Starts a command on a remote host so that it keeps running after the ssh
# session ends, recording its pid in `pidfile`; the launch itself goes through
# rexec.
# NOTE(review): this extract has lost its indentation and many interior lines
# (most of the parameter list, several dictionary entries, the rexec
# arguments); the leading integers are leftover line numbers.
600 def rspawn(command, pidfile,
601 stdout = '/dev/null',
615 Spawn a remote command such that it will continue working asynchronously in
618 :param command: The command to run, it should be a single line.
621 :param pidfile: Path to a file where to store the pid and ppid of the
625 :param stdout: Path to file to redirect standard output.
626 The default value is /dev/null
629 :param stderr: Path to file to redirect standard error.
630 If the special STDOUT value is used, stderr will
631 be redirected to the same file as stdout
634 :param stdin: Path to a file with input to be piped into the command's standard input
637 :param home: Path to working directory folder.
638 It is assumed to exist unless the create_home flag is set.
641 :param create_home: Flag to force creation of the home folder before
643 :type create_home: bool
645 :param sudo: Flag forcing execution with sudo user
650 (stdout, stderr), process
652 Of the spawning process, which only captures errors at spawning time.
653 Usually only useful for diagnostics.
655 # Start process in a "daemonized" way, using nohup and heavy
656 # stdin/out redirection to avoid connection issues
660 stderr = ' ' + stderr
# The command is started in the background with its three standard streams
# redirected; "echo $! 1" then writes the background pid followed by a
# literal 1 into the pid file.
662 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
664 'pidfile' : shell_escape(pidfile),
# Outer command line: optionally create and enter the home directory, remove
# any stale pid file, then run the daemonized command under nohup/bash
# (through "sudo -S" when sudo is set).
670 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
671 'command' : shell_escape(daemon_command),
672 'sudo' : 'sudo -S' if sudo else '',
673 'pidfile' : shell_escape(pidfile),
674 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
675 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
678 (out,err),proc = rexec(
685 server_key = server_key,
# NOTE(review): the condition guarding this raise is not visible in this
# extract; "raise X, msg" is Python 2 only syntax.
690 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
692 return ((out, err), proc)
# NOTE(review): fragment of the function that reads back a remote pid file;
# its `def` line, parameter list and most of its body are missing from this
# extract (the leading integers are leftover line numbers), so its name and
# signature cannot be confirmed from here.
703 Returns the pid and ppid of a process from a remote file where the
704 information was stored.
706 :param home: Path to directory where the pidfile is located
709 :param pidfile: Name of file containing the pid information
714 A (pid, ppid) tuple useful for calling rstatus and rkill,
715 or None if the pidfile isn't valid yet (can happen when process is staring up)
# The pid file is read on the remote side with "cat" through rexec.
718 (out,err),proc = rexec(
719 "cat %(pidfile)s" % {
727 server_key = server_key
# The file content is split on the first space and both fields are converted
# to int.
735 return map(int,out.strip().split(' ',1))
737 # Ignore, many ways to fail that don't matter that much
# Reports whether a remote process is still alive by running a "ps" pipeline
# on the remote host through rexec and mapping its output to a ProcStatus
# code.
# NOTE(review): this extract has lost its indentation and many interior lines
# (the rest of the parameter list, the rexec arguments, the conditions
# guarding the NOT_STARTED returns); the leading integers are leftover line
# numbers.
741 def rstatus(pid, ppid,
749 Returns a code representing the the status of a remote process
751 :param pid: Process id of the process
754 :param ppid: Parent process id of process
757 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
760 (out,err),proc = rexec(
761 # Check only by pid. pid+ppid does not always work (especially with sudo)
762 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
771 server_key = server_key
775 return ProcStatus.NOT_STARTED
# This message on stderr gets special handling; the action taken is not
# visible in this extract.
779 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
# The remote pipeline prints 'wait' as its last line while the pid is still
# listed by ps, and 'done' otherwise.
782 status = (out.strip() == 'wait')
784 return ProcStatus.NOT_STARTED
785 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
# NOTE(review): fragment of the function that kills a remote process
# (presumably the `rkill` mentioned by the pid-file reader's docstring); its
# `def` line, parameter list and several body lines are missing from this
# extract (the leading integers are leftover line numbers). No comments are
# added between the shell-script lines below because in the full file they
# belong to a string literal.
798 Sends a kill signal to a remote process.
800 First tries a SIGTERM, and if the process does not end in 10 seconds,
803 :param pid: Process id of process to be killed
806 :param ppid: Parent process id of process to be killed
809 :param sudo: Flag indicating if sudo should be used to kill the process
813 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
815 SUBKILL="%(subkill)s" ;
816 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
817 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
818 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
820 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
823 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
824 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
828 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
829 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
830 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
# The whole script is wrapped so that it runs in the background on the remote
# side with all three standard streams detached.
834 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
836 (out,err),proc = rexec(
840 'sudo' : 'sudo -S' if sudo else '',
848 server_key = server_key
851 # wait, don't leave zombies around
854 return (out, err), proc
# select()-based replacement for Popen.communicate with a time limit: feeds
# `input` to the child's stdin, collects stdout/stderr, signals the child once
# the limit is exceeded and returns (stdout, stderr). `self` is the Popen
# object (rexec calls this as _communicate(proc, ...)).
# Raises RuntimeError("Operation timed out", errcode, stdout, stderr) when the
# child had to be killed and err_on_timeout is true.
# NOTE(review): this extract has lost its indentation and many interior lines
# (`try`/`else`/`break` statements, several assignments); the leading integers
# are leftover line numbers. The comments added here describe only what the
# visible lines establish.
857 def _communicate(self, input, timeout=None, err_on_timeout=True):
860 stdout = None # Return
861 stderr = None # Return
865 if timeout is not None:
866 timelimit = time.time() + timeout
# NOTE(review): killtime and bailtime are the same instant, and bailtime is
# tested first below, so the SIGKILL branch ("elif curtime > killtime") can
# never be taken as written - confirm the intended offsets.
867 killtime = timelimit + 4
868 bailtime = timelimit + 4
871 # Flush stdio buffer. This might block, if the user has
872 # been writing to .stdin in an uncontrolled fashion.
875 write_set.append(self.stdin)
879 read_set.append(self.stdout)
882 read_set.append(self.stderr)
886 while read_set or write_set:
887 if timeout is not None:
888 curtime = time.time()
# NOTE(review): curtime is only assigned when a timeout was given, so the
# "timeout is None" half of this test depends on nesting that is not
# recoverable from this extract - confirm.
889 if timeout is None or curtime > timelimit:
890 if curtime > bailtime:
892 elif curtime > killtime:
893 signum = signal.SIGKILL
895 signum = signal.SIGTERM
897 os.kill(self.pid, signum)
900 select_timeout = timelimit - curtime + 0.1
904 if select_timeout > 1.0:
908 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
909 except select.error,e:
915 if not rlist and not wlist and not xlist and self.poll() is not None:
916 # timeout and process exited, say bye
919 if self.stdin in wlist:
920 # When select has indicated that the file is writable,
921 # we can write up to PIPE_BUF bytes without risk
922 # blocking. POSIX defines PIPE_BUF >= 512
# NOTE(review): buffer() is a Python 2 builtin.
923 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
924 input_offset += bytes_written
925 if input_offset >= len(input):
927 write_set.remove(self.stdin)
929 if self.stdout in rlist:
930 data = os.read(self.stdout.fileno(), 1024)
933 read_set.remove(self.stdout)
936 if self.stderr in rlist:
937 data = os.read(self.stderr.fileno(), 1024)
940 read_set.remove(self.stderr)
943 # All data exchanged. Translate lists into strings.
944 if stdout is not None:
945 stdout = ''.join(stdout)
946 if stderr is not None:
947 stderr = ''.join(stderr)
949 # Translate newlines, if requested. We cannot let the file
950 # object do the translation: It is based on stdio, which is
951 # impossible to combine with select (unless forcing no
953 if self.universal_newlines and hasattr(file, 'newlines'):
955 stdout = self._translate_newlines(stdout)
957 stderr = self._translate_newlines(stderr)
959 if killed and err_on_timeout:
960 errcode = self.poll()
961 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
967 return (stdout, stderr)