2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
21 ## TODO: This code needs reviewing !!!
logger = logging.getLogger("sshfuncs")


def log(msg, level, out = None, err = None):
    """ Logs a message on the "sshfuncs" logger, optionally annotated with
    the output captured from a command.

    :param msg: the message to log
    :type msg: str

    :param level: a ``logging`` level, e.g. ``logging.DEBUG``
    :type level: int

    :param out: captured standard output; appended only when non-empty
    :param err: captured standard error; appended only when non-empty
    """
    # Empty or missing streams are skipped so the message does not end up
    # with noise such as " - OUT: None ".
    if out:
        msg += " - OUT: %s " % out

    if err:
        msg += " - ERROR: %s " % err

    logger.log(level, msg)
# DEV_NULL: path of the null device. The literal below is the POSIX
# fallback used when the os module has no ``devnull`` attribute
# (presumably DEV_NULL = os.devnull otherwise -- confirm).
50 if hasattr(os, "devnull"):
53 DEV_NULL = "/dev/null"
# SHELL_SAFE: strings made only of these characters need no shell quoting,
# so shell_escape() does not escape them.
# NOTE(review): the pattern also matches the empty string, which as an
# unquoted shell word disappears entirely -- confirm callers never pass ''.
55 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
# STDOUT: sentinel accepted by rspawn() as its ``stderr`` argument.
59 Special value that when given to rspawn in stderr causes stderr to
60 redirect to whatever stdout was redirected to.
# ProcStatus: status codes returned by rstatus() -- RUNNING, FINISHED and
# NOT_STARTED.
65 Codes for status of remote spawned process
67 # Process is still running
73 # Process hasn't started running yet (this should be very rare)
# Host name -> IPv4 address cache shared by every caller of gethostbyname();
# writers hold hostbyname_cache_lock.
hostbyname_cache = dict()
hostbyname_cache_lock = threading.Lock()


def gethostbyname(host):
    """ Resolves a host name to an IPv4 address string, caching the result.

    Successful lookups are memoized in ``hostbyname_cache`` so repeated
    ssh/scp invocations against the same host do not hit DNS again.

    :param host: host name (or address) to resolve
    :type host: str

    :rtype: str

    :raises socket.gaierror: when the name cannot be resolved
    """
    # Fast path: no lock needed to read an already-cached entry.
    hostbyname = hostbyname_cache.get(host)
    if not hostbyname:
        with hostbyname_cache_lock:
            # Re-check under the lock: another thread may have resolved the
            # same host while this one was waiting.
            hostbyname = hostbyname_cache.get(host)
            if not hostbyname:
                hostbyname = socket.gethostbyname(host)
                hostbyname_cache[host] = hostbyname

                msg = " Added hostbyname %s - %s " % (host, hostbyname)
                log(msg, logging.DEBUG)

    return hostbyname
# Memoized answer of openssh_has_persist(); None until the first probe.
OPENSSH_HAS_PERSIST = None


