Binary building support - and tests for it
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import resource
8 import select
9 import socket
10 import sys
11 import subprocess
12 import threading
13 import time
14 import traceback
15 import signal
16 import re
17
# File name of the UNIX control socket. The server binds it relative to its
# working directory; the Forwarder joins it with its root_dir to connect.
CTRL_SOCK = "ctrl.sock"
# File the daemon's stdout and stderr are redirected to (opened for append).
STD_ERR = "stderr.log"
# Fallback upper bound for the descriptor-closing loop in Server.daemonize(),
# used when the RLIMIT_NOFILE hard limit is reported as infinite.
MAX_FD = 1024

# Decoded message that asks the server (and the Forwarder) to stop.
STOP_MSG = "STOP"

# Values accepted as Server log_level; log_debug() only writes at DEBUG_LEVEL.
ERROR_LEVEL = 0
DEBUG_LEVEL = 1

# Path of the null device; falls back to the POSIX path when os.devnull
# is not available.
if hasattr(os, "devnull"):
    DEV_NULL = os.devnull
else:
    DEV_NULL = "/dev/null"
31
32
33
# Matches only strings made up ENTIRELY of characters the shell never treats
# specially. The pattern is anchored at the end (\Z) and requires at least one
# character; an unanchored '[...]*' matches every string (as an empty prefix),
# which would make the safety check meaningless.
SHELL_SAFE = re.compile(r'[-a-zA-Z0-9_=+:.,/]+\Z')

def shell_escape(s):
    """
    Escapes strings so that they are safe to use as command-line arguments

    Strings consisting only of shell-neutral characters are returned
    unchanged. Anything else (including the empty string, which would
    otherwise vanish from the command line) is wrapped in single quotes.
    """
    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape.
    # Backslashes are literal inside single quotes, so an embedded quote
    # has to close the quoted section, emit an escaped quote and reopen it.
    return "'%s'" % (s.replace("'", "'\\''"),)
