2 # -*- coding: utf-8 -*-
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33 if hasattr(os, "devnull"):
36 DEV_NULL = "/dev/null"
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41 """ Escapes strings so that they are safe to use as command-line arguments """
42 if SHELL_SAFE.match(s):
43 # safe string - no escaping needed
46 # unsafe string - escape
48 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
51 return "'$'\\x%02x''" % (ord(c),)
52 s = ''.join(map(escp,s))
55 def eintr_retry(func):
57 @functools.wraps(func)
59 retry = kw.pop("_retry", False)
60 for i in xrange(0 if retry else 4):
63 except select.error, args:
64 if args[0] == errno.EINTR:
73 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
74 self._root_dir = root_dir
76 self._ctrl_sock = None
77 self._log_level = log_level
87 # can not return normally after fork because no exec was done.
88 # This means that if we don't do a os._exit(0) here the code that
89 # follows the call to "Server.run()" in the "caller code" will be
90 # executed... but by now it has already been executed after the
91 # first process (the one that did the first fork) returned.
99 # pipes for process synchronization
103 root = os.path.normpath(self._root_dir)
104 if not os.path.exists(root):
105 os.makedirs(root, 0755)
113 except OSError, e: # pragma: no cover
114 if e.errno == errno.EINTR:
120 # os.waitpid avoids leaving a <defunct> (zombie) process
121 st = os.waitpid(pid1, 0)[1]
123 raise RuntimeError("Daemonization failed")
124 # return 0 to inform the caller method that this is not the
129 # Decouple from parent environment.
130 os.chdir(self._root_dir)
137 # see ref: "os._exit(0)"
140 # close all open file descriptors.
141 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
142 if (max_fd == resource.RLIM_INFINITY):
144 for fd in range(3, max_fd):
151 # Redirect standard file descriptors.
152 stdin = open(DEV_NULL, "r")
153 stderr = stdout = open(STD_ERR, "a", 0)
154 os.dup2(stdin.fileno(), sys.stdin.fileno())
155 # NOTE: sys.stdout.write will still be buffered, even if the file
156 # was opened with 0 buffer
157 os.dup2(stdout.fileno(), sys.stdout.fileno())
158 os.dup2(stderr.fileno(), sys.stderr.fileno())
160 # create control socket
161 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
162 self._ctrl_sock.bind(CTRL_SOCK)
163 self._ctrl_sock.listen(0)
165 # let the parent process know that the daemonization is finished
170 def post_daemonize(self):
174 while not self._stop:
175 conn, addr = self._ctrl_sock.accept()
177 while not self._stop:
179 msg = self.recv_msg(conn)
180 except socket.timeout, e:
185 reply = self.stop_action()
187 reply = self.reply_action(msg)
190 self.send_reply(conn, reply)
193 self.log_error("NOTICE: Awaiting for reconnection")
201 def recv_msg(self, conn):
204 while '\n' not in chunk:
206 chunk = conn.recv(1024)
208 if e.errno != errno.EINTR:
217 data = ''.join(data).split('\n',1)
220 data, self._rdbuf = data
222 decoded = base64.b64decode(data)
223 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Base64-encode ``reply`` and send it to the peer as a single line.

    The encoded payload is terminated with a newline so the receiver can
    detect the end of the message.

    :param conn: connected socket to write the reply to.
    :param reply: string payload to transmit.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole line has been handed to the
    # socket; a bare send() may transmit only part of a long reply and
    # leave the peer waiting forever for the terminating newline.
    conn.sendall("%s\n" % encoded)
231 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed text used as the reply to a stop request."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Return the reply text for a received message.

    :param msg: the message to acknowledge.
    :returns: the string ``"Reply to: "`` followed by ``msg``.
    """
    # Wrap msg in a 1-tuple so that a tuple-valued msg is formatted as one
    # value instead of being unpacked as the argument list of the % operator
    # (which raises TypeError for any tuple whose length is not exactly 1).
    return "Reply to: %s" % (msg,)
242 def log_error(self, text = None, context = ''):
244 text = traceback.format_exc()
245 date = time.strftime("%Y-%m-%d %H:%M:%S")
247 context = " (%s)" % (context,)
248 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry to stderr.

    Nothing is written unless the instance log level equals DEBUG_LEVEL.

    :param text: message body, written on the line after the timestamp.
    """
    if self._log_level != DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
