Initially working version of PlanetLab testbed implementation.
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import resource
8 import select
9 import socket
10 import sys
11 import subprocess
12 import threading
13 import time
14 import traceback
15 import signal
16 import re
17
# Name of the UNIX socket the server binds after changing into its root dir
CTRL_SOCK = "ctrl.sock"
# File the daemon's stdout and stderr get appended to
STD_ERR = "stderr.log"
# Descriptor upper bound used when RLIMIT_NOFILE reports no limit
MAX_FD = 1024

# Message that asks the server to shut down
STOP_MSG = "STOP"

# Values accepted as log_level by Server
ERROR_LEVEL = 0
DEBUG_LEVEL = 1

# Path of the null device; falls back to the literal path on interpreters
# whose os module has no "devnull" attribute
DEV_NULL = getattr(os, "devnull", "/dev/null")
31
32
33
# Matches strings made ONLY of characters the shell never interprets.
# The pattern is non-empty ("+") and anchored at the very end ("\Z") so that
# match() succeeds only when the whole string is safe; an unanchored "*"
# pattern would match the empty prefix of any string.
SHELL_SAFE = re.compile(r'[-a-zA-Z0-9_=+:.,/]+\Z')

def shell_escape(s):
    """
    Escapes strings so that they are safe to use as command-line arguments

    Strings composed exclusively of safe characters are returned unchanged.
    Anything else (including the empty string, which would otherwise vanish
    from the command line) is wrapped in single quotes.
    """
    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    # Nothing is special inside single quotes, not even a backslash, so an
    # embedded quote has to close the quoted section, emit an escaped
    # quote and reopen it:  '  ->  '\''
    return "'%s'" % (s.replace("'", "'\\''"),)
