# Cached result of openssh_has_persist(); None means "not probed yet".
OPENSSH_HAS_PERSIST = None

# Filename stem of the ssh ControlPath sockets (a connection key is appended).
CONTROL_PATH = "yyyyy_ssh_control_path"

# Path of the null device.  Use the platform value when this Python's ``os``
# module exposes one, otherwise fall back to the POSIX path.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Strings made only of these characters are handed to the shell unquoted
# by shell_escape().  Note that the empty string also matches.
SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')

# Module-level cache; not referenced in the visible part of this file.
hostbyname_cache = {}
29 Special value that when given to rspawn in stderr causes stderr to
30 redirect to whatever stdout was redirected to.
35 Process is still running
45 Process hasn't started running yet (this should be very rare)
def openssh_has_persist():
    """Tell whether the local ``ssh`` client supports connection persistence.

    The ssh_config options ControlMaster and ControlPersist allow reusing
    a single network connection for multiple ssh sessions, which bypasses
    limits on the number of open ssh connections.  Older versions of
    OpenSSH do not support this feature, so callers use this function to
    decide whether those options can be passed.

    The client is probed once by running ``ssh -v`` and matching the start
    of its combined stdout/stderr against OpenSSH versions 5.8 and later;
    the answer is cached in the module-level OPENSSH_HAS_PERSIST.

    Returns:
        True if the version banner matches, False otherwise.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # Give ssh an empty stdin, and make sure the handle is released
        # again once the probe has finished.
        devnull = open(DEV_NULL, "r")
        try:
            proc = subprocess.Popen(["ssh", "-v"],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT,
                                    stdin=devnull)
            # stderr is merged into stdout, so the second item is unused.
            out, _ = proc.communicate()
        finally:
            devnull.close()

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
def shell_escape(s):
    """Quote *s* so it can be used as one shell command-line argument.

    Strings accepted by SHELL_SAFE (this includes the empty string) are
    returned unchanged.  Any other string is wrapped in single quotes;
    quote characters, and characters outside printable ASCII other than
    CR, LF and TAB, are spliced in as ``$'\\xNN'`` hex escapes.
    """
    if SHELL_SAFE.match(s):
        # Nothing the shell would interpret - pass through as is.
        return s

    def quote_char(ch):
        plain = 32 <= ord(ch) < 127 or ch in ('\r', '\n', '\t')
        if plain and ch not in ("'", '"'):
            return ch
        # Close the single-quoted run, emit the character as a hex
        # escape, then reopen the single quotes.
        return "'$'\\x%02x''" % (ord(ch),)

    return "'%s'" % (''.join([quote_char(ch) for ch in s]),)
# Decorator: returns a wrapper around `func` that retries the call when it
# is interrupted by EINTR.
# NOTE(review): the wrapper's own `def`, the `try:` and the second `except`
# clause are not visible in this view; the notes below cover only the
# statements that are.
85 def eintr_retry(func):
86 """Retries a function invocation when a EINTR occurs"""
88 @functools.wraps(func)
# `_retry` is a private keyword: it is popped here so it never reaches `func`.
90 retry = kw.pop("_retry", False)
# Four guarded attempts normally; zero when `_retry` is truthy.
91 for i in xrange(0 if retry else 4):
# Python 2 `except X, name` syntax.  For these error types the errno is
# read from the first item of the exception.
94 except (select.error, socket.error), args:
95 if args[0] == errno.EINTR:
# Same EINTR test for an exception type that exposes `.errno`
# (its `except` line is not visible here).
100 if e.errno == errno.EINTR:
# Presumably the final, unguarded attempt once the loop is exhausted
# - TODO confirm against the full file.
105 return func(*p, **kw)
108 def make_connkey(user, host, port):
109 connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
110 if len(connkey) > 60:
111 connkey = hashlib.sha1(connkey).hexdigest()
# Run `command` on `host` through the local `ssh` client and return
# ((stdout, stderr), process).
# NOTE(review): parts of the signature and body are not visible in this
# view; the notes below describe only the visible statements.
114 def rexec(command, host, user,
119 identity_file = None,
124 err_on_timeout = True,
125 connect_timeout = 30,
128 Executes a remote command, returns ((stdout,stderr),process)
# Key that names the shared control socket for this (user, host, port).
130 connkey = make_connkey(user, host, port)
132 # Don't bother with localhost. Makes test easier
133 '-o', 'NoHostAuthenticationForLocalhost=yes',
134 # XXX: Possible security issue
135 # Avoid interactive requests to accept new host keys
136 '-o', 'StrictHostKeyChecking=no',
137 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
138 '-o', 'ConnectionAttempts=3',
139 '-o', 'ServerAliveInterval=30',
140 '-o', 'TCPKeepAlive=yes',
# Connection sharing is requested only when the caller asked for it and the
# local ssh client supports it.
# NOTE(review): the ControlPath below is a predictable name under /tmp -
# confirm this is acceptable on multi-user hosts.
143 if persistent and openssh_has_persist():
145 '-o', 'ControlMaster=auto',
146 '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
147 '-o', 'ControlPersist=60' ])
# ssh takes its port with a lowercase -p.
151 args.append('-p%d' % port)
153 args.extend(('-i', identity_file))
160 command = "sudo " + command
# `retry or 3`: a falsy `retry` means 3 attempts.
165 for x in xrange(retry or 3):
166 # connects to the remote host and starts a remote connection
167 proc = subprocess.Popen(args,
168 stdout = subprocess.PIPE,
169 stdin = subprocess.PIPE,
170 stderr = subprocess.PIPE)
# _communicate feeds `stdin` to the process and collects both output streams.
173 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
# stderr beginning with either prefix is treated as an ssh-level failure.
175 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
176 # SSH error, can safely retry
179 # Probably timed out or plain failed but can retry
# _communicate raises RuntimeError when it had to kill the process and
# err_on_timeout is true (Python 2 `except X, name` syntax).
182 except RuntimeError,e:
187 return ((out, err), proc)
# Copy files to or from a remote host.  When either end is a file or
# file-like object the data is streamed through `ssh` running a remote
# `cat`; otherwise `scp` is used.
# NOTE(review): parts of the signature and body are not visible in this
# view; the notes below describe only the visible statements.
189 def rcopy(source, dest, host, user,
193 identity_file = None):
195 Copies file from/to remote sites.
197 Source and destination should have the user and host encoded
200 If source is a file object, a special mode will be used to
201 create the remote file with the same contents.
203 If dest is a file object, the remote file (source) will be
204 read and written into dest.
206 In these modes, recursive cannot be True.
208 Source can be a list of files to copy to a single destination,
209 in which case it is advised that the destination be a folder.
# Real `file` objects are special-cased only when positioned at offset 0;
# other readable objects are read in 64 KiB chunks, with a
# NamedTemporaryFile created for them.
212 if isinstance(source, file) and source.tell() == 0:
215 elif hasattr(source, 'read'):
216 tmp = tempfile.NamedTemporaryFile()
218 buf = source.read(65536)
# Streaming mode: at least one end is a file or file-like object.
226 if isinstance(source, file) or isinstance(dest, file) \
227 or hasattr(source, 'read') or hasattr(dest, 'write'):
230 connkey = make_connkey(user,host,port)
231 args = ['ssh', '-l', user, '-C',
232 # Don't bother with localhost. Makes test easier
233 '-o', 'NoHostAuthenticationForLocalhost=yes',
234 # XXX: Possible security issue
235 # Avoid interactive requests to accept new host keys
236 '-o', 'StrictHostKeyChecking=no',
237 '-o', 'ConnectTimeout=30',
238 '-o', 'ConnectionAttempts=3',
239 '-o', 'ServerAliveInterval=30',
240 '-o', 'TCPKeepAlive=yes',
242 if openssh_has_persist():
244 '-o', 'ControlMaster=auto',
245 '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
246 '-o', 'ControlPersist=60' ])
# NOTE(review): likely bug - this list starts with 'ssh' (see `args` above),
# and ssh takes its port as lowercase -p (as rexec does).  Uppercase -P is
# the scp spelling.
248 args.append('-P%d' % port)
250 args.extend(('-i', identity_file))
# Remote command: write stdin to `dest` when uploading, or print the remote
# file when downloading.
252 if isinstance(source, file) or hasattr(source, 'read'):
253 args.append('cat > %s' % (shell_escape(dest),))
# NOTE(review): likely bug - in this branch `dest` is the local file object
# and the remote path is `source`, so this should read
# shell_escape(source).  As written shell_escape receives a file object.
254 elif isinstance(dest, file) or hasattr(dest, 'write'):
255 args.append('cat %s' % (shell_escape(dest),))
257 raise AssertionError, "Unreachable code reached! :-Q"
259 # connects to the remote host and starts a remote connection
# Real file objects: stderr is read to the end, then the child is reaped.
# NOTE(review): the open('/dev/null','w') handles in the Popen calls below
# are never closed explicitly.
260 if isinstance(source, file):
261 proc = subprocess.Popen(args,
262 stdout = open('/dev/null','w'),
263 stderr = subprocess.PIPE,
265 err = proc.stderr.read()
266 eintr_retry(proc.wait)()
267 return ((None,err), proc)
268 elif isinstance(dest, file):
269 proc = subprocess.Popen(args,
270 stdout = open('/dev/null','w'),
271 stderr = subprocess.PIPE,
273 err = proc.stderr.read()
274 eintr_retry(proc.wait)()
275 return ((None,err), proc)
276 elif hasattr(source, 'read'):
277 # file-like (but not file) source
278 proc = subprocess.Popen(args,
279 stdout = open('/dev/null','w'),
280 stderr = subprocess.PIPE,
281 stdin = subprocess.PIPE)
# Pump `source` into the child's stdin in 4 KiB chunks, using select() to
# interleave stderr collection.
287 buf = source.read(4096)
292 rdrdy, wrdy, broken = select.select(
295 [proc.stderr,proc.stdin])
297 if proc.stderr in rdrdy:
298 # use os.read for fully unbuffered behavior
299 err.append(os.read(proc.stderr.fileno(), 4096))
301 if proc.stdin in wrdy:
302 proc.stdin.write(buf)
# Collect whatever is left on stderr, then reap the child.
308 err.append(proc.stderr.read())
310 eintr_retry(proc.wait)()
311 return ((None,''.join(err)), proc)
312 elif hasattr(dest, 'write'):
313 # file-like (but not file) dest
# NOTE(review): the child's stdin is /dev/null opened in write mode ('w');
# a read-mode handle is presumably what was intended - confirm.
314 proc = subprocess.Popen(args,
315 stdout = subprocess.PIPE,
316 stderr = subprocess.PIPE,
317 stdin = open('/dev/null','w'))
322 rdrdy, wrdy, broken = select.select(
323 [proc.stderr, proc.stdout],
325 [proc.stderr, proc.stdout])
327 if proc.stderr in rdrdy:
328 # use os.read for fully unbuffered behavior
329 err.append(os.read(proc.stderr.fileno(), 4096))
331 if proc.stdout in rdrdy:
332 # use os.read for fully unbuffered behavior
333 buf = os.read(proc.stdout.fileno(), 4096)
342 err.append(proc.stderr.read())
344 eintr_retry(proc.wait)()
345 return ((None,''.join(err)), proc)
347 raise AssertionError, "Unreachable code reached! :-Q"
# Plain path-to-path copy: delegate to scp (quiet, preserve times, compressed).
350 args = ['scp', '-q', '-p', '-C',
351 # Don't bother with localhost. Makes test easier
352 '-o', 'NoHostAuthenticationForLocalhost=yes',
353 # XXX: Possible security issue
354 # Avoid interactive requests to accept new host keys
355 '-o', 'StrictHostKeyChecking=no',
356 '-o', 'ConnectTimeout=30',
357 '-o', 'ConnectionAttempts=3',
358 '-o', 'ServerAliveInterval=30',
359 '-o', 'TCPKeepAlive=yes' ]
# scp takes its port with an uppercase -P.
362 args.append('-P%d' % port)
366 args.extend(('-i', identity_file))
368 if isinstance(source,list):
# With ControlMaster=no, scp is pointed at the same control socket path but
# is told not to act as the master itself.
371 if openssh_has_persist():
372 connkey = make_connkey(user,host,port)
374 '-o', 'ControlMaster=no',
375 '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, )])
377 args.append("%s@%s:%s" %(user, host, dest))
379 # connects to the remote host and starts a remote connection
380 proc = subprocess.Popen(args,
381 stdout = subprocess.PIPE,
382 stdin = subprocess.PIPE,
383 stderr = subprocess.PIPE)
385 comm = proc.communicate()
386 eintr_retry(proc.wait)()
# Start `command` on the remote host in the background (nohup + bash -c) and
# record its pid in `pidfile`; the spawning itself goes through rexec.
# NOTE(review): parts of the signature and body are not visible in this
# view; the notes below describe only the visible statements.
389 def rspawn(command, pidfile,
390 stdout = '/dev/null',
400 identity_file = None,
403 Spawn a remote command such that it will continue working asynchronously.
406 command: the command to run - it should be a single line.
408 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
410 stdout: path of a file to redirect standard output to - must be a string.
411 Defaults to /dev/null
412 stderr: path of a file to redirect standard error to - string or the special STDOUT value
413 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
414 stdin: path of a file with input to be piped into the command's standard input
416 home: path of a folder to use as working directory - should exist, unless you specify create_home
418 create_home: if True, the home folder will be created first with mkdir -p
420 sudo: whether the command needs to be executed as root
422 host/port/user/agent/identity_file: see rexec
425 (stdout, stderr), process
427 Of the spawning process, which only captures errors at spawning time.
428 Usually only useful for diagnostics.
430 # Start process in a "daemonized" way, using nohup and heavy
431 # stdin/out redirection to avoid connection issues
435 stderr = ' ' + stderr
# The pidfile receives "<pid of the background job> 1"; rcheck_pid later
# splits this on the first space into two integers.
437 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
439 'pidfile' : shell_escape(pidfile),
# Outer command: optional mkdir/cd, remove any stale pidfile, then run the
# daemon command under nohup bash -c (optionally through sudo -S).
446 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
447 'command' : shell_escape(daemon_command),
449 'sudo' : 'sudo -S' if sudo else '',
451 'pidfile' : shell_escape(pidfile),
452 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
# NOTE(review): likely bug - the 'create' entry formats the shell_escape
# function object itself, not shell_escape(home), so `mkdir -p` receives
# the function's repr and not the directory path.
453 'create' : 'mkdir -p %s ; ' % (shell_escape,) if create_home else '',
456 (out,err), proc = rexec(
462 identity_file = identity_file,
# Python 2 raise syntax; the message carries the captured stdout and stderr.
467 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
469 return (out,err),proc
# Read a pidfile on the remote host (via `cat` through rexec) and parse it
# into two integers.
# NOTE(review): parts of the signature and body are not visible in this
# view.  The docstring refers to remote_spawn / remote_status / remote_kill,
# while the functions visible in this file are named rspawn / rstatus.
472 def rcheck_pid(pidfile,
477 identity_file = None):
479 Check the pidfile of a process spawned with remote_spawn.
482 pidfile: the pidfile passed to remote_span
484 host/port/user/agent/identity_file: see rexec
488 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
489 or None if the pidfile isn't valid yet (maybe the process is still starting).
492 (out,err),proc = rexec(
493 "cat %(pidfile)s" % {
500 identity_file = identity_file
# Splits on the first space only.  Under Python 2 `map` returns a list, not
# the tuple the docstring mentions.
508 return map(int,out.strip().split(' ',1))
510 # Ignore, many ways to fail that don't matter that much
# Ask the remote host whether process `pid` is still listed by `ps`.
# NOTE(review): parts of the signature and body are not visible in this
# view; the notes below describe only the visible statements.
514 def rstatus(pid, ppid,
519 identity_file = None):
521 Check the status of a process spawned with remote_spawn.
524 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
526 host/port/user/agent/identity_file: see rexec
530 One of NOT_STARTED, RUNNING, FINISHED
# `grep -c` prints how many ps output lines mention the pid; the trailing
# `; true` makes the remote command exit 0 even when grep finds nothing.
533 (out,err),proc = rexec(
534 "ps --pid %(pid)d -o pid | grep -c %(pid)d ; true" % {
542 identity_file = identity_file
# A non-zero count means the process is still listed.
551 status = bool(int(out.strip()))
554 logging.warn("Error checking remote status:\n%s%s\n", out, err)
555 # Ignore, many ways to fail that don't matter that much
557 return RUNNING if status else FINISHED
# Remote kill helper: builds a shell script that signals a pid (and its
# process group / children) and runs it in the background through rexec.
# NOTE(review): the `def` line of this function is not visible in this view;
# the notes below describe only the visible statements.
567 identity_file = None,
570 Kill a process spawned with remote_spawn.
572 First tries a SIGTERM, and if the process does not end in 10 seconds,
576 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
578 sudo: whether the command was run with sudo - careful killing like this.
580 host/port/user/agent/identity_file: see rexec
584 Nothing, should have killed the process
# Shell substitution listing the direct children of `pid` (ps --ppid).
588 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
# Script outline: signal the process group (-pid) and the pid plus children,
# poll up to 30 times, then escalate to kill -9 if the pid is still listed.
# NOTE(review): `[ ... == '0' ]` uses `==`, which is a bash extension to
# `[`; confirm the remote shell that runs this script accepts it.
592 SUBKILL="%(subkill)s" ;
593 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
594 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
595 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
597 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
600 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
601 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
605 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
606 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
607 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
# The whole script is detached from the ssh session: all three standard
# streams go to /dev/null and it is backgrounded with `&`.
611 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
613 (out,err),proc = rexec(
617 'sudo' : 'sudo -S' if sudo else '',
624 identity_file = identity_file
627 # wait, don't leave zombies around
# select()-based replacement for Popen.communicate with an optional timeout.
# `self` is the process object (rexec passes a subprocess.Popen); `input` is
# written to its stdin while stdout/stderr are collected.
# Returns (stdout, stderr).  Raises RuntimeError when the process had to be
# killed and err_on_timeout is true.
# NOTE(review): many interior lines are not visible in this view; the notes
# below describe only the visible statements.
631 def _communicate(self, input, timeout=None, err_on_timeout=True):
634 stdout = None # Return
635 stderr = None # Return
# NOTE(review): killtime and bailtime are the same instant.  Because
# bailtime is tested first in the loop below, the SIGKILL branch looks
# unreachable as written - confirm the intended escalation schedule.
639 if timeout is not None:
640 timelimit = time.time() + timeout
641 killtime = timelimit + 4
642 bailtime = timelimit + 4
645 # Flush stdio buffer. This might block, if the user has
646 # been writing to .stdin in an uncontrolled fashion.
649 write_set.append(self.stdin)
653 read_set.append(self.stdout)
656 read_set.append(self.stderr)
# Main pump: runs until every pipe has been removed from both sets.
660 while read_set or write_set:
661 if timeout is not None:
662 curtime = time.time()
# NOTE(review): `curtime` is only assigned under `timeout is not None`; the
# `timeout is None or` test looks redundant if this line is nested under
# that check (nesting cannot be recovered from this view).
663 if timeout is None or curtime > timelimit:
664 if curtime > bailtime:
666 elif curtime > killtime:
667 signum = signal.SIGKILL
669 signum = signal.SIGTERM
671 os.kill(self.pid, signum)
674 select_timeout = timelimit - curtime + 0.1
678 if select_timeout > 1.0:
682 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
683 except select.error,e:
689 if not rlist and not wlist and not xlist and self.poll() is not None:
690 # timeout and process exited, say bye
693 if self.stdin in wlist:
694 # When select has indicated that the file is writable,
695 # we can write up to PIPE_BUF bytes without risk
696 # blocking. POSIX defines PIPE_BUF >= 512
# `buffer` is the Python 2 builtin; it gives a 512-byte window into `input`
# starting at input_offset without copying.
697 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
698 input_offset += bytes_written
699 if input_offset >= len(input):
701 write_set.remove(self.stdin)
# Output pipes are read with os.read in chunks of up to 1024 bytes.
703 if self.stdout in rlist:
704 data = os.read(self.stdout.fileno(), 1024)
707 read_set.remove(self.stdout)
710 if self.stderr in rlist:
711 data = os.read(self.stderr.fileno(), 1024)
714 read_set.remove(self.stderr)
717 # All data exchanged. Translate lists into strings.
718 if stdout is not None:
719 stdout = ''.join(stdout)
720 if stderr is not None:
721 stderr = ''.join(stderr)
723 # Translate newlines, if requested. We cannot let the file
724 # object do the translation: It is based on stdio, which is
725 # impossible to combine with select (unless forcing no
727 if self.universal_newlines and hasattr(file, 'newlines'):
729 stdout = self._translate_newlines(stdout)
731 stderr = self._translate_newlines(stderr)
# The RuntimeError carries a tuple: (message, exit code from poll(), stdout,
# stderr).  Python 2 raise syntax.
733 if killed and err_on_timeout:
734 errcode = self.poll()
735 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
741 return (stdout, stderr)