2 # -*- coding: utf-8 -*-
16 CTRL_SOCK = "ctrl.sock"
17 STD_ERR = "stderr.log"
25 if hasattr(os, "devnull"):
28 DEV_NULL = "/dev/null"
31 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
32 self._root_dir = root_dir
34 self._ctrl_sock = None
35 self._log_level = log_level
44 # cannot return normally after fork because no exec was done.
45 # This means that if we don't do a os._exit(0) here the code that
46 # follows the call to "Server.run()" in the "caller code" will be
47 # executed... but by now it has already been executed after the
48 # first process (the one that did the first fork) returned.
56 # pipes for process synchronization
64 # os.waitpid avoids leaving a <defunct> (zombie) process
65 st = os.waitpid(pid1, 0)[1]
67 raise RuntimeError("Daemonization failed")
68 # return 0 to inform the caller method that this is not the
73 # Decouple from parent environment.
74 os.chdir(self._root_dir)
81 # see ref: "os._exit(0)"
84 # close all open file descriptors.
85 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
86 if (max_fd == resource.RLIM_INFINITY):
88 for fd in range(3, max_fd):
95 # Redirect standard file descriptors.
96 stdin = open(DEV_NULL, "r")
97 stderr = stdout = open(STD_ERR, "a", 0)
98 os.dup2(stdin.fileno(), sys.stdin.fileno())
99 # NOTE: sys.stdout.write will still be buffered, even if the file
100 # was opened with 0 buffer
101 os.dup2(stdout.fileno(), sys.stdout.fileno())
102 os.dup2(stderr.fileno(), sys.stderr.fileno())
104 # let the parent process know that the daemonization is finished
109 def post_daemonize(self):
113 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
114 self._ctrl_sock.bind(CTRL_SOCK)
115 self._ctrl_sock.listen(0)
116 while not self._stop:
117 conn, addr = self._ctrl_sock.accept()
121 msg = self.recv_msg(conn)
122 except socket.timeout, e:
127 reply = self.stop_action()
129 reply = self.reply_action(msg)
130 self.send_reply(conn, reply)
133 def recv_msg(self, conn):
137 chunk = conn.recv(1024)
139 if e.errno != errno.EINTR:
144 if chunk[-1] == "\n":
146 decoded = base64.b64decode(data)
147 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send *reply* over *conn* as one base64-encoded, newline-terminated line.

    Parameters:
        conn: connected socket-like object providing ``sendall``.
        reply: payload to send; text is encoded as UTF-8 before the
            base64 step, byte strings are used as they are.
    """
    if not isinstance(reply, bytes):
        reply = reply.encode("utf-8")
    # Build the frame by concatenation instead of "%s" formatting so that
    # the payload stays a byte string whatever b64encode returns.
    frame = base64.b64encode(reply) + b"\n"
    # socket.send() may transmit only part of the buffer; sendall() keeps
    # writing until the whole frame, including the terminating newline,
    # has been handed to the socket.
    conn.sendall(frame)
155 self._ctrl_sock.close()
def stop_action(self):
    """Return the reply text used when a stop request is handled."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Return the reply text for an ordinary (non-stop) message *msg*."""
    template = "Reply to: %s"
    return template % msg
166 def log_error(self, text = None):
168 text = traceback.format_exc()
169 date = time.strftime("%Y-%m-%d %H:%M:%S")
170 sys.stderr.write("ERROR: %s\n%s\n" % (date, text))
def log_debug(self, text):
    """Write *text* to stderr as a timestamped DEBUG entry.

    Nothing is written unless the instance log level is DEBUG_LEVEL.
    """
    if self._log_level != DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
178 class Forwarder(object):
179 def __init__(self, root_dir = "."):
180 self._ctrl_sock = None
181 self._root_dir = root_dir
186 while not self._stop:
187 data = self.read_data()
188 self.send_to_server(data)
189 data = self.recv_from_server()
190 self.write_data(data)
194 return sys.stdin.readline()
196 def write_data(self, data):
197 sys.stdout.write(data)
198 # sys.stdout.write is buffered, this is why we need to do a flush()
201 def send_to_server(self, data):
203 self._ctrl_sock.send(data)
205 if e.errno == errno.EPIPE:
207 self._ctrl_sock.send(data)
210 encoded = data.rstrip()
211 msg = base64.b64decode(encoded)
215 def recv_from_server(self):
219 chunk = self._ctrl_sock.recv(1024)
221 if e.errno != errno.EINTR:
226 if chunk[-1] == "\n":
232 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
233 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
234 self._ctrl_sock.connect(sock_addr)
236 def disconnect(self):
238 self._ctrl_sock.close()
242 class Client(object):
243 def __init__(self, root_dir = ".", host = None, port = None, user = None,
245 python_code = "from nepi.util import server;c=server.Forwarder('%s');\
246 c.forward()" % root_dir
248 self._process = popen_ssh_subprocess(python_code, host, port,
251 self._process = subprocess.Popen(
252 ["python", "-c", python_code],
253 stdin = subprocess.PIPE,
254 stdout = subprocess.PIPE,
255 stderr = subprocess.PIPE
def send_msg(self, msg):
    """Write *msg* to the child process as one base64-encoded line.

    Parameters:
        msg: payload to send; text is encoded as UTF-8 before the
            base64 step, byte strings are used as they are.
    """
    if not isinstance(msg, bytes):
        msg = msg.encode("utf-8")
    # Concatenate rather than "%s"-format so the line stays a byte string
    # whatever b64encode returns.
    data = base64.b64encode(msg) + b"\n"
    stdin = self._process.stdin
    stdin.write(data)
    # The pipe may be buffered; flush so the line actually reaches the
    # child process before the caller starts waiting for a reply.
    stdin.flush()
264 self.send_msg(STOP_MSG)
def read_reply(self):
    """Read one line from the child process stdout and return it decoded.

    Trailing whitespace (including the line terminator) is stripped
    before the base64 decoding step.
    """
    return base64.b64decode(self._process.stdout.readline().rstrip())
271 def popen_ssh_subprocess(python_code, host, port, user, agent,
274 python_path.replace("'", r"'\''")
275 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
278 # Uncomment for debug (to run everything under strace)
279 # We had to verify if strace works (cannot nest them)
280 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
282 #if self.mode == MODE_SSH:
283 # cmd += "strace -f -tt -s 200 -o strace$$.out "
285 cmd += "import base64, os\n"
286 cmd += "cmd = \"\"\n"
287 cmd += "while True:\n"
288 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
289 cmd += " if cmd[-1] == \"\\n\": break\n"
290 cmd += "cmd = base64.b64decode(cmd)\n"
291 # Uncomment for debug
292 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
293 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
294 cmd += "exec(cmd)\n'"
297 # Don't bother with host authentication for localhost; it makes testing easier.
298 '-o', 'NoHostAuthenticationForLocalhost=yes',
303 args.append('-p%d' % port)
306 # connects to the remote host and starts a remote rpyc connection
307 proc = subprocess.Popen(args,
308 stdout = subprocess.PIPE,
309 stdin = subprocess.PIPE,
310 stderr = subprocess.PIPE)
311 # send the command to execute
312 os.write(proc.stdin.fileno(),
313 base64.b64encode(python_code) + "\n")
314 msg = os.read(proc.stdout.fileno(), 3)
316 raise RuntimeError("Failed to start remote python interpreter")