working on proxy.py ...
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import select
8 import socket
9 import sys
10 import subprocess
11 import threading
12
# File name of the UNIX control socket; the server binds it relative to its
# working directory and the forwarder looks it up inside its root directory.
CTRL_SOCK = "ctrl.sock"
# File that receives the daemon's stdout and stderr once it has detached.
STD_ERR = "stderr.log"
# Exclusive upper bound of the descriptor range closed while daemonizing.
MAX_FD = 1024

# Decoded request text that asks the server to shut down.
STOP_MSG = "STOP"
18
class Server(object):
    """Daemon answering base64-encoded requests on a UNIX control socket.

    ``run()`` detaches the process with a double fork, changes into
    ``root_dir`` and then serves connections on ``CTRL_SOCK`` until a
    ``STOP_MSG`` request has been handled. Subclasses customise behaviour
    by overriding ``post_daemonize``, ``reply_action`` and ``stop_action``.
    """

    def __init__(self, root_dir="."):
        """Remember *root_dir*; no process or socket is created yet."""
        self._root_dir = root_dir
        self._stop = False
        self._ctrl_sock = None
        self._stderr = None

    def run(self):
        """Daemonize and serve requests until a stop request arrives.

        In the calling process this returns as soon as the daemon has been
        forked off. The daemonized process never returns from this method:
        no exec follows the fork, so returning would re-run the caller's
        code a second time. It therefore always ends in ``os._exit``, with
        status 0 after a clean stop and 1 if serving raised an exception.
        """
        if not self.daemonize():
            return

        status = 0
        try:
            self.post_daemonize()
            self.loop()
        except Exception:
            # Log instead of letting the error unwind into the caller's code.
            sys.stderr.write("ERROR: %s\n" % sys.exc_info()[0])
            status = 1
        self.cleanup()
        os._exit(status)

    def daemonize(self):
        """Detach from the calling process using a double fork.

        Returns:
            0 in the original process (after reaping the intermediate
            child so it does not linger as a zombie), 1 in the daemonized
            process. The intermediate child never returns; it exits right
            after the second fork.
        """
        pid1 = os.fork()
        if pid1 > 0:
            os.waitpid(pid1, 0)
            return 0

        # Decouple from the parent environment.
        os.chdir(self._root_dir)
        os.umask(0)
        os.setsid()

        pid2 = os.fork()
        if pid2 > 0:
            # Intermediate child: leave without running the caller's code.
            os._exit(0)

        # Close every inherited descriptor except stdin/stdout/stderr; those
        # stay open until their replacements exist and are swapped by dup2.
        for fd in range(3, MAX_FD):
            try:
                os.close(fd)
            except OSError:
                pass

        # Redirect the standard descriptors: stdin reads from /dev/null,
        # stdout and stderr both append (unbuffered) to STD_ERR.
        self._stderr = open(STD_ERR, "ab", 0)
        log_fd = self._stderr.fileno()
        devnull = open('/dev/null', 'r')
        try:
            os.dup2(devnull.fileno(), sys.stdin.fileno())
        finally:
            devnull.close()
        os.dup2(log_fd, sys.stdout.fileno())
        os.dup2(log_fd, sys.stderr.fileno())
        return 1

    def post_daemonize(self):
        """Hook run in the daemon right after detaching; no-op by default."""
        pass

    def loop(self):
        """Accept control connections and answer requests until stopped.

        Binds ``CTRL_SOCK`` relative to the current working directory and
        serves one connection at a time. A connection is dropped after 5
        seconds without traffic, when the peer disconnects, or on a socket
        error; the server keeps accepting until a ``STOP_MSG`` request has
        been handled.
        """
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._ctrl_sock.bind(CTRL_SOCK)
        self._ctrl_sock.listen(0)
        while not self._stop:
            conn, addr = self._ctrl_sock.accept()
            try:
                conn.settimeout(5)
                self._serve_connection(conn)
            except socket.error:
                # A broken or stalled peer (this also covers a timeout while
                # sending) must not take the whole server down.
                sys.stderr.write("ERROR: %s\n" % sys.exc_info()[0])
            finally:
                conn.close()

    def _serve_connection(self, conn):
        """Answer requests on *conn* until it idles out, closes or stops us."""
        while True:
            try:
                msg = self.recv_msg(conn)
            except socket.timeout:
                return
            except (TypeError, ValueError):
                # Request was not valid base64 / text: drop the connection.
                sys.stderr.write("ERROR: %s\n" % sys.exc_info()[0])
                return

            if msg is None:
                # Peer closed the connection.
                return
            if msg == STOP_MSG:
                self._stop = True
                self.send_reply(conn, self._run_action(self.stop_action))
                return
            self.send_reply(conn, self._run_action(self.reply_action, msg))

    def _run_action(self, action, *args):
        """Call *action*; on failure log it and return the error text."""
        try:
            return action(*args)
        except Exception:
            error = "ERROR: %s" % sys.exc_info()[0]
            sys.stderr.write(error + "\n")
            return error

    def recv_msg(self, conn):
        """Read one request from *conn* and return its decoded text.

        Returns:
            The base64-decoded message with trailing whitespace removed, or
            ``None`` when the peer has closed the connection.

        NOTE(review): a single ``recv(1024)`` is issued, so a request whose
        encoded form exceeds 1024 bytes is not read completely.
        """
        data = conn.recv(1024)
        if not data:
            return None
        decoded = base64.b64decode(data)
        if not isinstance(decoded, str):
            # Python 3 yields bytes; callers compare against str messages.
            decoded = decoded.decode("utf-8")
        return decoded.rstrip()

    def send_reply(self, conn, reply):
        """Send *reply* to *conn* as one base64 line terminated by a newline."""
        if not isinstance(reply, bytes):
            reply = reply.encode("utf-8")
        # sendall: a plain send() may transmit only part of the reply.
        conn.sendall(base64.b64encode(reply) + b"\n")

    def cleanup(self):
        """Close the control socket and remove its file, logging failures."""
        try:
            if self._ctrl_sock is not None:
                self._ctrl_sock.close()
        except Exception:
            sys.stderr.write("ERROR: %s\n" % sys.exc_info()[0])
        try:
            os.remove(CTRL_SOCK)
        except Exception:
            sys.stderr.write("ERROR: %s\n" % sys.exc_info()[0])

    def stop_action(self):
        """Return the reply sent for a stop request."""
        return "Stopping server"

    def reply_action(self, msg):
        """Return the reply sent for an ordinary request *msg*."""
        return "Reply to: %s" % msg
