# Cached answer of openssh_has_persist(); None means "not probed yet".
OPENSSH_HAS_PERSIST = None

# File-name prefix used by make_control_path() for ssh ControlPath sockets.
CONTROL_PATH = "yyy_ssh_ctrl_path"

# Path of the null device. Prefer the value the os module reports and only
# fall back to the POSIX path when the attribute is unavailable.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Strings made exclusively of these characters are passed to the shell
# unescaped.
# NOTE(review): `$` also matches right before a trailing newline, so a
# string such as "abc\n" is considered safe; `\Z` would be stricter.
SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')

# NOTE(review): presumably a hostname -> address cache; its users are not
# visible in this part of the file.
hostbyname_cache = dict()
29 Special value that when given to rspawn in stderr causes stderr to
30 redirect to whatever stdout was redirected to.
35 Process is still running
45 Process hasn't started running yet (this should be very rare)
def openssh_has_persist():
    """Tell whether the local ssh client supports connection persistence.

    The ssh_config options ControlMaster and ControlPersist allow several
    ssh sessions to reuse the same network connection, which bypasses
    limits on the number of open ssh connections. Older versions of
    OpenSSH do not support this feature, so the version banner printed by
    ``ssh -v`` is inspected: OpenSSH 5.8 or newer is accepted.

    The probe runs at most once per process; the answer is cached in the
    module-level OPENSSH_HAS_PERSIST.

    Returns:
        True if the banner reports OpenSSH >= 5.8, False otherwise.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # Give ssh an empty stdin, and close the handle once ssh is done
        # instead of leaking it.
        with open("/dev/null", "r") as devnull:
            proc = subprocess.Popen(["ssh", "-v"],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT,
                                    stdin=devnull)
            # stderr is merged into stdout, so only `out` carries data.
            out, _ = proc.communicate()

        # Pipes may yield bytes; the pattern below is a text pattern.
        if not isinstance(out, str):
            out = out.decode("ascii", "replace")

        # The banner must be the very first thing printed (match, not
        # search): versions 6-9, 5.8/5.9, 5.10-5.99, or any two-digit major.
        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
70 """ Escapes strings so that they are safe to use as command-line
72 if SHELL_SAFE.match(s):
73 # safe string - no escaping needed
76 # unsafe string - escape
78 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
81 return "'$'\\x%02x''" % (ord(c),)
82 s = ''.join(map(escp,s))
def eintr_retry(func):
    """Wrap *func* so that calls interrupted by EINTR are retried.

    The returned wrapper calls ``func`` up to four times, swallowing only
    errors whose errno is EINTR, and then makes one final unguarded call
    whose outcome (result or exception) is passed straight to the caller.
    Any other error is re-raised immediately.

    The wrapper accepts one extra keyword argument, ``_retry``, which is
    removed before ``func`` is called. When it is true the guarded
    attempts are skipped and ``func`` is called exactly once.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*p, **kw):
        guarded_attempts = 0 if kw.pop("_retry", False) else 4
        for _ in range(guarded_attempts):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as exc:
                # Depending on the exception type and interpreter version
                # the error code lives in `.errno` or in `args[0]`.
                code = getattr(exc, "errno", None)
                if code is None and exc.args:
                    code = exc.args[0]
                if code != errno.EINTR:
                    raise
        # Guarded attempts exhausted (or disabled): last call, no safety net.
        return func(*p, **kw)

    return wrapper
def make_connkey(user, host, port):
    """Build a short, filename-safe key identifying a (user, host, port) triple.

    The key is the base64 encoding of ``repr((user, host, port))`` with
    every '/' replaced by '.', so it can be embedded in a file name. Keys
    longer than 60 characters are replaced by the SHA-1 hex digest of that
    encoded text.

    Returns:
        The key as a native string.
    """
    import base64

    raw = repr((user, host, port))
    if not isinstance(raw, bytes):
        raw = raw.encode("utf-8")

    # encodebytes (encodestring on old interpreters) produces the same
    # output as the legacy "base64" string codec, so keys computed here
    # match the ones that codec produced.
    encode = getattr(base64, "encodebytes", None) or base64.encodestring
    connkey = encode(raw).strip().replace(b"/", b".")

    if len(connkey) > 60:
        return hashlib.sha1(connkey).hexdigest()
    if not isinstance(connkey, str):
        connkey = connkey.decode("ascii")
    return connkey
def make_control_path(user, host, port):
    """Return the ssh ControlPath socket location for a connection.

    The path lives under /tmp and is named after the CONTROL_PATH prefix
    followed by the connection key of (user, host, port).
    """
    return '/tmp/%s_%s' % (CONTROL_PATH, make_connkey(user, host, port))
118 def rexec(command, host, user,
123 identity_file = None,
129 err_on_timeout = True,
130 connect_timeout = 30,
133 Executes a remote command, returns ((stdout,stderr),process)
136 # Don't bother with localhost. Makes test easier
137 '-o', 'NoHostAuthenticationForLocalhost=yes',
138 # XXX: Possible security issue
139 # Avoid interactive requests to accept new host keys
140 '-o', 'StrictHostKeyChecking=no',
141 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
142 '-o', 'ConnectionAttempts=3',
143 '-o', 'ServerAliveInterval=30',
144 '-o', 'TCPKeepAlive=yes',
147 if persistent and openssh_has_persist():
148 control_path = make_control_path(user, host, port)
150 '-o', 'ControlMaster=auto',
151 '-o', 'ControlPath=%s' % control_path,
152 '-o', 'ControlPersist=60' ])
156 args.append('-p%d' % port)
158 args.extend(('-i', identity_file))
168 for envkey, envval in env.iteritems():
169 export += '%s=%s ' % (envkey, envval)
170 command = export + command
173 command = "sudo " + command
177 for x in xrange(retry or 3):
178 # connects to the remote host and starts a remote connection
179 proc = subprocess.Popen(args,
180 stdout = subprocess.PIPE,
181 stdin = subprocess.PIPE,
182 stderr = subprocess.PIPE)
185 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
187 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
188 # SSH error, can safely retry
191 # Probably timed out or plain failed but can retry
194 except RuntimeError,e:
199 return ((out, err), proc)
201 def rcopy(source, dest,
205 identity_file = None):
207 Copies file from/to remote sites.
209 Source and destination should have the user and host encoded
212 If source is a file object, a special mode will be used to
213 create the remote file with the same contents.
215 If dest is a file object, the remote file (source) will be
216 read and written into dest.
218 In these modes, recursive cannot be True.
220 Source can be a list of files to copy to a single destination,
221 in which case it is advised that the destination be a folder.
224 if isinstance(source, file) and source.tell() == 0:
226 elif hasattr(source, 'read'):
227 tmp = tempfile.NamedTemporaryFile()
229 buf = source.read(65536)
237 if isinstance(source, file) or isinstance(dest, file) \
238 or hasattr(source, 'read') or hasattr(dest, 'write'):
241 # Parse source/destination as <user>@<server>:<path>
242 if isinstance(dest, basestring) and ':' in dest:
243 remspec, path = dest.split(':',1)
244 elif isinstance(source, basestring) and ':' in source:
245 remspec, path = source.split(':',1)
247 raise ValueError, "Both endpoints cannot be local"
248 user,host = remspec.rsplit('@',1)
249 tmp_known_hosts = None
251 args = ['ssh', '-l', user, '-C',
252 # Don't bother with localhost. Makes test easier
253 '-o', 'NoHostAuthenticationForLocalhost=yes',
254 # XXX: Possible security issue
255 # Avoid interactive requests to accept new host keys
256 '-o', 'StrictHostKeyChecking=no',
257 '-o', 'ConnectTimeout=30',
258 '-o', 'ConnectionAttempts=3',
259 '-o', 'ServerAliveInterval=30',
260 '-o', 'TCPKeepAlive=yes',
263 if openssh_has_persist():
264 control_path = make_control_path(user, host, port)
266 '-o', 'ControlMaster=auto',
267 '-o', 'ControlPath=%s' % control_path,
268 '-o', 'ControlPersist=60' ])
270 args.append('-P%d' % port)
272 args.extend(('-i', identity_file))
274 if isinstance(source, file) or hasattr(source, 'read'):
275 args.append('cat > %s' % (shell_escape(path),))
276 elif isinstance(dest, file) or hasattr(dest, 'write'):
277 args.append('cat %s' % (shell_escape(path),))
279 raise AssertionError, "Unreachable code reached! :-Q"
281 # connects to the remote host and starts a remote connection
282 if isinstance(source, file):
283 proc = subprocess.Popen(args,
284 stdout = open('/dev/null','w'),
285 stderr = subprocess.PIPE,
287 err = proc.stderr.read()
288 eintr_retry(proc.wait)()
289 return ((None,err), proc)
290 elif isinstance(dest, file):
291 proc = subprocess.Popen(args,
292 stdout = open('/dev/null','w'),
293 stderr = subprocess.PIPE,
295 err = proc.stderr.read()
296 eintr_retry(proc.wait)()
297 return ((None,err), proc)
298 elif hasattr(source, 'read'):
299 # file-like (but not file) source
300 proc = subprocess.Popen(args,
301 stdout = open('/dev/null','w'),
302 stderr = subprocess.PIPE,
303 stdin = subprocess.PIPE)
309 buf = source.read(4096)
314 rdrdy, wrdy, broken = select.select(
317 [proc.stderr,proc.stdin])
319 if proc.stderr in rdrdy:
320 # use os.read for fully unbuffered behavior
321 err.append(os.read(proc.stderr.fileno(), 4096))
323 if proc.stdin in wrdy:
324 proc.stdin.write(buf)
330 err.append(proc.stderr.read())
332 eintr_retry(proc.wait)()
333 return ((None,''.join(err)), proc)
334 elif hasattr(dest, 'write'):
335 # file-like (but not file) dest
336 proc = subprocess.Popen(args,
337 stdout = subprocess.PIPE,
338 stderr = subprocess.PIPE,
339 stdin = open('/dev/null','w'))
344 rdrdy, wrdy, broken = select.select(
345 [proc.stderr, proc.stdout],
347 [proc.stderr, proc.stdout])
349 if proc.stderr in rdrdy:
350 # use os.read for fully unbuffered behavior
351 err.append(os.read(proc.stderr.fileno(), 4096))
353 if proc.stdout in rdrdy:
354 # use os.read for fully unbuffered behavior
355 buf = os.read(proc.stdout.fileno(), 4096)
364 err.append(proc.stderr.read())
366 eintr_retry(proc.wait)()
367 return ((None,''.join(err)), proc)
369 raise AssertionError, "Unreachable code reached! :-Q"
371 # Parse destination as <user>@<server>:<path>
372 if isinstance(dest, basestring) and ':' in dest:
373 remspec, path = dest.split(':',1)
374 elif isinstance(source, basestring) and ':' in source:
375 remspec, path = source.split(':',1)
377 raise ValueError, "Both endpoints cannot be local"
378 user,host = remspec.rsplit('@',1)
381 args = ['scp', '-q', '-p', '-C',
382 # Don't bother with localhost. Makes test easier
383 '-o', 'NoHostAuthenticationForLocalhost=yes',
384 # XXX: Possible security issue
385 # Avoid interactive requests to accept new host keys
386 '-o', 'StrictHostKeyChecking=no',
387 '-o', 'ConnectTimeout=30',
388 '-o', 'ConnectionAttempts=3',
389 '-o', 'ServerAliveInterval=30',
390 '-o', 'TCPKeepAlive=yes' ]
393 args.append('-P%d' % port)
397 args.extend(('-i', identity_file))
399 if isinstance(source,list):
402 if openssh_has_persist():
403 control_path = make_control_path(user, host, port)
405 '-o', 'ControlMaster=no',
406 '-o', 'ControlPath=%s' % control_path ])
411 # connects to the remote host and starts a remote connection
412 proc = subprocess.Popen(args,
413 stdout = subprocess.PIPE,
414 stdin = subprocess.PIPE,
415 stderr = subprocess.PIPE)
417 comm = proc.communicate()
418 eintr_retry(proc.wait)()
421 def rspawn(command, pidfile,
422 stdout = '/dev/null',
432 identity_file = None,
435 Spawn a remote command such that it will continue working asynchronously.
438 command: the command to run - it should be a single line.
440 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
442 stdout: path of a file to redirect standard output to - must be a string.
443 Defaults to /dev/null
444 stderr: path of a file to redirect standard error to - string or the special STDOUT value
445 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
446 stdin: path of a file with input to be piped into the command's standard input
448 home: path of a folder to use as working directory - should exist, unless you specify create_home
450 create_home: if True, the home folder will be created first with mkdir -p
452 sudo: whether the command needs to be executed as root
454 host/port/user/agent/identity_file: see rexec
457 (stdout, stderr), process
459 Of the spawning process, which only captures errors at spawning time.
460 Usually only useful for diagnostics.
462 # Start process in a "daemonized" way, using nohup and heavy
463 # stdin/out redirection to avoid connection issues
467 stderr = ' ' + stderr
469 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
478 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
479 'command' : daemon_command,
481 'sudo' : 'sudo -S' if sudo else '',
484 'gohome' : 'cd %s ; ' % home if home else '',
485 'create' : 'mkdir -p %s ; ' % home if create_home else '',
488 (out,err), proc = rexec(
494 identity_file = identity_file,
499 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
501 return (out,err),proc
504 def rcheck_pid(pidfile,
509 identity_file = None):
511 Check the pidfile of a process spawned with remote_spawn.
514 pidfile: the pidfile passed to remote_spawn
516 host/port/user/agent/identity_file: see rexec
520 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
521 or None if the pidfile isn't valid yet (maybe the process is still starting).
524 (out,err),proc = rexec(
525 "cat %(pidfile)s" % {
532 identity_file = identity_file
540 return map(int,out.strip().split(' ',1))
542 # Ignore, many ways to fail that don't matter that much
546 def rstatus(pid, ppid,
551 identity_file = None):
553 Check the status of a process spawned with remote_spawn.
556 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
558 host/port/user/agent/identity_file: see rexec
562 One of NOT_STARTED, RUNNING, FINISHED
565 (out,err),proc = rexec(
566 "ps --pid %(pid)d -o pid | grep -c %(pid)d ; true" % {
574 identity_file = identity_file
583 status = bool(int(out.strip()))
586 logging.warn("Error checking remote status:\n%s%s\n", out, err)
587 # Ignore, many ways to fail that don't matter that much
589 return RUNNING if status else FINISHED
599 identity_file = None,
602 Kill a process spawned with remote_spawn.
604 First tries a SIGTERM, and if the process does not end in 10 seconds,
608 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
610 sudo: whether the command was run with sudo - careful killing like this.
612 host/port/user/agent/identity_file: see rexec
616 Nothing, should have killed the process
619 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
621 SUBKILL="%(subkill)s" ;
622 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
623 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
624 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
626 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
629 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
630 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
634 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
635 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
636 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
640 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
642 (out,err),proc = rexec(
646 'sudo' : 'sudo -S' if sudo else '',
653 identity_file = identity_file
656 # wait, don't leave zombies around
660 def _communicate(self, input, timeout=None, err_on_timeout=True):
663 stdout = None # Return
664 stderr = None # Return
668 if timeout is not None:
669 timelimit = time.time() + timeout
670 killtime = timelimit + 4
671 bailtime = timelimit + 4
674 # Flush stdio buffer. This might block, if the user has
675 # been writing to .stdin in an uncontrolled fashion.
678 write_set.append(self.stdin)
682 read_set.append(self.stdout)
685 read_set.append(self.stderr)
689 while read_set or write_set:
690 if timeout is not None:
691 curtime = time.time()
692 if timeout is None or curtime > timelimit:
693 if curtime > bailtime:
695 elif curtime > killtime:
696 signum = signal.SIGKILL
698 signum = signal.SIGTERM
700 os.kill(self.pid, signum)
703 select_timeout = timelimit - curtime + 0.1
707 if select_timeout > 1.0:
711 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
712 except select.error,e:
718 if not rlist and not wlist and not xlist and self.poll() is not None:
719 # timeout and process exited, say bye
722 if self.stdin in wlist:
723 # When select has indicated that the file is writable,
724 # we can write up to PIPE_BUF bytes without risk
725 # blocking. POSIX defines PIPE_BUF >= 512
726 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
727 input_offset += bytes_written
728 if input_offset >= len(input):
730 write_set.remove(self.stdin)
732 if self.stdout in rlist:
733 data = os.read(self.stdout.fileno(), 1024)
736 read_set.remove(self.stdout)
739 if self.stderr in rlist:
740 data = os.read(self.stderr.fileno(), 1024)
743 read_set.remove(self.stderr)
746 # All data exchanged. Translate lists into strings.
747 if stdout is not None:
748 stdout = ''.join(stdout)
749 if stderr is not None:
750 stderr = ''.join(stderr)
752 # Translate newlines, if requested. We cannot let the file
753 # object do the translation: It is based on stdio, which is
754 # impossible to combine with select (unless forcing no
756 if self.universal_newlines and hasattr(file, 'newlines'):
758 stdout = self._translate_newlines(stdout)
760 stderr = self._translate_newlines(stderr)
762 if killed and err_on_timeout:
763 errcode = self.poll()
764 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
770 return (stdout, stderr)