15 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
17 if hasattr(os, "devnull"):
20 DEV_NULL = "/dev/null"
22 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
26 Special value that when given to rspawn in stderr causes stderr to
27 redirect to whatever stdout was redirected to.
32 Process is still running
42 Process hasn't started running yet (this should be very rare)
45 hostbyname_cache = dict()
47 def gethostbyname(host):
48 hostbyname = hostbyname_cache.get(host)
50 hostbyname = socket.gethostbyname(host)
51 hostbyname_cache[host] = hostbyname
54 OPENSSH_HAS_PERSIST = None
56 def openssh_has_persist():
57 """ The ssh_config options ControlMaster and ControlPersist allow to
58 reuse a same network connection for multiple ssh sessions. In this
59 way limitations on number of open ssh connections can be bypassed.
60 However, older versions of openSSH do not support this feature.
61 This function is used to determine if ssh connection persist features
64 global OPENSSH_HAS_PERSIST
65 if OPENSSH_HAS_PERSIST is None:
66 proc = subprocess.Popen(["ssh","-v"],
67 stdout = subprocess.PIPE,
68 stderr = subprocess.STDOUT,
69 stdin = open("/dev/null","r") )
70 out,err = proc.communicate()
73 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
74 OPENSSH_HAS_PERSIST = bool(vre.match(out))
75 return OPENSSH_HAS_PERSIST
77 def make_server_key_args(server_key, host, port):
78 """ Returns a reference to a temporary known_hosts file, to which
79 the server key has been added.
81 Make sure to hold onto the temp file reference until the process is
84 :param server_key: the server public key
87 :param host: the hostname
90 :param port: the ssh port
95 host = '%s:%s' % (host, str(port))
97 # Create a temporary server key file
98 tmp_known_hosts = tempfile.NamedTemporaryFile()
100 hostbyname = gethostbyname(host)
102 # Add the intended host key
103 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
105 # If we're not in strict mode, add user-configured keys
106 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
107 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
108 if os.access(user_hosts_path, os.R_OK):
109 f = open(user_hosts_path, "r")
110 tmp_known_hosts.write(f.read())
113 tmp_known_hosts.flush()
115 return tmp_known_hosts
117 def make_control_path(agent, forward_x11):
118 ctrl_path = "/tmp/nepi_ssh"
126 ctrl_path += "-%r@%h:%p"
131 """ Escapes strings so that they are safe to use as command-line
133 if SHELL_SAFE.match(s):
134 # safe string - no escaping needed
137 # unsafe string - escape
139 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
142 return "'$'\\x%02x''" % (ord(c),)
143 s = ''.join(map(escp,s))
146 def eintr_retry(func):
147 """Retries a function invocation when a EINTR occurs"""
149 @functools.wraps(func)
151 retry = kw.pop("_retry", False)
152 for i in xrange(0 if retry else 4):
154 return func(*p, **kw)
155 except (select.error, socket.error), args:
156 if args[0] == errno.EINTR:
161 if e.errno == errno.EINTR:
166 return func(*p, **kw)
169 def rexec(command, host, user,
180 err_on_timeout = True,
181 connect_timeout = 30,
183 forward_x11 = False):
185 Executes a remote command, returns ((stdout,stderr),process)
188 tmp_known_hosts = None
189 hostip = gethostbyname(host)
192 # Don't bother with localhost. Makes test easier
193 '-o', 'NoHostAuthenticationForLocalhost=yes',
194 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
195 '-o', 'ConnectionAttempts=3',
196 '-o', 'ServerAliveInterval=30',
197 '-o', 'TCPKeepAlive=yes',
198 '-l', user, hostip or host]
200 if persistent and openssh_has_persist():
202 '-o', 'ControlMaster=auto',
203 '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
204 '-o', 'ControlPersist=60' ])
210 args.append('-p%d' % port)
213 args.extend(('-i', identity))
223 # Create a temporary server key file
224 tmp_known_hosts = make_server_key_args(server_key, host, port)
225 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
229 for x in xrange(retry or 3):
230 # connects to the remote host and starts a remote connection
231 proc = subprocess.Popen(args,
232 stdout = subprocess.PIPE,
233 stdin = subprocess.PIPE,
234 stderr = subprocess.PIPE)
236 # attach tempfile object to the process, to make sure the file stays
237 # alive until the process is finished with it
238 proc._known_hosts = tmp_known_hosts
241 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
243 print "COMMAND host %s, command %s, out %s, error %s" % (host, " ".join(args), out, err)
246 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
247 # SSH error, can safely retry
250 # Probably timed out or plain failed but can retry
253 except RuntimeError, e:
255 print "EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT -> %s" % (
256 host, " ".join(args), out, err, e.args)
262 return ((out, err), proc)
264 def rcopy(source, dest,
271 Copies from/to remote sites.
273 Source and destination should have the user and host encoded
276 If source is a file object, a special mode will be used to
277 create the remote file with the same contents.
279 If dest is a file object, the remote file (source) will be
280 read and written into dest.
282 In these modes, recursive cannot be True.
284 Source can be a list of files to copy to a single destination,
285 in which case it is advised that the destination be a folder.
289 print "scp", source, dest
291 if isinstance(source, file) and source.tell() == 0:
293 elif hasattr(source, 'read'):
294 tmp = tempfile.NamedTemporaryFile()
296 buf = source.read(65536)
304 if isinstance(source, file) or isinstance(dest, file) \
305 or hasattr(source, 'read') or hasattr(dest, 'write'):
308 # Parse source/destination as <user>@<server>:<path>
309 if isinstance(dest, basestring) and ':' in dest:
310 remspec, path = dest.split(':',1)
311 elif isinstance(source, basestring) and ':' in source:
312 remspec, path = source.split(':',1)
314 raise ValueError, "Both endpoints cannot be local"
315 user,host = remspec.rsplit('@',1)
317 tmp_known_hosts = None
318 hostip = gethostbyname(host)
320 args = ['ssh', '-l', user, '-C',
321 # Don't bother with localhost. Makes test easier
322 '-o', 'NoHostAuthenticationForLocalhost=yes',
323 '-o', 'ConnectTimeout=60',
324 '-o', 'ConnectionAttempts=3',
325 '-o', 'ServerAliveInterval=30',
326 '-o', 'TCPKeepAlive=yes',
329 if openssh_has_persist():
331 '-o', 'ControlMaster=auto',
332 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
333 '-o', 'ControlPersist=60' ])
336 args.append('-P%d' % port)
339 args.extend(('-i', identity))
342 # Create a temporary server key file
343 tmp_known_hosts = make_server_key_args(server_key, host, port)
344 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
346 if isinstance(source, file) or hasattr(source, 'read'):
347 args.append('cat > %s' % (shell_escape(path),))
348 elif isinstance(dest, file) or hasattr(dest, 'write'):
349 args.append('cat %s' % (shell_escape(path),))
351 raise AssertionError, "Unreachable code reached! :-Q"
353 # connects to the remote host and starts a remote connection
354 if isinstance(source, file):
355 proc = subprocess.Popen(args,
356 stdout = open('/dev/null','w'),
357 stderr = subprocess.PIPE,
359 err = proc.stderr.read()
360 proc._known_hosts = tmp_known_hosts
361 eintr_retry(proc.wait)()
362 return ((None,err), proc)
363 elif isinstance(dest, file):
364 proc = subprocess.Popen(args,
365 stdout = open('/dev/null','w'),
366 stderr = subprocess.PIPE,
368 err = proc.stderr.read()
369 proc._known_hosts = tmp_known_hosts
370 eintr_retry(proc.wait)()
371 return ((None,err), proc)
372 elif hasattr(source, 'read'):
373 # file-like (but not file) source
374 proc = subprocess.Popen(args,
375 stdout = open('/dev/null','w'),
376 stderr = subprocess.PIPE,
377 stdin = subprocess.PIPE)
383 buf = source.read(4096)
388 rdrdy, wrdy, broken = select.select(
391 [proc.stderr,proc.stdin])
393 if proc.stderr in rdrdy:
394 # use os.read for fully unbuffered behavior
395 err.append(os.read(proc.stderr.fileno(), 4096))
397 if proc.stdin in wrdy:
398 proc.stdin.write(buf)
404 err.append(proc.stderr.read())
406 proc._known_hosts = tmp_known_hosts
407 eintr_retry(proc.wait)()
408 return ((None,''.join(err)), proc)
409 elif hasattr(dest, 'write'):
410 # file-like (but not file) dest
411 proc = subprocess.Popen(args,
412 stdout = subprocess.PIPE,
413 stderr = subprocess.PIPE,
414 stdin = open('/dev/null','w'))
419 rdrdy, wrdy, broken = select.select(
420 [proc.stderr, proc.stdout],
422 [proc.stderr, proc.stdout])
424 if proc.stderr in rdrdy:
425 # use os.read for fully unbuffered behavior
426 err.append(os.read(proc.stderr.fileno(), 4096))
428 if proc.stdout in rdrdy:
429 # use os.read for fully unbuffered behavior
430 buf = os.read(proc.stdout.fileno(), 4096)
439 err.append(proc.stderr.read())
441 proc._known_hosts = tmp_known_hosts
442 eintr_retry(proc.wait)()
443 return ((None,''.join(err)), proc)
445 raise AssertionError, "Unreachable code reached! :-Q"
447 # Parse destination as <user>@<server>:<path>
448 if isinstance(dest, basestring) and ':' in dest:
449 remspec, path = dest.split(':',1)
450 elif isinstance(source, basestring) and ':' in source:
451 remspec, path = source.split(':',1)
453 raise ValueError, "Both endpoints cannot be local"
454 user,host = remspec.rsplit('@',1)
457 tmp_known_hosts = None
459 args = ['scp', '-q', '-p', '-C',
460 # Don't bother with localhost. Makes test easier
461 '-o', 'NoHostAuthenticationForLocalhost=yes',
462 '-o', 'ConnectTimeout=60',
463 '-o', 'ConnectionAttempts=3',
464 '-o', 'ServerAliveInterval=30',
465 '-o', 'TCPKeepAlive=yes' ]
468 args.append('-P%d' % port)
474 args.extend(('-i', identity))
477 # Create a temporary server key file
478 tmp_known_hosts = make_server_key_args(server_key, host, port)
479 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
481 if isinstance(source,list):
484 if openssh_has_persist():
486 '-o', 'ControlMaster=auto',
487 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
493 # connects to the remote host and starts a remote connection
494 proc = subprocess.Popen(args,
495 stdout = subprocess.PIPE,
496 stdin = subprocess.PIPE,
497 stderr = subprocess.PIPE)
498 proc._known_hosts = tmp_known_hosts
500 (out, err) = proc.communicate()
501 eintr_retry(proc.wait)()
502 return ((out, err), proc)
504 def rspawn(command, pidfile,
505 stdout = '/dev/null',
519 Spawn a remote command such that it will continue working asynchronously.
522 command: the command to run - it should be a single line.
524 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
526 stdout: path of a file to redirect standard output to - must be a string.
527 Defaults to /dev/null
528 stderr: path of a file to redirect standard error to - string or the special STDOUT value
529 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
530 stdin: path of a file with input to be piped into the command's standard input
532 home: path of a folder to use as working directory - should exist, unless you specify create_home
534 create_home: if True, the home folder will be created first with mkdir -p
536 sudo: whether the command needs to be executed as root
538 host/port/user/agent/identity: see rexec
541 (stdout, stderr), process
543 Of the spawning process, which only captures errors at spawning time.
544 Usually only useful for diagnostics.
546 # Start process in a "daemonized" way, using nohup and heavy
547 # stdin/out redirection to avoid connection issues
551 stderr = ' ' + stderr
553 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
555 'pidfile' : shell_escape(pidfile),
561 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
562 'command' : shell_escape(daemon_command),
563 'sudo' : 'sudo -S' if sudo else '',
564 'pidfile' : shell_escape(pidfile),
565 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
566 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
569 (out,err),proc = rexec(
576 server_key = server_key,
581 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
583 return ((out, err), proc)
586 def rcheckpid(pidfile,
594 Check the pidfile of a process spawned with remote_spawn.
597 pidfile: the pidfile passed to remote_span
599 host/port/user/agent/identity: see rexec
603 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
604 or None if the pidfile isn't valid yet (maybe the process is still starting).
607 (out,err),proc = rexec(
608 "cat %(pidfile)s" % {
616 server_key = server_key
624 return map(int,out.strip().split(' ',1))
626 # Ignore, many ways to fail that don't matter that much
630 def rstatus(pid, ppid,
638 Check the status of a process spawned with remote_spawn.
641 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
643 host/port/user/agent/identity: see rexec
647 One of NOT_STARTED, RUNNING, FINISHED
650 (out,err),proc = rexec(
651 # Check only by pid. pid+ppid does not always work (especially with sudo)
652 " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
661 server_key = server_key
669 if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
672 status = (out.strip() == 'wait')
675 return RUNNING if status else FINISHED
688 Kill a process spawned with remote_spawn.
690 First tries a SIGTERM, and if the process does not end in 10 seconds,
694 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
696 sudo: whether the command was run with sudo - careful killing like this.
698 host/port/user/agent/identity: see rexec
702 Nothing, should have killed the process
705 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
707 SUBKILL="%(subkill)s" ;
708 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
709 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
710 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
712 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
715 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
716 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
720 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
721 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
722 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
726 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
728 (out,err),proc = rexec(
732 'sudo' : 'sudo -S' if sudo else '',
740 server_key = server_key
743 # wait, don't leave zombies around
746 return (out, err), proc
749 def _communicate(self, input, timeout=None, err_on_timeout=True):
752 stdout = None # Return
753 stderr = None # Return
757 if timeout is not None:
758 timelimit = time.time() + timeout
759 killtime = timelimit + 4
760 bailtime = timelimit + 4
763 # Flush stdio buffer. This might block, if the user has
764 # been writing to .stdin in an uncontrolled fashion.
767 write_set.append(self.stdin)
771 read_set.append(self.stdout)
774 read_set.append(self.stderr)
778 while read_set or write_set:
779 if timeout is not None:
780 curtime = time.time()
781 if timeout is None or curtime > timelimit:
782 if curtime > bailtime:
784 elif curtime > killtime:
785 signum = signal.SIGKILL
787 signum = signal.SIGTERM
789 os.kill(self.pid, signum)
792 select_timeout = timelimit - curtime + 0.1
796 if select_timeout > 1.0:
800 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
801 except select.error,e:
807 if not rlist and not wlist and not xlist and self.poll() is not None:
808 # timeout and process exited, say bye
811 if self.stdin in wlist:
812 # When select has indicated that the file is writable,
813 # we can write up to PIPE_BUF bytes without risk
814 # blocking. POSIX defines PIPE_BUF >= 512
815 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
816 input_offset += bytes_written
817 if input_offset >= len(input):
819 write_set.remove(self.stdin)
821 if self.stdout in rlist:
822 data = os.read(self.stdout.fileno(), 1024)
825 read_set.remove(self.stdout)
828 if self.stderr in rlist:
829 data = os.read(self.stderr.fileno(), 1024)
832 read_set.remove(self.stderr)
835 # All data exchanged. Translate lists into strings.
836 if stdout is not None:
837 stdout = ''.join(stdout)
838 if stderr is not None:
839 stderr = ''.join(stderr)
841 # Translate newlines, if requested. We cannot let the file
842 # object do the translation: It is based on stdio, which is
843 # impossible to combine with select (unless forcing no
845 if self.universal_newlines and hasattr(file, 'newlines'):
847 stdout = self._translate_newlines(stdout)
849 stderr = self._translate_newlines(stderr)
851 if killed and err_on_timeout:
852 errcode = self.poll()
853 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
859 return (stdout, stderr)