1a432ef80bd349f9a87ad055bb3f6793bb3aaf07
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import select
8 import socket
9 import sys
10 import subprocess
11 import threading
12
13 CTRL_SOCK = "ctrl.sock"  # file name of the Unix-domain control socket (bound by Server.loop)
14 STD_ERR = "stderr.log"  # file that Server.daemonize redirects stdout and stderr to
15 MAX_FD = 1024  # exclusive upper bound of the descriptor range closed in Server.daemonize
16
17 STOP_MSG = "STOP"  # decoded message that makes Server.loop and Forwarder.forward stop
18
class Server(object):
    """Daemonized request/reply server driven through a Unix-domain socket.

    ``run()`` double-forks into a daemon whose working directory is
    *root_dir*.  The daemon redirects stdout/stderr to ``STD_ERR``, binds
    ``CTRL_SOCK`` and answers base64-encoded messages until it receives
    ``STOP_MSG``.  Subclasses customise behaviour by overriding
    ``post_daemonize``, ``reply_action`` and ``stop_action``.
    """

    def __init__(self, root_dir="."):
        self._root_dir = root_dir
        self._stop = False
        self._ctrl_sock = None
        self._stderr = None

    def run(self):
        """Launch the daemon and, inside it, serve until stopped.

        In the calling process this returns as soon as the daemon has been
        spawned.  Any forked process leaves through ``os._exit``: no exec was
        done after the fork, so returning (or raising) normally would make it
        execute the code that follows ``Server.run()`` in the caller, which
        the original process already executes.

        Errors are logged with ``log_error``; in the calling process they
        are then re-raised.
        """
        caller_pid = os.getpid()
        try:
            if self.daemonize():
                self.post_daemonize()
                self.loop()
                self.cleanup()
                os._exit(0)
        except BaseException:
            try:
                self.log_error()
            finally:
                # A forked process must never unwind into the caller's code,
                # even when logging itself fails.
                if os.getpid() != caller_pid:
                    os._exit(1)
            raise

    def daemonize(self):
        """Detach from the caller with the classic double fork.

        Returns 0 in the original (calling) process and 1 in the daemon.
        The intermediate child exits right after the second fork.
        """
        pid1 = os.fork()
        if pid1 > 0:
            # Reap the intermediate child so it does not linger as a
            # <defunct> (zombie) process.
            os.waitpid(pid1, 0)
            # 0 tells the caller that this is not the daemonized process.
            return 0

        # Decouple from the parent environment.
        os.chdir(self._root_dir)
        os.umask(0)
        os.setsid()

        # Second fork; the intermediate process exits without unwinding
        # (see the note about os._exit in run()).
        pid2 = os.fork()
        if pid2 > 0:
            os._exit(0)

        # Close inherited descriptors from 2 up to MAX_FD; stderr (fd 2) is
        # re-established by the redirection below.
        for fd in range(2, MAX_FD):
            try:
                os.close(fd)
            except OSError:
                pass

        # Redirect the standard descriptors: stdin reads from /dev/null,
        # stdout and stderr both append, unbuffered, to STD_ERR.
        self._stderr = open(STD_ERR, "a", 0)
        devnull = open('/dev/null', 'r')
        os.dup2(devnull.fileno(), sys.stdin.fileno())
        os.dup2(self._stderr.fileno(), sys.stdout.fileno())
        os.dup2(self._stderr.fileno(), sys.stderr.fileno())
        devnull.close()
        return 1

    def post_daemonize(self):
        """Hook executed in the daemon before ``loop``; no-op by default."""
        pass

    def loop(self):
        """Bind ``CTRL_SOCK`` and serve one connection at a time until stopped."""
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._ctrl_sock.bind(CTRL_SOCK)
        self._ctrl_sock.listen(0)
        while not self._stop:
            conn, _ = self._ctrl_sock.accept()
            conn.settimeout(5)
            try:
                self._serve_connection(conn)
            except socket.error:
                # A client that vanished mid-exchange must not take the
                # daemon down; record the failure and keep accepting.
                self.log_error()
            finally:
                conn.close()

    def _serve_connection(self, conn):
        """Answer messages on *conn* until it goes idle, closes or sends STOP."""
        while True:
            try:
                msg = self.recv_msg(conn)
            except socket.timeout:
                break
            if msg is None:
                # Peer closed the connection.
                break

            stopping = msg == STOP_MSG
            if stopping:
                self._stop = True
            try:
                if stopping:
                    reply = self.stop_action()
                else:
                    reply = self.reply_action(msg)
            except Exception:
                # Report the failure to the client instead of replying with
                # an unbound or stale value.
                reply = self.log_error()
            self.send_reply(conn, reply)
            if stopping:
                break

    def recv_msg(self, conn):
        """Read one message from *conn* and return it base64-decoded.

        Returns None when the peer has closed the connection.
        """
        data = conn.recv(1024)
        if not data:
            return None
        return base64.b64decode(data).rstrip()

    def send_reply(self, conn, reply):
        """Send *reply* to *conn* base64-encoded and newline-terminated."""
        conn.sendall(base64.b64encode(reply) + b"\n")

    def cleanup(self):
        """Close the control socket and remove its file; failures are logged."""
        if self._ctrl_sock is None:
            # loop() never created the socket, so there is nothing to undo.
            return
        try:
            self._ctrl_sock.close()
            os.remove(CTRL_SOCK)
        except Exception:
            self.log_error()

    def stop_action(self):
        """Return the reply sent for ``STOP_MSG``; override to add shutdown work."""
        return "Stopping server"

    def reply_action(self, msg):
        """Return the reply for an ordinary message; override in subclasses."""
        return "Reply to: %s" % msg

    def log_error(self, error=None):
        """Write *error* to stderr and return it.

        When *error* is None the traceback of the exception currently being
        handled is used.
        """
        if error is None:
            import traceback
            error = "%s\n" % traceback.format_exc()
        sys.stderr.write(error)
        return error
