6d37d0dd9ac8a2f6dc2cd1506b069bfef208164e
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import resource
8 import select
9 import socket
10 import sys
11 import subprocess
12 import threading
13 import time
14 import traceback
15 import signal
16 import re
17
18 CTRL_SOCK = "ctrl.sock"
19 STD_ERR = "stderr.log"
20 MAX_FD = 1024
21
22 STOP_MSG = "STOP"
23
24 ERROR_LEVEL = 0
25 DEBUG_LEVEL = 1
26
27 if hasattr(os, "devnull"):
28     DEV_NULL = os.devnull
29 else:
30     DEV_NULL = "/dev/null"
31
32
33
34 SHELL_SAFE = re.compile('[-a-zA-Z0-9_=+:.,/]*')
35
36 def shell_escape(s):
37     """ Escapes strings so that they are safe to use as command-line arguments """
38     if SHELL_SAFE.match(s):
39         # safe string - no escaping needed
40         return s
41     else:
42         # unsafe string - escape
43         s = s.replace("'","\\'")
44         return "'%s'" % (s,)
45
46 class Server(object):
47     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
48         self._root_dir = root_dir
49         self._stop = False
50         self._ctrl_sock = None
51         self._log_level = log_level
52
53     def run(self):
54         try:
55             if self.daemonize():
56                 self.post_daemonize()
57                 self.loop()
58                 self.cleanup()
59                 # ref: "os._exit(0)"
60                 # can not return normally after fork beacuse no exec was done.
61                 # This means that if we don't do a os._exit(0) here the code that 
62                 # follows the call to "Server.run()" in the "caller code" will be 
63                 # executed... but by now it has already been executed after the 
64                 # first process (the one that did the first fork) returned.
65                 os._exit(0)
66         except:
67             self.log_error()
68             self.cleanup()
69             os._exit(0)
70
71     def daemonize(self):
72         # pipes for process synchronization
73         (r, w) = os.pipe()
74
75         pid1 = os.fork()
76         if pid1 > 0:
77             os.close(w)
78             os.read(r, 1)
79             os.close(r)
80             # os.waitpid avoids leaving a <defunc> (zombie) process
81             st = os.waitpid(pid1, 0)[1]
82             if st:
83                 raise RuntimeError("Daemonization failed")
84             # return 0 to inform the caller method that this is not the 
85             # daemonized process
86             return 0
87         os.close(r)
88
89         # Decouple from parent environment.
90         os.chdir(self._root_dir)
91         os.umask(0)
92         os.setsid()
93
94         # fork 2
95         pid2 = os.fork()
96         if pid2 > 0:
97             # see ref: "os._exit(0)"
98             os._exit(0)
99
100         # close all open file descriptors.
101         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
102         if (max_fd == resource.RLIM_INFINITY):
103             max_fd = MAX_FD
104         for fd in range(3, max_fd):
105             if fd != w:
106                 try:
107                     os.close(fd)
108                 except OSError:
109                     pass
110
111         # Redirect standard file descriptors.
112         stdin = open(DEV_NULL, "r")
113         stderr = stdout = open(STD_ERR, "a", 0)
114         os.dup2(stdin.fileno(), sys.stdin.fileno())
115         # NOTE: sys.stdout.write will still be buffered, even if the file
116         # was opened with 0 buffer
117         os.dup2(stdout.fileno(), sys.stdout.fileno())
118         os.dup2(stderr.fileno(), sys.stderr.fileno())
119
120         # create control socket
121         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
122         self._ctrl_sock.bind(CTRL_SOCK)
123         self._ctrl_sock.listen(0)
124
125         # let the parent process know that the daemonization is finished
126         os.write(w, "\n")
127         os.close(w)
128         return 1
129
130     def post_daemonize(self):
131         pass
132
133     def loop(self):
134         while not self._stop:
135             conn, addr = self._ctrl_sock.accept()
136             conn.settimeout(5)
137             while not self._stop:
138                 try:
139                     msg = self.recv_msg(conn)
140                 except socket.timeout, e:
141                     break
142                     
143                 if msg == STOP_MSG:
144                     self._stop = True
145                     reply = self.stop_action()
146                 else:
147                     reply = self.reply_action(msg)
148                 
149                 try:
150                     self.send_reply(conn, reply)
151                 except socket.error:
152                     self.log_error()
153                     self.log_error("NOTICE: Awaiting for reconnection")
154                     break
155             try:
156                 conn.close()
157             except:
158                 # Doesn't matter
159                 self.log_error()
160
161     def recv_msg(self, conn):
162         data = ""
163         while True:
164             try:
165                 chunk = conn.recv(1024)
166             except OSError, e:
167                 if e.errno != errno.EINTR:
168                     raise
169                 if chunk == '':
170                     continue
171             if chunk:
172                 data += chunk
173                 if chunk[-1] == "\n":
174                     break
175             else:
176                 # empty chunk = EOF
177                 break
178         decoded = base64.b64decode(data)
179         return decoded.rstrip()
180
181     def send_reply(self, conn, reply):
182         encoded = base64.b64encode(reply)
183         conn.send("%s\n" % encoded)
184        
185     def cleanup(self):
186         try:
187             self._ctrl_sock.close()
188             os.remove(CTRL_SOCK)
189         except:
190             self.log_error()
191
192     def stop_action(self):
193         return "Stopping server"
194
195     def reply_action(self, msg):
196         return "Reply to: %s" % msg
197
198     def log_error(self, text = None, context = ''):
199         if text == None:
200             text = traceback.format_exc()
201         date = time.strftime("%Y-%m-%d %H:%M:%S")
202         if context:
203             context = " (%s)" % (context,)
204         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
205         return text
206
207     def log_debug(self, text):
208         if self._log_level == DEBUG_LEVEL:
209             date = time.strftime("%Y-%m-%d %H:%M:%S")
210             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
211
212 class Forwarder(object):
213     def __init__(self, root_dir = "."):
214         self._ctrl_sock = None
215         self._root_dir = root_dir
216         self._stop = False
217
218     def forward(self):
219         self.connect()
220         print >>sys.stderr, "READY."
221         while not self._stop:
222             data = self.read_data()
223             self.send_to_server(data)
224             data = self.recv_from_server()
225             self.write_data(data)
226         self.disconnect()
227
228     def read_data(self):
229         return sys.stdin.readline()
230
231     def write_data(self, data):
232         sys.stdout.write(data)
233         # sys.stdout.write is buffered, this is why we need to do a flush()
234         sys.stdout.flush()
235
236     def send_to_server(self, data):
237         try:
238             self._ctrl_sock.send(data)
239         except IOError, e:
240             if e.errno == errno.EPIPE:
241                 self.connect()
242                 self._ctrl_sock.send(data)
243             else:
244                 raise e
245         encoded = data.rstrip() 
246         msg = base64.b64decode(encoded)
247         if msg == STOP_MSG:
248             self._stop = True
249
250     def recv_from_server(self):
251         data = ""
252         while True:
253             try:
254                 chunk = self._ctrl_sock.recv(1024)
255             except OSError, e:
256                 if e.errno != errno.EINTR:
257                     raise
258                 if chunk == '':
259                     continue
260             data += chunk
261             if chunk[-1] == "\n":
262                 break
263         return data
264  
265     def connect(self):
266         self.disconnect()
267         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
268         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
269         self._ctrl_sock.connect(sock_addr)
270
271     def disconnect(self):
272         try:
273             self._ctrl_sock.close()
274         except:
275             pass
276
277 class Client(object):
278     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
279             agent = None):
280         self.root_dir = root_dir
281         self.addr = (host, port)
282         self.user = user
283         self.agent = agent
284         self._stopped = False
285         self.connect()
286     
287     def __del__(self):
288         if self._process.poll() is None:
289             os.kill(self._process.pid, signal.SIGTERM)
290         self._process.wait()
291         
292     def connect(self):
293         root_dir = self.root_dir
294         (host, port) = self.addr
295         user = self.user
296         agent = self.agent
297         
298         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
299                 c.forward()" % (root_dir,)
300         if host != None:
301             self._process = popen_ssh_subprocess(python_code, host, port, 
302                     user, agent)
303             # popen_ssh_subprocess already waits for readiness
304         else:
305             self._process = subprocess.Popen(
306                     ["python", "-c", python_code],
307                     stdin = subprocess.PIPE, 
308                     stdout = subprocess.PIPE,
309                     stderr = subprocess.PIPE
310                 )
311                 
312         # Wait for the forwarder to be ready, otherwise nobody
313         # will be able to connect to it
314         helo = self._process.stderr.readline()
315         if helo != 'READY.\n':
316             raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
317                     helo + self._process.stderr.read())
318         
319     def send_msg(self, msg):
320         encoded = base64.b64encode(msg)
321         data = "%s\n" % encoded
322         
323         try:
324             self._process.stdin.write(data)
325         except (IOError, ValueError):
326             # dead process, poll it to un-zombify
327             self._process.poll()
328             
329             # try again after reconnect
330             # If it fails again, though, give up
331             self.connect()
332             self._process.stdin.write(data)
333
334     def send_stop(self):
335         self.send_msg(STOP_MSG)
336         self._stopped = True
337
338     def read_reply(self):
339         data = self._process.stdout.readline()
340         encoded = data.rstrip() 
341         return base64.b64decode(encoded)
342
343 def popen_ssh_command(command, host, port, user, agent, 
344             stdin="", 
345             ident_key = None,
346             tty = False):
347         """
348         Executes a remote commands, returns ((stdout,stderr),process)
349         """
350         args = ['ssh',
351                 # Don't bother with localhost. Makes test easier
352                 '-o', 'NoHostAuthenticationForLocalhost=yes',
353                 '-l', user, host]
354         if agent:
355             args.append('-A')
356         if port:
357             args.append('-p%d' % port)
358         if ident_key:
359             args.extend(('-i', ident_key))
360         if tty:
361             args.append('-t')
362         args.append(command)
363
364         # connects to the remote host and starts a remote connection
365         proc = subprocess.Popen(args, 
366                 stdout = subprocess.PIPE,
367                 stdin = subprocess.PIPE, 
368                 stderr = subprocess.PIPE)
369         return (proc.communicate(stdin), proc)
370  
371 def popen_scp(source, dest, port, agent, 
372             recursive = False,
373             ident_key = None):
374         """
375         Copies from/to remote sites.
376         
377         Source and destination should have the user and host encoded
378         as per scp specs.
379         
380         If source is a file object, a special mode will be used to
381         create the remote file with the same contents.
382         
383         If dest is a file object, the remote file (source) will be
384         read and written into dest.
385         
386         In these modes, recursive cannot be True.
387         """
388         
389         if isinstance(source, file) or isinstance(dest, file) \
390                 or hasattr(source, 'read')  or hasattr(dest, 'write'):
391             assert not resursive
392             
393             # Parse destination as <user>@<server>:<path>
394             tgtspec, path = dest.split(':',1)
395             user,host = tgtspec.rsplit('@',1)
396             
397             args = ['ssh', '-l', user, '-C',
398                     # Don't bother with localhost. Makes test easier
399                     '-o', 'NoHostAuthenticationForLocalhost=yes' ]
400             if port:
401                 args.append('-P%d' % port)
402             if ident_key:
403                 args.extend(('-i', ident_key))
404             
405             if isinstance(source, file) or hasattr(source, 'read'):
406                 args.append('cat > %s' % (shell_escape(path),))
407             elif isinstance(dest, file) or hasattr(dest, 'write'):
408                 args.append('cat %s' % (shell_escape(path),))
409             else:
410                 raise AssertionError, "Unreachable code reached! :-Q"
411             
412             # connects to the remote host and starts a remote connection
413             if isinstance(source, file):
414                 proc = subprocess.Popen(args, 
415                         stdout = open('/dev/null','w'),
416                         stderr = subprocess.PIPE,
417                         stdin = source)
418                 err = proc.stderr.read()
419                 proc.wait()
420                 return ((None,err), proc)
421             elif isinstance(dest, file):
422                 proc = subprocess.Popen(args, 
423                         stdout = open('/dev/null','w'),
424                         stderr = subprocess.PIPE,
425                         stdin = source)
426                 err = proc.stderr.read()
427                 proc.wait()
428                 return ((None,err), proc)
429             elif hasattr(source, 'read'):
430                 # file-like (but not file) source
431                 proc = subprocess.Popen(args, 
432                         stdout = open('/dev/null','w'),
433                         stderr = subprocess.PIPE,
434                         stdin = source)
435                 
436                 buf = None
437                 err = []
438                 while True:
439                     if not buf:
440                         buf = source.read(4096)
441                     
442                     rdrdy, wrdy, broken = os.select(
443                         [proc.stderr],
444                         [proc.stdin],
445                         [proc.stderr,proc.stdin])
446                     
447                     if proc.stderr in rdrdy:
448                         # use os.read for fully unbuffered behavior
449                         err.append(os.read(proc.stderr.fileno(), 4096))
450                     
451                     if proc.stdin in wrdy:
452                         proc.stdin.write(buf)
453                         buf = None
454                     
455                     if broken:
456                         break
457                 err.append(proc.stderr.read())
458                     
459                 proc.wait()
460                 return ((None,''.join(err)), proc)
461             elif hasattr(dest, 'write'):
462                 # file-like (but not file) dest
463                 proc = subprocess.Popen(args, 
464                         stdout = open('/dev/null','w'),
465                         stderr = subprocess.PIPE,
466                         stdin = source)
467                 
468                 buf = None
469                 err = []
470                 while True:
471                     rdrdy, wrdy, broken = os.select(
472                         [proc.stderr, proc.stdout],
473                         [],
474                         [proc.stderr, proc.stdout])
475                     
476                     if proc.stderr in rdrdy:
477                         # use os.read for fully unbuffered behavior
478                         err.append(os.read(proc.stderr.fileno(), 4096))
479                     
480                     if proc.stdout in rdrdy:
481                         # use os.read for fully unbuffered behavior
482                         dest.write(os.read(proc.stdout.fileno(), 4096))
483                     
484                     if broken:
485                         break
486                 err.append(proc.stderr.read())
487                     
488                 proc.wait()
489                 return ((None,''.join(err)), proc)
490             else:
491                 raise AssertionError, "Unreachable code reached! :-Q"
492         else:
493             # plain scp
494             args = ['scp', '-q', '-p', '-C',
495                     # Don't bother with localhost. Makes test easier
496                     '-o', 'NoHostAuthenticationForLocalhost=yes' ]
497             if port:
498                 args.append('-P%d' % port)
499             if recursive:
500                 args.append('-r')
501             if ident_key:
502                 args.extend(('-i', ident_key))
503             args.append(source)
504             args.append(dest)
505
506             # connects to the remote host and starts a remote connection
507             proc = subprocess.Popen(args, 
508                     stdout = subprocess.PIPE,
509                     stdin = subprocess.PIPE, 
510                     stderr = subprocess.PIPE)
511             comm = proc.communicate()
512             proc.wait()
513             return (comm, proc)
514  
515 def popen_ssh_subprocess(python_code, host, port, user, agent, 
516         python_path = None,
517         ident_key = None,
518         tty = False):
519         if python_path:
520             python_path.replace("'", r"'\''")
521             cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
522         else:
523             cmd = ""
524         # Uncomment for debug (to run everything under strace)
525         # We had to verify if strace works (cannot nest them)
526         #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
527         #cmd += "$CMD "
528         #if self.mode == MODE_SSH:
529         #    cmd += "strace -f -tt -s 200 -o strace$$.out "
530         cmd += "python -c '"
531         cmd += "import base64, os\n"
532         cmd += "cmd = \"\"\n"
533         cmd += "while True:\n"
534         cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
535         cmd += " if cmd[-1] == \"\\n\": break\n"
536         cmd += "cmd = base64.b64decode(cmd)\n"
537         # Uncomment for debug
538         #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
539         cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
540         cmd += "exec(cmd)\n'"
541
542         args = ['ssh',
543                 # Don't bother with localhost. Makes test easier
544                 '-o', 'NoHostAuthenticationForLocalhost=yes',
545                 '-l', user, host]
546         if agent:
547             args.append('-A')
548         if port:
549             args.append('-p%d' % port)
550         if ident_key:
551             args.extend(('-i', ident_key))
552         if tty:
553             args.append('-t')
554         args.append(cmd)
555
556         # connects to the remote host and starts a remote rpyc connection
557         proc = subprocess.Popen(args, 
558                 stdout = subprocess.PIPE,
559                 stdin = subprocess.PIPE, 
560                 stderr = subprocess.PIPE)
561         # send the command to execute
562         os.write(proc.stdin.fileno(),
563                 base64.b64encode(python_code) + "\n")
564         msg = os.read(proc.stdout.fileno(), 3)
565         if msg != "OK\n":
566             raise RuntimeError("Failed to start remote python interpreter")
567         return proc
568