127
class Forwarder(object):
    """Relay lines between stdin/stdout and a server's control socket.

    Each line read from stdin is sent to the socket found at
    ``CTRL_SOCK`` inside ``root_dir``; the reply line received from the
    socket is written to stdout. Forwarding ends after a line that decodes
    to ``STOP_MSG`` has been sent, or when stdin reaches end of file.
    """

    def __init__(self, root_dir="."):
        """Remember *root_dir*; the socket is opened later by ``connect``."""
        self._ctrl_sock = None
        self._root_dir = root_dir
        self._stop = False

    def forward(self):
        """Connect, then relay request/reply pairs until told to stop."""
        self.connect()
        try:
            while not self._stop:
                data = self.read_data()
                if not data:
                    # readline() returned nothing: stdin is at EOF, and
                    # looping again would spin forever.
                    break
                self.send_to_server(data)
                self.write_data(self.recv_from_server())
        finally:
            self.disconnect()

    def read_data(self):
        """Return the next line from stdin (empty at end of file)."""
        return sys.stdin.readline()

    def write_data(self, data):
        """Write *data* to stdout and flush it immediately."""
        sys.stdout.write(data)
        sys.stdout.flush()

    def send_to_server(self, data):
        """Send one encoded request line, reconnecting once on EPIPE.

        After sending, the line is base64-decoded; if it is ``STOP_MSG``
        the forwarder is flagged to stop after the current exchange.

        Raises:
            IOError: for any send failure other than a broken pipe, or if
                the retry after reconnecting fails as well.
        """
        payload = data if isinstance(data, bytes) else data.encode("utf-8")
        try:
            self._ctrl_sock.sendall(payload)
        except IOError as e:
            if e.errno != errno.EPIPE:
                raise
            # The peer closed the previous connection: open a fresh one
            # and retry once.
            self.connect()
            self._ctrl_sock.sendall(payload)

        msg = base64.b64decode(payload.rstrip())
        if not isinstance(msg, str):
            msg = msg.decode("utf-8", "replace")
        if msg == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """Return one reply line from the socket as text.

        Reads until a chunk ends with a newline or the peer closes the
        connection, so replies longer than a single 1024-byte read are
        returned whole.
        """
        chunks = []
        while True:
            chunk = self._ctrl_sock.recv(1024)
            if not chunk:
                break
            chunks.append(chunk)
            if chunk.endswith(b"\n"):
                break
        data = b"".join(chunks)
        if not isinstance(data, str):
            # Python 3: stdout expects text, the socket delivers bytes.
            data = data.decode("utf-8")
        return data

    def connect(self):
        """Drop any existing connection and connect to the control socket."""
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """Close the control socket if one is open; safe to call repeatedly."""
        sock, self._ctrl_sock = self._ctrl_sock, None
        if sock is None:
            return
        try:
            sock.close()
        except socket.error:
            # Best effort: the connection is being discarded anyway.
            pass
178
class Client(object):
    """Talk to a server through a ``Forwarder`` running in a child process.

    Messages are exchanged with the child over its stdin/stdout pipes as
    base64 text, one message per line.
    """

    def __init__(self, root_dir="."):
        """Start the forwarder child process for the server in *root_dir*."""
        # %r produces a properly quoted literal, so a root_dir containing
        # quotes or backslashes cannot break (or inject into) the command.
        code = ("from nepi.util import server;"
                "c=server.Forwarder(%r);c.forward()" % (root_dir,))
        self._process = subprocess.Popen(
            # Prefer the interpreter running this code over whatever
            # "python" happens to resolve to on PATH.
            [sys.executable or "python", "-c", code],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            env=os.environ)

    def send_msg(self, msg):
        """Send *msg* to the forwarder as one base64-encoded line."""
        if not isinstance(msg, bytes):
            msg = msg.encode("utf-8")
        self._process.stdin.write(base64.b64encode(msg) + b"\n")
        # The pipe may be buffered; flush so the request is delivered now.
        self._process.stdin.flush()

    def send_stop(self):
        """Send the stop request (``STOP_MSG``)."""
        self.send_msg(STOP_MSG)

    def read_reply(self):
        """Read one reply line from the forwarder and return it decoded."""
        data = self._process.stdout.readline()
        decoded = base64.b64decode(data.rstrip())
        if not isinstance(decoded, str):
            # Python 3 yields bytes; return text like the Python 2 code did.
            decoded = decoded.decode("utf-8")
        return decoded
202