8a3dbed725d0b9b578bccc6100bd198d5f00b7eb
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import select
8 import socket
9 import sys
10 import subprocess
11 import threading
12 from time import strftime
13 import traceback
14
# Name of the UNIX domain socket file. The Server binds it relative to its
# working directory; the Forwarder joins it with its root_dir to connect.
CTRL_SOCK = "ctrl.sock"
# File that receives the daemon's stdout and stderr after daemonization.
STD_ERR = "stderr.log"
# Exclusive upper bound of the file descriptors closed while daemonizing.
MAX_FD = 1024

# Decoded message that asks the Server to stop serving.
STOP_MSG = "STOP"

# Log levels: debug messages are only written when the level is DEBUG_LEVEL.
ERROR_LEVEL = 0
DEBUG_LEVEL = 1
23
class Server(object):
    """Daemonized request/reply server listening on a UNIX domain socket.

    Messages travel base64 encoded, one per line (terminated by a newline).
    Subclasses customize the behaviour by overriding post_daemonize(),
    reply_action() and stop_action().
    """

    def __init__(self, root_dir="."):
        """Create a server whose daemon will run with root_dir as its cwd."""
        self._root_dir = root_dir
        self._stop = False
        self._ctrl_sock = None
        self._stderr = None
        self._log_level = ERROR_LEVEL

    def run(self):
        """Daemonize and, in the daemon process, serve until stopped.

        Returns normally only in the process that called it; the daemon
        always terminates through os._exit().
        """
        # NOTE(review): an exception raised by daemonize() in the *calling*
        # process (e.g. RuntimeError("Daemonization failed")) is also caught
        # by the clause below, so the calling process is terminated with
        # status 0 as well -- confirm this is intended.
        try:
            if self.daemonize():
                self.post_daemonize()
                self.loop()
                self.cleanup()
                # ref: "os._exit(0)"
                # We can not return normally after fork because no exec was
                # done. If we did not call os._exit(0) here, the code that
                # follows the call to "Server.run()" in the caller would be
                # executed again... but it has already been executed after
                # the first process (the one that did the first fork)
                # returned.
                os._exit(0)
        except:
            # Deliberately bare: a forked process must never fall back into
            # the caller's code, whatever was raised.
            self.log_error()
            self.cleanup()
            os._exit(0)

    def daemonize(self):
        """Detach from the calling process using the double-fork technique.

        Returns 0 in the calling process and 1 in the daemon process.
        Raises RuntimeError in the calling process when the first child
        exits with a non-zero status.
        """
        # pipe used by the daemon to signal that daemonization has finished
        (r, w) = os.pipe()

        pid1 = os.fork()
        if pid1 > 0:
            os.close(w)
            # Blocks until the daemon writes its ready byte or every copy of
            # the write end has been closed.
            # NOTE(review): the result is ignored, so a daemon that died
            # before signalling is not detected here.
            os.read(r, 1)
            os.close(r)
            # os.waitpid avoids leaving a <defunct> (zombie) process
            st = os.waitpid(pid1, 0)[1]
            if st:
                raise RuntimeError("Daemonization failed")
            # return 0 to inform the caller method that this is not the
            # daemonized process
            return 0
        os.close(r)

        # Decouple from parent environment.
        os.chdir(self._root_dir)
        os.umask(0)
        os.setsid()

        # fork 2
        pid2 = os.fork()
        if pid2 > 0:
            # see ref: "os._exit(0)"
            os._exit(0)

        # close all open file descriptors except the synchronization pipe.
        for fd in range(3, MAX_FD):
            if fd != w:
                try:
                    os.close(fd)
                except OSError:
                    pass

        # Redirect standard file descriptors: stdout and stderr both go,
        # unbuffered, to the log file. open() replaces the deprecated file()
        # builtin (unbuffered text mode is Python 2 behaviour).
        self._stderr = stdout = open(STD_ERR, "a", 0)
        stdin = open('/dev/null', 'r')
        os.dup2(stdin.fileno(), sys.stdin.fileno())
        os.dup2(stdout.fileno(), sys.stdout.fileno())
        os.dup2(self._stderr.fileno(), sys.stderr.fileno())
        # let the parent process know that the daemonization is finished
        os.write(w, b"\n")
        os.close(w)
        return 1

    def post_daemonize(self):
        """Hook executed in the daemon right after daemonization."""
        pass

    def loop(self):
        """Accept control connections, one at a time, until asked to stop."""
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._ctrl_sock.bind(CTRL_SOCK)
        self._ctrl_sock.listen(0)
        while not self._stop:
            conn, _ = self._ctrl_sock.accept()
            try:
                self._serve_connection(conn)
            finally:
                # never leak the connection, even if a handler raised
                conn.close()

    def _serve_connection(self, conn):
        """Answer the messages of one client until it goes idle or away."""
        conn.settimeout(5)
        # Re-checking the flag here makes the server leave right after the
        # STOP reply instead of waiting for the client to time out.
        while not self._stop:
            try:
                msg = self.recv_msg(conn)
            except (socket.timeout, EOFError):
                # idle for too long, or the client closed the connection
                break

            if msg == STOP_MSG:
                self._stop = True
                reply = self.stop_action()
            else:
                reply = self.reply_action(msg)
            self.send_reply(conn, reply)

    def recv_msg(self, conn):
        """Read one newline terminated base64 message and return it decoded.

        Raises EOFError when the peer closes the connection before a
        complete message arrived; socket.timeout propagates unchanged.
        """
        chunks = []
        while True:
            try:
                chunk = conn.recv(1024)
            except socket.error as e:
                # socket.timeout has no EINTR errno, so it is re-raised too
                if e.errno != errno.EINTR:
                    raise
                # interrupted by a signal: just retry the read
                continue
            if not chunk:
                raise EOFError("Connection closed by peer")
            chunks.append(chunk)
            if chunk.endswith(b"\n"):
                break
        decoded = base64.b64decode(b"".join(chunks))
        return decoded.rstrip()

    def send_reply(self, conn, reply):
        """Send reply to the client, base64 encoded and newline terminated."""
        encoded = base64.b64encode(reply)
        # sendall: a plain send() may transmit only part of the data
        conn.sendall(encoded + b"\n")

    def cleanup(self):
        """Close the control socket and remove its file; errors are logged."""
        try:
            # Nothing to release when the socket was never created (e.g. in
            # the calling process, or when daemonization failed early).
            if self._ctrl_sock is not None:
                self._ctrl_sock.close()
                os.remove(CTRL_SOCK)
        except Exception:
            self.log_error()

    def stop_action(self):
        """Return the reply sent to the client that requested the stop."""
        return "Stopping server"

    def reply_action(self, msg):
        """Return the reply for msg; meant to be overridden by subclasses."""
        return "Reply to: %s" % msg

    def set_error_log_level(self):
        """Only log errors."""
        self._log_level = ERROR_LEVEL

    def set_debug_log_level(self):
        """Log debug messages in addition to errors."""
        self._log_level = DEBUG_LEVEL

    def log_error(self, text=None):
        """Write a timestamped error to stderr and return the logged text.

        When text is None the traceback of the exception currently being
        handled is logged.
        """
        if text is None:
            text = traceback.format_exc()
        date = strftime("%Y-%m-%d %H:%M:%S")
        sys.stderr.write("ERROR: %s\n%s\n" % (date, text))
        return text

    def log_debug(self, text):
        """Write a timestamped debug message to stderr at debug level."""
        if self._log_level == DEBUG_LEVEL:
            date = strftime("%Y-%m-%d %H:%M:%S")
            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
