5a82909af3ec82ee909215e786b26aaf53aa1bb4
[nepi.git] / src / nepi / resources / ns3 / ns3wrapper_server.py
class Server(object):
    """Daemonized control server listening on a UNIX stream socket.

    ``run()`` double-forks the calling process. The resulting daemon changes
    into ``root_dir``, binds the control socket named by the module-level
    ``CTRL_SOCK`` constant and then serves one client connection at a time.

    Wire protocol: every message and every reply is a single
    base64-encoded line terminated by ``"\\n"``.

    Subclasses customise behaviour by overriding ``reply_action()`` and
    ``stop_action()``.

    NOTE: this is Python 2 code (messages are handled as ``str``); only
    syntax that is also valid on Python 3 is used.
    """

    def __init__(self, root_dir = ".", log_level = "ERROR", 
            environment_setup = "", clean_root = False):
        """Store the configuration; nothing is forked or opened here.

        root_dir          -- directory the daemon changes into; it is created
                             by ``daemonize()`` when missing.
        log_level         -- value exported as ``NEPI_CONTROLLER_LOGLEVEL``
                             by ``post_daemonize()``.
        environment_setup -- shell snippet run by ``daemonize()`` to compute
                             the daemon environment ("" disables it).
        clean_root        -- when true, an existing ``root_dir`` is removed
                             before the daemon starts (never for "." or "").
        """
        self._root_dir = root_dir
        self._clean_root = clean_root
        self._stop = False
        self._ctrl_sock = None
        self._log_level = log_level
        # Bytes received from the current connection that have not been
        # returned as a complete message yet.
        self._rdbuf = ""
        self._environment_setup = environment_setup

    def run(self):
        """Start the daemon and, inside the daemon, serve until stopped.

        In the calling process this returns after writing "SERVER_READY."
        to stderr. In the daemon process it never returns: the process
        leaves through ``os._exit(0)``. On any error "SERVER_ERROR." plus a
        traceback are written to stderr and the process exits.
        """
        try:
            if self.daemonize():
                self.post_daemonize()
                self.loop()
                self.cleanup()
                # ref: "os._exit(0)"
                # We can not return normally after a fork because no exec
                # was done. If we did not call os._exit(0) here, the code
                # that follows the call to "Server.run()" in the caller
                # would be executed again... but it has already been
                # executed after the first process (the one that did the
                # first fork) returned.
                os._exit(0)
        except:
            # Deliberately bare: nothing at all may escape from here, see
            # ref: "os._exit(0)" above.
            sys.stderr.write("SERVER_ERROR.\n")
            self.log_error()
            self.cleanup()
            os._exit(0)
        sys.stderr.write("SERVER_READY.\n")

    def daemonize(self):
        """Double-fork into a daemon and set up its control socket.

        Returns 0 in the original (calling) process and 1 in the daemon.
        Raises RuntimeError("Daemonization failed") in the calling process
        when the intermediate child exits with a non-zero status or the
        daemon goes away before signalling that it is ready.
        """
        # pipe for process synchronization
        (r, w) = os.pipe()

        # build root folder
        root = os.path.normpath(self._root_dir)
        if self._root_dir not in [".", ""] and os.path.exists(root) \
                and self._clean_root:
            shutil.rmtree(root)
        if not os.path.exists(root):
            os.makedirs(root, 0o755)

        pid1 = os.fork()
        if pid1 > 0:
            os.close(w)
            self._wait_for_daemon(r, pid1)
            # return 0 to inform the caller method that this is not the
            # daemonized process
            return 0
        os.close(r)

        # Decouple from parent environment.
        os.chdir(self._root_dir)
        os.umask(0)
        os.setsid()

        # fork 2
        pid2 = os.fork()
        if pid2 > 0:
            # see ref: "os._exit(0)"
            os._exit(0)

        self._close_inherited_fds(w)
        self._redirect_standard_fds()
        self._apply_environment_setup()
        self._bind_control_socket()

        # Save pidfile
        with open(CTRL_PID, "w") as pidfile:
            pidfile.write(str(os.getpid()))

        # let the parent process know that the daemonization is finished
        os.write(w, b"\n")
        os.close(w)
        return 1

    def _wait_for_daemon(self, r, pid1):
        """Block in the calling process until the daemon reports readiness."""
        ready = ""
        try:
            while True:
                try:
                    ready = os.read(r, 1)
                except OSError as e: # pragma: no cover
                    if e.errno == errno.EINTR:
                        continue
                    raise
                break
        finally:
            os.close(r)
        # os.waitpid avoids leaving a <defunct> (zombie) process
        st = os.waitpid(pid1, 0)[1]
        # An empty read means every write end of the pipe was closed without
        # the ready byte being written, i.e. the daemon died during setup.
        if st or not ready:
            raise RuntimeError("Daemonization failed")

    def _close_inherited_fds(self, keep):
        """Close every descriptor from 3 upwards except ``keep``."""
        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if max_fd == resource.RLIM_INFINITY:
            max_fd = MAX_FD
        # os.closerange ignores errors, like the close-and-ignore loop it
        # replaces, but does not iterate in Python.
        os.closerange(3, max(keep, 3))
        os.closerange(max(keep + 1, 3), max_fd)

    def _redirect_standard_fds(self):
        """Point stdin at DEV_NULL and stdout/stderr at the STD_ERR file."""
        stdin = open(DEV_NULL, "r")
        stderr = stdout = open(STD_ERR, "a", 0)
        try:
            os.dup2(stdin.fileno(), sys.stdin.fileno())
            # NOTE: sys.stdout.write will still be buffered, even if the
            # file was opened with 0 buffer
            os.dup2(stdout.fileno(), sys.stdout.fileno())
            os.dup2(stderr.fileno(), sys.stderr.fileno())
        finally:
            # The duplicated descriptors stay open; only the originals go.
            stdin.close()
            stdout.close()

    def _apply_environment_setup(self):
        """Run the environment setup snippet and import its environment."""
        if not self._environment_setup:
            return

        # parse environment variables and pass to child process
        # do it by executing shell commands, in case there's some heavy
        # setup involved
        # NOTE(review): environment_setup is pasted verbatim into a shell
        # command line, so it must come from a trusted source.
        envproc = subprocess.Popen(
            [ "bash", "-c", 
                "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
                    ( self._environment_setup, ) ],
            stdin = subprocess.PIPE, 
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE
        )
        out, err = envproc.communicate()

        # "print" terminates the dump with a newline that is not part of
        # the last variable's value.
        if out.endswith("\n"):
            out = out[:-1]
        if not out:
            return

        # parse new environment; split only once so that values containing
        # the separator byte do not break the parsing
        environment = dict(item.split("\x02", 1) for item in out.split("\x01"))

        # apply to current environment
        for name, value in environment.items():
            os.environ[name] = value

        # apply pythonpath
        if 'PYTHONPATH' in environment:
            sys.path = environment['PYTHONPATH'].split(':') + sys.path

    def _bind_control_socket(self):
        """Create, bind and start listening on the control socket."""
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self._ctrl_sock.bind(CTRL_SOCK)
        except socket.error:
            # Address in use, check pidfile
            pid = None
            try:
                with open(CTRL_PID, "r") as pidfile:
                    pid = int(pidfile.read())
            except (IOError, OSError, ValueError):
                # no (usable) pidfile
                pid = None

            # Check process liveness
            if pid is not None and not os.path.exists("/proc/%d" % (pid,)):
                # Ok, it's dead, clean the socket
                os.remove(CTRL_SOCK)

            # try again
            self._ctrl_sock.bind(CTRL_SOCK)

        self._ctrl_sock.listen(0)

    def post_daemonize(self):
        """Finish the daemon setup once the process has been detached."""
        os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
        # QT, for some strange reason, redefines the SIGCHLD handler to write
        # a \0 to a fd (let's say fileno 'x'), whenever a SIGCHLD is received.
        # Server daemonization closes all file descriptors from fileno '3',
        # but the overloaded handler (inherited by the forked process) will
        # keep trying to write the \0 to fileno 'x', which might have been
        # reused after closing, for other operations. This is bad bad bad
        # when fileno 'x' is in use for communication purposes, because
        # unexpected \0 start appearing in the communication messages... this
        # is exactly what happens when using netns in daemonized form. Thus,
        # we have no other alternative than restoring the SIGCHLD handler to
        # the default here.
        import signal
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def loop(self):
        """Accept connections one at a time and answer their messages.

        Runs until a message equal to ``STOP_MSG`` is received. A lost
        connection or a failed reply only ends the current connection; the
        server then waits for the next client.
        """
        while not self._stop:
            conn, addr = self._ctrl_sock.accept()
            # Bytes left over from a previous connection belong to a dead
            # stream and must not be mixed into this one.
            self._rdbuf = ""
            self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
            conn.settimeout(5)
            try:
                while not self._stop:
                    try:
                        msg = self.recv_msg(conn)
                    except socket.timeout:
                        # Nothing (complete) arrived yet; wake up to
                        # re-check the stop flag and keep waiting.
                        continue

                    if not msg:
                        self.log_error("CONNECTION LOST")
                        break

                    if msg == STOP_MSG:
                        self._stop = True
                        reply = self.stop_action()
                    else:
                        reply = self.reply_action(msg)

                    try:
                        self.send_reply(conn, reply)
                    except socket.error:
                        self.log_error()
                        self.log_error("NOTICE: Awaiting for reconnection")
                        break
            finally:
                try:
                    conn.close()
                except Exception:
                    # Doesn't matter
                    self.log_error()

    def recv_msg(self, conn):
        """Read one message line from ``conn`` and return it decoded.

        Returns the base64-decoded payload with trailing whitespace removed.
        An empty result is what ``loop()`` treats as a lost connection.
        Errors raised by ``conn.recv`` (including ``socket.timeout``)
        propagate to the caller, except EINTR which is retried; whatever was
        received before the error is kept for the next call.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = conn.recv(1024)
            except (OSError, socket.error) as e:
                if e.errno == errno.EINTR:
                    continue
                # Do not lose the part of the message that already arrived.
                self._rdbuf = ''.join(data)
                raise
            if not chunk:
                # empty chunk = EOF
                break
            data.append(chunk)
        # Everything after the first newline is kept for the next message.
        data, _, self._rdbuf = ''.join(data).partition('\n')

        decoded = base64.b64decode(data)
        return decoded.rstrip()

    def send_reply(self, conn, reply):
        """Send ``reply`` to ``conn`` as one base64-encoded line."""
        encoded = base64.b64encode(reply)
        # sendall: a plain send() may transmit only part of the line.
        conn.sendall("%s\n" % encoded)

    def cleanup(self):
        """Close the control socket and remove its file (best effort)."""
        try:
            self._ctrl_sock.close()
            os.remove(CTRL_SOCK)
        except:
            # Deliberately bare: run() calls this right before os._exit()
            # and nothing may prevent that exit from happening.
            self.log_error()

    def stop_action(self):
        """Return the reply sent for ``STOP_MSG``."""
        return "Stopping server"

    def reply_action(self, msg):
        """Return the reply for a regular message ``msg``."""
        return "Reply to: %s" % msg

    def log_error(self, text = None, context = ''):
        """Write a timestamped error entry to stderr and return its text.

        When ``text`` is None the traceback of the exception currently being
        handled is logged instead.
        """
        if text is None:
            text = traceback.format_exc()
        date = time.strftime("%Y-%m-%d %H:%M:%S")
        if context:
            context = " (%s)" % (context,)
        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
        return text

    def log_debug(self, text):
        """Write a timestamped debug entry to stderr at debug log level."""
        if self._log_level == DC.DEBUG_LEVEL:
            date = time.strftime("%Y-%m-%d %H:%M:%S")
            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
263
class Forwarder(object):
    """Relay between stdin/stdout and a server's control socket.

    Each line read from stdin is sent unchanged to the control socket found
    at ``CTRL_SOCK`` under ``root_dir``; the reply line read back from the
    socket is written to stdout. Lines are expected to be base64-encoded
    messages, as that is how the line is inspected for ``STOP_MSG``.

    NOTE: this is Python 2 code (data is handled as ``str``); only syntax
    that is also valid on Python 3 is used.
    """

    def __init__(self, root_dir = "."):
        """Store the configuration; the socket is opened by ``connect()``.

        root_dir -- directory that contains the server's control socket.
        """
        self._ctrl_sock = None
        self._root_dir = root_dir
        self._stop = False
        # Bytes received from the server that were not returned yet.
        self._rdbuf = ""

    def forward(self):
        """Relay request/reply pairs until stdin ends or a stop is sent.

        Writes "FORWARDER_READY." to stderr once connected. Raises IOError
        when the server goes away while a reply is expected. The socket is
        closed on the way out, also when an error is raised.
        """
        self.connect()
        sys.stderr.write("FORWARDER_READY.\n")
        try:
            while not self._stop:
                data = self.read_data()
                if not data:
                    # Connection to client lost
                    break
                self.send_to_server(data)

                data = self.recv_from_server()
                if not data:
                    # Connection to server lost
                    raise IOError("Connection to server lost while "
                        "expecting response")
                self.write_data(data)
        finally:
            self.disconnect()

    def read_data(self):
        """Return the next line from stdin ("" at end of input)."""
        return sys.stdin.readline()

    def write_data(self, data):
        """Write ``data`` to stdout and flush it right away."""
        sys.stdout.write(data)
        # sys.stdout.write is buffered, this is why we need to do a flush()
        sys.stdout.flush()

    def send_to_server(self, data):
        """Send one line to the server, reconnecting once on EPIPE.

        Also sets the stop flag when the line decodes to ``STOP_MSG``, so
        that ``forward()`` ends after relaying the reply.
        """
        try:
            # sendall: a plain send() may transmit only part of the line.
            self._ctrl_sock.sendall(data)
        except (IOError, socket.error) as e:
            if e.errno != errno.EPIPE:
                raise
            # The server dropped the connection: reconnect and resend.
            self.connect()
            self._ctrl_sock.sendall(data)
        encoded = data.rstrip() 
        msg = base64.b64decode(encoded)
        if msg == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """Return the next reply line from the server, newline included.

        Returns "" when the server closed the connection and nothing was
        received, which is what ``forward()`` checks for. Errors raised by
        ``recv`` propagate, except EINTR which is retried.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = self._ctrl_sock.recv(1024)
            except (OSError, socket.error) as e:
                if e.errno != errno.EINTR:
                    raise
                continue
            if not chunk:
                # empty chunk = EOF
                break
            data.append(chunk)
        # Everything after the first newline is kept for the next reply.
        line, _, self._rdbuf = ''.join(data).partition('\n')

        if not line and not chunk:
            # EOF with nothing buffered: report the lost connection instead
            # of fabricating an empty reply line.
            return ''
        return line + '\n'

    def connect(self):
        """(Re)open the connection to the server's control socket."""
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """Close the control socket if there is one (best effort)."""
        if self._ctrl_sock is None:
            return
        try:
            self._ctrl_sock.close()
        except (IOError, OSError, socket.error):
            # Nothing useful can be done about a failed close.
            pass
344