2 # -*- coding: utf-8 -*-
20 CTRL_SOCK = "ctrl.sock"
21 STD_ERR = "stderr.log"
28 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
30 if hasattr(os, "devnull"):
33 DEV_NULL = "/dev/null"
35 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
38 """ Escapes strings so that they are safe to use as command-line arguments """
39 if SHELL_SAFE.match(s):
40 # safe string - no escaping needed
43 # unsafe string - escape
45 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
48 return "'$'\\x%02x''" % (ord(c),)
49 s = ''.join(map(escp,s))
52 def eintr_retry(func):
54 @functools.wraps(func)
56 retry = kw.pop("_retry", False)
57 for i in xrange(0 if retry else 4):
60 except select.error, args:
61 if args[0] == errno.EINTR:
70 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
71 self._root_dir = root_dir
73 self._ctrl_sock = None
74 self._log_level = log_level
83 # can not return normally after fork because no exec was done.
84 # This means that if we don't do a os._exit(0) here the code that
85 # follows the call to "Server.run()" in the "caller code" will be
86 # executed... but by now it has already been executed after the
87 # first process (the one that did the first fork) returned.
95 # pipes for process synchronization
99 root = os.path.normpath(self._root_dir)
100 if not os.path.exists(root):
101 os.makedirs(root, 0755)
109 except OSError, e: # pragma: no cover
110 if e.errno == errno.EINTR:
116 # os.waitpid avoids leaving a <defunct> (zombie) process
117 st = os.waitpid(pid1, 0)[1]
119 raise RuntimeError("Daemonization failed")
120 # return 0 to inform the caller method that this is not the
125 # Decouple from parent environment.
126 os.chdir(self._root_dir)
133 # see ref: "os._exit(0)"
136 # close all open file descriptors.
137 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
138 if (max_fd == resource.RLIM_INFINITY):
140 for fd in range(3, max_fd):
147 # Redirect standard file descriptors.
148 stdin = open(DEV_NULL, "r")
149 stderr = stdout = open(STD_ERR, "a", 0)
150 os.dup2(stdin.fileno(), sys.stdin.fileno())
151 # NOTE: sys.stdout.write will still be buffered, even if the file
152 # was opened with 0 buffer
153 os.dup2(stdout.fileno(), sys.stdout.fileno())
154 os.dup2(stderr.fileno(), sys.stderr.fileno())
156 # create control socket
157 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
158 self._ctrl_sock.bind(CTRL_SOCK)
159 self._ctrl_sock.listen(0)
161 # let the parent process know that the daemonization is finished
166 def post_daemonize(self):
170 while not self._stop:
171 conn, addr = self._ctrl_sock.accept()
173 while not self._stop:
175 msg = self.recv_msg(conn)
176 except socket.timeout, e:
181 reply = self.stop_action()
183 reply = self.reply_action(msg)
186 self.send_reply(conn, reply)
189 self.log_error("NOTICE: Awaiting for reconnection")
197 def recv_msg(self, conn):
201 chunk = conn.recv(1024)
203 if e.errno != errno.EINTR:
209 if chunk[-1] == "\n":
214 decoded = base64.b64decode(data)
215 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send a reply to the peer as one base64-encoded, newline-terminated line.

    conn: connected socket-like object; it must provide sendall().
    reply: byte string holding the raw reply payload.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole line has been handed over;
    # a bare send() may transmit only part of the buffer, and the readers
    # in this file look for the trailing newline to detect a full message.
    conn.sendall(encoded + b"\n")
223 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed reply text produced for a stop request."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Build the reply for an incoming message by echoing it back.

    msg: the received message; it is interpolated with the ``%`` operator.
    Returns the text "Reply to: " followed by the message.
    """
    reply = "Reply to: %s" % msg
    return reply
234 def log_error(self, text = None, context = ''):
236 text = traceback.format_exc()
237 date = time.strftime("%Y-%m-%d %H:%M:%S")
239 context = " (%s)" % (context,)
240 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry to stderr when debug logging is on.

    text: the message body, written on the line after the timestamp.
    Nothing is written unless the instance log level equals DEBUG_LEVEL.
    """
    if self._log_level != DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
