2 # -*- coding: utf-8 -*-
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33 if hasattr(os, "devnull"):
36 DEV_NULL = "/dev/null"
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41 """ Escapes strings so that they are safe to use as command-line arguments """
42 if SHELL_SAFE.match(s):
43 # safe string - no escaping needed
46 # unsafe string - escape
48 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
51 return "'$'\\x%02x''" % (ord(c),)
52 s = ''.join(map(escp,s))
55 def eintr_retry(func):
57 @functools.wraps(func)
59 retry = kw.pop("_retry", False)
60 for i in xrange(0 if retry else 4):
63 except select.error, args:
64 if args[0] == errno.EINTR:
73 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
74 self._root_dir = root_dir
76 self._ctrl_sock = None
77 self._log_level = log_level
87 # cannot return normally after fork because no exec was done.
88 # This means that if we don't do a os._exit(0) here the code that
89 # follows the call to "Server.run()" in the "caller code" will be
90 # executed... but by now it has already been executed after the
91 # first process (the one that did the first fork) returned.
99 # pipes for process synchronization
103 root = os.path.normpath(self._root_dir)
104 if not os.path.exists(root):
105 os.makedirs(root, 0755)
113 except OSError, e: # pragma: no cover
114 if e.errno == errno.EINTR:
120 # os.waitpid avoids leaving a <defunct> (zombie) process
121 st = os.waitpid(pid1, 0)[1]
123 raise RuntimeError("Daemonization failed")
124 # return 0 to inform the caller method that this is not the
129 # Decouple from parent environment.
130 os.chdir(self._root_dir)
137 # see ref: "os._exit(0)"
140 # close all open file descriptors.
141 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
142 if (max_fd == resource.RLIM_INFINITY):
144 for fd in range(3, max_fd):
151 # Redirect standard file descriptors.
152 stdin = open(DEV_NULL, "r")
153 stderr = stdout = open(STD_ERR, "a", 0)
154 os.dup2(stdin.fileno(), sys.stdin.fileno())
155 # NOTE: sys.stdout.write will still be buffered, even if the file
156 # was opened with 0 buffer
157 os.dup2(stdout.fileno(), sys.stdout.fileno())
158 os.dup2(stderr.fileno(), sys.stderr.fileno())
160 # create control socket
161 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
162 self._ctrl_sock.bind(CTRL_SOCK)
163 self._ctrl_sock.listen(0)
165 # let the parent process know that the daemonization is finished
170 def post_daemonize(self):
174 while not self._stop:
175 conn, addr = self._ctrl_sock.accept()
177 while not self._stop:
179 msg = self.recv_msg(conn)
180 except socket.timeout, e:
185 reply = self.stop_action()
187 reply = self.reply_action(msg)
190 self.send_reply(conn, reply)
193 self.log_error("NOTICE: Awaiting for reconnection")
201 def recv_msg(self, conn):
204 while '\n' not in chunk:
206 chunk = conn.recv(1024)
208 if e.errno != errno.EINTR:
217 data = ''.join(data).split('\n',1)
220 data, self._rdbuf = data
222 decoded = base64.b64decode(data)
223 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send ``reply`` to the peer as one base64-encoded, newline-terminated line.

    ``conn`` is the connected socket the reply is written to and ``reply``
    is the raw payload to encode.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole buffer has been handed to the
    # kernel. A plain send() may transmit only a prefix, which would leave
    # the peer with a truncated line and no terminating newline.
    conn.sendall("%s\n" % encoded)
231 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed reply text used when a stop is requested."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Return the reply text for ``msg``: the message prefixed with "Reply to: "."""
    # ``msg`` is deliberately passed bare to the % operator, exactly as
    # before, so tuple arguments keep formatting the same way.
    reply = "Reply to: %s" % msg
    return reply
