2 # -*- coding: utf-8 -*-
17 CTRL_SOCK = "ctrl.sock"
18 STD_ERR = "stderr.log"
26 if hasattr(os, "devnull"):
29 DEV_NULL = "/dev/null"
32 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
33 self._root_dir = root_dir
35 self._ctrl_sock = None
36 self._log_level = log_level
45 # can not return normally after fork beacuse no exec was done.
46 # This means that if we don't do a os._exit(0) here the code that
47 # follows the call to "Server.run()" in the "caller code" will be
48 # executed... but by now it has already been executed after the
49 # first process (the one that did the first fork) returned.
57 # pipes for process synchronization
65 # os.waitpid avoids leaving a <defunc> (zombie) process
66 st = os.waitpid(pid1, 0)[1]
68 raise RuntimeError("Daemonization failed")
69 # return 0 to inform the caller method that this is not the
74 # Decouple from parent environment.
75 os.chdir(self._root_dir)
82 # see ref: "os._exit(0)"
85 # close all open file descriptors.
86 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
87 if (max_fd == resource.RLIM_INFINITY):
89 for fd in range(3, max_fd):
96 # Redirect standard file descriptors.
97 stdin = open(DEV_NULL, "r")
98 stderr = stdout = open(STD_ERR, "a", 0)
99 os.dup2(stdin.fileno(), sys.stdin.fileno())
100 # NOTE: sys.stdout.write will still be buffered, even if the file
101 # was opened with 0 buffer
102 os.dup2(stdout.fileno(), sys.stdout.fileno())
103 os.dup2(stderr.fileno(), sys.stderr.fileno())
105 # create control socket
106 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
107 self._ctrl_sock.bind(CTRL_SOCK)
108 self._ctrl_sock.listen(0)
110 # let the parent process know that the daemonization is finished
115 def post_daemonize(self):
119 while not self._stop:
120 conn, addr = self._ctrl_sock.accept()
122 while not self._stop:
124 msg = self.recv_msg(conn)
125 except socket.timeout, e:
130 reply = self.stop_action()
132 reply = self.reply_action(msg)
135 self.send_reply(conn, reply)
138 self.log_error("NOTICE: Awaiting for reconnection")
146 def recv_msg(self, conn):
150 chunk = conn.recv(1024)
152 if e.errno != errno.EINTR:
158 if chunk[-1] == "\n":
163 decoded = base64.b64decode(data)
164 return decoded.rstrip()
166 def send_reply(self, conn, reply):
167 encoded = base64.b64encode(reply)
168 conn.send("%s\n" % encoded)
172 self._ctrl_sock.close()
177 def stop_action(self):
178 return "Stopping server"
180 def reply_action(self, msg):
181 return "Reply to: %s" % msg
183 def log_error(self, text = None, context = ''):
185 text = traceback.format_exc()
186 date = time.strftime("%Y-%m-%d %H:%M:%S")
188 context = " (%s)" % (context,)
189 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
192 def log_debug(self, text):
193 if self._log_level == DEBUG_LEVEL:
194 date = time.strftime("%Y-%m-%d %H:%M:%S")
195 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
197 class Forwarder(object):
198 def __init__(self, root_dir = "."):
199 self._ctrl_sock = None
200 self._root_dir = root_dir
205 print >>sys.stderr, "READY."
206 while not self._stop:
207 data = self.read_data()
208 self.send_to_server(data)
209 data = self.recv_from_server()
210 self.write_data(data)
214 return sys.stdin.readline()
216 def write_data(self, data):
217 sys.stdout.write(data)
218 # sys.stdout.write is buffered, this is why we need to do a flush()
221 def send_to_server(self, data):
223 self._ctrl_sock.send(data)
225 if e.errno == errno.EPIPE:
227 self._ctrl_sock.send(data)
230 encoded = data.rstrip()
231 msg = base64.b64decode(encoded)
235 def recv_from_server(self):
239 chunk = self._ctrl_sock.recv(1024)
241 if e.errno != errno.EINTR:
246 if chunk[-1] == "\n":
252 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
253 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
254 self._ctrl_sock.connect(sock_addr)
256 def disconnect(self):
258 self._ctrl_sock.close()
262 class Client(object):
263 def __init__(self, root_dir = ".", host = None, port = None, user = None,
265 self.root_dir = root_dir
266 self.addr = (host, port)
269 self._stopped = False
273 if self._process.poll() is None:
274 os.kill(self._process.pid, signal.SIGTERM)
278 root_dir = self.root_dir
279 (host, port) = self.addr
283 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
284 c.forward()" % (root_dir,)
286 self._process = popen_ssh_subprocess(python_code, host, port,
288 # popen_ssh_subprocess already waits for readiness
290 self._process = subprocess.Popen(
291 ["python", "-c", python_code],
292 stdin = subprocess.PIPE,
293 stdout = subprocess.PIPE,
294 stderr = subprocess.PIPE
297 # Wait for the forwarder to be ready, otherwise nobody
298 # will be able to connect to it
299 helo = self._process.stderr.readline()
300 if helo != 'READY.\n':
301 raise AssertionError, "Expected 'Ready.', got %r" % (helo,)
303 if self._process.poll():
304 err = self._process.stderr.read()
305 raise RuntimeError("Client could not be executed: %s" % \
308 def send_msg(self, msg):
309 encoded = base64.b64encode(msg)
310 data = "%s\n" % encoded
313 self._process.stdin.write(data)
314 except (IOError, ValueError):
315 # dead process, poll it to un-zombify
318 # try again after reconnect
319 # If it fails again, though, give up
321 self._process.stdin.write(data)
324 self.send_msg(STOP_MSG)
327 def read_reply(self):
328 data = self._process.stdout.readline()
329 encoded = data.rstrip()
330 return base64.b64decode(encoded)
332 def popen_ssh_subprocess(python_code, host, port, user, agent,
335 python_path.replace("'", r"'\''")
336 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
339 # Uncomment for debug (to run everything under strace)
340 # We had to verify if strace works (cannot nest them)
341 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
343 #if self.mode == MODE_SSH:
344 # cmd += "strace -f -tt -s 200 -o strace$$.out "
346 cmd += "import base64, os\n"
347 cmd += "cmd = \"\"\n"
348 cmd += "while True:\n"
349 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
350 cmd += " if cmd[-1] == \"\\n\": break\n"
351 cmd += "cmd = base64.b64decode(cmd)\n"
352 # Uncomment for debug
353 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
354 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
355 cmd += "exec(cmd)\n'"
358 # Don't bother with localhost. Makes test easier
359 '-o', 'NoHostAuthenticationForLocalhost=yes',
364 args.append('-p%d' % port)
367 # connects to the remote host and starts a remote rpyc connection
368 proc = subprocess.Popen(args,
369 stdout = subprocess.PIPE,
370 stdin = subprocess.PIPE,
371 stderr = subprocess.PIPE)
372 # send the command to execute
373 os.write(proc.stdin.fileno(),
374 base64.b64encode(python_code) + "\n")
375 msg = os.read(proc.stdout.fileno(), 3)
377 raise RuntimeError("Failed to start remote python interpreter")