45
class Server(object):
    """
    Daemon that answers requests received through a UNIX control socket.

    Each request and each reply travels as a single base64-encoded line.
    post_daemonize(), reply_action() and stop_action() are trivial here and
    are presumably the hooks subclasses are expected to override.
    """

    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
        """
        root_dir:  directory the daemon changes into; the control socket and
                   the log file are created there.
        log_level: ERROR_LEVEL or DEBUG_LEVEL.
        """
        self._root_dir = root_dir
        self._stop = False
        self._ctrl_sock = None
        self._log_level = log_level

    def run(self):
        """
        Daemonizes and serves requests until a STOP message arrives.

        Returns normally only in the original (calling) process; the
        daemonized process always ends in os._exit(0).
        """
        try:
            if self.daemonize():
                self.post_daemonize()
                self.loop()
                self.cleanup()
                # ref: "os._exit(0)"
                # can not return normally after fork because no exec was done.
                # This means that if we don't do a os._exit(0) here the code that 
                # follows the call to "Server.run()" in the "caller code" will be 
                # executed... but by now it has already been executed after the 
                # first process (the one that did the first fork) returned.
                os._exit(0)
        except:
            # Deliberately catches everything: a forked process must never
            # fall back into the caller's code.
            # NOTE(review): an exception raised by daemonize() in the
            # original process (e.g. "Daemonization failed") also ends up
            # here and terminates the caller - confirm this is intended.
            self.log_error()
            self.cleanup()
            os._exit(0)

    def daemonize(self):
        """
        Double-forks into a detached process listening on CTRL_SOCK.

        Returns 0 in the original process (once the daemon signalled that
        it is ready, or its end of the pipe got closed) and 1 in the
        daemonized process.
        Raises RuntimeError in the original process if the first child
        exits with a non-zero status.
        """
        # pipes for process synchronization
        (r, w) = os.pipe()

        pid1 = os.fork()
        if pid1 > 0:
            os.close(w)
            # blocks until the daemon writes its ready byte or dies
            os.read(r, 1)
            os.close(r)
            # os.waitpid avoids leaving a <defunc> (zombie) process
            st = os.waitpid(pid1, 0)[1]
            if st:
                raise RuntimeError("Daemonization failed")
            # return 0 to inform the caller method that this is not the 
            # daemonized process
            return 0
        os.close(r)

        # Decouple from parent environment.
        os.chdir(self._root_dir)
        os.umask(0)
        os.setsid()

        # fork 2
        pid2 = os.fork()
        if pid2 > 0:
            # see ref: "os._exit(0)"
            os._exit(0)

        # close all open file descriptors.
        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if (max_fd == resource.RLIM_INFINITY):
            max_fd = MAX_FD
        for fd in range(3, max_fd):
            # the write end of the pipe is still needed to report readiness
            if fd != w:
                try:
                    os.close(fd)
                except OSError:
                    pass

        # Redirect standard file descriptors.
        stdin = open(DEV_NULL, "r")
        stderr = stdout = open(STD_ERR, "a", 0)
        os.dup2(stdin.fileno(), sys.stdin.fileno())
        # NOTE: sys.stdout.write will still be buffered, even if the file
        # was opened with 0 buffer
        os.dup2(stdout.fileno(), sys.stdout.fileno())
        os.dup2(stderr.fileno(), sys.stderr.fileno())

        # create control socket
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._ctrl_sock.bind(CTRL_SOCK)
        self._ctrl_sock.listen(0)

        # let the parent process know that the daemonization is finished
        os.write(w, "\n")
        os.close(w)
        return 1

    def post_daemonize(self):
        """ Hook called in the daemonized process before serving requests """
        pass

    def loop(self):
        """
        Accepts one connection at a time and answers its messages until
        the connection times out, breaks, or a STOP message is received.
        """
        while not self._stop:
            conn, addr = self._ctrl_sock.accept()
            conn.settimeout(5)
            while not self._stop:
                try:
                    msg = self.recv_msg(conn)
                except socket.timeout:
                    # idle peer: drop it and wait for the next connection
                    break
                except socket.error:
                    # a broken connection must not take the daemon down;
                    # handled the same way as a failed reply below
                    self.log_error()
                    self.log_error("NOTICE: Awaiting for reconnection")
                    break

                if msg == STOP_MSG:
                    self._stop = True
                    reply = self.stop_action()
                else:
                    reply = self.reply_action(msg)

                try:
                    self.send_reply(conn, reply)
                except socket.error:
                    self.log_error()
                    self.log_error("NOTICE: Awaiting for reconnection")
                    break
            try:
                conn.close()
            except Exception:
                # Doesn't matter
                self.log_error()

    def recv_msg(self, conn):
        """
        Reads one newline-terminated line from conn and returns its
        base64-decoded payload with trailing whitespace stripped.

        An empty read (peer closed the connection) also ends the message.
        Socket errors other than EINTR, socket.timeout included, propagate.
        """
        chunks = []
        while True:
            try:
                chunk = conn.recv(1024)
            except (OSError, socket.error):
                # sys.exc_info() keeps this valid on every Python version
                # (no "except ..., e" / "except ... as e" syntax needed)
                if getattr(sys.exc_info()[1], "errno", None) != errno.EINTR:
                    raise
                # interrupted by a signal before any data arrived: retry
                continue
            if not chunk:
                # empty chunk = EOF
                break
            chunks.append(chunk)
            if chunk[-1] == "\n":
                break
        decoded = base64.b64decode("".join(chunks))
        return decoded.rstrip()

    def send_reply(self, conn, reply):
        """ Sends reply through conn as one base64-encoded line """
        encoded = base64.b64encode(reply)
        # sendall: a plain send() may transmit only part of a long reply
        conn.sendall("%s\n" % encoded)

    def cleanup(self):
        """
        Closes the control socket and removes its file. Never raises;
        failures are logged instead.
        """
        try:
            # Without a socket there is nothing of ours to remove: this
            # also happens in processes that never got to bind it.
            if self._ctrl_sock is not None:
                self._ctrl_sock.close()
                os.remove(CTRL_SOCK)
        except Exception:
            self.log_error()

    def stop_action(self):
        """ Returns the reply sent back for a STOP message """
        return "Stopping server"

    def reply_action(self, msg):
        """ Returns the reply sent back for any message other than STOP """
        return "Reply to: %s" % msg

    def log_error(self, text = None, context = ''):
        """
        Writes a timestamped error entry to stderr and returns its text.

        text defaults to the traceback of the exception being handled;
        context, when given, is shown in parentheses after "ERROR".
        """
        if text is None:
            text = traceback.format_exc()
        date = time.strftime("%Y-%m-%d %H:%M:%S")
        if context:
            context = " (%s)" % (context,)
        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
        return text

    def log_debug(self, text):
        """ Writes a timestamped debug entry to stderr when in DEBUG_LEVEL """
        if self._log_level == DEBUG_LEVEL:
            date = time.strftime("%Y-%m-%d %H:%M:%S")
            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
