2 # -*- coding: utf-8 -*-
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33 if hasattr(os, "devnull"):
36 DEV_NULL = "/dev/null"
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41 """ Escapes strings so that they are safe to use as command-line arguments """
42 if SHELL_SAFE.match(s):
43 # safe string - no escaping needed
46 # unsafe string - escape
48 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
51 return "'$'\\x%02x''" % (ord(c),)
52 s = ''.join(map(escp,s))
55 def eintr_retry(func):
57 @functools.wraps(func)
59 retry = kw.pop("_retry", False)
60 for i in xrange(0 if retry else 4):
63 except (select.error, socket.error), args:
64 if args[0] == errno.EINTR:
69 if e.errno == errno.EINTR:
78 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = None):
79 self._root_dir = root_dir
81 self._ctrl_sock = None
82 self._log_level = log_level
84 self._environment_setup = environment_setup
93 # can not return normally after fork beacuse no exec was done.
94 # This means that if we don't do a os._exit(0) here the code that
95 # follows the call to "Server.run()" in the "caller code" will be
96 # executed... but by now it has already been executed after the
97 # first process (the one that did the first fork) returned.
105 # pipes for process synchronization
109 root = os.path.normpath(self._root_dir)
110 if not os.path.exists(root):
111 os.makedirs(root, 0755)
119 except OSError, e: # pragma: no cover
120 if e.errno == errno.EINTR:
126 # os.waitpid avoids leaving a <defunc> (zombie) process
127 st = os.waitpid(pid1, 0)[1]
129 raise RuntimeError("Daemonization failed")
130 # return 0 to inform the caller method that this is not the
135 # Decouple from parent environment.
136 os.chdir(self._root_dir)
143 # see ref: "os._exit(0)"
146 # close all open file descriptors.
147 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
148 if (max_fd == resource.RLIM_INFINITY):
150 for fd in range(3, max_fd):
157 # Redirect standard file descriptors.
158 stdin = open(DEV_NULL, "r")
159 stderr = stdout = open(STD_ERR, "a", 0)
160 os.dup2(stdin.fileno(), sys.stdin.fileno())
161 # NOTE: sys.stdout.write will still be buffered, even if the file
162 # was opened with 0 buffer
163 os.dup2(stdout.fileno(), sys.stdout.fileno())
164 os.dup2(stderr.fileno(), sys.stderr.fileno())
167 if self._environment_setup:
168 # parse environment variables and pass to child process
169 # do it by executing shell commands, in case there's some heavy setup involved
170 envproc = subprocess.Popen(
172 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
173 ( self._environment_setup, ) ],
174 stdin = subprocess.PIPE,
175 stdout = subprocess.PIPE,
176 stderr = subprocess.PIPE
178 out,err = envproc.communicate()
180 # parse new environment
181 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
183 # apply to current environment
184 for name, value in environment.iteritems():
185 os.environ[name] = value
188 if 'PYTHONPATH' in environment:
189 sys.path = environment['PYTHONPATH'].split(':') + sys.path
191 # create control socket
192 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
193 self._ctrl_sock.bind(CTRL_SOCK)
194 self._ctrl_sock.listen(0)
196 # let the parent process know that the daemonization is finished
201 def post_daemonize(self):
205 while not self._stop:
206 conn, addr = self._ctrl_sock.accept()
208 while not self._stop:
210 msg = self.recv_msg(conn)
211 except socket.timeout, e:
217 reply = self.stop_action()
219 reply = self.reply_action(msg)
222 self.send_reply(conn, reply)
225 self.log_error("NOTICE: Awaiting for reconnection")
233 def recv_msg(self, conn):
236 while '\n' not in chunk:
238 chunk = conn.recv(1024)
239 except (OSError, socket.error), e:
240 if e[0] != errno.EINTR:
249 data = ''.join(data).split('\n',1)
252 data, self._rdbuf = data
254 decoded = base64.b64decode(data)
255 return decoded.rstrip()
257 def send_reply(self, conn, reply):
258 encoded = base64.b64encode(reply)
259 conn.send("%s\n" % encoded)
263 self._ctrl_sock.close()
268 def stop_action(self):
269 return "Stopping server"
271 def reply_action(self, msg):
272 return "Reply to: %s" % msg
274 def log_error(self, text = None, context = ''):
276 text = traceback.format_exc()
277 date = time.strftime("%Y-%m-%d %H:%M:%S")
279 context = " (%s)" % (context,)
280 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
283 def log_debug(self, text):
284 if self._log_level == DEBUG_LEVEL:
285 date = time.strftime("%Y-%m-%d %H:%M:%S")
286 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
288 class Forwarder(object):
289 def __init__(self, root_dir = "."):
290 self._ctrl_sock = None
291 self._root_dir = root_dir
297 print >>sys.stderr, "READY."
298 while not self._stop:
299 data = self.read_data()
300 self.send_to_server(data)
301 data = self.recv_from_server()
302 self.write_data(data)
306 return sys.stdin.readline()
308 def write_data(self, data):
309 sys.stdout.write(data)
310 # sys.stdout.write is buffered, this is why we need to do a flush()
313 def send_to_server(self, data):
315 self._ctrl_sock.send(data)
316 except (IOError, socket.error), e:
317 if e[0] == errno.EPIPE:
319 self._ctrl_sock.send(data)
322 encoded = data.rstrip()
323 msg = base64.b64decode(encoded)
327 def recv_from_server(self):
330 while '\n' not in chunk:
332 chunk = self._ctrl_sock.recv(1024)
333 except (OSError, socket.error), e:
334 if e[0] != errno.EINTR:
342 data = ''.join(data).split('\n',1)
345 data, self._rdbuf = data
351 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
352 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
353 self._ctrl_sock.connect(sock_addr)
355 def disconnect(self):
357 self._ctrl_sock.close()
361 class Client(object):
362 def __init__(self, root_dir = ".", host = None, port = None, user = None,
363 agent = None, environment_setup = ""):
364 self.root_dir = root_dir
365 self.addr = (host, port)
368 self.environment_setup = environment_setup
369 self._stopped = False
370 self._deferreds = collections.deque()
374 if self._process.poll() is None:
375 os.kill(self._process.pid, signal.SIGTERM)
379 root_dir = self.root_dir
380 (host, port) = self.addr
384 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
385 c.forward()" % (root_dir,)
387 self._process = popen_ssh_subprocess(python_code, host, port,
389 environment_setup = self.environment_setup)
390 # popen_ssh_subprocess already waits for readiness
391 if self._process.poll():
392 err = proc.stderr.read()
393 raise RuntimeError("Client could not be reached: %s" % \
396 self._process = subprocess.Popen(
397 ["python", "-c", python_code],
398 stdin = subprocess.PIPE,
399 stdout = subprocess.PIPE,
400 stderr = subprocess.PIPE
403 # Wait for the forwarder to be ready, otherwise nobody
404 # will be able to connect to it
405 helo = self._process.stderr.readline()
406 if helo != 'READY.\n':
407 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
408 helo + self._process.stderr.read())
410 def send_msg(self, msg):
411 encoded = base64.b64encode(msg)
412 data = "%s\n" % encoded
415 self._process.stdin.write(data)
416 except (IOError, ValueError):
417 # dead process, poll it to un-zombify
420 # try again after reconnect
421 # If it fails again, though, give up
423 self._process.stdin.write(data)
426 self.send_msg(STOP_MSG)
429 def defer_reply(self, transform=None):
431 self._deferreds.append(defer_entry)
433 functools.partial(self.read_reply, defer_entry, transform)
436 def _read_reply(self):
437 data = self._process.stdout.readline()
438 encoded = data.rstrip()
440 # empty == eof == dead process, poll it to un-zombify
443 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
444 return base64.b64decode(encoded)
446 def read_reply(self, which=None, transform=None):
447 # Test to see if someone did it already
448 if which is not None and len(which):
450 # ...just return the deferred value
452 return transform(which[0])
456 # Process all deferreds until the one we're looking for
457 # or until the queue is empty
458 while self._deferreds:
460 deferred = self._deferreds.popleft()
465 deferred.append(self._read_reply())
466 if deferred is which:
467 # We reached the one we were looking for
469 return transform(deferred[0])
474 # They've requested a synchronous read
476 return transform(self._read_reply())
478 return self._read_reply()
480 def _make_server_key_args(server_key, host, port, args):
482 Returns a reference to the created temporary file, and adds the
483 corresponding arguments to the given argument list.
485 Make sure to hold onto it until the process is done with the file
488 host = '%s:%s' % (host,port)
489 # Create a temporary server key file
490 tmp_known_hosts = tempfile.NamedTemporaryFile()
492 # Add the intended host key
493 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
495 # If we're not in strict mode, add user-configured keys
496 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
497 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
498 if os.access(user_hosts_path, os.R_OK):
499 f = open(user_hosts_path, "r")
500 tmp_known_hosts.write(f.read())
503 tmp_known_hosts.flush()
505 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
507 return tmp_known_hosts
509 def popen_ssh_command(command, host, port, user, agent,
515 Executes a remote commands, returns ((stdout,stderr),process)
518 print "ssh", host, command
520 tmp_known_hosts = None
522 # Don't bother with localhost. Makes test easier
523 '-o', 'NoHostAuthenticationForLocalhost=yes',
528 args.append('-p%d' % port)
530 args.extend(('-i', ident_key))
534 # Create a temporary server key file
535 tmp_known_hosts = _make_server_key_args(
536 server_key, host, port, args)
539 # connects to the remote host and starts a remote connection
540 proc = subprocess.Popen(args,
541 stdout = subprocess.PIPE,
542 stdin = subprocess.PIPE,
543 stderr = subprocess.PIPE)
545 # attach tempfile object to the process, to make sure the file stays
546 # alive until the process is finished with it
547 proc._known_hosts = tmp_known_hosts
549 out, err = proc.communicate(stdin)
551 print " -> ", out, err
553 return ((out, err), proc)
555 def popen_scp(source, dest,
562 Copies from/to remote sites.
564 Source and destination should have the user and host encoded
567 If source is a file object, a special mode will be used to
568 create the remote file with the same contents.
570 If dest is a file object, the remote file (source) will be
571 read and written into dest.
573 In these modes, recursive cannot be True.
575 Source can be a list of files to copy to a single destination,
576 in which case it is advised that the destination be a folder.
580 print "scp", source, dest
582 if isinstance(source, file) or isinstance(dest, file) \
583 or hasattr(source, 'read') or hasattr(dest, 'write'):
586 # Parse source/destination as <user>@<server>:<path>
587 if isinstance(dest, basestring) and ':' in dest:
588 remspec, path = dest.split(':',1)
589 elif isinstance(source, basestring) and ':' in source:
590 remspec, path = source.split(':',1)
592 raise ValueError, "Both endpoints cannot be local"
593 user,host = remspec.rsplit('@',1)
594 tmp_known_hosts = None
596 args = ['ssh', '-l', user, '-C',
597 # Don't bother with localhost. Makes test easier
598 '-o', 'NoHostAuthenticationForLocalhost=yes',
601 args.append('-P%d' % port)
603 args.extend(('-i', ident_key))
605 # Create a temporary server key file
606 tmp_known_hosts = _make_server_key_args(
607 server_key, host, port, args)
609 if isinstance(source, file) or hasattr(source, 'read'):
610 args.append('cat > %s' % (shell_escape(path),))
611 elif isinstance(dest, file) or hasattr(dest, 'write'):
612 args.append('cat %s' % (shell_escape(path),))
614 raise AssertionError, "Unreachable code reached! :-Q"
616 # connects to the remote host and starts a remote connection
617 if isinstance(source, file):
618 proc = subprocess.Popen(args,
619 stdout = open('/dev/null','w'),
620 stderr = subprocess.PIPE,
622 err = proc.stderr.read()
623 proc._known_hosts = tmp_known_hosts
624 eintr_retry(proc.wait)()
625 return ((None,err), proc)
626 elif isinstance(dest, file):
627 proc = subprocess.Popen(args,
628 stdout = open('/dev/null','w'),
629 stderr = subprocess.PIPE,
631 err = proc.stderr.read()
632 proc._known_hosts = tmp_known_hosts
633 eintr_retry(proc.wait)()
634 return ((None,err), proc)
635 elif hasattr(source, 'read'):
636 # file-like (but not file) source
637 proc = subprocess.Popen(args,
638 stdout = open('/dev/null','w'),
639 stderr = subprocess.PIPE,
640 stdin = subprocess.PIPE)
646 buf = source.read(4096)
651 rdrdy, wrdy, broken = select.select(
654 [proc.stderr,proc.stdin])
656 if proc.stderr in rdrdy:
657 # use os.read for fully unbuffered behavior
658 err.append(os.read(proc.stderr.fileno(), 4096))
660 if proc.stdin in wrdy:
661 proc.stdin.write(buf)
667 err.append(proc.stderr.read())
669 proc._known_hosts = tmp_known_hosts
670 eintr_retry(proc.wait)()
671 return ((None,''.join(err)), proc)
672 elif hasattr(dest, 'write'):
673 # file-like (but not file) dest
674 proc = subprocess.Popen(args,
675 stdout = subprocess.PIPE,
676 stderr = subprocess.PIPE,
677 stdin = open('/dev/null','w'))
682 rdrdy, wrdy, broken = select.select(
683 [proc.stderr, proc.stdout],
685 [proc.stderr, proc.stdout])
687 if proc.stderr in rdrdy:
688 # use os.read for fully unbuffered behavior
689 err.append(os.read(proc.stderr.fileno(), 4096))
691 if proc.stdout in rdrdy:
692 # use os.read for fully unbuffered behavior
693 buf = os.read(proc.stdout.fileno(), 4096)
702 err.append(proc.stderr.read())
704 proc._known_hosts = tmp_known_hosts
705 eintr_retry(proc.wait)()
706 return ((None,''.join(err)), proc)
708 raise AssertionError, "Unreachable code reached! :-Q"
710 # Parse destination as <user>@<server>:<path>
711 if isinstance(dest, basestring) and ':' in dest:
712 remspec, path = dest.split(':',1)
713 elif isinstance(source, basestring) and ':' in source:
714 remspec, path = source.split(':',1)
716 raise ValueError, "Both endpoints cannot be local"
717 user,host = remspec.rsplit('@',1)
720 tmp_known_hosts = None
721 args = ['scp', '-q', '-p', '-C',
722 # Don't bother with localhost. Makes test easier
723 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
725 args.append('-P%d' % port)
729 args.extend(('-i', ident_key))
731 # Create a temporary server key file
732 tmp_known_hosts = _make_server_key_args(
733 server_key, host, port, args)
734 if isinstance(source,list):
740 # connects to the remote host and starts a remote connection
741 proc = subprocess.Popen(args,
742 stdout = subprocess.PIPE,
743 stdin = subprocess.PIPE,
744 stderr = subprocess.PIPE)
745 proc._known_hosts = tmp_known_hosts
747 comm = proc.communicate()
748 eintr_retry(proc.wait)()
751 def popen_ssh_subprocess(python_code, host, port, user, agent,
756 environment_setup = "",
757 waitcommand = False):
760 python_path.replace("'", r"'\''")
761 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
763 if environment_setup:
764 cmd += environment_setup
766 # Uncomment for debug (to run everything under strace)
767 # We had to verify if strace works (cannot nest them)
768 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
770 #cmd += "strace -f -tt -s 200 -o strace$$.out "
772 cmd += "import base64, os\n"
773 cmd += "cmd = \"\"\n"
774 cmd += "while True:\n"
775 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
776 cmd += " if cmd[-1] == \"\\n\": break\n"
777 cmd += "cmd = base64.b64decode(cmd)\n"
778 # Uncomment for debug
779 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
781 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
784 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
787 tmp_known_hosts = None
789 # Don't bother with localhost. Makes test easier
790 '-o', 'NoHostAuthenticationForLocalhost=yes',
795 args.append('-p%d' % port)
797 args.extend(('-i', ident_key))
801 # Create a temporary server key file
802 tmp_known_hosts = _make_server_key_args(
803 server_key, host, port, args)
806 # connects to the remote host and starts a remote rpyc connection
807 proc = subprocess.Popen(args,
808 stdout = subprocess.PIPE,
809 stdin = subprocess.PIPE,
810 stderr = subprocess.PIPE)
811 proc._known_hosts = tmp_known_hosts
813 # send the command to execute
814 os.write(proc.stdin.fileno(),
815 base64.b64encode(python_code) + "\n")
816 msg = os.read(proc.stdout.fileno(), 3)
818 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
819 msg, proc.stdout.read(), proc.stderr.read())