2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
21 ## TODO: This code needs reviewing !!!
38 logger = logging.getLogger("sshfuncs")
40 def log(msg, level, out = None, err = None):
42 msg += " - OUT: %s " % out
45 msg += " - ERROR: %s " % err
47 logger.log(level, msg)
49 if hasattr(os, "devnull"):
52 DEV_NULL = "/dev/null"
54 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
58 Special value that when given to rspawn in stderr causes stderr to
59 redirect to whatever stdout was redirected to.
64 Codes for status of remote spawned process
66 # Process is still running
72 # Process hasn't started running yet (this should be very rare)
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
78 def gethostbyname(host):
79 global hostbyname_cache
80 global hostbyname_cache_lock
82 hostbyname = hostbyname_cache.get(host)
84 with hostbyname_cache_lock:
85 hostbyname = socket.gethostbyname(host)
86 hostbyname_cache[host] = hostbyname
88 msg = " Added hostbyname %s - %s " % (host, hostbyname)
89 log(msg, logging.DEBUG)
93 OPENSSH_HAS_PERSIST = None
95 def openssh_has_persist():
96 """ The ssh_config options ControlMaster and ControlPersist allow to
97 reuse a same network connection for multiple ssh sessions. In this
98 way limitations on number of open ssh connections can be bypassed.
99 However, older versions of openSSH do not support this feature.
100 This function is used to determine if ssh connection persist features
103 global OPENSSH_HAS_PERSIST
104 if OPENSSH_HAS_PERSIST is None:
105 proc = subprocess.Popen(["ssh","-v"],
106 stdout = subprocess.PIPE,
107 stderr = subprocess.STDOUT,
108 stdin = open("/dev/null","r") )
109 out,err = proc.communicate()
112 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
113 OPENSSH_HAS_PERSIST = bool(vre.match(out))
114 return OPENSSH_HAS_PERSIST
116 def make_server_key_args(server_key, host, port):
117 """ Returns a reference to a temporary known_hosts file, to which
118 the server key has been added.
120 Make sure to hold onto the temp file reference until the process is
123 :param server_key: the server public key
124 :type server_key: str
126 :param host: the hostname
129 :param port: the ssh port
134 host = '%s:%s' % (host, str(port))
136 # Create a temporary server key file
137 tmp_known_hosts = tempfile.NamedTemporaryFile()
139 hostbyname = gethostbyname(host)
141 # Add the intended host key
142 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
144 # If we're not in strict mode, add user-configured keys
145 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
146 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
147 if os.access(user_hosts_path, os.R_OK):
148 f = open(user_hosts_path, "r")
149 tmp_known_hosts.write(f.read())
152 tmp_known_hosts.flush()
154 return tmp_known_hosts
156 def make_control_path(agent, forward_x11):
157 ctrl_path = "/tmp/nepi_ssh"
165 ctrl_path += "-%r@%h:%p"
170 """ Escapes strings so that they are safe to use as command-line
172 if SHELL_SAFE.match(s):
173 # safe string - no escaping needed
176 # unsafe string - escape
178 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
181 return "'$'\\x%02x''" % (ord(c),)
182 s = ''.join(map(escp,s))
185 def eintr_retry(func):
186 """Retries a function invocation when a EINTR occurs"""
188 @functools.wraps(func)
190 retry = kw.pop("_retry", False)
191 for i in xrange(0 if retry else 4):
193 return func(*p, **kw)
194 except (select.error, socket.error), args:
195 if args[0] == errno.EINTR:
200 if e.errno == errno.EINTR:
205 return func(*p, **kw)
208 def rexec(command, host, user,
218 connect_timeout = 30,
223 strict_host_checking = True):
225 Executes a remote command, returns ((stdout,stderr),process)
228 tmp_known_hosts = None
230 hostip = gethostbyname(host)
234 # Don't bother with localhost. Makes test easier
235 '-o', 'NoHostAuthenticationForLocalhost=yes',
236 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
237 '-o', 'ConnectionAttempts=3',
238 '-o', 'ServerAliveInterval=30',
239 '-o', 'TCPKeepAlive=yes',
240 '-o', 'Batchmode=yes',
241 '-l', user, hostip or host]
243 if persistent and openssh_has_persist():
245 '-o', 'ControlMaster=auto',
246 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
247 '-o', 'ControlPersist=60' ])
249 if not strict_host_checking:
250 # Do not check for Host key. Unsafe.
251 args.extend(['-o', 'StrictHostKeyChecking=no'])
254 proxycommand = _proxy_command(gw, gwuser, identity)
255 args.extend(['-o', proxycommand])
261 args.append('-p%d' % port)
264 identity = os.path.expanduser(identity)
265 args.extend(('-i', identity))
275 # Create a temporary server key file
276 tmp_known_hosts = make_server_key_args(server_key, host, port)
277 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
280 command = "sudo " + command
284 log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
286 stdout = stderr = stdin = subprocess.PIPE
288 stdout = stderr = stdin = None
290 return _retry_rexec(args, log_msg,
296 tmp_known_hosts = tmp_known_hosts,
299 def rcopy(source, dest,
307 strict_host_checking = True):
309 Copies from/to remote sites.
311 Source and destination should have the user and host encoded
314 Source can be a list of files to copy to a single destination,
315 (in which case it is advised that the destination be a folder),
316 or a single file in a string.
319 # Parse destination as <user>@<server>:<path>
320 if isinstance(dest, str) and ':' in dest:
321 remspec, path = dest.split(':',1)
322 elif isinstance(source, str) and ':' in source:
323 remspec, path = source.split(':',1)
325 raise ValueError, "Both endpoints cannot be local"
326 user,host = remspec.rsplit('@',1)
329 tmp_known_hosts = None
331 args = ['scp', '-q', '-p', '-C',
332 # Speed up transfer using blowfish cypher specification which is
333 # faster than the default one (3des)
335 # Don't bother with localhost. Makes test easier
336 '-o', 'NoHostAuthenticationForLocalhost=yes',
337 '-o', 'ConnectTimeout=60',
338 '-o', 'ConnectionAttempts=3',
339 '-o', 'ServerAliveInterval=30',
340 '-o', 'TCPKeepAlive=yes' ]
343 args.append('-P%d' % port)
346 proxycommand = _proxy_command(gw, gwuser, identity)
347 args.extend(['-o', proxycommand])
353 identity = os.path.expanduser(identity)
354 args.extend(('-i', identity))
357 # Create a temporary server key file
358 tmp_known_hosts = make_server_key_args(server_key, host, port)
359 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
361 if not strict_host_checking:
362 # Do not check for Host key. Unsafe.
363 args.extend(['-o', 'StrictHostKeyChecking=no'])
365 if isinstance(source, list):
368 if openssh_has_persist():
370 '-o', 'ControlMaster=auto',
371 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
375 if isinstance(dest, list):
380 log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
382 return _retry_rexec(args, log_msg, env = None, retry = retry,
383 tmp_known_hosts = tmp_known_hosts,
386 def rspawn(command, pidfile,
387 stdout = '/dev/null',
402 strict_host_checking = True):
404 Spawn a remote command such that it will continue working asynchronously in
407 :param command: The command to run, it should be a single line.
410 :param pidfile: Path to a file where to store the pid and ppid of the
414 :param stdout: Path to file to redirect standard output.
415 The default value is /dev/null
418 :param stderr: Path to file to redirect standard error.
419 If the special STDOUT value is used, stderr will
420 be redirected to the same file as stdout
423 :param stdin: Path to a file with input to be piped into the command's standard input
426 :param home: Path to working directory folder.
427 It is assumed to exist unless the create_home flag is set.
430 :param create_home: Flag to force creation of the home folder before
432 :type create_home: bool
434 :param sudo: Flag forcing execution with sudo user
439 (stdout, stderr), process
441 Of the spawning process, which only captures errors at spawning time.
442 Usually only useful for diagnostics.
444 # Start process in a "daemonized" way, using nohup and heavy
445 # stdin/out redirection to avoid connection issues
449 stderr = ' ' + stderr
451 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
453 'pidfile' : shell_escape(pidfile),
459 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
460 'command' : shell_escape(daemon_command),
461 'sudo' : 'sudo -S' if sudo else '',
462 'pidfile' : shell_escape(pidfile),
463 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
464 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
467 (out,err),proc = rexec(
476 server_key = server_key,
478 strict_host_checking = strict_host_checking ,
482 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
484 return ((out, err), proc)
496 strict_host_checking = True):
498 Returns the pid and ppid of a process from a remote file where the
499 information was stored.
501 :param home: Path to directory where the pidfile is located
504 :param pidfile: Name of file containing the pid information
509 A (pid, ppid) tuple useful for calling rstatus and rkill,
510 or None if the pidfile isn't valid yet (can happen when process is staring up)
513 (out,err),proc = rexec(
514 "cat %(pidfile)s" % {
524 server_key = server_key,
525 strict_host_checking = strict_host_checking
533 return map(int,out.strip().split(' ',1))
535 # Ignore, many ways to fail that don't matter that much
539 def rstatus(pid, ppid,
548 strict_host_checking = True):
550 Returns a code representing the the status of a remote process
552 :param pid: Process id of the process
555 :param ppid: Parent process id of process
558 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
561 (out,err),proc = rexec(
562 # Check only by pid. pid+ppid does not always work (especially with sudo)
563 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
574 server_key = server_key,
575 strict_host_checking = strict_host_checking
579 return ProcStatus.NOT_STARTED
583 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
586 status = (out.strip() == 'wait')
588 return ProcStatus.NOT_STARTED
589 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
603 strict_host_checking = True):
605 Sends a kill signal to a remote process.
607 First tries a SIGTERM, and if the process does not end in 10 seconds,
610 :param pid: Process id of process to be killed
613 :param ppid: Parent process id of process to be killed
616 :param sudo: Flag indicating if sudo should be used to kill the process
620 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
622 SUBKILL="%(subkill)s" ;
623 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
624 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
625 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
627 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
630 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
631 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
635 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
636 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
637 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
641 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
643 (out,err),proc = rexec(
647 'sudo' : 'sudo -S' if sudo else '',
657 server_key = server_key,
658 strict_host_checking = strict_host_checking
661 # wait, don't leave zombies around
664 return (out, err), proc
666 def _retry_rexec(args,
668 stdout = subprocess.PIPE,
669 stdin = subprocess.PIPE,
670 stderr = subprocess.PIPE,
673 tmp_known_hosts = None,
676 for x in xrange(retry):
677 # connects to the remote host and starts a remote connection
678 proc = subprocess.Popen(args,
684 # attach tempfile object to the process, to make sure the file stays
685 # alive until the process is finished with it
686 proc._known_hosts = tmp_known_hosts
688 # The argument block == False forces to rexec to return immediately,
693 #(out, err) = proc.communicate()
694 # The method communicate was re implemented for performance issues
695 # when using python subprocess communicate method the ssh commands
696 # last one minute each
697 out, err = _communicate(proc, input=None)
700 out = proc.stdout.read()
701 if proc.poll() and stderr:
702 err = proc.stderr.read()
704 log(log_msg, logging.DEBUG, out, err)
709 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
710 # SSH error, can safely retry
713 # Probably timed out or plain failed but can retry
718 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
719 t, x, " ".join(args))
720 log(msg, logging.DEBUG)
725 except RuntimeError, e:
726 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
727 log(msg, logging.DEBUG, out, err)
733 return ((out, err), proc)
736 # Don't remove. The method communicate was re implemented for performance issues
737 def _communicate(proc, input, timeout=None, err_on_timeout=True):
740 stdout = None # Return
741 stderr = None # Return
745 if timeout is not None:
746 timelimit = time.time() + timeout
747 killtime = timelimit + 4
748 bailtime = timelimit + 4
751 # Flush stdio buffer. This might block, if the user has
752 # been writing to .stdin in an uncontrolled fashion.
755 write_set.append(proc.stdin)
760 read_set.append(proc.stdout)
764 read_set.append(proc.stderr)
768 while read_set or write_set:
769 if timeout is not None:
770 curtime = time.time()
771 if timeout is None or curtime > timelimit:
772 if curtime > bailtime:
774 elif curtime > killtime:
775 signum = signal.SIGKILL
777 signum = signal.SIGTERM
779 os.kill(proc.pid, signum)
782 select_timeout = timelimit - curtime + 0.1
786 if select_timeout > 1.0:
790 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
791 except select.error,e:
797 if not rlist and not wlist and not xlist and proc.poll() is not None:
798 # timeout and process exited, say bye
801 if proc.stdin in wlist:
802 # When select has indicated that the file is writable,
803 # we can write up to PIPE_BUF bytes without risk
804 # blocking. POSIX defines PIPE_BUF >= 512
805 bytes_written = os.write(proc.stdin.fileno(),
806 buffer(input, input_offset, 512))
807 input_offset += bytes_written
809 if input_offset >= len(input):
811 write_set.remove(proc.stdin)
813 if proc.stdout in rlist:
814 data = os.read(proc.stdout.fileno(), 1024)
817 read_set.remove(proc.stdout)
820 if proc.stderr in rlist:
821 data = os.read(proc.stderr.fileno(), 1024)
824 read_set.remove(proc.stderr)
827 # All data exchanged. Translate lists into strings.
828 if stdout is not None:
829 stdout = ''.join(stdout)
830 if stderr is not None:
831 stderr = ''.join(stderr)
833 # Translate newlines, if requested. We cannot let the file
834 # object do the translation: It is based on stdio, which is
835 # impossible to combine with select (unless forcing no
837 if proc.universal_newlines and hasattr(file, 'newlines'):
839 stdout = proc._translate_newlines(stdout)
841 stderr = proc._translate_newlines(stderr)
843 if killed and err_on_timeout:
844 errcode = proc.poll()
845 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
851 return (stdout, stderr)
853 def _proxy_command(gw, gwuser, gwidentity):
855 Constructs the SSH ProxyCommand option to add to the SSH command to connect
857 :param gw: SSH proxy hostname
860 :param gwuser: SSH proxy username
863 :param gwidentity: SSH proxy identity file
864 :type gwidentity: str
869 returns the SSH ProxyCommand option.
872 proxycommand = 'ProxyCommand=ssh -q '
874 proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
876 proxycommand += '%s' % gwuser
879 proxycommand += '@%s -W %%h:%%p' % gw