211
class Forwarder(object):
    """
    Relays lines between its own stdin/stdout and the control socket found
    in root_dir: every line read from stdin is sent to the socket and the
    line received in response is written to stdout.
    """

    def __init__(self, root_dir = "."):
        """ root_dir: directory that contains the control socket """
        self._ctrl_sock = None
        self._root_dir = root_dir
        self._stop = False

    def forward(self):
        """
        Connects, announces "READY." on stderr and relays messages until a
        STOP message has been forwarded or stdin reaches EOF.
        """
        self.connect()
        try:
            sys.stderr.write("READY.\n")
            sys.stderr.flush()
            while not self._stop:
                data = self.read_data()
                if not data:
                    # EOF on stdin (an empty line would be "\n"): nobody is
                    # left to talk to, and waiting for a reply to nothing
                    # would block forever
                    break
                self.send_to_server(data)
                data = self.recv_from_server()
                self.write_data(data)
        finally:
            self.disconnect()

    def read_data(self):
        """ Returns the next line from stdin ('' on EOF) """
        return sys.stdin.readline()

    def write_data(self, data):
        """ Writes data to stdout and flushes it right away """
        sys.stdout.write(data)
        # sys.stdout.write is buffered, this is why we need to do a flush()
        sys.stdout.flush()

    def send_to_server(self, data):
        """
        Sends one encoded line to the server, reconnecting once when the
        connection turns out to be broken (EPIPE). Flags the forwarder to
        stop when the line carries the STOP message.
        """
        try:
            # sendall: a plain send() may transmit only part of the line
            self._ctrl_sock.sendall(data)
        except (IOError, socket.error):
            if getattr(sys.exc_info()[1], "errno", None) != errno.EPIPE:
                # bare raise keeps the original traceback
                raise
            self.connect()
            self._ctrl_sock.sendall(data)

        # The line is only decoded here to spot the STOP message; it was
        # already forwarded, so undecodable input is the server's business.
        try:
            msg = base64.b64decode(data.rstrip())
        except (TypeError, ValueError):
            msg = None
        if msg == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """
        Returns the next newline-terminated line sent by the server.

        Raises EOFError if the server closes the connection before a
        complete line was received.
        """
        chunks = []
        while True:
            try:
                chunk = self._ctrl_sock.recv(1024)
            except (OSError, socket.error):
                if getattr(sys.exc_info()[1], "errno", None) != errno.EINTR:
                    raise
                # interrupted by a signal before any data arrived: retry
                continue
            if not chunk:
                raise EOFError("Control socket closed before a complete "
                        "reply was received")
            chunks.append(chunk)
            if chunk[-1] == "\n":
                break
        return "".join(chunks)

    def connect(self):
        """ (Re)connects to the control socket inside root_dir """
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """ Closes the control socket, if any. Never raises. """
        sock, self._ctrl_sock = self._ctrl_sock, None
        if sock is None:
            return
        try:
            sock.close()
        except socket.error:
            # best effort: the connection is being dropped anyway
            pass
