2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
logger = logging.getLogger("sshfuncs")


def log(msg, level, out = None, err = None):
    """Log *msg* at *level*, appending captured process output when present.

    :param msg: base message text
    :type msg: str

    :param level: logging level to emit at (e.g. logging.DEBUG)
    :type level: int

    :param out: captured standard output; appended only when non-empty
    :param err: captured standard error; appended only when non-empty
    """
    # Only mention the streams that actually carry something, so plain
    # messages are not padded with "OUT: None" / "ERROR: None".
    if out:
        # Wrap in a tuple so a tuple-valued argument cannot break the format
        msg += " - OUT: %s " % (out,)

    if err:
        msg += " - ERROR: %s " % (err,)

    logger.log(level, msg)
# Path of the null device: os.devnull when the os module provides it,
# otherwise the POSIX path.
DEV_NULL = getattr(os, "devnull", "/dev/null")
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
57 Special value that when given to rspawn in stderr causes stderr to
58 redirect to whatever stdout was redirected to.
63 Codes for status of remote spawned process
65 # Process is still running
71 # Process hasn't started running yet (this should be very rare)
hostbyname_cache = dict()
hostbyname_cache_lock = threading.Lock()


def gethostbyname(host):
    """Resolve *host* with socket.gethostbyname, caching the answer.

    :param host: hostname to resolve
    :type host: str

    :returns: the address string returned by socket.gethostbyname

    Errors raised by socket.gethostbyname are not caught here.
    """
    hostbyname = hostbyname_cache.get(host)
    if hostbyname:
        return hostbyname

    with hostbyname_cache_lock:
        # Another thread may have resolved this host while we were waiting
        # for the lock; look again before hitting the resolver.
        hostbyname = hostbyname_cache.get(host)
        if not hostbyname:
            hostbyname = socket.gethostbyname(host)
            hostbyname_cache[host] = hostbyname

            msg = " Added hostbyname %s - %s " % (host, hostbyname)
            log(msg, logging.DEBUG)

    return hostbyname
OPENSSH_HAS_PERSIST = None