248 class Forwarder(object):
249 def __init__(self, root_dir = "."):
250 self._ctrl_sock = None
251 self._root_dir = root_dir
256 print >>sys.stderr, "READY."
257 while not self._stop:
258 data = self.read_data()
259 self.send_to_server(data)
260 data = self.recv_from_server()
261 self.write_data(data)
265 return sys.stdin.readline()
267 def write_data(self, data):
268 sys.stdout.write(data)
269 # sys.stdout.write is buffered, this is why we need to do a flush()
272 def send_to_server(self, data):
274 self._ctrl_sock.send(data)
276 if e.errno == errno.EPIPE:
278 self._ctrl_sock.send(data)
281 encoded = data.rstrip()
282 msg = base64.b64decode(encoded)
286 def recv_from_server(self):
290 chunk = self._ctrl_sock.recv(1024)
292 if e.errno != errno.EINTR:
297 if chunk[-1] == "\n":
303 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
304 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
305 self._ctrl_sock.connect(sock_addr)
307 def disconnect(self):
309 self._ctrl_sock.close()
313 class Client(object):
314 def __init__(self, root_dir = ".", host = None, port = None, user = None,
315 agent = None, environment_setup = ""):
316 self.root_dir = root_dir
317 self.addr = (host, port)
320 self.environment_setup = environment_setup
321 self._stopped = False
325 if self._process.poll() is None:
326 os.kill(self._process.pid, signal.SIGTERM)
330 root_dir = self.root_dir
331 (host, port) = self.addr
335 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
336 c.forward()" % (root_dir,)
338 self._process = popen_ssh_subprocess(python_code, host, port,
340 environment_setup = self.environment_setup)
341 # popen_ssh_subprocess already waits for readiness
342 if self._process.poll():
343 err = proc.stderr.read()
344 raise RuntimeError("Client could not be reached: %s" % \
347 self._process = subprocess.Popen(
348 ["python", "-c", python_code],
349 stdin = subprocess.PIPE,
350 stdout = subprocess.PIPE,
351 stderr = subprocess.PIPE
354 # Wait for the forwarder to be ready, otherwise nobody
355 # will be able to connect to it
356 helo = self._process.stderr.readline()
357 if helo != 'READY.\n':
358 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
359 helo + self._process.stderr.read())
361 def send_msg(self, msg):
362 encoded = base64.b64encode(msg)
363 data = "%s\n" % encoded
366 self._process.stdin.write(data)
367 except (IOError, ValueError):
368 # dead process, poll it to un-zombify
371 # try again after reconnect
372 # If it fails again, though, give up
374 self._process.stdin.write(data)
377 self.send_msg(STOP_MSG)
def read_reply(self):
    """Read one reply line from the forwarder process and return it decoded.

    Takes a single line from the subprocess' stdout, strips trailing
    whitespace (including the newline) and base64-decodes what remains.
    """
    line = self._process.stdout.readline()
    return base64.b64decode(line.rstrip())
385 def _make_server_key_args(server_key, host, port, args):
387 Returns a reference to the created temporary file, and adds the
388 corresponding arguments to the given argument list.
390 Make sure to hold onto it until the process is done with the file
393 host = '%s:%s' % (host,port)
394 # Create a temporary server key file
395 tmp_known_hosts = tempfile.NamedTemporaryFile()
397 # Add the intended host key
398 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
400 # If we're not in strict mode, add user-configured keys
401 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
402 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
403 if os.access(user_hosts_path, os.R_OK):
404 f = open(user_hosts_path, "r")
405 tmp_known_hosts.write(f.read())
408 tmp_known_hosts.flush()
410 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
411 return tmp_known_hosts
413 def popen_ssh_command(command, host, port, user, agent,
419 Executes a remote command, returns ((stdout,stderr),process)
422 print "ssh", host, command
424 tmp_known_hosts = None
426 # Don't bother with localhost. Makes test easier
427 '-o', 'NoHostAuthenticationForLocalhost=yes',
432 args.append('-p%d' % port)
434 args.extend(('-i', ident_key))
438 # Create a temporary server key file
439 tmp_known_hosts = _make_server_key_args(
440 server_key, host, port, args)
443 # connects to the remote host and starts a remote connection
444 proc = subprocess.Popen(args,
445 stdout = subprocess.PIPE,
446 stdin = subprocess.PIPE,
447 stderr = subprocess.PIPE)
449 # attach tempfile object to the process, to make sure the file stays
450 # alive until the process is finished with it
451 proc._known_hosts = tmp_known_hosts
453 out, err = proc.communicate(stdin)
455 print " -> ", out, err
457 return ((out, err), proc)
459 def popen_scp(source, dest,
466 Copies from/to remote sites.
468 Source and destination should have the user and host encoded
471 If source is a file object, a special mode will be used to
472 create the remote file with the same contents.
474 If dest is a file object, the remote file (source) will be
475 read and written into dest.
477 In these modes, recursive cannot be True.
479 Source can be a list of files to copy to a single destination,
480 in which case it is advised that the destination be a folder.
484 print "scp", source, dest
486 if isinstance(source, file) or isinstance(dest, file) \
487 or hasattr(source, 'read') or hasattr(dest, 'write'):
490 # Parse source/destination as <user>@<server>:<path>
491 if isinstance(dest, basestring) and ':' in dest:
492 remspec, path = dest.split(':',1)
493 elif isinstance(source, basestring) and ':' in source:
494 remspec, path = source.split(':',1)
496 raise ValueError, "Both endpoints cannot be local"
497 user,host = remspec.rsplit('@',1)
498 tmp_known_hosts = None
500 args = ['ssh', '-l', user, '-C',
501 # Don't bother with localhost. Makes test easier
502 '-o', 'NoHostAuthenticationForLocalhost=yes',
505 args.append('-P%d' % port)
507 args.extend(('-i', ident_key))
509 # Create a temporary server key file
510 tmp_known_hosts = _make_server_key_args(
511 server_key, host, port, args)
513 if isinstance(source, file) or hasattr(source, 'read'):
514 args.append('cat > %s' % (shell_escape(path),))
515 elif isinstance(dest, file) or hasattr(dest, 'write'):
516 args.append('cat %s' % (shell_escape(path),))
518 raise AssertionError, "Unreachable code reached! :-Q"
520 # connects to the remote host and starts a remote connection
521 if isinstance(source, file):
522 proc = subprocess.Popen(args,
523 stdout = open('/dev/null','w'),
524 stderr = subprocess.PIPE,
526 err = proc.stderr.read()
527 proc._known_hosts = tmp_known_hosts
529 return ((None,err), proc)
530 elif isinstance(dest, file):
531 proc = subprocess.Popen(args,
532 stdout = open('/dev/null','w'),
533 stderr = subprocess.PIPE,
535 err = proc.stderr.read()
536 proc._known_hosts = tmp_known_hosts
538 return ((None,err), proc)
539 elif hasattr(source, 'read'):
540 # file-like (but not file) source
541 proc = subprocess.Popen(args,
542 stdout = open('/dev/null','w'),
543 stderr = subprocess.PIPE,
544 stdin = subprocess.PIPE)
550 buf = source.read(4096)
555 rdrdy, wrdy, broken = select.select(
558 [proc.stderr,proc.stdin])
560 if proc.stderr in rdrdy:
561 # use os.read for fully unbuffered behavior
562 err.append(os.read(proc.stderr.fileno(), 4096))
564 if proc.stdin in wrdy:
565 proc.stdin.write(buf)
571 err.append(proc.stderr.read())
573 proc._known_hosts = tmp_known_hosts
575 return ((None,''.join(err)), proc)
576 elif hasattr(dest, 'write'):
577 # file-like (but not file) dest
578 proc = subprocess.Popen(args,
579 stdout = subprocess.PIPE,
580 stderr = subprocess.PIPE,
581 stdin = open('/dev/null','w'))
586 rdrdy, wrdy, broken = select.select(
587 [proc.stderr, proc.stdout],
589 [proc.stderr, proc.stdout])
591 if proc.stderr in rdrdy:
592 # use os.read for fully unbuffered behavior
593 err.append(os.read(proc.stderr.fileno(), 4096))
595 if proc.stdout in rdrdy:
596 # use os.read for fully unbuffered behavior
597 buf = os.read(proc.stdout.fileno(), 4096)
606 err.append(proc.stderr.read())
608 proc._known_hosts = tmp_known_hosts
610 return ((None,''.join(err)), proc)
612 raise AssertionError, "Unreachable code reached! :-Q"
614 # Parse destination as <user>@<server>:<path>
615 if isinstance(dest, basestring) and ':' in dest:
616 remspec, path = dest.split(':',1)
617 elif isinstance(source, basestring) and ':' in source:
618 remspec, path = source.split(':',1)
620 raise ValueError, "Both endpoints cannot be local"
621 user,host = remspec.rsplit('@',1)
624 tmp_known_hosts = None
625 args = ['scp', '-q', '-p', '-C',
626 # Don't bother with localhost. Makes test easier
627 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
629 args.append('-P%d' % port)
633 args.extend(('-i', ident_key))
635 # Create a temporary server key file
636 tmp_known_hosts = _make_server_key_args(
637 server_key, host, port, args)
638 if isinstance(source,list):
644 # connects to the remote host and starts a remote connection
645 proc = subprocess.Popen(args,
646 stdout = subprocess.PIPE,
647 stdin = subprocess.PIPE,
648 stderr = subprocess.PIPE)
649 proc._known_hosts = tmp_known_hosts
651 comm = proc.communicate()
655 def popen_ssh_subprocess(python_code, host, port, user, agent,
660 environment_setup = "",
661 waitcommand = False):
664 python_path.replace("'", r"'\''")
665 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
667 if environment_setup:
668 cmd += environment_setup
670 # Uncomment for debug (to run everything under strace)
671 # We had to verify if strace works (cannot nest them)
672 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
674 #cmd += "strace -f -tt -s 200 -o strace$$.out "
676 cmd += "import base64, os\n"
677 cmd += "cmd = \"\"\n"
678 cmd += "while True:\n"
679 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
680 cmd += " if cmd[-1] == \"\\n\": break\n"
681 cmd += "cmd = base64.b64decode(cmd)\n"
682 # Uncomment for debug
683 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
685 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
688 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
691 tmp_known_hosts = None
693 # Don't bother with localhost. Makes test easier
694 '-o', 'NoHostAuthenticationForLocalhost=yes',
699 args.append('-p%d' % port)
701 args.extend(('-i', ident_key))
705 # Create a temporary server key file
706 tmp_known_hosts = _make_server_key_args(
707 server_key, host, port, args)
710 # connects to the remote host and starts a remote rpyc connection
711 proc = subprocess.Popen(args,
712 stdout = subprocess.PIPE,
713 stdin = subprocess.PIPE,
714 stderr = subprocess.PIPE)
715 proc._known_hosts = tmp_known_hosts
717 # send the command to execute
718 os.write(proc.stdin.fileno(),
719 base64.b64encode(python_code) + "\n")
720 msg = os.read(proc.stdout.fileno(), 3)
722 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
723 msg, proc.stdout.read(), proc.stderr.read())