276
class Client(object):
    """
    Talks to a server through a Forwarder subprocess, spawned locally or,
    when a host is given, through ssh. Messages are exchanged with the
    subprocess as base64-encoded lines over its stdin/stdout.
    """

    def __init__(self, root_dir = ".", host = None, port = None, user = None, 
            agent = None):
        """
        root_dir: directory (on the target host) holding the control socket.
        host, port, user, agent: ssh parameters; with host = None the
                  forwarder is started on the local machine.
        """
        self.root_dir = root_dir
        self.addr = (host, port)
        self.user = user
        self.agent = agent
        self._stopped = False
        self.connect()

    def __del__(self):
        # connect() may have failed before a process was ever assigned
        process = getattr(self, "_process", None)
        if process is None:
            return
        if process.poll() is None:
            try:
                os.kill(process.pid, signal.SIGTERM)
            except OSError:
                # the process exited between poll() and kill()
                pass
        process.wait()

    def connect(self):
        """
        Spawns the forwarder and waits for its "READY." line on stderr.

        Raises AssertionError if anything else is received instead.
        """
        root_dir = self.root_dir
        (host, port) = self.addr
        user = self.user
        agent = self.agent

        python_code = "from nepi.util import server;c=server.Forwarder(%r);\
                c.forward()" % (root_dir,)
        if host is not None:
            self._process = popen_ssh_subprocess(python_code, host, port, 
                    user, agent)
            # popen_ssh_subprocess already waits for readiness
        else:
            self._process = subprocess.Popen(
                    ["python", "-c", python_code],
                    stdin = subprocess.PIPE, 
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE
                )

        # Wait for the forwarder to be ready, otherwise nobody
        # will be able to connect to it
        helo = self._process.stderr.readline()
        if helo != 'READY.\n':
            raise AssertionError("Expected 'Ready.', got %r: %s" % (helo,
                    helo + self._process.stderr.read()))

    def send_msg(self, msg):
        """
        Sends msg to the server. If the forwarder is found dead it is
        respawned once and the message sent again; a second failure
        propagates.
        """
        encoded = base64.b64encode(msg)
        data = "%s\n" % encoded

        try:
            self._process.stdin.write(data)
            # make sure the line leaves any buffer of the pipe object
            self._process.stdin.flush()
        except (IOError, ValueError):
            # dead process, poll it to un-zombify
            self._process.poll()

            # try again after reconnect
            # If it fails again, though, give up
            self.connect()
            self._process.stdin.write(data)
            self._process.stdin.flush()

    def send_stop(self):
        """ Sends the STOP message and remembers that it did """
        self.send_msg(STOP_MSG)
        self._stopped = True

    def read_reply(self):
        """ Reads one line from the forwarder and returns it decoded """
        data = self._process.stdout.readline()
        encoded = data.rstrip() 
        return base64.b64decode(encoded)
342
def popen_ssh_command(command, host, port, user, agent, 
            stdin="", 
            ident_key = None):
    """
    Executes a remote commands, returns ((stdout,stderr),process)

    command:   shell command line to run on the remote host.
    host:      remote host.
    port:      ssh port, or a false value to use the default one.
    user:      remote login name, or a false value to let ssh pick it.
    agent:     true to enable agent forwarding (-A).
    stdin:     data fed to the standard input of the remote command.
    ident_key: path of the identity file to use, if any.

    Blocks until the command finishes.
    """
    # All options go before the destination, as in "ssh [options] host cmd"
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # only when given: a None in the argument list makes Popen fail
        args.extend(('-l', user))
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    args.append(host)
    args.append(command)

    # connects to the remote host and starts a remote connection
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    return (proc.communicate(stdin), proc)
367  
def _has_fileno(stream):
    """ Tells whether stream is backed by an OS-level file descriptor """
    try:
        stream.fileno()
    except (AttributeError, EnvironmentError, ValueError):
        return False
    return True