def openssh_has_persist():
    """ The ssh_config options ControlMaster and ControlPersist allow to
    reuse a same network connection for multiple ssh sessions. In this
    way limitations on number of open ssh connections can be bypassed.
    However, older versions of openSSH do not support this feature.
    This function is used to determine if ssh connection persist features
    are available in the local openSSH client.

    The answer is computed once and cached in OPENSSH_HAS_PERSIST.

    :rtype: bool
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        try:
            # "-V" prints the version banner and exits; stderr is merged
            # into stdout so the banner is captured whichever stream
            # ssh writes it to.
            with open("/dev/null", "r") as null_in:
                proc = subprocess.Popen(["ssh", "-V"],
                        stdout = subprocess.PIPE,
                        stderr = subprocess.STDOUT,
                        stdin = null_in)
                out, err = proc.communicate()
        except OSError:
            # No runnable ssh client: connection persistence is unavailable
            out = ""

        # On Python 3 the pipe yields bytes; the pattern below is a str
        if not isinstance(out, str):
            out = out.decode("ascii", "replace")

        # Matches OpenSSH 5.8 and later
        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it, since the file is removed when the reference is released.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port (None or 22 means the default port)
    :type port: int

    :returns: the open temporary file; its path is available as ``.name``
    """
    # Resolve the bare hostname: a "host:port" string is not resolvable
    hostbyname = gethostbyname(host)

    # known_hosts spells entries for a non-default port as "[host]:port";
    # entries for the default port are the plain host name / address.
    if port and int(port) != 22:
        host_entry = '[%s]:%s' % (host, port)
        addr_entry = '[%s]:%s' % (hostbyname, port)
    else:
        host_entry = host
        addr_entry = hostbyname

    # Create a temporary server key file (text mode, we write str)
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode = "w+")

    # Add the intended host key
    tmp_known_hosts.write('%s,%s %s\n' % (host_entry, addr_entry, server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # Make the content visible to the ssh process that will read the file
    tmp_known_hosts.flush()

    return tmp_known_hosts
155 def make_control_path(agent, forward_x11):
156 ctrl_path = "/tmp/nepi_ssh"
164 ctrl_path += "-%r@%h:%p"
def shell_escape(s):
    """ Escapes strings so that they are safe to use as command-line
    arguments

    :param s: the string to escape
    :type s: str

    :returns: *s* unchanged when it only contains characters accepted by
        SHELL_SAFE, otherwise a single-quoted shell word
    """
    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    def escp(c):
        if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
            return c
        # Close the surrounding single quote, emit the character as a
        # $'\xNN' word, then reopen the single quote.
        return "'$'\\x%02x''" % (ord(c),)

    s = ''.join(map(escp,s))
    return "'%s'" % (s,)
def eintr_retry(func):
    """Retries a function invocation when an EINTR occurs

    Returns a wrapper around *func*. The wrapper calls *func* up to four
    times, swallowing failures whose error code is errno.EINTR, and then
    makes one final call whose errors are not intercepted. Any other
    error is re-raised immediately.

    The wrapper accepts an extra keyword argument ``_retry`` (not passed
    on to *func*): when true, the protected attempts are skipped and
    *func* is called exactly once.
    """
    import functools

    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for i in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as e:
                # OSError-style exceptions expose .errno; plain
                # select.error instances only carry it as args[0].
                code = getattr(e, "errno", None)
                if code is None and e.args:
                    code = e.args[0]
                if code == errno.EINTR:
                    continue
                raise
        # Protected attempts exhausted (or skipped): last, unguarded call
        return func(*p, **kw)
    return rv
207 def rexec(command, host, user,
218 err_on_timeout = True,
219 connect_timeout = 30,
223 strict_host_checking = True):
225 Executes a remote command, returns ((stdout,stderr),process)
228 tmp_known_hosts = None
229 hostip = gethostbyname(host)
232 # Don't bother with localhost. Makes test easier
233 '-o', 'NoHostAuthenticationForLocalhost=yes',
234 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235 '-o', 'ConnectionAttempts=3',
236 '-o', 'ServerAliveInterval=30',
237 '-o', 'TCPKeepAlive=yes',
238 '-l', user, hostip or host]
240 if persistent and openssh_has_persist():
242 '-o', 'ControlMaster=auto',
243 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244 '-o', 'ControlPersist=60' ])
246 if not strict_host_checking:
247 # Do not check for Host key. Unsafe.
248 args.extend(['-o', 'StrictHostKeyChecking=no'])
254 args.append('-p%d' % port)
257 args.extend(('-i', identity))
267 # Create a temporary server key file
268 tmp_known_hosts = make_server_key_args(server_key, host, port)
269 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
273 for x in xrange(retry):
274 # connects to the remote host and starts a remote connection
275 proc = subprocess.Popen(args,
277 stdout = subprocess.PIPE,
278 stdin = subprocess.PIPE,
279 stderr = subprocess.PIPE)
281 # attach tempfile object to the process, to make sure the file stays
282 # alive until the process is finished with it
283 proc._known_hosts = tmp_known_hosts
285 # by default, rexec calls _communicate which will block
286 # until the process has exit. The argument block == False
287 # forces to rexec to return immediately, without blocking
289 return (("", ""), proc)
292 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
293 msg = " rexec - host %s - command %s " % (host, " ".join(args))
294 log(msg, logging.DEBUG, out, err)
299 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
300 # SSH error, can safely retry
303 # Probably timed out or plain failed but can retry
308 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
309 t, x, host, " ".join(args))
310 log(msg, logging.DEBUG)
315 except RuntimeError, e:
316 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
317 log(msg, logging.DEBUG, out, err)
323 return ((out, err), proc)
325 def rcopy(source, dest,
332 strict_host_checking = True):
334 Copies from/to remote sites.
336 Source and destination should have the user and host encoded
339 If source is a file object, a special mode will be used to
340 create the remote file with the same contents.
342 If dest is a file object, the remote file (source) will be
343 read and written into dest.
345 In these modes, recursive cannot be True.
347 Source can be a list of files to copy to a single destination,
348 in which case it is advised that the destination be a folder.
351 if isinstance(source, file) and source.tell() == 0:
353 elif hasattr(source, 'read'):
354 tmp = tempfile.NamedTemporaryFile()
356 buf = source.read(65536)
364 if isinstance(source, file) or isinstance(dest, file) \
365 or hasattr(source, 'read') or hasattr(dest, 'write'):
368 # Parse source/destination as <user>@<server>:<path>
369 if isinstance(dest, basestring) and ':' in dest:
370 remspec, path = dest.split(':',1)
371 elif isinstance(source, basestring) and ':' in source:
372 remspec, path = source.split(':',1)
374 raise ValueError, "Both endpoints cannot be local"
375 user,host = remspec.rsplit('@',1)
377 tmp_known_hosts = None
378 hostip = gethostbyname(host)
380 args = ['ssh', '-l', user, '-C',
381 # Don't bother with localhost. Makes test easier
382 '-o', 'NoHostAuthenticationForLocalhost=yes',
383 '-o', 'ConnectTimeout=60',
384 '-o', 'ConnectionAttempts=3',
385 '-o', 'ServerAliveInterval=30',
386 '-o', 'TCPKeepAlive=yes',
389 if openssh_has_persist():
391 '-o', 'ControlMaster=auto',
392 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
393 '-o', 'ControlPersist=60' ])
396 args.append('-P%d' % port)
399 args.extend(('-i', identity))
402 # Create a temporary server key file
403 tmp_known_hosts = make_server_key_args(server_key, host, port)
404 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
406 if isinstance(source, file) or hasattr(source, 'read'):
407 args.append('cat > %s' % (shell_escape(path),))
408 elif isinstance(dest, file) or hasattr(dest, 'write'):
409 args.append('cat %s' % (shell_escape(path),))
411 raise AssertionError, "Unreachable code reached! :-Q"
413 # connects to the remote host and starts a remote connection
414 if isinstance(source, file):
415 proc = subprocess.Popen(args,
416 stdout = open('/dev/null','w'),
417 stderr = subprocess.PIPE,
419 err = proc.stderr.read()
420 proc._known_hosts = tmp_known_hosts
421 eintr_retry(proc.wait)()
422 return ((None,err), proc)
423 elif isinstance(dest, file):
424 proc = subprocess.Popen(args,
425 stdout = open('/dev/null','w'),
426 stderr = subprocess.PIPE,
428 err = proc.stderr.read()
429 proc._known_hosts = tmp_known_hosts
430 eintr_retry(proc.wait)()
431 return ((None,err), proc)
432 elif hasattr(source, 'read'):
433 # file-like (but not file) source
434 proc = subprocess.Popen(args,
435 stdout = open('/dev/null','w'),
436 stderr = subprocess.PIPE,
437 stdin = subprocess.PIPE)
443 buf = source.read(4096)
448 rdrdy, wrdy, broken = select.select(
451 [proc.stderr,proc.stdin])
453 if proc.stderr in rdrdy:
454 # use os.read for fully unbuffered behavior
455 err.append(os.read(proc.stderr.fileno(), 4096))
457 if proc.stdin in wrdy:
458 proc.stdin.write(buf)
464 err.append(proc.stderr.read())
466 proc._known_hosts = tmp_known_hosts
467 eintr_retry(proc.wait)()
468 return ((None,''.join(err)), proc)
469 elif hasattr(dest, 'write'):
470 # file-like (but not file) dest
471 proc = subprocess.Popen(args,
472 stdout = subprocess.PIPE,
473 stderr = subprocess.PIPE,
474 stdin = open('/dev/null','w'))
479 rdrdy, wrdy, broken = select.select(
480 [proc.stderr, proc.stdout],
482 [proc.stderr, proc.stdout])
484 if proc.stderr in rdrdy:
485 # use os.read for fully unbuffered behavior
486 err.append(os.read(proc.stderr.fileno(), 4096))
488 if proc.stdout in rdrdy:
489 # use os.read for fully unbuffered behavior
490 buf = os.read(proc.stdout.fileno(), 4096)
499 err.append(proc.stderr.read())
501 proc._known_hosts = tmp_known_hosts
502 eintr_retry(proc.wait)()
503 return ((None,''.join(err)), proc)
505 raise AssertionError, "Unreachable code reached! :-Q"
507 # Parse destination as <user>@<server>:<path>
508 if isinstance(dest, basestring) and ':' in dest:
509 remspec, path = dest.split(':',1)
510 elif isinstance(source, basestring) and ':' in source:
511 remspec, path = source.split(':',1)
513 raise ValueError, "Both endpoints cannot be local"
514 user,host = remspec.rsplit('@',1)
517 tmp_known_hosts = None
519 args = ['scp', '-q', '-p', '-C',
520 # Speed up transfer using blowfish cypher specification which is
521 # faster than the default one (3des)
523 # Don't bother with localhost. Makes test easier
524 '-o', 'NoHostAuthenticationForLocalhost=yes',
525 '-o', 'ConnectTimeout=60',
526 '-o', 'ConnectionAttempts=3',
527 '-o', 'ServerAliveInterval=30',
528 '-o', 'TCPKeepAlive=yes' ]
531 args.append('-P%d' % port)
537 args.extend(('-i', identity))
540 # Create a temporary server key file
541 tmp_known_hosts = make_server_key_args(server_key, host, port)
542 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
544 if not strict_host_checking:
545 # Do not check for Host key. Unsafe.
546 args.extend(['-o', 'StrictHostKeyChecking=no'])
548 if isinstance(source,list):
551 if openssh_has_persist():
553 '-o', 'ControlMaster=auto',
554 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
560 for x in xrange(retry):
561 # connects to the remote host and starts a remote connection
562 proc = subprocess.Popen(args,
563 stdout = subprocess.PIPE,
564 stdin = subprocess.PIPE,
565 stderr = subprocess.PIPE)
567 # attach tempfile object to the process, to make sure the file stays
568 # alive until the process is finished with it
569 proc._known_hosts = tmp_known_hosts
572 (out, err) = proc.communicate()
573 eintr_retry(proc.wait)()
574 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
575 log(msg, logging.DEBUG, out, err)
579 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
580 t, x, host, " ".join(args))
581 log(msg, logging.DEBUG)
587 except RuntimeError, e:
588 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
589 log(msg, logging.DEBUG, out, err)
595 return ((out, err), proc)
597 def rspawn(command, pidfile,
598 stdout = '/dev/null',
612 Spawn a remote command such that it will continue working asynchronously in
615 :param command: The command to run, it should be a single line.
618 :param pidfile: Path to a file where to store the pid and ppid of the
622 :param stdout: Path to file to redirect standard output.
623 The default value is /dev/null
626 :param stderr: Path to file to redirect standard error.
627 If the special STDOUT value is used, stderr will
628 be redirected to the same file as stdout
631 :param stdin: Path to a file with input to be piped into the command's standard input
634 :param home: Path to working directory folder.
635 It is assumed to exist unless the create_home flag is set.
638 :param create_home: Flag to force creation of the home folder before
640 :type create_home: bool
642 :param sudo: Flag forcing execution with sudo user
647 (stdout, stderr), process
649 Of the spawning process, which only captures errors at spawning time.
650 Usually only useful for diagnostics.
652 # Start process in a "daemonized" way, using nohup and heavy
653 # stdin/out redirection to avoid connection issues
657 stderr = ' ' + stderr
659 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
661 'pidfile' : shell_escape(pidfile),
667 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
668 'command' : shell_escape(daemon_command),
669 'sudo' : 'sudo -S' if sudo else '',
670 'pidfile' : shell_escape(pidfile),
671 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
672 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
675 (out,err),proc = rexec(
682 server_key = server_key,
687 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
689 return ((out, err), proc)
700 Returns the pid and ppid of a process from a remote file where the
701 information was stored.
703 :param home: Path to directory where the pidfile is located
706 :param pidfile: Name of file containing the pid information
711 A (pid, ppid) tuple useful for calling rstatus and rkill,
712 or None if the pidfile isn't valid yet (can happen when process is starting up)
715 (out,err),proc = rexec(
716 "cat %(pidfile)s" % {
724 server_key = server_key
732 return map(int,out.strip().split(' ',1))
734 # Ignore, many ways to fail that don't matter that much
738 def rstatus(pid, ppid,
746 Returns a code representing the status of a remote process
748 :param pid: Process id of the process
751 :param ppid: Parent process id of process
754 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
757 (out,err),proc = rexec(
758 # Check only by pid. pid+ppid does not always work (especially with sudo)
759 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
768 server_key = server_key
772 return ProcStatus.NOT_STARTED
776 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
779 status = (out.strip() == 'wait')
781 return ProcStatus.NOT_STARTED
782 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
795 Sends a kill signal to a remote process.
797 First tries a SIGTERM, and if the process does not end in 10 seconds,
800 :param pid: Process id of process to be killed
803 :param ppid: Parent process id of process to be killed
806 :param sudo: Flag indicating if sudo should be used to kill the process
810 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
812 SUBKILL="%(subkill)s" ;
813 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
814 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
815 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
817 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
820 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
821 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
825 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
826 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
827 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
831 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
833 (out,err),proc = rexec(
837 'sudo' : 'sudo -S' if sudo else '',
845 server_key = server_key
848 # wait, don't leave zombies around
851 return (out, err), proc
854 def _communicate(self, input, timeout=None, err_on_timeout=True):
857 stdout = None # Return
858 stderr = None # Return
862 if timeout is not None:
863 timelimit = time.time() + timeout
864 killtime = timelimit + 4
865 bailtime = timelimit + 4
868 # Flush stdio buffer. This might block, if the user has
869 # been writing to .stdin in an uncontrolled fashion.
872 write_set.append(self.stdin)
876 read_set.append(self.stdout)
879 read_set.append(self.stderr)
883 while read_set or write_set:
884 if timeout is not None:
885 curtime = time.time()
886 if timeout is None or curtime > timelimit:
887 if curtime > bailtime:
889 elif curtime > killtime:
890 signum = signal.SIGKILL
892 signum = signal.SIGTERM
894 os.kill(self.pid, signum)
897 select_timeout = timelimit - curtime + 0.1
901 if select_timeout > 1.0:
905 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
906 except select.error,e:
912 if not rlist and not wlist and not xlist and self.poll() is not None:
913 # timeout and process exited, say bye
916 if self.stdin in wlist:
917 # When select has indicated that the file is writable,
918 # we can write up to PIPE_BUF bytes without risk
919 # blocking. POSIX defines PIPE_BUF >= 512
920 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
921 input_offset += bytes_written
922 if input_offset >= len(input):
924 write_set.remove(self.stdin)
926 if self.stdout in rlist:
927 data = os.read(self.stdout.fileno(), 1024)
930 read_set.remove(self.stdout)
933 if self.stderr in rlist:
934 data = os.read(self.stderr.fileno(), 1024)
937 read_set.remove(self.stderr)
940 # All data exchanged. Translate lists into strings.
941 if stdout is not None:
942 stdout = ''.join(stdout)
943 if stderr is not None:
944 stderr = ''.join(stderr)
946 # Translate newlines, if requested. We cannot let the file
947 # object do the translation: It is based on stdio, which is
948 # impossible to combine with select (unless forcing no
950 if self.universal_newlines and hasattr(file, 'newlines'):
952 stdout = self._translate_newlines(stdout)
954 stderr = self._translate_newlines(stderr)
956 if killed and err_on_timeout:
957 errcode = self.poll()
958 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
964 return (stdout, stderr)