242 def log_error(self, text = None, context = ''):
244 text = traceback.format_exc()
245 date = time.strftime("%Y-%m-%d %H:%M:%S")
247 context = " (%s)" % (context,)
248 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry for ``text`` to stderr.

    Nothing is written unless the instance's log level equals DEBUG_LEVEL.
    """
    debug_enabled = self._log_level == DEBUG_LEVEL
    if not debug_enabled:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
256 class Forwarder(object):
257 def __init__(self, root_dir = "."):
258 self._ctrl_sock = None
259 self._root_dir = root_dir
265 print >>sys.stderr, "READY."
266 while not self._stop:
267 data = self.read_data()
268 self.send_to_server(data)
269 data = self.recv_from_server()
270 self.write_data(data)
274 return sys.stdin.readline()
276 def write_data(self, data):
277 sys.stdout.write(data)
278 # sys.stdout.write is buffered, this is why we need to do a flush()
281 def send_to_server(self, data):
283 self._ctrl_sock.send(data)
285 if e.errno == errno.EPIPE:
287 self._ctrl_sock.send(data)
290 encoded = data.rstrip()
291 msg = base64.b64decode(encoded)
295 def recv_from_server(self):
298 while '\n' not in chunk:
300 chunk = self._ctrl_sock.recv(1024)
302 if e.errno != errno.EINTR:
311 data = ''.join(data).split('\n',1)
314 data, self._rdbuf = data
320 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
321 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
322 self._ctrl_sock.connect(sock_addr)
324 def disconnect(self):
326 self._ctrl_sock.close()
330 class Client(object):
331 def __init__(self, root_dir = ".", host = None, port = None, user = None,
332 agent = None, environment_setup = ""):
333 self.root_dir = root_dir
334 self.addr = (host, port)
337 self.environment_setup = environment_setup
338 self._stopped = False
339 self._deferreds = collections.deque()
343 if self._process.poll() is None:
344 os.kill(self._process.pid, signal.SIGTERM)
348 root_dir = self.root_dir
349 (host, port) = self.addr
353 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
354 c.forward()" % (root_dir,)
356 self._process = popen_ssh_subprocess(python_code, host, port,
358 environment_setup = self.environment_setup)
359 # popen_ssh_subprocess already waits for readiness
360 if self._process.poll():
361 err = proc.stderr.read()
362 raise RuntimeError("Client could not be reached: %s" % \
365 self._process = subprocess.Popen(
366 ["python", "-c", python_code],
367 stdin = subprocess.PIPE,
368 stdout = subprocess.PIPE,
369 stderr = subprocess.PIPE
372 # Wait for the forwarder to be ready, otherwise nobody
373 # will be able to connect to it
374 helo = self._process.stderr.readline()
375 if helo != 'READY.\n':
376 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
377 helo + self._process.stderr.read())
379 def send_msg(self, msg):
380 encoded = base64.b64encode(msg)
381 data = "%s\n" % encoded
384 self._process.stdin.write(data)
385 except (IOError, ValueError):
386 # dead process, poll it to un-zombify
389 # try again after reconnect
390 # If it fails again, though, give up
392 self._process.stdin.write(data)
395 self.send_msg(STOP_MSG)
398 def defer_reply(self, transform=None):
400 self._deferreds.append(defer_entry)
402 functools.partial(self.read_reply, defer_entry, transform)
405 def _read_reply(self):
406 data = self._process.stdout.readline()
407 encoded = data.rstrip()
408 return base64.b64decode(encoded)
410 def read_reply(self, which=None, transform=None):
411 # Test to see if someone did it already
412 if which is not None and len(which):
414 # ...just return the deferred value
416 return transform(which[0])
420 # Process all deferreds until the one we're looking for
421 # or until the queue is empty
422 while self._deferreds:
424 deferred = self._deferreds.popleft()
429 deferred.append(self._read_reply())
430 if deferred is which:
431 # We reached the one we were looking for
433 return transform(deferred[0])
438 # They've requested a synchronous read
440 return transform(self._read_reply())
442 return self._read_reply()
444 def _make_server_key_args(server_key, host, port, args):
446 Returns a reference to the created temporary file, and adds the
447 corresponding arguments to the given argument list.
449 Make sure to hold onto it until the process is done with the file
452 host = '%s:%s' % (host,port)
453 # Create a temporary server key file
454 tmp_known_hosts = tempfile.NamedTemporaryFile()
456 # Add the intended host key
457 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
459 # If we're not in strict mode, add user-configured keys
460 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
461 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
462 if os.access(user_hosts_path, os.R_OK):
463 f = open(user_hosts_path, "r")
464 tmp_known_hosts.write(f.read())
467 tmp_known_hosts.flush()
469 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
471 return tmp_known_hosts
473 def popen_ssh_command(command, host, port, user, agent,
479 Executes a remote command, returns ((stdout,stderr),process)
482 print "ssh", host, command
484 tmp_known_hosts = None
486 # Don't bother with localhost. Makes test easier
487 '-o', 'NoHostAuthenticationForLocalhost=yes',
492 args.append('-p%d' % port)
494 args.extend(('-i', ident_key))
498 # Create a temporary server key file
499 tmp_known_hosts = _make_server_key_args(
500 server_key, host, port, args)
503 # connects to the remote host and starts a remote connection
504 proc = subprocess.Popen(args,
505 stdout = subprocess.PIPE,
506 stdin = subprocess.PIPE,
507 stderr = subprocess.PIPE)
509 # attach tempfile object to the process, to make sure the file stays
510 # alive until the process is finished with it
511 proc._known_hosts = tmp_known_hosts
513 out, err = proc.communicate(stdin)
515 print " -> ", out, err
517 return ((out, err), proc)
519 def popen_scp(source, dest,
526 Copies from/to remote sites.
528 Source and destination should have the user and host encoded
531 If source is a file object, a special mode will be used to
532 create the remote file with the same contents.
534 If dest is a file object, the remote file (source) will be
535 read and written into dest.
537 In these modes, recursive cannot be True.
539 Source can be a list of files to copy to a single destination,
540 in which case it is advised that the destination be a folder.
544 print "scp", source, dest
546 if isinstance(source, file) or isinstance(dest, file) \
547 or hasattr(source, 'read') or hasattr(dest, 'write'):
550 # Parse source/destination as <user>@<server>:<path>
551 if isinstance(dest, basestring) and ':' in dest:
552 remspec, path = dest.split(':',1)
553 elif isinstance(source, basestring) and ':' in source:
554 remspec, path = source.split(':',1)
556 raise ValueError, "Both endpoints cannot be local"
557 user,host = remspec.rsplit('@',1)
558 tmp_known_hosts = None
560 args = ['ssh', '-l', user, '-C',
561 # Don't bother with localhost. Makes test easier
562 '-o', 'NoHostAuthenticationForLocalhost=yes',
565 args.append('-P%d' % port)
567 args.extend(('-i', ident_key))
569 # Create a temporary server key file
570 tmp_known_hosts = _make_server_key_args(
571 server_key, host, port, args)
573 if isinstance(source, file) or hasattr(source, 'read'):
574 args.append('cat > %s' % (shell_escape(path),))
575 elif isinstance(dest, file) or hasattr(dest, 'write'):
576 args.append('cat %s' % (shell_escape(path),))
578 raise AssertionError, "Unreachable code reached! :-Q"
580 # connects to the remote host and starts a remote connection
581 if isinstance(source, file):
582 proc = subprocess.Popen(args,
583 stdout = open('/dev/null','w'),
584 stderr = subprocess.PIPE,
586 err = proc.stderr.read()
587 proc._known_hosts = tmp_known_hosts
588 eintr_retry(proc.wait)()
589 return ((None,err), proc)
590 elif isinstance(dest, file):
591 proc = subprocess.Popen(args,
592 stdout = open('/dev/null','w'),
593 stderr = subprocess.PIPE,
595 err = proc.stderr.read()
596 proc._known_hosts = tmp_known_hosts
597 eintr_retry(proc.wait)()
598 return ((None,err), proc)
599 elif hasattr(source, 'read'):
600 # file-like (but not file) source
601 proc = subprocess.Popen(args,
602 stdout = open('/dev/null','w'),
603 stderr = subprocess.PIPE,
604 stdin = subprocess.PIPE)
610 buf = source.read(4096)
615 rdrdy, wrdy, broken = select.select(
618 [proc.stderr,proc.stdin])
620 if proc.stderr in rdrdy:
621 # use os.read for fully unbuffered behavior
622 err.append(os.read(proc.stderr.fileno(), 4096))
624 if proc.stdin in wrdy:
625 proc.stdin.write(buf)
631 err.append(proc.stderr.read())
633 proc._known_hosts = tmp_known_hosts
634 eintr_retry(proc.wait)()
635 return ((None,''.join(err)), proc)
636 elif hasattr(dest, 'write'):
637 # file-like (but not file) dest
638 proc = subprocess.Popen(args,
639 stdout = subprocess.PIPE,
640 stderr = subprocess.PIPE,
641 stdin = open('/dev/null','w'))
646 rdrdy, wrdy, broken = select.select(
647 [proc.stderr, proc.stdout],
649 [proc.stderr, proc.stdout])
651 if proc.stderr in rdrdy:
652 # use os.read for fully unbuffered behavior
653 err.append(os.read(proc.stderr.fileno(), 4096))
655 if proc.stdout in rdrdy:
656 # use os.read for fully unbuffered behavior
657 buf = os.read(proc.stdout.fileno(), 4096)
666 err.append(proc.stderr.read())
668 proc._known_hosts = tmp_known_hosts
669 eintr_retry(proc.wait)()
670 return ((None,''.join(err)), proc)
672 raise AssertionError, "Unreachable code reached! :-Q"
674 # Parse destination as <user>@<server>:<path>
675 if isinstance(dest, basestring) and ':' in dest:
676 remspec, path = dest.split(':',1)
677 elif isinstance(source, basestring) and ':' in source:
678 remspec, path = source.split(':',1)
680 raise ValueError, "Both endpoints cannot be local"
681 user,host = remspec.rsplit('@',1)
684 tmp_known_hosts = None
685 args = ['scp', '-q', '-p', '-C',
686 # Don't bother with localhost. Makes test easier
687 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
689 args.append('-P%d' % port)
693 args.extend(('-i', ident_key))
695 # Create a temporary server key file
696 tmp_known_hosts = _make_server_key_args(
697 server_key, host, port, args)
698 if isinstance(source,list):
704 # connects to the remote host and starts a remote connection
705 proc = subprocess.Popen(args,
706 stdout = subprocess.PIPE,
707 stdin = subprocess.PIPE,
708 stderr = subprocess.PIPE)
709 proc._known_hosts = tmp_known_hosts
711 comm = proc.communicate()
712 eintr_retry(proc.wait)()
715 def popen_ssh_subprocess(python_code, host, port, user, agent,
720 environment_setup = "",
721 waitcommand = False):
724 python_path.replace("'", r"'\''")
725 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
727 if environment_setup:
728 cmd += environment_setup
730 # Uncomment for debug (to run everything under strace)
731 # We had to verify if strace works (cannot nest them)
732 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
734 #cmd += "strace -f -tt -s 200 -o strace$$.out "
736 cmd += "import base64, os\n"
737 cmd += "cmd = \"\"\n"
738 cmd += "while True:\n"
739 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
740 cmd += " if cmd[-1] == \"\\n\": break\n"
741 cmd += "cmd = base64.b64decode(cmd)\n"
742 # Uncomment for debug
743 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
745 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
748 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
751 tmp_known_hosts = None
753 # Don't bother with localhost. Makes test easier
754 '-o', 'NoHostAuthenticationForLocalhost=yes',
759 args.append('-p%d' % port)
761 args.extend(('-i', ident_key))
765 # Create a temporary server key file
766 tmp_known_hosts = _make_server_key_args(
767 server_key, host, port, args)
770 # connects to the remote host and starts a remote rpyc connection
771 proc = subprocess.Popen(args,
772 stdout = subprocess.PIPE,
773 stdin = subprocess.PIPE,
774 stderr = subprocess.PIPE)
775 proc._known_hosts = tmp_known_hosts
777 # send the command to execute
778 os.write(proc.stdin.fileno(),
779 base64.b64encode(python_code) + "\n")
780 msg = os.read(proc.stdout.fileno(), 3)
782 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
783 msg, proc.stdout.read(), proc.stderr.read())