def openssh_has_persist():
    """ The ssh_config options ControlMaster and ControlPersist allow to
    reuse a same network connection for multiple ssh sessions. In this
    way limitations on number of open ssh connections can be bypassed.
    However, older versions of openSSH do not support this feature.
    This function is used to determine if ssh connection persist features
    can be used.

    The local ``ssh -v`` banner is inspected only once; the answer is cached
    in the module-level ``OPENSSH_HAS_PERSIST``.

    :rtype: bool
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # Close the null-device handle once ssh is done with it instead of
        # leaking one file descriptor per probe.
        with open(os.devnull, "r") as devnull:
            proc = subprocess.Popen(["ssh", "-v"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull)
            out, _ = proc.communicate()

        # Popen output is bytes on Python 3; the pattern below is text.
        if not isinstance(out, str):
            out = out.decode("utf-8", "replace")

        # Accept OpenSSH 5.8, 5.9, 6.x-9.x and any two-digit major version.
        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def make_server_key_args(server_key, host, port):
    """ Returns a reference to a temporary known_hosts file, to which
    the server key has been added.

    Make sure to hold onto the temp file reference until the process is
    done with it: the file is deleted as soon as the object is closed or
    garbage collected.

    :param server_key: the server public key
    :type server_key: str

    :param host: the hostname
    :type host: str

    :param port: the ssh port, or None for the default port
    :type port: str or int

    :rtype: tempfile.NamedTemporaryFile
    """
    # Resolve the bare host name. The port must not be glued onto it first:
    # socket.gethostbyname() cannot resolve a "host:port" string.
    hostbyname = gethostbyname(host)

    # known_hosts syntax: entries for a non-standard port are written as
    # "[name]:port"; entries for port 22 use the bare name.
    names = [host, hostbyname]
    if port is not None and str(port) != "22":
        names = ["[%s]:%s" % (name, port) for name in names]

    # Create a temporary server key file (text mode: str is written to it)
    tmp_known_hosts = tempfile.NamedTemporaryFile(mode = "w+")

    # Add the intended host key
    tmp_known_hosts.write('%s %s\n' % (",".join(names), server_key))

    # If we're not in strict mode, add user-configured keys
    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
        if os.access(user_hosts_path, os.R_OK):
            with open(user_hosts_path, "r") as f:
                tmp_known_hosts.write(f.read())

    tmp_known_hosts.flush()

    return tmp_known_hosts
def make_control_path(agent, forward_x11):
    """ Returns the ssh ControlPath template used for connection sharing.

    Agent and X11 forwarding are negotiated by the master connection, so
    each combination of the two flags gets its own socket path. ssh expands
    the %r, %h and %p tokens to the remote user, host and port.

    :param agent: whether the session forwards the authentication agent
    :type agent: bool

    :param forward_x11: whether the session forwards X11
    :type forward_x11: bool

    :rtype: str
    """
    ctrl_path = "/tmp/nepi_ssh"

    if agent:
        ctrl_path += "_a"

    if forward_x11:
        ctrl_path += "_x"

    ctrl_path += "-%r@%h:%p"

    return ctrl_path
# shell_escape(s): make s safe to use as a single shell word.
# Strings matching SHELL_SAFE are used as-is; otherwise every character is
# mapped by escp(): printable ASCII (except ' and ") plus \r \n \t pass
# through, and anything else is spliced in as '$'\xNN'' -- close the quote,
# insert a bash ANSI-C escape, reopen the quote.
# NOTE(review): this splice only works if the final result is wrapped in
# single quotes -- confirm the return statement does so.
171 """ Escapes strings so that they are safe to use as command-line
173 if SHELL_SAFE.match(s):
174 # safe string - no escaping needed
177 # unsafe string - escape
# NOTE(review): for unicode input with code points above 0xff, \x%02x
# emits more than two hex digits, which $'\xHH' does not accept; only
# byte strings are escaped correctly.
179 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
182 return "'$'\\x%02x''" % (ord(c),)
183 s = ''.join(map(escp,s))
186 def eintr_retry(func):
187 """Retries a function invocation when an EINTR occurs"""
# Decorator: the wrapper keeps func's name and docstring via functools.wraps.
189 @functools.wraps(func)
# An optional ``_retry`` keyword is consumed here and never passed to func.
191 retry = kw.pop("_retry", False)
# Up to four attempts swallow EINTR. A truthy ``_retry`` makes the range
# empty, so only the unguarded call further down (presumably the loop's
# ``else`` branch) runs.
# NOTE(review): ``_retry=True`` therefore disables EINTR retries, which
# reads as the opposite of its name -- confirm intent.
192 for i in xrange(0 if retry else 4):
194 return func(*p, **kw)
# Python 2 syntax: select.error / socket.error are indexed like tuples to
# obtain the errno value.
195 except (select.error, socket.error), args:
196 if args[0] == errno.EINTR:
# The exception bound to ``e`` (presumably OSError/IOError) exposes the
# error number as an ``errno`` attribute instead.
201 if e.errno == errno.EINTR:
206 return func(*p, **kw)
# socat(): expose a remote UNIX socket locally. A local socat listens on
# local_socket_name (UNIX-LISTEN, fork = one child per client) and, for each
# connection, EXECs ssh to the remote host, where a second socat joins its
# STDIO to a UNIX socket (presumably remote_socket_name). Returns the
# result of _retry_rexec().
# NOTE(review): the docstring below was copied from rexec() and does not
# describe this function.
209 def socat(local_socket_name, remote_socket_name,
218 connect_timeout = 30,
220 strict_host_checking = True):
222 Executes a remote command, returns ((stdout,stderr),process)
225 tmp_known_hosts = None
# Resolved once (cached by gethostbyname); ssh is pointed at the address.
226 hostip = gethostbyname(host)
230 args.append("UNIX-LISTEN:%s,unlink-early,fork" % local_socket_name)
232 ssh_args = ['ssh', '-C',
233 # Don't bother with localhost. Makes test easier
234 '-o', 'NoHostAuthenticationForLocalhost=yes',
235 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
236 '-o', 'ConnectionAttempts=3',
237 '-o', 'ServerAliveInterval=30',
238 '-o', 'TCPKeepAlive=yes',
239 '-l', user, hostip or host]
241 if not strict_host_checking:
242 # Do not check for Host key. Unsafe.
243 ssh_args.extend(['-o', 'StrictHostKeyChecking=no'])
246 ssh_args.append('-A')
249 ssh_args.append('-p%d' % port)
252 ssh_args.extend(('-i', identity))
# Passing -t twice forces pseudo-terminal allocation even when ssh itself
# has no local tty (see ssh(1)).
255 ssh_args.append('-t')
256 ssh_args.append('-t')
259 # Create a temporary server key file
260 tmp_known_hosts = make_server_key_args(server_key, host, port)
261 ssh_args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# NOTE(review): ssh_cmd is spliced unquoted into a single-quoted socat EXEC
# address, so any argument containing spaces or quotes (e.g. an identity
# path) would break the command.
263 ssh_cmd = " ".join(ssh_args)
# "\:" escapes the colon for socat's address parser; in this non-raw Python
# string the backslash is kept literally (Python 3.6+ warns about it).
265 exec_cmd = "EXEC:'%s socat STDIO UNIX-CONNECT\:%s'" % (ssh_cmd,
268 args.append(exec_cmd)
270 log_msg = " socat - host %s - command %s " % (host, " ".join(args))
272 return _retry_rexec(args, log_msg,
278 tmp_known_hosts = tmp_known_hosts,
# rexec(): run ``command`` on ``host`` over ssh and return
# ((stdout, stderr), process), delegating execution and retries to
# _retry_rexec().
281 def rexec(command, host, user,
289 connect_timeout = 30,
294 strict_host_checking = True):
296 Executes a remote command, returns ((stdout,stderr),process)
299 tmp_known_hosts = None
# Resolved once (cached by gethostbyname); ssh is pointed at the address.
300 hostip = gethostbyname(host)
303 # Don't bother with localhost. Makes test easier
304 '-o', 'NoHostAuthenticationForLocalhost=yes',
305 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
306 '-o', 'ConnectionAttempts=3',
307 '-o', 'ServerAliveInterval=30',
308 '-o', 'TCPKeepAlive=yes',
309 '-l', user, hostip or host]
# Connection sharing: one master per user/host/port and agent/X11
# combination (see make_control_path), kept alive 60 s after the last
# session -- only when the local OpenSSH supports ControlPersist.
311 if persistent and openssh_has_persist():
313 '-o', 'ControlMaster=auto',
314 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
315 '-o', 'ControlPersist=60' ])
317 if not strict_host_checking:
318 # Do not check for Host key. Unsafe.
319 args.extend(['-o', 'StrictHostKeyChecking=no'])
325 args.append('-p%d' % port)
328 args.extend(('-i', identity))
338 # Create a temporary server key file
339 tmp_known_hosts = make_server_key_args(server_key, host, port)
340 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
# Prefix the remote command with sudo (presumably only when the sudo flag
# is set -- confirm).
343 command = "sudo " + command
347 log_msg = " rexec - host %s - command %s " % (host, " ".join(args))
# Either capture all three streams through pipes, or (None) let the ssh
# child inherit this process's stdin/stdout/stderr.
349 stdout = stderr = stdin = subprocess.PIPE
351 stdout = stderr = stdin = None
353 return _retry_rexec(args, log_msg,
359 tmp_known_hosts = tmp_known_hosts,
# rcopy(): copy files to or from a remote host with scp. Exactly one endpoint
# is remote, written as <user>@<host>:<path>.
362 def rcopy(source, dest,
369 strict_host_checking = True):
371 Copies from/to remote sites.
373 Source and destination should have the user and host encoded
376 Source can be a list of files to copy to a single destination,
377 in which case it is advised that the destination be a folder.
380 # Parse destination as <user>@<server>:<path>
# dest is inspected first, so when both endpoints contain ':' the remote
# spec comes from dest. A list source is never parsed here.
# NOTE(review): ``basestring`` and ``raise X, msg`` are Python 2 only.
381 if isinstance(dest, basestring) and ':' in dest:
382 remspec, path = dest.split(':',1)
383 elif isinstance(source, basestring) and ':' in source:
384 remspec, path = source.split(':',1)
386 raise ValueError, "Both endpoints cannot be local"
# NOTE(review): a remote spec without a "user@" part makes this unpacking
# raise ValueError.
387 user,host = remspec.rsplit('@',1)
390 tmp_known_hosts = None
392 args = ['scp', '-q', '-p', '-C',
# NOTE(review): OpenSSH 7.6 removed the blowfish ciphers; if a blowfish
# "-c" option is actually passed here, modern scp rejects it -- confirm.
393 # Speed up transfer using blowfish cypher specification which is
394 # faster than the default one (3des)
396 # Don't bother with localhost. Makes test easier
397 '-o', 'NoHostAuthenticationForLocalhost=yes',
# NOTE(review): hard-coded 60 s, unlike rexec()/socat(), which take a
# connect_timeout parameter.
398 '-o', 'ConnectTimeout=60',
399 '-o', 'ConnectionAttempts=3',
400 '-o', 'ServerAliveInterval=30',
401 '-o', 'TCPKeepAlive=yes' ]
# scp takes the port as capital -P (ssh uses lowercase -p).
404 args.append('-P%d' % port)
410 args.extend(('-i', identity))
413 # Create a temporary server key file
414 tmp_known_hosts = make_server_key_args(server_key, host, port)
415 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
417 if not strict_host_checking:
418 # Do not check for Host key. Unsafe.
419 args.extend(['-o', 'StrictHostKeyChecking=no'])
421 if isinstance(source, list):
# Reuse a shared ssh master connection when supported; copies never forward
# X11, so forward_x11 is always False in the control path.
424 if openssh_has_persist():
426 '-o', 'ControlMaster=auto',
427 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
433 log_msg = " rcopy - host %s - command %s " % (host, " ".join(args))
435 return _retry_rexec(args, log_msg, env = None, retry = retry,
436 tmp_known_hosts = tmp_known_hosts,
# rspawn(): start ``command`` on the remote host in the background (nohup,
# all stdio redirected) and record its pid in ``pidfile``. Raises
# RuntimeError when the spawning step fails.
439 def rspawn(command, pidfile,
440 stdout = '/dev/null',
454 Spawn a remote command such that it will continue working asynchronously in
457 :param command: The command to run, it should be a single line.
460 :param pidfile: Path to a file where to store the pid and ppid of the
464 :param stdout: Path to file to redirect standard output.
465 The default value is /dev/null
468 :param stderr: Path to file to redirect standard error.
469 If the special STDOUT value is used, stderr will
470 be redirected to the same file as stdout
473 :param stdin: Path to a file with input to be piped into the command's standard input
476 :param home: Path to working directory folder.
477 It is assumed to exist unless the create_home flag is set.
480 :param create_home: Flag to force creation of the home folder before
482 :type create_home: bool
484 :param sudo: Flag forcing execution with sudo user
489 (stdout, stderr), process
491 Of the spawning process, which only captures errors at spawning time.
492 Usually only useful for diagnostics.
494 # Start process in a "daemonized" way, using nohup and heavy
495 # stdin/out redirection to avoid connection issues
# A plain path gets a leading space so "2>%(stderr)s" expands to
# "2> path" (the STDOUT sentinel case is presumably handled just before).
499 stderr = ' ' + stderr
# The inner braces background the command with its redirections; $! is that
# job's PID, written to pidfile followed by a literal 1 as the ppid field, so
# the file holds "PID 1".
501 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
503 'pidfile' : shell_escape(pidfile),
# Outer command: optionally create home and cd into it, delete any stale
# pidfile so a previous run's pid is never read back, then run the wrapper
# via nohup bash -c. "sudo -S" makes sudo read a password from stdin.
509 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
510 'command' : shell_escape(daemon_command),
511 'sudo' : 'sudo -S' if sudo else '',
512 'pidfile' : shell_escape(pidfile),
513 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
514 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
# The wrapper exits right after writing the pidfile because the real
# command is backgrounded, so this rexec() call returns quickly.
517 (out,err),proc = rexec(
524 server_key = server_key,
529 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
531 return ((out, err), proc)
542 Returns the pid and ppid of a process from a remote file where the
543 information was stored.
545 :param home: Path to directory where the pidfile is located
548 :param pidfile: Name of file containing the pid information
553 A (pid, ppid) tuple useful for calling rstatus and rkill,
554 or None if the pidfile isn't valid yet (can happen when process is starting up)
# Read the pidfile remotely; rspawn() writes it as a single "PID PPID" line.
557 (out,err),proc = rexec(
558 "cat %(pidfile)s" % {
566 server_key = server_key
# Parse the two space-separated integers.
# NOTE(review): on Python 2 map() returns a list, not the tuple the docstring
# promises; on Python 3 it would return a lazy iterator.
574 return map(int,out.strip().split(' ',1))
576 # Ignore, many ways to fail that don't matter that much
580 def rstatus(pid, ppid,
588 Returns a code representing the status of a remote process
590 :param pid: Process id of the process
593 :param ppid: Parent process id of process
596 :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
# The remote pipeline prints "wait" while ps still lists a process with this
# pid and "done" otherwise; tail keeps only that final word (grep -c also
# prints a count line first).
599 (out,err),proc = rexec(
600 # Check only by pid. pid+ppid does not always work (especially with sudo)
601 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
610 server_key = server_key
614 return ProcStatus.NOT_STARTED
# Special-cases the hint printed when /proc is not mounted on the remote
# host (presumably emitted by ps -- confirm).
618 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
621 status = (out.strip() == 'wait')
623 return ProcStatus.NOT_STARTED
624 return ProcStatus.RUNNING if status else ProcStatus.FINISHED
637 Sends a kill signal to a remote process.
639 First tries a SIGTERM, and if the process does not end in 10 seconds,
642 :param pid: Process id of process to be killed
645 :param ppid: Parent process id of process to be killed
648 :param sudo: Flag indicating if sudo should be used to kill the process
# Shell fragment listing the direct children of pid ("h" = no ps header).
# The script below signals the process group (kill -- -PID) as well as pid
# and those children, polls 30 times for pid to disappear, and falls back
# to kill -9 if it is still listed.
652 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
654 SUBKILL="%(subkill)s" ;
655 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
656 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
657 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
659 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
662 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
663 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
667 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
668 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
669 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
673 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
# The kill script now runs as a detached background job with all stdio
# redirected, so the remote shell returns without waiting for it.
675 (out,err),proc = rexec(
679 'sudo' : 'sudo -S' if sudo else '',
687 server_key = server_key
690 # wait, don't leave zombies around
693 return (out, err), proc
# _retry_rexec(): run ``args`` via subprocess, retrying when the failure
# looks transient (errors reported by ssh or the connection multiplexer),
# and return ((out, err), proc) from the last attempt.
695 def _retry_rexec(args,
697 stdout = subprocess.PIPE,
698 stdin = subprocess.PIPE,
699 stderr = subprocess.PIPE,
702 tmp_known_hosts = None,
# NOTE(review): if retry is 0 the loop body never runs and the final return
# references unbound out/err/proc, unless they are initialised before the
# loop -- confirm.
705 for x in xrange(retry):
706 # connects to the remote host and starts a remote connection
707 proc = subprocess.Popen(args,
713 # attach tempfile object to the process, to make sure the file stays
714 # alive until the process is finished with it
715 proc._known_hosts = tmp_known_hosts
717 # The argument block == False forces to rexec to return immediately,
722 (out, err) = proc.communicate()
# NOTE(review): read() still waits for EOF on stdout, and fails if this
# branch is reached with stdout=None (no pipe) -- confirm.
724 out = proc.stdout.read()
# poll() is truthy only after the process has exited with a non-zero status,
# so stderr is read only on failure.
725 if proc.poll() and stderr:
726 err = proc.stderr.read()
728 log(log_msg, logging.DEBUG, out, err)
# Retry only errors reported by ssh itself or by the ControlMaster
# multiplexer (mux_client_*).
733 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
734 # SSH error, can safely retry
737 # Probably timed out or plain failed but can retry
# NOTE(review): "ATEMPT" is misspelled in this log message.
742 msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
743 t, x, " ".join(args))
744 log(msg, logging.DEBUG)
749 except RuntimeError, e:
750 msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
751 log(msg, logging.DEBUG, out, err)
757 return ((out, err), proc)