172
class Forwarder(object):
    """Relay between stdin/stdout and the control socket of a server.

    Each line read from stdin is sent verbatim to the server and the
    server's newline terminated answer is written verbatim to stdout.
    """

    def __init__(self, root_dir="."):
        """Create a forwarder for the server socket found in root_dir."""
        self._ctrl_sock = None
        self._root_dir = root_dir
        self._stop = False

    def forward(self):
        """Relay request/reply pairs until a stop message was forwarded.

        Relaying also ends when stdin reaches end of file or when the
        server closes the connection instead of answering.
        """
        self.connect()
        while not self._stop:
            data = self.read_data()
            if not data:
                # EOF on stdin: nothing else will ever be requested
                break
            self.send_to_server(data)
            try:
                data = self.recv_from_server()
            except EOFError:
                # server went away without replying
                break
            self.write_data(data)
        self.disconnect()

    def read_data(self):
        """Return the next line from stdin ('' at end of file)."""
        return sys.stdin.readline()

    def write_data(self, data):
        """Write data to stdout and flush it immediately."""
        sys.stdout.write(data)
        sys.stdout.flush()

    def send_to_server(self, data):
        """Send one encoded line to the server, reconnecting on EPIPE.

        Marks the forwarder as stopped when the line decodes to the stop
        message.
        """
        try:
            # sendall: a plain send() may transmit only part of the data
            self._ctrl_sock.sendall(data)
        except IOError as e:
            if e.errno != errno.EPIPE:
                raise
            # the server dropped the connection: reconnect and retry once
            self.connect()
            self._ctrl_sock.sendall(data)
        encoded = data.rstrip()
        msg = base64.b64decode(encoded)
        if msg == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """Return one newline terminated reply exactly as received.

        Raises EOFError when the server closes the connection before a
        complete reply arrived.
        """
        chunks = []
        while True:
            try:
                chunk = self._ctrl_sock.recv(1024)
            except socket.error as e:
                if e.errno != errno.EINTR:
                    raise
                # interrupted by a signal: just retry the read
                continue
            if not chunk:
                raise EOFError("Connection closed by server")
            chunks.append(chunk)
            if chunk.endswith(b"\n"):
                break
        return b"".join(chunks)

    def connect(self):
        """(Re)open the connection to the server's control socket."""
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """Close the connection if one exists; closing is best effort."""
        if self._ctrl_sock is None:
            return
        try:
            self._ctrl_sock.close()
        except socket.error:
            pass
235
class Client(object):
    """Talk to a server through a Forwarder running in a child process."""

    def __init__(self, root_dir="."):
        """Spawn the forwarder process for the server living in root_dir."""
        # %r embeds root_dir as a proper Python literal, so paths containing
        # quotes or backslashes can not break (or alter) the generated code.
        code = ("from nepi.util import server;"
                "c=server.Forwarder(%r);c.forward()" % (root_dir,))
        self._process = subprocess.Popen(
                ["python", "-c", code],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE)

    def send_msg(self, msg):
        """Send msg to the server, base64 encoded and newline terminated."""
        encoded = base64.b64encode(msg)
        self._process.stdin.write(encoded + b"\n")
        # make sure the line reaches the forwarder even if the pipe buffers
        self._process.stdin.flush()

    def send_stop(self):
        """Ask the server to stop."""
        self.send_msg(STOP_MSG)

    def read_reply(self):
        """Read one reply line from the forwarder and return it decoded."""
        data = self._process.stdout.readline()
        encoded = data.rstrip()
        return base64.b64decode(encoded)
258