2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Claudio Freire <claudio-daniel.freire@inria.fr>
20 ## TODO: This code needs reviewing !!!
37 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
39 logger = logging.getLogger("sshfuncs")
41 def log(msg, level, out = None, err = None):
43 msg += " - OUT: %s " % out
46 msg += " - ERROR: %s " % err
48 logger.log(level, msg)
50 if hasattr(os, "devnull"):
53 DEV_NULL = "/dev/null"
# A string consisting solely of these characters (or an empty string) can be
# placed on a shell command line as-is; anything else needs escaping first.
SHELL_SAFE = re.compile(r"^[-a-zA-Z0-9_=+:.,/]*$")
59 Special value that when given to rspawn in stderr causes stderr to
60 redirect to whatever stdout was redirected to.
65 Codes for status of remote spawned process
67 # Process is still running
73 # Process hasn't started running yet (this should be very rare)
# Module-level memo mapping a host name to its resolved address; entries are
# added by gethostbyname() the first time a host is looked up.
hostbyname_cache = {}
# Held while a missing cache entry is resolved and stored.
hostbyname_cache_lock = threading.Lock()
79 def resolve_hostname(host):
82 if host in ["localhost", "127.0.0.1", "::1"]:
83 p = subprocess.Popen("ip -o addr list", shell=True,
84 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
85 stdout, stderr = p.communicate()
86 m = _re_inet.findall(stdout)
87 ip = m[0][1].split("/")[0]
89 ip = socket.gethostbyname(host)
93 def gethostbyname(host):
94 global hostbyname_cache
95 global hostbyname_cache_lock
97 hostbyname = hostbyname_cache.get(host)
99 with hostbyname_cache_lock:
100 hostbyname = resolve_hostname(host)
101 hostbyname_cache[host] = hostbyname
103 msg = " Added hostbyname %s - %s " % (host, hostbyname)
104 log(msg, logging.DEBUG)
# Cached answer of openssh_has_persist(): None means "not probed yet";
# after the first probe it holds True or False.
OPENSSH_HAS_PERSIST = None
110 def openssh_has_persist():
111 """ The ssh_config options ControlMaster and ControlPersist allow to
112 reuse a same network connection for multiple ssh sessions. In this
113 way limitations on number of open ssh connections can be bypassed.
114 However, older versions of openSSH do not support this feature.
115 This function is used to determine if ssh connection persist features
118 global OPENSSH_HAS_PERSIST
119 if OPENSSH_HAS_PERSIST is None:
120 proc = subprocess.Popen(["ssh","-v"],
121 stdout = subprocess.PIPE,
122 stderr = subprocess.STDOUT,
123 stdin = open("/dev/null","r") )
124 out,err = proc.communicate()
127 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
128 OPENSSH_HAS_PERSIST = bool(vre.match(out))
129 return OPENSSH_HAS_PERSIST
131 def make_server_key_args(server_key, host, port):
132 """ Returns a reference to a temporary known_hosts file, to which
133 the server key has been added.
135 Make sure to hold onto the temp file reference until the process is
138 :param server_key: the server public key
139 :type server_key: str
141 :param host: the hostname
144 :param port: the ssh port
149 host = '%s:%s' % (host, str(port))
151 # Create a temporary server key file
152 tmp_known_hosts = tempfile.NamedTemporaryFile()
154 hostbyname = gethostbyname(host)
156 # Add the intended host key
157 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
159 # If we're not in strict mode, add user-configured keys
160 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
161 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
162 if os.access(user_hosts_path, os.R_OK):
163 f = open(user_hosts_path, "r")
164 tmp_known_hosts.write(f.read())
167 tmp_known_hosts.flush()
169 return tmp_known_hosts
171 def make_control_path(agent, forward_x11):
172 ctrl_path = "/tmp/nepi_ssh"
180 ctrl_path += "-%r@%h:%p"
185 """ Escapes strings so that they are safe to use as command-line
187 if SHELL_SAFE.match(s):
188 # safe string - no escaping needed
191 # unsafe string - escape
193 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
196 return "'$'\\x%02x''" % (ord(c),)
197 s = ''.join(map(escp,s))
200 def eintr_retry(func):
201 """Retries a function invocation when a EINTR occurs"""
203 @functools.wraps(func)
205 retry = kw.pop("_retry", False)
206 for i in xrange(0 if retry else 4):
208 return func(*p, **kw)
209 except (select.error, socket.error), args:
210 if args[0] == errno.EINTR:
215 if e.errno == errno.EINTR:
220 return func(*p, **kw)
223 def rexec(command, host, user,
233 connect_timeout = 30,
238 strict_host_checking = True):
240 Executes a remote command, returns ((stdout,stderr),process)
243 tmp_known_hosts = None
245 hostip = gethostbyname(host)
249 # Don't bother with localhost. Makes test easier
250 '-o', 'NoHostAuthenticationForLocalhost=yes',
251 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
252 '-o', 'ConnectionAttempts=3',
253 '-o', 'ServerAliveInterval=30',
254 '-o', 'TCPKeepAlive=yes',
255 '-o', 'Batchmode=yes',
256 '-l', user, hostip or host]
258 if persistent and openssh_has_persist():
260 '-o', 'ControlMaster=auto',
261 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
262 '-o', 'ControlPersist=60' ])
264 if not strict_host_checking:
265 # Do not check for Host key. Unsafe.
266 args.extend(['-o', 'StrictHostKeyChecking=no'])
269 proxycommand = _proxy_command(gw, gwuser, identity)
270 args.extend(['-o', proxycommand])
276 args.append('-p%d' % port)
279 identity = os.path.expanduser(identity)
280 args.extend(('-i', identity))
290 # Create a temporary server key file
291 tmp_known_hosts = make_server_key_args(server_key, host, port)
292 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
295 command = "sudo " + command
299 log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
301 stdout = stderr = stdin = subprocess.PIPE
303 stdout = stderr = stdin = None
305 return _retry_rexec(args, log_msg,
311 tmp_known_hosts = tmp_known_hosts,
314 def rcopy(source, dest,
322 strict_host_checking = True):
324 Copies from/to remote sites.
326 Source and destination should have the user and host encoded
329 Source can be a list of files to copy to a single destination,
330 (in which case it is advised that the destination be a folder),
331 or a single file in a string.
334 # Parse destination as <user>@<server>:<path>
335 if isinstance(dest, str) and ':' in dest:
336 remspec, path = dest.split(':',1)
337 elif isinstance(source, str) and ':' in source:
338 remspec, path = source.split(':',1)
340 raise ValueError, "Both endpoints cannot be local"
341 user,host = remspec.rsplit('@',1)
344 tmp_known_hosts = None
346 args = ['scp', '-q', '-p', '-C',
347 # Speed up transfer using blowfish cypher specification which is
348 # faster than the default one (3des)
350 # Don't bother with localhost. Makes test easier
351 '-o', 'NoHostAuthenticationForLocalhost=yes',
352 '-o', 'ConnectTimeout=60',
353 '-o', 'ConnectionAttempts=3',
354 '-o', 'ServerAliveInterval=30',
355 '-o', 'TCPKeepAlive=yes' ]
358 args.append('-P%d' % port)
361 proxycommand = _proxy_command(gw, gwuser, identity)
362 args.extend(['-o', proxycommand])
368 identity = os.path.expanduser(identity)
369 args.extend(('-i', identity))
372 # Create a temporary server key file
373 tmp_known_hosts = make_server_key_args(server_key, host, port)
374 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
376 if not strict_host_checking:
377 # Do not check for Host key. Unsafe.
378 args.extend(['-o', 'StrictHostKeyChecking=no'])
380 if isinstance(source, list):
383 if openssh_has_persist():
385 '-o', 'ControlMaster=auto',
386 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
390 if isinstance(dest, list):
395 log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
397 return _retry_rexec(args, log_msg, env = None, retry = retry,
398 tmp_known_hosts = tmp_known_hosts,
401 def rspawn(command, pidfile,
402 stdout = '/dev/null',
417 strict_host_checking = True):
419 Spawn a remote command such that it will continue working asynchronously in
422 :param command: The command to run, it should be a single line.
425 :param pidfile: Path to a file where to store the pid and ppid of the
429 :param stdout: Path to file to redirect standard output.
430 The default value is /dev/null
433 :param stderr: Path to file to redirect standard error.
434 If the special STDOUT value is used, stderr will
435 be redirected to the same file as stdout
438 :param stdin: Path to a file with input to be piped into the command's standard input
441 :param home: Path to working directory folder.
442 It is assumed to exist unless the create_home flag is set.
445 :param create_home: Flag to force creation of the home folder before
447 :type create_home: bool
449 :param sudo: Flag forcing execution with sudo user
454 (stdout, stderr), process
456 Of the spawning process, which only captures errors at spawning time.
457 Usually only useful for diagnostics.
459 # Start process in a "daemonized" way, using nohup and heavy
460 # stdin/out redirection to avoid connection issues
464 stderr = ' ' + stderr
466 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
468 'pidfile' : shell_escape(pidfile),
474 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
475 'command' : shell_escape(daemon_command),
476 'sudo' : 'sudo -S' if sudo else '',
477 'pidfile' : shell_escape(pidfile),
478 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
479 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
482 (out,err),proc = rexec(
491 server_key = server_key,
493 strict_host_checking = strict_host_checking ,
497 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
499 return ((out, err), proc)
511 strict_host_checking = True):
513 Returns the pid and ppid of a process from a remote file where the
514 information was stored.
516 :param home: Path to directory where the pidfile is located
519 :param pidfile: Name of file containing the pid information
524 A (pid, ppid) tuple useful for calling rstatus and rkill,
525 or None if the pidfile isn't valid yet (can happen when process is staring up)
528 (out,err),proc = rexec(
529 "cat %(pidfile)s" % {
539 server_key = server_key,
540 strict_host_checking = strict_host_checking
548 return map(int,out.strip().split(' ',1))
550 # Ignore, many ways to fail that don't matter that much
554 def rstatus(pid, ppid,
563 strict_host_checking = True):
565 Returns a code representing the the status of a remote process
567 :param pid: Process id of the process
570 :param ppid: Parent process id of process
573 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
576 (out,err),proc = rexec(
577 # Check only by pid. pid+ppid does not always work (especially with sudo)
578 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
589 server_key = server_key,
590 strict_host_checking = strict_host_checking
594 return ProcStatus.NOT_STARTED
598 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
601 status = (out.strip() == 'wait')
603 return ProcStatus.NOT_STARTED
604 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
618 strict_host_checking = True):
620 Sends a kill signal to a remote process.
622 First tries a SIGTERM, and if the process does not end in 10 seconds,
625 :param pid: Process id of process to be killed
628 :param ppid: Parent process id of process to be killed
631 :param sudo: Flag indicating if sudo should be used to kill the process
635 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
637 SUBKILL="%(subkill)s" ;
638 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
639 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
640 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
642 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
645 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
646 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
650 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
651 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
652 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
656 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
658 (out,err),proc = rexec(
662 'sudo' : 'sudo -S' if sudo else '',
672 server_key = server_key,
673 strict_host_checking = strict_host_checking
676 # wait, don't leave zombies around
679 return (out, err), proc
681 def _retry_rexec(args,
683 stdout = subprocess.PIPE,
684 stdin = subprocess.PIPE,
685 stderr = subprocess.PIPE,
688 tmp_known_hosts = None,
691 for x in xrange(retry):
692 # connects to the remote host and starts a remote connection
693 proc = subprocess.Popen(args,
699 # attach tempfile object to the process, to make sure the file stays
700 # alive until the process is finished with it
701 proc._known_hosts = tmp_known_hosts
703 # The argument block == False forces to rexec to return immediately,
708 #(out, err) = proc.communicate()
709 # The method communicate was re implemented for performance issues
710 # when using python subprocess communicate method the ssh commands
711 # last one minute each
712 out, err = _communicate(proc, input=None)
715 out = proc.stdout.read()
716 if proc.poll() and stderr:
717 err = proc.stderr.read()
719 log(log_msg, logging.DEBUG, out, err)
724 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
725 # SSH error, can safely retry
728 # Probably timed out or plain failed but can retry
733 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
734 t, x, " ".join(args))
735 log(msg, logging.DEBUG)
740 except RuntimeError, e:
741 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
742 log(msg, logging.DEBUG, out, err)
748 return ((out, err), proc)
751 # Don't remove. The method communicate was re implemented for performance issues
752 def _communicate(proc, input, timeout=None, err_on_timeout=True):
755 stdout = None # Return
756 stderr = None # Return
760 if timeout is not None:
761 timelimit = time.time() + timeout
762 killtime = timelimit + 4
763 bailtime = timelimit + 4
766 # Flush stdio buffer. This might block, if the user has
767 # been writing to .stdin in an uncontrolled fashion.
770 write_set.append(proc.stdin)
775 read_set.append(proc.stdout)
779 read_set.append(proc.stderr)
783 while read_set or write_set:
784 if timeout is not None:
785 curtime = time.time()
786 if timeout is None or curtime > timelimit:
787 if curtime > bailtime:
789 elif curtime > killtime:
790 signum = signal.SIGKILL
792 signum = signal.SIGTERM
794 os.kill(proc.pid, signum)
797 select_timeout = timelimit - curtime + 0.1
801 if select_timeout > 1.0:
805 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
806 except select.error,e:
812 if not rlist and not wlist and not xlist and proc.poll() is not None:
813 # timeout and process exited, say bye
816 if proc.stdin in wlist:
817 # When select has indicated that the file is writable,
818 # we can write up to PIPE_BUF bytes without risk
819 # blocking. POSIX defines PIPE_BUF >= 512
820 bytes_written = os.write(proc.stdin.fileno(),
821 buffer(input, input_offset, 512))
822 input_offset += bytes_written
824 if input_offset >= len(input):
826 write_set.remove(proc.stdin)
828 if proc.stdout in rlist:
829 data = os.read(proc.stdout.fileno(), 1024)
832 read_set.remove(proc.stdout)
835 if proc.stderr in rlist:
836 data = os.read(proc.stderr.fileno(), 1024)
839 read_set.remove(proc.stderr)
842 # All data exchanged. Translate lists into strings.
843 if stdout is not None:
844 stdout = ''.join(stdout)
845 if stderr is not None:
846 stderr = ''.join(stderr)
848 # Translate newlines, if requested. We cannot let the file
849 # object do the translation: It is based on stdio, which is
850 # impossible to combine with select (unless forcing no
852 if proc.universal_newlines and hasattr(file, 'newlines'):
854 stdout = proc._translate_newlines(stdout)
856 stderr = proc._translate_newlines(stderr)
858 if killed and err_on_timeout:
859 errcode = proc.poll()
860 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
866 return (stdout, stderr)
868 def _proxy_command(gw, gwuser, gwidentity):
870 Constructs the SSH ProxyCommand option to add to the SSH command to connect
872 :param gw: SSH proxy hostname
875 :param gwuser: SSH proxy username
878 :param gwidentity: SSH proxy identity file
879 :type gwidentity: str
884 returns the SSH ProxyCommand option.
887 proxycommand = 'ProxyCommand=ssh -q '
889 proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
891 proxycommand += '%s' % gwuser
894 proxycommand += '@%s -W %%h:%%p' % gw