2 # -*- coding: utf-8 -*-
24 CTRL_SOCK = "ctrl.sock"
25 STD_ERR = "stderr.log"
32 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34 if hasattr(os, "devnull"):
37 DEV_NULL = "/dev/null"
39 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
42 """ Escapes strings so that they are safe to use as command-line arguments """
43 if SHELL_SAFE.match(s):
44 # safe string - no escaping needed
47 # unsafe string - escape
49 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
52 return "'$'\\x%02x''" % (ord(c),)
53 s = ''.join(map(escp,s))
56 def eintr_retry(func):
58 @functools.wraps(func)
60 retry = kw.pop("_retry", False)
61 for i in xrange(0 if retry else 4):
64 except (select.error, socket.error), args:
65 if args[0] == errno.EINTR:
70 if e.errno == errno.EINTR:
79 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
80 self._root_dir = root_dir
82 self._ctrl_sock = None
83 self._log_level = log_level
85 self._environment_setup = environment_setup
94 # can not return normally after fork beacuse no exec was done.
95 # This means that if we don't do a os._exit(0) here the code that
96 # follows the call to "Server.run()" in the "caller code" will be
97 # executed... but by now it has already been executed after the
98 # first process (the one that did the first fork) returned.
106 # pipes for process synchronization
110 root = os.path.normpath(self._root_dir)
111 if not os.path.exists(root):
112 os.makedirs(root, 0755)
120 except OSError, e: # pragma: no cover
121 if e.errno == errno.EINTR:
127 # os.waitpid avoids leaving a <defunc> (zombie) process
128 st = os.waitpid(pid1, 0)[1]
130 raise RuntimeError("Daemonization failed")
131 # return 0 to inform the caller method that this is not the
136 # Decouple from parent environment.
137 os.chdir(self._root_dir)
144 # see ref: "os._exit(0)"
147 # close all open file descriptors.
148 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
149 if (max_fd == resource.RLIM_INFINITY):
151 for fd in range(3, max_fd):
158 # Redirect standard file descriptors.
159 stdin = open(DEV_NULL, "r")
160 stderr = stdout = open(STD_ERR, "a", 0)
161 os.dup2(stdin.fileno(), sys.stdin.fileno())
162 # NOTE: sys.stdout.write will still be buffered, even if the file
163 # was opened with 0 buffer
164 os.dup2(stdout.fileno(), sys.stdout.fileno())
165 os.dup2(stderr.fileno(), sys.stderr.fileno())
168 if self._environment_setup:
169 # parse environment variables and pass to child process
170 # do it by executing shell commands, in case there's some heavy setup involved
171 envproc = subprocess.Popen(
173 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
174 ( self._environment_setup, ) ],
175 stdin = subprocess.PIPE,
176 stdout = subprocess.PIPE,
177 stderr = subprocess.PIPE
179 out,err = envproc.communicate()
181 # parse new environment
183 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
185 # apply to current environment
186 for name, value in environment.iteritems():
187 os.environ[name] = value
190 if 'PYTHONPATH' in environment:
191 sys.path = environment['PYTHONPATH'].split(':') + sys.path
193 # create control socket
194 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
195 self._ctrl_sock.bind(CTRL_SOCK)
196 self._ctrl_sock.listen(0)
198 # let the parent process know that the daemonization is finished
203 def post_daemonize(self):
204 # QT, for some strange reason, redefines the SIGCHILD handler to write
205 # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
206 # Server dameonization closes all file descriptors from fileno '3',
207 # but the overloaded handler (inherited by the forked process) will
208 # keep trying to write the \0 to fileno 'x', which might have been reused
209 # after closing, for other operations. This is bad bad bad when fileno 'x'
210 # is in use for communication pouroses, because unexpected \0 start
211 # appearing in the communication messages... this is exactly what happens
212 # when using netns in daemonized form. Thus, be have no other alternative than
213 # restoring the SIGCHLD handler to the default here.
215 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
218 while not self._stop:
219 conn, addr = self._ctrl_sock.accept()
221 while not self._stop:
223 msg = self.recv_msg(conn)
224 except socket.timeout, e:
230 reply = self.stop_action()
232 reply = self.reply_action(msg)
235 self.send_reply(conn, reply)
238 self.log_error("NOTICE: Awaiting for reconnection")
246 def recv_msg(self, conn):
249 while '\n' not in chunk:
251 chunk = conn.recv(1024)
252 except (OSError, socket.error), e:
253 if e[0] != errno.EINTR:
262 data = ''.join(data).split('\n',1)
265 data, self._rdbuf = data
267 decoded = base64.b64decode(data)
268 return decoded.rstrip()
270 def send_reply(self, conn, reply):
271 encoded = base64.b64encode(reply)
272 conn.send("%s\n" % encoded)
276 self._ctrl_sock.close()
281 def stop_action(self):
282 return "Stopping server"
284 def reply_action(self, msg):
285 return "Reply to: %s" % msg
287 def log_error(self, text = None, context = ''):
289 text = traceback.format_exc()
290 date = time.strftime("%Y-%m-%d %H:%M:%S")
292 context = " (%s)" % (context,)
293 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
296 def log_debug(self, text):
297 if self._log_level == DEBUG_LEVEL:
298 date = time.strftime("%Y-%m-%d %H:%M:%S")
299 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
301 class Forwarder(object):
302 def __init__(self, root_dir = "."):
303 self._ctrl_sock = None
304 self._root_dir = root_dir
310 print >>sys.stderr, "READY."
311 while not self._stop:
312 data = self.read_data()
313 self.send_to_server(data)
314 data = self.recv_from_server()
315 self.write_data(data)
319 return sys.stdin.readline()
321 def write_data(self, data):
322 sys.stdout.write(data)
323 # sys.stdout.write is buffered, this is why we need to do a flush()
326 def send_to_server(self, data):
328 self._ctrl_sock.send(data)
329 except (IOError, socket.error), e:
330 if e[0] == errno.EPIPE:
332 self._ctrl_sock.send(data)
335 encoded = data.rstrip()
336 msg = base64.b64decode(encoded)
340 def recv_from_server(self):
343 while '\n' not in chunk:
345 chunk = self._ctrl_sock.recv(1024)
346 except (OSError, socket.error), e:
347 if e[0] != errno.EINTR:
355 data = ''.join(data).split('\n',1)
358 data, self._rdbuf = data
364 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
365 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
366 self._ctrl_sock.connect(sock_addr)
368 def disconnect(self):
370 self._ctrl_sock.close()
374 class Client(object):
375 def __init__(self, root_dir = ".", host = None, port = None, user = None,
376 agent = None, environment_setup = ""):
377 self.root_dir = root_dir
378 self.addr = (host, port)
381 self.environment_setup = environment_setup
382 self._stopped = False
383 self._deferreds = collections.deque()
387 if self._process.poll() is None:
388 os.kill(self._process.pid, signal.SIGTERM)
392 root_dir = self.root_dir
393 (host, port) = self.addr
397 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
398 c.forward()" % (root_dir,)
400 self._process = popen_ssh_subprocess(python_code, host, port,
402 environment_setup = self.environment_setup)
403 # popen_ssh_subprocess already waits for readiness
404 if self._process.poll():
405 err = proc.stderr.read()
406 raise RuntimeError("Client could not be reached: %s" % \
409 self._process = subprocess.Popen(
410 ["python", "-c", python_code],
411 stdin = subprocess.PIPE,
412 stdout = subprocess.PIPE,
413 stderr = subprocess.PIPE
416 # Wait for the forwarder to be ready, otherwise nobody
417 # will be able to connect to it
418 helo = self._process.stderr.readline()
419 if helo != 'READY.\n':
420 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
421 helo + self._process.stderr.read())
423 def send_msg(self, msg):
424 encoded = base64.b64encode(msg)
425 data = "%s\n" % encoded
428 self._process.stdin.write(data)
429 except (IOError, ValueError):
430 # dead process, poll it to un-zombify
433 # try again after reconnect
434 # If it fails again, though, give up
436 self._process.stdin.write(data)
439 self.send_msg(STOP_MSG)
442 def defer_reply(self, transform=None):
444 self._deferreds.append(defer_entry)
446 functools.partial(self.read_reply, defer_entry, transform)
449 def _read_reply(self):
450 data = self._process.stdout.readline()
451 encoded = data.rstrip()
453 # empty == eof == dead process, poll it to un-zombify
456 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
457 return base64.b64decode(encoded)
459 def read_reply(self, which=None, transform=None):
460 # Test to see if someone did it already
461 if which is not None and len(which):
463 # ...just return the deferred value
465 return transform(which[0])
469 # Process all deferreds until the one we're looking for
470 # or until the queue is empty
471 while self._deferreds:
473 deferred = self._deferreds.popleft()
478 deferred.append(self._read_reply())
479 if deferred is which:
480 # We reached the one we were looking for
482 return transform(deferred[0])
487 # They've requested a synchronous read
489 return transform(self._read_reply())
491 return self._read_reply()
493 def _make_server_key_args(server_key, host, port, args):
495 Returns a reference to the created temporary file, and adds the
496 corresponding arguments to the given argument list.
498 Make sure to hold onto it until the process is done with the file
501 host = '%s:%s' % (host,port)
502 # Create a temporary server key file
503 tmp_known_hosts = tempfile.NamedTemporaryFile()
505 # Add the intended host key
506 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
508 # If we're not in strict mode, add user-configured keys
509 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
510 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
511 if os.access(user_hosts_path, os.R_OK):
512 f = open(user_hosts_path, "r")
513 tmp_known_hosts.write(f.read())
516 tmp_known_hosts.flush()
518 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
520 return tmp_known_hosts
522 def popen_ssh_command(command, host, port, user, agent,
529 err_on_timeout = True):
531 Executes a remote commands, returns ((stdout,stderr),process)
534 print "ssh", host, command
536 tmp_known_hosts = None
538 # Don't bother with localhost. Makes test easier
539 '-o', 'NoHostAuthenticationForLocalhost=yes',
544 args.append('-p%d' % port)
546 args.extend(('-i', ident_key))
550 # Create a temporary server key file
551 tmp_known_hosts = _make_server_key_args(
552 server_key, host, port, args)
556 # connects to the remote host and starts a remote connection
557 proc = subprocess.Popen(args,
558 stdout = subprocess.PIPE,
559 stdin = subprocess.PIPE,
560 stderr = subprocess.PIPE)
562 # attach tempfile object to the process, to make sure the file stays
563 # alive until the process is finished with it
564 proc._known_hosts = tmp_known_hosts
567 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
569 except RuntimeError,e:
573 print " timedout -> ", e.args
577 print " -> ", out, err
579 return ((out, err), proc)
581 def popen_scp(source, dest,
588 Copies from/to remote sites.
590 Source and destination should have the user and host encoded
593 If source is a file object, a special mode will be used to
594 create the remote file with the same contents.
596 If dest is a file object, the remote file (source) will be
597 read and written into dest.
599 In these modes, recursive cannot be True.
601 Source can be a list of files to copy to a single destination,
602 in which case it is advised that the destination be a folder.
606 print "scp", source, dest
608 if isinstance(source, file) and source.tell() == 0:
610 elif hasattr(source, 'read'):
611 tmp = tempfile.NamedTemporaryFile()
613 buf = source.read(65536)
621 if isinstance(source, file) or isinstance(dest, file) \
622 or hasattr(source, 'read') or hasattr(dest, 'write'):
625 # Parse source/destination as <user>@<server>:<path>
626 if isinstance(dest, basestring) and ':' in dest:
627 remspec, path = dest.split(':',1)
628 elif isinstance(source, basestring) and ':' in source:
629 remspec, path = source.split(':',1)
631 raise ValueError, "Both endpoints cannot be local"
632 user,host = remspec.rsplit('@',1)
633 tmp_known_hosts = None
635 args = ['ssh', '-l', user, '-C',
636 # Don't bother with localhost. Makes test easier
637 '-o', 'NoHostAuthenticationForLocalhost=yes',
640 args.append('-P%d' % port)
642 args.extend(('-i', ident_key))
644 # Create a temporary server key file
645 tmp_known_hosts = _make_server_key_args(
646 server_key, host, port, args)
648 if isinstance(source, file) or hasattr(source, 'read'):
649 args.append('cat > %s' % (shell_escape(path),))
650 elif isinstance(dest, file) or hasattr(dest, 'write'):
651 args.append('cat %s' % (shell_escape(path),))
653 raise AssertionError, "Unreachable code reached! :-Q"
655 # connects to the remote host and starts a remote connection
656 if isinstance(source, file):
657 proc = subprocess.Popen(args,
658 stdout = open('/dev/null','w'),
659 stderr = subprocess.PIPE,
661 err = proc.stderr.read()
662 proc._known_hosts = tmp_known_hosts
663 eintr_retry(proc.wait)()
664 return ((None,err), proc)
665 elif isinstance(dest, file):
666 proc = subprocess.Popen(args,
667 stdout = open('/dev/null','w'),
668 stderr = subprocess.PIPE,
670 err = proc.stderr.read()
671 proc._known_hosts = tmp_known_hosts
672 eintr_retry(proc.wait)()
673 return ((None,err), proc)
674 elif hasattr(source, 'read'):
675 # file-like (but not file) source
676 proc = subprocess.Popen(args,
677 stdout = open('/dev/null','w'),
678 stderr = subprocess.PIPE,
679 stdin = subprocess.PIPE)
685 buf = source.read(4096)
690 rdrdy, wrdy, broken = select.select(
693 [proc.stderr,proc.stdin])
695 if proc.stderr in rdrdy:
696 # use os.read for fully unbuffered behavior
697 err.append(os.read(proc.stderr.fileno(), 4096))
699 if proc.stdin in wrdy:
700 proc.stdin.write(buf)
706 err.append(proc.stderr.read())
708 proc._known_hosts = tmp_known_hosts
709 eintr_retry(proc.wait)()
710 return ((None,''.join(err)), proc)
711 elif hasattr(dest, 'write'):
712 # file-like (but not file) dest
713 proc = subprocess.Popen(args,
714 stdout = subprocess.PIPE,
715 stderr = subprocess.PIPE,
716 stdin = open('/dev/null','w'))
721 rdrdy, wrdy, broken = select.select(
722 [proc.stderr, proc.stdout],
724 [proc.stderr, proc.stdout])
726 if proc.stderr in rdrdy:
727 # use os.read for fully unbuffered behavior
728 err.append(os.read(proc.stderr.fileno(), 4096))
730 if proc.stdout in rdrdy:
731 # use os.read for fully unbuffered behavior
732 buf = os.read(proc.stdout.fileno(), 4096)
741 err.append(proc.stderr.read())
743 proc._known_hosts = tmp_known_hosts
744 eintr_retry(proc.wait)()
745 return ((None,''.join(err)), proc)
747 raise AssertionError, "Unreachable code reached! :-Q"
749 # Parse destination as <user>@<server>:<path>
750 if isinstance(dest, basestring) and ':' in dest:
751 remspec, path = dest.split(':',1)
752 elif isinstance(source, basestring) and ':' in source:
753 remspec, path = source.split(':',1)
755 raise ValueError, "Both endpoints cannot be local"
756 user,host = remspec.rsplit('@',1)
759 tmp_known_hosts = None
760 args = ['scp', '-q', '-p', '-C',
761 # Don't bother with localhost. Makes test easier
762 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
764 args.append('-P%d' % port)
768 args.extend(('-i', ident_key))
770 # Create a temporary server key file
771 tmp_known_hosts = _make_server_key_args(
772 server_key, host, port, args)
773 if isinstance(source,list):
779 # connects to the remote host and starts a remote connection
780 proc = subprocess.Popen(args,
781 stdout = subprocess.PIPE,
782 stdin = subprocess.PIPE,
783 stderr = subprocess.PIPE)
784 proc._known_hosts = tmp_known_hosts
786 comm = proc.communicate()
787 eintr_retry(proc.wait)()
790 def popen_ssh_subprocess(python_code, host, port, user, agent,
795 environment_setup = "",
796 waitcommand = False):
799 python_path.replace("'", r"'\''")
800 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
802 if environment_setup:
803 cmd += environment_setup
805 # Uncomment for debug (to run everything under strace)
806 # We had to verify if strace works (cannot nest them)
807 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
809 #cmd += "strace -f -tt -s 200 -o strace$$.out "
811 cmd += "import base64, os\n"
812 cmd += "cmd = \"\"\n"
813 cmd += "while True:\n"
814 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
815 cmd += " if cmd[-1] == \"\\n\": break\n"
816 cmd += "cmd = base64.b64decode(cmd)\n"
817 # Uncomment for debug
818 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
820 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
823 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
826 tmp_known_hosts = None
828 # Don't bother with localhost. Makes test easier
829 '-o', 'NoHostAuthenticationForLocalhost=yes',
834 args.append('-p%d' % port)
836 args.extend(('-i', ident_key))
840 # Create a temporary server key file
841 tmp_known_hosts = _make_server_key_args(
842 server_key, host, port, args)
845 # connects to the remote host and starts a remote rpyc connection
846 proc = subprocess.Popen(args,
847 stdout = subprocess.PIPE,
848 stdin = subprocess.PIPE,
849 stderr = subprocess.PIPE)
850 proc._known_hosts = tmp_known_hosts
852 # send the command to execute
853 os.write(proc.stdin.fileno(),
854 base64.b64encode(python_code) + "\n")
855 msg = os.read(proc.stdout.fileno(), 3)
857 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
858 msg, proc.stdout.read(), proc.stderr.read())
863 def _communicate(self, input, timeout=None, err_on_timeout=True):
866 stdout = None # Return
867 stderr = None # Return
871 if timeout is not None:
872 timelimit = time.time() + timeout
873 killtime = timelimit + 4
874 bailtime = timelimit + 4
877 # Flush stdio buffer. This might block, if the user has
878 # been writing to .stdin in an uncontrolled fashion.
881 write_set.append(self.stdin)
885 read_set.append(self.stdout)
888 read_set.append(self.stderr)
892 while read_set or write_set:
893 if timeout is not None:
894 curtime = time.time()
895 if timeout is None or curtime > timelimit:
896 if curtime > bailtime:
898 elif curtime > killtime:
899 signum = signal.SIGKILL
901 signum = signal.SIGTERM
903 os.kill(self.pid, signum)
906 select_timeout = timelimit - curtime + 0.1
908 select_timeout = None
911 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
912 except select.error,e:
918 if self.stdin in wlist:
919 # When select has indicated that the file is writable,
920 # we can write up to PIPE_BUF bytes without risk
921 # blocking. POSIX defines PIPE_BUF >= 512
922 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
923 input_offset += bytes_written
924 if input_offset >= len(input):
926 write_set.remove(self.stdin)
928 if self.stdout in rlist:
929 data = os.read(self.stdout.fileno(), 1024)
932 read_set.remove(self.stdout)
935 if self.stderr in rlist:
936 data = os.read(self.stderr.fileno(), 1024)
939 read_set.remove(self.stderr)
942 # All data exchanged. Translate lists into strings.
943 if stdout is not None:
944 stdout = ''.join(stdout)
945 if stderr is not None:
946 stderr = ''.join(stderr)
948 # Translate newlines, if requested. We cannot let the file
949 # object do the translation: It is based on stdio, which is
950 # impossible to combine with select (unless forcing no
952 if self.universal_newlines and hasattr(file, 'newlines'):
954 stdout = self._translate_newlines(stdout)
956 stderr = self._translate_newlines(stderr)
958 if killed and err_on_timeout:
959 errcode = self.poll()
960 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
966 return (stdout, stderr)