2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
# Module-level logger; log() routes every message of this module through it.
35 logger = logging.getLogger("sshfuncs")
# Append the optional `out` / `err` texts to `msg` and emit the result through
# the module logger at `level`.
# NOTE(review): the embedded line numbers jump (37, 39, 42, 44), so whatever
# statements guard the two appends are not visible in this listing; as shown,
# both suffixes would be added even when `out` / `err` are None. Confirm
# against the complete file.
37 def log(msg, level, out = None, err = None):
# Suffix carrying the `out` text.
39 msg += " - OUT: %s " % out
# Suffix carrying the `err` text.
42 msg += " - ERROR: %s " % err
# Single emission point: `level` is passed straight to Logger.log().
44 logger.log(level, msg)
# Module-level constants and shared state.
# NOTE(review): this run is incomplete in the listing. Only the hasattr()
# probe and the literal "/dev/null" assignment of the DEV_NULL selection are
# visible, and the lines numbered 56-72 are docstring text whose owning
# statements are missing (presumably the STDOUT / RUNNING / FINISHED /
# NOT_STARTED values referenced elsewhere in this file -- TODO confirm).
47 if hasattr(os, "devnull"):
50 DEV_NULL = "/dev/null"
# A string consisting only of these characters passes the SHELL_SAFE.match()
# test used by the escaping code further down.
52 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
56 Special value that when given to rspawn in stderr causes stderr to
57 redirect to whatever stdout was redirected to.
62 Process is still running
72 Process hasn't started running yet (this should be very rare)
# host -> resolved address cache filled by gethostbyname(), and the lock that
# gethostbyname() holds while resolving and storing an entry.
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
# Resolve `host` with socket.gethostbyname(), keeping answers in the
# module-level hostbyname_cache.
# NOTE(review): lines are missing from this listing (numbering skips 83, 87
# and 90-92). Callers assign the result (`hostip = gethostbyname(host)`), so
# a return statement -- and presumably a cache-miss guard before the `with` --
# live in those gaps. Confirm against the complete file.
78 def gethostbyname(host):
79 global hostbyname_cache
80 global hostbyname_cache_lock
# The cache is read before the lock is taken.
# NOTE(review): two threads missing the cache at the same time would both
# resolve the name; looks harmless, but confirm that is acceptable.
82 hostbyname = hostbyname_cache.get(host)
# Resolution and the cache write happen under the lock.
84 with hostbyname_cache_lock:
85 hostbyname = socket.gethostbyname(host)
86 hostbyname_cache[host] = hostbyname
# Trace the newly stored entry at DEBUG level.
88 msg = " Added hostbyname %s - %s " % (host, hostbyname)
89 log(msg, logging.DEBUG)
# Memoized result of openssh_has_persist(); None means "not probed yet".
93 OPENSSH_HAS_PERSIST = None
# Report whether the local ssh client is recent enough for ControlPersist.
# The answer is computed once by running `ssh -v` and is then stored in the
# module-level OPENSSH_HAS_PERSIST.
# NOTE(review): the end of the docstring and the lines numbered 110-111 are
# missing from this listing.
95 def openssh_has_persist():
96 """ The ssh_config options ControlMaster and ControlPersist allow to
97 reuse a same network connection for multiple ssh sessions. In this
98 way limitations on number of open ssh connections can be bypassed.
99 However, older versions of openSSH do not support this feature.
100 This function is used to determine if ssh connection persist features
103 global OPENSSH_HAS_PERSIST
# Probe only while no answer has been stored yet.
104 if OPENSSH_HAS_PERSIST is None:
# stderr is merged into stdout, so `out` holds everything ssh printed.
# NOTE(review): the file object opened for stdin is never bound to a name, so
# nothing in the visible code closes it explicitly; the path is also spelled
# out literally here although the module defines DEV_NULL.
105 proc = subprocess.Popen(["ssh","-v"],
106 stdout = subprocess.PIPE,
107 stderr = subprocess.STDOUT,
108 stdin = open("/dev/null","r") )
109 out,err = proc.communicate()
# Accepted banners: "OpenSSH_" followed by 6-9, 5.8, 5.9, 5.10-5.99 or a
# two-digit number. re.I makes the test case-insensitive, and match() only
# succeeds when the banner is at the very start of `out`.
112 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
113 OPENSSH_HAS_PERSIST = bool(vre.match(out))
114 return OPENSSH_HAS_PERSIST
# Build a temporary known_hosts file containing `server_key` for `host` and
# return the open NamedTemporaryFile object. Callers pass its `.name` to ssh
# through '-o UserKnownHostsFile=...'.
# NOTE(review): parts of the docstring and several statements (including the
# condition guarding the host:port rewrite) are missing from this listing.
116 def make_server_key_args(server_key, host, port):
117 """ Returns a reference to a temporary known_hosts file, to which
118 the server key has been added.
120 Make sure to hold onto the temp file reference until the process is
123 :param server_key: the server public key
124 :type server_key: str
126 :param host: the hostname
129 :param port: the ssh port
# `host` is rebound to a "host:port" string here.
134 host = '%s:%s' % (host, str(port))
136 # Create a temporary server key file
137 tmp_known_hosts = tempfile.NamedTemporaryFile()
# NOTE(review): when the rewrite above has run, the value resolved here is
# the "host:port" composite rather than the bare host name -- confirm that
# this is intended.
139 hostbyname = gethostbyname(host)
141 # Add the intended host key
# One entry covering both the name and its resolved address.
142 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
144 # If we're not in strict mode, add user-configured keys
# Strict mode is on when NEPI_STRICT_AUTH_MODE is '1', 'true' or 'on'
# (case-insensitive); otherwise $HOME/.ssh/known_hosts is appended if readable.
145 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
146 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
147 if os.access(user_hosts_path, os.R_OK):
# NOTE(review): no close() of `f` is visible (lines numbered 150-151 are
# missing from the listing) -- confirm the handle is released.
148 f = open(user_hosts_path, "r")
149 tmp_known_hosts.write(f.read())
# Push the buffered entries out before the file name is handed to a caller.
152 tmp_known_hosts.flush()
154 return tmp_known_hosts
# Build the value used for ssh's ControlPath option (see the
# '-o ControlPath=%s' arguments assembled by the callers in this file).
# NOTE(review): only the base path and the final suffix are visible; how
# `agent` and `forward_x11` alter the path, and the return statement, are in
# lines missing from this listing.
156 def make_control_path(agent, forward_x11):
157 ctrl_path = "/tmp/nepi_ssh"
# %r, %h and %p are left for ssh itself to expand (remote user, host and
# port according to ssh_config(5)).
165 ctrl_path += "-%r@%h:%p"
# Escaping helper for command-line arguments.
# NOTE(review): the `def` line, the inner helper's `def` line, the branch
# keywords and the return statements are missing from this listing. Call
# sites in this file use shell_escape(...), and `escp` is the name mapped over
# the characters of `s` below.
170 """ Escapes strings so that they are safe to use as command-line
# Strings made only of SHELL_SAFE characters take the "safe" path.
172 if SHELL_SAFE.match(s):
173 # safe string - no escaping needed
176 # unsafe string - escape
# A character is kept when it is printable ASCII (32..126) or CR/LF/TAB, and
# is not a single or double quote.
178 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
# Anything else is emitted as '$'\xNN'' (NN = two-digit hex code); presumably
# the finished string is wrapped in single quotes by a line not shown here --
# TODO confirm.
181 return "'$'\\x%02x''" % (ord(c),)
# Apply the per-character rule to the whole string.
182 s = ''.join(map(escp,s))
# Decorator: call `func`, retrying when the call is interrupted by EINTR.
# NOTE(review): the wrapper's `def` line, the `try:` lines, one `except`
# header and the statements inside the EINTR branches are missing from this
# listing (Python 2 `except ..., name` syntax is used).
185 def eintr_retry(func):
186 """Retries a function invocation when a EINTR occurs"""
188 @functools.wraps(func)
# `_retry` is consumed here and therefore never forwarded to `func`.
190 retry = kw.pop("_retry", False)
# Four guarded attempts by default; zero when `_retry` is truthy.
# NOTE(review): a truthy `_retry` disabling the guarded attempts reads
# inverted -- confirm the intent.
191 for i in xrange(0 if retry else 4):
193 return func(*p, **kw)
# select/socket errors carry the errno as their first element.
194 except (select.error, socket.error), args:
195 if args[0] == errno.EINTR:
# Errors exposing an `errno` attribute are checked the same way.
200 if e.errno == errno.EINTR:
# Another call whose guard, if any, is not visible; presumably the final
# unguarded attempt after the loop -- TODO confirm.
205 return func(*p, **kw)
# Run `command` on `host` through the ssh client and return
# ((stdout, stderr), process).
# NOTE(review): most of the signature, the start of the argument list and the
# retry bookkeeping are missing from this listing. Names such as `port`,
# `agent`, `identity`, `server_key`, `persistent`, `forward_x11`, `retry`,
# `stdin` and `timeout` are presumably parameters declared in those lines.
208 def rexec(command, host, user,
219 err_on_timeout = True,
220 connect_timeout = 30,
223 strict_host_checking = True):
225 Executes a remote command, returns ((stdout,stderr),process)
228 tmp_known_hosts = None
# Resolved once; the argument list below falls back to `host` when the
# resolved value is empty.
229 hostip = gethostbyname(host)
232 # Don't bother with localhost. Makes test easier
233 '-o', 'NoHostAuthenticationForLocalhost=yes',
234 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
235 '-o', 'ConnectionAttempts=3',
236 '-o', 'ServerAliveInterval=30',
237 '-o', 'TCPKeepAlive=yes',
238 '-l', user, hostip or host]
# Connection sharing is requested only when the caller asks for it and the
# local client supports it.
240 if persistent and openssh_has_persist():
242 '-o', 'ControlMaster=auto',
243 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
244 '-o', 'ControlPersist=60' ])
246 if not strict_host_checking:
247 # Do not check for Host key. Unsafe.
248 args.extend(['-o', 'StrictHostKeyChecking=no'])
# Port and identity options; their guarding conditions are not visible.
254 args.append('-p%d' % port)
257 args.extend(('-i', identity))
267 # Create a temporary server key file
268 tmp_known_hosts = make_server_key_args(server_key, host, port)
269 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# Up to `retry` attempts, each spawning a fresh ssh process.
273 for x in xrange(retry):
274 # connects to the remote host and starts a remote connection
275 proc = subprocess.Popen(args,
277 stdout = subprocess.PIPE,
278 stdin = subprocess.PIPE,
279 stderr = subprocess.PIPE)
281 # attach tempfile object to the process, to make sure the file stays
282 # alive until the process is finished with it
283 proc._known_hosts = tmp_known_hosts
# _communicate() feeds `stdin`, collects both output streams and enforces
# `timeout`; it raises RuntimeError on timeout when err_on_timeout is set.
286 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
287 msg = " rexec - host %s - command %s " % (host, " ".join(args))
288 log(msg, logging.DEBUG, out, err)
# stderr beginning with one of these prefixes is treated as a failure of ssh
# itself rather than of the remote command.
293 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
294 # SSH error, can safely retry
297 # Probably timed out or plain failed but can retry
# `t` is presumably the back-off delay computed in a line not shown.
302 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
303 t, x, host, " ".join(args))
304 log(msg, logging.DEBUG)
# Timeout path: the exception is logged here; what follows is not visible.
309 except RuntimeError, e:
310 msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
311 log(msg, logging.DEBUG, out, err)
317 return ((out, err), proc)
# Copy data between the local machine and a remote host.
# Two strategies are visible: endpoints that are file objects (or file-like
# objects) are streamed through `ssh ... cat`, while plain path endpoints are
# handed to `scp`. Every visible exit returns ((out, err), process).
# NOTE(review): large parts of this function (most of the signature, loop
# headers, branch keywords, several statements) are missing from this listing;
# the comments below describe only the visible lines.
319 def rcopy(source, dest,
326 strict_host_checking = True):
328 Copies from/to remote sites.
330 Source and destination should have the user and host encoded
333 If source is a file object, a special mode will be used to
334 create the remote file with the same contents.
336 If dest is a file object, the remote file (source) will be
337 read and written into dest.
339 In these modes, recursive cannot be True.
341 Source can be a list of files to copy to a single destination,
342 in which case it is advised that the destination be a folder.
# A real file positioned at offset 0 is handled by the first branch; any other
# readable object is copied into a NamedTemporaryFile in 64 KiB reads.
345 if isinstance(source, file) and source.tell() == 0:
347 elif hasattr(source, 'read'):
348 tmp = tempfile.NamedTemporaryFile()
350 buf = source.read(65536)
# File or file-like endpoint on either side: use the ssh + cat strategy.
358 if isinstance(source, file) or isinstance(dest, file) \
359 or hasattr(source, 'read') or hasattr(dest, 'write'):
362 # Parse source/destination as <user>@<server>:<path>
363 if isinstance(dest, basestring) and ':' in dest:
364 remspec, path = dest.split(':',1)
365 elif isinstance(source, basestring) and ':' in source:
366 remspec, path = source.split(':',1)
368 raise ValueError, "Both endpoints cannot be local"
# rsplit keeps any earlier '@' inside the user part.
369 user,host = remspec.rsplit('@',1)
371 tmp_known_hosts = None
372 hostip = gethostbyname(host)
374 args = ['ssh', '-l', user, '-C',
375 # Don't bother with localhost. Makes test easier
376 '-o', 'NoHostAuthenticationForLocalhost=yes',
377 '-o', 'ConnectTimeout=60',
378 '-o', 'ConnectionAttempts=3',
379 '-o', 'ServerAliveInterval=30',
380 '-o', 'TCPKeepAlive=yes',
383 if openssh_has_persist():
385 '-o', 'ControlMaster=auto',
386 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
387 '-o', 'ControlPersist=60' ])
# NOTE(review): this argument list starts with 'ssh', whose port option is
# lowercase -p (rexec uses '-p%d'); uppercase -P is scp's spelling. Confirm
# against ssh(1).
390 args.append('-P%d' % port)
393 args.extend(('-i', identity))
396 # Create a temporary server key file
397 tmp_known_hosts = make_server_key_args(server_key, host, port)
398 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# Remote side of the pipe: `cat > path` stores what arrives on stdin,
# `cat path` sends the remote file to stdout.
400 if isinstance(source, file) or hasattr(source, 'read'):
401 args.append('cat > %s' % (shell_escape(path),))
402 elif isinstance(dest, file) or hasattr(dest, 'write'):
403 args.append('cat %s' % (shell_escape(path),))
405 raise AssertionError, "Unreachable code reached! :-Q"
407 # connects to the remote host and starts a remote connection
# Real-file source: stdout is discarded, stderr is read to EOF, then the
# process is reaped. The stdin argument is on a line not shown.
408 if isinstance(source, file):
409 proc = subprocess.Popen(args,
410 stdout = open('/dev/null','w'),
411 stderr = subprocess.PIPE,
413 err = proc.stderr.read()
414 proc._known_hosts = tmp_known_hosts
415 eintr_retry(proc.wait)()
416 return ((None,err), proc)
# NOTE(review): in this dest-is-a-file branch the visible stdout argument is
# still /dev/null -- confirm the remote content actually reaches `dest`.
417 elif isinstance(dest, file):
418 proc = subprocess.Popen(args,
419 stdout = open('/dev/null','w'),
420 stderr = subprocess.PIPE,
422 err = proc.stderr.read()
423 proc._known_hosts = tmp_known_hosts
424 eintr_retry(proc.wait)()
425 return ((None,err), proc)
426 elif hasattr(source, 'read'):
427 # file-like (but not file) source
428 proc = subprocess.Popen(args,
429 stdout = open('/dev/null','w'),
430 stderr = subprocess.PIPE,
431 stdin = subprocess.PIPE)
# Pump loop: 4 KiB chunks of `source` are written to the child's stdin while
# its stderr is drained as data becomes available.
437 buf = source.read(4096)
442 rdrdy, wrdy, broken = select.select(
445 [proc.stderr,proc.stdin])
447 if proc.stderr in rdrdy:
448 # use os.read for fully unbuffered behavior
449 err.append(os.read(proc.stderr.fileno(), 4096))
451 if proc.stdin in wrdy:
452 proc.stdin.write(buf)
# Collect whatever stderr output remains.
458 err.append(proc.stderr.read())
460 proc._known_hosts = tmp_known_hosts
461 eintr_retry(proc.wait)()
462 return ((None,''.join(err)), proc)
463 elif hasattr(dest, 'write'):
464 # file-like (but not file) dest
# NOTE(review): /dev/null is opened in 'w' mode although it is used as the
# child's stdin -- confirm the mode.
465 proc = subprocess.Popen(args,
466 stdout = subprocess.PIPE,
467 stderr = subprocess.PIPE,
468 stdin = open('/dev/null','w'))
# Drain loop: both output pipes are watched and read in 4 KiB chunks.
473 rdrdy, wrdy, broken = select.select(
474 [proc.stderr, proc.stdout],
476 [proc.stderr, proc.stdout])
478 if proc.stderr in rdrdy:
479 # use os.read for fully unbuffered behavior
480 err.append(os.read(proc.stderr.fileno(), 4096))
482 if proc.stdout in rdrdy:
483 # use os.read for fully unbuffered behavior
484 buf = os.read(proc.stdout.fileno(), 4096)
493 err.append(proc.stderr.read())
495 proc._known_hosts = tmp_known_hosts
496 eintr_retry(proc.wait)()
497 return ((None,''.join(err)), proc)
499 raise AssertionError, "Unreachable code reached! :-Q"
501 # Parse destination as <user>@<server>:<path>
# scp strategy for plain path endpoints.
502 if isinstance(dest, basestring) and ':' in dest:
503 remspec, path = dest.split(':',1)
504 elif isinstance(source, basestring) and ':' in source:
505 remspec, path = source.split(':',1)
507 raise ValueError, "Both endpoints cannot be local"
508 user,host = remspec.rsplit('@',1)
511 tmp_known_hosts = None
513 args = ['scp', '-q', '-p', '-C',
514 # Don't bother with localhost. Makes test easier
515 '-o', 'NoHostAuthenticationForLocalhost=yes',
516 '-o', 'ConnectTimeout=60',
517 '-o', 'ConnectionAttempts=3',
518 '-o', 'ServerAliveInterval=30',
519 '-o', 'TCPKeepAlive=yes' ]
522 args.append('-P%d' % port)
528 args.extend(('-i', identity))
531 # Create a temporary server key file
532 tmp_known_hosts = make_server_key_args(server_key, host, port)
533 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
535 if not strict_host_checking:
536 # Do not check for Host key. Unsafe.
537 args.extend(['-o', 'StrictHostKeyChecking=no'])
# A list source gets separate handling; the handling itself is not visible.
539 if isinstance(source,list):
542 if openssh_has_persist():
544 '-o', 'ControlMaster=auto',
545 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
# Up to `retry` attempts, each spawning a fresh scp process.
551 for x in xrange(retry):
552 # connects to the remote host and starts a remote connection
553 proc = subprocess.Popen(args,
554 stdout = subprocess.PIPE,
555 stdin = subprocess.PIPE,
556 stderr = subprocess.PIPE)
558 # attach tempfile object to the process, to make sure the file stays
559 # alive until the process is finished with it
560 proc._known_hosts = tmp_known_hosts
# communicate() collects both streams; wait() is then retried across EINTR.
563 (out, err) = proc.communicate()
564 eintr_retry(proc.wait)()
565 msg = " rcopy - host %s - command %s " % (host, " ".join(args))
566 log(msg, logging.DEBUG, out, err)
# `t` is presumably the back-off delay computed in a line not shown.
570 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
571 t, x, host, " ".join(args))
572 log(msg, logging.DEBUG)
578 except RuntimeError, e:
579 msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
580 log(msg, logging.DEBUG, out, err)
586 return ((out, err), proc)
# Start `command` on the remote host so that it keeps running after the ssh
# session ends, recording its pid in `pidfile`. Returns ((out, err), process)
# of the ssh invocation that did the spawning.
# NOTE(review): most of the signature, several dictionary entries and the
# rexec() argument list are missing from this listing.
588 def rspawn(command, pidfile,
589 stdout = '/dev/null',
603 Spawn a remote command such that it will continue working asynchronously.
606 command: the command to run - it should be a single line.
608 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
610 stdout: path of a file to redirect standard output to - must be a string.
611 Defaults to /dev/null
612 stderr: path of a file to redirect standard error to - string or the special STDOUT value
613 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
614 stdin: path of a file with input to be piped into the command's standard input
616 home: path of a folder to use as working directory - should exist, unless you specify create_home
618 create_home: if True, the home folder will be created first with mkdir -p
620 sudo: whether the command needs to be executed as root
622 host/port/user/agent/identity: see rexec
625 (stdout, stderr), process
627 Of the spawning process, which only captures errors at spawning time.
628 Usually only useful for diagnostics.
630 # Start process in a "daemonized" way, using nohup and heavy
631 # stdin/out redirection to avoid connection issues
# The condition selecting this form of the stderr target is not visible.
635 stderr = ' ' + stderr
# Inner script: run the command in the background with all three streams
# redirected, then write "$! 1" into the pidfile -- `$!` is the pid of that
# background job, followed by a literal 1. rcheckpid() parses the two numbers.
637 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
639 'pidfile' : shell_escape(pidfile),
# Outer command: optional `mkdir -p` and `cd` into home, removal of any old
# pidfile, then the inner script under `nohup bash -c` (behind `sudo -S` when
# requested).
645 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
646 'command' : shell_escape(daemon_command),
647 'sudo' : 'sudo -S' if sudo else '',
648 'pidfile' : shell_escape(pidfile),
649 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
650 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
653 (out,err),proc = rexec(
660 server_key = server_key,
# The failure condition leading to this raise is not visible.
665 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
667 return ((out, err), proc)
# Read the pidfile written by rspawn() on the remote host and parse the two
# numbers stored in it.
# NOTE(review): most of the signature, the rexec() arguments and the
# surrounding try/except structure are missing from this listing.
670 def rcheckpid(pidfile,
678 Check the pidfile of a process spawned with remote_spawn.
681 pidfile: the pidfile passed to remote_span
683 host/port/user/agent/identity: see rexec
687 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
688 or None if the pidfile isn't valid yet (maybe the process is still starting).
# The pidfile content is fetched with a remote `cat`.
691 (out,err),proc = rexec(
692 "cat %(pidfile)s" % {
700 server_key = server_key
# Split on the first space and convert both parts to int.
# NOTE(review): under Python 2, map() yields a list, while the docstring
# speaks of a tuple -- confirm callers only unpack or index the result.
708 return map(int,out.strip().split(' ',1))
710 # Ignore, many ways to fail that don't matter that much
# Ask the remote host whether the process with `pid` is still alive.
# NOTE(review): most of the signature, the rexec() arguments and the branches
# between the visible statements (including any NOT_STARTED return) are
# missing from this listing.
714 def rstatus(pid, ppid,
722 Check the status of a process spawned with remote_spawn.
725 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
727 host/port/user/agent/identity: see rexec
731 One of NOT_STARTED, RUNNING, FINISHED
# Remote one-liner: when `ps` lists the pid, `grep -c` succeeds and 'wait' is
# echoed, otherwise 'done'; `tail -n 1` keeps only that final word.
734 (out,err),proc = rexec(
735 # Check only by pid. pid+ppid does not always work (especially with sudo)
736 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
745 server_key = server_key
# Looks for the message `ps` prints when /proc is not mounted; what is done
# in that case is not visible.
753 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
# True when the remote answer was 'wait', i.e. the pid was found.
756 status = (out.strip() == 'wait')
759 return RUNNING if status else FINISHED
772 Kill a process spawned with remote_spawn.
774 First tries a SIGTERM, and if the process does not end in 10 seconds,
778 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
780 sudo: whether the command was run with sudo - careful killing like this.
782 host/port/user/agent/identity: see rexec
786 Nothing, should have killed the process
# NOTE(review): the `def` line of this function, the opening of the shell
# script literal, parts of the script and the rexec() arguments are missing
# from this listing; the comments below cover only the visible lines.
# Command substitution listing the pids whose parent is `pid`.
789 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
# Script outline: signal the process group (-pid) and then pid plus its
# children; `|| /bin/true` keeps a failed kill from failing the script.
791 SUBKILL="%(subkill)s" ;
792 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
793 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
# Thirty polling rounds; the delay per round is on a line not shown, so the
# "10 seconds" of the description cannot be checked here.
794 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
# The pid no longer appearing in `ps` output means the process is gone.
796 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
799 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
800 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
# Still alive after the polling loop: escalate to SIGKILL.
804 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
805 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
806 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
# The whole script runs in a backgrounded subshell with stdin, stdout and
# stderr all attached to /dev/null.
810 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
812 (out,err),proc = rexec(
816 'sudo' : 'sudo -S' if sudo else '',
824 server_key = server_key
827 # wait, don't leave zombies around
# NOTE(review): the description above says nothing is returned, yet a
# ((out, err), proc)-style value is returned here -- confirm which is right.
830 return (out, err), proc
# select()-based replacement for Popen.communicate() with a timeout.
# `self` is the subprocess.Popen object (rexec calls
# `_communicate(proc, stdin, timeout, err_on_timeout)`), `input` is the data
# for the child's stdin. Returns (stdout, stderr); raises RuntimeError when
# the process had to be killed on timeout and `err_on_timeout` is true.
# NOTE(review): many statements (initializations, branch keywords, loop
# exits) are missing from this listing; the comments below cover only the
# visible lines.
833 def _communicate(self, input, timeout=None, err_on_timeout=True):
836 stdout = None # Return
837 stderr = None # Return
# Deadlines are derived from the wall clock at entry.
# NOTE(review): killtime and bailtime are identical, so in the
# `if curtime > bailtime / elif curtime > killtime` chain below the SIGKILL
# branch can never be chosen as written -- confirm the intended offsets.
841 if timeout is not None:
842 timelimit = time.time() + timeout
843 killtime = timelimit + 4
844 bailtime = timelimit + 4
847 # Flush stdio buffer. This might block, if the user has
848 # been writing to .stdin in an uncontrolled fashion.
# Register the pipes to be watched by select().
851 write_set.append(self.stdin)
855 read_set.append(self.stdout)
858 read_set.append(self.stderr)
# Main loop: runs until every pipe has been removed from both sets.
862 while read_set or write_set:
863 if timeout is not None:
864 curtime = time.time()
# NOTE(review): `curtime` is only assigned above when a timeout was given;
# if this test is nested under that `if`, its `timeout is None` half is
# redundant -- nesting cannot be determined from this listing.
865 if timeout is None or curtime > timelimit:
866 if curtime > bailtime:
868 elif curtime > killtime:
869 signum = signal.SIGKILL
871 signum = signal.SIGTERM
# Deliver the selected signal to the child.
873 os.kill(self.pid, signum)
# Wait at most until just past the deadline.
876 select_timeout = timelimit - curtime + 0.1
880 if select_timeout > 1.0:
884 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
885 except select.error,e:
891 if not rlist and not wlist and not xlist and self.poll() is not None:
892 # timeout and process exited, say bye
895 if self.stdin in wlist:
896 # When select has indicated that the file is writable,
897 # we can write up to PIPE_BUF bytes without risk
898 # blocking. POSIX defines PIPE_BUF >= 512
# buffer() exposes a 512-byte window of `input` starting at the current
# offset (Python 2 builtin).
899 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
900 input_offset += bytes_written
# Everything written: stop watching stdin.
901 if input_offset >= len(input):
903 write_set.remove(self.stdin)
# Output pipes are read in 1 KiB chunks straight from the descriptor; each
# pipe is dropped from read_set under a condition not shown (presumably an
# empty read, i.e. EOF -- TODO confirm).
905 if self.stdout in rlist:
906 data = os.read(self.stdout.fileno(), 1024)
909 read_set.remove(self.stdout)
912 if self.stderr in rlist:
913 data = os.read(self.stderr.fileno(), 1024)
916 read_set.remove(self.stderr)
919 # All data exchanged. Translate lists into strings.
920 if stdout is not None:
921 stdout = ''.join(stdout)
922 if stderr is not None:
923 stderr = ''.join(stderr)
925 # Translate newlines, if requested. We cannot let the file
926 # object do the translation: It is based on stdio, which is
927 # impossible to combine with select (unless forcing no
929 if self.universal_newlines and hasattr(file, 'newlines'):
931 stdout = self._translate_newlines(stdout)
933 stderr = self._translate_newlines(stderr)
# Timeout is reported as RuntimeError carrying the exit code and any output
# collected so far; rexec() catches this exception type.
935 if killed and err_on_timeout:
936 errcode = self.poll()
937 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
943 return (stdout, stderr)