2 # -*- coding: utf-8 -*-
12 from time import strftime
15 CTRL_SOCK = "ctrl.sock"
16 STD_ERR = "stderr.log"
25 def __init__(self, root_dir = "."):
26 self._root_dir = root_dir
28 self._ctrl_sock = None
30 self._log_level = ERROR_LEVEL
39 # can not return normally after fork beacuse no exec was done.
40 # This means that if we don't do a os._exit(0) here the code that
41 # follows the call to "Server.run()" in the "caller code" will be
42 # executed... but by now it has already been executed after the
43 # first process (the one that did the first fork) returned.
51 # pipes for process synchronization
59 # os.waitpid avoids leaving a <defunc> (zombie) process
60 st = os.waitpid(pid1, 0)[1]
62 raise RuntimeError("Daemonization failed")
63 # return 0 to inform the caller method that this is not the
68 # Decouple from parent environment.
69 os.chdir(self._root_dir)
76 # see ref: "os._exit(0)"
79 # close all open file descriptors.
80 for fd in range(3, MAX_FD):
87 # Redirect standard file descriptors.
88 self._stderr = stdout = file(STD_ERR, "a", 0)
89 stdin = open('/dev/null', 'r')
90 os.dup2(stdin.fileno(), sys.stdin.fileno())
91 os.dup2(stdout.fileno(), sys.stdout.fileno())
92 os.dup2(self._stderr.fileno(), sys.stderr.fileno())
93 # let the parent process know that the daemonization is finished
98 def post_daemonize(self):
102 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
103 self._ctrl_sock.bind(CTRL_SOCK)
104 self._ctrl_sock.listen(0)
105 while not self._stop:
106 conn, addr = self._ctrl_sock.accept()
110 msg = self.recv_msg(conn)
111 except socket.timeout, e:
116 reply = self.stop_action()
118 reply = self.reply_action(msg)
119 self.send_reply(conn, reply)
122 def recv_msg(self, conn):
126 chunk = conn.recv(1024)
128 if e.errno != errno.EINTR:
133 if chunk[-1] == "\n":
135 decoded = base64.b64decode(data)
136 return decoded.rstrip()
138 def send_reply(self, conn, reply):
139 encoded = base64.b64encode(reply)
140 conn.send("%s\n" % encoded)
144 self._ctrl_sock.close()
149 def stop_action(self):
150 return "Stopping server"
152 def reply_action(self, msg):
153 return "Reply to: %s" % msg
155 def set_error_log_level(self):
156 self._log_level = ERROR_LEVEL
158 def set_debug_log_level(self):
159 self._log_level = DEBUG_LEVEL
161 def log_error(self, text = None):
163 text = traceback.format_exc()
164 date = strftime("%Y-%m-%d %H:%M:%S")
165 sys.stderr.write("ERROR: %s\n%s\n" % (date, text))
168 def log_debug(self, text):
169 if self._log_level == DEBUG_LEVEL:
170 date = strftime("%Y-%m-%d %H:%M:%S")
171 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
173 class Forwarder(object):
174 def __init__(self, root_dir = "."):
175 self._ctrl_sock = None
176 self._root_dir = root_dir
181 while not self._stop:
182 data = self.read_data()
183 self.send_to_server(data)
184 data = self.recv_from_server()
185 self.write_data(data)
189 return sys.stdin.readline()
191 def write_data(self, data):
192 sys.stdout.write(data)
195 def send_to_server(self, data):
197 self._ctrl_sock.send(data)
199 if e.errno == errno.EPIPE:
201 self._ctrl_sock.send(data)
204 encoded = data.rstrip()
205 msg = base64.b64decode(encoded)
209 def recv_from_server(self):
213 chunk = self._ctrl_sock.recv(1024)
215 if e.errno != errno.EINTR:
220 if chunk[-1] == "\n":
226 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
227 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
228 self._ctrl_sock.connect(sock_addr)
230 def disconnect(self):
232 self._ctrl_sock.close()
236 class Client(object):
237 def __init__(self, root_dir = "."):
238 self._process = subprocess.Popen(
240 "from nepi.util import server;c=server.Forwarder('%s');\
241 c.forward()" % root_dir
243 stdin = subprocess.PIPE,
244 stdout = subprocess.PIPE)
246 def send_msg(self, msg):
247 encoded = base64.b64encode(msg)
248 data = "%s\n" % encoded
249 self._process.stdin.write(data)
252 self.send_msg(STOP_MSG)
254 def read_reply(self):
255 data = self._process.stdout.readline()
256 encoded = data.rstrip()
257 return base64.b64decode(encoded)