server daemon launched over ssh connection.
[nepi.git] / src / nepi / util / server.py
index 8a3dbed..2e02c74 100644 (file)
@@ -4,12 +4,13 @@
 import base64
 import errno
 import os
+import resource
 import select
 import socket
 import sys
 import subprocess
 import threading
-from time import strftime
+import time
 import traceback
 
 CTRL_SOCK = "ctrl.sock"
@@ -21,13 +22,17 @@ STOP_MSG = "STOP"
 ERROR_LEVEL = 0
 DEBUG_LEVEL = 1
 
+if hasattr(os, "devnull"):
+    DEV_NULL = os.devnull
+else:
+    DEV_NULL = "/dev/null"
+
 class Server(object):
-    def __init__(self, root_dir = "."):
+    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
         self._root_dir = root_dir
         self._stop = False
         self._ctrl_sock = None
-        self._stderr = None
-        self._log_level = ERROR_LEVEL
+        self._log_level = log_level
 
     def run(self):
         try:
@@ -77,7 +82,10 @@ class Server(object):
             os._exit(0)
 
         # close all open file descriptors.
-        for fd in range(3, MAX_FD):
+        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
+        if (max_fd == resource.RLIM_INFINITY):
+            max_fd = MAX_FD
+        for fd in range(3, max_fd):
             if fd != w:
                 try:
                     os.close(fd)
@@ -85,11 +93,14 @@ class Server(object):
                     pass
 
         # Redirect standard file descriptors.
-        self._stderr = stdout = file(STD_ERR, "a", 0)
-        stdin = open('/dev/null', 'r')
+        stdin = open(DEV_NULL, "r")
+        stderr = stdout = open(STD_ERR, "a", 0)
         os.dup2(stdin.fileno(), sys.stdin.fileno())
+        # NOTE: sys.stdout.write will still be buffered, even if the file
+        # was opened with 0 buffer
         os.dup2(stdout.fileno(), sys.stdout.fileno())
-        os.dup2(self._stderr.fileno(), sys.stderr.fileno())
+        os.dup2(stderr.fileno(), sys.stderr.fileno())
+
         # let the parent process know that the daemonization is finished
         os.write(w, "\n")
         os.close(w)
@@ -152,22 +163,16 @@ class Server(object):
     def reply_action(self, msg):
         return "Reply to: %s" % msg
 
-    def set_error_log_level(self):
-        self._log_level = ERROR_LEVEL
-
-    def set_debug_log_level(self):
-        self._log_level = DEBUG_LEVEL
-
     def log_error(self, text = None):
         if text == None:
             text = traceback.format_exc()
-        date = strftime("%Y-%m-%d %H:%M:%S")
+        date = time.strftime("%Y-%m-%d %H:%M:%S")
         sys.stderr.write("ERROR: %s\n%s\n" % (date, text))
         return text
 
     def log_debug(self, text):
         if self._log_level == DEBUG_LEVEL:
-            date = strftime("%Y-%m-%d %H:%M:%S")
+            date = time.strftime("%Y-%m-%d %H:%M:%S")
             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
 
 class Forwarder(object):
@@ -190,6 +195,7 @@ class Forwarder(object):
 
     def write_data(self, data):
         sys.stdout.write(data)
+        # sys.stdout.write is buffered, for this we need to do a flush()
         sys.stdout.flush()
 
     def send_to_server(self, data):
@@ -234,14 +240,20 @@ class Forwarder(object):
             pass
 
 class Client(object):
-    def __init__(self, root_dir = "."):
-        self._process = subprocess.Popen(
-                ["python", "-c", 
-                "from nepi.util import server;c=server.Forwarder('%s');\
-                        c.forward()" % root_dir
-                ],
-                stdin = subprocess.PIPE, 
-                stdout = subprocess.PIPE)
+    def __init__(self, root_dir = ".", host = None, port = None, user = None, 
+            agent = None):
+        python_code = "from nepi.util import server;c=server.Forwarder('%s');\
+                c.forward()" % root_dir
+        if host != None:
+            self._process = popen_ssh_subprocess(python_code, host, port, 
+                    user, agent)
+        else:
+            self._process = subprocess.Popen(
+                    ["python", "-c", python_code],
+                    stdin = subprocess.PIPE, 
+                    stdout = subprocess.PIPE,
+                    stderr = subprocess.PIPE
+                )
 
     def send_msg(self, msg):
         encoded = base64.b64encode(msg)
@@ -256,3 +268,51 @@ class Client(object):
         encoded = data.rstrip() 
         return base64.b64decode(encoded)
 
+def popen_ssh_subprocess(python_code, host, port, user, agent, 
+        python_path = None):
+        if python_path:
+            python_path.replace("'", r"'\''")
+            cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
+        else:
+            cmd = ""
+        # Uncomment for debug (to run everything under strace)
+        # We had to verify if strace works (cannot nest them)
+        #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
+        #cmd += "$CMD "
+        #if self.mode == MODE_SSH:
+        #    cmd += "strace -f -tt -s 200 -o strace$$.out "
+        cmd += "python -c '"
+        cmd += "import base64, os\n"
+        cmd += "cmd = \"\"\n"
+        cmd += "while True:\n"
+        cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
+        cmd += " if cmd[-1] == \"\\n\": break\n"
+        cmd += "cmd = base64.b64decode(cmd)\n"
+        # Uncomment for debug
+        #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
+        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
+        cmd += "exec(cmd)\n'"
+
+        args = ['ssh',
+                # Don't bother with localhost. Makes test easier
+                '-o', 'NoHostAuthenticationForLocalhost=yes',
+                '-l', user, host]
+        if agent:
+            args.append('-A')
+        if port:
+            args.append('-p%d' % port)
+        args.append(cmd)
+
+        # connects to the remote host and starts a remote rpyc connection
+        proc = subprocess.Popen(args, 
+                stdout = subprocess.PIPE,
+                stdin = subprocess.PIPE, 
+                stderr = subprocess.PIPE)
+        # send the command to execute
+        os.write(proc.stdin.fileno(),
+                base64.b64encode(python_code) + "\n")
+        msg = os.read(proc.stdout.fileno(), 3)
+        if msg != "OK\n":
+            raise RuntimeError("Failed to start remote python interpreter")
+        return proc