def _pump_to_stdin(source, proc):
    """ Feeds proc.stdin from file-like source, returns the stderr output """
    stdin_fd = proc.stdin.fileno()
    stderr_fd = proc.stderr.fileno()
    watched = [stderr_fd]
    err = []
    try:
        buf = source.read(4096)
        while buf:
            # stderr is drained while writing so that a chatty remote side
            # cannot block on a full pipe
            rdrdy, wrdy, _ = select.select(watched, [stdin_fd], [])
            if rdrdy:
                # use os.read for fully unbuffered behavior
                chunk = os.read(stderr_fd, 4096)
                if chunk:
                    err.append(chunk)
                else:
                    # EOF on stderr: stop watching it (it would show up
                    # as readable forever)
                    watched = []
            if wrdy:
                written = os.write(stdin_fd, buf)
                # keep the unwritten tail; refill only once it is gone
                buf = buf[written:] or source.read(4096)
    except OSError:
        # EPIPE: the remote side quit early, its stderr tells why
        if sys.exc_info()[1].errno != errno.EPIPE:
            raise
    finally:
        # closing stdin is what tells the remote "cat" the data is over
        proc.stdin.close()
    err.append(proc.stderr.read())
    return ''.join(err)

def _pump_from_stdout(proc, dest):
    """ Copies proc.stdout into file-like dest, returns the stderr output """
    stdout_fd = proc.stdout.fileno()
    stderr_fd = proc.stderr.fileno()
    pending = [stdout_fd, stderr_fd]
    err = []
    while pending:
        rdrdy, _, _ = select.select(pending, [], [])
        for fd in rdrdy:
            # use os.read for fully unbuffered behavior
            chunk = os.read(fd, 4096)
            if not chunk:
                # EOF on this pipe
                pending.remove(fd)
            elif fd == stdout_fd:
                dest.write(chunk)
            else:
                err.append(chunk)
    return ''.join(err)

