update test and examples for OMF6 in OMF section
[nepi.git] / src / nepi / resources / ns3 / ns3wrapper_server.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19
20
class Server(object):
    """Daemonized control server reachable through a UNIX stream socket.

    ``run()`` detaches a daemon process (double fork).  The daemon binds
    ``CTRL_SOCK``, serves one connection at a time and answers
    newline-terminated, base64-encoded messages until it receives
    ``STOP_MSG``.  Subclasses customise the behaviour by overriding
    ``reply_action()`` and ``stop_action()``.
    """

    def __init__(self, root_dir = ".", log_level = "ERROR", 
            environment_setup = "", clean_root = False):
        """Store the configuration; nothing is started until ``run()``.

        root_dir: directory the daemon changes into (created if missing).
        log_level: exported as NEPI_CONTROLLER_LOGLEVEL in the daemon.
        environment_setup: shell snippet executed to compute the daemon's
            environment variables (empty string to skip).
        clean_root: when true, an existing ``root_dir`` (other than "." or
            "") is removed before being re-created.
        """
        self._root_dir = root_dir
        self._clean_root = clean_root
        self._stop = False
        self._ctrl_sock = None
        self._log_level = log_level
        # Bytes received on the current connection but not yet consumed
        self._rdbuf = ""
        self._environment_setup = environment_setup

    def run(self):
        """Daemonize and serve requests.

        In the calling process this returns after writing "SERVER_READY."
        to stderr.  The daemon process never returns from this method: it
        exits with ``os._exit(0)`` once the loop finishes.  On any error
        "SERVER_ERROR." plus the traceback are written to stderr and the
        current process exits.
        """
        try:
            if self.daemonize():
                self.post_daemonize()
                self.loop()
                self.cleanup()
                # ref: "os._exit(0)"
                # We can not return normally after the fork because no exec
                # was done. If we didn't call os._exit(0) here, the code
                # that follows the call to "Server.run()" in the caller
                # would be executed again... but it has already been
                # executed by the first process (the one that did the first
                # fork) when it returned.
                os._exit(0)
        except:
            # Deliberately catch everything (SystemExit included): a forked
            # process must never fall through into the caller's code.
            sys.stderr.write("SERVER_ERROR.\n")
            self.log_error()
            self.cleanup()
            os._exit(0)
        sys.stderr.write("SERVER_READY.\n")

    def daemonize(self):
        """Detach a daemon process using the double-fork technique.

        Returns 0 in the original (calling) process and 1 in the daemon.
        Raises RuntimeError in the calling process when the intermediate
        child exits with a non-zero status or when the daemon terminates
        before signalling that it is ready.
        """
        # pipe used by the daemon to tell the caller that it is ready
        (r, w) = os.pipe()
        
        # build root folder
        root = os.path.normpath(self._root_dir)
        if self._root_dir not in [".", ""] and os.path.exists(root) \
                and self._clean_root:
            shutil.rmtree(root)
        if not os.path.exists(root):
            os.makedirs(root, 0o755)

        pid1 = os.fork()
        if pid1 > 0:
            os.close(w)
            while True:
                try:
                    ready = os.read(r, 1)
                except OSError as e: # pragma: no cover
                    if e.errno == errno.EINTR:
                        continue
                    else:
                        raise
                break
            os.close(r)
            # os.waitpid avoids leaving a <defunct> (zombie) process
            st = os.waitpid(pid1, 0)[1]
            # An empty read means every write end of the pipe was closed
            # without the ready byte being sent, i.e. the daemon died
            # during its setup.
            if st or not ready:
                raise RuntimeError("Daemonization failed")
            # return 0 to inform the caller method that this is not the 
            # daemonized process
            return 0
        os.close(r)

        # Decouple from parent environment.
        # Use the normalized path so that an empty root_dir means ".",
        # consistently with the folder handling above.
        os.chdir(root)
        os.umask(0)
        os.setsid()

        # fork 2
        pid2 = os.fork()
        if pid2 > 0:
            # see ref: "os._exit(0)"
            os._exit(0)

        # close all open file descriptors, except the synchronization pipe
        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if (max_fd == resource.RLIM_INFINITY):
            max_fd = MAX_FD
        for fd in range(3, max_fd):
            if fd != w:
                try:
                    os.close(fd)
                except OSError:
                    pass

        # Redirect standard file descriptors.
        stdin = open(DEV_NULL, "r")
        stderr = stdout = open(STD_ERR, "a", 0)
        os.dup2(stdin.fileno(), sys.stdin.fileno())
        # NOTE: sys.stdout.write will still be buffered, even if the file
        # was opened with 0 buffer
        os.dup2(stdout.fileno(), sys.stdout.fileno())
        os.dup2(stderr.fileno(), sys.stderr.fileno())
        
        # setup environment
        if self._environment_setup:
            self._apply_environment_setup()

        # create control socket
        self._bind_ctrl_sock()
        
        # Save pidfile
        with open(CTRL_PID, "w") as pidfile:
            pidfile.write(str(os.getpid()))

        # let the parent process know that the daemonization is finished
        os.write(w, b"\n")
        os.close(w)
        return 1

    def _apply_environment_setup(self):
        """Run the environment setup snippet and import its environment."""
        # The variables are obtained by executing shell commands, in case
        # there's some heavy setup involved.
        # NOTE: the snippet is interpolated verbatim into a shell command,
        # it must come from a trusted source.
        envproc = subprocess.Popen(
            [ "bash", "-c", 
                "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
                    ( self._environment_setup, ) ],
            stdin = subprocess.PIPE, 
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE
        )
        out, err = envproc.communicate()

        # "print" terminates the dump with a newline that is not part of
        # the last variable's value
        if out.endswith("\n"):
            out = out[:-1]
        if not out:
            return

        # parse new environment; split only on the first separator so the
        # value is kept whole
        environment = dict(x.split("\x02", 1) for x in out.split("\x01"))
    
        # apply to current environment
        for name, value in environment.items():
            os.environ[name] = value
        
        # apply pythonpath
        if 'PYTHONPATH' in environment:
            sys.path = environment['PYTHONPATH'].split(':') + sys.path

    def _bind_ctrl_sock(self):
        """Create the control socket, replacing a stale one if needed."""
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self._ctrl_sock.bind(CTRL_SOCK)
        except socket.error:
            # Address in use, check pidfile
            pid = None
            try:
                with open(CTRL_PID, "r") as pidfile:
                    pid = int(pidfile.read())
            except (IOError, OSError, ValueError):
                # no pidfile, or unreadable content
                pass
            
            # Check process liveliness
            if pid is not None and not os.path.exists("/proc/%d" % (pid,)):
                # Ok, it's dead, clean the socket
                os.remove(CTRL_SOCK)
            
            # try again (fails again if the socket could not be cleaned)
            self._ctrl_sock.bind(CTRL_SOCK)
            
        self._ctrl_sock.listen(0)

    def post_daemonize(self):
        """Final setup executed in the daemon right after daemonizing."""
        os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
        # QT, for some strange reason, redefines the SIGCHLD handler to write
        # a \0 to a fd (let's say fileno 'x'), whenever a SIGCHLD is received.
        # Server daemonization closes all file descriptors from fileno '3',
        # but the overloaded handler (inherited by the forked process) will
        # keep trying to write the \0 to fileno 'x', which might have been
        # reused after closing, for other operations. This is bad when fileno
        # 'x' is in use for communication purposes, because unexpected \0
        # start appearing in the communication messages... this is exactly
        # what happens when using netns in daemonized form. Thus, we have no
        # other alternative than restoring the SIGCHLD handler to the default
        # here.
        import signal
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def loop(self):
        """Accept connections and answer messages until asked to stop."""
        while not self._stop:
            conn, addr = self._ctrl_sock.accept()
            self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
            conn.settimeout(5)
            # Unconsumed data from a previous connection must not be
            # interpreted as a request of the new one
            self._rdbuf = ""
            while not self._stop:
                try:
                    msg = self.recv_msg(conn)
                except socket.timeout:
                    # nothing complete yet, re-check the stop flag and retry
                    continue
                except socket.error:
                    # same policy as for a failed reply: drop this
                    # connection and wait for the client to come back
                    self.log_error()
                    self.log_error("NOTICE: Awaiting for reconnection")
                    break
                
                if not msg:
                    self.log_error("CONNECTION LOST")
                    break
                    
                if msg == STOP_MSG:
                    self._stop = True
                    reply = self.stop_action()
                else:
                    reply = self.reply_action(msg)
                
                try:
                    self.send_reply(conn, reply)
                except socket.error:
                    self.log_error()
                    self.log_error("NOTICE: Awaiting for reconnection")
                    break
            try:
                conn.close()
            except Exception:
                # Doesn't matter
                self.log_error()

    def recv_msg(self, conn):
        """Read one newline-terminated message and return it decoded.

        Data received past the newline is kept in ``self._rdbuf`` for the
        next call.  Returns the base64-decoded message with trailing
        whitespace stripped; the result is empty when the peer closed the
        connection without sending anything.

        Raises socket.timeout when the connection times out before a full
        message arrived (what was received so far is kept for the next
        call), and any other socket error raised by ``recv``.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = conn.recv(1024)
            except socket.timeout:
                # The caller retries on the same connection: do not lose
                # the part of the message that already arrived.
                self._rdbuf = ''.join(data)
                raise
            except (OSError, socket.error) as e:
                if e.errno != errno.EINTR:
                    raise
                continue
            if chunk:
                data.append(chunk)
            else:
                # empty chunk = EOF
                break
        line, _, self._rdbuf = ''.join(data).partition('\n')
        
        decoded = base64.b64decode(line)
        return decoded.rstrip()

    def send_reply(self, conn, reply):
        """Send ``reply`` base64-encoded and newline-terminated."""
        encoded = base64.b64encode(reply)
        # sendall: a plain send() may transmit only part of the reply
        conn.sendall("%s\n" % encoded)
       
    def cleanup(self):
        """Close the control socket and remove its file (best effort)."""
        if self._ctrl_sock is None:
            # The socket was never created by this process (e.g. the
            # original process, or a failure early in daemonize()), so
            # there is nothing of ours to remove.
            return
        try:
            self._ctrl_sock.close()
            os.remove(CTRL_SOCK)
        except Exception:
            self.log_error()

    def stop_action(self):
        """Return the reply sent for ``STOP_MSG``; override as needed."""
        return "Stopping server"

    def reply_action(self, msg):
        """Return the reply for a regular message; override as needed."""
        return "Reply to: %s" % msg

    def log_error(self, text = None, context = ''):
        """Write an error entry to stderr and return the logged text.

        When ``text`` is None the current exception traceback is logged.
        """
        if text is None:
            text = traceback.format_exc()
        date = time.strftime("%Y-%m-%d %H:%M:%S")
        if context:
            context = " (%s)" % (context,)
        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
        return text

    def log_debug(self, text):
        """Write ``text`` to stderr when the log level is DC.DEBUG_LEVEL."""
        if self._log_level == DC.DEBUG_LEVEL:
            date = time.strftime("%Y-%m-%d %H:%M:%S")
            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
283
class Forwarder(object):
    """Relay between stdin/stdout and the control socket under ``root_dir``.

    Each line read from stdin is sent to the server unchanged and the
    server's one-line answer is written back to stdout, until stdin is
    exhausted or a ``STOP_MSG`` has been forwarded.
    """

    def __init__(self, root_dir = "."):
        """Store the configuration; ``forward()`` opens the connection."""
        self._ctrl_sock = None
        self._root_dir = root_dir
        self._stop = False
        # Bytes received from the server but not yet returned
        self._rdbuf = ""

    def forward(self):
        """Relay requests and replies until stopped.

        Raises IOError when the server closes the connection while a
        reply is expected.
        """
        self.connect()
        sys.stderr.write("FORWARDER_READY.\n")
        try:
            while not self._stop:
                data = self.read_data()
                if not data:
                    # Connection to client lost
                    break
                self.send_to_server(data)
                
                data = self.recv_from_server()
                if not data:
                    # Connection to server lost
                    raise IOError("Connection to server lost while "
                        "expecting response")
                self.write_data(data)
        finally:
            # release the socket on errors too
            self.disconnect()

    def read_data(self):
        """Return the next request line from stdin ('' on EOF)."""
        return sys.stdin.readline()

    def write_data(self, data):
        """Write a reply to stdout."""
        sys.stdout.write(data)
        # sys.stdout.write is buffered, this is why we need to do a flush()
        sys.stdout.flush()

    def send_to_server(self, data):
        """Send one encoded request line, reconnecting once on EPIPE.

        Sets the stop flag when the request decodes to ``STOP_MSG``.
        """
        try:
            # sendall: a plain send() may transmit only part of the line
            self._ctrl_sock.sendall(data)
        except (IOError, socket.error) as e:
            if e.errno != errno.EPIPE:
                raise
            # The server dropped the connection: reconnect and retry once
            self.connect()
            self._ctrl_sock.sendall(data)
        encoded = data.rstrip() 
        msg = base64.b64decode(encoded)
        if msg == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """Return the next newline-terminated reply from the server.

        Data received past the newline is kept in ``self._rdbuf`` for the
        next call.  Returns '' when the server closed the connection and
        nothing is pending, so that callers can detect the lost
        connection; a truncated last reply is still returned, terminated
        with a newline.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = self._ctrl_sock.recv(1024)
            except (OSError, socket.error) as e:
                if e.errno != errno.EINTR:
                    raise
                continue
            if chunk:
                data.append(chunk)
            else:
                # empty chunk = EOF
                break
        line, sep, self._rdbuf = ''.join(data).partition('\n')
        if not sep and not line:
            # EOF with nothing buffered: the connection is gone
            return ''
        
        return line+'\n'
 
    def connect(self):
        """(Re)open the connection to the control socket."""
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """Close the control socket, if any (best effort)."""
        if self._ctrl_sock is None:
            return
        try:
            self._ctrl_sock.close()
        except (OSError, socket.error):
            pass
364