2 # -*- coding: utf-8 -*-
18 CTRL_SOCK = "ctrl.sock"
19 STD_ERR = "stderr.log"
27 if hasattr(os, "devnull"):
30 DEV_NULL = "/dev/null"
# Matches strings made up exclusively of characters that need no shell
# quoting.  The pattern is anchored with \Z because re.match() only anchors
# at the start: without it the "*" quantifier matches the empty prefix of
# any string, so every input would be reported as safe.
SHELL_SAFE = re.compile(r'[-a-zA-Z0-9_=+:.,/]*\Z')
37 """ Escapes strings so that they are safe to use as command-line arguments """
38 if SHELL_SAFE.match(s):
39 # safe string - no escaping needed
42 # unsafe string - escape
43 s = s.replace("'","\\'")
47 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
48 self._root_dir = root_dir
50 self._ctrl_sock = None
51 self._log_level = log_level
60 # cannot return normally after fork because no exec was done.
61 # This means that if we don't do a os._exit(0) here the code that
62 # follows the call to "Server.run()" in the "caller code" will be
63 # executed... but by now it has already been executed after the
64 # first process (the one that did the first fork) returned.
72 # pipes for process synchronization
80 # os.waitpid avoids leaving a <defunct> (zombie) process
81 st = os.waitpid(pid1, 0)[1]
83 raise RuntimeError("Daemonization failed")
84 # return 0 to inform the caller method that this is not the
89 # Decouple from parent environment.
90 os.chdir(self._root_dir)
97 # see ref: "os._exit(0)"
100 # close all open file descriptors.
101 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
102 if (max_fd == resource.RLIM_INFINITY):
104 for fd in range(3, max_fd):
111 # Redirect standard file descriptors.
112 stdin = open(DEV_NULL, "r")
113 stderr = stdout = open(STD_ERR, "a", 0)
114 os.dup2(stdin.fileno(), sys.stdin.fileno())
115 # NOTE: sys.stdout.write will still be buffered, even if the file
116 # was opened with 0 buffer
117 os.dup2(stdout.fileno(), sys.stdout.fileno())
118 os.dup2(stderr.fileno(), sys.stderr.fileno())
120 # create control socket
121 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
122 self._ctrl_sock.bind(CTRL_SOCK)
123 self._ctrl_sock.listen(0)
125 # let the parent process know that the daemonization is finished
130 def post_daemonize(self):
134 while not self._stop:
135 conn, addr = self._ctrl_sock.accept()
137 while not self._stop:
139 msg = self.recv_msg(conn)
140 except socket.timeout, e:
145 reply = self.stop_action()
147 reply = self.reply_action(msg)
150 self.send_reply(conn, reply)
153 self.log_error("NOTICE: Awaiting for reconnection")
161 def recv_msg(self, conn):
165 chunk = conn.recv(1024)
167 if e.errno != errno.EINTR:
173 if chunk[-1] == "\n":
178 decoded = base64.b64decode(data)
179 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send ``reply`` over ``conn`` as one base64-encoded, newline-ended line.

    conn  -- connected socket object to write to.
    reply -- string payload; it is base64-encoded before being sent.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole message is out.  A bare
    # send() may transmit only part of it and drop the trailing "\n",
    # which is the marker the reading side checks for at the end of a chunk.
    conn.sendall("%s\n" % encoded)
187 self._ctrl_sock.close()
def stop_action(self):
    """Return the reply text used when a stop request is processed."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Return ``msg`` echoed back with a ``Reply to: `` prefix."""
    template = "Reply to: %s"
    return template % msg
