2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
logger = logging.getLogger("sshfuncs")


def log(msg, level, out=None, err=None):
    """Log *msg* at *level*, appending captured process output when present.

    :param msg: base message text
    :param level: a ``logging`` level (e.g. ``logging.DEBUG``)
    :param out: captured standard output; appended only when truthy
    :param err: captured standard error; appended only when truthy
    """
    # Callers pass whole command lines and process output; skip building
    # the message entirely when nothing would be emitted at this level.
    if not logger.isEnabledFor(level):
        return

    if out:
        msg += " - OUT: %s " % out

    if err:
        msg += " - ERROR: %s " % err

    logger.log(level, msg)
# Path of the null device; falls back to the POSIX path when the running
# interpreter does not expose os.devnull.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Matches strings made only of characters that need no shell quoting.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*$')
56 Special value that when given to rspawn in stderr causes stderr to
57 redirect to whatever stdout was redirected to.
62 Codes for status of remote spawned process
64 # Process is still running
70 # Process hasn't started running yet (this should be very rare)
# Process-wide cache of resolved host addresses, guarded by a lock so that
# concurrent callers do not interleave lookups and cache writes.
hostbyname_cache = dict()
hostbyname_cache_lock = threading.Lock()


def gethostbyname(host):
    """Resolve *host* with ``socket.gethostbyname``, caching the result.

    :param host: hostname to resolve
    :returns: the value returned by ``socket.gethostbyname`` for *host*
    :raises: whatever ``socket.gethostbyname`` raises when resolution fails
             (failures are not cached)
    """
    # Fast path: no lock needed for a plain dictionary read.
    hostbyname = hostbyname_cache.get(host)
    if hostbyname:
        return hostbyname

    with hostbyname_cache_lock:
        # Re-check under the lock: another thread may have resolved the
        # same host while this one was waiting, so avoid a second lookup.
        hostbyname = hostbyname_cache.get(host)
        if not hostbyname:
            hostbyname = socket.gethostbyname(host)
            hostbyname_cache[host] = hostbyname

            msg = " Added hostbyname %s - %s " % (host, hostbyname)
            log(msg, logging.DEBUG)

    return hostbyname
# Cached answer of openssh_has_persist(); None means "not probed yet".
OPENSSH_HAS_PERSIST = None


