2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
21 ## TODO: This code needs reviewing !!!
38 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
40 logger = logging.getLogger("sshfuncs")
42 def log(msg, level, out = None, err = None):
44 msg += " - OUT: %s " % out
47 msg += " - ERROR: %s " % err
49 logger.log(level, msg)
51 if hasattr(os, "devnull"):
54 DEV_NULL = "/dev/null"
56 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
60 Special value that when given to rspawn in stderr causes stderr to
61 redirect to whatever stdout was redirected to.
66 Codes for status of remote spawned process
68 # Process is still running
74 # Process hasn't started running yet (this should be very rare)
77 hostbyname_cache = dict()
78 hostbyname_cache_lock = threading.Lock()
80 def resolve_hostname(host):
83 if host in ["localhost", "127.0.0.1", "::1"]:
85 ip = socket.gethostbyname(socket.gethostname())
86 except socket.gaierror, e: #[Errno -5] No address associated with hostname
87 p = subprocess.Popen("ip -o addr list", shell=True,
88 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
89 stdout, stderr = p.communicate()
90 m = _re_inet.findall(stdout)
91 ip = m[0][1].split("/")[0]
93 ip = socket.gethostbyname(host)
97 def gethostbyname(host):
98 global hostbyname_cache
99 global hostbyname_cache_lock
101 hostbyname = hostbyname_cache.get(host)
103 with hostbyname_cache_lock:
104 hostbyname = resolve_hostname(host)
105 hostbyname_cache[host] = hostbyname
107 msg = " Added hostbyname %s - %s " % (host, hostbyname)
108 log(msg, logging.DEBUG)
112 OPENSSH_HAS_PERSIST = None
114 def openssh_has_persist():
115 """ The ssh_config options ControlMaster and ControlPersist allow
116 reusing the same network connection for multiple ssh sessions. In this
117 way limitations on the number of open ssh connections can be bypassed.
118 However, older versions of OpenSSH do not support this feature.
119 This function is used to determine if ssh connection persist features
122 global OPENSSH_HAS_PERSIST
123 if OPENSSH_HAS_PERSIST is None:
124 proc = subprocess.Popen(["ssh","-v"],
125 stdout = subprocess.PIPE,
126 stderr = subprocess.STDOUT,
127 stdin = open("/dev/null","r") )
128 out,err = proc.communicate()
131 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
132 OPENSSH_HAS_PERSIST = bool(vre.match(out))
133 return OPENSSH_HAS_PERSIST
135 def make_server_key_args(server_key, host, port):
136 """ Returns a reference to a temporary known_hosts file, to which
137 the server key has been added.
139 Make sure to hold onto the temp file reference until the process is
142 :param server_key: the server public key
143 :type server_key: str
145 :param host: the hostname
148 :param port: the ssh port
153 host = '%s:%s' % (host, str(port))
155 # Create a temporary server key file
156 tmp_known_hosts = tempfile.NamedTemporaryFile()
158 hostbyname = gethostbyname(host)
160 # Add the intended host key
161 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
163 # If we're not in strict mode, add user-configured keys
164 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
165 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
166 if os.access(user_hosts_path, os.R_OK):
167 f = open(user_hosts_path, "r")
168 tmp_known_hosts.write(f.read())
171 tmp_known_hosts.flush()
173 return tmp_known_hosts
175 def make_control_path(agent, forward_x11):
176 ctrl_path = "/tmp/nepi_ssh"
184 ctrl_path += "-%r@%h:%p"
189 """ Escapes strings so that they are safe to use as command-line
191 if SHELL_SAFE.match(s):
192 # safe string - no escaping needed
195 # unsafe string - escape
197 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
200 return "'$'\\x%02x''" % (ord(c),)
201 s = ''.join(map(escp,s))
204 def eintr_retry(func):
205 """Retries a function invocation when an EINTR occurs"""
207 @functools.wraps(func)
209 retry = kw.pop("_retry", False)
210 for i in xrange(0 if retry else 4):
212 return func(*p, **kw)
213 except (select.error, socket.error), args:
214 if args[0] == errno.EINTR:
219 if e.errno == errno.EINTR:
224 return func(*p, **kw)
227 def rexec(command, host, user,
237 connect_timeout = 30,
242 strict_host_checking = True):
244 Executes a remote command, returns ((stdout,stderr),process)
247 tmp_known_hosts = None
249 hostip = gethostbyname(host)
253 # Don't bother with localhost. Makes test easier
254 '-o', 'NoHostAuthenticationForLocalhost=yes',
255 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
256 '-o', 'ConnectionAttempts=3',
257 '-o', 'ServerAliveInterval=30',
258 '-o', 'TCPKeepAlive=yes',
259 '-o', 'Batchmode=yes',
260 '-l', user, hostip or host]
262 if persistent and openssh_has_persist():
264 '-o', 'ControlMaster=auto',
265 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
266 '-o', 'ControlPersist=60' ])
268 if not strict_host_checking:
269 # Do not check for Host key. Unsafe.
270 args.extend(['-o', 'StrictHostKeyChecking=no'])
273 proxycommand = _proxy_command(gw, gwuser, identity)
274 args.extend(['-o', proxycommand])
280 args.append('-p%d' % port)
283 identity = os.path.expanduser(identity)
284 args.extend(('-i', identity))
294 # Create a temporary server key file
295 tmp_known_hosts = make_server_key_args(server_key, host, port)
296 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
299 command = "sudo " + command
303 log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
305 stdout = stderr = stdin = subprocess.PIPE
307 stdout = stderr = stdin = None
309 return _retry_rexec(args, log_msg,
315 tmp_known_hosts = tmp_known_hosts,
318 def rcopy(source, dest,
326 strict_host_checking = True):
328 Copies from/to remote sites.
330 Source and destination should have the user and host encoded
333 Source can be a list of files to copy to a single destination,
334 (in which case it is advised that the destination be a folder),
335 or a single file in a string.
338 # Parse destination as <user>@<server>:<path>
339 if isinstance(dest, str) and ':' in dest:
340 remspec, path = dest.split(':',1)
341 elif isinstance(source, str) and ':' in source:
342 remspec, path = source.split(':',1)
344 raise ValueError, "Both endpoints cannot be local"
345 user,host = remspec.rsplit('@',1)
348 tmp_known_hosts = None
350 args = ['scp', '-q', '-p', '-C',
351 # Speed up transfer by specifying the blowfish cipher, which is
352 # faster than the default one (3des)
354 # Don't bother with localhost. Makes test easier
355 '-o', 'NoHostAuthenticationForLocalhost=yes',
356 '-o', 'ConnectTimeout=60',
357 '-o', 'ConnectionAttempts=3',
358 '-o', 'ServerAliveInterval=30',
359 '-o', 'TCPKeepAlive=yes' ]
362 args.append('-P%d' % port)
365 proxycommand = _proxy_command(gw, gwuser, identity)
366 args.extend(['-o', proxycommand])
372 identity = os.path.expanduser(identity)
373 args.extend(('-i', identity))
376 # Create a temporary server key file
377 tmp_known_hosts = make_server_key_args(server_key, host, port)
378 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
380 if not strict_host_checking:
381 # Do not check for Host key. Unsafe.
382 args.extend(['-o', 'StrictHostKeyChecking=no'])
384 if isinstance(source, list):
387 if openssh_has_persist():
389 '-o', 'ControlMaster=auto',
390 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
394 if isinstance(dest, list):
399 log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
401 return _retry_rexec(args, log_msg, env = None, retry = retry,
402 tmp_known_hosts = tmp_known_hosts,
405 def rspawn(command, pidfile,
406 stdout = '/dev/null',
421 strict_host_checking = True):
423 Spawn a remote command such that it will continue working asynchronously in
426 :param command: The command to run, it should be a single line.
429 :param pidfile: Path to a file where to store the pid and ppid of the
433 :param stdout: Path to file to redirect standard output.
434 The default value is /dev/null
437 :param stderr: Path to file to redirect standard error.
438 If the special STDOUT value is used, stderr will
439 be redirected to the same file as stdout
442 :param stdin: Path to a file with input to be piped into the command's standard input
445 :param home: Path to working directory folder.
446 It is assumed to exist unless the create_home flag is set.
449 :param create_home: Flag to force creation of the home folder before
451 :type create_home: bool
453 :param sudo: Flag forcing execution with sudo user
458 (stdout, stderr), process
460 Of the spawning process, which only captures errors at spawning time.
461 Usually only useful for diagnostics.
463 # Start process in a "daemonized" way, using nohup and heavy
464 # stdin/out redirection to avoid connection issues
468 stderr = ' ' + stderr
470 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
472 'pidfile' : shell_escape(pidfile),
478 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
479 'command' : shell_escape(daemon_command),
480 'sudo' : 'sudo -S' if sudo else '',
481 'pidfile' : shell_escape(pidfile),
482 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
483 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
486 (out,err),proc = rexec(
495 server_key = server_key,
497 strict_host_checking = strict_host_checking ,
501 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
503 return ((out, err), proc)
515 strict_host_checking = True):
517 Returns the pid and ppid of a process from a remote file where the
518 information was stored.
520 :param home: Path to directory where the pidfile is located
523 :param pidfile: Name of file containing the pid information
528 A (pid, ppid) tuple useful for calling rstatus and rkill,
529 or None if the pidfile isn't valid yet (can happen when process is starting up)
532 (out,err),proc = rexec(
533 "cat %(pidfile)s" % {
543 server_key = server_key,
544 strict_host_checking = strict_host_checking
552 return map(int,out.strip().split(' ',1))
554 # Ignore, many ways to fail that don't matter that much
558 def rstatus(pid, ppid,
567 strict_host_checking = True):
569 Returns a code representing the status of a remote process
571 :param pid: Process id of the process
574 :param ppid: Parent process id of process
577 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
580 (out,err),proc = rexec(
581 # Check only by pid. pid+ppid does not always work (especially with sudo)
582 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
593 server_key = server_key,
594 strict_host_checking = strict_host_checking
598 return ProcStatus.NOT_STARTED
602 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
605 status = (out.strip() == 'wait')
607 return ProcStatus.NOT_STARTED
608 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
622 strict_host_checking = True):
624 Sends a kill signal to a remote process.
626 First tries a SIGTERM, and if the process does not end in 10 seconds,
629 :param pid: Process id of process to be killed
632 :param ppid: Parent process id of process to be killed
635 :param sudo: Flag indicating if sudo should be used to kill the process
639 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
641 SUBKILL="%(subkill)s" ;
642 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
643 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
644 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
646 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
649 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
650 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
654 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
655 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
656 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
660 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
662 (out,err),proc = rexec(
666 'sudo' : 'sudo -S' if sudo else '',
676 server_key = server_key,
677 strict_host_checking = strict_host_checking
680 # wait, don't leave zombies around
683 return (out, err), proc
685 def _retry_rexec(args,
687 stdout = subprocess.PIPE,
688 stdin = subprocess.PIPE,
689 stderr = subprocess.PIPE,
692 tmp_known_hosts = None,
695 for x in xrange(retry):
696 # connects to the remote host and starts a remote connection
697 proc = subprocess.Popen(args,
703 # attach tempfile object to the process, to make sure the file stays
704 # alive until the process is finished with it
705 proc._known_hosts = tmp_known_hosts
707 # The argument block == False forces rexec to return immediately,
712 #(out, err) = proc.communicate()
713 # The communicate method was reimplemented for performance reasons:
714 # when using Python's subprocess communicate method, the ssh commands
715 # take one minute each
716 out, err = _communicate(proc, input=None)
719 out = proc.stdout.read()
720 if proc.poll() and stderr:
721 err = proc.stderr.read()
723 log(log_msg, logging.DEBUG, out, err)
728 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
729 # SSH error, can safely retry
732 # Probably timed out or plain failed but can retry
737 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
738 t, x, " ".join(args))
739 log(msg, logging.DEBUG)
744 except RuntimeError, e:
745 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
746 log(msg, logging.DEBUG, out, err)
752 return ((out, err), proc)
755 # Don't remove. The communicate method was reimplemented for performance reasons
756 def _communicate(proc, input, timeout=None, err_on_timeout=True):
759 stdout = None # Return
760 stderr = None # Return
764 if timeout is not None:
765 timelimit = time.time() + timeout
766 killtime = timelimit + 4
767 bailtime = timelimit + 4
770 # Flush stdio buffer. This might block, if the user has
771 # been writing to .stdin in an uncontrolled fashion.
774 write_set.append(proc.stdin)
779 read_set.append(proc.stdout)
783 read_set.append(proc.stderr)
787 while read_set or write_set:
788 if timeout is not None:
789 curtime = time.time()
790 if timeout is None or curtime > timelimit:
791 if curtime > bailtime:
793 elif curtime > killtime:
794 signum = signal.SIGKILL
796 signum = signal.SIGTERM
798 os.kill(proc.pid, signum)
801 select_timeout = timelimit - curtime + 0.1
805 if select_timeout > 1.0:
809 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
810 except select.error,e:
816 if not rlist and not wlist and not xlist and proc.poll() is not None:
817 # timeout and process exited, say bye
820 if proc.stdin in wlist:
821 # When select has indicated that the file is writable,
822 # we can write up to PIPE_BUF bytes without risk of
823 # blocking. POSIX defines PIPE_BUF >= 512
824 bytes_written = os.write(proc.stdin.fileno(),
825 buffer(input, input_offset, 512))
826 input_offset += bytes_written
828 if input_offset >= len(input):
830 write_set.remove(proc.stdin)
832 if proc.stdout in rlist:
833 data = os.read(proc.stdout.fileno(), 1024)
836 read_set.remove(proc.stdout)
839 if proc.stderr in rlist:
840 data = os.read(proc.stderr.fileno(), 1024)
843 read_set.remove(proc.stderr)
846 # All data exchanged. Translate lists into strings.
847 if stdout is not None:
848 stdout = ''.join(stdout)
849 if stderr is not None:
850 stderr = ''.join(stderr)
852 # Translate newlines, if requested. We cannot let the file
853 # object do the translation: It is based on stdio, which is
854 # impossible to combine with select (unless forcing no
856 if proc.universal_newlines and hasattr(file, 'newlines'):
858 stdout = proc._translate_newlines(stdout)
860 stderr = proc._translate_newlines(stderr)
862 if killed and err_on_timeout:
863 errcode = proc.poll()
864 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
870 return (stdout, stderr)
872 def _proxy_command(gw, gwuser, gwidentity):
874 Constructs the SSH ProxyCommand option to add to the SSH command to connect
876 :param gw: SSH proxy hostname
879 :param gwuser: SSH proxy username
882 :param gwidentity: SSH proxy identity file
883 :type gwidentity: str
888 returns the SSH ProxyCommand option.
891 proxycommand = 'ProxyCommand=ssh -q '
893 proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
895 proxycommand += '%s' % gwuser
898 proxycommand += '@%s -W %%h:%%p' % gw