45
class Server(object):
    """
    Daemonized server answering requests received on a UNIX control socket.

    Messages and replies travel as one base64-encoded line each. The decoded
    message STOP_MSG stops the server; every other message is handed to
    reply_action(). Subclasses customize behaviour by overriding
    post_daemonize(), reply_action() and stop_action().
    """

    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
        # Directory the daemon changes into; the control socket and the
        # stderr log are created relative to it.
        self._root_dir = root_dir
        self._stop = False
        self._ctrl_sock = None
        self._log_level = log_level

    def run(self):
        """
        Daemonizes and serves requests until a stop message arrives.

        Returns normally only in the original (calling) process. The
        daemonized process never returns: it leaves through os._exit().
        """
        try:
            if self.daemonize():
                self.post_daemonize()
                self.loop()
                self.cleanup()
                # ref: "os._exit(0)"
                # can not return normally after fork because no exec was done.
                # This means that if we don't do a os._exit(0) here the code that 
                # follows the call to "Server.run()" in the "caller code" will be 
                # executed... but by now it has already been executed after the 
                # first process (the one that did the first fork) returned.
                os._exit(0)
        except:
            # NOTE(review): this handler also runs in the calling process when
            # daemonize() raises there, so the caller exits too - confirm
            # that this is intended.
            self.log_error()
            self.cleanup()
            os._exit(0)

    def daemonize(self):
        """
        Detaches from the caller with a double fork.

        Returns 0 in the calling process and 1 in the daemonized process,
        which by then has redirected its standard streams and is listening
        on the control socket.

        Raises RuntimeError in the calling process when the intermediate
        child exits with a non-zero status.
        """
        # pipes for process synchronization
        (r, w) = os.pipe()

        pid1 = os.fork()
        if pid1 > 0:
            os.close(w)
            # blocks until the daemon writes its ready byte (or the write
            # end gets closed because the daemon died)
            os.read(r, 1)
            os.close(r)
            # os.waitpid avoids leaving a <defunc> (zombie) process
            st = os.waitpid(pid1, 0)[1]
            if st:
                raise RuntimeError("Daemonization failed")
            # return 0 to inform the caller method that this is not the 
            # daemonized process
            return 0
        os.close(r)

        # Decouple from parent environment.
        os.chdir(self._root_dir)
        os.umask(0)
        os.setsid()

        # fork 2
        pid2 = os.fork()
        if pid2 > 0:
            # see ref: "os._exit(0)"
            os._exit(0)

        # close all open file descriptors.
        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if (max_fd == resource.RLIM_INFINITY):
            max_fd = MAX_FD
        for fd in range(3, max_fd):
            # keep the synchronization pipe open until we are ready
            if fd != w:
                try:
                    os.close(fd)
                except OSError:
                    pass

        # Redirect standard file descriptors.
        stdin = open(DEV_NULL, "r")
        stderr = stdout = open(STD_ERR, "a", 0)
        os.dup2(stdin.fileno(), sys.stdin.fileno())
        # NOTE: sys.stdout.write will still be buffered, even if the file
        # was opened with 0 buffer
        os.dup2(stdout.fileno(), sys.stdout.fileno())
        os.dup2(stderr.fileno(), sys.stderr.fileno())

        # create control socket
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._ctrl_sock.bind(CTRL_SOCK)
        self._ctrl_sock.listen(0)

        # let the parent process know that the daemonization is finished
        os.write(w, "\n")
        os.close(w)
        return 1

    def post_daemonize(self):
        """ Hook called in the daemon right after daemonize(); no-op here """
        pass

    def loop(self):
        """
        Accepts one connection at a time and answers its messages until a
        stop message is received. A connection idle for 5 seconds is dropped
        and the next one is awaited.
        """
        while not self._stop:
            conn, addr = self._ctrl_sock.accept()
            conn.settimeout(5)
            while not self._stop:
                try:
                    msg = self.recv_msg(conn)
                except socket.timeout:
                    # idle peer: drop it and wait for a new connection
                    break

                if msg == STOP_MSG:
                    self._stop = True
                    reply = self.stop_action()
                else:
                    reply = self.reply_action(msg)

                try:
                    self.send_reply(conn, reply)
                except socket.error:
                    self.log_error()
                    self.log_error("NOTICE: Awaiting for reconnection")
                    break
            try:
                conn.close()
            except:
                # Doesn't matter
                self.log_error()

    def recv_msg(self, conn):
        """
        Reads one newline-terminated, base64-encoded message from conn.

        Returns the decoded message with trailing whitespace stripped; an
        end-of-stream before any data yields an empty string.

        Raises socket.timeout when the connection timeout expires, and any
        other socket.error except EINTR, which is retried.
        """
        chunks = []
        while True:
            try:
                chunk = conn.recv(1024)
            except socket.error as e:
                # socket.error derives from IOError, not OSError, so it has
                # to be caught explicitly for the EINTR retry to take effect
                if e.errno != errno.EINTR:
                    raise
                continue
            if not chunk:
                # empty chunk = EOF
                break
            chunks.append(chunk)
            if chunk[-1] == "\n":
                break
        decoded = base64.b64decode("".join(chunks))
        return decoded.rstrip()

    def send_reply(self, conn, reply):
        """ Sends reply through conn as a single base64-encoded line """
        encoded = base64.b64encode(reply)
        # sendall: a plain send() may transmit only part of a long reply
        conn.sendall("%s\n" % encoded)

    def cleanup(self):
        """ Closes the control socket and removes its file, logging failures """
        if self._ctrl_sock is None:
            # the socket was never created, there is nothing to release
            return
        try:
            self._ctrl_sock.close()
            os.remove(CTRL_SOCK)
        except:
            self.log_error()

    def stop_action(self):
        """ Returns the reply sent for a stop message """
        return "Stopping server"

    def reply_action(self, msg):
        """ Returns the reply for a regular message; meant to be overridden """
        return "Reply to: %s" % msg

    def log_error(self, text = None, context = ''):
        """
        Writes an error entry to stderr and returns the logged text.

        When text is omitted the traceback of the exception currently
        being handled is logged.
        """
        if text is None:
            text = traceback.format_exc()
        date = time.strftime("%Y-%m-%d %H:%M:%S")
        if context:
            context = " (%s)" % (context,)
        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
        return text

    def log_debug(self, text):
        """ Writes a debug entry to stderr when running at DEBUG_LEVEL """
        if self._log_level == DEBUG_LEVEL:
            date = time.strftime("%Y-%m-%d %H:%M:%S")
            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