def openssh_has_persist():
    """Tell whether the local ssh client supports connection persistence.

    The ssh_config options ControlMaster and ControlPersist allow reusing
    one network connection for multiple ssh sessions, which works around
    limits on the number of open ssh connections. Older versions of
    OpenSSH do not support this feature, so the client's version banner is
    inspected once and the answer is cached in ``OPENSSH_HAS_PERSIST``.

    :returns: True when the banner reports OpenSSH 5.8 or newer, False
              otherwise (including when ``ssh`` cannot be executed)
    :rtype: bool
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        try:
            # The context manager closes the null-device handle once the
            # child has finished, instead of leaking it.
            with open(os.devnull, "r") as devnull:
                proc = subprocess.Popen(["ssh", "-v"],
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT,
                                        stdin=devnull)
                # communicate() also waits for the process to exit.
                out, _ = proc.communicate()
        except OSError:
            # ssh is missing or not executable: persistence is unavailable.
            out = ""

        # Popen pipes yield bytes on Python 3; the pattern below is text.
        if isinstance(out, bytes):
            out = out.decode("utf-8", "replace")

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """Create a temporary known_hosts file containing the server key.

    Make sure to hold onto the returned temp file object until the ssh
    process is done with it: the file is removed when the object is closed
    or garbage collected.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port, or None to leave the host entry undecorated

    :returns: an open ``tempfile.NamedTemporaryFile`` whose ``name`` can be
              passed to ssh as ``UserKnownHostsFile``
    """
    # Resolve the bare hostname first; resolving the "host:port" form
    # cannot succeed because it is not a valid hostname.
    hostbyname = gethostbyname(host)

    if port is not None:
        # NOTE(review): OpenSSH documents "[host]:port" for known_hosts
        # entries on non-default ports; the original "host:port" format is
        # kept here unchanged -- confirm which one is intended.
        host = '%s:%s' % (host, str(port))

    # Create a temporary server key file (text mode, since str is written)
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode="w+")

    # Add the intended host key
    tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE', "").lower() not in ('1', 'true', 'on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME', ""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    # Make the content visible to the ssh process that opens the file by name
    tmp_known_hosts.flush()

    return tmp_known_hosts
154 def make_control_path(agent, forward_x11):
155 ctrl_path = "/tmp/nepi_ssh"
163 ctrl_path += "-%r@%h:%p"
def shell_escape(s):
    """Escape a string so that it is safe to use as a command-line argument.

    Strings made only of characters accepted by ``SHELL_SAFE`` are returned
    unchanged. Anything else is wrapped in single quotes; quote characters,
    control characters and non-ASCII bytes are emitted as ``$'\\xNN'``
    sequences spliced between the quoted segments.

    :param s: the string to escape
    :returns: the escaped string
    """
    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape byte by byte, so that a character outside
    # latin-1 becomes several two-digit \xNN escapes instead of a single
    # over-long one that the shell would misread.
    data = s if isinstance(s, bytes) else s.encode("utf-8")

    pieces = []
    for byte in bytearray(data):
        char = chr(byte)
        printable = 32 <= byte < 127 or char in ('\r', '\n', '\t')
        if printable and char not in ("'", '"'):
            pieces.append(char)
        else:
            pieces.append("'$'\\x%02x''" % (byte,))

    return "'%s'" % (''.join(pieces),)
def eintr_retry(func):
    """Wrap *func* so that calls interrupted by EINTR are retried.

    The wrapper accepts one extra keyword argument, ``_retry``, which is
    removed before *func* is called. Unless ``_retry`` is true, up to four
    attempts are made in which an EINTR failure is swallowed and the call
    repeated; any other error propagates immediately. A final attempt is
    then made with no protection, so a persistent EINTR still surfaces.
    When ``_retry`` is true only that single unprotected call is made.

    :param func: the callable to protect
    :returns: a wrapper with the same name and docstring as *func*
    """
    import functools

    @functools.wraps(func)
    def wrapper(*p, **kw):
        retry = kw.pop("_retry", False)
        guarded_attempts = 0 if retry else 4

        for _ in range(guarded_attempts):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as exc:
                # Not every exception type exposes .errno (select.error on
                # Python 2 only carries it as args[0]), so fall back to args.
                code = getattr(exc, "errno", None)
                if code is None and exc.args:
                    code = exc.args[0]
                if code != errno.EINTR:
                    raise

        # Guarded attempts exhausted (or disabled): let any error propagate.
        return func(*p, **kw)

    return wrapper
206 def rexec(command, host, user,
217 err_on_timeout = True,
218 connect_timeout = 30,
221 strict_host_checking = True):
223 Executes a remote command, returns ((stdout,stderr),process)
226 tmp_known_hosts = None
227 hostip = gethostbyname(host)
230 # Don't bother with localhost. Makes test easier
231 '-o', 'NoHostAuthenticationForLocalhost=yes',
232 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
233 '-o', 'ConnectionAttempts=3',
234 '-o', 'ServerAliveInterval=30',
235 '-o', 'TCPKeepAlive=yes',
236 '-l', user, hostip or host]
238 if persistent and openssh_has_persist():
240 '-o', 'ControlMaster=auto',
241 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
242 '-o', 'ControlPersist=60' ])
244 if not strict_host_checking:
245 # Do not check for Host key. Unsafe.
246 args.extend(['-o', 'StrictHostKeyChecking=no'])
252 args.append('-p%d' % port)
255 args.extend(('-i', identity))
265 # Create a temporary server key file
266 tmp_known_hosts = make_server_key_args(server_key, host, port)
267 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
271 for x in xrange(retry):
272 # connects to the remote host and starts a remote connection
273 proc = subprocess.Popen(args,
275 stdout = subprocess.PIPE,
276 stdin = subprocess.PIPE,
277 stderr = subprocess.PIPE)
279 # attach tempfile object to the process, to make sure the file stays
280 # alive until the process is finished with it
281 proc._known_hosts = tmp_known_hosts
284 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
285 msg = " rexec - host %s - command %s " % (host, " ".join(args))
286 log(msg, logging.DEBUG, out, err)
291 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
292 # SSH error, can safely retry
295 # Probably timed out or plain failed but can retry
300 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
301 t, x, host, " ".join(args))
302 log(msg, logging.DEBUG)
307 except RuntimeError, e:
308 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
309 log(msg, logging.DEBUG, out, err)
315 return ((out, err), proc)
317 def rcopy(source, dest,
324 strict_host_checking = True):
326 Copies from/to remote sites.
328 Source and destination should have the user and host encoded
331 If source is a file object, a special mode will be used to
332 create the remote file with the same contents.
334 If dest is a file object, the remote file (source) will be
335 read and written into dest.
337 In these modes, recursive cannot be True.
339 Source can be a list of files to copy to a single destination,
340 in which case it is advised that the destination be a folder.
343 if isinstance(source, file) and source.tell() == 0:
345 elif hasattr(source, 'read'):
346 tmp = tempfile.NamedTemporaryFile()
348 buf = source.read(65536)
356 if isinstance(source, file) or isinstance(dest, file) \
357 or hasattr(source, 'read') or hasattr(dest, 'write'):
360 # Parse source/destination as <user>@<server>:<path>
361 if isinstance(dest, basestring) and ':' in dest:
362 remspec, path = dest.split(':',1)
363 elif isinstance(source, basestring) and ':' in source:
364 remspec, path = source.split(':',1)
366 raise ValueError, "Both endpoints cannot be local"
367 user,host = remspec.rsplit('@',1)
369 tmp_known_hosts = None
370 hostip = gethostbyname(host)
372 args = ['ssh', '-l', user, '-C',
373 # Don't bother with localhost. Makes test easier
374 '-o', 'NoHostAuthenticationForLocalhost=yes',
375 '-o', 'ConnectTimeout=60',
376 '-o', 'ConnectionAttempts=3',
377 '-o', 'ServerAliveInterval=30',
378 '-o', 'TCPKeepAlive=yes',
381 if openssh_has_persist():
383 '-o', 'ControlMaster=auto',
384 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
385 '-o', 'ControlPersist=60' ])
388 args.append('-P%d' % port)
391 args.extend(('-i', identity))
394 # Create a temporary server key file
395 tmp_known_hosts = make_server_key_args(server_key, host, port)
396 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
398 if isinstance(source, file) or hasattr(source, 'read'):
399 args.append('cat > %s' % (shell_escape(path),))
400 elif isinstance(dest, file) or hasattr(dest, 'write'):
401 args.append('cat %s' % (shell_escape(path),))
403 raise AssertionError, "Unreachable code reached! :-Q"
405 # connects to the remote host and starts a remote connection
406 if isinstance(source, file):
407 proc = subprocess.Popen(args,
408 stdout = open('/dev/null','w'),
409 stderr = subprocess.PIPE,
411 err = proc.stderr.read()
412 proc._known_hosts = tmp_known_hosts
413 eintr_retry(proc.wait)()
414 return ((None,err), proc)
415 elif isinstance(dest, file):
416 proc = subprocess.Popen(args,
417 stdout = open('/dev/null','w'),
418 stderr = subprocess.PIPE,
420 err = proc.stderr.read()
421 proc._known_hosts = tmp_known_hosts
422 eintr_retry(proc.wait)()
423 return ((None,err), proc)
424 elif hasattr(source, 'read'):
425 # file-like (but not file) source
426 proc = subprocess.Popen(args,
427 stdout = open('/dev/null','w'),
428 stderr = subprocess.PIPE,
429 stdin = subprocess.PIPE)
435 buf = source.read(4096)
440 rdrdy, wrdy, broken = select.select(
443 [proc.stderr,proc.stdin])
445 if proc.stderr in rdrdy:
446 # use os.read for fully unbuffered behavior
447 err.append(os.read(proc.stderr.fileno(), 4096))
449 if proc.stdin in wrdy:
450 proc.stdin.write(buf)
456 err.append(proc.stderr.read())
458 proc._known_hosts = tmp_known_hosts
459 eintr_retry(proc.wait)()
460 return ((None,''.join(err)), proc)
461 elif hasattr(dest, 'write'):
462 # file-like (but not file) dest
463 proc = subprocess.Popen(args,
464 stdout = subprocess.PIPE,
465 stderr = subprocess.PIPE,
466 stdin = open('/dev/null','w'))
471 rdrdy, wrdy, broken = select.select(
472 [proc.stderr, proc.stdout],
474 [proc.stderr, proc.stdout])
476 if proc.stderr in rdrdy:
477 # use os.read for fully unbuffered behavior
478 err.append(os.read(proc.stderr.fileno(), 4096))
480 if proc.stdout in rdrdy:
481 # use os.read for fully unbuffered behavior
482 buf = os.read(proc.stdout.fileno(), 4096)
491 err.append(proc.stderr.read())
493 proc._known_hosts = tmp_known_hosts
494 eintr_retry(proc.wait)()
495 return ((None,''.join(err)), proc)
497 raise AssertionError, "Unreachable code reached! :-Q"
499 # Parse destination as <user>@<server>:<path>
500 if isinstance(dest, basestring) and ':' in dest:
501 remspec, path = dest.split(':',1)
502 elif isinstance(source, basestring) and ':' in source:
503 remspec, path = source.split(':',1)
505 raise ValueError, "Both endpoints cannot be local"
506 user,host = remspec.rsplit('@',1)
509 tmp_known_hosts = None
511 args = ['scp', '-q', '-p', '-C',
512 # Speed up transfer using blowfish cypher specification which is
513 # faster than the default one (3des)
515 # Don't bother with localhost. Makes test easier
516 '-o', 'NoHostAuthenticationForLocalhost=yes',
517 '-o', 'ConnectTimeout=60',
518 '-o', 'ConnectionAttempts=3',
519 '-o', 'ServerAliveInterval=30',
520 '-o', 'TCPKeepAlive=yes' ]
523 args.append('-P%d' % port)
529 args.extend(('-i', identity))
532 # Create a temporary server key file
533 tmp_known_hosts = make_server_key_args(server_key, host, port)
534 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
536 if not strict_host_checking:
537 # Do not check for Host key. Unsafe.
538 args.extend(['-o', 'StrictHostKeyChecking=no'])
540 if isinstance(source,list):
543 if openssh_has_persist():
545 '-o', 'ControlMaster=auto',
546 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
552 for x in xrange(retry):
553 # connects to the remote host and starts a remote connection
554 proc = subprocess.Popen(args,
555 stdout = subprocess.PIPE,
556 stdin = subprocess.PIPE,
557 stderr = subprocess.PIPE)
559 # attach tempfile object to the process, to make sure the file stays
560 # alive until the process is finished with it
561 proc._known_hosts = tmp_known_hosts
564 (out, err) = proc.communicate()
565 eintr_retry(proc.wait)()
566 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
567 log(msg, logging.DEBUG, out, err)
571 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
572 t, x, host, " ".join(args))
573 log(msg, logging.DEBUG)
579 except RuntimeError, e:
580 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
581 log(msg, logging.DEBUG, out, err)
587 return ((out, err), proc)
589 def rspawn(command, pidfile,
590 stdout = '/dev/null',
604 Spawn a remote command such that it will continue working asynchronously in
607 :param command: The command to run, it should be a single line.
610 :param pidfile: Path to a file where to store the pid and ppid of the
614 :param stdout: Path to file to redirect standard output.
615 The default value is /dev/null
618 :param stderr: Path to file to redirect standard error.
619 If the special STDOUT value is used, stderr will
620 be redirected to the same file as stdout
623 :param stdin: Path to a file with input to be piped into the command's standard input
626 :param home: Path to working directory folder.
627 It is assumed to exist unless the create_home flag is set.
630 :param create_home: Flag to force creation of the home folder before
632 :type create_home: bool
634 :param sudo: Flag forcing execution with sudo user
639 (stdout, stderr), process
641 Of the spawning process, which only captures errors at spawning time.
642 Usually only useful for diagnostics.
644 # Start process in a "daemonized" way, using nohup and heavy
645 # stdin/out redirection to avoid connection issues
649 stderr = ' ' + stderr
651 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
653 'pidfile' : shell_escape(pidfile),
659 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
660 'command' : shell_escape(daemon_command),
661 'sudo' : 'sudo -S' if sudo else '',
662 'pidfile' : shell_escape(pidfile),
663 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
664 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
667 (out,err),proc = rexec(
674 server_key = server_key,
679 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
681 return ((out, err), proc)
692 Returns the pid and ppid of a process from a remote file where the
693 information was stored.
695 :param home: Path to directory where the pidfile is located
698 :param pidfile: Name of file containing the pid information
703 A (pid, ppid) tuple useful for calling rstatus and rkill,
704 or None if the pidfile isn't valid yet (can happen when process is starting up)
707 (out,err),proc = rexec(
708 "cat %(pidfile)s" % {
716 server_key = server_key
724 return map(int,out.strip().split(' ',1))
726 # Ignore, many ways to fail that don't matter that much
730 def rstatus(pid, ppid,
738 Returns a code representing the status of a remote process
740 :param pid: Process id of the process
743 :param ppid: Parent process id of process
746 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
749 (out,err),proc = rexec(
750 # Check only by pid. pid+ppid does not always work (especially with sudo)
751 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
760 server_key = server_key
764 return ProcStatus.NOT_STARTED
768 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
771 status = (out.strip() == 'wait')
773 return ProcStatus.NOT_STARTED
774 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
787 Sends a kill signal to a remote process.
789 First tries a SIGTERM, and if the process does not end in 10 seconds,
792 :param pid: Process id of process to be killed
795 :param ppid: Parent process id of process to be killed
798 :param sudo: Flag indicating if sudo should be used to kill the process
802 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
804 SUBKILL="%(subkill)s" ;
805 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
806 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
807 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
809 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
812 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
813 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
817 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
818 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
819 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
823 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
825 (out,err),proc = rexec(
829 'sudo' : 'sudo -S' if sudo else '',
837 server_key = server_key
840 # wait, don't leave zombies around
843 return (out, err), proc
846 def _communicate(self, input, timeout=None, err_on_timeout=True):
849 stdout = None # Return
850 stderr = None # Return
854 if timeout is not None:
855 timelimit = time.time() + timeout
856 killtime = timelimit + 4
857 bailtime = timelimit + 4
860 # Flush stdio buffer. This might block, if the user has
861 # been writing to .stdin in an uncontrolled fashion.
864 write_set.append(self.stdin)
868 read_set.append(self.stdout)
871 read_set.append(self.stderr)
875 while read_set or write_set:
876 if timeout is not None:
877 curtime = time.time()
878 if timeout is None or curtime > timelimit:
879 if curtime > bailtime:
881 elif curtime > killtime:
882 signum = signal.SIGKILL
884 signum = signal.SIGTERM
886 os.kill(self.pid, signum)
889 select_timeout = timelimit - curtime + 0.1
893 if select_timeout > 1.0:
897 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
898 except select.error,e:
904 if not rlist and not wlist and not xlist and self.poll() is not None:
905 # timeout and process exited, say bye
908 if self.stdin in wlist:
909 # When select has indicated that the file is writable,
910 # we can write up to PIPE_BUF bytes without risk
911 # blocking. POSIX defines PIPE_BUF >= 512
912 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
913 input_offset += bytes_written
914 if input_offset >= len(input):
916 write_set.remove(self.stdin)
918 if self.stdout in rlist:
919 data = os.read(self.stdout.fileno(), 1024)
922 read_set.remove(self.stdout)
925 if self.stderr in rlist:
926 data = os.read(self.stderr.fileno(), 1024)
929 read_set.remove(self.stderr)
932 # All data exchanged. Translate lists into strings.
933 if stdout is not None:
934 stdout = ''.join(stdout)
935 if stderr is not None:
936 stderr = ''.join(stderr)
938 # Translate newlines, if requested. We cannot let the file
939 # object do the translation: It is based on stdio, which is
940 # impossible to combine with select (unless forcing no
942 if self.universal_newlines and hasattr(file, 'newlines'):
944 stdout = self._translate_newlines(stdout)
946 stderr = self._translate_newlines(stderr)
948 if killed and err_on_timeout:
949 errcode = self.poll()
950 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
956 return (stdout, stderr)