2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 def __init__(self, root_dir = ".", log_level = "ERROR",
23 environment_setup = "", clean_root = False):
24 self._root_dir = root_dir
25 self._clean_root = clean_root
27 self._ctrl_sock = None
28 self._log_level = log_level
30 self._environment_setup = environment_setup
39 # can not return normally after fork beacuse no exec was done.
40 # This means that if we don't do a os._exit(0) here the code that
41 # follows the call to "Server.run()" in the "caller code" will be
42 # executed... but by now it has already been executed after the
43 # first process (the one that did the first fork) returned.
46 print >>sys.stderr, "SERVER_ERROR."
50 print >>sys.stderr, "SERVER_READY."
53 # pipes for process synchronization
57 root = os.path.normpath(self._root_dir)
58 if self._root_dir not in [".", ""] and os.path.exists(root) \
61 if not os.path.exists(root):
62 os.makedirs(root, 0755)
70 except OSError, e: # pragma: no cover
71 if e.errno == errno.EINTR:
77 # os.waitpid avoids leaving a <defunc> (zombie) process
78 st = os.waitpid(pid1, 0)[1]
80 raise RuntimeError("Daemonization failed")
81 # return 0 to inform the caller method that this is not the
86 # Decouple from parent environment.
87 os.chdir(self._root_dir)
94 # see ref: "os._exit(0)"
97 # close all open file descriptors.
98 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
99 if (max_fd == resource.RLIM_INFINITY):
101 for fd in range(3, max_fd):
108 # Redirect standard file descriptors.
109 stdin = open(DEV_NULL, "r")
110 stderr = stdout = open(STD_ERR, "a", 0)
111 os.dup2(stdin.fileno(), sys.stdin.fileno())
112 # NOTE: sys.stdout.write will still be buffered, even if the file
113 # was opened with 0 buffer
114 os.dup2(stdout.fileno(), sys.stdout.fileno())
115 os.dup2(stderr.fileno(), sys.stderr.fileno())
118 if self._environment_setup:
119 # parse environment variables and pass to child process
120 # do it by executing shell commands, in case there's some heavy setup involved
121 envproc = subprocess.Popen(
123 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
124 ( self._environment_setup, ) ],
125 stdin = subprocess.PIPE,
126 stdout = subprocess.PIPE,
127 stderr = subprocess.PIPE
129 out,err = envproc.communicate()
131 # parse new environment
133 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
135 # apply to current environment
136 for name, value in environment.iteritems():
137 os.environ[name] = value
140 if 'PYTHONPATH' in environment:
141 sys.path = environment['PYTHONPATH'].split(':') + sys.path
143 # create control socket
144 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
146 self._ctrl_sock.bind(CTRL_SOCK)
148 # Address in use, check pidfile
151 pidfile = open(CTRL_PID, "r")
160 # Check process liveliness
161 if not os.path.exists("/proc/%d" % (pid,)):
162 # Ok, it's dead, clean the socket
166 self._ctrl_sock.bind(CTRL_SOCK)
168 self._ctrl_sock.listen(0)
171 pidfile = open(CTRL_PID, "w")
172 pidfile.write(str(os.getpid()))
175 # let the parent process know that the daemonization is finished
180 def post_daemonize(self):
181 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
182 # QT, for some strange reason, redefines the SIGCHILD handler to write
183 # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
184 # Server dameonization closes all file descriptors from fileno '3',
185 # but the overloaded handler (inherited by the forked process) will
186 # keep trying to write the \0 to fileno 'x', which might have been reused
187 # after closing, for other operations. This is bad bad bad when fileno 'x'
188 # is in use for communication pouroses, because unexpected \0 start
189 # appearing in the communication messages... this is exactly what happens
190 # when using netns in daemonized form. Thus, be have no other alternative than
191 # restoring the SIGCHLD handler to the default here.
193 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
196 while not self._stop:
197 conn, addr = self._ctrl_sock.accept()
198 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
200 while not self._stop:
202 msg = self.recv_msg(conn)
203 except socket.timeout, e:
204 #self.log_error("SERVER recv_msg: connection timedout ")
208 self.log_error("CONNECTION LOST")
213 reply = self.stop_action()
215 reply = self.reply_action(msg)
218 self.send_reply(conn, reply)
221 self.log_error("NOTICE: Awaiting for reconnection")
229 def recv_msg(self, conn):
232 while '\n' not in chunk:
234 chunk = conn.recv(1024)
235 except (OSError, socket.error), e:
236 if e[0] != errno.EINTR:
245 data = ''.join(data).split('\n',1)
248 data, self._rdbuf = data
250 decoded = base64.b64decode(data)
251 return decoded.rstrip()
253 def send_reply(self, conn, reply):
254 encoded = base64.b64encode(reply)
255 conn.send("%s\n" % encoded)
259 self._ctrl_sock.close()
264 def stop_action(self):
265 return "Stopping server"
267 def reply_action(self, msg):
268 return "Reply to: %s" % msg
270 def log_error(self, text = None, context = ''):
272 text = traceback.format_exc()
273 date = time.strftime("%Y-%m-%d %H:%M:%S")
275 context = " (%s)" % (context,)
276 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
279 def log_debug(self, text):
280 if self._log_level == DC.DEBUG_LEVEL:
281 date = time.strftime("%Y-%m-%d %H:%M:%S")
282 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
284 class Forwarder(object):
285 def __init__(self, root_dir = "."):
286 self._ctrl_sock = None
287 self._root_dir = root_dir
293 print >>sys.stderr, "FORWARDER_READY."
294 while not self._stop:
295 data = self.read_data()
297 # Connection to client lost
299 self.send_to_server(data)
301 data = self.recv_from_server()
303 # Connection to server lost
304 raise IOError, "Connection to server lost while "\
306 self.write_data(data)
310 return sys.stdin.readline()
312 def write_data(self, data):
313 sys.stdout.write(data)
314 # sys.stdout.write is buffered, this is why we need to do a flush()
317 def send_to_server(self, data):
319 self._ctrl_sock.send(data)
320 except (IOError, socket.error), e:
321 if e[0] == errno.EPIPE:
323 self._ctrl_sock.send(data)
326 encoded = data.rstrip()
327 msg = base64.b64decode(encoded)
331 def recv_from_server(self):
334 while '\n' not in chunk:
336 chunk = self._ctrl_sock.recv(1024)
337 except (OSError, socket.error), e:
338 if e[0] != errno.EINTR:
346 data = ''.join(data).split('\n',1)
349 data, self._rdbuf = data
355 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
356 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
357 self._ctrl_sock.connect(sock_addr)
359 def disconnect(self):
361 self._ctrl_sock.close()