2 # -*- coding: utf-8 -*-
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33 if hasattr(os, "devnull"):
36 DEV_NULL = "/dev/null"
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41 """ Escapes strings so that they are safe to use as command-line arguments """
42 if SHELL_SAFE.match(s):
43 # safe string - no escaping needed
46 # unsafe string - escape
48 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
51 return "'$'\\x%02x''" % (ord(c),)
52 s = ''.join(map(escp,s))
55 def eintr_retry(func):
57 @functools.wraps(func)
59 retry = kw.pop("_retry", False)
60 for i in xrange(0 if retry else 4):
63 except select.error, args:
64 if args[0] == errno.EINTR:
69 if e.errno == errno.EINTR:
78 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
79 self._root_dir = root_dir
81 self._ctrl_sock = None
82 self._log_level = log_level
92 # can not return normally after fork beacuse no exec was done.
93 # This means that if we don't do a os._exit(0) here the code that
94 # follows the call to "Server.run()" in the "caller code" will be
95 # executed... but by now it has already been executed after the
96 # first process (the one that did the first fork) returned.
104 # pipes for process synchronization
108 root = os.path.normpath(self._root_dir)
109 if not os.path.exists(root):
110 os.makedirs(root, 0755)
118 except OSError, e: # pragma: no cover
119 if e.errno == errno.EINTR:
125 # os.waitpid avoids leaving a <defunc> (zombie) process
126 st = os.waitpid(pid1, 0)[1]
128 raise RuntimeError("Daemonization failed")
129 # return 0 to inform the caller method that this is not the
134 # Decouple from parent environment.
135 os.chdir(self._root_dir)
142 # see ref: "os._exit(0)"
145 # close all open file descriptors.
146 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
147 if (max_fd == resource.RLIM_INFINITY):
149 for fd in range(3, max_fd):
156 # Redirect standard file descriptors.
157 stdin = open(DEV_NULL, "r")
158 stderr = stdout = open(STD_ERR, "a", 0)
159 os.dup2(stdin.fileno(), sys.stdin.fileno())
160 # NOTE: sys.stdout.write will still be buffered, even if the file
161 # was opened with 0 buffer
162 os.dup2(stdout.fileno(), sys.stdout.fileno())
163 os.dup2(stderr.fileno(), sys.stderr.fileno())
165 # create control socket
166 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
167 self._ctrl_sock.bind(CTRL_SOCK)
168 self._ctrl_sock.listen(0)
170 # let the parent process know that the daemonization is finished
175 def post_daemonize(self):
179 while not self._stop:
180 conn, addr = self._ctrl_sock.accept()
182 while not self._stop:
184 msg = self.recv_msg(conn)
185 except socket.timeout, e:
191 reply = self.stop_action()
193 reply = self.reply_action(msg)
196 self.send_reply(conn, reply)
199 self.log_error("NOTICE: Awaiting for reconnection")
207 def recv_msg(self, conn):
210 while '\n' not in chunk:
212 chunk = conn.recv(1024)
213 except socket.timeout:
216 if e.errno != errno.EINTR:
225 data = ''.join(data).split('\n',1)
228 data, self._rdbuf = data
230 decoded = base64.b64decode(data)
231 return decoded.rstrip()
233 def send_reply(self, conn, reply):
234 encoded = base64.b64encode(reply)
235 conn.send("%s\n" % encoded)
239 self._ctrl_sock.close()
244 def stop_action(self):
245 return "Stopping server"
247 def reply_action(self, msg):
248 return "Reply to: %s" % msg
250 def log_error(self, text = None, context = ''):
252 text = traceback.format_exc()
253 date = time.strftime("%Y-%m-%d %H:%M:%S")
255 context = " (%s)" % (context,)
256 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
259 def log_debug(self, text):
260 if self._log_level == DEBUG_LEVEL:
261 date = time.strftime("%Y-%m-%d %H:%M:%S")
262 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
264 class Forwarder(object):
265 def __init__(self, root_dir = "."):
266 self._ctrl_sock = None
267 self._root_dir = root_dir
273 print >>sys.stderr, "READY."
274 while not self._stop:
275 data = self.read_data()
276 self.send_to_server(data)
277 data = self.recv_from_server()
278 self.write_data(data)
282 return sys.stdin.readline()
284 def write_data(self, data):
285 sys.stdout.write(data)
286 # sys.stdout.write is buffered, this is why we need to do a flush()
289 def send_to_server(self, data):
291 self._ctrl_sock.send(data)
293 if e.errno == errno.EPIPE:
295 self._ctrl_sock.send(data)
298 encoded = data.rstrip()
299 msg = base64.b64decode(encoded)
303 def recv_from_server(self):
306 while '\n' not in chunk:
308 chunk = self._ctrl_sock.recv(1024)
310 if e.errno != errno.EINTR:
319 data = ''.join(data).split('\n',1)
322 data, self._rdbuf = data
328 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
329 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
330 self._ctrl_sock.connect(sock_addr)
332 def disconnect(self):
334 self._ctrl_sock.close()
338 class Client(object):
339 def __init__(self, root_dir = ".", host = None, port = None, user = None,
340 agent = None, environment_setup = ""):
341 self.root_dir = root_dir
342 self.addr = (host, port)
345 self.environment_setup = environment_setup
346 self._stopped = False
347 self._deferreds = collections.deque()
351 if self._process.poll() is None:
352 os.kill(self._process.pid, signal.SIGTERM)
356 root_dir = self.root_dir
357 (host, port) = self.addr
361 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
362 c.forward()" % (root_dir,)
364 self._process = popen_ssh_subprocess(python_code, host, port,
366 environment_setup = self.environment_setup)
367 # popen_ssh_subprocess already waits for readiness
368 if self._process.poll():
369 err = proc.stderr.read()
370 raise RuntimeError("Client could not be reached: %s" % \
373 self._process = subprocess.Popen(
374 ["python", "-c", python_code],
375 stdin = subprocess.PIPE,
376 stdout = subprocess.PIPE,
377 stderr = subprocess.PIPE
380 # Wait for the forwarder to be ready, otherwise nobody
381 # will be able to connect to it
382 helo = self._process.stderr.readline()
383 if helo != 'READY.\n':
384 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
385 helo + self._process.stderr.read())
387 def send_msg(self, msg):
388 encoded = base64.b64encode(msg)
389 data = "%s\n" % encoded
392 self._process.stdin.write(data)
393 except (IOError, ValueError):
394 # dead process, poll it to un-zombify
397 # try again after reconnect
398 # If it fails again, though, give up
400 self._process.stdin.write(data)
403 self.send_msg(STOP_MSG)
406 def defer_reply(self, transform=None):
408 self._deferreds.append(defer_entry)
410 functools.partial(self.read_reply, defer_entry, transform)
413 def _read_reply(self):
414 data = self._process.stdout.readline()
415 encoded = data.rstrip()
417 # empty == eof == dead process, poll it to un-zombify
420 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
421 return base64.b64decode(encoded)
423 def read_reply(self, which=None, transform=None):
424 # Test to see if someone did it already
425 if which is not None and len(which):
427 # ...just return the deferred value
429 return transform(which[0])
433 # Process all deferreds until the one we're looking for
434 # or until the queue is empty
435 while self._deferreds:
437 deferred = self._deferreds.popleft()
442 deferred.append(self._read_reply())
443 if deferred is which:
444 # We reached the one we were looking for
446 return transform(deferred[0])
451 # They've requested a synchronous read
453 return transform(self._read_reply())
455 return self._read_reply()
457 def _make_server_key_args(server_key, host, port, args):
459 Returns a reference to the created temporary file, and adds the
460 corresponding arguments to the given argument list.
462 Make sure to hold onto it until the process is done with the file
465 host = '%s:%s' % (host,port)
466 # Create a temporary server key file
467 tmp_known_hosts = tempfile.NamedTemporaryFile()
469 # Add the intended host key
470 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
472 # If we're not in strict mode, add user-configured keys
473 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
474 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
475 if os.access(user_hosts_path, os.R_OK):
476 f = open(user_hosts_path, "r")
477 tmp_known_hosts.write(f.read())
480 tmp_known_hosts.flush()
482 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
484 return tmp_known_hosts
486 def popen_ssh_command(command, host, port, user, agent,
492 Executes a remote commands, returns ((stdout,stderr),process)
495 print "ssh", host, command
497 tmp_known_hosts = None
499 # Don't bother with localhost. Makes test easier
500 '-o', 'NoHostAuthenticationForLocalhost=yes',
505 args.append('-p%d' % port)
507 args.extend(('-i', ident_key))
511 # Create a temporary server key file
512 tmp_known_hosts = _make_server_key_args(
513 server_key, host, port, args)
516 # connects to the remote host and starts a remote connection
517 proc = subprocess.Popen(args,
518 stdout = subprocess.PIPE,
519 stdin = subprocess.PIPE,
520 stderr = subprocess.PIPE)
522 # attach tempfile object to the process, to make sure the file stays
523 # alive until the process is finished with it
524 proc._known_hosts = tmp_known_hosts
526 out, err = proc.communicate(stdin)
528 print " -> ", out, err
530 return ((out, err), proc)
532 def popen_scp(source, dest,
539 Copies from/to remote sites.
541 Source and destination should have the user and host encoded
544 If source is a file object, a special mode will be used to
545 create the remote file with the same contents.
547 If dest is a file object, the remote file (source) will be
548 read and written into dest.
550 In these modes, recursive cannot be True.
552 Source can be a list of files to copy to a single destination,
553 in which case it is advised that the destination be a folder.
557 print "scp", source, dest
559 if isinstance(source, file) or isinstance(dest, file) \
560 or hasattr(source, 'read') or hasattr(dest, 'write'):
563 # Parse source/destination as <user>@<server>:<path>
564 if isinstance(dest, basestring) and ':' in dest:
565 remspec, path = dest.split(':',1)
566 elif isinstance(source, basestring) and ':' in source:
567 remspec, path = source.split(':',1)
569 raise ValueError, "Both endpoints cannot be local"
570 user,host = remspec.rsplit('@',1)
571 tmp_known_hosts = None
573 args = ['ssh', '-l', user, '-C',
574 # Don't bother with localhost. Makes test easier
575 '-o', 'NoHostAuthenticationForLocalhost=yes',
578 args.append('-P%d' % port)
580 args.extend(('-i', ident_key))
582 # Create a temporary server key file
583 tmp_known_hosts = _make_server_key_args(
584 server_key, host, port, args)
586 if isinstance(source, file) or hasattr(source, 'read'):
587 args.append('cat > %s' % (shell_escape(path),))
588 elif isinstance(dest, file) or hasattr(dest, 'write'):
589 args.append('cat %s' % (shell_escape(path),))
591 raise AssertionError, "Unreachable code reached! :-Q"
593 # connects to the remote host and starts a remote connection
594 if isinstance(source, file):
595 proc = subprocess.Popen(args,
596 stdout = open('/dev/null','w'),
597 stderr = subprocess.PIPE,
599 err = proc.stderr.read()
600 proc._known_hosts = tmp_known_hosts
601 eintr_retry(proc.wait)()
602 return ((None,err), proc)
603 elif isinstance(dest, file):
604 proc = subprocess.Popen(args,
605 stdout = open('/dev/null','w'),
606 stderr = subprocess.PIPE,
608 err = proc.stderr.read()
609 proc._known_hosts = tmp_known_hosts
610 eintr_retry(proc.wait)()
611 return ((None,err), proc)
612 elif hasattr(source, 'read'):
613 # file-like (but not file) source
614 proc = subprocess.Popen(args,
615 stdout = open('/dev/null','w'),
616 stderr = subprocess.PIPE,
617 stdin = subprocess.PIPE)
623 buf = source.read(4096)
628 rdrdy, wrdy, broken = select.select(
631 [proc.stderr,proc.stdin])
633 if proc.stderr in rdrdy:
634 # use os.read for fully unbuffered behavior
635 err.append(os.read(proc.stderr.fileno(), 4096))
637 if proc.stdin in wrdy:
638 proc.stdin.write(buf)
644 err.append(proc.stderr.read())
646 proc._known_hosts = tmp_known_hosts
647 eintr_retry(proc.wait)()
648 return ((None,''.join(err)), proc)
649 elif hasattr(dest, 'write'):
650 # file-like (but not file) dest
651 proc = subprocess.Popen(args,
652 stdout = subprocess.PIPE,
653 stderr = subprocess.PIPE,
654 stdin = open('/dev/null','w'))
659 rdrdy, wrdy, broken = select.select(
660 [proc.stderr, proc.stdout],
662 [proc.stderr, proc.stdout])
664 if proc.stderr in rdrdy:
665 # use os.read for fully unbuffered behavior
666 err.append(os.read(proc.stderr.fileno(), 4096))
668 if proc.stdout in rdrdy:
669 # use os.read for fully unbuffered behavior
670 buf = os.read(proc.stdout.fileno(), 4096)
679 err.append(proc.stderr.read())
681 proc._known_hosts = tmp_known_hosts
682 eintr_retry(proc.wait)()
683 return ((None,''.join(err)), proc)
685 raise AssertionError, "Unreachable code reached! :-Q"
687 # Parse destination as <user>@<server>:<path>
688 if isinstance(dest, basestring) and ':' in dest:
689 remspec, path = dest.split(':',1)
690 elif isinstance(source, basestring) and ':' in source:
691 remspec, path = source.split(':',1)
693 raise ValueError, "Both endpoints cannot be local"
694 user,host = remspec.rsplit('@',1)
697 tmp_known_hosts = None
698 args = ['scp', '-q', '-p', '-C',
699 # Don't bother with localhost. Makes test easier
700 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
702 args.append('-P%d' % port)
706 args.extend(('-i', ident_key))
708 # Create a temporary server key file
709 tmp_known_hosts = _make_server_key_args(
710 server_key, host, port, args)
711 if isinstance(source,list):
717 # connects to the remote host and starts a remote connection
718 proc = subprocess.Popen(args,
719 stdout = subprocess.PIPE,
720 stdin = subprocess.PIPE,
721 stderr = subprocess.PIPE)
722 proc._known_hosts = tmp_known_hosts
724 comm = proc.communicate()
725 eintr_retry(proc.wait)()
728 def popen_ssh_subprocess(python_code, host, port, user, agent,
733 environment_setup = "",
734 waitcommand = False):
737 python_path.replace("'", r"'\''")
738 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
740 if environment_setup:
741 cmd += environment_setup
743 # Uncomment for debug (to run everything under strace)
744 # We had to verify if strace works (cannot nest them)
745 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
747 #cmd += "strace -f -tt -s 200 -o strace$$.out "
749 cmd += "import base64, os\n"
750 cmd += "cmd = \"\"\n"
751 cmd += "while True:\n"
752 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
753 cmd += " if cmd[-1] == \"\\n\": break\n"
754 cmd += "cmd = base64.b64decode(cmd)\n"
755 # Uncomment for debug
756 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
758 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
761 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
764 tmp_known_hosts = None
766 # Don't bother with localhost. Makes test easier
767 '-o', 'NoHostAuthenticationForLocalhost=yes',
772 args.append('-p%d' % port)
774 args.extend(('-i', ident_key))
778 # Create a temporary server key file
779 tmp_known_hosts = _make_server_key_args(
780 server_key, host, port, args)
783 # connects to the remote host and starts a remote rpyc connection
784 proc = subprocess.Popen(args,
785 stdout = subprocess.PIPE,
786 stdin = subprocess.PIPE,
787 stderr = subprocess.PIPE)
788 proc._known_hosts = tmp_known_hosts
790 # send the command to execute
791 os.write(proc.stdin.fileno(),
792 base64.b64encode(python_code) + "\n")
793 msg = os.read(proc.stdout.fileno(), 3)
795 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
796 msg, proc.stdout.read(), proc.stderr.read())