198 def log_error(self, text = None, context = ''):
200 text = traceback.format_exc()
201 date = time.strftime("%Y-%m-%d %H:%M:%S")
203 context = " (%s)" % (context,)
204 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry to stderr.

    Nothing is written unless the instance log level equals DEBUG_LEVEL.
    """
    if self._log_level != DEBUG_LEVEL:
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("DEBUG: %s\n%s\n" % (stamp, text))
212 class Forwarder(object):
213 def __init__(self, root_dir = "."):
214 self._ctrl_sock = None
215 self._root_dir = root_dir
220 print >>sys.stderr, "READY."
221 while not self._stop:
222 data = self.read_data()
223 self.send_to_server(data)
224 data = self.recv_from_server()
225 self.write_data(data)
229 return sys.stdin.readline()
231 def write_data(self, data):
232 sys.stdout.write(data)
233 # sys.stdout.write is buffered, this is why we need to do a flush()
236 def send_to_server(self, data):
238 self._ctrl_sock.send(data)
240 if e.errno == errno.EPIPE:
242 self._ctrl_sock.send(data)
245 encoded = data.rstrip()
246 msg = base64.b64decode(encoded)
250 def recv_from_server(self):
254 chunk = self._ctrl_sock.recv(1024)
256 if e.errno != errno.EINTR:
261 if chunk[-1] == "\n":
267 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
268 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
269 self._ctrl_sock.connect(sock_addr)
271 def disconnect(self):
273 self._ctrl_sock.close()
277 class Client(object):
278 def __init__(self, root_dir = ".", host = None, port = None, user = None,
280 self.root_dir = root_dir
281 self.addr = (host, port)
284 self._stopped = False
288 if self._process.poll() is None:
289 os.kill(self._process.pid, signal.SIGTERM)
293 root_dir = self.root_dir
294 (host, port) = self.addr
298 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
299 c.forward()" % (root_dir,)
301 self._process = popen_ssh_subprocess(python_code, host, port,
303 # popen_ssh_subprocess already waits for readiness
305 self._process = subprocess.Popen(
306 ["python", "-c", python_code],
307 stdin = subprocess.PIPE,
308 stdout = subprocess.PIPE,
309 stderr = subprocess.PIPE
312 # Wait for the forwarder to be ready, otherwise nobody
313 # will be able to connect to it
314 helo = self._process.stderr.readline()
315 if helo != 'READY.\n':
316 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
317 helo + self._process.stderr.read())
319 def send_msg(self, msg):
320 encoded = base64.b64encode(msg)
321 data = "%s\n" % encoded
324 self._process.stdin.write(data)
325 except (IOError, ValueError):
326 # dead process, poll it to un-zombify
329 # try again after reconnect
330 # If it fails again, though, give up
332 self._process.stdin.write(data)
335 self.send_msg(STOP_MSG)
def read_reply(self):
    """Read one line from the child process' stdout and base64-decode it.

    Trailing whitespace (including the line terminator) is stripped
    before decoding; the decoded payload is returned.
    """
    line = self._process.stdout.readline()
    return base64.b64decode(line.rstrip())
343 def popen_ssh_command(command, host, port, user, agent,
348 Executes a remote command, returns ((stdout,stderr),process)
351 # Don't bother with localhost. Makes test easier
352 '-o', 'NoHostAuthenticationForLocalhost=yes',
357 args.append('-p%d' % port)
359 args.extend(('-i', ident_key))
364 # connects to the remote host and starts a remote connection
365 proc = subprocess.Popen(args,
366 stdout = subprocess.PIPE,
367 stdin = subprocess.PIPE,
368 stderr = subprocess.PIPE)
369 return (proc.communicate(stdin), proc)
371 def popen_scp(source, dest, port, agent,
375 Copies from/to remote sites.
377 Source and destination should have the user and host encoded
380 If source is a file object, a special mode will be used to
381 create the remote file with the same contents.
383 If dest is a file object, the remote file (source) will be
384 read and written into dest.
386 In these modes, recursive cannot be True.
389 if isinstance(source, file) or isinstance(dest, file) \
390 or hasattr(source, 'read') or hasattr(dest, 'write'):
393 # Parse destination as <user>@<server>:<path>
394 tgtspec, path = dest.split(':',1)
395 user,host = tgtspec.rsplit('@',1)
397 args = ['ssh', '-l', user, '-C',
398 # Don't bother with localhost. Makes test easier
399 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
# This argument list invokes "ssh" (not "scp"), whose port option is the
# lower-case "-p"; upper-case "-P" is the scp spelling.
args.append('-p%d' % port)
403 args.extend(('-i', ident_key))
405 if isinstance(source, file) or hasattr(source, 'read'):
406 args.append('cat > %s' % (shell_escape(path),))
407 elif isinstance(dest, file) or hasattr(dest, 'write'):
408 args.append('cat %s' % (shell_escape(path),))
410 raise AssertionError, "Unreachable code reached! :-Q"
412 # connects to the remote host and starts a remote connection
413 if isinstance(source, file):
414 proc = subprocess.Popen(args,
415 stdout = open('/dev/null','w'),
416 stderr = subprocess.PIPE,
418 err = proc.stderr.read()
420 return ((None,err), proc)
421 elif isinstance(dest, file):
422 proc = subprocess.Popen(args,
423 stdout = open('/dev/null','w'),
424 stderr = subprocess.PIPE,
426 err = proc.stderr.read()
428 return ((None,err), proc)
429 elif hasattr(source, 'read'):
430 # file-like (but not file) source
431 proc = subprocess.Popen(args,
432 stdout = open('/dev/null','w'),
433 stderr = subprocess.PIPE,
440 buf = source.read(4096)
442 rdrdy, wrdy, broken = os.select(
445 [proc.stderr,proc.stdin])
447 if proc.stderr in rdrdy:
448 # use os.read for fully unbuffered behavior
449 err.append(os.read(proc.stderr.fileno(), 4096))
451 if proc.stdin in wrdy:
452 proc.stdin.write(buf)
457 err.append(proc.stderr.read())
460 return ((None,''.join(err)), proc)
461 elif hasattr(dest, 'write'):
462 # file-like (but not file) dest
463 proc = subprocess.Popen(args,
464 stdout = open('/dev/null','w'),
465 stderr = subprocess.PIPE,
471 rdrdy, wrdy, broken = os.select(
472 [proc.stderr, proc.stdout],
474 [proc.stderr, proc.stdout])
476 if proc.stderr in rdrdy:
477 # use os.read for fully unbuffered behavior
478 err.append(os.read(proc.stderr.fileno(), 4096))
480 if proc.stdout in rdrdy:
481 # use os.read for fully unbuffered behavior
482 dest.write(os.read(proc.stdout.fileno(), 4096))
486 err.append(proc.stderr.read())
489 return ((None,''.join(err)), proc)
491 raise AssertionError, "Unreachable code reached! :-Q"
494 args = ['scp', '-q', '-p', '-C',
495 # Don't bother with localhost. Makes test easier
496 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
498 args.append('-P%d' % port)
502 args.extend(('-i', ident_key))
506 # connects to the remote host and starts a remote connection
507 proc = subprocess.Popen(args,
508 stdout = subprocess.PIPE,
509 stdin = subprocess.PIPE,
510 stderr = subprocess.PIPE)
511 comm = proc.communicate()
515 def popen_ssh_subprocess(python_code, host, port, user, agent,
# str.replace() returns a new string; rebind the name so the escaped
# value (every ' turned into '\'') is what gets embedded in the
# single-quoted shell word built from python_path.
python_path = python_path.replace("'", r"'\''")
521 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
524 # Uncomment for debug (to run everything under strace)
525 # We had to verify if strace works (cannot nest them)
526 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
528 #if self.mode == MODE_SSH:
529 # cmd += "strace -f -tt -s 200 -o strace$$.out "
531 cmd += "import base64, os\n"
532 cmd += "cmd = \"\"\n"
533 cmd += "while True:\n"
534 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
535 cmd += " if cmd[-1] == \"\\n\": break\n"
536 cmd += "cmd = base64.b64decode(cmd)\n"
537 # Uncomment for debug
538 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
539 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
540 cmd += "exec(cmd)\n'"
543 # Don't bother with localhost. Makes test easier
544 '-o', 'NoHostAuthenticationForLocalhost=yes',
549 args.append('-p%d' % port)
551 args.extend(('-i', ident_key))
556 # connects to the remote host and starts a remote rpyc connection
557 proc = subprocess.Popen(args,
558 stdout = subprocess.PIPE,
559 stdin = subprocess.PIPE,
560 stderr = subprocess.PIPE)
561 # send the command to execute
562 os.write(proc.stdin.fileno(),
563 base64.b64encode(python_code) + "\n")
564 msg = os.read(proc.stdout.fileno(), 3)
566 raise RuntimeError("Failed to start remote python interpreter")