211
class Forwarder(object):
    """
    Relays lines between its own stdin/stdout and the server control socket.

    Each line read from stdin is sent to the server unchanged and the
    server's one-line reply is written to stdout.
    """

    def __init__(self, root_dir = "."):
        self._ctrl_sock = None
        # directory holding the server's control socket
        self._root_dir = root_dir
        self._stop = False

    def forward(self):
        """
        Runs the relay loop.

        The loop ends after a stop message has been forwarded and answered,
        when stdin reaches end-of-file, or when the server closes the
        connection without sending a reply.
        """
        self.connect()
        # readiness marker expected by Client.connect()
        sys.stderr.write("READY.\n")
        while not self._stop:
            data = self.read_data()
            if not data:
                # EOF on stdin: nobody is left to send requests
                break
            self.send_to_server(data)
            reply = self.recv_from_server()
            if not reply:
                # the server hung up before answering
                break
            self.write_data(reply)
        self.disconnect()

    def read_data(self):
        """ Returns the next line from stdin ('' at end-of-file) """
        return sys.stdin.readline()

    def write_data(self, data):
        """ Writes data to stdout and flushes it right away """
        sys.stdout.write(data)
        # sys.stdout.write is buffered, this is why we need to do a flush()
        sys.stdout.flush()

    def send_to_server(self, data):
        """
        Sends one encoded line to the server, reconnecting once when the
        connection turns out to be broken (EPIPE), and flags the forwarder
        to stop if the line carries the stop message.
        """
        try:
            self._ctrl_sock.sendall(data)
        except IOError as e:
            if e.errno != errno.EPIPE:
                # bare raise keeps the original traceback
                raise
            self.connect()
            self._ctrl_sock.sendall(data)
        encoded = data.rstrip()
        msg = base64.b64decode(encoded)
        if msg == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """
        Reads the server reply up to and including its trailing newline.

        If the server closes the connection first, whatever was received
        so far is returned (possibly an empty string).
        """
        chunks = []
        while True:
            try:
                chunk = self._ctrl_sock.recv(1024)
            except socket.error as e:
                # socket.error derives from IOError, not OSError, so it has
                # to be caught explicitly for the EINTR retry to take effect
                if e.errno != errno.EINTR:
                    raise
                continue
            if not chunk:
                # empty chunk = EOF
                break
            chunks.append(chunk)
            if chunk[-1] == "\n":
                break
        return "".join(chunks)

    def connect(self):
        """ (Re)connects to the control socket located under root_dir """
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """ Closes the control socket, if any; failures are ignored """
        try:
            self._ctrl_sock.close()
        except:
            # best effort: the socket may not exist yet or be already closed
            pass
276
class Client(object):
    """
    Talks to a Server through a Forwarder subprocess.

    The forwarder is started locally, or on a remote host through ssh when
    a host is given. Messages are exchanged as base64-encoded lines over
    the subprocess' stdin/stdout.
    """

    def __init__(self, root_dir = ".", host = None, port = None, user = None, 
            agent = None):
        self.root_dir = root_dir
        self.addr = (host, port)
        self.user = user
        self.agent = agent
        self._stopped = False
        self.connect()

    def __del__(self):
        # connect() may have failed before a subprocess was ever started,
        # in which case there is nothing to terminate
        process = getattr(self, "_process", None)
        if process is None:
            return
        if process.poll() is None:
            os.kill(process.pid, signal.SIGTERM)
        process.wait()

    def connect(self):
        """
        Starts the forwarder subprocess and waits for its readiness marker.

        Raises AssertionError when the first line the forwarder writes to
        stderr is not 'READY.'.
        """
        root_dir = self.root_dir
        (host, port) = self.addr
        user = self.user
        agent = self.agent

        python_code = "from nepi.util import server;c=server.Forwarder(%r);\
                c.forward()" % (root_dir,)
        if host is not None:
            # popen_ssh_subprocess only waits for the remote interpreter;
            # the forwarder's own readiness is checked below
            self._process = popen_ssh_subprocess(python_code, host, port, 
                    user, agent)
        else:
            self._process = subprocess.Popen(
                    ["python", "-c", python_code],
                    stdin = subprocess.PIPE, 
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE
                )

        # Wait for the forwarder to be ready, otherwise nobody
        # will be able to connect to it
        helo = self._process.stderr.readline()
        if helo != 'READY.\n':
            raise AssertionError("Expected 'Ready.', got %r: %s" % (helo,
                    helo + self._process.stderr.read()))

    def send_msg(self, msg):
        """
        Sends msg to the server, restarting the forwarder once if it died.
        """
        encoded = base64.b64encode(msg)
        data = "%s\n" % encoded

        try:
            self._process.stdin.write(data)
        except (IOError, ValueError):
            # dead process, poll it to un-zombify
            self._process.poll()

            # try again after reconnect
            # If it fails again, though, give up
            self.connect()
            self._process.stdin.write(data)

    def send_stop(self):
        """ Asks the server to stop """
        self.send_msg(STOP_MSG)
        self._stopped = True

    def read_reply(self):
        """ Returns the next decoded reply line from the forwarder """
        data = self._process.stdout.readline()
        encoded = data.rstrip()
        return base64.b64decode(encoded)
