working on server.py
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import select
8 import socket
9 import sys
10 import subprocess
11 import threading
12
13 CTRL_SOCK = "ctrl.sock"
14 STD_ERR = "stderr.log"
15 MAX_FD = 1024
16
17 STOP_MSG = "STOP"
18
19 class Server(object):
20     def __init__(self, root_dir = "."):
21         self._root_dir = root_dir
22         self._stop = False
23         self._ctrl_sock = None
24         self._stderr = None 
25
26     def run(self):
27         if self.daemonize():
28             self.loop()
29             self.cleanup()
30             # ref: "os._exit(0)"
31             # can not return normally after fork beacuse no exec was done.
32             # This means that if we don't do a os._exit(0) here the code that 
33             # follows the call to "Server.run()" in the "caller code" will be 
34             # executed... but by now it has already been executed after the 
35             # first process (the one that did the first fork) returned.
36             os._exit(0)
37
38     def daemonize(self):
39         pid1 = os.fork()
40         if pid1 > 0:
41             # we do os.waitpid to avoid leaving a <defunc> (zombie) process
42             os.waitpid(pid1, 0)
43             # return 0 to inform the caller method that this is not the 
44             # daemonized process
45             return 0
46
47         # Decouple from parent environment.
48         os.chdir(self._root_dir)
49         os.umask(0)
50         os.setsid()
51
52         # fork 2
53         pid2 = os.fork()
54         if pid2 > 0:
55             # see ref: "os._exit(0)"
56             os._exit(0)
57
58         # close all open file descriptors.
59         for fd in range(2, MAX_FD):
60             try:
61                 os.close(fd)
62             except OSError:
63                 pass
64
65         # Redirect standard file descriptors.
66         self._stderr = stdout = file(STD_ERR, "a", 0)
67         stdin = open('/dev/null', 'r')
68         os.dup2(stdin.fileno(), sys.stdin.fileno())
69         os.dup2(stdout.fileno(), sys.stdout.fileno())
70         os.dup2(self._stderr.fileno(), sys.stderr.fileno())
71         return 1
72
73     def loop(self):
74         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
75         self._ctrl_sock.bind(CTRL_SOCK)
76         self._ctrl_sock.listen(0)
77         while not self._stop:
78             conn, addr = self._ctrl_sock.accept()
79             conn.settimeout(5)
80             while True:
81                 try:
82                     msg = self.recv_msg(conn)
83                 except socket.timeout, e:
84                     break
85                     
86                 if msg == STOP_MSG:
87                     self._stop = True
88                     reply = "Stopping server"
89                     self.send_reply(conn, reply)
90                     break
91                 else:
92                     reply = "Replying to %s" % msg
93                     self.send_reply(conn, reply)
94             conn.close()
95
96     def recv_msg(self, conn):
97        data = conn.recv(1024)
98        decoded = base64.b64decode(data)
99        return decoded.rstrip()
100
101     def send_reply(self, conn, reply):
102        encoded = base64.b64encode(reply)
103        conn.send("%s\n" % encoded)
104        
105     def cleanup(self):
106         try:
107             self._ctrl_sock.close()
108             os.remove(CTRL_SOCK)
109         except e:
110             sys.stderr.write("ERROR: %s\n" % str(e))
111
112 class Forwarder(object):
113     def __init__(self, root_dir = "."):
114         self._ctrl_sock = None
115         self._root_dir = root_dir
116         self._stop = False
117
118     def forward(self):
119         self.connect()
120         while not self._stop:
121             data = self.read_data()
122             self.send_to_server(data)
123             reply = self.recv_from_server()
124             self.write_reply(reply)
125         self.disconnect()
126
127     def read_data(self):
128         return sys.stdin.readline()
129
130     def write_reply(self, reply):
131         sys.stdout.write("%s\n" % reply)
132
133     def send_to_server(self, data):
134         try:
135             self._ctrl_sock.send(data)
136         except IOError, e:
137             if e.errno == errno.EPIPE:
138                 self.connect()
139                 self._ctrl_sock.send(data)
140             else:
141                 raise e
142         encoded = data.rstrip() 
143         msg = base64.b64decode(encoded)
144         if msg == STOP_MSG:
145             self._stop = True
146
147     def recv_from_server(self):
148         data = self._ctrl_sock.recv(1024)
149         encoded = data.rstrip() 
150         reply = base64.b64decode(encoded)
151         return reply
152  
153     def connect(self):
154         self.disconnect()
155         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
156         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
157         self._ctrl_sock.connect(sock_addr)
158
159     def disconnect(self):
160         try:
161             self._ctrl_sock.close()
162         except:
163             pass
164
165 class Client(object):
166     def __init__(self, root_dir = "."):
167         self._process = subprocess.Popen(
168                 ["python", "-c", 
169                 "from nepi.util import server;c=server.Forwarder('%s');\
170                         c.forward()" % root_dir
171                 ],
172                 stdin = subprocess.PIPE, 
173                 env = os.environ)
174
175     def send_msg(self, msg):
176         encoded = base64.b64encode(msg)
177         self._process.stdin.write("%s\n" % encoded)
178
179     def send_stop(self):
180         self.send_msg(STOP_MSG)
181         self._process.wait()
182