# # NEPI, a framework to manage network experiments # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # class Server(object): def __init__(self, root_dir = ".", log_level = "ERROR", environment_setup = "", clean_root = False): self._root_dir = root_dir self._clean_root = clean_root self._stop = False self._ctrl_sock = None self._log_level = log_level self._rdbuf = "" self._environment_setup = environment_setup def run(self): try: if self.daemonize(): self.post_daemonize() self.loop() self.cleanup() # ref: "os._exit(0)" # can not return normally after fork beacuse no exec was done. # This means that if we don't do a os._exit(0) here the code that # follows the call to "Server.run()" in the "caller code" will be # executed... but by now it has already been executed after the # first process (the one that did the first fork) returned. os._exit(0) except: print >>sys.stderr, "SERVER_ERROR." self.log_error() self.cleanup() os._exit(0) print >>sys.stderr, "SERVER_READY." def daemonize(self): # pipes for process synchronization (r, w) = os.pipe() # build root folder root = os.path.normpath(self._root_dir) if self._root_dir not in [".", ""] and os.path.exists(root) \ and self._clean_root: shutil.rmtree(root) if not os.path.exists(root): os.makedirs(root, 0755) pid1 = os.fork() if pid1 > 0: os.close(w) while True: try: os.read(r, 1) except OSError, e: # pragma: no cover if e.errno == errno.EINTR: continue else: raise break os.close(r) # os.waitpid avoids leaving a (zombie) process st = os.waitpid(pid1, 0)[1] if st: raise RuntimeError("Daemonization failed") # return 0 to inform the caller method that this is not the # daemonized process return 0 os.close(r) # Decouple from parent environment. os.chdir(self._root_dir) os.umask(0) os.setsid() # fork 2 pid2 = os.fork() if pid2 > 0: # see ref: "os._exit(0)" os._exit(0) # close all open file descriptors. max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] if (max_fd == resource.RLIM_INFINITY): max_fd = MAX_FD for fd in range(3, max_fd): if fd != w: try: os.close(fd) except OSError: pass # Redirect standard file descriptors. stdin = open(DEV_NULL, "r") stderr = stdout = open(STD_ERR, "a", 0) os.dup2(stdin.fileno(), sys.stdin.fileno()) # NOTE: sys.stdout.write will still be buffered, even if the file # was opened with 0 buffer os.dup2(stdout.fileno(), sys.stdout.fileno()) os.dup2(stderr.fileno(), sys.stderr.fileno()) # setup environment if self._environment_setup: # parse environment variables and pass to child process # do it by executing shell commands, in case there's some heavy setup involved envproc = subprocess.Popen( [ "bash", "-c", "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" % ( self._environment_setup, ) ], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE ) out,err = envproc.communicate() # parse new environment if out: environment = dict(map(lambda x:x.split("\x02"), out.split("\x01"))) # apply to current environment for name, value in environment.iteritems(): os.environ[name] = value # apply pythonpath if 'PYTHONPATH' in environment: sys.path = environment['PYTHONPATH'].split(':') + sys.path # create control socket self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: self._ctrl_sock.bind(CTRL_SOCK) except socket.error: # Address in use, check pidfile pid = None try: pidfile = open(CTRL_PID, "r") pid = pidfile.read() pidfile.close() pid = int(pid) except: # no pidfile pass if pid is not None: # Check process liveliness if not os.path.exists("/proc/%d" % (pid,)): # Ok, it's dead, clean the socket os.remove(CTRL_SOCK) # try again self._ctrl_sock.bind(CTRL_SOCK) self._ctrl_sock.listen(0) # Save pidfile pidfile = open(CTRL_PID, "w") pidfile.write(str(os.getpid())) pidfile.close() # let the parent process know that the daemonization is finished os.write(w, "\n") os.close(w) return 1 def post_daemonize(self): os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level # QT, for some strange reason, redefines the SIGCHILD handler to write # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received. # Server dameonization closes all file descriptors from fileno '3', # but the overloaded handler (inherited by the forked process) will # keep trying to write the \0 to fileno 'x', which might have been reused # after closing, for other operations. This is bad bad bad when fileno 'x' # is in use for communication pouroses, because unexpected \0 start # appearing in the communication messages... this is exactly what happens # when using netns in daemonized form. Thus, be have no other alternative than # restoring the SIGCHLD handler to the default here. import signal signal.signal(signal.SIGCHLD, signal.SIG_DFL) def loop(self): while not self._stop: conn, addr = self._ctrl_sock.accept() self.log_error("ACCEPTED CONNECTION: %s" % (addr,)) conn.settimeout(5) while not self._stop: try: msg = self.recv_msg(conn) except socket.timeout, e: #self.log_error("SERVER recv_msg: connection timedout ") continue if not msg: self.log_error("CONNECTION LOST") break if msg == STOP_MSG: self._stop = True reply = self.stop_action() else: reply = self.reply_action(msg) try: self.send_reply(conn, reply) except socket.error: self.log_error() self.log_error("NOTICE: Awaiting for reconnection") break try: conn.close() except: # Doesn't matter self.log_error() def recv_msg(self, conn): data = [self._rdbuf] chunk = data[0] while '\n' not in chunk: try: chunk = conn.recv(1024) except (OSError, socket.error), e: if e[0] != errno.EINTR: raise else: continue if chunk: data.append(chunk) else: # empty chunk = EOF break data = ''.join(data).split('\n',1) while len(data) < 2: data.append('') data, self._rdbuf = data decoded = base64.b64decode(data) return decoded.rstrip() def send_reply(self, conn, reply): encoded = base64.b64encode(reply) conn.send("%s\n" % encoded) def cleanup(self): try: self._ctrl_sock.close() os.remove(CTRL_SOCK) except: self.log_error() def stop_action(self): return "Stopping server" def reply_action(self, msg): return "Reply to: %s" % msg def log_error(self, text = None, context = ''): if text == None: text = traceback.format_exc() date = time.strftime("%Y-%m-%d %H:%M:%S") if context: context = " (%s)" % (context,) sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text)) return text def log_debug(self, text): if self._log_level == DC.DEBUG_LEVEL: date = time.strftime("%Y-%m-%d %H:%M:%S") sys.stderr.write("DEBUG: %s\n%s\n" % (date, text)) class Forwarder(object): def __init__(self, root_dir = "."): self._ctrl_sock = None self._root_dir = root_dir self._stop = False self._rdbuf = "" def forward(self): self.connect() print >>sys.stderr, "FORWARDER_READY." while not self._stop: data = self.read_data() if not data: # Connection to client lost break self.send_to_server(data) data = self.recv_from_server() if not data: # Connection to server lost raise IOError, "Connection to server lost while "\ "expecting response" self.write_data(data) self.disconnect() def read_data(self): return sys.stdin.readline() def write_data(self, data): sys.stdout.write(data) # sys.stdout.write is buffered, this is why we need to do a flush() sys.stdout.flush() def send_to_server(self, data): try: self._ctrl_sock.send(data) except (IOError, socket.error), e: if e[0] == errno.EPIPE: self.connect() self._ctrl_sock.send(data) else: raise e encoded = data.rstrip() msg = base64.b64decode(encoded) if msg == STOP_MSG: self._stop = True def recv_from_server(self): data = [self._rdbuf] chunk = data[0] while '\n' not in chunk: try: chunk = self._ctrl_sock.recv(1024) except (OSError, socket.error), e: if e[0] != errno.EINTR: raise continue if chunk: data.append(chunk) else: # empty chunk = EOF break data = ''.join(data).split('\n',1) while len(data) < 2: data.append('') data, self._rdbuf = data return data+'\n' def connect(self): self.disconnect() self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock_addr = os.path.join(self._root_dir, CTRL_SOCK) self._ctrl_sock.connect(sock_addr) def disconnect(self): try: self._ctrl_sock.close() except: pass