# Memoized answer of openssh_has_persist(); None means "not probed yet".
15 OPENSSH_HAS_PERSIST = None
# File-name prefix of the ssh ControlPath socket built by make_control_path().
16 CONTROL_PATH = "yyy_ssh_ctrl_path"
# Path of the null device.
# NOTE(review): the value assigned when os.devnull exists is not visible in
# this view; the literal below looks like the fallback branch - confirm.
18 if hasattr(os, "devnull"):
21 DEV_NULL = "/dev/null"
# Matches strings made only of characters that are passed to the shell
# without escaping.
23 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
# NOTE(review): presumably a cache of host name lookups; neither a reader nor
# a writer of this dict is visible here - confirm before relying on it.
25 hostbyname_cache = dict()
29 Special value that when given to rspawn in stderr causes stderr to
30 redirect to whatever stdout was redirected to.
35 Process is still running
45 Process hasn't started running yet (this should be very rare)
def openssh_has_persist():
    """Tell whether the local ssh client supports connection persistence.

    The ssh_config options ControlMaster and ControlPersist let several ssh
    sessions reuse one network connection, which works around limits on the
    number of open ssh connections.  Older OpenSSH releases do not support
    them, so the client is probed once and the answer is cached in the
    module-level OPENSSH_HAS_PERSIST.

    Returns:
        True when the output of ``ssh -v`` starts with an OpenSSH banner of
        version 5.8 or newer, False otherwise (including when no banner is
        recognised).

    Raises:
        OSError: if the ``ssh`` executable cannot be started.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # Open the null device in a ``with`` block so the descriptor is
        # closed as soon as the probe finishes instead of leaking.
        with open("/dev/null", "r") as devnull:
            proc = subprocess.Popen(["ssh", "-v"],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT,
                                    stdin=devnull)
            # stderr is merged into stdout, so only the first item matters;
            # communicate() also waits for the child to exit.
            out = proc.communicate()[0]

        # On Python 3 the pipe yields bytes while the pattern is text.
        if not isinstance(out, str):
            out = out.decode("utf-8", "replace")

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
70 """ Escapes strings so that they are safe to use as command-line
72 if SHELL_SAFE.match(s):
73 # safe string - no escaping needed
76 # unsafe string - escape
78 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
81 return "'$'\\x%02x''" % (ord(c),)
82 s = ''.join(map(escp,s))
def eintr_retry(func):
    """Wrap ``func`` so that calls interrupted by an EINTR are retried.

    The wrapper makes up to four attempts during which an EINTR failure is
    swallowed and the call repeated, followed by one final attempt whose
    exceptions (EINTR included) propagate to the caller.  Any other error
    is re-raised immediately.

    Args:
        func: the callable to protect.

    Returns:
        A wrapper accepting the same arguments as ``func`` plus an optional
        keyword ``_retry``; when it is true the guarded attempts are skipped
        and ``func`` is invoked exactly once, unprotected.
    """
    import functools

    @functools.wraps(func)
    def rv(*p, **kw):
        guarded_attempts = 0 if kw.pop("_retry", False) else 4
        for _ in range(guarded_attempts):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, OSError) as exc:
                # OSError-like exceptions expose ``errno``; the Python 2
                # select.error only carries it as its first argument.
                code = getattr(exc, "errno", None)
                if code is None and exc.args:
                    code = exc.args[0]
                if code != errno.EINTR:
                    raise
        # Guarded attempts exhausted (or disabled): last, unprotected call.
        return func(*p, **kw)
    return rv
def make_connkey(user, host, port):
    """Build a short, file-name friendly key for a (user, host, port) triple.

    The key is the base64 encoding of ``repr((user, host, port))`` with
    surrounding whitespace stripped and every '/' replaced by '.'.  When
    that text is longer than 60 characters its SHA-1 hex digest is returned
    instead.

    Args:
        user: login name of the connection.
        host: remote host name or address.
        port: remote port.

    Returns:
        The key as a native ``str``.
    """
    # The "base64" str codec only exists on Python 2; the base64 module
    # produces the same bytes on every version.
    import base64

    raw = repr((user, host, port))
    if not isinstance(raw, bytes):
        raw = raw.encode("utf-8")
    # encodestring (Python 2) and encodebytes (Python 3) use the same
    # line-wrapped format, so keys that end up hashed keep their value.
    encode = getattr(base64, "encodebytes", None) or base64.encodestring
    connkey = encode(raw).strip().replace(b'/', b'.')
    if len(connkey) > 60:
        return hashlib.sha1(connkey).hexdigest()
    if not isinstance(connkey, str):
        connkey = connkey.decode("ascii")
    return connkey