def popen_scp(source, dest, port, agent, 
            recursive = False,
            ident_key = None):
    """
    Copies from/to remote sites.
    
    Source and destination should have the user and host encoded
    as per scp specs.
    
    If source is a file object, a special mode will be used to
    create the remote file with the same contents.
    
    If dest is a file object, the remote file (source) will be
    read and written into dest.
    
    In these modes, recursive cannot be True.

    Returns ((stdout, stderr), process). In the file object modes stdout
    is always None. The agent argument is accepted but not used.

    Raises ValueError if recursive is requested in a file object mode or
    if both source and dest are file objects.
    """
    source_is_stream = hasattr(source, 'read')
    dest_is_stream = hasattr(dest, 'write')

    if not (source_is_stream or dest_is_stream):
        # plain scp
        args = ['scp', '-q', '-p', '-C',
                # Don't bother with localhost. Makes test easier
                '-o', 'NoHostAuthenticationForLocalhost=yes' ]
        if port:
            args.append('-P%d' % port)
        if recursive:
            args.append('-r')
        if ident_key:
            args.extend(('-i', ident_key))
        args.append(source)
        args.append(dest)

        # connects to the remote host and starts a remote connection
        proc = subprocess.Popen(args, 
                stdout = subprocess.PIPE,
                stdin = subprocess.PIPE, 
                stderr = subprocess.PIPE)
        comm = proc.communicate()
        return (comm, proc)

    if recursive:
        raise ValueError("recursive copies cannot be used with file objects")
    if source_is_stream and dest_is_stream:
        raise ValueError("either source or dest must be a remote path")

    # Parse the remote end as [<user>@]<server>:<path>
    if source_is_stream:
        remote = dest
    else:
        remote = source
    tgtspec, path = remote.split(':',1)
    if '@' in tgtspec:
        user, host = tgtspec.rsplit('@',1)
    else:
        user, host = None, tgtspec

    # The transfer is a remote "cat" whose stdin/stdout is the local stream
    args = ['ssh', '-C',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes' ]
    if user:
        args.extend(('-l', user))
    if port:
        # ssh takes a lowercase -p (uppercase -P is scp's spelling)
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    args.append(host)

    if source_is_stream:
        args.append('cat > %s' % (shell_escape(path),))
        devnull = open(DEV_NULL, 'w')
        try:
            if _has_fileno(source):
                # real file: ssh can read straight from its descriptor
                proc = subprocess.Popen(args, 
                        stdout = devnull,
                        stderr = subprocess.PIPE,
                        stdin = source)
                err = proc.stderr.read()
            else:
                # file-like (but not file) source
                proc = subprocess.Popen(args, 
                        stdout = devnull,
                        stderr = subprocess.PIPE,
                        stdin = subprocess.PIPE)
                err = _pump_to_stdin(source, proc)
            proc.wait()
        finally:
            devnull.close()
        return ((None,err), proc)

    args.append('cat %s' % (shell_escape(path),))
    devnull = open(DEV_NULL, 'r')
    try:
        if _has_fileno(dest):
            # real file: ssh can write straight into its descriptor; flush
            # first so that data already written through dest stays ahead
            dest.flush()
            proc = subprocess.Popen(args, 
                    stdout = dest,
                    stderr = subprocess.PIPE,
                    stdin = devnull)
            err = proc.stderr.read()
        else:
            # file-like (but not file) dest
            proc = subprocess.Popen(args, 
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE,
                    stdin = devnull)
            err = _pump_from_stdout(proc, dest)
        proc.wait()
    finally:
        devnull.close()
    return ((None,err), proc)
511  
def popen_ssh_subprocess(python_code, host, port, user, agent, 
        python_path = None,
        ident_key = None):
    """
    Runs python_code in a python interpreter started on host through ssh.

    A small bootstrap script is given to the remote interpreter on its
    command line; it reads the base64-encoded python_code from stdin,
    answers "OK" and executes it.

    python_code: source code to execute remotely.
    host, port, user, agent, ident_key: ssh parameters; false values of
                 port, user, agent and ident_key leave ssh's defaults.
    python_path: optional path appended to the remote PYTHONPATH.

    Returns the ssh process, with stdin, stdout and stderr piped.
    Raises RuntimeError if the remote interpreter does not answer "OK".
    """
    if python_path:
        # str.replace returns a new string: the result has to be kept for
        # the quoting to have any effect
        python_path = python_path.replace("'", r"'\''")
        cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
    else:
        cmd = ""
    # Uncomment for debug (to run everything under strace)
    # We had to verify if strace works (cannot nest them)
    #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
    #cmd += "$CMD "
    #if self.mode == MODE_SSH:
    #    cmd += "strace -f -tt -s 200 -o strace$$.out "
    cmd += "python -c '"
    cmd += "import base64, os\n"
    cmd += "cmd = \"\"\n"
    cmd += "while True:\n"
    cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
    cmd += " if cmd[-1] == \"\\n\": break\n"
    cmd += "cmd = base64.b64decode(cmd)\n"
    # Uncomment for debug
    #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
    cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "exec(cmd)\n'"

    # All options go before the destination, as in "ssh [options] host cmd"
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # only when given: a None in the argument list makes Popen fail
        args.extend(('-l', user))
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    args.append(host)
    args.append(cmd)

    # connects to the remote host and starts a remote rpyc connection
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    # send the command to execute
    os.write(proc.stdin.fileno(),
            base64.b64encode(python_code) + "\n")

    # wait for the sync message; a single read may legitimately return
    # fewer bytes than requested, so keep reading until 3 bytes or EOF
    msg = ""
    while len(msg) < 3:
        chunk = os.read(proc.stdout.fileno(), 3 - len(msg))
        if not chunk:
            break
        msg += chunk
    if msg != "OK\n":
        # do not leave a half-started ssh process behind
        if proc.poll() is None:
            try:
                os.kill(proc.pid, signal.SIGTERM)
            except OSError:
                pass
        proc.wait()
        raise RuntimeError("Failed to start remote python interpreter")
    return proc
562