342
def popen_ssh_command(command, host, port, user, agent, 
            stdin="", 
            ident_key = None,
            tty = False):
    """
    Executes a remote command, returns ((stdout,stderr),process)

    command is run by ssh on host. port, user and ident_key are optional
    (pass None to use ssh's defaults); agent enables agent forwarding and
    tty requests a pseudo-terminal. stdin is the data fed to the remote
    command's standard input.

    The call blocks until the remote command has finished.
    """
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # without a user ssh picks its default; a None entry in the
        # argument list would make Popen fail
        args.extend(('-l', user))
    args.append(host)
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if tty:
        args.append('-t')
    args.append(command)

    # connects to the remote host and starts a remote connection
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    return (proc.communicate(stdin), proc)
370  
def _scp_ssh_args(remote, command_fmt, port, ident_key):
    """ Builds the ssh command line running command_fmt % path on remote """
    # Parse remote as [<user>@]<server>:<path>
    tgtspec, path = remote.split(':',1)
    if '@' in tgtspec:
        user, host = tgtspec.rsplit('@',1)
    else:
        user, host = None, tgtspec

    args = ['ssh', '-C',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes' ]
    if user:
        args.extend(('-l', user))
    if port:
        # ssh takes the port with a lowercase -p (uppercase is scp's flag)
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    args.append(host)
    args.append(command_fmt % (shell_escape(path),))
    return args

def _scp_pump_upload(proc, source):
    """ Streams file-like source into proc.stdin; returns collected stderr """
    err = []
    stderr_open = True
    buf = source.read(4096)
    while buf:
        if stderr_open:
            watched = [proc.stderr]
        else:
            watched = []
        rdrdy, wrdy, broken = select.select(watched, [proc.stdin], [])

        if proc.stderr in rdrdy:
            # use os.read for fully unbuffered behavior
            chunk = os.read(proc.stderr.fileno(), 4096)
            if chunk:
                err.append(chunk)
            else:
                # EOF: stop watching, select would report it forever
                stderr_open = False

        if proc.stdin in wrdy:
            try:
                proc.stdin.write(buf)
            except IOError:
                # remote end went away; stderr holds the explanation
                break
            buf = source.read(4096)

    # closing stdin is what tells the remote 'cat' that the data is over
    try:
        proc.stdin.close()
    except IOError:
        pass
    err.append(proc.stderr.read())
    return ''.join(err)

def _scp_pump_download(proc, dest):
    """ Streams proc.stdout into file-like dest; returns collected stderr """
    err = []
    pending = [proc.stdout, proc.stderr]
    while pending:
        rdrdy = select.select(pending, [], [])[0]
        for stream in rdrdy:
            # use os.read for fully unbuffered behavior
            chunk = os.read(stream.fileno(), 4096)
            if not chunk:
                # EOF on this stream
                pending.remove(stream)
            elif stream is proc.stdout:
                dest.write(chunk)
            else:
                err.append(chunk)
    return ''.join(err)

def popen_scp(source, dest, 
            port = None, 
            agent = None, 
            recursive = False,
            ident_key = None):
    """
    Copies from/to remote sites.
    
    Source and destination should have the user and host encoded
    as per scp specs.
    
    If source is a file object, a special mode will be used to
    create the remote file with the same contents.
    
    If dest is a file object, the remote file (source) will be
    read and written into dest.
    
    In these modes, recursive cannot be True (ValueError is raised).

    Returns ((stdout,stderr),process). In the file object modes stdout
    is always None. The call blocks until the transfer has finished.
    The agent parameter is accepted but currently not used.
    """
    upload = isinstance(source, file) or hasattr(source, 'read')
    download = not upload and (
        isinstance(dest, file) or hasattr(dest, 'write'))

    if not (upload or download):
        # plain scp
        args = ['scp', '-q', '-p', '-C',
                # Don't bother with localhost. Makes test easier
                '-o', 'NoHostAuthenticationForLocalhost=yes' ]
        if port:
            args.append('-P%d' % port)
        if recursive:
            args.append('-r')
        if ident_key:
            args.extend(('-i', ident_key))
        args.append(source)
        args.append(dest)

        # connects to the remote host and starts a remote connection
        proc = subprocess.Popen(args, 
                stdout = subprocess.PIPE,
                stdin = subprocess.PIPE, 
                stderr = subprocess.PIPE)
        # communicate() already waits for the process to finish
        comm = proc.communicate()
        return (comm, proc)

    if recursive:
        raise ValueError("recursive copies are not supported "
                "when source or dest is a file object")

    if upload:
        # the remote side is the destination: write our data into it
        args = _scp_ssh_args(dest, 'cat > %s', port, ident_key)
        devnull = open(DEV_NULL, 'w')
        try:
            if isinstance(source, file):
                # real files can be handed to the child directly
                proc = subprocess.Popen(args, 
                        stdout = devnull,
                        stderr = subprocess.PIPE,
                        stdin = source)
                err = proc.stderr.read()
            else:
                # file-like (but not file) source
                proc = subprocess.Popen(args, 
                        stdout = devnull,
                        stderr = subprocess.PIPE,
                        stdin = subprocess.PIPE)
                err = _scp_pump_upload(proc, source)
        finally:
            devnull.close()
    else:
        # the remote side is the source: read it into dest
        args = _scp_ssh_args(source, 'cat %s', port, ident_key)
        devnull = open(DEV_NULL, 'r')
        try:
            if isinstance(dest, file):
                # real files can be handed to the child directly
                proc = subprocess.Popen(args, 
                        stdout = dest,
                        stderr = subprocess.PIPE,
                        stdin = devnull)
                err = proc.stderr.read()
            else:
                # file-like (but not file) dest
                proc = subprocess.Popen(args, 
                        stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE,
                        stdin = devnull)
                err = _scp_pump_download(proc, dest)
        finally:
            devnull.close()

    proc.wait()
    return ((None,err), proc)
516  
def popen_ssh_subprocess(python_code, host, port, user, agent, 
        python_path = None,
        ident_key = None,
        tty = False):
    """
    Starts a python interpreter on host through ssh and makes it execute
    python_code.

    The code is sent base64-encoded over the connection's stdin and is
    executed by a small bootstrap script once it has been received.

    port, user, python_path and ident_key are optional (pass None to use
    the defaults); agent enables agent forwarding and tty requests a
    pseudo-terminal. python_path is appended to the remote PYTHONPATH.

    Returns the ssh subprocess, with stdin, stdout and stderr piped.

    Raises RuntimeError when the remote interpreter does not acknowledge
    the code with the expected "OK" line.
    """
    if python_path:
        # str.replace returns a new string: keep the escaped result
        python_path = python_path.replace("'", r"'\''")
        cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
    else:
        cmd = ""
    # Uncomment for debug (to run everything under strace)
    # We had to verify if strace works (cannot nest them)
    #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
    #cmd += "$CMD "
    cmd += "python -c '"
    cmd += "import base64, os\n"
    cmd += "cmd = \"\"\n"
    cmd += "while True:\n"
    cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
    cmd += " if cmd[-1] == \"\\n\": break\n"
    cmd += "cmd = base64.b64decode(cmd)\n"
    # Uncomment for debug
    #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
    cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "exec(cmd)\n'"

    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # without a user ssh picks its default; a None entry in the
        # argument list would make Popen fail
        args.extend(('-l', user))
    args.append(host)
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if tty:
        args.append('-t')
    args.append(cmd)

    # connects to the remote host and starts the remote interpreter
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    # send the command to execute
    os.write(proc.stdin.fileno(),
            base64.b64encode(python_code) + "\n")

    # wait for the sync message; a single read may return fewer bytes
    # than requested, so keep reading until it is complete or EOF
    msg = ""
    while len(msg) < 3:
        chunk = os.read(proc.stdout.fileno(), 3 - len(msg))
        if not chunk:
            break
        msg += chunk
    if msg != "OK\n":
        raise RuntimeError("Failed to start remote python interpreter")
    return proc
570