138
class Forwarder(object):
    """Relay lines between stdin/stdout and a server's control socket.

    Each line read from stdin is sent unchanged to the socket
    ``CTRL_SOCK`` inside *root_dir*; the reply is written to stdout.
    Forwarding ends once a line decoding to ``STOP_MSG`` has been relayed
    or stdin reaches end-of-file.
    """

    def __init__(self, root_dir="."):
        self._ctrl_sock = None
        self._root_dir = root_dir
        self._stop = False

    def forward(self):
        """Run the relay loop; the socket is closed on the way out."""
        self.connect()
        try:
            while not self._stop:
                data = self.read_data()
                if not data:
                    # readline() returns an empty string only at EOF:
                    # nothing more will ever arrive, so stop relaying.
                    break
                self.send_to_server(data)
                self.write_data(self.recv_from_server())
        finally:
            self.disconnect()

    def read_data(self):
        """Return the next line from stdin (empty at end-of-file)."""
        return sys.stdin.readline()

    def write_data(self, data):
        """Write *data* to stdout and flush so the reader sees it at once."""
        sys.stdout.write(data)
        sys.stdout.flush()

    def send_to_server(self, data):
        """Send *data* to the server, reconnecting once on a broken pipe.

        Sets the stop flag when *data* decodes to ``STOP_MSG``.
        Raises the original error for any failure other than EPIPE.
        """
        try:
            self._ctrl_sock.sendall(data)
        except IOError as e:
            if e.errno != errno.EPIPE:
                raise
            # The server dropped the connection; open a new one and retry.
            self.connect()
            self._ctrl_sock.sendall(data)
        if base64.b64decode(data.rstrip()) == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """Return one newline-terminated reply from the server.

        Keeps reading until a chunk ends with a newline, so replies longer
        than a single ``recv`` are returned whole.  If the server closes
        the connection first, whatever was received so far is returned.
        """
        chunks = []
        while True:
            chunk = self._ctrl_sock.recv(1024)
            chunks.append(chunk)
            if not chunk or chunk.endswith(b"\n"):
                break
        return b"".join(chunks)

    def connect(self):
        """(Re)connect to the control socket located in the root directory."""
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """Close the current socket, if any; close errors are ignored."""
        if self._ctrl_sock is None:
            return
        try:
            self._ctrl_sock.close()
        except socket.error:
            # Best effort: the connection is being discarded anyway.
            pass
189
class Client(object):
    """Talk to a server through a ``Forwarder`` running in a child process.

    The child is a ``python -c`` interpreter that runs
    ``nepi.util.server.Forwarder(root_dir).forward()``; messages are
    exchanged with it over its stdin/stdout pipes.
    """

    def __init__(self, root_dir="."):
        self._process = subprocess.Popen(
            self._forwarder_command(root_dir),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            env=os.environ)

    @staticmethod
    def _forwarder_command(root_dir):
        """Return the argv list that starts a Forwarder for *root_dir*."""
        # %r embeds root_dir as a properly quoted Python literal, so quotes
        # or backslashes in the path cannot break or alter the command.
        code = ("from nepi.util import server;"
                "c=server.Forwarder(%r);c.forward()" % (root_dir,))
        return ["python", "-c", code]

    def send_msg(self, msg):
        """Send *msg* base64-encoded, as one line, to the forwarder."""
        self._process.stdin.write(base64.b64encode(msg) + b"\n")
        # Push the line out now in case the pipe is buffered.
        self._process.stdin.flush()

    def send_stop(self):
        """Ask the server to stop by sending ``STOP_MSG``."""
        self.send_msg(STOP_MSG)

    def read_reply(self):
        """Read one reply line from the forwarder and return it decoded."""
        data = self._process.stdout.readline()
        return base64.b64decode(data.rstrip())
213