2 # -*- coding: utf-8 -*-
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33 if hasattr(os, "devnull"):
36 DEV_NULL = "/dev/null"
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41 """ Escapes strings so that they are safe to use as command-line arguments """
42 if SHELL_SAFE.match(s):
43 # safe string - no escaping needed
46 # unsafe string - escape
48 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
51 return "'$'\\x%02x''" % (ord(c),)
52 s = ''.join(map(escp,s))
55 def eintr_retry(func):
57 @functools.wraps(func)
59 retry = kw.pop("_retry", False)
60 for i in xrange(0 if retry else 4):
63 except (select.error, socket.error), args:
64 if args[0] == errno.EINTR:
69 if e.errno == errno.EINTR:
78 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
79 self._root_dir = root_dir
81 self._ctrl_sock = None
82 self._log_level = log_level
84 self._environment_setup = environment_setup
93 # can not return normally after fork beacuse no exec was done.
94 # This means that if we don't do a os._exit(0) here the code that
95 # follows the call to "Server.run()" in the "caller code" will be
96 # executed... but by now it has already been executed after the
97 # first process (the one that did the first fork) returned.
105 # pipes for process synchronization
109 root = os.path.normpath(self._root_dir)
110 if not os.path.exists(root):
111 os.makedirs(root, 0755)
119 except OSError, e: # pragma: no cover
120 if e.errno == errno.EINTR:
126 # os.waitpid avoids leaving a <defunc> (zombie) process
127 st = os.waitpid(pid1, 0)[1]
129 raise RuntimeError("Daemonization failed")
130 # return 0 to inform the caller method that this is not the
135 # Decouple from parent environment.
136 os.chdir(self._root_dir)
143 # see ref: "os._exit(0)"
146 # close all open file descriptors.
147 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
148 if (max_fd == resource.RLIM_INFINITY):
150 for fd in range(3, max_fd):
157 # Redirect standard file descriptors.
158 stdin = open(DEV_NULL, "r")
159 stderr = stdout = open(STD_ERR, "a", 0)
160 os.dup2(stdin.fileno(), sys.stdin.fileno())
161 # NOTE: sys.stdout.write will still be buffered, even if the file
162 # was opened with 0 buffer
163 os.dup2(stdout.fileno(), sys.stdout.fileno())
164 os.dup2(stderr.fileno(), sys.stderr.fileno())
167 if self._environment_setup:
168 # parse environment variables and pass to child process
169 # do it by executing shell commands, in case there's some heavy setup involved
170 envproc = subprocess.Popen(
172 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
173 ( self._environment_setup, ) ],
174 stdin = subprocess.PIPE,
175 stdout = subprocess.PIPE,
176 stderr = subprocess.PIPE
178 out,err = envproc.communicate()
180 # parse new environment
182 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
184 # apply to current environment
185 for name, value in environment.iteritems():
186 os.environ[name] = value
189 if 'PYTHONPATH' in environment:
190 sys.path = environment['PYTHONPATH'].split(':') + sys.path
192 # create control socket
193 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
194 self._ctrl_sock.bind(CTRL_SOCK)
195 self._ctrl_sock.listen(0)
197 # let the parent process know that the daemonization is finished
202 def post_daemonize(self):
206 while not self._stop:
207 conn, addr = self._ctrl_sock.accept()
209 while not self._stop:
211 msg = self.recv_msg(conn)
212 except socket.timeout, e:
218 reply = self.stop_action()
220 reply = self.reply_action(msg)
223 self.send_reply(conn, reply)
226 self.log_error("NOTICE: Awaiting for reconnection")
234 def recv_msg(self, conn):
237 while '\n' not in chunk:
239 chunk = conn.recv(1024)
240 except (OSError, socket.error), e:
241 if e[0] != errno.EINTR:
250 data = ''.join(data).split('\n',1)
253 data, self._rdbuf = data
255 decoded = base64.b64decode(data)
256 return decoded.rstrip()
258 def send_reply(self, conn, reply):
259 encoded = base64.b64encode(reply)
260 conn.send("%s\n" % encoded)
264 self._ctrl_sock.close()
269 def stop_action(self):
270 return "Stopping server"
272 def reply_action(self, msg):
273 return "Reply to: %s" % msg
275 def log_error(self, text = None, context = ''):
277 text = traceback.format_exc()
278 date = time.strftime("%Y-%m-%d %H:%M:%S")
280 context = " (%s)" % (context,)
281 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
284 def log_debug(self, text):
285 if self._log_level == DEBUG_LEVEL:
286 date = time.strftime("%Y-%m-%d %H:%M:%S")
287 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
289 class Forwarder(object):
290 def __init__(self, root_dir = "."):
291 self._ctrl_sock = None
292 self._root_dir = root_dir
298 print >>sys.stderr, "READY."
299 while not self._stop:
300 data = self.read_data()
301 self.send_to_server(data)
302 data = self.recv_from_server()
303 self.write_data(data)
307 return sys.stdin.readline()
309 def write_data(self, data):
310 sys.stdout.write(data)
311 # sys.stdout.write is buffered, this is why we need to do a flush()
314 def send_to_server(self, data):
316 self._ctrl_sock.send(data)
317 except (IOError, socket.error), e:
318 if e[0] == errno.EPIPE:
320 self._ctrl_sock.send(data)
323 encoded = data.rstrip()
324 msg = base64.b64decode(encoded)
328 def recv_from_server(self):
331 while '\n' not in chunk:
333 chunk = self._ctrl_sock.recv(1024)
334 except (OSError, socket.error), e:
335 if e[0] != errno.EINTR:
343 data = ''.join(data).split('\n',1)
346 data, self._rdbuf = data
352 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
353 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
354 self._ctrl_sock.connect(sock_addr)
356 def disconnect(self):
358 self._ctrl_sock.close()
362 class Client(object):
363 def __init__(self, root_dir = ".", host = None, port = None, user = None,
364 agent = None, environment_setup = ""):
365 self.root_dir = root_dir
366 self.addr = (host, port)
369 self.environment_setup = environment_setup
370 self._stopped = False
371 self._deferreds = collections.deque()
375 if self._process.poll() is None:
376 os.kill(self._process.pid, signal.SIGTERM)
380 root_dir = self.root_dir
381 (host, port) = self.addr
385 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
386 c.forward()" % (root_dir,)
388 self._process = popen_ssh_subprocess(python_code, host, port,
390 environment_setup = self.environment_setup)
391 # popen_ssh_subprocess already waits for readiness
392 if self._process.poll():
393 err = proc.stderr.read()
394 raise RuntimeError("Client could not be reached: %s" % \
397 self._process = subprocess.Popen(
398 ["python", "-c", python_code],
399 stdin = subprocess.PIPE,
400 stdout = subprocess.PIPE,
401 stderr = subprocess.PIPE
404 # Wait for the forwarder to be ready, otherwise nobody
405 # will be able to connect to it
406 helo = self._process.stderr.readline()
407 if helo != 'READY.\n':
408 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
409 helo + self._process.stderr.read())
411 def send_msg(self, msg):
412 encoded = base64.b64encode(msg)
413 data = "%s\n" % encoded
416 self._process.stdin.write(data)
417 except (IOError, ValueError):
418 # dead process, poll it to un-zombify
421 # try again after reconnect
422 # If it fails again, though, give up
424 self._process.stdin.write(data)
427 self.send_msg(STOP_MSG)
430 def defer_reply(self, transform=None):
432 self._deferreds.append(defer_entry)
434 functools.partial(self.read_reply, defer_entry, transform)
437 def _read_reply(self):
438 data = self._process.stdout.readline()
439 encoded = data.rstrip()
441 # empty == eof == dead process, poll it to un-zombify
444 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
445 return base64.b64decode(encoded)
447 def read_reply(self, which=None, transform=None):
448 # Test to see if someone did it already
449 if which is not None and len(which):
451 # ...just return the deferred value
453 return transform(which[0])
457 # Process all deferreds until the one we're looking for
458 # or until the queue is empty
459 while self._deferreds:
461 deferred = self._deferreds.popleft()
466 deferred.append(self._read_reply())
467 if deferred is which:
468 # We reached the one we were looking for
470 return transform(deferred[0])
475 # They've requested a synchronous read
477 return transform(self._read_reply())
479 return self._read_reply()
481 def _make_server_key_args(server_key, host, port, args):
483 Returns a reference to the created temporary file, and adds the
484 corresponding arguments to the given argument list.
486 Make sure to hold onto it until the process is done with the file
489 host = '%s:%s' % (host,port)
490 # Create a temporary server key file
491 tmp_known_hosts = tempfile.NamedTemporaryFile()
493 # Add the intended host key
494 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
496 # If we're not in strict mode, add user-configured keys
497 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
498 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
499 if os.access(user_hosts_path, os.R_OK):
500 f = open(user_hosts_path, "r")
501 tmp_known_hosts.write(f.read())
504 tmp_known_hosts.flush()
506 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
508 return tmp_known_hosts
510 def popen_ssh_command(command, host, port, user, agent,
516 Executes a remote commands, returns ((stdout,stderr),process)
519 print "ssh", host, command
521 tmp_known_hosts = None
523 # Don't bother with localhost. Makes test easier
524 '-o', 'NoHostAuthenticationForLocalhost=yes',
529 args.append('-p%d' % port)
531 args.extend(('-i', ident_key))
535 # Create a temporary server key file
536 tmp_known_hosts = _make_server_key_args(
537 server_key, host, port, args)
540 # connects to the remote host and starts a remote connection
541 proc = subprocess.Popen(args,
542 stdout = subprocess.PIPE,
543 stdin = subprocess.PIPE,
544 stderr = subprocess.PIPE)
546 # attach tempfile object to the process, to make sure the file stays
547 # alive until the process is finished with it
548 proc._known_hosts = tmp_known_hosts
550 out, err = proc.communicate(stdin)
552 print " -> ", out, err
554 return ((out, err), proc)
556 def popen_scp(source, dest,
563 Copies from/to remote sites.
565 Source and destination should have the user and host encoded
568 If source is a file object, a special mode will be used to
569 create the remote file with the same contents.
571 If dest is a file object, the remote file (source) will be
572 read and written into dest.
574 In these modes, recursive cannot be True.
576 Source can be a list of files to copy to a single destination,
577 in which case it is advised that the destination be a folder.
581 print "scp", source, dest
583 if isinstance(source, file) or isinstance(dest, file) \
584 or hasattr(source, 'read') or hasattr(dest, 'write'):
587 # Parse source/destination as <user>@<server>:<path>
588 if isinstance(dest, basestring) and ':' in dest:
589 remspec, path = dest.split(':',1)
590 elif isinstance(source, basestring) and ':' in source:
591 remspec, path = source.split(':',1)
593 raise ValueError, "Both endpoints cannot be local"
594 user,host = remspec.rsplit('@',1)
595 tmp_known_hosts = None
597 args = ['ssh', '-l', user, '-C',
598 # Don't bother with localhost. Makes test easier
599 '-o', 'NoHostAuthenticationForLocalhost=yes',
602 args.append('-P%d' % port)
604 args.extend(('-i', ident_key))
606 # Create a temporary server key file
607 tmp_known_hosts = _make_server_key_args(
608 server_key, host, port, args)
610 if isinstance(source, file) or hasattr(source, 'read'):
611 args.append('cat > %s' % (shell_escape(path),))
612 elif isinstance(dest, file) or hasattr(dest, 'write'):
613 args.append('cat %s' % (shell_escape(path),))
615 raise AssertionError, "Unreachable code reached! :-Q"
617 # connects to the remote host and starts a remote connection
618 if isinstance(source, file):
619 proc = subprocess.Popen(args,
620 stdout = open('/dev/null','w'),
621 stderr = subprocess.PIPE,
623 err = proc.stderr.read()
624 proc._known_hosts = tmp_known_hosts
625 eintr_retry(proc.wait)()
626 return ((None,err), proc)
627 elif isinstance(dest, file):
628 proc = subprocess.Popen(args,
629 stdout = open('/dev/null','w'),
630 stderr = subprocess.PIPE,
632 err = proc.stderr.read()
633 proc._known_hosts = tmp_known_hosts
634 eintr_retry(proc.wait)()
635 return ((None,err), proc)
636 elif hasattr(source, 'read'):
637 # file-like (but not file) source
638 proc = subprocess.Popen(args,
639 stdout = open('/dev/null','w'),
640 stderr = subprocess.PIPE,
641 stdin = subprocess.PIPE)
647 buf = source.read(4096)
652 rdrdy, wrdy, broken = select.select(
655 [proc.stderr,proc.stdin])
657 if proc.stderr in rdrdy:
658 # use os.read for fully unbuffered behavior
659 err.append(os.read(proc.stderr.fileno(), 4096))
661 if proc.stdin in wrdy:
662 proc.stdin.write(buf)
668 err.append(proc.stderr.read())
670 proc._known_hosts = tmp_known_hosts
671 eintr_retry(proc.wait)()
672 return ((None,''.join(err)), proc)
673 elif hasattr(dest, 'write'):
674 # file-like (but not file) dest
675 proc = subprocess.Popen(args,
676 stdout = subprocess.PIPE,
677 stderr = subprocess.PIPE,
678 stdin = open('/dev/null','w'))
683 rdrdy, wrdy, broken = select.select(
684 [proc.stderr, proc.stdout],
686 [proc.stderr, proc.stdout])
688 if proc.stderr in rdrdy:
689 # use os.read for fully unbuffered behavior
690 err.append(os.read(proc.stderr.fileno(), 4096))
692 if proc.stdout in rdrdy:
693 # use os.read for fully unbuffered behavior
694 buf = os.read(proc.stdout.fileno(), 4096)
703 err.append(proc.stderr.read())
705 proc._known_hosts = tmp_known_hosts
706 eintr_retry(proc.wait)()
707 return ((None,''.join(err)), proc)
709 raise AssertionError, "Unreachable code reached! :-Q"
711 # Parse destination as <user>@<server>:<path>
712 if isinstance(dest, basestring) and ':' in dest:
713 remspec, path = dest.split(':',1)
714 elif isinstance(source, basestring) and ':' in source:
715 remspec, path = source.split(':',1)
717 raise ValueError, "Both endpoints cannot be local"
718 user,host = remspec.rsplit('@',1)
721 tmp_known_hosts = None
722 args = ['scp', '-q', '-p', '-C',
723 # Don't bother with localhost. Makes test easier
724 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
726 args.append('-P%d' % port)
730 args.extend(('-i', ident_key))
732 # Create a temporary server key file
733 tmp_known_hosts = _make_server_key_args(
734 server_key, host, port, args)
735 if isinstance(source,list):
741 # connects to the remote host and starts a remote connection
742 proc = subprocess.Popen(args,
743 stdout = subprocess.PIPE,
744 stdin = subprocess.PIPE,
745 stderr = subprocess.PIPE)
746 proc._known_hosts = tmp_known_hosts
748 comm = proc.communicate()
749 eintr_retry(proc.wait)()
752 def popen_ssh_subprocess(python_code, host, port, user, agent,
757 environment_setup = "",
758 waitcommand = False):
761 python_path.replace("'", r"'\''")
762 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
764 if environment_setup:
765 cmd += environment_setup
767 # Uncomment for debug (to run everything under strace)
768 # We had to verify if strace works (cannot nest them)
769 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
771 #cmd += "strace -f -tt -s 200 -o strace$$.out "
773 cmd += "import base64, os\n"
774 cmd += "cmd = \"\"\n"
775 cmd += "while True:\n"
776 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
777 cmd += " if cmd[-1] == \"\\n\": break\n"
778 cmd += "cmd = base64.b64decode(cmd)\n"
779 # Uncomment for debug
780 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
782 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
785 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
788 tmp_known_hosts = None
790 # Don't bother with localhost. Makes test easier
791 '-o', 'NoHostAuthenticationForLocalhost=yes',
796 args.append('-p%d' % port)
798 args.extend(('-i', ident_key))
802 # Create a temporary server key file
803 tmp_known_hosts = _make_server_key_args(
804 server_key, host, port, args)
807 # connects to the remote host and starts a remote rpyc connection
808 proc = subprocess.Popen(args,
809 stdout = subprocess.PIPE,
810 stdin = subprocess.PIPE,
811 stderr = subprocess.PIPE)
812 proc._known_hosts = tmp_known_hosts
814 # send the command to execute
815 os.write(proc.stdin.fileno(),
816 base64.b64encode(python_code) + "\n")
817 msg = os.read(proc.stdout.fileno(), 3)
819 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
820 msg, proc.stdout.read(), proc.stderr.read())