# Cache for openssh_has_persist(): None until the local ssh client has been
# probed, then True/False.
15 OPENSSH_HAS_PERSIST = None
# Prefix of the connection-multiplexing socket names: the ControlPath options
# built in rexec/rcopy use /tmp/<CONTROL_PATH>_<connkey>.
16 CONTROL_PATH = "yyyyy_ssh_control_path"
# DEV_NULL: path of the null device; "/dev/null" is the fallback assigned
# below (presumably os.devnull is used when the os module provides it).
18 if hasattr(os, "devnull"):
21 DEV_NULL = "/dev/null"
# Matches strings made up only of characters that need no shell quoting
# (the empty string included); such strings skip escaping entirely.
23 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
# NOTE(review): appears unused by the functions defined below; presumably a
# host-name -> address lookup cache consumed elsewhere — confirm.
25 hostbyname_cache = dict()
29 Special value that when given to rspawn in stderr causes stderr to
30 redirect to whatever stdout was redirected to.
35 Process is still running
45 Process hasn't started running yet (this should be very rare)
# Report whether the local OpenSSH client is recent enough for
# ControlMaster/ControlPersist connection sharing. The answer is cached in the
# module-level OPENSSH_HAS_PERSIST, so `ssh -v` is only spawned on first use.
48 def openssh_has_persist():
49 """ The ssh_config options ControlMaster and ControlPersist allow to
50 reuse a same network connection for multiple ssh sessions. In this
51 way limitations on number of open ssh connections can be bypassed.
52 However, older versions of openSSH do not support this feature.
53 This function is used to determine if ssh connection persist features
56 global OPENSSH_HAS_PERSIST
# Probe only once; later calls reuse the cached boolean.
57 if OPENSSH_HAS_PERSIST is None:
58 proc = subprocess.Popen(["ssh","-v"],
59 stdout = subprocess.PIPE,
60 stderr = subprocess.STDOUT,
61 stdin = open("/dev/null","r") )
62 out,err = proc.communicate()
# stderr is merged into stdout above, so `out` holds the version banner.
# Accepted versions: 5.8-5.9, 5.10-5.99, 6.x-9.x and any two-digit major
# (10+). re.match anchors at the start, so the output must begin with the
# "OpenSSH_" banner; re.I makes that prefix case-insensitive.
# NOTE(review): the /dev/null handle passed as stdin above is never closed
# explicitly; consider opening DEV_NULL in a `with` block.
65 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
66 OPENSSH_HAS_PERSIST = bool(vre.match(out))
67 return OPENSSH_HAS_PERSIST
# Shell-escaping helper: strings matching SHELL_SAFE are used as-is; any other
# string is rebuilt character by character through escp().
70 """ Escapes strings so that they are safe to use as command-line
72 if SHELL_SAFE.match(s):
73 # safe string - no escaping needed
# Non-safe strings: every character goes through escp() and the pieces are
# joined back together further down.
76 # unsafe string - escape
# Printable ASCII (32-126) plus CR, LF and TAB pass this test, except the two
# quote characters; passing characters are presumably emitted unchanged.
78 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
# Any other character becomes '$'\xNN'': close the current single-quoted
# run, append a bash ANSI-C quoted byte ($'\xNN'), then reopen single quotes.
# This is only correct if the final result is wrapped in single quotes —
# confirm that the return statement does so.
# NOTE(review): $'...' quoting is supported by bash/ksh/zsh but not by a
# strict POSIX sh; the remote login shell must support it.
81 return "'$'\\x%02x''" % (ord(c),)
82 s = ''.join(map(escp,s))
# Decorator: wraps `func` so that calls interrupted by a signal (EINTR) are
# attempted again instead of immediately propagating the error.
85 def eintr_retry(func):
86 """Retries a function invocation when a EINTR occurs"""
88 @functools.wraps(func)
# Callers may pass the private keyword _retry; it is stripped before `func`
# is called.
# NOTE(review): _retry=True makes the loop below run zero times, i.e. it
# *disables* the EINTR-tolerant attempts — the flag reads inverted; confirm.
90 retry = kw.pop("_retry", False)
# Up to 4 attempts that swallow EINTR, whether reported through args[0] of
# select/socket errors or through .errno of the other exception caught
# (presumably OSError/IOError — confirm).
91 for i in xrange(0 if retry else 4):
94 except (select.error, socket.error), args:
95 if args[0] == errno.EINTR:
100 if e.errno == errno.EINTR:
# Fallback call once the EINTR-tolerant attempts are used up (or skipped):
# any error from it, EINTR included, propagates to the caller.
105 return func(*p, **kw)
# Build a filesystem-safe string key for a (user, host, port) triple; callers
# use it as the suffix of the ssh ControlPath socket name.
108 def make_connkey(user, host, port):
# base64 of the tuple's repr (Python 2 "base64" str codec), with the codec's
# trailing newline stripped and '/' replaced by '.' because '/' cannot appear
# in a file name.
109 connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
# Long keys are replaced by their 40-character SHA-1 hex digest, presumably to
# keep the /tmp/... ControlPath within the UNIX socket path length limit.
110 if len(connkey) > 60:
111 connkey = hashlib.sha1(connkey).hexdigest()
# Run `command` on `host` over ssh and collect its output.
# Returns ((stdout, stderr), proc), where proc is the ssh Popen object.
114 def rexec(command, host, user,
119 identity_file = None,
124 err_on_timeout = True,
125 connect_timeout = 30,
128 Executes a remote command, returns ((stdout,stderr),process)
130 connkey = make_connkey(user, host, port)
132 # Don't bother with localhost. Makes test easier
133 '-o', 'NoHostAuthenticationForLocalhost=yes',
134 # XXX: Possible security issue
135 # Avoid interactive requests to accept new host keys
136 '-o', 'StrictHostKeyChecking=no',
137 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
138 '-o', 'ConnectionAttempts=3',
139 '-o', 'ServerAliveInterval=30',
140 '-o', 'TCPKeepAlive=yes',
# Connection sharing keyed by (user, host, port): ControlMaster=auto reuses
# the master socket at ControlPath or creates one, and ControlPersist=60 keeps
# that master running in the background until it has been idle for 60 s.
143 if persistent and openssh_has_persist():
145 '-o', 'ControlMaster=auto',
146 '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
147 '-o', 'ControlPersist=60' ])
151 args.append('-p%d' % port)
153 args.extend(('-i', identity_file))
# NOTE(review): `command` reaches the remote shell verbatim (only a "sudo "
# prefix is added); callers must quote any untrusted parts themselves.
160 command = "sudo " + command
# NOTE(review): `retry or 3` means retry=0 (or None) still yields 3 attempts;
# a single attempt cannot be requested.
163 for x in xrange(retry or 3):
164 # connects to the remote host and starts a remote connection
165 proc = subprocess.Popen(args,
166 stdout = subprocess.PIPE,
167 stdin = subprocess.PIPE,
168 stderr = subprocess.PIPE)
# Feed `stdin` and collect output under the optional timeout; _communicate
# raises RuntimeError on timeout when err_on_timeout is true.
171 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
# Messages from the ssh client itself (or its multiplexing client) indicate a
# connection-level failure, presumably before the remote command ran, so
# another attempt is considered safe.
173 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
174 # SSH error, can safely retry
177 # Probably timed out or plain failed but can retry
180 except RuntimeError,e:
185 return ((out, err), proc)
# Copy data to a remote host: with scp for path/list sources, or by streaming
# through `ssh ... cat` when an endpoint is a file(-like) object. The streaming
# branches return ((None, stderr_text), proc).
187 def rcopy(source, dest, host, user,
191 identity_file = None):
193 Copies file from/to remote sites.
195 Source and destination should have the user and host encoded
198 If source is a file object, a special mode will be used to
199 create the remote file with the same contents.
201 If dest is a file object, the remote file (source) will be
202 read and written into dest.
204 In these modes, recursive cannot be True.
206 Source can be a list of files to copy to a single destination,
207 in which case it is advised that the destination be a folder.
210 if isinstance(source, file) and source.tell() == 0:
# Any other readable source (including a real file not at offset 0) is
# spooled into a NamedTemporaryFile in 64 KiB chunks.
# NOTE(review): if both branches end by replacing `source` with a path string,
# the later branches that test `source` for file/read() are unreachable —
# confirm.
213 elif hasattr(source, 'read'):
214 tmp = tempfile.NamedTemporaryFile()
216 buf = source.read(65536)
# Streaming mode: an endpoint is a file-like object, so use `ssh ... cat`
# instead of scp.
224 if isinstance(source, file) or isinstance(dest, file) \
225 or hasattr(source, 'read') or hasattr(dest, 'write'):
228 connkey = make_connkey(user,host,port)
229 args = ['ssh', '-l', user, '-C',
230 # Don't bother with localhost. Makes test easier
231 '-o', 'NoHostAuthenticationForLocalhost=yes',
232 # XXX: Possible security issue
233 # Avoid interactive requests to accept new host keys
234 '-o', 'StrictHostKeyChecking=no',
235 '-o', 'ConnectTimeout=30',
236 '-o', 'ConnectionAttempts=3',
237 '-o', 'ServerAliveInterval=30',
238 '-o', 'TCPKeepAlive=yes',
# Share the multiplexed master connection (same ControlPath scheme as rexec).
240 if openssh_has_persist():
242 '-o', 'ControlMaster=auto',
243 '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
244 '-o', 'ControlPersist=60' ])
# NOTE(review): this argument list runs `ssh`, whose port flag is lowercase
# -p (rexec uses '-p%d'); uppercase -P is scp's flag — this looks wrong.
246 args.append('-P%d' % port)
248 args.extend(('-i', identity_file))
# Remote side of the stream: write stdin into a remote path, or print one.
250 if isinstance(source, file) or hasattr(source, 'read'):
# NOTE(review): `dest` is interpolated into the remote shell command unquoted;
# spaces or shell metacharacters in the path break it (or inject commands).
251 args.append('cat > %s' % dest)
# NOTE(review): in the next branch `dest` is the local file-like object, so
# 'cat %s' % dest formats its repr (e.g. "<open file ...>") into the remote
# command; the remote path to read (presumably `source`) looks intended.
252 elif isinstance(dest, file) or hasattr(dest, 'write'):
253 args.append('cat %s' % dest)
255 raise AssertionError, "Unreachable code reached! :-Q"
257 # connects to the remote host and starts a remote connection
258 if isinstance(source, file):
259 proc = subprocess.Popen(args,
260 stdout = open('/dev/null','w'),
261 stderr = subprocess.PIPE,
263 err = proc.stderr.read()
264 eintr_retry(proc.wait)()
265 return ((None,err), proc)
# NOTE(review): the dest-is-file branch below sends the remote stdout to
# /dev/null, so nothing is written into `dest`; stdout=dest looks intended.
# The /dev/null handles opened for stdout in these branches are never closed.
266 elif isinstance(dest, file):
267 proc = subprocess.Popen(args,
268 stdout = open('/dev/null','w'),
269 stderr = subprocess.PIPE,
271 err = proc.stderr.read()
272 eintr_retry(proc.wait)()
273 return ((None,err), proc)
274 elif hasattr(source, 'read'):
275 # file-like (but not file) source
276 proc = subprocess.Popen(args,
277 stdout = open('/dev/null','w'),
278 stderr = subprocess.PIPE,
279 stdin = subprocess.PIPE)
285 buf = source.read(4096)
# Push `source` into the remote `cat` in 4096-byte chunks while draining
# stderr via select(), so neither pipe can fill up and block.
290 rdrdy, wrdy, broken = select.select(
293 [proc.stderr,proc.stdin])
295 if proc.stderr in rdrdy:
296 # use os.read for fully unbuffered behavior
297 err.append(os.read(proc.stderr.fileno(), 4096))
299 if proc.stdin in wrdy:
300 proc.stdin.write(buf)
306 err.append(proc.stderr.read())
308 eintr_retry(proc.wait)()
309 return ((None,''.join(err)), proc)
310 elif hasattr(dest, 'write'):
311 # file-like (but not file) dest
# NOTE(review): /dev/null is opened for writing ('w') but used as the child's
# stdin; 'r' looks intended.
312 proc = subprocess.Popen(args,
313 stdout = subprocess.PIPE,
314 stderr = subprocess.PIPE,
315 stdin = open('/dev/null','w'))
# Drain remote stdout and stderr as data arrives; stdout chunks are
# presumably written into `dest`.
320 rdrdy, wrdy, broken = select.select(
321 [proc.stderr, proc.stdout],
323 [proc.stderr, proc.stdout])
325 if proc.stderr in rdrdy:
326 # use os.read for fully unbuffered behavior
327 err.append(os.read(proc.stderr.fileno(), 4096))
329 if proc.stdout in rdrdy:
330 # use os.read for fully unbuffered behavior
331 buf = os.read(proc.stdout.fileno(), 4096)
340 err.append(proc.stderr.read())
342 eintr_retry(proc.wait)()
343 return ((None,''.join(err)), proc)
345 raise AssertionError, "Unreachable code reached! :-Q"
# Plain scp mode: no file-like endpoints involved.
348 args = ['scp', '-q', '-p', '-C',
349 # Don't bother with localhost. Makes test easier
350 '-o', 'NoHostAuthenticationForLocalhost=yes',
351 # XXX: Possible security issue
352 # Avoid interactive requests to accept new host keys
353 '-o', 'StrictHostKeyChecking=no',
354 '-o', 'ConnectTimeout=30',
355 '-o', 'ConnectionAttempts=3',
356 '-o', 'ServerAliveInterval=30',
357 '-o', 'TCPKeepAlive=yes' ]
360 args.append('-P%d' % port)
364 args.extend(('-i', identity_file))
366 if isinstance(source,list):
# ControlMaster=no with a ControlPath: use an existing master socket if one
# is there, but never start a new master from scp.
369 if openssh_has_persist():
370 connkey = make_connkey(user,host,port)
372 '-o', 'ControlMaster=no',
373 '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, )])
# NOTE(review): the target is always built as user@host:dest, so this path
# only copies local -> remote, while the docstring says source/dest carry
# user@host themselves — confirm which contract callers rely on.
375 args.append("%s@%s:%s" %(user, host, dest))
377 # connects to the remote host and starts a remote connection
378 proc = subprocess.Popen(args,
379 stdout = subprocess.PIPE,
380 stdin = subprocess.PIPE,
381 stderr = subprocess.PIPE)
# communicate() already waits for scp to exit; the extra wait() only reads
# back the return code.
383 comm = proc.communicate()
384 eintr_retry(proc.wait)()
# Start `command` detached on the remote host (background job under nohup)
# and record its PID in `pidfile`. Returns ((out, err), proc) of the
# short-lived spawning ssh call, not of the spawned command.
387 def rspawn(command, pidfile,
388 stdout = '/dev/null',
398 identity_file = None,
401 Spawn a remote command such that it will continue working asynchronously.
404 command: the command to run - it should be a single line.
406 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
408 stdout: path of a file to redirect standard output to - must be a string.
409 Defaults to /dev/null
410 stderr: path of a file to redirect standard error to - string or the special STDOUT value
411 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
412 stdin: path of a file with input to be piped into the command's standard input
414 home: path of a folder to use as working directory - should exist, unless you specify create_home
416 create_home: if True, the home folder will be created first with mkdir -p
418 sudo: whether the command needs to be executed as root
420 host/port/user/agent/identity_file: see rexec
423 (stdout, stderr), process
425 Of the spawning process, which only captures errors at spawning time.
426 Usually only useful for diagnostics.
428 # Start process in a "daemonized" way, using nohup and heavy
429 # stdin/out redirection to avoid connection issues
# A real stderr path gets a leading space, turning `2>%(stderr)s` into
# `2> path` (presumably the STDOUT marker is translated into `&1`, giving
# `2>&1`).
433 stderr = ' ' + stderr
# Run the command as a background job with all three std streams redirected,
# then write "$! 1" (the job's PID plus a literal 1) into pidfile;
# rcheck_pid later parses those two integers as (pid, ppid).
435 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
# Outer wrapper: optionally mkdir -p and cd into `home`, remove a stale
# pidfile, then run the daemon command via nohup bash -c (under sudo -S when
# sudo is set).
# NOTE(review): daemon_command is pasted inside single quotes without
# escaping, so a ' in the command or in any of the paths breaks the quoting
# and allows injection; the paths (home, pidfile, stdout, ...) are unquoted
# as well.
444 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
445 'command' : daemon_command,
447 'sudo' : 'sudo -S' if sudo else '',
450 'gohome' : 'cd %s ; ' % home if home else '',
451 'create' : 'mkdir -p %s ; ' % home if create_home else '',
# Run the wrapper through rexec with the caller's connection settings.
454 (out,err), proc = rexec(
460 identity_file = identity_file,
# Spawning failures are surfaced as RuntimeError carrying the captured
# stdout/stderr.
465 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
467 return (out,err),proc
# Read back the pidfile written by rspawn and parse it into [pid, ppid].
# NOTE(review): the docstring's remote_spawn / remote_span / remote_status
# correspond to rspawn / rstatus in this module.
470 def rcheck_pid(pidfile,
475 identity_file = None):
477 Check the pidfile of a process spawned with remote_spawn.
480 pidfile: the pidfile passed to remote_span
482 host/port/user/agent/identity_file: see rexec
486 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
487 or None if the pidfile isn't valid yet (maybe the process is still starting).
490 (out,err),proc = rexec(
491 "cat %(pidfile)s" % {
498 identity_file = identity_file
# The pidfile holds two space-separated integers, as written by rspawn's
# `echo $! 1`.
# NOTE(review): on Python 2 map() returns a list, not the tuple the docstring
# promises; callers that unpack it are unaffected.
506 return map(int,out.strip().split(' ',1))
508 # Ignore, many ways to fail that don't matter that much
# Ask the remote host whether `pid` is still alive: RUNNING if `ps` lists it,
# FINISHED otherwise (NOT_STARTED presumably comes from the error path).
# NOTE(review): the docstring's remote_spawn / remote_check_pid correspond to
# rspawn / rcheck_pid in this module.
512 def rstatus(pid, ppid,
517 identity_file = None):
519 Check the status of a process spawned with remote_spawn.
522 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
524 host/port/user/agent/identity_file: see rexec
528 One of NOT_STARTED, RUNNING, FINISHED
531 (out,err),proc = rexec(
532 "ps --pid %(pid)d -o pid | grep -c %(pid)d ; true" % {
540 identity_file = identity_file
# The remote pipeline prints the number of matching `ps` lines (0 or 1); the
# trailing `; true` keeps the remote exit status 0 when grep finds nothing.
# NOTE(review): only `pid` appears in the remote command; `ppid` looks
# unused — confirm.
549 status = bool(int(out.strip()))
# NOTE(review): logging.warn is a deprecated alias of logging.warning.
552 logging.warn("Error checking remote status:\n%s%s\n", out, err)
553 # Ignore, many ways to fail that don't matter that much
555 return RUNNING if status else FINISHED
# Kill a process started by rspawn: SIGTERM first, escalating to SIGKILL
# (kill -9) if `ps` still lists it after the polling loop.
# NOTE(review): the docstring's remote_spawn / remote_check_pid correspond to
# rspawn / rcheck_pid in this module.
565 identity_file = None,
568 Kill a process spawned with remote_spawn.
570 First tries a SIGTERM, and if the process does not end in 10 seconds,
574 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
576 sudo: whether the command was run with sudo - careful killing like this.
578 host/port/user/agent/identity_file: see rexec
582 Nothing, should have killed the process
# Expands on the remote side to the PIDs of the direct children of `pid`
# (`h` suppresses the ps header); the script below signals them together with
# the process group (-pid) and the process itself.
# The remote script: SIGTERM everything, poll `ps` up to 30 times (one per
# word in the for-list), presumably re-sending SIGTERM while the process is
# still present, then SIGKILL whatever is left. `|| /bin/true` keeps a
# single failed kill from failing the script.
# NOTE(review): `[ ... == '0' ]` relies on bash's test builtin; POSIX `[`
# only defines `=`, so a non-bash remote shell may reject it.
# NOTE(review): check the docstring's "10 seconds" against the sleep interval
# used inside the polling loop.
586 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
590 SUBKILL="%(subkill)s" ;
591 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
592 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
593 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
595 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
598 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
599 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
603 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
604 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
605 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
609 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
# NOTE(review): when the script is wrapped like this it runs as a background
# job, so the function can return before the process is actually gone,
# contrary to the docstring's "should have killed the process".
611 (out,err),proc = rexec(
615 'sudo' : 'sudo -S' if sudo else '',
622 identity_file = identity_file
625 # wait, don't leave zombies around
# select()-based variant of Popen.communicate() that takes the Popen object
# as `self` (it is called as _communicate(proc, ...)). It feeds `input` to
# stdin in chunks of at most 512 bytes and collects stdout/stderr in
# 1024-byte reads. With a `timeout`, the child is signalled (os.kill) once
# the deadline passes. Returns (stdout, stderr). When the child had to be
# signalled (the `killed` flag) and err_on_timeout is true, it raises
# RuntimeError(("Operation timed out", returncode, stdout, stderr)) instead.
629 def _communicate(self, input, timeout=None, err_on_timeout=True):
632 stdout = None # Return
633 stderr = None # Return
# Deadlines: after timelimit the child is sent SIGTERM; killtime was meant to
# escalate to SIGKILL and bailtime to stop waiting altogether.
# NOTE(review): killtime and bailtime are both timelimit + 4, so whenever
# `curtime > killtime` holds, the earlier `curtime > bailtime` test already
# matched: the SIGKILL branch is unreachable. A later bailtime (e.g.
# killtime + a few seconds) looks intended — confirm.
637 if timeout is not None:
638 timelimit = time.time() + timeout
639 killtime = timelimit + 4
640 bailtime = timelimit + 4
643 # Flush stdio buffer. This might block, if the user has
644 # been writing to .stdin in an uncontrolled fashion.
647 write_set.append(self.stdin)
651 read_set.append(self.stdout)
654 read_set.append(self.stderr)
# Keep going until every pipe has been closed (or an explicit break).
658 while read_set or write_set:
659 if timeout is not None:
660 curtime = time.time()
# NOTE(review): if this test could run with timeout None, curtime and
# bailtime would be unset and the comparison below would fail; presumably it
# is nested under the `timeout is not None` check, which makes the
# `timeout is None` part redundant — confirm.
661 if timeout is None or curtime > timelimit:
662 if curtime > bailtime:
664 elif curtime > killtime:
665 signum = signal.SIGKILL
667 signum = signal.SIGTERM
669 os.kill(self.pid, signum)
# Wake up just after the deadline so an expired timeout is noticed promptly.
672 select_timeout = timelimit - curtime + 0.1
676 if select_timeout > 1.0:
680 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
681 except select.error,e:
687 if not rlist and not wlist and not xlist and self.poll() is not None:
688 # timeout and process exited, say bye
691 if self.stdin in wlist:
692 # When select has indicated that the file is writable,
693 # we can write up to PIPE_BUF bytes without risk
694 # blocking. POSIX defines PIPE_BUF >= 512
695 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
696 input_offset += bytes_written
697 if input_offset >= len(input):
699 write_set.remove(self.stdin)
701 if self.stdout in rlist:
702 data = os.read(self.stdout.fileno(), 1024)
705 read_set.remove(self.stdout)
708 if self.stderr in rlist:
709 data = os.read(self.stderr.fileno(), 1024)
712 read_set.remove(self.stderr)
715 # All data exchanged. Translate lists into strings.
716 if stdout is not None:
717 stdout = ''.join(stdout)
718 if stderr is not None:
719 stderr = ''.join(stderr)
721 # Translate newlines, if requested. We cannot let the file
722 # object do the translation: It is based on stdio, which is
723 # impossible to combine with select (unless forcing no
725 if self.universal_newlines and hasattr(file, 'newlines'):
727 stdout = self._translate_newlines(stdout)
729 stderr = self._translate_newlines(stderr)
# A timeout that led to signalling the child is reported as an error; errcode
# is whatever poll() returns at this point (None if it is still running).
731 if killed and err_on_timeout:
732 errcode = self.poll()
733 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
739 return (stdout, stderr)