15 OPENSSH_HAS_PERSIST = None
16 CONTROL_PATH = "yyyyy_ssh_control_path"
18 if hasattr(os, "devnull"):
21 DEV_NULL = "/dev/null"
23 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
25 hostbyname_cache = dict()
29 Special value that when given to rspawn in stderr causes stderr to
30 redirect to whatever stdout was redirected to.
35 Process is still running
45 Process hasn't started running yet (this should be very rare)
48 def openssh_has_persist():
49 """ The ssh_config options ControlMaster and ControlPersist allow to
50 reuse a same network connection for multiple ssh sessions. In this
51 way limitations on number of open ssh connections can be bypassed.
52 However, older versions of openSSH do not support this feature.
53 This function is used to determine if ssh connection persist features
56 global OPENSSH_HAS_PERSIST
57 if OPENSSH_HAS_PERSIST is None:
58 proc = subprocess.Popen(["ssh","-v"],
59 stdout = subprocess.PIPE,
60 stderr = subprocess.STDOUT,
61 stdin = open("/dev/null","r") )
62 out,err = proc.communicate()
65 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
66 OPENSSH_HAS_PERSIST = bool(vre.match(out))
67 return OPENSSH_HAS_PERSIST
70 """ Escapes strings so that they are safe to use as command-line
72 if SHELL_SAFE.match(s):
73 # safe string - no escaping needed
76 # unsafe string - escape
78 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
81 return "'$'\\x%02x''" % (ord(c),)
82 s = ''.join(map(escp,s))
85 def eintr_retry(func):
86 """Retries a function invocation when a EINTR occurs"""
88 @functools.wraps(func)
90 retry = kw.pop("_retry", False)
91 for i in xrange(0 if retry else 4):
94 except (select.error, socket.error), args:
95 if args[0] == errno.EINTR:
100 if e.errno == errno.EINTR:
105 return func(*p, **kw)
108 def make_connkey(user, host, port):
109 connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
110 if len(connkey) > 60:
111 connkey = hashlib.sha1(connkey).hexdigest()
114 def rexec(command, host, user,
119 identity_file = None,
123 err_on_timeout = True,
124 connect_timeout = 30,
127 Executes a remote command, returns ((stdout,stderr),process)
129 connkey = make_connkey(user, host, port)
131 # Don't bother with localhost. Makes test easier
132 '-o', 'NoHostAuthenticationForLocalhost=yes',
133 # XXX: Possible security issue
134 # Avoid interactive requests to accept new host keys
135 '-o', 'StrictHostKeyChecking=no',
136 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
137 '-o', 'ConnectionAttempts=3',
138 '-o', 'ServerAliveInterval=30',
139 '-o', 'TCPKeepAlive=yes',
142 if persistent and openssh_has_persist():
144 '-o', 'ControlMaster=auto',
145 '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
146 '-o', 'ControlPersist=60' ])
150 args.append('-p%d' % port)
152 args.extend(('-i', identity_file))
159 command = "sudo " + command
162 for x in xrange(retry or 3):
163 # connects to the remote host and starts a remote connection
164 proc = subprocess.Popen(args,
165 stdout = subprocess.PIPE,
166 stdin = subprocess.PIPE,
167 stderr = subprocess.PIPE)
170 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
172 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
173 # SSH error, can safely retry
176 # Probably timed out or plain failed but can retry
179 except RuntimeError,e:
184 return ((out, err), proc)
186 def rcopy(source, dest, host, user,
190 identity_file = None):
192 Copies file from/to remote sites.
194 Source and destination should have the user and host encoded
197 If source is a file object, a special mode will be used to
198 create the remote file with the same contents.
200 If dest is a file object, the remote file (source) will be
201 read and written into dest.
203 In these modes, recursive cannot be True.
205 Source can be a list of files to copy to a single destination,
206 in which case it is advised that the destination be a folder.
209 if isinstance(source, file) and source.tell() == 0:
212 elif hasattr(source, 'read'):
213 tmp = tempfile.NamedTemporaryFile()
215 buf = source.read(65536)
223 if isinstance(source, file) or isinstance(dest, file) \
224 or hasattr(source, 'read') or hasattr(dest, 'write'):
227 connkey = make_connkey(user,host,port)
228 args = ['ssh', '-l', user, '-C',
229 # Don't bother with localhost. Makes test easier
230 '-o', 'NoHostAuthenticationForLocalhost=yes',
231 # XXX: Possible security issue
232 # Avoid interactive requests to accept new host keys
233 '-o', 'StrictHostKeyChecking=no',
234 '-o', 'ConnectTimeout=30',
235 '-o', 'ConnectionAttempts=3',
236 '-o', 'ServerAliveInterval=30',
237 '-o', 'TCPKeepAlive=yes',
239 if openssh_has_persist():
241 '-o', 'ControlMaster=auto',
242 '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
243 '-o', 'ControlPersist=60' ])
245 args.append('-P%d' % port)
247 args.extend(('-i', identity_file))
249 if isinstance(source, file) or hasattr(source, 'read'):
250 args.append('cat > %s' % dest)
251 elif isinstance(dest, file) or hasattr(dest, 'write'):
252 args.append('cat %s' % dest)
254 raise AssertionError, "Unreachable code reached! :-Q"
256 # connects to the remote host and starts a remote connection
257 if isinstance(source, file):
258 proc = subprocess.Popen(args,
259 stdout = open('/dev/null','w'),
260 stderr = subprocess.PIPE,
262 err = proc.stderr.read()
263 eintr_retry(proc.wait)()
264 return ((None,err), proc)
265 elif isinstance(dest, file):
266 proc = subprocess.Popen(args,
267 stdout = open('/dev/null','w'),
268 stderr = subprocess.PIPE,
270 err = proc.stderr.read()
271 eintr_retry(proc.wait)()
272 return ((None,err), proc)
273 elif hasattr(source, 'read'):
274 # file-like (but not file) source
275 proc = subprocess.Popen(args,
276 stdout = open('/dev/null','w'),
277 stderr = subprocess.PIPE,
278 stdin = subprocess.PIPE)
284 buf = source.read(4096)
289 rdrdy, wrdy, broken = select.select(
292 [proc.stderr,proc.stdin])
294 if proc.stderr in rdrdy:
295 # use os.read for fully unbuffered behavior
296 err.append(os.read(proc.stderr.fileno(), 4096))
298 if proc.stdin in wrdy:
299 proc.stdin.write(buf)
305 err.append(proc.stderr.read())
307 eintr_retry(proc.wait)()
308 return ((None,''.join(err)), proc)
309 elif hasattr(dest, 'write'):
310 # file-like (but not file) dest
311 proc = subprocess.Popen(args,
312 stdout = subprocess.PIPE,
313 stderr = subprocess.PIPE,
314 stdin = open('/dev/null','w'))
319 rdrdy, wrdy, broken = select.select(
320 [proc.stderr, proc.stdout],
322 [proc.stderr, proc.stdout])
324 if proc.stderr in rdrdy:
325 # use os.read for fully unbuffered behavior
326 err.append(os.read(proc.stderr.fileno(), 4096))
328 if proc.stdout in rdrdy:
329 # use os.read for fully unbuffered behavior
330 buf = os.read(proc.stdout.fileno(), 4096)
339 err.append(proc.stderr.read())
341 eintr_retry(proc.wait)()
342 return ((None,''.join(err)), proc)
344 raise AssertionError, "Unreachable code reached! :-Q"
347 args = ['scp', '-q', '-p', '-C',
348 # Don't bother with localhost. Makes test easier
349 '-o', 'NoHostAuthenticationForLocalhost=yes',
350 # XXX: Possible security issue
351 # Avoid interactive requests to accept new host keys
352 '-o', 'StrictHostKeyChecking=no',
353 '-o', 'ConnectTimeout=30',
354 '-o', 'ConnectionAttempts=3',
355 '-o', 'ServerAliveInterval=30',
356 '-o', 'TCPKeepAlive=yes' ]
359 args.append('-P%d' % port)
363 args.extend(('-i', identity_file))
365 if isinstance(source,list):
368 if openssh_has_persist():
369 connkey = make_connkey(user,host,port)
371 '-o', 'ControlMaster=no',
372 '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, )])
374 args.append("%s@%s:%s" %(user, host, dest))
376 # connects to the remote host and starts a remote connection
377 proc = subprocess.Popen(args,
378 stdout = subprocess.PIPE,
379 stdin = subprocess.PIPE,
380 stderr = subprocess.PIPE)
382 comm = proc.communicate()
383 eintr_retry(proc.wait)()
386 def rspawn(command, pidfile,
387 stdout = '/dev/null',
397 identity_file = None,
400 Spawn a remote command such that it will continue working asynchronously.
403 command: the command to run - it should be a single line.
405 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
407 stdout: path of a file to redirect standard output to - must be a string.
408 Defaults to /dev/null
409 stderr: path of a file to redirect standard error to - string or the special STDOUT value
410 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
411 stdin: path of a file with input to be piped into the command's standard input
413 home: path of a folder to use as working directory - should exist, unless you specify create_home
415 create_home: if True, the home folder will be created first with mkdir -p
417 sudo: whether the command needs to be executed as root
419 host/port/user/agent/identity_file: see rexec
422 (stdout, stderr), process
424 Of the spawning process, which only captures errors at spawning time.
425 Usually only useful for diagnostics.
427 # Start process in a "daemonized" way, using nohup and heavy
428 # stdin/out redirection to avoid connection issues
432 stderr = ' ' + stderr
434 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
443 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
444 'command' : daemon_command,
446 'sudo' : 'sudo -S' if sudo else '',
449 'gohome' : 'cd %s ; ' % home if home else '',
450 'create' : 'mkdir -p %s ; ' % home if create_home else '',
453 (out,err), proc = rexec(
459 identity_file = identity_file,
464 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
466 return (out,err),proc
469 def rcheck_pid(pidfile,
474 identity_file = None):
476 Check the pidfile of a process spawned with remote_spawn.
479 pidfile: the pidfile passed to remote_span
481 host/port/user/agent/identity_file: see rexec
485 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
486 or None if the pidfile isn't valid yet (maybe the process is still starting).
489 (out,err),proc = rexec(
490 "cat %(pidfile)s" % {
497 identity_file = identity_file
505 return map(int,out.strip().split(' ',1))
507 # Ignore, many ways to fail that don't matter that much
511 def rstatus(pid, ppid,
516 identity_file = None):
518 Check the status of a process spawned with remote_spawn.
521 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
523 host/port/user/agent/identity_file: see rexec
527 One of NOT_STARTED, RUNNING, FINISHED
530 (out,err),proc = rexec(
531 "ps --pid %(pid)d -o pid | grep -c %(pid)d ; true" % {
539 identity_file = identity_file
548 status = bool(int(out.strip()))
551 logging.warn("Error checking remote status:\n%s%s\n", out, err)
552 # Ignore, many ways to fail that don't matter that much
554 return RUNNING if status else FINISHED
564 identity_file = None,
567 Kill a process spawned with remote_spawn.
569 First tries a SIGTERM, and if the process does not end in 10 seconds,
573 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
575 sudo: whether the command was run with sudo - careful killing like this.
577 host/port/user/agent/identity_file: see rexec
581 Nothing, should have killed the process
584 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
586 SUBKILL="%(subkill)s" ;
587 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
588 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
589 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
591 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
594 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
595 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
599 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
600 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
601 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
605 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
607 (out,err),proc = rexec(
611 'sudo' : 'sudo -S' if sudo else '',
618 identity_file = identity_file
621 # wait, don't leave zombies around
625 def _communicate(self, input, timeout=None, err_on_timeout=True):
628 stdout = None # Return
629 stderr = None # Return
633 if timeout is not None:
634 timelimit = time.time() + timeout
635 killtime = timelimit + 4
636 bailtime = timelimit + 4
639 # Flush stdio buffer. This might block, if the user has
640 # been writing to .stdin in an uncontrolled fashion.
643 write_set.append(self.stdin)
647 read_set.append(self.stdout)
650 read_set.append(self.stderr)
654 while read_set or write_set:
655 if timeout is not None:
656 curtime = time.time()
657 if timeout is None or curtime > timelimit:
658 if curtime > bailtime:
660 elif curtime > killtime:
661 signum = signal.SIGKILL
663 signum = signal.SIGTERM
665 os.kill(self.pid, signum)
668 select_timeout = timelimit - curtime + 0.1
672 if select_timeout > 1.0:
676 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
677 except select.error,e:
683 if not rlist and not wlist and not xlist and self.poll() is not None:
684 # timeout and process exited, say bye
687 if self.stdin in wlist:
688 # When select has indicated that the file is writable,
689 # we can write up to PIPE_BUF bytes without risk
690 # blocking. POSIX defines PIPE_BUF >= 512
691 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
692 input_offset += bytes_written
693 if input_offset >= len(input):
695 write_set.remove(self.stdin)
697 if self.stdout in rlist:
698 data = os.read(self.stdout.fileno(), 1024)
701 read_set.remove(self.stdout)
704 if self.stderr in rlist:
705 data = os.read(self.stderr.fileno(), 1024)
708 read_set.remove(self.stderr)
711 # All data exchanged. Translate lists into strings.
712 if stdout is not None:
713 stdout = ''.join(stdout)
714 if stderr is not None:
715 stderr = ''.join(stderr)
717 # Translate newlines, if requested. We cannot let the file
718 # object do the translation: It is based on stdio, which is
719 # impossible to combine with select (unless forcing no
721 if self.universal_newlines and hasattr(file, 'newlines'):
723 stdout = self._translate_newlines(stdout)
725 stderr = self._translate_newlines(stderr)
727 if killed and err_on_timeout:
728 errcode = self.poll()
729 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
735 return (stdout, stderr)