2 # -*- coding: utf-8 -*-
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33 if hasattr(os, "devnull"):
36 DEV_NULL = "/dev/null"
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41 """ Escapes strings so that they are safe to use as command-line arguments """
42 if SHELL_SAFE.match(s):
43 # safe string - no escaping needed
46 # unsafe string - escape
48 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
51 return "'$'\\x%02x''" % (ord(c),)
52 s = ''.join(map(escp,s))
55 def eintr_retry(func):
57 @functools.wraps(func)
59 retry = kw.pop("_retry", False)
60 for i in xrange(0 if retry else 4):
63 except (select.error, socket.error), args:
64 if args[0] == errno.EINTR:
69 if e.errno == errno.EINTR:
78 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
79 self._root_dir = root_dir
81 self._ctrl_sock = None
82 self._log_level = log_level
92 # can not return normally after fork beacuse no exec was done.
93 # This means that if we don't do a os._exit(0) here the code that
94 # follows the call to "Server.run()" in the "caller code" will be
95 # executed... but by now it has already been executed after the
96 # first process (the one that did the first fork) returned.
104 # pipes for process synchronization
108 root = os.path.normpath(self._root_dir)
109 if not os.path.exists(root):
110 os.makedirs(root, 0755)
118 except OSError, e: # pragma: no cover
119 if e.errno == errno.EINTR:
125 # os.waitpid avoids leaving a <defunc> (zombie) process
126 st = os.waitpid(pid1, 0)[1]
128 raise RuntimeError("Daemonization failed")
129 # return 0 to inform the caller method that this is not the
134 # Decouple from parent environment.
135 os.chdir(self._root_dir)
142 # see ref: "os._exit(0)"
145 # close all open file descriptors.
146 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
147 if (max_fd == resource.RLIM_INFINITY):
149 for fd in range(3, max_fd):
156 # Redirect standard file descriptors.
157 stdin = open(DEV_NULL, "r")
158 stderr = stdout = open(STD_ERR, "a", 0)
159 os.dup2(stdin.fileno(), sys.stdin.fileno())
160 # NOTE: sys.stdout.write will still be buffered, even if the file
161 # was opened with 0 buffer
162 os.dup2(stdout.fileno(), sys.stdout.fileno())
163 os.dup2(stderr.fileno(), sys.stderr.fileno())
165 # create control socket
166 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
167 self._ctrl_sock.bind(CTRL_SOCK)
168 self._ctrl_sock.listen(0)
170 # let the parent process know that the daemonization is finished
175 def post_daemonize(self):
179 while not self._stop:
180 conn, addr = self._ctrl_sock.accept()
182 while not self._stop:
184 msg = self.recv_msg(conn)
185 except socket.timeout, e:
191 reply = self.stop_action()
193 reply = self.reply_action(msg)
196 self.send_reply(conn, reply)
199 self.log_error("NOTICE: Awaiting for reconnection")
207 def recv_msg(self, conn):
210 while '\n' not in chunk:
212 chunk = conn.recv(1024)
213 except (OSError, socket.error), e:
214 if e[0] != errno.EINTR:
223 data = ''.join(data).split('\n',1)
226 data, self._rdbuf = data
228 decoded = base64.b64decode(data)
229 return decoded.rstrip()
231 def send_reply(self, conn, reply):
232 encoded = base64.b64encode(reply)
233 conn.send("%s\n" % encoded)
237 self._ctrl_sock.close()
242 def stop_action(self):
243 return "Stopping server"
245 def reply_action(self, msg):
246 return "Reply to: %s" % msg
248 def log_error(self, text = None, context = ''):
250 text = traceback.format_exc()
251 date = time.strftime("%Y-%m-%d %H:%M:%S")
253 context = " (%s)" % (context,)
254 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
257 def log_debug(self, text):
258 if self._log_level == DEBUG_LEVEL:
259 date = time.strftime("%Y-%m-%d %H:%M:%S")
260 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
262 class Forwarder(object):
263 def __init__(self, root_dir = "."):
264 self._ctrl_sock = None
265 self._root_dir = root_dir
271 print >>sys.stderr, "READY."
272 while not self._stop:
273 data = self.read_data()
274 self.send_to_server(data)
275 data = self.recv_from_server()
276 self.write_data(data)
280 return sys.stdin.readline()
282 def write_data(self, data):
283 sys.stdout.write(data)
284 # sys.stdout.write is buffered, this is why we need to do a flush()
287 def send_to_server(self, data):
289 self._ctrl_sock.send(data)
290 except (IOError, socket.error), e:
291 if e[0] == errno.EPIPE:
293 self._ctrl_sock.send(data)
296 encoded = data.rstrip()
297 msg = base64.b64decode(encoded)
301 def recv_from_server(self):
304 while '\n' not in chunk:
306 chunk = self._ctrl_sock.recv(1024)
307 except (OSError, socket.error), e:
308 if e[0] != errno.EINTR:
316 data = ''.join(data).split('\n',1)
319 data, self._rdbuf = data
325 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
326 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
327 self._ctrl_sock.connect(sock_addr)
329 def disconnect(self):
331 self._ctrl_sock.close()
335 class Client(object):
336 def __init__(self, root_dir = ".", host = None, port = None, user = None,
337 agent = None, environment_setup = ""):
338 self.root_dir = root_dir
339 self.addr = (host, port)
342 self.environment_setup = environment_setup
343 self._stopped = False
344 self._deferreds = collections.deque()
348 if self._process.poll() is None:
349 os.kill(self._process.pid, signal.SIGTERM)
353 root_dir = self.root_dir
354 (host, port) = self.addr
358 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
359 c.forward()" % (root_dir,)
361 self._process = popen_ssh_subprocess(python_code, host, port,
363 environment_setup = self.environment_setup)
364 # popen_ssh_subprocess already waits for readiness
365 if self._process.poll():
366 err = proc.stderr.read()
367 raise RuntimeError("Client could not be reached: %s" % \
370 self._process = subprocess.Popen(
371 ["python", "-c", python_code],
372 stdin = subprocess.PIPE,
373 stdout = subprocess.PIPE,
374 stderr = subprocess.PIPE
377 # Wait for the forwarder to be ready, otherwise nobody
378 # will be able to connect to it
379 helo = self._process.stderr.readline()
380 if helo != 'READY.\n':
381 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
382 helo + self._process.stderr.read())
384 def send_msg(self, msg):
385 encoded = base64.b64encode(msg)
386 data = "%s\n" % encoded
389 self._process.stdin.write(data)
390 except (IOError, ValueError):
391 # dead process, poll it to un-zombify
394 # try again after reconnect
395 # If it fails again, though, give up
397 self._process.stdin.write(data)
400 self.send_msg(STOP_MSG)
403 def defer_reply(self, transform=None):
405 self._deferreds.append(defer_entry)
407 functools.partial(self.read_reply, defer_entry, transform)
410 def _read_reply(self):
411 data = self._process.stdout.readline()
412 encoded = data.rstrip()
414 # empty == eof == dead process, poll it to un-zombify
417 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
418 return base64.b64decode(encoded)
420 def read_reply(self, which=None, transform=None):
421 # Test to see if someone did it already
422 if which is not None and len(which):
424 # ...just return the deferred value
426 return transform(which[0])
430 # Process all deferreds until the one we're looking for
431 # or until the queue is empty
432 while self._deferreds:
434 deferred = self._deferreds.popleft()
439 deferred.append(self._read_reply())
440 if deferred is which:
441 # We reached the one we were looking for
443 return transform(deferred[0])
448 # They've requested a synchronous read
450 return transform(self._read_reply())
452 return self._read_reply()
454 def _make_server_key_args(server_key, host, port, args):
456 Returns a reference to the created temporary file, and adds the
457 corresponding arguments to the given argument list.
459 Make sure to hold onto it until the process is done with the file
462 host = '%s:%s' % (host,port)
463 # Create a temporary server key file
464 tmp_known_hosts = tempfile.NamedTemporaryFile()
466 # Add the intended host key
467 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
469 # If we're not in strict mode, add user-configured keys
470 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
471 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
472 if os.access(user_hosts_path, os.R_OK):
473 f = open(user_hosts_path, "r")
474 tmp_known_hosts.write(f.read())
477 tmp_known_hosts.flush()
479 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
481 return tmp_known_hosts
483 def popen_ssh_command(command, host, port, user, agent,
489 Executes a remote commands, returns ((stdout,stderr),process)
492 print "ssh", host, command
494 tmp_known_hosts = None
496 # Don't bother with localhost. Makes test easier
497 '-o', 'NoHostAuthenticationForLocalhost=yes',
502 args.append('-p%d' % port)
504 args.extend(('-i', ident_key))
508 # Create a temporary server key file
509 tmp_known_hosts = _make_server_key_args(
510 server_key, host, port, args)
513 # connects to the remote host and starts a remote connection
514 proc = subprocess.Popen(args,
515 stdout = subprocess.PIPE,
516 stdin = subprocess.PIPE,
517 stderr = subprocess.PIPE)
519 # attach tempfile object to the process, to make sure the file stays
520 # alive until the process is finished with it
521 proc._known_hosts = tmp_known_hosts
523 out, err = proc.communicate(stdin)
525 print " -> ", out, err
527 return ((out, err), proc)
529 def popen_scp(source, dest,
536 Copies from/to remote sites.
538 Source and destination should have the user and host encoded
541 If source is a file object, a special mode will be used to
542 create the remote file with the same contents.
544 If dest is a file object, the remote file (source) will be
545 read and written into dest.
547 In these modes, recursive cannot be True.
549 Source can be a list of files to copy to a single destination,
550 in which case it is advised that the destination be a folder.
554 print "scp", source, dest
556 if isinstance(source, file) or isinstance(dest, file) \
557 or hasattr(source, 'read') or hasattr(dest, 'write'):
560 # Parse source/destination as <user>@<server>:<path>
561 if isinstance(dest, basestring) and ':' in dest:
562 remspec, path = dest.split(':',1)
563 elif isinstance(source, basestring) and ':' in source:
564 remspec, path = source.split(':',1)
566 raise ValueError, "Both endpoints cannot be local"
567 user,host = remspec.rsplit('@',1)
568 tmp_known_hosts = None
570 args = ['ssh', '-l', user, '-C',
571 # Don't bother with localhost. Makes test easier
572 '-o', 'NoHostAuthenticationForLocalhost=yes',
575 args.append('-P%d' % port)
577 args.extend(('-i', ident_key))
579 # Create a temporary server key file
580 tmp_known_hosts = _make_server_key_args(
581 server_key, host, port, args)
583 if isinstance(source, file) or hasattr(source, 'read'):
584 args.append('cat > %s' % (shell_escape(path),))
585 elif isinstance(dest, file) or hasattr(dest, 'write'):
586 args.append('cat %s' % (shell_escape(path),))
588 raise AssertionError, "Unreachable code reached! :-Q"
590 # connects to the remote host and starts a remote connection
591 if isinstance(source, file):
592 proc = subprocess.Popen(args,
593 stdout = open('/dev/null','w'),
594 stderr = subprocess.PIPE,
596 err = proc.stderr.read()
597 proc._known_hosts = tmp_known_hosts
598 eintr_retry(proc.wait)()
599 return ((None,err), proc)
600 elif isinstance(dest, file):
601 proc = subprocess.Popen(args,
602 stdout = open('/dev/null','w'),
603 stderr = subprocess.PIPE,
605 err = proc.stderr.read()
606 proc._known_hosts = tmp_known_hosts
607 eintr_retry(proc.wait)()
608 return ((None,err), proc)
609 elif hasattr(source, 'read'):
610 # file-like (but not file) source
611 proc = subprocess.Popen(args,
612 stdout = open('/dev/null','w'),
613 stderr = subprocess.PIPE,
614 stdin = subprocess.PIPE)
620 buf = source.read(4096)
625 rdrdy, wrdy, broken = select.select(
628 [proc.stderr,proc.stdin])
630 if proc.stderr in rdrdy:
631 # use os.read for fully unbuffered behavior
632 err.append(os.read(proc.stderr.fileno(), 4096))
634 if proc.stdin in wrdy:
635 proc.stdin.write(buf)
641 err.append(proc.stderr.read())
643 proc._known_hosts = tmp_known_hosts
644 eintr_retry(proc.wait)()
645 return ((None,''.join(err)), proc)
646 elif hasattr(dest, 'write'):
647 # file-like (but not file) dest
648 proc = subprocess.Popen(args,
649 stdout = subprocess.PIPE,
650 stderr = subprocess.PIPE,
651 stdin = open('/dev/null','w'))
656 rdrdy, wrdy, broken = select.select(
657 [proc.stderr, proc.stdout],
659 [proc.stderr, proc.stdout])
661 if proc.stderr in rdrdy:
662 # use os.read for fully unbuffered behavior
663 err.append(os.read(proc.stderr.fileno(), 4096))
665 if proc.stdout in rdrdy:
666 # use os.read for fully unbuffered behavior
667 buf = os.read(proc.stdout.fileno(), 4096)
676 err.append(proc.stderr.read())
678 proc._known_hosts = tmp_known_hosts
679 eintr_retry(proc.wait)()
680 return ((None,''.join(err)), proc)
682 raise AssertionError, "Unreachable code reached! :-Q"
684 # Parse destination as <user>@<server>:<path>
685 if isinstance(dest, basestring) and ':' in dest:
686 remspec, path = dest.split(':',1)
687 elif isinstance(source, basestring) and ':' in source:
688 remspec, path = source.split(':',1)
690 raise ValueError, "Both endpoints cannot be local"
691 user,host = remspec.rsplit('@',1)
694 tmp_known_hosts = None
695 args = ['scp', '-q', '-p', '-C',
696 # Don't bother with localhost. Makes test easier
697 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
699 args.append('-P%d' % port)
703 args.extend(('-i', ident_key))
705 # Create a temporary server key file
706 tmp_known_hosts = _make_server_key_args(
707 server_key, host, port, args)
708 if isinstance(source,list):
714 # connects to the remote host and starts a remote connection
715 proc = subprocess.Popen(args,
716 stdout = subprocess.PIPE,
717 stdin = subprocess.PIPE,
718 stderr = subprocess.PIPE)
719 proc._known_hosts = tmp_known_hosts
721 comm = proc.communicate()
722 eintr_retry(proc.wait)()
725 def popen_ssh_subprocess(python_code, host, port, user, agent,
730 environment_setup = "",
731 waitcommand = False):
734 python_path.replace("'", r"'\''")
735 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
737 if environment_setup:
738 cmd += environment_setup
740 # Uncomment for debug (to run everything under strace)
741 # We had to verify if strace works (cannot nest them)
742 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
744 #cmd += "strace -f -tt -s 200 -o strace$$.out "
746 cmd += "import base64, os\n"
747 cmd += "cmd = \"\"\n"
748 cmd += "while True:\n"
749 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
750 cmd += " if cmd[-1] == \"\\n\": break\n"
751 cmd += "cmd = base64.b64decode(cmd)\n"
752 # Uncomment for debug
753 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
755 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
758 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
761 tmp_known_hosts = None
763 # Don't bother with localhost. Makes test easier
764 '-o', 'NoHostAuthenticationForLocalhost=yes',
769 args.append('-p%d' % port)
771 args.extend(('-i', ident_key))
775 # Create a temporary server key file
776 tmp_known_hosts = _make_server_key_args(
777 server_key, host, port, args)
780 # connects to the remote host and starts a remote rpyc connection
781 proc = subprocess.Popen(args,
782 stdout = subprocess.PIPE,
783 stdin = subprocess.PIPE,
784 stderr = subprocess.PIPE)
785 proc._known_hosts = tmp_known_hosts
787 # send the command to execute
788 os.write(proc.stdin.fileno(),
789 base64.b64encode(python_code) + "\n")
790 msg = os.read(proc.stdout.fileno(), 3)
792 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
793 msg, proc.stdout.read(), proc.stderr.read())