def make_control_path(user, host, port):
    """Return the ssh ControlPath socket location for a connection.

    The path lives under /tmp and is named after CONTROL_PATH followed by
    the key that make_connkey() derives from (user, host, port).
    """
    return '/tmp/%s_%s' % (CONTROL_PATH, make_connkey(user, host, port))
# rexec: run `command` on a remote host through the ssh client.
# From the visible lines: an ssh option list is built (host-key prompts
# disabled, connect timeout, keep-alives), connection sharing is enabled via
# ControlMaster/ControlPath when `persistent` is set and openssh_has_persist()
# is true, environment assignments and "sudo " may be prepended to the
# command, and ssh is spawned up to `retry or 3` times.
# Returns ((out, err), proc).
# NOTE(review): parts of the signature and of the body are not visible in
# this view; the notes below only cover what can be read here.
118 def rexec(command, host, user,
123 identity_file = None,
129 err_on_timeout = True,
130 connect_timeout = 30,
133 Executes a remote command, returns ((stdout,stderr),process)
136 # Don't bother with localhost. Makes test easier
137 '-o', 'NoHostAuthenticationForLocalhost=yes',
138 # XXX: Possible security issue
139 # Avoid interactive requests to accept new host keys
140 '-o', 'StrictHostKeyChecking=no',
141 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
142 '-o', 'ConnectionAttempts=3',
143 '-o', 'ServerAliveInterval=30',
144 '-o', 'TCPKeepAlive=yes',
# Share one master connection per (user, host, port) when the client can.
147 if persistent and openssh_has_persist():
148 control_path = make_control_path(user, host, port)
150 '-o', 'ControlMaster=auto',
151 '-o', 'ControlPath=%s' % control_path,
152 '-o', 'ControlPersist=60' ])
156 args.append('-p%d' % port)
158 args.extend(('-i', identity_file))
# NOTE(review): keys and values are concatenated as-is; a value containing
# whitespace or shell metacharacters is not quoted here - confirm that
# callers pre-escape them.
168 for envkey, envval in env.iteritems():
169 export += '%s=%s ' % (envkey, envval)
170 command = export + command
173 command = "sudo " + command
# A falsy `retry` (None or 0) falls back to three attempts.
177 for x in xrange(retry or 3):
178 # connects to the remote host and starts a remote connection
179 proc = subprocess.Popen(args,
180 stdout = subprocess.PIPE,
181 stdin = subprocess.PIPE,
182 stderr = subprocess.PIPE)
185 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
# stderr starting with one of these prefixes is treated as a failure of the
# ssh transport itself rather than of the remote command.
187 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
188 # SSH error, can safely retry
191 # Probably timed out or plain failed but can retry
# RuntimeError is what _communicate raises on timeout.
194 except RuntimeError,e:
199 return ((out, err), proc)
# rcopy: copy files from/to a remote site by running rsync over ssh.
# From the visible lines: the endpoint containing ':' is split into
# "<user>@<host>" and a path, one rsync command line is assembled as a single
# string (ssh options passed through -e '...'), then executed with
# subprocess.Popen and waited for.
# NOTE(review): parts of the signature (e.g. where `port` comes from) and the
# return statement are not visible in this view.
201 def rcopy(source, dest,
204 identity_file = None):
206 Copies file from/to remote sites.
208 Source and destination should have the user and host encoded
211 Source can be a list of files to copy to a single destination,
212 in which case it is advised that the destination be a folder.
215 # Parse destination as <user>@<server>:<path>
216 if isinstance(dest, basestring) and ':' in dest:
217 remspec, path = dest.split(':',1)
218 elif isinstance(source, basestring) and ':' in source:
219 remspec, path = source.split(':',1)
221 raise ValueError, "Both endpoints cannot be local"
# rsplit keeps any '@' inside the user part; a remspec without '@' makes
# this unpacking raise ValueError.
222 user, host = remspec.rsplit('@',1)
224 raw_string = r'''rsync -rlpcSz --timeout=900 '''
225 raw_string += r''' -e 'ssh -o BatchMode=yes '''
226 raw_string += r''' -o NoHostAuthenticationForLocalhost=yes '''
227 raw_string += r''' -o StrictHostKeyChecking=no '''
228 raw_string += r''' -o ConnectionAttempts=3 '''
230 if openssh_has_persist():
231 control_path = make_control_path(user, host, port)
232 raw_string += r''' -o ControlMaster=auto '''
233 raw_string += r''' -o ControlPath=%s ''' % control_path
236 raw_string += r''' -p %d ''' % port
239 raw_string += r''' -i "%s" ''' % identity_file
241 # closing -e 'ssh...'
242 raw_string += r''' ' '''
# NOTE(review): a list of sources is joined with spaces and left unquoted,
# while the other visible assignment wraps the source in double quotes.
244 if isinstance(source,list):
245 source = ' '.join(source)
247 source = '"%s"' % source
249 raw_string += r''' %s ''' % source
250 raw_string += r''' %s ''' % dest
252 # connects to the remote host and starts a remote connection
# NOTE(review): the command is passed as one string, so it is presumably run
# through a shell (the shell= argument is not visible here); source, dest and
# identity_file are interpolated without escaping - confirm that inputs are
# trusted.
253 proc = subprocess.Popen(raw_string,
255 stdout = subprocess.PIPE,
256 stdin = subprocess.PIPE,
257 stderr = subprocess.PIPE)
259 comm = proc.communicate()
260 eintr_retry(proc.wait)()
# rspawn: start `command` on the remote host so that it keeps running after
# the ssh session ends.
# From the visible lines: the command is wrapped in a backgrounded shell group
# with stdout/stderr/stdin redirected, the background pid is written to
# `pidfile`, and the whole thing is run through "nohup bash -c '...'" via
# rexec.  Raises RuntimeError("Failed to set up application: ...") and
# otherwise returns ((out, err), proc) of the spawning ssh process.
# NOTE(review): parts of the signature and body are not visible in this view.
263 def rspawn(command, pidfile,
264 stdout = '/dev/null',
274 identity_file = None,
277 Spawn a remote command such that it will continue working asynchronously.
280 command: the command to run - it should be a single line.
282 pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
284 stdout: path of a file to redirect standard output to - must be a string.
285 Defaults to /dev/null
286 stderr: path of a file to redirect standard error to - string or the special STDOUT value
287 to redirect to the same file stdout was redirected to. Defaults to STDOUT.
288 stdin: path of a file with input to be piped into the command's standard input
290 home: path of a folder to use as working directory - should exist, unless you specify create_home
292 create_home: if True, the home folder will be created first with mkdir -p
294 sudo: whether the command needs to be executed as root
296 host/port/user/agent/identity_file: see rexec
299 (stdout, stderr), process
301 Of the spawning process, which only captures errors at spawning time.
302 Usually only useful for diagnostics.
304 # Start process in a "daemonized" way, using nohup and heavy
305 # stdin/out redirection to avoid connection issues
309 stderr = ' ' + stderr
311 #XXX: ppid is always 1!!!
# The pidfile receives "<pid of the background job> 1": "$!" is the pid and
# the literal 1 stands in for the parent pid.
312 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
# NOTE(review): daemon_command is placed between single quotes for bash -c;
# no escaping of single quotes inside it is visible here - confirm.
321 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
322 'command' : daemon_command,
324 'sudo' : 'sudo -S' if sudo else '',
327 'gohome' : 'cd %s ; ' % home if home else '',
328 'create' : 'mkdir -p %s ; ' % home if create_home else '',
331 (out,err), proc = rexec(
337 identity_file = identity_file,
342 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
344 return (out,err),proc
# rcheck_pid: read a remote pidfile with "cat" through rexec and parse its
# content.
# NOTE(review): parts of the signature and the failure path are not visible
# in this view.
347 def rcheck_pid(pidfile,
352 identity_file = None):
354 Check the pidfile of a process spawned with remote_spawn.
357 pidfile: the pidfile passed to remote_span
359 host/port/user/agent/identity_file: see rexec
363 A (pid, ppid) tuple useful for calling remote_status and remote_kill,
364 or None if the pidfile isn't valid yet (maybe the process is still starting).
367 (out,err),proc = rexec(
368 "cat %(pidfile)s" % {
375 identity_file = identity_file
# The content is split on the first space and each part converted to int.
# On Python 2 map() returns a list; on Python 3 it would be a lazy map
# object.
383 return map(int,out.strip().split(' ',1))
385 # Ignore, many ways to fail that don't matter that much
# rstatus: ask the remote host, through rexec, whether process `pid` is
# still listed by ps, and map the answer to RUNNING or FINISHED.
# NOTE(review): parts of the signature and body are not visible in this view,
# including any path that would return NOT_STARTED.
389 def rstatus(pid, ppid,
394 identity_file = None):
396 Check the status of a process spawned with remote_spawn.
399 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
401 host/port/user/agent/identity_file: see rexec
405 One of NOT_STARTED, RUNNING, FINISHED
409 (out,err),proc = rexec(
# "grep -c" prints how many ps output lines contain the pid; the trailing
# "; true" makes the remote command line end with a zero exit status.
410 "ps --pid %(pid)d -o pid | grep -c %(pid)d ; true" % {
418 identity_file = identity_file
# A non-zero count means the process is still alive.
427 status = bool(int(out.strip()))
430 logging.warn("Error checking remote status:\n%s%s\n", out, err)
431 # Ignore, many ways to fail that don't matter that much
433 return RUNNING if status else FINISHED
443 identity_file = None,
446 Kill a process spawned with remote_spawn.
448 First tries a SIGTERM, and if the process does not end in 10 seconds,
452 pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
454 sudo: whether the command was run with sudo - careful killing like this.
456 host/port/user/agent/identity_file: see rexec
460 Nothing, should have killed the process
463 subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
465 SUBKILL="%(subkill)s" ;
466 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
467 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
468 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
470 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
473 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
474 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
478 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
479 %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
480 %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
484 cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
486 (out,err),proc = rexec(
490 'sudo' : 'sudo -S' if sudo else '',
497 identity_file = identity_file
500 # wait, don't leave zombies around
# _communicate: feed `input` to a child process and collect its stdout and
# stderr with a select() loop, enforcing an optional timeout.
# `self` is a subprocess.Popen-like object (rexec passes its `proc`).
# From the visible lines: once the time limit has passed the child is
# signalled with os.kill; RuntimeError("Operation timed out", errcode,
# stdout, stderr) is raised when the child was killed and err_on_timeout is
# true; otherwise (stdout, stderr) is returned.
# NOTE(review): many body lines are not visible in this view.
504 def _communicate(self, input, timeout=None, err_on_timeout=True):
507 stdout = None # Return
508 stderr = None # Return
512 if timeout is not None:
513 timelimit = time.time() + timeout
# NOTE(review): killtime and bailtime are both timelimit + 4, so the
# "elif curtime > killtime" test further down can never be true once
# "curtime > bailtime" has been found false; the SIGKILL assignment is
# therefore unreachable - killtime was probably meant to be earlier than
# bailtime.
514 killtime = timelimit + 4
515 bailtime = timelimit + 4
518 # Flush stdio buffer. This might block, if the user has
519 # been writing to .stdin in an uncontrolled fashion.
522 write_set.append(self.stdin)
526 read_set.append(self.stdout)
529 read_set.append(self.stderr)
533 while read_set or write_set:
534 if timeout is not None:
535 curtime = time.time()
# NOTE(review): if this test is nested under the "timeout is not None" check
# above (indentation is not visible here), its "timeout is None" operand is
# always false.
536 if timeout is None or curtime > timelimit:
537 if curtime > bailtime:
539 elif curtime > killtime:
540 signum = signal.SIGKILL
542 signum = signal.SIGTERM
544 os.kill(self.pid, signum)
547 select_timeout = timelimit - curtime + 0.1
551 if select_timeout > 1.0:
555 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
556 except select.error,e:
562 if not rlist and not wlist and not xlist and self.poll() is not None:
563 # timeout and process exited, say bye
566 if self.stdin in wlist:
567 # When select has indicated that the file is writable,
568 # we can write up to PIPE_BUF bytes without risk
569 # blocking. POSIX defines PIPE_BUF >= 512
# buffer() is the Python 2 builtin; here it exposes at most 512 bytes of
# `input` starting at input_offset without copying.
570 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
571 input_offset += bytes_written
572 if input_offset >= len(input):
574 write_set.remove(self.stdin)
576 if self.stdout in rlist:
577 data = os.read(self.stdout.fileno(), 1024)
580 read_set.remove(self.stdout)
583 if self.stderr in rlist:
584 data = os.read(self.stderr.fileno(), 1024)
587 read_set.remove(self.stderr)
590 # All data exchanged. Translate lists into strings.
591 if stdout is not None:
592 stdout = ''.join(stdout)
593 if stderr is not None:
594 stderr = ''.join(stderr)
596 # Translate newlines, if requested. We cannot let the file
597 # object do the translation: It is based on stdio, which is
598 # impossible to combine with select (unless forcing no
600 if self.universal_newlines and hasattr(file, 'newlines'):
602 stdout = self._translate_newlines(stdout)
604 stderr = self._translate_newlines(stderr)
# The exit status is read with poll(), so it may still be None at this point.
606 if killed and err_on_timeout:
607 errcode = self.poll()
608 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
614 return (stdout, stderr)