2 # -*- coding: utf-8 -*-
18 CTRL_SOCK = "ctrl.sock"
19 STD_ERR = "stderr.log"
27 if hasattr(os, "devnull"):
30 DEV_NULL = "/dev/null"
# Matches only strings made up ENTIRELY of characters that need no shell
# quoting.  The trailing \Z is required: re.match() anchors at the start
# only, so without it the `*` quantifier matches the empty prefix of any
# string and every input would be reported as safe.
SHELL_SAFE = re.compile(r'[-a-zA-Z0-9_=+:.,/]*\Z')
37 """ Escapes strings so that they are safe to use as command-line arguments """
38 if SHELL_SAFE.match(s):
39 # safe string - no escaping needed
42 # unsafe string - escape
43 s = s.replace("'","\\'")
47 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
48 self._root_dir = root_dir
50 self._ctrl_sock = None
51 self._log_level = log_level
60 # cannot return normally after fork because no exec was done.
61 # This means that if we don't do a os._exit(0) here the code that
62 # follows the call to "Server.run()" in the "caller code" will be
63 # executed... but by now it has already been executed after the
64 # first process (the one that did the first fork) returned.
72 # pipes for process synchronization
80 # os.waitpid avoids leaving a <defunct> (zombie) process
81 st = os.waitpid(pid1, 0)[1]
83 raise RuntimeError("Daemonization failed")
84 # return 0 to inform the caller method that this is not the
89 # Decouple from parent environment.
90 os.chdir(self._root_dir)
97 # see ref: "os._exit(0)"
100 # close all open file descriptors.
101 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
102 if (max_fd == resource.RLIM_INFINITY):
104 for fd in range(3, max_fd):
111 # Redirect standard file descriptors.
112 stdin = open(DEV_NULL, "r")
113 stderr = stdout = open(STD_ERR, "a", 0)
114 os.dup2(stdin.fileno(), sys.stdin.fileno())
115 # NOTE: sys.stdout.write will still be buffered, even if the file
116 # was opened with 0 buffer
117 os.dup2(stdout.fileno(), sys.stdout.fileno())
118 os.dup2(stderr.fileno(), sys.stderr.fileno())
120 # create control socket
121 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
122 self._ctrl_sock.bind(CTRL_SOCK)
123 self._ctrl_sock.listen(0)
125 # let the parent process know that the daemonization is finished
130 def post_daemonize(self):
134 while not self._stop:
135 conn, addr = self._ctrl_sock.accept()
137 while not self._stop:
139 msg = self.recv_msg(conn)
140 except socket.timeout, e:
145 reply = self.stop_action()
147 reply = self.reply_action(msg)
150 self.send_reply(conn, reply)
153 self.log_error("NOTICE: Awaiting for reconnection")
161 def recv_msg(self, conn):
165 chunk = conn.recv(1024)
167 if e.errno != errno.EINTR:
173 if chunk[-1] == "\n":
178 decoded = base64.b64decode(data)
179 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send ``reply`` over ``conn`` as one base64-encoded line.

    The payload is base64-encoded and terminated with a single newline,
    which is the frame delimiter used on the control socket.

    :param conn: connected stream socket to write to.
    :param reply: byte string holding the reply payload.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole frame has been handed to the
    # kernel; a bare send() may transmit only part of it, which would leave
    # the peer blocked waiting for the terminating newline.
    conn.sendall(encoded + b"\n")
