2 # -*- coding: utf-8 -*-
18 CTRL_SOCK = "ctrl.sock"
19 STD_ERR = "stderr.log"
27 if hasattr(os, "devnull"):
30 DEV_NULL = "/dev/null"
# Matches only strings made up entirely of characters that need no shell
# quoting. The pattern is anchored at both ends: an unanchored "[...]*"
# lets match() succeed on every input (it can always match the empty
# prefix), so no string would ever be reported as unsafe. \Z is used
# instead of $ so that a trailing newline is not silently accepted.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')
37 """ Escapes strings so that they are safe to use as command-line arguments """
38 if SHELL_SAFE.match(s):
39 # safe string - no escaping needed
42 # unsafe string - escape
43 s = s.replace("'","\\'")
47 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
48 self._root_dir = root_dir
50 self._ctrl_sock = None
51 self._log_level = log_level
60 # cannot return normally after fork because no exec was done.
61 # This means that if we don't do a os._exit(0) here the code that
62 # follows the call to "Server.run()" in the "caller code" will be
63 # executed... but by now it has already been executed after the
64 # first process (the one that did the first fork) returned.
72 # pipes for process synchronization
80 # os.waitpid avoids leaving a <defunct> (zombie) process
81 st = os.waitpid(pid1, 0)[1]
83 raise RuntimeError("Daemonization failed")
84 # return 0 to inform the caller method that this is not the
89 # Decouple from parent environment.
90 os.chdir(self._root_dir)
97 # see ref: "os._exit(0)"
100 # close all open file descriptors.
101 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
102 if (max_fd == resource.RLIM_INFINITY):
104 for fd in range(3, max_fd):
111 # Redirect standard file descriptors.
112 stdin = open(DEV_NULL, "r")
113 stderr = stdout = open(STD_ERR, "a", 0)
114 os.dup2(stdin.fileno(), sys.stdin.fileno())
115 # NOTE: sys.stdout.write will still be buffered, even if the file
116 # was opened with 0 buffer
117 os.dup2(stdout.fileno(), sys.stdout.fileno())
118 os.dup2(stderr.fileno(), sys.stderr.fileno())
120 # create control socket
121 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
122 self._ctrl_sock.bind(CTRL_SOCK)
123 self._ctrl_sock.listen(0)
125 # let the parent process know that the daemonization is finished
130 def post_daemonize(self):
134 while not self._stop:
135 conn, addr = self._ctrl_sock.accept()
137 while not self._stop:
139 msg = self.recv_msg(conn)
140 except socket.timeout, e:
145 reply = self.stop_action()
147 reply = self.reply_action(msg)
150 self.send_reply(conn, reply)
153 self.log_error("NOTICE: Awaiting for reconnection")
161 def recv_msg(self, conn):
165 chunk = conn.recv(1024)
167 if e.errno != errno.EINTR:
173 if chunk[-1] == "\n":
178 decoded = base64.b64decode(data)
179 return decoded.rstrip()
def send_reply(self, conn, reply):
    """
    Sends a reply over the given connection.

    The reply is base64-encoded and written as a single line; the
    trailing newline marks the end of the message.

    conn  -- connected socket to write to
    reply -- string to send
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole message is out (or raises);
    # send() may transmit only part of the data, which would leave the
    # peer waiting for a terminating newline that never arrives.
    conn.sendall("%s\n" % encoded)
187 self._ctrl_sock.close()
def stop_action(self):
    """ Returns the reply text for the stop action """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """ Returns the reply text for a message: msg prefixed with "Reply to: " """
    template = "Reply to: %s"
    return template % msg
198 def log_error(self, text = None, context = ''):
200 text = traceback.format_exc()
201 date = time.strftime("%Y-%m-%d %H:%M:%S")
203 context = " (%s)" % (context,)
204 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """ Writes a timestamped DEBUG entry to stderr when the log level is DEBUG_LEVEL """
    # anything other than debug level: stay silent
    if self._log_level != DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
212 class Forwarder(object):
213 def __init__(self, root_dir = "."):
214 self._ctrl_sock = None
215 self._root_dir = root_dir
220 print >>sys.stderr, "READY."
221 while not self._stop:
222 data = self.read_data()
223 self.send_to_server(data)
224 data = self.recv_from_server()
225 self.write_data(data)
229 return sys.stdin.readline()
231 def write_data(self, data):
232 sys.stdout.write(data)
233 # sys.stdout.write is buffered, this is why we need to do a flush()
236 def send_to_server(self, data):
238 self._ctrl_sock.send(data)
240 if e.errno == errno.EPIPE:
242 self._ctrl_sock.send(data)
245 encoded = data.rstrip()
246 msg = base64.b64decode(encoded)
250 def recv_from_server(self):
254 chunk = self._ctrl_sock.recv(1024)
256 if e.errno != errno.EINTR:
261 if chunk[-1] == "\n":
267 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
268 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
269 self._ctrl_sock.connect(sock_addr)
271 def disconnect(self):
273 self._ctrl_sock.close()
277 class Client(object):
278 def __init__(self, root_dir = ".", host = None, port = None, user = None,
280 self.root_dir = root_dir
281 self.addr = (host, port)
284 self._stopped = False
288 if self._process.poll() is None:
289 os.kill(self._process.pid, signal.SIGTERM)
293 root_dir = self.root_dir
294 (host, port) = self.addr
298 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
299 c.forward()" % (root_dir,)
301 self._process = popen_ssh_subprocess(python_code, host, port,
303 # popen_ssh_subprocess already waits for readiness
305 self._process = subprocess.Popen(
306 ["python", "-c", python_code],
307 stdin = subprocess.PIPE,
308 stdout = subprocess.PIPE,
309 stderr = subprocess.PIPE
312 # Wait for the forwarder to be ready, otherwise nobody
313 # will be able to connect to it
314 helo = self._process.stderr.readline()
315 if helo != 'READY.\n':
316 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
317 helo + self._process.stderr.read())
319 def send_msg(self, msg):
320 encoded = base64.b64encode(msg)
321 data = "%s\n" % encoded
324 self._process.stdin.write(data)
325 except (IOError, ValueError):
326 # dead process, poll it to un-zombify
329 # try again after reconnect
330 # If it fails again, though, give up
332 self._process.stdin.write(data)
335 self.send_msg(STOP_MSG)
def read_reply(self):
    """ Reads one line from the child process stdout and returns it base64-decoded """
    line = self._process.stdout.readline()
    # strip the line terminator (and any trailing whitespace) before decoding
    return base64.b64decode(line.rstrip())
343 def popen_ssh_command(command, host, port, user, agent,
347 Executes a remote command, returns ((stdout,stderr),process)
350 # Don't bother with localhost. Makes test easier
351 '-o', 'NoHostAuthenticationForLocalhost=yes',
356 args.append('-p%d' % port)
358 args.extend(('-i', ident_key))
361 # connects to the remote host and starts a remote connection
362 proc = subprocess.Popen(args,
363 stdout = subprocess.PIPE,
364 stdin = subprocess.PIPE,
365 stderr = subprocess.PIPE)
366 return (proc.communicate(stdin), proc)
368 def popen_scp(source, dest, port, agent,
372 Copies from/to remote sites.
374 Source and destination should have the user and host encoded
377 If source is a file object, a special mode will be used to
378 create the remote file with the same contents.
380 If dest is a file object, the remote file (source) will be
381 read and written into dest.
383 In these modes, recursive cannot be True.
386 if isinstance(source, file) or isinstance(dest, file) \
387 or hasattr(source, 'read') or hasattr(dest, 'write'):
390 # Parse destination as <user>@<server>:<path>
391 tgtspec, path = dest.split(':',1)
392 user,host = tgtspec.rsplit('@',1)
394 args = ['ssh', '-l', user, '-C',
395 # Don't bother with localhost. Makes test easier
396 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
398 args.append('-P%d' % port)
400 args.extend(('-i', ident_key))
402 if isinstance(source, file) or hasattr(source, 'read'):
403 args.append('cat > %s' % (shell_escape(path),))
404 elif isinstance(dest, file) or hasattr(dest, 'write'):
405 args.append('cat %s' % (shell_escape(path),))
407 raise AssertionError, "Unreachable code reached! :-Q"
409 # connects to the remote host and starts a remote connection
410 if isinstance(source, file):
411 proc = subprocess.Popen(args,
412 stdout = open('/dev/null','w'),
413 stderr = subprocess.PIPE,
415 err = proc.stderr.read()
417 return ((None,err), proc)
418 elif isinstance(dest, file):
419 proc = subprocess.Popen(args,
420 stdout = open('/dev/null','w'),
421 stderr = subprocess.PIPE,
423 err = proc.stderr.read()
425 return ((None,err), proc)
426 elif hasattr(source, 'read'):
427 # file-like (but not file) source
428 proc = subprocess.Popen(args,
429 stdout = open('/dev/null','w'),
430 stderr = subprocess.PIPE,
437 buf = source.read(4096)
439 rdrdy, wrdy, broken = os.select(
442 [proc.stderr,proc.stdin])
444 if proc.stderr in rdrdy:
445 # use os.read for fully unbuffered behavior
446 err.append(os.read(proc.stderr.fileno(), 4096))
448 if proc.stdin in wrdy:
449 proc.stdin.write(buf)
454 err.append(proc.stderr.read())
457 return ((None,''.join(err)), proc)
458 elif hasattr(dest, 'write'):
459 # file-like (but not file) dest
460 proc = subprocess.Popen(args,
461 stdout = open('/dev/null','w'),
462 stderr = subprocess.PIPE,
468 rdrdy, wrdy, broken = os.select(
469 [proc.stderr, proc.stdout],
471 [proc.stderr, proc.stdout])
473 if proc.stderr in rdrdy:
474 # use os.read for fully unbuffered behavior
475 err.append(os.read(proc.stderr.fileno(), 4096))
477 if proc.stdout in rdrdy:
478 # use os.read for fully unbuffered behavior
479 dest.write(os.read(proc.stdout.fileno(), 4096))
483 err.append(proc.stderr.read())
486 return ((None,''.join(err)), proc)
488 raise AssertionError, "Unreachable code reached! :-Q"
491 args = ['scp', '-q', '-p', '-C',
492 # Don't bother with localhost. Makes test easier
493 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
495 args.append('-P%d' % port)
499 args.extend(('-i', ident_key))
503 # connects to the remote host and starts a remote connection
504 proc = subprocess.Popen(args,
505 stdout = subprocess.PIPE,
506 stdin = subprocess.PIPE,
507 stderr = subprocess.PIPE)
508 comm = proc.communicate()
512 def popen_ssh_subprocess(python_code, host, port, user, agent,
516 python_path.replace("'", r"'\''")
517 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
520 # Uncomment for debug (to run everything under strace)
521 # We had to verify if strace works (cannot nest them)
522 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
524 #if self.mode == MODE_SSH:
525 # cmd += "strace -f -tt -s 200 -o strace$$.out "
527 cmd += "import base64, os\n"
528 cmd += "cmd = \"\"\n"
529 cmd += "while True:\n"
530 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
531 cmd += " if cmd[-1] == \"\\n\": break\n"
532 cmd += "cmd = base64.b64decode(cmd)\n"
533 # Uncomment for debug
534 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
535 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
536 cmd += "exec(cmd)\n'"
539 # Don't bother with localhost. Makes test easier
540 '-o', 'NoHostAuthenticationForLocalhost=yes',
545 args.append('-p%d' % port)
547 args.extend(('-i', ident_key))
550 # connects to the remote host and starts a remote rpyc connection
551 proc = subprocess.Popen(args,
552 stdout = subprocess.PIPE,
553 stdin = subprocess.PIPE,
554 stderr = subprocess.PIPE)
555 # send the command to execute
556 os.write(proc.stdin.fileno(),
557 base64.b64encode(python_code) + "\n")
558 msg = os.read(proc.stdout.fileno(), 3)
560 raise RuntimeError("Failed to start remote python interpreter")