2 # -*- coding: utf-8 -*-
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33 if hasattr(os, "devnull"):
36 DEV_NULL = "/dev/null"
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41 """ Escapes strings so that they are safe to use as command-line arguments """
42 if SHELL_SAFE.match(s):
43 # safe string - no escaping needed
46 # unsafe string - escape
48 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
51 return "'$'\\x%02x''" % (ord(c),)
52 s = ''.join(map(escp,s))
55 def eintr_retry(func):
57 @functools.wraps(func)
59 retry = kw.pop("_retry", False)
60 for i in xrange(0 if retry else 4):
63 except (select.error, socket.error), args:
64 if args[0] == errno.EINTR:
69 if e.errno == errno.EINTR:
78 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
79 self._root_dir = root_dir
81 self._ctrl_sock = None
82 self._log_level = log_level
84 self._environment_setup = environment_setup
93 # cannot return normally after fork because no exec was done.
94 # This means that if we don't do an os._exit(0) here the code that
95 # follows the call to "Server.run()" in the "caller code" will be
96 # executed... but by now it has already been executed after the
97 # first process (the one that did the first fork) returned.
105 # pipes for process synchronization
109 root = os.path.normpath(self._root_dir)
110 if not os.path.exists(root):
111 os.makedirs(root, 0755)
119 except OSError, e: # pragma: no cover
120 if e.errno == errno.EINTR:
126 # os.waitpid avoids leaving a <defunct> (zombie) process
127 st = os.waitpid(pid1, 0)[1]
129 raise RuntimeError("Daemonization failed")
130 # return 0 to inform the caller method that this is not the
135 # Decouple from parent environment.
136 os.chdir(self._root_dir)
143 # see ref: "os._exit(0)"
146 # close all open file descriptors.
147 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
148 if (max_fd == resource.RLIM_INFINITY):
150 for fd in range(3, max_fd):
157 # Redirect standard file descriptors.
158 stdin = open(DEV_NULL, "r")
159 stderr = stdout = open(STD_ERR, "a", 0)
160 os.dup2(stdin.fileno(), sys.stdin.fileno())
161 # NOTE: sys.stdout.write will still be buffered, even if the file
162 # was opened with 0 buffer
163 os.dup2(stdout.fileno(), sys.stdout.fileno())
164 os.dup2(stderr.fileno(), sys.stderr.fileno())
167 if self._environment_setup:
168 # parse environment variables and pass to child process
169 # do it by executing shell commands, in case there's some heavy setup involved
170 envproc = subprocess.Popen(
172 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
173 ( self._environment_setup, ) ],
174 stdin = subprocess.PIPE,
175 stdout = subprocess.PIPE,
176 stderr = subprocess.PIPE
178 out,err = envproc.communicate()
180 # parse new environment
182 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
184 # apply to current environment
185 for name, value in environment.iteritems():
186 os.environ[name] = value
189 if 'PYTHONPATH' in environment:
190 sys.path = environment['PYTHONPATH'].split(':') + sys.path
192 # create control socket
193 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
194 self._ctrl_sock.bind(CTRL_SOCK)
195 self._ctrl_sock.listen(0)
197 # let the parent process know that the daemonization is finished
202 def post_daemonize(self):
203 # QT, for some strange reason, redefines the SIGCHLD handler to write
204 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
205 # Server daemonization closes all file descriptors from fileno '3',
206 # but the overloaded handler (inherited by the forked process) will
207 # keep trying to write the \0 to fileno 'x', which might have been reused
208 # after closing, for other operations. This is bad bad bad when fileno 'x'
209 # is in use for communication purposes, because unexpected \0 bytes start
210 # appearing in the communication messages... this is exactly what happens
211 # when using netns in daemonized form. Thus, we have no other alternative than
212 # restoring the SIGCHLD handler to the default here.
214 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
217 while not self._stop:
218 conn, addr = self._ctrl_sock.accept()
220 while not self._stop:
222 msg = self.recv_msg(conn)
223 except socket.timeout, e:
229 reply = self.stop_action()
231 reply = self.reply_action(msg)
234 self.send_reply(conn, reply)
237 self.log_error("NOTICE: Awaiting for reconnection")
245 def recv_msg(self, conn):
248 while '\n' not in chunk:
250 chunk = conn.recv(1024)
251 except (OSError, socket.error), e:
252 if e[0] != errno.EINTR:
261 data = ''.join(data).split('\n',1)
264 data, self._rdbuf = data
266 decoded = base64.b64decode(data)
267 return decoded.rstrip()
# Send `reply` to the peer on the connection `conn`.
# The payload is base64-encoded and terminated with a single newline, which
# mirrors recv_msg: that side splits the stream on '\n' and base64-decodes
# each message.
# NOTE(review): the return value of conn.send() is ignored, so a partial
# send would not be retried -- confirm whether sendall() is wanted here.
269 def send_reply(self, conn, reply):
270 encoded = base64.b64encode(reply)
271 conn.send("%s\n" % encoded)
275 self._ctrl_sock.close()
# Called from the serving loop; the returned string is what gets sent back
# to the peer through send_reply. This implementation always returns the
# same fixed status text.
280 def stop_action(self):
281 return "Stopping server"
# Called from the serving loop with the received message `msg`; the returned
# string is what gets sent back to the peer through send_reply. This
# implementation simply echoes the message prefixed with "Reply to: ".
283 def reply_action(self, msg):
284 return "Reply to: %s" % msg
286 def log_error(self, text = None, context = ''):
288 text = traceback.format_exc()
289 date = time.strftime("%Y-%m-%d %H:%M:%S")
291 context = " (%s)" % (context,)
292 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
# Write `text` to sys.stderr as a "DEBUG" record preceded by a
# "YYYY-MM-DD HH:MM:SS" timestamp. The check below compares the instance's
# configured log level against DEBUG_LEVEL.
# NOTE(review): indentation is not visible in this view; presumably both
# statements after the `if` are guarded by it -- confirm against the file.
295 def log_debug(self, text):
296 if self._log_level == DEBUG_LEVEL:
297 date = time.strftime("%Y-%m-%d %H:%M:%S")
298 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
300 class Forwarder(object):
301 def __init__(self, root_dir = "."):
302 self._ctrl_sock = None
303 self._root_dir = root_dir
309 print >>sys.stderr, "READY."
310 while not self._stop:
311 data = self.read_data()
312 self.send_to_server(data)
313 data = self.recv_from_server()
314 self.write_data(data)
318 return sys.stdin.readline()
320 def write_data(self, data):
321 sys.stdout.write(data)
322 # sys.stdout.write is buffered, this is why we need to do a flush()
325 def send_to_server(self, data):
327 self._ctrl_sock.send(data)
328 except (IOError, socket.error), e:
329 if e[0] == errno.EPIPE:
331 self._ctrl_sock.send(data)
334 encoded = data.rstrip()
335 msg = base64.b64decode(encoded)
339 def recv_from_server(self):
342 while '\n' not in chunk:
344 chunk = self._ctrl_sock.recv(1024)
345 except (OSError, socket.error), e:
346 if e[0] != errno.EINTR:
354 data = ''.join(data).split('\n',1)
357 data, self._rdbuf = data
363 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
364 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
365 self._ctrl_sock.connect(sock_addr)
367 def disconnect(self):
369 self._ctrl_sock.close()
373 class Client(object):
374 def __init__(self, root_dir = ".", host = None, port = None, user = None,
375 agent = None, environment_setup = ""):
376 self.root_dir = root_dir
377 self.addr = (host, port)
380 self.environment_setup = environment_setup
381 self._stopped = False
382 self._deferreds = collections.deque()
386 if self._process.poll() is None:
387 os.kill(self._process.pid, signal.SIGTERM)
391 root_dir = self.root_dir
392 (host, port) = self.addr
396 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
397 c.forward()" % (root_dir,)
399 self._process = popen_ssh_subprocess(python_code, host, port,
401 environment_setup = self.environment_setup)
402 # popen_ssh_subprocess already waits for readiness
403 if self._process.poll():
404 err = proc.stderr.read()
405 raise RuntimeError("Client could not be reached: %s" % \
408 self._process = subprocess.Popen(
409 ["python", "-c", python_code],
410 stdin = subprocess.PIPE,
411 stdout = subprocess.PIPE,
412 stderr = subprocess.PIPE
415 # Wait for the forwarder to be ready, otherwise nobody
416 # will be able to connect to it
417 helo = self._process.stderr.readline()
418 if helo != 'READY.\n':
419 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
420 helo + self._process.stderr.read())
422 def send_msg(self, msg):
423 encoded = base64.b64encode(msg)
424 data = "%s\n" % encoded
427 self._process.stdin.write(data)
428 except (IOError, ValueError):
429 # dead process, poll it to un-zombify
432 # try again after reconnect
433 # If it fails again, though, give up
435 self._process.stdin.write(data)
438 self.send_msg(STOP_MSG)
441 def defer_reply(self, transform=None):
443 self._deferreds.append(defer_entry)
445 functools.partial(self.read_reply, defer_entry, transform)
448 def _read_reply(self):
449 data = self._process.stdout.readline()
450 encoded = data.rstrip()
452 # empty == eof == dead process, poll it to un-zombify
455 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
456 return base64.b64decode(encoded)
458 def read_reply(self, which=None, transform=None):
459 # Test to see if someone did it already
460 if which is not None and len(which):
462 # ...just return the deferred value
464 return transform(which[0])
468 # Process all deferreds until the one we're looking for
469 # or until the queue is empty
470 while self._deferreds:
472 deferred = self._deferreds.popleft()
477 deferred.append(self._read_reply())
478 if deferred is which:
479 # We reached the one we were looking for
481 return transform(deferred[0])
486 # They've requested a synchronous read
488 return transform(self._read_reply())
490 return self._read_reply()
492 def _make_server_key_args(server_key, host, port, args):
494 Returns a reference to the created temporary file, and adds the
495 corresponding arguments to the given argument list.
497 Make sure to hold onto it until the process is done with the file
500 host = '%s:%s' % (host,port)
501 # Create a temporary server key file
502 tmp_known_hosts = tempfile.NamedTemporaryFile()
504 # Add the intended host key
505 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
507 # If we're not in strict mode, add user-configured keys
508 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
509 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
510 if os.access(user_hosts_path, os.R_OK):
511 f = open(user_hosts_path, "r")
512 tmp_known_hosts.write(f.read())
515 tmp_known_hosts.flush()
517 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
519 return tmp_known_hosts
521 def popen_ssh_command(command, host, port, user, agent,
527 Executes a remote command, returns ((stdout,stderr),process)
530 print "ssh", host, command
532 tmp_known_hosts = None
534 # Don't bother with localhost. Makes test easier
535 '-o', 'NoHostAuthenticationForLocalhost=yes',
540 args.append('-p%d' % port)
542 args.extend(('-i', ident_key))
546 # Create a temporary server key file
547 tmp_known_hosts = _make_server_key_args(
548 server_key, host, port, args)
551 # connects to the remote host and starts a remote connection
552 proc = subprocess.Popen(args,
553 stdout = subprocess.PIPE,
554 stdin = subprocess.PIPE,
555 stderr = subprocess.PIPE)
557 # attach tempfile object to the process, to make sure the file stays
558 # alive until the process is finished with it
559 proc._known_hosts = tmp_known_hosts
561 out, err = proc.communicate(stdin)
563 print " -> ", out, err
565 return ((out, err), proc)
567 def popen_scp(source, dest,
574 Copies from/to remote sites.
576 Source and destination should have the user and host encoded
579 If source is a file object, a special mode will be used to
580 create the remote file with the same contents.
582 If dest is a file object, the remote file (source) will be
583 read and written into dest.
585 In these modes, recursive cannot be True.
587 Source can be a list of files to copy to a single destination,
588 in which case it is advised that the destination be a folder.
592 print "scp", source, dest
594 if isinstance(source, file) and source.tell() == 0:
596 elif hasattr(source, 'read'):
597 tmp = tempfile.NamedTemporaryFile()
599 buf = source.read(65536)
607 if isinstance(source, file) or isinstance(dest, file) \
608 or hasattr(source, 'read') or hasattr(dest, 'write'):
611 # Parse source/destination as <user>@<server>:<path>
612 if isinstance(dest, basestring) and ':' in dest:
613 remspec, path = dest.split(':',1)
614 elif isinstance(source, basestring) and ':' in source:
615 remspec, path = source.split(':',1)
617 raise ValueError, "Both endpoints cannot be local"
618 user,host = remspec.rsplit('@',1)
619 tmp_known_hosts = None
621 args = ['ssh', '-l', user, '-C',
622 # Don't bother with localhost. Makes test easier
623 '-o', 'NoHostAuthenticationForLocalhost=yes',
626 args.append('-P%d' % port)
628 args.extend(('-i', ident_key))
630 # Create a temporary server key file
631 tmp_known_hosts = _make_server_key_args(
632 server_key, host, port, args)
634 if isinstance(source, file) or hasattr(source, 'read'):
635 args.append('cat > %s' % (shell_escape(path),))
636 elif isinstance(dest, file) or hasattr(dest, 'write'):
637 args.append('cat %s' % (shell_escape(path),))
639 raise AssertionError, "Unreachable code reached! :-Q"
641 # connects to the remote host and starts a remote connection
642 if isinstance(source, file):
643 proc = subprocess.Popen(args,
644 stdout = open('/dev/null','w'),
645 stderr = subprocess.PIPE,
647 err = proc.stderr.read()
648 proc._known_hosts = tmp_known_hosts
649 eintr_retry(proc.wait)()
650 return ((None,err), proc)
651 elif isinstance(dest, file):
652 proc = subprocess.Popen(args,
653 stdout = open('/dev/null','w'),
654 stderr = subprocess.PIPE,
656 err = proc.stderr.read()
657 proc._known_hosts = tmp_known_hosts
658 eintr_retry(proc.wait)()
659 return ((None,err), proc)
660 elif hasattr(source, 'read'):
661 # file-like (but not file) source
662 proc = subprocess.Popen(args,
663 stdout = open('/dev/null','w'),
664 stderr = subprocess.PIPE,
665 stdin = subprocess.PIPE)
671 buf = source.read(4096)
676 rdrdy, wrdy, broken = select.select(
679 [proc.stderr,proc.stdin])
681 if proc.stderr in rdrdy:
682 # use os.read for fully unbuffered behavior
683 err.append(os.read(proc.stderr.fileno(), 4096))
685 if proc.stdin in wrdy:
686 proc.stdin.write(buf)
692 err.append(proc.stderr.read())
694 proc._known_hosts = tmp_known_hosts
695 eintr_retry(proc.wait)()
696 return ((None,''.join(err)), proc)
697 elif hasattr(dest, 'write'):
698 # file-like (but not file) dest
699 proc = subprocess.Popen(args,
700 stdout = subprocess.PIPE,
701 stderr = subprocess.PIPE,
702 stdin = open('/dev/null','w'))
707 rdrdy, wrdy, broken = select.select(
708 [proc.stderr, proc.stdout],
710 [proc.stderr, proc.stdout])
712 if proc.stderr in rdrdy:
713 # use os.read for fully unbuffered behavior
714 err.append(os.read(proc.stderr.fileno(), 4096))
716 if proc.stdout in rdrdy:
717 # use os.read for fully unbuffered behavior
718 buf = os.read(proc.stdout.fileno(), 4096)
727 err.append(proc.stderr.read())
729 proc._known_hosts = tmp_known_hosts
730 eintr_retry(proc.wait)()
731 return ((None,''.join(err)), proc)
733 raise AssertionError, "Unreachable code reached! :-Q"
735 # Parse destination as <user>@<server>:<path>
736 if isinstance(dest, basestring) and ':' in dest:
737 remspec, path = dest.split(':',1)
738 elif isinstance(source, basestring) and ':' in source:
739 remspec, path = source.split(':',1)
741 raise ValueError, "Both endpoints cannot be local"
742 user,host = remspec.rsplit('@',1)
745 tmp_known_hosts = None
746 args = ['scp', '-q', '-p', '-C',
747 # Don't bother with localhost. Makes test easier
748 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
750 args.append('-P%d' % port)
754 args.extend(('-i', ident_key))
756 # Create a temporary server key file
757 tmp_known_hosts = _make_server_key_args(
758 server_key, host, port, args)
759 if isinstance(source,list):
765 # connects to the remote host and starts a remote connection
766 proc = subprocess.Popen(args,
767 stdout = subprocess.PIPE,
768 stdin = subprocess.PIPE,
769 stderr = subprocess.PIPE)
770 proc._known_hosts = tmp_known_hosts
772 comm = proc.communicate()
773 eintr_retry(proc.wait)()
776 def popen_ssh_subprocess(python_code, host, port, user, agent,
781 environment_setup = "",
782 waitcommand = False):
785 python_path.replace("'", r"'\''")
786 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
788 if environment_setup:
789 cmd += environment_setup
791 # Uncomment for debug (to run everything under strace)
792 # We had to verify if strace works (cannot nest them)
793 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
795 #cmd += "strace -f -tt -s 200 -o strace$$.out "
797 cmd += "import base64, os\n"
798 cmd += "cmd = \"\"\n"
799 cmd += "while True:\n"
800 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
801 cmd += " if cmd[-1] == \"\\n\": break\n"
802 cmd += "cmd = base64.b64decode(cmd)\n"
803 # Uncomment for debug
804 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
806 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
809 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
812 tmp_known_hosts = None
814 # Don't bother with localhost. Makes test easier
815 '-o', 'NoHostAuthenticationForLocalhost=yes',
820 args.append('-p%d' % port)
822 args.extend(('-i', ident_key))
826 # Create a temporary server key file
827 tmp_known_hosts = _make_server_key_args(
828 server_key, host, port, args)
831 # connects to the remote host and starts a remote rpyc connection
832 proc = subprocess.Popen(args,
833 stdout = subprocess.PIPE,
834 stdin = subprocess.PIPE,
835 stderr = subprocess.PIPE)
836 proc._known_hosts = tmp_known_hosts
838 # send the command to execute
839 os.write(proc.stdin.fileno(),
840 base64.b64encode(python_code) + "\n")
841 msg = os.read(proc.stdout.fileno(), 3)
843 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
844 msg, proc.stdout.read(), proc.stderr.read())