2 def __init__(self, root_dir = ".", log_level = "ERROR",
3 environment_setup = "", clean_root = False):
4 self._root_dir = root_dir
5 self._clean_root = clean_root
8 self._log_level = log_level
10 self._environment_setup = environment_setup
19 # cannot return normally after fork because no exec was done.
20 # This means that if we don't do a os._exit(0) here the code that
21 # follows the call to "Server.run()" in the "caller code" will be
22 # executed... but by now it has already been executed after the
23 # first process (the one that did the first fork) returned.
26 print >>sys.stderr, "SERVER_ERROR."
30 print >>sys.stderr, "SERVER_READY."
33 # pipes for process synchronization
37 root = os.path.normpath(self._root_dir)
38 if self._root_dir not in [".", ""] and os.path.exists(root) \
41 if not os.path.exists(root):
42 os.makedirs(root, 0755)
50 except OSError, e: # pragma: no cover
51 if e.errno == errno.EINTR:
57 # os.waitpid avoids leaving a <defunct> (zombie) process
58 st = os.waitpid(pid1, 0)[1]
60 raise RuntimeError("Daemonization failed")
61 # return 0 to inform the caller method that this is not the
66 # Decouple from parent environment.
67 os.chdir(self._root_dir)
74 # see ref: "os._exit(0)"
77 # close all open file descriptors.
78 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
79 if (max_fd == resource.RLIM_INFINITY):
81 for fd in range(3, max_fd):
88 # Redirect standard file descriptors.
89 stdin = open(DEV_NULL, "r")
90 stderr = stdout = open(STD_ERR, "a", 0)
91 os.dup2(stdin.fileno(), sys.stdin.fileno())
92 # NOTE: sys.stdout.write will still be buffered, even if the file
93 # was opened with 0 buffer
94 os.dup2(stdout.fileno(), sys.stdout.fileno())
95 os.dup2(stderr.fileno(), sys.stderr.fileno())
98 if self._environment_setup:
99 # parse environment variables and pass to child process
100 # do it by executing shell commands, in case there's some heavy setup involved
101 envproc = subprocess.Popen(
103 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
104 ( self._environment_setup, ) ],
105 stdin = subprocess.PIPE,
106 stdout = subprocess.PIPE,
107 stderr = subprocess.PIPE
109 out,err = envproc.communicate()
111 # parse new environment
113 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
115 # apply to current environment
116 for name, value in environment.iteritems():
117 os.environ[name] = value
120 if 'PYTHONPATH' in environment:
121 sys.path = environment['PYTHONPATH'].split(':') + sys.path
123 # create control socket
124 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
126 self._ctrl_sock.bind(CTRL_SOCK)
128 # Address in use, check pidfile
131 pidfile = open(CTRL_PID, "r")
140 # Check process liveness
141 if not os.path.exists("/proc/%d" % (pid,)):
142 # Ok, it's dead, clean the socket
146 self._ctrl_sock.bind(CTRL_SOCK)
148 self._ctrl_sock.listen(0)
151 pidfile = open(CTRL_PID, "w")
152 pidfile.write(str(os.getpid()))
155 # let the parent process know that the daemonization is finished
160 def post_daemonize(self):
161 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
162 # Qt, for some strange reason, redefines the SIGCHLD handler to write
163 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
164 # Server daemonization closes all file descriptors from fileno '3',
165 # but the overloaded handler (inherited by the forked process) will
166 # keep trying to write the \0 to fileno 'x', which might have been reused
167 # after closing, for other operations. This is very bad when fileno 'x'
168 # is in use for communication purposes, because unexpected \0 bytes start
169 # appearing in the communication messages... this is exactly what happens
170 # when using netns in daemonized form. Thus, we have no other alternative than
171 # restoring the SIGCHLD handler to the default here.
173 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
176 while not self._stop:
177 conn, addr = self._ctrl_sock.accept()
178 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
180 while not self._stop:
182 msg = self.recv_msg(conn)
183 except socket.timeout, e:
184 #self.log_error("SERVER recv_msg: connection timedout ")
188 self.log_error("CONNECTION LOST")
193 reply = self.stop_action()
195 reply = self.reply_action(msg)
198 self.send_reply(conn, reply)
201 self.log_error("NOTICE: Awaiting for reconnection")
209 def recv_msg(self, conn):
212 while '\n' not in chunk:
214 chunk = conn.recv(1024)
215 except (OSError, socket.error), e:
216 if e[0] != errno.EINTR:
225 data = ''.join(data).split('\n',1)
228 data, self._rdbuf = data
230 decoded = base64.b64decode(data)
231 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send ``reply`` to the peer as one base64-encoded, newline-terminated line.

    conn: connected socket-like object the line is written to.
    reply: payload to encode and transmit.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole line has been handed to the
    # socket; a bare send() may transmit only part of it, which would
    # truncate the newline-framed message.
    conn.sendall("%s\n" % encoded)
239 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed reply text for a stop request."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Build the reply for a received message.

    Returns the string "Reply to: " followed by the string form of ``msg``.
    """
    # Wrap msg in a 1-tuple so that a tuple-valued msg is formatted as a
    # single value instead of being unpacked as the format arguments.
    return "Reply to: %s" % (msg,)
250 def log_error(self, text = None, context = ''):
252 text = traceback.format_exc()
253 date = time.strftime("%Y-%m-%d %H:%M:%S")
255 context = " (%s)" % (context,)
256 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry for ``text`` to stderr.

    Nothing is written unless the configured log level equals
    ``DC.DEBUG_LEVEL``.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("DEBUG: %s\n%s\n" % (timestamp, text))
264 class Forwarder(object):
265 def __init__(self, root_dir = "."):
266 self._ctrl_sock = None
267 self._root_dir = root_dir
273 print >>sys.stderr, "FORWARDER_READY."
274 while not self._stop:
275 data = self.read_data()
277 # Connection to client lost
279 self.send_to_server(data)
281 data = self.recv_from_server()
283 # Connection to server lost
284 raise IOError, "Connection to server lost while "\
286 self.write_data(data)
290 return sys.stdin.readline()
292 def write_data(self, data):
293 sys.stdout.write(data)
294 # sys.stdout.write is buffered, this is why we need to do a flush()
297 def send_to_server(self, data):
299 self._ctrl_sock.send(data)
300 except (IOError, socket.error), e:
301 if e[0] == errno.EPIPE:
303 self._ctrl_sock.send(data)
306 encoded = data.rstrip()
307 msg = base64.b64decode(encoded)
311 def recv_from_server(self):
314 while '\n' not in chunk:
316 chunk = self._ctrl_sock.recv(1024)
317 except (OSError, socket.error), e:
318 if e[0] != errno.EINTR:
326 data = ''.join(data).split('\n',1)
329 data, self._rdbuf = data
335 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
336 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
337 self._ctrl_sock.connect(sock_addr)
339 def disconnect(self):
341 self._ctrl_sock.close()