187 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed reply string produced by the stop action."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Build the reply for ``msg`` by formatting it after a fixed prefix.

    :param msg: the received message; interpolated with ``%s``.
    :returns: the string ``"Reply to: "`` followed by ``msg``.
    """
    template = "Reply to: %s"
    return template % msg
198 def log_error(self, text = None, context = ''):
200 text = traceback.format_exc()
201 date = time.strftime("%Y-%m-%d %H:%M:%S")
203 context = " (%s)" % (context,)
204 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry for ``text`` to ``sys.stderr``.

    Nothing is written unless the instance's log level equals
    ``DEBUG_LEVEL``.

    :param text: message body, emitted on the line after the timestamp.
    """
    if not (self._log_level == DEBUG_LEVEL):
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
212 class Forwarder(object):
213 def __init__(self, root_dir = "."):
214 self._ctrl_sock = None
215 self._root_dir = root_dir
220 print >>sys.stderr, "READY."
221 while not self._stop:
222 data = self.read_data()
223 self.send_to_server(data)
224 data = self.recv_from_server()
225 self.write_data(data)
229 return sys.stdin.readline()
231 def write_data(self, data):
232 sys.stdout.write(data)
233 # sys.stdout.write is buffered, this is why we need to do a flush()
236 def send_to_server(self, data):
238 self._ctrl_sock.send(data)
240 if e.errno == errno.EPIPE:
242 self._ctrl_sock.send(data)
245 encoded = data.rstrip()
246 msg = base64.b64decode(encoded)
250 def recv_from_server(self):
254 chunk = self._ctrl_sock.recv(1024)
256 if e.errno != errno.EINTR:
261 if chunk[-1] == "\n":
267 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
268 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
269 self._ctrl_sock.connect(sock_addr)
271 def disconnect(self):
273 self._ctrl_sock.close()
277 class Client(object):
278 def __init__(self, root_dir = ".", host = None, port = None, user = None,
280 self.root_dir = root_dir
281 self.addr = (host, port)
284 self._stopped = False
288 if self._process.poll() is None:
289 os.kill(self._process.pid, signal.SIGTERM)
293 root_dir = self.root_dir
294 (host, port) = self.addr
298 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
299 c.forward()" % (root_dir,)
301 self._process = popen_ssh_subprocess(python_code, host, port,
303 # popen_ssh_subprocess already waits for readiness
305 self._process = subprocess.Popen(
306 ["python", "-c", python_code],
307 stdin = subprocess.PIPE,
308 stdout = subprocess.PIPE,
309 stderr = subprocess.PIPE
312 # Wait for the forwarder to be ready, otherwise nobody
313 # will be able to connect to it
314 helo = self._process.stderr.readline()
315 if helo != 'READY.\n':
316 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
317 helo + self._process.stderr.read())
319 def send_msg(self, msg):
320 encoded = base64.b64encode(msg)
321 data = "%s\n" % encoded
324 self._process.stdin.write(data)
325 except (IOError, ValueError):
326 # dead process, poll it to un-zombify
329 # try again after reconnect
330 # If it fails again, though, give up
332 self._process.stdin.write(data)
335 self.send_msg(STOP_MSG)
def read_reply(self):
    """Read one reply line from the child process and decode it.

    A single line is read from ``self._process.stdout``, trailing
    whitespace (including the newline) is stripped, and the remainder is
    base64-decoded.

    :returns: the decoded reply payload.
    """
    line = self._process.stdout.readline()
    return base64.b64decode(line.rstrip())
343 def popen_ssh_command(command, host, port, user, agent,
348 Executes a remote command, returns ((stdout,stderr),process)
351 # Don't bother with localhost. Makes test easier
352 '-o', 'NoHostAuthenticationForLocalhost=yes',
357 args.append('-p%d' % port)
359 args.extend(('-i', ident_key))
364 # connects to the remote host and starts a remote connection
365 proc = subprocess.Popen(args,
366 stdout = subprocess.PIPE,
367 stdin = subprocess.PIPE,
368 stderr = subprocess.PIPE)
369 return (proc.communicate(stdin), proc)
371 def popen_scp(source, dest,
377 Copies from/to remote sites.
379 Source and destination should have the user and host encoded
382 If source is a file object, a special mode will be used to
383 create the remote file with the same contents.
385 If dest is a file object, the remote file (source) will be
386 read and written into dest.
388 In these modes, recursive cannot be True.
391 if isinstance(source, file) or isinstance(dest, file) \
392 or hasattr(source, 'read') or hasattr(dest, 'write'):
395 # Parse destination as <user>@<server>:<path>
396 tgtspec, path = dest.split(':',1)
397 user,host = tgtspec.rsplit('@',1)
399 args = ['ssh', '-l', user, '-C',
400 # Don't bother with localhost. Makes test easier
401 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
403 args.append('-P%d' % port)
405 args.extend(('-i', ident_key))
407 if isinstance(source, file) or hasattr(source, 'read'):
408 args.append('cat > %s' % (shell_escape(path),))
409 elif isinstance(dest, file) or hasattr(dest, 'write'):
410 args.append('cat %s' % (shell_escape(path),))
412 raise AssertionError, "Unreachable code reached! :-Q"
414 # connects to the remote host and starts a remote connection
415 if isinstance(source, file):
416 proc = subprocess.Popen(args,
417 stdout = open('/dev/null','w'),
418 stderr = subprocess.PIPE,
420 err = proc.stderr.read()
422 return ((None,err), proc)
423 elif isinstance(dest, file):
424 proc = subprocess.Popen(args,
425 stdout = open('/dev/null','w'),
426 stderr = subprocess.PIPE,
428 err = proc.stderr.read()
430 return ((None,err), proc)
431 elif hasattr(source, 'read'):
432 # file-like (but not file) source
433 proc = subprocess.Popen(args,
434 stdout = open('/dev/null','w'),
435 stderr = subprocess.PIPE,
442 buf = source.read(4096)
444 rdrdy, wrdy, broken = os.select(
447 [proc.stderr,proc.stdin])
449 if proc.stderr in rdrdy:
450 # use os.read for fully unbuffered behavior
451 err.append(os.read(proc.stderr.fileno(), 4096))
453 if proc.stdin in wrdy:
454 proc.stdin.write(buf)
459 err.append(proc.stderr.read())
462 return ((None,''.join(err)), proc)
463 elif hasattr(dest, 'write'):
464 # file-like (but not file) dest
465 proc = subprocess.Popen(args,
466 stdout = open('/dev/null','w'),
467 stderr = subprocess.PIPE,
473 rdrdy, wrdy, broken = os.select(
474 [proc.stderr, proc.stdout],
476 [proc.stderr, proc.stdout])
478 if proc.stderr in rdrdy:
479 # use os.read for fully unbuffered behavior
480 err.append(os.read(proc.stderr.fileno(), 4096))
482 if proc.stdout in rdrdy:
483 # use os.read for fully unbuffered behavior
484 dest.write(os.read(proc.stdout.fileno(), 4096))
488 err.append(proc.stderr.read())
491 return ((None,''.join(err)), proc)
493 raise AssertionError, "Unreachable code reached! :-Q"
496 args = ['scp', '-q', '-p', '-C',
497 # Don't bother with localhost. Makes test easier
498 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
500 args.append('-P%d' % port)
504 args.extend(('-i', ident_key))
508 # connects to the remote host and starts a remote connection
509 proc = subprocess.Popen(args,
510 stdout = subprocess.PIPE,
511 stdin = subprocess.PIPE,
512 stderr = subprocess.PIPE)
513 comm = proc.communicate()
517 def popen_ssh_subprocess(python_code, host, port, user, agent,
522 python_path.replace("'", r"'\''")
523 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
526 # Uncomment for debug (to run everything under strace)
527 # We had to verify if strace works (cannot nest them)
528 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
530 #if self.mode == MODE_SSH:
531 # cmd += "strace -f -tt -s 200 -o strace$$.out "
533 cmd += "import base64, os\n"
534 cmd += "cmd = \"\"\n"
535 cmd += "while True:\n"
536 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
537 cmd += " if cmd[-1] == \"\\n\": break\n"
538 cmd += "cmd = base64.b64decode(cmd)\n"
539 # Uncomment for debug
540 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
541 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
542 cmd += "exec(cmd)\n'"
545 # Don't bother with localhost. Makes test easier
546 '-o', 'NoHostAuthenticationForLocalhost=yes',
551 args.append('-p%d' % port)
553 args.extend(('-i', ident_key))
558 # connects to the remote host and starts a remote rpyc connection
559 proc = subprocess.Popen(args,
560 stdout = subprocess.PIPE,
561 stdin = subprocess.PIPE,
562 stderr = subprocess.PIPE)
563 # send the command to execute
564 os.write(proc.stdin.fileno(),
565 base64.b64encode(python_code) + "\n")
566 msg = os.read(proc.stdout.fileno(), 3)
568 raise RuntimeError("Failed to start remote python interpreter")