256 class Forwarder(object):
257 def __init__(self, root_dir = "."):
258 self._ctrl_sock = None
259 self._root_dir = root_dir
265 print >>sys.stderr, "READY."
266 while not self._stop:
267 data = self.read_data()
268 self.send_to_server(data)
269 data = self.recv_from_server()
270 self.write_data(data)
274 return sys.stdin.readline()
276 def write_data(self, data):
277 sys.stdout.write(data)
278 # sys.stdout.write is buffered, this is why we need to do a flush()
281 def send_to_server(self, data):
283 self._ctrl_sock.send(data)
285 if e.errno == errno.EPIPE:
287 self._ctrl_sock.send(data)
290 encoded = data.rstrip()
291 msg = base64.b64decode(encoded)
295 def recv_from_server(self):
298 while '\n' not in chunk:
300 chunk = self._ctrl_sock.recv(1024)
302 if e.errno != errno.EINTR:
311 data = ''.join(data).split('\n',1)
314 data, self._rdbuf = data
320 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
321 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
322 self._ctrl_sock.connect(sock_addr)
324 def disconnect(self):
326 self._ctrl_sock.close()
330 class Client(object):
331 def __init__(self, root_dir = ".", host = None, port = None, user = None,
332 agent = None, environment_setup = ""):
333 self.root_dir = root_dir
334 self.addr = (host, port)
337 self.environment_setup = environment_setup
338 self._stopped = False
339 self._deferreds = collections.deque()
343 if self._process.poll() is None:
344 os.kill(self._process.pid, signal.SIGTERM)
348 root_dir = self.root_dir
349 (host, port) = self.addr
353 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
354 c.forward()" % (root_dir,)
356 self._process = popen_ssh_subprocess(python_code, host, port,
358 environment_setup = self.environment_setup)
359 # popen_ssh_subprocess already waits for readiness
360 if self._process.poll():
361 err = proc.stderr.read()
362 raise RuntimeError("Client could not be reached: %s" % \
365 self._process = subprocess.Popen(
366 ["python", "-c", python_code],
367 stdin = subprocess.PIPE,
368 stdout = subprocess.PIPE,
369 stderr = subprocess.PIPE
372 # Wait for the forwarder to be ready, otherwise nobody
373 # will be able to connect to it
374 helo = self._process.stderr.readline()
375 if helo != 'READY.\n':
376 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
377 helo + self._process.stderr.read())
379 def send_msg(self, msg):
380 encoded = base64.b64encode(msg)
381 data = "%s\n" % encoded
384 self._process.stdin.write(data)
385 except (IOError, ValueError):
386 # dead process, poll it to un-zombify
389 # try again after reconnect
390 # If it fails again, though, give up
392 self._process.stdin.write(data)
395 self.send_msg(STOP_MSG)
398 def defer_reply(self, transform=None):
400 self._deferreds.append(defer_entry)
402 functools.partial(self.read_reply, defer_entry, transform)
405 def _read_reply(self):
406 data = self._process.stdout.readline()
407 encoded = data.rstrip()
409 # empty == eof == dead process, poll it to un-zombify
412 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
413 return base64.b64decode(encoded)
415 def read_reply(self, which=None, transform=None):
416 # Test to see if someone did it already
417 if which is not None and len(which):
419 # ...just return the deferred value
421 return transform(which[0])
425 # Process all deferreds until the one we're looking for
426 # or until the queue is empty
427 while self._deferreds:
429 deferred = self._deferreds.popleft()
434 deferred.append(self._read_reply())
435 if deferred is which:
436 # We reached the one we were looking for
438 return transform(deferred[0])
443 # They've requested a synchronous read
445 return transform(self._read_reply())
447 return self._read_reply()
449 def _make_server_key_args(server_key, host, port, args):
451 Returns a reference to the created temporary file, and adds the
452 corresponding arguments to the given argument list.
454 Make sure to hold onto it until the process is done with the file
457 host = '%s:%s' % (host,port)
458 # Create a temporary server key file
459 tmp_known_hosts = tempfile.NamedTemporaryFile()
461 # Add the intended host key
462 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
464 # If we're not in strict mode, add user-configured keys
465 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
466 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
467 if os.access(user_hosts_path, os.R_OK):
468 f = open(user_hosts_path, "r")
469 tmp_known_hosts.write(f.read())
472 tmp_known_hosts.flush()
474 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
476 return tmp_known_hosts
478 def popen_ssh_command(command, host, port, user, agent,
484 Executes a remote command, returns ((stdout,stderr),process)
487 print "ssh", host, command
489 tmp_known_hosts = None
491 # Don't bother with localhost. Makes test easier
492 '-o', 'NoHostAuthenticationForLocalhost=yes',
497 args.append('-p%d' % port)
499 args.extend(('-i', ident_key))
503 # Create a temporary server key file
504 tmp_known_hosts = _make_server_key_args(
505 server_key, host, port, args)
508 # connects to the remote host and starts a remote connection
509 proc = subprocess.Popen(args,
510 stdout = subprocess.PIPE,
511 stdin = subprocess.PIPE,
512 stderr = subprocess.PIPE)
514 # attach tempfile object to the process, to make sure the file stays
515 # alive until the process is finished with it
516 proc._known_hosts = tmp_known_hosts
518 out, err = proc.communicate(stdin)
520 print " -> ", out, err
522 return ((out, err), proc)
524 def popen_scp(source, dest,
531 Copies from/to remote sites.
533 Source and destination should have the user and host encoded
536 If source is a file object, a special mode will be used to
537 create the remote file with the same contents.
539 If dest is a file object, the remote file (source) will be
540 read and written into dest.
542 In these modes, recursive cannot be True.
544 Source can be a list of files to copy to a single destination,
545 in which case it is advised that the destination be a folder.
549 print "scp", source, dest
551 if isinstance(source, file) or isinstance(dest, file) \
552 or hasattr(source, 'read') or hasattr(dest, 'write'):
555 # Parse source/destination as <user>@<server>:<path>
556 if isinstance(dest, basestring) and ':' in dest:
557 remspec, path = dest.split(':',1)
558 elif isinstance(source, basestring) and ':' in source:
559 remspec, path = source.split(':',1)
561 raise ValueError, "Both endpoints cannot be local"
562 user,host = remspec.rsplit('@',1)
563 tmp_known_hosts = None
565 args = ['ssh', '-l', user, '-C',
566 # Don't bother with localhost. Makes test easier
567 '-o', 'NoHostAuthenticationForLocalhost=yes',
570 args.append('-P%d' % port)
572 args.extend(('-i', ident_key))
574 # Create a temporary server key file
575 tmp_known_hosts = _make_server_key_args(
576 server_key, host, port, args)
578 if isinstance(source, file) or hasattr(source, 'read'):
579 args.append('cat > %s' % (shell_escape(path),))
580 elif isinstance(dest, file) or hasattr(dest, 'write'):
581 args.append('cat %s' % (shell_escape(path),))
583 raise AssertionError, "Unreachable code reached! :-Q"
585 # connects to the remote host and starts a remote connection
586 if isinstance(source, file):
587 proc = subprocess.Popen(args,
588 stdout = open('/dev/null','w'),
589 stderr = subprocess.PIPE,
591 err = proc.stderr.read()
592 proc._known_hosts = tmp_known_hosts
593 eintr_retry(proc.wait)()
594 return ((None,err), proc)
595 elif isinstance(dest, file):
596 proc = subprocess.Popen(args,
597 stdout = open('/dev/null','w'),
598 stderr = subprocess.PIPE,
600 err = proc.stderr.read()
601 proc._known_hosts = tmp_known_hosts
602 eintr_retry(proc.wait)()
603 return ((None,err), proc)
604 elif hasattr(source, 'read'):
605 # file-like (but not file) source
606 proc = subprocess.Popen(args,
607 stdout = open('/dev/null','w'),
608 stderr = subprocess.PIPE,
609 stdin = subprocess.PIPE)
615 buf = source.read(4096)
620 rdrdy, wrdy, broken = select.select(
623 [proc.stderr,proc.stdin])
625 if proc.stderr in rdrdy:
626 # use os.read for fully unbuffered behavior
627 err.append(os.read(proc.stderr.fileno(), 4096))
629 if proc.stdin in wrdy:
630 proc.stdin.write(buf)
636 err.append(proc.stderr.read())
638 proc._known_hosts = tmp_known_hosts
639 eintr_retry(proc.wait)()
640 return ((None,''.join(err)), proc)
641 elif hasattr(dest, 'write'):
642 # file-like (but not file) dest
643 proc = subprocess.Popen(args,
644 stdout = subprocess.PIPE,
645 stderr = subprocess.PIPE,
646 stdin = open('/dev/null','w'))
651 rdrdy, wrdy, broken = select.select(
652 [proc.stderr, proc.stdout],
654 [proc.stderr, proc.stdout])
656 if proc.stderr in rdrdy:
657 # use os.read for fully unbuffered behavior
658 err.append(os.read(proc.stderr.fileno(), 4096))
660 if proc.stdout in rdrdy:
661 # use os.read for fully unbuffered behavior
662 buf = os.read(proc.stdout.fileno(), 4096)
671 err.append(proc.stderr.read())
673 proc._known_hosts = tmp_known_hosts
674 eintr_retry(proc.wait)()
675 return ((None,''.join(err)), proc)
677 raise AssertionError, "Unreachable code reached! :-Q"
679 # Parse destination as <user>@<server>:<path>
680 if isinstance(dest, basestring) and ':' in dest:
681 remspec, path = dest.split(':',1)
682 elif isinstance(source, basestring) and ':' in source:
683 remspec, path = source.split(':',1)
685 raise ValueError, "Both endpoints cannot be local"
686 user,host = remspec.rsplit('@',1)
689 tmp_known_hosts = None
690 args = ['scp', '-q', '-p', '-C',
691 # Don't bother with localhost. Makes test easier
692 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
694 args.append('-P%d' % port)
698 args.extend(('-i', ident_key))
700 # Create a temporary server key file
701 tmp_known_hosts = _make_server_key_args(
702 server_key, host, port, args)
703 if isinstance(source,list):
709 # connects to the remote host and starts a remote connection
710 proc = subprocess.Popen(args,
711 stdout = subprocess.PIPE,
712 stdin = subprocess.PIPE,
713 stderr = subprocess.PIPE)
714 proc._known_hosts = tmp_known_hosts
716 comm = proc.communicate()
717 eintr_retry(proc.wait)()
720 def popen_ssh_subprocess(python_code, host, port, user, agent,
725 environment_setup = "",
726 waitcommand = False):
729 python_path.replace("'", r"'\''")
730 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
732 if environment_setup:
733 cmd += environment_setup
735 # Uncomment for debug (to run everything under strace)
736 # We had to verify if strace works (cannot nest them)
737 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
739 #cmd += "strace -f -tt -s 200 -o strace$$.out "
741 cmd += "import base64, os\n"
742 cmd += "cmd = \"\"\n"
743 cmd += "while True:\n"
744 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
745 cmd += " if cmd[-1] == \"\\n\": break\n"
746 cmd += "cmd = base64.b64decode(cmd)\n"
747 # Uncomment for debug
748 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
750 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
753 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
756 tmp_known_hosts = None
758 # Don't bother with localhost. Makes test easier
759 '-o', 'NoHostAuthenticationForLocalhost=yes',
764 args.append('-p%d' % port)
766 args.extend(('-i', ident_key))
770 # Create a temporary server key file
771 tmp_known_hosts = _make_server_key_args(
772 server_key, host, port, args)
775 # connects to the remote host and starts a remote rpyc connection
776 proc = subprocess.Popen(args,
777 stdout = subprocess.PIPE,
778 stdin = subprocess.PIPE,
779 stderr = subprocess.PIPE)
780 proc._known_hosts = tmp_known_hosts
782 # send the command to execute
783 os.write(proc.stdin.fileno(),
784 base64.b64encode(python_code) + "\n")
785 msg = os.read(proc.stdout.fileno(), 3)
787 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
788 msg, proc.stdout.read(), proc.stderr.read())